nautilus_bitmex/execution/canceller.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Cancel request broadcaster for redundant order cancellation.
//!
//! This module provides the [`CancelBroadcaster`], which fans out cancel requests
//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
//!
//! - **Dependency injection via traits**: Uses the `CancelExecutor` trait to abstract
//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
//!   generic type parameters on the public API (simpler Python FFI).
//! - **Short-circuit on first success**: Aborts remaining requests once any client
//!   succeeds, minimizing latency.
//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
//!   successful outcomes.
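//!
//! # Example
//!
//! A minimal usage sketch (the identifiers are illustrative, and credentials are
//! assumed to come from the config or the environment):
//!
//! ```ignore
//! use std::str::FromStr;
//!
//! use nautilus_model::identifiers::{ClientOrderId, InstrumentId};
//!
//! // Build a pool of clients with the default patterns.
//! let broadcaster = CancelBroadcaster::new(CancelBroadcasterConfig::default())?;
//!
//! // Start the periodic health check loop before broadcasting.
//! broadcaster.start().await?;
//!
//! // Fan out a cancel to every healthy client; the first success wins.
//! let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX")?;
//! let report = broadcaster
//!     .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
//!     .await?;
//!
//! // `None` signals an idempotent success (the order was already cancelled).
//! if report.is_none() {
//!     tracing::debug!("Order was already cancelled");
//! }
//!
//! broadcaster.stop().await;
//! ```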

// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object
// support lands, so we can drop the per-call heap allocation.

use std::{
    fmt::Debug,
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};

use futures_util::future;
use nautilus_model::{
    enums::OrderSide,
    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
    instruments::InstrumentAny,
    reports::OrderStatusReport,
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};

use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};

/// Trait for order cancellation operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
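///
/// For example, an implementer boxes each async body behind the trait's
/// pinned-future signature (a sketch; `ping` is a hypothetical venue call):
///
/// ```ignore
/// fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
///     Box::pin(async move {
///         // Hypothetical call; discard the payload and normalize the error.
///         self.ping().await.map(|_| ()).map_err(|e| anyhow::anyhow!("{e}"))
///     })
/// }
/// ```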
trait CancelExecutor: Send + Sync {
    /// Adds an instrument for caching.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;

    /// Cancels multiple orders.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;

    /// Cancels all orders for an instrument.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}

impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }

    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}

/// Configuration for the cancel broadcaster.
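///
/// # Example
///
/// A sketch of overriding a few fields while keeping the defaults (struct update
/// syntax, as the tests below do):
///
/// ```ignore
/// let config = CancelBroadcasterConfig {
///     pool_size: 3,
///     testnet: true,
///     ..Default::default()
/// };
/// ```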
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (`None` sources the key from the environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (`None` sources the secret from the environment).
    pub api_secret: Option<String>,
    /// Base URL for the BitMEX HTTP API.
    pub base_url: Option<String>,
    /// If connecting to the BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings identifying expected cancel rejections, logged at debug level.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings identifying idempotent success (order already cancelled or not found).
    pub idempotent_success_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than `pool_size`, the remaining transports use no proxy; if longer, the extra
    /// proxies are ignored.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for CancelBroadcasterConfig {
    fn default() -> Self {
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                r"Order had execInst of ParticipateDoNotInitiate".to_string(),
            ],
            idempotent_success_patterns: vec![
                r"AlreadyCanceled".to_string(),
                r"orderID not found".to_string(),
                r"Unable to cancel order due to existing state".to_string(),
            ],
            proxy_urls: vec![],
        }
    }
}

/// Transport client wrapper with health monitoring.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in `Arc` to enable cloning without requiring `Clone` on `CancelExecutor`.
    ///
    /// `BitmexHttpClient` doesn't implement `Clone`, so we use reference counting to share
    /// the executor across multiple `TransportClient` clones.
    executor: Arc<dyn CancelExecutor>,
    client_id: String,
    healthy: Arc<AtomicBool>,
    cancel_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
}

impl Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TransportClient")
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("cancel_count", &self.cancel_count)
            .field("error_count", &self.error_count)
            .finish()
    }
}

impl TransportClient {
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(_)) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                tracing::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }
}

/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    config: CancelBroadcasterConfig,
    transports: Arc<Vec<TransportClient>>,
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}

impl CancelBroadcaster {
    /// Creates a new [`CancelBroadcaster`] with an internal HTTP client pool.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP client fails to initialize.
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        // Synthesize the base URL when testnet is true but base_url is None
        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };

        for i in 0..config.pool_size {
            // Assign the proxy from the config list, or None if the index exceeds its length
            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());

            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
                proxy_url,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Starts the broadcaster and health check loop.
    ///
    /// Starting an already-running broadcaster is a no-op.
    ///
    /// # Errors
    ///
    /// Currently never returns an error; calling this while already running simply
    /// returns `Ok(())`.
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }

        self.running.store(true, Ordering::Relaxed);

        // Initial health check for all clients
        self.run_health_checks().await;

        // Start periodic health check task
        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;

        let task = tokio::spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;

                if !running.load(Ordering::Relaxed) {
                    break;
                }

                let tasks: Vec<_> = transports
                    .iter()
                    .map(|t| t.health_check(timeout_secs))
                    .collect();

                let results = future::join_all(tasks).await;
                let healthy_count = results.iter().filter(|&&r| r).count();

                tracing::debug!(
                    "Health check complete: {}/{} clients healthy",
                    healthy_count,
                    results.len()
                );
            }
        });

        *self.health_check_task.write().await = Some(task);

        tracing::info!(
            "CancelBroadcaster started with {} clients",
            self.transports.len()
        );

        Ok(())
    }

    /// Stops the broadcaster and health check loop.
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }

        self.running.store(false, Ordering::Relaxed);

        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }

        tracing::info!("CancelBroadcaster stopped");
    }

    async fn run_health_checks(&self) {
        let tasks: Vec<_> = self
            .transports
            .iter()
            .map(|t| t.health_check(self.config.health_check_timeout_secs))
            .collect();

        let results = future::join_all(tasks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();

        tracing::debug!(
            "Health check complete: {}/{} clients healthy",
            healthy_count,
            results.len()
        );
    }

    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    fn is_idempotent_success(&self, error_message: &str) -> bool {
        self.config
            .idempotent_success_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Processes cancel request results, handling success, idempotent success, and failures.
    ///
    /// This helper consolidates the common error handling loop used across all broadcast methods.
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining.into_iter().collect();

            match result {
                Ok((client_id, Ok(result))) => {
                    // First success - abort remaining handles
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);

                    tracing::debug!(
                        "{} broadcast succeeded [{}] {}",
                        operation,
                        client_id,
                        params
                    );

                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_idempotent_success(&error_msg) {
                        // First idempotent success - abort remaining handles and return success
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);

                        tracing::debug!(
                            "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
                        );

                        return idempotent_result();
                    }

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        tracing::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        tracing::warn!(
                            "{} request failed [{}]: {} {}",
                            operation,
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    tracing::warn!("{} task join error: {e:?}", operation);
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        // All tasks failed
        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        tracing::error!(
            "All {} requests failed: {errors:?} {params}",
            operation.to_lowercase(),
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {errors:?}",
            operation.to_lowercase(),
        ))
    }

    /// Broadcasts a single cancel request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(report))` if successfully cancelled with a report.
    /// - `Ok(None)` if the order was already cancelled (idempotent success).
    /// - `Err` if all requests failed.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
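    ///
    /// # Example
    ///
    /// A sketch of interpreting the tri-state result (identifiers are illustrative):
    ///
    /// ```ignore
    /// match broadcaster.broadcast_cancel(instrument_id, Some(client_order_id), None).await {
    ///     Ok(Some(report)) => tracing::debug!("Cancelled: {}", report.venue_order_id),
    ///     Ok(None) => tracing::debug!("Already cancelled (idempotent success)"),
    ///     Err(e) => tracing::warn!("All cancel requests failed: {e}"),
    /// }
    /// ```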
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!(
                "(client_order_id={:?}, venue_order_id={:?})",
                client_order_id, venue_order_id
            ),
            "order already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a batch cancel request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        for transport in healthy_transports {
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!(
                "(client_order_ids={:?}, venue_order_ids={:?})",
                client_order_ids, venue_order_ids
            ),
            "orders already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a cancel all request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!(
                "(instrument_id={}, order_side={:?})",
                instrument_id, order_side
            ),
            "no orders to cancel",
        )
        .await
    }

    /// Gets broadcaster metrics.
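    ///
    /// A sketch of periodic metrics logging (field names match [`BroadcasterMetrics`]):
    ///
    /// ```ignore
    /// let m = broadcaster.get_metrics();
    /// tracing::info!(
    ///     "cancels: {}/{} ok, {} idempotent, {}/{} clients healthy",
    ///     m.successful_cancels,
    ///     m.total_cancels,
    ///     m.idempotent_successes,
    ///     m.healthy_clients,
    ///     m.total_clients,
    /// );
    /// ```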
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
        let total_clients = self.transports.len();

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    /// Gets broadcaster metrics (async version for use within async context).
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }

    /// Gets per-client statistics.
    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        self.transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Gets per-client statistics (async version for use within async context).
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }

    /// Caches an instrument in all HTTP clients in the pool.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        for transport in self.transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }

    /// Returns a handle that shares this broadcaster's transports, task handle, and counters.
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }

    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
}

/// Broadcaster metrics snapshot.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    pub total_cancels: u64,
    pub successful_cancels: u64,
    pub failed_cancels: u64,
    pub expected_rejects: u64,
    pub idempotent_successes: u64,
    pub healthy_clients: usize,
    pub total_clients: usize,
}

/// Per-client statistics.
#[derive(Debug, Clone)]
pub struct ClientStats {
    pub client_id: String,
    pub healthy: bool,
    pub cancel_count: u64,
    pub error_count: u64,
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

864#[cfg(test)]
865mod tests {
866    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
867
868    use nautilus_core::UUID4;
869    use nautilus_model::{
870        enums::{
871            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
872        },
873        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
874        reports::OrderStatusReport,
875        types::{Price, Quantity},
876    };
877
878    use super::*;
879
880    /// Mock executor for testing.
881    #[derive(Clone)]
882    #[allow(clippy::type_complexity)]
883    struct MockExecutor {
884        handler: Arc<
885            dyn Fn(
886                    InstrumentId,
887                    Option<ClientOrderId>,
888                    Option<VenueOrderId>,
889                )
890                    -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
891                + Send
892                + Sync,
893        >,
894    }
895
896    impl MockExecutor {
897        fn new<F, Fut>(handler: F) -> Self
898        where
899            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
900                + Send
901                + Sync
902                + 'static,
903            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
904        {
905            Self {
906                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
907            }
908        }
909    }
910
911    impl CancelExecutor for MockExecutor {
912        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
913            Box::pin(async { Ok(()) })
914        }
915
916        fn cancel_order(
917            &self,
918            instrument_id: InstrumentId,
919            client_order_id: Option<ClientOrderId>,
920            venue_order_id: Option<VenueOrderId>,
921        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
922            (self.handler)(instrument_id, client_order_id, venue_order_id)
923        }
924
925        fn cancel_orders(
926            &self,
927            _instrument_id: InstrumentId,
928            _client_order_ids: Option<Vec<ClientOrderId>>,
929            _venue_order_ids: Option<Vec<VenueOrderId>>,
930        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
931        {
932            Box::pin(async { Ok(Vec::new()) })
933        }
934
935        fn cancel_all_orders(
936            &self,
937            instrument_id: InstrumentId,
938            _order_side: Option<OrderSide>,
939        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
940        {
941            // Try to get result from the single-order handler to propagate errors
942            let handler = Arc::clone(&self.handler);
943            Box::pin(async move {
944                // Call the handler to check if it would fail
945                let result = handler(instrument_id, None, None).await;
946                match result {
947                    Ok(_) => Ok(Vec::new()),
948                    Err(e) => Err(e),
949                }
950            })
951        }
952
953        fn add_instrument(&self, _instrument: InstrumentAny) {
954            // No-op for mock
955        }
956    }
957
958    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
959        OrderStatusReport {
960            account_id: AccountId::from("BITMEX-001"),
961            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
962            venue_order_id: VenueOrderId::from(venue_order_id),
963            order_side: OrderSide::Buy,
964            order_type: OrderType::Limit,
965            time_in_force: TimeInForce::Gtc,
966            order_status: OrderStatus::Canceled,
967            price: Some(Price::new(50000.0, 2)),
968            quantity: Quantity::new(100.0, 0),
969            filled_qty: Quantity::new(0.0, 0),
970            report_id: UUID4::new(),
971            ts_accepted: 0.into(),
972            ts_last: 0.into(),
973            ts_init: 0.into(),
974            client_order_id: None,
975            avg_px: None,
976            trigger_price: None,
977            trigger_type: None,
978            contingency_type: ContingencyType::NoContingency,
979            expire_time: None,
980            order_list_id: None,
981            venue_position_id: None,
982            linked_order_ids: None,
983            parent_order_id: None,
984            display_qty: None,
985            limit_offset: None,
986            trailing_offset: None,
987            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
988            post_only: false,
989            reduce_only: false,
990            cancel_reason: None,
991            ts_triggered: None,
992        }
993    }
994
995    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
996    where
997        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
998            + Send
999            + Sync
1000            + 'static,
1001        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
1002    {
1003        let executor = MockExecutor::new(handler);
1004        TransportClient::new(executor, client_id.to_string())
1005    }
1006
1007    #[tokio::test]
1008    async fn test_broadcast_cancel_immediate_success() {
1009        let report = create_test_report("ORDER-1");
1010        let report_clone = report.clone();
1011
1012        let transports = vec![
1013            create_stub_transport("client-0", move |_, _, _| {
1014                let report = report_clone.clone();
1015                async move { Ok(report) }
1016            }),
1017            create_stub_transport("client-1", |_, _, _| async {
1018                tokio::time::sleep(Duration::from_secs(10)).await;
1019                anyhow::bail!("Should be aborted")
1020            }),
1021        ];
1022
1023        let config = CancelBroadcasterConfig::default();
1024        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1025
1026        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1027        let result = broadcaster
1028            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1029            .await;
1030
1031        assert!(result.is_ok());
1032        let returned_report = result.unwrap();
1033        assert!(returned_report.is_some());
1034        assert_eq!(
1035            returned_report.unwrap().venue_order_id,
1036            report.venue_order_id
1037        );
1038
1039        let metrics = broadcaster.get_metrics_async().await;
1040        assert_eq!(metrics.successful_cancels, 1);
1041        assert_eq!(metrics.failed_cancels, 0);
1042        assert_eq!(metrics.total_cancels, 1);
1043    }
1044
1045    #[tokio::test]
1046    async fn test_broadcast_cancel_idempotent_success() {
1047        let transports = vec![
1048            create_stub_transport("client-0", |_, _, _| async {
1049                anyhow::bail!("AlreadyCanceled")
1050            }),
1051            create_stub_transport("client-1", |_, _, _| async {
1052                tokio::time::sleep(Duration::from_secs(10)).await;
1053                anyhow::bail!("Should be aborted")
1054            }),
1055        ];
1056
1057        let config = CancelBroadcasterConfig::default();
1058        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1059
1060        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1061        let result = broadcaster
1062            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1063            .await;
1064
1065        assert!(result.is_ok());
1066        assert!(result.unwrap().is_none());
1067
1068        let metrics = broadcaster.get_metrics_async().await;
1069        assert_eq!(metrics.idempotent_successes, 1);
1070        assert_eq!(metrics.successful_cancels, 0);
1071        assert_eq!(metrics.failed_cancels, 0);
1072    }
1073
1074    #[tokio::test]
1075    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1076        let transports = vec![
1077            create_stub_transport("client-0", |_, _, _| async {
1078                anyhow::bail!("502 Bad Gateway")
1079            }),
1080            create_stub_transport("client-1", |_, _, _| async {
1081                anyhow::bail!("orderID not found")
1082            }),
1083        ];
1084
1085        let config = CancelBroadcasterConfig::default();
1086        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1087
1088        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1089        let result = broadcaster
1090            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1091            .await;
1092
1093        assert!(result.is_ok());
1094        assert!(result.unwrap().is_none());
1095
1096        let metrics = broadcaster.get_metrics_async().await;
1097        assert_eq!(metrics.idempotent_successes, 1);
1098        assert_eq!(metrics.failed_cancels, 0);
1099    }
1100
1101    #[tokio::test]
1102    async fn test_broadcast_cancel_all_failures() {
1103        let transports = vec![
1104            create_stub_transport("client-0", |_, _, _| async {
1105                anyhow::bail!("502 Bad Gateway")
1106            }),
1107            create_stub_transport("client-1", |_, _, _| async {
1108                anyhow::bail!("Connection refused")
1109            }),
1110        ];
1111
1112        let config = CancelBroadcasterConfig::default();
1113        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1114
1115        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1116        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1117
1118        assert!(result.is_err());
1119        assert!(
1120            result
1121                .unwrap_err()
1122                .to_string()
1123                .contains("All cancel all requests failed")
1124        );
1125
1126        let metrics = broadcaster.get_metrics_async().await;
1127        assert_eq!(metrics.failed_cancels, 1);
1128        assert_eq!(metrics.successful_cancels, 0);
1129        assert_eq!(metrics.idempotent_successes, 0);
1130    }
1131
1132    #[tokio::test]
1133    async fn test_broadcast_cancel_no_healthy_clients() {
1134        let transport = create_stub_transport("client-0", |_, _, _| async {
1135            Ok(create_test_report("ORDER-1"))
1136        });
1137        transport.healthy.store(false, Ordering::Relaxed);
1138
1139        let config = CancelBroadcasterConfig::default();
1140        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1141
1142        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1143        let result = broadcaster
1144            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1145            .await;
1146
1147        assert!(result.is_err());
1148        assert!(
1149            result
1150                .unwrap_err()
1151                .to_string()
1152                .contains("No healthy transport clients available")
1153        );
1154
1155        let metrics = broadcaster.get_metrics_async().await;
1156        assert_eq!(metrics.failed_cancels, 1);
1157    }
1158
1159    #[tokio::test]
1160    async fn test_broadcast_cancel_metrics_increment() {
1161        let report1 = create_test_report("ORDER-1");
1162        let report1_clone = report1.clone();
1163        let report2 = create_test_report("ORDER-2");
1164        let report2_clone = report2.clone();
1165
1166        let transports = vec![
1167            create_stub_transport("client-0", move |_, _, _| {
1168                let report = report1_clone.clone();
1169                async move { Ok(report) }
1170            }),
1171            create_stub_transport("client-1", move |_, _, _| {
1172                let report = report2_clone.clone();
1173                async move { Ok(report) }
1174            }),
1175        ];
1176
1177        let config = CancelBroadcasterConfig::default();
1178        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1179
1180        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1181
1182        let _ = broadcaster
1183            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1184            .await;
1185
1186        let _ = broadcaster
1187            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1188            .await;
1189
1190        let metrics = broadcaster.get_metrics_async().await;
1191        assert_eq!(metrics.total_cancels, 2);
1192        assert_eq!(metrics.successful_cancels, 2);
1193    }
1194
1195    #[tokio::test]
1196    async fn test_broadcast_cancel_expected_reject_pattern() {
1197        let transports = vec![
1198            create_stub_transport("client-0", |_, _, _| async {
1199                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1200            }),
1201            create_stub_transport("client-1", |_, _, _| async {
1202                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1203            }),
1204        ];
1205
1206        let config = CancelBroadcasterConfig::default();
1207        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1208
1209        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1210        let result = broadcaster
1211            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1212            .await;
1213
1214        assert!(result.is_err());
1215
1216        let metrics = broadcaster.get_metrics_async().await;
1217        assert_eq!(metrics.expected_rejects, 2);
1218        assert_eq!(metrics.failed_cancels, 1);
1219    }
1220
1221    #[tokio::test]
1222    async fn test_broadcaster_creation_with_pool() {
1223        let config = CancelBroadcasterConfig {
1224            pool_size: 3,
1225            api_key: Some("test_key".to_string()),
1226            api_secret: Some("test_secret".to_string()),
1227            base_url: Some("https://test.example.com".to_string()),
1228            testnet: false,
1229            timeout_secs: Some(5),
1230            max_retries: Some(2),
1231            retry_delay_ms: Some(100),
1232            retry_delay_max_ms: Some(1000),
1233            recv_window_ms: Some(5000),
1234            max_requests_per_second: Some(10),
1235            max_requests_per_minute: Some(100),
1236            health_check_interval_secs: 30,
1237            health_check_timeout_secs: 5,
1238            expected_reject_patterns: vec!["test_pattern".to_string()],
1239            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1240            proxy_urls: vec![],
1241        };
1242
1243        let broadcaster = CancelBroadcaster::new(config.clone());
1244        assert!(broadcaster.is_ok());
1245
1246        let broadcaster = broadcaster.unwrap();
1247        let metrics = broadcaster.get_metrics_async().await;
1248
1249        assert_eq!(metrics.total_clients, 3);
1250        assert_eq!(metrics.total_cancels, 0);
1251        assert_eq!(metrics.successful_cancels, 0);
1252        assert_eq!(metrics.failed_cancels, 0);
1253    }
1254
1255    #[tokio::test]
1256    async fn test_broadcaster_lifecycle() {
1257        let config = CancelBroadcasterConfig {
1258            pool_size: 2,
1259            api_key: Some("test_key".to_string()),
1260            api_secret: Some("test_secret".to_string()),
1261            base_url: Some("https://test.example.com".to_string()),
1262            testnet: false,
1263            timeout_secs: Some(5),
1264            max_retries: None,
1265            retry_delay_ms: None,
1266            retry_delay_max_ms: None,
1267            recv_window_ms: None,
1268            max_requests_per_second: None,
1269            max_requests_per_minute: None,
1270            health_check_interval_secs: 60, // Long interval so it won't tick during test
1271            health_check_timeout_secs: 1,
1272            expected_reject_patterns: vec![],
1273            idempotent_success_patterns: vec![],
1274            proxy_urls: vec![],
1275        };
1276
1277        let broadcaster = CancelBroadcaster::new(config).unwrap();
1278
1279        // Should not be running initially
1280        assert!(!broadcaster.running.load(Ordering::Relaxed));
1281
1282        // Start broadcaster
1283        let start_result = broadcaster.start().await;
1284        assert!(start_result.is_ok());
1285        assert!(broadcaster.running.load(Ordering::Relaxed));
1286
1287        // Starting again should be idempotent
1288        let start_again = broadcaster.start().await;
1289        assert!(start_again.is_ok());
1290
1291        // Stop broadcaster
1292        broadcaster.stop().await;
1293        assert!(!broadcaster.running.load(Ordering::Relaxed));
1294
1295        // Stopping again should be safe
1296        broadcaster.stop().await;
1297        assert!(!broadcaster.running.load(Ordering::Relaxed));
1298    }
1299
1300    #[tokio::test]
1301    async fn test_client_stats_collection() {
1302        let config = CancelBroadcasterConfig {
1303            pool_size: 2,
1304            api_key: Some("test_key".to_string()),
1305            api_secret: Some("test_secret".to_string()),
1306            base_url: Some("https://test.example.com".to_string()),
1307            testnet: false,
1308            timeout_secs: Some(5),
1309            max_retries: None,
1310            retry_delay_ms: None,
1311            retry_delay_max_ms: None,
1312            recv_window_ms: None,
1313            max_requests_per_second: None,
1314            max_requests_per_minute: None,
1315            health_check_interval_secs: 60,
1316            health_check_timeout_secs: 5,
1317            expected_reject_patterns: vec![],
1318            idempotent_success_patterns: vec![],
1319            proxy_urls: vec![],
1320        };
1321
1322        let broadcaster = CancelBroadcaster::new(config).unwrap();
1323        let stats = broadcaster.get_client_stats_async().await;
1324
1325        assert_eq!(stats.len(), 2);
1326        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1327        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1328        assert!(stats[0].healthy); // Should be healthy initially
1329        assert!(stats[1].healthy);
1330        assert_eq!(stats[0].cancel_count, 0);
1331        assert_eq!(stats[1].cancel_count, 0);
1332        assert_eq!(stats[0].error_count, 0);
1333        assert_eq!(stats[1].error_count, 0);
1334    }
1335
1336    #[tokio::test]
1337    async fn test_testnet_config_sets_base_url() {
1338        let config = CancelBroadcasterConfig {
1339            pool_size: 1,
1340            api_key: Some("test_key".to_string()),
1341            api_secret: Some("test_secret".to_string()),
1342            base_url: None, // Not specified
1343            testnet: true,  // But testnet is true
1344            timeout_secs: Some(5),
1345            max_retries: None,
1346            retry_delay_ms: None,
1347            retry_delay_max_ms: None,
1348            recv_window_ms: None,
1349            max_requests_per_second: None,
1350            max_requests_per_minute: None,
1351            health_check_interval_secs: 60,
1352            health_check_timeout_secs: 5,
1353            expected_reject_patterns: vec![],
1354            idempotent_success_patterns: vec![],
1355            proxy_urls: vec![],
1356        };
1357
1358        let broadcaster = CancelBroadcaster::new(config);
1359        assert!(broadcaster.is_ok());
1360    }
1361
1362    #[tokio::test]
1363    async fn test_default_config() {
1364        let config = CancelBroadcasterConfig {
1365            api_key: Some("test_key".to_string()),
1366            api_secret: Some("test_secret".to_string()),
1367            base_url: Some("https://test.example.com".to_string()),
1368            ..Default::default()
1369        };
1370
1371        let broadcaster = CancelBroadcaster::new(config);
1372        assert!(broadcaster.is_ok());
1373
1374        let broadcaster = broadcaster.unwrap();
1375        let metrics = broadcaster.get_metrics_async().await;
1376
1377        // Default pool_size is 2
1378        assert_eq!(metrics.total_clients, 2);
1379    }
1380
1381    #[tokio::test]
1382    async fn test_clone_for_async() {
1383        let config = CancelBroadcasterConfig {
1384            pool_size: 1,
1385            api_key: Some("test_key".to_string()),
1386            api_secret: Some("test_secret".to_string()),
1387            base_url: Some("https://test.example.com".to_string()),
1388            testnet: false,
1389            timeout_secs: Some(5),
1390            max_retries: None,
1391            retry_delay_ms: None,
1392            retry_delay_max_ms: None,
1393            recv_window_ms: None,
1394            max_requests_per_second: None,
1395            max_requests_per_minute: None,
1396            health_check_interval_secs: 60,
1397            health_check_timeout_secs: 5,
1398            expected_reject_patterns: vec![],
1399            idempotent_success_patterns: vec![],
1400            proxy_urls: vec![],
1401        };
1402
1403        let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1404
1405        // Increment a metric on original
1406        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1407
1408        // Clone should share the same atomic
1409        let broadcaster2 = broadcaster1.clone_for_async();
1410        let metrics2 = broadcaster2.get_metrics_async().await;
1411
1412        assert_eq!(metrics2.total_cancels, 1); // Should see the increment
1413
1414        // Modify through clone
1415        broadcaster2
1416            .successful_cancels
1417            .fetch_add(5, Ordering::Relaxed);
1418
1419        // Original should see the change
1420        let metrics1 = broadcaster1.get_metrics_async().await;
1421        assert_eq!(metrics1.successful_cancels, 5);
1422    }
1423
    #[tokio::test]
    async fn test_pattern_matching() {
        // Test that pattern matching works for expected rejects and idempotent successes
        let config = CancelBroadcasterConfig {
            expected_reject_patterns: vec![
                "ParticipateDoNotInitiate".to_string(),
                "Close-only".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel".to_string(),
            ],
            ..test_config()
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Test expected reject patterns (matched as substrings of the venue message)
        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
        assert!(!broadcaster.is_expected_reject("Connection timeout"));

        // Test idempotent success patterns
        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
    }

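    // Sketch: with no configured patterns, no message should classify as an
    // expected reject or an idempotent success. Whatever the matching rule,
    // an empty pattern list has nothing to match against.
    #[tokio::test]
    async fn test_pattern_matching_no_patterns() {
        // test_config() leaves both pattern lists empty
        let broadcaster = CancelBroadcaster::new(test_config()).unwrap();

        assert!(!broadcaster.is_expected_reject("ParticipateDoNotInitiate"));
        assert!(!broadcaster.is_idempotent_success("AlreadyCanceled"));
    }
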
    // Happy-path coverage for broadcast_batch_cancel and broadcast_cancel_all.
    // These tests use simplified stubs because the batch and cancel-all paths
    // bypass test_handler; full HTTP mocking is covered by the integration tests.
    #[tokio::test]
    async fn test_broadcast_batch_cancel_structure() {
        // Validates broadcaster structure and metric initialization
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
            ..test_config()
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify initial state
        assert_eq!(metrics.total_clients, 2);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_structure() {
        // Validates broadcaster structure for cancel_all operations
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            idempotent_success_patterns: vec!["orderID not found".to_string()],
            ..test_config()
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify pool size and initial metrics
        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.healthy_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
    }

    // Metric health tests: validate that idempotent successes don't increment
    // failed_cancels
    #[tokio::test]
    async fn test_single_cancel_metrics_with_mixed_responses() {
        // Similar to test_broadcast_cancel_mixed_idempotent_and_failure,
        // but explicitly validates metric health
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        // Should succeed via the idempotent pattern
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        // Verify metrics: an idempotent success doesn't count as a failure
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(
            metrics.failed_cancels, 0,
            "Idempotent success should not increment failed_cancels"
        );
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
    }

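    // A minimal happy-path sketch for the same metrics. Assumes the stub
    // handler's Ok type mirrors broadcast_cancel's Option<OrderStatusReport>
    // return, so Ok(None) models a venue-accepted cancel without a report.
    #[tokio::test]
    async fn test_single_cancel_metrics_with_success() {
        let transports =
            vec![create_stub_transport("client-0", |_, _, _| async { Ok(None) })];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        assert!(result.is_ok());

        // A clean success increments successful_cancels and nothing else
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 1);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.idempotent_successes, 0);
    }
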
    #[tokio::test]
    async fn test_metrics_initialization_and_health() {
        // Validates that metrics start at zero and clients start healthy
        let config = CancelBroadcasterConfig {
            pool_size: 4,
            ..test_config()
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // All metrics should start at zero
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.expected_rejects, 0);
        assert_eq!(metrics.idempotent_successes, 0);

        // All clients should start healthy
        assert_eq!(metrics.healthy_clients, 4);
        assert_eq!(metrics.total_clients, 4);
    }

    // Health-check task lifecycle test
    #[tokio::test]
    async fn test_health_check_task_lifecycle() {
        let config = CancelBroadcasterConfig {
            health_check_interval_secs: 1, // Very short interval
            health_check_timeout_secs: 1,
            ..test_config()
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Start the broadcaster
        broadcaster.start().await.unwrap();
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle exists
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_some());
        }

        // Wait a bit so the health check can potentially run
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Stop the broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle has been cleared
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_none());
        }
    }
}