1use std::{
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU64, Ordering},
40 },
41 time::Duration,
42};
43
44use futures_util::future;
45use nautilus_common::live::get_runtime;
46use nautilus_model::{
47 enums::OrderSide,
48 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
49 instruments::InstrumentAny,
50 reports::OrderStatusReport,
51};
52use tokio::{sync::RwLock, task::JoinHandle, time::interval};
53
54use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
55
/// Abstraction over a client that can issue cancel requests.
///
/// Every asynchronous method returns a boxed, pinned future so the trait can be
/// used behind `Arc<dyn CancelExecutor>`. The returned futures borrow `self`.
trait CancelExecutor: Send + Sync {
    /// Registers `instrument` with the executor.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Probes the executor; the future resolves to `Ok(())` when the probe succeeds.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order identified by client and/or venue order ID.
    ///
    /// The future resolves to the resulting order status report, or an error.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;

    /// Cancels several orders identified by client and/or venue order IDs.
    ///
    /// The future resolves to a list of order status reports, or an error.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;

    /// Cancels all orders for `instrument_id`, optionally restricted to `order_side`.
    ///
    /// The future resolves to a list of order status reports, or an error.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}
107
108impl CancelExecutor for BitmexHttpClient {
109 fn add_instrument(&self, instrument: InstrumentAny) {
110 Self::cache_instrument(self, instrument);
111 }
112
113 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
114 Box::pin(async move {
115 Self::get_server_time(self)
116 .await
117 .map(|_| ())
118 .map_err(|e| anyhow::anyhow!("{e}"))
119 })
120 }
121
122 fn cancel_order(
123 &self,
124 instrument_id: InstrumentId,
125 client_order_id: Option<ClientOrderId>,
126 venue_order_id: Option<VenueOrderId>,
127 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
128 Box::pin(async move {
129 Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
130 })
131 }
132
133 fn cancel_orders(
134 &self,
135 instrument_id: InstrumentId,
136 client_order_ids: Option<Vec<ClientOrderId>>,
137 venue_order_ids: Option<Vec<VenueOrderId>>,
138 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
139 Box::pin(async move {
140 Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
141 })
142 }
143
144 fn cancel_all_orders(
145 &self,
146 instrument_id: InstrumentId,
147 order_side: Option<OrderSide>,
148 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
149 Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
150 }
151}
152
/// Configuration for [`CancelBroadcaster`].
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients created by `CancelBroadcaster::new`.
    pub pool_size: usize,
    /// API key forwarded to each HTTP client.
    pub api_key: Option<String>,
    /// API secret forwarded to each HTTP client.
    pub api_secret: Option<String>,
    /// Base URL forwarded to each HTTP client; takes precedence over `testnet`.
    pub base_url: Option<String>,
    /// When `true` and `base_url` is `None`, the testnet URL constant is used.
    pub testnet: bool,
    /// Timeout setting forwarded to each HTTP client.
    pub timeout_secs: Option<u64>,
    /// Retry-count setting forwarded to each HTTP client.
    pub max_retries: Option<u32>,
    /// Retry-delay setting forwarded to each HTTP client.
    pub retry_delay_ms: Option<u64>,
    /// Maximum retry-delay setting forwarded to each HTTP client.
    pub retry_delay_max_ms: Option<u64>,
    /// Receive-window setting forwarded to each HTTP client.
    pub recv_window_ms: Option<u64>,
    /// Per-second request limit forwarded to each HTTP client.
    pub max_requests_per_second: Option<u32>,
    /// Per-minute request limit forwarded to each HTTP client.
    pub max_requests_per_minute: Option<u32>,
    /// Period, in seconds, of the background health-check ticker.
    pub health_check_interval_secs: u64,
    /// Timeout, in seconds, applied to each individual health check.
    pub health_check_timeout_secs: u64,
    /// Substrings; an error message containing any of them is counted as an expected reject.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings; an error message containing any of them is treated as an idempotent success.
    pub idempotent_success_patterns: Vec<String>,
    /// Proxy URL per client: entry `i` is used for client `i`; a missing or `None`
    /// entry means no proxy URL is passed for that client.
    pub proxy_urls: Vec<Option<String>>,
}
195
196impl Default for CancelBroadcasterConfig {
197 fn default() -> Self {
198 Self {
199 pool_size: 2,
200 api_key: None,
201 api_secret: None,
202 base_url: None,
203 testnet: false,
204 timeout_secs: Some(60),
205 max_retries: None,
206 retry_delay_ms: Some(1_000),
207 retry_delay_max_ms: Some(5_000),
208 recv_window_ms: Some(10_000),
209 max_requests_per_second: Some(10),
210 max_requests_per_minute: Some(120),
211 health_check_interval_secs: 30,
212 health_check_timeout_secs: 5,
213 expected_reject_patterns: vec![
214 r"Order had execInst of ParticipateDoNotInitiate".to_string(),
215 ],
216 idempotent_success_patterns: vec![
217 r"AlreadyCanceled".to_string(),
218 r"orderID not found".to_string(),
219 r"Unable to cancel order due to existing state".to_string(),
220 ],
221 proxy_urls: vec![],
222 }
223 }
224}
225
/// One member of the broadcaster's pool: an executor plus its health flag and counters.
///
/// All state is behind `Arc`, so clones share the same flag and counters.
#[derive(Clone)]
struct TransportClient {
    /// The executor that actually performs requests.
    executor: Arc<dyn CancelExecutor>,
    /// Identifier used in log messages and statistics.
    client_id: String,
    /// Health flag; set by health checks and by successful single-order cancels.
    healthy: Arc<AtomicBool>,
    /// Number of single-order cancel attempts made through `cancel_order`.
    cancel_count: Arc<AtomicU64>,
    /// Number of single-order cancel attempts that returned an error.
    error_count: Arc<AtomicU64>,
}
239
240impl Debug for TransportClient {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 f.debug_struct(stringify!(TransportClient))
243 .field("client_id", &self.client_id)
244 .field("healthy", &self.healthy)
245 .field("cancel_count", &self.cancel_count)
246 .field("error_count", &self.error_count)
247 .finish()
248 }
249}
250
251impl TransportClient {
252 fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
253 Self {
254 executor: Arc::new(executor),
255 client_id,
256 healthy: Arc::new(AtomicBool::new(true)),
257 cancel_count: Arc::new(AtomicU64::new(0)),
258 error_count: Arc::new(AtomicU64::new(0)),
259 }
260 }
261
262 fn is_healthy(&self) -> bool {
263 self.healthy.load(Ordering::Relaxed)
264 }
265
266 fn mark_healthy(&self) {
267 self.healthy.store(true, Ordering::Relaxed);
268 }
269
270 fn mark_unhealthy(&self) {
271 self.healthy.store(false, Ordering::Relaxed);
272 }
273
274 fn get_cancel_count(&self) -> u64 {
275 self.cancel_count.load(Ordering::Relaxed)
276 }
277
278 fn get_error_count(&self) -> u64 {
279 self.error_count.load(Ordering::Relaxed)
280 }
281
282 async fn health_check(&self, timeout_secs: u64) -> bool {
283 match tokio::time::timeout(
284 Duration::from_secs(timeout_secs),
285 self.executor.health_check(),
286 )
287 .await
288 {
289 Ok(Ok(())) => {
290 self.mark_healthy();
291 true
292 }
293 Ok(Err(e)) => {
294 log::warn!("Health check failed for client {}: {e:?}", self.client_id);
295 self.mark_unhealthy();
296 false
297 }
298 Err(_) => {
299 log::warn!("Health check timeout for client {}", self.client_id);
300 self.mark_unhealthy();
301 false
302 }
303 }
304 }
305
306 async fn cancel_order(
307 &self,
308 instrument_id: InstrumentId,
309 client_order_id: Option<ClientOrderId>,
310 venue_order_id: Option<VenueOrderId>,
311 ) -> anyhow::Result<OrderStatusReport> {
312 self.cancel_count.fetch_add(1, Ordering::Relaxed);
313
314 match self
315 .executor
316 .cancel_order(instrument_id, client_order_id, venue_order_id)
317 .await
318 {
319 Ok(report) => {
320 self.mark_healthy();
321 Ok(report)
322 }
323 Err(e) => {
324 self.error_count.fetch_add(1, Ordering::Relaxed);
325 Err(e)
326 }
327 }
328 }
329}
330
/// Broadcasts cancel requests to a pool of transport clients concurrently and
/// returns the first successful (or idempotently successful) outcome.
///
/// All mutable state is held behind `Arc`, so `clone_for_async` copies share it.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    /// Configuration supplied at construction.
    config: CancelBroadcasterConfig,
    /// The pool of transport clients that requests are broadcast to.
    transports: Arc<Vec<TransportClient>>,
    /// Handle of the background health-check task; set by `start`, taken by `stop`.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Whether the broadcaster has been started and not yet stopped.
    running: Arc<AtomicBool>,
    /// Incremented once per broadcast call.
    total_cancels: Arc<AtomicU64>,
    /// Incremented when a broadcast returns a transport's `Ok` result.
    successful_cancels: Arc<AtomicU64>,
    /// Incremented when no healthy transport exists or every request fails.
    failed_cancels: Arc<AtomicU64>,
    /// Incremented for each error matching an expected-reject pattern.
    expected_rejects: Arc<AtomicU64>,
    /// Incremented when a broadcast ends on an error matching an idempotent-success pattern.
    idempotent_successes: Arc<AtomicU64>,
}
349
350impl CancelBroadcaster {
351 pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
357 let mut transports = Vec::with_capacity(config.pool_size);
358
359 let base_url = if config.testnet && config.base_url.is_none() {
361 Some(BITMEX_HTTP_TESTNET_URL.to_string())
362 } else {
363 config.base_url.clone()
364 };
365
366 for i in 0..config.pool_size {
367 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
369
370 let client = BitmexHttpClient::with_credentials(
371 config.api_key.clone(),
372 config.api_secret.clone(),
373 base_url.clone(),
374 config.timeout_secs,
375 config.max_retries,
376 config.retry_delay_ms,
377 config.retry_delay_max_ms,
378 config.recv_window_ms,
379 config.max_requests_per_second,
380 config.max_requests_per_minute,
381 proxy_url,
382 )
383 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
384
385 transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
386 }
387
388 Ok(Self {
389 config,
390 transports: Arc::new(transports),
391 health_check_task: Arc::new(RwLock::new(None)),
392 running: Arc::new(AtomicBool::new(false)),
393 total_cancels: Arc::new(AtomicU64::new(0)),
394 successful_cancels: Arc::new(AtomicU64::new(0)),
395 failed_cancels: Arc::new(AtomicU64::new(0)),
396 expected_rejects: Arc::new(AtomicU64::new(0)),
397 idempotent_successes: Arc::new(AtomicU64::new(0)),
398 })
399 }
400
401 pub async fn start(&self) -> anyhow::Result<()> {
407 if self.running.load(Ordering::Relaxed) {
408 return Ok(());
409 }
410
411 self.running.store(true, Ordering::Relaxed);
412
413 self.run_health_checks().await;
415
416 let transports = Arc::clone(&self.transports);
418 let running = Arc::clone(&self.running);
419 let interval_secs = self.config.health_check_interval_secs;
420 let timeout_secs = self.config.health_check_timeout_secs;
421
422 let task = get_runtime().spawn(async move {
423 let mut ticker = interval(Duration::from_secs(interval_secs));
424 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
425
426 loop {
427 ticker.tick().await;
428
429 if !running.load(Ordering::Relaxed) {
430 break;
431 }
432
433 let tasks: Vec<_> = transports
434 .iter()
435 .map(|t| t.health_check(timeout_secs))
436 .collect();
437
438 let results = future::join_all(tasks).await;
439 let healthy_count = results.iter().filter(|&&r| r).count();
440
441 log::debug!(
442 "Health check complete: {}/{} clients healthy",
443 healthy_count,
444 results.len()
445 );
446 }
447 });
448
449 *self.health_check_task.write().await = Some(task);
450
451 log::info!(
452 "CancelBroadcaster started with {} clients",
453 self.transports.len()
454 );
455
456 Ok(())
457 }
458
459 pub async fn stop(&self) {
461 if !self.running.load(Ordering::Relaxed) {
462 return;
463 }
464
465 self.running.store(false, Ordering::Relaxed);
466
467 if let Some(task) = self.health_check_task.write().await.take() {
468 task.abort();
469 }
470
471 log::info!("CancelBroadcaster stopped");
472 }
473
474 async fn run_health_checks(&self) {
475 let tasks: Vec<_> = self
476 .transports
477 .iter()
478 .map(|t| t.health_check(self.config.health_check_timeout_secs))
479 .collect();
480
481 let results = future::join_all(tasks).await;
482 let healthy_count = results.iter().filter(|&&r| r).count();
483
484 log::debug!(
485 "Health check complete: {}/{} clients healthy",
486 healthy_count,
487 results.len()
488 );
489 }
490
491 fn is_expected_reject(&self, error_message: &str) -> bool {
492 self.config
493 .expected_reject_patterns
494 .iter()
495 .any(|pattern| error_message.contains(pattern))
496 }
497
498 fn is_idempotent_success(&self, error_message: &str) -> bool {
499 self.config
500 .idempotent_success_patterns
501 .iter()
502 .any(|pattern| error_message.contains(pattern))
503 }
504
505 async fn process_cancel_results<T>(
509 &self,
510 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
511 idempotent_result: impl FnOnce() -> anyhow::Result<T>,
512 operation: &str,
513 params: String,
514 idempotent_reason: &str,
515 ) -> anyhow::Result<T>
516 where
517 T: Send + 'static,
518 {
519 let mut errors = Vec::new();
520
521 while !handles.is_empty() {
522 let current_handles = std::mem::take(&mut handles);
523 let (result, _idx, remaining) = future::select_all(current_handles).await;
524 handles = remaining.into_iter().collect();
525
526 match result {
527 Ok((client_id, Ok(result))) => {
528 for handle in &handles {
530 handle.abort();
531 }
532 self.successful_cancels.fetch_add(1, Ordering::Relaxed);
533
534 log::debug!("{operation} broadcast succeeded [{client_id}] {params}");
535
536 return Ok(result);
537 }
538 Ok((client_id, Err(e))) => {
539 let error_msg = e.to_string();
540
541 if self.is_idempotent_success(&error_msg) {
542 for handle in &handles {
544 handle.abort();
545 }
546 self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
547
548 log::debug!(
549 "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
550 );
551
552 return idempotent_result();
553 }
554
555 if self.is_expected_reject(&error_msg) {
556 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
557 log::debug!(
558 "Expected {} rejection [{}]: {} {}",
559 operation.to_lowercase(),
560 client_id,
561 error_msg,
562 params
563 );
564 errors.push(error_msg);
565 } else {
566 log::warn!(
567 "{operation} request failed [{client_id}]: {error_msg} {params}"
568 );
569 errors.push(error_msg);
570 }
571 }
572 Err(e) => {
573 log::warn!("{operation} task join error: {e:?}");
574 errors.push(format!("Task panicked: {e:?}"));
575 }
576 }
577 }
578
579 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
581 log::error!(
582 "All {} requests failed: {errors:?} {params}",
583 operation.to_lowercase(),
584 );
585 Err(anyhow::anyhow!(
586 "All {} requests failed: {errors:?}",
587 operation.to_lowercase(),
588 ))
589 }
590
591 pub async fn broadcast_cancel(
603 &self,
604 instrument_id: InstrumentId,
605 client_order_id: Option<ClientOrderId>,
606 venue_order_id: Option<VenueOrderId>,
607 ) -> anyhow::Result<Option<OrderStatusReport>> {
608 self.total_cancels.fetch_add(1, Ordering::Relaxed);
609
610 let healthy_transports: Vec<TransportClient> = self
611 .transports
612 .iter()
613 .filter(|t| t.is_healthy())
614 .cloned()
615 .collect();
616
617 if healthy_transports.is_empty() {
618 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
619 anyhow::bail!("No healthy transport clients available");
620 }
621
622 let mut handles = Vec::new();
623 for transport in healthy_transports {
624 let handle = get_runtime().spawn(async move {
625 let client_id = transport.client_id.clone();
626 let result = transport
627 .cancel_order(instrument_id, client_order_id, venue_order_id)
628 .await
629 .map(Some); (client_id, result)
631 });
632 handles.push(handle);
633 }
634
635 self.process_cancel_results(
636 handles,
637 || Ok(None),
638 "Cancel",
639 format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
640 "order already cancelled/not found",
641 )
642 .await
643 }
644
645 pub async fn broadcast_batch_cancel(
651 &self,
652 instrument_id: InstrumentId,
653 client_order_ids: Option<Vec<ClientOrderId>>,
654 venue_order_ids: Option<Vec<VenueOrderId>>,
655 ) -> anyhow::Result<Vec<OrderStatusReport>> {
656 self.total_cancels.fetch_add(1, Ordering::Relaxed);
657
658 let healthy_transports: Vec<TransportClient> = self
659 .transports
660 .iter()
661 .filter(|t| t.is_healthy())
662 .cloned()
663 .collect();
664
665 if healthy_transports.is_empty() {
666 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
667 anyhow::bail!("No healthy transport clients available");
668 }
669
670 let mut handles = Vec::new();
671
672 for transport in healthy_transports {
673 let client_order_ids_clone = client_order_ids.clone();
674 let venue_order_ids_clone = venue_order_ids.clone();
675 let handle = get_runtime().spawn(async move {
676 let client_id = transport.client_id.clone();
677 let result = transport
678 .executor
679 .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
680 .await;
681 (client_id, result)
682 });
683 handles.push(handle);
684 }
685
686 self.process_cancel_results(
687 handles,
688 || Ok(Vec::new()),
689 "Batch cancel",
690 format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
691 "orders already cancelled/not found",
692 )
693 .await
694 }
695
696 pub async fn broadcast_cancel_all(
702 &self,
703 instrument_id: InstrumentId,
704 order_side: Option<OrderSide>,
705 ) -> anyhow::Result<Vec<OrderStatusReport>> {
706 self.total_cancels.fetch_add(1, Ordering::Relaxed);
707
708 let healthy_transports: Vec<TransportClient> = self
709 .transports
710 .iter()
711 .filter(|t| t.is_healthy())
712 .cloned()
713 .collect();
714
715 if healthy_transports.is_empty() {
716 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
717 anyhow::bail!("No healthy transport clients available");
718 }
719
720 let mut handles = Vec::new();
721 for transport in healthy_transports {
722 let handle = get_runtime().spawn(async move {
723 let client_id = transport.client_id.clone();
724 let result = transport
725 .executor
726 .cancel_all_orders(instrument_id, order_side)
727 .await;
728 (client_id, result)
729 });
730 handles.push(handle);
731 }
732
733 self.process_cancel_results(
734 handles,
735 || Ok(Vec::new()),
736 "Cancel all",
737 format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
738 "no orders to cancel",
739 )
740 .await
741 }
742
743 pub fn get_metrics(&self) -> BroadcasterMetrics {
745 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
746 let total_clients = self.transports.len();
747
748 BroadcasterMetrics {
749 total_cancels: self.total_cancels.load(Ordering::Relaxed),
750 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
751 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
752 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
753 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
754 healthy_clients,
755 total_clients,
756 }
757 }
758
759 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
761 self.get_metrics()
762 }
763
764 pub fn get_client_stats(&self) -> Vec<ClientStats> {
766 self.transports
767 .iter()
768 .map(|t| ClientStats {
769 client_id: t.client_id.clone(),
770 healthy: t.is_healthy(),
771 cancel_count: t.get_cancel_count(),
772 error_count: t.get_error_count(),
773 })
774 .collect()
775 }
776
777 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
779 self.get_client_stats()
780 }
781
782 pub fn cache_instrument(&self, instrument: InstrumentAny) {
784 for transport in self.transports.iter() {
785 transport.executor.add_instrument(instrument.clone());
786 }
787 }
788
789 #[must_use]
790 pub fn clone_for_async(&self) -> Self {
791 Self {
792 config: self.config.clone(),
793 transports: Arc::clone(&self.transports),
794 health_check_task: Arc::clone(&self.health_check_task),
795 running: Arc::clone(&self.running),
796 total_cancels: Arc::clone(&self.total_cancels),
797 successful_cancels: Arc::clone(&self.successful_cancels),
798 failed_cancels: Arc::clone(&self.failed_cancels),
799 expected_rejects: Arc::clone(&self.expected_rejects),
800 idempotent_successes: Arc::clone(&self.idempotent_successes),
801 }
802 }
803
804 #[cfg(test)]
805 fn new_with_transports(
806 config: CancelBroadcasterConfig,
807 transports: Vec<TransportClient>,
808 ) -> Self {
809 Self {
810 config,
811 transports: Arc::new(transports),
812 health_check_task: Arc::new(RwLock::new(None)),
813 running: Arc::new(AtomicBool::new(false)),
814 total_cancels: Arc::new(AtomicU64::new(0)),
815 successful_cancels: Arc::new(AtomicU64::new(0)),
816 failed_cancels: Arc::new(AtomicU64::new(0)),
817 expected_rejects: Arc::new(AtomicU64::new(0)),
818 idempotent_successes: Arc::new(AtomicU64::new(0)),
819 }
820 }
821}
822
/// Point-in-time snapshot of a broadcaster's counters and pool health.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    /// Number of broadcast calls made.
    pub total_cancels: u64,
    /// Number of broadcasts that returned a transport's successful result.
    pub successful_cancels: u64,
    /// Number of broadcasts that failed (no healthy transport, or all requests failed).
    pub failed_cancels: u64,
    /// Number of individual errors that matched an expected-reject pattern.
    pub expected_rejects: u64,
    /// Number of broadcasts resolved by an error matching an idempotent-success pattern.
    pub idempotent_successes: u64,
    /// Number of transports whose health flag was set when the snapshot was taken.
    pub healthy_clients: usize,
    /// Total number of transports in the pool.
    pub total_clients: usize,
}
834
/// Point-in-time snapshot of a single transport client's state.
#[derive(Debug, Clone)]
pub struct ClientStats {
    /// Identifier of the transport client.
    pub client_id: String,
    /// Value of the transport's health flag when the snapshot was taken.
    pub healthy: bool,
    /// Value of the transport's cancel-attempt counter.
    pub cancel_count: u64,
    /// Value of the transport's error counter.
    pub error_count: u64,
}
843
844#[cfg(test)]
845mod tests {
846 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
847
848 use nautilus_core::UUID4;
849 use nautilus_model::{
850 enums::{
851 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
852 },
853 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
854 reports::OrderStatusReport,
855 types::{Price, Quantity},
856 };
857
858 use super::*;
859
    /// Test double for [`CancelExecutor`] whose cancel behaviour is supplied as a closure.
    #[derive(Clone)]
    // The boxed-future handler signature is inherently long; an alias would only
    // be used here.
    #[allow(clippy::type_complexity)]
    struct MockExecutor {
        /// Invoked for each cancel request; returns the boxed future to await.
        handler: Arc<
            dyn Fn(
                    InstrumentId,
                    Option<ClientOrderId>,
                    Option<VenueOrderId>,
                )
                    -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
                + Send
                + Sync,
        >,
    }
875
876 impl MockExecutor {
877 fn new<F, Fut>(handler: F) -> Self
878 where
879 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
880 + Send
881 + Sync
882 + 'static,
883 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
884 {
885 Self {
886 handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
887 }
888 }
889 }
890
891 impl CancelExecutor for MockExecutor {
892 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
893 Box::pin(async { Ok(()) })
894 }
895
896 fn cancel_order(
897 &self,
898 instrument_id: InstrumentId,
899 client_order_id: Option<ClientOrderId>,
900 venue_order_id: Option<VenueOrderId>,
901 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
902 (self.handler)(instrument_id, client_order_id, venue_order_id)
903 }
904
905 fn cancel_orders(
906 &self,
907 _instrument_id: InstrumentId,
908 _client_order_ids: Option<Vec<ClientOrderId>>,
909 _venue_order_ids: Option<Vec<VenueOrderId>>,
910 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
911 {
912 Box::pin(async { Ok(Vec::new()) })
913 }
914
915 fn cancel_all_orders(
916 &self,
917 instrument_id: InstrumentId,
918 _order_side: Option<OrderSide>,
919 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
920 {
921 let handler = Arc::clone(&self.handler);
923 Box::pin(async move {
924 let result = handler(instrument_id, None, None).await;
926 match result {
927 Ok(_) => Ok(Vec::new()),
928 Err(e) => Err(e),
929 }
930 })
931 }
932
933 fn add_instrument(&self, _instrument: InstrumentAny) {
934 }
936 }
937
938 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
939 OrderStatusReport {
940 account_id: AccountId::from("BITMEX-001"),
941 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
942 venue_order_id: VenueOrderId::from(venue_order_id),
943 order_side: OrderSide::Buy,
944 order_type: OrderType::Limit,
945 time_in_force: TimeInForce::Gtc,
946 order_status: OrderStatus::Canceled,
947 price: Some(Price::new(50000.0, 2)),
948 quantity: Quantity::new(100.0, 0),
949 filled_qty: Quantity::new(0.0, 0),
950 report_id: UUID4::new(),
951 ts_accepted: 0.into(),
952 ts_last: 0.into(),
953 ts_init: 0.into(),
954 client_order_id: None,
955 avg_px: None,
956 trigger_price: None,
957 trigger_type: None,
958 contingency_type: ContingencyType::NoContingency,
959 expire_time: None,
960 order_list_id: None,
961 venue_position_id: None,
962 linked_order_ids: None,
963 parent_order_id: None,
964 display_qty: None,
965 limit_offset: None,
966 trailing_offset: None,
967 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
968 post_only: false,
969 reduce_only: false,
970 cancel_reason: None,
971 ts_triggered: None,
972 }
973 }
974
975 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
976 where
977 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
978 + Send
979 + Sync
980 + 'static,
981 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
982 {
983 let executor = MockExecutor::new(handler);
984 TransportClient::new(executor, client_id.to_string())
985 }
986
987 #[tokio::test]
988 async fn test_broadcast_cancel_immediate_success() {
989 let report = create_test_report("ORDER-1");
990 let report_clone = report.clone();
991
992 let transports = vec![
993 create_stub_transport("client-0", move |_, _, _| {
994 let report = report_clone.clone();
995 async move { Ok(report) }
996 }),
997 create_stub_transport("client-1", |_, _, _| async {
998 tokio::time::sleep(Duration::from_secs(10)).await;
999 anyhow::bail!("Should be aborted")
1000 }),
1001 ];
1002
1003 let config = CancelBroadcasterConfig::default();
1004 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1005
1006 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1007 let result = broadcaster
1008 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1009 .await;
1010
1011 assert!(result.is_ok());
1012 let returned_report = result.unwrap();
1013 assert!(returned_report.is_some());
1014 assert_eq!(
1015 returned_report.unwrap().venue_order_id,
1016 report.venue_order_id
1017 );
1018
1019 let metrics = broadcaster.get_metrics_async().await;
1020 assert_eq!(metrics.successful_cancels, 1);
1021 assert_eq!(metrics.failed_cancels, 0);
1022 assert_eq!(metrics.total_cancels, 1);
1023 }
1024
1025 #[tokio::test]
1026 async fn test_broadcast_cancel_idempotent_success() {
1027 let transports = vec![
1028 create_stub_transport("client-0", |_, _, _| async {
1029 anyhow::bail!("AlreadyCanceled")
1030 }),
1031 create_stub_transport("client-1", |_, _, _| async {
1032 tokio::time::sleep(Duration::from_secs(10)).await;
1033 anyhow::bail!("Should be aborted")
1034 }),
1035 ];
1036
1037 let config = CancelBroadcasterConfig::default();
1038 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1039
1040 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1041 let result = broadcaster
1042 .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1043 .await;
1044
1045 assert!(result.is_ok());
1046 assert!(result.unwrap().is_none());
1047
1048 let metrics = broadcaster.get_metrics_async().await;
1049 assert_eq!(metrics.idempotent_successes, 1);
1050 assert_eq!(metrics.successful_cancels, 0);
1051 assert_eq!(metrics.failed_cancels, 0);
1052 }
1053
1054 #[tokio::test]
1055 async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1056 let transports = vec![
1057 create_stub_transport("client-0", |_, _, _| async {
1058 anyhow::bail!("502 Bad Gateway")
1059 }),
1060 create_stub_transport("client-1", |_, _, _| async {
1061 anyhow::bail!("orderID not found")
1062 }),
1063 ];
1064
1065 let config = CancelBroadcasterConfig::default();
1066 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1067
1068 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1069 let result = broadcaster
1070 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1071 .await;
1072
1073 assert!(result.is_ok());
1074 assert!(result.unwrap().is_none());
1075
1076 let metrics = broadcaster.get_metrics_async().await;
1077 assert_eq!(metrics.idempotent_successes, 1);
1078 assert_eq!(metrics.failed_cancels, 0);
1079 }
1080
1081 #[tokio::test]
1082 async fn test_broadcast_cancel_all_failures() {
1083 let transports = vec![
1084 create_stub_transport("client-0", |_, _, _| async {
1085 anyhow::bail!("502 Bad Gateway")
1086 }),
1087 create_stub_transport("client-1", |_, _, _| async {
1088 anyhow::bail!("Connection refused")
1089 }),
1090 ];
1091
1092 let config = CancelBroadcasterConfig::default();
1093 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1094
1095 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1096 let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1097
1098 assert!(result.is_err());
1099 assert!(
1100 result
1101 .unwrap_err()
1102 .to_string()
1103 .contains("All cancel all requests failed")
1104 );
1105
1106 let metrics = broadcaster.get_metrics_async().await;
1107 assert_eq!(metrics.failed_cancels, 1);
1108 assert_eq!(metrics.successful_cancels, 0);
1109 assert_eq!(metrics.idempotent_successes, 0);
1110 }
1111
1112 #[tokio::test]
1113 async fn test_broadcast_cancel_no_healthy_clients() {
1114 let transport = create_stub_transport("client-0", |_, _, _| async {
1115 Ok(create_test_report("ORDER-1"))
1116 });
1117 transport.healthy.store(false, Ordering::Relaxed);
1118
1119 let config = CancelBroadcasterConfig::default();
1120 let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1121
1122 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1123 let result = broadcaster
1124 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1125 .await;
1126
1127 assert!(result.is_err());
1128 assert!(
1129 result
1130 .unwrap_err()
1131 .to_string()
1132 .contains("No healthy transport clients available")
1133 );
1134
1135 let metrics = broadcaster.get_metrics_async().await;
1136 assert_eq!(metrics.failed_cancels, 1);
1137 }
1138
1139 #[tokio::test]
1140 async fn test_broadcast_cancel_metrics_increment() {
1141 let report1 = create_test_report("ORDER-1");
1142 let report1_clone = report1.clone();
1143 let report2 = create_test_report("ORDER-2");
1144 let report2_clone = report2.clone();
1145
1146 let transports = vec![
1147 create_stub_transport("client-0", move |_, _, _| {
1148 let report = report1_clone.clone();
1149 async move { Ok(report) }
1150 }),
1151 create_stub_transport("client-1", move |_, _, _| {
1152 let report = report2_clone.clone();
1153 async move { Ok(report) }
1154 }),
1155 ];
1156
1157 let config = CancelBroadcasterConfig::default();
1158 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1159
1160 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1161
1162 let _ = broadcaster
1163 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1164 .await;
1165
1166 let _ = broadcaster
1167 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1168 .await;
1169
1170 let metrics = broadcaster.get_metrics_async().await;
1171 assert_eq!(metrics.total_cancels, 2);
1172 assert_eq!(metrics.successful_cancels, 2);
1173 }
1174
1175 #[tokio::test]
1176 async fn test_broadcast_cancel_expected_reject_pattern() {
1177 let transports = vec![
1178 create_stub_transport("client-0", |_, _, _| async {
1179 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1180 }),
1181 create_stub_transport("client-1", |_, _, _| async {
1182 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1183 }),
1184 ];
1185
1186 let config = CancelBroadcasterConfig::default();
1187 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1188
1189 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1190 let result = broadcaster
1191 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1192 .await;
1193
1194 assert!(result.is_err());
1195
1196 let metrics = broadcaster.get_metrics_async().await;
1197 assert_eq!(metrics.expected_rejects, 2);
1198 assert_eq!(metrics.failed_cancels, 1);
1199 }
1200
1201 #[tokio::test]
1202 async fn test_broadcaster_creation_with_pool() {
1203 let config = CancelBroadcasterConfig {
1204 pool_size: 3,
1205 api_key: Some("test_key".to_string()),
1206 api_secret: Some("test_secret".to_string()),
1207 base_url: Some("https://test.example.com".to_string()),
1208 testnet: false,
1209 timeout_secs: Some(5),
1210 max_retries: Some(2),
1211 retry_delay_ms: Some(100),
1212 retry_delay_max_ms: Some(1000),
1213 recv_window_ms: Some(5000),
1214 max_requests_per_second: Some(10),
1215 max_requests_per_minute: Some(100),
1216 health_check_interval_secs: 30,
1217 health_check_timeout_secs: 5,
1218 expected_reject_patterns: vec!["test_pattern".to_string()],
1219 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1220 proxy_urls: vec![],
1221 };
1222
1223 let broadcaster = CancelBroadcaster::new(config.clone());
1224 assert!(broadcaster.is_ok());
1225
1226 let broadcaster = broadcaster.unwrap();
1227 let metrics = broadcaster.get_metrics_async().await;
1228
1229 assert_eq!(metrics.total_clients, 3);
1230 assert_eq!(metrics.total_cancels, 0);
1231 assert_eq!(metrics.successful_cancels, 0);
1232 assert_eq!(metrics.failed_cancels, 0);
1233 }
1234
1235 #[tokio::test]
1236 async fn test_broadcaster_lifecycle() {
1237 let config = CancelBroadcasterConfig {
1238 pool_size: 2,
1239 api_key: Some("test_key".to_string()),
1240 api_secret: Some("test_secret".to_string()),
1241 base_url: Some("https://test.example.com".to_string()),
1242 testnet: false,
1243 timeout_secs: Some(5),
1244 max_retries: None,
1245 retry_delay_ms: None,
1246 retry_delay_max_ms: None,
1247 recv_window_ms: None,
1248 max_requests_per_second: None,
1249 max_requests_per_minute: None,
1250 health_check_interval_secs: 60, health_check_timeout_secs: 1,
1252 expected_reject_patterns: vec![],
1253 idempotent_success_patterns: vec![],
1254 proxy_urls: vec![],
1255 };
1256
1257 let broadcaster = CancelBroadcaster::new(config).unwrap();
1258
1259 assert!(!broadcaster.running.load(Ordering::Relaxed));
1261
1262 let start_result = broadcaster.start().await;
1264 assert!(start_result.is_ok());
1265 assert!(broadcaster.running.load(Ordering::Relaxed));
1266
1267 let start_again = broadcaster.start().await;
1269 assert!(start_again.is_ok());
1270
1271 broadcaster.stop().await;
1273 assert!(!broadcaster.running.load(Ordering::Relaxed));
1274
1275 broadcaster.stop().await;
1277 assert!(!broadcaster.running.load(Ordering::Relaxed));
1278 }
1279
1280 #[tokio::test]
1281 async fn test_client_stats_collection() {
1282 let config = CancelBroadcasterConfig {
1283 pool_size: 2,
1284 api_key: Some("test_key".to_string()),
1285 api_secret: Some("test_secret".to_string()),
1286 base_url: Some("https://test.example.com".to_string()),
1287 testnet: false,
1288 timeout_secs: Some(5),
1289 max_retries: None,
1290 retry_delay_ms: None,
1291 retry_delay_max_ms: None,
1292 recv_window_ms: None,
1293 max_requests_per_second: None,
1294 max_requests_per_minute: None,
1295 health_check_interval_secs: 60,
1296 health_check_timeout_secs: 5,
1297 expected_reject_patterns: vec![],
1298 idempotent_success_patterns: vec![],
1299 proxy_urls: vec![],
1300 };
1301
1302 let broadcaster = CancelBroadcaster::new(config).unwrap();
1303 let stats = broadcaster.get_client_stats_async().await;
1304
1305 assert_eq!(stats.len(), 2);
1306 assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1307 assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1308 assert!(stats[0].healthy); assert!(stats[1].healthy);
1310 assert_eq!(stats[0].cancel_count, 0);
1311 assert_eq!(stats[1].cancel_count, 0);
1312 assert_eq!(stats[0].error_count, 0);
1313 assert_eq!(stats[1].error_count, 0);
1314 }
1315
1316 #[tokio::test]
1317 async fn test_testnet_config_sets_base_url() {
1318 let config = CancelBroadcasterConfig {
1319 pool_size: 1,
1320 api_key: Some("test_key".to_string()),
1321 api_secret: Some("test_secret".to_string()),
1322 base_url: None, testnet: true, timeout_secs: Some(5),
1325 max_retries: None,
1326 retry_delay_ms: None,
1327 retry_delay_max_ms: None,
1328 recv_window_ms: None,
1329 max_requests_per_second: None,
1330 max_requests_per_minute: None,
1331 health_check_interval_secs: 60,
1332 health_check_timeout_secs: 5,
1333 expected_reject_patterns: vec![],
1334 idempotent_success_patterns: vec![],
1335 proxy_urls: vec![],
1336 };
1337
1338 let broadcaster = CancelBroadcaster::new(config);
1339 assert!(broadcaster.is_ok());
1340 }
1341
1342 #[tokio::test]
1343 async fn test_default_config() {
1344 let config = CancelBroadcasterConfig {
1345 api_key: Some("test_key".to_string()),
1346 api_secret: Some("test_secret".to_string()),
1347 base_url: Some("https://test.example.com".to_string()),
1348 ..Default::default()
1349 };
1350
1351 let broadcaster = CancelBroadcaster::new(config);
1352 assert!(broadcaster.is_ok());
1353
1354 let broadcaster = broadcaster.unwrap();
1355 let metrics = broadcaster.get_metrics_async().await;
1356
1357 assert_eq!(metrics.total_clients, 2);
1359 }
1360
1361 #[tokio::test]
1362 async fn test_clone_for_async() {
1363 let config = CancelBroadcasterConfig {
1364 pool_size: 1,
1365 api_key: Some("test_key".to_string()),
1366 api_secret: Some("test_secret".to_string()),
1367 base_url: Some("https://test.example.com".to_string()),
1368 testnet: false,
1369 timeout_secs: Some(5),
1370 max_retries: None,
1371 retry_delay_ms: None,
1372 retry_delay_max_ms: None,
1373 recv_window_ms: None,
1374 max_requests_per_second: None,
1375 max_requests_per_minute: None,
1376 health_check_interval_secs: 60,
1377 health_check_timeout_secs: 5,
1378 expected_reject_patterns: vec![],
1379 idempotent_success_patterns: vec![],
1380 proxy_urls: vec![],
1381 };
1382
1383 let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1384
1385 broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1387
1388 let broadcaster2 = broadcaster1.clone_for_async();
1390 let metrics2 = broadcaster2.get_metrics_async().await;
1391
1392 assert_eq!(metrics2.total_cancels, 1); broadcaster2
1396 .successful_cancels
1397 .fetch_add(5, Ordering::Relaxed);
1398
1399 let metrics1 = broadcaster1.get_metrics_async().await;
1401 assert_eq!(metrics1.successful_cancels, 5);
1402 }
1403
1404 #[tokio::test]
1405 async fn test_pattern_matching() {
1406 let config = CancelBroadcasterConfig {
1408 pool_size: 1,
1409 api_key: Some("test_key".to_string()),
1410 api_secret: Some("test_secret".to_string()),
1411 base_url: Some("https://test.example.com".to_string()),
1412 testnet: false,
1413 timeout_secs: Some(5),
1414 max_retries: None,
1415 retry_delay_ms: None,
1416 retry_delay_max_ms: None,
1417 recv_window_ms: None,
1418 max_requests_per_second: None,
1419 max_requests_per_minute: None,
1420 health_check_interval_secs: 60,
1421 health_check_timeout_secs: 5,
1422 expected_reject_patterns: vec![
1423 "ParticipateDoNotInitiate".to_string(),
1424 "Close-only".to_string(),
1425 ],
1426 idempotent_success_patterns: vec![
1427 "AlreadyCanceled".to_string(),
1428 "orderID not found".to_string(),
1429 "Unable to cancel".to_string(),
1430 ],
1431 proxy_urls: vec![],
1432 };
1433
1434 let broadcaster = CancelBroadcaster::new(config).unwrap();
1435
1436 assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1438 assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1439 assert!(!broadcaster.is_expected_reject("Connection timeout"));
1440
1441 assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1443 assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1444 assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1445 assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1446 }
1447
1448 #[tokio::test]
1452 async fn test_broadcast_batch_cancel_structure() {
1453 let config = CancelBroadcasterConfig {
1455 pool_size: 2,
1456 api_key: Some("test_key".to_string()),
1457 api_secret: Some("test_secret".to_string()),
1458 base_url: Some("https://test.example.com".to_string()),
1459 testnet: false,
1460 timeout_secs: Some(5),
1461 max_retries: None,
1462 retry_delay_ms: None,
1463 retry_delay_max_ms: None,
1464 recv_window_ms: None,
1465 max_requests_per_second: None,
1466 max_requests_per_minute: None,
1467 health_check_interval_secs: 60,
1468 health_check_timeout_secs: 5,
1469 expected_reject_patterns: vec![],
1470 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1471 proxy_urls: vec![],
1472 };
1473
1474 let broadcaster = CancelBroadcaster::new(config).unwrap();
1475 let metrics = broadcaster.get_metrics_async().await;
1476
1477 assert_eq!(metrics.total_clients, 2);
1479 assert_eq!(metrics.total_cancels, 0);
1480 assert_eq!(metrics.successful_cancels, 0);
1481 assert_eq!(metrics.failed_cancels, 0);
1482 }
1483
1484 #[tokio::test]
1485 async fn test_broadcast_cancel_all_structure() {
1486 let config = CancelBroadcasterConfig {
1488 pool_size: 3,
1489 api_key: Some("test_key".to_string()),
1490 api_secret: Some("test_secret".to_string()),
1491 base_url: Some("https://test.example.com".to_string()),
1492 testnet: false,
1493 timeout_secs: Some(5),
1494 max_retries: None,
1495 retry_delay_ms: None,
1496 retry_delay_max_ms: None,
1497 recv_window_ms: None,
1498 max_requests_per_second: None,
1499 max_requests_per_minute: None,
1500 health_check_interval_secs: 60,
1501 health_check_timeout_secs: 5,
1502 expected_reject_patterns: vec![],
1503 idempotent_success_patterns: vec!["orderID not found".to_string()],
1504 proxy_urls: vec![],
1505 };
1506
1507 let broadcaster = CancelBroadcaster::new(config).unwrap();
1508 let metrics = broadcaster.get_metrics_async().await;
1509
1510 assert_eq!(metrics.total_clients, 3);
1512 assert_eq!(metrics.healthy_clients, 3);
1513 assert_eq!(metrics.total_cancels, 0);
1514 }
1515
1516 #[tokio::test]
1518 async fn test_single_cancel_metrics_with_mixed_responses() {
1519 let transports = vec![
1522 create_stub_transport("client-0", |_, _, _| async {
1523 anyhow::bail!("Connection timeout")
1524 }),
1525 create_stub_transport("client-1", |_, _, _| async {
1526 anyhow::bail!("AlreadyCanceled")
1527 }),
1528 ];
1529
1530 let config = CancelBroadcasterConfig::default();
1531 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1532
1533 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1534 let result = broadcaster
1535 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1536 .await;
1537
1538 assert!(result.is_ok());
1540 assert!(result.unwrap().is_none());
1541
1542 let metrics = broadcaster.get_metrics_async().await;
1544 assert_eq!(
1545 metrics.failed_cancels, 0,
1546 "Idempotent success should not increment failed_cancels"
1547 );
1548 assert_eq!(metrics.idempotent_successes, 1);
1549 assert_eq!(metrics.successful_cancels, 0);
1550 }
1551
1552 #[tokio::test]
1553 async fn test_metrics_initialization_and_health() {
1554 let config = CancelBroadcasterConfig {
1556 pool_size: 4,
1557 api_key: Some("test_key".to_string()),
1558 api_secret: Some("test_secret".to_string()),
1559 base_url: Some("https://test.example.com".to_string()),
1560 testnet: false,
1561 timeout_secs: Some(5),
1562 max_retries: None,
1563 retry_delay_ms: None,
1564 retry_delay_max_ms: None,
1565 recv_window_ms: None,
1566 max_requests_per_second: None,
1567 max_requests_per_minute: None,
1568 health_check_interval_secs: 60,
1569 health_check_timeout_secs: 5,
1570 expected_reject_patterns: vec![],
1571 idempotent_success_patterns: vec![],
1572 proxy_urls: vec![],
1573 };
1574
1575 let broadcaster = CancelBroadcaster::new(config).unwrap();
1576 let metrics = broadcaster.get_metrics_async().await;
1577
1578 assert_eq!(metrics.total_cancels, 0);
1580 assert_eq!(metrics.successful_cancels, 0);
1581 assert_eq!(metrics.failed_cancels, 0);
1582 assert_eq!(metrics.expected_rejects, 0);
1583 assert_eq!(metrics.idempotent_successes, 0);
1584
1585 assert_eq!(metrics.healthy_clients, 4);
1587 assert_eq!(metrics.total_clients, 4);
1588 }
1589
1590 #[tokio::test]
1592 async fn test_health_check_task_lifecycle() {
1593 let config = CancelBroadcasterConfig {
1594 pool_size: 1,
1595 api_key: Some("test_key".to_string()),
1596 api_secret: Some("test_secret".to_string()),
1597 base_url: Some("https://test.example.com".to_string()),
1598 testnet: false,
1599 timeout_secs: Some(5),
1600 max_retries: None,
1601 retry_delay_ms: None,
1602 retry_delay_max_ms: None,
1603 recv_window_ms: None,
1604 max_requests_per_second: None,
1605 max_requests_per_minute: None,
1606 health_check_interval_secs: 1, health_check_timeout_secs: 1,
1608 expected_reject_patterns: vec![],
1609 idempotent_success_patterns: vec![],
1610 proxy_urls: vec![],
1611 };
1612
1613 let broadcaster = CancelBroadcaster::new(config).unwrap();
1614
1615 broadcaster.start().await.unwrap();
1617 assert!(broadcaster.running.load(Ordering::Relaxed));
1618
1619 {
1621 let task_guard = broadcaster.health_check_task.read().await;
1622 assert!(task_guard.is_some());
1623 }
1624
1625 broadcaster.stop().await;
1627 assert!(!broadcaster.running.load(Ordering::Relaxed));
1628
1629 {
1631 let task_guard = broadcaster.health_check_task.read().await;
1632 assert!(task_guard.is_none());
1633 }
1634 }
1635}