nautilus_bitmex/execution/canceller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Cancel request broadcaster for redundant order cancellation.
17//!
18//! This module provides the [`CancelBroadcaster`] which fans out cancel requests
19//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
20//!
21//! - **Dependency injection via traits**: Uses `CancelExecutor` trait to abstract
22//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
23//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
24//!   generic type parameters on the public API (simpler Python FFI).
25//! - **Short-circuit on first success**: Aborts remaining requests once any client
26//!   succeeds, minimizing latency.
27//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
28//!   successful outcomes.
29
30// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object support
31// lands so we can drop the per-call heap allocation
32
33use std::{
34    fmt::Debug,
35    future::Future,
36    pin::Pin,
37    sync::{
38        Arc,
39        atomic::{AtomicBool, AtomicU64, Ordering},
40    },
41    time::Duration,
42};
43
44use futures_util::future;
45use nautilus_model::{
46    enums::OrderSide,
47    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
48    instruments::InstrumentAny,
49    reports::OrderStatusReport,
50};
51use tokio::{sync::RwLock, task::JoinHandle, time::interval};
52
53use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
54
/// Trait for order cancellation operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
///
/// Async methods are expressed as boxed futures (rather than `async fn`) so the
/// trait stays object-safe for `dyn CancelExecutor`; see the module-level TODO
/// about removing the per-call heap allocation.
trait CancelExecutor: Send + Sync {
    /// Adds an instrument to the executor's instrument cache.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order, identified by client and/or venue order ID.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;

    /// Cancels multiple orders in a single request.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;

    /// Cancels all orders for an instrument, optionally filtered by side.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}
106
impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }

    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        // A server-time request doubles as a lightweight connectivity probe;
        // the client's error type is flattened into `anyhow` via its Display.
        Box::pin(async move {
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        // NOTE: `Self::cancel_order(self, ...)` resolves to the *inherent*
        // method on `BitmexHttpClient` (inherent items take precedence over
        // trait items in path resolution), so this does not recurse into the
        // trait impl.
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        // Same inherent-method dispatch as `cancel_order` above.
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        // Same inherent-method dispatch as `cancel_order` above.
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}
151
/// Configuration for the cancel broadcaster.
///
/// Use [`Default`] for sensible defaults (pool of two clients, mainnet,
/// stock BitMEX reject/idempotent-cancel message patterns).
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    ///
    /// When `None` and `testnet` is true, the testnet URL is used.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected cancel rejections for debug-level logging.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings to identify idempotent success (order already cancelled/not found).
    pub idempotent_success_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
    /// are ignored.
    pub proxy_urls: Vec<Option<String>>,
}
194
195impl Default for CancelBroadcasterConfig {
196    fn default() -> Self {
197        Self {
198            pool_size: 2,
199            api_key: None,
200            api_secret: None,
201            base_url: None,
202            testnet: false,
203            timeout_secs: Some(60),
204            max_retries: None,
205            retry_delay_ms: Some(1_000),
206            retry_delay_max_ms: Some(5_000),
207            recv_window_ms: Some(10_000),
208            max_requests_per_second: Some(10),
209            max_requests_per_minute: Some(120),
210            health_check_interval_secs: 30,
211            health_check_timeout_secs: 5,
212            expected_reject_patterns: vec![
213                r"Order had execInst of ParticipateDoNotInitiate".to_string(),
214            ],
215            idempotent_success_patterns: vec![
216                r"AlreadyCanceled".to_string(),
217                r"orderID not found".to_string(),
218                r"Unable to cancel order due to existing state".to_string(),
219            ],
220            proxy_urls: vec![],
221        }
222    }
223}
224
/// Transport client wrapper with health monitoring.
///
/// Cloning is cheap: every field is behind an `Arc`, so clones share the same
/// executor, health flag, and counters.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in Arc to enable cloning without requiring Clone on CancelExecutor.
    ///
    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
    /// the executor across multiple TransportClient clones.
    executor: Arc<dyn CancelExecutor>,
    /// Stable identifier used in log messages and per-client stats.
    client_id: String,
    /// Health flag, updated by health checks and cancel outcomes.
    healthy: Arc<AtomicBool>,
    /// Number of cancel attempts routed through this client.
    cancel_count: Arc<AtomicU64>,
    /// Number of cancel attempts that returned an error.
    error_count: Arc<AtomicU64>,
}
238
// Manual `Debug` impl: `executor` is a `dyn CancelExecutor` with no `Debug`
// bound, so it is deliberately omitted from the output.
impl Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TransportClient")
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("cancel_count", &self.cancel_count)
            .field("error_count", &self.error_count)
            .finish()
    }
}
249
250impl TransportClient {
251    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
252        Self {
253            executor: Arc::new(executor),
254            client_id,
255            healthy: Arc::new(AtomicBool::new(true)),
256            cancel_count: Arc::new(AtomicU64::new(0)),
257            error_count: Arc::new(AtomicU64::new(0)),
258        }
259    }
260
261    fn is_healthy(&self) -> bool {
262        self.healthy.load(Ordering::Relaxed)
263    }
264
265    fn mark_healthy(&self) {
266        self.healthy.store(true, Ordering::Relaxed);
267    }
268
269    fn mark_unhealthy(&self) {
270        self.healthy.store(false, Ordering::Relaxed);
271    }
272
273    fn get_cancel_count(&self) -> u64 {
274        self.cancel_count.load(Ordering::Relaxed)
275    }
276
277    fn get_error_count(&self) -> u64 {
278        self.error_count.load(Ordering::Relaxed)
279    }
280
281    async fn health_check(&self, timeout_secs: u64) -> bool {
282        match tokio::time::timeout(
283            Duration::from_secs(timeout_secs),
284            self.executor.health_check(),
285        )
286        .await
287        {
288            Ok(Ok(_)) => {
289                self.mark_healthy();
290                true
291            }
292            Ok(Err(e)) => {
293                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
294                self.mark_unhealthy();
295                false
296            }
297            Err(_) => {
298                tracing::warn!("Health check timeout for client {}", self.client_id);
299                self.mark_unhealthy();
300                false
301            }
302        }
303    }
304
305    async fn cancel_order(
306        &self,
307        instrument_id: InstrumentId,
308        client_order_id: Option<ClientOrderId>,
309        venue_order_id: Option<VenueOrderId>,
310    ) -> anyhow::Result<OrderStatusReport> {
311        self.cancel_count.fetch_add(1, Ordering::Relaxed);
312
313        match self
314            .executor
315            .cancel_order(instrument_id, client_order_id, venue_order_id)
316            .await
317        {
318            Ok(report) => {
319                self.mark_healthy();
320                Ok(report)
321            }
322            Err(e) => {
323                self.error_count.fetch_add(1, Ordering::Relaxed);
324                Err(e)
325            }
326        }
327    }
328}
329
/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    /// Pool and pattern configuration, fixed at construction.
    config: CancelBroadcasterConfig,
    /// Shared client pool; `Arc` so the health check task can hold a reference.
    transports: Arc<Vec<TransportClient>>,
    /// Handle of the periodic health check task (`None` until `start`).
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Whether the broadcaster (and its health check loop) is running.
    running: Arc<AtomicBool>,
    // Aggregate counters below are exposed as a snapshot via `get_metrics`.
    total_cancels: Arc<AtomicU64>,
    successful_cancels: Arc<AtomicU64>,
    failed_cancels: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
    idempotent_successes: Arc<AtomicU64>,
}
348
349impl CancelBroadcaster {
350    /// Creates a new [`CancelBroadcaster`] with internal HTTP client pool.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if any HTTP client fails to initialize.
355    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
356        let mut transports = Vec::with_capacity(config.pool_size);
357
358        // Synthesize base_url when testnet is true but base_url is None
359        let base_url = if config.testnet && config.base_url.is_none() {
360            Some(BITMEX_HTTP_TESTNET_URL.to_string())
361        } else {
362            config.base_url.clone()
363        };
364
365        for i in 0..config.pool_size {
366            // Assign proxy from config list, or None if index exceeds list length
367            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
368
369            let client = BitmexHttpClient::with_credentials(
370                config.api_key.clone(),
371                config.api_secret.clone(),
372                base_url.clone(),
373                config.timeout_secs,
374                config.max_retries,
375                config.retry_delay_ms,
376                config.retry_delay_max_ms,
377                config.recv_window_ms,
378                config.max_requests_per_second,
379                config.max_requests_per_minute,
380                proxy_url,
381            )
382            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
383
384            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
385        }
386
387        Ok(Self {
388            config,
389            transports: Arc::new(transports),
390            health_check_task: Arc::new(RwLock::new(None)),
391            running: Arc::new(AtomicBool::new(false)),
392            total_cancels: Arc::new(AtomicU64::new(0)),
393            successful_cancels: Arc::new(AtomicU64::new(0)),
394            failed_cancels: Arc::new(AtomicU64::new(0)),
395            expected_rejects: Arc::new(AtomicU64::new(0)),
396            idempotent_successes: Arc::new(AtomicU64::new(0)),
397        })
398    }
399
400    /// Starts the broadcaster and health check loop.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the broadcaster is already running.
405    pub async fn start(&self) -> anyhow::Result<()> {
406        if self.running.load(Ordering::Relaxed) {
407            return Ok(());
408        }
409
410        self.running.store(true, Ordering::Relaxed);
411
412        // Initial health check for all clients
413        self.run_health_checks().await;
414
415        // Start periodic health check task
416        let transports = Arc::clone(&self.transports);
417        let running = Arc::clone(&self.running);
418        let interval_secs = self.config.health_check_interval_secs;
419        let timeout_secs = self.config.health_check_timeout_secs;
420
421        let task = tokio::spawn(async move {
422            let mut ticker = interval(Duration::from_secs(interval_secs));
423            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
424
425            loop {
426                ticker.tick().await;
427
428                if !running.load(Ordering::Relaxed) {
429                    break;
430                }
431
432                let tasks: Vec<_> = transports
433                    .iter()
434                    .map(|t| t.health_check(timeout_secs))
435                    .collect();
436
437                let results = future::join_all(tasks).await;
438                let healthy_count = results.iter().filter(|&&r| r).count();
439
440                tracing::debug!(
441                    "Health check complete: {}/{} clients healthy",
442                    healthy_count,
443                    results.len()
444                );
445            }
446        });
447
448        *self.health_check_task.write().await = Some(task);
449
450        tracing::info!(
451            "CancelBroadcaster started with {} clients",
452            self.transports.len()
453        );
454
455        Ok(())
456    }
457
458    /// Stops the broadcaster and health check loop.
459    pub async fn stop(&self) {
460        if !self.running.load(Ordering::Relaxed) {
461            return;
462        }
463
464        self.running.store(false, Ordering::Relaxed);
465
466        if let Some(task) = self.health_check_task.write().await.take() {
467            task.abort();
468        }
469
470        tracing::info!("CancelBroadcaster stopped");
471    }
472
473    async fn run_health_checks(&self) {
474        let tasks: Vec<_> = self
475            .transports
476            .iter()
477            .map(|t| t.health_check(self.config.health_check_timeout_secs))
478            .collect();
479
480        let results = future::join_all(tasks).await;
481        let healthy_count = results.iter().filter(|&&r| r).count();
482
483        tracing::debug!(
484            "Health check complete: {}/{} clients healthy",
485            healthy_count,
486            results.len()
487        );
488    }
489
490    fn is_expected_reject(&self, error_message: &str) -> bool {
491        self.config
492            .expected_reject_patterns
493            .iter()
494            .any(|pattern| error_message.contains(pattern))
495    }
496
497    fn is_idempotent_success(&self, error_message: &str) -> bool {
498        self.config
499            .idempotent_success_patterns
500            .iter()
501            .any(|pattern| error_message.contains(pattern))
502    }
503
504    /// Processes cancel request results, handling success, idempotent success, and failures.
505    ///
506    /// This helper consolidates the common error handling loop used across all broadcast methods.
507    async fn process_cancel_results<T>(
508        &self,
509        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
510        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
511        operation: &str,
512        params: String,
513        idempotent_reason: &str,
514    ) -> anyhow::Result<T>
515    where
516        T: Send + 'static,
517    {
518        let mut errors = Vec::new();
519
520        while !handles.is_empty() {
521            let current_handles = std::mem::take(&mut handles);
522            let (result, _idx, remaining) = future::select_all(current_handles).await;
523            handles = remaining.into_iter().collect();
524
525            match result {
526                Ok((client_id, Ok(result))) => {
527                    // First success - abort remaining handles
528                    for handle in &handles {
529                        handle.abort();
530                    }
531                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);
532
533                    tracing::debug!(
534                        "{} broadcast succeeded [{}] {}",
535                        operation,
536                        client_id,
537                        params
538                    );
539
540                    return Ok(result);
541                }
542                Ok((client_id, Err(e))) => {
543                    let error_msg = e.to_string();
544
545                    if self.is_idempotent_success(&error_msg) {
546                        // First idempotent success - abort remaining handles and return success
547                        for handle in &handles {
548                            handle.abort();
549                        }
550                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
551
552                        tracing::debug!(
553                            "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
554                        );
555
556                        return idempotent_result();
557                    }
558
559                    if self.is_expected_reject(&error_msg) {
560                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
561                        tracing::debug!(
562                            "Expected {} rejection [{}]: {} {}",
563                            operation.to_lowercase(),
564                            client_id,
565                            error_msg,
566                            params
567                        );
568                        errors.push(error_msg);
569                    } else {
570                        tracing::warn!(
571                            "{} request failed [{}]: {} {}",
572                            operation,
573                            client_id,
574                            error_msg,
575                            params
576                        );
577                        errors.push(error_msg);
578                    }
579                }
580                Err(e) => {
581                    tracing::warn!("{} task join error: {e:?}", operation);
582                    errors.push(format!("Task panicked: {e:?}"));
583                }
584            }
585        }
586
587        // All tasks failed
588        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
589        tracing::error!(
590            "All {} requests failed: {errors:?} {params}",
591            operation.to_lowercase(),
592        );
593        Err(anyhow::anyhow!(
594            "All {} requests failed: {errors:?}",
595            operation.to_lowercase(),
596        ))
597    }
598
599    /// Broadcasts a single cancel request to all healthy clients in parallel.
600    ///
601    /// # Returns
602    ///
603    /// - `Ok(Some(report))` if successfully cancelled with a report.
604    /// - `Ok(None)` if the order was already cancelled (idempotent success).
605    /// - `Err` if all requests failed.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if all cancel requests fail or no healthy clients are available.
610    pub async fn broadcast_cancel(
611        &self,
612        instrument_id: InstrumentId,
613        client_order_id: Option<ClientOrderId>,
614        venue_order_id: Option<VenueOrderId>,
615    ) -> anyhow::Result<Option<OrderStatusReport>> {
616        self.total_cancels.fetch_add(1, Ordering::Relaxed);
617
618        let healthy_transports: Vec<TransportClient> = self
619            .transports
620            .iter()
621            .filter(|t| t.is_healthy())
622            .cloned()
623            .collect();
624
625        if healthy_transports.is_empty() {
626            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
627            anyhow::bail!("No healthy transport clients available");
628        }
629
630        let mut handles = Vec::new();
631        for transport in healthy_transports {
632            let handle = tokio::spawn(async move {
633                let client_id = transport.client_id.clone();
634                let result = transport
635                    .cancel_order(instrument_id, client_order_id, venue_order_id)
636                    .await
637                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
638                (client_id, result)
639            });
640            handles.push(handle);
641        }
642
643        self.process_cancel_results(
644            handles,
645            || Ok(None),
646            "Cancel",
647            format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
648            "order already cancelled/not found",
649        )
650        .await
651    }
652
653    /// Broadcasts a batch cancel request to all healthy clients in parallel.
654    ///
655    /// # Errors
656    ///
657    /// Returns an error if all cancel requests fail or no healthy clients are available.
658    pub async fn broadcast_batch_cancel(
659        &self,
660        instrument_id: InstrumentId,
661        client_order_ids: Option<Vec<ClientOrderId>>,
662        venue_order_ids: Option<Vec<VenueOrderId>>,
663    ) -> anyhow::Result<Vec<OrderStatusReport>> {
664        self.total_cancels.fetch_add(1, Ordering::Relaxed);
665
666        let healthy_transports: Vec<TransportClient> = self
667            .transports
668            .iter()
669            .filter(|t| t.is_healthy())
670            .cloned()
671            .collect();
672
673        if healthy_transports.is_empty() {
674            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
675            anyhow::bail!("No healthy transport clients available");
676        }
677
678        let mut handles = Vec::new();
679
680        for transport in healthy_transports {
681            let client_order_ids_clone = client_order_ids.clone();
682            let venue_order_ids_clone = venue_order_ids.clone();
683            let handle = tokio::spawn(async move {
684                let client_id = transport.client_id.clone();
685                let result = transport
686                    .executor
687                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
688                    .await;
689                (client_id, result)
690            });
691            handles.push(handle);
692        }
693
694        self.process_cancel_results(
695            handles,
696            || Ok(Vec::new()),
697            "Batch cancel",
698            format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
699            "orders already cancelled/not found",
700        )
701        .await
702    }
703
704    /// Broadcasts a cancel all request to all healthy clients in parallel.
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if all cancel requests fail or no healthy clients are available.
709    pub async fn broadcast_cancel_all(
710        &self,
711        instrument_id: InstrumentId,
712        order_side: Option<OrderSide>,
713    ) -> anyhow::Result<Vec<OrderStatusReport>> {
714        self.total_cancels.fetch_add(1, Ordering::Relaxed);
715
716        let healthy_transports: Vec<TransportClient> = self
717            .transports
718            .iter()
719            .filter(|t| t.is_healthy())
720            .cloned()
721            .collect();
722
723        if healthy_transports.is_empty() {
724            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
725            anyhow::bail!("No healthy transport clients available");
726        }
727
728        let mut handles = Vec::new();
729        for transport in healthy_transports {
730            let handle = tokio::spawn(async move {
731                let client_id = transport.client_id.clone();
732                let result = transport
733                    .executor
734                    .cancel_all_orders(instrument_id, order_side)
735                    .await;
736                (client_id, result)
737            });
738            handles.push(handle);
739        }
740
741        self.process_cancel_results(
742            handles,
743            || Ok(Vec::new()),
744            "Cancel all",
745            format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
746            "no orders to cancel",
747        )
748        .await
749    }
750
751    /// Gets broadcaster metrics.
752    pub fn get_metrics(&self) -> BroadcasterMetrics {
753        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
754        let total_clients = self.transports.len();
755
756        BroadcasterMetrics {
757            total_cancels: self.total_cancels.load(Ordering::Relaxed),
758            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
759            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
760            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
761            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
762            healthy_clients,
763            total_clients,
764        }
765    }
766
767    /// Gets broadcaster metrics (async version for use within async context).
768    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
769        self.get_metrics()
770    }
771
772    /// Gets per-client statistics.
773    pub fn get_client_stats(&self) -> Vec<ClientStats> {
774        self.transports
775            .iter()
776            .map(|t| ClientStats {
777                client_id: t.client_id.clone(),
778                healthy: t.is_healthy(),
779                cancel_count: t.get_cancel_count(),
780                error_count: t.get_error_count(),
781            })
782            .collect()
783    }
784
785    /// Gets per-client statistics (async version for use within async context).
786    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
787        self.get_client_stats()
788    }
789
790    /// Caches an instrument in all HTTP clients in the pool.
791    pub fn cache_instrument(&self, instrument: InstrumentAny) {
792        for transport in self.transports.iter() {
793            transport.executor.add_instrument(instrument.clone());
794        }
795    }
796
797    #[must_use]
798    pub fn clone_for_async(&self) -> Self {
799        Self {
800            config: self.config.clone(),
801            transports: Arc::clone(&self.transports),
802            health_check_task: Arc::clone(&self.health_check_task),
803            running: Arc::clone(&self.running),
804            total_cancels: Arc::clone(&self.total_cancels),
805            successful_cancels: Arc::clone(&self.successful_cancels),
806            failed_cancels: Arc::clone(&self.failed_cancels),
807            expected_rejects: Arc::clone(&self.expected_rejects),
808            idempotent_successes: Arc::clone(&self.idempotent_successes),
809        }
810    }
811
812    #[cfg(test)]
813    fn new_with_transports(
814        config: CancelBroadcasterConfig,
815        transports: Vec<TransportClient>,
816    ) -> Self {
817        Self {
818            config,
819            transports: Arc::new(transports),
820            health_check_task: Arc::new(RwLock::new(None)),
821            running: Arc::new(AtomicBool::new(false)),
822            total_cancels: Arc::new(AtomicU64::new(0)),
823            successful_cancels: Arc::new(AtomicU64::new(0)),
824            failed_cancels: Arc::new(AtomicU64::new(0)),
825            expected_rejects: Arc::new(AtomicU64::new(0)),
826            idempotent_successes: Arc::new(AtomicU64::new(0)),
827        }
828    }
829}
830
/// Broadcaster metrics snapshot.
///
/// All counters are cumulative since broadcaster creation. The struct is plain
/// data, so it additionally derives `Copy`, `PartialEq`/`Eq`, and `Default`
/// (all counters zero) for convenient use in tests and comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct BroadcasterMetrics {
    /// Total number of cancel broadcasts attempted.
    pub total_cancels: u64,
    /// Broadcasts where a client returned a successful cancel.
    pub successful_cancels: u64,
    /// Broadcasts where no client produced a usable outcome.
    pub failed_cancels: u64,
    /// Errors matching the configured expected-reject patterns.
    pub expected_rejects: u64,
    /// Errors treated as success (e.g. order already cancelled).
    pub idempotent_successes: u64,
    /// Number of clients currently passing health checks.
    pub healthy_clients: usize,
    /// Total number of clients in the pool.
    pub total_clients: usize,
}
842
/// Per-client statistics.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientStats {
    /// Identifier of the transport client.
    pub client_id: String,
    /// Whether the client is currently considered healthy.
    pub healthy: bool,
    /// Number of cancel requests this client has recorded.
    pub cancel_count: u64,
    /// Number of errors this client has recorded.
    pub error_count: u64,
}
851
852#[cfg(test)]
853mod tests {
854    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
855
856    use nautilus_core::UUID4;
857    use nautilus_model::{
858        enums::{
859            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
860        },
861        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
862        reports::OrderStatusReport,
863        types::{Price, Quantity},
864    };
865
866    use super::*;
867
    /// Mock executor for testing.
    ///
    /// Holds a shared handler closure that receives the cancel-order arguments
    /// and returns a boxed future, letting each test script the per-call
    /// behavior (success report, error message, or delay).
    #[derive(Clone)]
    // The boxed `Fn -> Pin<Box<dyn Future>>` signature trips clippy's
    // type-complexity lint; it follows the boxed-future style of `CancelExecutor`.
    #[allow(clippy::type_complexity)]
    struct MockExecutor {
        // Shared so clones of the mock dispatch to the same handler.
        handler: Arc<
            dyn Fn(
                    InstrumentId,
                    Option<ClientOrderId>,
                    Option<VenueOrderId>,
                )
                    -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
                + Send
                + Sync,
        >,
    }
883
    impl MockExecutor {
        /// Wraps `handler` in an `Arc`, boxing and pinning each returned future
        /// so the concrete future type is erased to the `dyn Future` the
        /// `handler` field expects.
        fn new<F, Fut>(handler: F) -> Self
        where
            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
                + Send
                + Sync
                + 'static,
            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
        {
            Self {
                // `Box::pin(..)` unsizes to `Pin<Box<dyn Future>>` against the
                // field's expected closure type.
                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
            }
        }
    }
898
899    impl CancelExecutor for MockExecutor {
900        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
901            Box::pin(async { Ok(()) })
902        }
903
904        fn cancel_order(
905            &self,
906            instrument_id: InstrumentId,
907            client_order_id: Option<ClientOrderId>,
908            venue_order_id: Option<VenueOrderId>,
909        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
910            (self.handler)(instrument_id, client_order_id, venue_order_id)
911        }
912
913        fn cancel_orders(
914            &self,
915            _instrument_id: InstrumentId,
916            _client_order_ids: Option<Vec<ClientOrderId>>,
917            _venue_order_ids: Option<Vec<VenueOrderId>>,
918        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
919        {
920            Box::pin(async { Ok(Vec::new()) })
921        }
922
923        fn cancel_all_orders(
924            &self,
925            instrument_id: InstrumentId,
926            _order_side: Option<OrderSide>,
927        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
928        {
929            // Try to get result from the single-order handler to propagate errors
930            let handler = Arc::clone(&self.handler);
931            Box::pin(async move {
932                // Call the handler to check if it would fail
933                let result = handler(instrument_id, None, None).await;
934                match result {
935                    Ok(_) => Ok(Vec::new()),
936                    Err(e) => Err(e),
937                }
938            })
939        }
940
941        fn add_instrument(&self, _instrument: InstrumentAny) {
942            // No-op for mock
943        }
944    }
945
    /// Builds a canceled GTC limit-order status report for `venue_order_id` on
    /// XBTUSD.BITMEX; all optional/auxiliary fields are left unset.
    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("BITMEX-001"),
            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            venue_order_id: VenueOrderId::from(venue_order_id),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Canceled,
            price: Some(Price::new(50000.0, 2)),
            quantity: Quantity::new(100.0, 0),
            filled_qty: Quantity::new(0.0, 0),
            report_id: UUID4::new(),
            ts_accepted: 0.into(),
            ts_last: 0.into(),
            ts_init: 0.into(),
            client_order_id: None,
            avg_px: None,
            trigger_price: None,
            trigger_type: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            display_qty: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            post_only: false,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }
982
983    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
984    where
985        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
986            + Send
987            + Sync
988            + 'static,
989        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
990    {
991        let executor = MockExecutor::new(handler);
992        TransportClient::new(executor, client_id.to_string())
993    }
994
995    #[tokio::test]
996    async fn test_broadcast_cancel_immediate_success() {
997        let report = create_test_report("ORDER-1");
998        let report_clone = report.clone();
999
1000        let transports = vec![
1001            create_stub_transport("client-0", move |_, _, _| {
1002                let report = report_clone.clone();
1003                async move { Ok(report) }
1004            }),
1005            create_stub_transport("client-1", |_, _, _| async {
1006                tokio::time::sleep(Duration::from_secs(10)).await;
1007                anyhow::bail!("Should be aborted")
1008            }),
1009        ];
1010
1011        let config = CancelBroadcasterConfig::default();
1012        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1013
1014        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1015        let result = broadcaster
1016            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1017            .await;
1018
1019        assert!(result.is_ok());
1020        let returned_report = result.unwrap();
1021        assert!(returned_report.is_some());
1022        assert_eq!(
1023            returned_report.unwrap().venue_order_id,
1024            report.venue_order_id
1025        );
1026
1027        let metrics = broadcaster.get_metrics_async().await;
1028        assert_eq!(metrics.successful_cancels, 1);
1029        assert_eq!(metrics.failed_cancels, 0);
1030        assert_eq!(metrics.total_cancels, 1);
1031    }
1032
1033    #[tokio::test]
1034    async fn test_broadcast_cancel_idempotent_success() {
1035        let transports = vec![
1036            create_stub_transport("client-0", |_, _, _| async {
1037                anyhow::bail!("AlreadyCanceled")
1038            }),
1039            create_stub_transport("client-1", |_, _, _| async {
1040                tokio::time::sleep(Duration::from_secs(10)).await;
1041                anyhow::bail!("Should be aborted")
1042            }),
1043        ];
1044
1045        let config = CancelBroadcasterConfig::default();
1046        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1047
1048        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1049        let result = broadcaster
1050            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1051            .await;
1052
1053        assert!(result.is_ok());
1054        assert!(result.unwrap().is_none());
1055
1056        let metrics = broadcaster.get_metrics_async().await;
1057        assert_eq!(metrics.idempotent_successes, 1);
1058        assert_eq!(metrics.successful_cancels, 0);
1059        assert_eq!(metrics.failed_cancels, 0);
1060    }
1061
1062    #[tokio::test]
1063    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1064        let transports = vec![
1065            create_stub_transport("client-0", |_, _, _| async {
1066                anyhow::bail!("502 Bad Gateway")
1067            }),
1068            create_stub_transport("client-1", |_, _, _| async {
1069                anyhow::bail!("orderID not found")
1070            }),
1071        ];
1072
1073        let config = CancelBroadcasterConfig::default();
1074        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1075
1076        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1077        let result = broadcaster
1078            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1079            .await;
1080
1081        assert!(result.is_ok());
1082        assert!(result.unwrap().is_none());
1083
1084        let metrics = broadcaster.get_metrics_async().await;
1085        assert_eq!(metrics.idempotent_successes, 1);
1086        assert_eq!(metrics.failed_cancels, 0);
1087    }
1088
1089    #[tokio::test]
1090    async fn test_broadcast_cancel_all_failures() {
1091        let transports = vec![
1092            create_stub_transport("client-0", |_, _, _| async {
1093                anyhow::bail!("502 Bad Gateway")
1094            }),
1095            create_stub_transport("client-1", |_, _, _| async {
1096                anyhow::bail!("Connection refused")
1097            }),
1098        ];
1099
1100        let config = CancelBroadcasterConfig::default();
1101        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1102
1103        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1104        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1105
1106        assert!(result.is_err());
1107        assert!(
1108            result
1109                .unwrap_err()
1110                .to_string()
1111                .contains("All cancel all requests failed")
1112        );
1113
1114        let metrics = broadcaster.get_metrics_async().await;
1115        assert_eq!(metrics.failed_cancels, 1);
1116        assert_eq!(metrics.successful_cancels, 0);
1117        assert_eq!(metrics.idempotent_successes, 0);
1118    }
1119
1120    #[tokio::test]
1121    async fn test_broadcast_cancel_no_healthy_clients() {
1122        let transport = create_stub_transport("client-0", |_, _, _| async {
1123            Ok(create_test_report("ORDER-1"))
1124        });
1125        transport.healthy.store(false, Ordering::Relaxed);
1126
1127        let config = CancelBroadcasterConfig::default();
1128        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1129
1130        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1131        let result = broadcaster
1132            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1133            .await;
1134
1135        assert!(result.is_err());
1136        assert!(
1137            result
1138                .unwrap_err()
1139                .to_string()
1140                .contains("No healthy transport clients available")
1141        );
1142
1143        let metrics = broadcaster.get_metrics_async().await;
1144        assert_eq!(metrics.failed_cancels, 1);
1145    }
1146
1147    #[tokio::test]
1148    async fn test_broadcast_cancel_metrics_increment() {
1149        let report1 = create_test_report("ORDER-1");
1150        let report1_clone = report1.clone();
1151        let report2 = create_test_report("ORDER-2");
1152        let report2_clone = report2.clone();
1153
1154        let transports = vec![
1155            create_stub_transport("client-0", move |_, _, _| {
1156                let report = report1_clone.clone();
1157                async move { Ok(report) }
1158            }),
1159            create_stub_transport("client-1", move |_, _, _| {
1160                let report = report2_clone.clone();
1161                async move { Ok(report) }
1162            }),
1163        ];
1164
1165        let config = CancelBroadcasterConfig::default();
1166        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1167
1168        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1169
1170        let _ = broadcaster
1171            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1172            .await;
1173
1174        let _ = broadcaster
1175            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1176            .await;
1177
1178        let metrics = broadcaster.get_metrics_async().await;
1179        assert_eq!(metrics.total_cancels, 2);
1180        assert_eq!(metrics.successful_cancels, 2);
1181    }
1182
1183    #[tokio::test]
1184    async fn test_broadcast_cancel_expected_reject_pattern() {
1185        let transports = vec![
1186            create_stub_transport("client-0", |_, _, _| async {
1187                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1188            }),
1189            create_stub_transport("client-1", |_, _, _| async {
1190                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1191            }),
1192        ];
1193
1194        let config = CancelBroadcasterConfig::default();
1195        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1196
1197        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1198        let result = broadcaster
1199            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1200            .await;
1201
1202        assert!(result.is_err());
1203
1204        let metrics = broadcaster.get_metrics_async().await;
1205        assert_eq!(metrics.expected_rejects, 2);
1206        assert_eq!(metrics.failed_cancels, 1);
1207    }
1208
1209    #[tokio::test]
1210    async fn test_broadcaster_creation_with_pool() {
1211        let config = CancelBroadcasterConfig {
1212            pool_size: 3,
1213            api_key: Some("test_key".to_string()),
1214            api_secret: Some("test_secret".to_string()),
1215            base_url: Some("https://test.example.com".to_string()),
1216            testnet: false,
1217            timeout_secs: Some(5),
1218            max_retries: Some(2),
1219            retry_delay_ms: Some(100),
1220            retry_delay_max_ms: Some(1000),
1221            recv_window_ms: Some(5000),
1222            max_requests_per_second: Some(10),
1223            max_requests_per_minute: Some(100),
1224            health_check_interval_secs: 30,
1225            health_check_timeout_secs: 5,
1226            expected_reject_patterns: vec!["test_pattern".to_string()],
1227            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1228            proxy_urls: vec![],
1229        };
1230
1231        let broadcaster = CancelBroadcaster::new(config.clone());
1232        assert!(broadcaster.is_ok());
1233
1234        let broadcaster = broadcaster.unwrap();
1235        let metrics = broadcaster.get_metrics_async().await;
1236
1237        assert_eq!(metrics.total_clients, 3);
1238        assert_eq!(metrics.total_cancels, 0);
1239        assert_eq!(metrics.successful_cancels, 0);
1240        assert_eq!(metrics.failed_cancels, 0);
1241    }
1242
1243    #[tokio::test]
1244    async fn test_broadcaster_lifecycle() {
1245        let config = CancelBroadcasterConfig {
1246            pool_size: 2,
1247            api_key: Some("test_key".to_string()),
1248            api_secret: Some("test_secret".to_string()),
1249            base_url: Some("https://test.example.com".to_string()),
1250            testnet: false,
1251            timeout_secs: Some(5),
1252            max_retries: None,
1253            retry_delay_ms: None,
1254            retry_delay_max_ms: None,
1255            recv_window_ms: None,
1256            max_requests_per_second: None,
1257            max_requests_per_minute: None,
1258            health_check_interval_secs: 60, // Long interval so it won't tick during test
1259            health_check_timeout_secs: 1,
1260            expected_reject_patterns: vec![],
1261            idempotent_success_patterns: vec![],
1262            proxy_urls: vec![],
1263        };
1264
1265        let broadcaster = CancelBroadcaster::new(config).unwrap();
1266
1267        // Should not be running initially
1268        assert!(!broadcaster.running.load(Ordering::Relaxed));
1269
1270        // Start broadcaster
1271        let start_result = broadcaster.start().await;
1272        assert!(start_result.is_ok());
1273        assert!(broadcaster.running.load(Ordering::Relaxed));
1274
1275        // Starting again should be idempotent
1276        let start_again = broadcaster.start().await;
1277        assert!(start_again.is_ok());
1278
1279        // Stop broadcaster
1280        broadcaster.stop().await;
1281        assert!(!broadcaster.running.load(Ordering::Relaxed));
1282
1283        // Stopping again should be safe
1284        broadcaster.stop().await;
1285        assert!(!broadcaster.running.load(Ordering::Relaxed));
1286    }
1287
1288    #[tokio::test]
1289    async fn test_client_stats_collection() {
1290        let config = CancelBroadcasterConfig {
1291            pool_size: 2,
1292            api_key: Some("test_key".to_string()),
1293            api_secret: Some("test_secret".to_string()),
1294            base_url: Some("https://test.example.com".to_string()),
1295            testnet: false,
1296            timeout_secs: Some(5),
1297            max_retries: None,
1298            retry_delay_ms: None,
1299            retry_delay_max_ms: None,
1300            recv_window_ms: None,
1301            max_requests_per_second: None,
1302            max_requests_per_minute: None,
1303            health_check_interval_secs: 60,
1304            health_check_timeout_secs: 5,
1305            expected_reject_patterns: vec![],
1306            idempotent_success_patterns: vec![],
1307            proxy_urls: vec![],
1308        };
1309
1310        let broadcaster = CancelBroadcaster::new(config).unwrap();
1311        let stats = broadcaster.get_client_stats_async().await;
1312
1313        assert_eq!(stats.len(), 2);
1314        assert_eq!(stats[0].client_id, "bitmex-cancel-0");
1315        assert_eq!(stats[1].client_id, "bitmex-cancel-1");
1316        assert!(stats[0].healthy); // Should be healthy initially
1317        assert!(stats[1].healthy);
1318        assert_eq!(stats[0].cancel_count, 0);
1319        assert_eq!(stats[1].cancel_count, 0);
1320        assert_eq!(stats[0].error_count, 0);
1321        assert_eq!(stats[1].error_count, 0);
1322    }
1323
1324    #[tokio::test]
1325    async fn test_testnet_config_sets_base_url() {
1326        let config = CancelBroadcasterConfig {
1327            pool_size: 1,
1328            api_key: Some("test_key".to_string()),
1329            api_secret: Some("test_secret".to_string()),
1330            base_url: None, // Not specified
1331            testnet: true,  // But testnet is true
1332            timeout_secs: Some(5),
1333            max_retries: None,
1334            retry_delay_ms: None,
1335            retry_delay_max_ms: None,
1336            recv_window_ms: None,
1337            max_requests_per_second: None,
1338            max_requests_per_minute: None,
1339            health_check_interval_secs: 60,
1340            health_check_timeout_secs: 5,
1341            expected_reject_patterns: vec![],
1342            idempotent_success_patterns: vec![],
1343            proxy_urls: vec![],
1344        };
1345
1346        let broadcaster = CancelBroadcaster::new(config);
1347        assert!(broadcaster.is_ok());
1348    }
1349
1350    #[tokio::test]
1351    async fn test_default_config() {
1352        let config = CancelBroadcasterConfig {
1353            api_key: Some("test_key".to_string()),
1354            api_secret: Some("test_secret".to_string()),
1355            base_url: Some("https://test.example.com".to_string()),
1356            ..Default::default()
1357        };
1358
1359        let broadcaster = CancelBroadcaster::new(config);
1360        assert!(broadcaster.is_ok());
1361
1362        let broadcaster = broadcaster.unwrap();
1363        let metrics = broadcaster.get_metrics_async().await;
1364
1365        // Default pool_size is 2
1366        assert_eq!(metrics.total_clients, 2);
1367    }
1368
1369    #[tokio::test]
1370    async fn test_clone_for_async() {
1371        let config = CancelBroadcasterConfig {
1372            pool_size: 1,
1373            api_key: Some("test_key".to_string()),
1374            api_secret: Some("test_secret".to_string()),
1375            base_url: Some("https://test.example.com".to_string()),
1376            testnet: false,
1377            timeout_secs: Some(5),
1378            max_retries: None,
1379            retry_delay_ms: None,
1380            retry_delay_max_ms: None,
1381            recv_window_ms: None,
1382            max_requests_per_second: None,
1383            max_requests_per_minute: None,
1384            health_check_interval_secs: 60,
1385            health_check_timeout_secs: 5,
1386            expected_reject_patterns: vec![],
1387            idempotent_success_patterns: vec![],
1388            proxy_urls: vec![],
1389        };
1390
1391        let broadcaster1 = CancelBroadcaster::new(config).unwrap();
1392
1393        // Increment a metric on original
1394        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1395
1396        // Clone should share the same atomic
1397        let broadcaster2 = broadcaster1.clone_for_async();
1398        let metrics2 = broadcaster2.get_metrics_async().await;
1399
1400        assert_eq!(metrics2.total_cancels, 1); // Should see the increment
1401
1402        // Modify through clone
1403        broadcaster2
1404            .successful_cancels
1405            .fetch_add(5, Ordering::Relaxed);
1406
1407        // Original should see the change
1408        let metrics1 = broadcaster1.get_metrics_async().await;
1409        assert_eq!(metrics1.successful_cancels, 5);
1410    }
1411
1412    #[tokio::test]
1413    async fn test_pattern_matching() {
1414        // Test that pattern matching works for expected rejects and idempotent successes
1415        let config = CancelBroadcasterConfig {
1416            pool_size: 1,
1417            api_key: Some("test_key".to_string()),
1418            api_secret: Some("test_secret".to_string()),
1419            base_url: Some("https://test.example.com".to_string()),
1420            testnet: false,
1421            timeout_secs: Some(5),
1422            max_retries: None,
1423            retry_delay_ms: None,
1424            retry_delay_max_ms: None,
1425            recv_window_ms: None,
1426            max_requests_per_second: None,
1427            max_requests_per_minute: None,
1428            health_check_interval_secs: 60,
1429            health_check_timeout_secs: 5,
1430            expected_reject_patterns: vec![
1431                "ParticipateDoNotInitiate".to_string(),
1432                "Close-only".to_string(),
1433            ],
1434            idempotent_success_patterns: vec![
1435                "AlreadyCanceled".to_string(),
1436                "orderID not found".to_string(),
1437                "Unable to cancel".to_string(),
1438            ],
1439            proxy_urls: vec![],
1440        };
1441
1442        let broadcaster = CancelBroadcaster::new(config).unwrap();
1443
1444        // Test expected reject patterns
1445        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1446        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1447        assert!(!broadcaster.is_expected_reject("Connection timeout"));
1448
1449        // Test idempotent success patterns
1450        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1451        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1452        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1453        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1454    }
1455
1456    // Happy-path coverage for broadcast_batch_cancel and broadcast_cancel_all
1457    // Note: These use simplified stubs since batch/cancel-all bypass test_handler
1458    // Full HTTP mocking tested in integration tests
1459    #[tokio::test]
1460    async fn test_broadcast_batch_cancel_structure() {
1461        // Validates broadcaster structure and metric initialization
1462        let config = CancelBroadcasterConfig {
1463            pool_size: 2,
1464            api_key: Some("test_key".to_string()),
1465            api_secret: Some("test_secret".to_string()),
1466            base_url: Some("https://test.example.com".to_string()),
1467            testnet: false,
1468            timeout_secs: Some(5),
1469            max_retries: None,
1470            retry_delay_ms: None,
1471            retry_delay_max_ms: None,
1472            recv_window_ms: None,
1473            max_requests_per_second: None,
1474            max_requests_per_minute: None,
1475            health_check_interval_secs: 60,
1476            health_check_timeout_secs: 5,
1477            expected_reject_patterns: vec![],
1478            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1479            proxy_urls: vec![],
1480        };
1481
1482        let broadcaster = CancelBroadcaster::new(config).unwrap();
1483        let metrics = broadcaster.get_metrics_async().await;
1484
1485        // Verify initial state
1486        assert_eq!(metrics.total_clients, 2);
1487        assert_eq!(metrics.total_cancels, 0);
1488        assert_eq!(metrics.successful_cancels, 0);
1489        assert_eq!(metrics.failed_cancels, 0);
1490    }
1491
1492    #[tokio::test]
1493    async fn test_broadcast_cancel_all_structure() {
1494        // Validates broadcaster structure for cancel_all operations
1495        let config = CancelBroadcasterConfig {
1496            pool_size: 3,
1497            api_key: Some("test_key".to_string()),
1498            api_secret: Some("test_secret".to_string()),
1499            base_url: Some("https://test.example.com".to_string()),
1500            testnet: false,
1501            timeout_secs: Some(5),
1502            max_retries: None,
1503            retry_delay_ms: None,
1504            retry_delay_max_ms: None,
1505            recv_window_ms: None,
1506            max_requests_per_second: None,
1507            max_requests_per_minute: None,
1508            health_check_interval_secs: 60,
1509            health_check_timeout_secs: 5,
1510            expected_reject_patterns: vec![],
1511            idempotent_success_patterns: vec!["orderID not found".to_string()],
1512            proxy_urls: vec![],
1513        };
1514
1515        let broadcaster = CancelBroadcaster::new(config).unwrap();
1516        let metrics = broadcaster.get_metrics_async().await;
1517
1518        // Verify pool size and initial metrics
1519        assert_eq!(metrics.total_clients, 3);
1520        assert_eq!(metrics.healthy_clients, 3);
1521        assert_eq!(metrics.total_cancels, 0);
1522    }
1523
1524    // Metric health tests - validates that idempotent successes don't increment failed_cancels
1525    #[tokio::test]
1526    async fn test_single_cancel_metrics_with_mixed_responses() {
1527        // Test similar to test_broadcast_cancel_mixed_idempotent_and_failure
1528        // but explicitly validates metric health
1529        let transports = vec![
1530            create_stub_transport("client-0", |_, _, _| async {
1531                anyhow::bail!("Connection timeout")
1532            }),
1533            create_stub_transport("client-1", |_, _, _| async {
1534                anyhow::bail!("AlreadyCanceled")
1535            }),
1536        ];
1537
1538        let config = CancelBroadcasterConfig::default();
1539        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1540
1541        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1542        let result = broadcaster
1543            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1544            .await;
1545
1546        // Should succeed with idempotent
1547        assert!(result.is_ok());
1548        assert!(result.unwrap().is_none());
1549
1550        // Verify metrics: idempotent success doesn't count as failure
1551        let metrics = broadcaster.get_metrics_async().await;
1552        assert_eq!(
1553            metrics.failed_cancels, 0,
1554            "Idempotent success should not increment failed_cancels"
1555        );
1556        assert_eq!(metrics.idempotent_successes, 1);
1557        assert_eq!(metrics.successful_cancels, 0);
1558    }
1559
1560    #[tokio::test]
1561    async fn test_metrics_initialization_and_health() {
1562        // Validates that metrics start at zero and clients start healthy
1563        let config = CancelBroadcasterConfig {
1564            pool_size: 4,
1565            api_key: Some("test_key".to_string()),
1566            api_secret: Some("test_secret".to_string()),
1567            base_url: Some("https://test.example.com".to_string()),
1568            testnet: false,
1569            timeout_secs: Some(5),
1570            max_retries: None,
1571            retry_delay_ms: None,
1572            retry_delay_max_ms: None,
1573            recv_window_ms: None,
1574            max_requests_per_second: None,
1575            max_requests_per_minute: None,
1576            health_check_interval_secs: 60,
1577            health_check_timeout_secs: 5,
1578            expected_reject_patterns: vec![],
1579            idempotent_success_patterns: vec![],
1580            proxy_urls: vec![],
1581        };
1582
1583        let broadcaster = CancelBroadcaster::new(config).unwrap();
1584        let metrics = broadcaster.get_metrics_async().await;
1585
1586        // All metrics should start at zero
1587        assert_eq!(metrics.total_cancels, 0);
1588        assert_eq!(metrics.successful_cancels, 0);
1589        assert_eq!(metrics.failed_cancels, 0);
1590        assert_eq!(metrics.expected_rejects, 0);
1591        assert_eq!(metrics.idempotent_successes, 0);
1592
1593        // All clients should start healthy
1594        assert_eq!(metrics.healthy_clients, 4);
1595        assert_eq!(metrics.total_clients, 4);
1596    }
1597
1598    // Health-check task lifecycle test
1599    #[tokio::test]
1600    async fn test_health_check_task_lifecycle() {
1601        let config = CancelBroadcasterConfig {
1602            pool_size: 1,
1603            api_key: Some("test_key".to_string()),
1604            api_secret: Some("test_secret".to_string()),
1605            base_url: Some("https://test.example.com".to_string()),
1606            testnet: false,
1607            timeout_secs: Some(5),
1608            max_retries: None,
1609            retry_delay_ms: None,
1610            retry_delay_max_ms: None,
1611            recv_window_ms: None,
1612            max_requests_per_second: None,
1613            max_requests_per_minute: None,
1614            health_check_interval_secs: 1, // Very short interval
1615            health_check_timeout_secs: 1,
1616            expected_reject_patterns: vec![],
1617            idempotent_success_patterns: vec![],
1618            proxy_urls: vec![],
1619        };
1620
1621        let broadcaster = CancelBroadcaster::new(config).unwrap();
1622
1623        // Start the broadcaster
1624        broadcaster.start().await.unwrap();
1625        assert!(broadcaster.running.load(Ordering::Relaxed));
1626
1627        // Verify task handle exists
1628        {
1629            let task_guard = broadcaster.health_check_task.read().await;
1630            assert!(task_guard.is_some());
1631        }
1632
1633        // Wait a bit for health check to potentially run
1634        tokio::time::sleep(Duration::from_millis(100)).await;
1635
1636        // Stop the broadcaster
1637        broadcaster.stop().await;
1638        assert!(!broadcaster.running.load(Ordering::Relaxed));
1639
1640        // Verify task handle has been cleared
1641        {
1642            let task_guard = broadcaster.health_check_task.read().await;
1643            assert!(task_guard.is_none());
1644        }
1645    }
1646}