1pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::ExecutionClient,
28 live::{runner::get_exec_event_sender, runtime::get_runtime},
29 messages::{
30 ExecutionEvent, ExecutionReport,
31 execution::{
32 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
34 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
35 },
36 },
37 msgbus,
38};
39use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
40use nautilus_live::ExecutionClientCore;
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::OmsType,
44 events::{AccountState, OrderEventAny, OrderRejected},
45 identifiers::{AccountId, ClientId, Venue, VenueOrderId},
46 instruments::Instrument,
47 orders::Order,
48 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49 types::{AccountBalance, MarginBalance},
50};
51use tokio::task::JoinHandle;
52
53use crate::{
54 config::BitmexExecClientConfig,
55 execution::{
56 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
57 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
58 },
59 http::client::BitmexHttpClient,
60 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
61};
62
63#[derive(Debug)]
64pub struct BitmexExecutionClient {
65 core: ExecutionClientCore,
66 config: BitmexExecClientConfig,
67 http_client: BitmexHttpClient,
68 ws_client: BitmexWebSocketClient,
69 _submitter: SubmitBroadcaster,
70 _canceller: CancelBroadcaster,
71 started: bool,
72 connected: bool,
73 instruments_initialized: bool,
74 ws_stream_handle: Option<JoinHandle<()>>,
75 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
76}
77
78impl BitmexExecutionClient {
79 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
85 if !config.has_api_credentials() {
86 anyhow::bail!("BitMEX execution client requires API key and secret");
87 }
88
89 let http_client = BitmexHttpClient::new(
90 Some(config.http_base_url()),
91 config.api_key.clone(),
92 config.api_secret.clone(),
93 config.use_testnet,
94 config.http_timeout_secs,
95 config.max_retries,
96 config.retry_delay_initial_ms,
97 config.retry_delay_max_ms,
98 config.recv_window_ms,
99 config.max_requests_per_second,
100 config.max_requests_per_minute,
101 config.http_proxy_url.clone(),
102 )
103 .context("failed to construct BitMEX HTTP client")?;
104
105 let account_id = config.account_id.unwrap_or(core.account_id);
106 let ws_client = BitmexWebSocketClient::new(
107 Some(config.ws_url()),
108 config.api_key.clone(),
109 config.api_secret.clone(),
110 Some(account_id),
111 config.heartbeat_interval_secs,
112 )
113 .context("failed to construct BitMEX execution websocket client")?;
114
115 let pool_size = config.submitter_pool_size.unwrap_or(1);
116 let submitter_proxy_urls = match &config.submitter_proxy_urls {
117 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
118 None => vec![config.http_proxy_url.clone(); pool_size],
119 };
120
121 let submitter_config = SubmitBroadcasterConfig {
122 pool_size,
123 api_key: config.api_key.clone(),
124 api_secret: config.api_secret.clone(),
125 base_url: config.base_url_http.clone(),
126 testnet: config.use_testnet,
127 timeout_secs: config.http_timeout_secs,
128 max_retries: config.max_retries,
129 retry_delay_ms: config.retry_delay_initial_ms,
130 retry_delay_max_ms: config.retry_delay_max_ms,
131 recv_window_ms: config.recv_window_ms,
132 max_requests_per_second: config.max_requests_per_second,
133 max_requests_per_minute: config.max_requests_per_minute,
134 proxy_urls: submitter_proxy_urls,
135 ..Default::default()
136 };
137
138 let _submitter = SubmitBroadcaster::new(submitter_config)
139 .context("failed to create SubmitBroadcaster")?;
140
141 let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
142 let canceller_proxy_urls = match &config.canceller_proxy_urls {
143 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
144 None => vec![config.http_proxy_url.clone(); canceller_pool_size],
145 };
146
147 let canceller_config = CancelBroadcasterConfig {
148 pool_size: canceller_pool_size,
149 api_key: config.api_key.clone(),
150 api_secret: config.api_secret.clone(),
151 base_url: config.base_url_http.clone(),
152 testnet: config.use_testnet,
153 timeout_secs: config.http_timeout_secs,
154 max_retries: config.max_retries,
155 retry_delay_ms: config.retry_delay_initial_ms,
156 retry_delay_max_ms: config.retry_delay_max_ms,
157 recv_window_ms: config.recv_window_ms,
158 max_requests_per_second: config.max_requests_per_second,
159 max_requests_per_minute: config.max_requests_per_minute,
160 proxy_urls: canceller_proxy_urls,
161 ..Default::default()
162 };
163
164 let _canceller = CancelBroadcaster::new(canceller_config)
165 .context("failed to create CancelBroadcaster")?;
166
167 Ok(Self {
168 core,
169 config,
170 http_client,
171 ws_client,
172 _submitter,
173 _canceller,
174 started: false,
175 connected: false,
176 instruments_initialized: false,
177 ws_stream_handle: None,
178 pending_tasks: Mutex::new(Vec::new()),
179 })
180 }
181
182 fn spawn_task<F>(&self, label: &'static str, fut: F)
183 where
184 F: Future<Output = anyhow::Result<()>> + Send + 'static,
185 {
186 let handle = get_runtime().spawn(async move {
187 if let Err(e) = fut.await {
188 log::error!("{label}: {e:?}");
189 }
190 });
191
192 self.pending_tasks
193 .lock()
194 .expect("pending task lock poisoned")
195 .push(handle);
196 }
197
198 fn abort_pending_tasks(&self) {
199 let mut guard = self
200 .pending_tasks
201 .lock()
202 .expect("pending task lock poisoned");
203 for handle in guard.drain(..) {
204 handle.abort();
205 }
206 }
207
208 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
209 if self.instruments_initialized {
210 return Ok(());
211 }
212
213 let http = self.http_client.clone();
214 let mut instruments = http
215 .request_instruments(self.config.active_only)
216 .await
217 .context("failed to request BitMEX instruments")?;
218
219 instruments.sort_by_key(|instrument| instrument.id());
220
221 for instrument in &instruments {
222 self.http_client.cache_instrument(instrument.clone());
223 self._submitter.cache_instrument(instrument.clone());
224 self._canceller.cache_instrument(instrument.clone());
225 }
226
227 self.ws_client.cache_instruments(instruments);
228
229 self.instruments_initialized = true;
230 Ok(())
231 }
232
233 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
234 if self.instruments_initialized {
235 return Ok(());
236 }
237
238 let runtime = get_runtime();
239 runtime.block_on(self.ensure_instruments_initialized_async())
240 }
241
242 async fn refresh_account_state(&self) -> anyhow::Result<()> {
243 let account_state = self
244 .http_client
245 .request_account_state(self.core.account_id)
246 .await
247 .context("failed to request BitMEX account state")?;
248
249 dispatch_account_state(account_state);
250 Ok(())
251 }
252
253 fn update_account_state(&self) -> anyhow::Result<()> {
254 let runtime = get_runtime();
255 runtime.block_on(self.refresh_account_state())
256 }
257
258 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
259 if self.ws_stream_handle.is_some() {
260 return Ok(());
261 }
262
263 let stream = self.ws_client.stream();
264
265 let handle = get_runtime().spawn(async move {
266 pin_mut!(stream);
267 while let Some(message) = stream.next().await {
268 dispatch_ws_message(message);
269 }
270 });
271
272 self.ws_stream_handle = Some(handle);
273 Ok(())
274 }
275}
276
277#[async_trait(?Send)]
278impl ExecutionClient for BitmexExecutionClient {
279 fn is_connected(&self) -> bool {
280 self.connected
281 }
282
283 fn client_id(&self) -> ClientId {
284 self.core.client_id
285 }
286
287 fn account_id(&self) -> AccountId {
288 self.core.account_id
289 }
290
291 fn venue(&self) -> Venue {
292 self.core.venue
293 }
294
295 fn oms_type(&self) -> OmsType {
296 self.core.oms_type
297 }
298
299 fn get_account(&self) -> Option<AccountAny> {
300 self.core.get_account()
301 }
302
303 fn generate_account_state(
304 &self,
305 balances: Vec<AccountBalance>,
306 margins: Vec<MarginBalance>,
307 reported: bool,
308 ts_event: UnixNanos,
309 ) -> anyhow::Result<()> {
310 self.core
311 .generate_account_state(balances, margins, reported, ts_event)
312 }
313
314 fn start(&mut self) -> anyhow::Result<()> {
315 if self.started {
316 return Ok(());
317 }
318
319 self.ensure_instruments_initialized()?;
320 self.started = true;
321 log::info!(
322 "BitMEX execution client started: client_id={}, account_id={}, use_testnet={}, submitter_pool_size={:?}, canceller_pool_size={:?}, http_proxy_url={:?}, ws_proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
323 self.core.client_id,
324 self.core.account_id,
325 self.config.use_testnet,
326 self.config.submitter_pool_size,
327 self.config.canceller_pool_size,
328 self.config.http_proxy_url,
329 self.config.ws_proxy_url,
330 self.config.submitter_proxy_urls,
331 self.config.canceller_proxy_urls,
332 );
333 Ok(())
334 }
335
336 fn stop(&mut self) -> anyhow::Result<()> {
337 if !self.started {
338 return Ok(());
339 }
340
341 self.started = false;
342 self.connected = false;
343 if let Some(handle) = self.ws_stream_handle.take() {
344 handle.abort();
345 }
346 self.abort_pending_tasks();
347 log::info!("BitMEX execution client {} stopped", self.core.client_id);
348 Ok(())
349 }
350
351 async fn connect(&mut self) -> anyhow::Result<()> {
352 if self.connected {
353 return Ok(());
354 }
355
356 self.ensure_instruments_initialized_async().await?;
357
358 self._submitter.start().await?;
359 self._canceller.start().await?;
360
361 self.ws_client.connect().await?;
362 self.ws_client.wait_until_active(10.0).await?;
363
364 self.ws_client.subscribe_orders().await?;
365 self.ws_client.subscribe_executions().await?;
366 self.ws_client.subscribe_positions().await?;
367 self.ws_client.subscribe_wallet().await?;
368 if let Err(e) = self.ws_client.subscribe_margin().await {
369 log::debug!("Margin subscription unavailable: {e:?}");
370 }
371
372 self.start_ws_stream()?;
373 self.refresh_account_state().await?;
374
375 self.connected = true;
376 self.core.set_connected(true);
377 log::info!("Connected: client_id={}", self.core.client_id);
378 Ok(())
379 }
380
381 async fn disconnect(&mut self) -> anyhow::Result<()> {
382 if !self.connected {
383 return Ok(());
384 }
385
386 self.http_client.cancel_all_requests();
387 self._submitter.stop().await;
388 self._canceller.stop().await;
389
390 if let Err(e) = self.ws_client.close().await {
391 log::warn!("Error while closing BitMEX execution websocket: {e:?}");
392 }
393
394 if let Some(handle) = self.ws_stream_handle.take() {
395 handle.abort();
396 }
397
398 self.abort_pending_tasks();
399 self.connected = false;
400 self.core.set_connected(false);
401 log::info!("Disconnected: client_id={}", self.core.client_id);
402 Ok(())
403 }
404
405 async fn generate_order_status_report(
406 &self,
407 cmd: &GenerateOrderStatusReport,
408 ) -> anyhow::Result<Option<OrderStatusReport>> {
409 let instrument_id = cmd
410 .instrument_id
411 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
412
413 self.http_client
414 .query_order(
415 instrument_id,
416 cmd.client_order_id,
417 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
418 )
419 .await
420 .context("failed to query BitMEX order status")
421 }
422
423 async fn generate_order_status_reports(
424 &self,
425 cmd: &GenerateOrderStatusReports,
426 ) -> anyhow::Result<Vec<OrderStatusReport>> {
427 let reports = self
428 .http_client
429 .request_order_status_reports(cmd.instrument_id, cmd.open_only, None)
430 .await
431 .context("failed to request BitMEX order status reports")?;
432 Ok(reports)
433 }
434
435 async fn generate_fill_reports(
436 &self,
437 cmd: GenerateFillReports,
438 ) -> anyhow::Result<Vec<FillReport>> {
439 let mut reports = self
440 .http_client
441 .request_fill_reports(cmd.instrument_id, None)
442 .await
443 .context("failed to request BitMEX fill reports")?;
444
445 if let Some(order_id) = cmd.venue_order_id {
446 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
447 }
448
449 Ok(reports)
450 }
451
452 async fn generate_position_status_reports(
453 &self,
454 cmd: &GeneratePositionStatusReports,
455 ) -> anyhow::Result<Vec<PositionStatusReport>> {
456 let mut reports = self
457 .http_client
458 .request_position_status_reports()
459 .await
460 .context("failed to request BitMEX position reports")?;
461
462 if let Some(instrument_id) = cmd.instrument_id {
463 reports.retain(|report| report.instrument_id == instrument_id);
464 }
465
466 Ok(reports)
467 }
468
469 async fn generate_mass_status(
470 &self,
471 _lookback_mins: Option<u64>,
472 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
473 log::warn!("generate_mass_status not yet implemented for BitMEX execution client");
474 Ok(None)
475 }
476
477 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
478 self.update_account_state()
479 }
480
481 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
482 let http_client = self.http_client.clone();
483 let instrument_id = cmd.instrument_id;
484 let client_order_id = Some(cmd.client_order_id);
485 let venue_order_id = cmd.venue_order_id;
486
487 self.spawn_task("query_order", async move {
488 match http_client
489 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
490 .await
491 {
492 Ok(report) => dispatch_order_status_report(report),
493 Err(e) => log::error!("BitMEX query order failed: {e:?}"),
494 }
495 Ok(())
496 });
497
498 Ok(())
499 }
500
501 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
502 let order = cmd.order.clone();
503
504 if order.is_closed() {
505 log::warn!("Cannot submit closed order {}", order.client_order_id());
506 return Ok(());
507 }
508
509 self.core.generate_order_submitted(
510 order.strategy_id(),
511 order.instrument_id(),
512 order.client_order_id(),
513 cmd.ts_init,
514 );
515
516 let submit_tries = cmd
517 .params
518 .as_ref()
519 .and_then(|params| params.get("submit_tries"))
520 .and_then(|s| s.parse::<usize>().ok())
521 .filter(|&n| n > 0);
522
523 let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
524
525 let http_client = self.http_client.clone();
526 let submitter = self._submitter.clone_for_async();
527 let trader_id = self.core.trader_id;
528 let strategy_id = order.strategy_id();
529 let instrument_id = order.instrument_id();
530 let account_id = self.core.account_id;
531 let client_order_id = order.client_order_id();
532 let order_side = order.order_side();
533 let order_type = order.order_type();
534 let quantity = order.quantity();
535 let time_in_force = order.time_in_force();
536 let price = order.price();
537 let trigger_price = order.trigger_price();
538 let trigger_type = order.trigger_type();
539 let display_qty = order.display_qty();
540 let post_only = order.is_post_only();
541 let reduce_only = order.is_reduce_only();
542 let order_list_id = order.order_list_id();
543 let contingency_type = order.contingency_type();
544 let ts_event = cmd.ts_init;
545
546 self.spawn_task("submit_order", async move {
547 let result = if use_broadcaster {
548 submitter
549 .broadcast_submit(
550 instrument_id,
551 client_order_id,
552 order_side,
553 order_type,
554 quantity,
555 time_in_force,
556 price,
557 trigger_price,
558 trigger_type,
559 display_qty,
560 post_only,
561 reduce_only,
562 order_list_id,
563 contingency_type,
564 submit_tries,
565 )
566 .await
567 } else {
568 http_client
569 .submit_order(
570 instrument_id,
571 client_order_id,
572 order_side,
573 order_type,
574 quantity,
575 time_in_force,
576 price,
577 trigger_price,
578 trigger_type,
579 display_qty,
580 post_only,
581 reduce_only,
582 order_list_id,
583 contingency_type,
584 )
585 .await
586 };
587
588 match result {
589 Ok(report) => dispatch_order_status_report(report),
590 Err(e) => {
591 let event = OrderRejected::new(
592 trader_id,
593 strategy_id,
594 instrument_id,
595 client_order_id,
596 account_id,
597 format!("submit-order-error: {e}").into(),
598 UUID4::new(),
599 ts_event,
600 get_atomic_clock_realtime().get_time_ns(),
601 false,
602 post_only,
603 );
604 dispatch_order_event(OrderEventAny::Rejected(event));
605 }
606 }
607 Ok(())
608 });
609
610 Ok(())
611 }
612
613 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
614 log::warn!(
615 "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
616 cmd.order_list.orders.len()
617 );
618 Ok(())
619 }
620
621 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
622 let http_client = self.http_client.clone();
623 let instrument_id = cmd.instrument_id;
624 let client_order_id = Some(cmd.client_order_id);
625 let venue_order_id = cmd.venue_order_id;
626 let quantity = cmd.quantity;
627 let price = cmd.price;
628 let trigger_price = cmd.trigger_price;
629
630 self.spawn_task("modify_order", async move {
631 match http_client
632 .modify_order(
633 instrument_id,
634 client_order_id,
635 venue_order_id,
636 quantity,
637 price,
638 trigger_price,
639 )
640 .await
641 {
642 Ok(report) => dispatch_order_status_report(report),
643 Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
644 }
645 Ok(())
646 });
647
648 Ok(())
649 }
650
651 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
652 let canceller = self._canceller.clone_for_async();
653 let instrument_id = cmd.instrument_id;
654 let client_order_id = Some(cmd.client_order_id);
655 let venue_order_id = cmd.venue_order_id;
656
657 self.spawn_task("cancel_order", async move {
658 match canceller
659 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
660 .await
661 {
662 Ok(Some(report)) => dispatch_order_status_report(report),
663 Ok(None) => {
664 log::debug!("Order already cancelled: {client_order_id:?}");
666 }
667 Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
668 }
669 Ok(())
670 });
671
672 Ok(())
673 }
674
675 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
676 let canceller = self._canceller.clone_for_async();
677 let instrument_id = cmd.instrument_id;
678 let order_side = Some(cmd.order_side);
679
680 self.spawn_task("cancel_all_orders", async move {
681 match canceller
682 .broadcast_cancel_all(instrument_id, order_side)
683 .await
684 {
685 Ok(reports) => {
686 for report in reports {
687 dispatch_order_status_report(report);
688 }
689 }
690 Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
691 }
692 Ok(())
693 });
694
695 Ok(())
696 }
697
698 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
699 let canceller = self._canceller.clone_for_async();
700 let instrument_id = cmd.instrument_id;
701 let venue_ids: Vec<VenueOrderId> = cmd
702 .cancels
703 .iter()
704 .filter_map(|cancel| cancel.venue_order_id)
705 .collect();
706
707 self.spawn_task("batch_cancel_orders", async move {
708 match canceller
709 .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
710 .await
711 {
712 Ok(reports) => {
713 for report in reports {
714 dispatch_order_status_report(report);
715 }
716 }
717 Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
718 }
719 Ok(())
720 });
721
722 Ok(())
723 }
724}
725
726fn dispatch_ws_message(message: NautilusWsMessage) {
727 match message {
728 NautilusWsMessage::OrderStatusReports(reports) => {
729 for report in reports {
730 dispatch_order_status_report(report);
731 }
732 }
733 NautilusWsMessage::FillReports(reports) => {
734 for report in reports {
735 dispatch_fill_report(report);
736 }
737 }
738 NautilusWsMessage::PositionStatusReport(report) => {
739 dispatch_position_status_report(report);
740 }
741 NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
742 NautilusWsMessage::OrderUpdated(event) => {
743 dispatch_order_event(OrderEventAny::Updated(event));
744 }
745 NautilusWsMessage::Data(_)
746 | NautilusWsMessage::Instruments(_)
747 | NautilusWsMessage::FundingRateUpdates(_) => {
748 log::debug!("Ignoring BitMEX data message on execution stream");
749 }
750 NautilusWsMessage::Reconnected => {
751 log::info!("BitMEX execution websocket reconnected");
752 }
753 NautilusWsMessage::Authenticated => {
754 log::debug!("BitMEX execution websocket authenticated");
755 }
756 }
757}
758
759fn dispatch_account_state(state: AccountState) {
760 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
761}
762
763fn dispatch_order_status_report(report: OrderStatusReport) {
764 let sender = get_exec_event_sender();
765 let exec_report = ExecutionReport::Order(Box::new(report));
766 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
767 log::warn!("Failed to send order status report: {e}");
768 }
769}
770
771fn dispatch_fill_report(report: FillReport) {
772 let sender = get_exec_event_sender();
773 let exec_report = ExecutionReport::Fill(Box::new(report));
774 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
775 log::warn!("Failed to send fill report: {e}");
776 }
777}
778
779fn dispatch_position_status_report(report: PositionStatusReport) {
780 let sender = get_exec_event_sender();
781 let exec_report = ExecutionReport::Position(Box::new(report));
782 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
783 log::warn!("Failed to send position status report: {e}");
784 }
785}
786
787fn dispatch_order_event(event: OrderEventAny) {
788 let sender = get_exec_event_sender();
789 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
790 log::warn!("Failed to send order event: {e}");
791 }
792}