1use std::{
37 fmt::Debug,
38 future::Future,
39 pin::Pin,
40 sync::{
41 Arc,
42 atomic::{AtomicBool, AtomicU64, Ordering},
43 },
44 time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
51 identifiers::{ClientOrderId, InstrumentId, OrderListId},
52 instruments::InstrumentAny,
53 reports::OrderStatusReport,
54 types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
59
60trait SubmitExecutor: Send + Sync {
82 fn add_instrument(&self, instrument: InstrumentAny);
84
85 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
87
88 #[allow(clippy::too_many_arguments)]
90 fn submit_order(
91 &self,
92 instrument_id: InstrumentId,
93 client_order_id: ClientOrderId,
94 order_side: OrderSide,
95 order_type: OrderType,
96 quantity: Quantity,
97 time_in_force: TimeInForce,
98 price: Option<Price>,
99 trigger_price: Option<Price>,
100 trigger_type: Option<TriggerType>,
101 display_qty: Option<Quantity>,
102 post_only: bool,
103 reduce_only: bool,
104 order_list_id: Option<OrderListId>,
105 contingency_type: Option<ContingencyType>,
106 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
107}
108
109impl SubmitExecutor for BitmexHttpClient {
110 fn add_instrument(&self, instrument: InstrumentAny) {
111 Self::cache_instrument(self, instrument);
112 }
113
114 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
115 Box::pin(async move {
116 Self::get_server_time(self)
117 .await
118 .map(|_| ())
119 .map_err(|e| anyhow::anyhow!("{e}"))
120 })
121 }
122
123 #[allow(clippy::too_many_arguments)]
124 fn submit_order(
125 &self,
126 instrument_id: InstrumentId,
127 client_order_id: ClientOrderId,
128 order_side: OrderSide,
129 order_type: OrderType,
130 quantity: Quantity,
131 time_in_force: TimeInForce,
132 price: Option<Price>,
133 trigger_price: Option<Price>,
134 trigger_type: Option<TriggerType>,
135 display_qty: Option<Quantity>,
136 post_only: bool,
137 reduce_only: bool,
138 order_list_id: Option<OrderListId>,
139 contingency_type: Option<ContingencyType>,
140 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
141 Box::pin(async move {
142 Self::submit_order(
143 self,
144 instrument_id,
145 client_order_id,
146 order_side,
147 order_type,
148 quantity,
149 time_in_force,
150 price,
151 trigger_price,
152 trigger_type,
153 display_qty,
154 post_only,
155 reduce_only,
156 order_list_id,
157 contingency_type,
158 )
159 .await
160 })
161 }
162}
163
/// Settings used to build and run a `SubmitBroadcaster`.
///
/// `Debug` is implemented by hand so that `api_key` and `api_secret` are never
/// written to logs; only whether they are set is shown.
#[derive(Clone)]
pub struct SubmitBroadcasterConfig {
    /// Number of HTTP clients created for the pool.
    pub pool_size: usize,
    /// API key handed to every HTTP client.
    pub api_key: Option<String>,
    /// API secret handed to every HTTP client.
    pub api_secret: Option<String>,
    /// Explicit base URL; when `None` and `testnet` is set, the testnet URL is used.
    pub base_url: Option<String>,
    /// Selects the BitMEX testnet URL when no `base_url` is given.
    pub testnet: bool,
    /// Forwarded unchanged to the HTTP client constructor.
    pub timeout_secs: Option<u64>,
    /// Forwarded unchanged to the HTTP client constructor.
    pub max_retries: Option<u32>,
    /// Forwarded unchanged to the HTTP client constructor.
    pub retry_delay_ms: Option<u64>,
    /// Forwarded unchanged to the HTTP client constructor.
    pub retry_delay_max_ms: Option<u64>,
    /// Forwarded unchanged to the HTTP client constructor.
    pub recv_window_ms: Option<u64>,
    /// Forwarded unchanged to the HTTP client constructor.
    pub max_requests_per_second: Option<u32>,
    /// Forwarded unchanged to the HTTP client constructor.
    pub max_requests_per_minute: Option<u32>,
    /// Seconds between periodic health-check rounds.
    pub health_check_interval_secs: u64,
    /// Seconds a single health check may take before it counts as failed.
    pub health_check_timeout_secs: u64,
    /// Substrings that mark an error message as an expected rejection.
    pub expected_reject_patterns: Vec<String>,
    /// Optional proxy URL per client, matched by index; missing entries mean no proxy.
    pub proxy_urls: Vec<Option<String>>,
}

impl Debug for SubmitBroadcasterConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Credentials must not leak through `{:?}` logging: keep the
        // `Some`/`None` shape but replace the value itself.
        let redact = |value: &Option<String>| value.as_ref().map(|_| "<redacted>");

        f.debug_struct("SubmitBroadcasterConfig")
            .field("pool_size", &self.pool_size)
            .field("api_key", &redact(&self.api_key))
            .field("api_secret", &redact(&self.api_secret))
            .field("base_url", &self.base_url)
            .field("testnet", &self.testnet)
            .field("timeout_secs", &self.timeout_secs)
            .field("max_retries", &self.max_retries)
            .field("retry_delay_ms", &self.retry_delay_ms)
            .field("retry_delay_max_ms", &self.retry_delay_max_ms)
            .field("recv_window_ms", &self.recv_window_ms)
            .field("max_requests_per_second", &self.max_requests_per_second)
            .field("max_requests_per_minute", &self.max_requests_per_minute)
            .field("health_check_interval_secs", &self.health_check_interval_secs)
            .field("health_check_timeout_secs", &self.health_check_timeout_secs)
            .field("expected_reject_patterns", &self.expected_reject_patterns)
            .field("proxy_urls", &self.proxy_urls)
            .finish()
    }
}
204
205impl Default for SubmitBroadcasterConfig {
206 fn default() -> Self {
207 Self {
208 pool_size: 3,
209 api_key: None,
210 api_secret: None,
211 base_url: None,
212 testnet: false,
213 timeout_secs: Some(60),
214 max_retries: None,
215 retry_delay_ms: Some(1_000),
216 retry_delay_max_ms: Some(5_000),
217 recv_window_ms: Some(10_000),
218 max_requests_per_second: Some(10),
219 max_requests_per_minute: Some(120),
220 health_check_interval_secs: 30,
221 health_check_timeout_secs: 5,
222 expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
223 proxy_urls: vec![],
224 }
225 }
226}
227
228#[derive(Clone)]
230struct TransportClient {
231 executor: Arc<dyn SubmitExecutor>,
236 client_id: String,
237 healthy: Arc<AtomicBool>,
238 submit_count: Arc<AtomicU64>,
239 error_count: Arc<AtomicU64>,
240}
241
242impl Debug for TransportClient {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 f.debug_struct("TransportClient")
245 .field("client_id", &self.client_id)
246 .field("healthy", &self.healthy)
247 .field("submit_count", &self.submit_count)
248 .field("error_count", &self.error_count)
249 .finish()
250 }
251}
252
253impl TransportClient {
254 fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
255 Self {
256 executor: Arc::new(executor),
257 client_id,
258 healthy: Arc::new(AtomicBool::new(true)),
259 submit_count: Arc::new(AtomicU64::new(0)),
260 error_count: Arc::new(AtomicU64::new(0)),
261 }
262 }
263
264 fn is_healthy(&self) -> bool {
265 self.healthy.load(Ordering::Relaxed)
266 }
267
268 fn mark_healthy(&self) {
269 self.healthy.store(true, Ordering::Relaxed);
270 }
271
272 fn mark_unhealthy(&self) {
273 self.healthy.store(false, Ordering::Relaxed);
274 }
275
276 fn get_submit_count(&self) -> u64 {
277 self.submit_count.load(Ordering::Relaxed)
278 }
279
280 fn get_error_count(&self) -> u64 {
281 self.error_count.load(Ordering::Relaxed)
282 }
283
284 async fn health_check(&self, timeout_secs: u64) -> bool {
285 match tokio::time::timeout(
286 Duration::from_secs(timeout_secs),
287 self.executor.health_check(),
288 )
289 .await
290 {
291 Ok(Ok(_)) => {
292 self.mark_healthy();
293 true
294 }
295 Ok(Err(e)) => {
296 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
297 self.mark_unhealthy();
298 false
299 }
300 Err(_) => {
301 tracing::warn!("Health check timeout for client {}", self.client_id);
302 self.mark_unhealthy();
303 false
304 }
305 }
306 }
307
308 #[allow(clippy::too_many_arguments)]
309 async fn submit_order(
310 &self,
311 instrument_id: InstrumentId,
312 client_order_id: ClientOrderId,
313 order_side: OrderSide,
314 order_type: OrderType,
315 quantity: Quantity,
316 time_in_force: TimeInForce,
317 price: Option<Price>,
318 trigger_price: Option<Price>,
319 trigger_type: Option<TriggerType>,
320 display_qty: Option<Quantity>,
321 post_only: bool,
322 reduce_only: bool,
323 order_list_id: Option<OrderListId>,
324 contingency_type: Option<ContingencyType>,
325 ) -> anyhow::Result<OrderStatusReport> {
326 self.submit_count.fetch_add(1, Ordering::Relaxed);
327
328 match self
329 .executor
330 .submit_order(
331 instrument_id,
332 client_order_id,
333 order_side,
334 order_type,
335 quantity,
336 time_in_force,
337 price,
338 trigger_price,
339 trigger_type,
340 display_qty,
341 post_only,
342 reduce_only,
343 order_list_id,
344 contingency_type,
345 )
346 .await
347 {
348 Ok(report) => {
349 self.mark_healthy();
350 Ok(report)
351 }
352 Err(e) => {
353 self.error_count.fetch_add(1, Ordering::Relaxed);
354 Err(e)
355 }
356 }
357 }
358}
359
360#[cfg_attr(feature = "python", pyo3::pyclass)]
366#[derive(Debug)]
367pub struct SubmitBroadcaster {
368 config: SubmitBroadcasterConfig,
369 transports: Arc<Vec<TransportClient>>,
370 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
371 running: Arc<AtomicBool>,
372 total_submits: Arc<AtomicU64>,
373 successful_submits: Arc<AtomicU64>,
374 failed_submits: Arc<AtomicU64>,
375 expected_rejects: Arc<AtomicU64>,
376}
377
378impl SubmitBroadcaster {
379 pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
385 let mut transports = Vec::with_capacity(config.pool_size);
386
387 let base_url = if config.testnet && config.base_url.is_none() {
389 Some(BITMEX_HTTP_TESTNET_URL.to_string())
390 } else {
391 config.base_url.clone()
392 };
393
394 for i in 0..config.pool_size {
395 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
397
398 let client = BitmexHttpClient::with_credentials(
399 config.api_key.clone(),
400 config.api_secret.clone(),
401 base_url.clone(),
402 config.timeout_secs,
403 config.max_retries,
404 config.retry_delay_ms,
405 config.retry_delay_max_ms,
406 config.recv_window_ms,
407 config.max_requests_per_second,
408 config.max_requests_per_minute,
409 proxy_url,
410 )
411 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
412
413 transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
414 }
415
416 Ok(Self {
417 config,
418 transports: Arc::new(transports),
419 health_check_task: Arc::new(RwLock::new(None)),
420 running: Arc::new(AtomicBool::new(false)),
421 total_submits: Arc::new(AtomicU64::new(0)),
422 successful_submits: Arc::new(AtomicU64::new(0)),
423 failed_submits: Arc::new(AtomicU64::new(0)),
424 expected_rejects: Arc::new(AtomicU64::new(0)),
425 })
426 }
427
428 pub async fn start(&self) -> anyhow::Result<()> {
434 if self.running.load(Ordering::Relaxed) {
435 return Ok(());
436 }
437
438 self.running.store(true, Ordering::Relaxed);
439
440 self.run_health_checks().await;
442
443 let transports = Arc::clone(&self.transports);
445 let running = Arc::clone(&self.running);
446 let interval_secs = self.config.health_check_interval_secs;
447 let timeout_secs = self.config.health_check_timeout_secs;
448
449 let task = get_runtime().spawn(async move {
450 let mut ticker = interval(Duration::from_secs(interval_secs));
451 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
452
453 loop {
454 ticker.tick().await;
455
456 if !running.load(Ordering::Relaxed) {
457 break;
458 }
459
460 let tasks: Vec<_> = transports
461 .iter()
462 .map(|t| t.health_check(timeout_secs))
463 .collect();
464
465 let results = future::join_all(tasks).await;
466 let healthy_count = results.iter().filter(|&&r| r).count();
467
468 tracing::debug!(
469 "Health check complete: {healthy_count}/{} clients healthy",
470 results.len()
471 );
472 }
473 });
474
475 *self.health_check_task.write().await = Some(task);
476
477 tracing::info!(
478 "SubmitBroadcaster started with {} clients",
479 self.transports.len()
480 );
481
482 Ok(())
483 }
484
485 pub async fn stop(&self) {
487 if !self.running.load(Ordering::Relaxed) {
488 return;
489 }
490
491 self.running.store(false, Ordering::Relaxed);
492
493 if let Some(task) = self.health_check_task.write().await.take() {
494 task.abort();
495 }
496
497 tracing::info!("SubmitBroadcaster stopped");
498 }
499
500 async fn run_health_checks(&self) {
501 let tasks: Vec<_> = self
502 .transports
503 .iter()
504 .map(|t| t.health_check(self.config.health_check_timeout_secs))
505 .collect();
506
507 let results = future::join_all(tasks).await;
508 let healthy_count = results.iter().filter(|&&r| r).count();
509
510 tracing::debug!(
511 "Health check complete: {healthy_count}/{} clients healthy",
512 results.len()
513 );
514 }
515
516 fn is_expected_reject(&self, error_message: &str) -> bool {
517 self.config
518 .expected_reject_patterns
519 .iter()
520 .any(|pattern| error_message.contains(pattern))
521 }
522
523 async fn process_submit_results<T>(
527 &self,
528 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
529 operation: &str,
530 params: String,
531 ) -> anyhow::Result<T>
532 where
533 T: Send + 'static,
534 {
535 let mut errors = Vec::new();
536
537 while !handles.is_empty() {
538 let current_handles = std::mem::take(&mut handles);
539 let (result, _idx, remaining) = future::select_all(current_handles).await;
540 handles = remaining.into_iter().collect();
541
542 match result {
543 Ok((client_id, Ok(result))) => {
544 for handle in &handles {
546 handle.abort();
547 }
548 self.successful_submits.fetch_add(1, Ordering::Relaxed);
549 tracing::debug!("{} broadcast succeeded [{client_id}] {params}", operation,);
550 return Ok(result);
551 }
552 Ok((client_id, Err(e))) => {
553 let error_msg = e.to_string();
554
555 if self.is_expected_reject(&error_msg) {
556 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
557 tracing::debug!(
558 "Expected {} rejection [{client_id}]: {error_msg} {params}",
559 operation.to_lowercase(),
560 );
561 errors.push(error_msg);
562 } else {
563 tracing::warn!(
564 "{} request failed [{client_id}]: {error_msg} {params}",
565 operation,
566 );
567 errors.push(error_msg);
568 }
569 }
570 Err(e) => {
571 tracing::warn!("{} task join error: {e:?}", operation);
572 errors.push(format!("Task panicked: {e:?}"));
573 }
574 }
575 }
576
577 self.failed_submits.fetch_add(1, Ordering::Relaxed);
579 tracing::error!(
580 "All {} requests failed: {errors:?} {params}",
581 operation.to_lowercase(),
582 );
583 Err(anyhow::anyhow!(
584 "All {} requests failed: {:?}",
585 operation.to_lowercase(),
586 errors
587 ))
588 }
589
590 #[allow(clippy::too_many_arguments)]
601 pub async fn broadcast_submit(
602 &self,
603 instrument_id: InstrumentId,
604 client_order_id: ClientOrderId,
605 order_side: OrderSide,
606 order_type: OrderType,
607 quantity: Quantity,
608 time_in_force: TimeInForce,
609 price: Option<Price>,
610 trigger_price: Option<Price>,
611 trigger_type: Option<TriggerType>,
612 display_qty: Option<Quantity>,
613 post_only: bool,
614 reduce_only: bool,
615 order_list_id: Option<OrderListId>,
616 contingency_type: Option<ContingencyType>,
617 submit_tries: Option<usize>,
618 ) -> anyhow::Result<OrderStatusReport> {
619 self.total_submits.fetch_add(1, Ordering::Relaxed);
620
621 let pool_size = self.config.pool_size;
622 let actual_tries = if let Some(t) = submit_tries {
623 if t > pool_size {
624 log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
626 }
627 std::cmp::min(t, pool_size)
628 } else {
629 pool_size
630 };
631
632 tracing::debug!(
633 "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
634 );
635
636 let healthy_transports: Vec<TransportClient> = self
637 .transports
638 .iter()
639 .filter(|t| t.is_healthy())
640 .take(actual_tries)
641 .cloned()
642 .collect();
643
644 if healthy_transports.is_empty() {
645 self.failed_submits.fetch_add(1, Ordering::Relaxed);
646 anyhow::bail!("No healthy transport clients available");
647 }
648
649 tracing::debug!(
650 "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
651 healthy_transports.len(),
652 );
653
654 let mut handles = Vec::new();
655 for (idx, transport) in healthy_transports.into_iter().enumerate() {
656 let modified_client_order_id = if idx == 0 {
658 client_order_id
659 } else {
660 ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
661 };
662
663 let handle = get_runtime().spawn(async move {
664 let client_id = transport.client_id.clone();
665 let result = transport
666 .submit_order(
667 instrument_id,
668 modified_client_order_id,
669 order_side,
670 order_type,
671 quantity,
672 time_in_force,
673 price,
674 trigger_price,
675 trigger_type,
676 display_qty,
677 post_only,
678 reduce_only,
679 order_list_id,
680 contingency_type,
681 )
682 .await;
683 (client_id, result)
684 });
685 handles.push(handle);
686 }
687
688 self.process_submit_results(
689 handles,
690 "Submit",
691 format!("(client_order_id={client_order_id:?})"),
692 )
693 .await
694 }
695
696 pub fn get_metrics(&self) -> BroadcasterMetrics {
698 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
699 let total_clients = self.transports.len();
700
701 BroadcasterMetrics {
702 total_submits: self.total_submits.load(Ordering::Relaxed),
703 successful_submits: self.successful_submits.load(Ordering::Relaxed),
704 failed_submits: self.failed_submits.load(Ordering::Relaxed),
705 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
706 healthy_clients,
707 total_clients,
708 }
709 }
710
711 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
713 self.get_metrics()
714 }
715
716 pub fn get_client_stats(&self) -> Vec<ClientStats> {
718 self.transports
719 .iter()
720 .map(|t| ClientStats {
721 client_id: t.client_id.clone(),
722 healthy: t.is_healthy(),
723 submit_count: t.get_submit_count(),
724 error_count: t.get_error_count(),
725 })
726 .collect()
727 }
728
729 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
731 self.get_client_stats()
732 }
733
734 pub fn cache_instrument(&self, instrument: InstrumentAny) {
736 for transport in self.transports.iter() {
737 transport.executor.add_instrument(instrument.clone());
738 }
739 }
740
741 #[must_use]
742 pub fn clone_for_async(&self) -> Self {
743 Self {
744 config: self.config.clone(),
745 transports: Arc::clone(&self.transports),
746 health_check_task: Arc::clone(&self.health_check_task),
747 running: Arc::clone(&self.running),
748 total_submits: Arc::clone(&self.total_submits),
749 successful_submits: Arc::clone(&self.successful_submits),
750 failed_submits: Arc::clone(&self.failed_submits),
751 expected_rejects: Arc::clone(&self.expected_rejects),
752 }
753 }
754
755 #[cfg(test)]
756 fn new_with_transports(
757 config: SubmitBroadcasterConfig,
758 transports: Vec<TransportClient>,
759 ) -> Self {
760 Self {
761 config,
762 transports: Arc::new(transports),
763 health_check_task: Arc::new(RwLock::new(None)),
764 running: Arc::new(AtomicBool::new(false)),
765 total_submits: Arc::new(AtomicU64::new(0)),
766 successful_submits: Arc::new(AtomicU64::new(0)),
767 failed_submits: Arc::new(AtomicU64::new(0)),
768 expected_rejects: Arc::new(AtomicU64::new(0)),
769 }
770 }
771}
772
/// Point-in-time snapshot of a broadcaster's counters and client health.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly, and
/// `Default` for an all-zero snapshot.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BroadcasterMetrics {
    /// Broadcast requests received.
    pub total_submits: u64,
    /// Broadcasts in which some transport succeeded.
    pub successful_submits: u64,
    /// Broadcasts in which no transport succeeded.
    pub failed_submits: u64,
    /// Individual errors that matched an expected-reject pattern.
    pub expected_rejects: u64,
    /// Transports whose health flag was set when the snapshot was taken.
    pub healthy_clients: usize,
    /// Total number of transports in the pool.
    pub total_clients: usize,
}
783
/// Per-transport statistics reported by the broadcaster.
///
/// Derives `PartialEq`/`Eq` so entries can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientStats {
    /// Label of the transport the numbers belong to.
    pub client_id: String,
    /// Health flag of the transport when the statistics were read.
    pub healthy: bool,
    /// Submit attempts made through the transport.
    pub submit_count: u64,
    /// Submit attempts that returned an error.
    pub error_count: u64,
}
792
793#[cfg(test)]
794mod tests {
795 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
796
797 use nautilus_core::UUID4;
798 use nautilus_model::{
799 enums::{
800 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
801 },
802 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
803 reports::OrderStatusReport,
804 types::{Price, Quantity},
805 };
806
807 use super::*;
808
809 #[derive(Clone)]
811 #[allow(clippy::type_complexity)]
812 struct MockExecutor {
813 handler: Arc<
814 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
815 + Send
816 + Sync,
817 >,
818 }
819
820 impl MockExecutor {
821 fn new<F, Fut>(handler: F) -> Self
822 where
823 F: Fn() -> Fut + Send + Sync + 'static,
824 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
825 {
826 Self {
827 handler: Arc::new(move || Box::pin(handler())),
828 }
829 }
830 }
831
832 impl SubmitExecutor for MockExecutor {
833 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
834 Box::pin(async { Ok(()) })
835 }
836
837 #[allow(clippy::too_many_arguments)]
838 fn submit_order(
839 &self,
840 _instrument_id: InstrumentId,
841 _client_order_id: ClientOrderId,
842 _order_side: OrderSide,
843 _order_type: OrderType,
844 _quantity: Quantity,
845 _time_in_force: TimeInForce,
846 _price: Option<Price>,
847 _trigger_price: Option<Price>,
848 _trigger_type: Option<TriggerType>,
849 _display_qty: Option<Quantity>,
850 _post_only: bool,
851 _reduce_only: bool,
852 _order_list_id: Option<OrderListId>,
853 _contingency_type: Option<ContingencyType>,
854 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
855 (self.handler)()
856 }
857
858 fn add_instrument(&self, _instrument: InstrumentAny) {
859 }
861 }
862
863 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
864 OrderStatusReport {
865 account_id: AccountId::from("BITMEX-001"),
866 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
867 venue_order_id: VenueOrderId::from(venue_order_id),
868 order_side: OrderSide::Buy,
869 order_type: OrderType::Limit,
870 time_in_force: TimeInForce::Gtc,
871 order_status: OrderStatus::Accepted,
872 price: Some(Price::new(50000.0, 2)),
873 quantity: Quantity::new(100.0, 0),
874 filled_qty: Quantity::new(0.0, 0),
875 report_id: UUID4::new(),
876 ts_accepted: 0.into(),
877 ts_last: 0.into(),
878 ts_init: 0.into(),
879 client_order_id: None,
880 avg_px: None,
881 trigger_price: None,
882 trigger_type: None,
883 contingency_type: ContingencyType::NoContingency,
884 expire_time: None,
885 order_list_id: None,
886 venue_position_id: None,
887 linked_order_ids: None,
888 parent_order_id: None,
889 display_qty: None,
890 limit_offset: None,
891 trailing_offset: None,
892 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
893 post_only: false,
894 reduce_only: false,
895 cancel_reason: None,
896 ts_triggered: None,
897 }
898 }
899
900 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
901 where
902 F: Fn() -> Fut + Send + Sync + 'static,
903 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
904 {
905 let executor = MockExecutor::new(handler);
906 TransportClient::new(executor, client_id.to_string())
907 }
908
909 #[tokio::test]
910 async fn test_broadcast_submit_immediate_success() {
911 let report = create_test_report("ORDER-1");
912 let report_clone = report.clone();
913
914 let transports = vec![
915 create_stub_transport("client-0", move || {
916 let report = report_clone.clone();
917 async move { Ok(report) }
918 }),
919 create_stub_transport("client-1", || async {
920 tokio::time::sleep(Duration::from_secs(10)).await;
921 anyhow::bail!("Should be aborted")
922 }),
923 ];
924
925 let config = SubmitBroadcasterConfig::default();
926 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
927
928 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
929 let result = broadcaster
930 .broadcast_submit(
931 instrument_id,
932 ClientOrderId::from("O-123"),
933 OrderSide::Buy,
934 OrderType::Limit,
935 Quantity::new(100.0, 0),
936 TimeInForce::Gtc,
937 Some(Price::new(50000.0, 2)),
938 None,
939 None,
940 None,
941 false,
942 false,
943 None,
944 None,
945 None,
946 )
947 .await;
948
949 assert!(result.is_ok());
950 let returned_report = result.unwrap();
951 assert_eq!(returned_report.venue_order_id, report.venue_order_id);
952
953 let metrics = broadcaster.get_metrics_async().await;
954 assert_eq!(metrics.successful_submits, 1);
955 assert_eq!(metrics.failed_submits, 0);
956 assert_eq!(metrics.total_submits, 1);
957 }
958
959 #[tokio::test]
960 async fn test_broadcast_submit_duplicate_clordid_expected() {
961 let transports = vec![
962 create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
963 create_stub_transport("client-1", || async {
964 tokio::time::sleep(Duration::from_secs(10)).await;
965 anyhow::bail!("Should be aborted")
966 }),
967 ];
968
969 let config = SubmitBroadcasterConfig::default();
970 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
971
972 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
973 let result = broadcaster
974 .broadcast_submit(
975 instrument_id,
976 ClientOrderId::from("O-123"),
977 OrderSide::Buy,
978 OrderType::Limit,
979 Quantity::new(100.0, 0),
980 TimeInForce::Gtc,
981 Some(Price::new(50000.0, 2)),
982 None,
983 None,
984 None,
985 false,
986 false,
987 None,
988 None,
989 None,
990 )
991 .await;
992
993 assert!(result.is_err());
994
995 let metrics = broadcaster.get_metrics_async().await;
996 assert_eq!(metrics.expected_rejects, 1);
997 assert_eq!(metrics.successful_submits, 0);
998 assert_eq!(metrics.failed_submits, 1);
999 }
1000
1001 #[tokio::test]
1002 async fn test_broadcast_submit_all_failures() {
1003 let transports = vec![
1004 create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1005 create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1006 ];
1007
1008 let config = SubmitBroadcasterConfig::default();
1009 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1010
1011 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1012 let result = broadcaster
1013 .broadcast_submit(
1014 instrument_id,
1015 ClientOrderId::from("O-456"),
1016 OrderSide::Sell,
1017 OrderType::Market,
1018 Quantity::new(50.0, 0),
1019 TimeInForce::Ioc,
1020 None,
1021 None,
1022 None,
1023 None,
1024 false,
1025 false,
1026 None,
1027 None,
1028 None,
1029 )
1030 .await;
1031
1032 assert!(result.is_err());
1033 assert!(
1034 result
1035 .unwrap_err()
1036 .to_string()
1037 .contains("All submit requests failed")
1038 );
1039
1040 let metrics = broadcaster.get_metrics_async().await;
1041 assert_eq!(metrics.failed_submits, 1);
1042 assert_eq!(metrics.successful_submits, 0);
1043 }
1044
1045 #[tokio::test]
1046 async fn test_broadcast_submit_no_healthy_clients() {
1047 let transport =
1048 create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1049 transport.healthy.store(false, Ordering::Relaxed);
1050
1051 let config = SubmitBroadcasterConfig::default();
1052 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1053
1054 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1055 let result = broadcaster
1056 .broadcast_submit(
1057 instrument_id,
1058 ClientOrderId::from("O-789"),
1059 OrderSide::Buy,
1060 OrderType::Limit,
1061 Quantity::new(100.0, 0),
1062 TimeInForce::Gtc,
1063 Some(Price::new(50000.0, 2)),
1064 None,
1065 None,
1066 None,
1067 false,
1068 false,
1069 None,
1070 None,
1071 None,
1072 )
1073 .await;
1074
1075 assert!(result.is_err());
1076 assert!(
1077 result
1078 .unwrap_err()
1079 .to_string()
1080 .contains("No healthy transport clients available")
1081 );
1082
1083 let metrics = broadcaster.get_metrics_async().await;
1084 assert_eq!(metrics.failed_submits, 1);
1085 }
1086
1087 #[tokio::test]
1088 async fn test_default_config() {
1089 let config = SubmitBroadcasterConfig {
1090 api_key: Some("test_key".to_string()),
1091 api_secret: Some("test_secret".to_string()),
1092 base_url: Some("https://test.example.com".to_string()),
1093 ..Default::default()
1094 };
1095
1096 let broadcaster = SubmitBroadcaster::new(config);
1097 assert!(broadcaster.is_ok());
1098
1099 let broadcaster = broadcaster.unwrap();
1100 let metrics = broadcaster.get_metrics_async().await;
1101
1102 assert_eq!(metrics.total_clients, 3);
1104 }
1105
1106 #[tokio::test]
1107 async fn test_broadcaster_lifecycle() {
1108 let config = SubmitBroadcasterConfig {
1109 pool_size: 2,
1110 api_key: Some("test_key".to_string()),
1111 api_secret: Some("test_secret".to_string()),
1112 base_url: Some("https://test.example.com".to_string()),
1113 testnet: false,
1114 timeout_secs: Some(5),
1115 max_retries: None,
1116 retry_delay_ms: None,
1117 retry_delay_max_ms: None,
1118 recv_window_ms: None,
1119 max_requests_per_second: None,
1120 max_requests_per_minute: None,
1121 health_check_interval_secs: 60,
1122 health_check_timeout_secs: 1,
1123 expected_reject_patterns: vec![],
1124 proxy_urls: vec![],
1125 };
1126
1127 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1128
1129 assert!(!broadcaster.running.load(Ordering::Relaxed));
1131
1132 let start_result = broadcaster.start().await;
1134 assert!(start_result.is_ok());
1135 assert!(broadcaster.running.load(Ordering::Relaxed));
1136
1137 let start_again = broadcaster.start().await;
1139 assert!(start_again.is_ok());
1140
1141 broadcaster.stop().await;
1143 assert!(!broadcaster.running.load(Ordering::Relaxed));
1144
1145 broadcaster.stop().await;
1147 assert!(!broadcaster.running.load(Ordering::Relaxed));
1148 }
1149
1150 #[tokio::test]
1151 async fn test_broadcast_submit_metrics_increment() {
1152 let report = create_test_report("ORDER-1");
1153 let report_clone = report.clone();
1154
1155 let transports = vec![create_stub_transport("client-0", move || {
1156 let report = report_clone.clone();
1157 async move { Ok(report) }
1158 })];
1159
1160 let config = SubmitBroadcasterConfig::default();
1161 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1162
1163 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1164 let _ = broadcaster
1165 .broadcast_submit(
1166 instrument_id,
1167 ClientOrderId::from("O-123"),
1168 OrderSide::Buy,
1169 OrderType::Limit,
1170 Quantity::new(100.0, 0),
1171 TimeInForce::Gtc,
1172 Some(Price::new(50000.0, 2)),
1173 None,
1174 None,
1175 None,
1176 false,
1177 false,
1178 None,
1179 None,
1180 None,
1181 )
1182 .await;
1183
1184 let metrics = broadcaster.get_metrics_async().await;
1185 assert_eq!(metrics.total_submits, 1);
1186 assert_eq!(metrics.successful_submits, 1);
1187 assert_eq!(metrics.failed_submits, 0);
1188 }
1189
1190 #[tokio::test]
1191 async fn test_broadcaster_creation_with_pool() {
1192 let config = SubmitBroadcasterConfig {
1193 pool_size: 4,
1194 api_key: Some("test_key".to_string()),
1195 api_secret: Some("test_secret".to_string()),
1196 base_url: Some("https://test.example.com".to_string()),
1197 ..Default::default()
1198 };
1199
1200 let broadcaster = SubmitBroadcaster::new(config);
1201 assert!(broadcaster.is_ok());
1202
1203 let broadcaster = broadcaster.unwrap();
1204 let metrics = broadcaster.get_metrics_async().await;
1205 assert_eq!(metrics.total_clients, 4);
1206 }
1207
1208 #[tokio::test]
1209 async fn test_client_stats_collection() {
1210 let transports = vec![
1213 create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1214 create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1215 ];
1216
1217 let config = SubmitBroadcasterConfig::default();
1218 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1219
1220 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1221 let _ = broadcaster
1222 .broadcast_submit(
1223 instrument_id,
1224 ClientOrderId::from("O-123"),
1225 OrderSide::Buy,
1226 OrderType::Limit,
1227 Quantity::new(100.0, 0),
1228 TimeInForce::Gtc,
1229 Some(Price::new(50000.0, 2)),
1230 None,
1231 None,
1232 None,
1233 false,
1234 false,
1235 None,
1236 None,
1237 None,
1238 )
1239 .await;
1240
1241 let stats = broadcaster.get_client_stats_async().await;
1242 assert_eq!(stats.len(), 2);
1243
1244 let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1245 assert_eq!(client0.submit_count, 1);
1246 assert_eq!(client0.error_count, 1);
1247
1248 let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1249 assert_eq!(client1.submit_count, 1);
1250 assert_eq!(client1.error_count, 1);
1251 }
1252
1253 #[tokio::test]
1254 async fn test_testnet_config_sets_base_url() {
1255 let config = SubmitBroadcasterConfig {
1256 pool_size: 1,
1257 api_key: Some("test_key".to_string()),
1258 api_secret: Some("test_secret".to_string()),
1259 testnet: true,
1260 base_url: None,
1261 ..Default::default()
1262 };
1263
1264 let broadcaster = SubmitBroadcaster::new(config);
1265 assert!(broadcaster.is_ok());
1266 }
1267
1268 #[tokio::test]
1269 async fn test_clone_for_async() {
1270 let config = SubmitBroadcasterConfig {
1271 pool_size: 1,
1272 api_key: Some("test_key".to_string()),
1273 api_secret: Some("test_secret".to_string()),
1274 base_url: Some("https://test.example.com".to_string()),
1275 ..Default::default()
1276 };
1277
1278 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1279 let cloned = broadcaster.clone_for_async();
1280
1281 broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1283 assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1284 }
1285
1286 #[tokio::test]
1287 async fn test_pattern_matching() {
1288 let config = SubmitBroadcasterConfig {
1289 expected_reject_patterns: vec![
1290 "Duplicate clOrdID".to_string(),
1291 "Order already exists".to_string(),
1292 ],
1293 ..Default::default()
1294 };
1295
1296 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1297
1298 assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1299 assert!(broadcaster.is_expected_reject("Order already exists in system"));
1300 assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1301 assert!(!broadcaster.is_expected_reject("Internal server error"));
1302 }
1303
1304 #[tokio::test]
1305 async fn test_submit_metrics_with_mixed_responses() {
1306 let report = create_test_report("ORDER-1");
1307 let report_clone = report.clone();
1308
1309 let transports = vec![
1310 create_stub_transport("client-0", move || {
1311 let report = report_clone.clone();
1312 async move { Ok(report) }
1313 }),
1314 create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1315 ];
1316
1317 let config = SubmitBroadcasterConfig::default();
1318 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1319
1320 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1321 let result = broadcaster
1322 .broadcast_submit(
1323 instrument_id,
1324 ClientOrderId::from("O-123"),
1325 OrderSide::Buy,
1326 OrderType::Limit,
1327 Quantity::new(100.0, 0),
1328 TimeInForce::Gtc,
1329 Some(Price::new(50000.0, 2)),
1330 None,
1331 None,
1332 None,
1333 false,
1334 false,
1335 None,
1336 None,
1337 None,
1338 )
1339 .await;
1340
1341 assert!(result.is_ok());
1342
1343 let metrics = broadcaster.get_metrics_async().await;
1344 assert_eq!(metrics.total_submits, 1);
1345 assert_eq!(metrics.successful_submits, 1);
1346 assert_eq!(metrics.failed_submits, 0);
1347 }
1348
1349 #[tokio::test]
1350 async fn test_metrics_initialization_and_health() {
1351 let config = SubmitBroadcasterConfig {
1352 pool_size: 2,
1353 api_key: Some("test_key".to_string()),
1354 api_secret: Some("test_secret".to_string()),
1355 base_url: Some("https://test.example.com".to_string()),
1356 ..Default::default()
1357 };
1358
1359 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1360 let metrics = broadcaster.get_metrics_async().await;
1361
1362 assert_eq!(metrics.total_submits, 0);
1363 assert_eq!(metrics.successful_submits, 0);
1364 assert_eq!(metrics.failed_submits, 0);
1365 assert_eq!(metrics.expected_rejects, 0);
1366 assert_eq!(metrics.total_clients, 2);
1367 assert_eq!(metrics.healthy_clients, 2);
1368 }
1369
1370 #[tokio::test]
1371 async fn test_health_check_task_lifecycle() {
1372 let config = SubmitBroadcasterConfig {
1373 pool_size: 2,
1374 api_key: Some("test_key".to_string()),
1375 api_secret: Some("test_secret".to_string()),
1376 base_url: Some("https://test.example.com".to_string()),
1377 health_check_interval_secs: 1,
1378 ..Default::default()
1379 };
1380
1381 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1382
1383 broadcaster.start().await.unwrap();
1385 assert!(broadcaster.running.load(Ordering::Relaxed));
1386 assert!(
1387 broadcaster
1388 .health_check_task
1389 .read()
1390 .await
1391 .as_ref()
1392 .is_some()
1393 );
1394
1395 broadcaster.stop().await;
1397 assert!(!broadcaster.running.load(Ordering::Relaxed));
1398 }
1399
1400 #[tokio::test]
1401 async fn test_expected_reject_pattern_comprehensive() {
1402 let transports = vec![
1403 create_stub_transport("client-0", || async {
1404 anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1405 }),
1406 create_stub_transport("client-1", || async {
1407 tokio::time::sleep(Duration::from_secs(10)).await;
1408 anyhow::bail!("Should be aborted")
1409 }),
1410 ];
1411
1412 let config = SubmitBroadcasterConfig::default();
1413 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1414
1415 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1416 let result = broadcaster
1417 .broadcast_submit(
1418 instrument_id,
1419 ClientOrderId::from("O-123"),
1420 OrderSide::Buy,
1421 OrderType::Limit,
1422 Quantity::new(100.0, 0),
1423 TimeInForce::Gtc,
1424 Some(Price::new(50000.0, 2)),
1425 None,
1426 None,
1427 None,
1428 false,
1429 false,
1430 None,
1431 None,
1432 None,
1433 )
1434 .await;
1435
1436 assert!(result.is_err());
1438
1439 let metrics = broadcaster.get_metrics_async().await;
1440 assert_eq!(metrics.expected_rejects, 1);
1441 assert_eq!(metrics.failed_submits, 1);
1442 assert_eq!(metrics.successful_submits, 0);
1443 }
1444
1445 #[tokio::test]
1446 async fn test_client_order_id_suffix_for_multiple_clients() {
1447 use std::sync::{Arc, Mutex};
1448
1449 #[derive(Clone)]
1450 struct CaptureExecutor {
1451 captured_ids: Arc<Mutex<Vec<String>>>,
1452 barrier: Arc<tokio::sync::Barrier>,
1453 report: OrderStatusReport,
1454 }
1455
1456 impl SubmitExecutor for CaptureExecutor {
1457 fn health_check(
1458 &self,
1459 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1460 Box::pin(async { Ok(()) })
1461 }
1462
1463 #[allow(clippy::too_many_arguments)]
1464 fn submit_order(
1465 &self,
1466 _instrument_id: InstrumentId,
1467 client_order_id: ClientOrderId,
1468 _order_side: OrderSide,
1469 _order_type: OrderType,
1470 _quantity: Quantity,
1471 _time_in_force: TimeInForce,
1472 _price: Option<Price>,
1473 _trigger_price: Option<Price>,
1474 _trigger_type: Option<TriggerType>,
1475 _display_qty: Option<Quantity>,
1476 _post_only: bool,
1477 _reduce_only: bool,
1478 _order_list_id: Option<OrderListId>,
1479 _contingency_type: Option<ContingencyType>,
1480 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1481 {
1482 self.captured_ids
1484 .lock()
1485 .unwrap()
1486 .push(client_order_id.as_str().to_string());
1487 let report = self.report.clone();
1488 let barrier = Arc::clone(&self.barrier);
1489 Box::pin(async move {
1492 barrier.wait().await;
1493 Ok(report)
1494 })
1495 }
1496
1497 fn add_instrument(&self, _instrument: InstrumentAny) {}
1498 }
1499
1500 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1501 let barrier = Arc::new(tokio::sync::Barrier::new(3));
1502 let report = create_test_report("ORDER-1");
1503
1504 let transports = vec![
1505 TransportClient::new(
1506 CaptureExecutor {
1507 captured_ids: Arc::clone(&captured_ids),
1508 barrier: Arc::clone(&barrier),
1509 report: report.clone(),
1510 },
1511 "client-0".to_string(),
1512 ),
1513 TransportClient::new(
1514 CaptureExecutor {
1515 captured_ids: Arc::clone(&captured_ids),
1516 barrier: Arc::clone(&barrier),
1517 report: report.clone(),
1518 },
1519 "client-1".to_string(),
1520 ),
1521 TransportClient::new(
1522 CaptureExecutor {
1523 captured_ids: Arc::clone(&captured_ids),
1524 barrier: Arc::clone(&barrier),
1525 report: report.clone(),
1526 },
1527 "client-2".to_string(),
1528 ),
1529 ];
1530
1531 let config = SubmitBroadcasterConfig::default();
1532 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1533
1534 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1535 let result = broadcaster
1536 .broadcast_submit(
1537 instrument_id,
1538 ClientOrderId::from("O-123"),
1539 OrderSide::Buy,
1540 OrderType::Limit,
1541 Quantity::new(100.0, 0),
1542 TimeInForce::Gtc,
1543 Some(Price::new(50000.0, 2)),
1544 None,
1545 None,
1546 None,
1547 false,
1548 false,
1549 None,
1550 None,
1551 None,
1552 )
1553 .await;
1554
1555 assert!(result.is_ok());
1556
1557 let ids = captured_ids.lock().unwrap();
1559 assert_eq!(ids.len(), 3);
1560 assert!(ids.contains(&"O-123".to_string())); assert!(ids.contains(&"O-123-1".to_string())); assert!(ids.contains(&"O-123-2".to_string())); }
1564
1565 #[tokio::test]
1566 async fn test_client_order_id_suffix_with_partial_failure() {
1567 use std::sync::{Arc, Mutex};
1568
1569 #[derive(Clone)]
1570 struct CaptureAndFailExecutor {
1571 captured_ids: Arc<Mutex<Vec<String>>>,
1572 barrier: Arc<tokio::sync::Barrier>,
1573 should_succeed: bool,
1574 }
1575
1576 impl SubmitExecutor for CaptureAndFailExecutor {
1577 fn health_check(
1578 &self,
1579 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1580 Box::pin(async { Ok(()) })
1581 }
1582
1583 #[allow(clippy::too_many_arguments)]
1584 fn submit_order(
1585 &self,
1586 _instrument_id: InstrumentId,
1587 client_order_id: ClientOrderId,
1588 _order_side: OrderSide,
1589 _order_type: OrderType,
1590 _quantity: Quantity,
1591 _time_in_force: TimeInForce,
1592 _price: Option<Price>,
1593 _trigger_price: Option<Price>,
1594 _trigger_type: Option<TriggerType>,
1595 _display_qty: Option<Quantity>,
1596 _post_only: bool,
1597 _reduce_only: bool,
1598 _order_list_id: Option<OrderListId>,
1599 _contingency_type: Option<ContingencyType>,
1600 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1601 {
1602 self.captured_ids
1604 .lock()
1605 .unwrap()
1606 .push(client_order_id.as_str().to_string());
1607 let barrier = Arc::clone(&self.barrier);
1608 let should_succeed = self.should_succeed;
1609 Box::pin(async move {
1612 barrier.wait().await;
1613 if should_succeed {
1614 Ok(create_test_report("ORDER-1"))
1615 } else {
1616 anyhow::bail!("Network error")
1617 }
1618 })
1619 }
1620
1621 fn add_instrument(&self, _instrument: InstrumentAny) {}
1622 }
1623
1624 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1625 let barrier = Arc::new(tokio::sync::Barrier::new(2));
1626
1627 let transports = vec![
1628 TransportClient::new(
1629 CaptureAndFailExecutor {
1630 captured_ids: Arc::clone(&captured_ids),
1631 barrier: Arc::clone(&barrier),
1632 should_succeed: false,
1633 },
1634 "client-0".to_string(),
1635 ),
1636 TransportClient::new(
1637 CaptureAndFailExecutor {
1638 captured_ids: Arc::clone(&captured_ids),
1639 barrier: Arc::clone(&barrier),
1640 should_succeed: true,
1641 },
1642 "client-1".to_string(),
1643 ),
1644 ];
1645
1646 let config = SubmitBroadcasterConfig::default();
1647 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1648
1649 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1650 let result = broadcaster
1651 .broadcast_submit(
1652 instrument_id,
1653 ClientOrderId::from("O-456"),
1654 OrderSide::Sell,
1655 OrderType::Market,
1656 Quantity::new(50.0, 0),
1657 TimeInForce::Ioc,
1658 None,
1659 None,
1660 None,
1661 None,
1662 false,
1663 false,
1664 None,
1665 None,
1666 None,
1667 )
1668 .await;
1669
1670 assert!(result.is_ok());
1671
1672 let ids = captured_ids.lock().unwrap();
1674 assert_eq!(ids.len(), 2);
1675 assert!(ids.contains(&"O-456".to_string())); assert!(ids.contains(&"O-456-1".to_string())); }
1678
1679 #[tokio::test]
1680 async fn test_proxy_urls_populated_from_config() {
1681 let config = SubmitBroadcasterConfig {
1682 pool_size: 3,
1683 api_key: Some("test_key".to_string()),
1684 api_secret: Some("test_secret".to_string()),
1685 proxy_urls: vec![
1686 Some("http://proxy1:8080".to_string()),
1687 Some("http://proxy2:8080".to_string()),
1688 Some("http://proxy3:8080".to_string()),
1689 ],
1690 ..Default::default()
1691 };
1692
1693 assert_eq!(config.proxy_urls.len(), 3);
1694 assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1695 assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1696 assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1697 }
1698}