nautilus_bitmex/execution/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Submit request broadcaster for redundant order submission.
17//!
18//! This module provides the [`SubmitBroadcaster`] which fans out submit requests
19//! to multiple HTTP clients in parallel for redundancy. The broadcaster is triggered
20//! when the `SubmitOrder` command contains `params["broadcast_submit_tries"]`.
21//!
22//! Key design patterns:
23//!
24//! - **Dependency injection via traits**: Uses `SubmitExecutor` trait to abstract
25//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
26//! - **Trait objects over generics**: Uses `Arc<dyn SubmitExecutor>` to avoid
27//!   generic type parameters on the public API (simpler Python FFI).
28//! - **Short-circuit on first success**: Aborts remaining requests once any client
29//!   succeeds, minimizing latency.
30//! - **Idempotent rejection handling**: Recognizes duplicate clOrdID as expected
31//!   rejections for debug-level logging without noise.
32
33// TODO: Replace boxed futures in `SubmitExecutor` once stable async trait object support
34// lands so we can drop the per-call heap allocation
35
36use std::{
37    fmt::Debug,
38    future::Future,
39    pin::Pin,
40    sync::{
41        Arc,
42        atomic::{AtomicBool, AtomicU64, Ordering},
43    },
44    time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
51    identifiers::{ClientOrderId, InstrumentId, OrderListId},
52    instruments::InstrumentAny,
53    reports::OrderStatusReport,
54    types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
59
60/// Trait for order submission operations.
61///
62/// This trait abstracts the execution layer to enable dependency injection and testing
63/// without conditional compilation. The broadcaster holds executors as `Arc<dyn SubmitExecutor>`
64/// to avoid generic type parameters that would complicate the Python FFI boundary.
65///
66/// # Thread Safety
67///
68/// All methods must be safe to call concurrently from multiple threads. Implementations
69/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
70///
71/// # Error Handling
72///
73/// Methods return `anyhow::Result` for flexibility. Implementers should provide
74/// meaningful error messages that can be logged and tracked by the broadcaster.
75///
76/// # Implementation Note
77///
78/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
79/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
80/// `Clone`) to be used without modification.
81trait SubmitExecutor: Send + Sync {
82    /// Adds an instrument for caching.
83    fn add_instrument(&self, instrument: InstrumentAny);
84
85    /// Performs a health check on the executor.
86    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
87
88    /// Submits a single order.
89    #[allow(clippy::too_many_arguments)]
90    fn submit_order(
91        &self,
92        instrument_id: InstrumentId,
93        client_order_id: ClientOrderId,
94        order_side: OrderSide,
95        order_type: OrderType,
96        quantity: Quantity,
97        time_in_force: TimeInForce,
98        price: Option<Price>,
99        trigger_price: Option<Price>,
100        trigger_type: Option<TriggerType>,
101        display_qty: Option<Quantity>,
102        post_only: bool,
103        reduce_only: bool,
104        order_list_id: Option<OrderListId>,
105        contingency_type: Option<ContingencyType>,
106    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
107}
108
109impl SubmitExecutor for BitmexHttpClient {
110    fn add_instrument(&self, instrument: InstrumentAny) {
111        Self::cache_instrument(self, instrument);
112    }
113
114    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
115        Box::pin(async move {
116            Self::get_server_time(self)
117                .await
118                .map(|_| ())
119                .map_err(|e| anyhow::anyhow!("{e}"))
120        })
121    }
122
123    #[allow(clippy::too_many_arguments)]
124    fn submit_order(
125        &self,
126        instrument_id: InstrumentId,
127        client_order_id: ClientOrderId,
128        order_side: OrderSide,
129        order_type: OrderType,
130        quantity: Quantity,
131        time_in_force: TimeInForce,
132        price: Option<Price>,
133        trigger_price: Option<Price>,
134        trigger_type: Option<TriggerType>,
135        display_qty: Option<Quantity>,
136        post_only: bool,
137        reduce_only: bool,
138        order_list_id: Option<OrderListId>,
139        contingency_type: Option<ContingencyType>,
140    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
141        Box::pin(async move {
142            Self::submit_order(
143                self,
144                instrument_id,
145                client_order_id,
146                order_side,
147                order_type,
148                quantity,
149                time_in_force,
150                price,
151                trigger_price,
152                trigger_type,
153                display_qty,
154                post_only,
155                reduce_only,
156                order_list_id,
157                contingency_type,
158            )
159            .await
160        })
161    }
162}
163
164/// Configuration for the submit broadcaster.
165#[derive(Debug, Clone)]
166pub struct SubmitBroadcasterConfig {
167    /// Number of HTTP clients in the pool.
168    pub pool_size: usize,
169    /// BitMEX API key (None will source from environment).
170    pub api_key: Option<String>,
171    /// BitMEX API secret (None will source from environment).
172    pub api_secret: Option<String>,
173    /// Base URL for BitMEX HTTP API.
174    pub base_url: Option<String>,
175    /// If connecting to BitMEX testnet.
176    pub testnet: bool,
177    /// Timeout in seconds for HTTP requests.
178    pub timeout_secs: Option<u64>,
179    /// Maximum number of retry attempts for failed requests.
180    pub max_retries: Option<u32>,
181    /// Initial delay in milliseconds between retry attempts.
182    pub retry_delay_ms: Option<u64>,
183    /// Maximum delay in milliseconds between retry attempts.
184    pub retry_delay_max_ms: Option<u64>,
185    /// Expiration window in milliseconds for signed requests.
186    pub recv_window_ms: Option<u64>,
187    /// Maximum REST burst rate (requests per second).
188    pub max_requests_per_second: Option<u32>,
189    /// Maximum REST rolling rate (requests per minute).
190    pub max_requests_per_minute: Option<u32>,
191    /// Interval in seconds between health check pings.
192    pub health_check_interval_secs: u64,
193    /// Timeout in seconds for health check requests.
194    pub health_check_timeout_secs: u64,
195    /// Substrings to identify expected submit rejections for debug-level logging.
196    pub expected_reject_patterns: Vec<String>,
197    /// Optional list of proxy URLs for path diversity.
198    ///
199    /// Each transport instance uses the proxy at its index. If the list is shorter
200    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
201    /// are ignored.
202    pub proxy_urls: Vec<Option<String>>,
203}
204
205impl Default for SubmitBroadcasterConfig {
206    fn default() -> Self {
207        Self {
208            pool_size: 3,
209            api_key: None,
210            api_secret: None,
211            base_url: None,
212            testnet: false,
213            timeout_secs: Some(60),
214            max_retries: None,
215            retry_delay_ms: Some(1_000),
216            retry_delay_max_ms: Some(5_000),
217            recv_window_ms: Some(10_000),
218            max_requests_per_second: Some(10),
219            max_requests_per_minute: Some(120),
220            health_check_interval_secs: 30,
221            health_check_timeout_secs: 5,
222            expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
223            proxy_urls: vec![],
224        }
225    }
226}
227
228/// Transport client wrapper with health monitoring.
229#[derive(Clone)]
230struct TransportClient {
231    /// Executor wrapped in Arc to enable cloning without requiring Clone on SubmitExecutor.
232    ///
233    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
234    /// the executor across multiple TransportClient clones.
235    executor: Arc<dyn SubmitExecutor>,
236    client_id: String,
237    healthy: Arc<AtomicBool>,
238    submit_count: Arc<AtomicU64>,
239    error_count: Arc<AtomicU64>,
240}
241
242impl Debug for TransportClient {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        f.debug_struct("TransportClient")
245            .field("client_id", &self.client_id)
246            .field("healthy", &self.healthy)
247            .field("submit_count", &self.submit_count)
248            .field("error_count", &self.error_count)
249            .finish()
250    }
251}
252
253impl TransportClient {
254    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
255        Self {
256            executor: Arc::new(executor),
257            client_id,
258            healthy: Arc::new(AtomicBool::new(true)),
259            submit_count: Arc::new(AtomicU64::new(0)),
260            error_count: Arc::new(AtomicU64::new(0)),
261        }
262    }
263
264    fn is_healthy(&self) -> bool {
265        self.healthy.load(Ordering::Relaxed)
266    }
267
268    fn mark_healthy(&self) {
269        self.healthy.store(true, Ordering::Relaxed);
270    }
271
272    fn mark_unhealthy(&self) {
273        self.healthy.store(false, Ordering::Relaxed);
274    }
275
276    fn get_submit_count(&self) -> u64 {
277        self.submit_count.load(Ordering::Relaxed)
278    }
279
280    fn get_error_count(&self) -> u64 {
281        self.error_count.load(Ordering::Relaxed)
282    }
283
284    async fn health_check(&self, timeout_secs: u64) -> bool {
285        match tokio::time::timeout(
286            Duration::from_secs(timeout_secs),
287            self.executor.health_check(),
288        )
289        .await
290        {
291            Ok(Ok(_)) => {
292                self.mark_healthy();
293                true
294            }
295            Ok(Err(e)) => {
296                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
297                self.mark_unhealthy();
298                false
299            }
300            Err(_) => {
301                tracing::warn!("Health check timeout for client {}", self.client_id);
302                self.mark_unhealthy();
303                false
304            }
305        }
306    }
307
308    #[allow(clippy::too_many_arguments)]
309    async fn submit_order(
310        &self,
311        instrument_id: InstrumentId,
312        client_order_id: ClientOrderId,
313        order_side: OrderSide,
314        order_type: OrderType,
315        quantity: Quantity,
316        time_in_force: TimeInForce,
317        price: Option<Price>,
318        trigger_price: Option<Price>,
319        trigger_type: Option<TriggerType>,
320        display_qty: Option<Quantity>,
321        post_only: bool,
322        reduce_only: bool,
323        order_list_id: Option<OrderListId>,
324        contingency_type: Option<ContingencyType>,
325    ) -> anyhow::Result<OrderStatusReport> {
326        self.submit_count.fetch_add(1, Ordering::Relaxed);
327
328        match self
329            .executor
330            .submit_order(
331                instrument_id,
332                client_order_id,
333                order_side,
334                order_type,
335                quantity,
336                time_in_force,
337                price,
338                trigger_price,
339                trigger_type,
340                display_qty,
341                post_only,
342                reduce_only,
343                order_list_id,
344                contingency_type,
345            )
346            .await
347        {
348            Ok(report) => {
349                self.mark_healthy();
350                Ok(report)
351            }
352            Err(e) => {
353                self.error_count.fetch_add(1, Ordering::Relaxed);
354                Err(e)
355            }
356        }
357    }
358}
359
360/// Broadcasts submit requests to multiple HTTP clients for redundancy.
361///
362/// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
363/// in parallel, short-circuits when the first successful acknowledgement is received,
364/// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
365#[cfg_attr(feature = "python", pyo3::pyclass)]
366#[derive(Debug)]
367pub struct SubmitBroadcaster {
368    config: SubmitBroadcasterConfig,
369    transports: Arc<Vec<TransportClient>>,
370    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
371    running: Arc<AtomicBool>,
372    total_submits: Arc<AtomicU64>,
373    successful_submits: Arc<AtomicU64>,
374    failed_submits: Arc<AtomicU64>,
375    expected_rejects: Arc<AtomicU64>,
376}
377
378impl SubmitBroadcaster {
379    /// Creates a new [`SubmitBroadcaster`] with internal HTTP client pool.
380    ///
381    /// # Errors
382    ///
383    /// Returns an error if any HTTP client fails to initialize.
384    pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
385        let mut transports = Vec::with_capacity(config.pool_size);
386
387        // Synthesize base_url when testnet is true but base_url is None
388        let base_url = if config.testnet && config.base_url.is_none() {
389            Some(BITMEX_HTTP_TESTNET_URL.to_string())
390        } else {
391            config.base_url.clone()
392        };
393
394        for i in 0..config.pool_size {
395            // Assign proxy from config list, or None if index exceeds list length
396            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
397
398            let client = BitmexHttpClient::with_credentials(
399                config.api_key.clone(),
400                config.api_secret.clone(),
401                base_url.clone(),
402                config.timeout_secs,
403                config.max_retries,
404                config.retry_delay_ms,
405                config.retry_delay_max_ms,
406                config.recv_window_ms,
407                config.max_requests_per_second,
408                config.max_requests_per_minute,
409                proxy_url,
410            )
411            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
412
413            transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
414        }
415
416        Ok(Self {
417            config,
418            transports: Arc::new(transports),
419            health_check_task: Arc::new(RwLock::new(None)),
420            running: Arc::new(AtomicBool::new(false)),
421            total_submits: Arc::new(AtomicU64::new(0)),
422            successful_submits: Arc::new(AtomicU64::new(0)),
423            failed_submits: Arc::new(AtomicU64::new(0)),
424            expected_rejects: Arc::new(AtomicU64::new(0)),
425        })
426    }
427
428    /// Starts the broadcaster and health check loop.
429    ///
430    /// # Errors
431    ///
432    /// Returns an error if the broadcaster is already running.
433    pub async fn start(&self) -> anyhow::Result<()> {
434        if self.running.load(Ordering::Relaxed) {
435            return Ok(());
436        }
437
438        self.running.store(true, Ordering::Relaxed);
439
440        // Initial health check for all clients
441        self.run_health_checks().await;
442
443        // Start periodic health check task
444        let transports = Arc::clone(&self.transports);
445        let running = Arc::clone(&self.running);
446        let interval_secs = self.config.health_check_interval_secs;
447        let timeout_secs = self.config.health_check_timeout_secs;
448
449        let task = get_runtime().spawn(async move {
450            let mut ticker = interval(Duration::from_secs(interval_secs));
451            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
452
453            loop {
454                ticker.tick().await;
455
456                if !running.load(Ordering::Relaxed) {
457                    break;
458                }
459
460                let tasks: Vec<_> = transports
461                    .iter()
462                    .map(|t| t.health_check(timeout_secs))
463                    .collect();
464
465                let results = future::join_all(tasks).await;
466                let healthy_count = results.iter().filter(|&&r| r).count();
467
468                tracing::debug!(
469                    "Health check complete: {healthy_count}/{} clients healthy",
470                    results.len()
471                );
472            }
473        });
474
475        *self.health_check_task.write().await = Some(task);
476
477        tracing::info!(
478            "SubmitBroadcaster started with {} clients",
479            self.transports.len()
480        );
481
482        Ok(())
483    }
484
485    /// Stops the broadcaster and health check loop.
486    pub async fn stop(&self) {
487        if !self.running.load(Ordering::Relaxed) {
488            return;
489        }
490
491        self.running.store(false, Ordering::Relaxed);
492
493        if let Some(task) = self.health_check_task.write().await.take() {
494            task.abort();
495        }
496
497        tracing::info!("SubmitBroadcaster stopped");
498    }
499
500    async fn run_health_checks(&self) {
501        let tasks: Vec<_> = self
502            .transports
503            .iter()
504            .map(|t| t.health_check(self.config.health_check_timeout_secs))
505            .collect();
506
507        let results = future::join_all(tasks).await;
508        let healthy_count = results.iter().filter(|&&r| r).count();
509
510        tracing::debug!(
511            "Health check complete: {healthy_count}/{} clients healthy",
512            results.len()
513        );
514    }
515
516    fn is_expected_reject(&self, error_message: &str) -> bool {
517        self.config
518            .expected_reject_patterns
519            .iter()
520            .any(|pattern| error_message.contains(pattern))
521    }
522
523    /// Processes submit request results, handling success and failures.
524    ///
525    /// This helper consolidates the common error handling loop used for submit broadcasts.
526    async fn process_submit_results<T>(
527        &self,
528        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
529        operation: &str,
530        params: String,
531    ) -> anyhow::Result<T>
532    where
533        T: Send + 'static,
534    {
535        let mut errors = Vec::new();
536
537        while !handles.is_empty() {
538            let current_handles = std::mem::take(&mut handles);
539            let (result, _idx, remaining) = future::select_all(current_handles).await;
540            handles = remaining.into_iter().collect();
541
542            match result {
543                Ok((client_id, Ok(result))) => {
544                    // First success - abort remaining handles
545                    for handle in &handles {
546                        handle.abort();
547                    }
548                    self.successful_submits.fetch_add(1, Ordering::Relaxed);
549                    tracing::debug!("{} broadcast succeeded [{client_id}] {params}", operation,);
550                    return Ok(result);
551                }
552                Ok((client_id, Err(e))) => {
553                    let error_msg = e.to_string();
554
555                    if self.is_expected_reject(&error_msg) {
556                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
557                        tracing::debug!(
558                            "Expected {} rejection [{client_id}]: {error_msg} {params}",
559                            operation.to_lowercase(),
560                        );
561                        errors.push(error_msg);
562                    } else {
563                        tracing::warn!(
564                            "{} request failed [{client_id}]: {error_msg} {params}",
565                            operation,
566                        );
567                        errors.push(error_msg);
568                    }
569                }
570                Err(e) => {
571                    tracing::warn!("{} task join error: {e:?}", operation);
572                    errors.push(format!("Task panicked: {e:?}"));
573                }
574            }
575        }
576
577        // All tasks failed
578        self.failed_submits.fetch_add(1, Ordering::Relaxed);
579        tracing::error!(
580            "All {} requests failed: {errors:?} {params}",
581            operation.to_lowercase(),
582        );
583        Err(anyhow::anyhow!(
584            "All {} requests failed: {:?}",
585            operation.to_lowercase(),
586            errors
587        ))
588    }
589
590    /// Broadcasts a submit request to all healthy clients in parallel.
591    ///
592    /// # Returns
593    ///
594    /// - `Ok(report)` if successfully submitted with a report.
595    /// - `Err` if all requests failed.
596    ///
597    /// # Errors
598    ///
599    /// Returns an error if all submit requests fail or no healthy clients are available.
600    #[allow(clippy::too_many_arguments)]
601    pub async fn broadcast_submit(
602        &self,
603        instrument_id: InstrumentId,
604        client_order_id: ClientOrderId,
605        order_side: OrderSide,
606        order_type: OrderType,
607        quantity: Quantity,
608        time_in_force: TimeInForce,
609        price: Option<Price>,
610        trigger_price: Option<Price>,
611        trigger_type: Option<TriggerType>,
612        display_qty: Option<Quantity>,
613        post_only: bool,
614        reduce_only: bool,
615        order_list_id: Option<OrderListId>,
616        contingency_type: Option<ContingencyType>,
617        submit_tries: Option<usize>,
618    ) -> anyhow::Result<OrderStatusReport> {
619        self.total_submits.fetch_add(1, Ordering::Relaxed);
620
621        let pool_size = self.config.pool_size;
622        let actual_tries = if let Some(t) = submit_tries {
623            if t > pool_size {
624                // Use log macro for Python visibility for now
625                log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
626            }
627            std::cmp::min(t, pool_size)
628        } else {
629            pool_size
630        };
631
632        tracing::debug!(
633            "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
634        );
635
636        let healthy_transports: Vec<TransportClient> = self
637            .transports
638            .iter()
639            .filter(|t| t.is_healthy())
640            .take(actual_tries)
641            .cloned()
642            .collect();
643
644        if healthy_transports.is_empty() {
645            self.failed_submits.fetch_add(1, Ordering::Relaxed);
646            anyhow::bail!("No healthy transport clients available");
647        }
648
649        tracing::debug!(
650            "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
651            healthy_transports.len(),
652        );
653
654        let mut handles = Vec::new();
655        for (idx, transport) in healthy_transports.into_iter().enumerate() {
656            // First client uses original ID, subsequent clients get suffix to avoid duplicates
657            let modified_client_order_id = if idx == 0 {
658                client_order_id
659            } else {
660                ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
661            };
662
663            let handle = get_runtime().spawn(async move {
664                let client_id = transport.client_id.clone();
665                let result = transport
666                    .submit_order(
667                        instrument_id,
668                        modified_client_order_id,
669                        order_side,
670                        order_type,
671                        quantity,
672                        time_in_force,
673                        price,
674                        trigger_price,
675                        trigger_type,
676                        display_qty,
677                        post_only,
678                        reduce_only,
679                        order_list_id,
680                        contingency_type,
681                    )
682                    .await;
683                (client_id, result)
684            });
685            handles.push(handle);
686        }
687
688        self.process_submit_results(
689            handles,
690            "Submit",
691            format!("(client_order_id={client_order_id:?})"),
692        )
693        .await
694    }
695
696    /// Gets broadcaster metrics.
697    pub fn get_metrics(&self) -> BroadcasterMetrics {
698        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
699        let total_clients = self.transports.len();
700
701        BroadcasterMetrics {
702            total_submits: self.total_submits.load(Ordering::Relaxed),
703            successful_submits: self.successful_submits.load(Ordering::Relaxed),
704            failed_submits: self.failed_submits.load(Ordering::Relaxed),
705            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
706            healthy_clients,
707            total_clients,
708        }
709    }
710
711    /// Gets broadcaster metrics (async version for use within async context).
712    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
713        self.get_metrics()
714    }
715
716    /// Gets per-client statistics.
717    pub fn get_client_stats(&self) -> Vec<ClientStats> {
718        self.transports
719            .iter()
720            .map(|t| ClientStats {
721                client_id: t.client_id.clone(),
722                healthy: t.is_healthy(),
723                submit_count: t.get_submit_count(),
724                error_count: t.get_error_count(),
725            })
726            .collect()
727    }
728
729    /// Gets per-client statistics (async version for use within async context).
730    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
731        self.get_client_stats()
732    }
733
734    /// Caches an instrument in all HTTP clients in the pool.
735    pub fn cache_instrument(&self, instrument: InstrumentAny) {
736        for transport in self.transports.iter() {
737            transport.executor.add_instrument(instrument.clone());
738        }
739    }
740
741    #[must_use]
742    pub fn clone_for_async(&self) -> Self {
743        Self {
744            config: self.config.clone(),
745            transports: Arc::clone(&self.transports),
746            health_check_task: Arc::clone(&self.health_check_task),
747            running: Arc::clone(&self.running),
748            total_submits: Arc::clone(&self.total_submits),
749            successful_submits: Arc::clone(&self.successful_submits),
750            failed_submits: Arc::clone(&self.failed_submits),
751            expected_rejects: Arc::clone(&self.expected_rejects),
752        }
753    }
754
755    #[cfg(test)]
756    fn new_with_transports(
757        config: SubmitBroadcasterConfig,
758        transports: Vec<TransportClient>,
759    ) -> Self {
760        Self {
761            config,
762            transports: Arc::new(transports),
763            health_check_task: Arc::new(RwLock::new(None)),
764            running: Arc::new(AtomicBool::new(false)),
765            total_submits: Arc::new(AtomicU64::new(0)),
766            successful_submits: Arc::new(AtomicU64::new(0)),
767            failed_submits: Arc::new(AtomicU64::new(0)),
768            expected_rejects: Arc::new(AtomicU64::new(0)),
769        }
770    }
771}
772
773/// Broadcaster metrics snapshot.
774#[derive(Debug, Clone)]
775pub struct BroadcasterMetrics {
776    pub total_submits: u64,
777    pub successful_submits: u64,
778    pub failed_submits: u64,
779    pub expected_rejects: u64,
780    pub healthy_clients: usize,
781    pub total_clients: usize,
782}
783
784/// Per-client statistics.
785#[derive(Debug, Clone)]
786pub struct ClientStats {
787    pub client_id: String,
788    pub healthy: bool,
789    pub submit_count: u64,
790    pub error_count: u64,
791}
792
793#[cfg(test)]
794mod tests {
795    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
796
797    use nautilus_core::UUID4;
798    use nautilus_model::{
799        enums::{
800            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
801        },
802        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
803        reports::OrderStatusReport,
804        types::{Price, Quantity},
805    };
806
807    use super::*;
808
809    /// Mock executor for testing.
810    #[derive(Clone)]
811    #[allow(clippy::type_complexity)]
812    struct MockExecutor {
813        handler: Arc<
814            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
815                + Send
816                + Sync,
817        >,
818    }
819
820    impl MockExecutor {
821        fn new<F, Fut>(handler: F) -> Self
822        where
823            F: Fn() -> Fut + Send + Sync + 'static,
824            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
825        {
826            Self {
827                handler: Arc::new(move || Box::pin(handler())),
828            }
829        }
830    }
831
832    impl SubmitExecutor for MockExecutor {
833        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
834            Box::pin(async { Ok(()) })
835        }
836
837        #[allow(clippy::too_many_arguments)]
838        fn submit_order(
839            &self,
840            _instrument_id: InstrumentId,
841            _client_order_id: ClientOrderId,
842            _order_side: OrderSide,
843            _order_type: OrderType,
844            _quantity: Quantity,
845            _time_in_force: TimeInForce,
846            _price: Option<Price>,
847            _trigger_price: Option<Price>,
848            _trigger_type: Option<TriggerType>,
849            _display_qty: Option<Quantity>,
850            _post_only: bool,
851            _reduce_only: bool,
852            _order_list_id: Option<OrderListId>,
853            _contingency_type: Option<ContingencyType>,
854        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
855            (self.handler)()
856        }
857
858        fn add_instrument(&self, _instrument: InstrumentAny) {
859            // No-op for mock
860        }
861    }
862
863    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
864        OrderStatusReport {
865            account_id: AccountId::from("BITMEX-001"),
866            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
867            venue_order_id: VenueOrderId::from(venue_order_id),
868            order_side: OrderSide::Buy,
869            order_type: OrderType::Limit,
870            time_in_force: TimeInForce::Gtc,
871            order_status: OrderStatus::Accepted,
872            price: Some(Price::new(50000.0, 2)),
873            quantity: Quantity::new(100.0, 0),
874            filled_qty: Quantity::new(0.0, 0),
875            report_id: UUID4::new(),
876            ts_accepted: 0.into(),
877            ts_last: 0.into(),
878            ts_init: 0.into(),
879            client_order_id: None,
880            avg_px: None,
881            trigger_price: None,
882            trigger_type: None,
883            contingency_type: ContingencyType::NoContingency,
884            expire_time: None,
885            order_list_id: None,
886            venue_position_id: None,
887            linked_order_ids: None,
888            parent_order_id: None,
889            display_qty: None,
890            limit_offset: None,
891            trailing_offset: None,
892            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
893            post_only: false,
894            reduce_only: false,
895            cancel_reason: None,
896            ts_triggered: None,
897        }
898    }
899
900    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
901    where
902        F: Fn() -> Fut + Send + Sync + 'static,
903        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
904    {
905        let executor = MockExecutor::new(handler);
906        TransportClient::new(executor, client_id.to_string())
907    }
908
909    #[tokio::test]
910    async fn test_broadcast_submit_immediate_success() {
911        let report = create_test_report("ORDER-1");
912        let report_clone = report.clone();
913
914        let transports = vec![
915            create_stub_transport("client-0", move || {
916                let report = report_clone.clone();
917                async move { Ok(report) }
918            }),
919            create_stub_transport("client-1", || async {
920                tokio::time::sleep(Duration::from_secs(10)).await;
921                anyhow::bail!("Should be aborted")
922            }),
923        ];
924
925        let config = SubmitBroadcasterConfig::default();
926        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
927
928        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
929        let result = broadcaster
930            .broadcast_submit(
931                instrument_id,
932                ClientOrderId::from("O-123"),
933                OrderSide::Buy,
934                OrderType::Limit,
935                Quantity::new(100.0, 0),
936                TimeInForce::Gtc,
937                Some(Price::new(50000.0, 2)),
938                None,
939                None,
940                None,
941                false,
942                false,
943                None,
944                None,
945                None,
946            )
947            .await;
948
949        assert!(result.is_ok());
950        let returned_report = result.unwrap();
951        assert_eq!(returned_report.venue_order_id, report.venue_order_id);
952
953        let metrics = broadcaster.get_metrics_async().await;
954        assert_eq!(metrics.successful_submits, 1);
955        assert_eq!(metrics.failed_submits, 0);
956        assert_eq!(metrics.total_submits, 1);
957    }
958
959    #[tokio::test]
960    async fn test_broadcast_submit_duplicate_clordid_expected() {
961        let transports = vec![
962            create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
963            create_stub_transport("client-1", || async {
964                tokio::time::sleep(Duration::from_secs(10)).await;
965                anyhow::bail!("Should be aborted")
966            }),
967        ];
968
969        let config = SubmitBroadcasterConfig::default();
970        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
971
972        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
973        let result = broadcaster
974            .broadcast_submit(
975                instrument_id,
976                ClientOrderId::from("O-123"),
977                OrderSide::Buy,
978                OrderType::Limit,
979                Quantity::new(100.0, 0),
980                TimeInForce::Gtc,
981                Some(Price::new(50000.0, 2)),
982                None,
983                None,
984                None,
985                false,
986                false,
987                None,
988                None,
989                None,
990            )
991            .await;
992
993        assert!(result.is_err());
994
995        let metrics = broadcaster.get_metrics_async().await;
996        assert_eq!(metrics.expected_rejects, 1);
997        assert_eq!(metrics.successful_submits, 0);
998        assert_eq!(metrics.failed_submits, 1);
999    }
1000
1001    #[tokio::test]
1002    async fn test_broadcast_submit_all_failures() {
1003        let transports = vec![
1004            create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1005            create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1006        ];
1007
1008        let config = SubmitBroadcasterConfig::default();
1009        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1010
1011        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1012        let result = broadcaster
1013            .broadcast_submit(
1014                instrument_id,
1015                ClientOrderId::from("O-456"),
1016                OrderSide::Sell,
1017                OrderType::Market,
1018                Quantity::new(50.0, 0),
1019                TimeInForce::Ioc,
1020                None,
1021                None,
1022                None,
1023                None,
1024                false,
1025                false,
1026                None,
1027                None,
1028                None,
1029            )
1030            .await;
1031
1032        assert!(result.is_err());
1033        assert!(
1034            result
1035                .unwrap_err()
1036                .to_string()
1037                .contains("All submit requests failed")
1038        );
1039
1040        let metrics = broadcaster.get_metrics_async().await;
1041        assert_eq!(metrics.failed_submits, 1);
1042        assert_eq!(metrics.successful_submits, 0);
1043    }
1044
1045    #[tokio::test]
1046    async fn test_broadcast_submit_no_healthy_clients() {
1047        let transport =
1048            create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1049        transport.healthy.store(false, Ordering::Relaxed);
1050
1051        let config = SubmitBroadcasterConfig::default();
1052        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1053
1054        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1055        let result = broadcaster
1056            .broadcast_submit(
1057                instrument_id,
1058                ClientOrderId::from("O-789"),
1059                OrderSide::Buy,
1060                OrderType::Limit,
1061                Quantity::new(100.0, 0),
1062                TimeInForce::Gtc,
1063                Some(Price::new(50000.0, 2)),
1064                None,
1065                None,
1066                None,
1067                false,
1068                false,
1069                None,
1070                None,
1071                None,
1072            )
1073            .await;
1074
1075        assert!(result.is_err());
1076        assert!(
1077            result
1078                .unwrap_err()
1079                .to_string()
1080                .contains("No healthy transport clients available")
1081        );
1082
1083        let metrics = broadcaster.get_metrics_async().await;
1084        assert_eq!(metrics.failed_submits, 1);
1085    }
1086
1087    #[tokio::test]
1088    async fn test_default_config() {
1089        let config = SubmitBroadcasterConfig {
1090            api_key: Some("test_key".to_string()),
1091            api_secret: Some("test_secret".to_string()),
1092            base_url: Some("https://test.example.com".to_string()),
1093            ..Default::default()
1094        };
1095
1096        let broadcaster = SubmitBroadcaster::new(config);
1097        assert!(broadcaster.is_ok());
1098
1099        let broadcaster = broadcaster.unwrap();
1100        let metrics = broadcaster.get_metrics_async().await;
1101
1102        // Default pool_size is 3
1103        assert_eq!(metrics.total_clients, 3);
1104    }
1105
1106    #[tokio::test]
1107    async fn test_broadcaster_lifecycle() {
1108        let config = SubmitBroadcasterConfig {
1109            pool_size: 2,
1110            api_key: Some("test_key".to_string()),
1111            api_secret: Some("test_secret".to_string()),
1112            base_url: Some("https://test.example.com".to_string()),
1113            testnet: false,
1114            timeout_secs: Some(5),
1115            max_retries: None,
1116            retry_delay_ms: None,
1117            retry_delay_max_ms: None,
1118            recv_window_ms: None,
1119            max_requests_per_second: None,
1120            max_requests_per_minute: None,
1121            health_check_interval_secs: 60,
1122            health_check_timeout_secs: 1,
1123            expected_reject_patterns: vec![],
1124            proxy_urls: vec![],
1125        };
1126
1127        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1128
1129        // Should not be running initially
1130        assert!(!broadcaster.running.load(Ordering::Relaxed));
1131
1132        // Start broadcaster
1133        let start_result = broadcaster.start().await;
1134        assert!(start_result.is_ok());
1135        assert!(broadcaster.running.load(Ordering::Relaxed));
1136
1137        // Starting again should be idempotent
1138        let start_again = broadcaster.start().await;
1139        assert!(start_again.is_ok());
1140
1141        // Stop broadcaster
1142        broadcaster.stop().await;
1143        assert!(!broadcaster.running.load(Ordering::Relaxed));
1144
1145        // Stopping again should be safe
1146        broadcaster.stop().await;
1147        assert!(!broadcaster.running.load(Ordering::Relaxed));
1148    }
1149
1150    #[tokio::test]
1151    async fn test_broadcast_submit_metrics_increment() {
1152        let report = create_test_report("ORDER-1");
1153        let report_clone = report.clone();
1154
1155        let transports = vec![create_stub_transport("client-0", move || {
1156            let report = report_clone.clone();
1157            async move { Ok(report) }
1158        })];
1159
1160        let config = SubmitBroadcasterConfig::default();
1161        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1162
1163        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1164        let _ = broadcaster
1165            .broadcast_submit(
1166                instrument_id,
1167                ClientOrderId::from("O-123"),
1168                OrderSide::Buy,
1169                OrderType::Limit,
1170                Quantity::new(100.0, 0),
1171                TimeInForce::Gtc,
1172                Some(Price::new(50000.0, 2)),
1173                None,
1174                None,
1175                None,
1176                false,
1177                false,
1178                None,
1179                None,
1180                None,
1181            )
1182            .await;
1183
1184        let metrics = broadcaster.get_metrics_async().await;
1185        assert_eq!(metrics.total_submits, 1);
1186        assert_eq!(metrics.successful_submits, 1);
1187        assert_eq!(metrics.failed_submits, 0);
1188    }
1189
1190    #[tokio::test]
1191    async fn test_broadcaster_creation_with_pool() {
1192        let config = SubmitBroadcasterConfig {
1193            pool_size: 4,
1194            api_key: Some("test_key".to_string()),
1195            api_secret: Some("test_secret".to_string()),
1196            base_url: Some("https://test.example.com".to_string()),
1197            ..Default::default()
1198        };
1199
1200        let broadcaster = SubmitBroadcaster::new(config);
1201        assert!(broadcaster.is_ok());
1202
1203        let broadcaster = broadcaster.unwrap();
1204        let metrics = broadcaster.get_metrics_async().await;
1205        assert_eq!(metrics.total_clients, 4);
1206    }
1207
1208    #[tokio::test]
1209    async fn test_client_stats_collection() {
1210        // Both clients fail so broadcast waits for all of them (no early abort on success).
1211        // This ensures both clients execute and record their stats before the function returns.
1212        let transports = vec![
1213            create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1214            create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1215        ];
1216
1217        let config = SubmitBroadcasterConfig::default();
1218        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1219
1220        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1221        let _ = broadcaster
1222            .broadcast_submit(
1223                instrument_id,
1224                ClientOrderId::from("O-123"),
1225                OrderSide::Buy,
1226                OrderType::Limit,
1227                Quantity::new(100.0, 0),
1228                TimeInForce::Gtc,
1229                Some(Price::new(50000.0, 2)),
1230                None,
1231                None,
1232                None,
1233                false,
1234                false,
1235                None,
1236                None,
1237                None,
1238            )
1239            .await;
1240
1241        let stats = broadcaster.get_client_stats_async().await;
1242        assert_eq!(stats.len(), 2);
1243
1244        let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1245        assert_eq!(client0.submit_count, 1);
1246        assert_eq!(client0.error_count, 1);
1247
1248        let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1249        assert_eq!(client1.submit_count, 1);
1250        assert_eq!(client1.error_count, 1);
1251    }
1252
1253    #[tokio::test]
1254    async fn test_testnet_config_sets_base_url() {
1255        let config = SubmitBroadcasterConfig {
1256            pool_size: 1,
1257            api_key: Some("test_key".to_string()),
1258            api_secret: Some("test_secret".to_string()),
1259            testnet: true,
1260            base_url: None,
1261            ..Default::default()
1262        };
1263
1264        let broadcaster = SubmitBroadcaster::new(config);
1265        assert!(broadcaster.is_ok());
1266    }
1267
1268    #[tokio::test]
1269    async fn test_clone_for_async() {
1270        let config = SubmitBroadcasterConfig {
1271            pool_size: 1,
1272            api_key: Some("test_key".to_string()),
1273            api_secret: Some("test_secret".to_string()),
1274            base_url: Some("https://test.example.com".to_string()),
1275            ..Default::default()
1276        };
1277
1278        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1279        let cloned = broadcaster.clone_for_async();
1280
1281        // Verify they share the same atomics
1282        broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1283        assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1284    }
1285
1286    #[tokio::test]
1287    async fn test_pattern_matching() {
1288        let config = SubmitBroadcasterConfig {
1289            expected_reject_patterns: vec![
1290                "Duplicate clOrdID".to_string(),
1291                "Order already exists".to_string(),
1292            ],
1293            ..Default::default()
1294        };
1295
1296        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1297
1298        assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1299        assert!(broadcaster.is_expected_reject("Order already exists in system"));
1300        assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1301        assert!(!broadcaster.is_expected_reject("Internal server error"));
1302    }
1303
1304    #[tokio::test]
1305    async fn test_submit_metrics_with_mixed_responses() {
1306        let report = create_test_report("ORDER-1");
1307        let report_clone = report.clone();
1308
1309        let transports = vec![
1310            create_stub_transport("client-0", move || {
1311                let report = report_clone.clone();
1312                async move { Ok(report) }
1313            }),
1314            create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1315        ];
1316
1317        let config = SubmitBroadcasterConfig::default();
1318        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1319
1320        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1321        let result = broadcaster
1322            .broadcast_submit(
1323                instrument_id,
1324                ClientOrderId::from("O-123"),
1325                OrderSide::Buy,
1326                OrderType::Limit,
1327                Quantity::new(100.0, 0),
1328                TimeInForce::Gtc,
1329                Some(Price::new(50000.0, 2)),
1330                None,
1331                None,
1332                None,
1333                false,
1334                false,
1335                None,
1336                None,
1337                None,
1338            )
1339            .await;
1340
1341        assert!(result.is_ok());
1342
1343        let metrics = broadcaster.get_metrics_async().await;
1344        assert_eq!(metrics.total_submits, 1);
1345        assert_eq!(metrics.successful_submits, 1);
1346        assert_eq!(metrics.failed_submits, 0);
1347    }
1348
1349    #[tokio::test]
1350    async fn test_metrics_initialization_and_health() {
1351        let config = SubmitBroadcasterConfig {
1352            pool_size: 2,
1353            api_key: Some("test_key".to_string()),
1354            api_secret: Some("test_secret".to_string()),
1355            base_url: Some("https://test.example.com".to_string()),
1356            ..Default::default()
1357        };
1358
1359        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1360        let metrics = broadcaster.get_metrics_async().await;
1361
1362        assert_eq!(metrics.total_submits, 0);
1363        assert_eq!(metrics.successful_submits, 0);
1364        assert_eq!(metrics.failed_submits, 0);
1365        assert_eq!(metrics.expected_rejects, 0);
1366        assert_eq!(metrics.total_clients, 2);
1367        assert_eq!(metrics.healthy_clients, 2);
1368    }
1369
1370    #[tokio::test]
1371    async fn test_health_check_task_lifecycle() {
1372        let config = SubmitBroadcasterConfig {
1373            pool_size: 2,
1374            api_key: Some("test_key".to_string()),
1375            api_secret: Some("test_secret".to_string()),
1376            base_url: Some("https://test.example.com".to_string()),
1377            health_check_interval_secs: 1,
1378            ..Default::default()
1379        };
1380
1381        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1382
1383        // Start should spawn health check task
1384        broadcaster.start().await.unwrap();
1385        assert!(broadcaster.running.load(Ordering::Relaxed));
1386        assert!(
1387            broadcaster
1388                .health_check_task
1389                .read()
1390                .await
1391                .as_ref()
1392                .is_some()
1393        );
1394
1395        // Stop should clean up task
1396        broadcaster.stop().await;
1397        assert!(!broadcaster.running.load(Ordering::Relaxed));
1398    }
1399
1400    #[tokio::test]
1401    async fn test_expected_reject_pattern_comprehensive() {
1402        let transports = vec![
1403            create_stub_transport("client-0", || async {
1404                anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1405            }),
1406            create_stub_transport("client-1", || async {
1407                tokio::time::sleep(Duration::from_secs(10)).await;
1408                anyhow::bail!("Should be aborted")
1409            }),
1410        ];
1411
1412        let config = SubmitBroadcasterConfig::default();
1413        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1414
1415        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1416        let result = broadcaster
1417            .broadcast_submit(
1418                instrument_id,
1419                ClientOrderId::from("O-123"),
1420                OrderSide::Buy,
1421                OrderType::Limit,
1422                Quantity::new(100.0, 0),
1423                TimeInForce::Gtc,
1424                Some(Price::new(50000.0, 2)),
1425                None,
1426                None,
1427                None,
1428                false,
1429                false,
1430                None,
1431                None,
1432                None,
1433            )
1434            .await;
1435
1436        // All failed with expected reject
1437        assert!(result.is_err());
1438
1439        let metrics = broadcaster.get_metrics_async().await;
1440        assert_eq!(metrics.expected_rejects, 1);
1441        assert_eq!(metrics.failed_submits, 1);
1442        assert_eq!(metrics.successful_submits, 0);
1443    }
1444
1445    #[tokio::test]
1446    async fn test_client_order_id_suffix_for_multiple_clients() {
1447        use std::sync::{Arc, Mutex};
1448
1449        #[derive(Clone)]
1450        struct CaptureExecutor {
1451            captured_ids: Arc<Mutex<Vec<String>>>,
1452            barrier: Arc<tokio::sync::Barrier>,
1453            report: OrderStatusReport,
1454        }
1455
1456        impl SubmitExecutor for CaptureExecutor {
1457            fn health_check(
1458                &self,
1459            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1460                Box::pin(async { Ok(()) })
1461            }
1462
1463            #[allow(clippy::too_many_arguments)]
1464            fn submit_order(
1465                &self,
1466                _instrument_id: InstrumentId,
1467                client_order_id: ClientOrderId,
1468                _order_side: OrderSide,
1469                _order_type: OrderType,
1470                _quantity: Quantity,
1471                _time_in_force: TimeInForce,
1472                _price: Option<Price>,
1473                _trigger_price: Option<Price>,
1474                _trigger_type: Option<TriggerType>,
1475                _display_qty: Option<Quantity>,
1476                _post_only: bool,
1477                _reduce_only: bool,
1478                _order_list_id: Option<OrderListId>,
1479                _contingency_type: Option<ContingencyType>,
1480            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1481            {
1482                // Capture the client_order_id
1483                self.captured_ids
1484                    .lock()
1485                    .unwrap()
1486                    .push(client_order_id.as_str().to_string());
1487                let report = self.report.clone();
1488                let barrier = Arc::clone(&self.barrier);
1489                // Wait for all tasks to capture their IDs before any completes
1490                // (with concurrent execution, first success aborts others)
1491                Box::pin(async move {
1492                    barrier.wait().await;
1493                    Ok(report)
1494                })
1495            }
1496
1497            fn add_instrument(&self, _instrument: InstrumentAny) {}
1498        }
1499
1500        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1501        let barrier = Arc::new(tokio::sync::Barrier::new(3));
1502        let report = create_test_report("ORDER-1");
1503
1504        let transports = vec![
1505            TransportClient::new(
1506                CaptureExecutor {
1507                    captured_ids: Arc::clone(&captured_ids),
1508                    barrier: Arc::clone(&barrier),
1509                    report: report.clone(),
1510                },
1511                "client-0".to_string(),
1512            ),
1513            TransportClient::new(
1514                CaptureExecutor {
1515                    captured_ids: Arc::clone(&captured_ids),
1516                    barrier: Arc::clone(&barrier),
1517                    report: report.clone(),
1518                },
1519                "client-1".to_string(),
1520            ),
1521            TransportClient::new(
1522                CaptureExecutor {
1523                    captured_ids: Arc::clone(&captured_ids),
1524                    barrier: Arc::clone(&barrier),
1525                    report: report.clone(),
1526                },
1527                "client-2".to_string(),
1528            ),
1529        ];
1530
1531        let config = SubmitBroadcasterConfig::default();
1532        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1533
1534        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1535        let result = broadcaster
1536            .broadcast_submit(
1537                instrument_id,
1538                ClientOrderId::from("O-123"),
1539                OrderSide::Buy,
1540                OrderType::Limit,
1541                Quantity::new(100.0, 0),
1542                TimeInForce::Gtc,
1543                Some(Price::new(50000.0, 2)),
1544                None,
1545                None,
1546                None,
1547                false,
1548                false,
1549                None,
1550                None,
1551                None,
1552            )
1553            .await;
1554
1555        assert!(result.is_ok());
1556
1557        // Check captured client_order_ids (order is non-deterministic with concurrent execution)
1558        let ids = captured_ids.lock().unwrap();
1559        assert_eq!(ids.len(), 3);
1560        assert!(ids.contains(&"O-123".to_string())); // First client gets original ID
1561        assert!(ids.contains(&"O-123-1".to_string())); // Second client gets suffix -1
1562        assert!(ids.contains(&"O-123-2".to_string())); // Third client gets suffix -2
1563    }
1564
1565    #[tokio::test]
1566    async fn test_client_order_id_suffix_with_partial_failure() {
1567        use std::sync::{Arc, Mutex};
1568
1569        #[derive(Clone)]
1570        struct CaptureAndFailExecutor {
1571            captured_ids: Arc<Mutex<Vec<String>>>,
1572            barrier: Arc<tokio::sync::Barrier>,
1573            should_succeed: bool,
1574        }
1575
1576        impl SubmitExecutor for CaptureAndFailExecutor {
1577            fn health_check(
1578                &self,
1579            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1580                Box::pin(async { Ok(()) })
1581            }
1582
1583            #[allow(clippy::too_many_arguments)]
1584            fn submit_order(
1585                &self,
1586                _instrument_id: InstrumentId,
1587                client_order_id: ClientOrderId,
1588                _order_side: OrderSide,
1589                _order_type: OrderType,
1590                _quantity: Quantity,
1591                _time_in_force: TimeInForce,
1592                _price: Option<Price>,
1593                _trigger_price: Option<Price>,
1594                _trigger_type: Option<TriggerType>,
1595                _display_qty: Option<Quantity>,
1596                _post_only: bool,
1597                _reduce_only: bool,
1598                _order_list_id: Option<OrderListId>,
1599                _contingency_type: Option<ContingencyType>,
1600            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1601            {
1602                // Capture the client_order_id
1603                self.captured_ids
1604                    .lock()
1605                    .unwrap()
1606                    .push(client_order_id.as_str().to_string());
1607                let barrier = Arc::clone(&self.barrier);
1608                let should_succeed = self.should_succeed;
1609                // Wait for all tasks to capture their IDs before any completes
1610                // (with concurrent execution, first success aborts others)
1611                Box::pin(async move {
1612                    barrier.wait().await;
1613                    if should_succeed {
1614                        Ok(create_test_report("ORDER-1"))
1615                    } else {
1616                        anyhow::bail!("Network error")
1617                    }
1618                })
1619            }
1620
1621            fn add_instrument(&self, _instrument: InstrumentAny) {}
1622        }
1623
1624        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1625        let barrier = Arc::new(tokio::sync::Barrier::new(2));
1626
1627        let transports = vec![
1628            TransportClient::new(
1629                CaptureAndFailExecutor {
1630                    captured_ids: Arc::clone(&captured_ids),
1631                    barrier: Arc::clone(&barrier),
1632                    should_succeed: false,
1633                },
1634                "client-0".to_string(),
1635            ),
1636            TransportClient::new(
1637                CaptureAndFailExecutor {
1638                    captured_ids: Arc::clone(&captured_ids),
1639                    barrier: Arc::clone(&barrier),
1640                    should_succeed: true,
1641                },
1642                "client-1".to_string(),
1643            ),
1644        ];
1645
1646        let config = SubmitBroadcasterConfig::default();
1647        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1648
1649        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1650        let result = broadcaster
1651            .broadcast_submit(
1652                instrument_id,
1653                ClientOrderId::from("O-456"),
1654                OrderSide::Sell,
1655                OrderType::Market,
1656                Quantity::new(50.0, 0),
1657                TimeInForce::Ioc,
1658                None,
1659                None,
1660                None,
1661                None,
1662                false,
1663                false,
1664                None,
1665                None,
1666                None,
1667            )
1668            .await;
1669
1670        assert!(result.is_ok());
1671
1672        // Check captured client_order_ids (order is non-deterministic with concurrent execution)
1673        let ids = captured_ids.lock().unwrap();
1674        assert_eq!(ids.len(), 2);
1675        assert!(ids.contains(&"O-456".to_string())); // First client gets original ID
1676        assert!(ids.contains(&"O-456-1".to_string())); // Second client gets suffix -1
1677    }
1678
1679    #[tokio::test]
1680    async fn test_proxy_urls_populated_from_config() {
1681        let config = SubmitBroadcasterConfig {
1682            pool_size: 3,
1683            api_key: Some("test_key".to_string()),
1684            api_secret: Some("test_secret".to_string()),
1685            proxy_urls: vec![
1686                Some("http://proxy1:8080".to_string()),
1687                Some("http://proxy2:8080".to_string()),
1688                Some("http://proxy3:8080".to_string()),
1689            ],
1690            ..Default::default()
1691        };
1692
1693        assert_eq!(config.proxy_urls.len(), 3);
1694        assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1695        assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1696        assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1697    }
1698}