1use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use tokio::{
28 sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
29 time,
30};
31
32use crate::{
33 common::consts::INFLIGHT_MAX,
34 http::{
35 error::{Error, Result},
36 models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
37 },
38 websocket::messages::{
39 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
40 OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
41 },
42};
43
44#[derive(Debug)]
50struct Waiter {
51 tx: oneshot::Sender<PostResponse>,
52 _permit: OwnedSemaphorePermit,
54}
55
56#[derive(Debug)]
57pub struct PostRouter {
58 inner: Mutex<HashMap<u64, Waiter>>,
59 inflight: Arc<Semaphore>, }
61
62impl Default for PostRouter {
63 fn default() -> Self {
64 Self {
65 inner: Mutex::new(HashMap::new()),
66 inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
67 }
68 }
69}
70
71impl PostRouter {
72 pub fn new() -> Arc<Self> {
73 Arc::new(Self::default())
74 }
75
76 pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
78 let permit = self
80 .inflight
81 .clone()
82 .acquire_owned()
83 .await
84 .map_err(|_| Error::transport("post router semaphore closed"))?;
85
86 let (tx, rx) = oneshot::channel::<PostResponse>();
87 let mut map = self.inner.lock().await;
88 if map.contains_key(&id) {
89 return Err(Error::transport(format!("post id {id} already registered")));
90 }
91 map.insert(
92 id,
93 Waiter {
94 tx,
95 _permit: permit,
96 },
97 );
98 Ok(rx)
99 }
100
101 pub async fn complete(&self, resp: PostResponse) {
103 let id = resp.id;
104 let waiter = {
105 let mut map = self.inner.lock().await;
106 map.remove(&id)
107 };
108 if let Some(waiter) = waiter {
109 if waiter.tx.send(resp).is_err() {
110 tracing::warn!(id, "post waiter dropped before delivery");
111 }
112 } else {
114 tracing::warn!(id, "post response with unknown id (late/duplicate?)");
115 }
116 }
117
118 pub async fn cancel(&self, id: u64) {
120 let _ = {
121 let mut map = self.inner.lock().await;
122 map.remove(&id)
123 };
124 }
126
127 pub async fn await_with_timeout(
129 &self,
130 id: u64,
131 rx: oneshot::Receiver<PostResponse>,
132 timeout: Duration,
133 ) -> Result<PostResponse> {
134 match time::timeout(timeout, rx).await {
135 Ok(Ok(resp)) => Ok(resp),
136 Ok(Err(_closed)) => {
137 self.cancel(id).await;
138 Err(Error::transport("post response channel closed"))
139 }
140 Err(_elapsed) => {
141 self.cancel(id).await;
142 Err(Error::Timeout)
143 }
144 }
145 }
146}
147
/// Lock-free generator of post ids backed by an atomic counter.
#[derive(Debug)]
pub struct PostIds(AtomicU64);

impl PostIds {
    /// Creates a generator whose first [`next`](Self::next) call returns `start`.
    pub fn new(start: u64) -> Self {
        PostIds(AtomicU64::from(start))
    }

    /// Returns the current id and advances the counter by one.
    ///
    /// The counter wraps around on overflow, as `fetch_add` does.
    pub fn next(&self) -> u64 {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::Relaxed)
    }
}
163
/// Queue a scheduled post is routed through inside [`PostBatcher`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostLane {
    /// Lane chosen by [`lane_for_action`] when every order in an action is an `Alo` limit order.
    Alo,
    /// Lane for everything else.
    Normal,
}
173
174#[derive(Debug)]
175pub struct ScheduledPost {
176 pub id: u64,
177 pub request: PostRequest,
178 pub lane: PostLane,
179}
180
181#[derive(Debug)]
182pub struct PostBatcher {
183 tx_alo: mpsc::Sender<ScheduledPost>,
184 tx_normal: mpsc::Sender<ScheduledPost>,
185}
186
187impl PostBatcher {
188 pub fn new<F>(send_fn: F) -> Self
190 where
191 F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
192 {
193 let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
194 let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
195
196 tokio::spawn(Self::run_lane(
198 "ALO",
199 rx_alo,
200 Duration::from_millis(100),
201 send_fn.clone(),
202 ));
203
204 tokio::spawn(Self::run_lane(
206 "NORMAL",
207 rx_normal,
208 Duration::from_millis(50),
209 send_fn,
210 ));
211
212 Self { tx_alo, tx_normal }
213 }
214
215 async fn run_lane<F>(
216 lane_name: &'static str,
217 mut rx: mpsc::Receiver<ScheduledPost>,
218 tick: Duration,
219 mut send_fn: F,
220 ) where
221 F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
222 {
223 let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
224 let mut interval = time::interval(tick);
225 interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
226
227 loop {
228 tokio::select! {
229 maybe_item = rx.recv() => {
230 match maybe_item {
231 Some(item) => pend.push(item),
232 None => break, }
234 }
235 _ = interval.tick() => {
236 if pend.is_empty() { continue; }
237 let to_send = std::mem::take(&mut pend);
238 for item in to_send {
239 let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
240 if let Err(e) = send_fn(req).await {
241 tracing::error!(lane=%lane_name, id=%item.id, "failed to send post: {e}");
242 }
243 }
244 }
245 }
246 }
247 tracing::info!(lane=%lane_name, "post lane terminated");
248 }
249
250 pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
251 match item.lane {
252 PostLane::Alo => self
253 .tx_alo
254 .send(item)
255 .await
256 .map_err(|_| Error::transport("ALO lane closed")),
257 PostLane::Normal => self
258 .tx_normal
259 .send(item)
260 .await
261 .map_err(|_| Error::transport("NORMAL lane closed")),
262 }
263 }
264}
265
266pub fn lane_for_action(action: &ActionRequest) -> PostLane {
268 match action {
269 ActionRequest::Order { orders, .. } => {
270 if orders.is_empty() {
271 return PostLane::Normal;
272 }
273 let all_alo = orders.iter().all(|o| {
274 matches!(
275 o.t,
276 OrderTypeRequest::Limit {
277 tif: TimeInForceRequest::Alo
278 }
279 )
280 });
281 if all_alo {
282 PostLane::Alo
283 } else {
284 PostLane::Normal
285 }
286 }
287 _ => PostLane::Normal,
288 }
289}
290
/// Grouping mode attached to an order action; defaults to [`Grouping::Na`].
#[derive(Debug, Clone, Copy, Default)]
pub enum Grouping {
    /// Rendered as `"na"`.
    #[default]
    Na,
    /// Rendered as `"normalTpsl"`.
    NormalTpsl,
    /// Rendered as `"positionTpsl"`.
    PositionTpsl,
}

impl Grouping {
    /// Returns the string placed in the `grouping` field of an order action.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Grouping::Na => "na",
            Grouping::NormalTpsl => "normalTpsl",
            Grouping::PositionTpsl => "positionTpsl",
        }
    }
}
311
312#[derive(Debug, Clone, Builder)]
314pub struct LimitOrderParams {
315 pub asset: u32,
316 pub is_buy: bool,
317 pub px: String,
318 pub sz: String,
319 pub reduce_only: bool,
320 pub tif: TimeInForceRequest,
321 pub cloid: Option<String>,
322}
323
324#[derive(Debug, Clone, Builder)]
326pub struct TriggerOrderParams {
327 pub asset: u32,
328 pub is_buy: bool,
329 pub px: String,
330 pub sz: String,
331 pub reduce_only: bool,
332 pub is_market: bool,
333 pub trigger_px: String,
334 pub tpsl: TpSlRequest,
335 pub cloid: Option<String>,
336}
337
338#[derive(Debug, Default)]
340pub struct OrderBuilder {
341 orders: Vec<OrderRequest>,
342 grouping: Grouping,
343}
344
345impl OrderBuilder {
346 pub fn new() -> Self {
347 Self::default()
348 }
349
350 #[must_use]
351 pub fn grouping(mut self, g: Grouping) -> Self {
352 self.grouping = g;
353 self
354 }
355
356 #[allow(clippy::too_many_arguments)]
358 #[must_use]
359 pub fn push_limit(
360 self,
361 asset: u32,
362 is_buy: bool,
363 px: impl ToString,
364 sz: impl ToString,
365 reduce_only: bool,
366 tif: TimeInForceRequest,
367 cloid: Option<String>,
368 ) -> Self {
369 let params = LimitOrderParams {
370 asset,
371 is_buy,
372 px: px.to_string(),
373 sz: sz.to_string(),
374 reduce_only,
375 tif,
376 cloid,
377 };
378 self.push_limit_order(params)
379 }
380
381 #[must_use]
383 pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
384 self.orders.push(OrderRequest {
385 a: params.asset,
386 b: params.is_buy,
387 p: params.px,
388 s: params.sz,
389 r: params.reduce_only,
390 t: OrderTypeRequest::Limit { tif: params.tif },
391 c: params.cloid,
392 });
393 self
394 }
395
396 #[allow(clippy::too_many_arguments)]
398 #[must_use]
399 pub fn push_trigger(
400 self,
401 asset: u32,
402 is_buy: bool,
403 px: impl ToString,
404 sz: impl ToString,
405 reduce_only: bool,
406 is_market: bool,
407 trigger_px: impl ToString,
408 tpsl: TpSlRequest,
409 cloid: Option<String>,
410 ) -> Self {
411 let params = TriggerOrderParams {
412 asset,
413 is_buy,
414 px: px.to_string(),
415 sz: sz.to_string(),
416 reduce_only,
417 is_market,
418 trigger_px: trigger_px.to_string(),
419 tpsl,
420 cloid,
421 };
422 self.push_trigger_order(params)
423 }
424
425 #[must_use]
427 pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
428 self.orders.push(OrderRequest {
429 a: params.asset,
430 b: params.is_buy,
431 p: params.px,
432 s: params.sz,
433 r: params.reduce_only,
434 t: OrderTypeRequest::Trigger {
435 is_market: params.is_market,
436 trigger_px: params.trigger_px,
437 tpsl: params.tpsl,
438 },
439 c: params.cloid,
440 });
441 self
442 }
443 pub fn build(self) -> ActionRequest {
444 ActionRequest::Order {
445 orders: self.orders,
446 grouping: self.grouping.as_str().to_string(),
447 }
448 }
449
450 pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
467 Self::new().push_limit_order(params).build()
468 }
469
470 pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
489 Self::new().push_trigger_order(params).build()
490 }
491}
492
493pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
494 ActionRequest::Cancel {
495 cancels: cancels
496 .into_iter()
497 .map(|(a, o)| CancelRequest { a, o })
498 .collect(),
499 }
500}
501pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
502 ActionRequest::CancelByCloid {
503 cancels: vec![CancelByCloidRequest {
504 asset,
505 cloid: cloid.into(),
506 }],
507 }
508}
509pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
510 ActionRequest::Modify {
511 modifies: vec![ModifyRequest {
512 oid,
513 order: new_order,
514 }],
515 }
516}
517
518pub fn info_l2_book(coin: &str) -> PostRequest {
520 PostRequest::Info {
521 payload: serde_json::json!({"type":"l2Book","coin":coin}),
522 }
523}
524pub fn info_all_mids() -> PostRequest {
525 PostRequest::Info {
526 payload: serde_json::json!({"type":"allMids"}),
527 }
528}
529pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
530 PostRequest::Info {
531 payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
532 }
533}
534pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
535 let mut body = serde_json::json!({"type":"openOrders","user":user});
536 if let Some(fe) = frontend {
537 body["frontend"] = serde_json::json!(fe);
538 }
539 PostRequest::Info { payload: body }
540}
541pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
542 let mut body = serde_json::json!({"type":"userFills","user":user});
543 if let Some(agg) = aggregate_by_time {
544 body["aggregateByTime"] = serde_json::json!(agg);
545 }
546 PostRequest::Info { payload: body }
547}
548pub fn info_user_rate_limit(user: &str) -> PostRequest {
549 PostRequest::Info {
550 payload: serde_json::json!({"type":"userRateLimit","user":user}),
551 }
552}
553pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
554 PostRequest::Info {
555 payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
556 }
557}
558
559pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
564 serde_json::from_value(payload.clone()).map_err(Error::Serde)
565}
566pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
567 serde_json::from_value(payload.clone()).map_err(Error::Serde)
568}
569pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
570 serde_json::from_value(payload.clone()).map_err(Error::Serde)
571}
572
573#[derive(Debug)]
575pub enum ActionOutcome<'a> {
576 Resting {
577 oid: u64,
578 },
579 Filled {
580 total_sz: &'a str,
581 avg_px: &'a str,
582 oid: Option<u64>,
583 },
584 Error {
585 msg: &'a str,
586 },
587 Unknown(&'a serde_json::Value),
588}
589pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
590 if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
591 if let (Some(total_sz), Some(avg_px)) = (
592 payload.get("totalSz").and_then(|v| v.as_str()),
593 payload.get("avgPx").and_then(|v| v.as_str()),
594 ) {
595 return ActionOutcome::Filled {
596 total_sz,
597 avg_px,
598 oid: Some(oid),
599 };
600 }
601 return ActionOutcome::Resting { oid };
602 }
603 if let (Some(total_sz), Some(avg_px)) = (
604 payload.get("totalSz").and_then(|v| v.as_str()),
605 payload.get("avgPx").and_then(|v| v.as_str()),
606 ) {
607 return ActionOutcome::Filled {
608 total_sz,
609 avg_px,
610 oid: None,
611 };
612 }
613 if let Some(msg) = payload
614 .get("error")
615 .and_then(|v| v.as_str())
616 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
617 {
618 return ActionOutcome::Error { msg };
619 }
620 ActionOutcome::Unknown(payload)
621}
622
623#[derive(Clone, Debug)]
628pub struct WsSender {
629 inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
630}
631
632impl WsSender {
633 pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
634 Self {
635 inner: Arc::new(tokio::sync::Mutex::new(tx)),
636 }
637 }
638
639 pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
640 let sender = self.inner.lock().await;
641 sender
642 .send(req)
643 .await
644 .map_err(|_| Error::transport("WebSocket sender closed"))
645 }
646}
647
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use tokio::{
        sync::oneshot,
        time::{Duration, sleep, timeout},
    };

    use super::*;
    use crate::{
        common::consts::INFLIGHT_MAX,
        websocket::messages::{
            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
        },
    };

    /// Minimal limit order with `Alo` time-in-force for `asset`.
    fn mk_limit_alo(asset: u32) -> OrderRequest {
        OrderRequest {
            a: asset,
            b: true,
            p: "1".to_string(),
            s: "1".to_string(),
            r: false,
            t: OrderTypeRequest::Limit {
                tif: TimeInForceRequest::Alo,
            },
            c: None,
        }
    }

    /// Minimal limit order with `Gtc` time-in-force for `asset`.
    fn mk_limit_gtc(asset: u32) -> OrderRequest {
        OrderRequest {
            a: asset,
            b: true,
            p: "1".to_string(),
            s: "1".to_string(),
            r: false,
            t: OrderTypeRequest::Limit {
                tif: TimeInForceRequest::Gtc,
            },
            c: None,
        }
    }

    /// Registering the same id twice must fail the second time.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn register_duplicate_id_errors() {
        let router = PostRouter::new();
        let _rx = router.register(42).await.expect("first register OK");

        let err = router.register(42).await.expect_err("duplicate must error");
        let msg = err.to_string().to_lowercase();
        assert!(
            msg.contains("already") || msg.contains("duplicate"),
            "unexpected error: {msg}"
        );
    }

    /// A timed-out wait cancels the registration so the id can be reused.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn timeout_cancels_and_allows_reregister() {
        let router = PostRouter::new();
        let id = 7;

        let rx = router.register(id).await.unwrap();
        let err = router
            .await_with_timeout(id, rx, Duration::from_millis(25))
            .await
            .expect_err("should timeout");
        let msg = err.to_string().to_lowercase();
        assert!(
            msg.contains("timeout") || msg.contains("closed"),
            "unexpected error kind: {err}"
        );

        let _rx2 = router
            .register(id)
            .await
            .expect("id should be reusable after timeout cancel");
    }

    /// At the in-flight cap a further `register` blocks, and it completes once a slot is freed.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn inflight_cap_blocks_then_unblocks() {
        let router = PostRouter::new();

        // Saturate the cap; the receivers are kept alive for the whole test.
        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
        for i in 0..INFLIGHT_MAX {
            let rx = router.register(i as u64).await.unwrap();
            rxs.push(rx);
        }

        let router2 = Arc::clone(&router);
        let (entered_tx, entered_rx) = oneshot::channel::<()>();
        let (done_tx, mut done_rx) = oneshot::channel::<()>();

        tokio::spawn(async move {
            let _ = entered_tx.send(());
            let _rx = router2.register(9_999_999).await.unwrap();
            let _ = done_tx.send(());
        });

        entered_rx.await.unwrap();

        // Borrow the receiver so it can be awaited again after the cap is relieved.
        assert!(
            timeout(Duration::from_millis(50), &mut done_rx)
                .await
                .is_err(),
            "should still be blocked while at cap"
        );

        // Free one slot; the blocked register must now go through.
        router.cancel(0).await;

        timeout(Duration::from_secs(1), done_rx)
            .await
            .expect("register should unblock after a slot is released")
            .expect("blocked task should signal completion");
    }

    /// Lane selection for representative order mixes.
    #[rstest(
        orders, expected,
        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
        case::empty(vec![], PostLane::Normal),
    )]
    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
        let action = ActionRequest::Order {
            orders,
            grouping: "na".to_string(),
        };
        assert_eq!(lane_for_action(&action), expected);
    }

    #[rstest]
    fn test_order_request_builder() {
        let order = OrderRequestBuilder::default()
            .a(0)
            .b(true)
            .p("40000.0".to_string())
            .s("0.01".to_string())
            .r(false)
            .t(OrderTypeRequest::Limit {
                tif: TimeInForceRequest::Gtc,
            })
            .c(Some("test-order-1".to_string()))
            .build()
            .expect("should build order");

        assert_eq!(order.a, 0);
        assert!(order.b);
        assert_eq!(order.p, "40000.0");
        assert_eq!(order.s, "0.01");
        assert!(!order.r);
        assert_eq!(order.c, Some("test-order-1".to_string()));
    }

    #[rstest]
    fn test_limit_order_params_builder() {
        let params = LimitOrderParamsBuilder::default()
            .asset(0)
            .is_buy(true)
            .px("40000.0".to_string())
            .sz("0.01".to_string())
            .reduce_only(false)
            .tif(TimeInForceRequest::Alo)
            .cloid(Some("test-limit-1".to_string()))
            .build()
            .expect("should build limit params");

        assert_eq!(params.asset, 0);
        assert!(params.is_buy);
        assert_eq!(params.px, "40000.0");
        assert_eq!(params.sz, "0.01");
        assert!(!params.reduce_only);
        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
    }

    #[rstest]
    fn test_trigger_order_params_builder() {
        let params = TriggerOrderParamsBuilder::default()
            .asset(1)
            .is_buy(false)
            .px("39000.0".to_string())
            .sz("0.02".to_string())
            .reduce_only(false)
            .is_market(true)
            .trigger_px("39500.0".to_string())
            .tpsl(TpSlRequest::Sl)
            .cloid(Some("test-trigger-1".to_string()))
            .build()
            .expect("should build trigger params");

        assert_eq!(params.asset, 1);
        assert!(!params.is_buy);
        assert_eq!(params.px, "39000.0");
        assert!(params.is_market);
        assert_eq!(params.trigger_px, "39500.0");
    }

    #[rstest]
    fn test_order_builder_single_limit_convenience() {
        let params = LimitOrderParamsBuilder::default()
            .asset(0)
            .is_buy(true)
            .px("40000.0".to_string())
            .sz("0.01".to_string())
            .reduce_only(false)
            .tif(TimeInForceRequest::Gtc)
            .cloid(None)
            .build()
            .unwrap();

        let action = OrderBuilder::single_limit_order(params);

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 1);
                assert_eq!(orders[0].a, 0);
                assert!(orders[0].b);
                assert_eq!(grouping, "na");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }
    }

    #[rstest]
    fn test_order_builder_single_trigger_convenience() {
        let params = TriggerOrderParamsBuilder::default()
            .asset(1)
            .is_buy(false)
            .px("39000.0".to_string())
            .sz("0.02".to_string())
            .reduce_only(false)
            .is_market(true)
            .trigger_px("39500.0".to_string())
            .tpsl(TpSlRequest::Sl)
            .cloid(Some("sl-order".to_string()))
            .build()
            .unwrap();

        let action = OrderBuilder::single_trigger_order(params);

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 1);
                assert_eq!(orders[0].a, 1);
                assert_eq!(orders[0].c, Some("sl-order".to_string()));
                assert_eq!(grouping, "na");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }
    }

    #[rstest]
    fn test_order_builder_batch_orders() {
        let params1 = LimitOrderParams {
            asset: 0,
            is_buy: true,
            px: "40000.0".to_string(),
            sz: "0.01".to_string(),
            reduce_only: false,
            tif: TimeInForceRequest::Gtc,
            cloid: Some("order-1".to_string()),
        };

        let params2 = LimitOrderParams {
            asset: 1,
            is_buy: false,
            px: "2000.0".to_string(),
            sz: "0.5".to_string(),
            reduce_only: false,
            tif: TimeInForceRequest::Ioc,
            cloid: Some("order-2".to_string()),
        };

        let action = OrderBuilder::new()
            .grouping(Grouping::NormalTpsl)
            .push_limit_order(params1)
            .push_limit_order(params2)
            .build();

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 2);
                assert_eq!(orders[0].c, Some("order-1".to_string()));
                assert_eq!(orders[1].c, Some("order-2".to_string()));
                assert_eq!(grouping, "normalTpsl");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }
    }

    #[rstest]
    fn test_action_request_constructors() {
        let order1 = mk_limit_gtc(0);
        let order2 = mk_limit_gtc(1);
        let action = ActionRequest::order(vec![order1, order2], "na");

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 2);
                assert_eq!(grouping, "na");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }

        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
        let action = ActionRequest::cancel(cancels);
        assert!(matches!(action, ActionRequest::Cancel { .. }));

        let cancels = vec![CancelByCloidRequest {
            asset: 0,
            cloid: "order-1".to_string(),
        }];
        let action = ActionRequest::cancel_by_cloid(cancels);
        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
    }

    /// Posts queued on the NORMAL lane are sent, in order, once the lane ticks.
    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn batcher_sends_on_tick() {
        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let sent_closure = sent.clone();

        // Records the id of every post the batcher hands to the send function.
        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
            let sent_inner = sent_closure.clone();
            Box::pin(async move {
                if let HyperliquidWsRequest::Post { id, .. } = req {
                    sent_inner.lock().await.push(id);
                }
                Ok(())
            })
        };

        let batcher = PostBatcher::new(send_fn);

        for id in 1..=5u64 {
            batcher
                .enqueue(ScheduledPost {
                    id,
                    request: PostRequest::Info {
                        payload: serde_json::json!({"type":"allMids"}),
                    },
                    lane: PostLane::Normal,
                })
                .await
                .unwrap();
        }

        // The NORMAL lane ticks every 50 ms, so 80 ms covers at least one full tick.
        sleep(Duration::from_millis(80)).await;

        let got = sent.lock().await.clone();
        assert_eq!(got.len(), 5, "expected 5 sends on first tick");
        assert_eq!(got, vec![1, 2, 3, 4, 5]);
    }
}