nautilus_bitmex/execution/canceller.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Cancel request broadcaster for redundant order cancellation.
//!
//! This module provides the [`CancelBroadcaster`] which fans out cancel requests
//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
//!
//! - **Dependency injection via traits**: Uses the `CancelExecutor` trait to abstract
//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
//!   generic type parameters on the public API (simpler Python FFI).
//! - **Short-circuit on first success**: Aborts remaining requests once any client
//!   succeeds, minimizing latency.
//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
//!   successful outcomes.
// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object support
// lands so we can drop the per-call heap allocation.

use std::{
    fmt::Debug,
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};

use futures_util::future;
use nautilus_common::live::get_runtime;
use nautilus_model::{
    enums::OrderSide,
    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
    instruments::InstrumentAny,
    reports::OrderStatusReport,
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};

use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};

/// Trait for order cancellation operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
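///
/// # Example
///
/// A minimal sketch of an implementation (hypothetical `NoopExecutor`, for
/// illustration only), showing the boxed-future pattern each method returns:
///
/// ```ignore
/// struct NoopExecutor;
///
/// impl CancelExecutor for NoopExecutor {
///     fn add_instrument(&self, _instrument: InstrumentAny) {}
///
///     fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
///         Box::pin(async { Ok(()) })
///     }
///
///     fn cancel_order(
///         &self,
///         _instrument_id: InstrumentId,
///         _client_order_id: Option<ClientOrderId>,
///         _venue_order_id: Option<VenueOrderId>,
///     ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
///         Box::pin(async { anyhow::bail!("no-op executor") })
///     }
///
///     fn cancel_orders(
///         &self,
///         _instrument_id: InstrumentId,
///         _client_order_ids: Option<Vec<ClientOrderId>>,
///         _venue_order_ids: Option<Vec<VenueOrderId>>,
///     ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
///         Box::pin(async { Ok(Vec::new()) })
///     }
///
///     fn cancel_all_orders(
///         &self,
///         _instrument_id: InstrumentId,
///         _order_side: Option<OrderSide>,
///     ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
///         Box::pin(async { Ok(Vec::new()) })
///     }
/// }
/// ```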
trait CancelExecutor: Send + Sync {
    /// Adds an instrument for caching.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;

    /// Cancels multiple orders.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;

    /// Cancels all orders for an instrument.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}

impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }

    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}

/// Configuration for the cancel broadcaster.
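///
/// # Example
///
/// A sketch of overriding selected fields (values illustrative):
///
/// ```ignore
/// let config = CancelBroadcasterConfig {
///     pool_size: 3,
///     testnet: true,
///     ..Default::default()
/// };
/// ```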
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected cancel rejections for debug-level logging.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings to identify idempotent success (order already cancelled/not found).
    pub idempotent_success_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than `pool_size`, the remaining transports use no proxy. If it is longer, the
    /// extra proxies are ignored.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for CancelBroadcasterConfig {
    fn default() -> Self {
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                r"Order had execInst of ParticipateDoNotInitiate".to_string(),
            ],
            idempotent_success_patterns: vec![
                r"AlreadyCanceled".to_string(),
                r"orderID not found".to_string(),
                r"Unable to cancel order due to existing state".to_string(),
            ],
            proxy_urls: vec![],
        }
    }
}

/// Transport client wrapper with health monitoring.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in Arc to enable cloning without requiring Clone on CancelExecutor.
    ///
    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
    /// the executor across multiple TransportClient clones.
    executor: Arc<dyn CancelExecutor>,
    client_id: String,
    healthy: Arc<AtomicBool>,
    cancel_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
}

impl Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(TransportClient))
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("cancel_count", &self.cancel_count)
            .field("error_count", &self.error_count)
            .finish()
    }
}

impl TransportClient {
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(())) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                log::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }
}

/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    config: CancelBroadcasterConfig,
    transports: Arc<Vec<TransportClient>>,
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}

impl CancelBroadcaster {
    /// Creates a new [`CancelBroadcaster`] with an internal HTTP client pool.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP client fails to initialize.
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        // Synthesize base_url when testnet is true but base_url is None
        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };

        for i in 0..config.pool_size {
            // Assign proxy from config list, or None if index exceeds list length
            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());

            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
                proxy_url,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Starts the broadcaster and health check loop.
    ///
    /// This method is idempotent: if the broadcaster is already running, it returns
    /// `Ok(())` without spawning a second health check task.
    ///
    /// # Errors
    ///
    /// Currently never returns an error; the `Result` is kept for future startup failures.
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }

        self.running.store(true, Ordering::Relaxed);

        // Initial health check for all clients
        self.run_health_checks().await;

        // Start periodic health check task
        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;

        let task = get_runtime().spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;

                if !running.load(Ordering::Relaxed) {
                    break;
                }

                let tasks: Vec<_> = transports
                    .iter()
                    .map(|t| t.health_check(timeout_secs))
                    .collect();

                let results = future::join_all(tasks).await;
                let healthy_count = results.iter().filter(|&&r| r).count();

                log::debug!(
                    "Health check complete: {}/{} clients healthy",
                    healthy_count,
                    results.len()
                );
            }
        });

        *self.health_check_task.write().await = Some(task);

        log::info!(
            "CancelBroadcaster started with {} clients",
            self.transports.len()
        );

        Ok(())
    }

    /// Stops the broadcaster and health check loop.
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }

        self.running.store(false, Ordering::Relaxed);

        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }

        log::info!("CancelBroadcaster stopped");
    }

    async fn run_health_checks(&self) {
        let tasks: Vec<_> = self
            .transports
            .iter()
            .map(|t| t.health_check(self.config.health_check_timeout_secs))
            .collect();

        let results = future::join_all(tasks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();

        log::debug!(
            "Health check complete: {}/{} clients healthy",
            healthy_count,
            results.len()
        );
    }

    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    fn is_idempotent_success(&self, error_message: &str) -> bool {
        self.config
            .idempotent_success_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Processes cancel request results, handling success, idempotent success, and failures.
    ///
    /// This helper consolidates the common error handling loop used across all broadcast methods.
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining;

            match result {
                Ok((client_id, Ok(result))) => {
                    // First success - abort remaining handles
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);

                    log::debug!("{operation} broadcast succeeded [{client_id}] {params}");

                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_idempotent_success(&error_msg) {
                        // First idempotent success - abort remaining handles and return success
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);

                        log::debug!(
                            "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
                        );

                        return idempotent_result();
                    }

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        log::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        log::warn!(
                            "{operation} request failed [{client_id}]: {error_msg} {params}"
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    log::warn!("{operation} task join error: {e:?}");
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        // All tasks failed
        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        log::error!(
            "All {} requests failed: {errors:?} {params}",
            operation.to_lowercase(),
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {errors:?}",
            operation.to_lowercase(),
        ))
    }

    /// Broadcasts a single cancel request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(report))` if successfully cancelled with a report.
    /// - `Ok(None)` if the order was already cancelled (idempotent success).
    /// - `Err` if all requests failed.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
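    ///
    /// # Example
    ///
    /// A minimal sketch of handling both success variants (`broadcaster`,
    /// `instrument_id`, and `client_order_id` assumed in scope):
    ///
    /// ```ignore
    /// match broadcaster
    ///     .broadcast_cancel(instrument_id, Some(client_order_id), None)
    ///     .await?
    /// {
    ///     Some(report) => log::debug!("Cancelled with report: {report:?}"),
    ///     None => log::debug!("Order was already cancelled (idempotent success)"),
    /// }
    /// ```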
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
            "order already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a batch cancel request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        for transport in healthy_transports {
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
            "orders already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a cancel all request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
            "no orders to cancel",
        )
        .await
    }

    /// Gets broadcaster metrics.
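    ///
    /// # Example
    ///
    /// A quick sketch of reading the snapshot (`broadcaster` assumed in scope):
    ///
    /// ```ignore
    /// let metrics = broadcaster.get_metrics();
    /// log::info!(
    ///     "{}/{} clients healthy, {} total cancels",
    ///     metrics.healthy_clients,
    ///     metrics.total_clients,
    ///     metrics.total_cancels,
    /// );
    /// ```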
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
        let total_clients = self.transports.len();

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    /// Gets broadcaster metrics (async version for use within async context).
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }

    /// Gets per-client statistics.
    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        self.transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Gets per-client statistics (async version for use within async context).
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }

    /// Caches an instrument in all HTTP clients in the pool.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        for transport in self.transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }

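    /// Returns a clone of this broadcaster that shares all underlying state.
    ///
    /// Every field other than the cloned config is `Arc`-backed, so clones observe the
    /// same transports, metrics, and running flag; use this to move a handle into
    /// spawned async tasks.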
    #[must_use]
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }

    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
}

/// Broadcaster metrics snapshot.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    pub total_cancels: u64,
    pub successful_cancels: u64,
    pub failed_cancels: u64,
    pub expected_rejects: u64,
    pub idempotent_successes: u64,
    pub healthy_clients: usize,
    pub total_clients: usize,
}

/// Per-client statistics.
#[derive(Debug, Clone)]
pub struct ClientStats {
    pub client_id: String,
    pub healthy: bool,
    pub cancel_count: u64,
    pub error_count: u64,
}

844#[cfg(test)]
845mod tests {
846    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
847
848    use nautilus_core::UUID4;
849    use nautilus_model::{
850        enums::{
851            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
852        },
853        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
854        reports::OrderStatusReport,
855        types::{Price, Quantity},
856    };
857
858    use super::*;
859
860    /// Mock executor for testing.
861    #[derive(Clone)]
862    #[allow(clippy::type_complexity)]
863    struct MockExecutor {
864        handler: Arc<
865            dyn Fn(
866                    InstrumentId,
867                    Option<ClientOrderId>,
868                    Option<VenueOrderId>,
869                )
870                    -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
871                + Send
872                + Sync,
873        >,
874    }
875
876    impl MockExecutor {
877        fn new<F, Fut>(handler: F) -> Self
878        where
879            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
880                + Send
881                + Sync
882                + 'static,
883            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
884        {
885            Self {
886                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
887            }
888        }
889    }
890
891    impl CancelExecutor for MockExecutor {
892        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
893            Box::pin(async { Ok(()) })
894        }
895
896        fn cancel_order(
897            &self,
898            instrument_id: InstrumentId,
899            client_order_id: Option<ClientOrderId>,
900            venue_order_id: Option<VenueOrderId>,
901        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
902            (self.handler)(instrument_id, client_order_id, venue_order_id)
903        }
904
905        fn cancel_orders(
906            &self,
907            _instrument_id: InstrumentId,
908            _client_order_ids: Option<Vec<ClientOrderId>>,
909            _venue_order_ids: Option<Vec<VenueOrderId>>,
910        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
911        {
912            Box::pin(async { Ok(Vec::new()) })
913        }
914
915        fn cancel_all_orders(
916            &self,
917            instrument_id: InstrumentId,
918            _order_side: Option<OrderSide>,
919        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
920        {
921            // Try to get result from the single-order handler to propagate errors
922            let handler = Arc::clone(&self.handler);
923            Box::pin(async move {
924                // Call the handler to check if it would fail
925                let result = handler(instrument_id, None, None).await;
926                match result {
927                    Ok(_) => Ok(Vec::new()),
928                    Err(e) => Err(e),
929                }
930            })
931        }
932
933        fn add_instrument(&self, _instrument: InstrumentAny) {
934            // No-op for mock
935        }
936    }
937
938    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
939        OrderStatusReport {
940            account_id: AccountId::from("BITMEX-001"),
941            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
942            venue_order_id: VenueOrderId::from(venue_order_id),
943            order_side: OrderSide::Buy,
944            order_type: OrderType::Limit,
945            time_in_force: TimeInForce::Gtc,
946            order_status: OrderStatus::Canceled,
947            price: Some(Price::new(50000.0, 2)),
948            quantity: Quantity::new(100.0, 0),
949            filled_qty: Quantity::new(0.0, 0),
950            report_id: UUID4::new(),
951            ts_accepted: 0.into(),
952            ts_last: 0.into(),
953            ts_init: 0.into(),
954            client_order_id: None,
955            avg_px: None,
956            trigger_price: None,
957            trigger_type: None,
958            contingency_type: ContingencyType::NoContingency,
959            expire_time: None,
960            order_list_id: None,
961            venue_position_id: None,
962            linked_order_ids: None,
963            parent_order_id: None,
964            display_qty: None,
965            limit_offset: None,
966            trailing_offset: None,
967            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
968            post_only: false,
969            reduce_only: false,
970            cancel_reason: None,
971            ts_triggered: None,
972        }
973    }
974
975    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
976    where
977        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
978            + Send
979            + Sync
980            + 'static,
981        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
982    {
983        let executor = MockExecutor::new(handler);
984        TransportClient::new(executor, client_id.to_string())
985    }
986
987    #[tokio::test]
988    async fn test_broadcast_cancel_immediate_success() {
989        let report = create_test_report("ORDER-1");
990        let report_clone = report.clone();
991
992        let transports = vec![
993            create_stub_transport("client-0", move |_, _, _| {
994                let report = report_clone.clone();
995                async move { Ok(report) }
996            }),
997            create_stub_transport("client-1", |_, _, _| async {
998                tokio::time::sleep(Duration::from_secs(10)).await;
999                anyhow::bail!("Should be aborted")
1000            }),
1001        ];
1002
1003        let config = CancelBroadcasterConfig::default();
1004        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1005
1006        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1007        let result = broadcaster
1008            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1009            .await;
1010
1011        assert!(result.is_ok());
1012        let returned_report = result.unwrap();
1013        assert!(returned_report.is_some());
1014        assert_eq!(
1015            returned_report.unwrap().venue_order_id,
1016            report.venue_order_id
1017        );
1018
1019        let metrics = broadcaster.get_metrics_async().await;
1020        assert_eq!(metrics.successful_cancels, 1);
1021        assert_eq!(metrics.failed_cancels, 0);
1022        assert_eq!(metrics.total_cancels, 1);
1023    }
1024
1025    #[tokio::test]
1026    async fn test_broadcast_cancel_idempotent_success() {
1027        let transports = vec![
1028            create_stub_transport("client-0", |_, _, _| async {
1029                anyhow::bail!("AlreadyCanceled")
1030            }),
1031            create_stub_transport("client-1", |_, _, _| async {
1032                tokio::time::sleep(Duration::from_secs(10)).await;
1033                anyhow::bail!("Should be aborted")
1034            }),
1035        ];
1036
1037        let config = CancelBroadcasterConfig::default();
1038        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1039
1040        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1041        let result = broadcaster
1042            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1043            .await;
1044
1045        assert!(result.is_ok());
1046        assert!(result.unwrap().is_none());
1047
1048        let metrics = broadcaster.get_metrics_async().await;
1049        assert_eq!(metrics.idempotent_successes, 1);
1050        assert_eq!(metrics.successful_cancels, 0);
1051        assert_eq!(metrics.failed_cancels, 0);
1052    }
1053
1054    #[tokio::test]
1055    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1056        let transports = vec![
1057            create_stub_transport("client-0", |_, _, _| async {
1058                anyhow::bail!("502 Bad Gateway")
1059            }),
1060            create_stub_transport("client-1", |_, _, _| async {
1061                anyhow::bail!("orderID not found")
1062            }),
1063        ];
1064
1065        let config = CancelBroadcasterConfig::default();
1066        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1067
1068        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1069        let result = broadcaster
1070            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1071            .await;
1072
1073        assert!(result.is_ok());
1074        assert!(result.unwrap().is_none());
1075
1076        let metrics = broadcaster.get_metrics_async().await;
1077        assert_eq!(metrics.idempotent_successes, 1);
1078        assert_eq!(metrics.failed_cancels, 0);
1079    }
1080
1081    #[tokio::test]
1082    async fn test_broadcast_cancel_all_failures() {
1083        let transports = vec![
1084            create_stub_transport("client-0", |_, _, _| async {
1085                anyhow::bail!("502 Bad Gateway")
1086            }),
1087            create_stub_transport("client-1", |_, _, _| async {
1088                anyhow::bail!("Connection refused")
1089            }),
1090        ];
1091
1092        let config = CancelBroadcasterConfig::default();
1093        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1094
1095        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1096        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1097
1098        assert!(result.is_err());
1099        assert!(
1100            result
1101                .unwrap_err()
1102                .to_string()
1103                .contains("All cancel all requests failed")
1104        );
1105
1106        let metrics = broadcaster.get_metrics_async().await;
1107        assert_eq!(metrics.failed_cancels, 1);
1108        assert_eq!(metrics.successful_cancels, 0);
1109        assert_eq!(metrics.idempotent_successes, 0);
1110    }
1111
1112    #[tokio::test]
1113    async fn test_broadcast_cancel_no_healthy_clients() {
1114        let transport = create_stub_transport("client-0", |_, _, _| async {
1115            Ok(create_test_report("ORDER-1"))
1116        });
1117        transport.healthy.store(false, Ordering::Relaxed);
1118
1119        let config = CancelBroadcasterConfig::default();
1120        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1121
1122        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1123        let result = broadcaster
1124            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1125            .await;
1126
1127        assert!(result.is_err());
1128        assert!(
1129            result
1130                .unwrap_err()
1131                .to_string()
1132                .contains("No healthy transport clients available")
1133        );
1134
1135        let metrics = broadcaster.get_metrics_async().await;
1136        assert_eq!(metrics.failed_cancels, 1);
1137    }
1138
1139    #[tokio::test]
1140    async fn test_broadcast_cancel_metrics_increment() {
1141        let report1 = create_test_report("ORDER-1");
1142        let report1_clone = report1.clone();
1143        let report2 = create_test_report("ORDER-2");
1144        let report2_clone = report2.clone();
1145
1146        let transports = vec![
1147            create_stub_transport("client-0", move |_, _, _| {
1148                let report = report1_clone.clone();
1149                async move { Ok(report) }
1150            }),
1151            create_stub_transport("client-1", move |_, _, _| {
1152                let report = report2_clone.clone();
1153                async move { Ok(report) }
1154            }),
1155        ];
1156
1157        let config = CancelBroadcasterConfig::default();
1158        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1159
1160        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1161
1162        let _ = broadcaster
1163            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1164            .await;
1165
1166        let _ = broadcaster
1167            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1168            .await;
1169
1170        let metrics = broadcaster.get_metrics_async().await;
1171        assert_eq!(metrics.total_cancels, 2);
1172        assert_eq!(metrics.successful_cancels, 2);
1173    }
1174
1175    #[tokio::test]
1176    async fn test_broadcast_cancel_expected_reject_pattern() {
1177        let transports = vec![
1178            create_stub_transport("client-0", |_, _, _| async {
1179                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1180            }),
1181            create_stub_transport("client-1", |_, _, _| async {
1182                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1183            }),
1184        ];
1185
1186        let config = CancelBroadcasterConfig::default();
1187        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1188
1189        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1190        let result = broadcaster
1191            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1192            .await;
1193
1194        assert!(result.is_err());
1195
1196        let metrics = broadcaster.get_metrics_async().await;
1197        assert_eq!(metrics.expected_rejects, 2);
1198        assert_eq!(metrics.failed_cancels, 1);
1199    }
1200
1201    #[tokio::test]
1202    async fn test_broadcaster_creation_with_pool() {
1203        let config = CancelBroadcasterConfig {
1204            pool_size: 3,
1205            api_key: Some("test_key".to_string()),
1206            api_secret: Some("test_secret".to_string()),
1207            base_url: Some("https://test.example.com".to_string()),
1208            testnet: false,
1209            timeout_secs: Some(5),
1210            max_retries: Some(2),
1211            retry_delay_ms: Some(100),
1212            retry_delay_max_ms: Some(1000),
1213            recv_window_ms: Some(5000),
1214            max_requests_per_second: Some(10),
1215            max_requests_per_minute: Some(100),
1216            health_check_interval_secs: 30,
1217            health_check_timeout_secs: 5,
1218            expected_reject_patterns: vec!["test_pattern".to_string()],
1219            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1220            proxy_urls: vec![],
1221        };
1222
1223        let broadcaster = CancelBroadcaster::new(config.clone());
1224        assert!(broadcaster.is_ok());
1225
1226        let broadcaster = broadcaster.unwrap();
1227        let metrics = broadcaster.get_metrics_async().await;
1228
1229        assert_eq!(metrics.total_clients, 3);
1230        assert_eq!(metrics.total_cancels, 0);
1231        assert_eq!(metrics.successful_cancels, 0);
1232        assert_eq!(metrics.failed_cancels, 0);
1233    }
1234
1235    #[tokio::test]
1236    async fn test_broadcaster_lifecycle() {
1237        let config = CancelBroadcasterConfig {
1238            pool_size: 2,
1239            api_key: Some("test_key".to_string()),
1240            api_secret: Some("test_secret".to_string()),
1241            base_url: Some("https://test.example.com".to_string()),
1242            testnet: false,
1243            timeout_secs: Some(5),
1244            max_retries: None,
1245            retry_delay_ms: None,
1246            retry_delay_max_ms: None,
1247            recv_window_ms: None,
1248            max_requests_per_second: None,
1249            max_requests_per_minute: None,
1250            health_check_interval_secs: 60, // Long interval so it won't tick during test
1251            health_check_timeout_secs: 1,
1252            expected_reject_patterns: vec![],
1253            idempotent_success_patterns: vec![],
1254            proxy_urls: vec![],
1255        };
1256
1257        let broadcaster = CancelBroadcaster::new(config).unwrap();
1258
1259        // Should not be running initially
1260        assert!(!broadcaster.running.load(Ordering::Relaxed));
1261
1262        // Start broadcaster
1263        let start_result = broadcaster.start().await;
1264        assert!(start_result.is_ok());
1265        assert!(broadcaster.running.load(Ordering::Relaxed));
1266
1267        // Starting again should be idempotent
1268        let start_again = broadcaster.start().await;
1269        assert!(start_again.is_ok());
1270
1271        // Stop broadcaster
1272        broadcaster.stop().await;
1273        assert!(!broadcaster.running.load(Ordering::Relaxed));
1274
1275        // Stopping again should be safe
1276        broadcaster.stop().await;
1277        assert!(!broadcaster.running.load(Ordering::Relaxed));
1278    }
1279
1280    #[tokio::test]
1281    async fn test_client_stats_collection() {
1282        let config = CancelBroadcasterConfig {
1283            pool_size: 2,
1284            api_key: Some("test_key".to_string()),
1285            api_secret: Some("test_secret".to_string()),
1286            base_url: Some("https://test.example.com".to_string()),
1287            testnet: false,
1288            timeout_secs: Some(5),
1289            max_retries: None,
1290            retry_delay_ms: None,
1291            retry_delay_max_ms: None,
1292            recv_window_ms: None,
1293            max_requests_per_second: None,
1294            max_requests_per_minute: None,
1295            health_check_interval_secs: 60,
1296            health_check_timeout_secs: 5,
1297            expected_reject_patterns: vec![],
1298            idempotent_success_patterns: vec![],
1299            proxy_urls: vec![],
1300        };
1301
1302        let broadcaster = CancelBroadcaster::new(config).unwrap();
1303        let stats = broadcaster.get_client_stats_async().await;
1304
1305        assert_eq!(stats.len(), 2);
1306        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1307        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1308        assert!(stats[0].healthy); // Should be healthy initially
1309        assert!(stats[1].healthy);
1310        assert_eq!(stats[0].cancel_count, 0);
1311        assert_eq!(stats[1].cancel_count, 0);
1312        assert_eq!(stats[0].error_count, 0);
1313        assert_eq!(stats[1].error_count, 0);
1314    }
1315
1316    #[tokio::test]
1317    async fn test_testnet_config_sets_base_url() {
1318        let config = CancelBroadcasterConfig {
1319            pool_size: 1,
1320            api_key: Some("test_key".to_string()),
1321            api_secret: Some("test_secret".to_string()),
1322            base_url: None, // Not specified
1323            testnet: true,  // But testnet is true
1324            timeout_secs: Some(5),
1325            max_retries: None,
1326            retry_delay_ms: None,
1327            retry_delay_max_ms: None,
1328            recv_window_ms: None,
1329            max_requests_per_second: None,
1330            max_requests_per_minute: None,
1331            health_check_interval_secs: 60,
1332            health_check_timeout_secs: 5,
1333            expected_reject_patterns: vec![],
1334            idempotent_success_patterns: vec![],
1335            proxy_urls: vec![],
1336        };
1337
1338        let broadcaster = CancelBroadcaster::new(config);
1339        assert!(broadcaster.is_ok());
1340    }
1341
1342    #[tokio::test]
1343    async fn test_default_config() {
1344        let config = CancelBroadcasterConfig {
1345            api_key: Some("test_key".to_string()),
1346            api_secret: Some("test_secret".to_string()),
1347            base_url: Some("https://test.example.com".to_string()),
1348            ..Default::default()
1349        };
1350
1351        let broadcaster = CancelBroadcaster::new(config);
1352        assert!(broadcaster.is_ok());
1353
1354        let broadcaster = broadcaster.unwrap();
1355        let metrics = broadcaster.get_metrics_async().await;
1356
1357        // Default pool_size is 2
1358        assert_eq!(metrics.total_clients, 2);
1359    }
1360
1361    #[tokio::test]
1362    async fn test_clone_for_async() {
1363        let config = CancelBroadcasterConfig {
1364            pool_size: 1,
1365            api_key: Some("test_key".to_string()),
1366            api_secret: Some("test_secret".to_string()),
1367            base_url: Some("https://test.example.com".to_string()),
1368            testnet: false,
1369            timeout_secs: Some(5),
1370            max_retries: None,
1371            retry_delay_ms: None,
1372            retry_delay_max_ms: None,
1373            recv_window_ms: None,
1374            max_requests_per_second: None,
1375            max_requests_per_minute: None,
1376            health_check_interval_secs: 60,
1377            health_check_timeout_secs: 5,
1378            expected_reject_patterns: vec![],
1379            idempotent_success_patterns: vec![],
1380            proxy_urls: vec![],
1381        };
1382
1383        let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1384
1385        // Increment a metric on original
1386        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1387
1388        // Clone should share the same atomic
1389        let broadcaster2 = broadcaster1.clone_for_async();
1390        let metrics2 = broadcaster2.get_metrics_async().await;
1391
1392        assert_eq!(metrics2.total_cancels, 1); // Should see the increment
1393
1394        // Modify through clone
1395        broadcaster2
1396            .successful_cancels
1397            .fetch_add(5, Ordering::Relaxed);
1398
1399        // Original should see the change
1400        let metrics1 = broadcaster1.get_metrics_async().await;
1401        assert_eq!(metrics1.successful_cancels, 5);
1402    }

    #[tokio::test]
    async fn test_pattern_matching() {
        // Pattern matching should classify expected rejects and idempotent successes
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                "ParticipateDoNotInitiate".to_string(),
                "Close-only".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel".to_string(),
            ],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Expected reject patterns
        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
        assert!(!broadcaster.is_expected_reject("Connection timeout"));

        // Idempotent success patterns
        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
    }
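
    // A companion sketch, assuming the matchers use plain substring containment
    // (consistent with the mid-sentence matches asserted above): a pattern buried
    // inside a longer venue message still classifies the response, and a message
    // matching neither list classifies as a genuine failure.
    #[tokio::test]
    async fn test_pattern_matching_substring_semantics() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec!["Close-only".to_string()],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Patterns embedded mid-message should still match
        assert!(broadcaster.is_expected_reject("Account is currently in Close-only mode"));
        assert!(broadcaster.is_idempotent_success("Cancel rejected: AlreadyCanceled by venue"));

        // A message matching neither list is treated as a real failure
        assert!(!broadcaster.is_expected_reject("503 Service Unavailable"));
        assert!(!broadcaster.is_idempotent_success("503 Service Unavailable"));
    }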

    // Structural coverage for broadcast_batch_cancel and broadcast_cancel_all.
    // Note: these tests only validate construction and initial metrics, since the
    // batch and cancel-all paths bypass the test handler; full HTTP mocking is
    // covered in integration tests.
    #[tokio::test]
    async fn test_broadcast_batch_cancel_structure() {
        // Validates broadcaster structure and metric initialization
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify initial state
        assert_eq!(metrics.total_clients, 2);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_structure() {
        // Validates broadcaster structure for cancel_all operations
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["orderID not found".to_string()],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify pool size and initial metrics
        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.healthy_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
    }

    // Metric health tests: idempotent successes must not increment failed_cancels
    #[tokio::test]
    async fn test_single_cancel_metrics_with_mixed_responses() {
        // Similar to test_broadcast_cancel_mixed_idempotent_and_failure,
        // but explicitly validates metric health
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        // Should succeed via the idempotent pattern, with no report returned
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        // Verify metrics: an idempotent success does not count as a failure
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(
            metrics.failed_cancels, 0,
            "Idempotent success should not increment failed_cancels"
        );
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
    }
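
    // A hedged counterpart sketch to the mixed-response test above: when every
    // client fails with a message matching neither pattern list, the broadcast
    // is assumed to surface an error, and neither the success counter nor the
    // idempotent counter should move. The exact `failed_cancels` value is left
    // unasserted, since per-client versus per-broadcast accounting is not pinned
    // down by this sketch.
    #[tokio::test]
    async fn test_single_cancel_metrics_all_hard_failures() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        // With no success and no idempotent match, the broadcast reports an error
        assert!(result.is_err());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.idempotent_successes, 0);
    }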

    #[tokio::test]
    async fn test_metrics_initialization_and_health() {
        // Validates that metrics start at zero and clients start healthy
        let config = CancelBroadcasterConfig {
            pool_size: 4,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // All metrics should start at zero
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.expected_rejects, 0);
        assert_eq!(metrics.idempotent_successes, 0);

        // All clients should start healthy
        assert_eq!(metrics.healthy_clients, 4);
        assert_eq!(metrics.total_clients, 4);
    }

    // Health-check task lifecycle test
    #[tokio::test]
    async fn test_health_check_task_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 1, // Very short interval
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Start the broadcaster
        broadcaster.start().await.unwrap();
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle exists
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_some());
        }

        // Stop the broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle has been cleared
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_none());
        }
    }
}