nautilus_bitmex/execution/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Submit request broadcaster for redundant order submission.
17//!
18//! This module provides the [`SubmitBroadcaster`] which fans out submit requests
19//! to multiple HTTP clients in parallel for redundancy. The broadcaster is triggered
20//! when the `SubmitOrder` command contains `params["broadcast_submit_tries"]`.
21//!
22//! Key design patterns:
23//!
24//! - **Dependency injection via traits**: Uses `SubmitExecutor` trait to abstract
25//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
26//! - **Trait objects over generics**: Uses `Arc<dyn SubmitExecutor>` to avoid
27//!   generic type parameters on the public API (simpler Python FFI).
28//! - **Short-circuit on first success**: Aborts remaining requests once any client
29//!   succeeds, minimizing latency.
30//! - **Idempotent rejection handling**: Recognizes duplicate clOrdID as expected
31//!   rejections for debug-level logging without noise.
32
33// TODO: Replace boxed futures in `SubmitExecutor` once stable async trait object support
34// lands so we can drop the per-call heap allocation
35
36use std::{
37    fmt::Debug,
38    future::Future,
39    pin::Pin,
40    sync::{
41        Arc,
42        atomic::{AtomicBool, AtomicU64, Ordering},
43    },
44    time::Duration,
45};
46
47use futures_util::future;
48use nautilus_model::{
49    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
50    identifiers::{ClientOrderId, InstrumentId, OrderListId},
51    instruments::InstrumentAny,
52    reports::OrderStatusReport,
53    types::{Price, Quantity},
54};
55use tokio::{sync::RwLock, task::JoinHandle, time::interval};
56
57use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
58
59/// Trait for order submission operations.
60///
61/// This trait abstracts the execution layer to enable dependency injection and testing
62/// without conditional compilation. The broadcaster holds executors as `Arc<dyn SubmitExecutor>`
63/// to avoid generic type parameters that would complicate the Python FFI boundary.
64///
65/// # Thread Safety
66///
67/// All methods must be safe to call concurrently from multiple threads. Implementations
68/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
69///
70/// # Error Handling
71///
72/// Methods return `anyhow::Result` for flexibility. Implementers should provide
73/// meaningful error messages that can be logged and tracked by the broadcaster.
74///
75/// # Implementation Note
76///
77/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
78/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
79/// `Clone`) to be used without modification.
80trait SubmitExecutor: Send + Sync {
81    /// Adds an instrument for caching.
82    fn add_instrument(&self, instrument: InstrumentAny);
83
84    /// Performs a health check on the executor.
85    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
86
87    /// Submits a single order.
88    #[allow(clippy::too_many_arguments)]
89    fn submit_order(
90        &self,
91        instrument_id: InstrumentId,
92        client_order_id: ClientOrderId,
93        order_side: OrderSide,
94        order_type: OrderType,
95        quantity: Quantity,
96        time_in_force: TimeInForce,
97        price: Option<Price>,
98        trigger_price: Option<Price>,
99        trigger_type: Option<TriggerType>,
100        display_qty: Option<Quantity>,
101        post_only: bool,
102        reduce_only: bool,
103        order_list_id: Option<OrderListId>,
104        contingency_type: Option<ContingencyType>,
105    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
106}
107
108impl SubmitExecutor for BitmexHttpClient {
109    fn add_instrument(&self, instrument: InstrumentAny) {
110        Self::cache_instrument(self, instrument);
111    }
112
113    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
114        Box::pin(async move {
115            Self::get_server_time(self)
116                .await
117                .map(|_| ())
118                .map_err(|e| anyhow::anyhow!("{e}"))
119        })
120    }
121
122    #[allow(clippy::too_many_arguments)]
123    fn submit_order(
124        &self,
125        instrument_id: InstrumentId,
126        client_order_id: ClientOrderId,
127        order_side: OrderSide,
128        order_type: OrderType,
129        quantity: Quantity,
130        time_in_force: TimeInForce,
131        price: Option<Price>,
132        trigger_price: Option<Price>,
133        trigger_type: Option<TriggerType>,
134        display_qty: Option<Quantity>,
135        post_only: bool,
136        reduce_only: bool,
137        order_list_id: Option<OrderListId>,
138        contingency_type: Option<ContingencyType>,
139    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
140        Box::pin(async move {
141            Self::submit_order(
142                self,
143                instrument_id,
144                client_order_id,
145                order_side,
146                order_type,
147                quantity,
148                time_in_force,
149                price,
150                trigger_price,
151                trigger_type,
152                display_qty,
153                post_only,
154                reduce_only,
155                order_list_id,
156                contingency_type,
157            )
158            .await
159        })
160    }
161}
162
163/// Configuration for the submit broadcaster.
164#[derive(Debug, Clone)]
165pub struct SubmitBroadcasterConfig {
166    /// Number of HTTP clients in the pool.
167    pub pool_size: usize,
168    /// BitMEX API key (None will source from environment).
169    pub api_key: Option<String>,
170    /// BitMEX API secret (None will source from environment).
171    pub api_secret: Option<String>,
172    /// Base URL for BitMEX HTTP API.
173    pub base_url: Option<String>,
174    /// If connecting to BitMEX testnet.
175    pub testnet: bool,
176    /// Timeout in seconds for HTTP requests.
177    pub timeout_secs: Option<u64>,
178    /// Maximum number of retry attempts for failed requests.
179    pub max_retries: Option<u32>,
180    /// Initial delay in milliseconds between retry attempts.
181    pub retry_delay_ms: Option<u64>,
182    /// Maximum delay in milliseconds between retry attempts.
183    pub retry_delay_max_ms: Option<u64>,
184    /// Expiration window in milliseconds for signed requests.
185    pub recv_window_ms: Option<u64>,
186    /// Maximum REST burst rate (requests per second).
187    pub max_requests_per_second: Option<u32>,
188    /// Maximum REST rolling rate (requests per minute).
189    pub max_requests_per_minute: Option<u32>,
190    /// Interval in seconds between health check pings.
191    pub health_check_interval_secs: u64,
192    /// Timeout in seconds for health check requests.
193    pub health_check_timeout_secs: u64,
194    /// Substrings to identify expected submit rejections for debug-level logging.
195    pub expected_reject_patterns: Vec<String>,
196    /// Optional list of proxy URLs for path diversity.
197    ///
198    /// Each transport instance uses the proxy at its index. If the list is shorter
199    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
200    /// are ignored.
201    pub proxy_urls: Vec<Option<String>>,
202}
203
204impl Default for SubmitBroadcasterConfig {
205    fn default() -> Self {
206        Self {
207            pool_size: 3,
208            api_key: None,
209            api_secret: None,
210            base_url: None,
211            testnet: false,
212            timeout_secs: Some(60),
213            max_retries: None,
214            retry_delay_ms: Some(1_000),
215            retry_delay_max_ms: Some(5_000),
216            recv_window_ms: Some(10_000),
217            max_requests_per_second: Some(10),
218            max_requests_per_minute: Some(120),
219            health_check_interval_secs: 30,
220            health_check_timeout_secs: 5,
221            expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
222            proxy_urls: vec![],
223        }
224    }
225}
226
227/// Transport client wrapper with health monitoring.
228#[derive(Clone)]
229struct TransportClient {
230    /// Executor wrapped in Arc to enable cloning without requiring Clone on SubmitExecutor.
231    ///
232    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
233    /// the executor across multiple TransportClient clones.
234    executor: Arc<dyn SubmitExecutor>,
235    client_id: String,
236    healthy: Arc<AtomicBool>,
237    submit_count: Arc<AtomicU64>,
238    error_count: Arc<AtomicU64>,
239}
240
241impl Debug for TransportClient {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        f.debug_struct("TransportClient")
244            .field("client_id", &self.client_id)
245            .field("healthy", &self.healthy)
246            .field("submit_count", &self.submit_count)
247            .field("error_count", &self.error_count)
248            .finish()
249    }
250}
251
252impl TransportClient {
253    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
254        Self {
255            executor: Arc::new(executor),
256            client_id,
257            healthy: Arc::new(AtomicBool::new(true)),
258            submit_count: Arc::new(AtomicU64::new(0)),
259            error_count: Arc::new(AtomicU64::new(0)),
260        }
261    }
262
263    fn is_healthy(&self) -> bool {
264        self.healthy.load(Ordering::Relaxed)
265    }
266
267    fn mark_healthy(&self) {
268        self.healthy.store(true, Ordering::Relaxed);
269    }
270
271    fn mark_unhealthy(&self) {
272        self.healthy.store(false, Ordering::Relaxed);
273    }
274
275    fn get_submit_count(&self) -> u64 {
276        self.submit_count.load(Ordering::Relaxed)
277    }
278
279    fn get_error_count(&self) -> u64 {
280        self.error_count.load(Ordering::Relaxed)
281    }
282
283    async fn health_check(&self, timeout_secs: u64) -> bool {
284        match tokio::time::timeout(
285            Duration::from_secs(timeout_secs),
286            self.executor.health_check(),
287        )
288        .await
289        {
290            Ok(Ok(_)) => {
291                self.mark_healthy();
292                true
293            }
294            Ok(Err(e)) => {
295                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
296                self.mark_unhealthy();
297                false
298            }
299            Err(_) => {
300                tracing::warn!("Health check timeout for client {}", self.client_id);
301                self.mark_unhealthy();
302                false
303            }
304        }
305    }
306
307    #[allow(clippy::too_many_arguments)]
308    async fn submit_order(
309        &self,
310        instrument_id: InstrumentId,
311        client_order_id: ClientOrderId,
312        order_side: OrderSide,
313        order_type: OrderType,
314        quantity: Quantity,
315        time_in_force: TimeInForce,
316        price: Option<Price>,
317        trigger_price: Option<Price>,
318        trigger_type: Option<TriggerType>,
319        display_qty: Option<Quantity>,
320        post_only: bool,
321        reduce_only: bool,
322        order_list_id: Option<OrderListId>,
323        contingency_type: Option<ContingencyType>,
324    ) -> anyhow::Result<OrderStatusReport> {
325        self.submit_count.fetch_add(1, Ordering::Relaxed);
326
327        match self
328            .executor
329            .submit_order(
330                instrument_id,
331                client_order_id,
332                order_side,
333                order_type,
334                quantity,
335                time_in_force,
336                price,
337                trigger_price,
338                trigger_type,
339                display_qty,
340                post_only,
341                reduce_only,
342                order_list_id,
343                contingency_type,
344            )
345            .await
346        {
347            Ok(report) => {
348                self.mark_healthy();
349                Ok(report)
350            }
351            Err(e) => {
352                self.error_count.fetch_add(1, Ordering::Relaxed);
353                Err(e)
354            }
355        }
356    }
357}
358
359/// Broadcasts submit requests to multiple HTTP clients for redundancy.
360///
361/// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
362/// in parallel, short-circuits when the first successful acknowledgement is received,
363/// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
364#[cfg_attr(feature = "python", pyo3::pyclass)]
365#[derive(Debug)]
366pub struct SubmitBroadcaster {
367    config: SubmitBroadcasterConfig,
368    transports: Arc<Vec<TransportClient>>,
369    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
370    running: Arc<AtomicBool>,
371    total_submits: Arc<AtomicU64>,
372    successful_submits: Arc<AtomicU64>,
373    failed_submits: Arc<AtomicU64>,
374    expected_rejects: Arc<AtomicU64>,
375}
376
377impl SubmitBroadcaster {
378    /// Creates a new [`SubmitBroadcaster`] with internal HTTP client pool.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if any HTTP client fails to initialize.
383    pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
384        let mut transports = Vec::with_capacity(config.pool_size);
385
386        // Synthesize base_url when testnet is true but base_url is None
387        let base_url = if config.testnet && config.base_url.is_none() {
388            Some(BITMEX_HTTP_TESTNET_URL.to_string())
389        } else {
390            config.base_url.clone()
391        };
392
393        for i in 0..config.pool_size {
394            // Assign proxy from config list, or None if index exceeds list length
395            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
396
397            let client = BitmexHttpClient::with_credentials(
398                config.api_key.clone(),
399                config.api_secret.clone(),
400                base_url.clone(),
401                config.timeout_secs,
402                config.max_retries,
403                config.retry_delay_ms,
404                config.retry_delay_max_ms,
405                config.recv_window_ms,
406                config.max_requests_per_second,
407                config.max_requests_per_minute,
408                proxy_url,
409            )
410            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
411
412            transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
413        }
414
415        Ok(Self {
416            config,
417            transports: Arc::new(transports),
418            health_check_task: Arc::new(RwLock::new(None)),
419            running: Arc::new(AtomicBool::new(false)),
420            total_submits: Arc::new(AtomicU64::new(0)),
421            successful_submits: Arc::new(AtomicU64::new(0)),
422            failed_submits: Arc::new(AtomicU64::new(0)),
423            expected_rejects: Arc::new(AtomicU64::new(0)),
424        })
425    }
426
427    /// Starts the broadcaster and health check loop.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the broadcaster is already running.
432    pub async fn start(&self) -> anyhow::Result<()> {
433        if self.running.load(Ordering::Relaxed) {
434            return Ok(());
435        }
436
437        self.running.store(true, Ordering::Relaxed);
438
439        // Initial health check for all clients
440        self.run_health_checks().await;
441
442        // Start periodic health check task
443        let transports = Arc::clone(&self.transports);
444        let running = Arc::clone(&self.running);
445        let interval_secs = self.config.health_check_interval_secs;
446        let timeout_secs = self.config.health_check_timeout_secs;
447
448        let task = tokio::spawn(async move {
449            let mut ticker = interval(Duration::from_secs(interval_secs));
450            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
451
452            loop {
453                ticker.tick().await;
454
455                if !running.load(Ordering::Relaxed) {
456                    break;
457                }
458
459                let tasks: Vec<_> = transports
460                    .iter()
461                    .map(|t| t.health_check(timeout_secs))
462                    .collect();
463
464                let results = future::join_all(tasks).await;
465                let healthy_count = results.iter().filter(|&&r| r).count();
466
467                tracing::debug!(
468                    "Health check complete: {healthy_count}/{} clients healthy",
469                    results.len()
470                );
471            }
472        });
473
474        *self.health_check_task.write().await = Some(task);
475
476        tracing::info!(
477            "SubmitBroadcaster started with {} clients",
478            self.transports.len()
479        );
480
481        Ok(())
482    }
483
484    /// Stops the broadcaster and health check loop.
485    pub async fn stop(&self) {
486        if !self.running.load(Ordering::Relaxed) {
487            return;
488        }
489
490        self.running.store(false, Ordering::Relaxed);
491
492        if let Some(task) = self.health_check_task.write().await.take() {
493            task.abort();
494        }
495
496        tracing::info!("SubmitBroadcaster stopped");
497    }
498
499    async fn run_health_checks(&self) {
500        let tasks: Vec<_> = self
501            .transports
502            .iter()
503            .map(|t| t.health_check(self.config.health_check_timeout_secs))
504            .collect();
505
506        let results = future::join_all(tasks).await;
507        let healthy_count = results.iter().filter(|&&r| r).count();
508
509        tracing::debug!(
510            "Health check complete: {healthy_count}/{} clients healthy",
511            results.len()
512        );
513    }
514
515    fn is_expected_reject(&self, error_message: &str) -> bool {
516        self.config
517            .expected_reject_patterns
518            .iter()
519            .any(|pattern| error_message.contains(pattern))
520    }
521
522    /// Processes submit request results, handling success and failures.
523    ///
524    /// This helper consolidates the common error handling loop used for submit broadcasts.
525    async fn process_submit_results<T>(
526        &self,
527        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
528        operation: &str,
529        params: String,
530    ) -> anyhow::Result<T>
531    where
532        T: Send + 'static,
533    {
534        let mut errors = Vec::new();
535
536        while !handles.is_empty() {
537            let current_handles = std::mem::take(&mut handles);
538            let (result, _idx, remaining) = future::select_all(current_handles).await;
539            handles = remaining.into_iter().collect();
540
541            match result {
542                Ok((client_id, Ok(result))) => {
543                    // First success - abort remaining handles
544                    for handle in &handles {
545                        handle.abort();
546                    }
547                    self.successful_submits.fetch_add(1, Ordering::Relaxed);
548                    tracing::debug!("{} broadcast succeeded [{client_id}] {params}", operation,);
549                    return Ok(result);
550                }
551                Ok((client_id, Err(e))) => {
552                    let error_msg = e.to_string();
553
554                    if self.is_expected_reject(&error_msg) {
555                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
556                        tracing::debug!(
557                            "Expected {} rejection [{client_id}]: {error_msg} {params}",
558                            operation.to_lowercase(),
559                        );
560                        errors.push(error_msg);
561                    } else {
562                        tracing::warn!(
563                            "{} request failed [{client_id}]: {error_msg} {params}",
564                            operation,
565                        );
566                        errors.push(error_msg);
567                    }
568                }
569                Err(e) => {
570                    tracing::warn!("{} task join error: {e:?}", operation);
571                    errors.push(format!("Task panicked: {e:?}"));
572                }
573            }
574        }
575
576        // All tasks failed
577        self.failed_submits.fetch_add(1, Ordering::Relaxed);
578        tracing::error!(
579            "All {} requests failed: {errors:?} {params}",
580            operation.to_lowercase(),
581        );
582        Err(anyhow::anyhow!(
583            "All {} requests failed: {:?}",
584            operation.to_lowercase(),
585            errors
586        ))
587    }
588
589    /// Broadcasts a submit request to all healthy clients in parallel.
590    ///
591    /// # Returns
592    ///
593    /// - `Ok(report)` if successfully submitted with a report.
594    /// - `Err` if all requests failed.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if all submit requests fail or no healthy clients are available.
599    #[allow(clippy::too_many_arguments)]
600    pub async fn broadcast_submit(
601        &self,
602        instrument_id: InstrumentId,
603        client_order_id: ClientOrderId,
604        order_side: OrderSide,
605        order_type: OrderType,
606        quantity: Quantity,
607        time_in_force: TimeInForce,
608        price: Option<Price>,
609        trigger_price: Option<Price>,
610        trigger_type: Option<TriggerType>,
611        display_qty: Option<Quantity>,
612        post_only: bool,
613        reduce_only: bool,
614        order_list_id: Option<OrderListId>,
615        contingency_type: Option<ContingencyType>,
616        submit_tries: Option<usize>,
617    ) -> anyhow::Result<OrderStatusReport> {
618        self.total_submits.fetch_add(1, Ordering::Relaxed);
619
620        let pool_size = self.config.pool_size;
621        let actual_tries = if let Some(t) = submit_tries {
622            if t > pool_size {
623                // Use log macro for Python visibility for now
624                log::warn!(
625                    "submit_tries={} exceeds pool_size={}, capping at pool_size",
626                    t,
627                    pool_size
628                );
629            }
630            std::cmp::min(t, pool_size)
631        } else {
632            pool_size
633        };
634
635        tracing::debug!(
636            "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
637        );
638
639        let healthy_transports: Vec<TransportClient> = self
640            .transports
641            .iter()
642            .filter(|t| t.is_healthy())
643            .take(actual_tries)
644            .cloned()
645            .collect();
646
647        if healthy_transports.is_empty() {
648            self.failed_submits.fetch_add(1, Ordering::Relaxed);
649            anyhow::bail!("No healthy transport clients available");
650        }
651
652        tracing::debug!(
653            "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
654            healthy_transports.len(),
655        );
656
657        let mut handles = Vec::new();
658        for (idx, transport) in healthy_transports.into_iter().enumerate() {
659            // First client uses original ID, subsequent clients get suffix to avoid duplicates
660            let modified_client_order_id = if idx == 0 {
661                client_order_id
662            } else {
663                ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
664            };
665
666            let handle = tokio::spawn(async move {
667                let client_id = transport.client_id.clone();
668                let result = transport
669                    .submit_order(
670                        instrument_id,
671                        modified_client_order_id,
672                        order_side,
673                        order_type,
674                        quantity,
675                        time_in_force,
676                        price,
677                        trigger_price,
678                        trigger_type,
679                        display_qty,
680                        post_only,
681                        reduce_only,
682                        order_list_id,
683                        contingency_type,
684                    )
685                    .await;
686                (client_id, result)
687            });
688            handles.push(handle);
689        }
690
691        self.process_submit_results(
692            handles,
693            "Submit",
694            format!("(client_order_id={:?})", client_order_id),
695        )
696        .await
697    }
698
699    /// Gets broadcaster metrics.
700    pub fn get_metrics(&self) -> BroadcasterMetrics {
701        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
702        let total_clients = self.transports.len();
703
704        BroadcasterMetrics {
705            total_submits: self.total_submits.load(Ordering::Relaxed),
706            successful_submits: self.successful_submits.load(Ordering::Relaxed),
707            failed_submits: self.failed_submits.load(Ordering::Relaxed),
708            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
709            healthy_clients,
710            total_clients,
711        }
712    }
713
714    /// Gets broadcaster metrics (async version for use within async context).
715    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
716        self.get_metrics()
717    }
718
719    /// Gets per-client statistics.
720    pub fn get_client_stats(&self) -> Vec<ClientStats> {
721        self.transports
722            .iter()
723            .map(|t| ClientStats {
724                client_id: t.client_id.clone(),
725                healthy: t.is_healthy(),
726                submit_count: t.get_submit_count(),
727                error_count: t.get_error_count(),
728            })
729            .collect()
730    }
731
732    /// Gets per-client statistics (async version for use within async context).
733    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
734        self.get_client_stats()
735    }
736
737    /// Caches an instrument in all HTTP clients in the pool.
738    pub fn cache_instrument(&self, instrument: InstrumentAny) {
739        for transport in self.transports.iter() {
740            transport.executor.add_instrument(instrument.clone());
741        }
742    }
743
744    pub fn clone_for_async(&self) -> Self {
745        Self {
746            config: self.config.clone(),
747            transports: Arc::clone(&self.transports),
748            health_check_task: Arc::clone(&self.health_check_task),
749            running: Arc::clone(&self.running),
750            total_submits: Arc::clone(&self.total_submits),
751            successful_submits: Arc::clone(&self.successful_submits),
752            failed_submits: Arc::clone(&self.failed_submits),
753            expected_rejects: Arc::clone(&self.expected_rejects),
754        }
755    }
756
757    #[cfg(test)]
758    fn new_with_transports(
759        config: SubmitBroadcasterConfig,
760        transports: Vec<TransportClient>,
761    ) -> Self {
762        Self {
763            config,
764            transports: Arc::new(transports),
765            health_check_task: Arc::new(RwLock::new(None)),
766            running: Arc::new(AtomicBool::new(false)),
767            total_submits: Arc::new(AtomicU64::new(0)),
768            successful_submits: Arc::new(AtomicU64::new(0)),
769            failed_submits: Arc::new(AtomicU64::new(0)),
770            expected_rejects: Arc::new(AtomicU64::new(0)),
771        }
772    }
773}
774
775/// Broadcaster metrics snapshot.
776#[derive(Debug, Clone)]
777pub struct BroadcasterMetrics {
778    pub total_submits: u64,
779    pub successful_submits: u64,
780    pub failed_submits: u64,
781    pub expected_rejects: u64,
782    pub healthy_clients: usize,
783    pub total_clients: usize,
784}
785
786/// Per-client statistics.
787#[derive(Debug, Clone)]
788pub struct ClientStats {
789    pub client_id: String,
790    pub healthy: bool,
791    pub submit_count: u64,
792    pub error_count: u64,
793}
794
795////////////////////////////////////////////////////////////////////////////////
796// Tests
797////////////////////////////////////////////////////////////////////////////////
798
799#[cfg(test)]
800mod tests {
801    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
802
803    use nautilus_core::UUID4;
804    use nautilus_model::{
805        enums::{
806            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
807        },
808        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
809        reports::OrderStatusReport,
810        types::{Price, Quantity},
811    };
812
813    use super::*;
814
815    /// Mock executor for testing.
816    #[derive(Clone)]
817    #[allow(clippy::type_complexity)]
818    struct MockExecutor {
819        handler: Arc<
820            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
821                + Send
822                + Sync,
823        >,
824    }
825
826    impl MockExecutor {
827        fn new<F, Fut>(handler: F) -> Self
828        where
829            F: Fn() -> Fut + Send + Sync + 'static,
830            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
831        {
832            Self {
833                handler: Arc::new(move || Box::pin(handler())),
834            }
835        }
836    }
837
838    impl SubmitExecutor for MockExecutor {
839        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
840            Box::pin(async { Ok(()) })
841        }
842
843        #[allow(clippy::too_many_arguments)]
844        fn submit_order(
845            &self,
846            _instrument_id: InstrumentId,
847            _client_order_id: ClientOrderId,
848            _order_side: OrderSide,
849            _order_type: OrderType,
850            _quantity: Quantity,
851            _time_in_force: TimeInForce,
852            _price: Option<Price>,
853            _trigger_price: Option<Price>,
854            _trigger_type: Option<TriggerType>,
855            _display_qty: Option<Quantity>,
856            _post_only: bool,
857            _reduce_only: bool,
858            _order_list_id: Option<OrderListId>,
859            _contingency_type: Option<ContingencyType>,
860        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
861            (self.handler)()
862        }
863
864        fn add_instrument(&self, _instrument: InstrumentAny) {
865            // No-op for mock
866        }
867    }
868
869    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
870        OrderStatusReport {
871            account_id: AccountId::from("BITMEX-001"),
872            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
873            venue_order_id: VenueOrderId::from(venue_order_id),
874            order_side: OrderSide::Buy,
875            order_type: OrderType::Limit,
876            time_in_force: TimeInForce::Gtc,
877            order_status: OrderStatus::Accepted,
878            price: Some(Price::new(50000.0, 2)),
879            quantity: Quantity::new(100.0, 0),
880            filled_qty: Quantity::new(0.0, 0),
881            report_id: UUID4::new(),
882            ts_accepted: 0.into(),
883            ts_last: 0.into(),
884            ts_init: 0.into(),
885            client_order_id: None,
886            avg_px: None,
887            trigger_price: None,
888            trigger_type: None,
889            contingency_type: ContingencyType::NoContingency,
890            expire_time: None,
891            order_list_id: None,
892            venue_position_id: None,
893            linked_order_ids: None,
894            parent_order_id: None,
895            display_qty: None,
896            limit_offset: None,
897            trailing_offset: None,
898            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
899            post_only: false,
900            reduce_only: false,
901            cancel_reason: None,
902            ts_triggered: None,
903        }
904    }
905
906    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
907    where
908        F: Fn() -> Fut + Send + Sync + 'static,
909        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
910    {
911        let executor = MockExecutor::new(handler);
912        TransportClient::new(executor, client_id.to_string())
913    }
914
915    #[tokio::test]
916    async fn test_broadcast_submit_immediate_success() {
917        let report = create_test_report("ORDER-1");
918        let report_clone = report.clone();
919
920        let transports = vec![
921            create_stub_transport("client-0", move || {
922                let report = report_clone.clone();
923                async move { Ok(report) }
924            }),
925            create_stub_transport("client-1", || async {
926                tokio::time::sleep(Duration::from_secs(10)).await;
927                anyhow::bail!("Should be aborted")
928            }),
929        ];
930
931        let config = SubmitBroadcasterConfig::default();
932        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
933
934        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
935        let result = broadcaster
936            .broadcast_submit(
937                instrument_id,
938                ClientOrderId::from("O-123"),
939                OrderSide::Buy,
940                OrderType::Limit,
941                Quantity::new(100.0, 0),
942                TimeInForce::Gtc,
943                Some(Price::new(50000.0, 2)),
944                None,
945                None,
946                None,
947                false,
948                false,
949                None,
950                None,
951                None,
952            )
953            .await;
954
955        assert!(result.is_ok());
956        let returned_report = result.unwrap();
957        assert_eq!(returned_report.venue_order_id, report.venue_order_id);
958
959        let metrics = broadcaster.get_metrics_async().await;
960        assert_eq!(metrics.successful_submits, 1);
961        assert_eq!(metrics.failed_submits, 0);
962        assert_eq!(metrics.total_submits, 1);
963    }
964
965    #[tokio::test]
966    async fn test_broadcast_submit_duplicate_clordid_expected() {
967        let transports = vec![
968            create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
969            create_stub_transport("client-1", || async {
970                tokio::time::sleep(Duration::from_secs(10)).await;
971                anyhow::bail!("Should be aborted")
972            }),
973        ];
974
975        let config = SubmitBroadcasterConfig::default();
976        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
977
978        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
979        let result = broadcaster
980            .broadcast_submit(
981                instrument_id,
982                ClientOrderId::from("O-123"),
983                OrderSide::Buy,
984                OrderType::Limit,
985                Quantity::new(100.0, 0),
986                TimeInForce::Gtc,
987                Some(Price::new(50000.0, 2)),
988                None,
989                None,
990                None,
991                false,
992                false,
993                None,
994                None,
995                None,
996            )
997            .await;
998
999        assert!(result.is_err());
1000
1001        let metrics = broadcaster.get_metrics_async().await;
1002        assert_eq!(metrics.expected_rejects, 1);
1003        assert_eq!(metrics.successful_submits, 0);
1004        assert_eq!(metrics.failed_submits, 1);
1005    }
1006
1007    #[tokio::test]
1008    async fn test_broadcast_submit_all_failures() {
1009        let transports = vec![
1010            create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1011            create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1012        ];
1013
1014        let config = SubmitBroadcasterConfig::default();
1015        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1016
1017        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1018        let result = broadcaster
1019            .broadcast_submit(
1020                instrument_id,
1021                ClientOrderId::from("O-456"),
1022                OrderSide::Sell,
1023                OrderType::Market,
1024                Quantity::new(50.0, 0),
1025                TimeInForce::Ioc,
1026                None,
1027                None,
1028                None,
1029                None,
1030                false,
1031                false,
1032                None,
1033                None,
1034                None,
1035            )
1036            .await;
1037
1038        assert!(result.is_err());
1039        assert!(
1040            result
1041                .unwrap_err()
1042                .to_string()
1043                .contains("All submit requests failed")
1044        );
1045
1046        let metrics = broadcaster.get_metrics_async().await;
1047        assert_eq!(metrics.failed_submits, 1);
1048        assert_eq!(metrics.successful_submits, 0);
1049    }
1050
1051    #[tokio::test]
1052    async fn test_broadcast_submit_no_healthy_clients() {
1053        let transport =
1054            create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1055        transport.healthy.store(false, Ordering::Relaxed);
1056
1057        let config = SubmitBroadcasterConfig::default();
1058        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1059
1060        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1061        let result = broadcaster
1062            .broadcast_submit(
1063                instrument_id,
1064                ClientOrderId::from("O-789"),
1065                OrderSide::Buy,
1066                OrderType::Limit,
1067                Quantity::new(100.0, 0),
1068                TimeInForce::Gtc,
1069                Some(Price::new(50000.0, 2)),
1070                None,
1071                None,
1072                None,
1073                false,
1074                false,
1075                None,
1076                None,
1077                None,
1078            )
1079            .await;
1080
1081        assert!(result.is_err());
1082        assert!(
1083            result
1084                .unwrap_err()
1085                .to_string()
1086                .contains("No healthy transport clients available")
1087        );
1088
1089        let metrics = broadcaster.get_metrics_async().await;
1090        assert_eq!(metrics.failed_submits, 1);
1091    }
1092
1093    #[tokio::test]
1094    async fn test_default_config() {
1095        let config = SubmitBroadcasterConfig {
1096            api_key: Some("test_key".to_string()),
1097            api_secret: Some("test_secret".to_string()),
1098            base_url: Some("https://test.example.com".to_string()),
1099            ..Default::default()
1100        };
1101
1102        let broadcaster = SubmitBroadcaster::new(config);
1103        assert!(broadcaster.is_ok());
1104
1105        let broadcaster = broadcaster.unwrap();
1106        let metrics = broadcaster.get_metrics_async().await;
1107
1108        // Default pool_size is 3
1109        assert_eq!(metrics.total_clients, 3);
1110    }
1111
1112    #[tokio::test]
1113    async fn test_broadcaster_lifecycle() {
1114        let config = SubmitBroadcasterConfig {
1115            pool_size: 2,
1116            api_key: Some("test_key".to_string()),
1117            api_secret: Some("test_secret".to_string()),
1118            base_url: Some("https://test.example.com".to_string()),
1119            testnet: false,
1120            timeout_secs: Some(5),
1121            max_retries: None,
1122            retry_delay_ms: None,
1123            retry_delay_max_ms: None,
1124            recv_window_ms: None,
1125            max_requests_per_second: None,
1126            max_requests_per_minute: None,
1127            health_check_interval_secs: 60,
1128            health_check_timeout_secs: 1,
1129            expected_reject_patterns: vec![],
1130            proxy_urls: vec![],
1131        };
1132
1133        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1134
1135        // Should not be running initially
1136        assert!(!broadcaster.running.load(Ordering::Relaxed));
1137
1138        // Start broadcaster
1139        let start_result = broadcaster.start().await;
1140        assert!(start_result.is_ok());
1141        assert!(broadcaster.running.load(Ordering::Relaxed));
1142
1143        // Starting again should be idempotent
1144        let start_again = broadcaster.start().await;
1145        assert!(start_again.is_ok());
1146
1147        // Stop broadcaster
1148        broadcaster.stop().await;
1149        assert!(!broadcaster.running.load(Ordering::Relaxed));
1150
1151        // Stopping again should be safe
1152        broadcaster.stop().await;
1153        assert!(!broadcaster.running.load(Ordering::Relaxed));
1154    }
1155
1156    #[tokio::test]
1157    async fn test_broadcast_submit_metrics_increment() {
1158        let report = create_test_report("ORDER-1");
1159        let report_clone = report.clone();
1160
1161        let transports = vec![create_stub_transport("client-0", move || {
1162            let report = report_clone.clone();
1163            async move { Ok(report) }
1164        })];
1165
1166        let config = SubmitBroadcasterConfig::default();
1167        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1168
1169        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1170        let _ = broadcaster
1171            .broadcast_submit(
1172                instrument_id,
1173                ClientOrderId::from("O-123"),
1174                OrderSide::Buy,
1175                OrderType::Limit,
1176                Quantity::new(100.0, 0),
1177                TimeInForce::Gtc,
1178                Some(Price::new(50000.0, 2)),
1179                None,
1180                None,
1181                None,
1182                false,
1183                false,
1184                None,
1185                None,
1186                None,
1187            )
1188            .await;
1189
1190        let metrics = broadcaster.get_metrics_async().await;
1191        assert_eq!(metrics.total_submits, 1);
1192        assert_eq!(metrics.successful_submits, 1);
1193        assert_eq!(metrics.failed_submits, 0);
1194    }
1195
1196    #[tokio::test]
1197    async fn test_broadcaster_creation_with_pool() {
1198        let config = SubmitBroadcasterConfig {
1199            pool_size: 4,
1200            api_key: Some("test_key".to_string()),
1201            api_secret: Some("test_secret".to_string()),
1202            base_url: Some("https://test.example.com".to_string()),
1203            ..Default::default()
1204        };
1205
1206        let broadcaster = SubmitBroadcaster::new(config);
1207        assert!(broadcaster.is_ok());
1208
1209        let broadcaster = broadcaster.unwrap();
1210        let metrics = broadcaster.get_metrics_async().await;
1211        assert_eq!(metrics.total_clients, 4);
1212    }
1213
1214    #[tokio::test]
1215    async fn test_client_stats_collection() {
1216        let report = create_test_report("ORDER-1");
1217        let report_clone = report.clone();
1218
1219        let transports = vec![
1220            create_stub_transport("client-0", move || {
1221                let report = report_clone.clone();
1222                async move { Ok(report) }
1223            }),
1224            create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1225        ];
1226
1227        let config = SubmitBroadcasterConfig::default();
1228        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1229
1230        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1231        let _ = broadcaster
1232            .broadcast_submit(
1233                instrument_id,
1234                ClientOrderId::from("O-123"),
1235                OrderSide::Buy,
1236                OrderType::Limit,
1237                Quantity::new(100.0, 0),
1238                TimeInForce::Gtc,
1239                Some(Price::new(50000.0, 2)),
1240                None,
1241                None,
1242                None,
1243                false,
1244                false,
1245                None,
1246                None,
1247                None,
1248            )
1249            .await;
1250
1251        let stats = broadcaster.get_client_stats_async().await;
1252        assert_eq!(stats.len(), 2);
1253
1254        let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1255        assert_eq!(client0.submit_count, 1);
1256        assert_eq!(client0.error_count, 0);
1257
1258        let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1259        assert_eq!(client1.submit_count, 1);
1260        assert_eq!(client1.error_count, 1);
1261    }
1262
1263    #[tokio::test]
1264    async fn test_testnet_config_sets_base_url() {
1265        let config = SubmitBroadcasterConfig {
1266            pool_size: 1,
1267            api_key: Some("test_key".to_string()),
1268            api_secret: Some("test_secret".to_string()),
1269            testnet: true,
1270            base_url: None,
1271            ..Default::default()
1272        };
1273
1274        let broadcaster = SubmitBroadcaster::new(config);
1275        assert!(broadcaster.is_ok());
1276    }
1277
1278    #[tokio::test]
1279    async fn test_clone_for_async() {
1280        let config = SubmitBroadcasterConfig {
1281            pool_size: 1,
1282            api_key: Some("test_key".to_string()),
1283            api_secret: Some("test_secret".to_string()),
1284            base_url: Some("https://test.example.com".to_string()),
1285            ..Default::default()
1286        };
1287
1288        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1289        let cloned = broadcaster.clone_for_async();
1290
1291        // Verify they share the same atomics
1292        broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1293        assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1294    }
1295
1296    #[tokio::test]
1297    async fn test_pattern_matching() {
1298        let config = SubmitBroadcasterConfig {
1299            expected_reject_patterns: vec![
1300                "Duplicate clOrdID".to_string(),
1301                "Order already exists".to_string(),
1302            ],
1303            ..Default::default()
1304        };
1305
1306        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1307
1308        assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1309        assert!(broadcaster.is_expected_reject("Order already exists in system"));
1310        assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1311        assert!(!broadcaster.is_expected_reject("Internal server error"));
1312    }
1313
1314    #[tokio::test]
1315    async fn test_submit_metrics_with_mixed_responses() {
1316        let report = create_test_report("ORDER-1");
1317        let report_clone = report.clone();
1318
1319        let transports = vec![
1320            create_stub_transport("client-0", move || {
1321                let report = report_clone.clone();
1322                async move { Ok(report) }
1323            }),
1324            create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1325        ];
1326
1327        let config = SubmitBroadcasterConfig::default();
1328        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1329
1330        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1331        let result = broadcaster
1332            .broadcast_submit(
1333                instrument_id,
1334                ClientOrderId::from("O-123"),
1335                OrderSide::Buy,
1336                OrderType::Limit,
1337                Quantity::new(100.0, 0),
1338                TimeInForce::Gtc,
1339                Some(Price::new(50000.0, 2)),
1340                None,
1341                None,
1342                None,
1343                false,
1344                false,
1345                None,
1346                None,
1347                None,
1348            )
1349            .await;
1350
1351        assert!(result.is_ok());
1352
1353        let metrics = broadcaster.get_metrics_async().await;
1354        assert_eq!(metrics.total_submits, 1);
1355        assert_eq!(metrics.successful_submits, 1);
1356        assert_eq!(metrics.failed_submits, 0);
1357    }
1358
1359    #[tokio::test]
1360    async fn test_metrics_initialization_and_health() {
1361        let config = SubmitBroadcasterConfig {
1362            pool_size: 2,
1363            api_key: Some("test_key".to_string()),
1364            api_secret: Some("test_secret".to_string()),
1365            base_url: Some("https://test.example.com".to_string()),
1366            ..Default::default()
1367        };
1368
1369        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1370        let metrics = broadcaster.get_metrics_async().await;
1371
1372        assert_eq!(metrics.total_submits, 0);
1373        assert_eq!(metrics.successful_submits, 0);
1374        assert_eq!(metrics.failed_submits, 0);
1375        assert_eq!(metrics.expected_rejects, 0);
1376        assert_eq!(metrics.total_clients, 2);
1377        assert_eq!(metrics.healthy_clients, 2);
1378    }
1379
1380    #[tokio::test]
1381    async fn test_health_check_task_lifecycle() {
1382        let config = SubmitBroadcasterConfig {
1383            pool_size: 2,
1384            api_key: Some("test_key".to_string()),
1385            api_secret: Some("test_secret".to_string()),
1386            base_url: Some("https://test.example.com".to_string()),
1387            health_check_interval_secs: 1,
1388            ..Default::default()
1389        };
1390
1391        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1392
1393        // Start should spawn health check task
1394        broadcaster.start().await.unwrap();
1395        assert!(broadcaster.running.load(Ordering::Relaxed));
1396        assert!(
1397            broadcaster
1398                .health_check_task
1399                .read()
1400                .await
1401                .as_ref()
1402                .is_some()
1403        );
1404
1405        // Wait a bit to let health checks run
1406        tokio::time::sleep(Duration::from_millis(100)).await;
1407
1408        // Stop should clean up task
1409        broadcaster.stop().await;
1410        assert!(!broadcaster.running.load(Ordering::Relaxed));
1411    }
1412
1413    #[tokio::test]
1414    async fn test_expected_reject_pattern_comprehensive() {
1415        let transports = vec![
1416            create_stub_transport("client-0", || async {
1417                anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1418            }),
1419            create_stub_transport("client-1", || async {
1420                tokio::time::sleep(Duration::from_secs(10)).await;
1421                anyhow::bail!("Should be aborted")
1422            }),
1423        ];
1424
1425        let config = SubmitBroadcasterConfig::default();
1426        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1427
1428        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1429        let result = broadcaster
1430            .broadcast_submit(
1431                instrument_id,
1432                ClientOrderId::from("O-123"),
1433                OrderSide::Buy,
1434                OrderType::Limit,
1435                Quantity::new(100.0, 0),
1436                TimeInForce::Gtc,
1437                Some(Price::new(50000.0, 2)),
1438                None,
1439                None,
1440                None,
1441                false,
1442                false,
1443                None,
1444                None,
1445                None,
1446            )
1447            .await;
1448
1449        // All failed with expected reject
1450        assert!(result.is_err());
1451
1452        let metrics = broadcaster.get_metrics_async().await;
1453        assert_eq!(metrics.expected_rejects, 1);
1454        assert_eq!(metrics.failed_submits, 1);
1455        assert_eq!(metrics.successful_submits, 0);
1456    }
1457
1458    #[tokio::test]
1459    async fn test_client_order_id_suffix_for_multiple_clients() {
1460        use std::sync::{Arc, Mutex};
1461
1462        #[derive(Clone)]
1463        struct CaptureExecutor {
1464            captured_ids: Arc<Mutex<Vec<String>>>,
1465            report: OrderStatusReport,
1466        }
1467
1468        impl SubmitExecutor for CaptureExecutor {
1469            fn health_check(
1470                &self,
1471            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1472                Box::pin(async { Ok(()) })
1473            }
1474
1475            #[allow(clippy::too_many_arguments)]
1476            fn submit_order(
1477                &self,
1478                _instrument_id: InstrumentId,
1479                client_order_id: ClientOrderId,
1480                _order_side: OrderSide,
1481                _order_type: OrderType,
1482                _quantity: Quantity,
1483                _time_in_force: TimeInForce,
1484                _price: Option<Price>,
1485                _trigger_price: Option<Price>,
1486                _trigger_type: Option<TriggerType>,
1487                _display_qty: Option<Quantity>,
1488                _post_only: bool,
1489                _reduce_only: bool,
1490                _order_list_id: Option<OrderListId>,
1491                _contingency_type: Option<ContingencyType>,
1492            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1493            {
1494                // Capture the client_order_id
1495                self.captured_ids
1496                    .lock()
1497                    .unwrap()
1498                    .push(client_order_id.as_str().to_string());
1499                let report = self.report.clone();
1500                Box::pin(async move { Ok(report) })
1501            }
1502
1503            fn add_instrument(&self, _instrument: InstrumentAny) {}
1504        }
1505
1506        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1507        let report = create_test_report("ORDER-1");
1508
1509        let transports = vec![
1510            TransportClient::new(
1511                CaptureExecutor {
1512                    captured_ids: Arc::clone(&captured_ids),
1513                    report: report.clone(),
1514                },
1515                "client-0".to_string(),
1516            ),
1517            TransportClient::new(
1518                CaptureExecutor {
1519                    captured_ids: Arc::clone(&captured_ids),
1520                    report: report.clone(),
1521                },
1522                "client-1".to_string(),
1523            ),
1524            TransportClient::new(
1525                CaptureExecutor {
1526                    captured_ids: Arc::clone(&captured_ids),
1527                    report: report.clone(),
1528                },
1529                "client-2".to_string(),
1530            ),
1531        ];
1532
1533        let config = SubmitBroadcasterConfig::default();
1534        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1535
1536        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1537        let result = broadcaster
1538            .broadcast_submit(
1539                instrument_id,
1540                ClientOrderId::from("O-123"),
1541                OrderSide::Buy,
1542                OrderType::Limit,
1543                Quantity::new(100.0, 0),
1544                TimeInForce::Gtc,
1545                Some(Price::new(50000.0, 2)),
1546                None,
1547                None,
1548                None,
1549                false,
1550                false,
1551                None,
1552                None,
1553                None,
1554            )
1555            .await;
1556
1557        assert!(result.is_ok());
1558
1559        // Check captured client_order_ids
1560        let ids = captured_ids.lock().unwrap();
1561        assert_eq!(ids.len(), 3);
1562        assert_eq!(ids[0], "O-123"); // First client gets original ID
1563        assert_eq!(ids[1], "O-123-1"); // Second client gets suffix -1
1564        assert_eq!(ids[2], "O-123-2"); // Third client gets suffix -2
1565    }
1566
1567    #[tokio::test]
1568    async fn test_client_order_id_suffix_with_partial_failure() {
1569        use std::sync::{Arc, Mutex};
1570
1571        #[derive(Clone)]
1572        struct CaptureAndFailExecutor {
1573            captured_ids: Arc<Mutex<Vec<String>>>,
1574            should_succeed: bool,
1575        }
1576
1577        impl SubmitExecutor for CaptureAndFailExecutor {
1578            fn health_check(
1579                &self,
1580            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1581                Box::pin(async { Ok(()) })
1582            }
1583
1584            #[allow(clippy::too_many_arguments)]
1585            fn submit_order(
1586                &self,
1587                _instrument_id: InstrumentId,
1588                client_order_id: ClientOrderId,
1589                _order_side: OrderSide,
1590                _order_type: OrderType,
1591                _quantity: Quantity,
1592                _time_in_force: TimeInForce,
1593                _price: Option<Price>,
1594                _trigger_price: Option<Price>,
1595                _trigger_type: Option<TriggerType>,
1596                _display_qty: Option<Quantity>,
1597                _post_only: bool,
1598                _reduce_only: bool,
1599                _order_list_id: Option<OrderListId>,
1600                _contingency_type: Option<ContingencyType>,
1601            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1602            {
1603                // Capture the client_order_id
1604                self.captured_ids
1605                    .lock()
1606                    .unwrap()
1607                    .push(client_order_id.as_str().to_string());
1608                let should_succeed = self.should_succeed;
1609                Box::pin(async move {
1610                    if should_succeed {
1611                        Ok(create_test_report("ORDER-1"))
1612                    } else {
1613                        anyhow::bail!("Network error")
1614                    }
1615                })
1616            }
1617
1618            fn add_instrument(&self, _instrument: InstrumentAny) {}
1619        }
1620
1621        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1622
1623        let transports = vec![
1624            TransportClient::new(
1625                CaptureAndFailExecutor {
1626                    captured_ids: Arc::clone(&captured_ids),
1627                    should_succeed: false,
1628                },
1629                "client-0".to_string(),
1630            ),
1631            TransportClient::new(
1632                CaptureAndFailExecutor {
1633                    captured_ids: Arc::clone(&captured_ids),
1634                    should_succeed: true,
1635                },
1636                "client-1".to_string(),
1637            ),
1638        ];
1639
1640        let config = SubmitBroadcasterConfig::default();
1641        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1642
1643        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1644        let result = broadcaster
1645            .broadcast_submit(
1646                instrument_id,
1647                ClientOrderId::from("O-456"),
1648                OrderSide::Sell,
1649                OrderType::Market,
1650                Quantity::new(50.0, 0),
1651                TimeInForce::Ioc,
1652                None,
1653                None,
1654                None,
1655                None,
1656                false,
1657                false,
1658                None,
1659                None,
1660                None,
1661            )
1662            .await;
1663
1664        assert!(result.is_ok());
1665
1666        // Check that both clients received unique client_order_ids
1667        let ids = captured_ids.lock().unwrap();
1668        assert_eq!(ids.len(), 2);
1669        assert_eq!(ids[0], "O-456"); // First client gets original ID
1670        assert_eq!(ids[1], "O-456-1"); // Second client gets suffix -1
1671    }
1672
1673    #[tokio::test]
1674    async fn test_proxy_urls_populated_from_config() {
1675        let config = SubmitBroadcasterConfig {
1676            pool_size: 3,
1677            api_key: Some("test_key".to_string()),
1678            api_secret: Some("test_secret".to_string()),
1679            proxy_urls: vec![
1680                Some("http://proxy1:8080".to_string()),
1681                Some("http://proxy2:8080".to_string()),
1682                Some("http://proxy3:8080".to_string()),
1683            ],
1684            ..Default::default()
1685        };
1686
1687        assert_eq!(config.proxy_urls.len(), 3);
1688        assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1689        assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1690        assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1691    }
1692}