Skip to main content

nautilus_hyperliquid/websocket/
post.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29    sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30    time,
31};
32
33use crate::{
34    common::consts::INFLIGHT_MAX,
35    http::{
36        error::{Error, Result},
37        models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38    },
39    websocket::messages::{
40        ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41        OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42    },
43};
44
45#[derive(Debug)]
46struct Waiter {
47    tx: oneshot::Sender<PostResponse>,
48    // When this is dropped, the permit is released, shrinking inflight
49    _permit: OwnedSemaphorePermit,
50}
51
52#[derive(Debug)]
53pub struct PostRouter {
54    inner: Mutex<HashMap<u64, Waiter>>,
55    inflight: Arc<Semaphore>, // hard cap per HL docs (e.g., 100)
56}
57
58impl Default for PostRouter {
59    fn default() -> Self {
60        Self {
61            inner: Mutex::new(HashMap::new()),
62            inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
63        }
64    }
65}
66
67impl PostRouter {
68    pub fn new() -> Arc<Self> {
69        Arc::new(Self::default())
70    }
71
72    /// Registers interest in a post id, enforcing inflight cap.
73    pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
74        // Acquire and retain a permit per inflight call
75        let permit = self
76            .inflight
77            .clone()
78            .acquire_owned()
79            .await
80            .map_err(|_| Error::transport("post router semaphore closed"))?;
81
82        let (tx, rx) = oneshot::channel::<PostResponse>();
83        let mut map = self.inner.lock().await;
84        if map.contains_key(&id) {
85            return Err(Error::transport(format!("post id {id} already registered")));
86        }
87        map.insert(
88            id,
89            Waiter {
90                tx,
91                _permit: permit,
92            },
93        );
94        Ok(rx)
95    }
96
97    /// Completes a waiting caller when a response arrives (releases inflight via Waiter drop).
98    pub async fn complete(&self, resp: PostResponse) {
99        let id = resp.id;
100        let waiter = {
101            let mut map = self.inner.lock().await;
102            map.remove(&id)
103        };
104        if let Some(waiter) = waiter {
105            if waiter.tx.send(resp).is_err() {
106                log::warn!("Post waiter dropped before delivery: id={id}");
107            }
108            // waiter drops here → permit released
109        } else {
110            log::warn!("Post response with unknown id (late/duplicate?): id={id}");
111        }
112    }
113
114    /// Cancel a pending id (e.g., timeout); quietly succeed if id wasn't present.
115    pub async fn cancel(&self, id: u64) {
116        let _ = {
117            let mut map = self.inner.lock().await;
118            map.remove(&id)
119        };
120        // Waiter (and its permit) drop here if it existed
121    }
122
123    /// Await a response with timeout. On timeout or closed channel, cancels the id.
124    pub async fn await_with_timeout(
125        &self,
126        id: u64,
127        rx: oneshot::Receiver<PostResponse>,
128        timeout: Duration,
129    ) -> Result<PostResponse> {
130        match time::timeout(timeout, rx).await {
131            Ok(Ok(resp)) => Ok(resp),
132            Ok(Err(_closed)) => {
133                self.cancel(id).await;
134                Err(Error::transport("post response channel closed"))
135            }
136            Err(_elapsed) => {
137                self.cancel(id).await;
138                Err(Error::Timeout)
139            }
140        }
141    }
142}
143
144#[derive(Debug)]
145pub struct PostIds(AtomicU64);
146
147impl PostIds {
148    pub fn new(start: u64) -> Self {
149        Self(AtomicU64::new(start))
150    }
151    pub fn next(&self) -> u64 {
152        self.0.fetch_add(1, Ordering::Relaxed)
153    }
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum PostLane {
158    Alo,    // Post-only orders
159    Normal, // IOC/GTC + info + anything else
160}
161
162#[derive(Debug)]
163pub struct ScheduledPost {
164    pub id: u64,
165    pub request: PostRequest,
166    pub lane: PostLane,
167}
168
169#[derive(Debug)]
170pub struct PostBatcher {
171    tx_alo: mpsc::Sender<ScheduledPost>,
172    tx_normal: mpsc::Sender<ScheduledPost>,
173}
174
175impl PostBatcher {
176    /// Spawns two lane tasks that batch-send scheduled posts via `send_fn`.
177    pub fn new<F>(send_fn: F) -> Self
178    where
179        F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
180    {
181        let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
182        let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
183
184        // ALO lane: batchy tick, low jitter
185        get_runtime().spawn(Self::run_lane(
186            "ALO",
187            rx_alo,
188            Duration::from_millis(100),
189            send_fn.clone(),
190        ));
191
192        // NORMAL lane: faster tick; adjust as needed
193        get_runtime().spawn(Self::run_lane(
194            "NORMAL",
195            rx_normal,
196            Duration::from_millis(50),
197            send_fn,
198        ));
199
200        Self { tx_alo, tx_normal }
201    }
202
203    async fn run_lane<F>(
204        lane_name: &'static str,
205        mut rx: mpsc::Receiver<ScheduledPost>,
206        tick: Duration,
207        mut send_fn: F,
208    ) where
209        F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
210    {
211        let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
212        let mut interval = time::interval(tick);
213        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
214
215        loop {
216            tokio::select! {
217                maybe_item = rx.recv() => {
218                    match maybe_item {
219                        Some(item) => pend.push(item),
220                        None => break, // sender dropped → terminate lane task
221                    }
222                }
223                _ = interval.tick() => {
224                    if pend.is_empty() { continue; }
225                    let to_send = std::mem::take(&mut pend);
226                    for item in to_send {
227                        let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
228                        if let Err(e) = send_fn(req).await {
229                            log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
230                        }
231                    }
232                }
233            }
234        }
235        log::info!("Post lane terminated: lane={lane_name}");
236    }
237
238    pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
239        match item.lane {
240            PostLane::Alo => self
241                .tx_alo
242                .send(item)
243                .await
244                .map_err(|_| Error::transport("ALO lane closed")),
245            PostLane::Normal => self
246                .tx_normal
247                .send(item)
248                .await
249                .map_err(|_| Error::transport("NORMAL lane closed")),
250        }
251    }
252}
253
254// Helpers to classify lane from an action
255pub fn lane_for_action(action: &ActionRequest) -> PostLane {
256    match action {
257        ActionRequest::Order { orders, .. } => {
258            if orders.is_empty() {
259                return PostLane::Normal;
260            }
261            let all_alo = orders.iter().all(|o| {
262                matches!(
263                    o.t,
264                    OrderTypeRequest::Limit {
265                        tif: TimeInForceRequest::Alo
266                    }
267                )
268            });
269            if all_alo {
270                PostLane::Alo
271            } else {
272                PostLane::Normal
273            }
274        }
275        _ => PostLane::Normal,
276    }
277}
278
279#[derive(Debug, Clone, Copy, Default)]
280pub enum Grouping {
281    #[default]
282    Na,
283    NormalTpsl,
284    PositionTpsl,
285}
286impl Grouping {
287    pub fn as_str(&self) -> &'static str {
288        match self {
289            Self::Na => "na",
290            Self::NormalTpsl => "normalTpsl",
291            Self::PositionTpsl => "positionTpsl",
292        }
293    }
294}
295
296/// Parameters for creating a limit order.
297#[derive(Debug, Clone, Builder)]
298pub struct LimitOrderParams {
299    pub asset: u32,
300    pub is_buy: bool,
301    pub px: String,
302    pub sz: String,
303    pub reduce_only: bool,
304    pub tif: TimeInForceRequest,
305    pub cloid: Option<String>,
306}
307
308/// Parameters for creating a trigger order.
309#[derive(Debug, Clone, Builder)]
310pub struct TriggerOrderParams {
311    pub asset: u32,
312    pub is_buy: bool,
313    pub px: String,
314    pub sz: String,
315    pub reduce_only: bool,
316    pub is_market: bool,
317    pub trigger_px: String,
318    pub tpsl: TpSlRequest,
319    pub cloid: Option<String>,
320}
321
322// ORDER builder (single or many)
323#[derive(Debug, Default)]
324pub struct OrderBuilder {
325    orders: Vec<OrderRequest>,
326    grouping: Grouping,
327}
328
329impl OrderBuilder {
330    pub fn new() -> Self {
331        Self::default()
332    }
333
334    #[must_use]
335    pub fn grouping(mut self, g: Grouping) -> Self {
336        self.grouping = g;
337        self
338    }
339
340    /// Create a limit order with individual parameters (legacy method)
341    #[allow(clippy::too_many_arguments)]
342    #[must_use]
343    pub fn push_limit(
344        self,
345        asset: u32,
346        is_buy: bool,
347        px: impl ToString,
348        sz: impl ToString,
349        reduce_only: bool,
350        tif: TimeInForceRequest,
351        cloid: Option<String>,
352    ) -> Self {
353        let params = LimitOrderParams {
354            asset,
355            is_buy,
356            px: px.to_string(),
357            sz: sz.to_string(),
358            reduce_only,
359            tif,
360            cloid,
361        };
362        self.push_limit_order(params)
363    }
364
365    /// Create a limit order using parameters struct
366    #[must_use]
367    pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
368        self.orders.push(OrderRequest {
369            a: params.asset,
370            b: params.is_buy,
371            p: params.px,
372            s: params.sz,
373            r: params.reduce_only,
374            t: OrderTypeRequest::Limit { tif: params.tif },
375            c: params.cloid,
376        });
377        self
378    }
379
380    /// Create a trigger order with individual parameters (legacy method)
381    #[allow(clippy::too_many_arguments)]
382    #[must_use]
383    pub fn push_trigger(
384        self,
385        asset: u32,
386        is_buy: bool,
387        px: impl ToString,
388        sz: impl ToString,
389        reduce_only: bool,
390        is_market: bool,
391        trigger_px: impl ToString,
392        tpsl: TpSlRequest,
393        cloid: Option<String>,
394    ) -> Self {
395        let params = TriggerOrderParams {
396            asset,
397            is_buy,
398            px: px.to_string(),
399            sz: sz.to_string(),
400            reduce_only,
401            is_market,
402            trigger_px: trigger_px.to_string(),
403            tpsl,
404            cloid,
405        };
406        self.push_trigger_order(params)
407    }
408
409    /// Create a trigger order using parameters struct
410    #[must_use]
411    pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
412        self.orders.push(OrderRequest {
413            a: params.asset,
414            b: params.is_buy,
415            p: params.px,
416            s: params.sz,
417            r: params.reduce_only,
418            t: OrderTypeRequest::Trigger {
419                is_market: params.is_market,
420                trigger_px: params.trigger_px,
421                tpsl: params.tpsl,
422            },
423            c: params.cloid,
424        });
425        self
426    }
427    pub fn build(self) -> ActionRequest {
428        ActionRequest::Order {
429            orders: self.orders,
430            grouping: self.grouping.as_str().to_string(),
431        }
432    }
433
434    /// Create a single limit order action directly (convenience method)
435    ///
436    /// # Example
437    /// ```ignore
438    /// let action = OrderBuilder::single_limit_order(
439    ///     LimitOrderParamsBuilder::default()
440    ///         .asset(0)
441    ///         .is_buy(true)
442    ///         .px("40000.0")
443    ///         .sz("0.01")
444    ///         .reduce_only(false)
445    ///         .tif(TimeInForceRequest::Gtc)
446    ///         .build()
447    ///         .unwrap()
448    /// );
449    /// ```
450    pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
451        Self::new().push_limit_order(params).build()
452    }
453
454    /// Create a single trigger order action directly (convenience method)
455    ///
456    /// # Example
457    /// ```ignore
458    /// let action = OrderBuilder::single_trigger_order(
459    ///     TriggerOrderParamsBuilder::default()
460    ///         .asset(0)
461    ///         .is_buy(false)
462    ///         .px("39000.0")
463    ///         .sz("0.01")
464    ///         .reduce_only(false)
465    ///         .is_market(true)
466    ///         .trigger_px("39500.0")
467    ///         .tpsl(TpSlRequest::Sl)
468    ///         .build()
469    ///         .unwrap()
470    /// );
471    /// ```
472    pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
473        Self::new().push_trigger_order(params).build()
474    }
475}
476
477pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
478    ActionRequest::Cancel {
479        cancels: cancels
480            .into_iter()
481            .map(|(a, o)| CancelRequest { a, o })
482            .collect(),
483    }
484}
485pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
486    ActionRequest::CancelByCloid {
487        cancels: vec![CancelByCloidRequest {
488            asset,
489            cloid: cloid.into(),
490        }],
491    }
492}
493pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
494    ActionRequest::Modify {
495        modifies: vec![ModifyRequest {
496            oid,
497            order: new_order,
498        }],
499    }
500}
501
502// Info wrappers (bodies go under PostRequest::Info{ payload })
503pub fn info_l2_book(coin: &str) -> PostRequest {
504    PostRequest::Info {
505        payload: serde_json::json!({"type":"l2Book","coin":coin}),
506    }
507}
508pub fn info_all_mids() -> PostRequest {
509    PostRequest::Info {
510        payload: serde_json::json!({"type":"allMids"}),
511    }
512}
513pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
514    PostRequest::Info {
515        payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
516    }
517}
518pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
519    let mut body = serde_json::json!({"type":"openOrders","user":user});
520    if let Some(fe) = frontend {
521        body["frontend"] = serde_json::json!(fe);
522    }
523    PostRequest::Info { payload: body }
524}
525pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
526    let mut body = serde_json::json!({"type":"userFills","user":user});
527    if let Some(agg) = aggregate_by_time {
528        body["aggregateByTime"] = serde_json::json!(agg);
529    }
530    PostRequest::Info { payload: body }
531}
532pub fn info_user_rate_limit(user: &str) -> PostRequest {
533    PostRequest::Info {
534        payload: serde_json::json!({"type":"userRateLimit","user":user}),
535    }
536}
537pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
538    PostRequest::Info {
539        payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
540    }
541}
542
543pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
544    serde_json::from_value(payload.clone()).map_err(Error::Serde)
545}
546pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
547    serde_json::from_value(payload.clone()).map_err(Error::Serde)
548}
549pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
550    serde_json::from_value(payload.clone()).map_err(Error::Serde)
551}
552
553/// Heuristic classification for action responses.
554#[derive(Debug)]
555pub enum ActionOutcome<'a> {
556    Resting {
557        oid: u64,
558    },
559    Filled {
560        total_sz: &'a str,
561        avg_px: &'a str,
562        oid: Option<u64>,
563    },
564    Error {
565        msg: &'a str,
566    },
567    Unknown(&'a serde_json::Value),
568}
569pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
570    if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
571        if let (Some(total_sz), Some(avg_px)) = (
572            payload.get("totalSz").and_then(|v| v.as_str()),
573            payload.get("avgPx").and_then(|v| v.as_str()),
574        ) {
575            return ActionOutcome::Filled {
576                total_sz,
577                avg_px,
578                oid: Some(oid),
579            };
580        }
581        return ActionOutcome::Resting { oid };
582    }
583    if let (Some(total_sz), Some(avg_px)) = (
584        payload.get("totalSz").and_then(|v| v.as_str()),
585        payload.get("avgPx").and_then(|v| v.as_str()),
586    ) {
587        return ActionOutcome::Filled {
588            total_sz,
589            avg_px,
590            oid: None,
591        };
592    }
593    if let Some(msg) = payload
594        .get("error")
595        .and_then(|v| v.as_str())
596        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
597    {
598        return ActionOutcome::Error { msg };
599    }
600    ActionOutcome::Unknown(payload)
601}
602
603#[derive(Clone, Debug)]
604pub struct WsSender {
605    inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
606}
607
608impl WsSender {
609    pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
610        Self {
611            inner: Arc::new(tokio::sync::Mutex::new(tx)),
612        }
613    }
614
615    pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
616        let sender = self.inner.lock().await;
617        sender
618            .send(req)
619            .await
620            .map_err(|_| Error::transport("WebSocket sender closed"))
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use nautilus_common::testing::wait_until_async;
627    use rstest::rstest;
628    use tokio::{
629        sync::oneshot,
630        time::{Duration, timeout},
631    };
632
633    use super::*;
634    use crate::{
635        common::consts::INFLIGHT_MAX,
636        websocket::messages::{
637            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
638            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
639        },
640    };
641
642    // --- helpers -------------------------------------------------------------------------------
643
644    fn mk_limit_alo(asset: u32) -> OrderRequest {
645        OrderRequest {
646            a: asset,
647            b: true,
648            p: "1".to_string(),
649            s: "1".to_string(),
650            r: false,
651            t: OrderTypeRequest::Limit {
652                tif: TimeInForceRequest::Alo,
653            },
654            c: None,
655        }
656    }
657
658    fn mk_limit_gtc(asset: u32) -> OrderRequest {
659        OrderRequest {
660            a: asset,
661            b: true,
662            p: "1".to_string(),
663            s: "1".to_string(),
664            r: false,
665            t: OrderTypeRequest::Limit {
666                // any non-ALO TIF keeps it in the Normal lane
667                tif: TimeInForceRequest::Gtc,
668            },
669            c: None,
670        }
671    }
672
673    // --- PostRouter ---------------------------------------------------------------------------
674
675    #[rstest]
676    #[tokio::test(flavor = "multi_thread")]
677    async fn register_duplicate_id_errors() {
678        let router = PostRouter::new();
679        let _rx = router.register(42).await.expect("first register OK");
680
681        let err = router.register(42).await.expect_err("duplicate must error");
682        let msg = err.to_string().to_lowercase();
683        assert!(
684            msg.contains("already") || msg.contains("duplicate"),
685            "unexpected error: {msg}"
686        );
687    }
688
689    #[rstest]
690    #[tokio::test(flavor = "multi_thread")]
691    async fn timeout_cancels_and_allows_reregister() {
692        let router = PostRouter::new();
693        let id = 7;
694
695        let rx = router.register(id).await.unwrap();
696        // No complete() → ensure we time out and the waiter is removed.
697        let err = router
698            .await_with_timeout(id, rx, Duration::from_millis(25))
699            .await
700            .expect_err("should timeout");
701        assert!(
702            err.to_string().to_lowercase().contains("timeout")
703                || err.to_string().to_lowercase().contains("closed"),
704            "unexpected error kind: {err}"
705        );
706
707        // After timeout, id should be reusable (cancel dropped the waiter & released the permit).
708        let _rx2 = router
709            .register(id)
710            .await
711            .expect("id should be reusable after timeout cancel");
712    }
713
714    #[rstest]
715    #[tokio::test(flavor = "multi_thread")]
716    async fn inflight_cap_blocks_then_unblocks() {
717        let router = PostRouter::new();
718
719        // Fill the inflight capacity.
720        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
721        for i in 0..INFLIGHT_MAX {
722            let rx = router.register(i as u64).await.unwrap();
723            rxs.push(rx); // keep waiters alive
724        }
725
726        // Next register should block until a permit is freed.
727        let router2 = Arc::clone(&router);
728        let (entered_tx, entered_rx) = oneshot::channel::<()>();
729        let (done_tx, done_rx) = oneshot::channel::<()>();
730        let (check_tx, check_rx) = oneshot::channel::<()>(); // separate channel for checking
731
732        get_runtime().spawn(async move {
733            let _ = entered_tx.send(());
734            let _rx = router2.register(9_999_999).await.unwrap();
735            let _ = done_tx.send(());
736        });
737
738        // Confirm the task is trying to register…
739        entered_rx.await.unwrap();
740
741        // …and that it doesn't complete yet (still blocked on permit).
742        get_runtime().spawn(async move {
743            if done_rx.await.is_ok() {
744                let _ = check_tx.send(());
745            }
746        });
747
748        assert!(
749            timeout(Duration::from_millis(50), check_rx).await.is_err(),
750            "should still be blocked while at cap"
751        );
752
753        // Free one permit by cancelling a waiter.
754        router.cancel(0).await;
755
756        // Wait for the blocked register to complete.
757        tokio::time::sleep(Duration::from_millis(100)).await;
758    }
759
760    // --- Lane classifier -----------------------------------------------------------------------
761
762    #[rstest(
763        orders, expected,
764        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
765        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
766        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
767        case::empty(vec![], PostLane::Normal),
768    )]
769    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
770        let action = ActionRequest::Order {
771            orders,
772            grouping: "na".to_string(),
773        };
774        assert_eq!(lane_for_action(&action), expected);
775    }
776
777    // --- Builder Pattern Tests -----------------------------------------------------------------
778
779    #[rstest]
780    fn test_order_request_builder() {
781        // Test OrderRequestBuilder derived from #[derive(Builder)]
782        let order = OrderRequestBuilder::default()
783            .a(0)
784            .b(true)
785            .p("40000.0".to_string())
786            .s("0.01".to_string())
787            .r(false)
788            .t(OrderTypeRequest::Limit {
789                tif: TimeInForceRequest::Gtc,
790            })
791            .c(Some("test-order-1".to_string()))
792            .build()
793            .expect("should build order");
794
795        assert_eq!(order.a, 0);
796        assert!(order.b);
797        assert_eq!(order.p, "40000.0");
798        assert_eq!(order.s, "0.01");
799        assert!(!order.r);
800        assert_eq!(order.c, Some("test-order-1".to_string()));
801    }
802
803    #[rstest]
804    fn test_limit_order_params_builder() {
805        // Test LimitOrderParamsBuilder
806        let params = LimitOrderParamsBuilder::default()
807            .asset(0)
808            .is_buy(true)
809            .px("40000.0".to_string())
810            .sz("0.01".to_string())
811            .reduce_only(false)
812            .tif(TimeInForceRequest::Alo)
813            .cloid(Some("test-limit-1".to_string()))
814            .build()
815            .expect("should build limit params");
816
817        assert_eq!(params.asset, 0);
818        assert!(params.is_buy);
819        assert_eq!(params.px, "40000.0");
820        assert_eq!(params.sz, "0.01");
821        assert!(!params.reduce_only);
822        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
823    }
824
825    #[rstest]
826    fn test_trigger_order_params_builder() {
827        // Test TriggerOrderParamsBuilder
828        let params = TriggerOrderParamsBuilder::default()
829            .asset(1)
830            .is_buy(false)
831            .px("39000.0".to_string())
832            .sz("0.02".to_string())
833            .reduce_only(false)
834            .is_market(true)
835            .trigger_px("39500.0".to_string())
836            .tpsl(TpSlRequest::Sl)
837            .cloid(Some("test-trigger-1".to_string()))
838            .build()
839            .expect("should build trigger params");
840
841        assert_eq!(params.asset, 1);
842        assert!(!params.is_buy);
843        assert_eq!(params.px, "39000.0");
844        assert!(params.is_market);
845        assert_eq!(params.trigger_px, "39500.0");
846    }
847
848    #[rstest]
849    fn test_order_builder_single_limit_convenience() {
850        // Test OrderBuilder::single_limit_order convenience method
851        let params = LimitOrderParamsBuilder::default()
852            .asset(0)
853            .is_buy(true)
854            .px("40000.0".to_string())
855            .sz("0.01".to_string())
856            .reduce_only(false)
857            .tif(TimeInForceRequest::Gtc)
858            .cloid(None)
859            .build()
860            .unwrap();
861
862        let action = OrderBuilder::single_limit_order(params);
863
864        match action {
865            ActionRequest::Order { orders, grouping } => {
866                assert_eq!(orders.len(), 1);
867                assert_eq!(orders[0].a, 0);
868                assert!(orders[0].b);
869                assert_eq!(grouping, "na");
870            }
871            _ => panic!("Expected ActionRequest::Order variant"),
872        }
873    }
874
875    #[rstest]
876    fn test_order_builder_single_trigger_convenience() {
877        // Test OrderBuilder::single_trigger_order convenience method
878        let params = TriggerOrderParamsBuilder::default()
879            .asset(1)
880            .is_buy(false)
881            .px("39000.0".to_string())
882            .sz("0.02".to_string())
883            .reduce_only(false)
884            .is_market(true)
885            .trigger_px("39500.0".to_string())
886            .tpsl(TpSlRequest::Sl)
887            .cloid(Some("sl-order".to_string()))
888            .build()
889            .unwrap();
890
891        let action = OrderBuilder::single_trigger_order(params);
892
893        match action {
894            ActionRequest::Order { orders, grouping } => {
895                assert_eq!(orders.len(), 1);
896                assert_eq!(orders[0].a, 1);
897                assert_eq!(orders[0].c, Some("sl-order".to_string()));
898                assert_eq!(grouping, "na");
899            }
900            _ => panic!("Expected ActionRequest::Order variant"),
901        }
902    }
903
904    #[rstest]
905    fn test_order_builder_batch_orders() {
906        // Test existing batch order functionality still works
907        let params1 = LimitOrderParams {
908            asset: 0,
909            is_buy: true,
910            px: "40000.0".to_string(),
911            sz: "0.01".to_string(),
912            reduce_only: false,
913            tif: TimeInForceRequest::Gtc,
914            cloid: Some("order-1".to_string()),
915        };
916
917        let params2 = LimitOrderParams {
918            asset: 1,
919            is_buy: false,
920            px: "2000.0".to_string(),
921            sz: "0.5".to_string(),
922            reduce_only: false,
923            tif: TimeInForceRequest::Ioc,
924            cloid: Some("order-2".to_string()),
925        };
926
927        let action = OrderBuilder::new()
928            .grouping(Grouping::NormalTpsl)
929            .push_limit_order(params1)
930            .push_limit_order(params2)
931            .build();
932
933        match action {
934            ActionRequest::Order { orders, grouping } => {
935                assert_eq!(orders.len(), 2);
936                assert_eq!(orders[0].c, Some("order-1".to_string()));
937                assert_eq!(orders[1].c, Some("order-2".to_string()));
938                assert_eq!(grouping, "normalTpsl");
939            }
940            _ => panic!("Expected ActionRequest::Order variant"),
941        }
942    }
943
944    #[rstest]
945    fn test_action_request_constructors() {
946        // Test ActionRequest::order() constructor
947        let order1 = mk_limit_gtc(0);
948        let order2 = mk_limit_gtc(1);
949        let action = ActionRequest::order(vec![order1, order2], "na");
950
951        match action {
952            ActionRequest::Order { orders, grouping } => {
953                assert_eq!(orders.len(), 2);
954                assert_eq!(grouping, "na");
955            }
956            _ => panic!("Expected ActionRequest::Order variant"),
957        }
958
959        // Test ActionRequest::cancel() constructor
960        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
961        let action = ActionRequest::cancel(cancels);
962        assert!(matches!(action, ActionRequest::Cancel { .. }));
963
964        // Test ActionRequest::cancel_by_cloid() constructor
965        let cancels = vec![CancelByCloidRequest {
966            asset: 0,
967            cloid: "order-1".to_string(),
968        }];
969        let action = ActionRequest::cancel_by_cloid(cancels);
970        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
971    }
972
973    // --- Batcher (tick flush path) --------------------------------------------------------------
974
975    #[rstest]
976    #[tokio::test(flavor = "multi_thread")]
977    async fn batcher_sends_on_tick() {
978        // Capture sent ids to prove dispatch happened.
979        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
980        let sent_closure = sent.clone();
981
982        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
983            let sent_inner = sent_closure.clone();
984            Box::pin(async move {
985                if let HyperliquidWsRequest::Post { id, .. } = req {
986                    sent_inner.lock().await.push(id);
987                }
988                Ok(())
989            })
990        };
991
992        let batcher = PostBatcher::new(send_fn);
993
994        // Enqueue a handful of posts into the NORMAL lane; tick is ~50ms.
995        for id in 1..=5u64 {
996            batcher
997                .enqueue(ScheduledPost {
998                    id,
999                    request: PostRequest::Info {
1000                        payload: serde_json::json!({"type":"allMids"}),
1001                    },
1002                    lane: PostLane::Normal,
1003                })
1004                .await
1005                .unwrap();
1006        }
1007
1008        // Wait for all 5 posts to be sent
1009        let sent_check = sent.clone();
1010        wait_until_async(
1011            || {
1012                let sent_inner = sent_check.clone();
1013                async move { sent_inner.lock().await.len() == 5 }
1014            },
1015            Duration::from_secs(2),
1016        )
1017        .await;
1018
1019        let got = sent.lock().await.clone();
1020        assert_eq!(got, vec![1, 2, 3, 4, 5]);
1021    }
1022}