nautilus_hyperliquid/websocket/
post.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29    sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30    time,
31};
32
33use crate::{
34    common::consts::INFLIGHT_MAX,
35    http::{
36        error::{Error, Result},
37        models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38    },
39    websocket::messages::{
40        ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41        OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42    },
43};
44
45// -------------------------------------------------------------------------------------------------
46// Correlation router for "channel":"post" → correlate by id
47//  - Enforces inflight cap using OwnedSemaphorePermit stored per waiter
48// -------------------------------------------------------------------------------------------------
49
50#[derive(Debug)]
51struct Waiter {
52    tx: oneshot::Sender<PostResponse>,
53    // When this is dropped, the permit is released, shrinking inflight
54    _permit: OwnedSemaphorePermit,
55}
56
57#[derive(Debug)]
58pub struct PostRouter {
59    inner: Mutex<HashMap<u64, Waiter>>,
60    inflight: Arc<Semaphore>, // hard cap per HL docs (e.g., 100)
61}
62
63impl Default for PostRouter {
64    fn default() -> Self {
65        Self {
66            inner: Mutex::new(HashMap::new()),
67            inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
68        }
69    }
70}
71
72impl PostRouter {
73    pub fn new() -> Arc<Self> {
74        Arc::new(Self::default())
75    }
76
77    /// Registers interest in a post id, enforcing inflight cap.
78    pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
79        // Acquire and retain a permit per inflight call
80        let permit = self
81            .inflight
82            .clone()
83            .acquire_owned()
84            .await
85            .map_err(|_| Error::transport("post router semaphore closed"))?;
86
87        let (tx, rx) = oneshot::channel::<PostResponse>();
88        let mut map = self.inner.lock().await;
89        if map.contains_key(&id) {
90            return Err(Error::transport(format!("post id {id} already registered")));
91        }
92        map.insert(
93            id,
94            Waiter {
95                tx,
96                _permit: permit,
97            },
98        );
99        Ok(rx)
100    }
101
102    /// Completes a waiting caller when a response arrives (releases inflight via Waiter drop).
103    pub async fn complete(&self, resp: PostResponse) {
104        let id = resp.id;
105        let waiter = {
106            let mut map = self.inner.lock().await;
107            map.remove(&id)
108        };
109        if let Some(waiter) = waiter {
110            if waiter.tx.send(resp).is_err() {
111                log::warn!("Post waiter dropped before delivery: id={id}");
112            }
113            // waiter drops here → permit released
114        } else {
115            log::warn!("Post response with unknown id (late/duplicate?): id={id}");
116        }
117    }
118
119    /// Cancel a pending id (e.g., timeout); quietly succeed if id wasn't present.
120    pub async fn cancel(&self, id: u64) {
121        let _ = {
122            let mut map = self.inner.lock().await;
123            map.remove(&id)
124        };
125        // Waiter (and its permit) drop here if it existed
126    }
127
128    /// Await a response with timeout. On timeout or closed channel, cancels the id.
129    pub async fn await_with_timeout(
130        &self,
131        id: u64,
132        rx: oneshot::Receiver<PostResponse>,
133        timeout: Duration,
134    ) -> Result<PostResponse> {
135        match time::timeout(timeout, rx).await {
136            Ok(Ok(resp)) => Ok(resp),
137            Ok(Err(_closed)) => {
138                self.cancel(id).await;
139                Err(Error::transport("post response channel closed"))
140            }
141            Err(_elapsed) => {
142                self.cancel(id).await;
143                Err(Error::Timeout)
144            }
145        }
146    }
147}
148
149// -------------------------------------------------------------------------------------------------
150// ID generation
151// -------------------------------------------------------------------------------------------------
152
/// Sequential post-id generator backed by an atomic counter.
#[derive(Debug)]
pub struct PostIds(AtomicU64);

impl PostIds {
    /// Creates a generator whose first issued id is `start`.
    pub fn new(start: u64) -> Self {
        Self(AtomicU64::from(start))
    }

    /// Returns the current id and advances the counter by one.
    ///
    /// The increment uses `Ordering::Relaxed` and wraps around on overflow.
    pub fn next(&self) -> u64 {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::Relaxed)
    }
}
164
165// -------------------------------------------------------------------------------------------------
166// Lanes & batcher (scaffold). You can expand policy later.
167// -------------------------------------------------------------------------------------------------
168
/// Outbound lane a scheduled post is routed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostLane {
    /// Post-only orders.
    Alo,
    /// IOC/GTC + info + anything else.
    Normal,
}
174
175#[derive(Debug)]
176pub struct ScheduledPost {
177    pub id: u64,
178    pub request: PostRequest,
179    pub lane: PostLane,
180}
181
182#[derive(Debug)]
183pub struct PostBatcher {
184    tx_alo: mpsc::Sender<ScheduledPost>,
185    tx_normal: mpsc::Sender<ScheduledPost>,
186}
187
188impl PostBatcher {
189    /// Spawns two lane tasks that batch-send scheduled posts via `send_fn`.
190    pub fn new<F>(send_fn: F) -> Self
191    where
192        F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
193    {
194        let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
195        let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
196
197        // ALO lane: batchy tick, low jitter
198        get_runtime().spawn(Self::run_lane(
199            "ALO",
200            rx_alo,
201            Duration::from_millis(100),
202            send_fn.clone(),
203        ));
204
205        // NORMAL lane: faster tick; adjust as needed
206        get_runtime().spawn(Self::run_lane(
207            "NORMAL",
208            rx_normal,
209            Duration::from_millis(50),
210            send_fn,
211        ));
212
213        Self { tx_alo, tx_normal }
214    }
215
216    async fn run_lane<F>(
217        lane_name: &'static str,
218        mut rx: mpsc::Receiver<ScheduledPost>,
219        tick: Duration,
220        mut send_fn: F,
221    ) where
222        F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
223    {
224        let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
225        let mut interval = time::interval(tick);
226        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
227
228        loop {
229            tokio::select! {
230                maybe_item = rx.recv() => {
231                    match maybe_item {
232                        Some(item) => pend.push(item),
233                        None => break, // sender dropped → terminate lane task
234                    }
235                }
236                _ = interval.tick() => {
237                    if pend.is_empty() { continue; }
238                    let to_send = std::mem::take(&mut pend);
239                    for item in to_send {
240                        let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
241                        if let Err(e) = send_fn(req).await {
242                            log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
243                        }
244                    }
245                }
246            }
247        }
248        log::info!("Post lane terminated: lane={lane_name}");
249    }
250
251    pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
252        match item.lane {
253            PostLane::Alo => self
254                .tx_alo
255                .send(item)
256                .await
257                .map_err(|_| Error::transport("ALO lane closed")),
258            PostLane::Normal => self
259                .tx_normal
260                .send(item)
261                .await
262                .map_err(|_| Error::transport("NORMAL lane closed")),
263        }
264    }
265}
266
267// Helpers to classify lane from an action
268pub fn lane_for_action(action: &ActionRequest) -> PostLane {
269    match action {
270        ActionRequest::Order { orders, .. } => {
271            if orders.is_empty() {
272                return PostLane::Normal;
273            }
274            let all_alo = orders.iter().all(|o| {
275                matches!(
276                    o.t,
277                    OrderTypeRequest::Limit {
278                        tif: TimeInForceRequest::Alo
279                    }
280                )
281            });
282            if all_alo {
283                PostLane::Alo
284            } else {
285                PostLane::Normal
286            }
287        }
288        _ => PostLane::Normal,
289    }
290}
291
292// -------------------------------------------------------------------------------------------------
293// Typed builders (produce ActionRequest), plus Info request helpers.
294// -------------------------------------------------------------------------------------------------
295
/// Grouping mode attached to an order action.
#[derive(Debug, Clone, Copy, Default)]
pub enum Grouping {
    /// Rendered as `"na"`; this is the default.
    #[default]
    Na,
    /// Rendered as `"normalTpsl"`.
    NormalTpsl,
    /// Rendered as `"positionTpsl"`.
    PositionTpsl,
}

impl Grouping {
    /// Returns the string identifier for this grouping.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Grouping::Na => "na",
            Grouping::NormalTpsl => "normalTpsl",
            Grouping::PositionTpsl => "positionTpsl",
        }
    }
}
312
313/// Parameters for creating a limit order.
314#[derive(Debug, Clone, Builder)]
315pub struct LimitOrderParams {
316    pub asset: u32,
317    pub is_buy: bool,
318    pub px: String,
319    pub sz: String,
320    pub reduce_only: bool,
321    pub tif: TimeInForceRequest,
322    pub cloid: Option<String>,
323}
324
325/// Parameters for creating a trigger order.
326#[derive(Debug, Clone, Builder)]
327pub struct TriggerOrderParams {
328    pub asset: u32,
329    pub is_buy: bool,
330    pub px: String,
331    pub sz: String,
332    pub reduce_only: bool,
333    pub is_market: bool,
334    pub trigger_px: String,
335    pub tpsl: TpSlRequest,
336    pub cloid: Option<String>,
337}
338
339// ORDER builder (single or many)
340#[derive(Debug, Default)]
341pub struct OrderBuilder {
342    orders: Vec<OrderRequest>,
343    grouping: Grouping,
344}
345
346impl OrderBuilder {
347    pub fn new() -> Self {
348        Self::default()
349    }
350
351    #[must_use]
352    pub fn grouping(mut self, g: Grouping) -> Self {
353        self.grouping = g;
354        self
355    }
356
357    /// Create a limit order with individual parameters (legacy method)
358    #[allow(clippy::too_many_arguments)]
359    #[must_use]
360    pub fn push_limit(
361        self,
362        asset: u32,
363        is_buy: bool,
364        px: impl ToString,
365        sz: impl ToString,
366        reduce_only: bool,
367        tif: TimeInForceRequest,
368        cloid: Option<String>,
369    ) -> Self {
370        let params = LimitOrderParams {
371            asset,
372            is_buy,
373            px: px.to_string(),
374            sz: sz.to_string(),
375            reduce_only,
376            tif,
377            cloid,
378        };
379        self.push_limit_order(params)
380    }
381
382    /// Create a limit order using parameters struct
383    #[must_use]
384    pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
385        self.orders.push(OrderRequest {
386            a: params.asset,
387            b: params.is_buy,
388            p: params.px,
389            s: params.sz,
390            r: params.reduce_only,
391            t: OrderTypeRequest::Limit { tif: params.tif },
392            c: params.cloid,
393        });
394        self
395    }
396
397    /// Create a trigger order with individual parameters (legacy method)
398    #[allow(clippy::too_many_arguments)]
399    #[must_use]
400    pub fn push_trigger(
401        self,
402        asset: u32,
403        is_buy: bool,
404        px: impl ToString,
405        sz: impl ToString,
406        reduce_only: bool,
407        is_market: bool,
408        trigger_px: impl ToString,
409        tpsl: TpSlRequest,
410        cloid: Option<String>,
411    ) -> Self {
412        let params = TriggerOrderParams {
413            asset,
414            is_buy,
415            px: px.to_string(),
416            sz: sz.to_string(),
417            reduce_only,
418            is_market,
419            trigger_px: trigger_px.to_string(),
420            tpsl,
421            cloid,
422        };
423        self.push_trigger_order(params)
424    }
425
426    /// Create a trigger order using parameters struct
427    #[must_use]
428    pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
429        self.orders.push(OrderRequest {
430            a: params.asset,
431            b: params.is_buy,
432            p: params.px,
433            s: params.sz,
434            r: params.reduce_only,
435            t: OrderTypeRequest::Trigger {
436                is_market: params.is_market,
437                trigger_px: params.trigger_px,
438                tpsl: params.tpsl,
439            },
440            c: params.cloid,
441        });
442        self
443    }
444    pub fn build(self) -> ActionRequest {
445        ActionRequest::Order {
446            orders: self.orders,
447            grouping: self.grouping.as_str().to_string(),
448        }
449    }
450
451    /// Create a single limit order action directly (convenience method)
452    ///
453    /// # Example
454    /// ```ignore
455    /// let action = OrderBuilder::single_limit_order(
456    ///     LimitOrderParamsBuilder::default()
457    ///         .asset(0)
458    ///         .is_buy(true)
459    ///         .px("40000.0")
460    ///         .sz("0.01")
461    ///         .reduce_only(false)
462    ///         .tif(TimeInForceRequest::Gtc)
463    ///         .build()
464    ///         .unwrap()
465    /// );
466    /// ```
467    pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
468        Self::new().push_limit_order(params).build()
469    }
470
471    /// Create a single trigger order action directly (convenience method)
472    ///
473    /// # Example
474    /// ```ignore
475    /// let action = OrderBuilder::single_trigger_order(
476    ///     TriggerOrderParamsBuilder::default()
477    ///         .asset(0)
478    ///         .is_buy(false)
479    ///         .px("39000.0")
480    ///         .sz("0.01")
481    ///         .reduce_only(false)
482    ///         .is_market(true)
483    ///         .trigger_px("39500.0")
484    ///         .tpsl(TpSlRequest::Sl)
485    ///         .build()
486    ///         .unwrap()
487    /// );
488    /// ```
489    pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
490        Self::new().push_trigger_order(params).build()
491    }
492}
493
494pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
495    ActionRequest::Cancel {
496        cancels: cancels
497            .into_iter()
498            .map(|(a, o)| CancelRequest { a, o })
499            .collect(),
500    }
501}
502pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
503    ActionRequest::CancelByCloid {
504        cancels: vec![CancelByCloidRequest {
505            asset,
506            cloid: cloid.into(),
507        }],
508    }
509}
510pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
511    ActionRequest::Modify {
512        modifies: vec![ModifyRequest {
513            oid,
514            order: new_order,
515        }],
516    }
517}
518
519// Info wrappers (bodies go under PostRequest::Info{ payload })
520pub fn info_l2_book(coin: &str) -> PostRequest {
521    PostRequest::Info {
522        payload: serde_json::json!({"type":"l2Book","coin":coin}),
523    }
524}
525pub fn info_all_mids() -> PostRequest {
526    PostRequest::Info {
527        payload: serde_json::json!({"type":"allMids"}),
528    }
529}
530pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
531    PostRequest::Info {
532        payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
533    }
534}
535pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
536    let mut body = serde_json::json!({"type":"openOrders","user":user});
537    if let Some(fe) = frontend {
538        body["frontend"] = serde_json::json!(fe);
539    }
540    PostRequest::Info { payload: body }
541}
542pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
543    let mut body = serde_json::json!({"type":"userFills","user":user});
544    if let Some(agg) = aggregate_by_time {
545        body["aggregateByTime"] = serde_json::json!(agg);
546    }
547    PostRequest::Info { payload: body }
548}
549pub fn info_user_rate_limit(user: &str) -> PostRequest {
550    PostRequest::Info {
551        payload: serde_json::json!({"type":"userRateLimit","user":user}),
552    }
553}
554pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
555    PostRequest::Info {
556        payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
557    }
558}
559
560// -------------------------------------------------------------------------------------------------
561// Minimal response helpers
562// -------------------------------------------------------------------------------------------------
563
564pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
565    serde_json::from_value(payload.clone()).map_err(Error::Serde)
566}
567pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
568    serde_json::from_value(payload.clone()).map_err(Error::Serde)
569}
570pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
571    serde_json::from_value(payload.clone()).map_err(Error::Serde)
572}
573
574/// Heuristic classification for action responses.
575#[derive(Debug)]
576pub enum ActionOutcome<'a> {
577    Resting {
578        oid: u64,
579    },
580    Filled {
581        total_sz: &'a str,
582        avg_px: &'a str,
583        oid: Option<u64>,
584    },
585    Error {
586        msg: &'a str,
587    },
588    Unknown(&'a serde_json::Value),
589}
590pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
591    if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
592        if let (Some(total_sz), Some(avg_px)) = (
593            payload.get("totalSz").and_then(|v| v.as_str()),
594            payload.get("avgPx").and_then(|v| v.as_str()),
595        ) {
596            return ActionOutcome::Filled {
597                total_sz,
598                avg_px,
599                oid: Some(oid),
600            };
601        }
602        return ActionOutcome::Resting { oid };
603    }
604    if let (Some(total_sz), Some(avg_px)) = (
605        payload.get("totalSz").and_then(|v| v.as_str()),
606        payload.get("avgPx").and_then(|v| v.as_str()),
607    ) {
608        return ActionOutcome::Filled {
609            total_sz,
610            avg_px,
611            oid: None,
612        };
613    }
614    if let Some(msg) = payload
615        .get("error")
616        .and_then(|v| v.as_str())
617        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
618    {
619        return ActionOutcome::Error { msg };
620    }
621    ActionOutcome::Unknown(payload)
622}
623
624// -------------------------------------------------------------------------------------------------
625// Glue helpers used by the client (wired in client.rs)
626// -------------------------------------------------------------------------------------------------
627
628#[derive(Clone, Debug)]
629pub struct WsSender {
630    inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
631}
632
633impl WsSender {
634    pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
635        Self {
636            inner: Arc::new(tokio::sync::Mutex::new(tx)),
637        }
638    }
639
640    pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
641        let sender = self.inner.lock().await;
642        sender
643            .send(req)
644            .await
645            .map_err(|_| Error::transport("WebSocket sender closed"))
646    }
647}
648
649#[cfg(test)]
650mod tests {
651    use rstest::rstest;
652    use tokio::{
653        sync::oneshot,
654        time::{Duration, sleep, timeout},
655    };
656
657    use super::*;
658    use crate::{
659        common::consts::INFLIGHT_MAX,
660        websocket::messages::{
661            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
662            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
663        },
664    };
665
666    // --- helpers -------------------------------------------------------------------------------
667
668    fn mk_limit_alo(asset: u32) -> OrderRequest {
669        OrderRequest {
670            a: asset,
671            b: true,
672            p: "1".to_string(),
673            s: "1".to_string(),
674            r: false,
675            t: OrderTypeRequest::Limit {
676                tif: TimeInForceRequest::Alo,
677            },
678            c: None,
679        }
680    }
681
682    fn mk_limit_gtc(asset: u32) -> OrderRequest {
683        OrderRequest {
684            a: asset,
685            b: true,
686            p: "1".to_string(),
687            s: "1".to_string(),
688            r: false,
689            t: OrderTypeRequest::Limit {
690                // any non-ALO TIF keeps it in the Normal lane
691                tif: TimeInForceRequest::Gtc,
692            },
693            c: None,
694        }
695    }
696
697    // --- PostRouter ---------------------------------------------------------------------------
698
699    #[rstest]
700    #[tokio::test(flavor = "multi_thread")]
701    async fn register_duplicate_id_errors() {
702        let router = PostRouter::new();
703        let _rx = router.register(42).await.expect("first register OK");
704
705        let err = router.register(42).await.expect_err("duplicate must error");
706        let msg = err.to_string().to_lowercase();
707        assert!(
708            msg.contains("already") || msg.contains("duplicate"),
709            "unexpected error: {msg}"
710        );
711    }
712
713    #[rstest]
714    #[tokio::test(flavor = "multi_thread")]
715    async fn timeout_cancels_and_allows_reregister() {
716        let router = PostRouter::new();
717        let id = 7;
718
719        let rx = router.register(id).await.unwrap();
720        // No complete() → ensure we time out and the waiter is removed.
721        let err = router
722            .await_with_timeout(id, rx, Duration::from_millis(25))
723            .await
724            .expect_err("should timeout");
725        assert!(
726            err.to_string().to_lowercase().contains("timeout")
727                || err.to_string().to_lowercase().contains("closed"),
728            "unexpected error kind: {err}"
729        );
730
731        // After timeout, id should be reusable (cancel dropped the waiter & released the permit).
732        let _rx2 = router
733            .register(id)
734            .await
735            .expect("id should be reusable after timeout cancel");
736    }
737
738    #[rstest]
739    #[tokio::test(flavor = "multi_thread")]
740    async fn inflight_cap_blocks_then_unblocks() {
741        let router = PostRouter::new();
742
743        // Fill the inflight capacity.
744        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
745        for i in 0..INFLIGHT_MAX {
746            let rx = router.register(i as u64).await.unwrap();
747            rxs.push(rx); // keep waiters alive
748        }
749
750        // Next register should block until a permit is freed.
751        let router2 = Arc::clone(&router);
752        let (entered_tx, entered_rx) = oneshot::channel::<()>();
753        let (done_tx, done_rx) = oneshot::channel::<()>();
754        let (check_tx, check_rx) = oneshot::channel::<()>(); // separate channel for checking
755
756        get_runtime().spawn(async move {
757            let _ = entered_tx.send(());
758            let _rx = router2.register(9_999_999).await.unwrap();
759            let _ = done_tx.send(());
760        });
761
762        // Confirm the task is trying to register…
763        entered_rx.await.unwrap();
764
765        // …and that it doesn't complete yet (still blocked on permit).
766        get_runtime().spawn(async move {
767            if done_rx.await.is_ok() {
768                let _ = check_tx.send(());
769            }
770        });
771
772        assert!(
773            timeout(Duration::from_millis(50), check_rx).await.is_err(),
774            "should still be blocked while at cap"
775        );
776
777        // Free one permit by cancelling a waiter.
778        router.cancel(0).await;
779
780        // Wait for the blocked register to complete.
781        tokio::time::sleep(Duration::from_millis(100)).await;
782    }
783
784    // --- Lane classifier -----------------------------------------------------------------------
785
786    #[rstest(
787        orders, expected,
788        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
789        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
790        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
791        case::empty(vec![], PostLane::Normal),
792    )]
793    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
794        let action = ActionRequest::Order {
795            orders,
796            grouping: "na".to_string(),
797        };
798        assert_eq!(lane_for_action(&action), expected);
799    }
800
801    // --- Builder Pattern Tests -----------------------------------------------------------------
802
803    #[rstest]
804    fn test_order_request_builder() {
805        // Test OrderRequestBuilder derived from #[derive(Builder)]
806        let order = OrderRequestBuilder::default()
807            .a(0)
808            .b(true)
809            .p("40000.0".to_string())
810            .s("0.01".to_string())
811            .r(false)
812            .t(OrderTypeRequest::Limit {
813                tif: TimeInForceRequest::Gtc,
814            })
815            .c(Some("test-order-1".to_string()))
816            .build()
817            .expect("should build order");
818
819        assert_eq!(order.a, 0);
820        assert!(order.b);
821        assert_eq!(order.p, "40000.0");
822        assert_eq!(order.s, "0.01");
823        assert!(!order.r);
824        assert_eq!(order.c, Some("test-order-1".to_string()));
825    }
826
827    #[rstest]
828    fn test_limit_order_params_builder() {
829        // Test LimitOrderParamsBuilder
830        let params = LimitOrderParamsBuilder::default()
831            .asset(0)
832            .is_buy(true)
833            .px("40000.0".to_string())
834            .sz("0.01".to_string())
835            .reduce_only(false)
836            .tif(TimeInForceRequest::Alo)
837            .cloid(Some("test-limit-1".to_string()))
838            .build()
839            .expect("should build limit params");
840
841        assert_eq!(params.asset, 0);
842        assert!(params.is_buy);
843        assert_eq!(params.px, "40000.0");
844        assert_eq!(params.sz, "0.01");
845        assert!(!params.reduce_only);
846        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
847    }
848
849    #[rstest]
850    fn test_trigger_order_params_builder() {
851        // Test TriggerOrderParamsBuilder
852        let params = TriggerOrderParamsBuilder::default()
853            .asset(1)
854            .is_buy(false)
855            .px("39000.0".to_string())
856            .sz("0.02".to_string())
857            .reduce_only(false)
858            .is_market(true)
859            .trigger_px("39500.0".to_string())
860            .tpsl(TpSlRequest::Sl)
861            .cloid(Some("test-trigger-1".to_string()))
862            .build()
863            .expect("should build trigger params");
864
865        assert_eq!(params.asset, 1);
866        assert!(!params.is_buy);
867        assert_eq!(params.px, "39000.0");
868        assert!(params.is_market);
869        assert_eq!(params.trigger_px, "39500.0");
870    }
871
872    #[rstest]
873    fn test_order_builder_single_limit_convenience() {
874        // Test OrderBuilder::single_limit_order convenience method
875        let params = LimitOrderParamsBuilder::default()
876            .asset(0)
877            .is_buy(true)
878            .px("40000.0".to_string())
879            .sz("0.01".to_string())
880            .reduce_only(false)
881            .tif(TimeInForceRequest::Gtc)
882            .cloid(None)
883            .build()
884            .unwrap();
885
886        let action = OrderBuilder::single_limit_order(params);
887
888        match action {
889            ActionRequest::Order { orders, grouping } => {
890                assert_eq!(orders.len(), 1);
891                assert_eq!(orders[0].a, 0);
892                assert!(orders[0].b);
893                assert_eq!(grouping, "na");
894            }
895            _ => panic!("Expected ActionRequest::Order variant"),
896        }
897    }
898
899    #[rstest]
900    fn test_order_builder_single_trigger_convenience() {
901        // Test OrderBuilder::single_trigger_order convenience method
902        let params = TriggerOrderParamsBuilder::default()
903            .asset(1)
904            .is_buy(false)
905            .px("39000.0".to_string())
906            .sz("0.02".to_string())
907            .reduce_only(false)
908            .is_market(true)
909            .trigger_px("39500.0".to_string())
910            .tpsl(TpSlRequest::Sl)
911            .cloid(Some("sl-order".to_string()))
912            .build()
913            .unwrap();
914
915        let action = OrderBuilder::single_trigger_order(params);
916
917        match action {
918            ActionRequest::Order { orders, grouping } => {
919                assert_eq!(orders.len(), 1);
920                assert_eq!(orders[0].a, 1);
921                assert_eq!(orders[0].c, Some("sl-order".to_string()));
922                assert_eq!(grouping, "na");
923            }
924            _ => panic!("Expected ActionRequest::Order variant"),
925        }
926    }
927
928    #[rstest]
929    fn test_order_builder_batch_orders() {
930        // Test existing batch order functionality still works
931        let params1 = LimitOrderParams {
932            asset: 0,
933            is_buy: true,
934            px: "40000.0".to_string(),
935            sz: "0.01".to_string(),
936            reduce_only: false,
937            tif: TimeInForceRequest::Gtc,
938            cloid: Some("order-1".to_string()),
939        };
940
941        let params2 = LimitOrderParams {
942            asset: 1,
943            is_buy: false,
944            px: "2000.0".to_string(),
945            sz: "0.5".to_string(),
946            reduce_only: false,
947            tif: TimeInForceRequest::Ioc,
948            cloid: Some("order-2".to_string()),
949        };
950
951        let action = OrderBuilder::new()
952            .grouping(Grouping::NormalTpsl)
953            .push_limit_order(params1)
954            .push_limit_order(params2)
955            .build();
956
957        match action {
958            ActionRequest::Order { orders, grouping } => {
959                assert_eq!(orders.len(), 2);
960                assert_eq!(orders[0].c, Some("order-1".to_string()));
961                assert_eq!(orders[1].c, Some("order-2".to_string()));
962                assert_eq!(grouping, "normalTpsl");
963            }
964            _ => panic!("Expected ActionRequest::Order variant"),
965        }
966    }
967
968    #[rstest]
969    fn test_action_request_constructors() {
970        // Test ActionRequest::order() constructor
971        let order1 = mk_limit_gtc(0);
972        let order2 = mk_limit_gtc(1);
973        let action = ActionRequest::order(vec![order1, order2], "na");
974
975        match action {
976            ActionRequest::Order { orders, grouping } => {
977                assert_eq!(orders.len(), 2);
978                assert_eq!(grouping, "na");
979            }
980            _ => panic!("Expected ActionRequest::Order variant"),
981        }
982
983        // Test ActionRequest::cancel() constructor
984        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
985        let action = ActionRequest::cancel(cancels);
986        assert!(matches!(action, ActionRequest::Cancel { .. }));
987
988        // Test ActionRequest::cancel_by_cloid() constructor
989        let cancels = vec![CancelByCloidRequest {
990            asset: 0,
991            cloid: "order-1".to_string(),
992        }];
993        let action = ActionRequest::cancel_by_cloid(cancels);
994        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
995    }
996
997    // --- Batcher (tick flush path) --------------------------------------------------------------
998
999    #[rstest]
1000    #[tokio::test(flavor = "multi_thread")]
1001    async fn batcher_sends_on_tick() {
1002        // Capture sent ids to prove dispatch happened.
1003        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1004        let sent_closure = sent.clone();
1005
1006        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
1007            let sent_inner = sent_closure.clone();
1008            Box::pin(async move {
1009                if let HyperliquidWsRequest::Post { id, .. } = req {
1010                    sent_inner.lock().await.push(id);
1011                }
1012                Ok(())
1013            })
1014        };
1015
1016        let batcher = PostBatcher::new(send_fn);
1017
1018        // Enqueue a handful of posts into the NORMAL lane; tick is ~50ms.
1019        for id in 1..=5u64 {
1020            batcher
1021                .enqueue(ScheduledPost {
1022                    id,
1023                    request: PostRequest::Info {
1024                        payload: serde_json::json!({"type":"allMids"}),
1025                    },
1026                    lane: PostLane::Normal,
1027                })
1028                .await
1029                .unwrap();
1030        }
1031
1032        // Wait slightly past one tick to allow the lane to flush.
1033        sleep(Duration::from_millis(80)).await;
1034
1035        let got = sent.lock().await.clone();
1036        assert_eq!(got.len(), 5, "expected 5 sends on first tick");
1037        assert_eq!(got, vec![1, 2, 3, 4, 5]);
1038    }
1039}