nautilus_bitmex/execution/canceller.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Cancel request broadcaster for redundant order cancellation.
//!
//! This module provides the [`CancelBroadcaster`] which fans out cancel requests
//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
//!
//! - **Dependency injection via traits**: Uses the `CancelExecutor` trait to abstract
//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
//!   generic type parameters on the public API (simpler Python FFI).
//! - **Short-circuit on first success**: Aborts remaining requests once any client
//!   succeeds, minimizing latency.
//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
//!   successful outcomes.
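//!
//! # Example
//!
//! A minimal usage sketch (the module path and credential sourcing are assumptions,
//! and the block is `ignore`d because it requires live connectivity):
//!
//! ```ignore
//! use std::str::FromStr;
//!
//! use nautilus_model::identifiers::{ClientOrderId, InstrumentId};
//!
//! // Path assumed for illustration; adjust to the crate's actual re-exports.
//! use nautilus_bitmex::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
//!
//! async fn cancel_example() -> anyhow::Result<()> {
//!     let config = CancelBroadcasterConfig {
//!         pool_size: 3, // three redundant HTTP clients
//!         ..Default::default() // credentials sourced from the environment
//!     };
//!     let broadcaster = CancelBroadcaster::new(config)?;
//!     broadcaster.start().await?;
//!
//!     let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
//!     // `Some(report)` => cancelled now; `None` => already cancelled (idempotent success)
//!     let _report = broadcaster
//!         .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
//!         .await?;
//!
//!     broadcaster.stop().await;
//!     Ok(())
//! }
//! ```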

// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object support
// lands so we can drop the per-call heap allocation.

use std::{
    fmt::Debug,
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};

use futures_util::future;
use nautilus_common::live::get_runtime;
use nautilus_model::{
    enums::OrderSide,
    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
    instruments::InstrumentAny,
    reports::OrderStatusReport,
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};

use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};

/// Trait for order cancellation operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
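///
/// # Example
///
/// A sketch of a custom executor (hypothetical `NoopExecutor`; real implementations
/// box their futures exactly like the `BitmexHttpClient` impl below):
///
/// ```ignore
/// struct NoopExecutor;
///
/// impl CancelExecutor for NoopExecutor {
///     fn add_instrument(&self, _instrument: InstrumentAny) {}
///
///     fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
///         Box::pin(async { Ok(()) })
///     }
///
///     fn cancel_order(
///         &self,
///         _instrument_id: InstrumentId,
///         _client_order_id: Option<ClientOrderId>,
///         _venue_order_id: Option<VenueOrderId>,
///     ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
///         Box::pin(async { anyhow::bail!("no-op executor") })
///     }
///
///     // `cancel_orders` and `cancel_all_orders` follow the same boxing pattern.
/// }
/// ```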
trait CancelExecutor: Send + Sync {
    /// Adds an instrument for caching.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;

    /// Cancels multiple orders.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;

    /// Cancels all orders for an instrument.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}

impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }

    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}

/// Configuration for the cancel broadcaster.
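///
/// # Example
///
/// A configuration sketch (`ignore`d because the doctest path depends on the crate's
/// re-exports). Proxies are assigned by index, so with `pool_size: 3` the second
/// client below uses no proxy:
///
/// ```ignore
/// let config = CancelBroadcasterConfig {
///     pool_size: 3,
///     proxy_urls: vec![
///         Some("http://proxy-a:8080".to_string()), // hypothetical proxy endpoints
///         None,
///         Some("http://proxy-b:8080".to_string()),
///     ],
///     ..Default::default()
/// };
/// ```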
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected cancel rejections for debug-level logging.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings to identify idempotent success (order already cancelled/not found).
    pub idempotent_success_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than `pool_size`, remaining transports will use no proxy. If longer, extra
    /// proxies are ignored.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for CancelBroadcasterConfig {
    fn default() -> Self {
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                r"Order had execInst of ParticipateDoNotInitiate".to_string(),
            ],
            idempotent_success_patterns: vec![
                r"AlreadyCanceled".to_string(),
                r"orderID not found".to_string(),
                r"Unable to cancel order due to existing state".to_string(),
            ],
            proxy_urls: vec![],
        }
    }
}

/// Transport client wrapper with health monitoring.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in `Arc` to enable cloning without requiring `Clone` on `CancelExecutor`.
    ///
    /// `BitmexHttpClient` doesn't implement `Clone`, so we use reference counting to share
    /// the executor across multiple `TransportClient` clones.
    executor: Arc<dyn CancelExecutor>,
    client_id: String,
    healthy: Arc<AtomicBool>,
    cancel_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
}

impl Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TransportClient")
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("cancel_count", &self.cancel_count)
            .field("error_count", &self.error_count)
            .finish()
    }
}

impl TransportClient {
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(_)) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                tracing::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }
}

/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    config: CancelBroadcasterConfig,
    transports: Arc<Vec<TransportClient>>,
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}

impl CancelBroadcaster {
    /// Creates a new [`CancelBroadcaster`] with an internal HTTP client pool.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP client fails to initialize.
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        // Synthesize base_url when testnet is true but base_url is None
        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };

        for i in 0..config.pool_size {
            // Assign proxy from config list, or None if index exceeds list length
            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());

            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
                proxy_url,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Starts the broadcaster and health check loop.
    ///
    /// This method is idempotent: if the broadcaster is already running, it returns
    /// `Ok(())` without spawning a second health check task.
    ///
    /// # Errors
    ///
    /// Currently never returns an error; the `Result` is reserved for future use.
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }

        self.running.store(true, Ordering::Relaxed);

        // Initial health check for all clients
        self.run_health_checks().await;

        // Start periodic health check task
        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;

        let task = get_runtime().spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;

                if !running.load(Ordering::Relaxed) {
                    break;
                }

                let tasks: Vec<_> = transports
                    .iter()
                    .map(|t| t.health_check(timeout_secs))
                    .collect();

                let results = future::join_all(tasks).await;
                let healthy_count = results.iter().filter(|&&r| r).count();

                tracing::debug!(
                    "Health check complete: {}/{} clients healthy",
                    healthy_count,
                    results.len()
                );
            }
        });

        *self.health_check_task.write().await = Some(task);

        tracing::info!(
            "CancelBroadcaster started with {} clients",
            self.transports.len()
        );

        Ok(())
    }

    /// Stops the broadcaster and health check loop.
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }

        self.running.store(false, Ordering::Relaxed);

        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }

        tracing::info!("CancelBroadcaster stopped");
    }

    async fn run_health_checks(&self) {
        let tasks: Vec<_> = self
            .transports
            .iter()
            .map(|t| t.health_check(self.config.health_check_timeout_secs))
            .collect();

        let results = future::join_all(tasks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();

        tracing::debug!(
            "Health check complete: {}/{} clients healthy",
            healthy_count,
            results.len()
        );
    }

    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    fn is_idempotent_success(&self, error_message: &str) -> bool {
        self.config
            .idempotent_success_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Processes cancel request results, handling success, idempotent success, and failures.
    ///
    /// This helper consolidates the common error handling loop used across all broadcast methods.
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            // Race the in-flight requests; `select_all` yields the first completion
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining;

            match result {
                Ok((client_id, Ok(result))) => {
                    // First success - abort remaining handles
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);

                    tracing::debug!(
                        "{} broadcast succeeded [{}] {}",
                        operation,
                        client_id,
                        params
                    );

                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_idempotent_success(&error_msg) {
                        // First idempotent success - abort remaining handles and return success
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);

                        tracing::debug!(
                            "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
                        );

                        return idempotent_result();
                    }

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        tracing::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        tracing::warn!(
                            "{} request failed [{}]: {} {}",
                            operation,
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    tracing::warn!("{} task join error: {e:?}", operation);
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        // All tasks failed
        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        tracing::error!(
            "All {} requests failed: {errors:?} {params}",
            operation.to_lowercase(),
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {errors:?}",
            operation.to_lowercase(),
        ))
    }

    /// Broadcasts a single cancel request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(report))` if successfully cancelled with a report.
    /// - `Ok(None)` if the order was already cancelled (idempotent success).
    /// - `Err` if all requests failed.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
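    ///
    /// # Example
    ///
    /// Interpreting the result (sketch; `ignore`d as it needs a running broadcaster):
    ///
    /// ```ignore
    /// match broadcaster.broadcast_cancel(instrument_id, client_order_id, None).await? {
    ///     Some(report) => println!("cancelled: {}", report.venue_order_id),
    ///     None => println!("already cancelled (idempotent success)"),
    /// }
    /// ```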
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
            "order already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a batch cancel request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        for transport in healthy_transports {
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
            "orders already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a cancel-all request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
            "no orders to cancel",
        )
        .await
    }

    /// Gets broadcaster metrics.
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
        let total_clients = self.transports.len();

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    /// Gets broadcaster metrics (async version for use within an async context).
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }

    /// Gets per-client statistics.
    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        self.transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Gets per-client statistics (async version for use within an async context).
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }

    /// Caches an instrument in all HTTP clients in the pool.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        for transport in self.transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }
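
    /// Returns a clone that shares this broadcaster's transports, health check task
    /// handle, and metric counters via `Arc` (the config itself is cloned), intended
    /// for moving the broadcaster into spawned async tasks.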
    #[must_use]
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }

    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
}

/// Broadcaster metrics snapshot.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    pub total_cancels: u64,
    pub successful_cancels: u64,
    pub failed_cancels: u64,
    pub expected_rejects: u64,
    pub idempotent_successes: u64,
    pub healthy_clients: usize,
    pub total_clients: usize,
}

/// Per-client statistics.
#[derive(Debug, Clone)]
pub struct ClientStats {
    pub client_id: String,
    pub healthy: bool,
    pub cancel_count: u64,
    pub error_count: u64,
}

#[cfg(test)]
mod tests {
    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};

    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{
            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
        },
        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
        reports::OrderStatusReport,
        types::{Price, Quantity},
    };

    use super::*;

    /// Mock executor for testing.
    #[derive(Clone)]
    #[allow(clippy::type_complexity)]
    struct MockExecutor {
        handler: Arc<
            dyn Fn(
                    InstrumentId,
                    Option<ClientOrderId>,
                    Option<VenueOrderId>,
                )
                    -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
                + Send
                + Sync,
        >,
    }

    impl MockExecutor {
        fn new<F, Fut>(handler: F) -> Self
        where
            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
                + Send
                + Sync
                + 'static,
            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
        {
            Self {
                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
            }
        }
    }

    impl CancelExecutor for MockExecutor {
        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
            Box::pin(async { Ok(()) })
        }

        fn cancel_order(
            &self,
            instrument_id: InstrumentId,
            client_order_id: Option<ClientOrderId>,
            venue_order_id: Option<VenueOrderId>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
            (self.handler)(instrument_id, client_order_id, venue_order_id)
        }

        fn cancel_orders(
            &self,
            _instrument_id: InstrumentId,
            _client_order_ids: Option<Vec<ClientOrderId>>,
            _venue_order_ids: Option<Vec<VenueOrderId>>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
        {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn cancel_all_orders(
            &self,
            instrument_id: InstrumentId,
            _order_side: Option<OrderSide>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
        {
            // Delegate to the single-order handler so its errors propagate
            let handler = Arc::clone(&self.handler);
            Box::pin(async move { handler(instrument_id, None, None).await.map(|_| Vec::new()) })
        }

        fn add_instrument(&self, _instrument: InstrumentAny) {
            // No-op for mock
        }
    }

    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("BITMEX-001"),
            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            venue_order_id: VenueOrderId::from(venue_order_id),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Canceled,
            price: Some(Price::new(50000.0, 2)),
            quantity: Quantity::new(100.0, 0),
            filled_qty: Quantity::new(0.0, 0),
            report_id: UUID4::new(),
            ts_accepted: 0.into(),
            ts_last: 0.into(),
            ts_init: 0.into(),
            client_order_id: None,
            avg_px: None,
            trigger_price: None,
            trigger_type: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            display_qty: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            post_only: false,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }

    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
    where
        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
            + Send
            + Sync
            + 'static,
        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
    {
        let executor = MockExecutor::new(handler);
        TransportClient::new(executor, client_id.to_string())
    }

    #[tokio::test]
    async fn test_broadcast_cancel_immediate_success() {
        let report = create_test_report("ORDER-1");
        let report_clone = report.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        assert!(result.is_ok());
        let returned_report = result.unwrap();
        assert!(returned_report.is_some());
        assert_eq!(
            returned_report.unwrap().venue_order_id,
            report.venue_order_id
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 1);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.total_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_idempotent_success() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("orderID not found")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_failures() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Connection refused")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("All cancel all requests failed")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.idempotent_successes, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_no_healthy_clients() {
        let transport = create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        });
        transport.healthy.store(false, Ordering::Relaxed);

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
            .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("No healthy transport clients available")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_metrics_increment() {
        let report1 = create_test_report("ORDER-1");
        let report1_clone = report1.clone();
        let report2 = create_test_report("ORDER-2");
        let report2_clone = report2.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report1_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", move |_, _, _| {
                let report = report2_clone.clone();
                async move { Ok(report) }
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
            .await;

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
            .await;

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.total_cancels, 2);
        assert_eq!(metrics.successful_cancels, 2);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_expected_reject_pattern() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
            .await;

        assert!(result.is_err());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.expected_rejects, 2);
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcaster_creation_with_pool() {
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: Some(2),
            retry_delay_ms: Some(100),
            retry_delay_max_ms: Some(1000),
            recv_window_ms: Some(5000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(100),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec!["test_pattern".to_string()],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config.clone());
        assert!(broadcaster.is_ok());

        let broadcaster = broadcaster.unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcaster_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60, // Long interval so it won't tick during test
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Should not be running initially
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Start broadcaster
        let start_result = broadcaster.start().await;
        assert!(start_result.is_ok());
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Starting again should be idempotent
        let start_again = broadcaster.start().await;
        assert!(start_again.is_ok());

        // Stop broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Stopping again should be safe
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_client_stats_collection() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let stats = broadcaster.get_client_stats_async().await;

        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
        assert!(stats[0].healthy); // Should be healthy initially
        assert!(stats[1].healthy);
        assert_eq!(stats[0].cancel_count, 0);
        assert_eq!(stats[1].cancel_count, 0);
        assert_eq!(stats[0].error_count, 0);
        assert_eq!(stats[1].error_count, 0);
    }
1324
1325    #[tokio::test]
1326    async fn test_testnet_config_sets_base_url() {
1327        let config = CancelBroadcasterConfig {
1328            pool_size: 1,
1329            api_key: Some("test_key".to_string()),
1330            api_secret: Some("test_secret".to_string()),
1331            base_url: None, // Not specified
1332            testnet: true,  // But testnet is true
1333            timeout_secs: Some(5),
1334            max_retries: None,
1335            retry_delay_ms: None,
1336            retry_delay_max_ms: None,
1337            recv_window_ms: None,
1338            max_requests_per_second: None,
1339            max_requests_per_minute: None,
1340            health_check_interval_secs: 60,
1341            health_check_timeout_secs: 5,
1342            expected_reject_patterns: vec![],
1343            idempotent_success_patterns: vec![],
1344            proxy_urls: vec![],
1345        };
1346
1347        let broadcaster = CancelBroadcaster::new(config);
1348        assert!(broadcaster.is_ok());
1349    }
1350
1351    #[tokio::test]
1352    async fn test_default_config() {
1353        let config = CancelBroadcasterConfig {
1354            api_key: Some("test_key".to_string()),
1355            api_secret: Some("test_secret".to_string()),
1356            base_url: Some("https://test.example.com".to_string()),
1357            ..Default::default()
1358        };
1359
1360        let broadcaster = CancelBroadcaster::new(config);
1361        assert!(broadcaster.is_ok());
1362
1363        let broadcaster = broadcaster.unwrap();
1364        let metrics = broadcaster.get_metrics_async().await;
1365
1366        // Default pool_size is 2
1367        assert_eq!(metrics.total_clients, 2);
1368    }
1369
1370    #[tokio::test]
1371    async fn test_clone_for_async() {
1372        let config = CancelBroadcasterConfig {
1373            pool_size: 1,
1374            api_key: Some("test_key".to_string()),
1375            api_secret: Some("test_secret".to_string()),
1376            base_url: Some("https://test.example.com".to_string()),
1377            testnet: false,
1378            timeout_secs: Some(5),
1379            max_retries: None,
1380            retry_delay_ms: None,
1381            retry_delay_max_ms: None,
1382            recv_window_ms: None,
1383            max_requests_per_second: None,
1384            max_requests_per_minute: None,
1385            health_check_interval_secs: 60,
1386            health_check_timeout_secs: 5,
1387            expected_reject_patterns: vec![],
1388            idempotent_success_patterns: vec![],
1389            proxy_urls: vec![],
1390        };
1391
1392        let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1393
1394        // Increment a metric on original
1395        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1396
1397        // Clone should share the same atomic
1398        let broadcaster2 = broadcaster1.clone_for_async();
1399        let metrics2 = broadcaster2.get_metrics_async().await;
1400
1401        assert_eq!(metrics2.total_cancels, 1); // Should see the increment
1402
1403        // Modify through clone
1404        broadcaster2
1405            .successful_cancels
1406            .fetch_add(5, Ordering::Relaxed);
1407
1408        // Original should see the change
1409        let metrics1 = broadcaster1.get_metrics_async().await;
1410        assert_eq!(metrics1.successful_cancels, 5);
1411    }

    #[tokio::test]
    async fn test_pattern_matching() {
        // Test that pattern matching works for expected rejects and idempotent successes
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                "ParticipateDoNotInitiate".to_string(),
                "Close-only".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel".to_string(),
            ],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Test expected reject patterns
        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
        assert!(!broadcaster.is_expected_reject("Connection timeout"));

        // Test idempotent success patterns
        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
    }
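
    // The assertions above imply plain substring matching against the configured
    // patterns. A hypothetical equivalent of the matcher (a sketch only, not the
    // module's actual implementation) would be:
    //
    //     fn matches_any(patterns: &[String], message: &str) -> bool {
    //         patterns.iter().any(|p| message.contains(p.as_str()))
    //     }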

    // Structural coverage for broadcast_batch_cancel and broadcast_cancel_all
    // Note: these tests validate construction and metric initialization only, since
    // batch/cancel-all bypass test_handler; full HTTP mocking is covered in the
    // integration tests
    #[tokio::test]
    async fn test_broadcast_batch_cancel_structure() {
        // Validates broadcaster structure and metric initialization
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify initial state
        assert_eq!(metrics.total_clients, 2);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_structure() {
        // Validates broadcaster structure for cancel_all operations
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["orderID not found".to_string()],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify pool size and initial metrics
        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.healthy_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
    }

    // Metric health tests: validate that idempotent successes don't increment failed_cancels
    #[tokio::test]
    async fn test_single_cancel_metrics_with_mixed_responses() {
        // Similar to test_broadcast_cancel_mixed_idempotent_and_failure,
        // but explicitly validates metric health
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        // Should succeed via the idempotent-success path
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        // Verify metrics: an idempotent success doesn't count as a failure
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(
            metrics.failed_cancels, 0,
            "Idempotent success should not increment failed_cancels"
        );
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
    }
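
    // A sketch of the complementary success path, assuming (not confirmed here)
    // that the stub handler returns `anyhow::Result<Option<OrderStatusReport>>`,
    // as the `is_none` assertion above suggests: a plain `Ok(None)` from a client
    // should count toward successful_cancels rather than idempotent_successes.
    #[tokio::test]
    async fn test_single_cancel_metrics_success_path() {
        let transports = vec![create_stub_transport("client-0", |_, _, _| async {
            Ok(None)
        })];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-124")), None)
            .await;

        assert!(result.is_ok());

        // A direct success increments successful_cancels and nothing else
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 1);
        assert_eq!(metrics.idempotent_successes, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }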

    #[tokio::test]
    async fn test_metrics_initialization_and_health() {
        // Validates that metrics start at zero and clients start healthy
        let config = CancelBroadcasterConfig {
            pool_size: 4,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // All metrics should start at zero
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.expected_rejects, 0);
        assert_eq!(metrics.idempotent_successes, 0);

        // All clients should start healthy
        assert_eq!(metrics.healthy_clients, 4);
        assert_eq!(metrics.total_clients, 4);
    }

    // Health-check task lifecycle test
    #[tokio::test]
    async fn test_health_check_task_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 1, // Very short interval
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Start the broadcaster
        broadcaster.start().await.unwrap();
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle exists
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_some());
        }

        // Stop the broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle has been cleared
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_none());
        }
    }
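
    // A standalone illustration (independent of the broadcaster types) of the
    // task-lifecycle pattern the test above verifies: aborting a stored
    // `JoinHandle` cancels the spawned loop, and awaiting the aborted handle
    // reports cancellation. This sketches the general tokio pattern; it is not
    // a claim about how stop() is implemented internally.
    #[tokio::test]
    async fn test_join_handle_abort_illustration() {
        // Spawn a task that would otherwise run forever
        let handle: JoinHandle<()> = tokio::spawn(async {
            loop {
                tokio::time::sleep(Duration::from_secs(60)).await;
            }
        });

        // Abort the task, as a teardown path typically would
        handle.abort();

        // Awaiting an aborted handle yields a cancellation error
        let err = handle.await.unwrap_err();
        assert!(err.is_cancelled());
    }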
}