nautilus_bitmex/execution/
canceller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Cancel request broadcaster for redundant order cancellation.
17//!
18//! This module provides the [`CancelBroadcaster`] which fans out cancel requests
19//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
20//!
21//! - **Dependency injection via traits**: Uses `CancelExecutor` trait to abstract
22//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
23//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
24//!   generic type parameters on the public API (simpler Python FFI).
25//! - **Short-circuit on first success**: Aborts remaining requests once any client
26//!   succeeds, minimizing latency.
27//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
28//!   successful outcomes.
29
30// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object support
31// lands so we can drop the per-call heap allocation
32
33use std::{
34    sync::{
35        Arc,
36        atomic::{AtomicBool, AtomicU64, Ordering},
37    },
38    time::Duration,
39};
40
41use futures_util::future;
42use nautilus_model::{
43    enums::OrderSide,
44    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
45    instruments::InstrumentAny,
46    reports::OrderStatusReport,
47};
48use tokio::{sync::RwLock, task::JoinHandle, time::interval};
49
50use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
51
52/// Trait for order cancellation operations.
53///
54/// This trait abstracts the execution layer to enable dependency injection and testing
55/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
56/// to avoid generic type parameters that would complicate the Python FFI boundary.
57///
58/// # Thread Safety
59///
60/// All methods must be safe to call concurrently from multiple threads. Implementations
61/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
62///
63/// # Error Handling
64///
65/// Methods return `anyhow::Result` for flexibility. Implementers should provide
66/// meaningful error messages that can be logged and tracked by the broadcaster.
67///
68/// # Implementation Note
69///
70/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
71/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
72/// `Clone`) to be used without modification.
73trait CancelExecutor: Send + Sync {
74    /// Adds an instrument for caching.
75    fn add_instrument(&self, instrument: InstrumentAny);
76
77    /// Performs a health check on the executor.
78    fn health_check(
79        &self,
80    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>;
81
82    /// Cancels a single order.
83    fn cancel_order(
84        &self,
85        instrument_id: InstrumentId,
86        client_order_id: Option<ClientOrderId>,
87        venue_order_id: Option<VenueOrderId>,
88    ) -> std::pin::Pin<
89        Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
90    >;
91
92    /// Cancels multiple orders.
93    fn cancel_orders(
94        &self,
95        instrument_id: InstrumentId,
96        client_order_ids: Option<Vec<ClientOrderId>>,
97        venue_order_ids: Option<Vec<VenueOrderId>>,
98    ) -> std::pin::Pin<
99        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
100    >;
101
102    /// Cancels all orders for an instrument.
103    fn cancel_all_orders(
104        &self,
105        instrument_id: InstrumentId,
106        order_side: Option<OrderSide>,
107    ) -> std::pin::Pin<
108        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
109    >;
110}
111
112impl CancelExecutor for BitmexHttpClient {
113    fn add_instrument(&self, instrument: InstrumentAny) {
114        Self::add_instrument(self, instrument);
115    }
116
117    fn health_check(
118        &self,
119    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>> {
120        Box::pin(async move {
121            Self::http_get_server_time(self)
122                .await
123                .map(|_| ())
124                .map_err(|e| anyhow::anyhow!("{e}"))
125        })
126    }
127
128    fn cancel_order(
129        &self,
130        instrument_id: InstrumentId,
131        client_order_id: Option<ClientOrderId>,
132        venue_order_id: Option<VenueOrderId>,
133    ) -> std::pin::Pin<
134        Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
135    > {
136        Box::pin(async move {
137            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
138        })
139    }
140
141    fn cancel_orders(
142        &self,
143        instrument_id: InstrumentId,
144        client_order_ids: Option<Vec<ClientOrderId>>,
145        venue_order_ids: Option<Vec<VenueOrderId>>,
146    ) -> std::pin::Pin<
147        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
148    > {
149        Box::pin(async move {
150            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
151        })
152    }
153
154    fn cancel_all_orders(
155        &self,
156        instrument_id: InstrumentId,
157        order_side: Option<nautilus_model::enums::OrderSide>,
158    ) -> std::pin::Pin<
159        Box<dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>,
160    > {
161        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
162    }
163}
164
/// Configuration for the cancel broadcaster.
///
/// The credential, URL, timeout, retry, receive-window and rate-limit options are
/// handed unchanged to every pooled `BitmexHttpClient` when the broadcaster is
/// constructed. The remaining options control the broadcaster's own health checks
/// and error classification. See the `Default` implementation for default values.
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool (one transport is created per slot).
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    ///
    /// When `None` and `testnet` is `true`, the broadcaster substitutes the testnet URL.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected cancel rejections for debug-level logging.
    ///
    /// Matching is a plain substring test against the error message.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings to identify idempotent success (order already cancelled/not found).
    ///
    /// Matching is a plain substring test against the error message; a match makes
    /// the broadcast return success instead of an error.
    pub idempotent_success_patterns: Vec<String>,
}
201
202impl Default for CancelBroadcasterConfig {
203    fn default() -> Self {
204        Self {
205            pool_size: 2,
206            api_key: None,
207            api_secret: None,
208            base_url: None,
209            testnet: false,
210            timeout_secs: Some(60),
211            max_retries: None,
212            retry_delay_ms: Some(1_000),
213            retry_delay_max_ms: Some(5_000),
214            recv_window_ms: Some(10_000),
215            max_requests_per_second: Some(10),
216            max_requests_per_minute: Some(120),
217            health_check_interval_secs: 30,
218            health_check_timeout_secs: 5,
219            expected_reject_patterns: vec![
220                r"Order had execInst of ParticipateDoNotInitiate".to_string(),
221            ],
222            idempotent_success_patterns: vec![
223                r"AlreadyCanceled".to_string(),
224                r"orderID not found".to_string(),
225                r"Unable to cancel order due to existing state".to_string(),
226            ],
227        }
228    }
229}
230
/// Transport client wrapper with health monitoring.
///
/// Apart from `client_id`, every field is reference counted, so clones of a
/// `TransportClient` share the same executor, health flag and counters.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in Arc to enable cloning without requiring Clone on CancelExecutor.
    ///
    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
    /// the executor across multiple TransportClient clones.
    executor: Arc<dyn CancelExecutor>,
    /// Identifier used in log messages and per-client statistics.
    client_id: String,
    /// Health flag; only clients whose flag is set receive broadcast requests.
    healthy: Arc<AtomicBool>,
    /// Number of cancel attempts recorded for this client.
    cancel_count: Arc<AtomicU64>,
    /// Number of recorded cancel attempts that returned an error.
    error_count: Arc<AtomicU64>,
}
244
245impl std::fmt::Debug for TransportClient {
246    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
247        f.debug_struct("TransportClient")
248            .field("client_id", &self.client_id)
249            .field("healthy", &self.healthy)
250            .field("cancel_count", &self.cancel_count)
251            .field("error_count", &self.error_count)
252            .finish()
253    }
254}
255
256impl TransportClient {
257    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
258        Self {
259            executor: Arc::new(executor),
260            client_id,
261            healthy: Arc::new(AtomicBool::new(true)),
262            cancel_count: Arc::new(AtomicU64::new(0)),
263            error_count: Arc::new(AtomicU64::new(0)),
264        }
265    }
266
267    fn is_healthy(&self) -> bool {
268        self.healthy.load(Ordering::Relaxed)
269    }
270
271    fn mark_healthy(&self) {
272        self.healthy.store(true, Ordering::Relaxed);
273    }
274
275    fn mark_unhealthy(&self) {
276        self.healthy.store(false, Ordering::Relaxed);
277    }
278
279    async fn health_check(&self, timeout_secs: u64) -> bool {
280        match tokio::time::timeout(
281            Duration::from_secs(timeout_secs),
282            self.executor.health_check(),
283        )
284        .await
285        {
286            Ok(Ok(_)) => {
287                self.mark_healthy();
288                true
289            }
290            Ok(Err(e)) => {
291                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
292                self.mark_unhealthy();
293                false
294            }
295            Err(_) => {
296                tracing::warn!("Health check timeout for client {}", self.client_id);
297                self.mark_unhealthy();
298                false
299            }
300        }
301    }
302
303    async fn cancel_order(
304        &self,
305        instrument_id: InstrumentId,
306        client_order_id: Option<ClientOrderId>,
307        venue_order_id: Option<VenueOrderId>,
308    ) -> anyhow::Result<OrderStatusReport> {
309        self.cancel_count.fetch_add(1, Ordering::Relaxed);
310
311        match self
312            .executor
313            .cancel_order(instrument_id, client_order_id, venue_order_id)
314            .await
315        {
316            Ok(report) => {
317                self.mark_healthy();
318                Ok(report)
319            }
320            Err(e) => {
321                self.error_count.fetch_add(1, Ordering::Relaxed);
322                Err(e)
323            }
324        }
325    }
326
327    fn get_cancel_count(&self) -> u64 {
328        self.cancel_count.load(Ordering::Relaxed)
329    }
330
331    fn get_error_count(&self) -> u64 {
332        self.error_count.load(Ordering::Relaxed)
333    }
334}
335
/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
///
/// All mutable state is held behind `Arc`s so that handles created with
/// `clone_for_async` observe and update the same pool, flags and counters.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    /// Settings supplied at construction time.
    config: CancelBroadcasterConfig,
    /// Pool of transport clients that requests are fanned out to.
    transports: Arc<RwLock<Vec<TransportClient>>>,
    /// Handle of the background health check task; `None` when no task is stored.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Set by `start`, cleared by `stop`; the health check loop exits once it is cleared.
    running: Arc<AtomicBool>,
    /// Number of broadcast calls made (counted once per call, not per client).
    total_cancels: Arc<AtomicU64>,
    /// Number of broadcasts that ended with a client reporting success.
    successful_cancels: Arc<AtomicU64>,
    /// Number of broadcasts that failed on every client or found no healthy client.
    failed_cancels: Arc<AtomicU64>,
    /// Number of client errors that matched an expected-reject pattern.
    expected_rejects: Arc<AtomicU64>,
    /// Number of broadcasts resolved by an idempotent-success pattern match.
    idempotent_successes: Arc<AtomicU64>,
}
354
355impl CancelBroadcaster {
356    /// Creates a new [`CancelBroadcaster`] with internal HTTP client pool.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if any HTTP client fails to initialize.
361    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
362        let mut transports = Vec::with_capacity(config.pool_size);
363
364        // Synthesize base_url when testnet is true but base_url is None
365        let base_url = if config.testnet && config.base_url.is_none() {
366            Some(BITMEX_HTTP_TESTNET_URL.to_string())
367        } else {
368            config.base_url.clone()
369        };
370
371        for i in 0..config.pool_size {
372            let client = BitmexHttpClient::with_credentials(
373                config.api_key.clone(),
374                config.api_secret.clone(),
375                base_url.clone(),
376                config.timeout_secs,
377                config.max_retries,
378                config.retry_delay_ms,
379                config.retry_delay_max_ms,
380                config.recv_window_ms,
381                config.max_requests_per_second,
382                config.max_requests_per_minute,
383            )
384            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
385
386            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
387        }
388
389        Ok(Self {
390            config,
391            transports: Arc::new(RwLock::new(transports)),
392            health_check_task: Arc::new(RwLock::new(None)),
393            running: Arc::new(AtomicBool::new(false)),
394            total_cancels: Arc::new(AtomicU64::new(0)),
395            successful_cancels: Arc::new(AtomicU64::new(0)),
396            failed_cancels: Arc::new(AtomicU64::new(0)),
397            expected_rejects: Arc::new(AtomicU64::new(0)),
398            idempotent_successes: Arc::new(AtomicU64::new(0)),
399        })
400    }
401
402    /// Starts the broadcaster and health check loop.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the broadcaster is already running.
407    pub async fn start(&self) -> anyhow::Result<()> {
408        if self.running.load(Ordering::Relaxed) {
409            return Ok(());
410        }
411
412        self.running.store(true, Ordering::Relaxed);
413
414        // Initial health check for all clients
415        self.run_health_checks().await;
416
417        // Start periodic health check task
418        let transports = Arc::clone(&self.transports);
419        let running = Arc::clone(&self.running);
420        let interval_secs = self.config.health_check_interval_secs;
421        let timeout_secs = self.config.health_check_timeout_secs;
422
423        let task = tokio::spawn(async move {
424            let mut ticker = interval(Duration::from_secs(interval_secs));
425            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
426
427            loop {
428                ticker.tick().await;
429
430                if !running.load(Ordering::Relaxed) {
431                    break;
432                }
433
434                let transports_guard = transports.read().await;
435                let transports_clone: Vec<_> = transports_guard.clone();
436                drop(transports_guard);
437
438                let tasks: Vec<_> = transports_clone
439                    .iter()
440                    .map(|t| t.health_check(timeout_secs))
441                    .collect();
442
443                let results = future::join_all(tasks).await;
444                let healthy_count = results.iter().filter(|&&r| r).count();
445
446                tracing::debug!(
447                    "Health check complete: {}/{} clients healthy",
448                    healthy_count,
449                    results.len()
450                );
451            }
452        });
453
454        *self.health_check_task.write().await = Some(task);
455
456        tracing::info!(
457            "CancelBroadcaster started with {} clients",
458            self.transports.read().await.len()
459        );
460
461        Ok(())
462    }
463
464    /// Stops the broadcaster and health check loop.
465    pub async fn stop(&self) {
466        if !self.running.load(Ordering::Relaxed) {
467            return;
468        }
469
470        self.running.store(false, Ordering::Relaxed);
471
472        if let Some(task) = self.health_check_task.write().await.take() {
473            task.abort();
474        }
475
476        tracing::info!("CancelBroadcaster stopped");
477    }
478
479    async fn run_health_checks(&self) {
480        let transports_guard = self.transports.read().await;
481        let transports_clone: Vec<_> = transports_guard.clone();
482        drop(transports_guard);
483
484        let tasks: Vec<_> = transports_clone
485            .iter()
486            .map(|t| t.health_check(self.config.health_check_timeout_secs))
487            .collect();
488
489        let results = future::join_all(tasks).await;
490        let healthy_count = results.iter().filter(|&&r| r).count();
491
492        tracing::debug!(
493            "Health check complete: {}/{} clients healthy",
494            healthy_count,
495            results.len()
496        );
497    }
498
499    fn is_expected_reject(&self, error_message: &str) -> bool {
500        self.config
501            .expected_reject_patterns
502            .iter()
503            .any(|pattern| error_message.contains(pattern))
504    }
505
506    fn is_idempotent_success(&self, error_message: &str) -> bool {
507        self.config
508            .idempotent_success_patterns
509            .iter()
510            .any(|pattern| error_message.contains(pattern))
511    }
512
513    /// Processes cancel request results, handling success, idempotent success, and failures.
514    ///
515    /// This helper consolidates the common error handling loop used across all broadcast methods.
516    async fn process_cancel_results<T>(
517        &self,
518        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
519        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
520        operation: &str,
521        params: String,
522        idempotent_reason: &str,
523    ) -> anyhow::Result<T>
524    where
525        T: Send + 'static,
526    {
527        let mut errors = Vec::new();
528
529        while !handles.is_empty() {
530            let current_handles = std::mem::take(&mut handles);
531            let (result, _idx, remaining) = future::select_all(current_handles).await;
532            handles = remaining.into_iter().collect();
533
534            match result {
535                Ok((client_id, Ok(result))) => {
536                    // First success - abort remaining handles
537                    for handle in &handles {
538                        handle.abort();
539                    }
540                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);
541                    tracing::debug!(
542                        "{} broadcast succeeded [{}] {}",
543                        operation,
544                        client_id,
545                        params
546                    );
547                    return Ok(result);
548                }
549                Ok((client_id, Err(e))) => {
550                    let error_msg = e.to_string();
551
552                    if self.is_idempotent_success(&error_msg) {
553                        // First idempotent success - abort remaining handles and return success
554                        for handle in &handles {
555                            handle.abort();
556                        }
557                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
558                        tracing::debug!(
559                            "Idempotent success [{}] - {}: {} {}",
560                            client_id,
561                            idempotent_reason,
562                            error_msg,
563                            params
564                        );
565                        return idempotent_result();
566                    }
567
568                    if self.is_expected_reject(&error_msg) {
569                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
570                        tracing::debug!(
571                            "Expected {} rejection [{}]: {} {}",
572                            operation.to_lowercase(),
573                            client_id,
574                            error_msg,
575                            params
576                        );
577                        errors.push(error_msg);
578                    } else {
579                        tracing::warn!(
580                            "{} request failed [{}]: {} {}",
581                            operation,
582                            client_id,
583                            error_msg,
584                            params
585                        );
586                        errors.push(error_msg);
587                    }
588                }
589                Err(e) => {
590                    tracing::warn!("{} task join error: {e:?}", operation);
591                    errors.push(format!("Task panicked: {e:?}"));
592                }
593            }
594        }
595
596        // All tasks failed
597        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
598        tracing::error!(
599            "All {} requests failed: {:?} {}",
600            operation.to_lowercase(),
601            errors,
602            params
603        );
604        Err(anyhow::anyhow!(
605            "All {} requests failed: {:?}",
606            operation.to_lowercase(),
607            errors
608        ))
609    }
610
611    /// Broadcasts a single cancel request to all healthy clients in parallel.
612    ///
613    /// # Returns
614    ///
615    /// - `Ok(Some(report))` if successfully cancelled with a report.
616    /// - `Ok(None)` if the order was already cancelled (idempotent success).
617    /// - `Err` if all requests failed.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if all cancel requests fail or no healthy clients are available.
622    pub async fn broadcast_cancel(
623        &self,
624        instrument_id: InstrumentId,
625        client_order_id: Option<ClientOrderId>,
626        venue_order_id: Option<VenueOrderId>,
627    ) -> anyhow::Result<Option<OrderStatusReport>> {
628        self.total_cancels.fetch_add(1, Ordering::Relaxed);
629
630        // Filter for healthy clients and clone them
631        let transports_guard = self.transports.read().await;
632        let healthy_transports: Vec<TransportClient> = transports_guard
633            .iter()
634            .filter(|t| t.is_healthy())
635            .cloned()
636            .collect();
637        drop(transports_guard);
638
639        if healthy_transports.is_empty() {
640            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
641            anyhow::bail!("No healthy transport clients available");
642        }
643
644        // Spawn tasks for all healthy clients
645        let mut handles = Vec::new();
646        for transport in healthy_transports {
647            let handle = tokio::spawn(async move {
648                let client_id = transport.client_id.clone();
649                let result = transport
650                    .cancel_order(instrument_id, client_order_id, venue_order_id)
651                    .await
652                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
653                (client_id, result)
654            });
655            handles.push(handle);
656        }
657
658        self.process_cancel_results(
659            handles,
660            || Ok(None),
661            "Cancel",
662            format!(
663                "(client_order_id={:?}, venue_order_id={:?})",
664                client_order_id, venue_order_id
665            ),
666            "order already cancelled/not found",
667        )
668        .await
669    }
670
671    /// Broadcasts a batch cancel request to all healthy clients in parallel.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if all cancel requests fail or no healthy clients are available.
676    pub async fn broadcast_batch_cancel(
677        &self,
678        instrument_id: InstrumentId,
679        client_order_ids: Option<Vec<ClientOrderId>>,
680        venue_order_ids: Option<Vec<VenueOrderId>>,
681    ) -> anyhow::Result<Vec<OrderStatusReport>> {
682        self.total_cancels.fetch_add(1, Ordering::Relaxed);
683
684        // Filter for healthy clients and clone them
685        let transports_guard = self.transports.read().await;
686        let healthy_transports: Vec<TransportClient> = transports_guard
687            .iter()
688            .filter(|t| t.is_healthy())
689            .cloned()
690            .collect();
691        drop(transports_guard);
692
693        if healthy_transports.is_empty() {
694            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
695            anyhow::bail!("No healthy transport clients available");
696        }
697
698        // Spawn tasks for all healthy clients
699        let mut handles = Vec::new();
700
701        for transport in healthy_transports {
702            let client_order_ids_clone = client_order_ids.clone();
703            let venue_order_ids_clone = venue_order_ids.clone();
704            let handle = tokio::spawn(async move {
705                let client_id = transport.client_id.clone();
706                let result = transport
707                    .executor
708                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
709                    .await;
710                (client_id, result)
711            });
712            handles.push(handle);
713        }
714
715        self.process_cancel_results(
716            handles,
717            || Ok(Vec::new()),
718            "Batch cancel",
719            format!(
720                "(client_order_ids={:?}, venue_order_ids={:?})",
721                client_order_ids, venue_order_ids
722            ),
723            "orders already cancelled/not found",
724        )
725        .await
726    }
727
728    /// Broadcasts a cancel all request to all healthy clients in parallel.
729    ///
730    /// # Errors
731    ///
732    /// Returns an error if all cancel requests fail or no healthy clients are available.
733    pub async fn broadcast_cancel_all(
734        &self,
735        instrument_id: InstrumentId,
736        order_side: Option<nautilus_model::enums::OrderSide>,
737    ) -> anyhow::Result<Vec<OrderStatusReport>> {
738        self.total_cancels.fetch_add(1, Ordering::Relaxed);
739
740        // Filter for healthy clients and clone them
741        let transports_guard = self.transports.read().await;
742        let healthy_transports: Vec<TransportClient> = transports_guard
743            .iter()
744            .filter(|t| t.is_healthy())
745            .cloned()
746            .collect();
747        drop(transports_guard);
748
749        if healthy_transports.is_empty() {
750            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
751            anyhow::bail!("No healthy transport clients available");
752        }
753
754        // Spawn tasks for all healthy clients
755        let mut handles = Vec::new();
756        for transport in healthy_transports {
757            let handle = tokio::spawn(async move {
758                let client_id = transport.client_id.clone();
759                let result = transport
760                    .executor
761                    .cancel_all_orders(instrument_id, order_side)
762                    .await;
763                (client_id, result)
764            });
765            handles.push(handle);
766        }
767
768        self.process_cancel_results(
769            handles,
770            || Ok(Vec::new()),
771            "Cancel all",
772            format!(
773                "(instrument_id={}, order_side={:?})",
774                instrument_id, order_side
775            ),
776            "no orders to cancel",
777        )
778        .await
779    }
780
781    /// Gets broadcaster metrics.
782    pub fn get_metrics(&self) -> BroadcasterMetrics {
783        let transports_guard = self.transports.blocking_read();
784        let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
785        let total_clients = transports_guard.len();
786        drop(transports_guard);
787
788        BroadcasterMetrics {
789            total_cancels: self.total_cancels.load(Ordering::Relaxed),
790            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
791            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
792            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
793            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
794            healthy_clients,
795            total_clients,
796        }
797    }
798
799    /// Gets broadcaster metrics (async version for use within async context).
800    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
801        let transports_guard = self.transports.read().await;
802        let healthy_clients = transports_guard.iter().filter(|t| t.is_healthy()).count();
803        let total_clients = transports_guard.len();
804        drop(transports_guard);
805
806        BroadcasterMetrics {
807            total_cancels: self.total_cancels.load(Ordering::Relaxed),
808            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
809            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
810            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
811            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
812            healthy_clients,
813            total_clients,
814        }
815    }
816
817    /// Gets per-client statistics.
818    pub fn get_client_stats(&self) -> Vec<ClientStats> {
819        let transports = self.transports.blocking_read();
820        transports
821            .iter()
822            .map(|t| ClientStats {
823                client_id: t.client_id.clone(),
824                healthy: t.is_healthy(),
825                cancel_count: t.get_cancel_count(),
826                error_count: t.get_error_count(),
827            })
828            .collect()
829    }
830
831    /// Gets per-client statistics (async version for use within async context).
832    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
833        let transports = self.transports.read().await;
834        transports
835            .iter()
836            .map(|t| ClientStats {
837                client_id: t.client_id.clone(),
838                healthy: t.is_healthy(),
839                cancel_count: t.get_cancel_count(),
840                error_count: t.get_error_count(),
841            })
842            .collect()
843    }
844
845    /// Adds an instrument to all HTTP clients in the pool for caching.
846    pub fn add_instrument(&self, instrument: nautilus_model::instruments::any::InstrumentAny) {
847        let transports = self.transports.blocking_read();
848        for transport in transports.iter() {
849            transport.executor.add_instrument(instrument.clone());
850        }
851    }
852
853    pub fn clone_for_async(&self) -> Self {
854        Self {
855            config: self.config.clone(),
856            transports: Arc::clone(&self.transports),
857            health_check_task: Arc::clone(&self.health_check_task),
858            running: Arc::clone(&self.running),
859            total_cancels: Arc::clone(&self.total_cancels),
860            successful_cancels: Arc::clone(&self.successful_cancels),
861            failed_cancels: Arc::clone(&self.failed_cancels),
862            expected_rejects: Arc::clone(&self.expected_rejects),
863            idempotent_successes: Arc::clone(&self.idempotent_successes),
864        }
865    }
866
/// Test-only constructor that installs the given transports directly.
///
/// The broadcaster starts stopped (`running == false`), with no health-check
/// task and all metric counters at zero.
#[cfg(test)]
fn new_with_transports(
    config: CancelBroadcasterConfig,
    transports: Vec<TransportClient>,
) -> Self {
    // Each metric gets its own independent zeroed counter.
    let zeroed_counter = || Arc::new(AtomicU64::new(0));

    Self {
        config,
        transports: Arc::new(RwLock::new(transports)),
        health_check_task: Arc::new(RwLock::new(None)),
        running: Arc::new(AtomicBool::new(false)),
        total_cancels: zeroed_counter(),
        successful_cancels: zeroed_counter(),
        failed_cancels: zeroed_counter(),
        expected_rejects: zeroed_counter(),
        idempotent_successes: zeroed_counter(),
    }
}
884}
885
/// Broadcaster metrics snapshot.
///
/// A plain-data copy of the broadcaster's counters and pool health taken at a
/// single point in time. Snapshots compare by value, so two snapshots can be
/// checked for equality directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BroadcasterMetrics {
    /// Value of the broadcaster's `total_cancels` counter.
    pub total_cancels: u64,
    /// Value of the broadcaster's `successful_cancels` counter.
    pub successful_cancels: u64,
    /// Value of the broadcaster's `failed_cancels` counter.
    pub failed_cancels: u64,
    /// Value of the broadcaster's `expected_rejects` counter.
    pub expected_rejects: u64,
    /// Value of the broadcaster's `idempotent_successes` counter.
    pub idempotent_successes: u64,
    /// Number of transport clients reporting healthy when the snapshot was taken.
    pub healthy_clients: usize,
    /// Total number of transport clients in the pool.
    pub total_clients: usize,
}
897
/// Per-client statistics.
///
/// A plain-data snapshot of a single transport client. Snapshots compare by
/// value, so two snapshots can be checked for equality directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientStats {
    /// Identifier of the transport client.
    pub client_id: String,
    /// Whether the client reported healthy when the snapshot was taken.
    pub healthy: bool,
    /// Value of the client's cancel counter.
    pub cancel_count: u64,
    /// Value of the client's error counter.
    pub error_count: u64,
}
906
907////////////////////////////////////////////////////////////////////////////////
908// Tests
909////////////////////////////////////////////////////////////////////////////////
910
911#[cfg(test)]
912mod tests {
913    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
914
915    use nautilus_core::UUID4;
916    use nautilus_model::{
917        enums::{
918            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
919        },
920        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
921        reports::OrderStatusReport,
922        types::{Price, Quantity},
923    };
924
925    use super::*;
926
927    /// Mock executor for testing.
928    #[derive(Clone)]
929    #[allow(clippy::type_complexity)]
930    struct MockExecutor {
931        handler: Arc<
932            dyn Fn(
933                    InstrumentId,
934                    Option<ClientOrderId>,
935                    Option<VenueOrderId>,
936                ) -> std::pin::Pin<
937                    Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send>,
938                > + Send
939                + Sync,
940        >,
941    }
942
943    impl MockExecutor {
944        fn new<F, Fut>(handler: F) -> Self
945        where
946            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
947                + Send
948                + Sync
949                + 'static,
950            Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
951        {
952            Self {
953                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
954            }
955        }
956    }
957
958    impl CancelExecutor for MockExecutor {
959        fn health_check(
960            &self,
961        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = anyhow::Result<()>> + Send + '_>>
962        {
963            Box::pin(async { Ok(()) })
964        }
965
966        fn cancel_order(
967            &self,
968            instrument_id: InstrumentId,
969            client_order_id: Option<ClientOrderId>,
970            venue_order_id: Option<VenueOrderId>,
971        ) -> std::pin::Pin<
972            Box<dyn std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>,
973        > {
974            (self.handler)(instrument_id, client_order_id, venue_order_id)
975        }
976
977        fn cancel_orders(
978            &self,
979            _instrument_id: InstrumentId,
980            _client_order_ids: Option<Vec<ClientOrderId>>,
981            _venue_order_ids: Option<Vec<VenueOrderId>>,
982        ) -> std::pin::Pin<
983            Box<
984                dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
985                    + Send
986                    + '_,
987            >,
988        > {
989            Box::pin(async { Ok(Vec::new()) })
990        }
991
992        fn cancel_all_orders(
993            &self,
994            instrument_id: InstrumentId,
995            _order_side: Option<nautilus_model::enums::OrderSide>,
996        ) -> std::pin::Pin<
997            Box<
998                dyn std::future::Future<Output = anyhow::Result<Vec<OrderStatusReport>>>
999                    + Send
1000                    + '_,
1001            >,
1002        > {
1003            // Try to get result from the single-order handler to propagate errors
1004            let handler = Arc::clone(&self.handler);
1005            Box::pin(async move {
1006                // Call the handler to check if it would fail
1007                let result = handler(instrument_id, None, None).await;
1008                match result {
1009                    Ok(_) => Ok(Vec::new()),
1010                    Err(e) => Err(e),
1011                }
1012            })
1013        }
1014
1015        fn add_instrument(&self, _instrument: nautilus_model::instruments::any::InstrumentAny) {
1016            // No-op for mock
1017        }
1018    }
1019
1020    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
1021        OrderStatusReport {
1022            account_id: AccountId::from("BITMEX-001"),
1023            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
1024            venue_order_id: VenueOrderId::from(venue_order_id),
1025            order_side: OrderSide::Buy,
1026            order_type: OrderType::Limit,
1027            time_in_force: TimeInForce::Gtc,
1028            order_status: OrderStatus::Canceled,
1029            price: Some(Price::new(50000.0, 2)),
1030            quantity: Quantity::new(100.0, 0),
1031            filled_qty: Quantity::new(0.0, 0),
1032            report_id: UUID4::new(),
1033            ts_accepted: 0.into(),
1034            ts_last: 0.into(),
1035            ts_init: 0.into(),
1036            client_order_id: None,
1037            avg_px: None,
1038            trigger_price: None,
1039            trigger_type: None,
1040            contingency_type: ContingencyType::NoContingency,
1041            expire_time: None,
1042            order_list_id: None,
1043            venue_position_id: None,
1044            linked_order_ids: None,
1045            parent_order_id: None,
1046            display_qty: None,
1047            limit_offset: None,
1048            trailing_offset: None,
1049            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
1050            post_only: false,
1051            reduce_only: false,
1052            cancel_reason: None,
1053            ts_triggered: None,
1054        }
1055    }
1056
1057    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
1058    where
1059        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
1060            + Send
1061            + Sync
1062            + 'static,
1063        Fut: std::future::Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
1064    {
1065        let executor = MockExecutor::new(handler);
1066        TransportClient::new(executor, client_id.to_string())
1067    }
1068
1069    #[tokio::test]
1070    async fn test_broadcast_cancel_immediate_success() {
1071        let report = create_test_report("ORDER-1");
1072        let report_clone = report.clone();
1073
1074        let transports = vec![
1075            create_stub_transport("client-0", move |_, _, _| {
1076                let report = report_clone.clone();
1077                async move { Ok(report) }
1078            }),
1079            create_stub_transport("client-1", |_, _, _| async {
1080                tokio::time::sleep(Duration::from_secs(10)).await;
1081                anyhow::bail!("Should be aborted")
1082            }),
1083        ];
1084
1085        let config = CancelBroadcasterConfig::default();
1086        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1087
1088        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1089        let result = broadcaster
1090            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1091            .await;
1092
1093        assert!(result.is_ok());
1094        let returned_report = result.unwrap();
1095        assert!(returned_report.is_some());
1096        assert_eq!(
1097            returned_report.unwrap().venue_order_id,
1098            report.venue_order_id
1099        );
1100
1101        let metrics = broadcaster.get_metrics_async().await;
1102        assert_eq!(metrics.successful_cancels, 1);
1103        assert_eq!(metrics.failed_cancels, 0);
1104        assert_eq!(metrics.total_cancels, 1);
1105    }
1106
1107    #[tokio::test]
1108    async fn test_broadcast_cancel_idempotent_success() {
1109        let transports = vec![
1110            create_stub_transport("client-0", |_, _, _| async {
1111                anyhow::bail!("AlreadyCanceled")
1112            }),
1113            create_stub_transport("client-1", |_, _, _| async {
1114                tokio::time::sleep(Duration::from_secs(10)).await;
1115                anyhow::bail!("Should be aborted")
1116            }),
1117        ];
1118
1119        let config = CancelBroadcasterConfig::default();
1120        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1121
1122        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1123        let result = broadcaster
1124            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1125            .await;
1126
1127        assert!(result.is_ok());
1128        assert!(result.unwrap().is_none());
1129
1130        let metrics = broadcaster.get_metrics_async().await;
1131        assert_eq!(metrics.idempotent_successes, 1);
1132        assert_eq!(metrics.successful_cancels, 0);
1133        assert_eq!(metrics.failed_cancels, 0);
1134    }
1135
1136    #[tokio::test]
1137    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1138        let transports = vec![
1139            create_stub_transport("client-0", |_, _, _| async {
1140                anyhow::bail!("502 Bad Gateway")
1141            }),
1142            create_stub_transport("client-1", |_, _, _| async {
1143                anyhow::bail!("orderID not found")
1144            }),
1145        ];
1146
1147        let config = CancelBroadcasterConfig::default();
1148        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1149
1150        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1151        let result = broadcaster
1152            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1153            .await;
1154
1155        assert!(result.is_ok());
1156        assert!(result.unwrap().is_none());
1157
1158        let metrics = broadcaster.get_metrics_async().await;
1159        assert_eq!(metrics.idempotent_successes, 1);
1160        assert_eq!(metrics.failed_cancels, 0);
1161    }
1162
1163    #[tokio::test]
1164    async fn test_broadcast_cancel_all_failures() {
1165        let transports = vec![
1166            create_stub_transport("client-0", |_, _, _| async {
1167                anyhow::bail!("502 Bad Gateway")
1168            }),
1169            create_stub_transport("client-1", |_, _, _| async {
1170                anyhow::bail!("Connection refused")
1171            }),
1172        ];
1173
1174        let config = CancelBroadcasterConfig::default();
1175        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1176
1177        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1178        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1179
1180        assert!(result.is_err());
1181        assert!(
1182            result
1183                .unwrap_err()
1184                .to_string()
1185                .contains("All cancel all requests failed")
1186        );
1187
1188        let metrics = broadcaster.get_metrics_async().await;
1189        assert_eq!(metrics.failed_cancels, 1);
1190        assert_eq!(metrics.successful_cancels, 0);
1191        assert_eq!(metrics.idempotent_successes, 0);
1192    }
1193
1194    #[tokio::test]
1195    async fn test_broadcast_cancel_no_healthy_clients() {
1196        let transport = create_stub_transport("client-0", |_, _, _| async {
1197            Ok(create_test_report("ORDER-1"))
1198        });
1199        transport.healthy.store(false, Ordering::Relaxed);
1200
1201        let config = CancelBroadcasterConfig::default();
1202        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1203
1204        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1205        let result = broadcaster
1206            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1207            .await;
1208
1209        assert!(result.is_err());
1210        assert!(
1211            result
1212                .unwrap_err()
1213                .to_string()
1214                .contains("No healthy transport clients available")
1215        );
1216
1217        let metrics = broadcaster.get_metrics_async().await;
1218        assert_eq!(metrics.failed_cancels, 1);
1219    }
1220
1221    #[tokio::test]
1222    async fn test_broadcast_cancel_metrics_increment() {
1223        let report1 = create_test_report("ORDER-1");
1224        let report1_clone = report1.clone();
1225        let report2 = create_test_report("ORDER-2");
1226        let report2_clone = report2.clone();
1227
1228        let transports = vec![
1229            create_stub_transport("client-0", move |_, _, _| {
1230                let report = report1_clone.clone();
1231                async move { Ok(report) }
1232            }),
1233            create_stub_transport("client-1", move |_, _, _| {
1234                let report = report2_clone.clone();
1235                async move { Ok(report) }
1236            }),
1237        ];
1238
1239        let config = CancelBroadcasterConfig::default();
1240        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1241
1242        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1243
1244        let _ = broadcaster
1245            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1246            .await;
1247
1248        let _ = broadcaster
1249            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1250            .await;
1251
1252        let metrics = broadcaster.get_metrics_async().await;
1253        assert_eq!(metrics.total_cancels, 2);
1254        assert_eq!(metrics.successful_cancels, 2);
1255    }
1256
1257    #[tokio::test]
1258    async fn test_broadcast_cancel_expected_reject_pattern() {
1259        let transports = vec![
1260            create_stub_transport("client-0", |_, _, _| async {
1261                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1262            }),
1263            create_stub_transport("client-1", |_, _, _| async {
1264                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1265            }),
1266        ];
1267
1268        let config = CancelBroadcasterConfig::default();
1269        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1270
1271        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1272        let result = broadcaster
1273            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1274            .await;
1275
1276        assert!(result.is_err());
1277
1278        let metrics = broadcaster.get_metrics_async().await;
1279        assert_eq!(metrics.expected_rejects, 2);
1280        assert_eq!(metrics.failed_cancels, 1);
1281    }
1282
1283    #[tokio::test]
1284    async fn test_broadcaster_creation_with_pool() {
1285        let config = CancelBroadcasterConfig {
1286            pool_size: 3,
1287            api_key: Some("test_key".to_string()),
1288            api_secret: Some("test_secret".to_string()),
1289            base_url: Some("https://test.example.com".to_string()),
1290            testnet: false,
1291            timeout_secs: Some(5),
1292            max_retries: Some(2),
1293            retry_delay_ms: Some(100),
1294            retry_delay_max_ms: Some(1000),
1295            recv_window_ms: Some(5000),
1296            max_requests_per_second: Some(10),
1297            max_requests_per_minute: Some(100),
1298            health_check_interval_secs: 30,
1299            health_check_timeout_secs: 5,
1300            expected_reject_patterns: vec!["test_pattern".to_string()],
1301            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1302        };
1303
1304        let broadcaster = CancelBroadcaster::new(config.clone());
1305        assert!(broadcaster.is_ok());
1306
1307        let broadcaster = broadcaster.unwrap();
1308        let metrics = broadcaster.get_metrics_async().await;
1309
1310        assert_eq!(metrics.total_clients, 3);
1311        assert_eq!(metrics.total_cancels, 0);
1312        assert_eq!(metrics.successful_cancels, 0);
1313        assert_eq!(metrics.failed_cancels, 0);
1314    }
1315
1316    #[tokio::test]
1317    async fn test_broadcaster_lifecycle() {
1318        let config = CancelBroadcasterConfig {
1319            pool_size: 2,
1320            api_key: Some("test_key".to_string()),
1321            api_secret: Some("test_secret".to_string()),
1322            base_url: Some("https://test.example.com".to_string()),
1323            testnet: false,
1324            timeout_secs: Some(5),
1325            max_retries: None,
1326            retry_delay_ms: None,
1327            retry_delay_max_ms: None,
1328            recv_window_ms: None,
1329            max_requests_per_second: None,
1330            max_requests_per_minute: None,
1331            health_check_interval_secs: 60, // Long interval so it won't tick during test
1332            health_check_timeout_secs: 1,
1333            expected_reject_patterns: vec![],
1334            idempotent_success_patterns: vec![],
1335        };
1336
1337        let broadcaster = CancelBroadcaster::new(config).unwrap();
1338
1339        // Should not be running initially
1340        assert!(!broadcaster.running.load(Ordering::Relaxed));
1341
1342        // Start broadcaster
1343        let start_result = broadcaster.start().await;
1344        assert!(start_result.is_ok());
1345        assert!(broadcaster.running.load(Ordering::Relaxed));
1346
1347        // Starting again should be idempotent
1348        let start_again = broadcaster.start().await;
1349        assert!(start_again.is_ok());
1350
1351        // Stop broadcaster
1352        broadcaster.stop().await;
1353        assert!(!broadcaster.running.load(Ordering::Relaxed));
1354
1355        // Stopping again should be safe
1356        broadcaster.stop().await;
1357        assert!(!broadcaster.running.load(Ordering::Relaxed));
1358    }
1359
1360    #[tokio::test]
1361    async fn test_client_stats_collection() {
1362        let config = CancelBroadcasterConfig {
1363            pool_size: 2,
1364            api_key: Some("test_key".to_string()),
1365            api_secret: Some("test_secret".to_string()),
1366            base_url: Some("https://test.example.com".to_string()),
1367            testnet: false,
1368            timeout_secs: Some(5),
1369            max_retries: None,
1370            retry_delay_ms: None,
1371            retry_delay_max_ms: None,
1372            recv_window_ms: None,
1373            max_requests_per_second: None,
1374            max_requests_per_minute: None,
1375            health_check_interval_secs: 60,
1376            health_check_timeout_secs: 5,
1377            expected_reject_patterns: vec![],
1378            idempotent_success_patterns: vec![],
1379        };
1380
1381        let broadcaster = CancelBroadcaster::new(config).unwrap();
1382        let stats = broadcaster.get_client_stats_async().await;
1383
1384        assert_eq!(stats.len(), 2);
1385        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1386        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1387        assert!(stats[0].healthy); // Should be healthy initially
1388        assert!(stats[1].healthy);
1389        assert_eq!(stats[0].cancel_count, 0);
1390        assert_eq!(stats[1].cancel_count, 0);
1391        assert_eq!(stats[0].error_count, 0);
1392        assert_eq!(stats[1].error_count, 0);
1393    }
1394
1395    #[tokio::test]
1396    async fn test_testnet_config_sets_base_url() {
1397        let config = CancelBroadcasterConfig {
1398            pool_size: 1,
1399            api_key: Some("test_key".to_string()),
1400            api_secret: Some("test_secret".to_string()),
1401            base_url: None, // Not specified
1402            testnet: true,  // But testnet is true
1403            timeout_secs: Some(5),
1404            max_retries: None,
1405            retry_delay_ms: None,
1406            retry_delay_max_ms: None,
1407            recv_window_ms: None,
1408            max_requests_per_second: None,
1409            max_requests_per_minute: None,
1410            health_check_interval_secs: 60,
1411            health_check_timeout_secs: 5,
1412            expected_reject_patterns: vec![],
1413            idempotent_success_patterns: vec![],
1414        };
1415
1416        let broadcaster = CancelBroadcaster::new(config);
1417        assert!(broadcaster.is_ok());
1418    }
1419
1420    #[tokio::test]
1421    async fn test_default_config() {
1422        let config = CancelBroadcasterConfig {
1423            api_key: Some("test_key".to_string()),
1424            api_secret: Some("test_secret".to_string()),
1425            base_url: Some("https://test.example.com".to_string()),
1426            ..Default::default()
1427        };
1428
1429        let broadcaster = CancelBroadcaster::new(config);
1430        assert!(broadcaster.is_ok());
1431
1432        let broadcaster = broadcaster.unwrap();
1433        let metrics = broadcaster.get_metrics_async().await;
1434
1435        // Default pool_size is 2
1436        assert_eq!(metrics.total_clients, 2);
1437    }
1438
1439    #[tokio::test]
1440    async fn test_clone_for_async() {
1441        let config = CancelBroadcasterConfig {
1442            pool_size: 1,
1443            api_key: Some("test_key".to_string()),
1444            api_secret: Some("test_secret".to_string()),
1445            base_url: Some("https://test.example.com".to_string()),
1446            testnet: false,
1447            timeout_secs: Some(5),
1448            max_retries: None,
1449            retry_delay_ms: None,
1450            retry_delay_max_ms: None,
1451            recv_window_ms: None,
1452            max_requests_per_second: None,
1453            max_requests_per_minute: None,
1454            health_check_interval_secs: 60,
1455            health_check_timeout_secs: 5,
1456            expected_reject_patterns: vec![],
1457            idempotent_success_patterns: vec![],
1458        };
1459
1460        let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1461
1462        // Increment a metric on original
1463        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1464
1465        // Clone should share the same atomic
1466        let broadcaster2 = broadcaster1.clone_for_async();
1467        let metrics2 = broadcaster2.get_metrics_async().await;
1468
1469        assert_eq!(metrics2.total_cancels, 1); // Should see the increment
1470
1471        // Modify through clone
1472        broadcaster2
1473            .successful_cancels
1474            .fetch_add(5, Ordering::Relaxed);
1475
1476        // Original should see the change
1477        let metrics1 = broadcaster1.get_metrics_async().await;
1478        assert_eq!(metrics1.successful_cancels, 5);
1479    }
1480
1481    #[tokio::test]
1482    async fn test_pattern_matching() {
1483        // Test that pattern matching works for expected rejects and idempotent successes
1484        let config = CancelBroadcasterConfig {
1485            pool_size: 1,
1486            api_key: Some("test_key".to_string()),
1487            api_secret: Some("test_secret".to_string()),
1488            base_url: Some("https://test.example.com".to_string()),
1489            testnet: false,
1490            timeout_secs: Some(5),
1491            max_retries: None,
1492            retry_delay_ms: None,
1493            retry_delay_max_ms: None,
1494            recv_window_ms: None,
1495            max_requests_per_second: None,
1496            max_requests_per_minute: None,
1497            health_check_interval_secs: 60,
1498            health_check_timeout_secs: 5,
1499            expected_reject_patterns: vec![
1500                "ParticipateDoNotInitiate".to_string(),
1501                "Close-only".to_string(),
1502            ],
1503            idempotent_success_patterns: vec![
1504                "AlreadyCanceled".to_string(),
1505                "orderID not found".to_string(),
1506                "Unable to cancel".to_string(),
1507            ],
1508        };
1509
1510        let broadcaster = CancelBroadcaster::new(config).unwrap();
1511
1512        // Test expected reject patterns
1513        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1514        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1515        assert!(!broadcaster.is_expected_reject("Connection timeout"));
1516
1517        // Test idempotent success patterns
1518        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1519        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1520        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1521        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1522    }
1523
1524    // Happy-path coverage for broadcast_batch_cancel and broadcast_cancel_all
1525    // Note: These use simplified stubs since batch/cancel-all bypass test_handler
1526    // Full HTTP mocking tested in integration tests
1527    #[tokio::test]
1528    async fn test_broadcast_batch_cancel_structure() {
1529        // Validates broadcaster structure and metric initialization
1530        let config = CancelBroadcasterConfig {
1531            pool_size: 2,
1532            api_key: Some("test_key".to_string()),
1533            api_secret: Some("test_secret".to_string()),
1534            base_url: Some("https://test.example.com".to_string()),
1535            testnet: false,
1536            timeout_secs: Some(5),
1537            max_retries: None,
1538            retry_delay_ms: None,
1539            retry_delay_max_ms: None,
1540            recv_window_ms: None,
1541            max_requests_per_second: None,
1542            max_requests_per_minute: None,
1543            health_check_interval_secs: 60,
1544            health_check_timeout_secs: 5,
1545            expected_reject_patterns: vec![],
1546            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1547        };
1548
1549        let broadcaster = CancelBroadcaster::new(config).unwrap();
1550        let metrics = broadcaster.get_metrics_async().await;
1551
1552        // Verify initial state
1553        assert_eq!(metrics.total_clients, 2);
1554        assert_eq!(metrics.total_cancels, 0);
1555        assert_eq!(metrics.successful_cancels, 0);
1556        assert_eq!(metrics.failed_cancels, 0);
1557    }
1558
1559    #[tokio::test]
1560    async fn test_broadcast_cancel_all_structure() {
1561        // Validates broadcaster structure for cancel_all operations
1562        let config = CancelBroadcasterConfig {
1563            pool_size: 3,
1564            api_key: Some("test_key".to_string()),
1565            api_secret: Some("test_secret".to_string()),
1566            base_url: Some("https://test.example.com".to_string()),
1567            testnet: false,
1568            timeout_secs: Some(5),
1569            max_retries: None,
1570            retry_delay_ms: None,
1571            retry_delay_max_ms: None,
1572            recv_window_ms: None,
1573            max_requests_per_second: None,
1574            max_requests_per_minute: None,
1575            health_check_interval_secs: 60,
1576            health_check_timeout_secs: 5,
1577            expected_reject_patterns: vec![],
1578            idempotent_success_patterns: vec!["orderID not found".to_string()],
1579        };
1580
1581        let broadcaster = CancelBroadcaster::new(config).unwrap();
1582        let metrics = broadcaster.get_metrics_async().await;
1583
1584        // Verify pool size and initial metrics
1585        assert_eq!(metrics.total_clients, 3);
1586        assert_eq!(metrics.healthy_clients, 3);
1587        assert_eq!(metrics.total_cancels, 0);
1588    }
1589
1590    // Metric health tests - validates that idempotent successes don't increment failed_cancels
1591    #[tokio::test]
1592    async fn test_single_cancel_metrics_with_mixed_responses() {
1593        // Test similar to test_broadcast_cancel_mixed_idempotent_and_failure
1594        // but explicitly validates metric health
1595        let transports = vec![
1596            create_stub_transport("client-0", |_, _, _| async {
1597                anyhow::bail!("Connection timeout")
1598            }),
1599            create_stub_transport("client-1", |_, _, _| async {
1600                anyhow::bail!("AlreadyCanceled")
1601            }),
1602        ];
1603
1604        let config = CancelBroadcasterConfig::default();
1605        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1606
1607        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1608        let result = broadcaster
1609            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1610            .await;
1611
1612        // Should succeed with idempotent
1613        assert!(result.is_ok());
1614        assert!(result.unwrap().is_none());
1615
1616        // Verify metrics: idempotent success doesn't count as failure
1617        let metrics = broadcaster.get_metrics_async().await;
1618        assert_eq!(
1619            metrics.failed_cancels, 0,
1620            "Idempotent success should not increment failed_cancels"
1621        );
1622        assert_eq!(metrics.idempotent_successes, 1);
1623        assert_eq!(metrics.successful_cancels, 0);
1624    }
1625
1626    #[tokio::test]
1627    async fn test_metrics_initialization_and_health() {
1628        // Validates that metrics start at zero and clients start healthy
1629        let config = CancelBroadcasterConfig {
1630            pool_size: 4,
1631            api_key: Some("test_key".to_string()),
1632            api_secret: Some("test_secret".to_string()),
1633            base_url: Some("https://test.example.com".to_string()),
1634            testnet: false,
1635            timeout_secs: Some(5),
1636            max_retries: None,
1637            retry_delay_ms: None,
1638            retry_delay_max_ms: None,
1639            recv_window_ms: None,
1640            max_requests_per_second: None,
1641            max_requests_per_minute: None,
1642            health_check_interval_secs: 60,
1643            health_check_timeout_secs: 5,
1644            expected_reject_patterns: vec![],
1645            idempotent_success_patterns: vec![],
1646        };
1647
1648        let broadcaster = CancelBroadcaster::new(config).unwrap();
1649        let metrics = broadcaster.get_metrics_async().await;
1650
1651        // All metrics should start at zero
1652        assert_eq!(metrics.total_cancels, 0);
1653        assert_eq!(metrics.successful_cancels, 0);
1654        assert_eq!(metrics.failed_cancels, 0);
1655        assert_eq!(metrics.expected_rejects, 0);
1656        assert_eq!(metrics.idempotent_successes, 0);
1657
1658        // All clients should start healthy
1659        assert_eq!(metrics.healthy_clients, 4);
1660        assert_eq!(metrics.total_clients, 4);
1661    }
1662
1663    // Health-check task lifecycle test
1664    #[tokio::test]
1665    async fn test_health_check_task_lifecycle() {
1666        let config = CancelBroadcasterConfig {
1667            pool_size: 1,
1668            api_key: Some("test_key".to_string()),
1669            api_secret: Some("test_secret".to_string()),
1670            base_url: Some("https://test.example.com".to_string()),
1671            testnet: false,
1672            timeout_secs: Some(5),
1673            max_retries: None,
1674            retry_delay_ms: None,
1675            retry_delay_max_ms: None,
1676            recv_window_ms: None,
1677            max_requests_per_second: None,
1678            max_requests_per_minute: None,
1679            health_check_interval_secs: 1, // Very short interval
1680            health_check_timeout_secs: 1,
1681            expected_reject_patterns: vec![],
1682            idempotent_success_patterns: vec![],
1683        };
1684
1685        let broadcaster = CancelBroadcaster::new(config).unwrap();
1686
1687        // Start the broadcaster
1688        broadcaster.start().await.unwrap();
1689        assert!(broadcaster.running.load(Ordering::Relaxed));
1690
1691        // Verify task handle exists
1692        {
1693            let task_guard = broadcaster.health_check_task.read().await;
1694            assert!(task_guard.is_some());
1695        }
1696
1697        // Wait a bit for health check to potentially run
1698        tokio::time::sleep(Duration::from_millis(100)).await;
1699
1700        // Stop the broadcaster
1701        broadcaster.stop().await;
1702        assert!(!broadcaster.running.load(Ordering::Relaxed));
1703
1704        // Verify task handle has been cleared
1705        {
1706            let task_guard = broadcaster.health_check_task.read().await;
1707            assert!(task_guard.is_none());
1708        }
1709    }
1710}