1use std::{
37 fmt::Debug,
38 future::Future,
39 pin::Pin,
40 sync::{
41 Arc,
42 atomic::{AtomicBool, AtomicU64, Ordering},
43 },
44 time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
51 identifiers::{ClientOrderId, InstrumentId, OrderListId},
52 instruments::InstrumentAny,
53 reports::OrderStatusReport,
54 types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
59
60trait SubmitExecutor: Send + Sync {
82 fn add_instrument(&self, instrument: InstrumentAny);
84
85 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
87
88 #[allow(clippy::too_many_arguments)]
90 fn submit_order(
91 &self,
92 instrument_id: InstrumentId,
93 client_order_id: ClientOrderId,
94 order_side: OrderSide,
95 order_type: OrderType,
96 quantity: Quantity,
97 time_in_force: TimeInForce,
98 price: Option<Price>,
99 trigger_price: Option<Price>,
100 trigger_type: Option<TriggerType>,
101 display_qty: Option<Quantity>,
102 post_only: bool,
103 reduce_only: bool,
104 order_list_id: Option<OrderListId>,
105 contingency_type: Option<ContingencyType>,
106 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
107}
108
109impl SubmitExecutor for BitmexHttpClient {
110 fn add_instrument(&self, instrument: InstrumentAny) {
111 Self::cache_instrument(self, instrument);
112 }
113
114 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
115 Box::pin(async move {
116 Self::get_server_time(self)
117 .await
118 .map(|_| ())
119 .map_err(|e| anyhow::anyhow!("{e}"))
120 })
121 }
122
123 #[allow(clippy::too_many_arguments)]
124 fn submit_order(
125 &self,
126 instrument_id: InstrumentId,
127 client_order_id: ClientOrderId,
128 order_side: OrderSide,
129 order_type: OrderType,
130 quantity: Quantity,
131 time_in_force: TimeInForce,
132 price: Option<Price>,
133 trigger_price: Option<Price>,
134 trigger_type: Option<TriggerType>,
135 display_qty: Option<Quantity>,
136 post_only: bool,
137 reduce_only: bool,
138 order_list_id: Option<OrderListId>,
139 contingency_type: Option<ContingencyType>,
140 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
141 Box::pin(async move {
142 Self::submit_order(
143 self,
144 instrument_id,
145 client_order_id,
146 order_side,
147 order_type,
148 quantity,
149 time_in_force,
150 price,
151 trigger_price,
152 trigger_type,
153 display_qty,
154 post_only,
155 reduce_only,
156 order_list_id,
157 contingency_type,
158 )
159 .await
160 })
161 }
162}
163
/// Settings for a `SubmitBroadcaster` and the HTTP clients it creates.
#[derive(Clone, Debug)]
pub struct SubmitBroadcasterConfig {
    /// Number of HTTP clients to create for the pool.
    pub pool_size: usize,
    /// API key handed to every HTTP client.
    pub api_key: Option<String>,
    /// API secret handed to every HTTP client.
    pub api_secret: Option<String>,
    /// Explicit base URL; when `None` and `testnet` is set, the testnet URL is used.
    pub base_url: Option<String>,
    /// Selects the testnet URL when no explicit `base_url` is given.
    pub testnet: bool,
    /// Request timeout, forwarded to the HTTP client constructor.
    pub timeout_secs: Option<u64>,
    /// Retry limit, forwarded to the HTTP client constructor.
    pub max_retries: Option<u32>,
    /// Retry delay, forwarded to the HTTP client constructor.
    pub retry_delay_ms: Option<u64>,
    /// Upper retry delay, forwarded to the HTTP client constructor.
    pub retry_delay_max_ms: Option<u64>,
    /// Receive window, forwarded to the HTTP client constructor.
    pub recv_window_ms: Option<u64>,
    /// Per-second request limit, forwarded to the HTTP client constructor.
    pub max_requests_per_second: Option<u32>,
    /// Per-minute request limit, forwarded to the HTTP client constructor.
    pub max_requests_per_minute: Option<u32>,
    /// Period, in seconds, between background health-check rounds.
    pub health_check_interval_secs: u64,
    /// Timeout, in seconds, applied to each individual health check.
    pub health_check_timeout_secs: u64,
    /// Substrings that mark an error message as an expected rejection.
    pub expected_reject_patterns: Vec<String>,
    /// Proxy URL per client, looked up by pool index; missing entries mean no proxy.
    pub proxy_urls: Vec<Option<String>>,
}

impl Default for SubmitBroadcasterConfig {
    /// Three clients, no credentials, production endpoint, and duplicate
    /// `clOrdID` responses treated as expected rejections.
    fn default() -> Self {
        let expected_reject_patterns = vec![String::from(r"Duplicate clOrdID")];

        Self {
            // Pool and endpoint
            pool_size: 3,
            api_key: None,
            api_secret: None,
            base_url: None,
            testnet: false,
            // HTTP client tuning
            timeout_secs: Some(60),
            max_retries: None,
            retry_delay_ms: Some(1_000),
            retry_delay_max_ms: Some(5_000),
            recv_window_ms: Some(10_000),
            max_requests_per_second: Some(10),
            max_requests_per_minute: Some(120),
            // Health monitoring
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            // Error classification and proxies
            expected_reject_patterns,
            proxy_urls: Vec::new(),
        }
    }
}
227
228#[derive(Clone)]
230struct TransportClient {
231 executor: Arc<dyn SubmitExecutor>,
236 client_id: String,
237 healthy: Arc<AtomicBool>,
238 submit_count: Arc<AtomicU64>,
239 error_count: Arc<AtomicU64>,
240}
241
242impl Debug for TransportClient {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 f.debug_struct(stringify!(TransportClient))
245 .field("client_id", &self.client_id)
246 .field("healthy", &self.healthy)
247 .field("submit_count", &self.submit_count)
248 .field("error_count", &self.error_count)
249 .finish()
250 }
251}
252
253impl TransportClient {
254 fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
255 Self {
256 executor: Arc::new(executor),
257 client_id,
258 healthy: Arc::new(AtomicBool::new(true)),
259 submit_count: Arc::new(AtomicU64::new(0)),
260 error_count: Arc::new(AtomicU64::new(0)),
261 }
262 }
263
264 fn is_healthy(&self) -> bool {
265 self.healthy.load(Ordering::Relaxed)
266 }
267
268 fn mark_healthy(&self) {
269 self.healthy.store(true, Ordering::Relaxed);
270 }
271
272 fn mark_unhealthy(&self) {
273 self.healthy.store(false, Ordering::Relaxed);
274 }
275
276 fn get_submit_count(&self) -> u64 {
277 self.submit_count.load(Ordering::Relaxed)
278 }
279
280 fn get_error_count(&self) -> u64 {
281 self.error_count.load(Ordering::Relaxed)
282 }
283
284 async fn health_check(&self, timeout_secs: u64) -> bool {
285 match tokio::time::timeout(
286 Duration::from_secs(timeout_secs),
287 self.executor.health_check(),
288 )
289 .await
290 {
291 Ok(Ok(())) => {
292 self.mark_healthy();
293 true
294 }
295 Ok(Err(e)) => {
296 log::warn!("Health check failed for client {}: {e:?}", self.client_id);
297 self.mark_unhealthy();
298 false
299 }
300 Err(_) => {
301 log::warn!("Health check timeout for client {}", self.client_id);
302 self.mark_unhealthy();
303 false
304 }
305 }
306 }
307
308 #[allow(clippy::too_many_arguments)]
309 async fn submit_order(
310 &self,
311 instrument_id: InstrumentId,
312 client_order_id: ClientOrderId,
313 order_side: OrderSide,
314 order_type: OrderType,
315 quantity: Quantity,
316 time_in_force: TimeInForce,
317 price: Option<Price>,
318 trigger_price: Option<Price>,
319 trigger_type: Option<TriggerType>,
320 display_qty: Option<Quantity>,
321 post_only: bool,
322 reduce_only: bool,
323 order_list_id: Option<OrderListId>,
324 contingency_type: Option<ContingencyType>,
325 ) -> anyhow::Result<OrderStatusReport> {
326 self.submit_count.fetch_add(1, Ordering::Relaxed);
327
328 match self
329 .executor
330 .submit_order(
331 instrument_id,
332 client_order_id,
333 order_side,
334 order_type,
335 quantity,
336 time_in_force,
337 price,
338 trigger_price,
339 trigger_type,
340 display_qty,
341 post_only,
342 reduce_only,
343 order_list_id,
344 contingency_type,
345 )
346 .await
347 {
348 Ok(report) => {
349 self.mark_healthy();
350 Ok(report)
351 }
352 Err(e) => {
353 self.error_count.fetch_add(1, Ordering::Relaxed);
354 Err(e)
355 }
356 }
357 }
358}
359
360#[cfg_attr(feature = "python", pyo3::pyclass)]
366#[derive(Debug)]
367pub struct SubmitBroadcaster {
368 config: SubmitBroadcasterConfig,
369 transports: Arc<Vec<TransportClient>>,
370 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
371 running: Arc<AtomicBool>,
372 total_submits: Arc<AtomicU64>,
373 successful_submits: Arc<AtomicU64>,
374 failed_submits: Arc<AtomicU64>,
375 expected_rejects: Arc<AtomicU64>,
376}
377
378impl SubmitBroadcaster {
379 pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
385 let mut transports = Vec::with_capacity(config.pool_size);
386
387 let base_url = if config.testnet && config.base_url.is_none() {
389 Some(BITMEX_HTTP_TESTNET_URL.to_string())
390 } else {
391 config.base_url.clone()
392 };
393
394 for i in 0..config.pool_size {
395 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
397
398 let client = BitmexHttpClient::with_credentials(
399 config.api_key.clone(),
400 config.api_secret.clone(),
401 base_url.clone(),
402 config.timeout_secs,
403 config.max_retries,
404 config.retry_delay_ms,
405 config.retry_delay_max_ms,
406 config.recv_window_ms,
407 config.max_requests_per_second,
408 config.max_requests_per_minute,
409 proxy_url,
410 )
411 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
412
413 transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
414 }
415
416 Ok(Self {
417 config,
418 transports: Arc::new(transports),
419 health_check_task: Arc::new(RwLock::new(None)),
420 running: Arc::new(AtomicBool::new(false)),
421 total_submits: Arc::new(AtomicU64::new(0)),
422 successful_submits: Arc::new(AtomicU64::new(0)),
423 failed_submits: Arc::new(AtomicU64::new(0)),
424 expected_rejects: Arc::new(AtomicU64::new(0)),
425 })
426 }
427
428 pub async fn start(&self) -> anyhow::Result<()> {
434 if self.running.load(Ordering::Relaxed) {
435 return Ok(());
436 }
437
438 self.running.store(true, Ordering::Relaxed);
439
440 self.run_health_checks().await;
442
443 let transports = Arc::clone(&self.transports);
445 let running = Arc::clone(&self.running);
446 let interval_secs = self.config.health_check_interval_secs;
447 let timeout_secs = self.config.health_check_timeout_secs;
448
449 let task = get_runtime().spawn(async move {
450 let mut ticker = interval(Duration::from_secs(interval_secs));
451 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
452
453 loop {
454 ticker.tick().await;
455
456 if !running.load(Ordering::Relaxed) {
457 break;
458 }
459
460 let tasks: Vec<_> = transports
461 .iter()
462 .map(|t| t.health_check(timeout_secs))
463 .collect();
464
465 let results = future::join_all(tasks).await;
466 let healthy_count = results.iter().filter(|&&r| r).count();
467
468 log::debug!(
469 "Health check complete: {healthy_count}/{} clients healthy",
470 results.len()
471 );
472 }
473 });
474
475 *self.health_check_task.write().await = Some(task);
476
477 log::info!(
478 "SubmitBroadcaster started with {} clients",
479 self.transports.len()
480 );
481
482 Ok(())
483 }
484
485 pub async fn stop(&self) {
487 if !self.running.load(Ordering::Relaxed) {
488 return;
489 }
490
491 self.running.store(false, Ordering::Relaxed);
492
493 if let Some(task) = self.health_check_task.write().await.take() {
494 task.abort();
495 }
496
497 log::info!("SubmitBroadcaster stopped");
498 }
499
500 async fn run_health_checks(&self) {
501 let tasks: Vec<_> = self
502 .transports
503 .iter()
504 .map(|t| t.health_check(self.config.health_check_timeout_secs))
505 .collect();
506
507 let results = future::join_all(tasks).await;
508 let healthy_count = results.iter().filter(|&&r| r).count();
509
510 log::debug!(
511 "Health check complete: {healthy_count}/{} clients healthy",
512 results.len()
513 );
514 }
515
516 fn is_expected_reject(&self, error_message: &str) -> bool {
517 self.config
518 .expected_reject_patterns
519 .iter()
520 .any(|pattern| error_message.contains(pattern))
521 }
522
523 async fn process_submit_results<T>(
527 &self,
528 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
529 operation: &str,
530 params: String,
531 ) -> anyhow::Result<T>
532 where
533 T: Send + 'static,
534 {
535 let mut errors = Vec::new();
536
537 while !handles.is_empty() {
538 let current_handles = std::mem::take(&mut handles);
539 let (result, _idx, remaining) = future::select_all(current_handles).await;
540 handles = remaining.into_iter().collect();
541
542 match result {
543 Ok((client_id, Ok(result))) => {
544 for handle in &handles {
546 handle.abort();
547 }
548 self.successful_submits.fetch_add(1, Ordering::Relaxed);
549 log::debug!("{operation} broadcast succeeded [{client_id}] {params}",);
550 return Ok(result);
551 }
552 Ok((client_id, Err(e))) => {
553 let error_msg = e.to_string();
554
555 if self.is_expected_reject(&error_msg) {
556 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
557 log::debug!(
558 "Expected {} rejection [{client_id}]: {error_msg} {params}",
559 operation.to_lowercase(),
560 );
561 errors.push(error_msg);
562 } else {
563 log::warn!(
564 "{operation} request failed [{client_id}]: {error_msg} {params}",
565 );
566 errors.push(error_msg);
567 }
568 }
569 Err(e) => {
570 log::warn!("{operation} task join error: {e:?}");
571 errors.push(format!("Task panicked: {e:?}"));
572 }
573 }
574 }
575
576 self.failed_submits.fetch_add(1, Ordering::Relaxed);
578 log::error!(
579 "All {} requests failed: {errors:?} {params}",
580 operation.to_lowercase(),
581 );
582 Err(anyhow::anyhow!(
583 "All {} requests failed: {:?}",
584 operation.to_lowercase(),
585 errors
586 ))
587 }
588
589 #[allow(clippy::too_many_arguments)]
600 pub async fn broadcast_submit(
601 &self,
602 instrument_id: InstrumentId,
603 client_order_id: ClientOrderId,
604 order_side: OrderSide,
605 order_type: OrderType,
606 quantity: Quantity,
607 time_in_force: TimeInForce,
608 price: Option<Price>,
609 trigger_price: Option<Price>,
610 trigger_type: Option<TriggerType>,
611 display_qty: Option<Quantity>,
612 post_only: bool,
613 reduce_only: bool,
614 order_list_id: Option<OrderListId>,
615 contingency_type: Option<ContingencyType>,
616 submit_tries: Option<usize>,
617 ) -> anyhow::Result<OrderStatusReport> {
618 self.total_submits.fetch_add(1, Ordering::Relaxed);
619
620 let pool_size = self.config.pool_size;
621 let actual_tries = if let Some(t) = submit_tries {
622 if t > pool_size {
623 log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
625 }
626 std::cmp::min(t, pool_size)
627 } else {
628 pool_size
629 };
630
631 log::debug!(
632 "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
633 );
634
635 let healthy_transports: Vec<TransportClient> = self
636 .transports
637 .iter()
638 .filter(|t| t.is_healthy())
639 .take(actual_tries)
640 .cloned()
641 .collect();
642
643 if healthy_transports.is_empty() {
644 self.failed_submits.fetch_add(1, Ordering::Relaxed);
645 anyhow::bail!("No healthy transport clients available");
646 }
647
648 log::debug!(
649 "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
650 healthy_transports.len(),
651 );
652
653 let mut handles = Vec::new();
654 for (idx, transport) in healthy_transports.into_iter().enumerate() {
655 let modified_client_order_id = if idx == 0 {
657 client_order_id
658 } else {
659 ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
660 };
661
662 let handle = get_runtime().spawn(async move {
663 let client_id = transport.client_id.clone();
664 let result = transport
665 .submit_order(
666 instrument_id,
667 modified_client_order_id,
668 order_side,
669 order_type,
670 quantity,
671 time_in_force,
672 price,
673 trigger_price,
674 trigger_type,
675 display_qty,
676 post_only,
677 reduce_only,
678 order_list_id,
679 contingency_type,
680 )
681 .await;
682 (client_id, result)
683 });
684 handles.push(handle);
685 }
686
687 self.process_submit_results(
688 handles,
689 "Submit",
690 format!("(client_order_id={client_order_id:?})"),
691 )
692 .await
693 }
694
695 pub fn get_metrics(&self) -> BroadcasterMetrics {
697 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
698 let total_clients = self.transports.len();
699
700 BroadcasterMetrics {
701 total_submits: self.total_submits.load(Ordering::Relaxed),
702 successful_submits: self.successful_submits.load(Ordering::Relaxed),
703 failed_submits: self.failed_submits.load(Ordering::Relaxed),
704 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
705 healthy_clients,
706 total_clients,
707 }
708 }
709
710 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
712 self.get_metrics()
713 }
714
715 pub fn get_client_stats(&self) -> Vec<ClientStats> {
717 self.transports
718 .iter()
719 .map(|t| ClientStats {
720 client_id: t.client_id.clone(),
721 healthy: t.is_healthy(),
722 submit_count: t.get_submit_count(),
723 error_count: t.get_error_count(),
724 })
725 .collect()
726 }
727
728 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
730 self.get_client_stats()
731 }
732
733 pub fn cache_instrument(&self, instrument: InstrumentAny) {
735 for transport in self.transports.iter() {
736 transport.executor.add_instrument(instrument.clone());
737 }
738 }
739
740 #[must_use]
741 pub fn clone_for_async(&self) -> Self {
742 Self {
743 config: self.config.clone(),
744 transports: Arc::clone(&self.transports),
745 health_check_task: Arc::clone(&self.health_check_task),
746 running: Arc::clone(&self.running),
747 total_submits: Arc::clone(&self.total_submits),
748 successful_submits: Arc::clone(&self.successful_submits),
749 failed_submits: Arc::clone(&self.failed_submits),
750 expected_rejects: Arc::clone(&self.expected_rejects),
751 }
752 }
753
754 #[cfg(test)]
755 fn new_with_transports(
756 config: SubmitBroadcasterConfig,
757 transports: Vec<TransportClient>,
758 ) -> Self {
759 Self {
760 config,
761 transports: Arc::new(transports),
762 health_check_task: Arc::new(RwLock::new(None)),
763 running: Arc::new(AtomicBool::new(false)),
764 total_submits: Arc::new(AtomicU64::new(0)),
765 successful_submits: Arc::new(AtomicU64::new(0)),
766 failed_submits: Arc::new(AtomicU64::new(0)),
767 expected_rejects: Arc::new(AtomicU64::new(0)),
768 }
769 }
770}
771
/// Point-in-time snapshot of a broadcaster's counters and pool health.
#[derive(Clone, Debug)]
pub struct BroadcasterMetrics {
    /// Number of broadcasts requested.
    pub total_submits: u64,
    /// Number of broadcasts in which some client succeeded.
    pub successful_submits: u64,
    /// Number of broadcasts in which no client succeeded.
    pub failed_submits: u64,
    /// Number of individual errors that matched an expected-reject pattern.
    pub expected_rejects: u64,
    /// Number of clients flagged healthy when the snapshot was taken.
    pub healthy_clients: usize,
    /// Size of the client pool.
    pub total_clients: usize,
}
782
/// Per-client statistics reported by the broadcaster.
#[derive(Clone, Debug)]
pub struct ClientStats {
    /// Label of the client.
    pub client_id: String,
    /// Whether the client was flagged healthy when the entry was built.
    pub healthy: bool,
    /// Number of submissions attempted through the client.
    pub submit_count: u64,
    /// Number of submissions through the client that returned an error.
    pub error_count: u64,
}
791
792#[cfg(test)]
793mod tests {
794 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
795
796 use nautilus_core::UUID4;
797 use nautilus_model::{
798 enums::{
799 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
800 },
801 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
802 reports::OrderStatusReport,
803 types::{Price, Quantity},
804 };
805
806 use super::*;
807
808 #[derive(Clone)]
810 #[allow(clippy::type_complexity)]
811 struct MockExecutor {
812 handler: Arc<
813 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
814 + Send
815 + Sync,
816 >,
817 }
818
819 impl MockExecutor {
820 fn new<F, Fut>(handler: F) -> Self
821 where
822 F: Fn() -> Fut + Send + Sync + 'static,
823 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
824 {
825 Self {
826 handler: Arc::new(move || Box::pin(handler())),
827 }
828 }
829 }
830
831 impl SubmitExecutor for MockExecutor {
832 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
833 Box::pin(async { Ok(()) })
834 }
835
836 #[allow(clippy::too_many_arguments)]
837 fn submit_order(
838 &self,
839 _instrument_id: InstrumentId,
840 _client_order_id: ClientOrderId,
841 _order_side: OrderSide,
842 _order_type: OrderType,
843 _quantity: Quantity,
844 _time_in_force: TimeInForce,
845 _price: Option<Price>,
846 _trigger_price: Option<Price>,
847 _trigger_type: Option<TriggerType>,
848 _display_qty: Option<Quantity>,
849 _post_only: bool,
850 _reduce_only: bool,
851 _order_list_id: Option<OrderListId>,
852 _contingency_type: Option<ContingencyType>,
853 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
854 (self.handler)()
855 }
856
857 fn add_instrument(&self, _instrument: InstrumentAny) {
858 }
860 }
861
862 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
863 OrderStatusReport {
864 account_id: AccountId::from("BITMEX-001"),
865 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
866 venue_order_id: VenueOrderId::from(venue_order_id),
867 order_side: OrderSide::Buy,
868 order_type: OrderType::Limit,
869 time_in_force: TimeInForce::Gtc,
870 order_status: OrderStatus::Accepted,
871 price: Some(Price::new(50000.0, 2)),
872 quantity: Quantity::new(100.0, 0),
873 filled_qty: Quantity::new(0.0, 0),
874 report_id: UUID4::new(),
875 ts_accepted: 0.into(),
876 ts_last: 0.into(),
877 ts_init: 0.into(),
878 client_order_id: None,
879 avg_px: None,
880 trigger_price: None,
881 trigger_type: None,
882 contingency_type: ContingencyType::NoContingency,
883 expire_time: None,
884 order_list_id: None,
885 venue_position_id: None,
886 linked_order_ids: None,
887 parent_order_id: None,
888 display_qty: None,
889 limit_offset: None,
890 trailing_offset: None,
891 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
892 post_only: false,
893 reduce_only: false,
894 cancel_reason: None,
895 ts_triggered: None,
896 }
897 }
898
899 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
900 where
901 F: Fn() -> Fut + Send + Sync + 'static,
902 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
903 {
904 let executor = MockExecutor::new(handler);
905 TransportClient::new(executor, client_id.to_string())
906 }
907
908 #[tokio::test]
909 async fn test_broadcast_submit_immediate_success() {
910 let report = create_test_report("ORDER-1");
911 let report_clone = report.clone();
912
913 let transports = vec![
914 create_stub_transport("client-0", move || {
915 let report = report_clone.clone();
916 async move { Ok(report) }
917 }),
918 create_stub_transport("client-1", || async {
919 tokio::time::sleep(Duration::from_secs(10)).await;
920 anyhow::bail!("Should be aborted")
921 }),
922 ];
923
924 let config = SubmitBroadcasterConfig::default();
925 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
926
927 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
928 let result = broadcaster
929 .broadcast_submit(
930 instrument_id,
931 ClientOrderId::from("O-123"),
932 OrderSide::Buy,
933 OrderType::Limit,
934 Quantity::new(100.0, 0),
935 TimeInForce::Gtc,
936 Some(Price::new(50000.0, 2)),
937 None,
938 None,
939 None,
940 false,
941 false,
942 None,
943 None,
944 None,
945 )
946 .await;
947
948 assert!(result.is_ok());
949 let returned_report = result.unwrap();
950 assert_eq!(returned_report.venue_order_id, report.venue_order_id);
951
952 let metrics = broadcaster.get_metrics_async().await;
953 assert_eq!(metrics.successful_submits, 1);
954 assert_eq!(metrics.failed_submits, 0);
955 assert_eq!(metrics.total_submits, 1);
956 }
957
958 #[tokio::test]
959 async fn test_broadcast_submit_duplicate_clordid_expected() {
960 let transports = vec![
961 create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
962 create_stub_transport("client-1", || async {
963 tokio::time::sleep(Duration::from_secs(10)).await;
964 anyhow::bail!("Should be aborted")
965 }),
966 ];
967
968 let config = SubmitBroadcasterConfig::default();
969 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
970
971 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
972 let result = broadcaster
973 .broadcast_submit(
974 instrument_id,
975 ClientOrderId::from("O-123"),
976 OrderSide::Buy,
977 OrderType::Limit,
978 Quantity::new(100.0, 0),
979 TimeInForce::Gtc,
980 Some(Price::new(50000.0, 2)),
981 None,
982 None,
983 None,
984 false,
985 false,
986 None,
987 None,
988 None,
989 )
990 .await;
991
992 assert!(result.is_err());
993
994 let metrics = broadcaster.get_metrics_async().await;
995 assert_eq!(metrics.expected_rejects, 1);
996 assert_eq!(metrics.successful_submits, 0);
997 assert_eq!(metrics.failed_submits, 1);
998 }
999
1000 #[tokio::test]
1001 async fn test_broadcast_submit_all_failures() {
1002 let transports = vec![
1003 create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1004 create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1005 ];
1006
1007 let config = SubmitBroadcasterConfig::default();
1008 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1009
1010 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1011 let result = broadcaster
1012 .broadcast_submit(
1013 instrument_id,
1014 ClientOrderId::from("O-456"),
1015 OrderSide::Sell,
1016 OrderType::Market,
1017 Quantity::new(50.0, 0),
1018 TimeInForce::Ioc,
1019 None,
1020 None,
1021 None,
1022 None,
1023 false,
1024 false,
1025 None,
1026 None,
1027 None,
1028 )
1029 .await;
1030
1031 assert!(result.is_err());
1032 assert!(
1033 result
1034 .unwrap_err()
1035 .to_string()
1036 .contains("All submit requests failed")
1037 );
1038
1039 let metrics = broadcaster.get_metrics_async().await;
1040 assert_eq!(metrics.failed_submits, 1);
1041 assert_eq!(metrics.successful_submits, 0);
1042 }
1043
1044 #[tokio::test]
1045 async fn test_broadcast_submit_no_healthy_clients() {
1046 let transport =
1047 create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1048 transport.healthy.store(false, Ordering::Relaxed);
1049
1050 let config = SubmitBroadcasterConfig::default();
1051 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1052
1053 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1054 let result = broadcaster
1055 .broadcast_submit(
1056 instrument_id,
1057 ClientOrderId::from("O-789"),
1058 OrderSide::Buy,
1059 OrderType::Limit,
1060 Quantity::new(100.0, 0),
1061 TimeInForce::Gtc,
1062 Some(Price::new(50000.0, 2)),
1063 None,
1064 None,
1065 None,
1066 false,
1067 false,
1068 None,
1069 None,
1070 None,
1071 )
1072 .await;
1073
1074 assert!(result.is_err());
1075 assert!(
1076 result
1077 .unwrap_err()
1078 .to_string()
1079 .contains("No healthy transport clients available")
1080 );
1081
1082 let metrics = broadcaster.get_metrics_async().await;
1083 assert_eq!(metrics.failed_submits, 1);
1084 }
1085
1086 #[tokio::test]
1087 async fn test_default_config() {
1088 let config = SubmitBroadcasterConfig {
1089 api_key: Some("test_key".to_string()),
1090 api_secret: Some("test_secret".to_string()),
1091 base_url: Some("https://test.example.com".to_string()),
1092 ..Default::default()
1093 };
1094
1095 let broadcaster = SubmitBroadcaster::new(config);
1096 assert!(broadcaster.is_ok());
1097
1098 let broadcaster = broadcaster.unwrap();
1099 let metrics = broadcaster.get_metrics_async().await;
1100
1101 assert_eq!(metrics.total_clients, 3);
1103 }
1104
1105 #[tokio::test]
1106 async fn test_broadcaster_lifecycle() {
1107 let config = SubmitBroadcasterConfig {
1108 pool_size: 2,
1109 api_key: Some("test_key".to_string()),
1110 api_secret: Some("test_secret".to_string()),
1111 base_url: Some("https://test.example.com".to_string()),
1112 testnet: false,
1113 timeout_secs: Some(5),
1114 max_retries: None,
1115 retry_delay_ms: None,
1116 retry_delay_max_ms: None,
1117 recv_window_ms: None,
1118 max_requests_per_second: None,
1119 max_requests_per_minute: None,
1120 health_check_interval_secs: 60,
1121 health_check_timeout_secs: 1,
1122 expected_reject_patterns: vec![],
1123 proxy_urls: vec![],
1124 };
1125
1126 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1127
1128 assert!(!broadcaster.running.load(Ordering::Relaxed));
1130
1131 let start_result = broadcaster.start().await;
1133 assert!(start_result.is_ok());
1134 assert!(broadcaster.running.load(Ordering::Relaxed));
1135
1136 let start_again = broadcaster.start().await;
1138 assert!(start_again.is_ok());
1139
1140 broadcaster.stop().await;
1142 assert!(!broadcaster.running.load(Ordering::Relaxed));
1143
1144 broadcaster.stop().await;
1146 assert!(!broadcaster.running.load(Ordering::Relaxed));
1147 }
1148
1149 #[tokio::test]
1150 async fn test_broadcast_submit_metrics_increment() {
1151 let report = create_test_report("ORDER-1");
1152 let report_clone = report.clone();
1153
1154 let transports = vec![create_stub_transport("client-0", move || {
1155 let report = report_clone.clone();
1156 async move { Ok(report) }
1157 })];
1158
1159 let config = SubmitBroadcasterConfig::default();
1160 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1161
1162 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1163 let _ = broadcaster
1164 .broadcast_submit(
1165 instrument_id,
1166 ClientOrderId::from("O-123"),
1167 OrderSide::Buy,
1168 OrderType::Limit,
1169 Quantity::new(100.0, 0),
1170 TimeInForce::Gtc,
1171 Some(Price::new(50000.0, 2)),
1172 None,
1173 None,
1174 None,
1175 false,
1176 false,
1177 None,
1178 None,
1179 None,
1180 )
1181 .await;
1182
1183 let metrics = broadcaster.get_metrics_async().await;
1184 assert_eq!(metrics.total_submits, 1);
1185 assert_eq!(metrics.successful_submits, 1);
1186 assert_eq!(metrics.failed_submits, 0);
1187 }
1188
1189 #[tokio::test]
1190 async fn test_broadcaster_creation_with_pool() {
1191 let config = SubmitBroadcasterConfig {
1192 pool_size: 4,
1193 api_key: Some("test_key".to_string()),
1194 api_secret: Some("test_secret".to_string()),
1195 base_url: Some("https://test.example.com".to_string()),
1196 ..Default::default()
1197 };
1198
1199 let broadcaster = SubmitBroadcaster::new(config);
1200 assert!(broadcaster.is_ok());
1201
1202 let broadcaster = broadcaster.unwrap();
1203 let metrics = broadcaster.get_metrics_async().await;
1204 assert_eq!(metrics.total_clients, 4);
1205 }
1206
1207 #[tokio::test]
1208 async fn test_client_stats_collection() {
1209 let transports = vec![
1212 create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1213 create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1214 ];
1215
1216 let config = SubmitBroadcasterConfig::default();
1217 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1218
1219 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1220 let _ = broadcaster
1221 .broadcast_submit(
1222 instrument_id,
1223 ClientOrderId::from("O-123"),
1224 OrderSide::Buy,
1225 OrderType::Limit,
1226 Quantity::new(100.0, 0),
1227 TimeInForce::Gtc,
1228 Some(Price::new(50000.0, 2)),
1229 None,
1230 None,
1231 None,
1232 false,
1233 false,
1234 None,
1235 None,
1236 None,
1237 )
1238 .await;
1239
1240 let stats = broadcaster.get_client_stats_async().await;
1241 assert_eq!(stats.len(), 2);
1242
1243 let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1244 assert_eq!(client0.submit_count, 1);
1245 assert_eq!(client0.error_count, 1);
1246
1247 let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1248 assert_eq!(client1.submit_count, 1);
1249 assert_eq!(client1.error_count, 1);
1250 }
1251
1252 #[tokio::test]
1253 async fn test_testnet_config_sets_base_url() {
1254 let config = SubmitBroadcasterConfig {
1255 pool_size: 1,
1256 api_key: Some("test_key".to_string()),
1257 api_secret: Some("test_secret".to_string()),
1258 testnet: true,
1259 base_url: None,
1260 ..Default::default()
1261 };
1262
1263 let broadcaster = SubmitBroadcaster::new(config);
1264 assert!(broadcaster.is_ok());
1265 }
1266
1267 #[tokio::test]
1268 async fn test_clone_for_async() {
1269 let config = SubmitBroadcasterConfig {
1270 pool_size: 1,
1271 api_key: Some("test_key".to_string()),
1272 api_secret: Some("test_secret".to_string()),
1273 base_url: Some("https://test.example.com".to_string()),
1274 ..Default::default()
1275 };
1276
1277 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1278 let cloned = broadcaster.clone_for_async();
1279
1280 broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1282 assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1283 }
1284
1285 #[tokio::test]
1286 async fn test_pattern_matching() {
1287 let config = SubmitBroadcasterConfig {
1288 expected_reject_patterns: vec![
1289 "Duplicate clOrdID".to_string(),
1290 "Order already exists".to_string(),
1291 ],
1292 ..Default::default()
1293 };
1294
1295 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1296
1297 assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1298 assert!(broadcaster.is_expected_reject("Order already exists in system"));
1299 assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1300 assert!(!broadcaster.is_expected_reject("Internal server error"));
1301 }
1302
1303 #[tokio::test]
1304 async fn test_submit_metrics_with_mixed_responses() {
1305 let report = create_test_report("ORDER-1");
1306 let report_clone = report.clone();
1307
1308 let transports = vec![
1309 create_stub_transport("client-0", move || {
1310 let report = report_clone.clone();
1311 async move { Ok(report) }
1312 }),
1313 create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1314 ];
1315
1316 let config = SubmitBroadcasterConfig::default();
1317 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1318
1319 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1320 let result = broadcaster
1321 .broadcast_submit(
1322 instrument_id,
1323 ClientOrderId::from("O-123"),
1324 OrderSide::Buy,
1325 OrderType::Limit,
1326 Quantity::new(100.0, 0),
1327 TimeInForce::Gtc,
1328 Some(Price::new(50000.0, 2)),
1329 None,
1330 None,
1331 None,
1332 false,
1333 false,
1334 None,
1335 None,
1336 None,
1337 )
1338 .await;
1339
1340 assert!(result.is_ok());
1341
1342 let metrics = broadcaster.get_metrics_async().await;
1343 assert_eq!(metrics.total_submits, 1);
1344 assert_eq!(metrics.successful_submits, 1);
1345 assert_eq!(metrics.failed_submits, 0);
1346 }
1347
1348 #[tokio::test]
1349 async fn test_metrics_initialization_and_health() {
1350 let config = SubmitBroadcasterConfig {
1351 pool_size: 2,
1352 api_key: Some("test_key".to_string()),
1353 api_secret: Some("test_secret".to_string()),
1354 base_url: Some("https://test.example.com".to_string()),
1355 ..Default::default()
1356 };
1357
1358 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1359 let metrics = broadcaster.get_metrics_async().await;
1360
1361 assert_eq!(metrics.total_submits, 0);
1362 assert_eq!(metrics.successful_submits, 0);
1363 assert_eq!(metrics.failed_submits, 0);
1364 assert_eq!(metrics.expected_rejects, 0);
1365 assert_eq!(metrics.total_clients, 2);
1366 assert_eq!(metrics.healthy_clients, 2);
1367 }
1368
1369 #[tokio::test]
1370 async fn test_health_check_task_lifecycle() {
1371 let config = SubmitBroadcasterConfig {
1372 pool_size: 2,
1373 api_key: Some("test_key".to_string()),
1374 api_secret: Some("test_secret".to_string()),
1375 base_url: Some("https://test.example.com".to_string()),
1376 health_check_interval_secs: 1,
1377 ..Default::default()
1378 };
1379
1380 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1381
1382 broadcaster.start().await.unwrap();
1384 assert!(broadcaster.running.load(Ordering::Relaxed));
1385 assert!(
1386 broadcaster
1387 .health_check_task
1388 .read()
1389 .await
1390 .as_ref()
1391 .is_some()
1392 );
1393
1394 broadcaster.stop().await;
1396 assert!(!broadcaster.running.load(Ordering::Relaxed));
1397 }
1398
1399 #[tokio::test]
1400 async fn test_expected_reject_pattern_comprehensive() {
1401 let transports = vec![
1402 create_stub_transport("client-0", || async {
1403 anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1404 }),
1405 create_stub_transport("client-1", || async {
1406 tokio::time::sleep(Duration::from_secs(10)).await;
1407 anyhow::bail!("Should be aborted")
1408 }),
1409 ];
1410
1411 let config = SubmitBroadcasterConfig::default();
1412 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1413
1414 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1415 let result = broadcaster
1416 .broadcast_submit(
1417 instrument_id,
1418 ClientOrderId::from("O-123"),
1419 OrderSide::Buy,
1420 OrderType::Limit,
1421 Quantity::new(100.0, 0),
1422 TimeInForce::Gtc,
1423 Some(Price::new(50000.0, 2)),
1424 None,
1425 None,
1426 None,
1427 false,
1428 false,
1429 None,
1430 None,
1431 None,
1432 )
1433 .await;
1434
1435 assert!(result.is_err());
1437
1438 let metrics = broadcaster.get_metrics_async().await;
1439 assert_eq!(metrics.expected_rejects, 1);
1440 assert_eq!(metrics.failed_submits, 1);
1441 assert_eq!(metrics.successful_submits, 0);
1442 }
1443
1444 #[tokio::test]
1445 async fn test_client_order_id_suffix_for_multiple_clients() {
1446 use std::sync::{Arc, Mutex};
1447
1448 #[derive(Clone)]
1449 struct CaptureExecutor {
1450 captured_ids: Arc<Mutex<Vec<String>>>,
1451 barrier: Arc<tokio::sync::Barrier>,
1452 report: OrderStatusReport,
1453 }
1454
1455 impl SubmitExecutor for CaptureExecutor {
1456 fn health_check(
1457 &self,
1458 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1459 Box::pin(async { Ok(()) })
1460 }
1461
1462 #[allow(clippy::too_many_arguments)]
1463 fn submit_order(
1464 &self,
1465 _instrument_id: InstrumentId,
1466 client_order_id: ClientOrderId,
1467 _order_side: OrderSide,
1468 _order_type: OrderType,
1469 _quantity: Quantity,
1470 _time_in_force: TimeInForce,
1471 _price: Option<Price>,
1472 _trigger_price: Option<Price>,
1473 _trigger_type: Option<TriggerType>,
1474 _display_qty: Option<Quantity>,
1475 _post_only: bool,
1476 _reduce_only: bool,
1477 _order_list_id: Option<OrderListId>,
1478 _contingency_type: Option<ContingencyType>,
1479 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1480 {
1481 self.captured_ids
1483 .lock()
1484 .unwrap()
1485 .push(client_order_id.as_str().to_string());
1486 let report = self.report.clone();
1487 let barrier = Arc::clone(&self.barrier);
1488 Box::pin(async move {
1491 barrier.wait().await;
1492 Ok(report)
1493 })
1494 }
1495
1496 fn add_instrument(&self, _instrument: InstrumentAny) {}
1497 }
1498
1499 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1500 let barrier = Arc::new(tokio::sync::Barrier::new(3));
1501 let report = create_test_report("ORDER-1");
1502
1503 let transports = vec![
1504 TransportClient::new(
1505 CaptureExecutor {
1506 captured_ids: Arc::clone(&captured_ids),
1507 barrier: Arc::clone(&barrier),
1508 report: report.clone(),
1509 },
1510 "client-0".to_string(),
1511 ),
1512 TransportClient::new(
1513 CaptureExecutor {
1514 captured_ids: Arc::clone(&captured_ids),
1515 barrier: Arc::clone(&barrier),
1516 report: report.clone(),
1517 },
1518 "client-1".to_string(),
1519 ),
1520 TransportClient::new(
1521 CaptureExecutor {
1522 captured_ids: Arc::clone(&captured_ids),
1523 barrier: Arc::clone(&barrier),
1524 report: report.clone(),
1525 },
1526 "client-2".to_string(),
1527 ),
1528 ];
1529
1530 let config = SubmitBroadcasterConfig::default();
1531 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1532
1533 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1534 let result = broadcaster
1535 .broadcast_submit(
1536 instrument_id,
1537 ClientOrderId::from("O-123"),
1538 OrderSide::Buy,
1539 OrderType::Limit,
1540 Quantity::new(100.0, 0),
1541 TimeInForce::Gtc,
1542 Some(Price::new(50000.0, 2)),
1543 None,
1544 None,
1545 None,
1546 false,
1547 false,
1548 None,
1549 None,
1550 None,
1551 )
1552 .await;
1553
1554 assert!(result.is_ok());
1555
1556 let ids = captured_ids.lock().unwrap();
1558 assert_eq!(ids.len(), 3);
1559 assert!(ids.contains(&"O-123".to_string())); assert!(ids.contains(&"O-123-1".to_string())); assert!(ids.contains(&"O-123-2".to_string())); }
1563
1564 #[tokio::test]
1565 async fn test_client_order_id_suffix_with_partial_failure() {
1566 use std::sync::{Arc, Mutex};
1567
1568 #[derive(Clone)]
1569 struct CaptureAndFailExecutor {
1570 captured_ids: Arc<Mutex<Vec<String>>>,
1571 barrier: Arc<tokio::sync::Barrier>,
1572 should_succeed: bool,
1573 }
1574
1575 impl SubmitExecutor for CaptureAndFailExecutor {
1576 fn health_check(
1577 &self,
1578 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1579 Box::pin(async { Ok(()) })
1580 }
1581
1582 #[allow(clippy::too_many_arguments)]
1583 fn submit_order(
1584 &self,
1585 _instrument_id: InstrumentId,
1586 client_order_id: ClientOrderId,
1587 _order_side: OrderSide,
1588 _order_type: OrderType,
1589 _quantity: Quantity,
1590 _time_in_force: TimeInForce,
1591 _price: Option<Price>,
1592 _trigger_price: Option<Price>,
1593 _trigger_type: Option<TriggerType>,
1594 _display_qty: Option<Quantity>,
1595 _post_only: bool,
1596 _reduce_only: bool,
1597 _order_list_id: Option<OrderListId>,
1598 _contingency_type: Option<ContingencyType>,
1599 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1600 {
1601 self.captured_ids
1603 .lock()
1604 .unwrap()
1605 .push(client_order_id.as_str().to_string());
1606 let barrier = Arc::clone(&self.barrier);
1607 let should_succeed = self.should_succeed;
1608 Box::pin(async move {
1611 barrier.wait().await;
1612 if should_succeed {
1613 Ok(create_test_report("ORDER-1"))
1614 } else {
1615 anyhow::bail!("Network error")
1616 }
1617 })
1618 }
1619
1620 fn add_instrument(&self, _instrument: InstrumentAny) {}
1621 }
1622
1623 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1624 let barrier = Arc::new(tokio::sync::Barrier::new(2));
1625
1626 let transports = vec![
1627 TransportClient::new(
1628 CaptureAndFailExecutor {
1629 captured_ids: Arc::clone(&captured_ids),
1630 barrier: Arc::clone(&barrier),
1631 should_succeed: false,
1632 },
1633 "client-0".to_string(),
1634 ),
1635 TransportClient::new(
1636 CaptureAndFailExecutor {
1637 captured_ids: Arc::clone(&captured_ids),
1638 barrier: Arc::clone(&barrier),
1639 should_succeed: true,
1640 },
1641 "client-1".to_string(),
1642 ),
1643 ];
1644
1645 let config = SubmitBroadcasterConfig::default();
1646 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1647
1648 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1649 let result = broadcaster
1650 .broadcast_submit(
1651 instrument_id,
1652 ClientOrderId::from("O-456"),
1653 OrderSide::Sell,
1654 OrderType::Market,
1655 Quantity::new(50.0, 0),
1656 TimeInForce::Ioc,
1657 None,
1658 None,
1659 None,
1660 None,
1661 false,
1662 false,
1663 None,
1664 None,
1665 None,
1666 )
1667 .await;
1668
1669 assert!(result.is_ok());
1670
1671 let ids = captured_ids.lock().unwrap();
1673 assert_eq!(ids.len(), 2);
1674 assert!(ids.contains(&"O-456".to_string())); assert!(ids.contains(&"O-456-1".to_string())); }
1677
1678 #[tokio::test]
1679 async fn test_proxy_urls_populated_from_config() {
1680 let config = SubmitBroadcasterConfig {
1681 pool_size: 3,
1682 api_key: Some("test_key".to_string()),
1683 api_secret: Some("test_secret".to_string()),
1684 proxy_urls: vec![
1685 Some("http://proxy1:8080".to_string()),
1686 Some("http://proxy2:8080".to_string()),
1687 Some("http://proxy3:8080".to_string()),
1688 ],
1689 ..Default::default()
1690 };
1691
1692 assert_eq!(config.proxy_urls.len(), 3);
1693 assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1694 assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1695 assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1696 }
1697}