// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Cancel request broadcaster for redundant order cancellation.
//!
//! This module provides the [`CancelBroadcaster`], which fans out cancel requests
//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
//!
//! - **Dependency injection via traits**: Uses the `CancelExecutor` trait to abstract
//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
//!   generic type parameters on the public API (simpler Python FFI).
//! - **Short-circuit on first success**: Aborts remaining requests once any client
//!   succeeds, minimizing latency.
//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
//!   successful outcomes.
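//!
//! A minimal usage sketch (not compiled as a doctest; `instrument_id` and
//! `client_order_id` stand in for real identifiers):
//!
//! ```ignore
//! let broadcaster = CancelBroadcaster::new(CancelBroadcasterConfig::default())?;
//! broadcaster.start().await?;
//!
//! // Fans out to every healthy client and returns on the first acknowledgement:
//! // `Ok(Some(report))` on a venue ack, `Ok(None)` for an idempotent success.
//! let report = broadcaster
//!     .broadcast_cancel(instrument_id, Some(client_order_id), None)
//!     .await?;
//!
//! broadcaster.stop().await;
//! ```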

// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object support
// lands so we can drop the per-call heap allocation.

use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};

use futures_util::future;
use nautilus_model::{
    enums::OrderSide,
    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
    instruments::InstrumentAny,
    reports::OrderStatusReport,
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};

use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};

/// Trait for order cancellation operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
trait CancelExecutor: Send + Sync {
    /// Adds an instrument to the executor's cache.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
    >;

    /// Cancels multiple orders.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    >;

    /// Cancels all orders for an instrument.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    >;
}

impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        BitmexHttpClient::add_instrument(self, instrument);
    }

    fn health_check(
        &self,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            BitmexHttpClient::http_get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
    > {
        Box::pin(async move {
            BitmexHttpClient::cancel_order(self, instrument_id, client_order_id, venue_order_id)
                .await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    > {
        Box::pin(async move {
            BitmexHttpClient::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids)
                .await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
    > {
        Box::pin(async move {
            BitmexHttpClient::cancel_all_orders(self, instrument_id, order_side).await
        })
    }
}

/// Configuration for the cancel broadcaster.
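///
/// Unset fields fall back to [`Default`]; a minimal override sketch using struct
/// update syntax (values are illustrative only):
///
/// ```ignore
/// let config = CancelBroadcasterConfig {
///     pool_size: 3,
///     testnet: true,
///     ..Default::default()
/// };
/// ```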
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (if `None`, sourced from the environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (if `None`, sourced from the environment).
    pub api_secret: Option<String>,
    /// Base URL for the BitMEX HTTP API.
    pub base_url: Option<String>,
    /// If connecting to the BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings identifying expected cancel rejections, logged at debug level.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings identifying idempotent success (order already cancelled or not found).
    pub idempotent_success_patterns: Vec<String>,
}

impl Default for CancelBroadcasterConfig {
    fn default() -> Self {
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                "Order had execInst of ParticipateDoNotInitiate".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel order due to existing state".to_string(),
            ],
        }
    }
}

/// Transport client wrapper with health monitoring.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in `Arc` to enable cloning without requiring `Clone` on `CancelExecutor`.
    ///
    /// `BitmexHttpClient` doesn't implement `Clone`, so reference counting is used to share
    /// the executor across multiple `TransportClient` clones.
    executor: Arc<dyn CancelExecutor>,
    client_id: String,
    healthy: Arc<AtomicBool>,
    cancel_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
}

impl std::fmt::Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TransportClient")
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("cancel_count", &self.cancel_count)
            .field("error_count", &self.error_count)
            .finish_non_exhaustive()
    }
}

impl TransportClient {
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(_)) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                tracing::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }

    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }
}

/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
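///
/// A lifecycle sketch (assumes a Tokio runtime; construction and error handling
/// elided):
///
/// ```ignore
/// broadcaster.start().await?; // runs an initial health check, spawns the loop
/// let metrics = broadcaster.get_metrics_async().await;
/// tracing::debug!("healthy clients: {}", metrics.healthy_clients);
/// broadcaster.stop().await; // aborts the health check task
/// ```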
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    config: CancelBroadcasterConfig,
    transports: Arc<RwLock<Vec<TransportClient>>>,
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}

impl CancelBroadcaster {
    /// Creates a new [`CancelBroadcaster`] with an internal HTTP client pool.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP client fails to initialize.
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        // Synthesize base_url when testnet is true but base_url is None
        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };

        for i in 0..config.pool_size {
            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(RwLock::new(transports)),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Starts the broadcaster and health check loop.
    ///
    /// Calling this on an already running broadcaster is a no-op.
    ///
    /// # Errors
    ///
    /// Currently infallible; the `Result` is retained for API stability.
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }

        self.running.store(true, Ordering::Relaxed);

        // Initial health check for all clients
        self.run_health_checks().await;

        // Start periodic health check task
        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;

        let task = tokio::spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;

                if !running.load(Ordering::Relaxed) {
                    break;
                }

                let transports_guard = transports.read().await;
                let transports_clone: Vec<_> = transports_guard.clone();
                drop(transports_guard);

                let tasks: Vec<_> = transports_clone
                    .iter()
                    .map(|t| t.health_check(timeout_secs))
                    .collect();

                let results = future::join_all(tasks).await;
                let healthy_count = results.iter().filter(|&&r| r).count();

                tracing::debug!(
                    "Health check complete: {}/{} clients healthy",
                    healthy_count,
                    results.len()
                );
            }
        });

        *self.health_check_task.write().await = Some(task);

        tracing::info!(
            "CancelBroadcaster started with {} clients",
            self.transports.read().await.len()
        );

        Ok(())
    }

    /// Stops the broadcaster and health check loop.
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }

        self.running.store(false, Ordering::Relaxed);

        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }

        tracing::info!("CancelBroadcaster stopped");
    }

    async fn run_health_checks(&self) {
        let transports_guard = self.transports.read().await;
        let transports_clone: Vec<_> = transports_guard.clone();
        drop(transports_guard);

        let tasks: Vec<_> = transports_clone
            .iter()
            .map(|t| t.health_check(self.config.health_check_timeout_secs))
            .collect();

        let results = future::join_all(tasks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();

        tracing::debug!(
            "Health check complete: {}/{} clients healthy",
            healthy_count,
            results.len()
        );
    }

    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    fn is_idempotent_success(&self, error_message: &str) -> bool {
        self.config
            .idempotent_success_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Processes cancel request results, handling success, idempotent success, and failures.
    ///
    /// This helper consolidates the common error handling loop used across all broadcast methods.
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining;

            match result {
                Ok((client_id, Ok(value))) => {
                    // First success - abort remaining handles
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);
                    tracing::debug!(
                        "{} broadcast succeeded [{}] {}",
                        operation,
                        client_id,
                        params
                    );
                    return Ok(value);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_idempotent_success(&error_msg) {
                        // First idempotent success - abort remaining handles and return success
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
                        tracing::debug!(
                            "Idempotent success [{}] - {}: {} {}",
                            client_id,
                            idempotent_reason,
                            error_msg,
                            params
                        );
                        return idempotent_result();
                    }

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        tracing::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        tracing::warn!(
                            "{} request failed [{}]: {} {}",
                            operation,
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    tracing::warn!("{} task join error: {e:?}", operation);
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        // All tasks failed
        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        tracing::error!(
            "All {} requests failed: {:?} {}",
            operation.to_lowercase(),
            errors,
            params
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {:?}",
            operation.to_lowercase(),
            errors
        ))
    }

    /// Broadcasts a single cancel request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(report))` if successfully cancelled with a report.
    /// - `Ok(None)` if the order was already cancelled (idempotent success).
    /// - `Err` if all requests failed.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Filter for healthy clients and clone them
        let transports_guard = self.transports.read().await;
        let healthy_transports: Vec<TransportClient> = transports_guard
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        drop(transports_guard);

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        // Spawn tasks for all healthy clients
        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!(
                "(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"
            ),
            "order already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a batch cancel request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Filter for healthy clients and clone them
        let transports_guard = self.transports.read().await;
        let healthy_transports: Vec<TransportClient> = transports_guard
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        drop(transports_guard);

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        // Spawn tasks for all healthy clients
        let mut handles = Vec::new();

        for transport in healthy_transports {
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!(
                "(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"
            ),
            "orders already cancelled/not found",
        )
        .await
    }

    /// Broadcasts a cancel-all request to all healthy clients in parallel.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Filter for healthy clients and clone them
        let transports_guard = self.transports.read().await;
        let healthy_transports: Vec<TransportClient> = transports_guard
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();
        drop(transports_guard);

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        // Spawn tasks for all healthy clients
        let mut handles = Vec::new();
        for transport in healthy_transports {
            let handle = tokio::spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
            "no orders to cancel",
        )
        .await
    }

    /// Gets broadcaster metrics.
    ///
    /// # Panics
    ///
    /// Panics if called from within an async runtime; use
    /// [`Self::get_metrics_async`] in async contexts.
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let transports_guard = self.transports.blocking_read();
        let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
        let total_clients = transports_guard.len();
        drop(transports_guard);

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    /// Gets broadcaster metrics (async version for use within async contexts).
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        let transports_guard = self.transports.read().await;
        let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
        let total_clients = transports_guard.len();
        drop(transports_guard);

        BroadcasterMetrics {
            total_cancels: self.total_cancels.load(Ordering::Relaxed),
            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }
    /// Gets per-client statistics.
    ///
    /// # Panics
    ///
    /// Panics if called from within an async runtime; use
    /// [`Self::get_client_stats_async`] in async contexts.
    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        let transports = self.transports.blocking_read();
        transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Gets per-client statistics (async version for use within async contexts).
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        let transports = self.transports.read().await;
        transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                cancel_count: t.get_cancel_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Adds an instrument to all HTTP clients in the pool for caching.
    ///
    /// # Panics
    ///
    /// Panics if called from within an async runtime.
    pub fn add_instrument(&self, instrument: InstrumentAny) {
        let transports = self.transports.blocking_read();
        for transport in transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }

    /// Returns a clone for use across async task boundaries.
    ///
    /// The transport pool, metrics counters, and lifecycle state are `Arc`-shared,
    /// so the clone observes and mutates the same underlying state; the config is copied.
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }

    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(RwLock::new(transports)),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
}

/// Broadcaster metrics snapshot.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    pub total_cancels: u64,
    pub successful_cancels: u64,
    pub failed_cancels: u64,
    pub expected_rejects: u64,
    pub idempotent_successes: u64,
    pub healthy_clients: usize,
    pub total_clients: usize,
}

/// Per-client statistics.
#[derive(Debug, Clone)]
pub struct ClientStats {
    pub client_id: String,
    pub healthy: bool,
    pub cancel_count: u64,
    pub error_count: u64,
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};

    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{
            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
        },
        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
        reports::OrderStatusReport,
        types::{Price, Quantity},
    };

    use super::*;

    /// Mock executor for testing.
    #[derive(Clone)]
    #[allow(clippy::type_complexity)]
    struct MockExecutor {
        handler: Arc<
            dyn Fn(
                    InstrumentId,
                    Option<ClientOrderId>,
                    Option<VenueOrderId>,
                ) -> std::pin::Pin<
                    Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send>,
                > + Send
                + Sync,
        >,
    }

    impl MockExecutor {
        fn new<F, Fut>(handler: F) -> Self
        where
            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
                + Send
                + Sync
                + 'static,
            Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
        {
            Self {
                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
            }
        }
    }

    impl CancelExecutor for MockExecutor {
        fn health_check(
            &self,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>
        {
            Box::pin(async { Ok(()) })
        }

        fn cancel_order(
            &self,
            instrument_id: InstrumentId,
            client_order_id: Option<ClientOrderId>,
            venue_order_id: Option<VenueOrderId>,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
        > {
            (self.handler)(instrument_id, client_order_id, venue_order_id)
        }

        fn cancel_orders(
            &self,
            _instrument_id: InstrumentId,
            _client_order_ids: Option<Vec<ClientOrderId>>,
            _venue_order_ids: Option<Vec<VenueOrderId>>,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
                    + Send
                    + '_,
            >,
        > {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn cancel_all_orders(
            &self,
            instrument_id: InstrumentId,
            _order_side: Option<OrderSide>,
        ) -> std::pin::Pin<
            Box<
                dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
                    + Send
                    + '_,
            >,
        > {
            // Delegate to the single-order handler so configured errors propagate
            let handler = Arc::clone(&self.handler);
            Box::pin(async move { handler(instrument_id, None, None).await.map(|_| Vec::new()) })
        }

        fn add_instrument(&self, _instrument: InstrumentAny) {
            // No-op for mock
        }
    }

    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("BITMEX-001"),
            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            venue_order_id: VenueOrderId::from(venue_order_id),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Canceled,
            price: Some(Price::new(50000.0, 2)),
            quantity: Quantity::new(100.0, 0),
            filled_qty: Quantity::new(0.0, 0),
            report_id: UUID4::new(),
            ts_accepted: 0.into(),
            ts_last: 0.into(),
            ts_init: 0.into(),
            client_order_id: None,
            avg_px: None,
            trigger_price: None,
            trigger_type: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            display_qty: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            post_only: false,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }

    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
    where
        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
            + Send
            + Sync
            + 'static,
        Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
    {
        let executor = MockExecutor::new(handler);
        TransportClient::new(executor, client_id.to_string())
    }

    #[tokio::test]
    async fn test_broadcast_cancel_immediate_success() {
        let report = create_test_report("ORDER-1");
        let report_clone = report.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        assert!(result.is_ok());
        let returned_report = result.unwrap();
        assert!(returned_report.is_some());
        assert_eq!(
            returned_report.unwrap().venue_order_id,
            report.venue_order_id
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 1);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.total_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_idempotent_success() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("orderID not found")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_failures() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Connection refused")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("All cancel all requests failed")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.idempotent_successes, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_no_healthy_clients() {
        let transport = create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        });
        transport.healthy.store(false, Ordering::Relaxed);

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
            .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("No healthy transport clients available")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_metrics_increment() {
        let report1 = create_test_report("ORDER-1");
        let report1_clone = report1.clone();
        let report2 = create_test_report("ORDER-2");
        let report2_clone = report2.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report1_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", move |_, _, _| {
                let report = report2_clone.clone();
                async move { Ok(report) }
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
            .await;

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
            .await;

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.total_cancels, 2);
        assert_eq!(metrics.successful_cancels, 2);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_expected_reject_pattern() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
            .await;

        assert!(result.is_err());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.expected_rejects, 2);
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcaster_creation_with_pool() {
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: Some(2),
            retry_delay_ms: Some(100),
            retry_delay_max_ms: Some(1000),
            recv_window_ms: Some(5000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(100),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec!["test_pattern".to_string()],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
        };

        let broadcaster = CancelBroadcaster::new(config.clone());
        assert!(broadcaster.is_ok());

        let broadcaster = broadcaster.unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcaster_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60, // Long interval so it won't tick during the test
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Should not be running initially
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Start broadcaster
        let start_result = broadcaster.start().await;
        assert!(start_result.is_ok());
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Starting again should be idempotent
        let start_again = broadcaster.start().await;
        assert!(start_again.is_ok());

        // Stop broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Stopping again should be safe
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_client_stats_collection() {
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let stats = broadcaster.get_client_stats_async().await;

        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
        assert!(stats[0].healthy); // Should be healthy initially
        assert!(stats[1].healthy);
        assert_eq!(stats[0].cancel_count, 0);
        assert_eq!(stats[1].cancel_count, 0);
        assert_eq!(stats[0].error_count, 0);
        assert_eq!(stats[1].error_count, 0);
    }

    #[tokio::test]
    async fn test_testnet_config_sets_base_url() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: None, // Not specified
            testnet: true,  // But testnet is true
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config);
        assert!(broadcaster.is_ok());
    }

    #[tokio::test]
    async fn test_default_config() {
        let config = CancelBroadcasterConfig {
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            ..Default::default()
        };

        let broadcaster = CancelBroadcaster::new(config);
        assert!(broadcaster.is_ok());

        let broadcaster = broadcaster.unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Default pool_size is 2
        assert_eq!(metrics.total_clients, 2);
    }

    #[tokio::test]
    async fn test_clone_for_async() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster1 = CancelBroadcaster::new(config).unwrap();

        // Increment a metric on the original
        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);

        // The clone should share the same atomic
        let broadcaster2 = broadcaster1.clone_for_async();
        let metrics2 = broadcaster2.get_metrics_async().await;

        assert_eq!(metrics2.total_cancels, 1); // Should see the increment

        // Modify through the clone
        broadcaster2
            .successful_cancels
            .fetch_add(5, Ordering::Relaxed);

        // The original should see the change
        let metrics1 = broadcaster1.get_metrics_async().await;
        assert_eq!(metrics1.successful_cancels, 5);
    }
1484
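    // Patterns are matched as substrings of the error message, so a configured
    // pattern such as "Close-only" matches anywhere in the venue's reject text.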
    #[tokio::test]
    async fn test_pattern_matching() {
        // Test that pattern matching works for expected rejects and idempotent successes
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![
                "ParticipateDoNotInitiate".to_string(),
                "Close-only".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel".to_string(),
            ],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Test expected reject patterns
        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
        assert!(!broadcaster.is_expected_reject("Connection timeout"));

        // Test idempotent success patterns
        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
    }

    // Happy-path coverage for `broadcast_batch_cancel` and `broadcast_cancel_all`.
    // These tests use simplified stubs because the batch and cancel-all paths
    // bypass `test_handler`; full HTTP mocking is exercised in integration tests.
    #[tokio::test]
    async fn test_broadcast_batch_cancel_structure() {
        // Validates broadcaster structure and metric initialization
        let config = CancelBroadcasterConfig {
            pool_size: 2,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify initial state
        assert_eq!(metrics.total_clients, 2);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_structure() {
        // Validates broadcaster structure for cancel_all operations
        let config = CancelBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec!["orderID not found".to_string()],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // Verify pool size and initial metrics
        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.healthy_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
    }

    // Metric health tests: validate that idempotent successes don't increment `failed_cancels`
    #[tokio::test]
    async fn test_single_cancel_metrics_with_mixed_responses() {
        // Similar to `test_broadcast_cancel_mixed_idempotent_and_failure`, but
        // explicitly validates metric health
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ];

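        // Under `CancelBroadcasterConfig::default()`, the "AlreadyCanceled" error is
        // expected to match a default idempotent-success pattern (as the assertions
        // below confirm), so client-1's failure still resolves the cancel.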
        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        // Should succeed via the idempotent-success path
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        // Verify metrics: idempotent success doesn't count as failure
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(
            metrics.failed_cancels, 0,
            "Idempotent success should not increment failed_cancels"
        );
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
    }

    #[tokio::test]
    async fn test_metrics_initialization_and_health() {
        // Validates that metrics start at zero and clients start healthy
        let config = CancelBroadcasterConfig {
            pool_size: 4,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        // All metrics should start at zero
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.expected_rejects, 0);
        assert_eq!(metrics.idempotent_successes, 0);

        // All clients should start healthy
        assert_eq!(metrics.healthy_clients, 4);
        assert_eq!(metrics.total_clients, 4);
    }

    // Health-check task lifecycle test
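    // `start` spawns the background health-check task and sets `running`;
    // `stop` clears the flag and drops the task handle.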
    #[tokio::test]
    async fn test_health_check_task_lifecycle() {
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("https://test.example.com".to_string()),
            testnet: false,
            timeout_secs: Some(5),
            max_retries: None,
            retry_delay_ms: None,
            retry_delay_max_ms: None,
            recv_window_ms: None,
            max_requests_per_second: None,
            max_requests_per_minute: None,
            health_check_interval_secs: 1, // Very short interval
            health_check_timeout_secs: 1,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config).unwrap();

        // Start the broadcaster
        broadcaster.start().await.unwrap();
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle exists
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_some());
        }

        // Wait a bit for the health check to potentially run
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Stop the broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Verify the task handle has been cleared
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_none());
        }
    }
}