nautilus_hyperliquid/websocket/
post.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    collections::HashMap,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use tokio::{
28    sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
29    time,
30};
31
32use crate::{
33    common::consts::INFLIGHT_MAX,
34    http::{
35        error::{Error, Result},
36        models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
37    },
38    websocket::messages::{
39        ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
40        OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
41    },
42};
43
44// -------------------------------------------------------------------------------------------------
45// Correlation router for "channel":"post" → correlate by id
46//  - Enforces inflight cap using OwnedSemaphorePermit stored per waiter
47// -------------------------------------------------------------------------------------------------
48
49#[derive(Debug)]
50struct Waiter {
51    tx: oneshot::Sender<PostResponse>,
52    // When this is dropped, the permit is released, shrinking inflight
53    _permit: OwnedSemaphorePermit,
54}
55
56#[derive(Debug)]
57pub struct PostRouter {
58    inner: Mutex<HashMap<u64, Waiter>>,
59    inflight: Arc<Semaphore>, // hard cap per HL docs (e.g., 100)
60}
61
62impl Default for PostRouter {
63    fn default() -> Self {
64        Self {
65            inner: Mutex::new(HashMap::new()),
66            inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
67        }
68    }
69}
70
71impl PostRouter {
72    pub fn new() -> Arc<Self> {
73        Arc::new(Self::default())
74    }
75
76    /// Registers interest in a post id, enforcing inflight cap.
77    pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
78        // Acquire and retain a permit per inflight call
79        let permit = self
80            .inflight
81            .clone()
82            .acquire_owned()
83            .await
84            .map_err(|_| Error::transport("post router semaphore closed"))?;
85
86        let (tx, rx) = oneshot::channel::<PostResponse>();
87        let mut map = self.inner.lock().await;
88        if map.contains_key(&id) {
89            return Err(Error::transport(format!("post id {id} already registered")));
90        }
91        map.insert(
92            id,
93            Waiter {
94                tx,
95                _permit: permit,
96            },
97        );
98        Ok(rx)
99    }
100
101    /// Completes a waiting caller when a response arrives (releases inflight via Waiter drop).
102    pub async fn complete(&self, resp: PostResponse) {
103        let id = resp.id;
104        let waiter = {
105            let mut map = self.inner.lock().await;
106            map.remove(&id)
107        };
108        if let Some(waiter) = waiter {
109            if waiter.tx.send(resp).is_err() {
110                tracing::warn!(id, "post waiter dropped before delivery");
111            }
112            // waiter drops here → permit released
113        } else {
114            tracing::warn!(id, "post response with unknown id (late/duplicate?)");
115        }
116    }
117
118    /// Cancel a pending id (e.g., timeout); quietly succeed if id wasn't present.
119    pub async fn cancel(&self, id: u64) {
120        let _ = {
121            let mut map = self.inner.lock().await;
122            map.remove(&id)
123        };
124        // Waiter (and its permit) drop here if it existed
125    }
126
127    /// Await a response with timeout. On timeout or closed channel, cancels the id.
128    pub async fn await_with_timeout(
129        &self,
130        id: u64,
131        rx: oneshot::Receiver<PostResponse>,
132        timeout: Duration,
133    ) -> Result<PostResponse> {
134        match time::timeout(timeout, rx).await {
135            Ok(Ok(resp)) => Ok(resp),
136            Ok(Err(_closed)) => {
137                self.cancel(id).await;
138                Err(Error::transport("post response channel closed"))
139            }
140            Err(_elapsed) => {
141                self.cancel(id).await;
142                Err(Error::Timeout)
143            }
144        }
145    }
146}
147
148// -------------------------------------------------------------------------------------------------
149// ID generation
150// -------------------------------------------------------------------------------------------------
151
152#[derive(Debug)]
153pub struct PostIds(AtomicU64);
154
155impl PostIds {
156    pub fn new(start: u64) -> Self {
157        Self(AtomicU64::new(start))
158    }
159    pub fn next(&self) -> u64 {
160        self.0.fetch_add(1, Ordering::Relaxed)
161    }
162}
163
164// -------------------------------------------------------------------------------------------------
165// Lanes & batcher (scaffold). You can expand policy later.
166// -------------------------------------------------------------------------------------------------
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq)]
169pub enum PostLane {
170    Alo,    // Post-only orders
171    Normal, // IOC/GTC + info + anything else
172}
173
174#[derive(Debug)]
175pub struct ScheduledPost {
176    pub id: u64,
177    pub request: PostRequest,
178    pub lane: PostLane,
179}
180
181#[derive(Debug)]
182pub struct PostBatcher {
183    tx_alo: mpsc::Sender<ScheduledPost>,
184    tx_normal: mpsc::Sender<ScheduledPost>,
185}
186
187impl PostBatcher {
188    /// Spawns two lane tasks that batch-send scheduled posts via `send_fn`.
189    pub fn new<F>(send_fn: F) -> Self
190    where
191        F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
192    {
193        let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
194        let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
195
196        // ALO lane: batchy tick, low jitter
197        tokio::spawn(Self::run_lane(
198            "ALO",
199            rx_alo,
200            Duration::from_millis(100),
201            send_fn.clone(),
202        ));
203
204        // NORMAL lane: faster tick; adjust as needed
205        tokio::spawn(Self::run_lane(
206            "NORMAL",
207            rx_normal,
208            Duration::from_millis(50),
209            send_fn,
210        ));
211
212        Self { tx_alo, tx_normal }
213    }
214
215    async fn run_lane<F>(
216        lane_name: &'static str,
217        mut rx: mpsc::Receiver<ScheduledPost>,
218        tick: Duration,
219        mut send_fn: F,
220    ) where
221        F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
222    {
223        let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
224        let mut interval = time::interval(tick);
225        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
226
227        loop {
228            tokio::select! {
229                maybe_item = rx.recv() => {
230                    match maybe_item {
231                        Some(item) => pend.push(item),
232                        None => break, // sender dropped → terminate lane task
233                    }
234                }
235                _ = interval.tick() => {
236                    if pend.is_empty() { continue; }
237                    let to_send = std::mem::take(&mut pend);
238                    for item in to_send {
239                        let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
240                        if let Err(e) = send_fn(req).await {
241                            tracing::error!(lane=%lane_name, id=%item.id, "failed to send post: {e}");
242                        }
243                    }
244                }
245            }
246        }
247        tracing::info!(lane=%lane_name, "post lane terminated");
248    }
249
250    pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
251        match item.lane {
252            PostLane::Alo => self
253                .tx_alo
254                .send(item)
255                .await
256                .map_err(|_| Error::transport("ALO lane closed")),
257            PostLane::Normal => self
258                .tx_normal
259                .send(item)
260                .await
261                .map_err(|_| Error::transport("NORMAL lane closed")),
262        }
263    }
264}
265
266// Helpers to classify lane from an action
267pub fn lane_for_action(action: &ActionRequest) -> PostLane {
268    match action {
269        ActionRequest::Order { orders, .. } => {
270            if orders.is_empty() {
271                return PostLane::Normal;
272            }
273            let all_alo = orders.iter().all(|o| {
274                matches!(
275                    o.t,
276                    OrderTypeRequest::Limit {
277                        tif: TimeInForceRequest::Alo
278                    }
279                )
280            });
281            if all_alo {
282                PostLane::Alo
283            } else {
284                PostLane::Normal
285            }
286        }
287        _ => PostLane::Normal,
288    }
289}
290
291// -------------------------------------------------------------------------------------------------
292// Typed builders (produce ActionRequest), plus Info request helpers.
293// -------------------------------------------------------------------------------------------------
294
295#[derive(Debug, Clone, Copy, Default)]
296pub enum Grouping {
297    #[default]
298    Na,
299    NormalTpsl,
300    PositionTpsl,
301}
302impl Grouping {
303    pub fn as_str(&self) -> &'static str {
304        match self {
305            Self::Na => "na",
306            Self::NormalTpsl => "normalTpsl",
307            Self::PositionTpsl => "positionTpsl",
308        }
309    }
310}
311
312/// Parameters for creating a limit order.
313#[derive(Debug, Clone, Builder)]
314pub struct LimitOrderParams {
315    pub asset: u32,
316    pub is_buy: bool,
317    pub px: String,
318    pub sz: String,
319    pub reduce_only: bool,
320    pub tif: TimeInForceRequest,
321    pub cloid: Option<String>,
322}
323
324/// Parameters for creating a trigger order.
325#[derive(Debug, Clone, Builder)]
326pub struct TriggerOrderParams {
327    pub asset: u32,
328    pub is_buy: bool,
329    pub px: String,
330    pub sz: String,
331    pub reduce_only: bool,
332    pub is_market: bool,
333    pub trigger_px: String,
334    pub tpsl: TpSlRequest,
335    pub cloid: Option<String>,
336}
337
338// ORDER builder (single or many)
339#[derive(Debug, Default)]
340pub struct OrderBuilder {
341    orders: Vec<OrderRequest>,
342    grouping: Grouping,
343}
344
345impl OrderBuilder {
346    pub fn new() -> Self {
347        Self::default()
348    }
349
350    #[must_use]
351    pub fn grouping(mut self, g: Grouping) -> Self {
352        self.grouping = g;
353        self
354    }
355
356    /// Create a limit order with individual parameters (legacy method)
357    #[allow(clippy::too_many_arguments)]
358    #[must_use]
359    pub fn push_limit(
360        self,
361        asset: u32,
362        is_buy: bool,
363        px: impl ToString,
364        sz: impl ToString,
365        reduce_only: bool,
366        tif: TimeInForceRequest,
367        cloid: Option<String>,
368    ) -> Self {
369        let params = LimitOrderParams {
370            asset,
371            is_buy,
372            px: px.to_string(),
373            sz: sz.to_string(),
374            reduce_only,
375            tif,
376            cloid,
377        };
378        self.push_limit_order(params)
379    }
380
381    /// Create a limit order using parameters struct
382    #[must_use]
383    pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
384        self.orders.push(OrderRequest {
385            a: params.asset,
386            b: params.is_buy,
387            p: params.px,
388            s: params.sz,
389            r: params.reduce_only,
390            t: OrderTypeRequest::Limit { tif: params.tif },
391            c: params.cloid,
392        });
393        self
394    }
395
396    /// Create a trigger order with individual parameters (legacy method)
397    #[allow(clippy::too_many_arguments)]
398    #[must_use]
399    pub fn push_trigger(
400        self,
401        asset: u32,
402        is_buy: bool,
403        px: impl ToString,
404        sz: impl ToString,
405        reduce_only: bool,
406        is_market: bool,
407        trigger_px: impl ToString,
408        tpsl: TpSlRequest,
409        cloid: Option<String>,
410    ) -> Self {
411        let params = TriggerOrderParams {
412            asset,
413            is_buy,
414            px: px.to_string(),
415            sz: sz.to_string(),
416            reduce_only,
417            is_market,
418            trigger_px: trigger_px.to_string(),
419            tpsl,
420            cloid,
421        };
422        self.push_trigger_order(params)
423    }
424
425    /// Create a trigger order using parameters struct
426    #[must_use]
427    pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
428        self.orders.push(OrderRequest {
429            a: params.asset,
430            b: params.is_buy,
431            p: params.px,
432            s: params.sz,
433            r: params.reduce_only,
434            t: OrderTypeRequest::Trigger {
435                is_market: params.is_market,
436                trigger_px: params.trigger_px,
437                tpsl: params.tpsl,
438            },
439            c: params.cloid,
440        });
441        self
442    }
443    pub fn build(self) -> ActionRequest {
444        ActionRequest::Order {
445            orders: self.orders,
446            grouping: self.grouping.as_str().to_string(),
447        }
448    }
449
450    /// Create a single limit order action directly (convenience method)
451    ///
452    /// # Example
453    /// ```ignore
454    /// let action = OrderBuilder::single_limit_order(
455    ///     LimitOrderParamsBuilder::default()
456    ///         .asset(0)
457    ///         .is_buy(true)
458    ///         .px("40000.0")
459    ///         .sz("0.01")
460    ///         .reduce_only(false)
461    ///         .tif(TimeInForceRequest::Gtc)
462    ///         .build()
463    ///         .unwrap()
464    /// );
465    /// ```
466    pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
467        Self::new().push_limit_order(params).build()
468    }
469
470    /// Create a single trigger order action directly (convenience method)
471    ///
472    /// # Example
473    /// ```ignore
474    /// let action = OrderBuilder::single_trigger_order(
475    ///     TriggerOrderParamsBuilder::default()
476    ///         .asset(0)
477    ///         .is_buy(false)
478    ///         .px("39000.0")
479    ///         .sz("0.01")
480    ///         .reduce_only(false)
481    ///         .is_market(true)
482    ///         .trigger_px("39500.0")
483    ///         .tpsl(TpSlRequest::Sl)
484    ///         .build()
485    ///         .unwrap()
486    /// );
487    /// ```
488    pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
489        Self::new().push_trigger_order(params).build()
490    }
491}
492
493pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
494    ActionRequest::Cancel {
495        cancels: cancels
496            .into_iter()
497            .map(|(a, o)| CancelRequest { a, o })
498            .collect(),
499    }
500}
501pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
502    ActionRequest::CancelByCloid {
503        cancels: vec![CancelByCloidRequest {
504            asset,
505            cloid: cloid.into(),
506        }],
507    }
508}
509pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
510    ActionRequest::Modify {
511        modifies: vec![ModifyRequest {
512            oid,
513            order: new_order,
514        }],
515    }
516}
517
518// Info wrappers (bodies go under PostRequest::Info{ payload })
519pub fn info_l2_book(coin: &str) -> PostRequest {
520    PostRequest::Info {
521        payload: serde_json::json!({"type":"l2Book","coin":coin}),
522    }
523}
524pub fn info_all_mids() -> PostRequest {
525    PostRequest::Info {
526        payload: serde_json::json!({"type":"allMids"}),
527    }
528}
529pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
530    PostRequest::Info {
531        payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
532    }
533}
534pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
535    let mut body = serde_json::json!({"type":"openOrders","user":user});
536    if let Some(fe) = frontend {
537        body["frontend"] = serde_json::json!(fe);
538    }
539    PostRequest::Info { payload: body }
540}
541pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
542    let mut body = serde_json::json!({"type":"userFills","user":user});
543    if let Some(agg) = aggregate_by_time {
544        body["aggregateByTime"] = serde_json::json!(agg);
545    }
546    PostRequest::Info { payload: body }
547}
548pub fn info_user_rate_limit(user: &str) -> PostRequest {
549    PostRequest::Info {
550        payload: serde_json::json!({"type":"userRateLimit","user":user}),
551    }
552}
553pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
554    PostRequest::Info {
555        payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
556    }
557}
558
559// -------------------------------------------------------------------------------------------------
560// Minimal response helpers
561// -------------------------------------------------------------------------------------------------
562
563pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
564    serde_json::from_value(payload.clone()).map_err(Error::Serde)
565}
566pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
567    serde_json::from_value(payload.clone()).map_err(Error::Serde)
568}
569pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
570    serde_json::from_value(payload.clone()).map_err(Error::Serde)
571}
572
573/// Heuristic classification for action responses.
574#[derive(Debug)]
575pub enum ActionOutcome<'a> {
576    Resting {
577        oid: u64,
578    },
579    Filled {
580        total_sz: &'a str,
581        avg_px: &'a str,
582        oid: Option<u64>,
583    },
584    Error {
585        msg: &'a str,
586    },
587    Unknown(&'a serde_json::Value),
588}
589pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
590    if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
591        if let (Some(total_sz), Some(avg_px)) = (
592            payload.get("totalSz").and_then(|v| v.as_str()),
593            payload.get("avgPx").and_then(|v| v.as_str()),
594        ) {
595            return ActionOutcome::Filled {
596                total_sz,
597                avg_px,
598                oid: Some(oid),
599            };
600        }
601        return ActionOutcome::Resting { oid };
602    }
603    if let (Some(total_sz), Some(avg_px)) = (
604        payload.get("totalSz").and_then(|v| v.as_str()),
605        payload.get("avgPx").and_then(|v| v.as_str()),
606    ) {
607        return ActionOutcome::Filled {
608            total_sz,
609            avg_px,
610            oid: None,
611        };
612    }
613    if let Some(msg) = payload
614        .get("error")
615        .and_then(|v| v.as_str())
616        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
617    {
618        return ActionOutcome::Error { msg };
619    }
620    ActionOutcome::Unknown(payload)
621}
622
623// -------------------------------------------------------------------------------------------------
624// Glue helpers used by the client (wired in client.rs)
625// -------------------------------------------------------------------------------------------------
626
627#[derive(Clone, Debug)]
628pub struct WsSender {
629    inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
630}
631
632impl WsSender {
633    pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
634        Self {
635            inner: Arc::new(tokio::sync::Mutex::new(tx)),
636        }
637    }
638
639    pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
640        let sender = self.inner.lock().await;
641        sender
642            .send(req)
643            .await
644            .map_err(|_| Error::transport("WebSocket sender closed"))
645    }
646}
647
648#[cfg(test)]
649mod tests {
650    use rstest::rstest;
651    use tokio::{
652        sync::oneshot,
653        time::{Duration, sleep, timeout},
654    };
655
656    use super::*;
657    use crate::{
658        common::consts::INFLIGHT_MAX,
659        websocket::messages::{
660            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
661            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
662        },
663    };
664
665    // --- helpers -------------------------------------------------------------------------------
666
667    fn mk_limit_alo(asset: u32) -> OrderRequest {
668        OrderRequest {
669            a: asset,
670            b: true,
671            p: "1".to_string(),
672            s: "1".to_string(),
673            r: false,
674            t: OrderTypeRequest::Limit {
675                tif: TimeInForceRequest::Alo,
676            },
677            c: None,
678        }
679    }
680
681    fn mk_limit_gtc(asset: u32) -> OrderRequest {
682        OrderRequest {
683            a: asset,
684            b: true,
685            p: "1".to_string(),
686            s: "1".to_string(),
687            r: false,
688            t: OrderTypeRequest::Limit {
689                // any non-ALO TIF keeps it in the Normal lane
690                tif: TimeInForceRequest::Gtc,
691            },
692            c: None,
693        }
694    }
695
696    // --- PostRouter ---------------------------------------------------------------------------
697
698    #[rstest]
699    #[tokio::test(flavor = "multi_thread")]
700    async fn register_duplicate_id_errors() {
701        let router = PostRouter::new();
702        let _rx = router.register(42).await.expect("first register OK");
703
704        let err = router.register(42).await.expect_err("duplicate must error");
705        let msg = err.to_string().to_lowercase();
706        assert!(
707            msg.contains("already") || msg.contains("duplicate"),
708            "unexpected error: {msg}"
709        );
710    }
711
712    #[rstest]
713    #[tokio::test(flavor = "multi_thread")]
714    async fn timeout_cancels_and_allows_reregister() {
715        let router = PostRouter::new();
716        let id = 7;
717
718        let rx = router.register(id).await.unwrap();
719        // No complete() → ensure we time out and the waiter is removed.
720        let err = router
721            .await_with_timeout(id, rx, Duration::from_millis(25))
722            .await
723            .expect_err("should timeout");
724        assert!(
725            err.to_string().to_lowercase().contains("timeout")
726                || err.to_string().to_lowercase().contains("closed"),
727            "unexpected error kind: {err}"
728        );
729
730        // After timeout, id should be reusable (cancel dropped the waiter & released the permit).
731        let _rx2 = router
732            .register(id)
733            .await
734            .expect("id should be reusable after timeout cancel");
735    }
736
737    #[rstest]
738    #[tokio::test(flavor = "multi_thread")]
739    async fn inflight_cap_blocks_then_unblocks() {
740        let router = PostRouter::new();
741
742        // Fill the inflight capacity.
743        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
744        for i in 0..INFLIGHT_MAX {
745            let rx = router.register(i as u64).await.unwrap();
746            rxs.push(rx); // keep waiters alive
747        }
748
749        // Next register should block until a permit is freed.
750        let router2 = Arc::clone(&router);
751        let (entered_tx, entered_rx) = oneshot::channel::<()>();
752        let (done_tx, done_rx) = oneshot::channel::<()>();
753        let (check_tx, check_rx) = oneshot::channel::<()>(); // separate channel for checking
754
755        tokio::spawn(async move {
756            let _ = entered_tx.send(());
757            let _rx = router2.register(9_999_999).await.unwrap();
758            let _ = done_tx.send(());
759        });
760
761        // Confirm the task is trying to register…
762        entered_rx.await.unwrap();
763
764        // …and that it doesn't complete yet (still blocked on permit).
765        tokio::spawn(async move {
766            if done_rx.await.is_ok() {
767                let _ = check_tx.send(());
768            }
769        });
770
771        assert!(
772            timeout(Duration::from_millis(50), check_rx).await.is_err(),
773            "should still be blocked while at cap"
774        );
775
776        // Free one permit by cancelling a waiter.
777        router.cancel(0).await;
778
779        // Wait for the blocked register to complete.
780        tokio::time::sleep(Duration::from_millis(100)).await;
781    }
782
783    // --- Lane classifier -----------------------------------------------------------------------
784
785    #[rstest(
786        orders, expected,
787        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
788        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
789        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
790        case::empty(vec![], PostLane::Normal),
791    )]
792    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
793        let action = ActionRequest::Order {
794            orders,
795            grouping: "na".to_string(),
796        };
797        assert_eq!(lane_for_action(&action), expected);
798    }
799
800    // --- Builder Pattern Tests -----------------------------------------------------------------
801
802    #[rstest]
803    fn test_order_request_builder() {
804        // Test OrderRequestBuilder derived from #[derive(Builder)]
805        let order = OrderRequestBuilder::default()
806            .a(0)
807            .b(true)
808            .p("40000.0".to_string())
809            .s("0.01".to_string())
810            .r(false)
811            .t(OrderTypeRequest::Limit {
812                tif: TimeInForceRequest::Gtc,
813            })
814            .c(Some("test-order-1".to_string()))
815            .build()
816            .expect("should build order");
817
818        assert_eq!(order.a, 0);
819        assert!(order.b);
820        assert_eq!(order.p, "40000.0");
821        assert_eq!(order.s, "0.01");
822        assert!(!order.r);
823        assert_eq!(order.c, Some("test-order-1".to_string()));
824    }
825
826    #[rstest]
827    fn test_limit_order_params_builder() {
828        // Test LimitOrderParamsBuilder
829        let params = LimitOrderParamsBuilder::default()
830            .asset(0)
831            .is_buy(true)
832            .px("40000.0".to_string())
833            .sz("0.01".to_string())
834            .reduce_only(false)
835            .tif(TimeInForceRequest::Alo)
836            .cloid(Some("test-limit-1".to_string()))
837            .build()
838            .expect("should build limit params");
839
840        assert_eq!(params.asset, 0);
841        assert!(params.is_buy);
842        assert_eq!(params.px, "40000.0");
843        assert_eq!(params.sz, "0.01");
844        assert!(!params.reduce_only);
845        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
846    }
847
848    #[rstest]
849    fn test_trigger_order_params_builder() {
850        // Test TriggerOrderParamsBuilder
851        let params = TriggerOrderParamsBuilder::default()
852            .asset(1)
853            .is_buy(false)
854            .px("39000.0".to_string())
855            .sz("0.02".to_string())
856            .reduce_only(false)
857            .is_market(true)
858            .trigger_px("39500.0".to_string())
859            .tpsl(TpSlRequest::Sl)
860            .cloid(Some("test-trigger-1".to_string()))
861            .build()
862            .expect("should build trigger params");
863
864        assert_eq!(params.asset, 1);
865        assert!(!params.is_buy);
866        assert_eq!(params.px, "39000.0");
867        assert!(params.is_market);
868        assert_eq!(params.trigger_px, "39500.0");
869    }
870
871    #[rstest]
872    fn test_order_builder_single_limit_convenience() {
873        // Test OrderBuilder::single_limit_order convenience method
874        let params = LimitOrderParamsBuilder::default()
875            .asset(0)
876            .is_buy(true)
877            .px("40000.0".to_string())
878            .sz("0.01".to_string())
879            .reduce_only(false)
880            .tif(TimeInForceRequest::Gtc)
881            .cloid(None)
882            .build()
883            .unwrap();
884
885        let action = OrderBuilder::single_limit_order(params);
886
887        match action {
888            ActionRequest::Order { orders, grouping } => {
889                assert_eq!(orders.len(), 1);
890                assert_eq!(orders[0].a, 0);
891                assert!(orders[0].b);
892                assert_eq!(grouping, "na");
893            }
894            _ => panic!("Expected ActionRequest::Order variant"),
895        }
896    }
897
898    #[rstest]
899    fn test_order_builder_single_trigger_convenience() {
900        // Test OrderBuilder::single_trigger_order convenience method
901        let params = TriggerOrderParamsBuilder::default()
902            .asset(1)
903            .is_buy(false)
904            .px("39000.0".to_string())
905            .sz("0.02".to_string())
906            .reduce_only(false)
907            .is_market(true)
908            .trigger_px("39500.0".to_string())
909            .tpsl(TpSlRequest::Sl)
910            .cloid(Some("sl-order".to_string()))
911            .build()
912            .unwrap();
913
914        let action = OrderBuilder::single_trigger_order(params);
915
916        match action {
917            ActionRequest::Order { orders, grouping } => {
918                assert_eq!(orders.len(), 1);
919                assert_eq!(orders[0].a, 1);
920                assert_eq!(orders[0].c, Some("sl-order".to_string()));
921                assert_eq!(grouping, "na");
922            }
923            _ => panic!("Expected ActionRequest::Order variant"),
924        }
925    }
926
927    #[rstest]
928    fn test_order_builder_batch_orders() {
929        // Test existing batch order functionality still works
930        let params1 = LimitOrderParams {
931            asset: 0,
932            is_buy: true,
933            px: "40000.0".to_string(),
934            sz: "0.01".to_string(),
935            reduce_only: false,
936            tif: TimeInForceRequest::Gtc,
937            cloid: Some("order-1".to_string()),
938        };
939
940        let params2 = LimitOrderParams {
941            asset: 1,
942            is_buy: false,
943            px: "2000.0".to_string(),
944            sz: "0.5".to_string(),
945            reduce_only: false,
946            tif: TimeInForceRequest::Ioc,
947            cloid: Some("order-2".to_string()),
948        };
949
950        let action = OrderBuilder::new()
951            .grouping(Grouping::NormalTpsl)
952            .push_limit_order(params1)
953            .push_limit_order(params2)
954            .build();
955
956        match action {
957            ActionRequest::Order { orders, grouping } => {
958                assert_eq!(orders.len(), 2);
959                assert_eq!(orders[0].c, Some("order-1".to_string()));
960                assert_eq!(orders[1].c, Some("order-2".to_string()));
961                assert_eq!(grouping, "normalTpsl");
962            }
963            _ => panic!("Expected ActionRequest::Order variant"),
964        }
965    }
966
967    #[rstest]
968    fn test_action_request_constructors() {
969        // Test ActionRequest::order() constructor
970        let order1 = mk_limit_gtc(0);
971        let order2 = mk_limit_gtc(1);
972        let action = ActionRequest::order(vec![order1, order2], "na");
973
974        match action {
975            ActionRequest::Order { orders, grouping } => {
976                assert_eq!(orders.len(), 2);
977                assert_eq!(grouping, "na");
978            }
979            _ => panic!("Expected ActionRequest::Order variant"),
980        }
981
982        // Test ActionRequest::cancel() constructor
983        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
984        let action = ActionRequest::cancel(cancels);
985        assert!(matches!(action, ActionRequest::Cancel { .. }));
986
987        // Test ActionRequest::cancel_by_cloid() constructor
988        let cancels = vec![CancelByCloidRequest {
989            asset: 0,
990            cloid: "order-1".to_string(),
991        }];
992        let action = ActionRequest::cancel_by_cloid(cancels);
993        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
994    }
995
996    // --- Batcher (tick flush path) --------------------------------------------------------------
997
998    #[rstest]
999    #[tokio::test(flavor = "multi_thread")]
1000    async fn batcher_sends_on_tick() {
1001        // Capture sent ids to prove dispatch happened.
1002        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1003        let sent_closure = sent.clone();
1004
1005        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
1006            let sent_inner = sent_closure.clone();
1007            Box::pin(async move {
1008                if let HyperliquidWsRequest::Post { id, .. } = req {
1009                    sent_inner.lock().await.push(id);
1010                }
1011                Ok(())
1012            })
1013        };
1014
1015        let batcher = PostBatcher::new(send_fn);
1016
1017        // Enqueue a handful of posts into the NORMAL lane; tick is ~50ms.
1018        for id in 1..=5u64 {
1019            batcher
1020                .enqueue(ScheduledPost {
1021                    id,
1022                    request: PostRequest::Info {
1023                        payload: serde_json::json!({"type":"allMids"}),
1024                    },
1025                    lane: PostLane::Normal,
1026                })
1027                .await
1028                .unwrap();
1029        }
1030
1031        // Wait slightly past one tick to allow the lane to flush.
1032        sleep(Duration::from_millis(80)).await;
1033
1034        let got = sent.lock().await.clone();
1035        assert_eq!(got.len(), 5, "expected 5 sends on first tick");
1036        assert_eq!(got, vec![1, 2, 3, 4, 5]);
1037    }
1038}