nautilus_bitmex/execution/submitter.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Submit request broadcaster for redundant order submission.
//!
//! This module provides the [`SubmitBroadcaster`], which fans out submit requests
//! to multiple HTTP clients in parallel for redundancy. The broadcaster is triggered
//! when the `SubmitOrder` command contains `params["broadcast_submit_tries"]`.
//! A minimal usage sketch follows the design notes below.
//!
//! Key design patterns:
//!
//! - **Dependency injection via traits**: Uses the `SubmitExecutor` trait to abstract
//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
//! - **Trait objects over generics**: Uses `Arc<dyn SubmitExecutor>` to avoid
//!   generic type parameters on the public API (simpler Python FFI).
//! - **Short-circuit on first success**: Aborts remaining requests once any client
//!   succeeds, minimizing latency.
//! - **Idempotent rejection handling**: Treats duplicate clOrdID errors as expected
//!   rejections and logs them at debug level to avoid noise.
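//!
//! A minimal usage sketch, assuming the module path implied by this file's location and
//! illustrative values (marked `ignore` since it needs real credentials and network access):
//!
//! ```ignore
//! use nautilus_bitmex::execution::submitter::{SubmitBroadcaster, SubmitBroadcasterConfig};
//!
//! async fn run() -> anyhow::Result<()> {
//!     let config = SubmitBroadcasterConfig {
//!         pool_size: 2,
//!         testnet: true,
//!         ..Default::default()
//!     };
//!     let broadcaster = SubmitBroadcaster::new(config)?;
//!     broadcaster.start().await?;
//!     // ...broadcast orders via `broadcast_submit`, then:
//!     broadcaster.stop().await;
//!     Ok(())
//! }
//! ```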

// TODO: Replace boxed futures in `SubmitExecutor` once stable async trait object support
// lands so we can drop the per-call heap allocation

use std::{
    fmt::Debug,
    future::Future,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    time::Duration,
};

use futures_util::future;
use nautilus_common::live::get_runtime;
use nautilus_model::{
    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
    identifiers::{ClientOrderId, InstrumentId, OrderListId},
    instruments::InstrumentAny,
    reports::OrderStatusReport,
    types::{Price, Quantity},
};
use tokio::{sync::RwLock, task::JoinHandle, time::interval};

use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};

/// Trait for order submission operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn SubmitExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
trait SubmitExecutor: Send + Sync {
    /// Adds an instrument for caching.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Submits a single order.
    #[allow(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
}

impl SubmitExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }

    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    #[allow(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        Box::pin(async move {
            Self::submit_order(
                self,
                instrument_id,
                client_order_id,
                order_side,
                order_type,
                quantity,
                time_in_force,
                price,
                trigger_price,
                trigger_type,
                display_qty,
                post_only,
                reduce_only,
                order_list_id,
                contingency_type,
            )
            .await
        })
    }
}

/// Configuration for the submit broadcaster.
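///
/// # Example
///
/// A sketch of overriding a few fields while keeping the documented defaults
/// (illustrative values, marked `ignore` so it is not compiled as a doctest):
///
/// ```ignore
/// let config = SubmitBroadcasterConfig {
///     pool_size: 2,
///     testnet: true,
///     ..Default::default()
/// };
/// ```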
#[derive(Debug, Clone)]
pub struct SubmitBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected submit rejections for debug-level logging.
    pub expected_reject_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
    /// are ignored.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for SubmitBroadcasterConfig {
    fn default() -> Self {
        Self {
            pool_size: 3,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
            proxy_urls: vec![],
        }
    }
}

/// Transport client wrapper with health monitoring.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in Arc to enable cloning without requiring Clone on SubmitExecutor.
    ///
    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
    /// the executor across multiple TransportClient clones.
    executor: Arc<dyn SubmitExecutor>,
    client_id: String,
    healthy: Arc<AtomicBool>,
    submit_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
}

impl Debug for TransportClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(TransportClient))
            .field("client_id", &self.client_id)
            .field("healthy", &self.healthy)
            .field("submit_count", &self.submit_count)
            .field("error_count", &self.error_count)
            .finish()
    }
}

impl TransportClient {
    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            submit_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    fn get_submit_count(&self) -> u64 {
        self.submit_count.load(Ordering::Relaxed)
    }

    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(())) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            Err(_) => {
                log::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    #[allow(clippy::too_many_arguments)]
    async fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.submit_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .submit_order(
                instrument_id,
                client_order_id,
                order_side,
                order_type,
                quantity,
                time_in_force,
                price,
                trigger_price,
                trigger_type,
                display_qty,
                post_only,
                reduce_only,
                order_list_id,
                contingency_type,
            )
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }
}

/// Broadcasts submit requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct SubmitBroadcaster {
    config: SubmitBroadcasterConfig,
    transports: Arc<Vec<TransportClient>>,
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    running: Arc<AtomicBool>,
    total_submits: Arc<AtomicU64>,
    successful_submits: Arc<AtomicU64>,
    failed_submits: Arc<AtomicU64>,
    expected_rejects: Arc<AtomicU64>,
}

impl SubmitBroadcaster {
    /// Creates a new [`SubmitBroadcaster`] with internal HTTP client pool.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP client fails to initialize.
    pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        // Synthesize base_url when testnet is true but base_url is None
        let base_url = if config.testnet && config.base_url.is_none() {
            Some(BITMEX_HTTP_TESTNET_URL.to_string())
        } else {
            config.base_url.clone()
        };

        for i in 0..config.pool_size {
            // Assign proxy from config list, or None if index exceeds list length
            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());

            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
                proxy_url,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_submits: Arc::new(AtomicU64::new(0)),
            successful_submits: Arc::new(AtomicU64::new(0)),
            failed_submits: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Starts the broadcaster and health check loop.
    ///
    /// Starting an already running broadcaster is a no-op and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Currently infallible.
    pub async fn start(&self) -> anyhow::Result<()> {
        if self.running.load(Ordering::Relaxed) {
            return Ok(());
        }

        self.running.store(true, Ordering::Relaxed);

        // Initial health check for all clients
        self.run_health_checks().await;

        // Start periodic health check task
        let transports = Arc::clone(&self.transports);
        let running = Arc::clone(&self.running);
        let interval_secs = self.config.health_check_interval_secs;
        let timeout_secs = self.config.health_check_timeout_secs;

        let task = get_runtime().spawn(async move {
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;

                if !running.load(Ordering::Relaxed) {
                    break;
                }

                let tasks: Vec<_> = transports
                    .iter()
                    .map(|t| t.health_check(timeout_secs))
                    .collect();

                let results = future::join_all(tasks).await;
                let healthy_count = results.iter().filter(|&&r| r).count();

                log::debug!(
                    "Health check complete: {healthy_count}/{} clients healthy",
                    results.len()
                );
            }
        });

        *self.health_check_task.write().await = Some(task);

        log::info!(
            "SubmitBroadcaster started with {} clients",
            self.transports.len()
        );

        Ok(())
    }

    /// Stops the broadcaster and health check loop.
    pub async fn stop(&self) {
        if !self.running.load(Ordering::Relaxed) {
            return;
        }

        self.running.store(false, Ordering::Relaxed);

        if let Some(task) = self.health_check_task.write().await.take() {
            task.abort();
        }

        log::info!("SubmitBroadcaster stopped");
    }

    async fn run_health_checks(&self) {
        let tasks: Vec<_> = self
            .transports
            .iter()
            .map(|t| t.health_check(self.config.health_check_timeout_secs))
            .collect();

        let results = future::join_all(tasks).await;
        let healthy_count = results.iter().filter(|&&r| r).count();

        log::debug!(
            "Health check complete: {healthy_count}/{} clients healthy",
            results.len()
        );
    }

    fn is_expected_reject(&self, error_message: &str) -> bool {
        self.config
            .expected_reject_patterns
            .iter()
            .any(|pattern| error_message.contains(pattern))
    }

    /// Processes submit request results, handling success and failures.
    ///
    /// This helper consolidates the common error handling loop used for submit broadcasts.
    async fn process_submit_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        operation: &str,
        params: String,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining.into_iter().collect();

            match result {
                Ok((client_id, Ok(result))) => {
                    // First success - abort remaining handles
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_submits.fetch_add(1, Ordering::Relaxed);
                    log::debug!("{operation} broadcast succeeded [{client_id}] {params}");
                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        log::debug!(
                            "Expected {} rejection [{client_id}]: {error_msg} {params}",
                            operation.to_lowercase(),
                        );
                        errors.push(error_msg);
                    } else {
                        log::warn!(
                            "{operation} request failed [{client_id}]: {error_msg} {params}",
                        );
                        errors.push(error_msg);
                    }
                }
                Err(e) => {
                    log::warn!("{operation} task join error: {e:?}");
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        // All tasks failed
        self.failed_submits.fetch_add(1, Ordering::Relaxed);
        log::error!(
            "All {} requests failed: {errors:?} {params}",
            operation.to_lowercase(),
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {:?}",
            operation.to_lowercase(),
            errors
        ))
    }

    /// Broadcasts a submit request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(report)` if successfully submitted with a report.
    /// - `Err` if all requests failed.
    ///
    /// # Errors
    ///
    /// Returns an error if all submit requests fail or no healthy clients are available.
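    ///
    /// # Example
    ///
    /// A sketch of broadcasting a limit order across up to two clients (illustrative
    /// values, marked `ignore` since it needs a live pool):
    ///
    /// ```ignore
    /// let report = broadcaster
    ///     .broadcast_submit(
    ///         InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
    ///         ClientOrderId::from("O-123"),
    ///         OrderSide::Buy,
    ///         OrderType::Limit,
    ///         Quantity::new(100.0, 0),
    ///         TimeInForce::Gtc,
    ///         Some(Price::new(50_000.0, 2)),
    ///         None,    // trigger_price
    ///         None,    // trigger_type
    ///         None,    // display_qty
    ///         false,   // post_only
    ///         false,   // reduce_only
    ///         None,    // order_list_id
    ///         None,    // contingency_type
    ///         Some(2), // submit_tries
    ///     )
    ///     .await?;
    /// ```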
    #[allow(clippy::too_many_arguments)]
    pub async fn broadcast_submit(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
        submit_tries: Option<usize>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.total_submits.fetch_add(1, Ordering::Relaxed);

        let pool_size = self.config.pool_size;
        let actual_tries = if let Some(t) = submit_tries {
            if t > pool_size {
                // Use log macro for Python visibility for now
                log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
            }
            std::cmp::min(t, pool_size)
        } else {
            pool_size
        };

        log::debug!(
            "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
        );

        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .take(actual_tries)
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_submits.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        log::debug!(
            "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
            healthy_transports.len(),
        );

        let mut handles = Vec::new();
        for (idx, transport) in healthy_transports.into_iter().enumerate() {
            // First client uses original ID, subsequent clients get suffix to avoid duplicates
            let modified_client_order_id = if idx == 0 {
                client_order_id
            } else {
                ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
            };

            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .submit_order(
                        instrument_id,
                        modified_client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                    )
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_submit_results(
            handles,
            "Submit",
            format!("(client_order_id={client_order_id:?})"),
        )
        .await
    }

    /// Gets broadcaster metrics.
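    ///
    /// # Example
    ///
    /// A sketch of reading the counters (illustrative, marked `ignore`):
    ///
    /// ```ignore
    /// let metrics = broadcaster.get_metrics();
    /// log::info!(
    ///     "submits={} ok={} failed={} healthy={}/{}",
    ///     metrics.total_submits,
    ///     metrics.successful_submits,
    ///     metrics.failed_submits,
    ///     metrics.healthy_clients,
    ///     metrics.total_clients,
    /// );
    /// ```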
    pub fn get_metrics(&self) -> BroadcasterMetrics {
        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
        let total_clients = self.transports.len();

        BroadcasterMetrics {
            total_submits: self.total_submits.load(Ordering::Relaxed),
            successful_submits: self.successful_submits.load(Ordering::Relaxed),
            failed_submits: self.failed_submits.load(Ordering::Relaxed),
            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
            healthy_clients,
            total_clients,
        }
    }

    /// Gets broadcaster metrics (async version for use within async context).
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }

    /// Gets per-client statistics.
    pub fn get_client_stats(&self) -> Vec<ClientStats> {
        self.transports
            .iter()
            .map(|t| ClientStats {
                client_id: t.client_id.clone(),
                healthy: t.is_healthy(),
                submit_count: t.get_submit_count(),
                error_count: t.get_error_count(),
            })
            .collect()
    }

    /// Gets per-client statistics (async version for use within async context).
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }

    /// Caches an instrument in all HTTP clients in the pool.
    pub fn cache_instrument(&self, instrument: InstrumentAny) {
        for transport in self.transports.iter() {
            transport.executor.add_instrument(instrument.clone());
        }
    }

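    /// Returns a clone that shares this broadcaster's transports, health check task handle,
    /// running flag, and metric counters via `Arc`.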
    #[must_use]
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_submits: Arc::clone(&self.total_submits),
            successful_submits: Arc::clone(&self.successful_submits),
            failed_submits: Arc::clone(&self.failed_submits),
            expected_rejects: Arc::clone(&self.expected_rejects),
        }
    }

    #[cfg(test)]
    fn new_with_transports(
        config: SubmitBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_submits: Arc::new(AtomicU64::new(0)),
            successful_submits: Arc::new(AtomicU64::new(0)),
            failed_submits: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
        }
    }
}

/// Broadcaster metrics snapshot.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    pub total_submits: u64,
    pub successful_submits: u64,
    pub failed_submits: u64,
    pub expected_rejects: u64,
    pub healthy_clients: usize,
    pub total_clients: usize,
}

/// Per-client statistics.
#[derive(Debug, Clone)]
pub struct ClientStats {
    pub client_id: String,
    pub healthy: bool,
    pub submit_count: u64,
    pub error_count: u64,
}

792#[cfg(test)]
793mod tests {
794    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
795
796    use nautilus_core::UUID4;
797    use nautilus_model::{
798        enums::{
799            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
800        },
801        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
802        reports::OrderStatusReport,
803        types::{Price, Quantity},
804    };
805
806    use super::*;
807
808    /// Mock executor for testing.
809    #[derive(Clone)]
810    #[allow(clippy::type_complexity)]
811    struct MockExecutor {
812        handler: Arc<
813            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
814                + Send
815                + Sync,
816        >,
817    }
818
819    impl MockExecutor {
820        fn new<F, Fut>(handler: F) -> Self
821        where
822            F: Fn() -> Fut + Send + Sync + 'static,
823            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
824        {
825            Self {
826                handler: Arc::new(move || Box::pin(handler())),
827            }
828        }
829    }
830
831    impl SubmitExecutor for MockExecutor {
832        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
833            Box::pin(async { Ok(()) })
834        }
835
836        #[allow(clippy::too_many_arguments)]
837        fn submit_order(
838            &self,
839            _instrument_id: InstrumentId,
840            _client_order_id: ClientOrderId,
841            _order_side: OrderSide,
842            _order_type: OrderType,
843            _quantity: Quantity,
844            _time_in_force: TimeInForce,
845            _price: Option<Price>,
846            _trigger_price: Option<Price>,
847            _trigger_type: Option<TriggerType>,
848            _display_qty: Option<Quantity>,
849            _post_only: bool,
850            _reduce_only: bool,
851            _order_list_id: Option<OrderListId>,
852            _contingency_type: Option<ContingencyType>,
853        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
854            (self.handler)()
855        }
856
857        fn add_instrument(&self, _instrument: InstrumentAny) {
858            // No-op for mock
859        }
860    }
861
862    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
863        OrderStatusReport {
864            account_id: AccountId::from("BITMEX-001"),
865            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
866            venue_order_id: VenueOrderId::from(venue_order_id),
867            order_side: OrderSide::Buy,
868            order_type: OrderType::Limit,
869            time_in_force: TimeInForce::Gtc,
870            order_status: OrderStatus::Accepted,
871            price: Some(Price::new(50000.0, 2)),
872            quantity: Quantity::new(100.0, 0),
873            filled_qty: Quantity::new(0.0, 0),
874            report_id: UUID4::new(),
875            ts_accepted: 0.into(),
876            ts_last: 0.into(),
877            ts_init: 0.into(),
878            client_order_id: None,
879            avg_px: None,
880            trigger_price: None,
881            trigger_type: None,
882            contingency_type: ContingencyType::NoContingency,
883            expire_time: None,
884            order_list_id: None,
885            venue_position_id: None,
886            linked_order_ids: None,
887            parent_order_id: None,
888            display_qty: None,
889            limit_offset: None,
890            trailing_offset: None,
891            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
892            post_only: false,
893            reduce_only: false,
894            cancel_reason: None,
895            ts_triggered: None,
896        }
897    }
898
899    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
900    where
901        F: Fn() -> Fut + Send + Sync + 'static,
902        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
903    {
904        let executor = MockExecutor::new(handler);
905        TransportClient::new(executor, client_id.to_string())
906    }
907
908    #[tokio::test]
909    async fn test_broadcast_submit_immediate_success() {
910        let report = create_test_report("ORDER-1");
911        let report_clone = report.clone();
912
913        let transports = vec![
914            create_stub_transport("client-0", move || {
915                let report = report_clone.clone();
916                async move { Ok(report) }
917            }),
918            create_stub_transport("client-1", || async {
919                tokio::time::sleep(Duration::from_secs(10)).await;
920                anyhow::bail!("Should be aborted")
921            }),
922        ];
923
924        let config = SubmitBroadcasterConfig::default();
925        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
926
927        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
928        let result = broadcaster
929            .broadcast_submit(
930                instrument_id,
931                ClientOrderId::from("O-123"),
932                OrderSide::Buy,
933                OrderType::Limit,
934                Quantity::new(100.0, 0),
935                TimeInForce::Gtc,
936                Some(Price::new(50000.0, 2)),
937                None,
938                None,
939                None,
940                false,
941                false,
942                None,
943                None,
944                None,
945            )
946            .await;
947
948        assert!(result.is_ok());
949        let returned_report = result.unwrap();
950        assert_eq!(returned_report.venue_order_id, report.venue_order_id);
951
952        let metrics = broadcaster.get_metrics_async().await;
953        assert_eq!(metrics.successful_submits, 1);
954        assert_eq!(metrics.failed_submits, 0);
955        assert_eq!(metrics.total_submits, 1);
956    }
957
958    #[tokio::test]
959    async fn test_broadcast_submit_duplicate_clordid_expected() {
960        let transports = vec![
961            create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
962            create_stub_transport("client-1", || async {
963                tokio::time::sleep(Duration::from_secs(10)).await;
964                anyhow::bail!("Should be aborted")
965            }),
966        ];
967
968        let config = SubmitBroadcasterConfig::default();
969        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
970
971        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
972        let result = broadcaster
973            .broadcast_submit(
974                instrument_id,
975                ClientOrderId::from("O-123"),
976                OrderSide::Buy,
977                OrderType::Limit,
978                Quantity::new(100.0, 0),
979                TimeInForce::Gtc,
980                Some(Price::new(50000.0, 2)),
981                None,
982                None,
983                None,
984                false,
985                false,
986                None,
987                None,
988                None,
989            )
990            .await;
991
992        assert!(result.is_err());
993
994        let metrics = broadcaster.get_metrics_async().await;
995        assert_eq!(metrics.expected_rejects, 1);
996        assert_eq!(metrics.successful_submits, 0);
997        assert_eq!(metrics.failed_submits, 1);
998    }
999
1000    #[tokio::test]
1001    async fn test_broadcast_submit_all_failures() {
1002        let transports = vec![
1003            create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1004            create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1005        ];
1006
1007        let config = SubmitBroadcasterConfig::default();
1008        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1009
1010        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1011        let result = broadcaster
1012            .broadcast_submit(
1013                instrument_id,
1014                ClientOrderId::from("O-456"),
1015                OrderSide::Sell,
1016                OrderType::Market,
1017                Quantity::new(50.0, 0),
1018                TimeInForce::Ioc,
1019                None,
1020                None,
1021                None,
1022                None,
1023                false,
1024                false,
1025                None,
1026                None,
1027                None,
1028            )
1029            .await;
1030
1031        assert!(result.is_err());
1032        assert!(
1033            result
1034                .unwrap_err()
1035                .to_string()
1036                .contains("All submit requests failed")
1037        );
1038
1039        let metrics = broadcaster.get_metrics_async().await;
1040        assert_eq!(metrics.failed_submits, 1);
1041        assert_eq!(metrics.successful_submits, 0);
1042    }
1043
1044    #[tokio::test]
1045    async fn test_broadcast_submit_no_healthy_clients() {
1046        let transport =
1047            create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1048        transport.healthy.store(false, Ordering::Relaxed);
1049
1050        let config = SubmitBroadcasterConfig::default();
1051        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1052
1053        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1054        let result = broadcaster
1055            .broadcast_submit(
1056                instrument_id,
1057                ClientOrderId::from("O-789"),
1058                OrderSide::Buy,
1059                OrderType::Limit,
1060                Quantity::new(100.0, 0),
1061                TimeInForce::Gtc,
1062                Some(Price::new(50000.0, 2)),
1063                None,
1064                None,
1065                None,
1066                false,
1067                false,
1068                None,
1069                None,
1070                None,
1071            )
1072            .await;
1073
1074        assert!(result.is_err());
1075        assert!(
1076            result
1077                .unwrap_err()
1078                .to_string()
1079                .contains("No healthy transport clients available")
1080        );
1081
1082        let metrics = broadcaster.get_metrics_async().await;
1083        assert_eq!(metrics.failed_submits, 1);
1084    }
1085
1086    #[tokio::test]
1087    async fn test_default_config() {
1088        let config = SubmitBroadcasterConfig {
1089            api_key: Some("test_key".to_string()),
1090            api_secret: Some("test_secret".to_string()),
1091            base_url: Some("https://test.example.com".to_string()),
1092            ..Default::default()
1093        };
1094
1095        let broadcaster = SubmitBroadcaster::new(config);
1096        assert!(broadcaster.is_ok());
1097
1098        let broadcaster = broadcaster.unwrap();
1099        let metrics = broadcaster.get_metrics_async().await;
1100
1101        // Default pool_size is 3
1102        assert_eq!(metrics.total_clients, 3);
1103    }
1104
1105    #[tokio::test]
1106    async fn test_broadcaster_lifecycle() {
1107        let config = SubmitBroadcasterConfig {
1108            pool_size: 2,
1109            api_key: Some("test_key".to_string()),
1110            api_secret: Some("test_secret".to_string()),
1111            base_url: Some("https://test.example.com".to_string()),
1112            testnet: false,
1113            timeout_secs: Some(5),
1114            max_retries: None,
1115            retry_delay_ms: None,
1116            retry_delay_max_ms: None,
1117            recv_window_ms: None,
1118            max_requests_per_second: None,
1119            max_requests_per_minute: None,
1120            health_check_interval_secs: 60,
1121            health_check_timeout_secs: 1,
1122            expected_reject_patterns: vec![],
1123            proxy_urls: vec![],
1124        };
1125
1126        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1127
1128        // Should not be running initially
1129        assert!(!broadcaster.running.load(Ordering::Relaxed));
1130
1131        // Start broadcaster
1132        let start_result = broadcaster.start().await;
1133        assert!(start_result.is_ok());
1134        assert!(broadcaster.running.load(Ordering::Relaxed));
1135
1136        // Starting again should be idempotent
1137        let start_again = broadcaster.start().await;
1138        assert!(start_again.is_ok());
1139
1140        // Stop broadcaster
1141        broadcaster.stop().await;
1142        assert!(!broadcaster.running.load(Ordering::Relaxed));
1143
1144        // Stopping again should be safe
1145        broadcaster.stop().await;
1146        assert!(!broadcaster.running.load(Ordering::Relaxed));
1147    }
1148
1149    #[tokio::test]
1150    async fn test_broadcast_submit_metrics_increment() {
1151        let report = create_test_report("ORDER-1");
1152        let report_clone = report.clone();
1153
1154        let transports = vec![create_stub_transport("client-0", move || {
1155            let report = report_clone.clone();
1156            async move { Ok(report) }
1157        })];
1158
1159        let config = SubmitBroadcasterConfig::default();
1160        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1161
1162        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1163        let _ = broadcaster
1164            .broadcast_submit(
1165                instrument_id,
1166                ClientOrderId::from("O-123"),
1167                OrderSide::Buy,
1168                OrderType::Limit,
1169                Quantity::new(100.0, 0),
1170                TimeInForce::Gtc,
1171                Some(Price::new(50000.0, 2)),
1172                None,
1173                None,
1174                None,
1175                false,
1176                false,
1177                None,
1178                None,
1179                None,
1180            )
1181            .await;
1182
1183        let metrics = broadcaster.get_metrics_async().await;
1184        assert_eq!(metrics.total_submits, 1);
1185        assert_eq!(metrics.successful_submits, 1);
1186        assert_eq!(metrics.failed_submits, 0);
1187    }
1188
1189    #[tokio::test]
1190    async fn test_broadcaster_creation_with_pool() {
1191        let config = SubmitBroadcasterConfig {
1192            pool_size: 4,
1193            api_key: Some("test_key".to_string()),
1194            api_secret: Some("test_secret".to_string()),
1195            base_url: Some("https://test.example.com".to_string()),
1196            ..Default::default()
1197        };
1198
1199        let broadcaster = SubmitBroadcaster::new(config);
1200        assert!(broadcaster.is_ok());
1201
1202        let broadcaster = broadcaster.unwrap();
1203        let metrics = broadcaster.get_metrics_async().await;
1204        assert_eq!(metrics.total_clients, 4);
1205    }
1206
1207    #[tokio::test]
1208    async fn test_client_stats_collection() {
1209        // Both clients fail so broadcast waits for all of them (no early abort on success).
1210        // This ensures both clients execute and record their stats before the function returns.
1211        let transports = vec![
1212            create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1213            create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1214        ];
1215
1216        let config = SubmitBroadcasterConfig::default();
1217        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1218
1219        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1220        let _ = broadcaster
1221            .broadcast_submit(
1222                instrument_id,
1223                ClientOrderId::from("O-123"),
1224                OrderSide::Buy,
1225                OrderType::Limit,
1226                Quantity::new(100.0, 0),
1227                TimeInForce::Gtc,
1228                Some(Price::new(50000.0, 2)),
1229                None,
1230                None,
1231                None,
1232                false,
1233                false,
1234                None,
1235                None,
1236                None,
1237            )
1238            .await;
1239
1240        let stats = broadcaster.get_client_stats_async().await;
1241        assert_eq!(stats.len(), 2);
1242
1243        let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1244        assert_eq!(client0.submit_count, 1);
1245        assert_eq!(client0.error_count, 1);
1246
1247        let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1248        assert_eq!(client1.submit_count, 1);
1249        assert_eq!(client1.error_count, 1);
1250    }
1251
1252    #[tokio::test]
1253    async fn test_testnet_config_sets_base_url() {
1254        let config = SubmitBroadcasterConfig {
1255            pool_size: 1,
1256            api_key: Some("test_key".to_string()),
1257            api_secret: Some("test_secret".to_string()),
1258            testnet: true,
1259            base_url: None,
1260            ..Default::default()
1261        };
1262
1263        let broadcaster = SubmitBroadcaster::new(config);
1264        assert!(broadcaster.is_ok());
1265    }
1266
1267    #[tokio::test]
1268    async fn test_clone_for_async() {
1269        let config = SubmitBroadcasterConfig {
1270            pool_size: 1,
1271            api_key: Some("test_key".to_string()),
1272            api_secret: Some("test_secret".to_string()),
1273            base_url: Some("https://test.example.com".to_string()),
1274            ..Default::default()
1275        };
1276
1277        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1278        let cloned = broadcaster.clone_for_async();
1279
1280        // Verify they share the same atomics
1281        broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1282        assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1283    }
1284
1285    #[tokio::test]
1286    async fn test_pattern_matching() {
1287        let config = SubmitBroadcasterConfig {
1288            expected_reject_patterns: vec![
1289                "Duplicate clOrdID".to_string(),
1290                "Order already exists".to_string(),
1291            ],
1292            ..Default::default()
1293        };
1294
1295        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1296
1297        assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1298        assert!(broadcaster.is_expected_reject("Order already exists in system"));
1299        assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1300        assert!(!broadcaster.is_expected_reject("Internal server error"));
1301    }
1302
1303    #[tokio::test]
1304    async fn test_submit_metrics_with_mixed_responses() {
1305        let report = create_test_report("ORDER-1");
1306        let report_clone = report.clone();
1307
1308        let transports = vec![
1309            create_stub_transport("client-0", move || {
1310                let report = report_clone.clone();
1311                async move { Ok(report) }
1312            }),
1313            create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1314        ];
1315
1316        let config = SubmitBroadcasterConfig::default();
1317        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1318
1319        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1320        let result = broadcaster
1321            .broadcast_submit(
1322                instrument_id,
1323                ClientOrderId::from("O-123"),
1324                OrderSide::Buy,
1325                OrderType::Limit,
1326                Quantity::new(100.0, 0),
1327                TimeInForce::Gtc,
1328                Some(Price::new(50000.0, 2)),
1329                None,
1330                None,
1331                None,
1332                false,
1333                false,
1334                None,
1335                None,
1336                None,
1337            )
1338            .await;
1339
1340        assert!(result.is_ok());
1341
1342        let metrics = broadcaster.get_metrics_async().await;
1343        assert_eq!(metrics.total_submits, 1);
1344        assert_eq!(metrics.successful_submits, 1);
1345        assert_eq!(metrics.failed_submits, 0);
1346    }
1347
1348    #[tokio::test]
1349    async fn test_metrics_initialization_and_health() {
1350        let config = SubmitBroadcasterConfig {
1351            pool_size: 2,
1352            api_key: Some("test_key".to_string()),
1353            api_secret: Some("test_secret".to_string()),
1354            base_url: Some("https://test.example.com".to_string()),
1355            ..Default::default()
1356        };
1357
1358        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1359        let metrics = broadcaster.get_metrics_async().await;
1360
1361        assert_eq!(metrics.total_submits, 0);
1362        assert_eq!(metrics.successful_submits, 0);
1363        assert_eq!(metrics.failed_submits, 0);
1364        assert_eq!(metrics.expected_rejects, 0);
1365        assert_eq!(metrics.total_clients, 2);
1366        assert_eq!(metrics.healthy_clients, 2);
1367    }
1368
1369    #[tokio::test]
1370    async fn test_health_check_task_lifecycle() {
1371        let config = SubmitBroadcasterConfig {
1372            pool_size: 2,
1373            api_key: Some("test_key".to_string()),
1374            api_secret: Some("test_secret".to_string()),
1375            base_url: Some("https://test.example.com".to_string()),
1376            health_check_interval_secs: 1,
1377            ..Default::default()
1378        };
1379
1380        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1381
1382        // Start should spawn health check task
1383        broadcaster.start().await.unwrap();
1384        assert!(broadcaster.running.load(Ordering::Relaxed));
1385        assert!(
1386            broadcaster
1387                .health_check_task
1388                .read()
1389                .await
1390                .as_ref()
1391                .is_some()
1392        );
1393
1394        // Stop should clean up task
1395        broadcaster.stop().await;
1396        assert!(!broadcaster.running.load(Ordering::Relaxed));
1397    }
1398
1399    #[tokio::test]
1400    async fn test_expected_reject_pattern_comprehensive() {
1401        let transports = vec![
1402            create_stub_transport("client-0", || async {
1403                anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1404            }),
1405            create_stub_transport("client-1", || async {
1406                tokio::time::sleep(Duration::from_secs(10)).await;
1407                anyhow::bail!("Should be aborted")
1408            }),
1409        ];
1410
1411        let config = SubmitBroadcasterConfig::default();
1412        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1413
1414        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1415        let result = broadcaster
1416            .broadcast_submit(
1417                instrument_id,
1418                ClientOrderId::from("O-123"),
1419                OrderSide::Buy,
1420                OrderType::Limit,
1421                Quantity::new(100.0, 0),
1422                TimeInForce::Gtc,
1423                Some(Price::new(50000.0, 2)),
1424                None,
1425                None,
1426                None,
1427                false,
1428                false,
1429                None,
1430                None,
1431                None,
1432            )
1433            .await;
1434
1435        // All failed with expected reject
1436        assert!(result.is_err());
1437
1438        let metrics = broadcaster.get_metrics_async().await;
1439        assert_eq!(metrics.expected_rejects, 1);
1440        assert_eq!(metrics.failed_submits, 1);
1441        assert_eq!(metrics.successful_submits, 0);
1442    }
1443
1444    #[tokio::test]
1445    async fn test_client_order_id_suffix_for_multiple_clients() {
1446        use std::sync::{Arc, Mutex};
1447
        #[derive(Clone)]
        struct CaptureExecutor {
            captured_ids: Arc<Mutex<Vec<String>>>,
            barrier: Arc<tokio::sync::Barrier>,
            report: OrderStatusReport,
        }

        impl SubmitExecutor for CaptureExecutor {
            fn health_check(
                &self,
            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
                Box::pin(async { Ok(()) })
            }

            #[allow(clippy::too_many_arguments)]
            fn submit_order(
                &self,
                _instrument_id: InstrumentId,
                client_order_id: ClientOrderId,
                _order_side: OrderSide,
                _order_type: OrderType,
                _quantity: Quantity,
                _time_in_force: TimeInForce,
                _price: Option<Price>,
                _trigger_price: Option<Price>,
                _trigger_type: Option<TriggerType>,
                _display_qty: Option<Quantity>,
                _post_only: bool,
                _reduce_only: bool,
                _order_list_id: Option<OrderListId>,
                _contingency_type: Option<ContingencyType>,
            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
            {
                // Capture the client_order_id
                self.captured_ids
                    .lock()
                    .unwrap()
                    .push(client_order_id.as_str().to_string());
                let report = self.report.clone();
                let barrier = Arc::clone(&self.barrier);
                // Wait for all tasks to capture their IDs before any completes
                // (with concurrent execution, first success aborts others)
                Box::pin(async move {
                    barrier.wait().await;
                    Ok(report)
                })
            }

            fn add_instrument(&self, _instrument: InstrumentAny) {}
        }

        let captured_ids = Arc::new(Mutex::new(Vec::new()));
        let barrier = Arc::new(tokio::sync::Barrier::new(3));
        let report = create_test_report("ORDER-1");

        let transports = vec![
            TransportClient::new(
                CaptureExecutor {
                    captured_ids: Arc::clone(&captured_ids),
                    barrier: Arc::clone(&barrier),
                    report: report.clone(),
                },
                "client-0".to_string(),
            ),
            TransportClient::new(
                CaptureExecutor {
                    captured_ids: Arc::clone(&captured_ids),
                    barrier: Arc::clone(&barrier),
                    report: report.clone(),
                },
                "client-1".to_string(),
            ),
            TransportClient::new(
                CaptureExecutor {
                    captured_ids: Arc::clone(&captured_ids),
                    barrier: Arc::clone(&barrier),
                    report: report.clone(),
                },
                "client-2".to_string(),
            ),
        ];

        let config = SubmitBroadcasterConfig::default();
        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_submit(
                instrument_id,
                ClientOrderId::from("O-123"),
                OrderSide::Buy,
                OrderType::Limit,
                Quantity::new(100.0, 0),
                TimeInForce::Gtc,
                Some(Price::new(50000.0, 2)),
                None,
                None,
                None,
                false,
                false,
                None,
                None,
                None,
            )
            .await;

        assert!(result.is_ok());

        // Check captured client_order_ids (order is non-deterministic with concurrent execution)
        let ids = captured_ids.lock().unwrap();
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&"O-123".to_string())); // First client gets original ID
        assert!(ids.contains(&"O-123-1".to_string())); // Second client gets suffix -1
        assert!(ids.contains(&"O-123-2".to_string())); // Third client gets suffix -2
    }

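    // A minimal sketch (hypothetical helper, not the broadcaster's actual code) of the
    // per-transport ID scheme the assertions above and below rely on: transport 0 keeps
    // the original client_order_id, transport N > 0 gets a "-N" suffix so each venue
    // request carries a unique ID.
    #[allow(dead_code)]
    fn suffixed_client_order_id(original: ClientOrderId, transport_index: usize) -> ClientOrderId {
        if transport_index == 0 {
            original
        } else {
            ClientOrderId::from(format!("{}-{}", original.as_str(), transport_index).as_str())
        }
    }
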
    #[tokio::test]
    async fn test_client_order_id_suffix_with_partial_failure() {
        use std::sync::{Arc, Mutex};

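        // Same capture-and-barrier pattern as `CaptureExecutor` above, except each
        // instance is configured to either succeed or fail once the barrier releases.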
        #[derive(Clone)]
        struct CaptureAndFailExecutor {
            captured_ids: Arc<Mutex<Vec<String>>>,
            barrier: Arc<tokio::sync::Barrier>,
            should_succeed: bool,
        }

        impl SubmitExecutor for CaptureAndFailExecutor {
            fn health_check(
                &self,
            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
                Box::pin(async { Ok(()) })
            }

            #[allow(clippy::too_many_arguments)]
            fn submit_order(
                &self,
                _instrument_id: InstrumentId,
                client_order_id: ClientOrderId,
                _order_side: OrderSide,
                _order_type: OrderType,
                _quantity: Quantity,
                _time_in_force: TimeInForce,
                _price: Option<Price>,
                _trigger_price: Option<Price>,
                _trigger_type: Option<TriggerType>,
                _display_qty: Option<Quantity>,
                _post_only: bool,
                _reduce_only: bool,
                _order_list_id: Option<OrderListId>,
                _contingency_type: Option<ContingencyType>,
            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
            {
                // Capture the client_order_id
                self.captured_ids
                    .lock()
                    .unwrap()
                    .push(client_order_id.as_str().to_string());
                let barrier = Arc::clone(&self.barrier);
                let should_succeed = self.should_succeed;
                // Wait for all tasks to capture their IDs before any completes
                // (with concurrent execution, first success aborts others)
                Box::pin(async move {
                    barrier.wait().await;
                    if should_succeed {
                        Ok(create_test_report("ORDER-1"))
                    } else {
                        anyhow::bail!("Network error")
                    }
                })
            }

            fn add_instrument(&self, _instrument: InstrumentAny) {}
        }

        let captured_ids = Arc::new(Mutex::new(Vec::new()));
        let barrier = Arc::new(tokio::sync::Barrier::new(2));

        let transports = vec![
            TransportClient::new(
                CaptureAndFailExecutor {
                    captured_ids: Arc::clone(&captured_ids),
                    barrier: Arc::clone(&barrier),
                    should_succeed: false,
                },
                "client-0".to_string(),
            ),
            TransportClient::new(
                CaptureAndFailExecutor {
                    captured_ids: Arc::clone(&captured_ids),
                    barrier: Arc::clone(&barrier),
                    should_succeed: true,
                },
                "client-1".to_string(),
            ),
        ];

        let config = SubmitBroadcasterConfig::default();
        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_submit(
                instrument_id,
                ClientOrderId::from("O-456"),
                OrderSide::Sell,
                OrderType::Market,
                Quantity::new(50.0, 0),
                TimeInForce::Ioc,
                None,
                None,
                None,
                None,
                false,
                false,
                None,
                None,
                None,
            )
            .await;

        assert!(result.is_ok());

        // Check captured client_order_ids (order is non-deterministic with concurrent execution)
        let ids = captured_ids.lock().unwrap();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&"O-456".to_string())); // First client gets original ID
        assert!(ids.contains(&"O-456-1".to_string())); // Second client gets suffix -1
    }

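    // A minimal sketch (not the broadcaster's actual implementation) of the
    // "first success wins" behaviour the two suffix tests above exercise: poll all
    // transport submits concurrently, return as soon as one yields `Ok`, and surface
    // the last error if every transport fails.
    #[allow(dead_code)]
    async fn first_successful_submit<T>(
        submits: Vec<Pin<Box<dyn Future<Output = anyhow::Result<T>> + Send>>>,
    ) -> anyhow::Result<T> {
        use futures_util::{StreamExt, stream::FuturesUnordered};

        let mut pending: FuturesUnordered<_> = submits.into_iter().collect();
        let mut last_err = anyhow::anyhow!("no transports configured");
        while let Some(result) = pending.next().await {
            match result {
                // Dropping `pending` on return abandons the still-running submits
                Ok(report) => return Ok(report),
                Err(e) => last_err = e,
            }
        }
        Err(last_err)
    }
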
    #[tokio::test]
    async fn test_proxy_urls_populated_from_config() {
        let config = SubmitBroadcasterConfig {
            pool_size: 3,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            proxy_urls: vec![
                Some("http://proxy1:8080".to_string()),
                Some("http://proxy2:8080".to_string()),
                Some("http://proxy3:8080".to_string()),
            ],
            ..Default::default()
        };

        assert_eq!(config.proxy_urls.len(), 3);
        assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
        assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
        assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
    }
}