1pub mod canceller;
19pub mod submitter;
20
21use std::{any::Any, future::Future, sync::Mutex};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 live::{runner::get_exec_event_sender, runtime::get_runtime},
28 messages::{
29 ExecutionEvent, ExecutionReport,
30 execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
33 QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35 },
36 msgbus,
37};
38use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
39use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
40use nautilus_live::execution::client::LiveExecutionClient;
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::OmsType,
44 events::{AccountState, OrderEventAny, OrderRejected},
45 identifiers::{AccountId, ClientId, Venue, VenueOrderId},
46 instruments::Instrument,
47 orders::Order,
48 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49 types::{AccountBalance, MarginBalance},
50};
51use tokio::task::JoinHandle;
52
53use crate::{
54 config::BitmexExecClientConfig,
55 execution::{
56 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
57 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
58 },
59 http::client::BitmexHttpClient,
60 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
61};
62
63#[derive(Debug)]
64pub struct BitmexExecutionClient {
65 core: ExecutionClientCore,
66 config: BitmexExecClientConfig,
67 http_client: BitmexHttpClient,
68 ws_client: BitmexWebSocketClient,
69 _submitter: SubmitBroadcaster,
70 _canceller: CancelBroadcaster,
71 started: bool,
72 connected: bool,
73 instruments_initialized: bool,
74 ws_stream_handle: Option<JoinHandle<()>>,
75 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
76}
77
78impl BitmexExecutionClient {
79 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
85 if !config.has_api_credentials() {
86 anyhow::bail!("BitMEX execution client requires API key and secret");
87 }
88
89 let http_client = BitmexHttpClient::new(
90 Some(config.http_base_url()),
91 config.api_key.clone(),
92 config.api_secret.clone(),
93 config.use_testnet,
94 config.http_timeout_secs,
95 config.max_retries,
96 config.retry_delay_initial_ms,
97 config.retry_delay_max_ms,
98 config.recv_window_ms,
99 config.max_requests_per_second,
100 config.max_requests_per_minute,
101 config.http_proxy_url.clone(),
102 )
103 .context("failed to construct BitMEX HTTP client")?;
104
105 let account_id = config.account_id.unwrap_or(core.account_id);
106 let ws_client = BitmexWebSocketClient::new(
107 Some(config.ws_url()),
108 config.api_key.clone(),
109 config.api_secret.clone(),
110 Some(account_id),
111 config.heartbeat_interval_secs,
112 )
113 .context("failed to construct BitMEX execution websocket client")?;
114
115 let pool_size = config.submitter_pool_size.unwrap_or(1);
116 let submitter_proxy_urls = match &config.submitter_proxy_urls {
117 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
118 None => vec![config.http_proxy_url.clone(); pool_size],
119 };
120
121 let submitter_config = SubmitBroadcasterConfig {
122 pool_size,
123 api_key: config.api_key.clone(),
124 api_secret: config.api_secret.clone(),
125 base_url: config.base_url_http.clone(),
126 testnet: config.use_testnet,
127 timeout_secs: config.http_timeout_secs,
128 max_retries: config.max_retries,
129 retry_delay_ms: config.retry_delay_initial_ms,
130 retry_delay_max_ms: config.retry_delay_max_ms,
131 recv_window_ms: config.recv_window_ms,
132 max_requests_per_second: config.max_requests_per_second,
133 max_requests_per_minute: config.max_requests_per_minute,
134 proxy_urls: submitter_proxy_urls,
135 ..Default::default()
136 };
137
138 let _submitter = SubmitBroadcaster::new(submitter_config)
139 .context("failed to create SubmitBroadcaster")?;
140
141 let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
142 let canceller_proxy_urls = match &config.canceller_proxy_urls {
143 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
144 None => vec![config.http_proxy_url.clone(); canceller_pool_size],
145 };
146
147 let canceller_config = CancelBroadcasterConfig {
148 pool_size: canceller_pool_size,
149 api_key: config.api_key.clone(),
150 api_secret: config.api_secret.clone(),
151 base_url: config.base_url_http.clone(),
152 testnet: config.use_testnet,
153 timeout_secs: config.http_timeout_secs,
154 max_retries: config.max_retries,
155 retry_delay_ms: config.retry_delay_initial_ms,
156 retry_delay_max_ms: config.retry_delay_max_ms,
157 recv_window_ms: config.recv_window_ms,
158 max_requests_per_second: config.max_requests_per_second,
159 max_requests_per_minute: config.max_requests_per_minute,
160 proxy_urls: canceller_proxy_urls,
161 ..Default::default()
162 };
163
164 let _canceller = CancelBroadcaster::new(canceller_config)
165 .context("failed to create CancelBroadcaster")?;
166
167 Ok(Self {
168 core,
169 config,
170 http_client,
171 ws_client,
172 _submitter,
173 _canceller,
174 started: false,
175 connected: false,
176 instruments_initialized: false,
177 ws_stream_handle: None,
178 pending_tasks: Mutex::new(Vec::new()),
179 })
180 }
181
182 fn spawn_task<F>(&self, label: &'static str, fut: F)
183 where
184 F: Future<Output = anyhow::Result<()>> + Send + 'static,
185 {
186 let handle = tokio::spawn(async move {
187 if let Err(e) = fut.await {
188 tracing::error!("{label}: {e:?}");
189 }
190 });
191
192 self.pending_tasks
193 .lock()
194 .expect("pending task lock poisoned")
195 .push(handle);
196 }
197
198 fn abort_pending_tasks(&self) {
199 let mut guard = self
200 .pending_tasks
201 .lock()
202 .expect("pending task lock poisoned");
203 for handle in guard.drain(..) {
204 handle.abort();
205 }
206 }
207
208 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
209 if self.instruments_initialized {
210 return Ok(());
211 }
212
213 let http = self.http_client.clone();
214 let mut instruments = http
215 .request_instruments(self.config.active_only)
216 .await
217 .context("failed to request BitMEX instruments")?;
218
219 instruments.sort_by_key(|instrument| instrument.id());
220
221 for instrument in &instruments {
222 self.http_client.cache_instrument(instrument.clone());
223 self._submitter.cache_instrument(instrument.clone());
224 self._canceller.cache_instrument(instrument.clone());
225 }
226
227 self.ws_client.cache_instruments(instruments);
228
229 self.instruments_initialized = true;
230 Ok(())
231 }
232
233 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
234 if self.instruments_initialized {
235 return Ok(());
236 }
237
238 let runtime = get_runtime();
239 runtime.block_on(self.ensure_instruments_initialized_async())
240 }
241
242 async fn refresh_account_state(&self) -> anyhow::Result<()> {
243 let account_state = self
244 .http_client
245 .request_account_state(self.core.account_id)
246 .await
247 .context("failed to request BitMEX account state")?;
248
249 dispatch_account_state(account_state);
250 Ok(())
251 }
252
253 fn update_account_state(&self) -> anyhow::Result<()> {
254 let runtime = get_runtime();
255 runtime.block_on(self.refresh_account_state())
256 }
257
258 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
259 if self.ws_stream_handle.is_some() {
260 return Ok(());
261 }
262
263 let stream = self.ws_client.stream();
264
265 let handle = tokio::spawn(async move {
266 pin_mut!(stream);
267 while let Some(message) = stream.next().await {
268 dispatch_ws_message(message);
269 }
270 });
271
272 self.ws_stream_handle = Some(handle);
273 Ok(())
274 }
275}
276
277#[async_trait(?Send)]
278impl ExecutionClient for BitmexExecutionClient {
279 fn is_connected(&self) -> bool {
280 self.connected
281 }
282
283 fn client_id(&self) -> ClientId {
284 self.core.client_id
285 }
286
287 fn account_id(&self) -> AccountId {
288 self.core.account_id
289 }
290
291 fn venue(&self) -> Venue {
292 self.core.venue
293 }
294
295 fn oms_type(&self) -> OmsType {
296 self.core.oms_type
297 }
298
299 fn get_account(&self) -> Option<AccountAny> {
300 self.core.get_account()
301 }
302
303 fn generate_account_state(
304 &self,
305 balances: Vec<AccountBalance>,
306 margins: Vec<MarginBalance>,
307 reported: bool,
308 ts_event: UnixNanos,
309 ) -> anyhow::Result<()> {
310 self.core
311 .generate_account_state(balances, margins, reported, ts_event)
312 }
313
314 fn start(&mut self) -> anyhow::Result<()> {
315 if self.started {
316 return Ok(());
317 }
318
319 self.ensure_instruments_initialized()?;
320 self.started = true;
321 tracing::info!(
322 client_id = %self.core.client_id,
323 account_id = %self.core.account_id,
324 use_testnet = self.config.use_testnet,
325 submitter_pool_size = ?self.config.submitter_pool_size,
326 canceller_pool_size = ?self.config.canceller_pool_size,
327 http_proxy_url = ?self.config.http_proxy_url,
328 ws_proxy_url = ?self.config.ws_proxy_url,
329 submitter_proxy_urls = ?self.config.submitter_proxy_urls,
330 canceller_proxy_urls = ?self.config.canceller_proxy_urls,
331 "BitMEX execution client started"
332 );
333 Ok(())
334 }
335
336 fn stop(&mut self) -> anyhow::Result<()> {
337 if !self.started {
338 return Ok(());
339 }
340
341 self.started = false;
342 self.connected = false;
343 if let Some(handle) = self.ws_stream_handle.take() {
344 handle.abort();
345 }
346 self.abort_pending_tasks();
347 tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
348 Ok(())
349 }
350
351 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
352 let order = cmd.order.clone();
353
354 if order.is_closed() {
355 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
356 return Ok(());
357 }
358
359 self.core.generate_order_submitted(
360 order.strategy_id(),
361 order.instrument_id(),
362 order.client_order_id(),
363 cmd.ts_init,
364 );
365
366 let submit_tries = cmd
367 .params
368 .as_ref()
369 .and_then(|params| params.get("submit_tries"))
370 .and_then(|s| s.parse::<usize>().ok())
371 .filter(|&n| n > 0);
372
373 let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
374
375 let http_client = self.http_client.clone();
376 let submitter = self._submitter.clone_for_async();
377 let trader_id = self.core.trader_id;
378 let strategy_id = order.strategy_id();
379 let instrument_id = order.instrument_id();
380 let account_id = self.core.account_id;
381 let client_order_id = order.client_order_id();
382 let order_side = order.order_side();
383 let order_type = order.order_type();
384 let quantity = order.quantity();
385 let time_in_force = order.time_in_force();
386 let price = order.price();
387 let trigger_price = order.trigger_price();
388 let trigger_type = order.trigger_type();
389 let display_qty = order.display_qty();
390 let post_only = order.is_post_only();
391 let reduce_only = order.is_reduce_only();
392 let order_list_id = order.order_list_id();
393 let contingency_type = order.contingency_type();
394 let ts_event = cmd.ts_init;
395
396 self.spawn_task("submit_order", async move {
397 let result = if use_broadcaster {
398 submitter
399 .broadcast_submit(
400 instrument_id,
401 client_order_id,
402 order_side,
403 order_type,
404 quantity,
405 time_in_force,
406 price,
407 trigger_price,
408 trigger_type,
409 display_qty,
410 post_only,
411 reduce_only,
412 order_list_id,
413 contingency_type,
414 submit_tries,
415 )
416 .await
417 } else {
418 http_client
419 .submit_order(
420 instrument_id,
421 client_order_id,
422 order_side,
423 order_type,
424 quantity,
425 time_in_force,
426 price,
427 trigger_price,
428 trigger_type,
429 display_qty,
430 post_only,
431 reduce_only,
432 order_list_id,
433 contingency_type,
434 )
435 .await
436 };
437
438 match result {
439 Ok(report) => dispatch_order_status_report(report),
440 Err(e) => {
441 let event = OrderRejected::new(
442 trader_id,
443 strategy_id,
444 instrument_id,
445 client_order_id,
446 account_id,
447 format!("submit-order-error: {e}").into(),
448 UUID4::new(),
449 ts_event,
450 get_atomic_clock_realtime().get_time_ns(),
451 false,
452 post_only,
453 );
454 dispatch_order_event(OrderEventAny::Rejected(event));
455 }
456 }
457 Ok(())
458 });
459
460 Ok(())
461 }
462
463 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
464 tracing::warn!(
465 "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
466 cmd.order_list.orders.len()
467 );
468 Ok(())
469 }
470
471 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
472 let http_client = self.http_client.clone();
473 let instrument_id = cmd.instrument_id;
474 let client_order_id = Some(cmd.client_order_id);
475 let venue_order_id = Some(cmd.venue_order_id);
476 let quantity = cmd.quantity;
477 let price = cmd.price;
478 let trigger_price = cmd.trigger_price;
479
480 self.spawn_task("modify_order", async move {
481 match http_client
482 .modify_order(
483 instrument_id,
484 client_order_id,
485 venue_order_id,
486 quantity,
487 price,
488 trigger_price,
489 )
490 .await
491 {
492 Ok(report) => dispatch_order_status_report(report),
493 Err(e) => tracing::error!("BitMEX modify order failed: {e:?}"),
494 }
495 Ok(())
496 });
497
498 Ok(())
499 }
500
501 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
502 let canceller = self._canceller.clone_for_async();
503 let instrument_id = cmd.instrument_id;
504 let client_order_id = Some(cmd.client_order_id);
505 let venue_order_id = Some(cmd.venue_order_id);
506
507 self.spawn_task("cancel_order", async move {
508 match canceller
509 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
510 .await
511 {
512 Ok(Some(report)) => dispatch_order_status_report(report),
513 Ok(None) => {
514 tracing::debug!("Order already cancelled: {:?}", client_order_id);
516 }
517 Err(e) => tracing::error!("BitMEX cancel order failed: {e:?}"),
518 }
519 Ok(())
520 });
521
522 Ok(())
523 }
524
525 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
526 let canceller = self._canceller.clone_for_async();
527 let instrument_id = cmd.instrument_id;
528 let order_side = Some(cmd.order_side);
529
530 self.spawn_task("cancel_all_orders", async move {
531 match canceller
532 .broadcast_cancel_all(instrument_id, order_side)
533 .await
534 {
535 Ok(reports) => {
536 for report in reports {
537 dispatch_order_status_report(report);
538 }
539 }
540 Err(e) => tracing::error!("BitMEX cancel all failed: {e:?}"),
541 }
542 Ok(())
543 });
544
545 Ok(())
546 }
547
548 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
549 let canceller = self._canceller.clone_for_async();
550 let instrument_id = cmd.instrument_id;
551 let venue_ids: Vec<VenueOrderId> = cmd
552 .cancels
553 .iter()
554 .map(|cancel| cancel.venue_order_id)
555 .collect();
556
557 self.spawn_task("batch_cancel_orders", async move {
558 match canceller
559 .broadcast_batch_cancel(instrument_id, None, Some(venue_ids))
560 .await
561 {
562 Ok(reports) => {
563 for report in reports {
564 dispatch_order_status_report(report);
565 }
566 }
567 Err(e) => tracing::error!("BitMEX batch cancel failed: {e:?}"),
568 }
569 Ok(())
570 });
571
572 Ok(())
573 }
574
575 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
576 self.update_account_state()
577 }
578
579 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
580 let http_client = self.http_client.clone();
581 let instrument_id = cmd.instrument_id;
582 let client_order_id = Some(cmd.client_order_id);
583 let venue_order_id = Some(cmd.venue_order_id);
584
585 self.spawn_task("query_order", async move {
586 match http_client
587 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
588 .await
589 {
590 Ok(report) => dispatch_order_status_report(report),
591 Err(e) => tracing::error!("BitMEX query order failed: {e:?}"),
592 }
593 Ok(())
594 });
595
596 Ok(())
597 }
598
599 async fn connect(&mut self) -> anyhow::Result<()> {
600 if self.connected {
601 return Ok(());
602 }
603
604 self.ensure_instruments_initialized_async().await?;
605
606 self._submitter.start().await?;
607 self._canceller.start().await?;
608
609 self.ws_client.connect().await?;
610 self.ws_client.wait_until_active(10.0).await?;
611
612 self.ws_client.subscribe_orders().await?;
613 self.ws_client.subscribe_executions().await?;
614 self.ws_client.subscribe_positions().await?;
615 self.ws_client.subscribe_wallet().await?;
616 if let Err(e) = self.ws_client.subscribe_margin().await {
617 tracing::debug!("Margin subscription unavailable: {e:?}");
618 }
619
620 self.start_ws_stream()?;
621 self.refresh_account_state().await?;
622
623 self.connected = true;
624 self.core.set_connected(true);
625 tracing::info!(client_id = %self.core.client_id, "Connected");
626 Ok(())
627 }
628
629 async fn disconnect(&mut self) -> anyhow::Result<()> {
630 if !self.connected {
631 return Ok(());
632 }
633
634 self.http_client.cancel_all_requests();
635 self._submitter.stop().await;
636 self._canceller.stop().await;
637
638 if let Err(e) = self.ws_client.close().await {
639 tracing::warn!("Error while closing BitMEX execution websocket: {e:?}");
640 }
641
642 if let Some(handle) = self.ws_stream_handle.take() {
643 handle.abort();
644 }
645
646 self.abort_pending_tasks();
647 self.connected = false;
648 self.core.set_connected(false);
649 tracing::info!(client_id = %self.core.client_id, "Disconnected");
650 Ok(())
651 }
652}
653
654#[async_trait(?Send)]
655impl LiveExecutionClient for BitmexExecutionClient {
656 async fn generate_order_status_report(
657 &self,
658 cmd: &GenerateOrderStatusReport,
659 ) -> anyhow::Result<Option<OrderStatusReport>> {
660 let instrument_id = cmd
661 .instrument_id
662 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
663
664 self.http_client
665 .query_order(
666 instrument_id,
667 cmd.client_order_id,
668 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
669 )
670 .await
671 .context("failed to query BitMEX order status")
672 }
673
674 async fn generate_order_status_reports(
675 &self,
676 cmd: &GenerateOrderStatusReport,
677 ) -> anyhow::Result<Vec<OrderStatusReport>> {
678 let reports = self
679 .http_client
680 .request_order_status_reports(cmd.instrument_id, false, None)
681 .await
682 .context("failed to request BitMEX order status reports")?;
683 Ok(reports)
684 }
685
686 async fn generate_fill_reports(
687 &self,
688 cmd: GenerateFillReports,
689 ) -> anyhow::Result<Vec<FillReport>> {
690 let mut reports = self
691 .http_client
692 .request_fill_reports(cmd.instrument_id, None)
693 .await
694 .context("failed to request BitMEX fill reports")?;
695
696 if let Some(order_id) = cmd.venue_order_id {
697 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
698 }
699
700 Ok(reports)
701 }
702
703 async fn generate_position_status_reports(
704 &self,
705 cmd: &GeneratePositionReports,
706 ) -> anyhow::Result<Vec<PositionStatusReport>> {
707 let mut reports = self
708 .http_client
709 .request_position_status_reports()
710 .await
711 .context("failed to request BitMEX position reports")?;
712
713 if let Some(instrument_id) = cmd.instrument_id {
714 reports.retain(|report| report.instrument_id == instrument_id);
715 }
716
717 Ok(reports)
718 }
719
720 async fn generate_mass_status(
721 &self,
722 _lookback_mins: Option<u64>,
723 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
724 tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
725 Ok(None)
726 }
727}
728
729fn dispatch_ws_message(message: NautilusWsMessage) {
730 match message {
731 NautilusWsMessage::OrderStatusReports(reports) => {
732 for report in reports {
733 dispatch_order_status_report(report);
734 }
735 }
736 NautilusWsMessage::FillReports(reports) => {
737 for report in reports {
738 dispatch_fill_report(report);
739 }
740 }
741 NautilusWsMessage::PositionStatusReport(report) => {
742 dispatch_position_status_report(report);
743 }
744 NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
745 NautilusWsMessage::OrderUpdated(event) => {
746 dispatch_order_event(OrderEventAny::Updated(event));
747 }
748 NautilusWsMessage::Data(_)
749 | NautilusWsMessage::Instruments(_)
750 | NautilusWsMessage::FundingRateUpdates(_) => {
751 tracing::debug!("Ignoring BitMEX data message on execution stream");
752 }
753 NautilusWsMessage::Reconnected => {
754 tracing::info!("BitMEX execution websocket reconnected");
755 }
756 NautilusWsMessage::Authenticated => {
757 tracing::debug!("BitMEX execution websocket authenticated");
758 }
759 }
760}
761
762fn dispatch_account_state(state: AccountState) {
763 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
764}
765
766fn dispatch_order_status_report(report: OrderStatusReport) {
767 let sender = get_exec_event_sender();
768 let exec_report = ExecutionReport::OrderStatus(Box::new(report));
769 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
770 tracing::warn!("Failed to send order status report: {e}");
771 }
772}
773
774fn dispatch_fill_report(report: FillReport) {
775 let sender = get_exec_event_sender();
776 let exec_report = ExecutionReport::Fill(Box::new(report));
777 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
778 tracing::warn!("Failed to send fill report: {e}");
779 }
780}
781
782fn dispatch_position_status_report(report: PositionStatusReport) {
783 let sender = get_exec_event_sender();
784 let exec_report = ExecutionReport::Position(Box::new(report));
785 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
786 tracing::warn!("Failed to send position status report: {e}");
787 }
788}
789
790fn dispatch_order_event(event: OrderEventAny) {
791 let sender = get_exec_event_sender();
792 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
793 tracing::warn!("Failed to send order event: {e}");
794 }
795}