nautilus_hyperliquid/websocket/
post.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use tokio::{
28    sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
29    time,
30};
31
32use crate::{
33    common::consts::INFLIGHT_MAX,
34    http::{
35        error::{Error, Result},
36        models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
37    },
38    websocket::messages::{
39        ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
40        OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
41    },
42};
43
44// -------------------------------------------------------------------------------------------------
45// Correlation router for "channel":"post" → correlate by id
46//  - Enforces inflight cap using OwnedSemaphorePermit stored per waiter
47// -------------------------------------------------------------------------------------------------
48
49#[derive(Debug)]
50struct Waiter {
51    tx: oneshot::Sender<PostResponse>,
52    // When this is dropped, the permit is released, shrinking inflight
53    _permit: OwnedSemaphorePermit,
54}
55
56#[derive(Debug)]
57pub struct PostRouter {
58    inner: Mutex<HashMap<u64, Waiter>>,
59    inflight: Arc<Semaphore>, // hard cap per HL docs (e.g., 100)
60}
61
62impl Default for PostRouter {
63    fn default() -> Self {
64        Self {
65            inner: Mutex::new(HashMap::new()),
66            inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
67        }
68    }
69}
70
71impl PostRouter {
72    pub fn new() -> Arc<Self> {
73        Arc::new(Self::default())
74    }
75
76    /// Registers interest in a post id, enforcing inflight cap.
77    pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
78        // Acquire and retain a permit per inflight call
79        let permit = self
80            .inflight
81            .clone()
82            .acquire_owned()
83            .await
84            .map_err(|_| Error::transport("post router semaphore closed"))?;
85
86        let (tx, rx) = oneshot::channel::<PostResponse>();
87        let mut map = self.inner.lock().await;
88        if map.contains_key(&id) {
89            return Err(Error::transport(format!("post id {id} already registered")));
90        }
91        map.insert(
92            id,
93            Waiter {
94                tx,
95                _permit: permit,
96            },
97        );
98        Ok(rx)
99    }
100
101    /// Completes a waiting caller when a response arrives (releases inflight via Waiter drop).
102    pub async fn complete(&self, resp: PostResponse) {
103        let id = resp.id;
104        let waiter = {
105            let mut map = self.inner.lock().await;
106            map.remove(&id)
107        };
108        if let Some(waiter) = waiter {
109            if waiter.tx.send(resp).is_err() {
110                tracing::warn!(id, "post waiter dropped before delivery");
111            }
112            // waiter drops here → permit released
113        } else {
114            tracing::warn!(id, "post response with unknown id (late/duplicate?)");
115        }
116    }
117
118    /// Cancel a pending id (e.g., timeout); quietly succeed if id wasn't present.
119    pub async fn cancel(&self, id: u64) {
120        let _ = {
121            let mut map = self.inner.lock().await;
122            map.remove(&id)
123        };
124        // Waiter (and its permit) drop here if it existed
125    }
126
127    /// Await a response with timeout. On timeout or closed channel, cancels the id.
128    pub async fn await_with_timeout(
129        &self,
130        id: u64,
131        rx: oneshot::Receiver<PostResponse>,
132        timeout: Duration,
133    ) -> Result<PostResponse> {
134        match time::timeout(timeout, rx).await {
135            Ok(Ok(resp)) => Ok(resp),
136            Ok(Err(_closed)) => {
137                self.cancel(id).await;
138                Err(Error::transport("post response channel closed"))
139            }
140            Err(_elapsed) => {
141                self.cancel(id).await;
142                Err(Error::Timeout)
143            }
144        }
145    }
146}
147
148// -------------------------------------------------------------------------------------------------
149// ID generation
150// -------------------------------------------------------------------------------------------------
151
/// Source of sequential post ids backed by an atomic counter.
#[derive(Debug)]
pub struct PostIds(AtomicU64);

impl PostIds {
    /// Creates a generator whose first issued id is `start`.
    pub fn new(start: u64) -> Self {
        let counter = AtomicU64::new(start);
        Self(counter)
    }

    /// Returns the current id and advances the counter by one.
    pub fn next(&self) -> u64 {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::Relaxed)
    }
}
163
164// -------------------------------------------------------------------------------------------------
165// Lanes & batcher (scaffold). You can expand policy later.
166// -------------------------------------------------------------------------------------------------
167
/// Dispatch lane a scheduled post is routed through.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostLane {
    /// Post-only orders.
    Alo,
    /// IOC/GTC + info + anything else.
    Normal,
}
173
174#[derive(Debug)]
175pub struct ScheduledPost {
176    pub id: u64,
177    pub request: PostRequest,
178    pub lane: PostLane,
179}
180
181#[derive(Debug)]
182pub struct PostBatcher {
183    tx_alo: mpsc::Sender<ScheduledPost>,
184    tx_normal: mpsc::Sender<ScheduledPost>,
185}
186
187impl PostBatcher {
188    /// Spawns two lane tasks that batch-send scheduled posts via `send_fn`.
189    pub fn new<F>(send_fn: F) -> Self
190    where
191        F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
192    {
193        let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
194        let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
195
196        // ALO lane: batchy tick, low jitter
197        tokio::spawn(Self::run_lane(
198            "ALO",
199            rx_alo,
200            Duration::from_millis(100),
201            send_fn.clone(),
202        ));
203
204        // NORMAL lane: faster tick; adjust as needed
205        tokio::spawn(Self::run_lane(
206            "NORMAL",
207            rx_normal,
208            Duration::from_millis(50),
209            send_fn,
210        ));
211
212        Self { tx_alo, tx_normal }
213    }
214
215    async fn run_lane<F>(
216        lane_name: &'static str,
217        mut rx: mpsc::Receiver<ScheduledPost>,
218        tick: Duration,
219        mut send_fn: F,
220    ) where
221        F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
222    {
223        let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
224        let mut interval = time::interval(tick);
225        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
226
227        loop {
228            tokio::select! {
229                maybe_item = rx.recv() => {
230                    match maybe_item {
231                        Some(item) => pend.push(item),
232                        None => break, // sender dropped → terminate lane task
233                    }
234                }
235                _ = interval.tick() => {
236                    if pend.is_empty() { continue; }
237                    let to_send = std::mem::take(&mut pend);
238                    for item in to_send {
239                        let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
240                        if let Err(e) = send_fn(req).await {
241                            tracing::error!(lane=%lane_name, id=%item.id, "failed to send post: {e}");
242                        }
243                    }
244                }
245            }
246        }
247        tracing::info!(lane=%lane_name, "post lane terminated");
248    }
249
250    pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
251        match item.lane {
252            PostLane::Alo => self
253                .tx_alo
254                .send(item)
255                .await
256                .map_err(|_| Error::transport("ALO lane closed")),
257            PostLane::Normal => self
258                .tx_normal
259                .send(item)
260                .await
261                .map_err(|_| Error::transport("NORMAL lane closed")),
262        }
263    }
264}
265
266// Helpers to classify lane from an action
267pub fn lane_for_action(action: &ActionRequest) -> PostLane {
268    match action {
269        ActionRequest::Order { orders, .. } => {
270            if orders.is_empty() {
271                return PostLane::Normal;
272            }
273            let all_alo = orders.iter().all(|o| {
274                matches!(
275                    o.t,
276                    OrderTypeRequest::Limit {
277                        tif: TimeInForceRequest::Alo
278                    }
279                )
280            });
281            if all_alo {
282                PostLane::Alo
283            } else {
284                PostLane::Normal
285            }
286        }
287        _ => PostLane::Normal,
288    }
289}
290
291// -------------------------------------------------------------------------------------------------
292// Typed builders (produce ActionRequest), plus Info request helpers.
293// -------------------------------------------------------------------------------------------------
294
/// Grouping mode attached to an order action.
#[derive(Debug, Clone, Copy, Default)]
pub enum Grouping {
    /// Rendered as `"na"`; this is the default.
    #[default]
    Na,
    /// Rendered as `"normalTpsl"`.
    NormalTpsl,
    /// Rendered as `"positionTpsl"`.
    PositionTpsl,
}

impl Grouping {
    /// Returns the string written into the action's `grouping` field for this variant.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Grouping::Na => "na",
            Grouping::NormalTpsl => "normalTpsl",
            Grouping::PositionTpsl => "positionTpsl",
        }
    }
}
311
312/// Parameters for creating a limit order
313#[derive(Debug, Clone, Builder)]
314pub struct LimitOrderParams {
315    pub asset: u32,
316    pub is_buy: bool,
317    pub px: String,
318    pub sz: String,
319    pub reduce_only: bool,
320    pub tif: TimeInForceRequest,
321    pub cloid: Option<String>,
322}
323
324/// Parameters for creating a trigger order
325#[derive(Debug, Clone, Builder)]
326pub struct TriggerOrderParams {
327    pub asset: u32,
328    pub is_buy: bool,
329    pub px: String,
330    pub sz: String,
331    pub reduce_only: bool,
332    pub is_market: bool,
333    pub trigger_px: String,
334    pub tpsl: TpSlRequest,
335    pub cloid: Option<String>,
336}
337
338// ORDER builder (single or many)
339#[derive(Debug, Default)]
340pub struct OrderBuilder {
341    orders: Vec<OrderRequest>,
342    grouping: Grouping,
343}
344
345impl OrderBuilder {
346    pub fn new() -> Self {
347        Self::default()
348    }
349    pub fn grouping(mut self, g: Grouping) -> Self {
350        self.grouping = g;
351        self
352    }
353
354    /// Create a limit order with individual parameters (legacy method)
355    #[allow(clippy::too_many_arguments)]
356    pub fn push_limit(
357        self,
358        asset: u32,
359        is_buy: bool,
360        px: impl ToString,
361        sz: impl ToString,
362        reduce_only: bool,
363        tif: TimeInForceRequest,
364        cloid: Option<String>,
365    ) -> Self {
366        let params = LimitOrderParams {
367            asset,
368            is_buy,
369            px: px.to_string(),
370            sz: sz.to_string(),
371            reduce_only,
372            tif,
373            cloid,
374        };
375        self.push_limit_order(params)
376    }
377
378    /// Create a limit order using parameters struct
379    pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
380        self.orders.push(OrderRequest {
381            a: params.asset,
382            b: params.is_buy,
383            p: params.px,
384            s: params.sz,
385            r: params.reduce_only,
386            t: OrderTypeRequest::Limit { tif: params.tif },
387            c: params.cloid,
388        });
389        self
390    }
391
392    /// Create a trigger order with individual parameters (legacy method)
393    #[allow(clippy::too_many_arguments)]
394    pub fn push_trigger(
395        self,
396        asset: u32,
397        is_buy: bool,
398        px: impl ToString,
399        sz: impl ToString,
400        reduce_only: bool,
401        is_market: bool,
402        trigger_px: impl ToString,
403        tpsl: TpSlRequest,
404        cloid: Option<String>,
405    ) -> Self {
406        let params = TriggerOrderParams {
407            asset,
408            is_buy,
409            px: px.to_string(),
410            sz: sz.to_string(),
411            reduce_only,
412            is_market,
413            trigger_px: trigger_px.to_string(),
414            tpsl,
415            cloid,
416        };
417        self.push_trigger_order(params)
418    }
419
420    /// Create a trigger order using parameters struct
421    pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
422        self.orders.push(OrderRequest {
423            a: params.asset,
424            b: params.is_buy,
425            p: params.px,
426            s: params.sz,
427            r: params.reduce_only,
428            t: OrderTypeRequest::Trigger {
429                is_market: params.is_market,
430                trigger_px: params.trigger_px,
431                tpsl: params.tpsl,
432            },
433            c: params.cloid,
434        });
435        self
436    }
437    pub fn build(self) -> ActionRequest {
438        ActionRequest::Order {
439            orders: self.orders,
440            grouping: self.grouping.as_str().to_string(),
441        }
442    }
443
444    /// Create a single limit order action directly (convenience method)
445    ///
446    /// # Example
447    /// ```ignore
448    /// let action = OrderBuilder::single_limit_order(
449    ///     LimitOrderParamsBuilder::default()
450    ///         .asset(0)
451    ///         .is_buy(true)
452    ///         .px("40000.0")
453    ///         .sz("0.01")
454    ///         .reduce_only(false)
455    ///         .tif(TimeInForceRequest::Gtc)
456    ///         .build()
457    ///         .unwrap()
458    /// );
459    /// ```
460    pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
461        Self::new().push_limit_order(params).build()
462    }
463
464    /// Create a single trigger order action directly (convenience method)
465    ///
466    /// # Example
467    /// ```ignore
468    /// let action = OrderBuilder::single_trigger_order(
469    ///     TriggerOrderParamsBuilder::default()
470    ///         .asset(0)
471    ///         .is_buy(false)
472    ///         .px("39000.0")
473    ///         .sz("0.01")
474    ///         .reduce_only(false)
475    ///         .is_market(true)
476    ///         .trigger_px("39500.0")
477    ///         .tpsl(TpSlRequest::Sl)
478    ///         .build()
479    ///         .unwrap()
480    /// );
481    /// ```
482    pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
483        Self::new().push_trigger_order(params).build()
484    }
485}
486
487pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
488    ActionRequest::Cancel {
489        cancels: cancels
490            .into_iter()
491            .map(|(a, o)| CancelRequest { a, o })
492            .collect(),
493    }
494}
495pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
496    ActionRequest::CancelByCloid {
497        cancels: vec![CancelByCloidRequest {
498            asset,
499            cloid: cloid.into(),
500        }],
501    }
502}
503pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
504    ActionRequest::Modify {
505        modifies: vec![ModifyRequest {
506            oid,
507            order: new_order,
508        }],
509    }
510}
511
512// Info wrappers (bodies go under PostRequest::Info{ payload })
513pub fn info_l2_book(coin: &str) -> PostRequest {
514    PostRequest::Info {
515        payload: serde_json::json!({"type":"l2Book","coin":coin}),
516    }
517}
518pub fn info_all_mids() -> PostRequest {
519    PostRequest::Info {
520        payload: serde_json::json!({"type":"allMids"}),
521    }
522}
523pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
524    PostRequest::Info {
525        payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
526    }
527}
528pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
529    let mut body = serde_json::json!({"type":"openOrders","user":user});
530    if let Some(fe) = frontend {
531        body["frontend"] = serde_json::json!(fe);
532    }
533    PostRequest::Info { payload: body }
534}
535pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
536    let mut body = serde_json::json!({"type":"userFills","user":user});
537    if let Some(agg) = aggregate_by_time {
538        body["aggregateByTime"] = serde_json::json!(agg);
539    }
540    PostRequest::Info { payload: body }
541}
542pub fn info_user_rate_limit(user: &str) -> PostRequest {
543    PostRequest::Info {
544        payload: serde_json::json!({"type":"userRateLimit","user":user}),
545    }
546}
547pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
548    PostRequest::Info {
549        payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
550    }
551}
552
553// -------------------------------------------------------------------------------------------------
554// Minimal response helpers
555// -------------------------------------------------------------------------------------------------
556
557pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
558    serde_json::from_value(payload.clone()).map_err(Error::Serde)
559}
560pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
561    serde_json::from_value(payload.clone()).map_err(Error::Serde)
562}
563pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
564    serde_json::from_value(payload.clone()).map_err(Error::Serde)
565}
566
567/// Heuristic classification for action responses.
568#[derive(Debug)]
569pub enum ActionOutcome<'a> {
570    Resting {
571        oid: u64,
572    },
573    Filled {
574        total_sz: &'a str,
575        avg_px: &'a str,
576        oid: Option<u64>,
577    },
578    Error {
579        msg: &'a str,
580    },
581    Unknown(&'a serde_json::Value),
582}
583pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
584    if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
585        if let (Some(total_sz), Some(avg_px)) = (
586            payload.get("totalSz").and_then(|v| v.as_str()),
587            payload.get("avgPx").and_then(|v| v.as_str()),
588        ) {
589            return ActionOutcome::Filled {
590                total_sz,
591                avg_px,
592                oid: Some(oid),
593            };
594        }
595        return ActionOutcome::Resting { oid };
596    }
597    if let (Some(total_sz), Some(avg_px)) = (
598        payload.get("totalSz").and_then(|v| v.as_str()),
599        payload.get("avgPx").and_then(|v| v.as_str()),
600    ) {
601        return ActionOutcome::Filled {
602            total_sz,
603            avg_px,
604            oid: None,
605        };
606    }
607    if let Some(msg) = payload
608        .get("error")
609        .and_then(|v| v.as_str())
610        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
611    {
612        return ActionOutcome::Error { msg };
613    }
614    ActionOutcome::Unknown(payload)
615}
616
617// -------------------------------------------------------------------------------------------------
618// Glue helpers used by the client (wired in client.rs)
619// -------------------------------------------------------------------------------------------------
620
621#[derive(Clone, Debug)]
622pub struct WsSender {
623    inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
624}
625
626impl WsSender {
627    pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
628        Self {
629            inner: Arc::new(tokio::sync::Mutex::new(tx)),
630        }
631    }
632
633    pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
634        let sender = self.inner.lock().await;
635        sender
636            .send(req)
637            .await
638            .map_err(|_| Error::transport("WebSocket sender closed"))
639    }
640}
641
642////////////////////////////////////////////////////////////////////////////////
643// Tests
644////////////////////////////////////////////////////////////////////////////////
645
646#[cfg(test)]
647mod tests {
648    use rstest::rstest;
649    use tokio::{
650        sync::oneshot,
651        time::{Duration, sleep, timeout},
652    };
653
654    use super::*;
655    use crate::{
656        common::consts::INFLIGHT_MAX,
657        websocket::messages::{
658            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
659            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
660        },
661    };
662
663    // --- helpers -------------------------------------------------------------------------------
664
665    fn mk_limit_alo(asset: u32) -> OrderRequest {
666        OrderRequest {
667            a: asset,
668            b: true,
669            p: "1".to_string(),
670            s: "1".to_string(),
671            r: false,
672            t: OrderTypeRequest::Limit {
673                tif: TimeInForceRequest::Alo,
674            },
675            c: None,
676        }
677    }
678
679    fn mk_limit_gtc(asset: u32) -> OrderRequest {
680        OrderRequest {
681            a: asset,
682            b: true,
683            p: "1".to_string(),
684            s: "1".to_string(),
685            r: false,
686            t: OrderTypeRequest::Limit {
687                // any non-ALO TIF keeps it in the Normal lane
688                tif: TimeInForceRequest::Gtc,
689            },
690            c: None,
691        }
692    }
693
694    // --- PostRouter ---------------------------------------------------------------------------
695
696    #[rstest]
697    #[tokio::test(flavor = "multi_thread")]
698    async fn register_duplicate_id_errors() {
699        let router = PostRouter::new();
700        let _rx = router.register(42).await.expect("first register OK");
701
702        let err = router.register(42).await.expect_err("duplicate must error");
703        let msg = err.to_string().to_lowercase();
704        assert!(
705            msg.contains("already") || msg.contains("duplicate"),
706            "unexpected error: {msg}"
707        );
708    }
709
710    #[rstest]
711    #[tokio::test(flavor = "multi_thread")]
712    async fn timeout_cancels_and_allows_reregister() {
713        let router = PostRouter::new();
714        let id = 7;
715
716        let rx = router.register(id).await.unwrap();
717        // No complete() → ensure we time out and the waiter is removed.
718        let err = router
719            .await_with_timeout(id, rx, Duration::from_millis(25))
720            .await
721            .expect_err("should timeout");
722        assert!(
723            err.to_string().to_lowercase().contains("timeout")
724                || err.to_string().to_lowercase().contains("closed"),
725            "unexpected error kind: {err}"
726        );
727
728        // After timeout, id should be reusable (cancel dropped the waiter & released the permit).
729        let _rx2 = router
730            .register(id)
731            .await
732            .expect("id should be reusable after timeout cancel");
733    }
734
735    #[rstest]
736    #[tokio::test(flavor = "multi_thread")]
737    async fn inflight_cap_blocks_then_unblocks() {
738        let router = PostRouter::new();
739
740        // Fill the inflight capacity.
741        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
742        for i in 0..INFLIGHT_MAX {
743            let rx = router.register(i as u64).await.unwrap();
744            rxs.push(rx); // keep waiters alive
745        }
746
747        // Next register should block until a permit is freed.
748        let router2 = Arc::clone(&router);
749        let (entered_tx, entered_rx) = oneshot::channel::<()>();
750        let (done_tx, done_rx) = oneshot::channel::<()>();
751        let (check_tx, check_rx) = oneshot::channel::<()>(); // separate channel for checking
752
753        tokio::spawn(async move {
754            let _ = entered_tx.send(());
755            let _rx = router2.register(9_999_999).await.unwrap();
756            let _ = done_tx.send(());
757        });
758
759        // Confirm the task is trying to register…
760        entered_rx.await.unwrap();
761
762        // …and that it doesn't complete yet (still blocked on permit).
763        tokio::spawn(async move {
764            if done_rx.await.is_ok() {
765                let _ = check_tx.send(());
766            }
767        });
768
769        assert!(
770            timeout(Duration::from_millis(50), check_rx).await.is_err(),
771            "should still be blocked while at cap"
772        );
773
774        // Free one permit by cancelling a waiter.
775        router.cancel(0).await;
776
777        // Wait for the blocked register to complete.
778        tokio::time::sleep(Duration::from_millis(100)).await;
779    }
780
781    // --- Lane classifier -----------------------------------------------------------------------
782
783    #[rstest(
784        orders, expected,
785        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
786        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
787        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
788        case::empty(vec![], PostLane::Normal),
789    )]
790    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
791        let action = ActionRequest::Order {
792            orders,
793            grouping: "na".to_string(),
794        };
795        assert_eq!(lane_for_action(&action), expected);
796    }
797
798    // --- Builder Pattern Tests -----------------------------------------------------------------
799
800    #[test]
801    fn test_order_request_builder() {
802        // Test OrderRequestBuilder derived from #[derive(Builder)]
803        let order = OrderRequestBuilder::default()
804            .a(0)
805            .b(true)
806            .p("40000.0".to_string())
807            .s("0.01".to_string())
808            .r(false)
809            .t(OrderTypeRequest::Limit {
810                tif: TimeInForceRequest::Gtc,
811            })
812            .c(Some("test-order-1".to_string()))
813            .build()
814            .expect("should build order");
815
816        assert_eq!(order.a, 0);
817        assert!(order.b);
818        assert_eq!(order.p, "40000.0");
819        assert_eq!(order.s, "0.01");
820        assert!(!order.r);
821        assert_eq!(order.c, Some("test-order-1".to_string()));
822    }
823
824    #[test]
825    fn test_limit_order_params_builder() {
826        // Test LimitOrderParamsBuilder
827        let params = LimitOrderParamsBuilder::default()
828            .asset(0)
829            .is_buy(true)
830            .px("40000.0".to_string())
831            .sz("0.01".to_string())
832            .reduce_only(false)
833            .tif(TimeInForceRequest::Alo)
834            .cloid(Some("test-limit-1".to_string()))
835            .build()
836            .expect("should build limit params");
837
838        assert_eq!(params.asset, 0);
839        assert!(params.is_buy);
840        assert_eq!(params.px, "40000.0");
841        assert_eq!(params.sz, "0.01");
842        assert!(!params.reduce_only);
843        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
844    }
845
846    #[test]
847    fn test_trigger_order_params_builder() {
848        // Test TriggerOrderParamsBuilder
849        let params = TriggerOrderParamsBuilder::default()
850            .asset(1)
851            .is_buy(false)
852            .px("39000.0".to_string())
853            .sz("0.02".to_string())
854            .reduce_only(false)
855            .is_market(true)
856            .trigger_px("39500.0".to_string())
857            .tpsl(TpSlRequest::Sl)
858            .cloid(Some("test-trigger-1".to_string()))
859            .build()
860            .expect("should build trigger params");
861
862        assert_eq!(params.asset, 1);
863        assert!(!params.is_buy);
864        assert_eq!(params.px, "39000.0");
865        assert!(params.is_market);
866        assert_eq!(params.trigger_px, "39500.0");
867    }
868
869    #[test]
870    fn test_order_builder_single_limit_convenience() {
871        // Test OrderBuilder::single_limit_order convenience method
872        let params = LimitOrderParamsBuilder::default()
873            .asset(0)
874            .is_buy(true)
875            .px("40000.0".to_string())
876            .sz("0.01".to_string())
877            .reduce_only(false)
878            .tif(TimeInForceRequest::Gtc)
879            .cloid(None)
880            .build()
881            .unwrap();
882
883        let action = OrderBuilder::single_limit_order(params);
884
885        match action {
886            ActionRequest::Order { orders, grouping } => {
887                assert_eq!(orders.len(), 1);
888                assert_eq!(orders[0].a, 0);
889                assert!(orders[0].b);
890                assert_eq!(grouping, "na");
891            }
892            _ => panic!("Expected ActionRequest::Order variant"),
893        }
894    }
895
896    #[test]
897    fn test_order_builder_single_trigger_convenience() {
898        // Test OrderBuilder::single_trigger_order convenience method
899        let params = TriggerOrderParamsBuilder::default()
900            .asset(1)
901            .is_buy(false)
902            .px("39000.0".to_string())
903            .sz("0.02".to_string())
904            .reduce_only(false)
905            .is_market(true)
906            .trigger_px("39500.0".to_string())
907            .tpsl(TpSlRequest::Sl)
908            .cloid(Some("sl-order".to_string()))
909            .build()
910            .unwrap();
911
912        let action = OrderBuilder::single_trigger_order(params);
913
914        match action {
915            ActionRequest::Order { orders, grouping } => {
916                assert_eq!(orders.len(), 1);
917                assert_eq!(orders[0].a, 1);
918                assert_eq!(orders[0].c, Some("sl-order".to_string()));
919                assert_eq!(grouping, "na");
920            }
921            _ => panic!("Expected ActionRequest::Order variant"),
922        }
923    }
924
925    #[test]
926    fn test_order_builder_batch_orders() {
927        // Test existing batch order functionality still works
928        let params1 = LimitOrderParams {
929            asset: 0,
930            is_buy: true,
931            px: "40000.0".to_string(),
932            sz: "0.01".to_string(),
933            reduce_only: false,
934            tif: TimeInForceRequest::Gtc,
935            cloid: Some("order-1".to_string()),
936        };
937
938        let params2 = LimitOrderParams {
939            asset: 1,
940            is_buy: false,
941            px: "2000.0".to_string(),
942            sz: "0.5".to_string(),
943            reduce_only: false,
944            tif: TimeInForceRequest::Ioc,
945            cloid: Some("order-2".to_string()),
946        };
947
948        let action = OrderBuilder::new()
949            .grouping(Grouping::NormalTpsl)
950            .push_limit_order(params1)
951            .push_limit_order(params2)
952            .build();
953
954        match action {
955            ActionRequest::Order { orders, grouping } => {
956                assert_eq!(orders.len(), 2);
957                assert_eq!(orders[0].c, Some("order-1".to_string()));
958                assert_eq!(orders[1].c, Some("order-2".to_string()));
959                assert_eq!(grouping, "normalTpsl");
960            }
961            _ => panic!("Expected ActionRequest::Order variant"),
962        }
963    }
964
965    #[test]
966    fn test_action_request_constructors() {
967        // Test ActionRequest::order() constructor
968        let order1 = mk_limit_gtc(0);
969        let order2 = mk_limit_gtc(1);
970        let action = ActionRequest::order(vec![order1, order2], "na");
971
972        match action {
973            ActionRequest::Order { orders, grouping } => {
974                assert_eq!(orders.len(), 2);
975                assert_eq!(grouping, "na");
976            }
977            _ => panic!("Expected ActionRequest::Order variant"),
978        }
979
980        // Test ActionRequest::cancel() constructor
981        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
982        let action = ActionRequest::cancel(cancels);
983        assert!(matches!(action, ActionRequest::Cancel { .. }));
984
985        // Test ActionRequest::cancel_by_cloid() constructor
986        let cancels = vec![CancelByCloidRequest {
987            asset: 0,
988            cloid: "order-1".to_string(),
989        }];
990        let action = ActionRequest::cancel_by_cloid(cancels);
991        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
992    }
993
994    // --- Batcher (tick flush path) --------------------------------------------------------------
995
996    #[rstest]
997    #[tokio::test(flavor = "multi_thread")]
998    async fn batcher_sends_on_tick() {
999        // Capture sent ids to prove dispatch happened.
1000        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1001        let sent_closure = sent.clone();
1002
1003        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
1004            let sent_inner = sent_closure.clone();
1005            Box::pin(async move {
1006                if let HyperliquidWsRequest::Post { id, .. } = req {
1007                    sent_inner.lock().await.push(id);
1008                }
1009                Ok(())
1010            })
1011        };
1012
1013        let batcher = PostBatcher::new(send_fn);
1014
1015        // Enqueue a handful of posts into the NORMAL lane; tick is ~50ms.
1016        for id in 1..=5u64 {
1017            batcher
1018                .enqueue(ScheduledPost {
1019                    id,
1020                    request: PostRequest::Info {
1021                        payload: serde_json::json!({"type":"allMids"}),
1022                    },
1023                    lane: PostLane::Normal,
1024                })
1025                .await
1026                .unwrap();
1027        }
1028
1029        // Wait slightly past one tick to allow the lane to flush.
1030        sleep(Duration::from_millis(80)).await;
1031
1032        let got = sent.lock().await.clone();
1033        assert_eq!(got.len(), 5, "expected 5 sends on first tick");
1034        assert_eq!(got, vec![1, 2, 3, 4, 5]);
1035    }
1036}