1use std::{
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU64, Ordering},
40 },
41 time::Duration,
42};
43
44use futures_util::future;
45use nautilus_model::{
46 enums::OrderSide,
47 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
48 instruments::InstrumentAny,
49 reports::OrderStatusReport,
50};
51use tokio::{sync::RwLock, task::JoinHandle, time::interval};
52
53use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
54
55trait CancelExecutor: Send + Sync {
77 fn add_instrument(&self, instrument: InstrumentAny);
79
80 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
82
83 fn cancel_order(
85 &self,
86 instrument_id: InstrumentId,
87 client_order_id: Option<ClientOrderId>,
88 venue_order_id: Option<VenueOrderId>,
89 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
90
91 fn cancel_orders(
93 &self,
94 instrument_id: InstrumentId,
95 client_order_ids: Option<Vec<ClientOrderId>>,
96 venue_order_ids: Option<Vec<VenueOrderId>>,
97 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
98
99 fn cancel_all_orders(
101 &self,
102 instrument_id: InstrumentId,
103 order_side: Option<OrderSide>,
104 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
105}
106
107impl CancelExecutor for BitmexHttpClient {
108 fn add_instrument(&self, instrument: InstrumentAny) {
109 Self::cache_instrument(self, instrument);
110 }
111
112 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
113 Box::pin(async move {
114 Self::get_server_time(self)
115 .await
116 .map(|_| ())
117 .map_err(|e| anyhow::anyhow!("{e}"))
118 })
119 }
120
121 fn cancel_order(
122 &self,
123 instrument_id: InstrumentId,
124 client_order_id: Option<ClientOrderId>,
125 venue_order_id: Option<VenueOrderId>,
126 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
127 Box::pin(async move {
128 Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
129 })
130 }
131
132 fn cancel_orders(
133 &self,
134 instrument_id: InstrumentId,
135 client_order_ids: Option<Vec<ClientOrderId>>,
136 venue_order_ids: Option<Vec<VenueOrderId>>,
137 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
138 Box::pin(async move {
139 Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
140 })
141 }
142
143 fn cancel_all_orders(
144 &self,
145 instrument_id: InstrumentId,
146 order_side: Option<OrderSide>,
147 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
148 Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct CancelBroadcasterConfig {
155 pub pool_size: usize,
157 pub api_key: Option<String>,
159 pub api_secret: Option<String>,
161 pub base_url: Option<String>,
163 pub testnet: bool,
165 pub timeout_secs: Option<u64>,
167 pub max_retries: Option<u32>,
169 pub retry_delay_ms: Option<u64>,
171 pub retry_delay_max_ms: Option<u64>,
173 pub recv_window_ms: Option<u64>,
175 pub max_requests_per_second: Option<u32>,
177 pub max_requests_per_minute: Option<u32>,
179 pub health_check_interval_secs: u64,
181 pub health_check_timeout_secs: u64,
183 pub expected_reject_patterns: Vec<String>,
185 pub idempotent_success_patterns: Vec<String>,
187 pub proxy_urls: Vec<Option<String>>,
193}
194
195impl Default for CancelBroadcasterConfig {
196 fn default() -> Self {
197 Self {
198 pool_size: 2,
199 api_key: None,
200 api_secret: None,
201 base_url: None,
202 testnet: false,
203 timeout_secs: Some(60),
204 max_retries: None,
205 retry_delay_ms: Some(1_000),
206 retry_delay_max_ms: Some(5_000),
207 recv_window_ms: Some(10_000),
208 max_requests_per_second: Some(10),
209 max_requests_per_minute: Some(120),
210 health_check_interval_secs: 30,
211 health_check_timeout_secs: 5,
212 expected_reject_patterns: vec![
213 r"Order had execInst of ParticipateDoNotInitiate".to_string(),
214 ],
215 idempotent_success_patterns: vec![
216 r"AlreadyCanceled".to_string(),
217 r"orderID not found".to_string(),
218 r"Unable to cancel order due to existing state".to_string(),
219 ],
220 proxy_urls: vec![],
221 }
222 }
223}
224
225#[derive(Clone)]
227struct TransportClient {
228 executor: Arc<dyn CancelExecutor>,
233 client_id: String,
234 healthy: Arc<AtomicBool>,
235 cancel_count: Arc<AtomicU64>,
236 error_count: Arc<AtomicU64>,
237}
238
239impl Debug for TransportClient {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("TransportClient")
242 .field("client_id", &self.client_id)
243 .field("healthy", &self.healthy)
244 .field("cancel_count", &self.cancel_count)
245 .field("error_count", &self.error_count)
246 .finish()
247 }
248}
249
250impl TransportClient {
251 fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
252 Self {
253 executor: Arc::new(executor),
254 client_id,
255 healthy: Arc::new(AtomicBool::new(true)),
256 cancel_count: Arc::new(AtomicU64::new(0)),
257 error_count: Arc::new(AtomicU64::new(0)),
258 }
259 }
260
261 fn is_healthy(&self) -> bool {
262 self.healthy.load(Ordering::Relaxed)
263 }
264
265 fn mark_healthy(&self) {
266 self.healthy.store(true, Ordering::Relaxed);
267 }
268
269 fn mark_unhealthy(&self) {
270 self.healthy.store(false, Ordering::Relaxed);
271 }
272
273 fn get_cancel_count(&self) -> u64 {
274 self.cancel_count.load(Ordering::Relaxed)
275 }
276
277 fn get_error_count(&self) -> u64 {
278 self.error_count.load(Ordering::Relaxed)
279 }
280
281 async fn health_check(&self, timeout_secs: u64) -> bool {
282 match tokio::time::timeout(
283 Duration::from_secs(timeout_secs),
284 self.executor.health_check(),
285 )
286 .await
287 {
288 Ok(Ok(_)) => {
289 self.mark_healthy();
290 true
291 }
292 Ok(Err(e)) => {
293 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
294 self.mark_unhealthy();
295 false
296 }
297 Err(_) => {
298 tracing::warn!("Health check timeout for client {}", self.client_id);
299 self.mark_unhealthy();
300 false
301 }
302 }
303 }
304
305 async fn cancel_order(
306 &self,
307 instrument_id: InstrumentId,
308 client_order_id: Option<ClientOrderId>,
309 venue_order_id: Option<VenueOrderId>,
310 ) -> anyhow::Result<OrderStatusReport> {
311 self.cancel_count.fetch_add(1, Ordering::Relaxed);
312
313 match self
314 .executor
315 .cancel_order(instrument_id, client_order_id, venue_order_id)
316 .await
317 {
318 Ok(report) => {
319 self.mark_healthy();
320 Ok(report)
321 }
322 Err(e) => {
323 self.error_count.fetch_add(1, Ordering::Relaxed);
324 Err(e)
325 }
326 }
327 }
328}
329
330#[cfg_attr(feature = "python", pyo3::pyclass)]
336#[derive(Debug)]
337pub struct CancelBroadcaster {
338 config: CancelBroadcasterConfig,
339 transports: Arc<Vec<TransportClient>>,
340 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
341 running: Arc<AtomicBool>,
342 total_cancels: Arc<AtomicU64>,
343 successful_cancels: Arc<AtomicU64>,
344 failed_cancels: Arc<AtomicU64>,
345 expected_rejects: Arc<AtomicU64>,
346 idempotent_successes: Arc<AtomicU64>,
347}
348
349impl CancelBroadcaster {
350 pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
356 let mut transports = Vec::with_capacity(config.pool_size);
357
358 let base_url = if config.testnet && config.base_url.is_none() {
360 Some(BITMEX_HTTP_TESTNET_URL.to_string())
361 } else {
362 config.base_url.clone()
363 };
364
365 for i in 0..config.pool_size {
366 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
368
369 let client = BitmexHttpClient::with_credentials(
370 config.api_key.clone(),
371 config.api_secret.clone(),
372 base_url.clone(),
373 config.timeout_secs,
374 config.max_retries,
375 config.retry_delay_ms,
376 config.retry_delay_max_ms,
377 config.recv_window_ms,
378 config.max_requests_per_second,
379 config.max_requests_per_minute,
380 proxy_url,
381 )
382 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
383
384 transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
385 }
386
387 Ok(Self {
388 config,
389 transports: Arc::new(transports),
390 health_check_task: Arc::new(RwLock::new(None)),
391 running: Arc::new(AtomicBool::new(false)),
392 total_cancels: Arc::new(AtomicU64::new(0)),
393 successful_cancels: Arc::new(AtomicU64::new(0)),
394 failed_cancels: Arc::new(AtomicU64::new(0)),
395 expected_rejects: Arc::new(AtomicU64::new(0)),
396 idempotent_successes: Arc::new(AtomicU64::new(0)),
397 })
398 }
399
400 pub async fn start(&self) -> anyhow::Result<()> {
406 if self.running.load(Ordering::Relaxed) {
407 return Ok(());
408 }
409
410 self.running.store(true, Ordering::Relaxed);
411
412 self.run_health_checks().await;
414
415 let transports = Arc::clone(&self.transports);
417 let running = Arc::clone(&self.running);
418 let interval_secs = self.config.health_check_interval_secs;
419 let timeout_secs = self.config.health_check_timeout_secs;
420
421 let task = tokio::spawn(async move {
422 let mut ticker = interval(Duration::from_secs(interval_secs));
423 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
424
425 loop {
426 ticker.tick().await;
427
428 if !running.load(Ordering::Relaxed) {
429 break;
430 }
431
432 let tasks: Vec<_> = transports
433 .iter()
434 .map(|t| t.health_check(timeout_secs))
435 .collect();
436
437 let results = future::join_all(tasks).await;
438 let healthy_count = results.iter().filter(|&&r| r).count();
439
440 tracing::debug!(
441 "Health check complete: {}/{} clients healthy",
442 healthy_count,
443 results.len()
444 );
445 }
446 });
447
448 *self.health_check_task.write().await = Some(task);
449
450 tracing::info!(
451 "CancelBroadcaster started with {} clients",
452 self.transports.len()
453 );
454
455 Ok(())
456 }
457
458 pub async fn stop(&self) {
460 if !self.running.load(Ordering::Relaxed) {
461 return;
462 }
463
464 self.running.store(false, Ordering::Relaxed);
465
466 if let Some(task) = self.health_check_task.write().await.take() {
467 task.abort();
468 }
469
470 tracing::info!("CancelBroadcaster stopped");
471 }
472
473 async fn run_health_checks(&self) {
474 let tasks: Vec<_> = self
475 .transports
476 .iter()
477 .map(|t| t.health_check(self.config.health_check_timeout_secs))
478 .collect();
479
480 let results = future::join_all(tasks).await;
481 let healthy_count = results.iter().filter(|&&r| r).count();
482
483 tracing::debug!(
484 "Health check complete: {}/{} clients healthy",
485 healthy_count,
486 results.len()
487 );
488 }
489
490 fn is_expected_reject(&self, error_message: &str) -> bool {
491 self.config
492 .expected_reject_patterns
493 .iter()
494 .any(|pattern| error_message.contains(pattern))
495 }
496
497 fn is_idempotent_success(&self, error_message: &str) -> bool {
498 self.config
499 .idempotent_success_patterns
500 .iter()
501 .any(|pattern| error_message.contains(pattern))
502 }
503
504 async fn process_cancel_results<T>(
508 &self,
509 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
510 idempotent_result: impl FnOnce() -> anyhow::Result<T>,
511 operation: &str,
512 params: String,
513 idempotent_reason: &str,
514 ) -> anyhow::Result<T>
515 where
516 T: Send + 'static,
517 {
518 let mut errors = Vec::new();
519
520 while !handles.is_empty() {
521 let current_handles = std::mem::take(&mut handles);
522 let (result, _idx, remaining) = future::select_all(current_handles).await;
523 handles = remaining.into_iter().collect();
524
525 match result {
526 Ok((client_id, Ok(result))) => {
527 for handle in &handles {
529 handle.abort();
530 }
531 self.successful_cancels.fetch_add(1, Ordering::Relaxed);
532
533 tracing::debug!(
534 "{} broadcast succeeded [{}] {}",
535 operation,
536 client_id,
537 params
538 );
539
540 return Ok(result);
541 }
542 Ok((client_id, Err(e))) => {
543 let error_msg = e.to_string();
544
545 if self.is_idempotent_success(&error_msg) {
546 for handle in &handles {
548 handle.abort();
549 }
550 self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
551
552 tracing::debug!(
553 "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
554 );
555
556 return idempotent_result();
557 }
558
559 if self.is_expected_reject(&error_msg) {
560 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
561 tracing::debug!(
562 "Expected {} rejection [{}]: {} {}",
563 operation.to_lowercase(),
564 client_id,
565 error_msg,
566 params
567 );
568 errors.push(error_msg);
569 } else {
570 tracing::warn!(
571 "{} request failed [{}]: {} {}",
572 operation,
573 client_id,
574 error_msg,
575 params
576 );
577 errors.push(error_msg);
578 }
579 }
580 Err(e) => {
581 tracing::warn!("{} task join error: {e:?}", operation);
582 errors.push(format!("Task panicked: {e:?}"));
583 }
584 }
585 }
586
587 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
589 tracing::error!(
590 "All {} requests failed: {errors:?} {params}",
591 operation.to_lowercase(),
592 );
593 Err(anyhow::anyhow!(
594 "All {} requests failed: {errors:?}",
595 operation.to_lowercase(),
596 ))
597 }
598
599 pub async fn broadcast_cancel(
611 &self,
612 instrument_id: InstrumentId,
613 client_order_id: Option<ClientOrderId>,
614 venue_order_id: Option<VenueOrderId>,
615 ) -> anyhow::Result<Option<OrderStatusReport>> {
616 self.total_cancels.fetch_add(1, Ordering::Relaxed);
617
618 let healthy_transports: Vec<TransportClient> = self
619 .transports
620 .iter()
621 .filter(|t| t.is_healthy())
622 .cloned()
623 .collect();
624
625 if healthy_transports.is_empty() {
626 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
627 anyhow::bail!("No healthy transport clients available");
628 }
629
630 let mut handles = Vec::new();
631 for transport in healthy_transports {
632 let handle = tokio::spawn(async move {
633 let client_id = transport.client_id.clone();
634 let result = transport
635 .cancel_order(instrument_id, client_order_id, venue_order_id)
636 .await
637 .map(Some); (client_id, result)
639 });
640 handles.push(handle);
641 }
642
643 self.process_cancel_results(
644 handles,
645 || Ok(None),
646 "Cancel",
647 format!(
648 "(client_order_id={:?}, venue_order_id={:?})",
649 client_order_id, venue_order_id
650 ),
651 "order already cancelled/not found",
652 )
653 .await
654 }
655
656 pub async fn broadcast_batch_cancel(
662 &self,
663 instrument_id: InstrumentId,
664 client_order_ids: Option<Vec<ClientOrderId>>,
665 venue_order_ids: Option<Vec<VenueOrderId>>,
666 ) -> anyhow::Result<Vec<OrderStatusReport>> {
667 self.total_cancels.fetch_add(1, Ordering::Relaxed);
668
669 let healthy_transports: Vec<TransportClient> = self
670 .transports
671 .iter()
672 .filter(|t| t.is_healthy())
673 .cloned()
674 .collect();
675
676 if healthy_transports.is_empty() {
677 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
678 anyhow::bail!("No healthy transport clients available");
679 }
680
681 let mut handles = Vec::new();
682
683 for transport in healthy_transports {
684 let client_order_ids_clone = client_order_ids.clone();
685 let venue_order_ids_clone = venue_order_ids.clone();
686 let handle = tokio::spawn(async move {
687 let client_id = transport.client_id.clone();
688 let result = transport
689 .executor
690 .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
691 .await;
692 (client_id, result)
693 });
694 handles.push(handle);
695 }
696
697 self.process_cancel_results(
698 handles,
699 || Ok(Vec::new()),
700 "Batch cancel",
701 format!(
702 "(client_order_ids={:?}, venue_order_ids={:?})",
703 client_order_ids, venue_order_ids
704 ),
705 "orders already cancelled/not found",
706 )
707 .await
708 }
709
710 pub async fn broadcast_cancel_all(
716 &self,
717 instrument_id: InstrumentId,
718 order_side: Option<OrderSide>,
719 ) -> anyhow::Result<Vec<OrderStatusReport>> {
720 self.total_cancels.fetch_add(1, Ordering::Relaxed);
721
722 let healthy_transports: Vec<TransportClient> = self
723 .transports
724 .iter()
725 .filter(|t| t.is_healthy())
726 .cloned()
727 .collect();
728
729 if healthy_transports.is_empty() {
730 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
731 anyhow::bail!("No healthy transport clients available");
732 }
733
734 let mut handles = Vec::new();
735 for transport in healthy_transports {
736 let handle = tokio::spawn(async move {
737 let client_id = transport.client_id.clone();
738 let result = transport
739 .executor
740 .cancel_all_orders(instrument_id, order_side)
741 .await;
742 (client_id, result)
743 });
744 handles.push(handle);
745 }
746
747 self.process_cancel_results(
748 handles,
749 || Ok(Vec::new()),
750 "Cancel all",
751 format!(
752 "(instrument_id={}, order_side={:?})",
753 instrument_id, order_side
754 ),
755 "no orders to cancel",
756 )
757 .await
758 }
759
760 pub fn get_metrics(&self) -> BroadcasterMetrics {
762 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
763 let total_clients = self.transports.len();
764
765 BroadcasterMetrics {
766 total_cancels: self.total_cancels.load(Ordering::Relaxed),
767 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
768 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
769 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
770 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
771 healthy_clients,
772 total_clients,
773 }
774 }
775
776 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
778 self.get_metrics()
779 }
780
781 pub fn get_client_stats(&self) -> Vec<ClientStats> {
783 self.transports
784 .iter()
785 .map(|t| ClientStats {
786 client_id: t.client_id.clone(),
787 healthy: t.is_healthy(),
788 cancel_count: t.get_cancel_count(),
789 error_count: t.get_error_count(),
790 })
791 .collect()
792 }
793
794 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
796 self.get_client_stats()
797 }
798
799 pub fn cache_instrument(&self, instrument: InstrumentAny) {
801 for transport in self.transports.iter() {
802 transport.executor.add_instrument(instrument.clone());
803 }
804 }
805
806 pub fn clone_for_async(&self) -> Self {
807 Self {
808 config: self.config.clone(),
809 transports: Arc::clone(&self.transports),
810 health_check_task: Arc::clone(&self.health_check_task),
811 running: Arc::clone(&self.running),
812 total_cancels: Arc::clone(&self.total_cancels),
813 successful_cancels: Arc::clone(&self.successful_cancels),
814 failed_cancels: Arc::clone(&self.failed_cancels),
815 expected_rejects: Arc::clone(&self.expected_rejects),
816 idempotent_successes: Arc::clone(&self.idempotent_successes),
817 }
818 }
819
820 #[cfg(test)]
821 fn new_with_transports(
822 config: CancelBroadcasterConfig,
823 transports: Vec<TransportClient>,
824 ) -> Self {
825 Self {
826 config,
827 transports: Arc::new(transports),
828 health_check_task: Arc::new(RwLock::new(None)),
829 running: Arc::new(AtomicBool::new(false)),
830 total_cancels: Arc::new(AtomicU64::new(0)),
831 successful_cancels: Arc::new(AtomicU64::new(0)),
832 failed_cancels: Arc::new(AtomicU64::new(0)),
833 expected_rejects: Arc::new(AtomicU64::new(0)),
834 idempotent_successes: Arc::new(AtomicU64::new(0)),
835 }
836 }
837}
838
839#[derive(Debug, Clone)]
841pub struct BroadcasterMetrics {
842 pub total_cancels: u64,
843 pub successful_cancels: u64,
844 pub failed_cancels: u64,
845 pub expected_rejects: u64,
846 pub idempotent_successes: u64,
847 pub healthy_clients: usize,
848 pub total_clients: usize,
849}
850
851#[derive(Debug, Clone)]
853pub struct ClientStats {
854 pub client_id: String,
855 pub healthy: bool,
856 pub cancel_count: u64,
857 pub error_count: u64,
858}
859
860#[cfg(test)]
865mod tests {
866 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
867
868 use nautilus_core::UUID4;
869 use nautilus_model::{
870 enums::{
871 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
872 },
873 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
874 reports::OrderStatusReport,
875 types::{Price, Quantity},
876 };
877
878 use super::*;
879
880 #[derive(Clone)]
882 #[allow(clippy::type_complexity)]
883 struct MockExecutor {
884 handler: Arc<
885 dyn Fn(
886 InstrumentId,
887 Option<ClientOrderId>,
888 Option<VenueOrderId>,
889 )
890 -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
891 + Send
892 + Sync,
893 >,
894 }
895
896 impl MockExecutor {
897 fn new<F, Fut>(handler: F) -> Self
898 where
899 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
900 + Send
901 + Sync
902 + 'static,
903 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
904 {
905 Self {
906 handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
907 }
908 }
909 }
910
911 impl CancelExecutor for MockExecutor {
912 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
913 Box::pin(async { Ok(()) })
914 }
915
916 fn cancel_order(
917 &self,
918 instrument_id: InstrumentId,
919 client_order_id: Option<ClientOrderId>,
920 venue_order_id: Option<VenueOrderId>,
921 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
922 (self.handler)(instrument_id, client_order_id, venue_order_id)
923 }
924
925 fn cancel_orders(
926 &self,
927 _instrument_id: InstrumentId,
928 _client_order_ids: Option<Vec<ClientOrderId>>,
929 _venue_order_ids: Option<Vec<VenueOrderId>>,
930 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
931 {
932 Box::pin(async { Ok(Vec::new()) })
933 }
934
935 fn cancel_all_orders(
936 &self,
937 instrument_id: InstrumentId,
938 _order_side: Option<OrderSide>,
939 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
940 {
941 let handler = Arc::clone(&self.handler);
943 Box::pin(async move {
944 let result = handler(instrument_id, None, None).await;
946 match result {
947 Ok(_) => Ok(Vec::new()),
948 Err(e) => Err(e),
949 }
950 })
951 }
952
953 fn add_instrument(&self, _instrument: InstrumentAny) {
954 }
956 }
957
958 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
959 OrderStatusReport {
960 account_id: AccountId::from("BITMEX-001"),
961 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
962 venue_order_id: VenueOrderId::from(venue_order_id),
963 order_side: OrderSide::Buy,
964 order_type: OrderType::Limit,
965 time_in_force: TimeInForce::Gtc,
966 order_status: OrderStatus::Canceled,
967 price: Some(Price::new(50000.0, 2)),
968 quantity: Quantity::new(100.0, 0),
969 filled_qty: Quantity::new(0.0, 0),
970 report_id: UUID4::new(),
971 ts_accepted: 0.into(),
972 ts_last: 0.into(),
973 ts_init: 0.into(),
974 client_order_id: None,
975 avg_px: None,
976 trigger_price: None,
977 trigger_type: None,
978 contingency_type: ContingencyType::NoContingency,
979 expire_time: None,
980 order_list_id: None,
981 venue_position_id: None,
982 linked_order_ids: None,
983 parent_order_id: None,
984 display_qty: None,
985 limit_offset: None,
986 trailing_offset: None,
987 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
988 post_only: false,
989 reduce_only: false,
990 cancel_reason: None,
991 ts_triggered: None,
992 }
993 }
994
995 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
996 where
997 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
998 + Send
999 + Sync
1000 + 'static,
1001 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
1002 {
1003 let executor = MockExecutor::new(handler);
1004 TransportClient::new(executor, client_id.to_string())
1005 }
1006
1007 #[tokio::test]
1008 async fn test_broadcast_cancel_immediate_success() {
1009 let report = create_test_report("ORDER-1");
1010 let report_clone = report.clone();
1011
1012 let transports = vec![
1013 create_stub_transport("client-0", move |_, _, _| {
1014 let report = report_clone.clone();
1015 async move { Ok(report) }
1016 }),
1017 create_stub_transport("client-1", |_, _, _| async {
1018 tokio::time::sleep(Duration::from_secs(10)).await;
1019 anyhow::bail!("Should be aborted")
1020 }),
1021 ];
1022
1023 let config = CancelBroadcasterConfig::default();
1024 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1025
1026 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1027 let result = broadcaster
1028 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1029 .await;
1030
1031 assert!(result.is_ok());
1032 let returned_report = result.unwrap();
1033 assert!(returned_report.is_some());
1034 assert_eq!(
1035 returned_report.unwrap().venue_order_id,
1036 report.venue_order_id
1037 );
1038
1039 let metrics = broadcaster.get_metrics_async().await;
1040 assert_eq!(metrics.successful_cancels, 1);
1041 assert_eq!(metrics.failed_cancels, 0);
1042 assert_eq!(metrics.total_cancels, 1);
1043 }
1044
1045 #[tokio::test]
1046 async fn test_broadcast_cancel_idempotent_success() {
1047 let transports = vec![
1048 create_stub_transport("client-0", |_, _, _| async {
1049 anyhow::bail!("AlreadyCanceled")
1050 }),
1051 create_stub_transport("client-1", |_, _, _| async {
1052 tokio::time::sleep(Duration::from_secs(10)).await;
1053 anyhow::bail!("Should be aborted")
1054 }),
1055 ];
1056
1057 let config = CancelBroadcasterConfig::default();
1058 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1059
1060 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1061 let result = broadcaster
1062 .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1063 .await;
1064
1065 assert!(result.is_ok());
1066 assert!(result.unwrap().is_none());
1067
1068 let metrics = broadcaster.get_metrics_async().await;
1069 assert_eq!(metrics.idempotent_successes, 1);
1070 assert_eq!(metrics.successful_cancels, 0);
1071 assert_eq!(metrics.failed_cancels, 0);
1072 }
1073
1074 #[tokio::test]
1075 async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1076 let transports = vec![
1077 create_stub_transport("client-0", |_, _, _| async {
1078 anyhow::bail!("502 Bad Gateway")
1079 }),
1080 create_stub_transport("client-1", |_, _, _| async {
1081 anyhow::bail!("orderID not found")
1082 }),
1083 ];
1084
1085 let config = CancelBroadcasterConfig::default();
1086 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1087
1088 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1089 let result = broadcaster
1090 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1091 .await;
1092
1093 assert!(result.is_ok());
1094 assert!(result.unwrap().is_none());
1095
1096 let metrics = broadcaster.get_metrics_async().await;
1097 assert_eq!(metrics.idempotent_successes, 1);
1098 assert_eq!(metrics.failed_cancels, 0);
1099 }
1100
1101 #[tokio::test]
1102 async fn test_broadcast_cancel_all_failures() {
1103 let transports = vec![
1104 create_stub_transport("client-0", |_, _, _| async {
1105 anyhow::bail!("502 Bad Gateway")
1106 }),
1107 create_stub_transport("client-1", |_, _, _| async {
1108 anyhow::bail!("Connection refused")
1109 }),
1110 ];
1111
1112 let config = CancelBroadcasterConfig::default();
1113 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1114
1115 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1116 let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1117
1118 assert!(result.is_err());
1119 assert!(
1120 result
1121 .unwrap_err()
1122 .to_string()
1123 .contains("All cancel all requests failed")
1124 );
1125
1126 let metrics = broadcaster.get_metrics_async().await;
1127 assert_eq!(metrics.failed_cancels, 1);
1128 assert_eq!(metrics.successful_cancels, 0);
1129 assert_eq!(metrics.idempotent_successes, 0);
1130 }
1131
1132 #[tokio::test]
1133 async fn test_broadcast_cancel_no_healthy_clients() {
1134 let transport = create_stub_transport("client-0", |_, _, _| async {
1135 Ok(create_test_report("ORDER-1"))
1136 });
1137 transport.healthy.store(false, Ordering::Relaxed);
1138
1139 let config = CancelBroadcasterConfig::default();
1140 let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1141
1142 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1143 let result = broadcaster
1144 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1145 .await;
1146
1147 assert!(result.is_err());
1148 assert!(
1149 result
1150 .unwrap_err()
1151 .to_string()
1152 .contains("No healthy transport clients available")
1153 );
1154
1155 let metrics = broadcaster.get_metrics_async().await;
1156 assert_eq!(metrics.failed_cancels, 1);
1157 }
1158
1159 #[tokio::test]
1160 async fn test_broadcast_cancel_metrics_increment() {
1161 let report1 = create_test_report("ORDER-1");
1162 let report1_clone = report1.clone();
1163 let report2 = create_test_report("ORDER-2");
1164 let report2_clone = report2.clone();
1165
1166 let transports = vec![
1167 create_stub_transport("client-0", move |_, _, _| {
1168 let report = report1_clone.clone();
1169 async move { Ok(report) }
1170 }),
1171 create_stub_transport("client-1", move |_, _, _| {
1172 let report = report2_clone.clone();
1173 async move { Ok(report) }
1174 }),
1175 ];
1176
1177 let config = CancelBroadcasterConfig::default();
1178 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1179
1180 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1181
1182 let _ = broadcaster
1183 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1184 .await;
1185
1186 let _ = broadcaster
1187 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1188 .await;
1189
1190 let metrics = broadcaster.get_metrics_async().await;
1191 assert_eq!(metrics.total_cancels, 2);
1192 assert_eq!(metrics.successful_cancels, 2);
1193 }
1194
1195 #[tokio::test]
1196 async fn test_broadcast_cancel_expected_reject_pattern() {
1197 let transports = vec![
1198 create_stub_transport("client-0", |_, _, _| async {
1199 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1200 }),
1201 create_stub_transport("client-1", |_, _, _| async {
1202 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1203 }),
1204 ];
1205
1206 let config = CancelBroadcasterConfig::default();
1207 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1208
1209 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1210 let result = broadcaster
1211 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1212 .await;
1213
1214 assert!(result.is_err());
1215
1216 let metrics = broadcaster.get_metrics_async().await;
1217 assert_eq!(metrics.expected_rejects, 2);
1218 assert_eq!(metrics.failed_cancels, 1);
1219 }
1220
1221 #[tokio::test]
1222 async fn test_broadcaster_creation_with_pool() {
1223 let config = CancelBroadcasterConfig {
1224 pool_size: 3,
1225 api_key: Some("test_key".to_string()),
1226 api_secret: Some("test_secret".to_string()),
1227 base_url: Some("https://test.example.com".to_string()),
1228 testnet: false,
1229 timeout_secs: Some(5),
1230 max_retries: Some(2),
1231 retry_delay_ms: Some(100),
1232 retry_delay_max_ms: Some(1000),
1233 recv_window_ms: Some(5000),
1234 max_requests_per_second: Some(10),
1235 max_requests_per_minute: Some(100),
1236 health_check_interval_secs: 30,
1237 health_check_timeout_secs: 5,
1238 expected_reject_patterns: vec!["test_pattern".to_string()],
1239 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1240 proxy_urls: vec![],
1241 };
1242
1243 let broadcaster = CancelBroadcaster::new(config.clone());
1244 assert!(broadcaster.is_ok());
1245
1246 let broadcaster = broadcaster.unwrap();
1247 let metrics = broadcaster.get_metrics_async().await;
1248
1249 assert_eq!(metrics.total_clients, 3);
1250 assert_eq!(metrics.total_cancels, 0);
1251 assert_eq!(metrics.successful_cancels, 0);
1252 assert_eq!(metrics.failed_cancels, 0);
1253 }
1254
1255 #[tokio::test]
1256 async fn test_broadcaster_lifecycle() {
1257 let config = CancelBroadcasterConfig {
1258 pool_size: 2,
1259 api_key: Some("test_key".to_string()),
1260 api_secret: Some("test_secret".to_string()),
1261 base_url: Some("https://test.example.com".to_string()),
1262 testnet: false,
1263 timeout_secs: Some(5),
1264 max_retries: None,
1265 retry_delay_ms: None,
1266 retry_delay_max_ms: None,
1267 recv_window_ms: None,
1268 max_requests_per_second: None,
1269 max_requests_per_minute: None,
1270 health_check_interval_secs: 60, health_check_timeout_secs: 1,
1272 expected_reject_patterns: vec![],
1273 idempotent_success_patterns: vec![],
1274 proxy_urls: vec![],
1275 };
1276
1277 let broadcaster = CancelBroadcaster::new(config).unwrap();
1278
1279 assert!(!broadcaster.running.load(Ordering::Relaxed));
1281
1282 let start_result = broadcaster.start().await;
1284 assert!(start_result.is_ok());
1285 assert!(broadcaster.running.load(Ordering::Relaxed));
1286
1287 let start_again = broadcaster.start().await;
1289 assert!(start_again.is_ok());
1290
1291 broadcaster.stop().await;
1293 assert!(!broadcaster.running.load(Ordering::Relaxed));
1294
1295 broadcaster.stop().await;
1297 assert!(!broadcaster.running.load(Ordering::Relaxed));
1298 }
1299
1300 #[tokio::test]
1301 async fn test_client_stats_collection() {
1302 let config = CancelBroadcasterConfig {
1303 pool_size: 2,
1304 api_key: Some("test_key".to_string()),
1305 api_secret: Some("test_secret".to_string()),
1306 base_url: Some("https://test.example.com".to_string()),
1307 testnet: false,
1308 timeout_secs: Some(5),
1309 max_retries: None,
1310 retry_delay_ms: None,
1311 retry_delay_max_ms: None,
1312 recv_window_ms: None,
1313 max_requests_per_second: None,
1314 max_requests_per_minute: None,
1315 health_check_interval_secs: 60,
1316 health_check_timeout_secs: 5,
1317 expected_reject_patterns: vec![],
1318 idempotent_success_patterns: vec![],
1319 proxy_urls: vec![],
1320 };
1321
1322 let broadcaster = CancelBroadcaster::new(config).unwrap();
1323 let stats = broadcaster.get_client_stats_async().await;
1324
1325 assert_eq!(stats.len(), 2);
1326 assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1327 assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1328 assert!(stats[0].healthy); assert!(stats[1].healthy);
1330 assert_eq!(stats[0].cancel_count, 0);
1331 assert_eq!(stats[1].cancel_count, 0);
1332 assert_eq!(stats[0].error_count, 0);
1333 assert_eq!(stats[1].error_count, 0);
1334 }
1335
1336 #[tokio::test]
1337 async fn test_testnet_config_sets_base_url() {
1338 let config = CancelBroadcasterConfig {
1339 pool_size: 1,
1340 api_key: Some("test_key".to_string()),
1341 api_secret: Some("test_secret".to_string()),
1342 base_url: None, testnet: true, timeout_secs: Some(5),
1345 max_retries: None,
1346 retry_delay_ms: None,
1347 retry_delay_max_ms: None,
1348 recv_window_ms: None,
1349 max_requests_per_second: None,
1350 max_requests_per_minute: None,
1351 health_check_interval_secs: 60,
1352 health_check_timeout_secs: 5,
1353 expected_reject_patterns: vec![],
1354 idempotent_success_patterns: vec![],
1355 proxy_urls: vec![],
1356 };
1357
1358 let broadcaster = CancelBroadcaster::new(config);
1359 assert!(broadcaster.is_ok());
1360 }
1361
1362 #[tokio::test]
1363 async fn test_default_config() {
1364 let config = CancelBroadcasterConfig {
1365 api_key: Some("test_key".to_string()),
1366 api_secret: Some("test_secret".to_string()),
1367 base_url: Some("https://test.example.com".to_string()),
1368 ..Default::default()
1369 };
1370
1371 let broadcaster = CancelBroadcaster::new(config);
1372 assert!(broadcaster.is_ok());
1373
1374 let broadcaster = broadcaster.unwrap();
1375 let metrics = broadcaster.get_metrics_async().await;
1376
1377 assert_eq!(metrics.total_clients, 2);
1379 }
1380
1381 #[tokio::test]
1382 async fn test_clone_for_async() {
1383 let config = CancelBroadcasterConfig {
1384 pool_size: 1,
1385 api_key: Some("test_key".to_string()),
1386 api_secret: Some("test_secret".to_string()),
1387 base_url: Some("https://test.example.com".to_string()),
1388 testnet: false,
1389 timeout_secs: Some(5),
1390 max_retries: None,
1391 retry_delay_ms: None,
1392 retry_delay_max_ms: None,
1393 recv_window_ms: None,
1394 max_requests_per_second: None,
1395 max_requests_per_minute: None,
1396 health_check_interval_secs: 60,
1397 health_check_timeout_secs: 5,
1398 expected_reject_patterns: vec![],
1399 idempotent_success_patterns: vec![],
1400 proxy_urls: vec![],
1401 };
1402
1403 let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1404
1405 broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1407
1408 let broadcaster2 = broadcaster1.clone_for_async();
1410 let metrics2 = broadcaster2.get_metrics_async().await;
1411
1412 assert_eq!(metrics2.total_cancels, 1); broadcaster2
1416 .successful_cancels
1417 .fetch_add(5, Ordering::Relaxed);
1418
1419 let metrics1 = broadcaster1.get_metrics_async().await;
1421 assert_eq!(metrics1.successful_cancels, 5);
1422 }
1423
1424 #[tokio::test]
1425 async fn test_pattern_matching() {
1426 let config = CancelBroadcasterConfig {
1428 pool_size: 1,
1429 api_key: Some("test_key".to_string()),
1430 api_secret: Some("test_secret".to_string()),
1431 base_url: Some("https://test.example.com".to_string()),
1432 testnet: false,
1433 timeout_secs: Some(5),
1434 max_retries: None,
1435 retry_delay_ms: None,
1436 retry_delay_max_ms: None,
1437 recv_window_ms: None,
1438 max_requests_per_second: None,
1439 max_requests_per_minute: None,
1440 health_check_interval_secs: 60,
1441 health_check_timeout_secs: 5,
1442 expected_reject_patterns: vec![
1443 "ParticipateDoNotInitiate".to_string(),
1444 "Close-only".to_string(),
1445 ],
1446 idempotent_success_patterns: vec![
1447 "AlreadyCanceled".to_string(),
1448 "orderID not found".to_string(),
1449 "Unable to cancel".to_string(),
1450 ],
1451 proxy_urls: vec![],
1452 };
1453
1454 let broadcaster = CancelBroadcaster::new(config).unwrap();
1455
1456 assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1458 assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1459 assert!(!broadcaster.is_expected_reject("Connection timeout"));
1460
1461 assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1463 assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1464 assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1465 assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1466 }
1467
1468 #[tokio::test]
1472 async fn test_broadcast_batch_cancel_structure() {
1473 let config = CancelBroadcasterConfig {
1475 pool_size: 2,
1476 api_key: Some("test_key".to_string()),
1477 api_secret: Some("test_secret".to_string()),
1478 base_url: Some("https://test.example.com".to_string()),
1479 testnet: false,
1480 timeout_secs: Some(5),
1481 max_retries: None,
1482 retry_delay_ms: None,
1483 retry_delay_max_ms: None,
1484 recv_window_ms: None,
1485 max_requests_per_second: None,
1486 max_requests_per_minute: None,
1487 health_check_interval_secs: 60,
1488 health_check_timeout_secs: 5,
1489 expected_reject_patterns: vec![],
1490 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1491 proxy_urls: vec![],
1492 };
1493
1494 let broadcaster = CancelBroadcaster::new(config).unwrap();
1495 let metrics = broadcaster.get_metrics_async().await;
1496
1497 assert_eq!(metrics.total_clients, 2);
1499 assert_eq!(metrics.total_cancels, 0);
1500 assert_eq!(metrics.successful_cancels, 0);
1501 assert_eq!(metrics.failed_cancels, 0);
1502 }
1503
1504 #[tokio::test]
1505 async fn test_broadcast_cancel_all_structure() {
1506 let config = CancelBroadcasterConfig {
1508 pool_size: 3,
1509 api_key: Some("test_key".to_string()),
1510 api_secret: Some("test_secret".to_string()),
1511 base_url: Some("https://test.example.com".to_string()),
1512 testnet: false,
1513 timeout_secs: Some(5),
1514 max_retries: None,
1515 retry_delay_ms: None,
1516 retry_delay_max_ms: None,
1517 recv_window_ms: None,
1518 max_requests_per_second: None,
1519 max_requests_per_minute: None,
1520 health_check_interval_secs: 60,
1521 health_check_timeout_secs: 5,
1522 expected_reject_patterns: vec![],
1523 idempotent_success_patterns: vec!["orderID not found".to_string()],
1524 proxy_urls: vec![],
1525 };
1526
1527 let broadcaster = CancelBroadcaster::new(config).unwrap();
1528 let metrics = broadcaster.get_metrics_async().await;
1529
1530 assert_eq!(metrics.total_clients, 3);
1532 assert_eq!(metrics.healthy_clients, 3);
1533 assert_eq!(metrics.total_cancels, 0);
1534 }
1535
1536 #[tokio::test]
1538 async fn test_single_cancel_metrics_with_mixed_responses() {
1539 let transports = vec![
1542 create_stub_transport("client-0", |_, _, _| async {
1543 anyhow::bail!("Connection timeout")
1544 }),
1545 create_stub_transport("client-1", |_, _, _| async {
1546 anyhow::bail!("AlreadyCanceled")
1547 }),
1548 ];
1549
1550 let config = CancelBroadcasterConfig::default();
1551 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1552
1553 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1554 let result = broadcaster
1555 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1556 .await;
1557
1558 assert!(result.is_ok());
1560 assert!(result.unwrap().is_none());
1561
1562 let metrics = broadcaster.get_metrics_async().await;
1564 assert_eq!(
1565 metrics.failed_cancels, 0,
1566 "Idempotent success should not increment failed_cancels"
1567 );
1568 assert_eq!(metrics.idempotent_successes, 1);
1569 assert_eq!(metrics.successful_cancels, 0);
1570 }
1571
1572 #[tokio::test]
1573 async fn test_metrics_initialization_and_health() {
1574 let config = CancelBroadcasterConfig {
1576 pool_size: 4,
1577 api_key: Some("test_key".to_string()),
1578 api_secret: Some("test_secret".to_string()),
1579 base_url: Some("https://test.example.com".to_string()),
1580 testnet: false,
1581 timeout_secs: Some(5),
1582 max_retries: None,
1583 retry_delay_ms: None,
1584 retry_delay_max_ms: None,
1585 recv_window_ms: None,
1586 max_requests_per_second: None,
1587 max_requests_per_minute: None,
1588 health_check_interval_secs: 60,
1589 health_check_timeout_secs: 5,
1590 expected_reject_patterns: vec![],
1591 idempotent_success_patterns: vec![],
1592 proxy_urls: vec![],
1593 };
1594
1595 let broadcaster = CancelBroadcaster::new(config).unwrap();
1596 let metrics = broadcaster.get_metrics_async().await;
1597
1598 assert_eq!(metrics.total_cancels, 0);
1600 assert_eq!(metrics.successful_cancels, 0);
1601 assert_eq!(metrics.failed_cancels, 0);
1602 assert_eq!(metrics.expected_rejects, 0);
1603 assert_eq!(metrics.idempotent_successes, 0);
1604
1605 assert_eq!(metrics.healthy_clients, 4);
1607 assert_eq!(metrics.total_clients, 4);
1608 }
1609
1610 #[tokio::test]
1612 async fn test_health_check_task_lifecycle() {
1613 let config = CancelBroadcasterConfig {
1614 pool_size: 1,
1615 api_key: Some("test_key".to_string()),
1616 api_secret: Some("test_secret".to_string()),
1617 base_url: Some("https://test.example.com".to_string()),
1618 testnet: false,
1619 timeout_secs: Some(5),
1620 max_retries: None,
1621 retry_delay_ms: None,
1622 retry_delay_max_ms: None,
1623 recv_window_ms: None,
1624 max_requests_per_second: None,
1625 max_requests_per_minute: None,
1626 health_check_interval_secs: 1, health_check_timeout_secs: 1,
1628 expected_reject_patterns: vec![],
1629 idempotent_success_patterns: vec![],
1630 proxy_urls: vec![],
1631 };
1632
1633 let broadcaster = CancelBroadcaster::new(config).unwrap();
1634
1635 broadcaster.start().await.unwrap();
1637 assert!(broadcaster.running.load(Ordering::Relaxed));
1638
1639 {
1641 let task_guard = broadcaster.health_check_task.read().await;
1642 assert!(task_guard.is_some());
1643 }
1644
1645 tokio::time::sleep(Duration::from_millis(100)).await;
1647
1648 broadcaster.stop().await;
1650 assert!(!broadcaster.running.load(Ordering::Relaxed));
1651
1652 {
1654 let task_guard = broadcaster.health_check_task.read().await;
1655 assert!(task_guard.is_none());
1656 }
1657 }
1658}