1pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 live::{runner::get_exec_event_sender, runtime::get_runtime},
28 messages::{
29 ExecutionEvent, ExecutionReport,
30 execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35 },
36 msgbus,
37};
38use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
39use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::OmsType,
43 events::{AccountState, OrderEventAny, OrderRejected},
44 identifiers::{AccountId, ClientId, Venue, VenueOrderId},
45 instruments::Instrument,
46 orders::Order,
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51
52use crate::{
53 config::BitmexExecClientConfig,
54 execution::{
55 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
56 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
57 },
58 http::client::BitmexHttpClient,
59 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
60};
61
/// Execution client for the BitMEX venue.
///
/// Owns the HTTP and WebSocket clients, the submit/cancel broadcasters, a set of
/// lifecycle flags, and the handles of the background tasks it spawns.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared execution-client core; source of the client/account/venue identifiers.
    core: ExecutionClientCore,
    /// Configuration this client was constructed with.
    config: BitmexExecClientConfig,
    /// HTTP client used for instrument, account, report and order requests.
    http_client: BitmexHttpClient,
    /// WebSocket client used for the order/execution/position/wallet/margin subscriptions.
    ws_client: BitmexWebSocketClient,
    /// Broadcaster used by `submit_order` when more than one submit try is requested.
    _submitter: SubmitBroadcaster,
    /// Broadcaster used by the cancel commands.
    _canceller: CancelBroadcaster,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Set by `connect`, cleared by `disconnect` and `stop`.
    connected: bool,
    /// Set once instruments have been fetched and cached.
    instruments_initialized: bool,
    /// Handle of the task draining the WebSocket message stream, when running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks spawned through `spawn_task`, kept so they can be aborted.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
76
77impl BitmexExecutionClient {
78 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
84 if !config.has_api_credentials() {
85 anyhow::bail!("BitMEX execution client requires API key and secret");
86 }
87
88 let http_client = BitmexHttpClient::new(
89 Some(config.http_base_url()),
90 config.api_key.clone(),
91 config.api_secret.clone(),
92 config.use_testnet,
93 config.http_timeout_secs,
94 config.max_retries,
95 config.retry_delay_initial_ms,
96 config.retry_delay_max_ms,
97 config.recv_window_ms,
98 config.max_requests_per_second,
99 config.max_requests_per_minute,
100 config.http_proxy_url.clone(),
101 )
102 .context("failed to construct BitMEX HTTP client")?;
103
104 let account_id = config.account_id.unwrap_or(core.account_id);
105 let ws_client = BitmexWebSocketClient::new(
106 Some(config.ws_url()),
107 config.api_key.clone(),
108 config.api_secret.clone(),
109 Some(account_id),
110 config.heartbeat_interval_secs,
111 )
112 .context("failed to construct BitMEX execution websocket client")?;
113
114 let pool_size = config.submitter_pool_size.unwrap_or(1);
115 let submitter_proxy_urls = match &config.submitter_proxy_urls {
116 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
117 None => vec![config.http_proxy_url.clone(); pool_size],
118 };
119
120 let submitter_config = SubmitBroadcasterConfig {
121 pool_size,
122 api_key: config.api_key.clone(),
123 api_secret: config.api_secret.clone(),
124 base_url: config.base_url_http.clone(),
125 testnet: config.use_testnet,
126 timeout_secs: config.http_timeout_secs,
127 max_retries: config.max_retries,
128 retry_delay_ms: config.retry_delay_initial_ms,
129 retry_delay_max_ms: config.retry_delay_max_ms,
130 recv_window_ms: config.recv_window_ms,
131 max_requests_per_second: config.max_requests_per_second,
132 max_requests_per_minute: config.max_requests_per_minute,
133 proxy_urls: submitter_proxy_urls,
134 ..Default::default()
135 };
136
137 let _submitter = SubmitBroadcaster::new(submitter_config)
138 .context("failed to create SubmitBroadcaster")?;
139
140 let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
141 let canceller_proxy_urls = match &config.canceller_proxy_urls {
142 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
143 None => vec![config.http_proxy_url.clone(); canceller_pool_size],
144 };
145
146 let canceller_config = CancelBroadcasterConfig {
147 pool_size: canceller_pool_size,
148 api_key: config.api_key.clone(),
149 api_secret: config.api_secret.clone(),
150 base_url: config.base_url_http.clone(),
151 testnet: config.use_testnet,
152 timeout_secs: config.http_timeout_secs,
153 max_retries: config.max_retries,
154 retry_delay_ms: config.retry_delay_initial_ms,
155 retry_delay_max_ms: config.retry_delay_max_ms,
156 recv_window_ms: config.recv_window_ms,
157 max_requests_per_second: config.max_requests_per_second,
158 max_requests_per_minute: config.max_requests_per_minute,
159 proxy_urls: canceller_proxy_urls,
160 ..Default::default()
161 };
162
163 let _canceller = CancelBroadcaster::new(canceller_config)
164 .context("failed to create CancelBroadcaster")?;
165
166 Ok(Self {
167 core,
168 config,
169 http_client,
170 ws_client,
171 _submitter,
172 _canceller,
173 started: false,
174 connected: false,
175 instruments_initialized: false,
176 ws_stream_handle: None,
177 pending_tasks: Mutex::new(Vec::new()),
178 })
179 }
180
181 fn spawn_task<F>(&self, label: &'static str, fut: F)
182 where
183 F: Future<Output = anyhow::Result<()>> + Send + 'static,
184 {
185 let handle = get_runtime().spawn(async move {
186 if let Err(e) = fut.await {
187 tracing::error!("{label}: {e:?}");
188 }
189 });
190
191 self.pending_tasks
192 .lock()
193 .expect("pending task lock poisoned")
194 .push(handle);
195 }
196
197 fn abort_pending_tasks(&self) {
198 let mut guard = self
199 .pending_tasks
200 .lock()
201 .expect("pending task lock poisoned");
202 for handle in guard.drain(..) {
203 handle.abort();
204 }
205 }
206
207 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
208 if self.instruments_initialized {
209 return Ok(());
210 }
211
212 let http = self.http_client.clone();
213 let mut instruments = http
214 .request_instruments(self.config.active_only)
215 .await
216 .context("failed to request BitMEX instruments")?;
217
218 instruments.sort_by_key(|instrument| instrument.id());
219
220 for instrument in &instruments {
221 self.http_client.cache_instrument(instrument.clone());
222 self._submitter.cache_instrument(instrument.clone());
223 self._canceller.cache_instrument(instrument.clone());
224 }
225
226 self.ws_client.cache_instruments(instruments);
227
228 self.instruments_initialized = true;
229 Ok(())
230 }
231
232 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
233 if self.instruments_initialized {
234 return Ok(());
235 }
236
237 let runtime = get_runtime();
238 runtime.block_on(self.ensure_instruments_initialized_async())
239 }
240
241 async fn refresh_account_state(&self) -> anyhow::Result<()> {
242 let account_state = self
243 .http_client
244 .request_account_state(self.core.account_id)
245 .await
246 .context("failed to request BitMEX account state")?;
247
248 dispatch_account_state(account_state);
249 Ok(())
250 }
251
252 fn update_account_state(&self) -> anyhow::Result<()> {
253 let runtime = get_runtime();
254 runtime.block_on(self.refresh_account_state())
255 }
256
257 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
258 if self.ws_stream_handle.is_some() {
259 return Ok(());
260 }
261
262 let stream = self.ws_client.stream();
263
264 let handle = get_runtime().spawn(async move {
265 pin_mut!(stream);
266 while let Some(message) = stream.next().await {
267 dispatch_ws_message(message);
268 }
269 });
270
271 self.ws_stream_handle = Some(handle);
272 Ok(())
273 }
274}
275
276#[async_trait(?Send)]
277impl ExecutionClient for BitmexExecutionClient {
    /// Returns the client's local `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected
    }
281
    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
285
    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
289
    /// Returns the venue held by the core.
    fn venue(&self) -> Venue {
        self.core.venue
    }
293
    /// Returns the OMS type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
297
    /// Returns the account reported by the core, if any.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
301
    /// Delegates account-state generation to the core with the given arguments.
    ///
    /// # Errors
    ///
    /// Returns whatever error the core's `generate_account_state` returns.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
312
313 fn start(&mut self) -> anyhow::Result<()> {
314 if self.started {
315 return Ok(());
316 }
317
318 self.ensure_instruments_initialized()?;
319 self.started = true;
320 tracing::info!(
321 client_id = %self.core.client_id,
322 account_id = %self.core.account_id,
323 use_testnet = self.config.use_testnet,
324 submitter_pool_size = ?self.config.submitter_pool_size,
325 canceller_pool_size = ?self.config.canceller_pool_size,
326 http_proxy_url = ?self.config.http_proxy_url,
327 ws_proxy_url = ?self.config.ws_proxy_url,
328 submitter_proxy_urls = ?self.config.submitter_proxy_urls,
329 canceller_proxy_urls = ?self.config.canceller_proxy_urls,
330 "BitMEX execution client started"
331 );
332 Ok(())
333 }
334
335 fn stop(&mut self) -> anyhow::Result<()> {
336 if !self.started {
337 return Ok(());
338 }
339
340 self.started = false;
341 self.connected = false;
342 if let Some(handle) = self.ws_stream_handle.take() {
343 handle.abort();
344 }
345 self.abort_pending_tasks();
346 tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
347 Ok(())
348 }
349
350 async fn connect(&mut self) -> anyhow::Result<()> {
351 if self.connected {
352 return Ok(());
353 }
354
355 self.ensure_instruments_initialized_async().await?;
356
357 self._submitter.start().await?;
358 self._canceller.start().await?;
359
360 self.ws_client.connect().await?;
361 self.ws_client.wait_until_active(10.0).await?;
362
363 self.ws_client.subscribe_orders().await?;
364 self.ws_client.subscribe_executions().await?;
365 self.ws_client.subscribe_positions().await?;
366 self.ws_client.subscribe_wallet().await?;
367 if let Err(e) = self.ws_client.subscribe_margin().await {
368 tracing::debug!("Margin subscription unavailable: {e:?}");
369 }
370
371 self.start_ws_stream()?;
372 self.refresh_account_state().await?;
373
374 self.connected = true;
375 self.core.set_connected(true);
376 tracing::info!(client_id = %self.core.client_id, "Connected");
377 Ok(())
378 }
379
380 async fn disconnect(&mut self) -> anyhow::Result<()> {
381 if !self.connected {
382 return Ok(());
383 }
384
385 self.http_client.cancel_all_requests();
386 self._submitter.stop().await;
387 self._canceller.stop().await;
388
389 if let Err(e) = self.ws_client.close().await {
390 tracing::warn!("Error while closing BitMEX execution websocket: {e:?}");
391 }
392
393 if let Some(handle) = self.ws_stream_handle.take() {
394 handle.abort();
395 }
396
397 self.abort_pending_tasks();
398 self.connected = false;
399 self.core.set_connected(false);
400 tracing::info!(client_id = %self.core.client_id, "Disconnected");
401 Ok(())
402 }
403
404 async fn generate_order_status_report(
405 &self,
406 cmd: &GenerateOrderStatusReport,
407 ) -> anyhow::Result<Option<OrderStatusReport>> {
408 let instrument_id = cmd
409 .instrument_id
410 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
411
412 self.http_client
413 .query_order(
414 instrument_id,
415 cmd.client_order_id,
416 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
417 )
418 .await
419 .context("failed to query BitMEX order status")
420 }
421
422 async fn generate_order_status_reports(
423 &self,
424 cmd: &GenerateOrderStatusReports,
425 ) -> anyhow::Result<Vec<OrderStatusReport>> {
426 let reports = self
427 .http_client
428 .request_order_status_reports(cmd.instrument_id, cmd.open_only, None)
429 .await
430 .context("failed to request BitMEX order status reports")?;
431 Ok(reports)
432 }
433
434 async fn generate_fill_reports(
435 &self,
436 cmd: GenerateFillReports,
437 ) -> anyhow::Result<Vec<FillReport>> {
438 let mut reports = self
439 .http_client
440 .request_fill_reports(cmd.instrument_id, None)
441 .await
442 .context("failed to request BitMEX fill reports")?;
443
444 if let Some(order_id) = cmd.venue_order_id {
445 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
446 }
447
448 Ok(reports)
449 }
450
451 async fn generate_position_status_reports(
452 &self,
453 cmd: &GeneratePositionStatusReports,
454 ) -> anyhow::Result<Vec<PositionStatusReport>> {
455 let mut reports = self
456 .http_client
457 .request_position_status_reports()
458 .await
459 .context("failed to request BitMEX position reports")?;
460
461 if let Some(instrument_id) = cmd.instrument_id {
462 reports.retain(|report| report.instrument_id == instrument_id);
463 }
464
465 Ok(reports)
466 }
467
    /// Placeholder: mass status generation is not implemented for this client.
    ///
    /// Logs a warning and returns `Ok(None)`; `_lookback_mins` is ignored.
    async fn generate_mass_status(
        &self,
        _lookback_mins: Option<u64>,
    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
        tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
        Ok(None)
    }
475
    /// Refreshes the account state synchronously via `update_account_state`.
    ///
    /// The command payload is not inspected. Unlike the other command handlers,
    /// this blocks the calling thread on the shared runtime instead of spawning.
    // NOTE(review): if `get_runtime()` is a Tokio runtime, `block_on` presumably
    // panics when called from inside an async context; confirm the call sites.
    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
        self.update_account_state()
    }
479
480 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
481 let http_client = self.http_client.clone();
482 let instrument_id = cmd.instrument_id;
483 let client_order_id = Some(cmd.client_order_id);
484 let venue_order_id = cmd.venue_order_id;
485
486 self.spawn_task("query_order", async move {
487 match http_client
488 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
489 .await
490 {
491 Ok(report) => dispatch_order_status_report(report),
492 Err(e) => tracing::error!("BitMEX query order failed: {e:?}"),
493 }
494 Ok(())
495 });
496
497 Ok(())
498 }
499
500 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
501 let order = cmd.order.clone();
502
503 if order.is_closed() {
504 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
505 return Ok(());
506 }
507
508 self.core.generate_order_submitted(
509 order.strategy_id(),
510 order.instrument_id(),
511 order.client_order_id(),
512 cmd.ts_init,
513 );
514
515 let submit_tries = cmd
516 .params
517 .as_ref()
518 .and_then(|params| params.get("submit_tries"))
519 .and_then(|s| s.parse::<usize>().ok())
520 .filter(|&n| n > 0);
521
522 let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
523
524 let http_client = self.http_client.clone();
525 let submitter = self._submitter.clone_for_async();
526 let trader_id = self.core.trader_id;
527 let strategy_id = order.strategy_id();
528 let instrument_id = order.instrument_id();
529 let account_id = self.core.account_id;
530 let client_order_id = order.client_order_id();
531 let order_side = order.order_side();
532 let order_type = order.order_type();
533 let quantity = order.quantity();
534 let time_in_force = order.time_in_force();
535 let price = order.price();
536 let trigger_price = order.trigger_price();
537 let trigger_type = order.trigger_type();
538 let display_qty = order.display_qty();
539 let post_only = order.is_post_only();
540 let reduce_only = order.is_reduce_only();
541 let order_list_id = order.order_list_id();
542 let contingency_type = order.contingency_type();
543 let ts_event = cmd.ts_init;
544
545 self.spawn_task("submit_order", async move {
546 let result = if use_broadcaster {
547 submitter
548 .broadcast_submit(
549 instrument_id,
550 client_order_id,
551 order_side,
552 order_type,
553 quantity,
554 time_in_force,
555 price,
556 trigger_price,
557 trigger_type,
558 display_qty,
559 post_only,
560 reduce_only,
561 order_list_id,
562 contingency_type,
563 submit_tries,
564 )
565 .await
566 } else {
567 http_client
568 .submit_order(
569 instrument_id,
570 client_order_id,
571 order_side,
572 order_type,
573 quantity,
574 time_in_force,
575 price,
576 trigger_price,
577 trigger_type,
578 display_qty,
579 post_only,
580 reduce_only,
581 order_list_id,
582 contingency_type,
583 )
584 .await
585 };
586
587 match result {
588 Ok(report) => dispatch_order_status_report(report),
589 Err(e) => {
590 let event = OrderRejected::new(
591 trader_id,
592 strategy_id,
593 instrument_id,
594 client_order_id,
595 account_id,
596 format!("submit-order-error: {e}").into(),
597 UUID4::new(),
598 ts_event,
599 get_atomic_clock_realtime().get_time_ns(),
600 false,
601 post_only,
602 );
603 dispatch_order_event(OrderEventAny::Rejected(event));
604 }
605 }
606 Ok(())
607 });
608
609 Ok(())
610 }
611
    /// Placeholder: order-list submission is not implemented for this client.
    ///
    /// Logs a warning with the number of orders in the list and returns `Ok(())`
    /// without submitting anything.
    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
        tracing::warn!(
            "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
            cmd.order_list.orders.len()
        );
        Ok(())
    }
619
620 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
621 let http_client = self.http_client.clone();
622 let instrument_id = cmd.instrument_id;
623 let client_order_id = Some(cmd.client_order_id);
624 let venue_order_id = cmd.venue_order_id;
625 let quantity = cmd.quantity;
626 let price = cmd.price;
627 let trigger_price = cmd.trigger_price;
628
629 self.spawn_task("modify_order", async move {
630 match http_client
631 .modify_order(
632 instrument_id,
633 client_order_id,
634 venue_order_id,
635 quantity,
636 price,
637 trigger_price,
638 )
639 .await
640 {
641 Ok(report) => dispatch_order_status_report(report),
642 Err(e) => tracing::error!("BitMEX modify order failed: {e:?}"),
643 }
644 Ok(())
645 });
646
647 Ok(())
648 }
649
650 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
651 let canceller = self._canceller.clone_for_async();
652 let instrument_id = cmd.instrument_id;
653 let client_order_id = Some(cmd.client_order_id);
654 let venue_order_id = cmd.venue_order_id;
655
656 self.spawn_task("cancel_order", async move {
657 match canceller
658 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
659 .await
660 {
661 Ok(Some(report)) => dispatch_order_status_report(report),
662 Ok(None) => {
663 tracing::debug!("Order already cancelled: {:?}", client_order_id);
665 }
666 Err(e) => tracing::error!("BitMEX cancel order failed: {e:?}"),
667 }
668 Ok(())
669 });
670
671 Ok(())
672 }
673
674 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
675 let canceller = self._canceller.clone_for_async();
676 let instrument_id = cmd.instrument_id;
677 let order_side = Some(cmd.order_side);
678
679 self.spawn_task("cancel_all_orders", async move {
680 match canceller
681 .broadcast_cancel_all(instrument_id, order_side)
682 .await
683 {
684 Ok(reports) => {
685 for report in reports {
686 dispatch_order_status_report(report);
687 }
688 }
689 Err(e) => tracing::error!("BitMEX cancel all failed: {e:?}"),
690 }
691 Ok(())
692 });
693
694 Ok(())
695 }
696
697 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
698 let canceller = self._canceller.clone_for_async();
699 let instrument_id = cmd.instrument_id;
700 let venue_ids: Vec<VenueOrderId> = cmd
701 .cancels
702 .iter()
703 .filter_map(|cancel| cancel.venue_order_id)
704 .collect();
705
706 self.spawn_task("batch_cancel_orders", async move {
707 match canceller
708 .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
709 .await
710 {
711 Ok(reports) => {
712 for report in reports {
713 dispatch_order_status_report(report);
714 }
715 }
716 Err(e) => tracing::error!("BitMEX batch cancel failed: {e:?}"),
717 }
718 Ok(())
719 });
720
721 Ok(())
722 }
723}
724
725fn dispatch_ws_message(message: NautilusWsMessage) {
726 match message {
727 NautilusWsMessage::OrderStatusReports(reports) => {
728 for report in reports {
729 dispatch_order_status_report(report);
730 }
731 }
732 NautilusWsMessage::FillReports(reports) => {
733 for report in reports {
734 dispatch_fill_report(report);
735 }
736 }
737 NautilusWsMessage::PositionStatusReport(report) => {
738 dispatch_position_status_report(report);
739 }
740 NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
741 NautilusWsMessage::OrderUpdated(event) => {
742 dispatch_order_event(OrderEventAny::Updated(event));
743 }
744 NautilusWsMessage::Data(_)
745 | NautilusWsMessage::Instruments(_)
746 | NautilusWsMessage::FundingRateUpdates(_) => {
747 tracing::debug!("Ignoring BitMEX data message on execution stream");
748 }
749 NautilusWsMessage::Reconnected => {
750 tracing::info!("BitMEX execution websocket reconnected");
751 }
752 NautilusWsMessage::Authenticated => {
753 tracing::debug!("BitMEX execution websocket authenticated");
754 }
755 }
756}
757
758fn dispatch_account_state(state: AccountState) {
759 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
760}
761
762fn dispatch_order_status_report(report: OrderStatusReport) {
763 let sender = get_exec_event_sender();
764 let exec_report = ExecutionReport::OrderStatus(Box::new(report));
765 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
766 tracing::warn!("Failed to send order status report: {e}");
767 }
768}
769
770fn dispatch_fill_report(report: FillReport) {
771 let sender = get_exec_event_sender();
772 let exec_report = ExecutionReport::Fill(Box::new(report));
773 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
774 tracing::warn!("Failed to send fill report: {e}");
775 }
776}
777
778fn dispatch_position_status_report(report: PositionStatusReport) {
779 let sender = get_exec_event_sender();
780 let exec_report = ExecutionReport::Position(Box::new(report));
781 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
782 tracing::warn!("Failed to send position status report: {e}");
783 }
784}
785
786fn dispatch_order_event(event: OrderEventAny) {
787 let sender = get_exec_event_sender();
788 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
789 tracing::warn!("Failed to send order event: {e}");
790 }
791}