1use std::{
37 fmt::Debug,
38 future::Future,
39 pin::Pin,
40 sync::{
41 Arc,
42 atomic::{AtomicBool, AtomicU64, Ordering},
43 },
44 time::Duration,
45};
46
47use futures_util::future;
48use nautilus_model::{
49 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
50 identifiers::{ClientOrderId, InstrumentId, OrderListId},
51 instruments::InstrumentAny,
52 reports::OrderStatusReport,
53 types::{Price, Quantity},
54};
55use tokio::{sync::RwLock, task::JoinHandle, time::interval};
56
57use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
58
59trait SubmitExecutor: Send + Sync {
81 fn add_instrument(&self, instrument: InstrumentAny);
83
84 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
86
87 #[allow(clippy::too_many_arguments)]
89 fn submit_order(
90 &self,
91 instrument_id: InstrumentId,
92 client_order_id: ClientOrderId,
93 order_side: OrderSide,
94 order_type: OrderType,
95 quantity: Quantity,
96 time_in_force: TimeInForce,
97 price: Option<Price>,
98 trigger_price: Option<Price>,
99 trigger_type: Option<TriggerType>,
100 display_qty: Option<Quantity>,
101 post_only: bool,
102 reduce_only: bool,
103 order_list_id: Option<OrderListId>,
104 contingency_type: Option<ContingencyType>,
105 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
106}
107
108impl SubmitExecutor for BitmexHttpClient {
109 fn add_instrument(&self, instrument: InstrumentAny) {
110 Self::cache_instrument(self, instrument);
111 }
112
113 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
114 Box::pin(async move {
115 Self::get_server_time(self)
116 .await
117 .map(|_| ())
118 .map_err(|e| anyhow::anyhow!("{e}"))
119 })
120 }
121
122 #[allow(clippy::too_many_arguments)]
123 fn submit_order(
124 &self,
125 instrument_id: InstrumentId,
126 client_order_id: ClientOrderId,
127 order_side: OrderSide,
128 order_type: OrderType,
129 quantity: Quantity,
130 time_in_force: TimeInForce,
131 price: Option<Price>,
132 trigger_price: Option<Price>,
133 trigger_type: Option<TriggerType>,
134 display_qty: Option<Quantity>,
135 post_only: bool,
136 reduce_only: bool,
137 order_list_id: Option<OrderListId>,
138 contingency_type: Option<ContingencyType>,
139 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
140 Box::pin(async move {
141 Self::submit_order(
142 self,
143 instrument_id,
144 client_order_id,
145 order_side,
146 order_type,
147 quantity,
148 time_in_force,
149 price,
150 trigger_price,
151 trigger_type,
152 display_qty,
153 post_only,
154 reduce_only,
155 order_list_id,
156 contingency_type,
157 )
158 .await
159 })
160 }
161}
162
163#[derive(Debug, Clone)]
165pub struct SubmitBroadcasterConfig {
166 pub pool_size: usize,
168 pub api_key: Option<String>,
170 pub api_secret: Option<String>,
172 pub base_url: Option<String>,
174 pub testnet: bool,
176 pub timeout_secs: Option<u64>,
178 pub max_retries: Option<u32>,
180 pub retry_delay_ms: Option<u64>,
182 pub retry_delay_max_ms: Option<u64>,
184 pub recv_window_ms: Option<u64>,
186 pub max_requests_per_second: Option<u32>,
188 pub max_requests_per_minute: Option<u32>,
190 pub health_check_interval_secs: u64,
192 pub health_check_timeout_secs: u64,
194 pub expected_reject_patterns: Vec<String>,
196 pub proxy_urls: Vec<Option<String>>,
202}
203
204impl Default for SubmitBroadcasterConfig {
205 fn default() -> Self {
206 Self {
207 pool_size: 3,
208 api_key: None,
209 api_secret: None,
210 base_url: None,
211 testnet: false,
212 timeout_secs: Some(60),
213 max_retries: None,
214 retry_delay_ms: Some(1_000),
215 retry_delay_max_ms: Some(5_000),
216 recv_window_ms: Some(10_000),
217 max_requests_per_second: Some(10),
218 max_requests_per_minute: Some(120),
219 health_check_interval_secs: 30,
220 health_check_timeout_secs: 5,
221 expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
222 proxy_urls: vec![],
223 }
224 }
225}
226
227#[derive(Clone)]
229struct TransportClient {
230 executor: Arc<dyn SubmitExecutor>,
235 client_id: String,
236 healthy: Arc<AtomicBool>,
237 submit_count: Arc<AtomicU64>,
238 error_count: Arc<AtomicU64>,
239}
240
241impl Debug for TransportClient {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 f.debug_struct("TransportClient")
244 .field("client_id", &self.client_id)
245 .field("healthy", &self.healthy)
246 .field("submit_count", &self.submit_count)
247 .field("error_count", &self.error_count)
248 .finish()
249 }
250}
251
252impl TransportClient {
253 fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
254 Self {
255 executor: Arc::new(executor),
256 client_id,
257 healthy: Arc::new(AtomicBool::new(true)),
258 submit_count: Arc::new(AtomicU64::new(0)),
259 error_count: Arc::new(AtomicU64::new(0)),
260 }
261 }
262
263 fn is_healthy(&self) -> bool {
264 self.healthy.load(Ordering::Relaxed)
265 }
266
267 fn mark_healthy(&self) {
268 self.healthy.store(true, Ordering::Relaxed);
269 }
270
271 fn mark_unhealthy(&self) {
272 self.healthy.store(false, Ordering::Relaxed);
273 }
274
275 fn get_submit_count(&self) -> u64 {
276 self.submit_count.load(Ordering::Relaxed)
277 }
278
279 fn get_error_count(&self) -> u64 {
280 self.error_count.load(Ordering::Relaxed)
281 }
282
283 async fn health_check(&self, timeout_secs: u64) -> bool {
284 match tokio::time::timeout(
285 Duration::from_secs(timeout_secs),
286 self.executor.health_check(),
287 )
288 .await
289 {
290 Ok(Ok(_)) => {
291 self.mark_healthy();
292 true
293 }
294 Ok(Err(e)) => {
295 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
296 self.mark_unhealthy();
297 false
298 }
299 Err(_) => {
300 tracing::warn!("Health check timeout for client {}", self.client_id);
301 self.mark_unhealthy();
302 false
303 }
304 }
305 }
306
307 #[allow(clippy::too_many_arguments)]
308 async fn submit_order(
309 &self,
310 instrument_id: InstrumentId,
311 client_order_id: ClientOrderId,
312 order_side: OrderSide,
313 order_type: OrderType,
314 quantity: Quantity,
315 time_in_force: TimeInForce,
316 price: Option<Price>,
317 trigger_price: Option<Price>,
318 trigger_type: Option<TriggerType>,
319 display_qty: Option<Quantity>,
320 post_only: bool,
321 reduce_only: bool,
322 order_list_id: Option<OrderListId>,
323 contingency_type: Option<ContingencyType>,
324 ) -> anyhow::Result<OrderStatusReport> {
325 self.submit_count.fetch_add(1, Ordering::Relaxed);
326
327 match self
328 .executor
329 .submit_order(
330 instrument_id,
331 client_order_id,
332 order_side,
333 order_type,
334 quantity,
335 time_in_force,
336 price,
337 trigger_price,
338 trigger_type,
339 display_qty,
340 post_only,
341 reduce_only,
342 order_list_id,
343 contingency_type,
344 )
345 .await
346 {
347 Ok(report) => {
348 self.mark_healthy();
349 Ok(report)
350 }
351 Err(e) => {
352 self.error_count.fetch_add(1, Ordering::Relaxed);
353 Err(e)
354 }
355 }
356 }
357}
358
359#[cfg_attr(feature = "python", pyo3::pyclass)]
365#[derive(Debug)]
366pub struct SubmitBroadcaster {
367 config: SubmitBroadcasterConfig,
368 transports: Arc<Vec<TransportClient>>,
369 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
370 running: Arc<AtomicBool>,
371 total_submits: Arc<AtomicU64>,
372 successful_submits: Arc<AtomicU64>,
373 failed_submits: Arc<AtomicU64>,
374 expected_rejects: Arc<AtomicU64>,
375}
376
377impl SubmitBroadcaster {
378 pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
384 let mut transports = Vec::with_capacity(config.pool_size);
385
386 let base_url = if config.testnet && config.base_url.is_none() {
388 Some(BITMEX_HTTP_TESTNET_URL.to_string())
389 } else {
390 config.base_url.clone()
391 };
392
393 for i in 0..config.pool_size {
394 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
396
397 let client = BitmexHttpClient::with_credentials(
398 config.api_key.clone(),
399 config.api_secret.clone(),
400 base_url.clone(),
401 config.timeout_secs,
402 config.max_retries,
403 config.retry_delay_ms,
404 config.retry_delay_max_ms,
405 config.recv_window_ms,
406 config.max_requests_per_second,
407 config.max_requests_per_minute,
408 proxy_url,
409 )
410 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
411
412 transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
413 }
414
415 Ok(Self {
416 config,
417 transports: Arc::new(transports),
418 health_check_task: Arc::new(RwLock::new(None)),
419 running: Arc::new(AtomicBool::new(false)),
420 total_submits: Arc::new(AtomicU64::new(0)),
421 successful_submits: Arc::new(AtomicU64::new(0)),
422 failed_submits: Arc::new(AtomicU64::new(0)),
423 expected_rejects: Arc::new(AtomicU64::new(0)),
424 })
425 }
426
427 pub async fn start(&self) -> anyhow::Result<()> {
433 if self.running.load(Ordering::Relaxed) {
434 return Ok(());
435 }
436
437 self.running.store(true, Ordering::Relaxed);
438
439 self.run_health_checks().await;
441
442 let transports = Arc::clone(&self.transports);
444 let running = Arc::clone(&self.running);
445 let interval_secs = self.config.health_check_interval_secs;
446 let timeout_secs = self.config.health_check_timeout_secs;
447
448 let task = tokio::spawn(async move {
449 let mut ticker = interval(Duration::from_secs(interval_secs));
450 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
451
452 loop {
453 ticker.tick().await;
454
455 if !running.load(Ordering::Relaxed) {
456 break;
457 }
458
459 let tasks: Vec<_> = transports
460 .iter()
461 .map(|t| t.health_check(timeout_secs))
462 .collect();
463
464 let results = future::join_all(tasks).await;
465 let healthy_count = results.iter().filter(|&&r| r).count();
466
467 tracing::debug!(
468 "Health check complete: {healthy_count}/{} clients healthy",
469 results.len()
470 );
471 }
472 });
473
474 *self.health_check_task.write().await = Some(task);
475
476 tracing::info!(
477 "SubmitBroadcaster started with {} clients",
478 self.transports.len()
479 );
480
481 Ok(())
482 }
483
484 pub async fn stop(&self) {
486 if !self.running.load(Ordering::Relaxed) {
487 return;
488 }
489
490 self.running.store(false, Ordering::Relaxed);
491
492 if let Some(task) = self.health_check_task.write().await.take() {
493 task.abort();
494 }
495
496 tracing::info!("SubmitBroadcaster stopped");
497 }
498
499 async fn run_health_checks(&self) {
500 let tasks: Vec<_> = self
501 .transports
502 .iter()
503 .map(|t| t.health_check(self.config.health_check_timeout_secs))
504 .collect();
505
506 let results = future::join_all(tasks).await;
507 let healthy_count = results.iter().filter(|&&r| r).count();
508
509 tracing::debug!(
510 "Health check complete: {healthy_count}/{} clients healthy",
511 results.len()
512 );
513 }
514
515 fn is_expected_reject(&self, error_message: &str) -> bool {
516 self.config
517 .expected_reject_patterns
518 .iter()
519 .any(|pattern| error_message.contains(pattern))
520 }
521
522 async fn process_submit_results<T>(
526 &self,
527 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
528 operation: &str,
529 params: String,
530 ) -> anyhow::Result<T>
531 where
532 T: Send + 'static,
533 {
534 let mut errors = Vec::new();
535
536 while !handles.is_empty() {
537 let current_handles = std::mem::take(&mut handles);
538 let (result, _idx, remaining) = future::select_all(current_handles).await;
539 handles = remaining.into_iter().collect();
540
541 match result {
542 Ok((client_id, Ok(result))) => {
543 for handle in &handles {
545 handle.abort();
546 }
547 self.successful_submits.fetch_add(1, Ordering::Relaxed);
548 tracing::debug!("{} broadcast succeeded [{client_id}] {params}", operation,);
549 return Ok(result);
550 }
551 Ok((client_id, Err(e))) => {
552 let error_msg = e.to_string();
553
554 if self.is_expected_reject(&error_msg) {
555 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
556 tracing::debug!(
557 "Expected {} rejection [{client_id}]: {error_msg} {params}",
558 operation.to_lowercase(),
559 );
560 errors.push(error_msg);
561 } else {
562 tracing::warn!(
563 "{} request failed [{client_id}]: {error_msg} {params}",
564 operation,
565 );
566 errors.push(error_msg);
567 }
568 }
569 Err(e) => {
570 tracing::warn!("{} task join error: {e:?}", operation);
571 errors.push(format!("Task panicked: {e:?}"));
572 }
573 }
574 }
575
576 self.failed_submits.fetch_add(1, Ordering::Relaxed);
578 tracing::error!(
579 "All {} requests failed: {errors:?} {params}",
580 operation.to_lowercase(),
581 );
582 Err(anyhow::anyhow!(
583 "All {} requests failed: {:?}",
584 operation.to_lowercase(),
585 errors
586 ))
587 }
588
589 #[allow(clippy::too_many_arguments)]
600 pub async fn broadcast_submit(
601 &self,
602 instrument_id: InstrumentId,
603 client_order_id: ClientOrderId,
604 order_side: OrderSide,
605 order_type: OrderType,
606 quantity: Quantity,
607 time_in_force: TimeInForce,
608 price: Option<Price>,
609 trigger_price: Option<Price>,
610 trigger_type: Option<TriggerType>,
611 display_qty: Option<Quantity>,
612 post_only: bool,
613 reduce_only: bool,
614 order_list_id: Option<OrderListId>,
615 contingency_type: Option<ContingencyType>,
616 submit_tries: Option<usize>,
617 ) -> anyhow::Result<OrderStatusReport> {
618 self.total_submits.fetch_add(1, Ordering::Relaxed);
619
620 let pool_size = self.config.pool_size;
621 let actual_tries = if let Some(t) = submit_tries {
622 if t > pool_size {
623 log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
625 }
626 std::cmp::min(t, pool_size)
627 } else {
628 pool_size
629 };
630
631 tracing::debug!(
632 "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
633 );
634
635 let healthy_transports: Vec<TransportClient> = self
636 .transports
637 .iter()
638 .filter(|t| t.is_healthy())
639 .take(actual_tries)
640 .cloned()
641 .collect();
642
643 if healthy_transports.is_empty() {
644 self.failed_submits.fetch_add(1, Ordering::Relaxed);
645 anyhow::bail!("No healthy transport clients available");
646 }
647
648 tracing::debug!(
649 "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
650 healthy_transports.len(),
651 );
652
653 let mut handles = Vec::new();
654 for (idx, transport) in healthy_transports.into_iter().enumerate() {
655 let modified_client_order_id = if idx == 0 {
657 client_order_id
658 } else {
659 ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
660 };
661
662 let handle = tokio::spawn(async move {
663 let client_id = transport.client_id.clone();
664 let result = transport
665 .submit_order(
666 instrument_id,
667 modified_client_order_id,
668 order_side,
669 order_type,
670 quantity,
671 time_in_force,
672 price,
673 trigger_price,
674 trigger_type,
675 display_qty,
676 post_only,
677 reduce_only,
678 order_list_id,
679 contingency_type,
680 )
681 .await;
682 (client_id, result)
683 });
684 handles.push(handle);
685 }
686
687 self.process_submit_results(
688 handles,
689 "Submit",
690 format!("(client_order_id={client_order_id:?})"),
691 )
692 .await
693 }
694
695 pub fn get_metrics(&self) -> BroadcasterMetrics {
697 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
698 let total_clients = self.transports.len();
699
700 BroadcasterMetrics {
701 total_submits: self.total_submits.load(Ordering::Relaxed),
702 successful_submits: self.successful_submits.load(Ordering::Relaxed),
703 failed_submits: self.failed_submits.load(Ordering::Relaxed),
704 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
705 healthy_clients,
706 total_clients,
707 }
708 }
709
710 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
712 self.get_metrics()
713 }
714
715 pub fn get_client_stats(&self) -> Vec<ClientStats> {
717 self.transports
718 .iter()
719 .map(|t| ClientStats {
720 client_id: t.client_id.clone(),
721 healthy: t.is_healthy(),
722 submit_count: t.get_submit_count(),
723 error_count: t.get_error_count(),
724 })
725 .collect()
726 }
727
728 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
730 self.get_client_stats()
731 }
732
733 pub fn cache_instrument(&self, instrument: InstrumentAny) {
735 for transport in self.transports.iter() {
736 transport.executor.add_instrument(instrument.clone());
737 }
738 }
739
740 #[must_use]
741 pub fn clone_for_async(&self) -> Self {
742 Self {
743 config: self.config.clone(),
744 transports: Arc::clone(&self.transports),
745 health_check_task: Arc::clone(&self.health_check_task),
746 running: Arc::clone(&self.running),
747 total_submits: Arc::clone(&self.total_submits),
748 successful_submits: Arc::clone(&self.successful_submits),
749 failed_submits: Arc::clone(&self.failed_submits),
750 expected_rejects: Arc::clone(&self.expected_rejects),
751 }
752 }
753
754 #[cfg(test)]
755 fn new_with_transports(
756 config: SubmitBroadcasterConfig,
757 transports: Vec<TransportClient>,
758 ) -> Self {
759 Self {
760 config,
761 transports: Arc::new(transports),
762 health_check_task: Arc::new(RwLock::new(None)),
763 running: Arc::new(AtomicBool::new(false)),
764 total_submits: Arc::new(AtomicU64::new(0)),
765 successful_submits: Arc::new(AtomicU64::new(0)),
766 failed_submits: Arc::new(AtomicU64::new(0)),
767 expected_rejects: Arc::new(AtomicU64::new(0)),
768 }
769 }
770}
771
772#[derive(Debug, Clone)]
774pub struct BroadcasterMetrics {
775 pub total_submits: u64,
776 pub successful_submits: u64,
777 pub failed_submits: u64,
778 pub expected_rejects: u64,
779 pub healthy_clients: usize,
780 pub total_clients: usize,
781}
782
783#[derive(Debug, Clone)]
785pub struct ClientStats {
786 pub client_id: String,
787 pub healthy: bool,
788 pub submit_count: u64,
789 pub error_count: u64,
790}
791
792#[cfg(test)]
793mod tests {
794 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
795
796 use nautilus_core::UUID4;
797 use nautilus_model::{
798 enums::{
799 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
800 },
801 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
802 reports::OrderStatusReport,
803 types::{Price, Quantity},
804 };
805
806 use super::*;
807
808 #[derive(Clone)]
810 #[allow(clippy::type_complexity)]
811 struct MockExecutor {
812 handler: Arc<
813 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
814 + Send
815 + Sync,
816 >,
817 }
818
819 impl MockExecutor {
820 fn new<F, Fut>(handler: F) -> Self
821 where
822 F: Fn() -> Fut + Send + Sync + 'static,
823 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
824 {
825 Self {
826 handler: Arc::new(move || Box::pin(handler())),
827 }
828 }
829 }
830
831 impl SubmitExecutor for MockExecutor {
832 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
833 Box::pin(async { Ok(()) })
834 }
835
836 #[allow(clippy::too_many_arguments)]
837 fn submit_order(
838 &self,
839 _instrument_id: InstrumentId,
840 _client_order_id: ClientOrderId,
841 _order_side: OrderSide,
842 _order_type: OrderType,
843 _quantity: Quantity,
844 _time_in_force: TimeInForce,
845 _price: Option<Price>,
846 _trigger_price: Option<Price>,
847 _trigger_type: Option<TriggerType>,
848 _display_qty: Option<Quantity>,
849 _post_only: bool,
850 _reduce_only: bool,
851 _order_list_id: Option<OrderListId>,
852 _contingency_type: Option<ContingencyType>,
853 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
854 (self.handler)()
855 }
856
857 fn add_instrument(&self, _instrument: InstrumentAny) {
858 }
860 }
861
862 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
863 OrderStatusReport {
864 account_id: AccountId::from("BITMEX-001"),
865 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
866 venue_order_id: VenueOrderId::from(venue_order_id),
867 order_side: OrderSide::Buy,
868 order_type: OrderType::Limit,
869 time_in_force: TimeInForce::Gtc,
870 order_status: OrderStatus::Accepted,
871 price: Some(Price::new(50000.0, 2)),
872 quantity: Quantity::new(100.0, 0),
873 filled_qty: Quantity::new(0.0, 0),
874 report_id: UUID4::new(),
875 ts_accepted: 0.into(),
876 ts_last: 0.into(),
877 ts_init: 0.into(),
878 client_order_id: None,
879 avg_px: None,
880 trigger_price: None,
881 trigger_type: None,
882 contingency_type: ContingencyType::NoContingency,
883 expire_time: None,
884 order_list_id: None,
885 venue_position_id: None,
886 linked_order_ids: None,
887 parent_order_id: None,
888 display_qty: None,
889 limit_offset: None,
890 trailing_offset: None,
891 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
892 post_only: false,
893 reduce_only: false,
894 cancel_reason: None,
895 ts_triggered: None,
896 }
897 }
898
899 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
900 where
901 F: Fn() -> Fut + Send + Sync + 'static,
902 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
903 {
904 let executor = MockExecutor::new(handler);
905 TransportClient::new(executor, client_id.to_string())
906 }
907
908 #[tokio::test]
909 async fn test_broadcast_submit_immediate_success() {
910 let report = create_test_report("ORDER-1");
911 let report_clone = report.clone();
912
913 let transports = vec![
914 create_stub_transport("client-0", move || {
915 let report = report_clone.clone();
916 async move { Ok(report) }
917 }),
918 create_stub_transport("client-1", || async {
919 tokio::time::sleep(Duration::from_secs(10)).await;
920 anyhow::bail!("Should be aborted")
921 }),
922 ];
923
924 let config = SubmitBroadcasterConfig::default();
925 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
926
927 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
928 let result = broadcaster
929 .broadcast_submit(
930 instrument_id,
931 ClientOrderId::from("O-123"),
932 OrderSide::Buy,
933 OrderType::Limit,
934 Quantity::new(100.0, 0),
935 TimeInForce::Gtc,
936 Some(Price::new(50000.0, 2)),
937 None,
938 None,
939 None,
940 false,
941 false,
942 None,
943 None,
944 None,
945 )
946 .await;
947
948 assert!(result.is_ok());
949 let returned_report = result.unwrap();
950 assert_eq!(returned_report.venue_order_id, report.venue_order_id);
951
952 let metrics = broadcaster.get_metrics_async().await;
953 assert_eq!(metrics.successful_submits, 1);
954 assert_eq!(metrics.failed_submits, 0);
955 assert_eq!(metrics.total_submits, 1);
956 }
957
958 #[tokio::test]
959 async fn test_broadcast_submit_duplicate_clordid_expected() {
960 let transports = vec![
961 create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
962 create_stub_transport("client-1", || async {
963 tokio::time::sleep(Duration::from_secs(10)).await;
964 anyhow::bail!("Should be aborted")
965 }),
966 ];
967
968 let config = SubmitBroadcasterConfig::default();
969 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
970
971 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
972 let result = broadcaster
973 .broadcast_submit(
974 instrument_id,
975 ClientOrderId::from("O-123"),
976 OrderSide::Buy,
977 OrderType::Limit,
978 Quantity::new(100.0, 0),
979 TimeInForce::Gtc,
980 Some(Price::new(50000.0, 2)),
981 None,
982 None,
983 None,
984 false,
985 false,
986 None,
987 None,
988 None,
989 )
990 .await;
991
992 assert!(result.is_err());
993
994 let metrics = broadcaster.get_metrics_async().await;
995 assert_eq!(metrics.expected_rejects, 1);
996 assert_eq!(metrics.successful_submits, 0);
997 assert_eq!(metrics.failed_submits, 1);
998 }
999
1000 #[tokio::test]
1001 async fn test_broadcast_submit_all_failures() {
1002 let transports = vec![
1003 create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1004 create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1005 ];
1006
1007 let config = SubmitBroadcasterConfig::default();
1008 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1009
1010 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1011 let result = broadcaster
1012 .broadcast_submit(
1013 instrument_id,
1014 ClientOrderId::from("O-456"),
1015 OrderSide::Sell,
1016 OrderType::Market,
1017 Quantity::new(50.0, 0),
1018 TimeInForce::Ioc,
1019 None,
1020 None,
1021 None,
1022 None,
1023 false,
1024 false,
1025 None,
1026 None,
1027 None,
1028 )
1029 .await;
1030
1031 assert!(result.is_err());
1032 assert!(
1033 result
1034 .unwrap_err()
1035 .to_string()
1036 .contains("All submit requests failed")
1037 );
1038
1039 let metrics = broadcaster.get_metrics_async().await;
1040 assert_eq!(metrics.failed_submits, 1);
1041 assert_eq!(metrics.successful_submits, 0);
1042 }
1043
1044 #[tokio::test]
1045 async fn test_broadcast_submit_no_healthy_clients() {
1046 let transport =
1047 create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1048 transport.healthy.store(false, Ordering::Relaxed);
1049
1050 let config = SubmitBroadcasterConfig::default();
1051 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1052
1053 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1054 let result = broadcaster
1055 .broadcast_submit(
1056 instrument_id,
1057 ClientOrderId::from("O-789"),
1058 OrderSide::Buy,
1059 OrderType::Limit,
1060 Quantity::new(100.0, 0),
1061 TimeInForce::Gtc,
1062 Some(Price::new(50000.0, 2)),
1063 None,
1064 None,
1065 None,
1066 false,
1067 false,
1068 None,
1069 None,
1070 None,
1071 )
1072 .await;
1073
1074 assert!(result.is_err());
1075 assert!(
1076 result
1077 .unwrap_err()
1078 .to_string()
1079 .contains("No healthy transport clients available")
1080 );
1081
1082 let metrics = broadcaster.get_metrics_async().await;
1083 assert_eq!(metrics.failed_submits, 1);
1084 }
1085
1086 #[tokio::test]
1087 async fn test_default_config() {
1088 let config = SubmitBroadcasterConfig {
1089 api_key: Some("test_key".to_string()),
1090 api_secret: Some("test_secret".to_string()),
1091 base_url: Some("https://test.example.com".to_string()),
1092 ..Default::default()
1093 };
1094
1095 let broadcaster = SubmitBroadcaster::new(config);
1096 assert!(broadcaster.is_ok());
1097
1098 let broadcaster = broadcaster.unwrap();
1099 let metrics = broadcaster.get_metrics_async().await;
1100
1101 assert_eq!(metrics.total_clients, 3);
1103 }
1104
1105 #[tokio::test]
1106 async fn test_broadcaster_lifecycle() {
1107 let config = SubmitBroadcasterConfig {
1108 pool_size: 2,
1109 api_key: Some("test_key".to_string()),
1110 api_secret: Some("test_secret".to_string()),
1111 base_url: Some("https://test.example.com".to_string()),
1112 testnet: false,
1113 timeout_secs: Some(5),
1114 max_retries: None,
1115 retry_delay_ms: None,
1116 retry_delay_max_ms: None,
1117 recv_window_ms: None,
1118 max_requests_per_second: None,
1119 max_requests_per_minute: None,
1120 health_check_interval_secs: 60,
1121 health_check_timeout_secs: 1,
1122 expected_reject_patterns: vec![],
1123 proxy_urls: vec![],
1124 };
1125
1126 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1127
1128 assert!(!broadcaster.running.load(Ordering::Relaxed));
1130
1131 let start_result = broadcaster.start().await;
1133 assert!(start_result.is_ok());
1134 assert!(broadcaster.running.load(Ordering::Relaxed));
1135
1136 let start_again = broadcaster.start().await;
1138 assert!(start_again.is_ok());
1139
1140 broadcaster.stop().await;
1142 assert!(!broadcaster.running.load(Ordering::Relaxed));
1143
1144 broadcaster.stop().await;
1146 assert!(!broadcaster.running.load(Ordering::Relaxed));
1147 }
1148
1149 #[tokio::test]
1150 async fn test_broadcast_submit_metrics_increment() {
1151 let report = create_test_report("ORDER-1");
1152 let report_clone = report.clone();
1153
1154 let transports = vec![create_stub_transport("client-0", move || {
1155 let report = report_clone.clone();
1156 async move { Ok(report) }
1157 })];
1158
1159 let config = SubmitBroadcasterConfig::default();
1160 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1161
1162 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1163 let _ = broadcaster
1164 .broadcast_submit(
1165 instrument_id,
1166 ClientOrderId::from("O-123"),
1167 OrderSide::Buy,
1168 OrderType::Limit,
1169 Quantity::new(100.0, 0),
1170 TimeInForce::Gtc,
1171 Some(Price::new(50000.0, 2)),
1172 None,
1173 None,
1174 None,
1175 false,
1176 false,
1177 None,
1178 None,
1179 None,
1180 )
1181 .await;
1182
1183 let metrics = broadcaster.get_metrics_async().await;
1184 assert_eq!(metrics.total_submits, 1);
1185 assert_eq!(metrics.successful_submits, 1);
1186 assert_eq!(metrics.failed_submits, 0);
1187 }
1188
1189 #[tokio::test]
1190 async fn test_broadcaster_creation_with_pool() {
1191 let config = SubmitBroadcasterConfig {
1192 pool_size: 4,
1193 api_key: Some("test_key".to_string()),
1194 api_secret: Some("test_secret".to_string()),
1195 base_url: Some("https://test.example.com".to_string()),
1196 ..Default::default()
1197 };
1198
1199 let broadcaster = SubmitBroadcaster::new(config);
1200 assert!(broadcaster.is_ok());
1201
1202 let broadcaster = broadcaster.unwrap();
1203 let metrics = broadcaster.get_metrics_async().await;
1204 assert_eq!(metrics.total_clients, 4);
1205 }
1206
1207 #[tokio::test]
1208 async fn test_client_stats_collection() {
1209 let report = create_test_report("ORDER-1");
1210 let report_clone = report.clone();
1211
1212 let transports = vec![
1213 create_stub_transport("client-0", move || {
1214 let report = report_clone.clone();
1215 async move { Ok(report) }
1216 }),
1217 create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1218 ];
1219
1220 let config = SubmitBroadcasterConfig::default();
1221 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1222
1223 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1224 let _ = broadcaster
1225 .broadcast_submit(
1226 instrument_id,
1227 ClientOrderId::from("O-123"),
1228 OrderSide::Buy,
1229 OrderType::Limit,
1230 Quantity::new(100.0, 0),
1231 TimeInForce::Gtc,
1232 Some(Price::new(50000.0, 2)),
1233 None,
1234 None,
1235 None,
1236 false,
1237 false,
1238 None,
1239 None,
1240 None,
1241 )
1242 .await;
1243
1244 let stats = broadcaster.get_client_stats_async().await;
1245 assert_eq!(stats.len(), 2);
1246
1247 let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1248 assert_eq!(client0.submit_count, 1);
1249 assert_eq!(client0.error_count, 0);
1250
1251 let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1252 assert_eq!(client1.submit_count, 1);
1253 assert_eq!(client1.error_count, 1);
1254 }
1255
1256 #[tokio::test]
1257 async fn test_testnet_config_sets_base_url() {
1258 let config = SubmitBroadcasterConfig {
1259 pool_size: 1,
1260 api_key: Some("test_key".to_string()),
1261 api_secret: Some("test_secret".to_string()),
1262 testnet: true,
1263 base_url: None,
1264 ..Default::default()
1265 };
1266
1267 let broadcaster = SubmitBroadcaster::new(config);
1268 assert!(broadcaster.is_ok());
1269 }
1270
1271 #[tokio::test]
1272 async fn test_clone_for_async() {
1273 let config = SubmitBroadcasterConfig {
1274 pool_size: 1,
1275 api_key: Some("test_key".to_string()),
1276 api_secret: Some("test_secret".to_string()),
1277 base_url: Some("https://test.example.com".to_string()),
1278 ..Default::default()
1279 };
1280
1281 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1282 let cloned = broadcaster.clone_for_async();
1283
1284 broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1286 assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1287 }
1288
1289 #[tokio::test]
1290 async fn test_pattern_matching() {
1291 let config = SubmitBroadcasterConfig {
1292 expected_reject_patterns: vec![
1293 "Duplicate clOrdID".to_string(),
1294 "Order already exists".to_string(),
1295 ],
1296 ..Default::default()
1297 };
1298
1299 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1300
1301 assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1302 assert!(broadcaster.is_expected_reject("Order already exists in system"));
1303 assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1304 assert!(!broadcaster.is_expected_reject("Internal server error"));
1305 }
1306
1307 #[tokio::test]
1308 async fn test_submit_metrics_with_mixed_responses() {
1309 let report = create_test_report("ORDER-1");
1310 let report_clone = report.clone();
1311
1312 let transports = vec![
1313 create_stub_transport("client-0", move || {
1314 let report = report_clone.clone();
1315 async move { Ok(report) }
1316 }),
1317 create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1318 ];
1319
1320 let config = SubmitBroadcasterConfig::default();
1321 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1322
1323 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1324 let result = broadcaster
1325 .broadcast_submit(
1326 instrument_id,
1327 ClientOrderId::from("O-123"),
1328 OrderSide::Buy,
1329 OrderType::Limit,
1330 Quantity::new(100.0, 0),
1331 TimeInForce::Gtc,
1332 Some(Price::new(50000.0, 2)),
1333 None,
1334 None,
1335 None,
1336 false,
1337 false,
1338 None,
1339 None,
1340 None,
1341 )
1342 .await;
1343
1344 assert!(result.is_ok());
1345
1346 let metrics = broadcaster.get_metrics_async().await;
1347 assert_eq!(metrics.total_submits, 1);
1348 assert_eq!(metrics.successful_submits, 1);
1349 assert_eq!(metrics.failed_submits, 0);
1350 }
1351
1352 #[tokio::test]
1353 async fn test_metrics_initialization_and_health() {
1354 let config = SubmitBroadcasterConfig {
1355 pool_size: 2,
1356 api_key: Some("test_key".to_string()),
1357 api_secret: Some("test_secret".to_string()),
1358 base_url: Some("https://test.example.com".to_string()),
1359 ..Default::default()
1360 };
1361
1362 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1363 let metrics = broadcaster.get_metrics_async().await;
1364
1365 assert_eq!(metrics.total_submits, 0);
1366 assert_eq!(metrics.successful_submits, 0);
1367 assert_eq!(metrics.failed_submits, 0);
1368 assert_eq!(metrics.expected_rejects, 0);
1369 assert_eq!(metrics.total_clients, 2);
1370 assert_eq!(metrics.healthy_clients, 2);
1371 }
1372
1373 #[tokio::test]
1374 async fn test_health_check_task_lifecycle() {
1375 let config = SubmitBroadcasterConfig {
1376 pool_size: 2,
1377 api_key: Some("test_key".to_string()),
1378 api_secret: Some("test_secret".to_string()),
1379 base_url: Some("https://test.example.com".to_string()),
1380 health_check_interval_secs: 1,
1381 ..Default::default()
1382 };
1383
1384 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1385
1386 broadcaster.start().await.unwrap();
1388 assert!(broadcaster.running.load(Ordering::Relaxed));
1389 assert!(
1390 broadcaster
1391 .health_check_task
1392 .read()
1393 .await
1394 .as_ref()
1395 .is_some()
1396 );
1397
1398 tokio::time::sleep(Duration::from_millis(100)).await;
1400
1401 broadcaster.stop().await;
1403 assert!(!broadcaster.running.load(Ordering::Relaxed));
1404 }
1405
1406 #[tokio::test]
1407 async fn test_expected_reject_pattern_comprehensive() {
1408 let transports = vec![
1409 create_stub_transport("client-0", || async {
1410 anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1411 }),
1412 create_stub_transport("client-1", || async {
1413 tokio::time::sleep(Duration::from_secs(10)).await;
1414 anyhow::bail!("Should be aborted")
1415 }),
1416 ];
1417
1418 let config = SubmitBroadcasterConfig::default();
1419 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1420
1421 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1422 let result = broadcaster
1423 .broadcast_submit(
1424 instrument_id,
1425 ClientOrderId::from("O-123"),
1426 OrderSide::Buy,
1427 OrderType::Limit,
1428 Quantity::new(100.0, 0),
1429 TimeInForce::Gtc,
1430 Some(Price::new(50000.0, 2)),
1431 None,
1432 None,
1433 None,
1434 false,
1435 false,
1436 None,
1437 None,
1438 None,
1439 )
1440 .await;
1441
1442 assert!(result.is_err());
1444
1445 let metrics = broadcaster.get_metrics_async().await;
1446 assert_eq!(metrics.expected_rejects, 1);
1447 assert_eq!(metrics.failed_submits, 1);
1448 assert_eq!(metrics.successful_submits, 0);
1449 }
1450
1451 #[tokio::test]
1452 async fn test_client_order_id_suffix_for_multiple_clients() {
1453 use std::sync::{Arc, Mutex};
1454
1455 #[derive(Clone)]
1456 struct CaptureExecutor {
1457 captured_ids: Arc<Mutex<Vec<String>>>,
1458 report: OrderStatusReport,
1459 }
1460
1461 impl SubmitExecutor for CaptureExecutor {
1462 fn health_check(
1463 &self,
1464 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1465 Box::pin(async { Ok(()) })
1466 }
1467
1468 #[allow(clippy::too_many_arguments)]
1469 fn submit_order(
1470 &self,
1471 _instrument_id: InstrumentId,
1472 client_order_id: ClientOrderId,
1473 _order_side: OrderSide,
1474 _order_type: OrderType,
1475 _quantity: Quantity,
1476 _time_in_force: TimeInForce,
1477 _price: Option<Price>,
1478 _trigger_price: Option<Price>,
1479 _trigger_type: Option<TriggerType>,
1480 _display_qty: Option<Quantity>,
1481 _post_only: bool,
1482 _reduce_only: bool,
1483 _order_list_id: Option<OrderListId>,
1484 _contingency_type: Option<ContingencyType>,
1485 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1486 {
1487 self.captured_ids
1489 .lock()
1490 .unwrap()
1491 .push(client_order_id.as_str().to_string());
1492 let report = self.report.clone();
1493 Box::pin(async move { Ok(report) })
1494 }
1495
1496 fn add_instrument(&self, _instrument: InstrumentAny) {}
1497 }
1498
1499 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1500 let report = create_test_report("ORDER-1");
1501
1502 let transports = vec![
1503 TransportClient::new(
1504 CaptureExecutor {
1505 captured_ids: Arc::clone(&captured_ids),
1506 report: report.clone(),
1507 },
1508 "client-0".to_string(),
1509 ),
1510 TransportClient::new(
1511 CaptureExecutor {
1512 captured_ids: Arc::clone(&captured_ids),
1513 report: report.clone(),
1514 },
1515 "client-1".to_string(),
1516 ),
1517 TransportClient::new(
1518 CaptureExecutor {
1519 captured_ids: Arc::clone(&captured_ids),
1520 report: report.clone(),
1521 },
1522 "client-2".to_string(),
1523 ),
1524 ];
1525
1526 let config = SubmitBroadcasterConfig::default();
1527 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1528
1529 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1530 let result = broadcaster
1531 .broadcast_submit(
1532 instrument_id,
1533 ClientOrderId::from("O-123"),
1534 OrderSide::Buy,
1535 OrderType::Limit,
1536 Quantity::new(100.0, 0),
1537 TimeInForce::Gtc,
1538 Some(Price::new(50000.0, 2)),
1539 None,
1540 None,
1541 None,
1542 false,
1543 false,
1544 None,
1545 None,
1546 None,
1547 )
1548 .await;
1549
1550 assert!(result.is_ok());
1551
1552 let ids = captured_ids.lock().unwrap();
1554 assert_eq!(ids.len(), 3);
1555 assert_eq!(ids[0], "O-123"); assert_eq!(ids[1], "O-123-1"); assert_eq!(ids[2], "O-123-2"); }
1559
1560 #[tokio::test]
1561 async fn test_client_order_id_suffix_with_partial_failure() {
1562 use std::sync::{Arc, Mutex};
1563
1564 #[derive(Clone)]
1565 struct CaptureAndFailExecutor {
1566 captured_ids: Arc<Mutex<Vec<String>>>,
1567 should_succeed: bool,
1568 }
1569
1570 impl SubmitExecutor for CaptureAndFailExecutor {
1571 fn health_check(
1572 &self,
1573 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1574 Box::pin(async { Ok(()) })
1575 }
1576
1577 #[allow(clippy::too_many_arguments)]
1578 fn submit_order(
1579 &self,
1580 _instrument_id: InstrumentId,
1581 client_order_id: ClientOrderId,
1582 _order_side: OrderSide,
1583 _order_type: OrderType,
1584 _quantity: Quantity,
1585 _time_in_force: TimeInForce,
1586 _price: Option<Price>,
1587 _trigger_price: Option<Price>,
1588 _trigger_type: Option<TriggerType>,
1589 _display_qty: Option<Quantity>,
1590 _post_only: bool,
1591 _reduce_only: bool,
1592 _order_list_id: Option<OrderListId>,
1593 _contingency_type: Option<ContingencyType>,
1594 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1595 {
1596 self.captured_ids
1598 .lock()
1599 .unwrap()
1600 .push(client_order_id.as_str().to_string());
1601 let should_succeed = self.should_succeed;
1602 Box::pin(async move {
1603 if should_succeed {
1604 Ok(create_test_report("ORDER-1"))
1605 } else {
1606 anyhow::bail!("Network error")
1607 }
1608 })
1609 }
1610
1611 fn add_instrument(&self, _instrument: InstrumentAny) {}
1612 }
1613
1614 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1615
1616 let transports = vec![
1617 TransportClient::new(
1618 CaptureAndFailExecutor {
1619 captured_ids: Arc::clone(&captured_ids),
1620 should_succeed: false,
1621 },
1622 "client-0".to_string(),
1623 ),
1624 TransportClient::new(
1625 CaptureAndFailExecutor {
1626 captured_ids: Arc::clone(&captured_ids),
1627 should_succeed: true,
1628 },
1629 "client-1".to_string(),
1630 ),
1631 ];
1632
1633 let config = SubmitBroadcasterConfig::default();
1634 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1635
1636 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1637 let result = broadcaster
1638 .broadcast_submit(
1639 instrument_id,
1640 ClientOrderId::from("O-456"),
1641 OrderSide::Sell,
1642 OrderType::Market,
1643 Quantity::new(50.0, 0),
1644 TimeInForce::Ioc,
1645 None,
1646 None,
1647 None,
1648 None,
1649 false,
1650 false,
1651 None,
1652 None,
1653 None,
1654 )
1655 .await;
1656
1657 assert!(result.is_ok());
1658
1659 let ids = captured_ids.lock().unwrap();
1661 assert_eq!(ids.len(), 2);
1662 assert_eq!(ids[0], "O-456"); assert_eq!(ids[1], "O-456-1"); }
1665
1666 #[tokio::test]
1667 async fn test_proxy_urls_populated_from_config() {
1668 let config = SubmitBroadcasterConfig {
1669 pool_size: 3,
1670 api_key: Some("test_key".to_string()),
1671 api_secret: Some("test_secret".to_string()),
1672 proxy_urls: vec![
1673 Some("http://proxy1:8080".to_string()),
1674 Some("http://proxy2:8080".to_string()),
1675 Some("http://proxy3:8080".to_string()),
1676 ],
1677 ..Default::default()
1678 };
1679
1680 assert_eq!(config.proxy_urls.len(), 3);
1681 assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1682 assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1683 assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1684 }
1685}