nautilus_bitmex/execution/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Submit request broadcaster for redundant order submission.
17//!
18//! This module provides the [`SubmitBroadcaster`] which fans out submit requests
19//! to multiple HTTP clients in parallel for redundancy. The broadcaster is triggered
20//! when the `SubmitOrder` command contains `params["broadcast_submit_tries"]`.
21//!
22//! Key design patterns:
23//!
24//! - **Dependency injection via traits**: Uses `SubmitExecutor` trait to abstract
25//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
26//! - **Trait objects over generics**: Uses `Arc<dyn SubmitExecutor>` to avoid
27//!   generic type parameters on the public API (simpler Python FFI).
28//! - **Short-circuit on first success**: Aborts remaining requests once any client
29//!   succeeds, minimizing latency.
30//! - **Idempotent rejection handling**: Recognizes duplicate clOrdID as expected
31//!   rejections for debug-level logging without noise.
32
33// TODO: Replace boxed futures in `SubmitExecutor` once stable async trait object support
34// lands so we can drop the per-call heap allocation
35
36use std::{
37    fmt::Debug,
38    future::Future,
39    pin::Pin,
40    sync::{
41        Arc,
42        atomic::{AtomicBool, AtomicU64, Ordering},
43    },
44    time::Duration,
45};
46
47use futures_util::future;
48use nautilus_model::{
49    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
50    identifiers::{ClientOrderId, InstrumentId, OrderListId},
51    instruments::InstrumentAny,
52    reports::OrderStatusReport,
53    types::{Price, Quantity},
54};
55use tokio::{sync::RwLock, task::JoinHandle, time::interval};
56
57use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
58
/// Trait for order submission operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn SubmitExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
///
/// The async methods return boxed futures rather than being declared `async fn` so the
/// trait can be used as a trait object (`Arc<dyn SubmitExecutor>`).
trait SubmitExecutor: Send + Sync {
    /// Adds an instrument for caching.
    ///
    /// `SubmitBroadcaster::cache_instrument` calls this once for every executor in the pool.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    ///
    /// The returned future borrows `self` (the `'_` lifetime) and resolves to `Ok(())` when
    /// the check passes. `TransportClient::health_check` marks the client unhealthy when the
    /// future resolves to `Err` or does not complete within its timeout.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Submits a single order.
    ///
    /// The returned future borrows `self` (the `'_` lifetime) and resolves to an
    /// [`OrderStatusReport`] on success, or to an error describing why the submission
    /// failed. The broadcaster matches the error's display text against its configured
    /// expected-reject patterns.
    #[allow(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
}
107
108impl SubmitExecutor for BitmexHttpClient {
109    fn add_instrument(&self, instrument: InstrumentAny) {
110        Self::cache_instrument(self, instrument);
111    }
112
113    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
114        Box::pin(async move {
115            Self::get_server_time(self)
116                .await
117                .map(|_| ())
118                .map_err(|e| anyhow::anyhow!("{e}"))
119        })
120    }
121
122    #[allow(clippy::too_many_arguments)]
123    fn submit_order(
124        &self,
125        instrument_id: InstrumentId,
126        client_order_id: ClientOrderId,
127        order_side: OrderSide,
128        order_type: OrderType,
129        quantity: Quantity,
130        time_in_force: TimeInForce,
131        price: Option<Price>,
132        trigger_price: Option<Price>,
133        trigger_type: Option<TriggerType>,
134        display_qty: Option<Quantity>,
135        post_only: bool,
136        reduce_only: bool,
137        order_list_id: Option<OrderListId>,
138        contingency_type: Option<ContingencyType>,
139    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
140        Box::pin(async move {
141            Self::submit_order(
142                self,
143                instrument_id,
144                client_order_id,
145                order_side,
146                order_type,
147                quantity,
148                time_in_force,
149                price,
150                trigger_price,
151                trigger_type,
152                display_qty,
153                post_only,
154                reduce_only,
155                order_list_id,
156                contingency_type,
157            )
158            .await
159        })
160    }
161}
162
/// Configuration for the submit broadcaster.
///
/// Most fields are passed straight through to `BitmexHttpClient::with_credentials` for
/// every client in the pool; the health check and reject pattern fields are consumed by
/// the broadcaster itself.
#[derive(Debug, Clone)]
pub struct SubmitBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    ///
    /// Also the upper bound on how many clients a single broadcast will use.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    ///
    /// When `None` and `testnet` is true, the broadcaster substitutes the testnet URL.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    ///
    /// Only affects URL selection when `base_url` is `None`.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected submit rejections for debug-level logging.
    ///
    /// Matching is a plain substring test against the error's display text.
    pub expected_reject_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
    /// are ignored. A `None` entry likewise means no proxy for that index.
    pub proxy_urls: Vec<Option<String>>,
}
203
204impl Default for SubmitBroadcasterConfig {
205    fn default() -> Self {
206        Self {
207            pool_size: 3,
208            api_key: None,
209            api_secret: None,
210            base_url: None,
211            testnet: false,
212            timeout_secs: Some(60),
213            max_retries: None,
214            retry_delay_ms: Some(1_000),
215            retry_delay_max_ms: Some(5_000),
216            recv_window_ms: Some(10_000),
217            max_requests_per_second: Some(10),
218            max_requests_per_minute: Some(120),
219            health_check_interval_secs: 30,
220            health_check_timeout_secs: 5,
221            expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
222            proxy_urls: vec![],
223        }
224    }
225}
226
/// Transport client wrapper with health monitoring.
///
/// Every field is either an `Arc` or a plain `String`, so cloning produces a handle that
/// shares the executor, the health flag, and both counters with the original.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in Arc to enable cloning without requiring Clone on SubmitExecutor.
    ///
    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
    /// the executor across multiple TransportClient clones.
    executor: Arc<dyn SubmitExecutor>,
    /// Label identifying this client in log messages and per-client statistics.
    client_id: String,
    /// Health flag; set by health checks and by successful submits, cleared by failed
    /// or timed-out health checks.
    healthy: Arc<AtomicBool>,
    /// Number of submit attempts made through this client (successful or not).
    submit_count: Arc<AtomicU64>,
    /// Number of submit attempts through this client that returned an error.
    error_count: Arc<AtomicU64>,
}
240
241impl Debug for TransportClient {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        f.debug_struct("TransportClient")
244            .field("client_id", &self.client_id)
245            .field("healthy", &self.healthy)
246            .field("submit_count", &self.submit_count)
247            .field("error_count", &self.error_count)
248            .finish()
249    }
250}
251
252impl TransportClient {
253    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
254        Self {
255            executor: Arc::new(executor),
256            client_id,
257            healthy: Arc::new(AtomicBool::new(true)),
258            submit_count: Arc::new(AtomicU64::new(0)),
259            error_count: Arc::new(AtomicU64::new(0)),
260        }
261    }
262
263    fn is_healthy(&self) -> bool {
264        self.healthy.load(Ordering::Relaxed)
265    }
266
267    fn mark_healthy(&self) {
268        self.healthy.store(true, Ordering::Relaxed);
269    }
270
271    fn mark_unhealthy(&self) {
272        self.healthy.store(false, Ordering::Relaxed);
273    }
274
275    fn get_submit_count(&self) -> u64 {
276        self.submit_count.load(Ordering::Relaxed)
277    }
278
279    fn get_error_count(&self) -> u64 {
280        self.error_count.load(Ordering::Relaxed)
281    }
282
283    async fn health_check(&self, timeout_secs: u64) -> bool {
284        match tokio::time::timeout(
285            Duration::from_secs(timeout_secs),
286            self.executor.health_check(),
287        )
288        .await
289        {
290            Ok(Ok(_)) => {
291                self.mark_healthy();
292                true
293            }
294            Ok(Err(e)) => {
295                tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
296                self.mark_unhealthy();
297                false
298            }
299            Err(_) => {
300                tracing::warn!("Health check timeout for client {}", self.client_id);
301                self.mark_unhealthy();
302                false
303            }
304        }
305    }
306
307    #[allow(clippy::too_many_arguments)]
308    async fn submit_order(
309        &self,
310        instrument_id: InstrumentId,
311        client_order_id: ClientOrderId,
312        order_side: OrderSide,
313        order_type: OrderType,
314        quantity: Quantity,
315        time_in_force: TimeInForce,
316        price: Option<Price>,
317        trigger_price: Option<Price>,
318        trigger_type: Option<TriggerType>,
319        display_qty: Option<Quantity>,
320        post_only: bool,
321        reduce_only: bool,
322        order_list_id: Option<OrderListId>,
323        contingency_type: Option<ContingencyType>,
324    ) -> anyhow::Result<OrderStatusReport> {
325        self.submit_count.fetch_add(1, Ordering::Relaxed);
326
327        match self
328            .executor
329            .submit_order(
330                instrument_id,
331                client_order_id,
332                order_side,
333                order_type,
334                quantity,
335                time_in_force,
336                price,
337                trigger_price,
338                trigger_type,
339                display_qty,
340                post_only,
341                reduce_only,
342                order_list_id,
343                contingency_type,
344            )
345            .await
346        {
347            Ok(report) => {
348                self.mark_healthy();
349                Ok(report)
350            }
351            Err(e) => {
352                self.error_count.fetch_add(1, Ordering::Relaxed);
353                Err(e)
354            }
355        }
356    }
357}
358
/// Broadcasts submit requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
///
/// All mutable state lives behind `Arc`s so that `clone_for_async` can hand out
/// additional handles that share the same pool, counters, and health check task.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Debug)]
pub struct SubmitBroadcaster {
    /// Configuration the broadcaster was created with.
    config: SubmitBroadcasterConfig,
    /// Pool of transport clients that requests are fanned out to.
    transports: Arc<Vec<TransportClient>>,
    /// Handle of the periodic health check task; `Some` between `start` and `stop`.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Whether the broadcaster has been started and not yet stopped.
    running: Arc<AtomicBool>,
    /// Number of broadcast submits requested.
    total_submits: Arc<AtomicU64>,
    /// Number of broadcasts in which at least one client succeeded.
    successful_submits: Arc<AtomicU64>,
    /// Number of broadcasts in which every client failed or no healthy client existed.
    failed_submits: Arc<AtomicU64>,
    /// Number of individual client rejections that matched an expected-reject pattern.
    expected_rejects: Arc<AtomicU64>,
}
376
377impl SubmitBroadcaster {
378    /// Creates a new [`SubmitBroadcaster`] with internal HTTP client pool.
379    ///
380    /// # Errors
381    ///
382    /// Returns an error if any HTTP client fails to initialize.
383    pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
384        let mut transports = Vec::with_capacity(config.pool_size);
385
386        // Synthesize base_url when testnet is true but base_url is None
387        let base_url = if config.testnet && config.base_url.is_none() {
388            Some(BITMEX_HTTP_TESTNET_URL.to_string())
389        } else {
390            config.base_url.clone()
391        };
392
393        for i in 0..config.pool_size {
394            // Assign proxy from config list, or None if index exceeds list length
395            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
396
397            let client = BitmexHttpClient::with_credentials(
398                config.api_key.clone(),
399                config.api_secret.clone(),
400                base_url.clone(),
401                config.timeout_secs,
402                config.max_retries,
403                config.retry_delay_ms,
404                config.retry_delay_max_ms,
405                config.recv_window_ms,
406                config.max_requests_per_second,
407                config.max_requests_per_minute,
408                proxy_url,
409            )
410            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
411
412            transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
413        }
414
415        Ok(Self {
416            config,
417            transports: Arc::new(transports),
418            health_check_task: Arc::new(RwLock::new(None)),
419            running: Arc::new(AtomicBool::new(false)),
420            total_submits: Arc::new(AtomicU64::new(0)),
421            successful_submits: Arc::new(AtomicU64::new(0)),
422            failed_submits: Arc::new(AtomicU64::new(0)),
423            expected_rejects: Arc::new(AtomicU64::new(0)),
424        })
425    }
426
427    /// Starts the broadcaster and health check loop.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the broadcaster is already running.
432    pub async fn start(&self) -> anyhow::Result<()> {
433        if self.running.load(Ordering::Relaxed) {
434            return Ok(());
435        }
436
437        self.running.store(true, Ordering::Relaxed);
438
439        // Initial health check for all clients
440        self.run_health_checks().await;
441
442        // Start periodic health check task
443        let transports = Arc::clone(&self.transports);
444        let running = Arc::clone(&self.running);
445        let interval_secs = self.config.health_check_interval_secs;
446        let timeout_secs = self.config.health_check_timeout_secs;
447
448        let task = tokio::spawn(async move {
449            let mut ticker = interval(Duration::from_secs(interval_secs));
450            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
451
452            loop {
453                ticker.tick().await;
454
455                if !running.load(Ordering::Relaxed) {
456                    break;
457                }
458
459                let tasks: Vec<_> = transports
460                    .iter()
461                    .map(|t| t.health_check(timeout_secs))
462                    .collect();
463
464                let results = future::join_all(tasks).await;
465                let healthy_count = results.iter().filter(|&&r| r).count();
466
467                tracing::debug!(
468                    "Health check complete: {healthy_count}/{} clients healthy",
469                    results.len()
470                );
471            }
472        });
473
474        *self.health_check_task.write().await = Some(task);
475
476        tracing::info!(
477            "SubmitBroadcaster started with {} clients",
478            self.transports.len()
479        );
480
481        Ok(())
482    }
483
484    /// Stops the broadcaster and health check loop.
485    pub async fn stop(&self) {
486        if !self.running.load(Ordering::Relaxed) {
487            return;
488        }
489
490        self.running.store(false, Ordering::Relaxed);
491
492        if let Some(task) = self.health_check_task.write().await.take() {
493            task.abort();
494        }
495
496        tracing::info!("SubmitBroadcaster stopped");
497    }
498
499    async fn run_health_checks(&self) {
500        let tasks: Vec<_> = self
501            .transports
502            .iter()
503            .map(|t| t.health_check(self.config.health_check_timeout_secs))
504            .collect();
505
506        let results = future::join_all(tasks).await;
507        let healthy_count = results.iter().filter(|&&r| r).count();
508
509        tracing::debug!(
510            "Health check complete: {healthy_count}/{} clients healthy",
511            results.len()
512        );
513    }
514
515    fn is_expected_reject(&self, error_message: &str) -> bool {
516        self.config
517            .expected_reject_patterns
518            .iter()
519            .any(|pattern| error_message.contains(pattern))
520    }
521
522    /// Processes submit request results, handling success and failures.
523    ///
524    /// This helper consolidates the common error handling loop used for submit broadcasts.
525    async fn process_submit_results<T>(
526        &self,
527        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
528        operation: &str,
529        params: String,
530    ) -> anyhow::Result<T>
531    where
532        T: Send + 'static,
533    {
534        let mut errors = Vec::new();
535
536        while !handles.is_empty() {
537            let current_handles = std::mem::take(&mut handles);
538            let (result, _idx, remaining) = future::select_all(current_handles).await;
539            handles = remaining.into_iter().collect();
540
541            match result {
542                Ok((client_id, Ok(result))) => {
543                    // First success - abort remaining handles
544                    for handle in &handles {
545                        handle.abort();
546                    }
547                    self.successful_submits.fetch_add(1, Ordering::Relaxed);
548                    tracing::debug!("{} broadcast succeeded [{client_id}] {params}", operation,);
549                    return Ok(result);
550                }
551                Ok((client_id, Err(e))) => {
552                    let error_msg = e.to_string();
553
554                    if self.is_expected_reject(&error_msg) {
555                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
556                        tracing::debug!(
557                            "Expected {} rejection [{client_id}]: {error_msg} {params}",
558                            operation.to_lowercase(),
559                        );
560                        errors.push(error_msg);
561                    } else {
562                        tracing::warn!(
563                            "{} request failed [{client_id}]: {error_msg} {params}",
564                            operation,
565                        );
566                        errors.push(error_msg);
567                    }
568                }
569                Err(e) => {
570                    tracing::warn!("{} task join error: {e:?}", operation);
571                    errors.push(format!("Task panicked: {e:?}"));
572                }
573            }
574        }
575
576        // All tasks failed
577        self.failed_submits.fetch_add(1, Ordering::Relaxed);
578        tracing::error!(
579            "All {} requests failed: {errors:?} {params}",
580            operation.to_lowercase(),
581        );
582        Err(anyhow::anyhow!(
583            "All {} requests failed: {:?}",
584            operation.to_lowercase(),
585            errors
586        ))
587    }
588
589    /// Broadcasts a submit request to all healthy clients in parallel.
590    ///
591    /// # Returns
592    ///
593    /// - `Ok(report)` if successfully submitted with a report.
594    /// - `Err` if all requests failed.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if all submit requests fail or no healthy clients are available.
599    #[allow(clippy::too_many_arguments)]
600    pub async fn broadcast_submit(
601        &self,
602        instrument_id: InstrumentId,
603        client_order_id: ClientOrderId,
604        order_side: OrderSide,
605        order_type: OrderType,
606        quantity: Quantity,
607        time_in_force: TimeInForce,
608        price: Option<Price>,
609        trigger_price: Option<Price>,
610        trigger_type: Option<TriggerType>,
611        display_qty: Option<Quantity>,
612        post_only: bool,
613        reduce_only: bool,
614        order_list_id: Option<OrderListId>,
615        contingency_type: Option<ContingencyType>,
616        submit_tries: Option<usize>,
617    ) -> anyhow::Result<OrderStatusReport> {
618        self.total_submits.fetch_add(1, Ordering::Relaxed);
619
620        let pool_size = self.config.pool_size;
621        let actual_tries = if let Some(t) = submit_tries {
622            if t > pool_size {
623                // Use log macro for Python visibility for now
624                log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
625            }
626            std::cmp::min(t, pool_size)
627        } else {
628            pool_size
629        };
630
631        tracing::debug!(
632            "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
633        );
634
635        let healthy_transports: Vec<TransportClient> = self
636            .transports
637            .iter()
638            .filter(|t| t.is_healthy())
639            .take(actual_tries)
640            .cloned()
641            .collect();
642
643        if healthy_transports.is_empty() {
644            self.failed_submits.fetch_add(1, Ordering::Relaxed);
645            anyhow::bail!("No healthy transport clients available");
646        }
647
648        tracing::debug!(
649            "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
650            healthy_transports.len(),
651        );
652
653        let mut handles = Vec::new();
654        for (idx, transport) in healthy_transports.into_iter().enumerate() {
655            // First client uses original ID, subsequent clients get suffix to avoid duplicates
656            let modified_client_order_id = if idx == 0 {
657                client_order_id
658            } else {
659                ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
660            };
661
662            let handle = tokio::spawn(async move {
663                let client_id = transport.client_id.clone();
664                let result = transport
665                    .submit_order(
666                        instrument_id,
667                        modified_client_order_id,
668                        order_side,
669                        order_type,
670                        quantity,
671                        time_in_force,
672                        price,
673                        trigger_price,
674                        trigger_type,
675                        display_qty,
676                        post_only,
677                        reduce_only,
678                        order_list_id,
679                        contingency_type,
680                    )
681                    .await;
682                (client_id, result)
683            });
684            handles.push(handle);
685        }
686
687        self.process_submit_results(
688            handles,
689            "Submit",
690            format!("(client_order_id={client_order_id:?})"),
691        )
692        .await
693    }
694
695    /// Gets broadcaster metrics.
696    pub fn get_metrics(&self) -> BroadcasterMetrics {
697        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
698        let total_clients = self.transports.len();
699
700        BroadcasterMetrics {
701            total_submits: self.total_submits.load(Ordering::Relaxed),
702            successful_submits: self.successful_submits.load(Ordering::Relaxed),
703            failed_submits: self.failed_submits.load(Ordering::Relaxed),
704            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
705            healthy_clients,
706            total_clients,
707        }
708    }
709
710    /// Gets broadcaster metrics (async version for use within async context).
711    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
712        self.get_metrics()
713    }
714
715    /// Gets per-client statistics.
716    pub fn get_client_stats(&self) -> Vec<ClientStats> {
717        self.transports
718            .iter()
719            .map(|t| ClientStats {
720                client_id: t.client_id.clone(),
721                healthy: t.is_healthy(),
722                submit_count: t.get_submit_count(),
723                error_count: t.get_error_count(),
724            })
725            .collect()
726    }
727
728    /// Gets per-client statistics (async version for use within async context).
729    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
730        self.get_client_stats()
731    }
732
733    /// Caches an instrument in all HTTP clients in the pool.
734    pub fn cache_instrument(&self, instrument: InstrumentAny) {
735        for transport in self.transports.iter() {
736            transport.executor.add_instrument(instrument.clone());
737        }
738    }
739
740    #[must_use]
741    pub fn clone_for_async(&self) -> Self {
742        Self {
743            config: self.config.clone(),
744            transports: Arc::clone(&self.transports),
745            health_check_task: Arc::clone(&self.health_check_task),
746            running: Arc::clone(&self.running),
747            total_submits: Arc::clone(&self.total_submits),
748            successful_submits: Arc::clone(&self.successful_submits),
749            failed_submits: Arc::clone(&self.failed_submits),
750            expected_rejects: Arc::clone(&self.expected_rejects),
751        }
752    }
753
754    #[cfg(test)]
755    fn new_with_transports(
756        config: SubmitBroadcasterConfig,
757        transports: Vec<TransportClient>,
758    ) -> Self {
759        Self {
760            config,
761            transports: Arc::new(transports),
762            health_check_task: Arc::new(RwLock::new(None)),
763            running: Arc::new(AtomicBool::new(false)),
764            total_submits: Arc::new(AtomicU64::new(0)),
765            successful_submits: Arc::new(AtomicU64::new(0)),
766            failed_submits: Arc::new(AtomicU64::new(0)),
767            expected_rejects: Arc::new(AtomicU64::new(0)),
768        }
769    }
770}
771
/// Broadcaster metrics snapshot.
///
/// A plain value type produced by `SubmitBroadcaster::get_metrics`; it does not update
/// after it has been created. Snapshots can be compared with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BroadcasterMetrics {
    /// Number of broadcast submits requested.
    pub total_submits: u64,
    /// Number of broadcasts in which at least one client succeeded.
    pub successful_submits: u64,
    /// Number of broadcasts in which every client failed or no healthy client existed.
    pub failed_submits: u64,
    /// Number of individual client rejections that matched an expected-reject pattern.
    pub expected_rejects: u64,
    /// Number of clients whose health flag was set when the snapshot was taken.
    pub healthy_clients: usize,
    /// Total number of clients in the pool.
    pub total_clients: usize,
}
782
/// Per-client statistics.
///
/// A plain value type produced by `SubmitBroadcaster::get_client_stats`, one per pooled
/// client. Values can be compared with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientStats {
    /// Label of the client these statistics belong to.
    pub client_id: String,
    /// Whether the client's health flag was set when the statistics were read.
    pub healthy: bool,
    /// Number of submit attempts made through the client (successful or not).
    pub submit_count: u64,
    /// Number of submit attempts through the client that returned an error.
    pub error_count: u64,
}
791
792#[cfg(test)]
793mod tests {
794    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
795
796    use nautilus_core::UUID4;
797    use nautilus_model::{
798        enums::{
799            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
800        },
801        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
802        reports::OrderStatusReport,
803        types::{Price, Quantity},
804    };
805
806    use super::*;
807
    /// Mock executor for testing.
    ///
    /// Every `submit_order` call is answered by invoking `handler`, which produces a fresh
    /// future each time; the order arguments are ignored. The handler sits behind an `Arc`
    /// so the mock can derive `Clone`.
    #[derive(Clone)]
    #[allow(clippy::type_complexity)]
    struct MockExecutor {
        /// Factory for the future returned from `submit_order`.
        handler: Arc<
            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
                + Send
                + Sync,
        >,
    }
818
819    impl MockExecutor {
820        fn new<F, Fut>(handler: F) -> Self
821        where
822            F: Fn() -> Fut + Send + Sync + 'static,
823            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
824        {
825            Self {
826                handler: Arc::new(move || Box::pin(handler())),
827            }
828        }
829    }
830
831    impl SubmitExecutor for MockExecutor {
832        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
833            Box::pin(async { Ok(()) })
834        }
835
836        #[allow(clippy::too_many_arguments)]
837        fn submit_order(
838            &self,
839            _instrument_id: InstrumentId,
840            _client_order_id: ClientOrderId,
841            _order_side: OrderSide,
842            _order_type: OrderType,
843            _quantity: Quantity,
844            _time_in_force: TimeInForce,
845            _price: Option<Price>,
846            _trigger_price: Option<Price>,
847            _trigger_type: Option<TriggerType>,
848            _display_qty: Option<Quantity>,
849            _post_only: bool,
850            _reduce_only: bool,
851            _order_list_id: Option<OrderListId>,
852            _contingency_type: Option<ContingencyType>,
853        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
854            (self.handler)()
855        }
856
857        fn add_instrument(&self, _instrument: InstrumentAny) {
858            // No-op for mock
859        }
860    }
861
862    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
863        OrderStatusReport {
864            account_id: AccountId::from("BITMEX-001"),
865            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
866            venue_order_id: VenueOrderId::from(venue_order_id),
867            order_side: OrderSide::Buy,
868            order_type: OrderType::Limit,
869            time_in_force: TimeInForce::Gtc,
870            order_status: OrderStatus::Accepted,
871            price: Some(Price::new(50000.0, 2)),
872            quantity: Quantity::new(100.0, 0),
873            filled_qty: Quantity::new(0.0, 0),
874            report_id: UUID4::new(),
875            ts_accepted: 0.into(),
876            ts_last: 0.into(),
877            ts_init: 0.into(),
878            client_order_id: None,
879            avg_px: None,
880            trigger_price: None,
881            trigger_type: None,
882            contingency_type: ContingencyType::NoContingency,
883            expire_time: None,
884            order_list_id: None,
885            venue_position_id: None,
886            linked_order_ids: None,
887            parent_order_id: None,
888            display_qty: None,
889            limit_offset: None,
890            trailing_offset: None,
891            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
892            post_only: false,
893            reduce_only: false,
894            cancel_reason: None,
895            ts_triggered: None,
896        }
897    }
898
899    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
900    where
901        F: Fn() -> Fut + Send + Sync + 'static,
902        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
903    {
904        let executor = MockExecutor::new(handler);
905        TransportClient::new(executor, client_id.to_string())
906    }
907
908    #[tokio::test]
909    async fn test_broadcast_submit_immediate_success() {
910        let report = create_test_report("ORDER-1");
911        let report_clone = report.clone();
912
913        let transports = vec![
914            create_stub_transport("client-0", move || {
915                let report = report_clone.clone();
916                async move { Ok(report) }
917            }),
918            create_stub_transport("client-1", || async {
919                tokio::time::sleep(Duration::from_secs(10)).await;
920                anyhow::bail!("Should be aborted")
921            }),
922        ];
923
924        let config = SubmitBroadcasterConfig::default();
925        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
926
927        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
928        let result = broadcaster
929            .broadcast_submit(
930                instrument_id,
931                ClientOrderId::from("O-123"),
932                OrderSide::Buy,
933                OrderType::Limit,
934                Quantity::new(100.0, 0),
935                TimeInForce::Gtc,
936                Some(Price::new(50000.0, 2)),
937                None,
938                None,
939                None,
940                false,
941                false,
942                None,
943                None,
944                None,
945            )
946            .await;
947
948        assert!(result.is_ok());
949        let returned_report = result.unwrap();
950        assert_eq!(returned_report.venue_order_id, report.venue_order_id);
951
952        let metrics = broadcaster.get_metrics_async().await;
953        assert_eq!(metrics.successful_submits, 1);
954        assert_eq!(metrics.failed_submits, 0);
955        assert_eq!(metrics.total_submits, 1);
956    }
957
958    #[tokio::test]
959    async fn test_broadcast_submit_duplicate_clordid_expected() {
960        let transports = vec![
961            create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
962            create_stub_transport("client-1", || async {
963                tokio::time::sleep(Duration::from_secs(10)).await;
964                anyhow::bail!("Should be aborted")
965            }),
966        ];
967
968        let config = SubmitBroadcasterConfig::default();
969        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
970
971        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
972        let result = broadcaster
973            .broadcast_submit(
974                instrument_id,
975                ClientOrderId::from("O-123"),
976                OrderSide::Buy,
977                OrderType::Limit,
978                Quantity::new(100.0, 0),
979                TimeInForce::Gtc,
980                Some(Price::new(50000.0, 2)),
981                None,
982                None,
983                None,
984                false,
985                false,
986                None,
987                None,
988                None,
989            )
990            .await;
991
992        assert!(result.is_err());
993
994        let metrics = broadcaster.get_metrics_async().await;
995        assert_eq!(metrics.expected_rejects, 1);
996        assert_eq!(metrics.successful_submits, 0);
997        assert_eq!(metrics.failed_submits, 1);
998    }
999
1000    #[tokio::test]
1001    async fn test_broadcast_submit_all_failures() {
1002        let transports = vec![
1003            create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1004            create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1005        ];
1006
1007        let config = SubmitBroadcasterConfig::default();
1008        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1009
1010        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1011        let result = broadcaster
1012            .broadcast_submit(
1013                instrument_id,
1014                ClientOrderId::from("O-456"),
1015                OrderSide::Sell,
1016                OrderType::Market,
1017                Quantity::new(50.0, 0),
1018                TimeInForce::Ioc,
1019                None,
1020                None,
1021                None,
1022                None,
1023                false,
1024                false,
1025                None,
1026                None,
1027                None,
1028            )
1029            .await;
1030
1031        assert!(result.is_err());
1032        assert!(
1033            result
1034                .unwrap_err()
1035                .to_string()
1036                .contains("All submit requests failed")
1037        );
1038
1039        let metrics = broadcaster.get_metrics_async().await;
1040        assert_eq!(metrics.failed_submits, 1);
1041        assert_eq!(metrics.successful_submits, 0);
1042    }
1043
1044    #[tokio::test]
1045    async fn test_broadcast_submit_no_healthy_clients() {
1046        let transport =
1047            create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1048        transport.healthy.store(false, Ordering::Relaxed);
1049
1050        let config = SubmitBroadcasterConfig::default();
1051        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1052
1053        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1054        let result = broadcaster
1055            .broadcast_submit(
1056                instrument_id,
1057                ClientOrderId::from("O-789"),
1058                OrderSide::Buy,
1059                OrderType::Limit,
1060                Quantity::new(100.0, 0),
1061                TimeInForce::Gtc,
1062                Some(Price::new(50000.0, 2)),
1063                None,
1064                None,
1065                None,
1066                false,
1067                false,
1068                None,
1069                None,
1070                None,
1071            )
1072            .await;
1073
1074        assert!(result.is_err());
1075        assert!(
1076            result
1077                .unwrap_err()
1078                .to_string()
1079                .contains("No healthy transport clients available")
1080        );
1081
1082        let metrics = broadcaster.get_metrics_async().await;
1083        assert_eq!(metrics.failed_submits, 1);
1084    }
1085
1086    #[tokio::test]
1087    async fn test_default_config() {
1088        let config = SubmitBroadcasterConfig {
1089            api_key: Some("test_key".to_string()),
1090            api_secret: Some("test_secret".to_string()),
1091            base_url: Some("https://test.example.com".to_string()),
1092            ..Default::default()
1093        };
1094
1095        let broadcaster = SubmitBroadcaster::new(config);
1096        assert!(broadcaster.is_ok());
1097
1098        let broadcaster = broadcaster.unwrap();
1099        let metrics = broadcaster.get_metrics_async().await;
1100
1101        // Default pool_size is 3
1102        assert_eq!(metrics.total_clients, 3);
1103    }
1104
1105    #[tokio::test]
1106    async fn test_broadcaster_lifecycle() {
1107        let config = SubmitBroadcasterConfig {
1108            pool_size: 2,
1109            api_key: Some("test_key".to_string()),
1110            api_secret: Some("test_secret".to_string()),
1111            base_url: Some("https://test.example.com".to_string()),
1112            testnet: false,
1113            timeout_secs: Some(5),
1114            max_retries: None,
1115            retry_delay_ms: None,
1116            retry_delay_max_ms: None,
1117            recv_window_ms: None,
1118            max_requests_per_second: None,
1119            max_requests_per_minute: None,
1120            health_check_interval_secs: 60,
1121            health_check_timeout_secs: 1,
1122            expected_reject_patterns: vec![],
1123            proxy_urls: vec![],
1124        };
1125
1126        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1127
1128        // Should not be running initially
1129        assert!(!broadcaster.running.load(Ordering::Relaxed));
1130
1131        // Start broadcaster
1132        let start_result = broadcaster.start().await;
1133        assert!(start_result.is_ok());
1134        assert!(broadcaster.running.load(Ordering::Relaxed));
1135
1136        // Starting again should be idempotent
1137        let start_again = broadcaster.start().await;
1138        assert!(start_again.is_ok());
1139
1140        // Stop broadcaster
1141        broadcaster.stop().await;
1142        assert!(!broadcaster.running.load(Ordering::Relaxed));
1143
1144        // Stopping again should be safe
1145        broadcaster.stop().await;
1146        assert!(!broadcaster.running.load(Ordering::Relaxed));
1147    }
1148
1149    #[tokio::test]
1150    async fn test_broadcast_submit_metrics_increment() {
1151        let report = create_test_report("ORDER-1");
1152        let report_clone = report.clone();
1153
1154        let transports = vec![create_stub_transport("client-0", move || {
1155            let report = report_clone.clone();
1156            async move { Ok(report) }
1157        })];
1158
1159        let config = SubmitBroadcasterConfig::default();
1160        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1161
1162        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1163        let _ = broadcaster
1164            .broadcast_submit(
1165                instrument_id,
1166                ClientOrderId::from("O-123"),
1167                OrderSide::Buy,
1168                OrderType::Limit,
1169                Quantity::new(100.0, 0),
1170                TimeInForce::Gtc,
1171                Some(Price::new(50000.0, 2)),
1172                None,
1173                None,
1174                None,
1175                false,
1176                false,
1177                None,
1178                None,
1179                None,
1180            )
1181            .await;
1182
1183        let metrics = broadcaster.get_metrics_async().await;
1184        assert_eq!(metrics.total_submits, 1);
1185        assert_eq!(metrics.successful_submits, 1);
1186        assert_eq!(metrics.failed_submits, 0);
1187    }
1188
1189    #[tokio::test]
1190    async fn test_broadcaster_creation_with_pool() {
1191        let config = SubmitBroadcasterConfig {
1192            pool_size: 4,
1193            api_key: Some("test_key".to_string()),
1194            api_secret: Some("test_secret".to_string()),
1195            base_url: Some("https://test.example.com".to_string()),
1196            ..Default::default()
1197        };
1198
1199        let broadcaster = SubmitBroadcaster::new(config);
1200        assert!(broadcaster.is_ok());
1201
1202        let broadcaster = broadcaster.unwrap();
1203        let metrics = broadcaster.get_metrics_async().await;
1204        assert_eq!(metrics.total_clients, 4);
1205    }
1206
1207    #[tokio::test]
1208    async fn test_client_stats_collection() {
1209        let report = create_test_report("ORDER-1");
1210        let report_clone = report.clone();
1211
1212        let transports = vec![
1213            create_stub_transport("client-0", move || {
1214                let report = report_clone.clone();
1215                async move { Ok(report) }
1216            }),
1217            create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1218        ];
1219
1220        let config = SubmitBroadcasterConfig::default();
1221        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1222
1223        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1224        let _ = broadcaster
1225            .broadcast_submit(
1226                instrument_id,
1227                ClientOrderId::from("O-123"),
1228                OrderSide::Buy,
1229                OrderType::Limit,
1230                Quantity::new(100.0, 0),
1231                TimeInForce::Gtc,
1232                Some(Price::new(50000.0, 2)),
1233                None,
1234                None,
1235                None,
1236                false,
1237                false,
1238                None,
1239                None,
1240                None,
1241            )
1242            .await;
1243
1244        let stats = broadcaster.get_client_stats_async().await;
1245        assert_eq!(stats.len(), 2);
1246
1247        let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1248        assert_eq!(client0.submit_count, 1);
1249        assert_eq!(client0.error_count, 0);
1250
1251        let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1252        assert_eq!(client1.submit_count, 1);
1253        assert_eq!(client1.error_count, 1);
1254    }
1255
1256    #[tokio::test]
1257    async fn test_testnet_config_sets_base_url() {
1258        let config = SubmitBroadcasterConfig {
1259            pool_size: 1,
1260            api_key: Some("test_key".to_string()),
1261            api_secret: Some("test_secret".to_string()),
1262            testnet: true,
1263            base_url: None,
1264            ..Default::default()
1265        };
1266
1267        let broadcaster = SubmitBroadcaster::new(config);
1268        assert!(broadcaster.is_ok());
1269    }
1270
1271    #[tokio::test]
1272    async fn test_clone_for_async() {
1273        let config = SubmitBroadcasterConfig {
1274            pool_size: 1,
1275            api_key: Some("test_key".to_string()),
1276            api_secret: Some("test_secret".to_string()),
1277            base_url: Some("https://test.example.com".to_string()),
1278            ..Default::default()
1279        };
1280
1281        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1282        let cloned = broadcaster.clone_for_async();
1283
1284        // Verify they share the same atomics
1285        broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1286        assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1287    }
1288
1289    #[tokio::test]
1290    async fn test_pattern_matching() {
1291        let config = SubmitBroadcasterConfig {
1292            expected_reject_patterns: vec![
1293                "Duplicate clOrdID".to_string(),
1294                "Order already exists".to_string(),
1295            ],
1296            ..Default::default()
1297        };
1298
1299        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1300
1301        assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1302        assert!(broadcaster.is_expected_reject("Order already exists in system"));
1303        assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1304        assert!(!broadcaster.is_expected_reject("Internal server error"));
1305    }
1306
1307    #[tokio::test]
1308    async fn test_submit_metrics_with_mixed_responses() {
1309        let report = create_test_report("ORDER-1");
1310        let report_clone = report.clone();
1311
1312        let transports = vec![
1313            create_stub_transport("client-0", move || {
1314                let report = report_clone.clone();
1315                async move { Ok(report) }
1316            }),
1317            create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1318        ];
1319
1320        let config = SubmitBroadcasterConfig::default();
1321        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1322
1323        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1324        let result = broadcaster
1325            .broadcast_submit(
1326                instrument_id,
1327                ClientOrderId::from("O-123"),
1328                OrderSide::Buy,
1329                OrderType::Limit,
1330                Quantity::new(100.0, 0),
1331                TimeInForce::Gtc,
1332                Some(Price::new(50000.0, 2)),
1333                None,
1334                None,
1335                None,
1336                false,
1337                false,
1338                None,
1339                None,
1340                None,
1341            )
1342            .await;
1343
1344        assert!(result.is_ok());
1345
1346        let metrics = broadcaster.get_metrics_async().await;
1347        assert_eq!(metrics.total_submits, 1);
1348        assert_eq!(metrics.successful_submits, 1);
1349        assert_eq!(metrics.failed_submits, 0);
1350    }
1351
1352    #[tokio::test]
1353    async fn test_metrics_initialization_and_health() {
1354        let config = SubmitBroadcasterConfig {
1355            pool_size: 2,
1356            api_key: Some("test_key".to_string()),
1357            api_secret: Some("test_secret".to_string()),
1358            base_url: Some("https://test.example.com".to_string()),
1359            ..Default::default()
1360        };
1361
1362        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1363        let metrics = broadcaster.get_metrics_async().await;
1364
1365        assert_eq!(metrics.total_submits, 0);
1366        assert_eq!(metrics.successful_submits, 0);
1367        assert_eq!(metrics.failed_submits, 0);
1368        assert_eq!(metrics.expected_rejects, 0);
1369        assert_eq!(metrics.total_clients, 2);
1370        assert_eq!(metrics.healthy_clients, 2);
1371    }
1372
1373    #[tokio::test]
1374    async fn test_health_check_task_lifecycle() {
1375        let config = SubmitBroadcasterConfig {
1376            pool_size: 2,
1377            api_key: Some("test_key".to_string()),
1378            api_secret: Some("test_secret".to_string()),
1379            base_url: Some("https://test.example.com".to_string()),
1380            health_check_interval_secs: 1,
1381            ..Default::default()
1382        };
1383
1384        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1385
1386        // Start should spawn health check task
1387        broadcaster.start().await.unwrap();
1388        assert!(broadcaster.running.load(Ordering::Relaxed));
1389        assert!(
1390            broadcaster
1391                .health_check_task
1392                .read()
1393                .await
1394                .as_ref()
1395                .is_some()
1396        );
1397
1398        // Wait a bit to let health checks run
1399        tokio::time::sleep(Duration::from_millis(100)).await;
1400
1401        // Stop should clean up task
1402        broadcaster.stop().await;
1403        assert!(!broadcaster.running.load(Ordering::Relaxed));
1404    }
1405
1406    #[tokio::test]
1407    async fn test_expected_reject_pattern_comprehensive() {
1408        let transports = vec![
1409            create_stub_transport("client-0", || async {
1410                anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1411            }),
1412            create_stub_transport("client-1", || async {
1413                tokio::time::sleep(Duration::from_secs(10)).await;
1414                anyhow::bail!("Should be aborted")
1415            }),
1416        ];
1417
1418        let config = SubmitBroadcasterConfig::default();
1419        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1420
1421        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1422        let result = broadcaster
1423            .broadcast_submit(
1424                instrument_id,
1425                ClientOrderId::from("O-123"),
1426                OrderSide::Buy,
1427                OrderType::Limit,
1428                Quantity::new(100.0, 0),
1429                TimeInForce::Gtc,
1430                Some(Price::new(50000.0, 2)),
1431                None,
1432                None,
1433                None,
1434                false,
1435                false,
1436                None,
1437                None,
1438                None,
1439            )
1440            .await;
1441
1442        // All failed with expected reject
1443        assert!(result.is_err());
1444
1445        let metrics = broadcaster.get_metrics_async().await;
1446        assert_eq!(metrics.expected_rejects, 1);
1447        assert_eq!(metrics.failed_submits, 1);
1448        assert_eq!(metrics.successful_submits, 0);
1449    }
1450
1451    #[tokio::test]
1452    async fn test_client_order_id_suffix_for_multiple_clients() {
1453        use std::sync::{Arc, Mutex};
1454
1455        #[derive(Clone)]
1456        struct CaptureExecutor {
1457            captured_ids: Arc<Mutex<Vec<String>>>,
1458            report: OrderStatusReport,
1459        }
1460
1461        impl SubmitExecutor for CaptureExecutor {
1462            fn health_check(
1463                &self,
1464            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1465                Box::pin(async { Ok(()) })
1466            }
1467
1468            #[allow(clippy::too_many_arguments)]
1469            fn submit_order(
1470                &self,
1471                _instrument_id: InstrumentId,
1472                client_order_id: ClientOrderId,
1473                _order_side: OrderSide,
1474                _order_type: OrderType,
1475                _quantity: Quantity,
1476                _time_in_force: TimeInForce,
1477                _price: Option<Price>,
1478                _trigger_price: Option<Price>,
1479                _trigger_type: Option<TriggerType>,
1480                _display_qty: Option<Quantity>,
1481                _post_only: bool,
1482                _reduce_only: bool,
1483                _order_list_id: Option<OrderListId>,
1484                _contingency_type: Option<ContingencyType>,
1485            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1486            {
1487                // Capture the client_order_id
1488                self.captured_ids
1489                    .lock()
1490                    .unwrap()
1491                    .push(client_order_id.as_str().to_string());
1492                let report = self.report.clone();
1493                Box::pin(async move { Ok(report) })
1494            }
1495
1496            fn add_instrument(&self, _instrument: InstrumentAny) {}
1497        }
1498
1499        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1500        let report = create_test_report("ORDER-1");
1501
1502        let transports = vec![
1503            TransportClient::new(
1504                CaptureExecutor {
1505                    captured_ids: Arc::clone(&captured_ids),
1506                    report: report.clone(),
1507                },
1508                "client-0".to_string(),
1509            ),
1510            TransportClient::new(
1511                CaptureExecutor {
1512                    captured_ids: Arc::clone(&captured_ids),
1513                    report: report.clone(),
1514                },
1515                "client-1".to_string(),
1516            ),
1517            TransportClient::new(
1518                CaptureExecutor {
1519                    captured_ids: Arc::clone(&captured_ids),
1520                    report: report.clone(),
1521                },
1522                "client-2".to_string(),
1523            ),
1524        ];
1525
1526        let config = SubmitBroadcasterConfig::default();
1527        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1528
1529        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1530        let result = broadcaster
1531            .broadcast_submit(
1532                instrument_id,
1533                ClientOrderId::from("O-123"),
1534                OrderSide::Buy,
1535                OrderType::Limit,
1536                Quantity::new(100.0, 0),
1537                TimeInForce::Gtc,
1538                Some(Price::new(50000.0, 2)),
1539                None,
1540                None,
1541                None,
1542                false,
1543                false,
1544                None,
1545                None,
1546                None,
1547            )
1548            .await;
1549
1550        assert!(result.is_ok());
1551
1552        // Check captured client_order_ids
1553        let ids = captured_ids.lock().unwrap();
1554        assert_eq!(ids.len(), 3);
1555        assert_eq!(ids[0], "O-123"); // First client gets original ID
1556        assert_eq!(ids[1], "O-123-1"); // Second client gets suffix -1
1557        assert_eq!(ids[2], "O-123-2"); // Third client gets suffix -2
1558    }
1559
1560    #[tokio::test]
1561    async fn test_client_order_id_suffix_with_partial_failure() {
1562        use std::sync::{Arc, Mutex};
1563
1564        #[derive(Clone)]
1565        struct CaptureAndFailExecutor {
1566            captured_ids: Arc<Mutex<Vec<String>>>,
1567            should_succeed: bool,
1568        }
1569
1570        impl SubmitExecutor for CaptureAndFailExecutor {
1571            fn health_check(
1572                &self,
1573            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1574                Box::pin(async { Ok(()) })
1575            }
1576
1577            #[allow(clippy::too_many_arguments)]
1578            fn submit_order(
1579                &self,
1580                _instrument_id: InstrumentId,
1581                client_order_id: ClientOrderId,
1582                _order_side: OrderSide,
1583                _order_type: OrderType,
1584                _quantity: Quantity,
1585                _time_in_force: TimeInForce,
1586                _price: Option<Price>,
1587                _trigger_price: Option<Price>,
1588                _trigger_type: Option<TriggerType>,
1589                _display_qty: Option<Quantity>,
1590                _post_only: bool,
1591                _reduce_only: bool,
1592                _order_list_id: Option<OrderListId>,
1593                _contingency_type: Option<ContingencyType>,
1594            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1595            {
1596                // Capture the client_order_id
1597                self.captured_ids
1598                    .lock()
1599                    .unwrap()
1600                    .push(client_order_id.as_str().to_string());
1601                let should_succeed = self.should_succeed;
1602                Box::pin(async move {
1603                    if should_succeed {
1604                        Ok(create_test_report("ORDER-1"))
1605                    } else {
1606                        anyhow::bail!("Network error")
1607                    }
1608                })
1609            }
1610
1611            fn add_instrument(&self, _instrument: InstrumentAny) {}
1612        }
1613
1614        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1615
1616        let transports = vec![
1617            TransportClient::new(
1618                CaptureAndFailExecutor {
1619                    captured_ids: Arc::clone(&captured_ids),
1620                    should_succeed: false,
1621                },
1622                "client-0".to_string(),
1623            ),
1624            TransportClient::new(
1625                CaptureAndFailExecutor {
1626                    captured_ids: Arc::clone(&captured_ids),
1627                    should_succeed: true,
1628                },
1629                "client-1".to_string(),
1630            ),
1631        ];
1632
1633        let config = SubmitBroadcasterConfig::default();
1634        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1635
1636        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1637        let result = broadcaster
1638            .broadcast_submit(
1639                instrument_id,
1640                ClientOrderId::from("O-456"),
1641                OrderSide::Sell,
1642                OrderType::Market,
1643                Quantity::new(50.0, 0),
1644                TimeInForce::Ioc,
1645                None,
1646                None,
1647                None,
1648                None,
1649                false,
1650                false,
1651                None,
1652                None,
1653                None,
1654            )
1655            .await;
1656
1657        assert!(result.is_ok());
1658
1659        // Check that both clients received unique client_order_ids
1660        let ids = captured_ids.lock().unwrap();
1661        assert_eq!(ids.len(), 2);
1662        assert_eq!(ids[0], "O-456"); // First client gets original ID
1663        assert_eq!(ids[1], "O-456-1"); // Second client gets suffix -1
1664    }
1665
1666    #[tokio::test]
1667    async fn test_proxy_urls_populated_from_config() {
1668        let config = SubmitBroadcasterConfig {
1669            pool_size: 3,
1670            api_key: Some("test_key".to_string()),
1671            api_secret: Some("test_secret".to_string()),
1672            proxy_urls: vec![
1673                Some("http://proxy1:8080".to_string()),
1674                Some("http://proxy2:8080".to_string()),
1675                Some("http://proxy3:8080".to_string()),
1676            ],
1677            ..Default::default()
1678        };
1679
1680        assert_eq!(config.proxy_urls.len(), 3);
1681        assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1682        assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1683        assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1684    }
1685}