1use std::{
34 sync::{
35 Arc,
36 atomic::{AtomicBool, AtomicU64, Ordering},
37 },
38 time::Duration,
39};
40
41use futures_util::future;
42use nautilus_model::{
43 enums::OrderSide,
44 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
45 instruments::InstrumentAny,
46 reports::OrderStatusReport,
47};
48use tokio::{sync::RwLock, task::JoinHandle, time::interval};
49
50use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
51
52trait CancelExecutor: Send + Sync {
74 fn add_instrument(&self, instrument: InstrumentAny);
76
77 fn health_check(
79 &self,
80 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>;
81
82 fn cancel_order(
84 &self,
85 instrument_id: InstrumentId,
86 client_order_id: Option<ClientOrderId>,
87 venue_order_id: Option<VenueOrderId>,
88 ) -> std::pin::Pin<
89 Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
90 >;
91
92 fn cancel_orders(
94 &self,
95 instrument_id: InstrumentId,
96 client_order_ids: Option<Vec<ClientOrderId>>,
97 venue_order_ids: Option<Vec<VenueOrderId>>,
98 ) -> std::pin::Pin<
99 Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
100 >;
101
102 fn cancel_all_orders(
104 &self,
105 instrument_id: InstrumentId,
106 order_side: Option<OrderSide>,
107 ) -> std::pin::Pin<
108 Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
109 >;
110}
111
112impl CancelExecutor for BitmexHttpClient {
113 fn add_instrument(&self, instrument: InstrumentAny) {
114 BitmexHttpClient::add_instrument(self, instrument);
115 }
116
117 fn health_check(
118 &self,
119 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>> {
120 Box::pin(async move {
121 BitmexHttpClient::http_get_server_time(self)
122 .await
123 .map(|_| ())
124 .map_err(|e| anyhow::anyhow!("{e}"))
125 })
126 }
127
128 fn cancel_order(
129 &self,
130 instrument_id: InstrumentId,
131 client_order_id: Option<ClientOrderId>,
132 venue_order_id: Option<VenueOrderId>,
133 ) -> std::pin::Pin<
134 Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
135 > {
136 Box::pin(async move {
137 BitmexHttpClient::cancel_order(self, instrument_id, client_order_id, venue_order_id)
138 .await
139 })
140 }
141
142 fn cancel_orders(
143 &self,
144 instrument_id: InstrumentId,
145 client_order_ids: Option<Vec<ClientOrderId>>,
146 venue_order_ids: Option<Vec<VenueOrderId>>,
147 ) -> std::pin::Pin<
148 Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
149 > {
150 Box::pin(async move {
151 BitmexHttpClient::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids)
152 .await
153 })
154 }
155
156 fn cancel_all_orders(
157 &self,
158 instrument_id: InstrumentId,
159 order_side: Option<nautilus_model::enums::OrderSide>,
160 ) -> std::pin::Pin<
161 Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
162 > {
163 Box::pin(async move {
164 BitmexHttpClient::cancel_all_orders(self, instrument_id, order_side).await
165 })
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct CancelBroadcasterConfig {
172 pub pool_size: usize,
174 pub api_key: Option<String>,
176 pub api_secret: Option<String>,
178 pub base_url: Option<String>,
180 pub testnet: bool,
182 pub timeout_secs: Option<u64>,
184 pub max_retries: Option<u32>,
186 pub retry_delay_ms: Option<u64>,
188 pub retry_delay_max_ms: Option<u64>,
190 pub recv_window_ms: Option<u64>,
192 pub max_requests_per_second: Option<u32>,
194 pub max_requests_per_minute: Option<u32>,
196 pub health_check_interval_secs: u64,
198 pub health_check_timeout_secs: u64,
200 pub expected_reject_patterns: Vec<String>,
202 pub idempotent_success_patterns: Vec<String>,
204}
205
206impl Default for CancelBroadcasterConfig {
207 fn default() -> Self {
208 Self {
209 pool_size: 2,
210 api_key: None,
211 api_secret: None,
212 base_url: None,
213 testnet: false,
214 timeout_secs: Some(60),
215 max_retries: None,
216 retry_delay_ms: Some(1_000),
217 retry_delay_max_ms: Some(5_000),
218 recv_window_ms: Some(10_000),
219 max_requests_per_second: Some(10),
220 max_requests_per_minute: Some(120),
221 health_check_interval_secs: 30,
222 health_check_timeout_secs: 5,
223 expected_reject_patterns: vec![
224 r"Order had execInst of ParticipateDoNotInitiate".to_string(),
225 ],
226 idempotent_success_patterns: vec![
227 r"AlreadyCanceled".to_string(),
228 r"orderID not found".to_string(),
229 r"Unable to cancel order due to existing state".to_string(),
230 ],
231 }
232 }
233}
234
235#[derive(Clone)]
237struct TransportClient {
238 executor: Arc<dyn CancelExecutor>,
243 client_id: String,
244 healthy: Arc<AtomicBool>,
245 cancel_count: Arc<AtomicU64>,
246 error_count: Arc<AtomicU64>,
247}
248
249impl std::fmt::Debug for TransportClient {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 f.debug_struct("TransportClient")
252 .field("client_id", &self.client_id)
253 .field("healthy", &self.healthy)
254 .field("cancel_count", &self.cancel_count)
255 .field("error_count", &self.error_count)
256 .finish()
257 }
258}
259
260impl TransportClient {
261 fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
262 Self {
263 executor: Arc::new(executor),
264 client_id,
265 healthy: Arc::new(AtomicBool::new(true)),
266 cancel_count: Arc::new(AtomicU64::new(0)),
267 error_count: Arc::new(AtomicU64::new(0)),
268 }
269 }
270
271 fn is_healthy(&self) -> bool {
272 self.healthy.load(Ordering::Relaxed)
273 }
274
275 fn mark_healthy(&self) {
276 self.healthy.store(true, Ordering::Relaxed);
277 }
278
279 fn mark_unhealthy(&self) {
280 self.healthy.store(false, Ordering::Relaxed);
281 }
282
283 async fn health_check(&self, timeout_secs: u64) -> bool {
284 match tokio::time::timeout(
285 Duration::from_secs(timeout_secs),
286 self.executor.health_check(),
287 )
288 .await
289 {
290 Ok(Ok(_)) => {
291 self.mark_healthy();
292 true
293 }
294 Ok(Err(e)) => {
295 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
296 self.mark_unhealthy();
297 false
298 }
299 Err(_) => {
300 tracing::warn!("Health check timeout for client {}", self.client_id);
301 self.mark_unhealthy();
302 false
303 }
304 }
305 }
306
307 async fn cancel_order(
308 &self,
309 instrument_id: InstrumentId,
310 client_order_id: Option<ClientOrderId>,
311 venue_order_id: Option<VenueOrderId>,
312 ) -> anyhow::Result<OrderStatusReport> {
313 self.cancel_count.fetch_add(1, Ordering::Relaxed);
314
315 match self
316 .executor
317 .cancel_order(instrument_id, client_order_id, venue_order_id)
318 .await
319 {
320 Ok(report) => {
321 self.mark_healthy();
322 Ok(report)
323 }
324 Err(e) => {
325 self.error_count.fetch_add(1, Ordering::Relaxed);
326 Err(e)
327 }
328 }
329 }
330
331 fn get_cancel_count(&self) -> u64 {
332 self.cancel_count.load(Ordering::Relaxed)
333 }
334
335 fn get_error_count(&self) -> u64 {
336 self.error_count.load(Ordering::Relaxed)
337 }
338}
339
340#[cfg_attr(feature = "python", pyo3::pyclass)]
346#[derive(Debug)]
347pub struct CancelBroadcaster {
348 config: CancelBroadcasterConfig,
349 transports: Arc<RwLock<Vec<TransportClient>>>,
350 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
351 running: Arc<AtomicBool>,
352 total_cancels: Arc<AtomicU64>,
353 successful_cancels: Arc<AtomicU64>,
354 failed_cancels: Arc<AtomicU64>,
355 expected_rejects: Arc<AtomicU64>,
356 idempotent_successes: Arc<AtomicU64>,
357}
358
359impl CancelBroadcaster {
360 pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
366 let mut transports = Vec::with_capacity(config.pool_size);
367
368 let base_url = if config.testnet && config.base_url.is_none() {
370 Some(BITMEX_HTTP_TESTNET_URL.to_string())
371 } else {
372 config.base_url.clone()
373 };
374
375 for i in 0..config.pool_size {
376 let client = BitmexHttpClient::with_credentials(
377 config.api_key.clone(),
378 config.api_secret.clone(),
379 base_url.clone(),
380 config.timeout_secs,
381 config.max_retries,
382 config.retry_delay_ms,
383 config.retry_delay_max_ms,
384 config.recv_window_ms,
385 config.max_requests_per_second,
386 config.max_requests_per_minute,
387 )
388 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
389
390 transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
391 }
392
393 Ok(Self {
394 config,
395 transports: Arc::new(RwLock::new(transports)),
396 health_check_task: Arc::new(RwLock::new(None)),
397 running: Arc::new(AtomicBool::new(false)),
398 total_cancels: Arc::new(AtomicU64::new(0)),
399 successful_cancels: Arc::new(AtomicU64::new(0)),
400 failed_cancels: Arc::new(AtomicU64::new(0)),
401 expected_rejects: Arc::new(AtomicU64::new(0)),
402 idempotent_successes: Arc::new(AtomicU64::new(0)),
403 })
404 }
405
406 pub async fn start(&self) -> anyhow::Result<()> {
412 if self.running.load(Ordering::Relaxed) {
413 return Ok(());
414 }
415
416 self.running.store(true, Ordering::Relaxed);
417
418 self.run_health_checks().await;
420
421 let transports = Arc::clone(&self.transports);
423 let running = Arc::clone(&self.running);
424 let interval_secs = self.config.health_check_interval_secs;
425 let timeout_secs = self.config.health_check_timeout_secs;
426
427 let task = tokio::spawn(async move {
428 let mut ticker = interval(Duration::from_secs(interval_secs));
429 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
430
431 loop {
432 ticker.tick().await;
433
434 if !running.load(Ordering::Relaxed) {
435 break;
436 }
437
438 let transports_guard = transports.read().await;
439 let transports_clone: Vec<_> = transports_guard.clone();
440 drop(transports_guard);
441
442 let tasks: Vec<_> = transports_clone
443 .iter()
444 .map(|t| t.health_check(timeout_secs))
445 .collect();
446
447 let results = future::join_all(tasks).await;
448 let healthy_count = results.iter().filter(|&&r| r).count();
449
450 tracing::debug!(
451 "Health check complete: {}/{} clients healthy",
452 healthy_count,
453 results.len()
454 );
455 }
456 });
457
458 *self.health_check_task.write().await = Some(task);
459
460 tracing::info!(
461 "CancelBroadcaster started with {} clients",
462 self.transports.read().await.len()
463 );
464
465 Ok(())
466 }
467
468 pub async fn stop(&self) {
470 if !self.running.load(Ordering::Relaxed) {
471 return;
472 }
473
474 self.running.store(false, Ordering::Relaxed);
475
476 if let Some(task) = self.health_check_task.write().await.take() {
477 task.abort();
478 }
479
480 tracing::info!("CancelBroadcaster stopped");
481 }
482
483 async fn run_health_checks(&self) {
484 let transports_guard = self.transports.read().await;
485 let transports_clone: Vec<_> = transports_guard.clone();
486 drop(transports_guard);
487
488 let tasks: Vec<_> = transports_clone
489 .iter()
490 .map(|t| t.health_check(self.config.health_check_timeout_secs))
491 .collect();
492
493 let results = future::join_all(tasks).await;
494 let healthy_count = results.iter().filter(|&&r| r).count();
495
496 tracing::debug!(
497 "Health check complete: {}/{} clients healthy",
498 healthy_count,
499 results.len()
500 );
501 }
502
503 fn is_expected_reject(&self, error_message: &str) -> bool {
504 self.config
505 .expected_reject_patterns
506 .iter()
507 .any(|pattern| error_message.contains(pattern))
508 }
509
510 fn is_idempotent_success(&self, error_message: &str) -> bool {
511 self.config
512 .idempotent_success_patterns
513 .iter()
514 .any(|pattern| error_message.contains(pattern))
515 }
516
517 async fn process_cancel_results<T>(
521 &self,
522 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
523 idempotent_result: impl FnOnce() -> anyhow::Result<T>,
524 operation: &str,
525 params: String,
526 idempotent_reason: &str,
527 ) -> anyhow::Result<T>
528 where
529 T: Send + 'static,
530 {
531 let mut errors = Vec::new();
532
533 while !handles.is_empty() {
534 let current_handles = std::mem::take(&mut handles);
535 let (result, _idx, remaining) = future::select_all(current_handles).await;
536 handles = remaining.into_iter().collect();
537
538 match result {
539 Ok((client_id, Ok(result))) => {
540 for handle in &handles {
542 handle.abort();
543 }
544 self.successful_cancels.fetch_add(1, Ordering::Relaxed);
545 tracing::debug!(
546 "{} broadcast succeeded [{}] {}",
547 operation,
548 client_id,
549 params
550 );
551 return Ok(result);
552 }
553 Ok((client_id, Err(e))) => {
554 let error_msg = e.to_string();
555
556 if self.is_idempotent_success(&error_msg) {
557 for handle in &handles {
559 handle.abort();
560 }
561 self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
562 tracing::debug!(
563 "Idempotent success [{}] - {}: {} {}",
564 client_id,
565 idempotent_reason,
566 error_msg,
567 params
568 );
569 return idempotent_result();
570 }
571
572 if self.is_expected_reject(&error_msg) {
573 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
574 tracing::debug!(
575 "Expected {} rejection [{}]: {} {}",
576 operation.to_lowercase(),
577 client_id,
578 error_msg,
579 params
580 );
581 errors.push(error_msg);
582 } else {
583 tracing::warn!(
584 "{} request failed [{}]: {} {}",
585 operation,
586 client_id,
587 error_msg,
588 params
589 );
590 errors.push(error_msg);
591 }
592 }
593 Err(e) => {
594 tracing::warn!("{} task join error: {e:?}", operation);
595 errors.push(format!("Task panicked: {e:?}"));
596 }
597 }
598 }
599
600 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
602 tracing::error!(
603 "All {} requests failed: {:?} {}",
604 operation.to_lowercase(),
605 errors,
606 params
607 );
608 Err(anyhow::anyhow!(
609 "All {} requests failed: {:?}",
610 operation.to_lowercase(),
611 errors
612 ))
613 }
614
615 pub async fn broadcast_cancel(
627 &self,
628 instrument_id: InstrumentId,
629 client_order_id: Option<ClientOrderId>,
630 venue_order_id: Option<VenueOrderId>,
631 ) -> anyhow::Result<Option<OrderStatusReport>> {
632 self.total_cancels.fetch_add(1, Ordering::Relaxed);
633
634 let transports_guard = self.transports.read().await;
636 let healthy_transports: Vec<TransportClient> = transports_guard
637 .iter()
638 .filter(|t| t.is_healthy())
639 .cloned()
640 .collect();
641 drop(transports_guard);
642
643 if healthy_transports.is_empty() {
644 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
645 anyhow::bail!("No healthy transport clients available");
646 }
647
648 let mut handles = Vec::new();
650 for transport in healthy_transports {
651 let handle = tokio::spawn(async move {
652 let client_id = transport.client_id.clone();
653 let result = transport
654 .cancel_order(instrument_id, client_order_id, venue_order_id)
655 .await
656 .map(Some); (client_id, result)
658 });
659 handles.push(handle);
660 }
661
662 self.process_cancel_results(
663 handles,
664 || Ok(None),
665 "Cancel",
666 format!(
667 "(client_order_id={:?}, venue_order_id={:?})",
668 client_order_id, venue_order_id
669 ),
670 "order already cancelled/not found",
671 )
672 .await
673 }
674
675 pub async fn broadcast_batch_cancel(
681 &self,
682 instrument_id: InstrumentId,
683 client_order_ids: Option<Vec<ClientOrderId>>,
684 venue_order_ids: Option<Vec<VenueOrderId>>,
685 ) -> anyhow::Result<Vec<OrderStatusReport>> {
686 self.total_cancels.fetch_add(1, Ordering::Relaxed);
687
688 let transports_guard = self.transports.read().await;
690 let healthy_transports: Vec<TransportClient> = transports_guard
691 .iter()
692 .filter(|t| t.is_healthy())
693 .cloned()
694 .collect();
695 drop(transports_guard);
696
697 if healthy_transports.is_empty() {
698 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
699 anyhow::bail!("No healthy transport clients available");
700 }
701
702 let mut handles = Vec::new();
704
705 for transport in healthy_transports {
706 let client_order_ids_clone = client_order_ids.clone();
707 let venue_order_ids_clone = venue_order_ids.clone();
708 let handle = tokio::spawn(async move {
709 let client_id = transport.client_id.clone();
710 let result = transport
711 .executor
712 .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
713 .await;
714 (client_id, result)
715 });
716 handles.push(handle);
717 }
718
719 self.process_cancel_results(
720 handles,
721 || Ok(Vec::new()),
722 "Batch cancel",
723 format!(
724 "(client_order_ids={:?}, venue_order_ids={:?})",
725 client_order_ids, venue_order_ids
726 ),
727 "orders already cancelled/not found",
728 )
729 .await
730 }
731
732 pub async fn broadcast_cancel_all(
738 &self,
739 instrument_id: InstrumentId,
740 order_side: Option<nautilus_model::enums::OrderSide>,
741 ) -> anyhow::Result<Vec<OrderStatusReport>> {
742 self.total_cancels.fetch_add(1, Ordering::Relaxed);
743
744 let transports_guard = self.transports.read().await;
746 let healthy_transports: Vec<TransportClient> = transports_guard
747 .iter()
748 .filter(|t| t.is_healthy())
749 .cloned()
750 .collect();
751 drop(transports_guard);
752
753 if healthy_transports.is_empty() {
754 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
755 anyhow::bail!("No healthy transport clients available");
756 }
757
758 let mut handles = Vec::new();
760 for transport in healthy_transports {
761 let handle = tokio::spawn(async move {
762 let client_id = transport.client_id.clone();
763 let result = transport
764 .executor
765 .cancel_all_orders(instrument_id, order_side)
766 .await;
767 (client_id, result)
768 });
769 handles.push(handle);
770 }
771
772 self.process_cancel_results(
773 handles,
774 || Ok(Vec::new()),
775 "Cancel all",
776 format!(
777 "(instrument_id={}, order_side={:?})",
778 instrument_id, order_side
779 ),
780 "no orders to cancel",
781 )
782 .await
783 }
784
785 pub fn get_metrics(&self) -> BroadcasterMetrics {
787 let transports_guard = self.transports.blocking_read();
788 let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
789 let total_clients = transports_guard.len();
790 drop(transports_guard);
791
792 BroadcasterMetrics {
793 total_cancels: self.total_cancels.load(Ordering::Relaxed),
794 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
795 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
796 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
797 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
798 healthy_clients,
799 total_clients,
800 }
801 }
802
803 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
805 let transports_guard = self.transports.read().await;
806 let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
807 let total_clients = transports_guard.len();
808 drop(transports_guard);
809
810 BroadcasterMetrics {
811 total_cancels: self.total_cancels.load(Ordering::Relaxed),
812 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
813 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
814 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
815 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
816 healthy_clients,
817 total_clients,
818 }
819 }
820
821 pub fn get_client_stats(&self) -> Vec<ClientStats> {
823 let transports = self.transports.blocking_read();
824 transports
825 .iter()
826 .map(|t| ClientStats {
827 client_id: t.client_id.clone(),
828 healthy: t.is_healthy(),
829 cancel_count: t.get_cancel_count(),
830 error_count: t.get_error_count(),
831 })
832 .collect()
833 }
834
835 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
837 let transports = self.transports.read().await;
838 transports
839 .iter()
840 .map(|t| ClientStats {
841 client_id: t.client_id.clone(),
842 healthy: t.is_healthy(),
843 cancel_count: t.get_cancel_count(),
844 error_count: t.get_error_count(),
845 })
846 .collect()
847 }
848
849 pub fn add_instrument(&self, instrument: nautilus_model::instruments::any::InstrumentAny) {
851 let transports = self.transports.blocking_read();
852 for transport in transports.iter() {
853 transport.executor.add_instrument(instrument.clone());
854 }
855 }
856
857 pub fn clone_for_async(&self) -> Self {
858 Self {
859 config: self.config.clone(),
860 transports: Arc::clone(&self.transports),
861 health_check_task: Arc::clone(&self.health_check_task),
862 running: Arc::clone(&self.running),
863 total_cancels: Arc::clone(&self.total_cancels),
864 successful_cancels: Arc::clone(&self.successful_cancels),
865 failed_cancels: Arc::clone(&self.failed_cancels),
866 expected_rejects: Arc::clone(&self.expected_rejects),
867 idempotent_successes: Arc::clone(&self.idempotent_successes),
868 }
869 }
870
871 #[cfg(test)]
872 fn new_with_transports(
873 config: CancelBroadcasterConfig,
874 transports: Vec<TransportClient>,
875 ) -> Self {
876 Self {
877 config,
878 transports: Arc::new(RwLock::new(transports)),
879 health_check_task: Arc::new(RwLock::new(None)),
880 running: Arc::new(AtomicBool::new(false)),
881 total_cancels: Arc::new(AtomicU64::new(0)),
882 successful_cancels: Arc::new(AtomicU64::new(0)),
883 failed_cancels: Arc::new(AtomicU64::new(0)),
884 expected_rejects: Arc::new(AtomicU64::new(0)),
885 idempotent_successes: Arc::new(AtomicU64::new(0)),
886 }
887 }
888}
889
890#[derive(Debug, Clone)]
892pub struct BroadcasterMetrics {
893 pub total_cancels: u64,
894 pub successful_cancels: u64,
895 pub failed_cancels: u64,
896 pub expected_rejects: u64,
897 pub idempotent_successes: u64,
898 pub healthy_clients: usize,
899 pub total_clients: usize,
900}
901
902#[derive(Debug, Clone)]
904pub struct ClientStats {
905 pub client_id: String,
906 pub healthy: bool,
907 pub cancel_count: u64,
908 pub error_count: u64,
909}
910
911#[cfg(test)]
916mod tests {
917 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
918
919 use nautilus_core::UUID4;
920 use nautilus_model::{
921 enums::{
922 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
923 },
924 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
925 reports::OrderStatusReport,
926 types::{Price, Quantity},
927 };
928
929 use super::*;
930
931 #[derive(Clone)]
933 #[allow(clippy::type_complexity)]
934 struct MockExecutor {
935 handler: Arc<
936 dyn Fn(
937 InstrumentId,
938 Option<ClientOrderId>,
939 Option<VenueOrderId>,
940 ) -> std::pin::Pin<
941 Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send>,
942 > + Send
943 + Sync,
944 >,
945 }
946
947 impl MockExecutor {
948 fn new<F, Fut>(handler: F) -> Self
949 where
950 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
951 + Send
952 + Sync
953 + 'static,
954 Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
955 {
956 Self {
957 handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
958 }
959 }
960 }
961
962 impl CancelExecutor for MockExecutor {
963 fn health_check(
964 &self,
965 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>
966 {
967 Box::pin(async { Ok(()) })
968 }
969
970 fn cancel_order(
971 &self,
972 instrument_id: InstrumentId,
973 client_order_id: Option<ClientOrderId>,
974 venue_order_id: Option<VenueOrderId>,
975 ) -> std::pin::Pin<
976 Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
977 > {
978 (self.handler)(instrument_id, client_order_id, venue_order_id)
979 }
980
981 fn cancel_orders(
982 &self,
983 _instrument_id: InstrumentId,
984 _client_order_ids: Option<Vec<ClientOrderId>>,
985 _venue_order_ids: Option<Vec<VenueOrderId>>,
986 ) -> std::pin::Pin<
987 Box<
988 dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
989 + Send
990 + '_,
991 >,
992 > {
993 Box::pin(async { Ok(Vec::new()) })
994 }
995
996 fn cancel_all_orders(
997 &self,
998 instrument_id: InstrumentId,
999 _order_side: Option<nautilus_model::enums::OrderSide>,
1000 ) -> std::pin::Pin<
1001 Box<
1002 dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
1003 + Send
1004 + '_,
1005 >,
1006 > {
1007 let handler = Arc::clone(&self.handler);
1009 Box::pin(async move {
1010 let result = handler(instrument_id, None, None).await;
1012 match result {
1013 Ok(_) => Ok(Vec::new()),
1014 Err(e) => Err(e),
1015 }
1016 })
1017 }
1018
1019 fn add_instrument(&self, _instrument: nautilus_model::instruments::any::InstrumentAny) {
1020 }
1022 }
1023
1024 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
1025 OrderStatusReport {
1026 account_id: AccountId::from("BITMEX-001"),
1027 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
1028 venue_order_id: VenueOrderId::from(venue_order_id),
1029 order_side: OrderSide::Buy,
1030 order_type: OrderType::Limit,
1031 time_in_force: TimeInForce::Gtc,
1032 order_status: OrderStatus::Canceled,
1033 price: Some(Price::new(50000.0, 2)),
1034 quantity: Quantity::new(100.0, 0),
1035 filled_qty: Quantity::new(0.0, 0),
1036 report_id: UUID4::new(),
1037 ts_accepted: 0.into(),
1038 ts_last: 0.into(),
1039 ts_init: 0.into(),
1040 client_order_id: None,
1041 avg_px: None,
1042 trigger_price: None,
1043 trigger_type: None,
1044 contingency_type: ContingencyType::NoContingency,
1045 expire_time: None,
1046 order_list_id: None,
1047 venue_position_id: None,
1048 linked_order_ids: None,
1049 parent_order_id: None,
1050 display_qty: None,
1051 limit_offset: None,
1052 trailing_offset: None,
1053 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
1054 post_only: false,
1055 reduce_only: false,
1056 cancel_reason: None,
1057 ts_triggered: None,
1058 }
1059 }
1060
1061 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
1062 where
1063 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
1064 + Send
1065 + Sync
1066 + 'static,
1067 Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
1068 {
1069 let executor = MockExecutor::new(handler);
1070 TransportClient::new(executor, client_id.to_string())
1071 }
1072
1073 #[tokio::test]
1074 async fn test_broadcast_cancel_immediate_success() {
1075 let report = create_test_report("ORDER-1");
1076 let report_clone = report.clone();
1077
1078 let transports = vec![
1079 create_stub_transport("client-0", move |_, _, _| {
1080 let report = report_clone.clone();
1081 async move { Ok(report) }
1082 }),
1083 create_stub_transport("client-1", |_, _, _| async {
1084 tokio::time::sleep(Duration::from_secs(10)).await;
1085 anyhow::bail!("Should be aborted")
1086 }),
1087 ];
1088
1089 let config = CancelBroadcasterConfig::default();
1090 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1091
1092 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1093 let result = broadcaster
1094 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1095 .await;
1096
1097 assert!(result.is_ok());
1098 let returned_report = result.unwrap();
1099 assert!(returned_report.is_some());
1100 assert_eq!(
1101 returned_report.unwrap().venue_order_id,
1102 report.venue_order_id
1103 );
1104
1105 let metrics = broadcaster.get_metrics_async().await;
1106 assert_eq!(metrics.successful_cancels, 1);
1107 assert_eq!(metrics.failed_cancels, 0);
1108 assert_eq!(metrics.total_cancels, 1);
1109 }
1110
1111 #[tokio::test]
1112 async fn test_broadcast_cancel_idempotent_success() {
1113 let transports = vec![
1114 create_stub_transport("client-0", |_, _, _| async {
1115 anyhow::bail!("AlreadyCanceled")
1116 }),
1117 create_stub_transport("client-1", |_, _, _| async {
1118 tokio::time::sleep(Duration::from_secs(10)).await;
1119 anyhow::bail!("Should be aborted")
1120 }),
1121 ];
1122
1123 let config = CancelBroadcasterConfig::default();
1124 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1125
1126 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1127 let result = broadcaster
1128 .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1129 .await;
1130
1131 assert!(result.is_ok());
1132 assert!(result.unwrap().is_none());
1133
1134 let metrics = broadcaster.get_metrics_async().await;
1135 assert_eq!(metrics.idempotent_successes, 1);
1136 assert_eq!(metrics.successful_cancels, 0);
1137 assert_eq!(metrics.failed_cancels, 0);
1138 }
1139
1140 #[tokio::test]
1141 async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1142 let transports = vec![
1143 create_stub_transport("client-0", |_, _, _| async {
1144 anyhow::bail!("502 Bad Gateway")
1145 }),
1146 create_stub_transport("client-1", |_, _, _| async {
1147 anyhow::bail!("orderID not found")
1148 }),
1149 ];
1150
1151 let config = CancelBroadcasterConfig::default();
1152 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1153
1154 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1155 let result = broadcaster
1156 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1157 .await;
1158
1159 assert!(result.is_ok());
1160 assert!(result.unwrap().is_none());
1161
1162 let metrics = broadcaster.get_metrics_async().await;
1163 assert_eq!(metrics.idempotent_successes, 1);
1164 assert_eq!(metrics.failed_cancels, 0);
1165 }
1166
1167 #[tokio::test]
1168 async fn test_broadcast_cancel_all_failures() {
1169 let transports = vec![
1170 create_stub_transport("client-0", |_, _, _| async {
1171 anyhow::bail!("502 Bad Gateway")
1172 }),
1173 create_stub_transport("client-1", |_, _, _| async {
1174 anyhow::bail!("Connection refused")
1175 }),
1176 ];
1177
1178 let config = CancelBroadcasterConfig::default();
1179 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1180
1181 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1182 let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1183
1184 assert!(result.is_err());
1185 assert!(
1186 result
1187 .unwrap_err()
1188 .to_string()
1189 .contains("All cancel all requests failed")
1190 );
1191
1192 let metrics = broadcaster.get_metrics_async().await;
1193 assert_eq!(metrics.failed_cancels, 1);
1194 assert_eq!(metrics.successful_cancels, 0);
1195 assert_eq!(metrics.idempotent_successes, 0);
1196 }
1197
1198 #[tokio::test]
1199 async fn test_broadcast_cancel_no_healthy_clients() {
1200 let transport = create_stub_transport("client-0", |_, _, _| async {
1201 Ok(create_test_report("ORDER-1"))
1202 });
1203 transport.healthy.store(false, Ordering::Relaxed);
1204
1205 let config = CancelBroadcasterConfig::default();
1206 let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1207
1208 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1209 let result = broadcaster
1210 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1211 .await;
1212
1213 assert!(result.is_err());
1214 assert!(
1215 result
1216 .unwrap_err()
1217 .to_string()
1218 .contains("No healthy transport clients available")
1219 );
1220
1221 let metrics = broadcaster.get_metrics_async().await;
1222 assert_eq!(metrics.failed_cancels, 1);
1223 }
1224
1225 #[tokio::test]
1226 async fn test_broadcast_cancel_metrics_increment() {
1227 let report1 = create_test_report("ORDER-1");
1228 let report1_clone = report1.clone();
1229 let report2 = create_test_report("ORDER-2");
1230 let report2_clone = report2.clone();
1231
1232 let transports = vec![
1233 create_stub_transport("client-0", move |_, _, _| {
1234 let report = report1_clone.clone();
1235 async move { Ok(report) }
1236 }),
1237 create_stub_transport("client-1", move |_, _, _| {
1238 let report = report2_clone.clone();
1239 async move { Ok(report) }
1240 }),
1241 ];
1242
1243 let config = CancelBroadcasterConfig::default();
1244 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1245
1246 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1247
1248 let _ = broadcaster
1249 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1250 .await;
1251
1252 let _ = broadcaster
1253 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1254 .await;
1255
1256 let metrics = broadcaster.get_metrics_async().await;
1257 assert_eq!(metrics.total_cancels, 2);
1258 assert_eq!(metrics.successful_cancels, 2);
1259 }
1260
1261 #[tokio::test]
1262 async fn test_broadcast_cancel_expected_reject_pattern() {
1263 let transports = vec![
1264 create_stub_transport("client-0", |_, _, _| async {
1265 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1266 }),
1267 create_stub_transport("client-1", |_, _, _| async {
1268 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1269 }),
1270 ];
1271
1272 let config = CancelBroadcasterConfig::default();
1273 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1274
1275 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1276 let result = broadcaster
1277 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1278 .await;
1279
1280 assert!(result.is_err());
1281
1282 let metrics = broadcaster.get_metrics_async().await;
1283 assert_eq!(metrics.expected_rejects, 2);
1284 assert_eq!(metrics.failed_cancels, 1);
1285 }
1286
1287 #[tokio::test]
1288 async fn test_broadcaster_creation_with_pool() {
1289 let config = CancelBroadcasterConfig {
1290 pool_size: 3,
1291 api_key: Some("test_key".to_string()),
1292 api_secret: Some("test_secret".to_string()),
1293 base_url: Some("https://test.example.com".to_string()),
1294 testnet: false,
1295 timeout_secs: Some(5),
1296 max_retries: Some(2),
1297 retry_delay_ms: Some(100),
1298 retry_delay_max_ms: Some(1000),
1299 recv_window_ms: Some(5000),
1300 max_requests_per_second: Some(10),
1301 max_requests_per_minute: Some(100),
1302 health_check_interval_secs: 30,
1303 health_check_timeout_secs: 5,
1304 expected_reject_patterns: vec!["test_pattern".to_string()],
1305 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1306 };
1307
1308 let broadcaster = CancelBroadcaster::new(config.clone());
1309 assert!(broadcaster.is_ok());
1310
1311 let broadcaster = broadcaster.unwrap();
1312 let metrics = broadcaster.get_metrics_async().await;
1313
1314 assert_eq!(metrics.total_clients, 3);
1315 assert_eq!(metrics.total_cancels, 0);
1316 assert_eq!(metrics.successful_cancels, 0);
1317 assert_eq!(metrics.failed_cancels, 0);
1318 }
1319
1320 #[tokio::test]
1321 async fn test_broadcaster_lifecycle() {
1322 let config = CancelBroadcasterConfig {
1323 pool_size: 2,
1324 api_key: Some("test_key".to_string()),
1325 api_secret: Some("test_secret".to_string()),
1326 base_url: Some("https://test.example.com".to_string()),
1327 testnet: false,
1328 timeout_secs: Some(5),
1329 max_retries: None,
1330 retry_delay_ms: None,
1331 retry_delay_max_ms: None,
1332 recv_window_ms: None,
1333 max_requests_per_second: None,
1334 max_requests_per_minute: None,
1335 health_check_interval_secs: 60, health_check_timeout_secs: 1,
1337 expected_reject_patterns: vec![],
1338 idempotent_success_patterns: vec![],
1339 };
1340
1341 let broadcaster = CancelBroadcaster::new(config).unwrap();
1342
1343 assert!(!broadcaster.running.load(Ordering::Relaxed));
1345
1346 let start_result = broadcaster.start().await;
1348 assert!(start_result.is_ok());
1349 assert!(broadcaster.running.load(Ordering::Relaxed));
1350
1351 let start_again = broadcaster.start().await;
1353 assert!(start_again.is_ok());
1354
1355 broadcaster.stop().await;
1357 assert!(!broadcaster.running.load(Ordering::Relaxed));
1358
1359 broadcaster.stop().await;
1361 assert!(!broadcaster.running.load(Ordering::Relaxed));
1362 }
1363
1364 #[tokio::test]
1365 async fn test_client_stats_collection() {
1366 let config = CancelBroadcasterConfig {
1367 pool_size: 2,
1368 api_key: Some("test_key".to_string()),
1369 api_secret: Some("test_secret".to_string()),
1370 base_url: Some("https://test.example.com".to_string()),
1371 testnet: false,
1372 timeout_secs: Some(5),
1373 max_retries: None,
1374 retry_delay_ms: None,
1375 retry_delay_max_ms: None,
1376 recv_window_ms: None,
1377 max_requests_per_second: None,
1378 max_requests_per_minute: None,
1379 health_check_interval_secs: 60,
1380 health_check_timeout_secs: 5,
1381 expected_reject_patterns: vec![],
1382 idempotent_success_patterns: vec![],
1383 };
1384
1385 let broadcaster = CancelBroadcaster::new(config).unwrap();
1386 let stats = broadcaster.get_client_stats_async().await;
1387
1388 assert_eq!(stats.len(), 2);
1389 assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1390 assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1391 assert!(stats[0].healthy); assert!(stats[1].healthy);
1393 assert_eq!(stats[0].cancel_count, 0);
1394 assert_eq!(stats[1].cancel_count, 0);
1395 assert_eq!(stats[0].error_count, 0);
1396 assert_eq!(stats[1].error_count, 0);
1397 }
1398
1399 #[tokio::test]
1400 async fn test_testnet_config_sets_base_url() {
1401 let config = CancelBroadcasterConfig {
1402 pool_size: 1,
1403 api_key: Some("test_key".to_string()),
1404 api_secret: Some("test_secret".to_string()),
1405 base_url: None, testnet: true, timeout_secs: Some(5),
1408 max_retries: None,
1409 retry_delay_ms: None,
1410 retry_delay_max_ms: None,
1411 recv_window_ms: None,
1412 max_requests_per_second: None,
1413 max_requests_per_minute: None,
1414 health_check_interval_secs: 60,
1415 health_check_timeout_secs: 5,
1416 expected_reject_patterns: vec![],
1417 idempotent_success_patterns: vec![],
1418 };
1419
1420 let broadcaster = CancelBroadcaster::new(config);
1421 assert!(broadcaster.is_ok());
1422 }
1423
1424 #[tokio::test]
1425 async fn test_default_config() {
1426 let config = CancelBroadcasterConfig {
1427 api_key: Some("test_key".to_string()),
1428 api_secret: Some("test_secret".to_string()),
1429 base_url: Some("https://test.example.com".to_string()),
1430 ..Default::default()
1431 };
1432
1433 let broadcaster = CancelBroadcaster::new(config);
1434 assert!(broadcaster.is_ok());
1435
1436 let broadcaster = broadcaster.unwrap();
1437 let metrics = broadcaster.get_metrics_async().await;
1438
1439 assert_eq!(metrics.total_clients, 2);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_clone_for_async() {
1445 let config = CancelBroadcasterConfig {
1446 pool_size: 1,
1447 api_key: Some("test_key".to_string()),
1448 api_secret: Some("test_secret".to_string()),
1449 base_url: Some("https://test.example.com".to_string()),
1450 testnet: false,
1451 timeout_secs: Some(5),
1452 max_retries: None,
1453 retry_delay_ms: None,
1454 retry_delay_max_ms: None,
1455 recv_window_ms: None,
1456 max_requests_per_second: None,
1457 max_requests_per_minute: None,
1458 health_check_interval_secs: 60,
1459 health_check_timeout_secs: 5,
1460 expected_reject_patterns: vec![],
1461 idempotent_success_patterns: vec![],
1462 };
1463
1464 let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1465
1466 broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1468
1469 let broadcaster2 = broadcaster1.clone_for_async();
1471 let metrics2 = broadcaster2.get_metrics_async().await;
1472
1473 assert_eq!(metrics2.total_cancels, 1); broadcaster2
1477 .successful_cancels
1478 .fetch_add(5, Ordering::Relaxed);
1479
1480 let metrics1 = broadcaster1.get_metrics_async().await;
1482 assert_eq!(metrics1.successful_cancels, 5);
1483 }
1484
1485 #[tokio::test]
1486 async fn test_pattern_matching() {
1487 let config = CancelBroadcasterConfig {
1489 pool_size: 1,
1490 api_key: Some("test_key".to_string()),
1491 api_secret: Some("test_secret".to_string()),
1492 base_url: Some("https://test.example.com".to_string()),
1493 testnet: false,
1494 timeout_secs: Some(5),
1495 max_retries: None,
1496 retry_delay_ms: None,
1497 retry_delay_max_ms: None,
1498 recv_window_ms: None,
1499 max_requests_per_second: None,
1500 max_requests_per_minute: None,
1501 health_check_interval_secs: 60,
1502 health_check_timeout_secs: 5,
1503 expected_reject_patterns: vec![
1504 "ParticipateDoNotInitiate".to_string(),
1505 "Close-only".to_string(),
1506 ],
1507 idempotent_success_patterns: vec![
1508 "AlreadyCanceled".to_string(),
1509 "orderID not found".to_string(),
1510 "Unable to cancel".to_string(),
1511 ],
1512 };
1513
1514 let broadcaster = CancelBroadcaster::new(config).unwrap();
1515
1516 assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1518 assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1519 assert!(!broadcaster.is_expected_reject("Connection timeout"));
1520
1521 assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1523 assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1524 assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1525 assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1526 }
1527
1528 #[tokio::test]
1532 async fn test_broadcast_batch_cancel_structure() {
1533 let config = CancelBroadcasterConfig {
1535 pool_size: 2,
1536 api_key: Some("test_key".to_string()),
1537 api_secret: Some("test_secret".to_string()),
1538 base_url: Some("https://test.example.com".to_string()),
1539 testnet: false,
1540 timeout_secs: Some(5),
1541 max_retries: None,
1542 retry_delay_ms: None,
1543 retry_delay_max_ms: None,
1544 recv_window_ms: None,
1545 max_requests_per_second: None,
1546 max_requests_per_minute: None,
1547 health_check_interval_secs: 60,
1548 health_check_timeout_secs: 5,
1549 expected_reject_patterns: vec![],
1550 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1551 };
1552
1553 let broadcaster = CancelBroadcaster::new(config).unwrap();
1554 let metrics = broadcaster.get_metrics_async().await;
1555
1556 assert_eq!(metrics.total_clients, 2);
1558 assert_eq!(metrics.total_cancels, 0);
1559 assert_eq!(metrics.successful_cancels, 0);
1560 assert_eq!(metrics.failed_cancels, 0);
1561 }
1562
1563 #[tokio::test]
1564 async fn test_broadcast_cancel_all_structure() {
1565 let config = CancelBroadcasterConfig {
1567 pool_size: 3,
1568 api_key: Some("test_key".to_string()),
1569 api_secret: Some("test_secret".to_string()),
1570 base_url: Some("https://test.example.com".to_string()),
1571 testnet: false,
1572 timeout_secs: Some(5),
1573 max_retries: None,
1574 retry_delay_ms: None,
1575 retry_delay_max_ms: None,
1576 recv_window_ms: None,
1577 max_requests_per_second: None,
1578 max_requests_per_minute: None,
1579 health_check_interval_secs: 60,
1580 health_check_timeout_secs: 5,
1581 expected_reject_patterns: vec![],
1582 idempotent_success_patterns: vec!["orderID not found".to_string()],
1583 };
1584
1585 let broadcaster = CancelBroadcaster::new(config).unwrap();
1586 let metrics = broadcaster.get_metrics_async().await;
1587
1588 assert_eq!(metrics.total_clients, 3);
1590 assert_eq!(metrics.healthy_clients, 3);
1591 assert_eq!(metrics.total_cancels, 0);
1592 }
1593
1594 #[tokio::test]
1596 async fn test_single_cancel_metrics_with_mixed_responses() {
1597 let transports = vec![
1600 create_stub_transport("client-0", |_, _, _| async {
1601 anyhow::bail!("Connection timeout")
1602 }),
1603 create_stub_transport("client-1", |_, _, _| async {
1604 anyhow::bail!("AlreadyCanceled")
1605 }),
1606 ];
1607
1608 let config = CancelBroadcasterConfig::default();
1609 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1610
1611 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1612 let result = broadcaster
1613 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1614 .await;
1615
1616 assert!(result.is_ok());
1618 assert!(result.unwrap().is_none());
1619
1620 let metrics = broadcaster.get_metrics_async().await;
1622 assert_eq!(
1623 metrics.failed_cancels, 0,
1624 "Idempotent success should not increment failed_cancels"
1625 );
1626 assert_eq!(metrics.idempotent_successes, 1);
1627 assert_eq!(metrics.successful_cancels, 0);
1628 }
1629
1630 #[tokio::test]
1631 async fn test_metrics_initialization_and_health() {
1632 let config = CancelBroadcasterConfig {
1634 pool_size: 4,
1635 api_key: Some("test_key".to_string()),
1636 api_secret: Some("test_secret".to_string()),
1637 base_url: Some("https://test.example.com".to_string()),
1638 testnet: false,
1639 timeout_secs: Some(5),
1640 max_retries: None,
1641 retry_delay_ms: None,
1642 retry_delay_max_ms: None,
1643 recv_window_ms: None,
1644 max_requests_per_second: None,
1645 max_requests_per_minute: None,
1646 health_check_interval_secs: 60,
1647 health_check_timeout_secs: 5,
1648 expected_reject_patterns: vec![],
1649 idempotent_success_patterns: vec![],
1650 };
1651
1652 let broadcaster = CancelBroadcaster::new(config).unwrap();
1653 let metrics = broadcaster.get_metrics_async().await;
1654
1655 assert_eq!(metrics.total_cancels, 0);
1657 assert_eq!(metrics.successful_cancels, 0);
1658 assert_eq!(metrics.failed_cancels, 0);
1659 assert_eq!(metrics.expected_rejects, 0);
1660 assert_eq!(metrics.idempotent_successes, 0);
1661
1662 assert_eq!(metrics.healthy_clients, 4);
1664 assert_eq!(metrics.total_clients, 4);
1665 }
1666
1667 #[tokio::test]
1669 async fn test_health_check_task_lifecycle() {
1670 let config = CancelBroadcasterConfig {
1671 pool_size: 1,
1672 api_key: Some("test_key".to_string()),
1673 api_secret: Some("test_secret".to_string()),
1674 base_url: Some("https://test.example.com".to_string()),
1675 testnet: false,
1676 timeout_secs: Some(5),
1677 max_retries: None,
1678 retry_delay_ms: None,
1679 retry_delay_max_ms: None,
1680 recv_window_ms: None,
1681 max_requests_per_second: None,
1682 max_requests_per_minute: None,
1683 health_check_interval_secs: 1, health_check_timeout_secs: 1,
1685 expected_reject_patterns: vec![],
1686 idempotent_success_patterns: vec![],
1687 };
1688
1689 let broadcaster = CancelBroadcaster::new(config).unwrap();
1690
1691 broadcaster.start().await.unwrap();
1693 assert!(broadcaster.running.load(Ordering::Relaxed));
1694
1695 {
1697 let task_guard = broadcaster.health_check_task.read().await;
1698 assert!(task_guard.is_some());
1699 }
1700
1701 tokio::time::sleep(Duration::from_millis(100)).await;
1703
1704 broadcaster.stop().await;
1706 assert!(!broadcaster.running.load(Ordering::Relaxed));
1707
1708 {
1710 let task_guard = broadcaster.health_check_task.read().await;
1711 assert!(task_guard.is_none());
1712 }
1713 }
1714}