1use std::{
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU64, Ordering},
40 },
41 time::Duration,
42};
43
44use futures_util::future;
45use nautilus_model::{
46 enums::OrderSide,
47 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
48 instruments::InstrumentAny,
49 reports::OrderStatusReport,
50};
51use tokio::{sync::RwLock, task::JoinHandle, time::interval};
52
53use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
54
55trait CancelExecutor: Send + Sync {
77 fn add_instrument(&self, instrument: InstrumentAny);
79
80 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
82
83 fn cancel_order(
85 &self,
86 instrument_id: InstrumentId,
87 client_order_id: Option<ClientOrderId>,
88 venue_order_id: Option<VenueOrderId>,
89 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
90
91 fn cancel_orders(
93 &self,
94 instrument_id: InstrumentId,
95 client_order_ids: Option<Vec<ClientOrderId>>,
96 venue_order_ids: Option<Vec<VenueOrderId>>,
97 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
98
99 fn cancel_all_orders(
101 &self,
102 instrument_id: InstrumentId,
103 order_side: Option<OrderSide>,
104 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
105}
106
107impl CancelExecutor for BitmexHttpClient {
108 fn add_instrument(&self, instrument: InstrumentAny) {
109 Self::cache_instrument(self, instrument);
110 }
111
112 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
113 Box::pin(async move {
114 Self::get_server_time(self)
115 .await
116 .map(|_| ())
117 .map_err(|e| anyhow::anyhow!("{e}"))
118 })
119 }
120
121 fn cancel_order(
122 &self,
123 instrument_id: InstrumentId,
124 client_order_id: Option<ClientOrderId>,
125 venue_order_id: Option<VenueOrderId>,
126 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
127 Box::pin(async move {
128 Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
129 })
130 }
131
132 fn cancel_orders(
133 &self,
134 instrument_id: InstrumentId,
135 client_order_ids: Option<Vec<ClientOrderId>>,
136 venue_order_ids: Option<Vec<VenueOrderId>>,
137 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
138 Box::pin(async move {
139 Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
140 })
141 }
142
143 fn cancel_all_orders(
144 &self,
145 instrument_id: InstrumentId,
146 order_side: Option<OrderSide>,
147 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
148 Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct CancelBroadcasterConfig {
155 pub pool_size: usize,
157 pub api_key: Option<String>,
159 pub api_secret: Option<String>,
161 pub base_url: Option<String>,
163 pub testnet: bool,
165 pub timeout_secs: Option<u64>,
167 pub max_retries: Option<u32>,
169 pub retry_delay_ms: Option<u64>,
171 pub retry_delay_max_ms: Option<u64>,
173 pub recv_window_ms: Option<u64>,
175 pub max_requests_per_second: Option<u32>,
177 pub max_requests_per_minute: Option<u32>,
179 pub health_check_interval_secs: u64,
181 pub health_check_timeout_secs: u64,
183 pub expected_reject_patterns: Vec<String>,
185 pub idempotent_success_patterns: Vec<String>,
187 pub proxy_urls: Vec<Option<String>>,
193}
194
195impl Default for CancelBroadcasterConfig {
196 fn default() -> Self {
197 Self {
198 pool_size: 2,
199 api_key: None,
200 api_secret: None,
201 base_url: None,
202 testnet: false,
203 timeout_secs: Some(60),
204 max_retries: None,
205 retry_delay_ms: Some(1_000),
206 retry_delay_max_ms: Some(5_000),
207 recv_window_ms: Some(10_000),
208 max_requests_per_second: Some(10),
209 max_requests_per_minute: Some(120),
210 health_check_interval_secs: 30,
211 health_check_timeout_secs: 5,
212 expected_reject_patterns: vec![
213 r"Order had execInst of ParticipateDoNotInitiate".to_string(),
214 ],
215 idempotent_success_patterns: vec![
216 r"AlreadyCanceled".to_string(),
217 r"orderID not found".to_string(),
218 r"Unable to cancel order due to existing state".to_string(),
219 ],
220 proxy_urls: vec![],
221 }
222 }
223}
224
225#[derive(Clone)]
227struct TransportClient {
228 executor: Arc<dyn CancelExecutor>,
233 client_id: String,
234 healthy: Arc<AtomicBool>,
235 cancel_count: Arc<AtomicU64>,
236 error_count: Arc<AtomicU64>,
237}
238
239impl Debug for TransportClient {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("TransportClient")
242 .field("client_id", &self.client_id)
243 .field("healthy", &self.healthy)
244 .field("cancel_count", &self.cancel_count)
245 .field("error_count", &self.error_count)
246 .finish()
247 }
248}
249
250impl TransportClient {
251 fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
252 Self {
253 executor: Arc::new(executor),
254 client_id,
255 healthy: Arc::new(AtomicBool::new(true)),
256 cancel_count: Arc::new(AtomicU64::new(0)),
257 error_count: Arc::new(AtomicU64::new(0)),
258 }
259 }
260
261 fn is_healthy(&self) -> bool {
262 self.healthy.load(Ordering::Relaxed)
263 }
264
265 fn mark_healthy(&self) {
266 self.healthy.store(true, Ordering::Relaxed);
267 }
268
269 fn mark_unhealthy(&self) {
270 self.healthy.store(false, Ordering::Relaxed);
271 }
272
273 fn get_cancel_count(&self) -> u64 {
274 self.cancel_count.load(Ordering::Relaxed)
275 }
276
277 fn get_error_count(&self) -> u64 {
278 self.error_count.load(Ordering::Relaxed)
279 }
280
281 async fn health_check(&self, timeout_secs: u64) -> bool {
282 match tokio::time::timeout(
283 Duration::from_secs(timeout_secs),
284 self.executor.health_check(),
285 )
286 .await
287 {
288 Ok(Ok(_)) => {
289 self.mark_healthy();
290 true
291 }
292 Ok(Err(e)) => {
293 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
294 self.mark_unhealthy();
295 false
296 }
297 Err(_) => {
298 tracing::warn!("Health check timeout for client {}", self.client_id);
299 self.mark_unhealthy();
300 false
301 }
302 }
303 }
304
305 async fn cancel_order(
306 &self,
307 instrument_id: InstrumentId,
308 client_order_id: Option<ClientOrderId>,
309 venue_order_id: Option<VenueOrderId>,
310 ) -> anyhow::Result<OrderStatusReport> {
311 self.cancel_count.fetch_add(1, Ordering::Relaxed);
312
313 match self
314 .executor
315 .cancel_order(instrument_id, client_order_id, venue_order_id)
316 .await
317 {
318 Ok(report) => {
319 self.mark_healthy();
320 Ok(report)
321 }
322 Err(e) => {
323 self.error_count.fetch_add(1, Ordering::Relaxed);
324 Err(e)
325 }
326 }
327 }
328}
329
330#[cfg_attr(feature = "python", pyo3::pyclass)]
336#[derive(Debug)]
337pub struct CancelBroadcaster {
338 config: CancelBroadcasterConfig,
339 transports: Arc<Vec<TransportClient>>,
340 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
341 running: Arc<AtomicBool>,
342 total_cancels: Arc<AtomicU64>,
343 successful_cancels: Arc<AtomicU64>,
344 failed_cancels: Arc<AtomicU64>,
345 expected_rejects: Arc<AtomicU64>,
346 idempotent_successes: Arc<AtomicU64>,
347}
348
349impl CancelBroadcaster {
350 pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
356 let mut transports = Vec::with_capacity(config.pool_size);
357
358 let base_url = if config.testnet && config.base_url.is_none() {
360 Some(BITMEX_HTTP_TESTNET_URL.to_string())
361 } else {
362 config.base_url.clone()
363 };
364
365 for i in 0..config.pool_size {
366 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
368
369 let client = BitmexHttpClient::with_credentials(
370 config.api_key.clone(),
371 config.api_secret.clone(),
372 base_url.clone(),
373 config.timeout_secs,
374 config.max_retries,
375 config.retry_delay_ms,
376 config.retry_delay_max_ms,
377 config.recv_window_ms,
378 config.max_requests_per_second,
379 config.max_requests_per_minute,
380 proxy_url,
381 )
382 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
383
384 transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
385 }
386
387 Ok(Self {
388 config,
389 transports: Arc::new(transports),
390 health_check_task: Arc::new(RwLock::new(None)),
391 running: Arc::new(AtomicBool::new(false)),
392 total_cancels: Arc::new(AtomicU64::new(0)),
393 successful_cancels: Arc::new(AtomicU64::new(0)),
394 failed_cancels: Arc::new(AtomicU64::new(0)),
395 expected_rejects: Arc::new(AtomicU64::new(0)),
396 idempotent_successes: Arc::new(AtomicU64::new(0)),
397 })
398 }
399
400 pub async fn start(&self) -> anyhow::Result<()> {
406 if self.running.load(Ordering::Relaxed) {
407 return Ok(());
408 }
409
410 self.running.store(true, Ordering::Relaxed);
411
412 self.run_health_checks().await;
414
415 let transports = Arc::clone(&self.transports);
417 let running = Arc::clone(&self.running);
418 let interval_secs = self.config.health_check_interval_secs;
419 let timeout_secs = self.config.health_check_timeout_secs;
420
421 let task = tokio::spawn(async move {
422 let mut ticker = interval(Duration::from_secs(interval_secs));
423 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
424
425 loop {
426 ticker.tick().await;
427
428 if !running.load(Ordering::Relaxed) {
429 break;
430 }
431
432 let tasks: Vec<_> = transports
433 .iter()
434 .map(|t| t.health_check(timeout_secs))
435 .collect();
436
437 let results = future::join_all(tasks).await;
438 let healthy_count = results.iter().filter(|&&r| r).count();
439
440 tracing::debug!(
441 "Health check complete: {}/{} clients healthy",
442 healthy_count,
443 results.len()
444 );
445 }
446 });
447
448 *self.health_check_task.write().await = Some(task);
449
450 tracing::info!(
451 "CancelBroadcaster started with {} clients",
452 self.transports.len()
453 );
454
455 Ok(())
456 }
457
458 pub async fn stop(&self) {
460 if !self.running.load(Ordering::Relaxed) {
461 return;
462 }
463
464 self.running.store(false, Ordering::Relaxed);
465
466 if let Some(task) = self.health_check_task.write().await.take() {
467 task.abort();
468 }
469
470 tracing::info!("CancelBroadcaster stopped");
471 }
472
473 async fn run_health_checks(&self) {
474 let tasks: Vec<_> = self
475 .transports
476 .iter()
477 .map(|t| t.health_check(self.config.health_check_timeout_secs))
478 .collect();
479
480 let results = future::join_all(tasks).await;
481 let healthy_count = results.iter().filter(|&&r| r).count();
482
483 tracing::debug!(
484 "Health check complete: {}/{} clients healthy",
485 healthy_count,
486 results.len()
487 );
488 }
489
490 fn is_expected_reject(&self, error_message: &str) -> bool {
491 self.config
492 .expected_reject_patterns
493 .iter()
494 .any(|pattern| error_message.contains(pattern))
495 }
496
497 fn is_idempotent_success(&self, error_message: &str) -> bool {
498 self.config
499 .idempotent_success_patterns
500 .iter()
501 .any(|pattern| error_message.contains(pattern))
502 }
503
504 async fn process_cancel_results<T>(
508 &self,
509 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
510 idempotent_result: impl FnOnce() -> anyhow::Result<T>,
511 operation: &str,
512 params: String,
513 idempotent_reason: &str,
514 ) -> anyhow::Result<T>
515 where
516 T: Send + 'static,
517 {
518 let mut errors = Vec::new();
519
520 while !handles.is_empty() {
521 let current_handles = std::mem::take(&mut handles);
522 let (result, _idx, remaining) = future::select_all(current_handles).await;
523 handles = remaining.into_iter().collect();
524
525 match result {
526 Ok((client_id, Ok(result))) => {
527 for handle in &handles {
529 handle.abort();
530 }
531 self.successful_cancels.fetch_add(1, Ordering::Relaxed);
532
533 tracing::debug!(
534 "{} broadcast succeeded [{}] {}",
535 operation,
536 client_id,
537 params
538 );
539
540 return Ok(result);
541 }
542 Ok((client_id, Err(e))) => {
543 let error_msg = e.to_string();
544
545 if self.is_idempotent_success(&error_msg) {
546 for handle in &handles {
548 handle.abort();
549 }
550 self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
551
552 tracing::debug!(
553 "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
554 );
555
556 return idempotent_result();
557 }
558
559 if self.is_expected_reject(&error_msg) {
560 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
561 tracing::debug!(
562 "Expected {} rejection [{}]: {} {}",
563 operation.to_lowercase(),
564 client_id,
565 error_msg,
566 params
567 );
568 errors.push(error_msg);
569 } else {
570 tracing::warn!(
571 "{} request failed [{}]: {} {}",
572 operation,
573 client_id,
574 error_msg,
575 params
576 );
577 errors.push(error_msg);
578 }
579 }
580 Err(e) => {
581 tracing::warn!("{} task join error: {e:?}", operation);
582 errors.push(format!("Task panicked: {e:?}"));
583 }
584 }
585 }
586
587 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
589 tracing::error!(
590 "All {} requests failed: {errors:?} {params}",
591 operation.to_lowercase(),
592 );
593 Err(anyhow::anyhow!(
594 "All {} requests failed: {errors:?}",
595 operation.to_lowercase(),
596 ))
597 }
598
599 pub async fn broadcast_cancel(
611 &self,
612 instrument_id: InstrumentId,
613 client_order_id: Option<ClientOrderId>,
614 venue_order_id: Option<VenueOrderId>,
615 ) -> anyhow::Result<Option<OrderStatusReport>> {
616 self.total_cancels.fetch_add(1, Ordering::Relaxed);
617
618 let healthy_transports: Vec<TransportClient> = self
619 .transports
620 .iter()
621 .filter(|t| t.is_healthy())
622 .cloned()
623 .collect();
624
625 if healthy_transports.is_empty() {
626 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
627 anyhow::bail!("No healthy transport clients available");
628 }
629
630 let mut handles = Vec::new();
631 for transport in healthy_transports {
632 let handle = tokio::spawn(async move {
633 let client_id = transport.client_id.clone();
634 let result = transport
635 .cancel_order(instrument_id, client_order_id, venue_order_id)
636 .await
637 .map(Some); (client_id, result)
639 });
640 handles.push(handle);
641 }
642
643 self.process_cancel_results(
644 handles,
645 || Ok(None),
646 "Cancel",
647 format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
648 "order already cancelled/not found",
649 )
650 .await
651 }
652
653 pub async fn broadcast_batch_cancel(
659 &self,
660 instrument_id: InstrumentId,
661 client_order_ids: Option<Vec<ClientOrderId>>,
662 venue_order_ids: Option<Vec<VenueOrderId>>,
663 ) -> anyhow::Result<Vec<OrderStatusReport>> {
664 self.total_cancels.fetch_add(1, Ordering::Relaxed);
665
666 let healthy_transports: Vec<TransportClient> = self
667 .transports
668 .iter()
669 .filter(|t| t.is_healthy())
670 .cloned()
671 .collect();
672
673 if healthy_transports.is_empty() {
674 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
675 anyhow::bail!("No healthy transport clients available");
676 }
677
678 let mut handles = Vec::new();
679
680 for transport in healthy_transports {
681 let client_order_ids_clone = client_order_ids.clone();
682 let venue_order_ids_clone = venue_order_ids.clone();
683 let handle = tokio::spawn(async move {
684 let client_id = transport.client_id.clone();
685 let result = transport
686 .executor
687 .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
688 .await;
689 (client_id, result)
690 });
691 handles.push(handle);
692 }
693
694 self.process_cancel_results(
695 handles,
696 || Ok(Vec::new()),
697 "Batch cancel",
698 format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
699 "orders already cancelled/not found",
700 )
701 .await
702 }
703
704 pub async fn broadcast_cancel_all(
710 &self,
711 instrument_id: InstrumentId,
712 order_side: Option<OrderSide>,
713 ) -> anyhow::Result<Vec<OrderStatusReport>> {
714 self.total_cancels.fetch_add(1, Ordering::Relaxed);
715
716 let healthy_transports: Vec<TransportClient> = self
717 .transports
718 .iter()
719 .filter(|t| t.is_healthy())
720 .cloned()
721 .collect();
722
723 if healthy_transports.is_empty() {
724 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
725 anyhow::bail!("No healthy transport clients available");
726 }
727
728 let mut handles = Vec::new();
729 for transport in healthy_transports {
730 let handle = tokio::spawn(async move {
731 let client_id = transport.client_id.clone();
732 let result = transport
733 .executor
734 .cancel_all_orders(instrument_id, order_side)
735 .await;
736 (client_id, result)
737 });
738 handles.push(handle);
739 }
740
741 self.process_cancel_results(
742 handles,
743 || Ok(Vec::new()),
744 "Cancel all",
745 format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
746 "no orders to cancel",
747 )
748 .await
749 }
750
751 pub fn get_metrics(&self) -> BroadcasterMetrics {
753 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
754 let total_clients = self.transports.len();
755
756 BroadcasterMetrics {
757 total_cancels: self.total_cancels.load(Ordering::Relaxed),
758 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
759 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
760 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
761 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
762 healthy_clients,
763 total_clients,
764 }
765 }
766
767 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
769 self.get_metrics()
770 }
771
772 pub fn get_client_stats(&self) -> Vec<ClientStats> {
774 self.transports
775 .iter()
776 .map(|t| ClientStats {
777 client_id: t.client_id.clone(),
778 healthy: t.is_healthy(),
779 cancel_count: t.get_cancel_count(),
780 error_count: t.get_error_count(),
781 })
782 .collect()
783 }
784
785 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
787 self.get_client_stats()
788 }
789
790 pub fn cache_instrument(&self, instrument: InstrumentAny) {
792 for transport in self.transports.iter() {
793 transport.executor.add_instrument(instrument.clone());
794 }
795 }
796
797 #[must_use]
798 pub fn clone_for_async(&self) -> Self {
799 Self {
800 config: self.config.clone(),
801 transports: Arc::clone(&self.transports),
802 health_check_task: Arc::clone(&self.health_check_task),
803 running: Arc::clone(&self.running),
804 total_cancels: Arc::clone(&self.total_cancels),
805 successful_cancels: Arc::clone(&self.successful_cancels),
806 failed_cancels: Arc::clone(&self.failed_cancels),
807 expected_rejects: Arc::clone(&self.expected_rejects),
808 idempotent_successes: Arc::clone(&self.idempotent_successes),
809 }
810 }
811
812 #[cfg(test)]
813 fn new_with_transports(
814 config: CancelBroadcasterConfig,
815 transports: Vec<TransportClient>,
816 ) -> Self {
817 Self {
818 config,
819 transports: Arc::new(transports),
820 health_check_task: Arc::new(RwLock::new(None)),
821 running: Arc::new(AtomicBool::new(false)),
822 total_cancels: Arc::new(AtomicU64::new(0)),
823 successful_cancels: Arc::new(AtomicU64::new(0)),
824 failed_cancels: Arc::new(AtomicU64::new(0)),
825 expected_rejects: Arc::new(AtomicU64::new(0)),
826 idempotent_successes: Arc::new(AtomicU64::new(0)),
827 }
828 }
829}
830
831#[derive(Debug, Clone)]
833pub struct BroadcasterMetrics {
834 pub total_cancels: u64,
835 pub successful_cancels: u64,
836 pub failed_cancels: u64,
837 pub expected_rejects: u64,
838 pub idempotent_successes: u64,
839 pub healthy_clients: usize,
840 pub total_clients: usize,
841}
842
843#[derive(Debug, Clone)]
845pub struct ClientStats {
846 pub client_id: String,
847 pub healthy: bool,
848 pub cancel_count: u64,
849 pub error_count: u64,
850}
851
852#[cfg(test)]
853mod tests {
854 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
855
856 use nautilus_core::UUID4;
857 use nautilus_model::{
858 enums::{
859 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
860 },
861 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
862 reports::OrderStatusReport,
863 types::{Price, Quantity},
864 };
865
866 use super::*;
867
868 #[derive(Clone)]
870 #[allow(clippy::type_complexity)]
871 struct MockExecutor {
872 handler: Arc<
873 dyn Fn(
874 InstrumentId,
875 Option<ClientOrderId>,
876 Option<VenueOrderId>,
877 )
878 -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
879 + Send
880 + Sync,
881 >,
882 }
883
884 impl MockExecutor {
885 fn new<F, Fut>(handler: F) -> Self
886 where
887 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
888 + Send
889 + Sync
890 + 'static,
891 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
892 {
893 Self {
894 handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
895 }
896 }
897 }
898
899 impl CancelExecutor for MockExecutor {
900 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
901 Box::pin(async { Ok(()) })
902 }
903
904 fn cancel_order(
905 &self,
906 instrument_id: InstrumentId,
907 client_order_id: Option<ClientOrderId>,
908 venue_order_id: Option<VenueOrderId>,
909 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
910 (self.handler)(instrument_id, client_order_id, venue_order_id)
911 }
912
913 fn cancel_orders(
914 &self,
915 _instrument_id: InstrumentId,
916 _client_order_ids: Option<Vec<ClientOrderId>>,
917 _venue_order_ids: Option<Vec<VenueOrderId>>,
918 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
919 {
920 Box::pin(async { Ok(Vec::new()) })
921 }
922
923 fn cancel_all_orders(
924 &self,
925 instrument_id: InstrumentId,
926 _order_side: Option<OrderSide>,
927 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
928 {
929 let handler = Arc::clone(&self.handler);
931 Box::pin(async move {
932 let result = handler(instrument_id, None, None).await;
934 match result {
935 Ok(_) => Ok(Vec::new()),
936 Err(e) => Err(e),
937 }
938 })
939 }
940
941 fn add_instrument(&self, _instrument: InstrumentAny) {
942 }
944 }
945
946 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
947 OrderStatusReport {
948 account_id: AccountId::from("BITMEX-001"),
949 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
950 venue_order_id: VenueOrderId::from(venue_order_id),
951 order_side: OrderSide::Buy,
952 order_type: OrderType::Limit,
953 time_in_force: TimeInForce::Gtc,
954 order_status: OrderStatus::Canceled,
955 price: Some(Price::new(50000.0, 2)),
956 quantity: Quantity::new(100.0, 0),
957 filled_qty: Quantity::new(0.0, 0),
958 report_id: UUID4::new(),
959 ts_accepted: 0.into(),
960 ts_last: 0.into(),
961 ts_init: 0.into(),
962 client_order_id: None,
963 avg_px: None,
964 trigger_price: None,
965 trigger_type: None,
966 contingency_type: ContingencyType::NoContingency,
967 expire_time: None,
968 order_list_id: None,
969 venue_position_id: None,
970 linked_order_ids: None,
971 parent_order_id: None,
972 display_qty: None,
973 limit_offset: None,
974 trailing_offset: None,
975 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
976 post_only: false,
977 reduce_only: false,
978 cancel_reason: None,
979 ts_triggered: None,
980 }
981 }
982
983 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
984 where
985 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
986 + Send
987 + Sync
988 + 'static,
989 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
990 {
991 let executor = MockExecutor::new(handler);
992 TransportClient::new(executor, client_id.to_string())
993 }
994
995 #[tokio::test]
996 async fn test_broadcast_cancel_immediate_success() {
997 let report = create_test_report("ORDER-1");
998 let report_clone = report.clone();
999
1000 let transports = vec![
1001 create_stub_transport("client-0", move |_, _, _| {
1002 let report = report_clone.clone();
1003 async move { Ok(report) }
1004 }),
1005 create_stub_transport("client-1", |_, _, _| async {
1006 tokio::time::sleep(Duration::from_secs(10)).await;
1007 anyhow::bail!("Should be aborted")
1008 }),
1009 ];
1010
1011 let config = CancelBroadcasterConfig::default();
1012 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1013
1014 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1015 let result = broadcaster
1016 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1017 .await;
1018
1019 assert!(result.is_ok());
1020 let returned_report = result.unwrap();
1021 assert!(returned_report.is_some());
1022 assert_eq!(
1023 returned_report.unwrap().venue_order_id,
1024 report.venue_order_id
1025 );
1026
1027 let metrics = broadcaster.get_metrics_async().await;
1028 assert_eq!(metrics.successful_cancels, 1);
1029 assert_eq!(metrics.failed_cancels, 0);
1030 assert_eq!(metrics.total_cancels, 1);
1031 }
1032
1033 #[tokio::test]
1034 async fn test_broadcast_cancel_idempotent_success() {
1035 let transports = vec![
1036 create_stub_transport("client-0", |_, _, _| async {
1037 anyhow::bail!("AlreadyCanceled")
1038 }),
1039 create_stub_transport("client-1", |_, _, _| async {
1040 tokio::time::sleep(Duration::from_secs(10)).await;
1041 anyhow::bail!("Should be aborted")
1042 }),
1043 ];
1044
1045 let config = CancelBroadcasterConfig::default();
1046 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1047
1048 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1049 let result = broadcaster
1050 .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1051 .await;
1052
1053 assert!(result.is_ok());
1054 assert!(result.unwrap().is_none());
1055
1056 let metrics = broadcaster.get_metrics_async().await;
1057 assert_eq!(metrics.idempotent_successes, 1);
1058 assert_eq!(metrics.successful_cancels, 0);
1059 assert_eq!(metrics.failed_cancels, 0);
1060 }
1061
1062 #[tokio::test]
1063 async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1064 let transports = vec![
1065 create_stub_transport("client-0", |_, _, _| async {
1066 anyhow::bail!("502 Bad Gateway")
1067 }),
1068 create_stub_transport("client-1", |_, _, _| async {
1069 anyhow::bail!("orderID not found")
1070 }),
1071 ];
1072
1073 let config = CancelBroadcasterConfig::default();
1074 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1075
1076 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1077 let result = broadcaster
1078 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1079 .await;
1080
1081 assert!(result.is_ok());
1082 assert!(result.unwrap().is_none());
1083
1084 let metrics = broadcaster.get_metrics_async().await;
1085 assert_eq!(metrics.idempotent_successes, 1);
1086 assert_eq!(metrics.failed_cancels, 0);
1087 }
1088
1089 #[tokio::test]
1090 async fn test_broadcast_cancel_all_failures() {
1091 let transports = vec![
1092 create_stub_transport("client-0", |_, _, _| async {
1093 anyhow::bail!("502 Bad Gateway")
1094 }),
1095 create_stub_transport("client-1", |_, _, _| async {
1096 anyhow::bail!("Connection refused")
1097 }),
1098 ];
1099
1100 let config = CancelBroadcasterConfig::default();
1101 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1102
1103 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1104 let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1105
1106 assert!(result.is_err());
1107 assert!(
1108 result
1109 .unwrap_err()
1110 .to_string()
1111 .contains("All cancel all requests failed")
1112 );
1113
1114 let metrics = broadcaster.get_metrics_async().await;
1115 assert_eq!(metrics.failed_cancels, 1);
1116 assert_eq!(metrics.successful_cancels, 0);
1117 assert_eq!(metrics.idempotent_successes, 0);
1118 }
1119
1120 #[tokio::test]
1121 async fn test_broadcast_cancel_no_healthy_clients() {
1122 let transport = create_stub_transport("client-0", |_, _, _| async {
1123 Ok(create_test_report("ORDER-1"))
1124 });
1125 transport.healthy.store(false, Ordering::Relaxed);
1126
1127 let config = CancelBroadcasterConfig::default();
1128 let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1129
1130 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1131 let result = broadcaster
1132 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1133 .await;
1134
1135 assert!(result.is_err());
1136 assert!(
1137 result
1138 .unwrap_err()
1139 .to_string()
1140 .contains("No healthy transport clients available")
1141 );
1142
1143 let metrics = broadcaster.get_metrics_async().await;
1144 assert_eq!(metrics.failed_cancels, 1);
1145 }
1146
1147 #[tokio::test]
1148 async fn test_broadcast_cancel_metrics_increment() {
1149 let report1 = create_test_report("ORDER-1");
1150 let report1_clone = report1.clone();
1151 let report2 = create_test_report("ORDER-2");
1152 let report2_clone = report2.clone();
1153
1154 let transports = vec![
1155 create_stub_transport("client-0", move |_, _, _| {
1156 let report = report1_clone.clone();
1157 async move { Ok(report) }
1158 }),
1159 create_stub_transport("client-1", move |_, _, _| {
1160 let report = report2_clone.clone();
1161 async move { Ok(report) }
1162 }),
1163 ];
1164
1165 let config = CancelBroadcasterConfig::default();
1166 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1167
1168 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1169
1170 let _ = broadcaster
1171 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1172 .await;
1173
1174 let _ = broadcaster
1175 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1176 .await;
1177
1178 let metrics = broadcaster.get_metrics_async().await;
1179 assert_eq!(metrics.total_cancels, 2);
1180 assert_eq!(metrics.successful_cancels, 2);
1181 }
1182
1183 #[tokio::test]
1184 async fn test_broadcast_cancel_expected_reject_pattern() {
1185 let transports = vec![
1186 create_stub_transport("client-0", |_, _, _| async {
1187 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1188 }),
1189 create_stub_transport("client-1", |_, _, _| async {
1190 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1191 }),
1192 ];
1193
1194 let config = CancelBroadcasterConfig::default();
1195 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1196
1197 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1198 let result = broadcaster
1199 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1200 .await;
1201
1202 assert!(result.is_err());
1203
1204 let metrics = broadcaster.get_metrics_async().await;
1205 assert_eq!(metrics.expected_rejects, 2);
1206 assert_eq!(metrics.failed_cancels, 1);
1207 }
1208
1209 #[tokio::test]
1210 async fn test_broadcaster_creation_with_pool() {
1211 let config = CancelBroadcasterConfig {
1212 pool_size: 3,
1213 api_key: Some("test_key".to_string()),
1214 api_secret: Some("test_secret".to_string()),
1215 base_url: Some("https://test.example.com".to_string()),
1216 testnet: false,
1217 timeout_secs: Some(5),
1218 max_retries: Some(2),
1219 retry_delay_ms: Some(100),
1220 retry_delay_max_ms: Some(1000),
1221 recv_window_ms: Some(5000),
1222 max_requests_per_second: Some(10),
1223 max_requests_per_minute: Some(100),
1224 health_check_interval_secs: 30,
1225 health_check_timeout_secs: 5,
1226 expected_reject_patterns: vec!["test_pattern".to_string()],
1227 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1228 proxy_urls: vec![],
1229 };
1230
1231 let broadcaster = CancelBroadcaster::new(config.clone());
1232 assert!(broadcaster.is_ok());
1233
1234 let broadcaster = broadcaster.unwrap();
1235 let metrics = broadcaster.get_metrics_async().await;
1236
1237 assert_eq!(metrics.total_clients, 3);
1238 assert_eq!(metrics.total_cancels, 0);
1239 assert_eq!(metrics.successful_cancels, 0);
1240 assert_eq!(metrics.failed_cancels, 0);
1241 }
1242
1243 #[tokio::test]
1244 async fn test_broadcaster_lifecycle() {
1245 let config = CancelBroadcasterConfig {
1246 pool_size: 2,
1247 api_key: Some("test_key".to_string()),
1248 api_secret: Some("test_secret".to_string()),
1249 base_url: Some("https://test.example.com".to_string()),
1250 testnet: false,
1251 timeout_secs: Some(5),
1252 max_retries: None,
1253 retry_delay_ms: None,
1254 retry_delay_max_ms: None,
1255 recv_window_ms: None,
1256 max_requests_per_second: None,
1257 max_requests_per_minute: None,
1258 health_check_interval_secs: 60, health_check_timeout_secs: 1,
1260 expected_reject_patterns: vec![],
1261 idempotent_success_patterns: vec![],
1262 proxy_urls: vec![],
1263 };
1264
1265 let broadcaster = CancelBroadcaster::new(config).unwrap();
1266
1267 assert!(!broadcaster.running.load(Ordering::Relaxed));
1269
1270 let start_result = broadcaster.start().await;
1272 assert!(start_result.is_ok());
1273 assert!(broadcaster.running.load(Ordering::Relaxed));
1274
1275 let start_again = broadcaster.start().await;
1277 assert!(start_again.is_ok());
1278
1279 broadcaster.stop().await;
1281 assert!(!broadcaster.running.load(Ordering::Relaxed));
1282
1283 broadcaster.stop().await;
1285 assert!(!broadcaster.running.load(Ordering::Relaxed));
1286 }
1287
1288 #[tokio::test]
1289 async fn test_client_stats_collection() {
1290 let config = CancelBroadcasterConfig {
1291 pool_size: 2,
1292 api_key: Some("test_key".to_string()),
1293 api_secret: Some("test_secret".to_string()),
1294 base_url: Some("https://test.example.com".to_string()),
1295 testnet: false,
1296 timeout_secs: Some(5),
1297 max_retries: None,
1298 retry_delay_ms: None,
1299 retry_delay_max_ms: None,
1300 recv_window_ms: None,
1301 max_requests_per_second: None,
1302 max_requests_per_minute: None,
1303 health_check_interval_secs: 60,
1304 health_check_timeout_secs: 5,
1305 expected_reject_patterns: vec![],
1306 idempotent_success_patterns: vec![],
1307 proxy_urls: vec![],
1308 };
1309
1310 let broadcaster = CancelBroadcaster::new(config).unwrap();
1311 let stats = broadcaster.get_client_stats_async().await;
1312
1313 assert_eq!(stats.len(), 2);
1314 assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1315 assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1316 assert!(stats[0].healthy); assert!(stats[1].healthy);
1318 assert_eq!(stats[0].cancel_count, 0);
1319 assert_eq!(stats[1].cancel_count, 0);
1320 assert_eq!(stats[0].error_count, 0);
1321 assert_eq!(stats[1].error_count, 0);
1322 }
1323
1324 #[tokio::test]
1325 async fn test_testnet_config_sets_base_url() {
1326 let config = CancelBroadcasterConfig {
1327 pool_size: 1,
1328 api_key: Some("test_key".to_string()),
1329 api_secret: Some("test_secret".to_string()),
1330 base_url: None, testnet: true, timeout_secs: Some(5),
1333 max_retries: None,
1334 retry_delay_ms: None,
1335 retry_delay_max_ms: None,
1336 recv_window_ms: None,
1337 max_requests_per_second: None,
1338 max_requests_per_minute: None,
1339 health_check_interval_secs: 60,
1340 health_check_timeout_secs: 5,
1341 expected_reject_patterns: vec![],
1342 idempotent_success_patterns: vec![],
1343 proxy_urls: vec![],
1344 };
1345
1346 let broadcaster = CancelBroadcaster::new(config);
1347 assert!(broadcaster.is_ok());
1348 }
1349
1350 #[tokio::test]
1351 async fn test_default_config() {
1352 let config = CancelBroadcasterConfig {
1353 api_key: Some("test_key".to_string()),
1354 api_secret: Some("test_secret".to_string()),
1355 base_url: Some("https://test.example.com".to_string()),
1356 ..Default::default()
1357 };
1358
1359 let broadcaster = CancelBroadcaster::new(config);
1360 assert!(broadcaster.is_ok());
1361
1362 let broadcaster = broadcaster.unwrap();
1363 let metrics = broadcaster.get_metrics_async().await;
1364
1365 assert_eq!(metrics.total_clients, 2);
1367 }
1368
1369 #[tokio::test]
1370 async fn test_clone_for_async() {
1371 let config = CancelBroadcasterConfig {
1372 pool_size: 1,
1373 api_key: Some("test_key".to_string()),
1374 api_secret: Some("test_secret".to_string()),
1375 base_url: Some("https://test.example.com".to_string()),
1376 testnet: false,
1377 timeout_secs: Some(5),
1378 max_retries: None,
1379 retry_delay_ms: None,
1380 retry_delay_max_ms: None,
1381 recv_window_ms: None,
1382 max_requests_per_second: None,
1383 max_requests_per_minute: None,
1384 health_check_interval_secs: 60,
1385 health_check_timeout_secs: 5,
1386 expected_reject_patterns: vec![],
1387 idempotent_success_patterns: vec![],
1388 proxy_urls: vec![],
1389 };
1390
1391 let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1392
1393 broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1395
1396 let broadcaster2 = broadcaster1.clone_for_async();
1398 let metrics2 = broadcaster2.get_metrics_async().await;
1399
1400 assert_eq!(metrics2.total_cancels, 1); broadcaster2
1404 .successful_cancels
1405 .fetch_add(5, Ordering::Relaxed);
1406
1407 let metrics1 = broadcaster1.get_metrics_async().await;
1409 assert_eq!(metrics1.successful_cancels, 5);
1410 }
1411
1412 #[tokio::test]
1413 async fn test_pattern_matching() {
1414 let config = CancelBroadcasterConfig {
1416 pool_size: 1,
1417 api_key: Some("test_key".to_string()),
1418 api_secret: Some("test_secret".to_string()),
1419 base_url: Some("https://test.example.com".to_string()),
1420 testnet: false,
1421 timeout_secs: Some(5),
1422 max_retries: None,
1423 retry_delay_ms: None,
1424 retry_delay_max_ms: None,
1425 recv_window_ms: None,
1426 max_requests_per_second: None,
1427 max_requests_per_minute: None,
1428 health_check_interval_secs: 60,
1429 health_check_timeout_secs: 5,
1430 expected_reject_patterns: vec![
1431 "ParticipateDoNotInitiate".to_string(),
1432 "Close-only".to_string(),
1433 ],
1434 idempotent_success_patterns: vec![
1435 "AlreadyCanceled".to_string(),
1436 "orderID not found".to_string(),
1437 "Unable to cancel".to_string(),
1438 ],
1439 proxy_urls: vec![],
1440 };
1441
1442 let broadcaster = CancelBroadcaster::new(config).unwrap();
1443
1444 assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1446 assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1447 assert!(!broadcaster.is_expected_reject("Connection timeout"));
1448
1449 assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1451 assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1452 assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1453 assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1454 }
1455
1456 #[tokio::test]
1460 async fn test_broadcast_batch_cancel_structure() {
1461 let config = CancelBroadcasterConfig {
1463 pool_size: 2,
1464 api_key: Some("test_key".to_string()),
1465 api_secret: Some("test_secret".to_string()),
1466 base_url: Some("https://test.example.com".to_string()),
1467 testnet: false,
1468 timeout_secs: Some(5),
1469 max_retries: None,
1470 retry_delay_ms: None,
1471 retry_delay_max_ms: None,
1472 recv_window_ms: None,
1473 max_requests_per_second: None,
1474 max_requests_per_minute: None,
1475 health_check_interval_secs: 60,
1476 health_check_timeout_secs: 5,
1477 expected_reject_patterns: vec![],
1478 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1479 proxy_urls: vec![],
1480 };
1481
1482 let broadcaster = CancelBroadcaster::new(config).unwrap();
1483 let metrics = broadcaster.get_metrics_async().await;
1484
1485 assert_eq!(metrics.total_clients, 2);
1487 assert_eq!(metrics.total_cancels, 0);
1488 assert_eq!(metrics.successful_cancels, 0);
1489 assert_eq!(metrics.failed_cancels, 0);
1490 }
1491
1492 #[tokio::test]
1493 async fn test_broadcast_cancel_all_structure() {
1494 let config = CancelBroadcasterConfig {
1496 pool_size: 3,
1497 api_key: Some("test_key".to_string()),
1498 api_secret: Some("test_secret".to_string()),
1499 base_url: Some("https://test.example.com".to_string()),
1500 testnet: false,
1501 timeout_secs: Some(5),
1502 max_retries: None,
1503 retry_delay_ms: None,
1504 retry_delay_max_ms: None,
1505 recv_window_ms: None,
1506 max_requests_per_second: None,
1507 max_requests_per_minute: None,
1508 health_check_interval_secs: 60,
1509 health_check_timeout_secs: 5,
1510 expected_reject_patterns: vec![],
1511 idempotent_success_patterns: vec!["orderID not found".to_string()],
1512 proxy_urls: vec![],
1513 };
1514
1515 let broadcaster = CancelBroadcaster::new(config).unwrap();
1516 let metrics = broadcaster.get_metrics_async().await;
1517
1518 assert_eq!(metrics.total_clients, 3);
1520 assert_eq!(metrics.healthy_clients, 3);
1521 assert_eq!(metrics.total_cancels, 0);
1522 }
1523
1524 #[tokio::test]
1526 async fn test_single_cancel_metrics_with_mixed_responses() {
1527 let transports = vec![
1530 create_stub_transport("client-0", |_, _, _| async {
1531 anyhow::bail!("Connection timeout")
1532 }),
1533 create_stub_transport("client-1", |_, _, _| async {
1534 anyhow::bail!("AlreadyCanceled")
1535 }),
1536 ];
1537
1538 let config = CancelBroadcasterConfig::default();
1539 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1540
1541 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1542 let result = broadcaster
1543 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1544 .await;
1545
1546 assert!(result.is_ok());
1548 assert!(result.unwrap().is_none());
1549
1550 let metrics = broadcaster.get_metrics_async().await;
1552 assert_eq!(
1553 metrics.failed_cancels, 0,
1554 "Idempotent success should not increment failed_cancels"
1555 );
1556 assert_eq!(metrics.idempotent_successes, 1);
1557 assert_eq!(metrics.successful_cancels, 0);
1558 }
1559
1560 #[tokio::test]
1561 async fn test_metrics_initialization_and_health() {
1562 let config = CancelBroadcasterConfig {
1564 pool_size: 4,
1565 api_key: Some("test_key".to_string()),
1566 api_secret: Some("test_secret".to_string()),
1567 base_url: Some("https://test.example.com".to_string()),
1568 testnet: false,
1569 timeout_secs: Some(5),
1570 max_retries: None,
1571 retry_delay_ms: None,
1572 retry_delay_max_ms: None,
1573 recv_window_ms: None,
1574 max_requests_per_second: None,
1575 max_requests_per_minute: None,
1576 health_check_interval_secs: 60,
1577 health_check_timeout_secs: 5,
1578 expected_reject_patterns: vec![],
1579 idempotent_success_patterns: vec![],
1580 proxy_urls: vec![],
1581 };
1582
1583 let broadcaster = CancelBroadcaster::new(config).unwrap();
1584 let metrics = broadcaster.get_metrics_async().await;
1585
1586 assert_eq!(metrics.total_cancels, 0);
1588 assert_eq!(metrics.successful_cancels, 0);
1589 assert_eq!(metrics.failed_cancels, 0);
1590 assert_eq!(metrics.expected_rejects, 0);
1591 assert_eq!(metrics.idempotent_successes, 0);
1592
1593 assert_eq!(metrics.healthy_clients, 4);
1595 assert_eq!(metrics.total_clients, 4);
1596 }
1597
1598 #[tokio::test]
1600 async fn test_health_check_task_lifecycle() {
1601 let config = CancelBroadcasterConfig {
1602 pool_size: 1,
1603 api_key: Some("test_key".to_string()),
1604 api_secret: Some("test_secret".to_string()),
1605 base_url: Some("https://test.example.com".to_string()),
1606 testnet: false,
1607 timeout_secs: Some(5),
1608 max_retries: None,
1609 retry_delay_ms: None,
1610 retry_delay_max_ms: None,
1611 recv_window_ms: None,
1612 max_requests_per_second: None,
1613 max_requests_per_minute: None,
1614 health_check_interval_secs: 1, health_check_timeout_secs: 1,
1616 expected_reject_patterns: vec![],
1617 idempotent_success_patterns: vec![],
1618 proxy_urls: vec![],
1619 };
1620
1621 let broadcaster = CancelBroadcaster::new(config).unwrap();
1622
1623 broadcaster.start().await.unwrap();
1625 assert!(broadcaster.running.load(Ordering::Relaxed));
1626
1627 {
1629 let task_guard = broadcaster.health_check_task.read().await;
1630 assert!(task_guard.is_some());
1631 }
1632
1633 tokio::time::sleep(Duration::from_millis(100)).await;
1635
1636 broadcaster.stop().await;
1638 assert!(!broadcaster.running.load(Ordering::Relaxed));
1639
1640 {
1642 let task_guard = broadcaster.health_check_task.read().await;
1643 assert!(task_guard.is_none());
1644 }
1645 }
1646}