use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};

use futures_util::future;
use nautilus_model::{
    enums::OrderSide,
    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
    instruments::InstrumentAny,
    reports::OrderStatusReport,
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};

use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};

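/// Minimal cancellation interface implemented by the HTTP client, abstracted so the
/// broadcaster can also be exercised with mock executors in tests.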
trait CancelExecutor: Send + Sync {
    fn add_instrument(&self, instrument: InstrumentAny);

    fn health_check(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>;

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
    >;

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    >;

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    >;
}

impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::add_instrument(self, instrument);
    }

    fn health_check(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            Self::http_get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
    > {
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    > {
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<nautilus_model::enums::OrderSide>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    > {
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}

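/// Configuration for [`CancelBroadcaster`], covering the HTTP client pool, retry and
/// rate-limit settings, health checks, and the error-classification patterns.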
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    pub pool_size: usize,
    pub api_key: Option<String>,
    pub api_secret: Option<String>,
    pub base_url: Option<String>,
    pub testnet: bool,
    pub timeout_secs: Option<u64>,
    pub max_retries: Option<u32>,
    pub retry_delay_ms: Option<u64>,
    pub retry_delay_max_ms: Option<u64>,
    pub recv_window_ms: Option<u64>,
    pub max_requests_per_second: Option<u32>,
    pub max_requests_per_minute: Option<u32>,
    pub health_check_interval_secs: u64,
    pub health_check_timeout_secs: u64,
    pub expected_reject_patterns: Vec<String>,
    pub idempotent_success_patterns: Vec<String>,
}

impl Default for CancelBroadcasterConfig {
    fn default() -> Self {
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                r"Order had execInst of ParticipateDoNotInitiate".to_string(),
            ],
            idempotent_success_patterns: vec![
                r"AlreadyCanceled".to_string(),
                r"orderID not found".to_string(),
                r"Unable to cancel order due to existing state".to_string(),
            ],
        }
    }
}

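/// A single pooled transport with a health flag and per-client cancel/error counters.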
#[derive(Clone)]
struct TransportClient {
    executor: Arc<dyn CancelExecutor>,
    client_id: String,
    healthy: Arc<AtomicBool>,
    cancel_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
}

impl std::fmt::Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TransportClient")
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("cancel_count", &self.cancel_count)
            .field("error_count", &self.error_count)
            .finish()
    }
}

impl TransportClient {
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(_)) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                tracing::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }

    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }
}

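/// Broadcasts order-cancel requests concurrently across a pool of BitMEX HTTP clients,
/// returning the first successful response and tracking per-pool metrics.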
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    config: CancelBroadcasterConfig,
    transports: Arc<RwLock<Vec<TransportClient>>>,
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}

impl CancelBroadcaster {
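    /// Creates a broadcaster backed by `pool_size` independent [`BitmexHttpClient`] instances,
    /// returning an error if any client fails to construct.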
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };

        for i in 0..config.pool_size {
            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(RwLock::new(transports)),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }

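    /// Starts the broadcaster: runs an initial health check and spawns the periodic
    /// health-check task. Calling `start` on an already-running broadcaster is a no-op.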
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }

        self.running.store(true, Ordering::Relaxed);

        self.run_health_checks().await;

        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;

        let task = tokio::spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;

                if !running.load(Ordering::Relaxed) {
                    break;
                }

                let transports_guard = transports.read().await;
                let transports_clone: Vec<_> = transports_guard.clone();
                drop(transports_guard);

                let tasks: Vec<_> = transports_clone
                    .iter()
                    .map(|t| t.health_check(timeout_secs))
                    .collect();

                let results = future::join_all(tasks).await;
                let healthy_count = results.iter().filter(|&&r| r).count();

                tracing::debug!(
                    "Health check complete: {}/{} clients healthy",
                    healthy_count,
                    results.len()
                );
            }
        });

        *self.health_check_task.write().await = Some(task);

        tracing::info!(
            "CancelBroadcaster started with {} clients",
            self.transports.read().await.len()
        );

        Ok(())
    }

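    /// Stops the broadcaster and aborts the background health-check task. Safe to call
    /// more than once.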
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }

        self.running.store(false, Ordering::Relaxed);

        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }

        tracing::info!("CancelBroadcaster stopped");
    }

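    /// Runs a single round of health checks across all pooled transports.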
    async fn run_health_checks(&self) {
        let transports_guard = self.transports.read().await;
        let transports_clone: Vec<_> = transports_guard.clone();
        drop(transports_guard);

        let tasks: Vec<_> = transports_clone
            .iter()
            .map(|t| t.health_check(self.config.health_check_timeout_secs))
            .collect();

        let results = future::join_all(tasks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();

        tracing::debug!(
            "Health check complete: {}/{} clients healthy",
            healthy_count,
            results.len()
        );
    }

    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    fn is_idempotent_success(&self, error_message: &str) -> bool {
        self.config
            .idempotent_success_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

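    /// Awaits the racing cancel tasks, returning on the first success (or idempotent
    /// success) and aborting the remaining in-flight requests; errors are collected and
    /// surfaced only if every client fails.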
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining.into_iter().collect();

            match result {
                Ok((client_id, Ok(result))) => {
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);
                    tracing::debug!(
                        "{} broadcast succeeded [{}] {}",
                        operation,
                        client_id,
                        params
                    );
                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_idempotent_success(&error_msg) {
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
                        tracing::debug!(
                            "Idempotent success [{}] - {}: {} {}",
                            client_id,
                            idempotent_reason,
                            error_msg,
                            params
                        );
                        return idempotent_result();
                    }

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        tracing::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        tracing::warn!(
                            "{} request failed [{}]: {} {}",
                            operation,
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    tracing::warn!("{} task join error: {e:?}", operation);
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        tracing::error!(
            "All {} requests failed: {:?} {}",
            operation.to_lowercase(),
            errors,
            params
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {:?}",
            operation.to_lowercase(),
            errors
        ))
    }

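    /// Broadcasts a single-order cancel to every healthy client and returns the first
    /// successful report.
    ///
    /// Returns `Ok(None)` when the order was already cancelled or not found (treated as
    /// an idempotent success).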
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let transports_guard = self.transports.read().await;
        let healthy_transports: Vec<TransportClient> = transports_guard
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        drop(transports_guard);

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some);
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!(
                "(client_order_id={:?}, venue_order_id={:?})",
                client_order_id, venue_order_id
            ),
            "order already cancelled/not found",
        )
        .await
    }

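    /// Broadcasts a batch cancel for the given order IDs to every healthy client and
    /// returns the first successful set of reports.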
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let transports_guard = self.transports.read().await;
        let healthy_transports: Vec<TransportClient> = transports_guard
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        drop(transports_guard);

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        for transport in healthy_transports {
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!(
                "(client_order_ids={:?}, venue_order_ids={:?})",
                client_order_ids, venue_order_ids
            ),
            "orders already cancelled/not found",
        )
        .await
    }

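    /// Broadcasts a cancel-all request for the instrument (optionally filtered by side)
    /// to every healthy client and returns the first successful set of reports.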
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<nautilus_model::enums::OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let transports_guard = self.transports.read().await;
        let healthy_transports: Vec<TransportClient> = transports_guard
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        drop(transports_guard);

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!(
                "(instrument_id={}, order_side={:?})",
                instrument_id, order_side
            ),
            "no orders to cancel",
        )
        .await
    }

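    /// Returns a metrics snapshot using a blocking lock read; prefer
    /// [`Self::get_metrics_async`] from async contexts.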
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let transports_guard = self.transports.blocking_read();
        let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
        let total_clients = transports_guard.len();
        drop(transports_guard);

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        let transports_guard = self.transports.read().await;
        let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
        let total_clients = transports_guard.len();
        drop(transports_guard);

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        let transports = self.transports.blocking_read();
        transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        let transports = self.transports.read().await;
        transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

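    /// Registers the instrument with every pooled client (blocking lock read).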
    pub fn add_instrument(&self, instrument: nautilus_model::instruments::any::InstrumentAny) {
        let transports = self.transports.blocking_read();
        for transport in transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }

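    /// Returns a handle sharing the same transports, health-check task, and counters,
    /// for use from other async contexts.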
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }

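    /// Test-only constructor that injects pre-built transports instead of real HTTP clients.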
    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(RwLock::new(transports)),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
}

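/// Aggregated metrics snapshot for a [`CancelBroadcaster`] pool.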
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    pub total_cancels: u64,
    pub successful_cancels: u64,
    pub failed_cancels: u64,
    pub expected_rejects: u64,
    pub idempotent_successes: u64,
    pub healthy_clients: usize,
    pub total_clients: usize,
}

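/// Per-client statistics snapshot.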
#[derive(Debug, Clone)]
pub struct ClientStats {
    pub client_id: String,
    pub healthy: bool,
    pub cancel_count: u64,
    pub error_count: u64,
}

#[cfg(test)]
mod tests {
    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};

    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{
            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
        },
        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
        reports::OrderStatusReport,
        types::{Price, Quantity},
    };

    use super::*;

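    /// Mock executor whose `cancel_order` behavior is driven by a caller-supplied closure.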
    #[derive(Clone)]
    #[allow(clippy::type_complexity)]
    struct MockExecutor {
        handler: Arc<
            dyn Fn(
                    InstrumentId,
                    Option<ClientOrderId>,
                    Option<VenueOrderId>,
                ) -> std::pin::Pin<
                    Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send>,
                > + Send
                + Sync,
        >,
    }

    impl MockExecutor {
        fn new<F, Fut>(handler: F) -> Self
        where
            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
                + Send
                + Sync
                + 'static,
            Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
        {
            Self {
                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
            }
        }
    }

    impl CancelExecutor for MockExecutor {
        fn health_check(
            &self,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>
        {
            Box::pin(async { Ok(()) })
        }

        fn cancel_order(
            &self,
            instrument_id: InstrumentId,
            client_order_id: Option<ClientOrderId>,
            venue_order_id: Option<VenueOrderId>,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
        > {
            (self.handler)(instrument_id, client_order_id, venue_order_id)
        }

        fn cancel_orders(
            &self,
            _instrument_id: InstrumentId,
            _client_order_ids: Option<Vec<ClientOrderId>>,
            _venue_order_ids: Option<Vec<VenueOrderId>>,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
                    + Send
                    + '_,
            >,
        > {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn cancel_all_orders(
            &self,
            instrument_id: InstrumentId,
            _order_side: Option<nautilus_model::enums::OrderSide>,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
                    + Send
                    + '_,
            >,
        > {
            let handler = Arc::clone(&self.handler);
            Box::pin(async move {
                let result = handler(instrument_id, None, None).await;
                match result {
                    Ok(_) => Ok(Vec::new()),
                    Err(e) => Err(e),
                }
            })
        }

        fn add_instrument(&self, _instrument: nautilus_model::instruments::any::InstrumentAny) {}
    }

    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("BITMEX-001"),
            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            venue_order_id: VenueOrderId::from(venue_order_id),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Canceled,
            price: Some(Price::new(50000.0, 2)),
            quantity: Quantity::new(100.0, 0),
            filled_qty: Quantity::new(0.0, 0),
            report_id: UUID4::new(),
            ts_accepted: 0.into(),
            ts_last: 0.into(),
            ts_init: 0.into(),
            client_order_id: None,
            avg_px: None,
            trigger_price: None,
            trigger_type: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            display_qty: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            post_only: false,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }

    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
    where
        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
            + Send
            + Sync
            + 'static,
        Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
    {
        let executor = MockExecutor::new(handler);
        TransportClient::new(executor, client_id.to_string())
    }

    #[tokio::test]
    async fn test_broadcast_cancel_immediate_success() {
        let report = create_test_report("ORDER-1");
        let report_clone = report.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        assert!(result.is_ok());
        let returned_report = result.unwrap();
        assert!(returned_report.is_some());
        assert_eq!(
            returned_report.unwrap().venue_order_id,
            report.venue_order_id
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 1);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.total_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_idempotent_success() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("orderID not found")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_failures() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Connection refused")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("All cancel all requests failed")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.idempotent_successes, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_no_healthy_clients() {
        let transport = create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        });
        transport.healthy.store(false, Ordering::Relaxed);

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
            .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("No healthy transport clients available")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_metrics_increment() {
        let report1 = create_test_report("ORDER-1");
        let report1_clone = report1.clone();
        let report2 = create_test_report("ORDER-2");
        let report2_clone = report2.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report1_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", move |_, _, _| {
                let report = report2_clone.clone();
                async move { Ok(report) }
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
            .await;

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
            .await;

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.total_cancels, 2);
        assert_eq!(metrics.successful_cancels, 2);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_expected_reject_pattern() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
            .await;

        assert!(result.is_err());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.expected_rejects, 2);
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcaster_creation_with_pool() {
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: Some(2),
            retry_delay_ms: Some(100),
            retry_delay_max_ms: Some(1000),
            recv_window_ms: Some(5000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(100),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec!["test_pattern".to_string()],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
        };

        let broadcaster = CancelBroadcaster::new(config.clone());
        assert!(broadcaster.is_ok());

        let broadcaster = broadcaster.unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcaster_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        assert!(!broadcaster.running.load(Ordering::Relaxed));

        let start_result = broadcaster.start().await;
        assert!(start_result.is_ok());
        assert!(broadcaster.running.load(Ordering::Relaxed));

        let start_again = broadcaster.start().await;
        assert!(start_again.is_ok());

        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_client_stats_collection() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let stats = broadcaster.get_client_stats_async().await;

        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
        assert!(stats[0].healthy);
        assert!(stats[1].healthy);
        assert_eq!(stats[0].cancel_count, 0);
        assert_eq!(stats[1].cancel_count, 0);
        assert_eq!(stats[0].error_count, 0);
        assert_eq!(stats[1].error_count, 0);
    }

    #[tokio::test]
    async fn test_testnet_config_sets_base_url() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: None,
            testnet: true,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config);
        assert!(broadcaster.is_ok());
    }

    #[tokio::test]
    async fn test_default_config() {
        let config = CancelBroadcasterConfig {
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            ..Default::default()
        };

        let broadcaster = CancelBroadcaster::new(config);
        assert!(broadcaster.is_ok());

        let broadcaster = broadcaster.unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 2);
    }

    #[tokio::test]
    async fn test_clone_for_async() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster1 = CancelBroadcaster::new(config).unwrap();

        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);

        let broadcaster2 = broadcaster1.clone_for_async();
        let metrics2 = broadcaster2.get_metrics_async().await;

        assert_eq!(metrics2.total_cancels, 1);

        broadcaster2
            .successful_cancels
            .fetch_add(5, Ordering::Relaxed);

        let metrics1 = broadcaster1.get_metrics_async().await;
        assert_eq!(metrics1.successful_cancels, 5);
    }

    #[tokio::test]
    async fn test_pattern_matching() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                "ParticipateDoNotInitiate".to_string(),
                "Close-only".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel".to_string(),
            ],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
        assert!(!broadcaster.is_expected_reject("Connection timeout"));

        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
    }

    #[tokio::test]
    async fn test_broadcast_batch_cancel_structure() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 2);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_structure() {
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["orderID not found".to_string()],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.healthy_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
    }

    #[tokio::test]
    async fn test_single_cancel_metrics_with_mixed_responses() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(
            metrics.failed_cancels, 0,
            "Idempotent success should not increment failed_cancels"
        );
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
    }

    #[tokio::test]
    async fn test_metrics_initialization_and_health() {
        let config = CancelBroadcasterConfig {
            pool_size: 4,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.expected_rejects, 0);
        assert_eq!(metrics.idempotent_successes, 0);

        assert_eq!(metrics.healthy_clients, 4);
        assert_eq!(metrics.total_clients, 4);
    }

    #[tokio::test]
    async fn test_health_check_task_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 1,
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        broadcaster.start().await.unwrap();
        assert!(broadcaster.running.load(Ordering::Relaxed));

        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_some());
        }

        tokio::time::sleep(Duration::from_millis(100)).await;

        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_none());
        }
    }
}