1pub mod canceller;
19
20use std::{any::Any, cell::Ref, future::Future, sync::Mutex};
21
22use anyhow::Context;
23use async_trait::async_trait;
24use futures_util::{StreamExt, pin_mut};
25use nautilus_common::{
26 clock::Clock,
27 messages::{
28 ExecutionEvent,
29 execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
32 QueryOrder, SubmitOrder, SubmitOrderList,
33 },
34 },
35 msgbus,
36 runner::get_exec_event_sender,
37 runtime::get_runtime,
38};
39use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
40use nautilus_execution::client::{ExecutionClient, LiveExecutionClient, base::ExecutionClientCore};
41use nautilus_live::execution::LiveExecutionClientExt;
42use nautilus_model::{
43 events::{AccountState, OrderEventAny, OrderRejected},
44 identifiers::{AccountId, VenueOrderId},
45 instruments::Instrument,
46 orders::Order,
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52 config::BitmexExecClientConfig,
53 http::client::BitmexHttpClient,
54 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
55};
56
/// Live execution client for BitMEX.
///
/// Combines an HTTP client (order entry, reports, account state) with a websocket
/// client (order, execution, position, wallet and margin subscriptions) and tracks
/// the background tasks it spawns so they can be aborted on `stop`/`disconnect`.
57#[derive(Debug)]
58pub struct BitmexExecutionClient {
    /// Shared execution-client core: supplies the client/account/trader identifiers,
    /// venue, OMS type, account lookup, clock and event generation used below.
59 core: ExecutionClientCore,
    /// Adapter configuration the HTTP and websocket clients are built from.
60 config: BitmexExecClientConfig,
    /// HTTP client used for order commands, report requests and account state.
61 http_client: BitmexHttpClient,
    /// Websocket client whose stream is forwarded to `dispatch_ws_message`.
62 ws_client: BitmexWebSocketClient,
    /// Set by `start`, cleared by `stop`.
63 started: bool,
    /// Set at the end of a successful `connect`, cleared by `stop` and `disconnect`.
64 connected: bool,
    /// Set once instruments have been loaded into both the HTTP and websocket clients.
65 instruments_initialized: bool,
    /// Handle of the task draining the websocket stream; `None` while no stream is running.
66 ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks created by `spawn_task`; drained and aborted by `abort_pending_tasks`.
67 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
68}
69
70impl BitmexExecutionClient {
71 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
77 if !config.has_api_credentials() {
78 anyhow::bail!("BitMEX execution client requires API key and secret");
79 }
80
81 let http_client = BitmexHttpClient::new(
82 Some(config.http_base_url()),
83 config.api_key.clone(),
84 config.api_secret.clone(),
85 config.use_testnet,
86 config.http_timeout_secs,
87 config.max_retries,
88 config.retry_delay_initial_ms,
89 config.retry_delay_max_ms,
90 config.recv_window_ms,
91 config.max_requests_per_second,
92 config.max_requests_per_minute,
93 )
94 .context("failed to construct BitMEX HTTP client")?;
95
96 let account_id = config.account_id.unwrap_or(core.account_id);
97 let ws_client = BitmexWebSocketClient::new(
98 Some(config.ws_url()),
99 config.api_key.clone(),
100 config.api_secret.clone(),
101 Some(account_id),
102 config.heartbeat_interval_secs,
103 )
104 .context("failed to construct BitMEX execution websocket client")?;
105
106 Ok(Self {
107 core,
108 config,
109 http_client,
110 ws_client,
111 started: false,
112 connected: false,
113 instruments_initialized: false,
114 ws_stream_handle: None,
115 pending_tasks: Mutex::new(Vec::new()),
116 })
117 }
118
119 fn spawn_task<F>(&self, label: &'static str, fut: F)
120 where
121 F: Future<Output = anyhow::Result<()>> + Send + 'static,
122 {
123 let handle = tokio::spawn(async move {
124 if let Err(err) = fut.await {
125 tracing::error!("{label}: {err:?}");
126 }
127 });
128
129 self.pending_tasks
130 .lock()
131 .expect("pending task lock poisoned")
132 .push(handle);
133 }
134
135 fn abort_pending_tasks(&self) {
136 let mut guard = self
137 .pending_tasks
138 .lock()
139 .expect("pending task lock poisoned");
140 for handle in guard.drain(..) {
141 handle.abort();
142 }
143 }
144
145 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
146 if self.instruments_initialized {
147 return Ok(());
148 }
149
150 let http = self.http_client.clone();
151 let mut instruments = http
152 .request_instruments(self.config.active_only)
153 .await
154 .context("failed to request BitMEX instruments")?;
155
156 instruments.sort_by_key(|instrument| instrument.id());
157
158 for instrument in &instruments {
159 self.http_client.add_instrument(instrument.clone());
160 }
161
162 self.ws_client.initialize_instruments_cache(instruments);
163
164 self.instruments_initialized = true;
165 Ok(())
166 }
167
168 fn ensure_instruments_initialized(&mut self) -> anyhow::Result<()> {
169 if self.instruments_initialized {
170 return Ok(());
171 }
172
173 let runtime = get_runtime();
174 runtime.block_on(self.ensure_instruments_initialized_async())
175 }
176
177 async fn refresh_account_state(&self) -> anyhow::Result<()> {
178 let account_state = self
179 .http_client
180 .request_account_state(self.core.account_id)
181 .await
182 .context("failed to request BitMEX account state")?;
183
184 dispatch_account_state(account_state);
185 Ok(())
186 }
187
188 fn update_account_state(&self) -> anyhow::Result<()> {
189 let runtime = get_runtime();
190 runtime.block_on(self.refresh_account_state())
191 }
192
193 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
194 if self.ws_stream_handle.is_some() {
195 return Ok(());
196 }
197
198 let stream = self.ws_client.stream();
199 let handle = tokio::spawn(async move {
200 pin_mut!(stream);
201 while let Some(message) = stream.next().await {
202 dispatch_ws_message(message);
203 }
204 });
205
206 self.ws_stream_handle = Some(handle);
207 Ok(())
208 }
209}
210
211impl ExecutionClient for BitmexExecutionClient {
212 fn is_connected(&self) -> bool {
213 self.connected
214 }
215
216 fn client_id(&self) -> nautilus_model::identifiers::ClientId {
217 self.core.client_id
218 }
219
220 fn account_id(&self) -> AccountId {
221 self.core.account_id
222 }
223
224 fn venue(&self) -> nautilus_model::identifiers::Venue {
225 self.core.venue
226 }
227
228 fn oms_type(&self) -> nautilus_model::enums::OmsType {
229 self.core.oms_type
230 }
231
232 fn get_account(&self) -> Option<nautilus_model::accounts::AccountAny> {
233 self.core.get_account()
234 }
235
236 fn generate_account_state(
237 &self,
238 balances: Vec<nautilus_model::types::AccountBalance>,
239 margins: Vec<nautilus_model::types::MarginBalance>,
240 reported: bool,
241 ts_event: UnixNanos,
242 ) -> anyhow::Result<()> {
243 self.core
244 .generate_account_state(balances, margins, reported, ts_event)
245 }
246
247 fn start(&mut self) -> anyhow::Result<()> {
248 if self.started {
249 return Ok(());
250 }
251
252 self.ensure_instruments_initialized()?;
253 self.started = true;
254 tracing::info!("BitMEX execution client {} started", self.core.client_id);
255 Ok(())
256 }
257
258 fn stop(&mut self) -> anyhow::Result<()> {
259 if !self.started {
260 return Ok(());
261 }
262
263 self.started = false;
264 self.connected = false;
265 if let Some(handle) = self.ws_stream_handle.take() {
266 handle.abort();
267 }
268 self.abort_pending_tasks();
269 tracing::info!("BitMEX execution client {} stopped", self.core.client_id);
270 Ok(())
271 }
272
273 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
274 let order = cmd.order.clone();
275
276 if order.is_closed() {
277 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
278 return Ok(());
279 }
280
281 self.core.generate_order_submitted(
282 order.strategy_id(),
283 order.instrument_id(),
284 order.client_order_id(),
285 cmd.ts_init,
286 );
287
288 let http_client = self.http_client.clone();
289 let trader_id = self.core.trader_id;
290 let strategy_id = order.strategy_id();
291 let instrument_id = order.instrument_id();
292 let account_id = self.core.account_id;
293 let client_order_id = order.client_order_id();
294 let order_side = order.order_side();
295 let order_type = order.order_type();
296 let quantity = order.quantity();
297 let time_in_force = order.time_in_force();
298 let price = order.price();
299 let trigger_price = order.trigger_price();
300 let trigger_type = order.trigger_type();
301 let display_qty = order.display_qty();
302 let post_only = order.is_post_only();
303 let reduce_only = order.is_reduce_only();
304 let order_list_id = order.order_list_id();
305 let contingency_type = order.contingency_type();
306 let ts_event = cmd.ts_init;
307
308 self.spawn_task("submit_order", async move {
309 match http_client
310 .submit_order(
311 instrument_id,
312 client_order_id,
313 order_side,
314 order_type,
315 quantity,
316 time_in_force,
317 price,
318 trigger_price,
319 trigger_type,
320 display_qty,
321 post_only,
322 reduce_only,
323 order_list_id,
324 contingency_type,
325 )
326 .await
327 {
328 Ok(report) => dispatch_order_status_report(report),
329 Err(err) => {
330 let event = OrderRejected::new(
331 trader_id,
332 strategy_id,
333 instrument_id,
334 client_order_id,
335 account_id,
336 format!("submit-order-error: {err}").into(),
337 UUID4::new(),
338 ts_event,
339 get_atomic_clock_realtime().get_time_ns(),
340 false,
341 post_only,
342 );
343 dispatch_order_event(OrderEventAny::Rejected(event));
344 }
345 }
346 Ok(())
347 });
348
349 Ok(())
350 }
351
352 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
353 tracing::warn!(
354 "submit_order_list not yet implemented for BitMEX execution client ({} orders)",
355 cmd.order_list.orders.len()
356 );
357 Ok(())
358 }
359
360 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
361 let http_client = self.http_client.clone();
362 let instrument_id = cmd.instrument_id;
363 let client_order_id = Some(cmd.client_order_id);
364 let venue_order_id = Some(cmd.venue_order_id);
365 let quantity = cmd.quantity;
366 let price = cmd.price;
367 let trigger_price = cmd.trigger_price;
368
369 self.spawn_task("modify_order", async move {
370 match http_client
371 .modify_order(
372 instrument_id,
373 client_order_id,
374 venue_order_id,
375 quantity,
376 price,
377 trigger_price,
378 )
379 .await
380 {
381 Ok(report) => dispatch_order_status_report(report),
382 Err(err) => tracing::error!("BitMEX modify order failed: {err:?}"),
383 }
384 Ok(())
385 });
386
387 Ok(())
388 }
389
390 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
391 let http_client = self.http_client.clone();
392 let instrument_id = cmd.instrument_id;
393 let client_order_id = Some(cmd.client_order_id);
394 let venue_order_id = Some(cmd.venue_order_id);
395
396 self.spawn_task("cancel_order", async move {
397 match http_client
398 .cancel_order(instrument_id, client_order_id, venue_order_id)
399 .await
400 {
401 Ok(report) => dispatch_order_status_report(report),
402 Err(err) => tracing::error!("BitMEX cancel order failed: {err:?}"),
403 }
404 Ok(())
405 });
406
407 Ok(())
408 }
409
410 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
411 let http_client = self.http_client.clone();
412 let instrument_id = cmd.instrument_id;
413 let order_side = Some(cmd.order_side);
414
415 self.spawn_task("cancel_all_orders", async move {
416 match http_client
417 .cancel_all_orders(instrument_id, order_side)
418 .await
419 {
420 Ok(reports) => {
421 for report in reports {
422 dispatch_order_status_report(report);
423 }
424 }
425 Err(err) => tracing::error!("BitMEX cancel all failed: {err:?}"),
426 }
427 Ok(())
428 });
429
430 Ok(())
431 }
432
433 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
434 let http_client = self.http_client.clone();
435 let instrument_id = cmd.instrument_id;
436 let venue_ids: Vec<VenueOrderId> = cmd
437 .cancels
438 .iter()
439 .map(|cancel| cancel.venue_order_id)
440 .collect();
441
442 self.spawn_task("batch_cancel_orders", async move {
443 match http_client
444 .cancel_orders(instrument_id, None, Some(venue_ids))
445 .await
446 {
447 Ok(reports) => {
448 for report in reports {
449 dispatch_order_status_report(report);
450 }
451 }
452 Err(err) => tracing::error!("BitMEX batch cancel failed: {err:?}"),
453 }
454 Ok(())
455 });
456
457 Ok(())
458 }
459
460 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
461 self.update_account_state()
462 }
463
464 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
465 let http_client = self.http_client.clone();
466 let instrument_id = cmd.instrument_id;
467 let client_order_id = Some(cmd.client_order_id);
468 let venue_order_id = Some(cmd.venue_order_id);
469
470 self.spawn_task("query_order", async move {
471 match http_client
472 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
473 .await
474 {
475 Ok(report) => dispatch_order_status_report(report),
476 Err(err) => tracing::error!("BitMEX query order failed: {err:?}"),
477 }
478 Ok(())
479 });
480
481 Ok(())
482 }
483}
484
485#[async_trait(?Send)]
486impl LiveExecutionClient for BitmexExecutionClient {
487 async fn connect(&mut self) -> anyhow::Result<()> {
488 if self.connected {
489 return Ok(());
490 }
491
492 self.ensure_instruments_initialized_async().await?;
493
494 self.ws_client.connect().await?;
495 self.ws_client.wait_until_active(10.0).await?;
496
497 self.ws_client.subscribe_orders().await?;
498 self.ws_client.subscribe_executions().await?;
499 self.ws_client.subscribe_positions().await?;
500 self.ws_client.subscribe_wallet().await?;
501 if let Err(err) = self.ws_client.subscribe_margin().await {
502 tracing::debug!("Margin subscription unavailable: {err:?}");
503 }
504
505 self.start_ws_stream()?;
506 self.refresh_account_state().await?;
507
508 self.connected = true;
509 self.core.set_connected(true);
510 tracing::info!("BitMEX execution client {} connected", self.core.client_id);
511 Ok(())
512 }
513
514 async fn disconnect(&mut self) -> anyhow::Result<()> {
515 if !self.connected {
516 return Ok(());
517 }
518
519 self.http_client.cancel_all_requests();
520 if let Err(err) = self.ws_client.close().await {
521 tracing::warn!("Error while closing BitMEX execution websocket: {err:?}");
522 }
523
524 if let Some(handle) = self.ws_stream_handle.take() {
525 handle.abort();
526 }
527
528 self.abort_pending_tasks();
529 self.connected = false;
530 self.core.set_connected(false);
531 tracing::info!(
532 "BitMEX execution client {} disconnected",
533 self.core.client_id
534 );
535 Ok(())
536 }
537
538 async fn generate_order_status_report(
539 &self,
540 cmd: &GenerateOrderStatusReport,
541 ) -> anyhow::Result<Option<OrderStatusReport>> {
542 let instrument_id = cmd
543 .instrument_id
544 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
545
546 self.http_client
547 .query_order(
548 instrument_id,
549 cmd.client_order_id,
550 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
551 )
552 .await
553 .context("failed to query BitMEX order status")
554 }
555
556 async fn generate_order_status_reports(
557 &self,
558 cmd: &GenerateOrderStatusReport,
559 ) -> anyhow::Result<Vec<OrderStatusReport>> {
560 let reports = self
561 .http_client
562 .request_order_status_reports(cmd.instrument_id, false, None)
563 .await
564 .context("failed to request BitMEX order status reports")?;
565 Ok(reports)
566 }
567
568 async fn generate_fill_reports(
569 &self,
570 cmd: GenerateFillReports,
571 ) -> anyhow::Result<Vec<FillReport>> {
572 let mut reports = self
573 .http_client
574 .request_fill_reports(cmd.instrument_id, None)
575 .await
576 .context("failed to request BitMEX fill reports")?;
577
578 if let Some(order_id) = cmd.venue_order_id {
579 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
580 }
581
582 Ok(reports)
583 }
584
585 async fn generate_position_status_reports(
586 &self,
587 cmd: &GeneratePositionReports,
588 ) -> anyhow::Result<Vec<PositionStatusReport>> {
589 let mut reports = self
590 .http_client
591 .request_position_status_reports()
592 .await
593 .context("failed to request BitMEX position reports")?;
594
595 if let Some(instrument_id) = cmd.instrument_id {
596 reports.retain(|report| report.instrument_id == instrument_id);
597 }
598
599 Ok(reports)
600 }
601
602 async fn generate_mass_status(
603 &self,
604 _lookback_mins: Option<u64>,
605 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
606 tracing::warn!("generate_mass_status not yet implemented for BitMEX execution client");
607 Ok(None)
608 }
609}
610
611fn dispatch_ws_message(message: NautilusWsMessage) {
612 match message {
613 NautilusWsMessage::OrderStatusReports(reports) => {
614 for report in reports {
615 dispatch_order_status_report(report);
616 }
617 }
618 NautilusWsMessage::FillReports(reports) => {
619 for report in reports {
620 dispatch_fill_report(report);
621 }
622 }
623 NautilusWsMessage::PositionStatusReport(report) => {
624 dispatch_position_status_report(report);
625 }
626 NautilusWsMessage::AccountState(state) => dispatch_account_state(state),
627 NautilusWsMessage::OrderUpdated(event) => {
628 dispatch_order_event(OrderEventAny::Updated(event));
629 }
630 NautilusWsMessage::Data(_) | NautilusWsMessage::FundingRateUpdates(_) => {
631 tracing::debug!("Ignoring BitMEX data message on execution stream");
632 }
633 NautilusWsMessage::Reconnected => {
634 tracing::info!("BitMEX execution websocket reconnected");
635 }
636 }
637}
638
639fn dispatch_account_state(state: AccountState) {
640 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
641}
642
643fn dispatch_order_status_report(report: OrderStatusReport) {
644 let sender = get_exec_event_sender();
645 let exec_report = nautilus_common::messages::ExecutionReport::OrderStatus(Box::new(report));
646 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
647 tracing::warn!("Failed to send order status report: {e}");
648 }
649}
650
651fn dispatch_fill_report(report: FillReport) {
652 let sender = get_exec_event_sender();
653 let exec_report = nautilus_common::messages::ExecutionReport::Fill(Box::new(report));
654 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
655 tracing::warn!("Failed to send fill report: {e}");
656 }
657}
658
659fn dispatch_position_status_report(report: PositionStatusReport) {
660 let sender = get_exec_event_sender();
661 let exec_report = nautilus_common::messages::ExecutionReport::Position(Box::new(report));
662 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
663 tracing::warn!("Failed to send position status report: {e}");
664 }
665}
666
667fn dispatch_order_event(event: OrderEventAny) {
668 let sender = get_exec_event_sender();
669 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
670 tracing::warn!("Failed to send order event: {e}");
671 }
672}
673
/// Exposes the execution event channel and the core's clock through
/// `LiveExecutionClientExt`.
674impl LiveExecutionClientExt for BitmexExecutionClient {
    /// Returns the sender obtained from `get_exec_event_sender()`, the same channel the
    /// `dispatch_*` helpers in this file send on.
675 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<ExecutionEvent> {
676 get_exec_event_sender()
677 }
678
    /// Returns a shared borrow of the clock held by the core.
    ///
    /// NOTE(review): the `Ref` return type suggests a `RefCell`-backed clock, in which
    /// case this panics while the clock is mutably borrowed — confirm.
679 fn get_clock(&self) -> Ref<'_, dyn Clock> {
680 self.core.clock().borrow()
681 }
682}