Skip to main content

nautilus_bitmex/broadcast/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Submit request broadcaster for redundant order submission.
17//!
18//! This module provides the [`SubmitBroadcaster`] which fans out submit requests
19//! to multiple HTTP clients in parallel for redundancy. The broadcaster is triggered
20//! when the `SubmitOrder` command contains `params["broadcast_submit_tries"]`.
21//!
22//! Key design patterns:
23//!
24//! - **Dependency injection via traits**: Uses `SubmitExecutor` trait to abstract
25//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
26//! - **Trait objects over generics**: Uses `Arc<dyn SubmitExecutor>` to avoid
27//!   generic type parameters on the public API (simpler Python FFI).
28//! - **Short-circuit on first success**: Aborts remaining requests once any client
29//!   succeeds, minimizing latency.
30//! - **Idempotent rejection handling**: Recognizes duplicate clOrdID as expected
31//!   rejections for debug-level logging without noise.
32
33// TODO: Replace boxed futures in `SubmitExecutor` once stable async trait object support
34// lands so we can drop the per-call heap allocation
35
36use std::{
37    fmt::Debug,
38    future::Future,
39    pin::Pin,
40    sync::{
41        Arc,
42        atomic::{AtomicBool, AtomicU64, Ordering},
43    },
44    time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
51    identifiers::{ClientOrderId, InstrumentId, OrderListId},
52    instruments::InstrumentAny,
53    reports::OrderStatusReport,
54    types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{
59    common::{consts::BITMEX_HTTP_TESTNET_URL, enums::BitmexPegPriceType},
60    http::client::BitmexHttpClient,
61};
62
63/// Trait for order submission operations.
64///
65/// This trait abstracts the execution layer to enable dependency injection and testing
66/// without conditional compilation. The broadcaster holds executors as `Arc<dyn SubmitExecutor>`
67/// to avoid generic type parameters that would complicate the Python FFI boundary.
68///
69/// # Thread Safety
70///
71/// All methods must be safe to call concurrently from multiple threads. Implementations
72/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
73///
74/// # Error Handling
75///
76/// Methods return `anyhow::Result` for flexibility. Implementers should provide
77/// meaningful error messages that can be logged and tracked by the broadcaster.
78///
79/// # Implementation Note
80///
81/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
82/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
83/// `Clone`) to be used without modification.
84trait SubmitExecutor: Send + Sync {
85    /// Adds an instrument for caching.
86    fn add_instrument(&self, instrument: InstrumentAny);
87
88    /// Performs a health check on the executor.
89    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
90
91    /// Submits a single order.
92    #[allow(clippy::too_many_arguments)]
93    fn submit_order(
94        &self,
95        instrument_id: InstrumentId,
96        client_order_id: ClientOrderId,
97        order_side: OrderSide,
98        order_type: OrderType,
99        quantity: Quantity,
100        time_in_force: TimeInForce,
101        price: Option<Price>,
102        trigger_price: Option<Price>,
103        trigger_type: Option<TriggerType>,
104        trailing_offset: Option<f64>,
105        trailing_offset_type: Option<TrailingOffsetType>,
106        display_qty: Option<Quantity>,
107        post_only: bool,
108        reduce_only: bool,
109        order_list_id: Option<OrderListId>,
110        contingency_type: Option<ContingencyType>,
111        peg_price_type: Option<BitmexPegPriceType>,
112        peg_offset_value: Option<f64>,
113    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
114}
115
116impl SubmitExecutor for BitmexHttpClient {
117    fn add_instrument(&self, instrument: InstrumentAny) {
118        Self::cache_instrument(self, instrument);
119    }
120
121    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
122        Box::pin(async move {
123            Self::get_server_time(self)
124                .await
125                .map(|_| ())
126                .map_err(|e| anyhow::anyhow!("{e}"))
127        })
128    }
129
130    #[allow(clippy::too_many_arguments)]
131    fn submit_order(
132        &self,
133        instrument_id: InstrumentId,
134        client_order_id: ClientOrderId,
135        order_side: OrderSide,
136        order_type: OrderType,
137        quantity: Quantity,
138        time_in_force: TimeInForce,
139        price: Option<Price>,
140        trigger_price: Option<Price>,
141        trigger_type: Option<TriggerType>,
142        trailing_offset: Option<f64>,
143        trailing_offset_type: Option<TrailingOffsetType>,
144        display_qty: Option<Quantity>,
145        post_only: bool,
146        reduce_only: bool,
147        order_list_id: Option<OrderListId>,
148        contingency_type: Option<ContingencyType>,
149        peg_price_type: Option<BitmexPegPriceType>,
150        peg_offset_value: Option<f64>,
151    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
152        Box::pin(async move {
153            Self::submit_order(
154                self,
155                instrument_id,
156                client_order_id,
157                order_side,
158                order_type,
159                quantity,
160                time_in_force,
161                price,
162                trigger_price,
163                trigger_type,
164                trailing_offset,
165                trailing_offset_type,
166                display_qty,
167                post_only,
168                reduce_only,
169                order_list_id,
170                contingency_type,
171                peg_price_type,
172                peg_offset_value,
173            )
174            .await
175        })
176    }
177}
178
/// Configuration for the submit broadcaster.
///
/// `PartialEq`/`Eq` are derived so two configurations can be compared directly
/// (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmitBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    pub base_url: Option<String>,
    /// If connecting to BitMEX testnet.
    pub testnet: bool,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: Option<u64>,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: Option<u32>,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: Option<u64>,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: Option<u64>,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: Option<u64>,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: Option<u32>,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: Option<u32>,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected submit rejections for debug-level logging.
    ///
    /// Matching is a plain substring test against the error message, not a regex.
    pub expected_reject_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than `pool_size`, remaining transports will use no proxy. If longer, extra proxies
    /// are ignored. A `None` entry means "no proxy" for the transport at that index.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for SubmitBroadcasterConfig {
    /// Returns a configuration with a pool of three clients, no explicit credentials or
    /// base URL, mainnet selected, and "Duplicate clOrdID" as the only expected rejection.
    fn default() -> Self {
        Self {
            pool_size: 3,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec!["Duplicate clOrdID".to_string()],
            proxy_urls: Vec::new(),
        }
    }
}
242
243/// Transport client wrapper with health monitoring.
244#[derive(Clone)]
245struct TransportClient {
246    /// Executor wrapped in Arc to enable cloning without requiring Clone on SubmitExecutor.
247    ///
248    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
249    /// the executor across multiple TransportClient clones.
250    executor: Arc<dyn SubmitExecutor>,
251    client_id: String,
252    healthy: Arc<AtomicBool>,
253    submit_count: Arc<AtomicU64>,
254    error_count: Arc<AtomicU64>,
255}
256
257impl Debug for TransportClient {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        f.debug_struct(stringify!(TransportClient))
260            .field("client_id", &self.client_id)
261            .field("healthy", &self.healthy)
262            .field("submit_count", &self.submit_count)
263            .field("error_count", &self.error_count)
264            .finish()
265    }
266}
267
268impl TransportClient {
269    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
270        Self {
271            executor: Arc::new(executor),
272            client_id,
273            healthy: Arc::new(AtomicBool::new(true)),
274            submit_count: Arc::new(AtomicU64::new(0)),
275            error_count: Arc::new(AtomicU64::new(0)),
276        }
277    }
278
279    fn is_healthy(&self) -> bool {
280        self.healthy.load(Ordering::Relaxed)
281    }
282
283    fn mark_healthy(&self) {
284        self.healthy.store(true, Ordering::Relaxed);
285    }
286
287    fn mark_unhealthy(&self) {
288        self.healthy.store(false, Ordering::Relaxed);
289    }
290
291    fn get_submit_count(&self) -> u64 {
292        self.submit_count.load(Ordering::Relaxed)
293    }
294
295    fn get_error_count(&self) -> u64 {
296        self.error_count.load(Ordering::Relaxed)
297    }
298
299    async fn health_check(&self, timeout_secs: u64) -> bool {
300        match tokio::time::timeout(
301            Duration::from_secs(timeout_secs),
302            self.executor.health_check(),
303        )
304        .await
305        {
306            Ok(Ok(())) => {
307                self.mark_healthy();
308                true
309            }
310            Ok(Err(e)) => {
311                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
312                self.mark_unhealthy();
313                false
314            }
315            Err(_) => {
316                log::warn!("Health check timeout for client {}", self.client_id);
317                self.mark_unhealthy();
318                false
319            }
320        }
321    }
322
323    #[allow(clippy::too_many_arguments)]
324    async fn submit_order(
325        &self,
326        instrument_id: InstrumentId,
327        client_order_id: ClientOrderId,
328        order_side: OrderSide,
329        order_type: OrderType,
330        quantity: Quantity,
331        time_in_force: TimeInForce,
332        price: Option<Price>,
333        trigger_price: Option<Price>,
334        trigger_type: Option<TriggerType>,
335        trailing_offset: Option<f64>,
336        trailing_offset_type: Option<TrailingOffsetType>,
337        display_qty: Option<Quantity>,
338        post_only: bool,
339        reduce_only: bool,
340        order_list_id: Option<OrderListId>,
341        contingency_type: Option<ContingencyType>,
342        peg_price_type: Option<BitmexPegPriceType>,
343        peg_offset_value: Option<f64>,
344    ) -> anyhow::Result<OrderStatusReport> {
345        self.submit_count.fetch_add(1, Ordering::Relaxed);
346
347        match self
348            .executor
349            .submit_order(
350                instrument_id,
351                client_order_id,
352                order_side,
353                order_type,
354                quantity,
355                time_in_force,
356                price,
357                trigger_price,
358                trigger_type,
359                trailing_offset,
360                trailing_offset_type,
361                display_qty,
362                post_only,
363                reduce_only,
364                order_list_id,
365                contingency_type,
366                peg_price_type,
367                peg_offset_value,
368            )
369            .await
370        {
371            Ok(report) => {
372                self.mark_healthy();
373                Ok(report)
374            }
375            Err(e) => {
376                self.error_count.fetch_add(1, Ordering::Relaxed);
377                Err(e)
378            }
379        }
380    }
381}
382
383/// Broadcasts submit requests to multiple HTTP clients for redundancy.
384///
385/// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
386/// in parallel, short-circuits when the first successful acknowledgement is received,
387/// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
388#[cfg_attr(feature = "python", pyo3::pyclass)]
389#[derive(Debug)]
390pub struct SubmitBroadcaster {
391    config: SubmitBroadcasterConfig,
392    transports: Arc<Vec<TransportClient>>,
393    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
394    running: Arc<AtomicBool>,
395    total_submits: Arc<AtomicU64>,
396    successful_submits: Arc<AtomicU64>,
397    failed_submits: Arc<AtomicU64>,
398    expected_rejects: Arc<AtomicU64>,
399}
400
401impl SubmitBroadcaster {
402    /// Creates a new [`SubmitBroadcaster`] with internal HTTP client pool.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if any HTTP client fails to initialize.
407    pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
408        let mut transports = Vec::with_capacity(config.pool_size);
409
410        // Synthesize base_url when testnet is true but base_url is None
411        let base_url = if config.testnet && config.base_url.is_none() {
412            Some(BITMEX_HTTP_TESTNET_URL.to_string())
413        } else {
414            config.base_url.clone()
415        };
416
417        for i in 0..config.pool_size {
418            // Assign proxy from config list, or None if index exceeds list length
419            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
420
421            let client = BitmexHttpClient::with_credentials(
422                config.api_key.clone(),
423                config.api_secret.clone(),
424                base_url.clone(),
425                config.timeout_secs,
426                config.max_retries,
427                config.retry_delay_ms,
428                config.retry_delay_max_ms,
429                config.recv_window_ms,
430                config.max_requests_per_second,
431                config.max_requests_per_minute,
432                proxy_url,
433            )
434            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
435
436            transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
437        }
438
439        Ok(Self {
440            config,
441            transports: Arc::new(transports),
442            health_check_task: Arc::new(RwLock::new(None)),
443            running: Arc::new(AtomicBool::new(false)),
444            total_submits: Arc::new(AtomicU64::new(0)),
445            successful_submits: Arc::new(AtomicU64::new(0)),
446            failed_submits: Arc::new(AtomicU64::new(0)),
447            expected_rejects: Arc::new(AtomicU64::new(0)),
448        })
449    }
450
451    /// Starts the broadcaster and health check loop.
452    ///
453    /// # Errors
454    ///
455    /// Returns an error if the broadcaster is already running.
456    pub async fn start(&self) -> anyhow::Result<()> {
457        if self.running.load(Ordering::Relaxed) {
458            return Ok(());
459        }
460
461        self.running.store(true, Ordering::Relaxed);
462
463        // Initial health check for all clients
464        self.run_health_checks().await;
465
466        // Start periodic health check task
467        let transports = Arc::clone(&self.transports);
468        let running = Arc::clone(&self.running);
469        let interval_secs = self.config.health_check_interval_secs;
470        let timeout_secs = self.config.health_check_timeout_secs;
471
472        let task = get_runtime().spawn(async move {
473            let mut ticker = interval(Duration::from_secs(interval_secs));
474            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
475
476            loop {
477                ticker.tick().await;
478
479                if !running.load(Ordering::Relaxed) {
480                    break;
481                }
482
483                let tasks: Vec<_> = transports
484                    .iter()
485                    .map(|t| t.health_check(timeout_secs))
486                    .collect();
487
488                let results = future::join_all(tasks).await;
489                let healthy_count = results.iter().filter(|&&r| r).count();
490
491                log::debug!(
492                    "Health check complete: {healthy_count}/{} clients healthy",
493                    results.len()
494                );
495            }
496        });
497
498        *self.health_check_task.write().await = Some(task);
499
500        log::info!(
501            "SubmitBroadcaster started with {} clients",
502            self.transports.len()
503        );
504
505        Ok(())
506    }
507
508    /// Stops the broadcaster and health check loop.
509    pub async fn stop(&self) {
510        if !self.running.load(Ordering::Relaxed) {
511            return;
512        }
513
514        self.running.store(false, Ordering::Relaxed);
515
516        if let Some(task) = self.health_check_task.write().await.take() {
517            task.abort();
518        }
519
520        log::info!("SubmitBroadcaster stopped");
521    }
522
523    async fn run_health_checks(&self) {
524        let tasks: Vec<_> = self
525            .transports
526            .iter()
527            .map(|t| t.health_check(self.config.health_check_timeout_secs))
528            .collect();
529
530        let results = future::join_all(tasks).await;
531        let healthy_count = results.iter().filter(|&&r| r).count();
532
533        log::debug!(
534            "Health check complete: {healthy_count}/{} clients healthy",
535            results.len()
536        );
537    }
538
539    fn is_expected_reject(&self, error_message: &str) -> bool {
540        self.config
541            .expected_reject_patterns
542            .iter()
543            .any(|pattern| error_message.contains(pattern))
544    }
545
546    /// Processes submit request results, handling success and failures.
547    ///
548    /// This helper consolidates the common error handling loop used for submit broadcasts.
549    async fn process_submit_results<T>(
550        &self,
551        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
552        operation: &str,
553        params: String,
554    ) -> anyhow::Result<T>
555    where
556        T: Send + 'static,
557    {
558        let mut errors = Vec::new();
559        let mut all_duplicate_clordid = true;
560
561        while !handles.is_empty() {
562            let current_handles = std::mem::take(&mut handles);
563            let (result, _idx, remaining) = future::select_all(current_handles).await;
564            handles = remaining.into_iter().collect();
565
566            match result {
567                Ok((client_id, Ok(result))) => {
568                    // First success - abort remaining handles
569                    for handle in &handles {
570                        handle.abort();
571                    }
572                    self.successful_submits.fetch_add(1, Ordering::Relaxed);
573                    log::debug!("{operation} broadcast succeeded [{client_id}] {params}",);
574                    return Ok(result);
575                }
576                Ok((client_id, Err(e))) => {
577                    let error_msg = e.to_string();
578                    let is_duplicate = error_msg.contains("Duplicate clOrdID");
579
580                    if !is_duplicate {
581                        all_duplicate_clordid = false;
582                    }
583
584                    if self.is_expected_reject(&error_msg) {
585                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
586                        log::debug!(
587                            "Expected {} rejection [{client_id}]: {error_msg} {params}",
588                            operation.to_lowercase(),
589                        );
590                        errors.push(error_msg);
591                    } else {
592                        log::warn!(
593                            "{operation} request failed [{client_id}]: {error_msg} {params}",
594                        );
595                        errors.push(error_msg);
596                    }
597                }
598                Err(e) => {
599                    all_duplicate_clordid = false;
600                    log::warn!("{operation} task join error: {e:?}");
601                    errors.push(format!("Task panicked: {e:?}"));
602                }
603            }
604        }
605
606        // All tasks failed
607        self.failed_submits.fetch_add(1, Ordering::Relaxed);
608
609        // If all errors were "Duplicate clOrdID", this is likely an idempotent scenario
610        // where the order exists but the success response was lost
611        if all_duplicate_clordid && !errors.is_empty() {
612            log::warn!(
613                "All {} requests returned 'Duplicate clOrdID' - order likely exists {params}",
614                operation.to_lowercase(),
615            );
616            anyhow::bail!("IDEMPOTENT_DUPLICATE: Order likely exists but confirmation was lost");
617        }
618
619        log::error!(
620            "All {} requests failed: {errors:?} {params}",
621            operation.to_lowercase(),
622        );
623        Err(anyhow::anyhow!(
624            "All {} requests failed: {:?}",
625            operation.to_lowercase(),
626            errors
627        ))
628    }
629
630    /// Broadcasts a submit request to all healthy clients in parallel.
631    ///
632    /// # Returns
633    ///
634    /// - `Ok(report)` if successfully submitted with a report.
635    /// - `Err` if all requests failed.
636    ///
637    /// # Errors
638    ///
639    /// Returns an error if all submit requests fail or no healthy clients are available.
640    #[allow(clippy::too_many_arguments)]
641    pub async fn broadcast_submit(
642        &self,
643        instrument_id: InstrumentId,
644        client_order_id: ClientOrderId,
645        order_side: OrderSide,
646        order_type: OrderType,
647        quantity: Quantity,
648        time_in_force: TimeInForce,
649        price: Option<Price>,
650        trigger_price: Option<Price>,
651        trigger_type: Option<TriggerType>,
652        trailing_offset: Option<f64>,
653        trailing_offset_type: Option<TrailingOffsetType>,
654        display_qty: Option<Quantity>,
655        post_only: bool,
656        reduce_only: bool,
657        order_list_id: Option<OrderListId>,
658        contingency_type: Option<ContingencyType>,
659        submit_tries: Option<usize>,
660        peg_price_type: Option<BitmexPegPriceType>,
661        peg_offset_value: Option<f64>,
662    ) -> anyhow::Result<OrderStatusReport> {
663        self.total_submits.fetch_add(1, Ordering::Relaxed);
664
665        let pool_size = self.config.pool_size;
666        let actual_tries = if let Some(t) = submit_tries {
667            if t > pool_size {
668                // Use log macro for Python visibility for now
669                log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
670            }
671            std::cmp::min(t, pool_size)
672        } else {
673            pool_size
674        };
675
676        log::debug!(
677            "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
678        );
679
680        let healthy_transports: Vec<TransportClient> = self
681            .transports
682            .iter()
683            .filter(|t| t.is_healthy())
684            .take(actual_tries)
685            .cloned()
686            .collect();
687
688        if healthy_transports.is_empty() {
689            self.failed_submits.fetch_add(1, Ordering::Relaxed);
690            anyhow::bail!("No healthy transport clients available");
691        }
692
693        log::debug!(
694            "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
695            healthy_transports.len(),
696        );
697
698        let mut handles = Vec::new();
699        for transport in healthy_transports {
700            // All transports use the same client_order_id. If multiple succeed,
701            // BitMEX rejects duplicates with "duplicate clOrdID" (expected rejection).
702            let handle = get_runtime().spawn(async move {
703                let client_id = transport.client_id.clone();
704                let result = transport
705                    .submit_order(
706                        instrument_id,
707                        client_order_id,
708                        order_side,
709                        order_type,
710                        quantity,
711                        time_in_force,
712                        price,
713                        trigger_price,
714                        trigger_type,
715                        trailing_offset,
716                        trailing_offset_type,
717                        display_qty,
718                        post_only,
719                        reduce_only,
720                        order_list_id,
721                        contingency_type,
722                        peg_price_type,
723                        peg_offset_value,
724                    )
725                    .await;
726                (client_id, result)
727            });
728            handles.push(handle);
729        }
730
731        self.process_submit_results(
732            handles,
733            "Submit",
734            format!("(client_order_id={client_order_id:?})"),
735        )
736        .await
737    }
738
739    /// Gets broadcaster metrics.
740    pub fn get_metrics(&self) -> BroadcasterMetrics {
741        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
742        let total_clients = self.transports.len();
743
744        BroadcasterMetrics {
745            total_submits: self.total_submits.load(Ordering::Relaxed),
746            successful_submits: self.successful_submits.load(Ordering::Relaxed),
747            failed_submits: self.failed_submits.load(Ordering::Relaxed),
748            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
749            healthy_clients,
750            total_clients,
751        }
752    }
753
754    /// Gets broadcaster metrics (async version for use within async context).
755    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
756        self.get_metrics()
757    }
758
759    /// Gets per-client statistics.
760    pub fn get_client_stats(&self) -> Vec<ClientStats> {
761        self.transports
762            .iter()
763            .map(|t| ClientStats {
764                client_id: t.client_id.clone(),
765                healthy: t.is_healthy(),
766                submit_count: t.get_submit_count(),
767                error_count: t.get_error_count(),
768            })
769            .collect()
770    }
771
772    /// Gets per-client statistics (async version for use within async context).
773    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
774        self.get_client_stats()
775    }
776
777    /// Caches an instrument in all HTTP clients in the pool.
778    pub fn cache_instrument(&self, instrument: InstrumentAny) {
779        for transport in self.transports.iter() {
780            transport.executor.add_instrument(instrument.clone());
781        }
782    }
783
784    #[must_use]
785    pub fn clone_for_async(&self) -> Self {
786        Self {
787            config: self.config.clone(),
788            transports: Arc::clone(&self.transports),
789            health_check_task: Arc::clone(&self.health_check_task),
790            running: Arc::clone(&self.running),
791            total_submits: Arc::clone(&self.total_submits),
792            successful_submits: Arc::clone(&self.successful_submits),
793            failed_submits: Arc::clone(&self.failed_submits),
794            expected_rejects: Arc::clone(&self.expected_rejects),
795        }
796    }
797
798    #[cfg(test)]
799    fn new_with_transports(
800        config: SubmitBroadcasterConfig,
801        transports: Vec<TransportClient>,
802    ) -> Self {
803        Self {
804            config,
805            transports: Arc::new(transports),
806            health_check_task: Arc::new(RwLock::new(None)),
807            running: Arc::new(AtomicBool::new(false)),
808            total_submits: Arc::new(AtomicU64::new(0)),
809            successful_submits: Arc::new(AtomicU64::new(0)),
810            failed_submits: Arc::new(AtomicU64::new(0)),
811            expected_rejects: Arc::new(AtomicU64::new(0)),
812        }
813    }
814}
815
/// Point-in-time snapshot of broadcaster-level counters and client health.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    /// Number of broadcast submit calls made.
    pub total_submits: u64,
    /// Number of broadcasts in which some client succeeded.
    pub successful_submits: u64,
    /// Number of broadcasts that failed on every client (or had no healthy client).
    pub failed_submits: u64,
    /// Number of individual client errors that matched an expected rejection pattern.
    pub expected_rejects: u64,
    /// Number of clients flagged healthy when the snapshot was taken.
    pub healthy_clients: usize,
    /// Total number of clients in the pool.
    pub total_clients: usize,
}
826
/// Point-in-time statistics for a single client in the pool.
#[derive(Debug, Clone)]
pub struct ClientStats {
    /// Identifier of the client these statistics belong to.
    pub client_id: String,
    /// Whether the client was flagged healthy when the statistics were read.
    pub healthy: bool,
    /// Number of submit attempts made through this client.
    pub submit_count: u64,
    /// Number of submit attempts through this client that returned an error.
    pub error_count: u64,
}
835
836#[cfg(test)]
837mod tests {
838    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
839
840    use nautilus_core::UUID4;
841    use nautilus_model::{
842        enums::{
843            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
844        },
845        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
846        reports::OrderStatusReport,
847        types::{Price, Quantity},
848    };
849
850    use super::*;
851
852    /// Mock executor for testing.
853    #[derive(Clone)]
854    #[allow(clippy::type_complexity)]
855    struct MockExecutor {
856        handler: Arc<
857            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
858                + Send
859                + Sync,
860        >,
861    }
862
863    impl MockExecutor {
864        fn new<F, Fut>(handler: F) -> Self
865        where
866            F: Fn() -> Fut + Send + Sync + 'static,
867            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
868        {
869            Self {
870                handler: Arc::new(move || Box::pin(handler())),
871            }
872        }
873    }
874
875    impl SubmitExecutor for MockExecutor {
876        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
877            Box::pin(async { Ok(()) })
878        }
879
880        #[allow(clippy::too_many_arguments)]
881        fn submit_order(
882            &self,
883            _instrument_id: InstrumentId,
884            _client_order_id: ClientOrderId,
885            _order_side: OrderSide,
886            _order_type: OrderType,
887            _quantity: Quantity,
888            _time_in_force: TimeInForce,
889            _price: Option<Price>,
890            _trigger_price: Option<Price>,
891            _trigger_type: Option<TriggerType>,
892            _trailing_offset: Option<f64>,
893            _trailing_offset_type: Option<TrailingOffsetType>,
894            _display_qty: Option<Quantity>,
895            _post_only: bool,
896            _reduce_only: bool,
897            _order_list_id: Option<OrderListId>,
898            _contingency_type: Option<ContingencyType>,
899            _peg_price_type: Option<BitmexPegPriceType>,
900            _peg_offset_value: Option<f64>,
901        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
902            (self.handler)()
903        }
904
905        fn add_instrument(&self, _instrument: InstrumentAny) {
906            // No-op for mock
907        }
908    }
909
910    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
911        OrderStatusReport {
912            account_id: AccountId::from("BITMEX-001"),
913            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
914            venue_order_id: VenueOrderId::from(venue_order_id),
915            order_side: OrderSide::Buy,
916            order_type: OrderType::Limit,
917            time_in_force: TimeInForce::Gtc,
918            order_status: OrderStatus::Accepted,
919            price: Some(Price::new(50000.0, 2)),
920            quantity: Quantity::new(100.0, 0),
921            filled_qty: Quantity::new(0.0, 0),
922            report_id: UUID4::new(),
923            ts_accepted: 0.into(),
924            ts_last: 0.into(),
925            ts_init: 0.into(),
926            client_order_id: None,
927            avg_px: None,
928            trigger_price: None,
929            trigger_type: None,
930            contingency_type: ContingencyType::NoContingency,
931            expire_time: None,
932            order_list_id: None,
933            venue_position_id: None,
934            linked_order_ids: None,
935            parent_order_id: None,
936            display_qty: None,
937            limit_offset: None,
938            trailing_offset: None,
939            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
940            post_only: false,
941            reduce_only: false,
942            cancel_reason: None,
943            ts_triggered: None,
944        }
945    }
946
947    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
948    where
949        F: Fn() -> Fut + Send + Sync + 'static,
950        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
951    {
952        let executor = MockExecutor::new(handler);
953        TransportClient::new(executor, client_id.to_string())
954    }
955
956    #[tokio::test]
957    async fn test_broadcast_submit_immediate_success() {
958        let report = create_test_report("ORDER-1");
959        let report_clone = report.clone();
960
961        let transports = vec![
962            create_stub_transport("client-0", move || {
963                let report = report_clone.clone();
964                async move { Ok(report) }
965            }),
966            create_stub_transport("client-1", || async {
967                tokio::time::sleep(Duration::from_secs(10)).await;
968                anyhow::bail!("Should be aborted")
969            }),
970        ];
971
972        let config = SubmitBroadcasterConfig::default();
973        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
974
975        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
976        let result = broadcaster
977            .broadcast_submit(
978                instrument_id,
979                ClientOrderId::from("O-123"),
980                OrderSide::Buy,
981                OrderType::Limit,
982                Quantity::new(100.0, 0),
983                TimeInForce::Gtc,
984                Some(Price::new(50000.0, 2)),
985                None,
986                None,
987                None,
988                None,
989                None,
990                false,
991                false,
992                None,
993                None,
994                None,
995                None,
996                None,
997            )
998            .await;
999
1000        assert!(result.is_ok());
1001        let returned_report = result.unwrap();
1002        assert_eq!(returned_report.venue_order_id, report.venue_order_id);
1003
1004        let metrics = broadcaster.get_metrics_async().await;
1005        assert_eq!(metrics.successful_submits, 1);
1006        assert_eq!(metrics.failed_submits, 0);
1007        assert_eq!(metrics.total_submits, 1);
1008    }
1009
1010    #[tokio::test]
1011    async fn test_broadcast_submit_duplicate_clordid_expected() {
1012        let transports = vec![
1013            create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
1014            create_stub_transport("client-1", || async {
1015                tokio::time::sleep(Duration::from_secs(10)).await;
1016                anyhow::bail!("Should be aborted")
1017            }),
1018        ];
1019
1020        let config = SubmitBroadcasterConfig::default();
1021        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1022
1023        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1024        let result = broadcaster
1025            .broadcast_submit(
1026                instrument_id,
1027                ClientOrderId::from("O-123"),
1028                OrderSide::Buy,
1029                OrderType::Limit,
1030                Quantity::new(100.0, 0),
1031                TimeInForce::Gtc,
1032                Some(Price::new(50000.0, 2)),
1033                None,
1034                None,
1035                None,
1036                None,
1037                None,
1038                false,
1039                false,
1040                None,
1041                None,
1042                None,
1043                None,
1044                None,
1045            )
1046            .await;
1047
1048        assert!(result.is_err());
1049
1050        let metrics = broadcaster.get_metrics_async().await;
1051        assert_eq!(metrics.expected_rejects, 1);
1052        assert_eq!(metrics.successful_submits, 0);
1053        assert_eq!(metrics.failed_submits, 1);
1054    }
1055
1056    #[tokio::test]
1057    async fn test_broadcast_submit_all_failures() {
1058        let transports = vec![
1059            create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1060            create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1061        ];
1062
1063        let config = SubmitBroadcasterConfig::default();
1064        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1065
1066        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1067        let result = broadcaster
1068            .broadcast_submit(
1069                instrument_id,
1070                ClientOrderId::from("O-456"),
1071                OrderSide::Sell,
1072                OrderType::Market,
1073                Quantity::new(50.0, 0),
1074                TimeInForce::Ioc,
1075                None,
1076                None,
1077                None,
1078                None,
1079                None,
1080                None,
1081                false,
1082                false,
1083                None,
1084                None,
1085                None,
1086                None,
1087                None,
1088            )
1089            .await;
1090
1091        assert!(result.is_err());
1092        assert!(
1093            result
1094                .unwrap_err()
1095                .to_string()
1096                .contains("All submit requests failed")
1097        );
1098
1099        let metrics = broadcaster.get_metrics_async().await;
1100        assert_eq!(metrics.failed_submits, 1);
1101        assert_eq!(metrics.successful_submits, 0);
1102    }
1103
1104    #[tokio::test]
1105    async fn test_broadcast_submit_no_healthy_clients() {
1106        let transport =
1107            create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1108        transport.healthy.store(false, Ordering::Relaxed);
1109
1110        let config = SubmitBroadcasterConfig::default();
1111        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1112
1113        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1114        let result = broadcaster
1115            .broadcast_submit(
1116                instrument_id,
1117                ClientOrderId::from("O-789"),
1118                OrderSide::Buy,
1119                OrderType::Limit,
1120                Quantity::new(100.0, 0),
1121                TimeInForce::Gtc,
1122                Some(Price::new(50000.0, 2)),
1123                None,
1124                None,
1125                None,
1126                None,
1127                None,
1128                false,
1129                false,
1130                None,
1131                None,
1132                None,
1133                None,
1134                None,
1135            )
1136            .await;
1137
1138        assert!(result.is_err());
1139        assert!(
1140            result
1141                .unwrap_err()
1142                .to_string()
1143                .contains("No healthy transport clients available")
1144        );
1145
1146        let metrics = broadcaster.get_metrics_async().await;
1147        assert_eq!(metrics.failed_submits, 1);
1148    }
1149
1150    #[tokio::test]
1151    async fn test_default_config() {
1152        let config = SubmitBroadcasterConfig {
1153            api_key: Some("test_key".to_string()),
1154            api_secret: Some("test_secret".to_string()),
1155            base_url: Some("https://test.example.com".to_string()),
1156            ..Default::default()
1157        };
1158
1159        let broadcaster = SubmitBroadcaster::new(config);
1160        assert!(broadcaster.is_ok());
1161
1162        let broadcaster = broadcaster.unwrap();
1163        let metrics = broadcaster.get_metrics_async().await;
1164
1165        // Default pool_size is 3
1166        assert_eq!(metrics.total_clients, 3);
1167    }
1168
1169    #[tokio::test]
1170    async fn test_broadcaster_lifecycle() {
1171        let config = SubmitBroadcasterConfig {
1172            pool_size: 2,
1173            api_key: Some("test_key".to_string()),
1174            api_secret: Some("test_secret".to_string()),
1175            base_url: Some("https://test.example.com".to_string()),
1176            testnet: false,
1177            timeout_secs: Some(5),
1178            max_retries: None,
1179            retry_delay_ms: None,
1180            retry_delay_max_ms: None,
1181            recv_window_ms: None,
1182            max_requests_per_second: None,
1183            max_requests_per_minute: None,
1184            health_check_interval_secs: 60,
1185            health_check_timeout_secs: 1,
1186            expected_reject_patterns: vec![],
1187            proxy_urls: vec![],
1188        };
1189
1190        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1191
1192        // Should not be running initially
1193        assert!(!broadcaster.running.load(Ordering::Relaxed));
1194
1195        // Start broadcaster
1196        let start_result = broadcaster.start().await;
1197        assert!(start_result.is_ok());
1198        assert!(broadcaster.running.load(Ordering::Relaxed));
1199
1200        // Starting again should be idempotent
1201        let start_again = broadcaster.start().await;
1202        assert!(start_again.is_ok());
1203
1204        // Stop broadcaster
1205        broadcaster.stop().await;
1206        assert!(!broadcaster.running.load(Ordering::Relaxed));
1207
1208        // Stopping again should be safe
1209        broadcaster.stop().await;
1210        assert!(!broadcaster.running.load(Ordering::Relaxed));
1211    }
1212
1213    #[tokio::test]
1214    async fn test_broadcast_submit_metrics_increment() {
1215        let report = create_test_report("ORDER-1");
1216        let report_clone = report.clone();
1217
1218        let transports = vec![create_stub_transport("client-0", move || {
1219            let report = report_clone.clone();
1220            async move { Ok(report) }
1221        })];
1222
1223        let config = SubmitBroadcasterConfig::default();
1224        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1225
1226        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1227        let _ = broadcaster
1228            .broadcast_submit(
1229                instrument_id,
1230                ClientOrderId::from("O-123"),
1231                OrderSide::Buy,
1232                OrderType::Limit,
1233                Quantity::new(100.0, 0),
1234                TimeInForce::Gtc,
1235                Some(Price::new(50000.0, 2)),
1236                None,
1237                None,
1238                None,
1239                None,
1240                None,
1241                false,
1242                false,
1243                None,
1244                None,
1245                None,
1246                None,
1247                None,
1248            )
1249            .await;
1250
1251        let metrics = broadcaster.get_metrics_async().await;
1252        assert_eq!(metrics.total_submits, 1);
1253        assert_eq!(metrics.successful_submits, 1);
1254        assert_eq!(metrics.failed_submits, 0);
1255    }
1256
1257    #[tokio::test]
1258    async fn test_broadcaster_creation_with_pool() {
1259        let config = SubmitBroadcasterConfig {
1260            pool_size: 4,
1261            api_key: Some("test_key".to_string()),
1262            api_secret: Some("test_secret".to_string()),
1263            base_url: Some("https://test.example.com".to_string()),
1264            ..Default::default()
1265        };
1266
1267        let broadcaster = SubmitBroadcaster::new(config);
1268        assert!(broadcaster.is_ok());
1269
1270        let broadcaster = broadcaster.unwrap();
1271        let metrics = broadcaster.get_metrics_async().await;
1272        assert_eq!(metrics.total_clients, 4);
1273    }
1274
1275    #[tokio::test]
1276    async fn test_client_stats_collection() {
1277        // Both clients fail so broadcast waits for all of them (no early abort on success).
1278        // This ensures both clients execute and record their stats before the function returns.
1279        let transports = vec![
1280            create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1281            create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1282        ];
1283
1284        let config = SubmitBroadcasterConfig::default();
1285        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1286
1287        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1288        let _ = broadcaster
1289            .broadcast_submit(
1290                instrument_id,
1291                ClientOrderId::from("O-123"),
1292                OrderSide::Buy,
1293                OrderType::Limit,
1294                Quantity::new(100.0, 0),
1295                TimeInForce::Gtc,
1296                Some(Price::new(50000.0, 2)),
1297                None,
1298                None,
1299                None,
1300                None,
1301                None,
1302                false,
1303                false,
1304                None,
1305                None,
1306                None,
1307                None,
1308                None,
1309            )
1310            .await;
1311
1312        let stats = broadcaster.get_client_stats_async().await;
1313        assert_eq!(stats.len(), 2);
1314
1315        let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1316        assert_eq!(client0.submit_count, 1);
1317        assert_eq!(client0.error_count, 1);
1318
1319        let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1320        assert_eq!(client1.submit_count, 1);
1321        assert_eq!(client1.error_count, 1);
1322    }
1323
1324    #[tokio::test]
1325    async fn test_testnet_config_sets_base_url() {
1326        let config = SubmitBroadcasterConfig {
1327            pool_size: 1,
1328            api_key: Some("test_key".to_string()),
1329            api_secret: Some("test_secret".to_string()),
1330            testnet: true,
1331            base_url: None,
1332            ..Default::default()
1333        };
1334
1335        let broadcaster = SubmitBroadcaster::new(config);
1336        assert!(broadcaster.is_ok());
1337    }
1338
1339    #[tokio::test]
1340    async fn test_clone_for_async() {
1341        let config = SubmitBroadcasterConfig {
1342            pool_size: 1,
1343            api_key: Some("test_key".to_string()),
1344            api_secret: Some("test_secret".to_string()),
1345            base_url: Some("https://test.example.com".to_string()),
1346            ..Default::default()
1347        };
1348
1349        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1350        let cloned = broadcaster.clone_for_async();
1351
1352        // Verify they share the same atomics
1353        broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1354        assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1355    }
1356
1357    #[tokio::test]
1358    async fn test_pattern_matching() {
1359        let config = SubmitBroadcasterConfig {
1360            expected_reject_patterns: vec![
1361                "Duplicate clOrdID".to_string(),
1362                "Order already exists".to_string(),
1363            ],
1364            ..Default::default()
1365        };
1366
1367        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1368
1369        assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1370        assert!(broadcaster.is_expected_reject("Order already exists in system"));
1371        assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1372        assert!(!broadcaster.is_expected_reject("Internal server error"));
1373    }
1374
1375    #[tokio::test]
1376    async fn test_submit_metrics_with_mixed_responses() {
1377        let report = create_test_report("ORDER-1");
1378        let report_clone = report.clone();
1379
1380        let transports = vec![
1381            create_stub_transport("client-0", move || {
1382                let report = report_clone.clone();
1383                async move { Ok(report) }
1384            }),
1385            create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1386        ];
1387
1388        let config = SubmitBroadcasterConfig::default();
1389        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1390
1391        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1392        let result = broadcaster
1393            .broadcast_submit(
1394                instrument_id,
1395                ClientOrderId::from("O-123"),
1396                OrderSide::Buy,
1397                OrderType::Limit,
1398                Quantity::new(100.0, 0),
1399                TimeInForce::Gtc,
1400                Some(Price::new(50000.0, 2)),
1401                None,
1402                None,
1403                None,
1404                None,
1405                None,
1406                false,
1407                false,
1408                None,
1409                None,
1410                None,
1411                None,
1412                None,
1413            )
1414            .await;
1415
1416        assert!(result.is_ok());
1417
1418        let metrics = broadcaster.get_metrics_async().await;
1419        assert_eq!(metrics.total_submits, 1);
1420        assert_eq!(metrics.successful_submits, 1);
1421        assert_eq!(metrics.failed_submits, 0);
1422    }
1423
1424    #[tokio::test]
1425    async fn test_metrics_initialization_and_health() {
1426        let config = SubmitBroadcasterConfig {
1427            pool_size: 2,
1428            api_key: Some("test_key".to_string()),
1429            api_secret: Some("test_secret".to_string()),
1430            base_url: Some("https://test.example.com".to_string()),
1431            ..Default::default()
1432        };
1433
1434        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1435        let metrics = broadcaster.get_metrics_async().await;
1436
1437        assert_eq!(metrics.total_submits, 0);
1438        assert_eq!(metrics.successful_submits, 0);
1439        assert_eq!(metrics.failed_submits, 0);
1440        assert_eq!(metrics.expected_rejects, 0);
1441        assert_eq!(metrics.total_clients, 2);
1442        assert_eq!(metrics.healthy_clients, 2);
1443    }
1444
1445    #[tokio::test]
1446    async fn test_health_check_task_lifecycle() {
1447        let config = SubmitBroadcasterConfig {
1448            pool_size: 2,
1449            api_key: Some("test_key".to_string()),
1450            api_secret: Some("test_secret".to_string()),
1451            base_url: Some("https://test.example.com".to_string()),
1452            health_check_interval_secs: 1,
1453            ..Default::default()
1454        };
1455
1456        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1457
1458        // Start should spawn health check task
1459        broadcaster.start().await.unwrap();
1460        assert!(broadcaster.running.load(Ordering::Relaxed));
1461        assert!(
1462            broadcaster
1463                .health_check_task
1464                .read()
1465                .await
1466                .as_ref()
1467                .is_some()
1468        );
1469
1470        // Stop should clean up task
1471        broadcaster.stop().await;
1472        assert!(!broadcaster.running.load(Ordering::Relaxed));
1473    }
1474
1475    #[tokio::test]
1476    async fn test_expected_reject_pattern_comprehensive() {
1477        let transports = vec![
1478            create_stub_transport("client-0", || async {
1479                anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1480            }),
1481            create_stub_transport("client-1", || async {
1482                tokio::time::sleep(Duration::from_secs(10)).await;
1483                anyhow::bail!("Should be aborted")
1484            }),
1485        ];
1486
1487        let config = SubmitBroadcasterConfig::default();
1488        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1489
1490        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1491        let result = broadcaster
1492            .broadcast_submit(
1493                instrument_id,
1494                ClientOrderId::from("O-123"),
1495                OrderSide::Buy,
1496                OrderType::Limit,
1497                Quantity::new(100.0, 0),
1498                TimeInForce::Gtc,
1499                Some(Price::new(50000.0, 2)),
1500                None,
1501                None,
1502                None,
1503                None,
1504                None,
1505                false,
1506                false,
1507                None,
1508                None,
1509                None,
1510                None,
1511                None,
1512            )
1513            .await;
1514
1515        // All failed with expected reject
1516        assert!(result.is_err());
1517
1518        let metrics = broadcaster.get_metrics_async().await;
1519        assert_eq!(metrics.expected_rejects, 1);
1520        assert_eq!(metrics.failed_submits, 1);
1521        assert_eq!(metrics.successful_submits, 0);
1522    }
1523
1524    #[tokio::test]
1525    async fn test_client_order_id_suffix_for_multiple_clients() {
1526        use std::sync::{Arc, Mutex};
1527
1528        #[derive(Clone)]
1529        struct CaptureExecutor {
1530            captured_ids: Arc<Mutex<Vec<String>>>,
1531            barrier: Arc<tokio::sync::Barrier>,
1532            report: OrderStatusReport,
1533        }
1534
1535        impl SubmitExecutor for CaptureExecutor {
1536            fn health_check(
1537                &self,
1538            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1539                Box::pin(async { Ok(()) })
1540            }
1541
1542            #[allow(clippy::too_many_arguments)]
1543            fn submit_order(
1544                &self,
1545                _instrument_id: InstrumentId,
1546                client_order_id: ClientOrderId,
1547                _order_side: OrderSide,
1548                _order_type: OrderType,
1549                _quantity: Quantity,
1550                _time_in_force: TimeInForce,
1551                _price: Option<Price>,
1552                _trigger_price: Option<Price>,
1553                _trigger_type: Option<TriggerType>,
1554                _trailing_offset: Option<f64>,
1555                _trailing_offset_type: Option<TrailingOffsetType>,
1556                _display_qty: Option<Quantity>,
1557                _post_only: bool,
1558                _reduce_only: bool,
1559                _order_list_id: Option<OrderListId>,
1560                _contingency_type: Option<ContingencyType>,
1561                _peg_price_type: Option<BitmexPegPriceType>,
1562                _peg_offset_value: Option<f64>,
1563            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1564            {
1565                // Capture the client_order_id
1566                self.captured_ids
1567                    .lock()
1568                    .unwrap()
1569                    .push(client_order_id.as_str().to_string());
1570                let report = self.report.clone();
1571                let barrier = Arc::clone(&self.barrier);
1572                // Wait for all tasks to capture their IDs before any completes
1573                // (with concurrent execution, first success aborts others)
1574                Box::pin(async move {
1575                    barrier.wait().await;
1576                    Ok(report)
1577                })
1578            }
1579
1580            fn add_instrument(&self, _instrument: InstrumentAny) {}
1581        }
1582
1583        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1584        let barrier = Arc::new(tokio::sync::Barrier::new(3));
1585        let report = create_test_report("ORDER-1");
1586
1587        let transports = vec![
1588            TransportClient::new(
1589                CaptureExecutor {
1590                    captured_ids: Arc::clone(&captured_ids),
1591                    barrier: Arc::clone(&barrier),
1592                    report: report.clone(),
1593                },
1594                "client-0".to_string(),
1595            ),
1596            TransportClient::new(
1597                CaptureExecutor {
1598                    captured_ids: Arc::clone(&captured_ids),
1599                    barrier: Arc::clone(&barrier),
1600                    report: report.clone(),
1601                },
1602                "client-1".to_string(),
1603            ),
1604            TransportClient::new(
1605                CaptureExecutor {
1606                    captured_ids: Arc::clone(&captured_ids),
1607                    barrier: Arc::clone(&barrier),
1608                    report: report.clone(),
1609                },
1610                "client-2".to_string(),
1611            ),
1612        ];
1613
1614        let config = SubmitBroadcasterConfig::default();
1615        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1616
1617        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1618        let result = broadcaster
1619            .broadcast_submit(
1620                instrument_id,
1621                ClientOrderId::from("O-123"),
1622                OrderSide::Buy,
1623                OrderType::Limit,
1624                Quantity::new(100.0, 0),
1625                TimeInForce::Gtc,
1626                Some(Price::new(50000.0, 2)),
1627                None,
1628                None,
1629                None,
1630                None,
1631                None,
1632                false,
1633                false,
1634                None,
1635                None,
1636                None,
1637                None,
1638                None,
1639            )
1640            .await;
1641
1642        assert!(result.is_ok());
1643
1644        // All transports receive the same client_order_id (no suffixing)
1645        let ids = captured_ids.lock().unwrap();
1646        assert_eq!(ids.len(), 3);
1647        assert!(ids.iter().all(|id| id == "O-123")); // All clients get the same ID
1648    }
1649
1650    #[tokio::test]
1651    async fn test_client_order_id_suffix_with_partial_failure() {
1652        use std::sync::{Arc, Mutex};
1653
1654        #[derive(Clone)]
1655        struct CaptureAndFailExecutor {
1656            captured_ids: Arc<Mutex<Vec<String>>>,
1657            barrier: Arc<tokio::sync::Barrier>,
1658            should_succeed: bool,
1659        }
1660
1661        impl SubmitExecutor for CaptureAndFailExecutor {
1662            fn health_check(
1663                &self,
1664            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1665                Box::pin(async { Ok(()) })
1666            }
1667
1668            #[allow(clippy::too_many_arguments)]
1669            fn submit_order(
1670                &self,
1671                _instrument_id: InstrumentId,
1672                client_order_id: ClientOrderId,
1673                _order_side: OrderSide,
1674                _order_type: OrderType,
1675                _quantity: Quantity,
1676                _time_in_force: TimeInForce,
1677                _price: Option<Price>,
1678                _trigger_price: Option<Price>,
1679                _trigger_type: Option<TriggerType>,
1680                _trailing_offset: Option<f64>,
1681                _trailing_offset_type: Option<TrailingOffsetType>,
1682                _display_qty: Option<Quantity>,
1683                _post_only: bool,
1684                _reduce_only: bool,
1685                _order_list_id: Option<OrderListId>,
1686                _contingency_type: Option<ContingencyType>,
1687                _peg_price_type: Option<BitmexPegPriceType>,
1688                _peg_offset_value: Option<f64>,
1689            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1690            {
1691                // Capture the client_order_id
1692                self.captured_ids
1693                    .lock()
1694                    .unwrap()
1695                    .push(client_order_id.as_str().to_string());
1696                let barrier = Arc::clone(&self.barrier);
1697                let should_succeed = self.should_succeed;
1698                // Wait for all tasks to capture their IDs before any completes
1699                // (with concurrent execution, first success aborts others)
1700                Box::pin(async move {
1701                    barrier.wait().await;
1702                    if should_succeed {
1703                        Ok(create_test_report("ORDER-1"))
1704                    } else {
1705                        anyhow::bail!("Network error")
1706                    }
1707                })
1708            }
1709
1710            fn add_instrument(&self, _instrument: InstrumentAny) {}
1711        }
1712
1713        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1714        let barrier = Arc::new(tokio::sync::Barrier::new(2));
1715
1716        let transports = vec![
1717            TransportClient::new(
1718                CaptureAndFailExecutor {
1719                    captured_ids: Arc::clone(&captured_ids),
1720                    barrier: Arc::clone(&barrier),
1721                    should_succeed: false,
1722                },
1723                "client-0".to_string(),
1724            ),
1725            TransportClient::new(
1726                CaptureAndFailExecutor {
1727                    captured_ids: Arc::clone(&captured_ids),
1728                    barrier: Arc::clone(&barrier),
1729                    should_succeed: true,
1730                },
1731                "client-1".to_string(),
1732            ),
1733        ];
1734
1735        let config = SubmitBroadcasterConfig::default();
1736        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1737
1738        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1739        let result = broadcaster
1740            .broadcast_submit(
1741                instrument_id,
1742                ClientOrderId::from("O-456"),
1743                OrderSide::Sell,
1744                OrderType::Market,
1745                Quantity::new(50.0, 0),
1746                TimeInForce::Ioc,
1747                None,
1748                None,
1749                None,
1750                None,
1751                None,
1752                None,
1753                false,
1754                false,
1755                None,
1756                None,
1757                None,
1758                None,
1759                None,
1760            )
1761            .await;
1762
1763        assert!(result.is_ok());
1764
1765        // All transports receive the same client_order_id (no suffixing)
1766        let ids = captured_ids.lock().unwrap();
1767        assert_eq!(ids.len(), 2);
1768        assert!(ids.iter().all(|id| id == "O-456")); // All clients get the same ID
1769    }
1770
1771    #[tokio::test]
1772    async fn test_proxy_urls_populated_from_config() {
1773        let config = SubmitBroadcasterConfig {
1774            pool_size: 3,
1775            api_key: Some("test_key".to_string()),
1776            api_secret: Some("test_secret".to_string()),
1777            proxy_urls: vec![
1778                Some("http://proxy1:8080".to_string()),
1779                Some("http://proxy2:8080".to_string()),
1780                Some("http://proxy3:8080".to_string()),
1781            ],
1782            ..Default::default()
1783        };
1784
1785        assert_eq!(config.proxy_urls.len(), 3);
1786        assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1787        assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1788        assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1789    }
1790}