1use std::{
17 collections::HashMap,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::Duration,
23};
24
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use tokio::{
28 sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
29 time,
30};
31
32use crate::{
33 common::consts::INFLIGHT_MAX,
34 http::{
35 error::{Error, Result},
36 models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
37 },
38 websocket::messages::{
39 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
40 OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
41 },
42};
43
44#[derive(Debug)]
50struct Waiter {
51 tx: oneshot::Sender<PostResponse>,
52 _permit: OwnedSemaphorePermit,
54}
55
56#[derive(Debug)]
57pub struct PostRouter {
58 inner: Mutex<HashMap<u64, Waiter>>,
59 inflight: Arc<Semaphore>, }
61
62impl Default for PostRouter {
63 fn default() -> Self {
64 Self {
65 inner: Mutex::new(HashMap::new()),
66 inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
67 }
68 }
69}
70
71impl PostRouter {
72 pub fn new() -> Arc<Self> {
73 Arc::new(Self::default())
74 }
75
76 pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
78 let permit = self
80 .inflight
81 .clone()
82 .acquire_owned()
83 .await
84 .map_err(|_| Error::transport("post router semaphore closed"))?;
85
86 let (tx, rx) = oneshot::channel::<PostResponse>();
87 let mut map = self.inner.lock().await;
88 if map.contains_key(&id) {
89 return Err(Error::transport(format!("post id {id} already registered")));
90 }
91 map.insert(
92 id,
93 Waiter {
94 tx,
95 _permit: permit,
96 },
97 );
98 Ok(rx)
99 }
100
101 pub async fn complete(&self, resp: PostResponse) {
103 let id = resp.id;
104 let waiter = {
105 let mut map = self.inner.lock().await;
106 map.remove(&id)
107 };
108 if let Some(waiter) = waiter {
109 if waiter.tx.send(resp).is_err() {
110 tracing::warn!(id, "post waiter dropped before delivery");
111 }
112 } else {
114 tracing::warn!(id, "post response with unknown id (late/duplicate?)");
115 }
116 }
117
118 pub async fn cancel(&self, id: u64) {
120 let _ = {
121 let mut map = self.inner.lock().await;
122 map.remove(&id)
123 };
124 }
126
127 pub async fn await_with_timeout(
129 &self,
130 id: u64,
131 rx: oneshot::Receiver<PostResponse>,
132 timeout: Duration,
133 ) -> Result<PostResponse> {
134 match time::timeout(timeout, rx).await {
135 Ok(Ok(resp)) => Ok(resp),
136 Ok(Err(_closed)) => {
137 self.cancel(id).await;
138 Err(Error::transport("post response channel closed"))
139 }
140 Err(_elapsed) => {
141 self.cancel(id).await;
142 Err(Error::Timeout)
143 }
144 }
145 }
146}
147
148#[derive(Debug)]
153pub struct PostIds(AtomicU64);
154
155impl PostIds {
156 pub fn new(start: u64) -> Self {
157 Self(AtomicU64::new(start))
158 }
159 pub fn next(&self) -> u64 {
160 self.0.fetch_add(1, Ordering::Relaxed)
161 }
162}
163
164#[derive(Debug, Clone, Copy, PartialEq, Eq)]
169pub enum PostLane {
170 Alo, Normal, }
173
174#[derive(Debug)]
175pub struct ScheduledPost {
176 pub id: u64,
177 pub request: PostRequest,
178 pub lane: PostLane,
179}
180
181#[derive(Debug)]
182pub struct PostBatcher {
183 tx_alo: mpsc::Sender<ScheduledPost>,
184 tx_normal: mpsc::Sender<ScheduledPost>,
185}
186
187impl PostBatcher {
188 pub fn new<F>(send_fn: F) -> Self
190 where
191 F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
192 {
193 let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
194 let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
195
196 tokio::spawn(Self::run_lane(
198 "ALO",
199 rx_alo,
200 Duration::from_millis(100),
201 send_fn.clone(),
202 ));
203
204 tokio::spawn(Self::run_lane(
206 "NORMAL",
207 rx_normal,
208 Duration::from_millis(50),
209 send_fn,
210 ));
211
212 Self { tx_alo, tx_normal }
213 }
214
215 async fn run_lane<F>(
216 lane_name: &'static str,
217 mut rx: mpsc::Receiver<ScheduledPost>,
218 tick: Duration,
219 mut send_fn: F,
220 ) where
221 F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
222 {
223 let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
224 let mut interval = time::interval(tick);
225 interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
226
227 loop {
228 tokio::select! {
229 maybe_item = rx.recv() => {
230 match maybe_item {
231 Some(item) => pend.push(item),
232 None => break, }
234 }
235 _ = interval.tick() => {
236 if pend.is_empty() { continue; }
237 let to_send = std::mem::take(&mut pend);
238 for item in to_send {
239 let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
240 if let Err(e) = send_fn(req).await {
241 tracing::error!(lane=%lane_name, id=%item.id, "failed to send post: {e}");
242 }
243 }
244 }
245 }
246 }
247 tracing::info!(lane=%lane_name, "post lane terminated");
248 }
249
250 pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
251 match item.lane {
252 PostLane::Alo => self
253 .tx_alo
254 .send(item)
255 .await
256 .map_err(|_| Error::transport("ALO lane closed")),
257 PostLane::Normal => self
258 .tx_normal
259 .send(item)
260 .await
261 .map_err(|_| Error::transport("NORMAL lane closed")),
262 }
263 }
264}
265
266pub fn lane_for_action(action: &ActionRequest) -> PostLane {
268 match action {
269 ActionRequest::Order { orders, .. } => {
270 if orders.is_empty() {
271 return PostLane::Normal;
272 }
273 let all_alo = orders.iter().all(|o| {
274 matches!(
275 o.t,
276 OrderTypeRequest::Limit {
277 tif: TimeInForceRequest::Alo
278 }
279 )
280 });
281 if all_alo {
282 PostLane::Alo
283 } else {
284 PostLane::Normal
285 }
286 }
287 _ => PostLane::Normal,
288 }
289}
290
291#[derive(Debug, Clone, Copy, Default)]
296pub enum Grouping {
297 #[default]
298 Na,
299 NormalTpsl,
300 PositionTpsl,
301}
302impl Grouping {
303 pub fn as_str(&self) -> &'static str {
304 match self {
305 Self::Na => "na",
306 Self::NormalTpsl => "normalTpsl",
307 Self::PositionTpsl => "positionTpsl",
308 }
309 }
310}
311
312#[derive(Debug, Clone, Builder)]
314pub struct LimitOrderParams {
315 pub asset: u32,
316 pub is_buy: bool,
317 pub px: String,
318 pub sz: String,
319 pub reduce_only: bool,
320 pub tif: TimeInForceRequest,
321 pub cloid: Option<String>,
322}
323
324#[derive(Debug, Clone, Builder)]
326pub struct TriggerOrderParams {
327 pub asset: u32,
328 pub is_buy: bool,
329 pub px: String,
330 pub sz: String,
331 pub reduce_only: bool,
332 pub is_market: bool,
333 pub trigger_px: String,
334 pub tpsl: TpSlRequest,
335 pub cloid: Option<String>,
336}
337
338#[derive(Debug, Default)]
340pub struct OrderBuilder {
341 orders: Vec<OrderRequest>,
342 grouping: Grouping,
343}
344
345impl OrderBuilder {
346 pub fn new() -> Self {
347 Self::default()
348 }
349 pub fn grouping(mut self, g: Grouping) -> Self {
350 self.grouping = g;
351 self
352 }
353
354 #[allow(clippy::too_many_arguments)]
356 pub fn push_limit(
357 self,
358 asset: u32,
359 is_buy: bool,
360 px: impl ToString,
361 sz: impl ToString,
362 reduce_only: bool,
363 tif: TimeInForceRequest,
364 cloid: Option<String>,
365 ) -> Self {
366 let params = LimitOrderParams {
367 asset,
368 is_buy,
369 px: px.to_string(),
370 sz: sz.to_string(),
371 reduce_only,
372 tif,
373 cloid,
374 };
375 self.push_limit_order(params)
376 }
377
378 pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
380 self.orders.push(OrderRequest {
381 a: params.asset,
382 b: params.is_buy,
383 p: params.px,
384 s: params.sz,
385 r: params.reduce_only,
386 t: OrderTypeRequest::Limit { tif: params.tif },
387 c: params.cloid,
388 });
389 self
390 }
391
392 #[allow(clippy::too_many_arguments)]
394 pub fn push_trigger(
395 self,
396 asset: u32,
397 is_buy: bool,
398 px: impl ToString,
399 sz: impl ToString,
400 reduce_only: bool,
401 is_market: bool,
402 trigger_px: impl ToString,
403 tpsl: TpSlRequest,
404 cloid: Option<String>,
405 ) -> Self {
406 let params = TriggerOrderParams {
407 asset,
408 is_buy,
409 px: px.to_string(),
410 sz: sz.to_string(),
411 reduce_only,
412 is_market,
413 trigger_px: trigger_px.to_string(),
414 tpsl,
415 cloid,
416 };
417 self.push_trigger_order(params)
418 }
419
420 pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
422 self.orders.push(OrderRequest {
423 a: params.asset,
424 b: params.is_buy,
425 p: params.px,
426 s: params.sz,
427 r: params.reduce_only,
428 t: OrderTypeRequest::Trigger {
429 is_market: params.is_market,
430 trigger_px: params.trigger_px,
431 tpsl: params.tpsl,
432 },
433 c: params.cloid,
434 });
435 self
436 }
437 pub fn build(self) -> ActionRequest {
438 ActionRequest::Order {
439 orders: self.orders,
440 grouping: self.grouping.as_str().to_string(),
441 }
442 }
443
444 pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
461 Self::new().push_limit_order(params).build()
462 }
463
464 pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
483 Self::new().push_trigger_order(params).build()
484 }
485}
486
487pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
488 ActionRequest::Cancel {
489 cancels: cancels
490 .into_iter()
491 .map(|(a, o)| CancelRequest { a, o })
492 .collect(),
493 }
494}
495pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
496 ActionRequest::CancelByCloid {
497 cancels: vec![CancelByCloidRequest {
498 asset,
499 cloid: cloid.into(),
500 }],
501 }
502}
503pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
504 ActionRequest::Modify {
505 modifies: vec![ModifyRequest {
506 oid,
507 order: new_order,
508 }],
509 }
510}
511
512pub fn info_l2_book(coin: &str) -> PostRequest {
514 PostRequest::Info {
515 payload: serde_json::json!({"type":"l2Book","coin":coin}),
516 }
517}
518pub fn info_all_mids() -> PostRequest {
519 PostRequest::Info {
520 payload: serde_json::json!({"type":"allMids"}),
521 }
522}
523pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
524 PostRequest::Info {
525 payload: serde_json::json!({"type":"orderStatus","user":user,"oid":oid}),
526 }
527}
528pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
529 let mut body = serde_json::json!({"type":"openOrders","user":user});
530 if let Some(fe) = frontend {
531 body["frontend"] = serde_json::json!(fe);
532 }
533 PostRequest::Info { payload: body }
534}
535pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
536 let mut body = serde_json::json!({"type":"userFills","user":user});
537 if let Some(agg) = aggregate_by_time {
538 body["aggregateByTime"] = serde_json::json!(agg);
539 }
540 PostRequest::Info { payload: body }
541}
542pub fn info_user_rate_limit(user: &str) -> PostRequest {
543 PostRequest::Info {
544 payload: serde_json::json!({"type":"userRateLimit","user":user}),
545 }
546}
547pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
548 PostRequest::Info {
549 payload: serde_json::json!({"type":"candle","coin":coin,"interval":interval}),
550 }
551}
552
553pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
558 serde_json::from_value(payload.clone()).map_err(Error::Serde)
559}
560pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
561 serde_json::from_value(payload.clone()).map_err(Error::Serde)
562}
563pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
564 serde_json::from_value(payload.clone()).map_err(Error::Serde)
565}
566
567#[derive(Debug)]
569pub enum ActionOutcome<'a> {
570 Resting {
571 oid: u64,
572 },
573 Filled {
574 total_sz: &'a str,
575 avg_px: &'a str,
576 oid: Option<u64>,
577 },
578 Error {
579 msg: &'a str,
580 },
581 Unknown(&'a serde_json::Value),
582}
583pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
584 if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
585 if let (Some(total_sz), Some(avg_px)) = (
586 payload.get("totalSz").and_then(|v| v.as_str()),
587 payload.get("avgPx").and_then(|v| v.as_str()),
588 ) {
589 return ActionOutcome::Filled {
590 total_sz,
591 avg_px,
592 oid: Some(oid),
593 };
594 }
595 return ActionOutcome::Resting { oid };
596 }
597 if let (Some(total_sz), Some(avg_px)) = (
598 payload.get("totalSz").and_then(|v| v.as_str()),
599 payload.get("avgPx").and_then(|v| v.as_str()),
600 ) {
601 return ActionOutcome::Filled {
602 total_sz,
603 avg_px,
604 oid: None,
605 };
606 }
607 if let Some(msg) = payload
608 .get("error")
609 .and_then(|v| v.as_str())
610 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
611 {
612 return ActionOutcome::Error { msg };
613 }
614 ActionOutcome::Unknown(payload)
615}
616
617#[derive(Clone, Debug)]
622pub struct WsSender {
623 inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
624}
625
626impl WsSender {
627 pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
628 Self {
629 inner: Arc::new(tokio::sync::Mutex::new(tx)),
630 }
631 }
632
633 pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
634 let sender = self.inner.lock().await;
635 sender
636 .send(req)
637 .await
638 .map_err(|_| Error::transport("WebSocket sender closed"))
639 }
640}
641
642#[cfg(test)]
647mod tests {
648 use rstest::rstest;
649 use tokio::{
650 sync::oneshot,
651 time::{Duration, sleep, timeout},
652 };
653
654 use super::*;
655 use crate::{
656 common::consts::INFLIGHT_MAX,
657 websocket::messages::{
658 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
659 OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
660 },
661 };
662
663 fn mk_limit_alo(asset: u32) -> OrderRequest {
666 OrderRequest {
667 a: asset,
668 b: true,
669 p: "1".to_string(),
670 s: "1".to_string(),
671 r: false,
672 t: OrderTypeRequest::Limit {
673 tif: TimeInForceRequest::Alo,
674 },
675 c: None,
676 }
677 }
678
679 fn mk_limit_gtc(asset: u32) -> OrderRequest {
680 OrderRequest {
681 a: asset,
682 b: true,
683 p: "1".to_string(),
684 s: "1".to_string(),
685 r: false,
686 t: OrderTypeRequest::Limit {
687 tif: TimeInForceRequest::Gtc,
689 },
690 c: None,
691 }
692 }
693
694 #[rstest]
697 #[tokio::test(flavor = "multi_thread")]
698 async fn register_duplicate_id_errors() {
699 let router = PostRouter::new();
700 let _rx = router.register(42).await.expect("first register OK");
701
702 let err = router.register(42).await.expect_err("duplicate must error");
703 let msg = err.to_string().to_lowercase();
704 assert!(
705 msg.contains("already") || msg.contains("duplicate"),
706 "unexpected error: {msg}"
707 );
708 }
709
710 #[rstest]
711 #[tokio::test(flavor = "multi_thread")]
712 async fn timeout_cancels_and_allows_reregister() {
713 let router = PostRouter::new();
714 let id = 7;
715
716 let rx = router.register(id).await.unwrap();
717 let err = router
719 .await_with_timeout(id, rx, Duration::from_millis(25))
720 .await
721 .expect_err("should timeout");
722 assert!(
723 err.to_string().to_lowercase().contains("timeout")
724 || err.to_string().to_lowercase().contains("closed"),
725 "unexpected error kind: {err}"
726 );
727
728 let _rx2 = router
730 .register(id)
731 .await
732 .expect("id should be reusable after timeout cancel");
733 }
734
735 #[rstest]
736 #[tokio::test(flavor = "multi_thread")]
737 async fn inflight_cap_blocks_then_unblocks() {
738 let router = PostRouter::new();
739
740 let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
742 for i in 0..INFLIGHT_MAX {
743 let rx = router.register(i as u64).await.unwrap();
744 rxs.push(rx); }
746
747 let router2 = Arc::clone(&router);
749 let (entered_tx, entered_rx) = oneshot::channel::<()>();
750 let (done_tx, done_rx) = oneshot::channel::<()>();
751 let (check_tx, check_rx) = oneshot::channel::<()>(); tokio::spawn(async move {
754 let _ = entered_tx.send(());
755 let _rx = router2.register(9_999_999).await.unwrap();
756 let _ = done_tx.send(());
757 });
758
759 entered_rx.await.unwrap();
761
762 tokio::spawn(async move {
764 if done_rx.await.is_ok() {
765 let _ = check_tx.send(());
766 }
767 });
768
769 assert!(
770 timeout(Duration::from_millis(50), check_rx).await.is_err(),
771 "should still be blocked while at cap"
772 );
773
774 router.cancel(0).await;
776
777 tokio::time::sleep(Duration::from_millis(100)).await;
779 }
780
781 #[rstest(
784 orders, expected,
785 case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
786 case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
787 case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
788 case::empty(vec![], PostLane::Normal),
789 )]
790 fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
791 let action = ActionRequest::Order {
792 orders,
793 grouping: "na".to_string(),
794 };
795 assert_eq!(lane_for_action(&action), expected);
796 }
797
798 #[test]
801 fn test_order_request_builder() {
802 let order = OrderRequestBuilder::default()
804 .a(0)
805 .b(true)
806 .p("40000.0".to_string())
807 .s("0.01".to_string())
808 .r(false)
809 .t(OrderTypeRequest::Limit {
810 tif: TimeInForceRequest::Gtc,
811 })
812 .c(Some("test-order-1".to_string()))
813 .build()
814 .expect("should build order");
815
816 assert_eq!(order.a, 0);
817 assert!(order.b);
818 assert_eq!(order.p, "40000.0");
819 assert_eq!(order.s, "0.01");
820 assert!(!order.r);
821 assert_eq!(order.c, Some("test-order-1".to_string()));
822 }
823
824 #[test]
825 fn test_limit_order_params_builder() {
826 let params = LimitOrderParamsBuilder::default()
828 .asset(0)
829 .is_buy(true)
830 .px("40000.0".to_string())
831 .sz("0.01".to_string())
832 .reduce_only(false)
833 .tif(TimeInForceRequest::Alo)
834 .cloid(Some("test-limit-1".to_string()))
835 .build()
836 .expect("should build limit params");
837
838 assert_eq!(params.asset, 0);
839 assert!(params.is_buy);
840 assert_eq!(params.px, "40000.0");
841 assert_eq!(params.sz, "0.01");
842 assert!(!params.reduce_only);
843 assert_eq!(params.cloid, Some("test-limit-1".to_string()));
844 }
845
846 #[test]
847 fn test_trigger_order_params_builder() {
848 let params = TriggerOrderParamsBuilder::default()
850 .asset(1)
851 .is_buy(false)
852 .px("39000.0".to_string())
853 .sz("0.02".to_string())
854 .reduce_only(false)
855 .is_market(true)
856 .trigger_px("39500.0".to_string())
857 .tpsl(TpSlRequest::Sl)
858 .cloid(Some("test-trigger-1".to_string()))
859 .build()
860 .expect("should build trigger params");
861
862 assert_eq!(params.asset, 1);
863 assert!(!params.is_buy);
864 assert_eq!(params.px, "39000.0");
865 assert!(params.is_market);
866 assert_eq!(params.trigger_px, "39500.0");
867 }
868
869 #[test]
870 fn test_order_builder_single_limit_convenience() {
871 let params = LimitOrderParamsBuilder::default()
873 .asset(0)
874 .is_buy(true)
875 .px("40000.0".to_string())
876 .sz("0.01".to_string())
877 .reduce_only(false)
878 .tif(TimeInForceRequest::Gtc)
879 .cloid(None)
880 .build()
881 .unwrap();
882
883 let action = OrderBuilder::single_limit_order(params);
884
885 match action {
886 ActionRequest::Order { orders, grouping } => {
887 assert_eq!(orders.len(), 1);
888 assert_eq!(orders[0].a, 0);
889 assert!(orders[0].b);
890 assert_eq!(grouping, "na");
891 }
892 _ => panic!("Expected ActionRequest::Order variant"),
893 }
894 }
895
896 #[test]
897 fn test_order_builder_single_trigger_convenience() {
898 let params = TriggerOrderParamsBuilder::default()
900 .asset(1)
901 .is_buy(false)
902 .px("39000.0".to_string())
903 .sz("0.02".to_string())
904 .reduce_only(false)
905 .is_market(true)
906 .trigger_px("39500.0".to_string())
907 .tpsl(TpSlRequest::Sl)
908 .cloid(Some("sl-order".to_string()))
909 .build()
910 .unwrap();
911
912 let action = OrderBuilder::single_trigger_order(params);
913
914 match action {
915 ActionRequest::Order { orders, grouping } => {
916 assert_eq!(orders.len(), 1);
917 assert_eq!(orders[0].a, 1);
918 assert_eq!(orders[0].c, Some("sl-order".to_string()));
919 assert_eq!(grouping, "na");
920 }
921 _ => panic!("Expected ActionRequest::Order variant"),
922 }
923 }
924
925 #[test]
926 fn test_order_builder_batch_orders() {
927 let params1 = LimitOrderParams {
929 asset: 0,
930 is_buy: true,
931 px: "40000.0".to_string(),
932 sz: "0.01".to_string(),
933 reduce_only: false,
934 tif: TimeInForceRequest::Gtc,
935 cloid: Some("order-1".to_string()),
936 };
937
938 let params2 = LimitOrderParams {
939 asset: 1,
940 is_buy: false,
941 px: "2000.0".to_string(),
942 sz: "0.5".to_string(),
943 reduce_only: false,
944 tif: TimeInForceRequest::Ioc,
945 cloid: Some("order-2".to_string()),
946 };
947
948 let action = OrderBuilder::new()
949 .grouping(Grouping::NormalTpsl)
950 .push_limit_order(params1)
951 .push_limit_order(params2)
952 .build();
953
954 match action {
955 ActionRequest::Order { orders, grouping } => {
956 assert_eq!(orders.len(), 2);
957 assert_eq!(orders[0].c, Some("order-1".to_string()));
958 assert_eq!(orders[1].c, Some("order-2".to_string()));
959 assert_eq!(grouping, "normalTpsl");
960 }
961 _ => panic!("Expected ActionRequest::Order variant"),
962 }
963 }
964
965 #[test]
966 fn test_action_request_constructors() {
967 let order1 = mk_limit_gtc(0);
969 let order2 = mk_limit_gtc(1);
970 let action = ActionRequest::order(vec![order1, order2], "na");
971
972 match action {
973 ActionRequest::Order { orders, grouping } => {
974 assert_eq!(orders.len(), 2);
975 assert_eq!(grouping, "na");
976 }
977 _ => panic!("Expected ActionRequest::Order variant"),
978 }
979
980 let cancels = vec![CancelRequest { a: 0, o: 12345 }];
982 let action = ActionRequest::cancel(cancels);
983 assert!(matches!(action, ActionRequest::Cancel { .. }));
984
985 let cancels = vec![CancelByCloidRequest {
987 asset: 0,
988 cloid: "order-1".to_string(),
989 }];
990 let action = ActionRequest::cancel_by_cloid(cancels);
991 assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
992 }
993
994 #[rstest]
997 #[tokio::test(flavor = "multi_thread")]
998 async fn batcher_sends_on_tick() {
999 let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
1001 let sent_closure = sent.clone();
1002
1003 let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
1004 let sent_inner = sent_closure.clone();
1005 Box::pin(async move {
1006 if let HyperliquidWsRequest::Post { id, .. } = req {
1007 sent_inner.lock().await.push(id);
1008 }
1009 Ok(())
1010 })
1011 };
1012
1013 let batcher = PostBatcher::new(send_fn);
1014
1015 for id in 1..=5u64 {
1017 batcher
1018 .enqueue(ScheduledPost {
1019 id,
1020 request: PostRequest::Info {
1021 payload: serde_json::json!({"type":"allMids"}),
1022 },
1023 lane: PostLane::Normal,
1024 })
1025 .await
1026 .unwrap();
1027 }
1028
1029 sleep(Duration::from_millis(80)).await;
1031
1032 let got = sent.lock().await.clone();
1033 assert_eq!(got.len(), 5, "expected 5 sends on first tick");
1034 assert_eq!(got, vec![1, 2, 3, 4, 5]);
1035 }
1036}