1use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29 sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30 time,
31};
32
33use crate::{
34 common::consts::INFLIGHT_MAX,
35 http::{
36 error::{Error, Result},
37 models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38 },
39 websocket::messages::{
40 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41 OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42 },
43};
44
45#[derive(Debug)]
46struct Waiter {
47 tx: oneshot::Sender<PostResponse>,
48 _permit: OwnedSemaphorePermit,
50}
51
52#[derive(Debug)]
53pub struct PostRouter {
54 inner: Mutex<HashMap<u64, Waiter>>,
55 inflight: Arc<Semaphore>, }
57
58impl Default for PostRouter {
59 fn default() -> Self {
60 Self {
61 inner: Mutex::new(HashMap::new()),
62 inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
63 }
64 }
65}
66
67impl PostRouter {
68 pub fn new() -> Arc<Self> {
69 Arc::new(Self::default())
70 }
71
72 pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
74 let permit = self
76 .inflight
77 .clone()
78 .acquire_owned()
79 .await
80 .map_err(|_| Error::transport("post router semaphore closed"))?;
81
82 let (tx, rx) = oneshot::channel::<PostResponse>();
83 let mut map = self.inner.lock().await;
84 if map.contains_key(&id) {
85 return Err(Error::transport(format!("post id {id} already registered")));
86 }
87 map.insert(
88 id,
89 Waiter {
90 tx,
91 _permit: permit,
92 },
93 );
94 Ok(rx)
95 }
96
97 pub async fn complete(&self, resp: PostResponse) {
99 let id = resp.id;
100 let waiter = {
101 let mut map = self.inner.lock().await;
102 map.remove(&id)
103 };
104 if let Some(waiter) = waiter {
105 if waiter.tx.send(resp).is_err() {
106 log::warn!("Post waiter dropped before delivery: id={id}");
107 }
108 } else {
110 log::warn!("Post response with unknown id (late/duplicate?): id={id}");
111 }
112 }
113
114 pub async fn cancel(&self, id: u64) {
116 let _ = {
117 let mut map = self.inner.lock().await;
118 map.remove(&id)
119 };
120 }
122
123 pub async fn await_with_timeout(
125 &self,
126 id: u64,
127 rx: oneshot::Receiver<PostResponse>,
128 timeout: Duration,
129 ) -> Result<PostResponse> {
130 match time::timeout(timeout, rx).await {
131 Ok(Ok(resp)) => Ok(resp),
132 Ok(Err(_closed)) => {
133 self.cancel(id).await;
134 Err(Error::transport("post response channel closed"))
135 }
136 Err(_elapsed) => {
137 self.cancel(id).await;
138 Err(Error::Timeout)
139 }
140 }
141 }
142}
143
144#[derive(Debug)]
145pub struct PostIds(AtomicU64);
146
147impl PostIds {
148 pub fn new(start: u64) -> Self {
149 Self(AtomicU64::new(start))
150 }
151 pub fn next(&self) -> u64 {
152 self.0.fetch_add(1, Ordering::Relaxed)
153 }
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum PostLane {
158 Alo, Normal, }
161
162#[derive(Debug)]
163pub struct ScheduledPost {
164 pub id: u64,
165 pub request: PostRequest,
166 pub lane: PostLane,
167}
168
169#[derive(Debug)]
170pub struct PostBatcher {
171 tx_alo: mpsc::Sender<ScheduledPost>,
172 tx_normal: mpsc::Sender<ScheduledPost>,
173}
174
175impl PostBatcher {
176 pub fn new<F>(send_fn: F) -> Self
178 where
179 F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
180 {
181 let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
182 let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
183
184 get_runtime().spawn(Self::run_lane(
186 "ALO",
187 rx_alo,
188 Duration::from_millis(100),
189 send_fn.clone(),
190 ));
191
192 get_runtime().spawn(Self::run_lane(
194 "NORMAL",
195 rx_normal,
196 Duration::from_millis(50),
197 send_fn,
198 ));
199
200 Self { tx_alo, tx_normal }
201 }
202
203 async fn run_lane<F>(
204 lane_name: &'static str,
205 mut rx: mpsc::Receiver<ScheduledPost>,
206 tick: Duration,
207 mut send_fn: F,
208 ) where
209 F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
210 {
211 let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
212 let mut interval = time::interval(tick);
213 interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
214
215 loop {
216 tokio::select! {
217 maybe_item = rx.recv() => {
218 match maybe_item {
219 Some(item) => pend.push(item),
220 None => break, }
222 }
223 _ = interval.tick() => {
224 if pend.is_empty() { continue; }
225 let to_send = std::mem::take(&mut pend);
226 for item in to_send {
227 let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
228 if let Err(e) = send_fn(req).await {
229 log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
230 }
231 }
232 }
233 }
234 }
235 log::info!("Post lane terminated: lane={lane_name}");
236 }
237
238 pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
239 match item.lane {
240 PostLane::Alo => self
241 .tx_alo
242 .send(item)
243 .await
244 .map_err(|_| Error::transport("ALO lane closed")),
245 PostLane::Normal => self
246 .tx_normal
247 .send(item)
248 .await
249 .map_err(|_| Error::transport("NORMAL lane closed")),
250 }
251 }
252}
253
254pub fn lane_for_action(action: &ActionRequest) -> PostLane {
256 match action {
257 ActionRequest::Order { orders, .. } => {
258 if orders.is_empty() {
259 return PostLane::Normal;
260 }
261 let all_alo = orders.iter().all(|o| {
262 matches!(
263 o.t,
264 OrderTypeRequest::Limit {
265 tif: TimeInForceRequest::Alo
266 }
267 )
268 });
269 if all_alo {
270 PostLane::Alo
271 } else {
272 PostLane::Normal
273 }
274 }
275 _ => PostLane::Normal,
276 }
277}
278
279#[derive(Debug, Clone, Copy, Default)]
280pub enum Grouping {
281 #[default]
282 Na,
283 NormalTpsl,
284 PositionTpsl,
285}
286impl Grouping {
287 pub fn as_str(&self) -> &'static str {
288 match self {
289 Self::Na => "na",
290 Self::NormalTpsl => "normalTpsl",
291 Self::PositionTpsl => "positionTpsl",
292 }
293 }
294}
295
296#[derive(Debug, Clone, Builder)]
298pub struct LimitOrderParams {
299 pub asset: u32,
300 pub is_buy: bool,
301 pub px: String,
302 pub sz: String,
303 pub reduce_only: bool,
304 pub tif: TimeInForceRequest,
305 pub cloid: Option<String>,
306}
307
308#[derive(Debug, Clone, Builder)]
310pub struct TriggerOrderParams {
311 pub asset: u32,
312 pub is_buy: bool,
313 pub px: String,
314 pub sz: String,
315 pub reduce_only: bool,
316 pub is_market: bool,
317 pub trigger_px: String,
318 pub tpsl: TpSlRequest,
319 pub cloid: Option<String>,
320}
321
322#[derive(Debug, Default)]
324pub struct OrderBuilder {
325 orders: Vec<OrderRequest>,
326 grouping: Grouping,
327}
328
329impl OrderBuilder {
330 pub fn new() -> Self {
331 Self::default()
332 }
333
334 #[must_use]
335 pub fn grouping(mut self, g: Grouping) -> Self {
336 self.grouping = g;
337 self
338 }
339
340 #[allow(clippy::too_many_arguments)]
342 #[must_use]
343 pub fn push_limit(
344 self,
345 asset: u32,
346 is_buy: bool,
347 px: impl ToString,
348 sz: impl ToString,
349 reduce_only: bool,
350 tif: TimeInForceRequest,
351 cloid: Option<String>,
352 ) -> Self {
353 let params = LimitOrderParams {
354 asset,
355 is_buy,
356 px: px.to_string(),
357 sz: sz.to_string(),
358 reduce_only,
359 tif,
360 cloid,
361 };
362 self.push_limit_order(params)
363 }
364
365 #[must_use]
367 pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
368 self.orders.push(OrderRequest {
369 a: params.asset,
370 b: params.is_buy,
371 p: params.px,
372 s: params.sz,
373 r: params.reduce_only,
374 t: OrderTypeRequest::Limit { tif: params.tif },
375 c: params.cloid,
376 });
377 self
378 }
379
380 #[allow(clippy::too_many_arguments)]
382 #[must_use]
383 pub fn push_trigger(
384 self,
385 asset: u32,
386 is_buy: bool,
387 px: impl ToString,
388 sz: impl ToString,
389 reduce_only: bool,
390 is_market: bool,
391 trigger_px: impl ToString,
392 tpsl: TpSlRequest,
393 cloid: Option<String>,
394 ) -> Self {
395 let params = TriggerOrderParams {
396 asset,
397 is_buy,
398 px: px.to_string(),
399 sz: sz.to_string(),
400 reduce_only,
401 is_market,
402 trigger_px: trigger_px.to_string(),
403 tpsl,
404 cloid,
405 };
406 self.push_trigger_order(params)
407 }
408
409 #[must_use]
411 pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
412 self.orders.push(OrderRequest {
413 a: params.asset,
414 b: params.is_buy,
415 p: params.px,
416 s: params.sz,
417 r: params.reduce_only,
418 t: OrderTypeRequest::Trigger {
419 is_market: params.is_market,
420 trigger_px: params.trigger_px,
421 tpsl: params.tpsl,
422 },
423 c: params.cloid,
424 });
425 self
426 }
427 pub fn build(self) -> ActionRequest {
428 ActionRequest::Order {
429 orders: self.orders,
430 grouping: self.grouping.as_str().to_string(),
431 }
432 }
433
434 pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
451 Self::new().push_limit_order(params).build()
452 }
453
454 pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
473 Self::new().push_trigger_order(params).build()
474 }
475}
476
477pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
478 ActionRequest::Cancel {
479 cancels: cancels
480 .into_iter()
481 .map(|(a, o)| CancelRequest { a, o })
482 .collect(),
483 }
484}
485pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
486 ActionRequest::CancelByCloid {
487 cancels: vec![CancelByCloidRequest {
488 asset,
489 cloid: cloid.into(),
490 }],
491 }
492}
493pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
494 ActionRequest::Modify {
495 modifies: vec![ModifyRequest {
496 oid,
497 order: new_order,
498 }],
499 }
500}
501
502pub fn info_l2_book(coin: &str) -> PostRequest {
504 PostRequest::Info {
505 payload: serde_json::json!({"type":"l2Book","coin":coin}),
506 }
507}
508pub fn info_all_mids() -> PostRequest {
509 PostRequest::Info {
510 payload: serde_json::json!({"type":"allMids"}),
511 }
512}
513pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
514 PostRequest::Info {
515 payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
516 }
517}
518pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
519 let mut body = serde_json::json!({"type":"openOrders","user":user});
520 if let Some(fe) = frontend {
521 body["frontend"] = serde_json::json!(fe);
522 }
523 PostRequest::Info { payload: body }
524}
525pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
526 let mut body = serde_json::json!({"type":"userFills","user":user});
527 if let Some(agg) = aggregate_by_time {
528 body["aggregateByTime"] = serde_json::json!(agg);
529 }
530 PostRequest::Info { payload: body }
531}
532pub fn info_user_rate_limit(user: &str) -> PostRequest {
533 PostRequest::Info {
534 payload: serde_json::json!({"type":"userRateLimit","user":user}),
535 }
536}
537pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
538 PostRequest::Info {
539 payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
540 }
541}
542
543pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
544 serde_json::from_value(payload.clone()).map_err(Error::Serde)
545}
546pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
547 serde_json::from_value(payload.clone()).map_err(Error::Serde)
548}
549pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
550 serde_json::from_value(payload.clone()).map_err(Error::Serde)
551}
552
553#[derive(Debug)]
555pub enum ActionOutcome<'a> {
556 Resting {
557 oid: u64,
558 },
559 Filled {
560 total_sz: &'a str,
561 avg_px: &'a str,
562 oid: Option<u64>,
563 },
564 Error {
565 msg: &'a str,
566 },
567 Unknown(&'a serde_json::Value),
568}
569pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
570 if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
571 if let (Some(total_sz), Some(avg_px)) = (
572 payload.get("totalSz").and_then(|v| v.as_str()),
573 payload.get("avgPx").and_then(|v| v.as_str()),
574 ) {
575 return ActionOutcome::Filled {
576 total_sz,
577 avg_px,
578 oid: Some(oid),
579 };
580 }
581 return ActionOutcome::Resting { oid };
582 }
583 if let (Some(total_sz), Some(avg_px)) = (
584 payload.get("totalSz").and_then(|v| v.as_str()),
585 payload.get("avgPx").and_then(|v| v.as_str()),
586 ) {
587 return ActionOutcome::Filled {
588 total_sz,
589 avg_px,
590 oid: None,
591 };
592 }
593 if let Some(msg) = payload
594 .get("error")
595 .and_then(|v| v.as_str())
596 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
597 {
598 return ActionOutcome::Error { msg };
599 }
600 ActionOutcome::Unknown(payload)
601}
602
603#[derive(Clone, Debug)]
604pub struct WsSender {
605 inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
606}
607
608impl WsSender {
609 pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
610 Self {
611 inner: Arc::new(tokio::sync::Mutex::new(tx)),
612 }
613 }
614
615 pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
616 let sender = self.inner.lock().await;
617 sender
618 .send(req)
619 .await
620 .map_err(|_| Error::transport("WebSocket sender closed"))
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use nautilus_common::testing::wait_until_async;
627 use rstest::rstest;
628 use tokio::{
629 sync::oneshot,
630 time::{Duration, timeout},
631 };
632
633 use super::*;
634 use crate::{
635 common::consts::INFLIGHT_MAX,
636 websocket::messages::{
637 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
638 OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
639 },
640 };
641
642 fn mk_limit_alo(asset: u32) -> OrderRequest {
645 OrderRequest {
646 a: asset,
647 b: true,
648 p: "1".to_string(),
649 s: "1".to_string(),
650 r: false,
651 t: OrderTypeRequest::Limit {
652 tif: TimeInForceRequest::Alo,
653 },
654 c: None,
655 }
656 }
657
658 fn mk_limit_gtc(asset: u32) -> OrderRequest {
659 OrderRequest {
660 a: asset,
661 b: true,
662 p: "1".to_string(),
663 s: "1".to_string(),
664 r: false,
665 t: OrderTypeRequest::Limit {
666 tif: TimeInForceRequest::Gtc,
668 },
669 c: None,
670 }
671 }
672
673 #[rstest]
676 #[tokio::test(flavor = "multi_thread")]
677 async fn register_duplicate_id_errors() {
678 let router = PostRouter::new();
679 let _rx = router.register(42).await.expect("first register OK");
680
681 let err = router.register(42).await.expect_err("duplicate must error");
682 let msg = err.to_string().to_lowercase();
683 assert!(
684 msg.contains("already") || msg.contains("duplicate"),
685 "unexpected error: {msg}"
686 );
687 }
688
689 #[rstest]
690 #[tokio::test(flavor = "multi_thread")]
691 async fn timeout_cancels_and_allows_reregister() {
692 let router = PostRouter::new();
693 let id = 7;
694
695 let rx = router.register(id).await.unwrap();
696 let err = router
698 .await_with_timeout(id, rx, Duration::from_millis(25))
699 .await
700 .expect_err("should timeout");
701 assert!(
702 err.to_string().to_lowercase().contains("timeout")
703 || err.to_string().to_lowercase().contains("closed"),
704 "unexpected error kind: {err}"
705 );
706
707 let _rx2 = router
709 .register(id)
710 .await
711 .expect("id should be reusable after timeout cancel");
712 }
713
714 #[rstest]
715 #[tokio::test(flavor = "multi_thread")]
716 async fn inflight_cap_blocks_then_unblocks() {
717 let router = PostRouter::new();
718
719 let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
721 for i in 0..INFLIGHT_MAX {
722 let rx = router.register(i as u64).await.unwrap();
723 rxs.push(rx); }
725
726 let router2 = Arc::clone(&router);
728 let (entered_tx, entered_rx) = oneshot::channel::<()>();
729 let (done_tx, done_rx) = oneshot::channel::<()>();
730 let (check_tx, check_rx) = oneshot::channel::<()>(); get_runtime().spawn(async move {
733 let _ = entered_tx.send(());
734 let _rx = router2.register(9_999_999).await.unwrap();
735 let _ = done_tx.send(());
736 });
737
738 entered_rx.await.unwrap();
740
741 get_runtime().spawn(async move {
743 if done_rx.await.is_ok() {
744 let _ = check_tx.send(());
745 }
746 });
747
748 assert!(
749 timeout(Duration::from_millis(50), check_rx).await.is_err(),
750 "should still be blocked while at cap"
751 );
752
753 router.cancel(0).await;
755
756 tokio::time::sleep(Duration::from_millis(100)).await;
758 }
759
760 #[rstest(
763 orders, expected,
764 case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
765 case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
766 case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
767 case::empty(vec![], PostLane::Normal),
768 )]
769 fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
770 let action = ActionRequest::Order {
771 orders,
772 grouping: "na".to_string(),
773 };
774 assert_eq!(lane_for_action(&action), expected);
775 }
776
777 #[rstest]
780 fn test_order_request_builder() {
781 let order = OrderRequestBuilder::default()
783 .a(0)
784 .b(true)
785 .p("40000.0".to_string())
786 .s("0.01".to_string())
787 .r(false)
788 .t(OrderTypeRequest::Limit {
789 tif: TimeInForceRequest::Gtc,
790 })
791 .c(Some("test-order-1".to_string()))
792 .build()
793 .expect("should build order");
794
795 assert_eq!(order.a, 0);
796 assert!(order.b);
797 assert_eq!(order.p, "40000.0");
798 assert_eq!(order.s, "0.01");
799 assert!(!order.r);
800 assert_eq!(order.c, Some("test-order-1".to_string()));
801 }
802
803 #[rstest]
804 fn test_limit_order_params_builder() {
805 let params = LimitOrderParamsBuilder::default()
807 .asset(0)
808 .is_buy(true)
809 .px("40000.0".to_string())
810 .sz("0.01".to_string())
811 .reduce_only(false)
812 .tif(TimeInForceRequest::Alo)
813 .cloid(Some("test-limit-1".to_string()))
814 .build()
815 .expect("should build limit params");
816
817 assert_eq!(params.asset, 0);
818 assert!(params.is_buy);
819 assert_eq!(params.px, "40000.0");
820 assert_eq!(params.sz, "0.01");
821 assert!(!params.reduce_only);
822 assert_eq!(params.cloid, Some("test-limit-1".to_string()));
823 }
824
825 #[rstest]
826 fn test_trigger_order_params_builder() {
827 let params = TriggerOrderParamsBuilder::default()
829 .asset(1)
830 .is_buy(false)
831 .px("39000.0".to_string())
832 .sz("0.02".to_string())
833 .reduce_only(false)
834 .is_market(true)
835 .trigger_px("39500.0".to_string())
836 .tpsl(TpSlRequest::Sl)
837 .cloid(Some("test-trigger-1".to_string()))
838 .build()
839 .expect("should build trigger params");
840
841 assert_eq!(params.asset, 1);
842 assert!(!params.is_buy);
843 assert_eq!(params.px, "39000.0");
844 assert!(params.is_market);
845 assert_eq!(params.trigger_px, "39500.0");
846 }
847
848 #[rstest]
849 fn test_order_builder_single_limit_convenience() {
850 let params = LimitOrderParamsBuilder::default()
852 .asset(0)
853 .is_buy(true)
854 .px("40000.0".to_string())
855 .sz("0.01".to_string())
856 .reduce_only(false)
857 .tif(TimeInForceRequest::Gtc)
858 .cloid(None)
859 .build()
860 .unwrap();
861
862 let action = OrderBuilder::single_limit_order(params);
863
864 match action {
865 ActionRequest::Order { orders, grouping } => {
866 assert_eq!(orders.len(), 1);
867 assert_eq!(orders[0].a, 0);
868 assert!(orders[0].b);
869 assert_eq!(grouping, "na");
870 }
871 _ => panic!("Expected ActionRequest::Order variant"),
872 }
873 }
874
875 #[rstest]
876 fn test_order_builder_single_trigger_convenience() {
877 let params = TriggerOrderParamsBuilder::default()
879 .asset(1)
880 .is_buy(false)
881 .px("39000.0".to_string())
882 .sz("0.02".to_string())
883 .reduce_only(false)
884 .is_market(true)
885 .trigger_px("39500.0".to_string())
886 .tpsl(TpSlRequest::Sl)
887 .cloid(Some("sl-order".to_string()))
888 .build()
889 .unwrap();
890
891 let action = OrderBuilder::single_trigger_order(params);
892
893 match action {
894 ActionRequest::Order { orders, grouping } => {
895 assert_eq!(orders.len(), 1);
896 assert_eq!(orders[0].a, 1);
897 assert_eq!(orders[0].c, Some("sl-order".to_string()));
898 assert_eq!(grouping, "na");
899 }
900 _ => panic!("Expected ActionRequest::Order variant"),
901 }
902 }
903
904 #[rstest]
905 fn test_order_builder_batch_orders() {
906 let params1 = LimitOrderParams {
908 asset: 0,
909 is_buy: true,
910 px: "40000.0".to_string(),
911 sz: "0.01".to_string(),
912 reduce_only: false,
913 tif: TimeInForceRequest::Gtc,
914 cloid: Some("order-1".to_string()),
915 };
916
917 let params2 = LimitOrderParams {
918 asset: 1,
919 is_buy: false,
920 px: "2000.0".to_string(),
921 sz: "0.5".to_string(),
922 reduce_only: false,
923 tif: TimeInForceRequest::Ioc,
924 cloid: Some("order-2".to_string()),
925 };
926
927 let action = OrderBuilder::new()
928 .grouping(Grouping::NormalTpsl)
929 .push_limit_order(params1)
930 .push_limit_order(params2)
931 .build();
932
933 match action {
934 ActionRequest::Order { orders, grouping } => {
935 assert_eq!(orders.len(), 2);
936 assert_eq!(orders[0].c, Some("order-1".to_string()));
937 assert_eq!(orders[1].c, Some("order-2".to_string()));
938 assert_eq!(grouping, "normalTpsl");
939 }
940 _ => panic!("Expected ActionRequest::Order variant"),
941 }
942 }
943
944 #[rstest]
945 fn test_action_request_constructors() {
946 let order1 = mk_limit_gtc(0);
948 let order2 = mk_limit_gtc(1);
949 let action = ActionRequest::order(vec![order1, order2], "na");
950
951 match action {
952 ActionRequest::Order { orders, grouping } => {
953 assert_eq!(orders.len(), 2);
954 assert_eq!(grouping, "na");
955 }
956 _ => panic!("Expected ActionRequest::Order variant"),
957 }
958
959 let cancels = vec![CancelRequest { a: 0, o: 12345 }];
961 let action = ActionRequest::cancel(cancels);
962 assert!(matches!(action, ActionRequest::Cancel { .. }));
963
964 let cancels = vec![CancelByCloidRequest {
966 asset: 0,
967 cloid: "order-1".to_string(),
968 }];
969 let action = ActionRequest::cancel_by_cloid(cancels);
970 assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
971 }
972
973 #[rstest]
976 #[tokio::test(flavor = "multi_thread")]
977 async fn batcher_sends_on_tick() {
978 let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
980 let sent_closure = sent.clone();
981
982 let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
983 let sent_inner = sent_closure.clone();
984 Box::pin(async move {
985 if let HyperliquidWsRequest::Post { id, .. } = req {
986 sent_inner.lock().await.push(id);
987 }
988 Ok(())
989 })
990 };
991
992 let batcher = PostBatcher::new(send_fn);
993
994 for id in 1..=5u64 {
996 batcher
997 .enqueue(ScheduledPost {
998 id,
999 request: PostRequest::Info {
1000 payload: serde_json::json!({"type":"allMids"}),
1001 },
1002 lane: PostLane::Normal,
1003 })
1004 .await
1005 .unwrap();
1006 }
1007
1008 let sent_check = sent.clone();
1010 wait_until_async(
1011 || {
1012 let sent_inner = sent_check.clone();
1013 async move { sent_inner.lock().await.len() == 5 }
1014 },
1015 Duration::from_secs(2),
1016 )
1017 .await;
1018
1019 let got = sent.lock().await.clone();
1020 assert_eq!(got, vec![1, 2, 3, 4, 5]);
1021 }
1022}