1use std::{
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU64, Ordering},
40 },
41 time::Duration,
42};
43
44use futures_util::future;
45use nautilus_common::live::get_runtime;
46use nautilus_model::{
47 enums::OrderSide,
48 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
49 instruments::InstrumentAny,
50 reports::OrderStatusReport,
51};
52use tokio::{sync::RwLock, task::JoinHandle, time::interval};
53
54use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
55
56trait CancelExecutor: Send + Sync {
78 fn add_instrument(&self, instrument: InstrumentAny);
80
81 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
83
84 fn cancel_order(
86 &self,
87 instrument_id: InstrumentId,
88 client_order_id: Option<ClientOrderId>,
89 venue_order_id: Option<VenueOrderId>,
90 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
91
92 fn cancel_orders(
94 &self,
95 instrument_id: InstrumentId,
96 client_order_ids: Option<Vec<ClientOrderId>>,
97 venue_order_ids: Option<Vec<VenueOrderId>>,
98 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
99
100 fn cancel_all_orders(
102 &self,
103 instrument_id: InstrumentId,
104 order_side: Option<OrderSide>,
105 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
106}
107
108impl CancelExecutor for BitmexHttpClient {
109 fn add_instrument(&self, instrument: InstrumentAny) {
110 Self::cache_instrument(self, instrument);
111 }
112
113 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
114 Box::pin(async move {
115 Self::get_server_time(self)
116 .await
117 .map(|_| ())
118 .map_err(|e| anyhow::anyhow!("{e}"))
119 })
120 }
121
122 fn cancel_order(
123 &self,
124 instrument_id: InstrumentId,
125 client_order_id: Option<ClientOrderId>,
126 venue_order_id: Option<VenueOrderId>,
127 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
128 Box::pin(async move {
129 Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
130 })
131 }
132
133 fn cancel_orders(
134 &self,
135 instrument_id: InstrumentId,
136 client_order_ids: Option<Vec<ClientOrderId>>,
137 venue_order_ids: Option<Vec<VenueOrderId>>,
138 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
139 Box::pin(async move {
140 Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
141 })
142 }
143
144 fn cancel_all_orders(
145 &self,
146 instrument_id: InstrumentId,
147 order_side: Option<OrderSide>,
148 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
149 Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
150 }
151}
152
/// Configuration for the cancel broadcaster.
#[derive(Clone, Debug)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients (transports) created by `CancelBroadcaster::new`.
    pub pool_size: usize,
    /// API key handed to every HTTP client.
    pub api_key: Option<String>,
    /// API secret handed to every HTTP client.
    pub api_secret: Option<String>,
    /// Explicit base URL; when `None` and `testnet` is set, the testnet URL is used.
    pub base_url: Option<String>,
    /// Selects the testnet URL when no `base_url` is given.
    pub testnet: bool,
    /// Timeout setting forwarded to every HTTP client.
    pub timeout_secs: Option<u64>,
    /// Retry limit forwarded to every HTTP client.
    pub max_retries: Option<u32>,
    /// Retry delay setting forwarded to every HTTP client.
    pub retry_delay_ms: Option<u64>,
    /// Maximum retry delay setting forwarded to every HTTP client.
    pub retry_delay_max_ms: Option<u64>,
    /// Receive-window setting forwarded to every HTTP client.
    pub recv_window_ms: Option<u64>,
    /// Per-second rate limit forwarded to every HTTP client.
    pub max_requests_per_second: Option<u32>,
    /// Per-minute rate limit forwarded to every HTTP client.
    pub max_requests_per_minute: Option<u32>,
    /// Seconds between rounds of the background health check.
    pub health_check_interval_secs: u64,
    /// Seconds allowed for a single health check before it counts as failed.
    pub health_check_timeout_secs: u64,
    /// Substrings marking an error message as an expected rejection.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings marking an error message as an idempotent success.
    pub idempotent_success_patterns: Vec<String>,
    /// Optional proxy URL per client, matched by index; missing or `None` entries mean no proxy.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for CancelBroadcasterConfig {
    /// Returns the default configuration: a pool of two clients, no
    /// credentials, and the built-in reject / idempotent-success patterns.
    fn default() -> Self {
        let expected_reject_patterns =
            vec![String::from("Order had execInst of ParticipateDoNotInitiate")];

        let idempotent_success_patterns = [
            "AlreadyCanceled",
            "orderID not found",
            "Unable to cancel order due to existing state",
        ]
        .iter()
        .map(|pattern| String::from(*pattern))
        .collect();

        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns,
            idempotent_success_patterns,
            proxy_urls: Vec::new(),
        }
    }
}
225
/// One member of the broadcaster's client pool: an executor plus its health
/// flag and counters.
///
/// All state sits behind `Arc`s, so clones share the same executor, health
/// flag and counters.
#[derive(Clone)]
struct TransportClient {
    /// Executor that performs the actual requests.
    executor: Arc<dyn CancelExecutor>,
    /// Identifier used in log messages and client statistics.
    client_id: String,
    /// Health flag, updated by health checks and by successful cancels.
    healthy: Arc<AtomicBool>,
    /// Number of cancel requests attempted through this client.
    cancel_count: Arc<AtomicU64>,
    /// Number of cancel requests through this client that returned an error.
    error_count: Arc<AtomicU64>,
}
239
240impl Debug for TransportClient {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 f.debug_struct("TransportClient")
243 .field("client_id", &self.client_id)
244 .field("healthy", &self.healthy)
245 .field("cancel_count", &self.cancel_count)
246 .field("error_count", &self.error_count)
247 .finish()
248 }
249}
250
251impl TransportClient {
252 fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
253 Self {
254 executor: Arc::new(executor),
255 client_id,
256 healthy: Arc::new(AtomicBool::new(true)),
257 cancel_count: Arc::new(AtomicU64::new(0)),
258 error_count: Arc::new(AtomicU64::new(0)),
259 }
260 }
261
262 fn is_healthy(&self) -> bool {
263 self.healthy.load(Ordering::Relaxed)
264 }
265
266 fn mark_healthy(&self) {
267 self.healthy.store(true, Ordering::Relaxed);
268 }
269
270 fn mark_unhealthy(&self) {
271 self.healthy.store(false, Ordering::Relaxed);
272 }
273
274 fn get_cancel_count(&self) -> u64 {
275 self.cancel_count.load(Ordering::Relaxed)
276 }
277
278 fn get_error_count(&self) -> u64 {
279 self.error_count.load(Ordering::Relaxed)
280 }
281
282 async fn health_check(&self, timeout_secs: u64) -> bool {
283 match tokio::time::timeout(
284 Duration::from_secs(timeout_secs),
285 self.executor.health_check(),
286 )
287 .await
288 {
289 Ok(Ok(_)) => {
290 self.mark_healthy();
291 true
292 }
293 Ok(Err(e)) => {
294 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
295 self.mark_unhealthy();
296 false
297 }
298 Err(_) => {
299 tracing::warn!("Health check timeout for client {}", self.client_id);
300 self.mark_unhealthy();
301 false
302 }
303 }
304 }
305
306 async fn cancel_order(
307 &self,
308 instrument_id: InstrumentId,
309 client_order_id: Option<ClientOrderId>,
310 venue_order_id: Option<VenueOrderId>,
311 ) -> anyhow::Result<OrderStatusReport> {
312 self.cancel_count.fetch_add(1, Ordering::Relaxed);
313
314 match self
315 .executor
316 .cancel_order(instrument_id, client_order_id, venue_order_id)
317 .await
318 {
319 Ok(report) => {
320 self.mark_healthy();
321 Ok(report)
322 }
323 Err(e) => {
324 self.error_count.fetch_add(1, Ordering::Relaxed);
325 Err(e)
326 }
327 }
328 }
329}
330
/// Sends cancel requests through every healthy client of a pool concurrently
/// and resolves with the first success (or idempotent success).
///
/// All mutable state is held behind `Arc`s, so copies made with
/// `clone_for_async` share transports, lifecycle state and metrics.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    /// Configuration the broadcaster was built with.
    config: CancelBroadcasterConfig,
    /// Pool of transport clients.
    transports: Arc<Vec<TransportClient>>,
    /// Handle of the background health check task, if one is running.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Set while the broadcaster is started.
    running: Arc<AtomicBool>,
    /// Number of broadcast operations requested.
    total_cancels: Arc<AtomicU64>,
    /// Number of broadcasts that ended with a successful response.
    successful_cancels: Arc<AtomicU64>,
    /// Number of broadcasts that failed (no healthy client, or every request failed).
    failed_cancels: Arc<AtomicU64>,
    /// Number of individual responses matching an expected reject pattern.
    expected_rejects: Arc<AtomicU64>,
    /// Number of broadcasts resolved by an idempotent success pattern.
    idempotent_successes: Arc<AtomicU64>,
}
349
350impl CancelBroadcaster {
351 pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
357 let mut transports = Vec::with_capacity(config.pool_size);
358
359 let base_url = if config.testnet && config.base_url.is_none() {
361 Some(BITMEX_HTTP_TESTNET_URL.to_string())
362 } else {
363 config.base_url.clone()
364 };
365
366 for i in 0..config.pool_size {
367 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
369
370 let client = BitmexHttpClient::with_credentials(
371 config.api_key.clone(),
372 config.api_secret.clone(),
373 base_url.clone(),
374 config.timeout_secs,
375 config.max_retries,
376 config.retry_delay_ms,
377 config.retry_delay_max_ms,
378 config.recv_window_ms,
379 config.max_requests_per_second,
380 config.max_requests_per_minute,
381 proxy_url,
382 )
383 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
384
385 transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
386 }
387
388 Ok(Self {
389 config,
390 transports: Arc::new(transports),
391 health_check_task: Arc::new(RwLock::new(None)),
392 running: Arc::new(AtomicBool::new(false)),
393 total_cancels: Arc::new(AtomicU64::new(0)),
394 successful_cancels: Arc::new(AtomicU64::new(0)),
395 failed_cancels: Arc::new(AtomicU64::new(0)),
396 expected_rejects: Arc::new(AtomicU64::new(0)),
397 idempotent_successes: Arc::new(AtomicU64::new(0)),
398 })
399 }
400
401 pub async fn start(&self) -> anyhow::Result<()> {
407 if self.running.load(Ordering::Relaxed) {
408 return Ok(());
409 }
410
411 self.running.store(true, Ordering::Relaxed);
412
413 self.run_health_checks().await;
415
416 let transports = Arc::clone(&self.transports);
418 let running = Arc::clone(&self.running);
419 let interval_secs = self.config.health_check_interval_secs;
420 let timeout_secs = self.config.health_check_timeout_secs;
421
422 let task = get_runtime().spawn(async move {
423 let mut ticker = interval(Duration::from_secs(interval_secs));
424 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
425
426 loop {
427 ticker.tick().await;
428
429 if !running.load(Ordering::Relaxed) {
430 break;
431 }
432
433 let tasks: Vec<_> = transports
434 .iter()
435 .map(|t| t.health_check(timeout_secs))
436 .collect();
437
438 let results = future::join_all(tasks).await;
439 let healthy_count = results.iter().filter(|&&r| r).count();
440
441 tracing::debug!(
442 "Health check complete: {}/{} clients healthy",
443 healthy_count,
444 results.len()
445 );
446 }
447 });
448
449 *self.health_check_task.write().await = Some(task);
450
451 tracing::info!(
452 "CancelBroadcaster started with {} clients",
453 self.transports.len()
454 );
455
456 Ok(())
457 }
458
459 pub async fn stop(&self) {
461 if !self.running.load(Ordering::Relaxed) {
462 return;
463 }
464
465 self.running.store(false, Ordering::Relaxed);
466
467 if let Some(task) = self.health_check_task.write().await.take() {
468 task.abort();
469 }
470
471 tracing::info!("CancelBroadcaster stopped");
472 }
473
474 async fn run_health_checks(&self) {
475 let tasks: Vec<_> = self
476 .transports
477 .iter()
478 .map(|t| t.health_check(self.config.health_check_timeout_secs))
479 .collect();
480
481 let results = future::join_all(tasks).await;
482 let healthy_count = results.iter().filter(|&&r| r).count();
483
484 tracing::debug!(
485 "Health check complete: {}/{} clients healthy",
486 healthy_count,
487 results.len()
488 );
489 }
490
491 fn is_expected_reject(&self, error_message: &str) -> bool {
492 self.config
493 .expected_reject_patterns
494 .iter()
495 .any(|pattern| error_message.contains(pattern))
496 }
497
498 fn is_idempotent_success(&self, error_message: &str) -> bool {
499 self.config
500 .idempotent_success_patterns
501 .iter()
502 .any(|pattern| error_message.contains(pattern))
503 }
504
505 async fn process_cancel_results<T>(
509 &self,
510 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
511 idempotent_result: impl FnOnce() -> anyhow::Result<T>,
512 operation: &str,
513 params: String,
514 idempotent_reason: &str,
515 ) -> anyhow::Result<T>
516 where
517 T: Send + 'static,
518 {
519 let mut errors = Vec::new();
520
521 while !handles.is_empty() {
522 let current_handles = std::mem::take(&mut handles);
523 let (result, _idx, remaining) = future::select_all(current_handles).await;
524 handles = remaining.into_iter().collect();
525
526 match result {
527 Ok((client_id, Ok(result))) => {
528 for handle in &handles {
530 handle.abort();
531 }
532 self.successful_cancels.fetch_add(1, Ordering::Relaxed);
533
534 tracing::debug!(
535 "{} broadcast succeeded [{}] {}",
536 operation,
537 client_id,
538 params
539 );
540
541 return Ok(result);
542 }
543 Ok((client_id, Err(e))) => {
544 let error_msg = e.to_string();
545
546 if self.is_idempotent_success(&error_msg) {
547 for handle in &handles {
549 handle.abort();
550 }
551 self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
552
553 tracing::debug!(
554 "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
555 );
556
557 return idempotent_result();
558 }
559
560 if self.is_expected_reject(&error_msg) {
561 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
562 tracing::debug!(
563 "Expected {} rejection [{}]: {} {}",
564 operation.to_lowercase(),
565 client_id,
566 error_msg,
567 params
568 );
569 errors.push(error_msg);
570 } else {
571 tracing::warn!(
572 "{} request failed [{}]: {} {}",
573 operation,
574 client_id,
575 error_msg,
576 params
577 );
578 errors.push(error_msg);
579 }
580 }
581 Err(e) => {
582 tracing::warn!("{} task join error: {e:?}", operation);
583 errors.push(format!("Task panicked: {e:?}"));
584 }
585 }
586 }
587
588 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
590 tracing::error!(
591 "All {} requests failed: {errors:?} {params}",
592 operation.to_lowercase(),
593 );
594 Err(anyhow::anyhow!(
595 "All {} requests failed: {errors:?}",
596 operation.to_lowercase(),
597 ))
598 }
599
600 pub async fn broadcast_cancel(
612 &self,
613 instrument_id: InstrumentId,
614 client_order_id: Option<ClientOrderId>,
615 venue_order_id: Option<VenueOrderId>,
616 ) -> anyhow::Result<Option<OrderStatusReport>> {
617 self.total_cancels.fetch_add(1, Ordering::Relaxed);
618
619 let healthy_transports: Vec<TransportClient> = self
620 .transports
621 .iter()
622 .filter(|t| t.is_healthy())
623 .cloned()
624 .collect();
625
626 if healthy_transports.is_empty() {
627 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
628 anyhow::bail!("No healthy transport clients available");
629 }
630
631 let mut handles = Vec::new();
632 for transport in healthy_transports {
633 let handle = get_runtime().spawn(async move {
634 let client_id = transport.client_id.clone();
635 let result = transport
636 .cancel_order(instrument_id, client_order_id, venue_order_id)
637 .await
638 .map(Some); (client_id, result)
640 });
641 handles.push(handle);
642 }
643
644 self.process_cancel_results(
645 handles,
646 || Ok(None),
647 "Cancel",
648 format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
649 "order already cancelled/not found",
650 )
651 .await
652 }
653
654 pub async fn broadcast_batch_cancel(
660 &self,
661 instrument_id: InstrumentId,
662 client_order_ids: Option<Vec<ClientOrderId>>,
663 venue_order_ids: Option<Vec<VenueOrderId>>,
664 ) -> anyhow::Result<Vec<OrderStatusReport>> {
665 self.total_cancels.fetch_add(1, Ordering::Relaxed);
666
667 let healthy_transports: Vec<TransportClient> = self
668 .transports
669 .iter()
670 .filter(|t| t.is_healthy())
671 .cloned()
672 .collect();
673
674 if healthy_transports.is_empty() {
675 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
676 anyhow::bail!("No healthy transport clients available");
677 }
678
679 let mut handles = Vec::new();
680
681 for transport in healthy_transports {
682 let client_order_ids_clone = client_order_ids.clone();
683 let venue_order_ids_clone = venue_order_ids.clone();
684 let handle = get_runtime().spawn(async move {
685 let client_id = transport.client_id.clone();
686 let result = transport
687 .executor
688 .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
689 .await;
690 (client_id, result)
691 });
692 handles.push(handle);
693 }
694
695 self.process_cancel_results(
696 handles,
697 || Ok(Vec::new()),
698 "Batch cancel",
699 format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
700 "orders already cancelled/not found",
701 )
702 .await
703 }
704
705 pub async fn broadcast_cancel_all(
711 &self,
712 instrument_id: InstrumentId,
713 order_side: Option<OrderSide>,
714 ) -> anyhow::Result<Vec<OrderStatusReport>> {
715 self.total_cancels.fetch_add(1, Ordering::Relaxed);
716
717 let healthy_transports: Vec<TransportClient> = self
718 .transports
719 .iter()
720 .filter(|t| t.is_healthy())
721 .cloned()
722 .collect();
723
724 if healthy_transports.is_empty() {
725 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
726 anyhow::bail!("No healthy transport clients available");
727 }
728
729 let mut handles = Vec::new();
730 for transport in healthy_transports {
731 let handle = get_runtime().spawn(async move {
732 let client_id = transport.client_id.clone();
733 let result = transport
734 .executor
735 .cancel_all_orders(instrument_id, order_side)
736 .await;
737 (client_id, result)
738 });
739 handles.push(handle);
740 }
741
742 self.process_cancel_results(
743 handles,
744 || Ok(Vec::new()),
745 "Cancel all",
746 format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
747 "no orders to cancel",
748 )
749 .await
750 }
751
752 pub fn get_metrics(&self) -> BroadcasterMetrics {
754 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
755 let total_clients = self.transports.len();
756
757 BroadcasterMetrics {
758 total_cancels: self.total_cancels.load(Ordering::Relaxed),
759 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
760 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
761 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
762 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
763 healthy_clients,
764 total_clients,
765 }
766 }
767
768 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
770 self.get_metrics()
771 }
772
773 pub fn get_client_stats(&self) -> Vec<ClientStats> {
775 self.transports
776 .iter()
777 .map(|t| ClientStats {
778 client_id: t.client_id.clone(),
779 healthy: t.is_healthy(),
780 cancel_count: t.get_cancel_count(),
781 error_count: t.get_error_count(),
782 })
783 .collect()
784 }
785
786 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
788 self.get_client_stats()
789 }
790
791 pub fn cache_instrument(&self, instrument: InstrumentAny) {
793 for transport in self.transports.iter() {
794 transport.executor.add_instrument(instrument.clone());
795 }
796 }
797
798 #[must_use]
799 pub fn clone_for_async(&self) -> Self {
800 Self {
801 config: self.config.clone(),
802 transports: Arc::clone(&self.transports),
803 health_check_task: Arc::clone(&self.health_check_task),
804 running: Arc::clone(&self.running),
805 total_cancels: Arc::clone(&self.total_cancels),
806 successful_cancels: Arc::clone(&self.successful_cancels),
807 failed_cancels: Arc::clone(&self.failed_cancels),
808 expected_rejects: Arc::clone(&self.expected_rejects),
809 idempotent_successes: Arc::clone(&self.idempotent_successes),
810 }
811 }
812
/// Test-only constructor that uses the supplied transports instead of
/// creating HTTP clients. The broadcaster starts stopped with zeroed metrics.
#[cfg(test)]
fn new_with_transports(
    config: CancelBroadcasterConfig,
    transports: Vec<TransportClient>,
) -> Self {
    let new_counter = || Arc::new(AtomicU64::new(0));

    Self {
        config,
        transports: Arc::new(transports),
        health_check_task: Arc::new(RwLock::new(None)),
        running: Arc::new(AtomicBool::new(false)),
        total_cancels: new_counter(),
        successful_cancels: new_counter(),
        failed_cancels: new_counter(),
        expected_rejects: new_counter(),
        idempotent_successes: new_counter(),
    }
}
830}
831
/// Point-in-time snapshot of the broadcaster's counters and pool health.
#[derive(Clone, Debug)]
pub struct BroadcasterMetrics {
    /// Broadcast operations requested.
    pub total_cancels: u64,
    /// Broadcasts that ended with a successful response.
    pub successful_cancels: u64,
    /// Broadcasts that failed.
    pub failed_cancels: u64,
    /// Individual responses that matched an expected reject pattern.
    pub expected_rejects: u64,
    /// Broadcasts resolved by an idempotent success pattern.
    pub idempotent_successes: u64,
    /// Transports currently flagged healthy.
    pub healthy_clients: usize,
    /// Transports in the pool.
    pub total_clients: usize,
}
843
/// Point-in-time statistics for one transport client of the pool.
#[derive(Clone, Debug)]
pub struct ClientStats {
    /// Identifier of the client.
    pub client_id: String,
    /// Whether the client was flagged healthy when the snapshot was taken.
    pub healthy: bool,
    /// Cancel requests attempted through the client.
    pub cancel_count: u64,
    /// Cancel requests through the client that returned an error.
    pub error_count: u64,
}
852
853#[cfg(test)]
854mod tests {
855 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
856
857 use nautilus_core::UUID4;
858 use nautilus_model::{
859 enums::{
860 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
861 },
862 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
863 reports::OrderStatusReport,
864 types::{Price, Quantity},
865 };
866
867 use super::*;
868
/// Test double for `CancelExecutor` whose cancel behaviour is supplied by a
/// caller-provided handler.
///
/// The handler receives the instrument ID and the optional client / venue
/// order IDs and returns the future that yields the cancel outcome.
#[derive(Clone)]
#[allow(clippy::type_complexity)]
struct MockExecutor {
    // Shared, type-erased handler invoked for each cancel request.
    handler: Arc<
        dyn Fn(
                InstrumentId,
                Option<ClientOrderId>,
                Option<VenueOrderId>,
            )
                -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
            + Send
            + Sync,
    >,
}
884
885 impl MockExecutor {
886 fn new<F, Fut>(handler: F) -> Self
887 where
888 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
889 + Send
890 + Sync
891 + 'static,
892 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
893 {
894 Self {
895 handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
896 }
897 }
898 }
899
900 impl CancelExecutor for MockExecutor {
901 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
902 Box::pin(async { Ok(()) })
903 }
904
905 fn cancel_order(
906 &self,
907 instrument_id: InstrumentId,
908 client_order_id: Option<ClientOrderId>,
909 venue_order_id: Option<VenueOrderId>,
910 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
911 (self.handler)(instrument_id, client_order_id, venue_order_id)
912 }
913
914 fn cancel_orders(
915 &self,
916 _instrument_id: InstrumentId,
917 _client_order_ids: Option<Vec<ClientOrderId>>,
918 _venue_order_ids: Option<Vec<VenueOrderId>>,
919 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
920 {
921 Box::pin(async { Ok(Vec::new()) })
922 }
923
924 fn cancel_all_orders(
925 &self,
926 instrument_id: InstrumentId,
927 _order_side: Option<OrderSide>,
928 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
929 {
930 let handler = Arc::clone(&self.handler);
932 Box::pin(async move {
933 let result = handler(instrument_id, None, None).await;
935 match result {
936 Ok(_) => Ok(Vec::new()),
937 Err(e) => Err(e),
938 }
939 })
940 }
941
942 fn add_instrument(&self, _instrument: InstrumentAny) {
943 }
945 }
946
947 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
948 OrderStatusReport {
949 account_id: AccountId::from("BITMEX-001"),
950 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
951 venue_order_id: VenueOrderId::from(venue_order_id),
952 order_side: OrderSide::Buy,
953 order_type: OrderType::Limit,
954 time_in_force: TimeInForce::Gtc,
955 order_status: OrderStatus::Canceled,
956 price: Some(Price::new(50000.0, 2)),
957 quantity: Quantity::new(100.0, 0),
958 filled_qty: Quantity::new(0.0, 0),
959 report_id: UUID4::new(),
960 ts_accepted: 0.into(),
961 ts_last: 0.into(),
962 ts_init: 0.into(),
963 client_order_id: None,
964 avg_px: None,
965 trigger_price: None,
966 trigger_type: None,
967 contingency_type: ContingencyType::NoContingency,
968 expire_time: None,
969 order_list_id: None,
970 venue_position_id: None,
971 linked_order_ids: None,
972 parent_order_id: None,
973 display_qty: None,
974 limit_offset: None,
975 trailing_offset: None,
976 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
977 post_only: false,
978 reduce_only: false,
979 cancel_reason: None,
980 ts_triggered: None,
981 }
982 }
983
984 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
985 where
986 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
987 + Send
988 + Sync
989 + 'static,
990 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
991 {
992 let executor = MockExecutor::new(handler);
993 TransportClient::new(executor, client_id.to_string())
994 }
995
996 #[tokio::test]
997 async fn test_broadcast_cancel_immediate_success() {
998 let report = create_test_report("ORDER-1");
999 let report_clone = report.clone();
1000
1001 let transports = vec![
1002 create_stub_transport("client-0", move |_, _, _| {
1003 let report = report_clone.clone();
1004 async move { Ok(report) }
1005 }),
1006 create_stub_transport("client-1", |_, _, _| async {
1007 tokio::time::sleep(Duration::from_secs(10)).await;
1008 anyhow::bail!("Should be aborted")
1009 }),
1010 ];
1011
1012 let config = CancelBroadcasterConfig::default();
1013 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1014
1015 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1016 let result = broadcaster
1017 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1018 .await;
1019
1020 assert!(result.is_ok());
1021 let returned_report = result.unwrap();
1022 assert!(returned_report.is_some());
1023 assert_eq!(
1024 returned_report.unwrap().venue_order_id,
1025 report.venue_order_id
1026 );
1027
1028 let metrics = broadcaster.get_metrics_async().await;
1029 assert_eq!(metrics.successful_cancels, 1);
1030 assert_eq!(metrics.failed_cancels, 0);
1031 assert_eq!(metrics.total_cancels, 1);
1032 }
1033
1034 #[tokio::test]
1035 async fn test_broadcast_cancel_idempotent_success() {
1036 let transports = vec![
1037 create_stub_transport("client-0", |_, _, _| async {
1038 anyhow::bail!("AlreadyCanceled")
1039 }),
1040 create_stub_transport("client-1", |_, _, _| async {
1041 tokio::time::sleep(Duration::from_secs(10)).await;
1042 anyhow::bail!("Should be aborted")
1043 }),
1044 ];
1045
1046 let config = CancelBroadcasterConfig::default();
1047 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1048
1049 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1050 let result = broadcaster
1051 .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1052 .await;
1053
1054 assert!(result.is_ok());
1055 assert!(result.unwrap().is_none());
1056
1057 let metrics = broadcaster.get_metrics_async().await;
1058 assert_eq!(metrics.idempotent_successes, 1);
1059 assert_eq!(metrics.successful_cancels, 0);
1060 assert_eq!(metrics.failed_cancels, 0);
1061 }
1062
1063 #[tokio::test]
1064 async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1065 let transports = vec![
1066 create_stub_transport("client-0", |_, _, _| async {
1067 anyhow::bail!("502 Bad Gateway")
1068 }),
1069 create_stub_transport("client-1", |_, _, _| async {
1070 anyhow::bail!("orderID not found")
1071 }),
1072 ];
1073
1074 let config = CancelBroadcasterConfig::default();
1075 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1076
1077 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1078 let result = broadcaster
1079 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1080 .await;
1081
1082 assert!(result.is_ok());
1083 assert!(result.unwrap().is_none());
1084
1085 let metrics = broadcaster.get_metrics_async().await;
1086 assert_eq!(metrics.idempotent_successes, 1);
1087 assert_eq!(metrics.failed_cancels, 0);
1088 }
1089
1090 #[tokio::test]
1091 async fn test_broadcast_cancel_all_failures() {
1092 let transports = vec![
1093 create_stub_transport("client-0", |_, _, _| async {
1094 anyhow::bail!("502 Bad Gateway")
1095 }),
1096 create_stub_transport("client-1", |_, _, _| async {
1097 anyhow::bail!("Connection refused")
1098 }),
1099 ];
1100
1101 let config = CancelBroadcasterConfig::default();
1102 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1103
1104 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1105 let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1106
1107 assert!(result.is_err());
1108 assert!(
1109 result
1110 .unwrap_err()
1111 .to_string()
1112 .contains("All cancel all requests failed")
1113 );
1114
1115 let metrics = broadcaster.get_metrics_async().await;
1116 assert_eq!(metrics.failed_cancels, 1);
1117 assert_eq!(metrics.successful_cancels, 0);
1118 assert_eq!(metrics.idempotent_successes, 0);
1119 }
1120
1121 #[tokio::test]
1122 async fn test_broadcast_cancel_no_healthy_clients() {
1123 let transport = create_stub_transport("client-0", |_, _, _| async {
1124 Ok(create_test_report("ORDER-1"))
1125 });
1126 transport.healthy.store(false, Ordering::Relaxed);
1127
1128 let config = CancelBroadcasterConfig::default();
1129 let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1130
1131 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1132 let result = broadcaster
1133 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1134 .await;
1135
1136 assert!(result.is_err());
1137 assert!(
1138 result
1139 .unwrap_err()
1140 .to_string()
1141 .contains("No healthy transport clients available")
1142 );
1143
1144 let metrics = broadcaster.get_metrics_async().await;
1145 assert_eq!(metrics.failed_cancels, 1);
1146 }
1147
1148 #[tokio::test]
1149 async fn test_broadcast_cancel_metrics_increment() {
1150 let report1 = create_test_report("ORDER-1");
1151 let report1_clone = report1.clone();
1152 let report2 = create_test_report("ORDER-2");
1153 let report2_clone = report2.clone();
1154
1155 let transports = vec![
1156 create_stub_transport("client-0", move |_, _, _| {
1157 let report = report1_clone.clone();
1158 async move { Ok(report) }
1159 }),
1160 create_stub_transport("client-1", move |_, _, _| {
1161 let report = report2_clone.clone();
1162 async move { Ok(report) }
1163 }),
1164 ];
1165
1166 let config = CancelBroadcasterConfig::default();
1167 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1168
1169 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1170
1171 let _ = broadcaster
1172 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1173 .await;
1174
1175 let _ = broadcaster
1176 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1177 .await;
1178
1179 let metrics = broadcaster.get_metrics_async().await;
1180 assert_eq!(metrics.total_cancels, 2);
1181 assert_eq!(metrics.successful_cancels, 2);
1182 }
1183
1184 #[tokio::test]
1185 async fn test_broadcast_cancel_expected_reject_pattern() {
1186 let transports = vec![
1187 create_stub_transport("client-0", |_, _, _| async {
1188 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1189 }),
1190 create_stub_transport("client-1", |_, _, _| async {
1191 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1192 }),
1193 ];
1194
1195 let config = CancelBroadcasterConfig::default();
1196 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1197
1198 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1199 let result = broadcaster
1200 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1201 .await;
1202
1203 assert!(result.is_err());
1204
1205 let metrics = broadcaster.get_metrics_async().await;
1206 assert_eq!(metrics.expected_rejects, 2);
1207 assert_eq!(metrics.failed_cancels, 1);
1208 }
1209
1210 #[tokio::test]
1211 async fn test_broadcaster_creation_with_pool() {
1212 let config = CancelBroadcasterConfig {
1213 pool_size: 3,
1214 api_key: Some("test_key".to_string()),
1215 api_secret: Some("test_secret".to_string()),
1216 base_url: Some("https://test.example.com".to_string()),
1217 testnet: false,
1218 timeout_secs: Some(5),
1219 max_retries: Some(2),
1220 retry_delay_ms: Some(100),
1221 retry_delay_max_ms: Some(1000),
1222 recv_window_ms: Some(5000),
1223 max_requests_per_second: Some(10),
1224 max_requests_per_minute: Some(100),
1225 health_check_interval_secs: 30,
1226 health_check_timeout_secs: 5,
1227 expected_reject_patterns: vec!["test_pattern".to_string()],
1228 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1229 proxy_urls: vec![],
1230 };
1231
1232 let broadcaster = CancelBroadcaster::new(config.clone());
1233 assert!(broadcaster.is_ok());
1234
1235 let broadcaster = broadcaster.unwrap();
1236 let metrics = broadcaster.get_metrics_async().await;
1237
1238 assert_eq!(metrics.total_clients, 3);
1239 assert_eq!(metrics.total_cancels, 0);
1240 assert_eq!(metrics.successful_cancels, 0);
1241 assert_eq!(metrics.failed_cancels, 0);
1242 }
1243
1244 #[tokio::test]
1245 async fn test_broadcaster_lifecycle() {
1246 let config = CancelBroadcasterConfig {
1247 pool_size: 2,
1248 api_key: Some("test_key".to_string()),
1249 api_secret: Some("test_secret".to_string()),
1250 base_url: Some("https://test.example.com".to_string()),
1251 testnet: false,
1252 timeout_secs: Some(5),
1253 max_retries: None,
1254 retry_delay_ms: None,
1255 retry_delay_max_ms: None,
1256 recv_window_ms: None,
1257 max_requests_per_second: None,
1258 max_requests_per_minute: None,
1259 health_check_interval_secs: 60, health_check_timeout_secs: 1,
1261 expected_reject_patterns: vec![],
1262 idempotent_success_patterns: vec![],
1263 proxy_urls: vec![],
1264 };
1265
1266 let broadcaster = CancelBroadcaster::new(config).unwrap();
1267
1268 assert!(!broadcaster.running.load(Ordering::Relaxed));
1270
1271 let start_result = broadcaster.start().await;
1273 assert!(start_result.is_ok());
1274 assert!(broadcaster.running.load(Ordering::Relaxed));
1275
1276 let start_again = broadcaster.start().await;
1278 assert!(start_again.is_ok());
1279
1280 broadcaster.stop().await;
1282 assert!(!broadcaster.running.load(Ordering::Relaxed));
1283
1284 broadcaster.stop().await;
1286 assert!(!broadcaster.running.load(Ordering::Relaxed));
1287 }
1288
1289 #[tokio::test]
1290 async fn test_client_stats_collection() {
1291 let config = CancelBroadcasterConfig {
1292 pool_size: 2,
1293 api_key: Some("test_key".to_string()),
1294 api_secret: Some("test_secret".to_string()),
1295 base_url: Some("https://test.example.com".to_string()),
1296 testnet: false,
1297 timeout_secs: Some(5),
1298 max_retries: None,
1299 retry_delay_ms: None,
1300 retry_delay_max_ms: None,
1301 recv_window_ms: None,
1302 max_requests_per_second: None,
1303 max_requests_per_minute: None,
1304 health_check_interval_secs: 60,
1305 health_check_timeout_secs: 5,
1306 expected_reject_patterns: vec![],
1307 idempotent_success_patterns: vec![],
1308 proxy_urls: vec![],
1309 };
1310
1311 let broadcaster = CancelBroadcaster::new(config).unwrap();
1312 let stats = broadcaster.get_client_stats_async().await;
1313
1314 assert_eq!(stats.len(), 2);
1315 assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1316 assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1317 assert!(stats[0].healthy); assert!(stats[1].healthy);
1319 assert_eq!(stats[0].cancel_count, 0);
1320 assert_eq!(stats[1].cancel_count, 0);
1321 assert_eq!(stats[0].error_count, 0);
1322 assert_eq!(stats[1].error_count, 0);
1323 }
1324
1325 #[tokio::test]
1326 async fn test_testnet_config_sets_base_url() {
1327 let config = CancelBroadcasterConfig {
1328 pool_size: 1,
1329 api_key: Some("test_key".to_string()),
1330 api_secret: Some("test_secret".to_string()),
1331 base_url: None, testnet: true, timeout_secs: Some(5),
1334 max_retries: None,
1335 retry_delay_ms: None,
1336 retry_delay_max_ms: None,
1337 recv_window_ms: None,
1338 max_requests_per_second: None,
1339 max_requests_per_minute: None,
1340 health_check_interval_secs: 60,
1341 health_check_timeout_secs: 5,
1342 expected_reject_patterns: vec![],
1343 idempotent_success_patterns: vec![],
1344 proxy_urls: vec![],
1345 };
1346
1347 let broadcaster = CancelBroadcaster::new(config);
1348 assert!(broadcaster.is_ok());
1349 }
1350
1351 #[tokio::test]
1352 async fn test_default_config() {
1353 let config = CancelBroadcasterConfig {
1354 api_key: Some("test_key".to_string()),
1355 api_secret: Some("test_secret".to_string()),
1356 base_url: Some("https://test.example.com".to_string()),
1357 ..Default::default()
1358 };
1359
1360 let broadcaster = CancelBroadcaster::new(config);
1361 assert!(broadcaster.is_ok());
1362
1363 let broadcaster = broadcaster.unwrap();
1364 let metrics = broadcaster.get_metrics_async().await;
1365
1366 assert_eq!(metrics.total_clients, 2);
1368 }
1369
1370 #[tokio::test]
1371 async fn test_clone_for_async() {
1372 let config = CancelBroadcasterConfig {
1373 pool_size: 1,
1374 api_key: Some("test_key".to_string()),
1375 api_secret: Some("test_secret".to_string()),
1376 base_url: Some("https://test.example.com".to_string()),
1377 testnet: false,
1378 timeout_secs: Some(5),
1379 max_retries: None,
1380 retry_delay_ms: None,
1381 retry_delay_max_ms: None,
1382 recv_window_ms: None,
1383 max_requests_per_second: None,
1384 max_requests_per_minute: None,
1385 health_check_interval_secs: 60,
1386 health_check_timeout_secs: 5,
1387 expected_reject_patterns: vec![],
1388 idempotent_success_patterns: vec![],
1389 proxy_urls: vec![],
1390 };
1391
1392 let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1393
1394 broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1396
1397 let broadcaster2 = broadcaster1.clone_for_async();
1399 let metrics2 = broadcaster2.get_metrics_async().await;
1400
1401 assert_eq!(metrics2.total_cancels, 1); broadcaster2
1405 .successful_cancels
1406 .fetch_add(5, Ordering::Relaxed);
1407
1408 let metrics1 = broadcaster1.get_metrics_async().await;
1410 assert_eq!(metrics1.successful_cancels, 5);
1411 }
1412
1413 #[tokio::test]
1414 async fn test_pattern_matching() {
1415 let config = CancelBroadcasterConfig {
1417 pool_size: 1,
1418 api_key: Some("test_key".to_string()),
1419 api_secret: Some("test_secret".to_string()),
1420 base_url: Some("https://test.example.com".to_string()),
1421 testnet: false,
1422 timeout_secs: Some(5),
1423 max_retries: None,
1424 retry_delay_ms: None,
1425 retry_delay_max_ms: None,
1426 recv_window_ms: None,
1427 max_requests_per_second: None,
1428 max_requests_per_minute: None,
1429 health_check_interval_secs: 60,
1430 health_check_timeout_secs: 5,
1431 expected_reject_patterns: vec![
1432 "ParticipateDoNotInitiate".to_string(),
1433 "Close-only".to_string(),
1434 ],
1435 idempotent_success_patterns: vec![
1436 "AlreadyCanceled".to_string(),
1437 "orderID not found".to_string(),
1438 "Unable to cancel".to_string(),
1439 ],
1440 proxy_urls: vec![],
1441 };
1442
1443 let broadcaster = CancelBroadcaster::new(config).unwrap();
1444
1445 assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1447 assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1448 assert!(!broadcaster.is_expected_reject("Connection timeout"));
1449
1450 assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1452 assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1453 assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1454 assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1455 }
1456
1457 #[tokio::test]
1461 async fn test_broadcast_batch_cancel_structure() {
1462 let config = CancelBroadcasterConfig {
1464 pool_size: 2,
1465 api_key: Some("test_key".to_string()),
1466 api_secret: Some("test_secret".to_string()),
1467 base_url: Some("https://test.example.com".to_string()),
1468 testnet: false,
1469 timeout_secs: Some(5),
1470 max_retries: None,
1471 retry_delay_ms: None,
1472 retry_delay_max_ms: None,
1473 recv_window_ms: None,
1474 max_requests_per_second: None,
1475 max_requests_per_minute: None,
1476 health_check_interval_secs: 60,
1477 health_check_timeout_secs: 5,
1478 expected_reject_patterns: vec![],
1479 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1480 proxy_urls: vec![],
1481 };
1482
1483 let broadcaster = CancelBroadcaster::new(config).unwrap();
1484 let metrics = broadcaster.get_metrics_async().await;
1485
1486 assert_eq!(metrics.total_clients, 2);
1488 assert_eq!(metrics.total_cancels, 0);
1489 assert_eq!(metrics.successful_cancels, 0);
1490 assert_eq!(metrics.failed_cancels, 0);
1491 }
1492
1493 #[tokio::test]
1494 async fn test_broadcast_cancel_all_structure() {
1495 let config = CancelBroadcasterConfig {
1497 pool_size: 3,
1498 api_key: Some("test_key".to_string()),
1499 api_secret: Some("test_secret".to_string()),
1500 base_url: Some("https://test.example.com".to_string()),
1501 testnet: false,
1502 timeout_secs: Some(5),
1503 max_retries: None,
1504 retry_delay_ms: None,
1505 retry_delay_max_ms: None,
1506 recv_window_ms: None,
1507 max_requests_per_second: None,
1508 max_requests_per_minute: None,
1509 health_check_interval_secs: 60,
1510 health_check_timeout_secs: 5,
1511 expected_reject_patterns: vec![],
1512 idempotent_success_patterns: vec!["orderID not found".to_string()],
1513 proxy_urls: vec![],
1514 };
1515
1516 let broadcaster = CancelBroadcaster::new(config).unwrap();
1517 let metrics = broadcaster.get_metrics_async().await;
1518
1519 assert_eq!(metrics.total_clients, 3);
1521 assert_eq!(metrics.healthy_clients, 3);
1522 assert_eq!(metrics.total_cancels, 0);
1523 }
1524
1525 #[tokio::test]
1527 async fn test_single_cancel_metrics_with_mixed_responses() {
1528 let transports = vec![
1531 create_stub_transport("client-0", |_, _, _| async {
1532 anyhow::bail!("Connection timeout")
1533 }),
1534 create_stub_transport("client-1", |_, _, _| async {
1535 anyhow::bail!("AlreadyCanceled")
1536 }),
1537 ];
1538
1539 let config = CancelBroadcasterConfig::default();
1540 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1541
1542 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1543 let result = broadcaster
1544 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1545 .await;
1546
1547 assert!(result.is_ok());
1549 assert!(result.unwrap().is_none());
1550
1551 let metrics = broadcaster.get_metrics_async().await;
1553 assert_eq!(
1554 metrics.failed_cancels, 0,
1555 "Idempotent success should not increment failed_cancels"
1556 );
1557 assert_eq!(metrics.idempotent_successes, 1);
1558 assert_eq!(metrics.successful_cancels, 0);
1559 }
1560
1561 #[tokio::test]
1562 async fn test_metrics_initialization_and_health() {
1563 let config = CancelBroadcasterConfig {
1565 pool_size: 4,
1566 api_key: Some("test_key".to_string()),
1567 api_secret: Some("test_secret".to_string()),
1568 base_url: Some("https://test.example.com".to_string()),
1569 testnet: false,
1570 timeout_secs: Some(5),
1571 max_retries: None,
1572 retry_delay_ms: None,
1573 retry_delay_max_ms: None,
1574 recv_window_ms: None,
1575 max_requests_per_second: None,
1576 max_requests_per_minute: None,
1577 health_check_interval_secs: 60,
1578 health_check_timeout_secs: 5,
1579 expected_reject_patterns: vec![],
1580 idempotent_success_patterns: vec![],
1581 proxy_urls: vec![],
1582 };
1583
1584 let broadcaster = CancelBroadcaster::new(config).unwrap();
1585 let metrics = broadcaster.get_metrics_async().await;
1586
1587 assert_eq!(metrics.total_cancels, 0);
1589 assert_eq!(metrics.successful_cancels, 0);
1590 assert_eq!(metrics.failed_cancels, 0);
1591 assert_eq!(metrics.expected_rejects, 0);
1592 assert_eq!(metrics.idempotent_successes, 0);
1593
1594 assert_eq!(metrics.healthy_clients, 4);
1596 assert_eq!(metrics.total_clients, 4);
1597 }
1598
1599 #[tokio::test]
1601 async fn test_health_check_task_lifecycle() {
1602 let config = CancelBroadcasterConfig {
1603 pool_size: 1,
1604 api_key: Some("test_key".to_string()),
1605 api_secret: Some("test_secret".to_string()),
1606 base_url: Some("https://test.example.com".to_string()),
1607 testnet: false,
1608 timeout_secs: Some(5),
1609 max_retries: None,
1610 retry_delay_ms: None,
1611 retry_delay_max_ms: None,
1612 recv_window_ms: None,
1613 max_requests_per_second: None,
1614 max_requests_per_minute: None,
1615 health_check_interval_secs: 1, health_check_timeout_secs: 1,
1617 expected_reject_patterns: vec![],
1618 idempotent_success_patterns: vec![],
1619 proxy_urls: vec![],
1620 };
1621
1622 let broadcaster = CancelBroadcaster::new(config).unwrap();
1623
1624 broadcaster.start().await.unwrap();
1626 assert!(broadcaster.running.load(Ordering::Relaxed));
1627
1628 {
1630 let task_guard = broadcaster.health_check_task.read().await;
1631 assert!(task_guard.is_some());
1632 }
1633
1634 broadcaster.stop().await;
1636 assert!(!broadcaster.running.load(Ordering::Relaxed));
1637
1638 {
1640 let task_guard = broadcaster.health_check_task.read().await;
1641 assert!(task_guard.is_none());
1642 }
1643 }
1644}