1use std::{
37 fmt::Debug,
38 future::Future,
39 pin::Pin,
40 sync::{
41 Arc,
42 atomic::{AtomicBool, AtomicU64, Ordering},
43 },
44 time::Duration,
45};
46
47use futures_util::future;
48use nautilus_model::{
49 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
50 identifiers::{ClientOrderId, InstrumentId, OrderListId},
51 instruments::InstrumentAny,
52 reports::OrderStatusReport,
53 types::{Price, Quantity},
54};
55use tokio::{sync::RwLock, task::JoinHandle, time::interval};
56
57use crate::{common::consts::BITMEX_HTTP_TESTNET_URL, http::client::BitmexHttpClient};
58
59trait SubmitExecutor: Send + Sync {
81 fn add_instrument(&self, instrument: InstrumentAny);
83
84 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
86
87 #[allow(clippy::too_many_arguments)]
89 fn submit_order(
90 &self,
91 instrument_id: InstrumentId,
92 client_order_id: ClientOrderId,
93 order_side: OrderSide,
94 order_type: OrderType,
95 quantity: Quantity,
96 time_in_force: TimeInForce,
97 price: Option<Price>,
98 trigger_price: Option<Price>,
99 trigger_type: Option<TriggerType>,
100 display_qty: Option<Quantity>,
101 post_only: bool,
102 reduce_only: bool,
103 order_list_id: Option<OrderListId>,
104 contingency_type: Option<ContingencyType>,
105 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
106}
107
108impl SubmitExecutor for BitmexHttpClient {
109 fn add_instrument(&self, instrument: InstrumentAny) {
110 Self::cache_instrument(self, instrument);
111 }
112
113 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
114 Box::pin(async move {
115 Self::get_server_time(self)
116 .await
117 .map(|_| ())
118 .map_err(|e| anyhow::anyhow!("{e}"))
119 })
120 }
121
122 #[allow(clippy::too_many_arguments)]
123 fn submit_order(
124 &self,
125 instrument_id: InstrumentId,
126 client_order_id: ClientOrderId,
127 order_side: OrderSide,
128 order_type: OrderType,
129 quantity: Quantity,
130 time_in_force: TimeInForce,
131 price: Option<Price>,
132 trigger_price: Option<Price>,
133 trigger_type: Option<TriggerType>,
134 display_qty: Option<Quantity>,
135 post_only: bool,
136 reduce_only: bool,
137 order_list_id: Option<OrderListId>,
138 contingency_type: Option<ContingencyType>,
139 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
140 Box::pin(async move {
141 Self::submit_order(
142 self,
143 instrument_id,
144 client_order_id,
145 order_side,
146 order_type,
147 quantity,
148 time_in_force,
149 price,
150 trigger_price,
151 trigger_type,
152 display_qty,
153 post_only,
154 reduce_only,
155 order_list_id,
156 contingency_type,
157 )
158 .await
159 })
160 }
161}
162
163#[derive(Debug, Clone)]
165pub struct SubmitBroadcasterConfig {
166 pub pool_size: usize,
168 pub api_key: Option<String>,
170 pub api_secret: Option<String>,
172 pub base_url: Option<String>,
174 pub testnet: bool,
176 pub timeout_secs: Option<u64>,
178 pub max_retries: Option<u32>,
180 pub retry_delay_ms: Option<u64>,
182 pub retry_delay_max_ms: Option<u64>,
184 pub recv_window_ms: Option<u64>,
186 pub max_requests_per_second: Option<u32>,
188 pub max_requests_per_minute: Option<u32>,
190 pub health_check_interval_secs: u64,
192 pub health_check_timeout_secs: u64,
194 pub expected_reject_patterns: Vec<String>,
196 pub proxy_urls: Vec<Option<String>>,
202}
203
204impl Default for SubmitBroadcasterConfig {
205 fn default() -> Self {
206 Self {
207 pool_size: 3,
208 api_key: None,
209 api_secret: None,
210 base_url: None,
211 testnet: false,
212 timeout_secs: Some(60),
213 max_retries: None,
214 retry_delay_ms: Some(1_000),
215 retry_delay_max_ms: Some(5_000),
216 recv_window_ms: Some(10_000),
217 max_requests_per_second: Some(10),
218 max_requests_per_minute: Some(120),
219 health_check_interval_secs: 30,
220 health_check_timeout_secs: 5,
221 expected_reject_patterns: vec![r"Duplicate clOrdID".to_string()],
222 proxy_urls: vec![],
223 }
224 }
225}
226
227#[derive(Clone)]
229struct TransportClient {
230 executor: Arc<dyn SubmitExecutor>,
235 client_id: String,
236 healthy: Arc<AtomicBool>,
237 submit_count: Arc<AtomicU64>,
238 error_count: Arc<AtomicU64>,
239}
240
241impl Debug for TransportClient {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 f.debug_struct("TransportClient")
244 .field("client_id", &self.client_id)
245 .field("healthy", &self.healthy)
246 .field("submit_count", &self.submit_count)
247 .field("error_count", &self.error_count)
248 .finish()
249 }
250}
251
252impl TransportClient {
253 fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
254 Self {
255 executor: Arc::new(executor),
256 client_id,
257 healthy: Arc::new(AtomicBool::new(true)),
258 submit_count: Arc::new(AtomicU64::new(0)),
259 error_count: Arc::new(AtomicU64::new(0)),
260 }
261 }
262
263 fn is_healthy(&self) -> bool {
264 self.healthy.load(Ordering::Relaxed)
265 }
266
267 fn mark_healthy(&self) {
268 self.healthy.store(true, Ordering::Relaxed);
269 }
270
271 fn mark_unhealthy(&self) {
272 self.healthy.store(false, Ordering::Relaxed);
273 }
274
275 fn get_submit_count(&self) -> u64 {
276 self.submit_count.load(Ordering::Relaxed)
277 }
278
279 fn get_error_count(&self) -> u64 {
280 self.error_count.load(Ordering::Relaxed)
281 }
282
283 async fn health_check(&self, timeout_secs: u64) -> bool {
284 match tokio::time::timeout(
285 Duration::from_secs(timeout_secs),
286 self.executor.health_check(),
287 )
288 .await
289 {
290 Ok(Ok(_)) => {
291 self.mark_healthy();
292 true
293 }
294 Ok(Err(e)) => {
295 tracing::warn!("Health check failed for client {}: {e:?}", self.client_id);
296 self.mark_unhealthy();
297 false
298 }
299 Err(_) => {
300 tracing::warn!("Health check timeout for client {}", self.client_id);
301 self.mark_unhealthy();
302 false
303 }
304 }
305 }
306
307 #[allow(clippy::too_many_arguments)]
308 async fn submit_order(
309 &self,
310 instrument_id: InstrumentId,
311 client_order_id: ClientOrderId,
312 order_side: OrderSide,
313 order_type: OrderType,
314 quantity: Quantity,
315 time_in_force: TimeInForce,
316 price: Option<Price>,
317 trigger_price: Option<Price>,
318 trigger_type: Option<TriggerType>,
319 display_qty: Option<Quantity>,
320 post_only: bool,
321 reduce_only: bool,
322 order_list_id: Option<OrderListId>,
323 contingency_type: Option<ContingencyType>,
324 ) -> anyhow::Result<OrderStatusReport> {
325 self.submit_count.fetch_add(1, Ordering::Relaxed);
326
327 match self
328 .executor
329 .submit_order(
330 instrument_id,
331 client_order_id,
332 order_side,
333 order_type,
334 quantity,
335 time_in_force,
336 price,
337 trigger_price,
338 trigger_type,
339 display_qty,
340 post_only,
341 reduce_only,
342 order_list_id,
343 contingency_type,
344 )
345 .await
346 {
347 Ok(report) => {
348 self.mark_healthy();
349 Ok(report)
350 }
351 Err(e) => {
352 self.error_count.fetch_add(1, Ordering::Relaxed);
353 Err(e)
354 }
355 }
356 }
357}
358
359#[cfg_attr(feature = "python", pyo3::pyclass)]
365#[derive(Debug)]
366pub struct SubmitBroadcaster {
367 config: SubmitBroadcasterConfig,
368 transports: Arc<Vec<TransportClient>>,
369 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
370 running: Arc<AtomicBool>,
371 total_submits: Arc<AtomicU64>,
372 successful_submits: Arc<AtomicU64>,
373 failed_submits: Arc<AtomicU64>,
374 expected_rejects: Arc<AtomicU64>,
375}
376
377impl SubmitBroadcaster {
378 pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
384 let mut transports = Vec::with_capacity(config.pool_size);
385
386 let base_url = if config.testnet && config.base_url.is_none() {
388 Some(BITMEX_HTTP_TESTNET_URL.to_string())
389 } else {
390 config.base_url.clone()
391 };
392
393 for i in 0..config.pool_size {
394 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
396
397 let client = BitmexHttpClient::with_credentials(
398 config.api_key.clone(),
399 config.api_secret.clone(),
400 base_url.clone(),
401 config.timeout_secs,
402 config.max_retries,
403 config.retry_delay_ms,
404 config.retry_delay_max_ms,
405 config.recv_window_ms,
406 config.max_requests_per_second,
407 config.max_requests_per_minute,
408 proxy_url,
409 )
410 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
411
412 transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
413 }
414
415 Ok(Self {
416 config,
417 transports: Arc::new(transports),
418 health_check_task: Arc::new(RwLock::new(None)),
419 running: Arc::new(AtomicBool::new(false)),
420 total_submits: Arc::new(AtomicU64::new(0)),
421 successful_submits: Arc::new(AtomicU64::new(0)),
422 failed_submits: Arc::new(AtomicU64::new(0)),
423 expected_rejects: Arc::new(AtomicU64::new(0)),
424 })
425 }
426
427 pub async fn start(&self) -> anyhow::Result<()> {
433 if self.running.load(Ordering::Relaxed) {
434 return Ok(());
435 }
436
437 self.running.store(true, Ordering::Relaxed);
438
439 self.run_health_checks().await;
441
442 let transports = Arc::clone(&self.transports);
444 let running = Arc::clone(&self.running);
445 let interval_secs = self.config.health_check_interval_secs;
446 let timeout_secs = self.config.health_check_timeout_secs;
447
448 let task = tokio::spawn(async move {
449 let mut ticker = interval(Duration::from_secs(interval_secs));
450 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
451
452 loop {
453 ticker.tick().await;
454
455 if !running.load(Ordering::Relaxed) {
456 break;
457 }
458
459 let tasks: Vec<_> = transports
460 .iter()
461 .map(|t| t.health_check(timeout_secs))
462 .collect();
463
464 let results = future::join_all(tasks).await;
465 let healthy_count = results.iter().filter(|&&r| r).count();
466
467 tracing::debug!(
468 "Health check complete: {healthy_count}/{} clients healthy",
469 results.len()
470 );
471 }
472 });
473
474 *self.health_check_task.write().await = Some(task);
475
476 tracing::info!(
477 "SubmitBroadcaster started with {} clients",
478 self.transports.len()
479 );
480
481 Ok(())
482 }
483
484 pub async fn stop(&self) {
486 if !self.running.load(Ordering::Relaxed) {
487 return;
488 }
489
490 self.running.store(false, Ordering::Relaxed);
491
492 if let Some(task) = self.health_check_task.write().await.take() {
493 task.abort();
494 }
495
496 tracing::info!("SubmitBroadcaster stopped");
497 }
498
499 async fn run_health_checks(&self) {
500 let tasks: Vec<_> = self
501 .transports
502 .iter()
503 .map(|t| t.health_check(self.config.health_check_timeout_secs))
504 .collect();
505
506 let results = future::join_all(tasks).await;
507 let healthy_count = results.iter().filter(|&&r| r).count();
508
509 tracing::debug!(
510 "Health check complete: {healthy_count}/{} clients healthy",
511 results.len()
512 );
513 }
514
515 fn is_expected_reject(&self, error_message: &str) -> bool {
516 self.config
517 .expected_reject_patterns
518 .iter()
519 .any(|pattern| error_message.contains(pattern))
520 }
521
522 async fn process_submit_results<T>(
526 &self,
527 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
528 operation: &str,
529 params: String,
530 ) -> anyhow::Result<T>
531 where
532 T: Send + 'static,
533 {
534 let mut errors = Vec::new();
535
536 while !handles.is_empty() {
537 let current_handles = std::mem::take(&mut handles);
538 let (result, _idx, remaining) = future::select_all(current_handles).await;
539 handles = remaining.into_iter().collect();
540
541 match result {
542 Ok((client_id, Ok(result))) => {
543 for handle in &handles {
545 handle.abort();
546 }
547 self.successful_submits.fetch_add(1, Ordering::Relaxed);
548 tracing::debug!("{} broadcast succeeded [{client_id}] {params}", operation,);
549 return Ok(result);
550 }
551 Ok((client_id, Err(e))) => {
552 let error_msg = e.to_string();
553
554 if self.is_expected_reject(&error_msg) {
555 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
556 tracing::debug!(
557 "Expected {} rejection [{client_id}]: {error_msg} {params}",
558 operation.to_lowercase(),
559 );
560 errors.push(error_msg);
561 } else {
562 tracing::warn!(
563 "{} request failed [{client_id}]: {error_msg} {params}",
564 operation,
565 );
566 errors.push(error_msg);
567 }
568 }
569 Err(e) => {
570 tracing::warn!("{} task join error: {e:?}", operation);
571 errors.push(format!("Task panicked: {e:?}"));
572 }
573 }
574 }
575
576 self.failed_submits.fetch_add(1, Ordering::Relaxed);
578 tracing::error!(
579 "All {} requests failed: {errors:?} {params}",
580 operation.to_lowercase(),
581 );
582 Err(anyhow::anyhow!(
583 "All {} requests failed: {:?}",
584 operation.to_lowercase(),
585 errors
586 ))
587 }
588
589 #[allow(clippy::too_many_arguments)]
600 pub async fn broadcast_submit(
601 &self,
602 instrument_id: InstrumentId,
603 client_order_id: ClientOrderId,
604 order_side: OrderSide,
605 order_type: OrderType,
606 quantity: Quantity,
607 time_in_force: TimeInForce,
608 price: Option<Price>,
609 trigger_price: Option<Price>,
610 trigger_type: Option<TriggerType>,
611 display_qty: Option<Quantity>,
612 post_only: bool,
613 reduce_only: bool,
614 order_list_id: Option<OrderListId>,
615 contingency_type: Option<ContingencyType>,
616 submit_tries: Option<usize>,
617 ) -> anyhow::Result<OrderStatusReport> {
618 self.total_submits.fetch_add(1, Ordering::Relaxed);
619
620 let pool_size = self.config.pool_size;
621 let actual_tries = if let Some(t) = submit_tries {
622 if t > pool_size {
623 log::warn!(
625 "submit_tries={} exceeds pool_size={}, capping at pool_size",
626 t,
627 pool_size
628 );
629 }
630 std::cmp::min(t, pool_size)
631 } else {
632 pool_size
633 };
634
635 tracing::debug!(
636 "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
637 );
638
639 let healthy_transports: Vec<TransportClient> = self
640 .transports
641 .iter()
642 .filter(|t| t.is_healthy())
643 .take(actual_tries)
644 .cloned()
645 .collect();
646
647 if healthy_transports.is_empty() {
648 self.failed_submits.fetch_add(1, Ordering::Relaxed);
649 anyhow::bail!("No healthy transport clients available");
650 }
651
652 tracing::debug!(
653 "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
654 healthy_transports.len(),
655 );
656
657 let mut handles = Vec::new();
658 for (idx, transport) in healthy_transports.into_iter().enumerate() {
659 let modified_client_order_id = if idx == 0 {
661 client_order_id
662 } else {
663 ClientOrderId::new(format!("{}-{}", client_order_id.as_str(), idx))
664 };
665
666 let handle = tokio::spawn(async move {
667 let client_id = transport.client_id.clone();
668 let result = transport
669 .submit_order(
670 instrument_id,
671 modified_client_order_id,
672 order_side,
673 order_type,
674 quantity,
675 time_in_force,
676 price,
677 trigger_price,
678 trigger_type,
679 display_qty,
680 post_only,
681 reduce_only,
682 order_list_id,
683 contingency_type,
684 )
685 .await;
686 (client_id, result)
687 });
688 handles.push(handle);
689 }
690
691 self.process_submit_results(
692 handles,
693 "Submit",
694 format!("(client_order_id={:?})", client_order_id),
695 )
696 .await
697 }
698
699 pub fn get_metrics(&self) -> BroadcasterMetrics {
701 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
702 let total_clients = self.transports.len();
703
704 BroadcasterMetrics {
705 total_submits: self.total_submits.load(Ordering::Relaxed),
706 successful_submits: self.successful_submits.load(Ordering::Relaxed),
707 failed_submits: self.failed_submits.load(Ordering::Relaxed),
708 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
709 healthy_clients,
710 total_clients,
711 }
712 }
713
714 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
716 self.get_metrics()
717 }
718
719 pub fn get_client_stats(&self) -> Vec<ClientStats> {
721 self.transports
722 .iter()
723 .map(|t| ClientStats {
724 client_id: t.client_id.clone(),
725 healthy: t.is_healthy(),
726 submit_count: t.get_submit_count(),
727 error_count: t.get_error_count(),
728 })
729 .collect()
730 }
731
732 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
734 self.get_client_stats()
735 }
736
737 pub fn cache_instrument(&self, instrument: InstrumentAny) {
739 for transport in self.transports.iter() {
740 transport.executor.add_instrument(instrument.clone());
741 }
742 }
743
744 pub fn clone_for_async(&self) -> Self {
745 Self {
746 config: self.config.clone(),
747 transports: Arc::clone(&self.transports),
748 health_check_task: Arc::clone(&self.health_check_task),
749 running: Arc::clone(&self.running),
750 total_submits: Arc::clone(&self.total_submits),
751 successful_submits: Arc::clone(&self.successful_submits),
752 failed_submits: Arc::clone(&self.failed_submits),
753 expected_rejects: Arc::clone(&self.expected_rejects),
754 }
755 }
756
757 #[cfg(test)]
758 fn new_with_transports(
759 config: SubmitBroadcasterConfig,
760 transports: Vec<TransportClient>,
761 ) -> Self {
762 Self {
763 config,
764 transports: Arc::new(transports),
765 health_check_task: Arc::new(RwLock::new(None)),
766 running: Arc::new(AtomicBool::new(false)),
767 total_submits: Arc::new(AtomicU64::new(0)),
768 successful_submits: Arc::new(AtomicU64::new(0)),
769 failed_submits: Arc::new(AtomicU64::new(0)),
770 expected_rejects: Arc::new(AtomicU64::new(0)),
771 }
772 }
773}
774
775#[derive(Debug, Clone)]
777pub struct BroadcasterMetrics {
778 pub total_submits: u64,
779 pub successful_submits: u64,
780 pub failed_submits: u64,
781 pub expected_rejects: u64,
782 pub healthy_clients: usize,
783 pub total_clients: usize,
784}
785
786#[derive(Debug, Clone)]
788pub struct ClientStats {
789 pub client_id: String,
790 pub healthy: bool,
791 pub submit_count: u64,
792 pub error_count: u64,
793}
794
795#[cfg(test)]
800mod tests {
801 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
802
803 use nautilus_core::UUID4;
804 use nautilus_model::{
805 enums::{
806 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
807 },
808 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
809 reports::OrderStatusReport,
810 types::{Price, Quantity},
811 };
812
813 use super::*;
814
815 #[derive(Clone)]
817 #[allow(clippy::type_complexity)]
818 struct MockExecutor {
819 handler: Arc<
820 dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
821 + Send
822 + Sync,
823 >,
824 }
825
826 impl MockExecutor {
827 fn new<F, Fut>(handler: F) -> Self
828 where
829 F: Fn() -> Fut + Send + Sync + 'static,
830 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
831 {
832 Self {
833 handler: Arc::new(move || Box::pin(handler())),
834 }
835 }
836 }
837
838 impl SubmitExecutor for MockExecutor {
839 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
840 Box::pin(async { Ok(()) })
841 }
842
843 #[allow(clippy::too_many_arguments)]
844 fn submit_order(
845 &self,
846 _instrument_id: InstrumentId,
847 _client_order_id: ClientOrderId,
848 _order_side: OrderSide,
849 _order_type: OrderType,
850 _quantity: Quantity,
851 _time_in_force: TimeInForce,
852 _price: Option<Price>,
853 _trigger_price: Option<Price>,
854 _trigger_type: Option<TriggerType>,
855 _display_qty: Option<Quantity>,
856 _post_only: bool,
857 _reduce_only: bool,
858 _order_list_id: Option<OrderListId>,
859 _contingency_type: Option<ContingencyType>,
860 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
861 (self.handler)()
862 }
863
864 fn add_instrument(&self, _instrument: InstrumentAny) {
865 }
867 }
868
869 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
870 OrderStatusReport {
871 account_id: AccountId::from("BITMEX-001"),
872 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
873 venue_order_id: VenueOrderId::from(venue_order_id),
874 order_side: OrderSide::Buy,
875 order_type: OrderType::Limit,
876 time_in_force: TimeInForce::Gtc,
877 order_status: OrderStatus::Accepted,
878 price: Some(Price::new(50000.0, 2)),
879 quantity: Quantity::new(100.0, 0),
880 filled_qty: Quantity::new(0.0, 0),
881 report_id: UUID4::new(),
882 ts_accepted: 0.into(),
883 ts_last: 0.into(),
884 ts_init: 0.into(),
885 client_order_id: None,
886 avg_px: None,
887 trigger_price: None,
888 trigger_type: None,
889 contingency_type: ContingencyType::NoContingency,
890 expire_time: None,
891 order_list_id: None,
892 venue_position_id: None,
893 linked_order_ids: None,
894 parent_order_id: None,
895 display_qty: None,
896 limit_offset: None,
897 trailing_offset: None,
898 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
899 post_only: false,
900 reduce_only: false,
901 cancel_reason: None,
902 ts_triggered: None,
903 }
904 }
905
906 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
907 where
908 F: Fn() -> Fut + Send + Sync + 'static,
909 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
910 {
911 let executor = MockExecutor::new(handler);
912 TransportClient::new(executor, client_id.to_string())
913 }
914
915 #[tokio::test]
916 async fn test_broadcast_submit_immediate_success() {
917 let report = create_test_report("ORDER-1");
918 let report_clone = report.clone();
919
920 let transports = vec![
921 create_stub_transport("client-0", move || {
922 let report = report_clone.clone();
923 async move { Ok(report) }
924 }),
925 create_stub_transport("client-1", || async {
926 tokio::time::sleep(Duration::from_secs(10)).await;
927 anyhow::bail!("Should be aborted")
928 }),
929 ];
930
931 let config = SubmitBroadcasterConfig::default();
932 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
933
934 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
935 let result = broadcaster
936 .broadcast_submit(
937 instrument_id,
938 ClientOrderId::from("O-123"),
939 OrderSide::Buy,
940 OrderType::Limit,
941 Quantity::new(100.0, 0),
942 TimeInForce::Gtc,
943 Some(Price::new(50000.0, 2)),
944 None,
945 None,
946 None,
947 false,
948 false,
949 None,
950 None,
951 None,
952 )
953 .await;
954
955 assert!(result.is_ok());
956 let returned_report = result.unwrap();
957 assert_eq!(returned_report.venue_order_id, report.venue_order_id);
958
959 let metrics = broadcaster.get_metrics_async().await;
960 assert_eq!(metrics.successful_submits, 1);
961 assert_eq!(metrics.failed_submits, 0);
962 assert_eq!(metrics.total_submits, 1);
963 }
964
965 #[tokio::test]
966 async fn test_broadcast_submit_duplicate_clordid_expected() {
967 let transports = vec![
968 create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
969 create_stub_transport("client-1", || async {
970 tokio::time::sleep(Duration::from_secs(10)).await;
971 anyhow::bail!("Should be aborted")
972 }),
973 ];
974
975 let config = SubmitBroadcasterConfig::default();
976 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
977
978 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
979 let result = broadcaster
980 .broadcast_submit(
981 instrument_id,
982 ClientOrderId::from("O-123"),
983 OrderSide::Buy,
984 OrderType::Limit,
985 Quantity::new(100.0, 0),
986 TimeInForce::Gtc,
987 Some(Price::new(50000.0, 2)),
988 None,
989 None,
990 None,
991 false,
992 false,
993 None,
994 None,
995 None,
996 )
997 .await;
998
999 assert!(result.is_err());
1000
1001 let metrics = broadcaster.get_metrics_async().await;
1002 assert_eq!(metrics.expected_rejects, 1);
1003 assert_eq!(metrics.successful_submits, 0);
1004 assert_eq!(metrics.failed_submits, 1);
1005 }
1006
1007 #[tokio::test]
1008 async fn test_broadcast_submit_all_failures() {
1009 let transports = vec![
1010 create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1011 create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1012 ];
1013
1014 let config = SubmitBroadcasterConfig::default();
1015 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1016
1017 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1018 let result = broadcaster
1019 .broadcast_submit(
1020 instrument_id,
1021 ClientOrderId::from("O-456"),
1022 OrderSide::Sell,
1023 OrderType::Market,
1024 Quantity::new(50.0, 0),
1025 TimeInForce::Ioc,
1026 None,
1027 None,
1028 None,
1029 None,
1030 false,
1031 false,
1032 None,
1033 None,
1034 None,
1035 )
1036 .await;
1037
1038 assert!(result.is_err());
1039 assert!(
1040 result
1041 .unwrap_err()
1042 .to_string()
1043 .contains("All submit requests failed")
1044 );
1045
1046 let metrics = broadcaster.get_metrics_async().await;
1047 assert_eq!(metrics.failed_submits, 1);
1048 assert_eq!(metrics.successful_submits, 0);
1049 }
1050
1051 #[tokio::test]
1052 async fn test_broadcast_submit_no_healthy_clients() {
1053 let transport =
1054 create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1055 transport.healthy.store(false, Ordering::Relaxed);
1056
1057 let config = SubmitBroadcasterConfig::default();
1058 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1059
1060 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1061 let result = broadcaster
1062 .broadcast_submit(
1063 instrument_id,
1064 ClientOrderId::from("O-789"),
1065 OrderSide::Buy,
1066 OrderType::Limit,
1067 Quantity::new(100.0, 0),
1068 TimeInForce::Gtc,
1069 Some(Price::new(50000.0, 2)),
1070 None,
1071 None,
1072 None,
1073 false,
1074 false,
1075 None,
1076 None,
1077 None,
1078 )
1079 .await;
1080
1081 assert!(result.is_err());
1082 assert!(
1083 result
1084 .unwrap_err()
1085 .to_string()
1086 .contains("No healthy transport clients available")
1087 );
1088
1089 let metrics = broadcaster.get_metrics_async().await;
1090 assert_eq!(metrics.failed_submits, 1);
1091 }
1092
1093 #[tokio::test]
1094 async fn test_default_config() {
1095 let config = SubmitBroadcasterConfig {
1096 api_key: Some("test_key".to_string()),
1097 api_secret: Some("test_secret".to_string()),
1098 base_url: Some("https://test.example.com".to_string()),
1099 ..Default::default()
1100 };
1101
1102 let broadcaster = SubmitBroadcaster::new(config);
1103 assert!(broadcaster.is_ok());
1104
1105 let broadcaster = broadcaster.unwrap();
1106 let metrics = broadcaster.get_metrics_async().await;
1107
1108 assert_eq!(metrics.total_clients, 3);
1110 }
1111
1112 #[tokio::test]
1113 async fn test_broadcaster_lifecycle() {
1114 let config = SubmitBroadcasterConfig {
1115 pool_size: 2,
1116 api_key: Some("test_key".to_string()),
1117 api_secret: Some("test_secret".to_string()),
1118 base_url: Some("https://test.example.com".to_string()),
1119 testnet: false,
1120 timeout_secs: Some(5),
1121 max_retries: None,
1122 retry_delay_ms: None,
1123 retry_delay_max_ms: None,
1124 recv_window_ms: None,
1125 max_requests_per_second: None,
1126 max_requests_per_minute: None,
1127 health_check_interval_secs: 60,
1128 health_check_timeout_secs: 1,
1129 expected_reject_patterns: vec![],
1130 proxy_urls: vec![],
1131 };
1132
1133 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1134
1135 assert!(!broadcaster.running.load(Ordering::Relaxed));
1137
1138 let start_result = broadcaster.start().await;
1140 assert!(start_result.is_ok());
1141 assert!(broadcaster.running.load(Ordering::Relaxed));
1142
1143 let start_again = broadcaster.start().await;
1145 assert!(start_again.is_ok());
1146
1147 broadcaster.stop().await;
1149 assert!(!broadcaster.running.load(Ordering::Relaxed));
1150
1151 broadcaster.stop().await;
1153 assert!(!broadcaster.running.load(Ordering::Relaxed));
1154 }
1155
1156 #[tokio::test]
1157 async fn test_broadcast_submit_metrics_increment() {
1158 let report = create_test_report("ORDER-1");
1159 let report_clone = report.clone();
1160
1161 let transports = vec![create_stub_transport("client-0", move || {
1162 let report = report_clone.clone();
1163 async move { Ok(report) }
1164 })];
1165
1166 let config = SubmitBroadcasterConfig::default();
1167 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1168
1169 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1170 let _ = broadcaster
1171 .broadcast_submit(
1172 instrument_id,
1173 ClientOrderId::from("O-123"),
1174 OrderSide::Buy,
1175 OrderType::Limit,
1176 Quantity::new(100.0, 0),
1177 TimeInForce::Gtc,
1178 Some(Price::new(50000.0, 2)),
1179 None,
1180 None,
1181 None,
1182 false,
1183 false,
1184 None,
1185 None,
1186 None,
1187 )
1188 .await;
1189
1190 let metrics = broadcaster.get_metrics_async().await;
1191 assert_eq!(metrics.total_submits, 1);
1192 assert_eq!(metrics.successful_submits, 1);
1193 assert_eq!(metrics.failed_submits, 0);
1194 }
1195
1196 #[tokio::test]
1197 async fn test_broadcaster_creation_with_pool() {
1198 let config = SubmitBroadcasterConfig {
1199 pool_size: 4,
1200 api_key: Some("test_key".to_string()),
1201 api_secret: Some("test_secret".to_string()),
1202 base_url: Some("https://test.example.com".to_string()),
1203 ..Default::default()
1204 };
1205
1206 let broadcaster = SubmitBroadcaster::new(config);
1207 assert!(broadcaster.is_ok());
1208
1209 let broadcaster = broadcaster.unwrap();
1210 let metrics = broadcaster.get_metrics_async().await;
1211 assert_eq!(metrics.total_clients, 4);
1212 }
1213
1214 #[tokio::test]
1215 async fn test_client_stats_collection() {
1216 let report = create_test_report("ORDER-1");
1217 let report_clone = report.clone();
1218
1219 let transports = vec![
1220 create_stub_transport("client-0", move || {
1221 let report = report_clone.clone();
1222 async move { Ok(report) }
1223 }),
1224 create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1225 ];
1226
1227 let config = SubmitBroadcasterConfig::default();
1228 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1229
1230 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1231 let _ = broadcaster
1232 .broadcast_submit(
1233 instrument_id,
1234 ClientOrderId::from("O-123"),
1235 OrderSide::Buy,
1236 OrderType::Limit,
1237 Quantity::new(100.0, 0),
1238 TimeInForce::Gtc,
1239 Some(Price::new(50000.0, 2)),
1240 None,
1241 None,
1242 None,
1243 false,
1244 false,
1245 None,
1246 None,
1247 None,
1248 )
1249 .await;
1250
1251 let stats = broadcaster.get_client_stats_async().await;
1252 assert_eq!(stats.len(), 2);
1253
1254 let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1255 assert_eq!(client0.submit_count, 1);
1256 assert_eq!(client0.error_count, 0);
1257
1258 let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1259 assert_eq!(client1.submit_count, 1);
1260 assert_eq!(client1.error_count, 1);
1261 }
1262
1263 #[tokio::test]
1264 async fn test_testnet_config_sets_base_url() {
1265 let config = SubmitBroadcasterConfig {
1266 pool_size: 1,
1267 api_key: Some("test_key".to_string()),
1268 api_secret: Some("test_secret".to_string()),
1269 testnet: true,
1270 base_url: None,
1271 ..Default::default()
1272 };
1273
1274 let broadcaster = SubmitBroadcaster::new(config);
1275 assert!(broadcaster.is_ok());
1276 }
1277
1278 #[tokio::test]
1279 async fn test_clone_for_async() {
1280 let config = SubmitBroadcasterConfig {
1281 pool_size: 1,
1282 api_key: Some("test_key".to_string()),
1283 api_secret: Some("test_secret".to_string()),
1284 base_url: Some("https://test.example.com".to_string()),
1285 ..Default::default()
1286 };
1287
1288 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1289 let cloned = broadcaster.clone_for_async();
1290
1291 broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1293 assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1294 }
1295
1296 #[tokio::test]
1297 async fn test_pattern_matching() {
1298 let config = SubmitBroadcasterConfig {
1299 expected_reject_patterns: vec![
1300 "Duplicate clOrdID".to_string(),
1301 "Order already exists".to_string(),
1302 ],
1303 ..Default::default()
1304 };
1305
1306 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1307
1308 assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1309 assert!(broadcaster.is_expected_reject("Order already exists in system"));
1310 assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1311 assert!(!broadcaster.is_expected_reject("Internal server error"));
1312 }
1313
1314 #[tokio::test]
1315 async fn test_submit_metrics_with_mixed_responses() {
1316 let report = create_test_report("ORDER-1");
1317 let report_clone = report.clone();
1318
1319 let transports = vec![
1320 create_stub_transport("client-0", move || {
1321 let report = report_clone.clone();
1322 async move { Ok(report) }
1323 }),
1324 create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1325 ];
1326
1327 let config = SubmitBroadcasterConfig::default();
1328 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1329
1330 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1331 let result = broadcaster
1332 .broadcast_submit(
1333 instrument_id,
1334 ClientOrderId::from("O-123"),
1335 OrderSide::Buy,
1336 OrderType::Limit,
1337 Quantity::new(100.0, 0),
1338 TimeInForce::Gtc,
1339 Some(Price::new(50000.0, 2)),
1340 None,
1341 None,
1342 None,
1343 false,
1344 false,
1345 None,
1346 None,
1347 None,
1348 )
1349 .await;
1350
1351 assert!(result.is_ok());
1352
1353 let metrics = broadcaster.get_metrics_async().await;
1354 assert_eq!(metrics.total_submits, 1);
1355 assert_eq!(metrics.successful_submits, 1);
1356 assert_eq!(metrics.failed_submits, 0);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_metrics_initialization_and_health() {
1361 let config = SubmitBroadcasterConfig {
1362 pool_size: 2,
1363 api_key: Some("test_key".to_string()),
1364 api_secret: Some("test_secret".to_string()),
1365 base_url: Some("https://test.example.com".to_string()),
1366 ..Default::default()
1367 };
1368
1369 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1370 let metrics = broadcaster.get_metrics_async().await;
1371
1372 assert_eq!(metrics.total_submits, 0);
1373 assert_eq!(metrics.successful_submits, 0);
1374 assert_eq!(metrics.failed_submits, 0);
1375 assert_eq!(metrics.expected_rejects, 0);
1376 assert_eq!(metrics.total_clients, 2);
1377 assert_eq!(metrics.healthy_clients, 2);
1378 }
1379
1380 #[tokio::test]
1381 async fn test_health_check_task_lifecycle() {
1382 let config = SubmitBroadcasterConfig {
1383 pool_size: 2,
1384 api_key: Some("test_key".to_string()),
1385 api_secret: Some("test_secret".to_string()),
1386 base_url: Some("https://test.example.com".to_string()),
1387 health_check_interval_secs: 1,
1388 ..Default::default()
1389 };
1390
1391 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1392
1393 broadcaster.start().await.unwrap();
1395 assert!(broadcaster.running.load(Ordering::Relaxed));
1396 assert!(
1397 broadcaster
1398 .health_check_task
1399 .read()
1400 .await
1401 .as_ref()
1402 .is_some()
1403 );
1404
1405 tokio::time::sleep(Duration::from_millis(100)).await;
1407
1408 broadcaster.stop().await;
1410 assert!(!broadcaster.running.load(Ordering::Relaxed));
1411 }
1412
1413 #[tokio::test]
1414 async fn test_expected_reject_pattern_comprehensive() {
1415 let transports = vec![
1416 create_stub_transport("client-0", || async {
1417 anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1418 }),
1419 create_stub_transport("client-1", || async {
1420 tokio::time::sleep(Duration::from_secs(10)).await;
1421 anyhow::bail!("Should be aborted")
1422 }),
1423 ];
1424
1425 let config = SubmitBroadcasterConfig::default();
1426 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1427
1428 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1429 let result = broadcaster
1430 .broadcast_submit(
1431 instrument_id,
1432 ClientOrderId::from("O-123"),
1433 OrderSide::Buy,
1434 OrderType::Limit,
1435 Quantity::new(100.0, 0),
1436 TimeInForce::Gtc,
1437 Some(Price::new(50000.0, 2)),
1438 None,
1439 None,
1440 None,
1441 false,
1442 false,
1443 None,
1444 None,
1445 None,
1446 )
1447 .await;
1448
1449 assert!(result.is_err());
1451
1452 let metrics = broadcaster.get_metrics_async().await;
1453 assert_eq!(metrics.expected_rejects, 1);
1454 assert_eq!(metrics.failed_submits, 1);
1455 assert_eq!(metrics.successful_submits, 0);
1456 }
1457
1458 #[tokio::test]
1459 async fn test_client_order_id_suffix_for_multiple_clients() {
1460 use std::sync::{Arc, Mutex};
1461
1462 #[derive(Clone)]
1463 struct CaptureExecutor {
1464 captured_ids: Arc<Mutex<Vec<String>>>,
1465 report: OrderStatusReport,
1466 }
1467
1468 impl SubmitExecutor for CaptureExecutor {
1469 fn health_check(
1470 &self,
1471 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1472 Box::pin(async { Ok(()) })
1473 }
1474
1475 #[allow(clippy::too_many_arguments)]
1476 fn submit_order(
1477 &self,
1478 _instrument_id: InstrumentId,
1479 client_order_id: ClientOrderId,
1480 _order_side: OrderSide,
1481 _order_type: OrderType,
1482 _quantity: Quantity,
1483 _time_in_force: TimeInForce,
1484 _price: Option<Price>,
1485 _trigger_price: Option<Price>,
1486 _trigger_type: Option<TriggerType>,
1487 _display_qty: Option<Quantity>,
1488 _post_only: bool,
1489 _reduce_only: bool,
1490 _order_list_id: Option<OrderListId>,
1491 _contingency_type: Option<ContingencyType>,
1492 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1493 {
1494 self.captured_ids
1496 .lock()
1497 .unwrap()
1498 .push(client_order_id.as_str().to_string());
1499 let report = self.report.clone();
1500 Box::pin(async move { Ok(report) })
1501 }
1502
1503 fn add_instrument(&self, _instrument: InstrumentAny) {}
1504 }
1505
1506 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1507 let report = create_test_report("ORDER-1");
1508
1509 let transports = vec![
1510 TransportClient::new(
1511 CaptureExecutor {
1512 captured_ids: Arc::clone(&captured_ids),
1513 report: report.clone(),
1514 },
1515 "client-0".to_string(),
1516 ),
1517 TransportClient::new(
1518 CaptureExecutor {
1519 captured_ids: Arc::clone(&captured_ids),
1520 report: report.clone(),
1521 },
1522 "client-1".to_string(),
1523 ),
1524 TransportClient::new(
1525 CaptureExecutor {
1526 captured_ids: Arc::clone(&captured_ids),
1527 report: report.clone(),
1528 },
1529 "client-2".to_string(),
1530 ),
1531 ];
1532
1533 let config = SubmitBroadcasterConfig::default();
1534 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1535
1536 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1537 let result = broadcaster
1538 .broadcast_submit(
1539 instrument_id,
1540 ClientOrderId::from("O-123"),
1541 OrderSide::Buy,
1542 OrderType::Limit,
1543 Quantity::new(100.0, 0),
1544 TimeInForce::Gtc,
1545 Some(Price::new(50000.0, 2)),
1546 None,
1547 None,
1548 None,
1549 false,
1550 false,
1551 None,
1552 None,
1553 None,
1554 )
1555 .await;
1556
1557 assert!(result.is_ok());
1558
1559 let ids = captured_ids.lock().unwrap();
1561 assert_eq!(ids.len(), 3);
1562 assert_eq!(ids[0], "O-123"); assert_eq!(ids[1], "O-123-1"); assert_eq!(ids[2], "O-123-2"); }
1566
1567 #[tokio::test]
1568 async fn test_client_order_id_suffix_with_partial_failure() {
1569 use std::sync::{Arc, Mutex};
1570
1571 #[derive(Clone)]
1572 struct CaptureAndFailExecutor {
1573 captured_ids: Arc<Mutex<Vec<String>>>,
1574 should_succeed: bool,
1575 }
1576
1577 impl SubmitExecutor for CaptureAndFailExecutor {
1578 fn health_check(
1579 &self,
1580 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1581 Box::pin(async { Ok(()) })
1582 }
1583
1584 #[allow(clippy::too_many_arguments)]
1585 fn submit_order(
1586 &self,
1587 _instrument_id: InstrumentId,
1588 client_order_id: ClientOrderId,
1589 _order_side: OrderSide,
1590 _order_type: OrderType,
1591 _quantity: Quantity,
1592 _time_in_force: TimeInForce,
1593 _price: Option<Price>,
1594 _trigger_price: Option<Price>,
1595 _trigger_type: Option<TriggerType>,
1596 _display_qty: Option<Quantity>,
1597 _post_only: bool,
1598 _reduce_only: bool,
1599 _order_list_id: Option<OrderListId>,
1600 _contingency_type: Option<ContingencyType>,
1601 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1602 {
1603 self.captured_ids
1605 .lock()
1606 .unwrap()
1607 .push(client_order_id.as_str().to_string());
1608 let should_succeed = self.should_succeed;
1609 Box::pin(async move {
1610 if should_succeed {
1611 Ok(create_test_report("ORDER-1"))
1612 } else {
1613 anyhow::bail!("Network error")
1614 }
1615 })
1616 }
1617
1618 fn add_instrument(&self, _instrument: InstrumentAny) {}
1619 }
1620
1621 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1622
1623 let transports = vec![
1624 TransportClient::new(
1625 CaptureAndFailExecutor {
1626 captured_ids: Arc::clone(&captured_ids),
1627 should_succeed: false,
1628 },
1629 "client-0".to_string(),
1630 ),
1631 TransportClient::new(
1632 CaptureAndFailExecutor {
1633 captured_ids: Arc::clone(&captured_ids),
1634 should_succeed: true,
1635 },
1636 "client-1".to_string(),
1637 ),
1638 ];
1639
1640 let config = SubmitBroadcasterConfig::default();
1641 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1642
1643 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1644 let result = broadcaster
1645 .broadcast_submit(
1646 instrument_id,
1647 ClientOrderId::from("O-456"),
1648 OrderSide::Sell,
1649 OrderType::Market,
1650 Quantity::new(50.0, 0),
1651 TimeInForce::Ioc,
1652 None,
1653 None,
1654 None,
1655 None,
1656 false,
1657 false,
1658 None,
1659 None,
1660 None,
1661 )
1662 .await;
1663
1664 assert!(result.is_ok());
1665
1666 let ids = captured_ids.lock().unwrap();
1668 assert_eq!(ids.len(), 2);
1669 assert_eq!(ids[0], "O-456"); assert_eq!(ids[1], "O-456-1"); }
1672
1673 #[tokio::test]
1674 async fn test_proxy_urls_populated_from_config() {
1675 let config = SubmitBroadcasterConfig {
1676 pool_size: 3,
1677 api_key: Some("test_key".to_string()),
1678 api_secret: Some("test_secret".to_string()),
1679 proxy_urls: vec![
1680 Some("http://proxy1:8080".to_string()),
1681 Some("http://proxy2:8080".to_string()),
1682 Some("http://proxy3:8080".to_string()),
1683 ],
1684 ..Default::default()
1685 };
1686
1687 assert_eq!(config.proxy_urls.len(), 3);
1688 assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1689 assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1690 assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1691 }
1692}