1use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29 sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30 time,
31};
32
33use crate::{
34 common::consts::INFLIGHT_MAX,
35 http::{
36 error::{Error, Result},
37 models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38 },
39 websocket::messages::{
40 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41 OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42 },
43};
44
45#[derive(Debug)]
51struct Waiter {
52 tx: oneshot::Sender<PostResponse>,
53 _permit: OwnedSemaphorePermit,
55}
56
57#[derive(Debug)]
58pub struct PostRouter {
59 inner: Mutex<HashMap<u64, Waiter>>,
60 inflight: Arc<Semaphore>, }
62
63impl Default for PostRouter {
64 fn default() -> Self {
65 Self {
66 inner: Mutex::new(HashMap::new()),
67 inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
68 }
69 }
70}
71
72impl PostRouter {
73 pub fn new() -> Arc<Self> {
74 Arc::new(Self::default())
75 }
76
77 pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
79 let permit = self
81 .inflight
82 .clone()
83 .acquire_owned()
84 .await
85 .map_err(|_| Error::transport("post router semaphore closed"))?;
86
87 let (tx, rx) = oneshot::channel::<PostResponse>();
88 let mut map = self.inner.lock().await;
89 if map.contains_key(&id) {
90 return Err(Error::transport(format!("post id {id} already registered")));
91 }
92 map.insert(
93 id,
94 Waiter {
95 tx,
96 _permit: permit,
97 },
98 );
99 Ok(rx)
100 }
101
102 pub async fn complete(&self, resp: PostResponse) {
104 let id = resp.id;
105 let waiter = {
106 let mut map = self.inner.lock().await;
107 map.remove(&id)
108 };
109 if let Some(waiter) = waiter {
110 if waiter.tx.send(resp).is_err() {
111 tracing::warn!(id, "post waiter dropped before delivery");
112 }
113 } else {
115 tracing::warn!(id, "post response with unknown id (late/duplicate?)");
116 }
117 }
118
119 pub async fn cancel(&self, id: u64) {
121 let _ = {
122 let mut map = self.inner.lock().await;
123 map.remove(&id)
124 };
125 }
127
128 pub async fn await_with_timeout(
130 &self,
131 id: u64,
132 rx: oneshot::Receiver<PostResponse>,
133 timeout: Duration,
134 ) -> Result<PostResponse> {
135 match time::timeout(timeout, rx).await {
136 Ok(Ok(resp)) => Ok(resp),
137 Ok(Err(_closed)) => {
138 self.cancel(id).await;
139 Err(Error::transport("post response channel closed"))
140 }
141 Err(_elapsed) => {
142 self.cancel(id).await;
143 Err(Error::Timeout)
144 }
145 }
146 }
147}
148
149#[derive(Debug)]
154pub struct PostIds(AtomicU64);
155
156impl PostIds {
157 pub fn new(start: u64) -> Self {
158 Self(AtomicU64::new(start))
159 }
160 pub fn next(&self) -> u64 {
161 self.0.fetch_add(1, Ordering::Relaxed)
162 }
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
170pub enum PostLane {
171 Alo, Normal, }
174
175#[derive(Debug)]
176pub struct ScheduledPost {
177 pub id: u64,
178 pub request: PostRequest,
179 pub lane: PostLane,
180}
181
182#[derive(Debug)]
183pub struct PostBatcher {
184 tx_alo: mpsc::Sender<ScheduledPost>,
185 tx_normal: mpsc::Sender<ScheduledPost>,
186}
187
188impl PostBatcher {
189 pub fn new<F>(send_fn: F) -> Self
191 where
192 F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
193 {
194 let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
195 let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
196
197 get_runtime().spawn(Self::run_lane(
199 "ALO",
200 rx_alo,
201 Duration::from_millis(100),
202 send_fn.clone(),
203 ));
204
205 get_runtime().spawn(Self::run_lane(
207 "NORMAL",
208 rx_normal,
209 Duration::from_millis(50),
210 send_fn,
211 ));
212
213 Self { tx_alo, tx_normal }
214 }
215
216 async fn run_lane<F>(
217 lane_name: &'static str,
218 mut rx: mpsc::Receiver<ScheduledPost>,
219 tick: Duration,
220 mut send_fn: F,
221 ) where
222 F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
223 {
224 let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
225 let mut interval = time::interval(tick);
226 interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
227
228 loop {
229 tokio::select! {
230 maybe_item = rx.recv() => {
231 match maybe_item {
232 Some(item) => pend.push(item),
233 None => break, }
235 }
236 _ = interval.tick() => {
237 if pend.is_empty() { continue; }
238 let to_send = std::mem::take(&mut pend);
239 for item in to_send {
240 let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
241 if let Err(e) = send_fn(req).await {
242 tracing::error!(lane=%lane_name, id=%item.id, "failed to send post: {e}");
243 }
244 }
245 }
246 }
247 }
248 tracing::info!(lane=%lane_name, "post lane terminated");
249 }
250
251 pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
252 match item.lane {
253 PostLane::Alo => self
254 .tx_alo
255 .send(item)
256 .await
257 .map_err(|_| Error::transport("ALO lane closed")),
258 PostLane::Normal => self
259 .tx_normal
260 .send(item)
261 .await
262 .map_err(|_| Error::transport("NORMAL lane closed")),
263 }
264 }
265}
266
267pub fn lane_for_action(action: &ActionRequest) -> PostLane {
269 match action {
270 ActionRequest::Order { orders, .. } => {
271 if orders.is_empty() {
272 return PostLane::Normal;
273 }
274 let all_alo = orders.iter().all(|o| {
275 matches!(
276 o.t,
277 OrderTypeRequest::Limit {
278 tif: TimeInForceRequest::Alo
279 }
280 )
281 });
282 if all_alo {
283 PostLane::Alo
284 } else {
285 PostLane::Normal
286 }
287 }
288 _ => PostLane::Normal,
289 }
290}
291
292#[derive(Debug, Clone, Copy, Default)]
297pub enum Grouping {
298 #[default]
299 Na,
300 NormalTpsl,
301 PositionTpsl,
302}
303impl Grouping {
304 pub fn as_str(&self) -> &'static str {
305 match self {
306 Self::Na => "na",
307 Self::NormalTpsl => "normalTpsl",
308 Self::PositionTpsl => "positionTpsl",
309 }
310 }
311}
312
313#[derive(Debug, Clone, Builder)]
315pub struct LimitOrderParams {
316 pub asset: u32,
317 pub is_buy: bool,
318 pub px: String,
319 pub sz: String,
320 pub reduce_only: bool,
321 pub tif: TimeInForceRequest,
322 pub cloid: Option<String>,
323}
324
325#[derive(Debug, Clone, Builder)]
327pub struct TriggerOrderParams {
328 pub asset: u32,
329 pub is_buy: bool,
330 pub px: String,
331 pub sz: String,
332 pub reduce_only: bool,
333 pub is_market: bool,
334 pub trigger_px: String,
335 pub tpsl: TpSlRequest,
336 pub cloid: Option<String>,
337}
338
339#[derive(Debug, Default)]
341pub struct OrderBuilder {
342 orders: Vec<OrderRequest>,
343 grouping: Grouping,
344}
345
346impl OrderBuilder {
347 pub fn new() -> Self {
348 Self::default()
349 }
350
351 #[must_use]
352 pub fn grouping(mut self, g: Grouping) -> Self {
353 self.grouping = g;
354 self
355 }
356
357 #[allow(clippy::too_many_arguments)]
359 #[must_use]
360 pub fn push_limit(
361 self,
362 asset: u32,
363 is_buy: bool,
364 px: impl ToString,
365 sz: impl ToString,
366 reduce_only: bool,
367 tif: TimeInForceRequest,
368 cloid: Option<String>,
369 ) -> Self {
370 let params = LimitOrderParams {
371 asset,
372 is_buy,
373 px: px.to_string(),
374 sz: sz.to_string(),
375 reduce_only,
376 tif,
377 cloid,
378 };
379 self.push_limit_order(params)
380 }
381
382 #[must_use]
384 pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
385 self.orders.push(OrderRequest {
386 a: params.asset,
387 b: params.is_buy,
388 p: params.px,
389 s: params.sz,
390 r: params.reduce_only,
391 t: OrderTypeRequest::Limit { tif: params.tif },
392 c: params.cloid,
393 });
394 self
395 }
396
397 #[allow(clippy::too_many_arguments)]
399 #[must_use]
400 pub fn push_trigger(
401 self,
402 asset: u32,
403 is_buy: bool,
404 px: impl ToString,
405 sz: impl ToString,
406 reduce_only: bool,
407 is_market: bool,
408 trigger_px: impl ToString,
409 tpsl: TpSlRequest,
410 cloid: Option<String>,
411 ) -> Self {
412 let params = TriggerOrderParams {
413 asset,
414 is_buy,
415 px: px.to_string(),
416 sz: sz.to_string(),
417 reduce_only,
418 is_market,
419 trigger_px: trigger_px.to_string(),
420 tpsl,
421 cloid,
422 };
423 self.push_trigger_order(params)
424 }
425
426 #[must_use]
428 pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
429 self.orders.push(OrderRequest {
430 a: params.asset,
431 b: params.is_buy,
432 p: params.px,
433 s: params.sz,
434 r: params.reduce_only,
435 t: OrderTypeRequest::Trigger {
436 is_market: params.is_market,
437 trigger_px: params.trigger_px,
438 tpsl: params.tpsl,
439 },
440 c: params.cloid,
441 });
442 self
443 }
444 pub fn build(self) -> ActionRequest {
445 ActionRequest::Order {
446 orders: self.orders,
447 grouping: self.grouping.as_str().to_string(),
448 }
449 }
450
451 pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
468 Self::new().push_limit_order(params).build()
469 }
470
471 pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
490 Self::new().push_trigger_order(params).build()
491 }
492}
493
494pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
495 ActionRequest::Cancel {
496 cancels: cancels
497 .into_iter()
498 .map(|(a, o)| CancelRequest { a, o })
499 .collect(),
500 }
501}
502pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
503 ActionRequest::CancelByCloid {
504 cancels: vec![CancelByCloidRequest {
505 asset,
506 cloid: cloid.into(),
507 }],
508 }
509}
510pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
511 ActionRequest::Modify {
512 modifies: vec![ModifyRequest {
513 oid,
514 order: new_order,
515 }],
516 }
517}
518
519pub fn info_l2_book(coin: &str) -> PostRequest {
521 PostRequest::Info {
522 payload: serde_json::json!({"type":"l2Book","coin":coin}),
523 }
524}
525pub fn info_all_mids() -> PostRequest {
526 PostRequest::Info {
527 payload: serde_json::json!({"type":"allMids"}),
528 }
529}
530pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
531 PostRequest::Info {
532 payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
533 }
534}
535pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
536 let mut body = serde_json::json!({"type":"openOrders","user":user});
537 if let Some(fe) = frontend {
538 body["frontend"] = serde_json::json!(fe);
539 }
540 PostRequest::Info { payload: body }
541}
542pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
543 let mut body = serde_json::json!({"type":"userFills","user":user});
544 if let Some(agg) = aggregate_by_time {
545 body["aggregateByTime"] = serde_json::json!(agg);
546 }
547 PostRequest::Info { payload: body }
548}
549pub fn info_user_rate_limit(user: &str) -> PostRequest {
550 PostRequest::Info {
551 payload: serde_json::json!({"type":"userRateLimit","user":user}),
552 }
553}
554pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
555 PostRequest::Info {
556 payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
557 }
558}
559
560pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
565 serde_json::from_value(payload.clone()).map_err(Error::Serde)
566}
567pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
568 serde_json::from_value(payload.clone()).map_err(Error::Serde)
569}
570pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
571 serde_json::from_value(payload.clone()).map_err(Error::Serde)
572}
573
574#[derive(Debug)]
576pub enum ActionOutcome<'a> {
577 Resting {
578 oid: u64,
579 },
580 Filled {
581 total_sz: &'a str,
582 avg_px: &'a str,
583 oid: Option<u64>,
584 },
585 Error {
586 msg: &'a str,
587 },
588 Unknown(&'a serde_json::Value),
589}
590pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
591 if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
592 if let (Some(total_sz), Some(avg_px)) = (
593 payload.get("totalSz").and_then(|v| v.as_str()),
594 payload.get("avgPx").and_then(|v| v.as_str()),
595 ) {
596 return ActionOutcome::Filled {
597 total_sz,
598 avg_px,
599 oid: Some(oid),
600 };
601 }
602 return ActionOutcome::Resting { oid };
603 }
604 if let (Some(total_sz), Some(avg_px)) = (
605 payload.get("totalSz").and_then(|v| v.as_str()),
606 payload.get("avgPx").and_then(|v| v.as_str()),
607 ) {
608 return ActionOutcome::Filled {
609 total_sz,
610 avg_px,
611 oid: None,
612 };
613 }
614 if let Some(msg) = payload
615 .get("error")
616 .and_then(|v| v.as_str())
617 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
618 {
619 return ActionOutcome::Error { msg };
620 }
621 ActionOutcome::Unknown(payload)
622}
623
624#[derive(Clone, Debug)]
629pub struct WsSender {
630 inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
631}
632
633impl WsSender {
634 pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
635 Self {
636 inner: Arc::new(tokio::sync::Mutex::new(tx)),
637 }
638 }
639
640 pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
641 let sender = self.inner.lock().await;
642 sender
643 .send(req)
644 .await
645 .map_err(|_| Error::transport("WebSocket sender closed"))
646 }
647}
648
649#[cfg(test)]
650mod tests {
651 use rstest::rstest;
652 use tokio::{
653 sync::oneshot,
654 time::{Duration, sleep, timeout},
655 };
656
657 use super::*;
658 use crate::{
659 common::consts::INFLIGHT_MAX,
660 websocket::messages::{
661 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
662 OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
663 },
664 };
665
666 fn mk_limit_alo(asset: u32) -> OrderRequest {
669 OrderRequest {
670 a: asset,
671 b: true,
672 p: "1".to_string(),
673 s: "1".to_string(),
674 r: false,
675 t: OrderTypeRequest::Limit {
676 tif: TimeInForceRequest::Alo,
677 },
678 c: None,
679 }
680 }
681
682 fn mk_limit_gtc(asset: u32) -> OrderRequest {
683 OrderRequest {
684 a: asset,
685 b: true,
686 p: "1".to_string(),
687 s: "1".to_string(),
688 r: false,
689 t: OrderTypeRequest::Limit {
690 tif: TimeInForceRequest::Gtc,
692 },
693 c: None,
694 }
695 }
696
697 #[rstest]
700 #[tokio::test(flavor = "multi_thread")]
701 async fn register_duplicate_id_errors() {
702 let router = PostRouter::new();
703 let _rx = router.register(42).await.expect("first register OK");
704
705 let err = router.register(42).await.expect_err("duplicate must error");
706 let msg = err.to_string().to_lowercase();
707 assert!(
708 msg.contains("already") || msg.contains("duplicate"),
709 "unexpected error: {msg}"
710 );
711 }
712
713 #[rstest]
714 #[tokio::test(flavor = "multi_thread")]
715 async fn timeout_cancels_and_allows_reregister() {
716 let router = PostRouter::new();
717 let id = 7;
718
719 let rx = router.register(id).await.unwrap();
720 let err = router
722 .await_with_timeout(id, rx, Duration::from_millis(25))
723 .await
724 .expect_err("should timeout");
725 assert!(
726 err.to_string().to_lowercase().contains("timeout")
727 || err.to_string().to_lowercase().contains("closed"),
728 "unexpected error kind: {err}"
729 );
730
731 let _rx2 = router
733 .register(id)
734 .await
735 .expect("id should be reusable after timeout cancel");
736 }
737
738 #[rstest]
739 #[tokio::test(flavor = "multi_thread")]
740 async fn inflight_cap_blocks_then_unblocks() {
741 let router = PostRouter::new();
742
743 let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
745 for i in 0..INFLIGHT_MAX {
746 let rx = router.register(i as u64).await.unwrap();
747 rxs.push(rx); }
749
750 let router2 = Arc::clone(&router);
752 let (entered_tx, entered_rx) = oneshot::channel::<()>();
753 let (done_tx, done_rx) = oneshot::channel::<()>();
754 let (check_tx, check_rx) = oneshot::channel::<()>(); get_runtime().spawn(async move {
757 let _ = entered_tx.send(());
758 let _rx = router2.register(9_999_999).await.unwrap();
759 let _ = done_tx.send(());
760 });
761
762 entered_rx.await.unwrap();
764
765 get_runtime().spawn(async move {
767 if done_rx.await.is_ok() {
768 let _ = check_tx.send(());
769 }
770 });
771
772 assert!(
773 timeout(Duration::from_millis(50), check_rx).await.is_err(),
774 "should still be blocked while at cap"
775 );
776
777 router.cancel(0).await;
779
780 tokio::time::sleep(Duration::from_millis(100)).await;
782 }
783
784 #[rstest(
787 orders, expected,
788 case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
789 case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
790 case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
791 case::empty(vec![], PostLane::Normal),
792 )]
793 fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
794 let action = ActionRequest::Order {
795 orders,
796 grouping: "na".to_string(),
797 };
798 assert_eq!(lane_for_action(&action), expected);
799 }
800
801 #[rstest]
804 fn test_order_request_builder() {
805 let order = OrderRequestBuilder::default()
807 .a(0)
808 .b(true)
809 .p("40000.0".to_string())
810 .s("0.01".to_string())
811 .r(false)
812 .t(OrderTypeRequest::Limit {
813 tif: TimeInForceRequest::Gtc,
814 })
815 .c(Some("test-order-1".to_string()))
816 .build()
817 .expect("should build order");
818
819 assert_eq!(order.a, 0);
820 assert!(order.b);
821 assert_eq!(order.p, "40000.0");
822 assert_eq!(order.s, "0.01");
823 assert!(!order.r);
824 assert_eq!(order.c, Some("test-order-1".to_string()));
825 }
826
827 #[rstest]
828 fn test_limit_order_params_builder() {
829 let params = LimitOrderParamsBuilder::default()
831 .asset(0)
832 .is_buy(true)
833 .px("40000.0".to_string())
834 .sz("0.01".to_string())
835 .reduce_only(false)
836 .tif(TimeInForceRequest::Alo)
837 .cloid(Some("test-limit-1".to_string()))
838 .build()
839 .expect("should build limit params");
840
841 assert_eq!(params.asset, 0);
842 assert!(params.is_buy);
843 assert_eq!(params.px, "40000.0");
844 assert_eq!(params.sz, "0.01");
845 assert!(!params.reduce_only);
846 assert_eq!(params.cloid, Some("test-limit-1".to_string()));
847 }
848
849 #[rstest]
850 fn test_trigger_order_params_builder() {
851 let params = TriggerOrderParamsBuilder::default()
853 .asset(1)
854 .is_buy(false)
855 .px("39000.0".to_string())
856 .sz("0.02".to_string())
857 .reduce_only(false)
858 .is_market(true)
859 .trigger_px("39500.0".to_string())
860 .tpsl(TpSlRequest::Sl)
861 .cloid(Some("test-trigger-1".to_string()))
862 .build()
863 .expect("should build trigger params");
864
865 assert_eq!(params.asset, 1);
866 assert!(!params.is_buy);
867 assert_eq!(params.px, "39000.0");
868 assert!(params.is_market);
869 assert_eq!(params.trigger_px, "39500.0");
870 }
871
872 #[rstest]
873 fn test_order_builder_single_limit_convenience() {
874 let params = LimitOrderParamsBuilder::default()
876 .asset(0)
877 .is_buy(true)
878 .px("40000.0".to_string())
879 .sz("0.01".to_string())
880 .reduce_only(false)
881 .tif(TimeInForceRequest::Gtc)
882 .cloid(None)
883 .build()
884 .unwrap();
885
886 let action = OrderBuilder::single_limit_order(params);
887
888 match action {
889 ActionRequest::Order { orders, grouping } => {
890 assert_eq!(orders.len(), 1);
891 assert_eq!(orders[0].a, 0);
892 assert!(orders[0].b);
893 assert_eq!(grouping, "na");
894 }
895 _ => panic!("Expected ActionRequest::Order variant"),
896 }
897 }
898
899 #[rstest]
900 fn test_order_builder_single_trigger_convenience() {
901 let params = TriggerOrderParamsBuilder::default()
903 .asset(1)
904 .is_buy(false)
905 .px("39000.0".to_string())
906 .sz("0.02".to_string())
907 .reduce_only(false)
908 .is_market(true)
909 .trigger_px("39500.0".to_string())
910 .tpsl(TpSlRequest::Sl)
911 .cloid(Some("sl-order".to_string()))
912 .build()
913 .unwrap();
914
915 let action = OrderBuilder::single_trigger_order(params);
916
917 match action {
918 ActionRequest::Order { orders, grouping } => {
919 assert_eq!(orders.len(), 1);
920 assert_eq!(orders[0].a, 1);
921 assert_eq!(orders[0].c, Some("sl-order".to_string()));
922 assert_eq!(grouping, "na");
923 }
924 _ => panic!("Expected ActionRequest::Order variant"),
925 }
926 }
927
928 #[rstest]
929 fn test_order_builder_batch_orders() {
930 let params1 = LimitOrderParams {
932 asset: 0,
933 is_buy: true,
934 px: "40000.0".to_string(),
935 sz: "0.01".to_string(),
936 reduce_only: false,
937 tif: TimeInForceRequest::Gtc,
938 cloid: Some("order-1".to_string()),
939 };
940
941 let params2 = LimitOrderParams {
942 asset: 1,
943 is_buy: false,
944 px: "2000.0".to_string(),
945 sz: "0.5".to_string(),
946 reduce_only: false,
947 tif: TimeInForceRequest::Ioc,
948 cloid: Some("order-2".to_string()),
949 };
950
951 let action = OrderBuilder::new()
952 .grouping(Grouping::NormalTpsl)
953 .push_limit_order(params1)
954 .push_limit_order(params2)
955 .build();
956
957 match action {
958 ActionRequest::Order { orders, grouping } => {
959 assert_eq!(orders.len(), 2);
960 assert_eq!(orders[0].c, Some("order-1".to_string()));
961 assert_eq!(orders[1].c, Some("order-2".to_string()));
962 assert_eq!(grouping, "normalTpsl");
963 }
964 _ => panic!("Expected ActionRequest::Order variant"),
965 }
966 }
967
968 #[rstest]
969 fn test_action_request_constructors() {
970 let order1 = mk_limit_gtc(0);
972 let order2 = mk_limit_gtc(1);
973 let action = ActionRequest::order(vec![order1, order2], "na");
974
975 match action {
976 ActionRequest::Order { orders, grouping } => {
977 assert_eq!(orders.len(), 2);
978 assert_eq!(grouping, "na");
979 }
980 _ => panic!("Expected ActionRequest::Order variant"),
981 }
982
983 let cancels = vec![CancelRequest { a: 0, o: 12345 }];
985 let action = ActionRequest::cancel(cancels);
986 assert!(matches!(action, ActionRequest::Cancel { .. }));
987
988 let cancels = vec![CancelByCloidRequest {
990 asset: 0,
991 cloid: "order-1".to_string(),
992 }];
993 let action = ActionRequest::cancel_by_cloid(cancels);
994 assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
995 }
996
997 #[rstest]
1000 #[tokio::test(flavor = "multi_thread")]
1001 async fn batcher_sends_on_tick() {
1002 let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1004 let sent_closure = sent.clone();
1005
1006 let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
1007 let sent_inner = sent_closure.clone();
1008 Box::pin(async move {
1009 if let HyperliquidWsRequest::Post { id, .. } = req {
1010 sent_inner.lock().await.push(id);
1011 }
1012 Ok(())
1013 })
1014 };
1015
1016 let batcher = PostBatcher::new(send_fn);
1017
1018 for id in 1..=5u64 {
1020 batcher
1021 .enqueue(ScheduledPost {
1022 id,
1023 request: PostRequest::Info {
1024 payload: serde_json::json!({"type":"allMids"}),
1025 },
1026 lane: PostLane::Normal,
1027 })
1028 .await
1029 .unwrap();
1030 }
1031
1032 sleep(Duration::from_millis(80)).await;
1034
1035 let got = sent.lock().await.clone();
1036 assert_eq!(got.len(), 5, "expected 5 sends on first tick");
1037 assert_eq!(got, vec![1, 2, 3, 4, 5]);
1038 }
1039}