1use std::{
37 fmt::Debug,
38 future::Future,
39 pin::Pin,
40 sync::{
41 Arc,
42 atomic::{AtomicBool, AtomicU64, Ordering},
43 },
44 time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
51 identifiers::{ClientOrderId, InstrumentId, OrderListId},
52 instruments::InstrumentAny,
53 reports::OrderStatusReport,
54 types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{
59 common::{consts::BITMEX_HTTP_TESTNET_URL, enums::BitmexPegPriceType},
60 http::client::BitmexHttpClient,
61};
62
63trait SubmitExecutor: Send + Sync {
85 fn add_instrument(&self, instrument: InstrumentAny);
87
88 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
90
91 #[allow(clippy::too_many_arguments)]
93 fn submit_order(
94 &self,
95 instrument_id: InstrumentId,
96 client_order_id: ClientOrderId,
97 order_side: OrderSide,
98 order_type: OrderType,
99 quantity: Quantity,
100 time_in_force: TimeInForce,
101 price: Option<Price>,
102 trigger_price: Option<Price>,
103 trigger_type: Option<TriggerType>,
104 trailing_offset: Option<f64>,
105 trailing_offset_type: Option<TrailingOffsetType>,
106 display_qty: Option<Quantity>,
107 post_only: bool,
108 reduce_only: bool,
109 order_list_id: Option<OrderListId>,
110 contingency_type: Option<ContingencyType>,
111 peg_price_type: Option<BitmexPegPriceType>,
112 peg_offset_value: Option<f64>,
113 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
114}
115
116impl SubmitExecutor for BitmexHttpClient {
117 fn add_instrument(&self, instrument: InstrumentAny) {
118 Self::cache_instrument(self, instrument);
119 }
120
121 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
122 Box::pin(async move {
123 Self::get_server_time(self)
124 .await
125 .map(|_| ())
126 .map_err(|e| anyhow::anyhow!("{e}"))
127 })
128 }
129
130 #[allow(clippy::too_many_arguments)]
131 fn submit_order(
132 &self,
133 instrument_id: InstrumentId,
134 client_order_id: ClientOrderId,
135 order_side: OrderSide,
136 order_type: OrderType,
137 quantity: Quantity,
138 time_in_force: TimeInForce,
139 price: Option<Price>,
140 trigger_price: Option<Price>,
141 trigger_type: Option<TriggerType>,
142 trailing_offset: Option<f64>,
143 trailing_offset_type: Option<TrailingOffsetType>,
144 display_qty: Option<Quantity>,
145 post_only: bool,
146 reduce_only: bool,
147 order_list_id: Option<OrderListId>,
148 contingency_type: Option<ContingencyType>,
149 peg_price_type: Option<BitmexPegPriceType>,
150 peg_offset_value: Option<f64>,
151 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
152 Box::pin(async move {
153 Self::submit_order(
154 self,
155 instrument_id,
156 client_order_id,
157 order_side,
158 order_type,
159 quantity,
160 time_in_force,
161 price,
162 trigger_price,
163 trigger_type,
164 trailing_offset,
165 trailing_offset_type,
166 display_qty,
167 post_only,
168 reduce_only,
169 order_list_id,
170 contingency_type,
171 peg_price_type,
172 peg_offset_value,
173 )
174 .await
175 })
176 }
177}
178
179#[derive(Debug, Clone)]
181pub struct SubmitBroadcasterConfig {
182 pub pool_size: usize,
184 pub api_key: Option<String>,
186 pub api_secret: Option<String>,
188 pub base_url: Option<String>,
190 pub testnet: bool,
192 pub timeout_secs: Option<u64>,
194 pub max_retries: Option<u32>,
196 pub retry_delay_ms: Option<u64>,
198 pub retry_delay_max_ms: Option<u64>,
200 pub recv_window_ms: Option<u64>,
202 pub max_requests_per_second: Option<u32>,
204 pub max_requests_per_minute: Option<u32>,
206 pub health_check_interval_secs: u64,
208 pub health_check_timeout_secs: u64,
210 pub expected_reject_patterns: Vec<String>,
212 pub proxy_urls: Vec<Option<String>>,
218}
219
220impl Default for SubmitBroadcasterConfig {
221 fn default() -> Self {
222 Self {
223 pool_size: 3,
224 api_key: None,
225 api_secret: None,
226 base_url: None,
227 testnet: false,
228 timeout_secs: Some(60),
229 max_retries: None,
230 retry_delay_ms: Some(1_000),
231 retry_delay_max_ms: Some(5_000),
232 recv_window_ms: Some(10_000),
233 max_requests_per_second: Some(10),
234 max_requests_per_minute: Some(120),
235 health_check_interval_secs: 30,
236 health_check_timeout_secs: 5,
237 expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
238 proxy_urls: vec![],
239 }
240 }
241}
242
243#[derive(Clone)]
245struct TransportClient {
246 executor: Arc<dyn SubmitExecutor>,
251 client_id: String,
252 healthy: Arc<AtomicBool>,
253 submit_count: Arc<AtomicU64>,
254 error_count: Arc<AtomicU64>,
255}
256
257impl Debug for TransportClient {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 f.debug_struct(stringify!(TransportClient))
260 .field("client_id", &self.client_id)
261 .field("healthy", &self.healthy)
262 .field("submit_count", &self.submit_count)
263 .field("error_count", &self.error_count)
264 .finish()
265 }
266}
267
268impl TransportClient {
269 fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
270 Self {
271 executor: Arc::new(executor),
272 client_id,
273 healthy: Arc::new(AtomicBool::new(true)),
274 submit_count: Arc::new(AtomicU64::new(0)),
275 error_count: Arc::new(AtomicU64::new(0)),
276 }
277 }
278
279 fn is_healthy(&self) -> bool {
280 self.healthy.load(Ordering::Relaxed)
281 }
282
283 fn mark_healthy(&self) {
284 self.healthy.store(true, Ordering::Relaxed);
285 }
286
287 fn mark_unhealthy(&self) {
288 self.healthy.store(false, Ordering::Relaxed);
289 }
290
291 fn get_submit_count(&self) -> u64 {
292 self.submit_count.load(Ordering::Relaxed)
293 }
294
295 fn get_error_count(&self) -> u64 {
296 self.error_count.load(Ordering::Relaxed)
297 }
298
299 async fn health_check(&self, timeout_secs: u64) -> bool {
300 match tokio::time::timeout(
301 Duration::from_secs(timeout_secs),
302 self.executor.health_check(),
303 )
304 .await
305 {
306 Ok(Ok(())) => {
307 self.mark_healthy();
308 true
309 }
310 Ok(Err(e)) => {
311 log::warn!("Health check failed for client {}: {e:?}", self.client_id);
312 self.mark_unhealthy();
313 false
314 }
315 Err(_) => {
316 log::warn!("Health check timeout for client {}", self.client_id);
317 self.mark_unhealthy();
318 false
319 }
320 }
321 }
322
323 #[allow(clippy::too_many_arguments)]
324 async fn submit_order(
325 &self,
326 instrument_id: InstrumentId,
327 client_order_id: ClientOrderId,
328 order_side: OrderSide,
329 order_type: OrderType,
330 quantity: Quantity,
331 time_in_force: TimeInForce,
332 price: Option<Price>,
333 trigger_price: Option<Price>,
334 trigger_type: Option<TriggerType>,
335 trailing_offset: Option<f64>,
336 trailing_offset_type: Option<TrailingOffsetType>,
337 display_qty: Option<Quantity>,
338 post_only: bool,
339 reduce_only: bool,
340 order_list_id: Option<OrderListId>,
341 contingency_type: Option<ContingencyType>,
342 peg_price_type: Option<BitmexPegPriceType>,
343 peg_offset_value: Option<f64>,
344 ) -> anyhow::Result<OrderStatusReport> {
345 self.submit_count.fetch_add(1, Ordering::Relaxed);
346
347 match self
348 .executor
349 .submit_order(
350 instrument_id,
351 client_order_id,
352 order_side,
353 order_type,
354 quantity,
355 time_in_force,
356 price,
357 trigger_price,
358 trigger_type,
359 trailing_offset,
360 trailing_offset_type,
361 display_qty,
362 post_only,
363 reduce_only,
364 order_list_id,
365 contingency_type,
366 peg_price_type,
367 peg_offset_value,
368 )
369 .await
370 {
371 Ok(report) => {
372 self.mark_healthy();
373 Ok(report)
374 }
375 Err(e) => {
376 self.error_count.fetch_add(1, Ordering::Relaxed);
377 Err(e)
378 }
379 }
380 }
381}
382
383#[cfg_attr(feature = "python", pyo3::pyclass)]
389#[derive(Debug)]
390pub struct SubmitBroadcaster {
391 config: SubmitBroadcasterConfig,
392 transports: Arc<Vec<TransportClient>>,
393 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
394 running: Arc<AtomicBool>,
395 total_submits: Arc<AtomicU64>,
396 successful_submits: Arc<AtomicU64>,
397 failed_submits: Arc<AtomicU64>,
398 expected_rejects: Arc<AtomicU64>,
399}
400
401impl SubmitBroadcaster {
402 pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
408 let mut transports = Vec::with_capacity(config.pool_size);
409
410 let base_url = if config.testnet && config.base_url.is_none() {
412 Some(BITMEX_HTTP_TESTNET_URL.to_string())
413 } else {
414 config.base_url.clone()
415 };
416
417 for i in 0..config.pool_size {
418 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
420
421 let client = BitmexHttpClient::with_credentials(
422 config.api_key.clone(),
423 config.api_secret.clone(),
424 base_url.clone(),
425 config.timeout_secs,
426 config.max_retries,
427 config.retry_delay_ms,
428 config.retry_delay_max_ms,
429 config.recv_window_ms,
430 config.max_requests_per_second,
431 config.max_requests_per_minute,
432 proxy_url,
433 )
434 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
435
436 transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
437 }
438
439 Ok(Self {
440 config,
441 transports: Arc::new(transports),
442 health_check_task: Arc::new(RwLock::new(None)),
443 running: Arc::new(AtomicBool::new(false)),
444 total_submits: Arc::new(AtomicU64::new(0)),
445 successful_submits: Arc::new(AtomicU64::new(0)),
446 failed_submits: Arc::new(AtomicU64::new(0)),
447 expected_rejects: Arc::new(AtomicU64::new(0)),
448 })
449 }
450
451 pub async fn start(&self) -> anyhow::Result<()> {
457 if self.running.load(Ordering::Relaxed) {
458 return Ok(());
459 }
460
461 self.running.store(true, Ordering::Relaxed);
462
463 self.run_health_checks().await;
465
466 let transports = Arc::clone(&self.transports);
468 let running = Arc::clone(&self.running);
469 let interval_secs = self.config.health_check_interval_secs;
470 let timeout_secs = self.config.health_check_timeout_secs;
471
472 let task = get_runtime().spawn(async move {
473 let mut ticker = interval(Duration::from_secs(interval_secs));
474 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
475
476 loop {
477 ticker.tick().await;
478
479 if !running.load(Ordering::Relaxed) {
480 break;
481 }
482
483 let tasks: Vec<_> = transports
484 .iter()
485 .map(|t| t.health_check(timeout_secs))
486 .collect();
487
488 let results = future::join_all(tasks).await;
489 let healthy_count = results.iter().filter(|&&r| r).count();
490
491 log::debug!(
492 "Health check complete: {healthy_count}/{} clients healthy",
493 results.len()
494 );
495 }
496 });
497
498 *self.health_check_task.write().await = Some(task);
499
500 log::info!(
501 "SubmitBroadcaster started with {} clients",
502 self.transports.len()
503 );
504
505 Ok(())
506 }
507
508 pub async fn stop(&self) {
510 if !self.running.load(Ordering::Relaxed) {
511 return;
512 }
513
514 self.running.store(false, Ordering::Relaxed);
515
516 if let Some(task) = self.health_check_task.write().await.take() {
517 task.abort();
518 }
519
520 log::info!("SubmitBroadcaster stopped");
521 }
522
523 async fn run_health_checks(&self) {
524 let tasks: Vec<_> = self
525 .transports
526 .iter()
527 .map(|t| t.health_check(self.config.health_check_timeout_secs))
528 .collect();
529
530 let results = future::join_all(tasks).await;
531 let healthy_count = results.iter().filter(|&&r| r).count();
532
533 log::debug!(
534 "Health check complete: {healthy_count}/{} clients healthy",
535 results.len()
536 );
537 }
538
539 fn is_expected_reject(&self, error_message: &str) -> bool {
540 self.config
541 .expected_reject_patterns
542 .iter()
543 .any(|pattern| error_message.contains(pattern))
544 }
545
546 async fn process_submit_results<T>(
550 &self,
551 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
552 operation: &str,
553 params: String,
554 ) -> anyhow::Result<T>
555 where
556 T: Send + 'static,
557 {
558 let mut errors = Vec::new();
559 let mut all_duplicate_clordid = true;
560
561 while !handles.is_empty() {
562 let current_handles = std::mem::take(&mut handles);
563 let (result, _idx, remaining) = future::select_all(current_handles).await;
564 handles = remaining.into_iter().collect();
565
566 match result {
567 Ok((client_id, Ok(result))) => {
568 for handle in &handles {
570 handle.abort();
571 }
572 self.successful_submits.fetch_add(1, Ordering::Relaxed);
573 log::debug!("{operation} broadcast succeeded [{client_id}] {params}",);
574 return Ok(result);
575 }
576 Ok((client_id, Err(e))) => {
577 let error_msg = e.to_string();
578 let is_duplicate = error_msg.contains("Duplicate clOrdID");
579
580 if !is_duplicate {
581 all_duplicate_clordid = false;
582 }
583
584 if self.is_expected_reject(&error_msg) {
585 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
586 log::debug!(
587 "Expected {} rejection [{client_id}]: {error_msg} {params}",
588 operation.to_lowercase(),
589 );
590 errors.push(error_msg);
591 } else {
592 log::warn!(
593 "{operation} request failed [{client_id}]: {error_msg} {params}",
594 );
595 errors.push(error_msg);
596 }
597 }
598 Err(e) => {
599 all_duplicate_clordid = false;
600 log::warn!("{operation} task join error: {e:?}");
601 errors.push(format!("Task panicked: {e:?}"));
602 }
603 }
604 }
605
606 self.failed_submits.fetch_add(1, Ordering::Relaxed);
608
609 if all_duplicate_clordid && !errors.is_empty() {
612 log::warn!(
613 "All {} requests returned 'Duplicate clOrdID' - order likely exists {params}",
614 operation.to_lowercase(),
615 );
616 anyhow::bail!("IDEMPOTENT_DUPLICATE: Order likely exists but confirmation was lost");
617 }
618
619 log::error!(
620 "All {} requests failed: {errors:?} {params}",
621 operation.to_lowercase(),
622 );
623 Err(anyhow::anyhow!(
624 "All {} requests failed: {:?}",
625 operation.to_lowercase(),
626 errors
627 ))
628 }
629
630 #[allow(clippy::too_many_arguments)]
641 pub async fn broadcast_submit(
642 &self,
643 instrument_id: InstrumentId,
644 client_order_id: ClientOrderId,
645 order_side: OrderSide,
646 order_type: OrderType,
647 quantity: Quantity,
648 time_in_force: TimeInForce,
649 price: Option<Price>,
650 trigger_price: Option<Price>,
651 trigger_type: Option<TriggerType>,
652 trailing_offset: Option<f64>,
653 trailing_offset_type: Option<TrailingOffsetType>,
654 display_qty: Option<Quantity>,
655 post_only: bool,
656 reduce_only: bool,
657 order_list_id: Option<OrderListId>,
658 contingency_type: Option<ContingencyType>,
659 submit_tries: Option<usize>,
660 peg_price_type: Option<BitmexPegPriceType>,
661 peg_offset_value: Option<f64>,
662 ) -> anyhow::Result<OrderStatusReport> {
663 self.total_submits.fetch_add(1, Ordering::Relaxed);
664
665 let pool_size = self.config.pool_size;
666 let actual_tries = if let Some(t) = submit_tries {
667 if t > pool_size {
668 log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
670 }
671 std::cmp::min(t, pool_size)
672 } else {
673 pool_size
674 };
675
676 log::debug!(
677 "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
678 );
679
680 let healthy_transports: Vec<TransportClient> = self
681 .transports
682 .iter()
683 .filter(|t| t.is_healthy())
684 .take(actual_tries)
685 .cloned()
686 .collect();
687
688 if healthy_transports.is_empty() {
689 self.failed_submits.fetch_add(1, Ordering::Relaxed);
690 anyhow::bail!("No healthy transport clients available");
691 }
692
693 log::debug!(
694 "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
695 healthy_transports.len(),
696 );
697
698 let mut handles = Vec::new();
699 for transport in healthy_transports {
700 let handle = get_runtime().spawn(async move {
703 let client_id = transport.client_id.clone();
704 let result = transport
705 .submit_order(
706 instrument_id,
707 client_order_id,
708 order_side,
709 order_type,
710 quantity,
711 time_in_force,
712 price,
713 trigger_price,
714 trigger_type,
715 trailing_offset,
716 trailing_offset_type,
717 display_qty,
718 post_only,
719 reduce_only,
720 order_list_id,
721 contingency_type,
722 peg_price_type,
723 peg_offset_value,
724 )
725 .await;
726 (client_id, result)
727 });
728 handles.push(handle);
729 }
730
731 self.process_submit_results(
732 handles,
733 "Submit",
734 format!("(client_order_id={client_order_id:?})"),
735 )
736 .await
737 }
738
739 pub fn get_metrics(&self) -> BroadcasterMetrics {
741 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
742 let total_clients = self.transports.len();
743
744 BroadcasterMetrics {
745 total_submits: self.total_submits.load(Ordering::Relaxed),
746 successful_submits: self.successful_submits.load(Ordering::Relaxed),
747 failed_submits: self.failed_submits.load(Ordering::Relaxed),
748 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
749 healthy_clients,
750 total_clients,
751 }
752 }
753
754 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
756 self.get_metrics()
757 }
758
759 pub fn get_client_stats(&self) -> Vec<ClientStats> {
761 self.transports
762 .iter()
763 .map(|t| ClientStats {
764 client_id: t.client_id.clone(),
765 healthy: t.is_healthy(),
766 submit_count: t.get_submit_count(),
767 error_count: t.get_error_count(),
768 })
769 .collect()
770 }
771
772 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
774 self.get_client_stats()
775 }
776
777 pub fn cache_instrument(&self, instrument: InstrumentAny) {
779 for transport in self.transports.iter() {
780 transport.executor.add_instrument(instrument.clone());
781 }
782 }
783
784 #[must_use]
785 pub fn clone_for_async(&self) -> Self {
786 Self {
787 config: self.config.clone(),
788 transports: Arc::clone(&self.transports),
789 health_check_task: Arc::clone(&self.health_check_task),
790 running: Arc::clone(&self.running),
791 total_submits: Arc::clone(&self.total_submits),
792 successful_submits: Arc::clone(&self.successful_submits),
793 failed_submits: Arc::clone(&self.failed_submits),
794 expected_rejects: Arc::clone(&self.expected_rejects),
795 }
796 }
797
798 #[cfg(test)]
799 fn new_with_transports(
800 config: SubmitBroadcasterConfig,
801 transports: Vec<TransportClient>,
802 ) -> Self {
803 Self {
804 config,
805 transports: Arc::new(transports),
806 health_check_task: Arc::new(RwLock::new(None)),
807 running: Arc::new(AtomicBool::new(false)),
808 total_submits: Arc::new(AtomicU64::new(0)),
809 successful_submits: Arc::new(AtomicU64::new(0)),
810 failed_submits: Arc::new(AtomicU64::new(0)),
811 expected_rejects: Arc::new(AtomicU64::new(0)),
812 }
813 }
814}
815
816#[derive(Debug, Clone)]
818pub struct BroadcasterMetrics {
819 pub total_submits: u64,
820 pub successful_submits: u64,
821 pub failed_submits: u64,
822 pub expected_rejects: u64,
823 pub healthy_clients: usize,
824 pub total_clients: usize,
825}
826
827#[derive(Debug, Clone)]
829pub struct ClientStats {
830 pub client_id: String,
831 pub healthy: bool,
832 pub submit_count: u64,
833 pub error_count: u64,
834}
835
836#[cfg(test)]
837mod tests {
838 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
839
840 use nautilus_core::UUID4;
841 use nautilus_model::{
842 enums::{
843 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
844 },
845 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
846 reports::OrderStatusReport,
847 types::{Price, Quantity},
848 };
849
850 use super::*;
851
852 #[derive(Clone)]
854 #[allow(clippy::type_complexity)]
855 struct MockExecutor {
856 handler: Arc<
857 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
858 + Send
859 + Sync,
860 >,
861 }
862
863 impl MockExecutor {
864 fn new<F, Fut>(handler: F) -> Self
865 where
866 F: Fn() -> Fut + Send + Sync + 'static,
867 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
868 {
869 Self {
870 handler: Arc::new(move || Box::pin(handler())),
871 }
872 }
873 }
874
875 impl SubmitExecutor for MockExecutor {
876 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
877 Box::pin(async { Ok(()) })
878 }
879
880 #[allow(clippy::too_many_arguments)]
881 fn submit_order(
882 &self,
883 _instrument_id: InstrumentId,
884 _client_order_id: ClientOrderId,
885 _order_side: OrderSide,
886 _order_type: OrderType,
887 _quantity: Quantity,
888 _time_in_force: TimeInForce,
889 _price: Option<Price>,
890 _trigger_price: Option<Price>,
891 _trigger_type: Option<TriggerType>,
892 _trailing_offset: Option<f64>,
893 _trailing_offset_type: Option<TrailingOffsetType>,
894 _display_qty: Option<Quantity>,
895 _post_only: bool,
896 _reduce_only: bool,
897 _order_list_id: Option<OrderListId>,
898 _contingency_type: Option<ContingencyType>,
899 _peg_price_type: Option<BitmexPegPriceType>,
900 _peg_offset_value: Option<f64>,
901 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
902 (self.handler)()
903 }
904
905 fn add_instrument(&self, _instrument: InstrumentAny) {
906 }
908 }
909
910 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
911 OrderStatusReport {
912 account_id: AccountId::from("BITMEX-001"),
913 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
914 venue_order_id: VenueOrderId::from(venue_order_id),
915 order_side: OrderSide::Buy,
916 order_type: OrderType::Limit,
917 time_in_force: TimeInForce::Gtc,
918 order_status: OrderStatus::Accepted,
919 price: Some(Price::new(50000.0, 2)),
920 quantity: Quantity::new(100.0, 0),
921 filled_qty: Quantity::new(0.0, 0),
922 report_id: UUID4::new(),
923 ts_accepted: 0.into(),
924 ts_last: 0.into(),
925 ts_init: 0.into(),
926 client_order_id: None,
927 avg_px: None,
928 trigger_price: None,
929 trigger_type: None,
930 contingency_type: ContingencyType::NoContingency,
931 expire_time: None,
932 order_list_id: None,
933 venue_position_id: None,
934 linked_order_ids: None,
935 parent_order_id: None,
936 display_qty: None,
937 limit_offset: None,
938 trailing_offset: None,
939 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
940 post_only: false,
941 reduce_only: false,
942 cancel_reason: None,
943 ts_triggered: None,
944 }
945 }
946
947 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
948 where
949 F: Fn() -> Fut + Send + Sync + 'static,
950 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
951 {
952 let executor = MockExecutor::new(handler);
953 TransportClient::new(executor, client_id.to_string())
954 }
955
956 #[tokio::test]
957 async fn test_broadcast_submit_immediate_success() {
958 let report = create_test_report("ORDER-1");
959 let report_clone = report.clone();
960
961 let transports = vec![
962 create_stub_transport("client-0", move || {
963 let report = report_clone.clone();
964 async move { Ok(report) }
965 }),
966 create_stub_transport("client-1", || async {
967 tokio::time::sleep(Duration::from_secs(10)).await;
968 anyhow::bail!("Should be aborted")
969 }),
970 ];
971
972 let config = SubmitBroadcasterConfig::default();
973 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
974
975 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
976 let result = broadcaster
977 .broadcast_submit(
978 instrument_id,
979 ClientOrderId::from("O-123"),
980 OrderSide::Buy,
981 OrderType::Limit,
982 Quantity::new(100.0, 0),
983 TimeInForce::Gtc,
984 Some(Price::new(50000.0, 2)),
985 None,
986 None,
987 None,
988 None,
989 None,
990 false,
991 false,
992 None,
993 None,
994 None,
995 None,
996 None,
997 )
998 .await;
999
1000 assert!(result.is_ok());
1001 let returned_report = result.unwrap();
1002 assert_eq!(returned_report.venue_order_id, report.venue_order_id);
1003
1004 let metrics = broadcaster.get_metrics_async().await;
1005 assert_eq!(metrics.successful_submits, 1);
1006 assert_eq!(metrics.failed_submits, 0);
1007 assert_eq!(metrics.total_submits, 1);
1008 }
1009
1010 #[tokio::test]
1011 async fn test_broadcast_submit_duplicate_clordid_expected() {
1012 let transports = vec![
1013 create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
1014 create_stub_transport("client-1", || async {
1015 tokio::time::sleep(Duration::from_secs(10)).await;
1016 anyhow::bail!("Should be aborted")
1017 }),
1018 ];
1019
1020 let config = SubmitBroadcasterConfig::default();
1021 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1022
1023 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1024 let result = broadcaster
1025 .broadcast_submit(
1026 instrument_id,
1027 ClientOrderId::from("O-123"),
1028 OrderSide::Buy,
1029 OrderType::Limit,
1030 Quantity::new(100.0, 0),
1031 TimeInForce::Gtc,
1032 Some(Price::new(50000.0, 2)),
1033 None,
1034 None,
1035 None,
1036 None,
1037 None,
1038 false,
1039 false,
1040 None,
1041 None,
1042 None,
1043 None,
1044 None,
1045 )
1046 .await;
1047
1048 assert!(result.is_err());
1049
1050 let metrics = broadcaster.get_metrics_async().await;
1051 assert_eq!(metrics.expected_rejects, 1);
1052 assert_eq!(metrics.successful_submits, 0);
1053 assert_eq!(metrics.failed_submits, 1);
1054 }
1055
1056 #[tokio::test]
1057 async fn test_broadcast_submit_all_failures() {
1058 let transports = vec![
1059 create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1060 create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1061 ];
1062
1063 let config = SubmitBroadcasterConfig::default();
1064 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1065
1066 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1067 let result = broadcaster
1068 .broadcast_submit(
1069 instrument_id,
1070 ClientOrderId::from("O-456"),
1071 OrderSide::Sell,
1072 OrderType::Market,
1073 Quantity::new(50.0, 0),
1074 TimeInForce::Ioc,
1075 None,
1076 None,
1077 None,
1078 None,
1079 None,
1080 None,
1081 false,
1082 false,
1083 None,
1084 None,
1085 None,
1086 None,
1087 None,
1088 )
1089 .await;
1090
1091 assert!(result.is_err());
1092 assert!(
1093 result
1094 .unwrap_err()
1095 .to_string()
1096 .contains("All submit requests failed")
1097 );
1098
1099 let metrics = broadcaster.get_metrics_async().await;
1100 assert_eq!(metrics.failed_submits, 1);
1101 assert_eq!(metrics.successful_submits, 0);
1102 }
1103
1104 #[tokio::test]
1105 async fn test_broadcast_submit_no_healthy_clients() {
1106 let transport =
1107 create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1108 transport.healthy.store(false, Ordering::Relaxed);
1109
1110 let config = SubmitBroadcasterConfig::default();
1111 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1112
1113 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1114 let result = broadcaster
1115 .broadcast_submit(
1116 instrument_id,
1117 ClientOrderId::from("O-789"),
1118 OrderSide::Buy,
1119 OrderType::Limit,
1120 Quantity::new(100.0, 0),
1121 TimeInForce::Gtc,
1122 Some(Price::new(50000.0, 2)),
1123 None,
1124 None,
1125 None,
1126 None,
1127 None,
1128 false,
1129 false,
1130 None,
1131 None,
1132 None,
1133 None,
1134 None,
1135 )
1136 .await;
1137
1138 assert!(result.is_err());
1139 assert!(
1140 result
1141 .unwrap_err()
1142 .to_string()
1143 .contains("No healthy transport clients available")
1144 );
1145
1146 let metrics = broadcaster.get_metrics_async().await;
1147 assert_eq!(metrics.failed_submits, 1);
1148 }
1149
1150 #[tokio::test]
1151 async fn test_default_config() {
1152 let config = SubmitBroadcasterConfig {
1153 api_key: Some("test_key".to_string()),
1154 api_secret: Some("test_secret".to_string()),
1155 base_url: Some("https://test.example.com".to_string()),
1156 ..Default::default()
1157 };
1158
1159 let broadcaster = SubmitBroadcaster::new(config);
1160 assert!(broadcaster.is_ok());
1161
1162 let broadcaster = broadcaster.unwrap();
1163 let metrics = broadcaster.get_metrics_async().await;
1164
1165 assert_eq!(metrics.total_clients, 3);
1167 }
1168
1169 #[tokio::test]
1170 async fn test_broadcaster_lifecycle() {
1171 let config = SubmitBroadcasterConfig {
1172 pool_size: 2,
1173 api_key: Some("test_key".to_string()),
1174 api_secret: Some("test_secret".to_string()),
1175 base_url: Some("https://test.example.com".to_string()),
1176 testnet: false,
1177 timeout_secs: Some(5),
1178 max_retries: None,
1179 retry_delay_ms: None,
1180 retry_delay_max_ms: None,
1181 recv_window_ms: None,
1182 max_requests_per_second: None,
1183 max_requests_per_minute: None,
1184 health_check_interval_secs: 60,
1185 health_check_timeout_secs: 1,
1186 expected_reject_patterns: vec![],
1187 proxy_urls: vec![],
1188 };
1189
1190 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1191
1192 assert!(!broadcaster.running.load(Ordering::Relaxed));
1194
1195 let start_result = broadcaster.start().await;
1197 assert!(start_result.is_ok());
1198 assert!(broadcaster.running.load(Ordering::Relaxed));
1199
1200 let start_again = broadcaster.start().await;
1202 assert!(start_again.is_ok());
1203
1204 broadcaster.stop().await;
1206 assert!(!broadcaster.running.load(Ordering::Relaxed));
1207
1208 broadcaster.stop().await;
1210 assert!(!broadcaster.running.load(Ordering::Relaxed));
1211 }
1212
1213 #[tokio::test]
1214 async fn test_broadcast_submit_metrics_increment() {
1215 let report = create_test_report("ORDER-1");
1216 let report_clone = report.clone();
1217
1218 let transports = vec![create_stub_transport("client-0", move || {
1219 let report = report_clone.clone();
1220 async move { Ok(report) }
1221 })];
1222
1223 let config = SubmitBroadcasterConfig::default();
1224 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1225
1226 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1227 let _ = broadcaster
1228 .broadcast_submit(
1229 instrument_id,
1230 ClientOrderId::from("O-123"),
1231 OrderSide::Buy,
1232 OrderType::Limit,
1233 Quantity::new(100.0, 0),
1234 TimeInForce::Gtc,
1235 Some(Price::new(50000.0, 2)),
1236 None,
1237 None,
1238 None,
1239 None,
1240 None,
1241 false,
1242 false,
1243 None,
1244 None,
1245 None,
1246 None,
1247 None,
1248 )
1249 .await;
1250
1251 let metrics = broadcaster.get_metrics_async().await;
1252 assert_eq!(metrics.total_submits, 1);
1253 assert_eq!(metrics.successful_submits, 1);
1254 assert_eq!(metrics.failed_submits, 0);
1255 }
1256
1257 #[tokio::test]
1258 async fn test_broadcaster_creation_with_pool() {
1259 let config = SubmitBroadcasterConfig {
1260 pool_size: 4,
1261 api_key: Some("test_key".to_string()),
1262 api_secret: Some("test_secret".to_string()),
1263 base_url: Some("https://test.example.com".to_string()),
1264 ..Default::default()
1265 };
1266
1267 let broadcaster = SubmitBroadcaster::new(config);
1268 assert!(broadcaster.is_ok());
1269
1270 let broadcaster = broadcaster.unwrap();
1271 let metrics = broadcaster.get_metrics_async().await;
1272 assert_eq!(metrics.total_clients, 4);
1273 }
1274
1275 #[tokio::test]
1276 async fn test_client_stats_collection() {
1277 let transports = vec![
1280 create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1281 create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1282 ];
1283
1284 let config = SubmitBroadcasterConfig::default();
1285 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1286
1287 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1288 let _ = broadcaster
1289 .broadcast_submit(
1290 instrument_id,
1291 ClientOrderId::from("O-123"),
1292 OrderSide::Buy,
1293 OrderType::Limit,
1294 Quantity::new(100.0, 0),
1295 TimeInForce::Gtc,
1296 Some(Price::new(50000.0, 2)),
1297 None,
1298 None,
1299 None,
1300 None,
1301 None,
1302 false,
1303 false,
1304 None,
1305 None,
1306 None,
1307 None,
1308 None,
1309 )
1310 .await;
1311
1312 let stats = broadcaster.get_client_stats_async().await;
1313 assert_eq!(stats.len(), 2);
1314
1315 let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1316 assert_eq!(client0.submit_count, 1);
1317 assert_eq!(client0.error_count, 1);
1318
1319 let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1320 assert_eq!(client1.submit_count, 1);
1321 assert_eq!(client1.error_count, 1);
1322 }
1323
1324 #[tokio::test]
1325 async fn test_testnet_config_sets_base_url() {
1326 let config = SubmitBroadcasterConfig {
1327 pool_size: 1,
1328 api_key: Some("test_key".to_string()),
1329 api_secret: Some("test_secret".to_string()),
1330 testnet: true,
1331 base_url: None,
1332 ..Default::default()
1333 };
1334
1335 let broadcaster = SubmitBroadcaster::new(config);
1336 assert!(broadcaster.is_ok());
1337 }
1338
1339 #[tokio::test]
1340 async fn test_clone_for_async() {
1341 let config = SubmitBroadcasterConfig {
1342 pool_size: 1,
1343 api_key: Some("test_key".to_string()),
1344 api_secret: Some("test_secret".to_string()),
1345 base_url: Some("https://test.example.com".to_string()),
1346 ..Default::default()
1347 };
1348
1349 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1350 let cloned = broadcaster.clone_for_async();
1351
1352 broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1354 assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1355 }
1356
1357 #[tokio::test]
1358 async fn test_pattern_matching() {
1359 let config = SubmitBroadcasterConfig {
1360 expected_reject_patterns: vec![
1361 "Duplicate clOrdID".to_string(),
1362 "Order already exists".to_string(),
1363 ],
1364 ..Default::default()
1365 };
1366
1367 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1368
1369 assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1370 assert!(broadcaster.is_expected_reject("Order already exists in system"));
1371 assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1372 assert!(!broadcaster.is_expected_reject("Internal server error"));
1373 }
1374
1375 #[tokio::test]
1376 async fn test_submit_metrics_with_mixed_responses() {
1377 let report = create_test_report("ORDER-1");
1378 let report_clone = report.clone();
1379
1380 let transports = vec![
1381 create_stub_transport("client-0", move || {
1382 let report = report_clone.clone();
1383 async move { Ok(report) }
1384 }),
1385 create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1386 ];
1387
1388 let config = SubmitBroadcasterConfig::default();
1389 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1390
1391 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1392 let result = broadcaster
1393 .broadcast_submit(
1394 instrument_id,
1395 ClientOrderId::from("O-123"),
1396 OrderSide::Buy,
1397 OrderType::Limit,
1398 Quantity::new(100.0, 0),
1399 TimeInForce::Gtc,
1400 Some(Price::new(50000.0, 2)),
1401 None,
1402 None,
1403 None,
1404 None,
1405 None,
1406 false,
1407 false,
1408 None,
1409 None,
1410 None,
1411 None,
1412 None,
1413 )
1414 .await;
1415
1416 assert!(result.is_ok());
1417
1418 let metrics = broadcaster.get_metrics_async().await;
1419 assert_eq!(metrics.total_submits, 1);
1420 assert_eq!(metrics.successful_submits, 1);
1421 assert_eq!(metrics.failed_submits, 0);
1422 }
1423
1424 #[tokio::test]
1425 async fn test_metrics_initialization_and_health() {
1426 let config = SubmitBroadcasterConfig {
1427 pool_size: 2,
1428 api_key: Some("test_key".to_string()),
1429 api_secret: Some("test_secret".to_string()),
1430 base_url: Some("https://test.example.com".to_string()),
1431 ..Default::default()
1432 };
1433
1434 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1435 let metrics = broadcaster.get_metrics_async().await;
1436
1437 assert_eq!(metrics.total_submits, 0);
1438 assert_eq!(metrics.successful_submits, 0);
1439 assert_eq!(metrics.failed_submits, 0);
1440 assert_eq!(metrics.expected_rejects, 0);
1441 assert_eq!(metrics.total_clients, 2);
1442 assert_eq!(metrics.healthy_clients, 2);
1443 }
1444
1445 #[tokio::test]
1446 async fn test_health_check_task_lifecycle() {
1447 let config = SubmitBroadcasterConfig {
1448 pool_size: 2,
1449 api_key: Some("test_key".to_string()),
1450 api_secret: Some("test_secret".to_string()),
1451 base_url: Some("https://test.example.com".to_string()),
1452 health_check_interval_secs: 1,
1453 ..Default::default()
1454 };
1455
1456 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1457
1458 broadcaster.start().await.unwrap();
1460 assert!(broadcaster.running.load(Ordering::Relaxed));
1461 assert!(
1462 broadcaster
1463 .health_check_task
1464 .read()
1465 .await
1466 .as_ref()
1467 .is_some()
1468 );
1469
1470 broadcaster.stop().await;
1472 assert!(!broadcaster.running.load(Ordering::Relaxed));
1473 }
1474
1475 #[tokio::test]
1476 async fn test_expected_reject_pattern_comprehensive() {
1477 let transports = vec![
1478 create_stub_transport("client-0", || async {
1479 anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1480 }),
1481 create_stub_transport("client-1", || async {
1482 tokio::time::sleep(Duration::from_secs(10)).await;
1483 anyhow::bail!("Should be aborted")
1484 }),
1485 ];
1486
1487 let config = SubmitBroadcasterConfig::default();
1488 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1489
1490 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1491 let result = broadcaster
1492 .broadcast_submit(
1493 instrument_id,
1494 ClientOrderId::from("O-123"),
1495 OrderSide::Buy,
1496 OrderType::Limit,
1497 Quantity::new(100.0, 0),
1498 TimeInForce::Gtc,
1499 Some(Price::new(50000.0, 2)),
1500 None,
1501 None,
1502 None,
1503 None,
1504 None,
1505 false,
1506 false,
1507 None,
1508 None,
1509 None,
1510 None,
1511 None,
1512 )
1513 .await;
1514
1515 assert!(result.is_err());
1517
1518 let metrics = broadcaster.get_metrics_async().await;
1519 assert_eq!(metrics.expected_rejects, 1);
1520 assert_eq!(metrics.failed_submits, 1);
1521 assert_eq!(metrics.successful_submits, 0);
1522 }
1523
1524 #[tokio::test]
1525 async fn test_client_order_id_suffix_for_multiple_clients() {
1526 use std::sync::{Arc, Mutex};
1527
1528 #[derive(Clone)]
1529 struct CaptureExecutor {
1530 captured_ids: Arc<Mutex<Vec<String>>>,
1531 barrier: Arc<tokio::sync::Barrier>,
1532 report: OrderStatusReport,
1533 }
1534
1535 impl SubmitExecutor for CaptureExecutor {
1536 fn health_check(
1537 &self,
1538 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1539 Box::pin(async { Ok(()) })
1540 }
1541
1542 #[allow(clippy::too_many_arguments)]
1543 fn submit_order(
1544 &self,
1545 _instrument_id: InstrumentId,
1546 client_order_id: ClientOrderId,
1547 _order_side: OrderSide,
1548 _order_type: OrderType,
1549 _quantity: Quantity,
1550 _time_in_force: TimeInForce,
1551 _price: Option<Price>,
1552 _trigger_price: Option<Price>,
1553 _trigger_type: Option<TriggerType>,
1554 _trailing_offset: Option<f64>,
1555 _trailing_offset_type: Option<TrailingOffsetType>,
1556 _display_qty: Option<Quantity>,
1557 _post_only: bool,
1558 _reduce_only: bool,
1559 _order_list_id: Option<OrderListId>,
1560 _contingency_type: Option<ContingencyType>,
1561 _peg_price_type: Option<BitmexPegPriceType>,
1562 _peg_offset_value: Option<f64>,
1563 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1564 {
1565 self.captured_ids
1567 .lock()
1568 .unwrap()
1569 .push(client_order_id.as_str().to_string());
1570 let report = self.report.clone();
1571 let barrier = Arc::clone(&self.barrier);
1572 Box::pin(async move {
1575 barrier.wait().await;
1576 Ok(report)
1577 })
1578 }
1579
1580 fn add_instrument(&self, _instrument: InstrumentAny) {}
1581 }
1582
1583 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1584 let barrier = Arc::new(tokio::sync::Barrier::new(3));
1585 let report = create_test_report("ORDER-1");
1586
1587 let transports = vec![
1588 TransportClient::new(
1589 CaptureExecutor {
1590 captured_ids: Arc::clone(&captured_ids),
1591 barrier: Arc::clone(&barrier),
1592 report: report.clone(),
1593 },
1594 "client-0".to_string(),
1595 ),
1596 TransportClient::new(
1597 CaptureExecutor {
1598 captured_ids: Arc::clone(&captured_ids),
1599 barrier: Arc::clone(&barrier),
1600 report: report.clone(),
1601 },
1602 "client-1".to_string(),
1603 ),
1604 TransportClient::new(
1605 CaptureExecutor {
1606 captured_ids: Arc::clone(&captured_ids),
1607 barrier: Arc::clone(&barrier),
1608 report: report.clone(),
1609 },
1610 "client-2".to_string(),
1611 ),
1612 ];
1613
1614 let config = SubmitBroadcasterConfig::default();
1615 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1616
1617 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1618 let result = broadcaster
1619 .broadcast_submit(
1620 instrument_id,
1621 ClientOrderId::from("O-123"),
1622 OrderSide::Buy,
1623 OrderType::Limit,
1624 Quantity::new(100.0, 0),
1625 TimeInForce::Gtc,
1626 Some(Price::new(50000.0, 2)),
1627 None,
1628 None,
1629 None,
1630 None,
1631 None,
1632 false,
1633 false,
1634 None,
1635 None,
1636 None,
1637 None,
1638 None,
1639 )
1640 .await;
1641
1642 assert!(result.is_ok());
1643
1644 let ids = captured_ids.lock().unwrap();
1646 assert_eq!(ids.len(), 3);
1647 assert!(ids.iter().all(|id| id == "O-123")); }
1649
1650 #[tokio::test]
1651 async fn test_client_order_id_suffix_with_partial_failure() {
1652 use std::sync::{Arc, Mutex};
1653
1654 #[derive(Clone)]
1655 struct CaptureAndFailExecutor {
1656 captured_ids: Arc<Mutex<Vec<String>>>,
1657 barrier: Arc<tokio::sync::Barrier>,
1658 should_succeed: bool,
1659 }
1660
1661 impl SubmitExecutor for CaptureAndFailExecutor {
1662 fn health_check(
1663 &self,
1664 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1665 Box::pin(async { Ok(()) })
1666 }
1667
1668 #[allow(clippy::too_many_arguments)]
1669 fn submit_order(
1670 &self,
1671 _instrument_id: InstrumentId,
1672 client_order_id: ClientOrderId,
1673 _order_side: OrderSide,
1674 _order_type: OrderType,
1675 _quantity: Quantity,
1676 _time_in_force: TimeInForce,
1677 _price: Option<Price>,
1678 _trigger_price: Option<Price>,
1679 _trigger_type: Option<TriggerType>,
1680 _trailing_offset: Option<f64>,
1681 _trailing_offset_type: Option<TrailingOffsetType>,
1682 _display_qty: Option<Quantity>,
1683 _post_only: bool,
1684 _reduce_only: bool,
1685 _order_list_id: Option<OrderListId>,
1686 _contingency_type: Option<ContingencyType>,
1687 _peg_price_type: Option<BitmexPegPriceType>,
1688 _peg_offset_value: Option<f64>,
1689 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1690 {
1691 self.captured_ids
1693 .lock()
1694 .unwrap()
1695 .push(client_order_id.as_str().to_string());
1696 let barrier = Arc::clone(&self.barrier);
1697 let should_succeed = self.should_succeed;
1698 Box::pin(async move {
1701 barrier.wait().await;
1702 if should_succeed {
1703 Ok(create_test_report("ORDER-1"))
1704 } else {
1705 anyhow::bail!("Network error")
1706 }
1707 })
1708 }
1709
1710 fn add_instrument(&self, _instrument: InstrumentAny) {}
1711 }
1712
1713 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1714 let barrier = Arc::new(tokio::sync::Barrier::new(2));
1715
1716 let transports = vec![
1717 TransportClient::new(
1718 CaptureAndFailExecutor {
1719 captured_ids: Arc::clone(&captured_ids),
1720 barrier: Arc::clone(&barrier),
1721 should_succeed: false,
1722 },
1723 "client-0".to_string(),
1724 ),
1725 TransportClient::new(
1726 CaptureAndFailExecutor {
1727 captured_ids: Arc::clone(&captured_ids),
1728 barrier: Arc::clone(&barrier),
1729 should_succeed: true,
1730 },
1731 "client-1".to_string(),
1732 ),
1733 ];
1734
1735 let config = SubmitBroadcasterConfig::default();
1736 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1737
1738 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1739 let result = broadcaster
1740 .broadcast_submit(
1741 instrument_id,
1742 ClientOrderId::from("O-456"),
1743 OrderSide::Sell,
1744 OrderType::Market,
1745 Quantity::new(50.0, 0),
1746 TimeInForce::Ioc,
1747 None,
1748 None,
1749 None,
1750 None,
1751 None,
1752 None,
1753 false,
1754 false,
1755 None,
1756 None,
1757 None,
1758 None,
1759 None,
1760 )
1761 .await;
1762
1763 assert!(result.is_ok());
1764
1765 let ids = captured_ids.lock().unwrap();
1767 assert_eq!(ids.len(), 2);
1768 assert!(ids.iter().all(|id| id == "O-456")); }
1770
1771 #[tokio::test]
1772 async fn test_proxy_urls_populated_from_config() {
1773 let config = SubmitBroadcasterConfig {
1774 pool_size: 3,
1775 api_key: Some("test_key".to_string()),
1776 api_secret: Some("test_secret".to_string()),
1777 proxy_urls: vec![
1778 Some("http://proxy1:8080".to_string()),
1779 Some("http://proxy2:8080".to_string()),
1780 Some("http://proxy3:8080".to_string()),
1781 ],
1782 ..Default::default()
1783 };
1784
1785 assert_eq!(config.proxy_urls.len(), 3);
1786 assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1787 assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1788 assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1789 }
1790}