1use std::{future::Future, str::FromStr, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::{StreamExt, pin_mut};
23use indexmap::IndexMap;
24use nautilus_common::{
25 clients::ExecutionClient,
26 enums::LogLevel,
27 live::{get_runtime, runner::get_exec_event_sender},
28 messages::execution::{
29 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
30 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
31 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
32 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
33 SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::{AccountType, OmsType, OrderSide},
44 events::OrderEventAny,
45 identifiers::{AccountId, ClientId, ClientOrderId, Venue, VenueOrderId},
46 instruments::{Instrument, InstrumentAny},
47 orders::{Order, OrderAny},
48 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49 types::{AccountBalance, MarginBalance},
50};
51use rust_decimal::prelude::ToPrimitive;
52use tokio::task::JoinHandle;
53
54use crate::{
55 broadcast::{
56 canceller::{CancelBroadcaster, CancelBroadcasterConfig},
57 submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
58 },
59 common::enums::BitmexPegPriceType,
60 config::BitmexExecClientConfig,
61 http::client::BitmexHttpClient,
62 websocket::{client::BitmexWebSocketClient, messages::NautilusWsMessage},
63};
64
/// Execution client for BitMEX.
///
/// Bundles the shared execution-client core with the BitMEX HTTP and WebSocket clients,
/// the submit/cancel broadcasters, and the handles of the background tasks it spawns.
#[derive(Debug)]
pub struct BitmexExecutionClient {
    /// Shared execution client core (identifiers, cache access, lifecycle flags).
    core: ExecutionClientCore,
    /// Real-time atomic clock used to timestamp locally generated events and commands.
    clock: &'static AtomicTime,
    /// Configuration this client was constructed with.
    config: BitmexExecClientConfig,
    /// Emitter used to publish order events, reports and account states.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for queries, report generation and single-transport order requests.
    http_client: BitmexHttpClient,
    /// WebSocket client providing the execution stream.
    ws_client: BitmexWebSocketClient,
    /// Broadcaster used for order submission when more than one submit try is requested.
    _submitter: SubmitBroadcaster,
    /// Broadcaster used for cancel, cancel-all and batch-cancel requests.
    _canceller: CancelBroadcaster,
    /// Handle of the task consuming the WebSocket stream, if it has been started.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of spawned request tasks; aborted on stop and disconnect.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
78
79impl BitmexExecutionClient {
80 fn log_report_receipt(count: usize, report_type: &str, log_level: LogLevel) {
81 let plural = if count == 1 { "" } else { "s" };
82 let message = format!("Received {count} {report_type}{plural}");
83
84 match log_level {
85 LogLevel::Off => {}
86 LogLevel::Trace => log::trace!("{message}"),
87 LogLevel::Debug => log::debug!("{message}"),
88 LogLevel::Info => log::info!("{message}"),
89 LogLevel::Warning => log::warn!("{message}"),
90 LogLevel::Error => log::error!("{message}"),
91 }
92 }
93
94 pub fn new(core: ExecutionClientCore, config: BitmexExecClientConfig) -> anyhow::Result<Self> {
100 if !config.has_api_credentials() {
101 anyhow::bail!("BitMEX execution client requires API key and secret");
102 }
103
104 let trader_id = core.trader_id;
105 let account_id = config.account_id.unwrap_or(core.account_id);
106 let clock = get_atomic_clock_realtime();
107 let emitter =
108 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
109 let http_client = BitmexHttpClient::new(
110 Some(config.http_base_url()),
111 config.api_key.clone(),
112 config.api_secret.clone(),
113 config.use_testnet,
114 config.http_timeout_secs,
115 config.max_retries,
116 config.retry_delay_initial_ms,
117 config.retry_delay_max_ms,
118 config.recv_window_ms,
119 config.max_requests_per_second,
120 config.max_requests_per_minute,
121 config.http_proxy_url.clone(),
122 )
123 .context("failed to construct BitMEX HTTP client")?;
124 let ws_client = BitmexWebSocketClient::new(
125 Some(config.ws_url()),
126 config.api_key.clone(),
127 config.api_secret.clone(),
128 Some(account_id),
129 config.heartbeat_interval_secs,
130 )
131 .context("failed to construct BitMEX execution websocket client")?;
132
133 let pool_size = config.submitter_pool_size.unwrap_or(1);
134 let submitter_proxy_urls = match &config.submitter_proxy_urls {
135 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
136 None => vec![config.http_proxy_url.clone(); pool_size],
137 };
138
139 let submitter_config = SubmitBroadcasterConfig {
140 pool_size,
141 api_key: config.api_key.clone(),
142 api_secret: config.api_secret.clone(),
143 base_url: config.base_url_http.clone(),
144 testnet: config.use_testnet,
145 timeout_secs: config.http_timeout_secs,
146 max_retries: config.max_retries,
147 retry_delay_ms: config.retry_delay_initial_ms,
148 retry_delay_max_ms: config.retry_delay_max_ms,
149 recv_window_ms: config.recv_window_ms,
150 max_requests_per_second: config.max_requests_per_second,
151 max_requests_per_minute: config.max_requests_per_minute,
152 proxy_urls: submitter_proxy_urls,
153 ..Default::default()
154 };
155
156 let _submitter = SubmitBroadcaster::new(submitter_config)
157 .context("failed to create SubmitBroadcaster")?;
158
159 let canceller_pool_size = config.canceller_pool_size.unwrap_or(1);
160 let canceller_proxy_urls = match &config.canceller_proxy_urls {
161 Some(urls) => urls.iter().map(|url| Some(url.clone())).collect(),
162 None => vec![config.http_proxy_url.clone(); canceller_pool_size],
163 };
164
165 let canceller_config = CancelBroadcasterConfig {
166 pool_size: canceller_pool_size,
167 api_key: config.api_key.clone(),
168 api_secret: config.api_secret.clone(),
169 base_url: config.base_url_http.clone(),
170 testnet: config.use_testnet,
171 timeout_secs: config.http_timeout_secs,
172 max_retries: config.max_retries,
173 retry_delay_ms: config.retry_delay_initial_ms,
174 retry_delay_max_ms: config.retry_delay_max_ms,
175 recv_window_ms: config.recv_window_ms,
176 max_requests_per_second: config.max_requests_per_second,
177 max_requests_per_minute: config.max_requests_per_minute,
178 proxy_urls: canceller_proxy_urls,
179 ..Default::default()
180 };
181
182 let _canceller = CancelBroadcaster::new(canceller_config)
183 .context("failed to create CancelBroadcaster")?;
184
185 Ok(Self {
186 core,
187 clock,
188 config,
189 emitter,
190 http_client,
191 ws_client,
192 _submitter,
193 _canceller,
194 ws_stream_handle: None,
195 pending_tasks: Mutex::new(Vec::new()),
196 })
197 }
198
199 fn spawn_task<F>(&self, label: &'static str, fut: F)
200 where
201 F: Future<Output = anyhow::Result<()>> + Send + 'static,
202 {
203 let handle = get_runtime().spawn(async move {
204 if let Err(e) = fut.await {
205 log::error!("{label}: {e:?}");
206 }
207 });
208
209 let mut guard = self
210 .pending_tasks
211 .lock()
212 .expect("pending task lock poisoned");
213
214 guard.retain(|h| !h.is_finished());
216 guard.push(handle);
217 }
218
219 fn abort_pending_tasks(&self) {
220 let mut guard = self
221 .pending_tasks
222 .lock()
223 .expect("pending task lock poisoned");
224 for handle in guard.drain(..) {
225 handle.abort();
226 }
227 }
228
229 async fn ensure_instruments_initialized_async(&mut self) -> anyhow::Result<()> {
230 if self.core.instruments_initialized() {
231 return Ok(());
232 }
233
234 let mut instruments: Vec<InstrumentAny> = {
235 let cache = self.core.cache();
236 cache
237 .instruments(&self.core.venue, None)
238 .into_iter()
239 .cloned()
240 .collect()
241 };
242
243 if instruments.is_empty() {
244 let http = self.http_client.clone();
245 instruments = http
246 .request_instruments(self.config.active_only)
247 .await
248 .context("failed to request BitMEX instruments")?;
249 } else {
250 log::debug!(
251 "Reusing {} cached BitMEX instruments for execution client initialization",
252 instruments.len()
253 );
254 }
255
256 instruments.sort_by_key(|instrument| instrument.id());
257
258 for instrument in &instruments {
259 self.http_client.cache_instrument(instrument.clone());
260 self._submitter.cache_instrument(instrument.clone());
261 self._canceller.cache_instrument(instrument.clone());
262 }
263
264 self.ws_client.cache_instruments(instruments);
265
266 self.core.set_instruments_initialized();
267 Ok(())
268 }
269
270 async fn refresh_account_state(&self) -> anyhow::Result<()> {
271 let account_state = self
272 .http_client
273 .request_account_state(self.core.account_id)
274 .await
275 .context("failed to request BitMEX account state")?;
276
277 self.emitter.send_account_state(account_state);
278 Ok(())
279 }
280
281 fn start_ws_stream(&mut self) -> anyhow::Result<()> {
282 if self.ws_stream_handle.is_some() {
283 return Ok(());
284 }
285
286 let stream = self.ws_client.stream();
287 let emitter = self.emitter.clone();
288
289 let handle = get_runtime().spawn(async move {
290 pin_mut!(stream);
291 while let Some(message) = stream.next().await {
292 dispatch_ws_message(message, &emitter);
293 }
294 });
295
296 self.ws_stream_handle = Some(handle);
297 Ok(())
298 }
299
    /// Submits an order that has already been loaded from the cache.
    ///
    /// A closed order is skipped with a warning. Otherwise an order-submitted event is
    /// emitted immediately and the venue request runs in a spawned task labelled
    /// `task_label`:
    ///
    /// - when `submit_tries` is greater than one, the request goes through the submit
    ///   broadcaster; otherwise it goes through the single HTTP client;
    /// - on success, the returned order status report is sent through the emitter;
    /// - on an error whose message contains `IDEMPOTENT_DUPLICATE`, a warning is logged
    ///   and no rejection is emitted;
    /// - on any other error, an order-rejected event is emitted with the error message.
    ///
    /// The synchronous part of this function always returns `Ok(())`.
    fn submit_cached_order(
        &self,
        order: OrderAny,
        submit_tries: Option<usize>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
        task_label: &'static str,
    ) -> anyhow::Result<()> {
        if order.is_closed() {
            log::warn!("Cannot submit closed order {}", order.client_order_id());
            return Ok(());
        }

        // Emitted before the request is sent, independent of the venue outcome.
        self.emitter.emit_order_submitted(&order);

        // Copy everything the spawned task needs out of the order up front, so the
        // `'static` future does not borrow from `self` or `order`.
        let use_broadcaster = submit_tries.is_some_and(|n| n > 1);
        let http_client = self.http_client.clone();
        let submitter = self._submitter.clone_for_async();
        let emitter = self.emitter.clone();
        let clock = self.clock;
        let strategy_id = order.strategy_id();
        let instrument_id = order.instrument_id();
        let client_order_id = order.client_order_id();
        let order_side = order.order_side();
        let order_type = order.order_type();
        let quantity = order.quantity();
        let time_in_force = order.time_in_force();
        let price = order.price();
        let trigger_price = order.trigger_price();
        let trigger_type = order.trigger_type();
        // A trailing offset that cannot be represented as f64 becomes `None`.
        let trailing_offset = order.trailing_offset().and_then(|d| d.to_f64());
        let trailing_offset_type = order.trailing_offset_type();
        let display_qty = order.display_qty();
        let post_only = order.is_post_only();
        let reduce_only = order.is_reduce_only();
        let order_list_id = order.order_list_id();
        let contingency_type = order.contingency_type();

        self.spawn_task(task_label, async move {
            let result = if use_broadcaster {
                submitter
                    .broadcast_submit(
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        trailing_offset,
                        trailing_offset_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                        submit_tries,
                        peg_price_type,
                        peg_offset_value,
                    )
                    .await
            } else {
                http_client
                    .submit_order(
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        time_in_force,
                        price,
                        trigger_price,
                        trigger_type,
                        trailing_offset,
                        trailing_offset_type,
                        display_qty,
                        post_only,
                        reduce_only,
                        order_list_id,
                        contingency_type,
                        peg_price_type,
                        peg_offset_value,
                    )
                    .await
            };

            match result {
                Ok(report) => emitter.send_order_status_report(report),
                Err(e) => {
                    let error_msg = e.to_string();

                    // The order may already exist at the venue, so do not reject it
                    // locally; the log message defers to WebSocket confirmation.
                    if error_msg.contains("IDEMPOTENT_DUPLICATE") {
                        log::warn!(
                            "Order {client_order_id} may exist (duplicate clOrdID from all transports), \
                             awaiting WebSocket confirmation",
                        );
                        return Ok(());
                    }

                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        &format!("submit-order-error: {error_msg}"),
                        ts_event,
                        post_only,
                    );
                }
            }
            Ok(())
        });

        Ok(())
    }
419}
420
421#[async_trait(?Send)]
422impl ExecutionClient for BitmexExecutionClient {
    /// Returns whether the core currently reports this client as connected.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
426
    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
430
    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
434
    /// Returns the venue held by the core.
    fn venue(&self) -> Venue {
        self.core.venue
    }
438
    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
442
    /// Returns a clone of the cached account for the core's account ID, if one is cached.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
446
    /// Emits an account state built from the given balances and margins via the emitter.
    ///
    /// `reported` and `ts_event` are forwarded to the emitter unchanged. Always returns
    /// `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
458
459 fn start(&mut self) -> anyhow::Result<()> {
460 if self.core.is_started() {
461 return Ok(());
462 }
463
464 self.emitter.set_sender(get_exec_event_sender());
465 self.core.set_started();
466 log::info!(
467 "BitMEX execution client started: client_id={}, account_id={}, use_testnet={}, submitter_pool_size={:?}, canceller_pool_size={:?}, http_proxy_url={:?}, ws_proxy_url={:?}, submitter_proxy_urls={:?}, canceller_proxy_urls={:?}",
468 self.core.client_id,
469 self.core.account_id,
470 self.config.use_testnet,
471 self.config.submitter_pool_size,
472 self.config.canceller_pool_size,
473 self.config.http_proxy_url,
474 self.config.ws_proxy_url,
475 self.config.submitter_proxy_urls,
476 self.config.canceller_proxy_urls,
477 );
478 Ok(())
479 }
480
481 fn stop(&mut self) -> anyhow::Result<()> {
482 if self.core.is_stopped() {
483 return Ok(());
484 }
485
486 self.core.set_stopped();
487 self.core.set_disconnected();
488 if let Some(handle) = self.ws_stream_handle.take() {
489 handle.abort();
490 }
491 self.abort_pending_tasks();
492 log::info!("BitMEX execution client {} stopped", self.core.client_id);
493 Ok(())
494 }
495
    /// Connects the client to BitMEX.
    ///
    /// In order: resets the HTTP cancellation token, ensures instruments are loaded,
    /// connects the WebSocket and waits for it to become active, starts both
    /// broadcasters, subscribes to the execution channels, starts the stream task,
    /// refreshes the account state, and finally marks the core as connected.
    ///
    /// Calling this on an already connected client is a no-op.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any step above, except a failed margin
    /// subscription, which is only logged at debug level.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        // `disconnect` cancels all HTTP requests; presumably the token must be reset
        // before new requests can be issued (confirm against the HTTP client).
        self.http_client.reset_cancellation_token();

        self.ensure_instruments_initialized_async().await?;

        self.ws_client.connect().await?;
        // Wait for the WebSocket to become active (argument presumably a timeout in seconds).
        self.ws_client.wait_until_active(10.0).await?;

        self._submitter.start().await?;
        self._canceller.start().await?;

        self.ws_client.subscribe_orders().await?;
        self.ws_client.subscribe_executions().await?;
        self.ws_client.subscribe_positions().await?;
        self.ws_client.subscribe_wallet().await?;
        // The margin subscription is best-effort: a failure does not abort the connect.
        if let Err(e) = self.ws_client.subscribe_margin().await {
            log::debug!("Margin subscription unavailable: {e:?}");
        }

        self.start_ws_stream()?;
        self.refresh_account_state().await?;

        self.core.set_connected();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
528
    /// Disconnects the client from BitMEX.
    ///
    /// Cancels all HTTP requests, stops both broadcasters, closes the WebSocket, aborts
    /// the stream task and all tracked pending tasks, then marks the core as
    /// disconnected. A failure to close the WebSocket is logged as a warning and does
    /// not abort the teardown.
    ///
    /// Calling this on an already disconnected client is a no-op.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.core.is_disconnected() {
            return Ok(());
        }

        self.http_client.cancel_all_requests();
        self._submitter.stop().await;
        self._canceller.stop().await;

        // Best-effort close: keep tearing down even if the socket fails to close.
        if let Err(e) = self.ws_client.close().await {
            log::warn!("Error while closing BitMEX execution websocket: {e:?}");
        }

        if let Some(handle) = self.ws_stream_handle.take() {
            handle.abort();
        }

        self.abort_pending_tasks();
        self.core.set_disconnected();
        log::info!("Disconnected: client_id={}", self.core.client_id);
        Ok(())
    }
551
552 async fn generate_order_status_report(
553 &self,
554 cmd: &GenerateOrderStatusReport,
555 ) -> anyhow::Result<Option<OrderStatusReport>> {
556 let instrument_id = cmd
557 .instrument_id
558 .context("BitMEX generate_order_status_report requires an instrument identifier")?;
559
560 self.http_client
561 .query_order(
562 instrument_id,
563 cmd.client_order_id,
564 cmd.venue_order_id.map(|id| VenueOrderId::from(id.as_str())),
565 )
566 .await
567 .context("failed to query BitMEX order status")
568 }
569
570 async fn generate_order_status_reports(
571 &self,
572 cmd: &GenerateOrderStatusReports,
573 ) -> anyhow::Result<Vec<OrderStatusReport>> {
574 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
575 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
576
577 let mut reports = self
578 .http_client
579 .request_order_status_reports(cmd.instrument_id, cmd.open_only, start_dt, end_dt, None)
580 .await
581 .context("failed to request BitMEX order status reports")?;
582
583 if let Some(start) = cmd.start {
584 reports.retain(|report| report.ts_last >= start);
585 }
586
587 if let Some(end) = cmd.end {
588 reports.retain(|report| report.ts_last <= end);
589 }
590
591 Self::log_report_receipt(reports.len(), "OrderStatusReport", cmd.log_receipt_level);
592
593 Ok(reports)
594 }
595
596 async fn generate_fill_reports(
597 &self,
598 cmd: GenerateFillReports,
599 ) -> anyhow::Result<Vec<FillReport>> {
600 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
601 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
602
603 let mut reports = self
604 .http_client
605 .request_fill_reports(cmd.instrument_id, start_dt, end_dt, None)
606 .await
607 .context("failed to request BitMEX fill reports")?;
608
609 if let Some(order_id) = cmd.venue_order_id {
610 reports.retain(|report| report.venue_order_id.as_str() == order_id.as_str());
611 }
612
613 if let Some(start) = cmd.start {
614 reports.retain(|report| report.ts_event >= start);
615 }
616
617 if let Some(end) = cmd.end {
618 reports.retain(|report| report.ts_event <= end);
619 }
620
621 Self::log_report_receipt(reports.len(), "FillReport", cmd.log_receipt_level);
622
623 Ok(reports)
624 }
625
626 async fn generate_position_status_reports(
627 &self,
628 cmd: &GeneratePositionStatusReports,
629 ) -> anyhow::Result<Vec<PositionStatusReport>> {
630 let mut reports = self
631 .http_client
632 .request_position_status_reports()
633 .await
634 .context("failed to request BitMEX position reports")?;
635
636 if let Some(instrument_id) = cmd.instrument_id {
637 reports.retain(|report| report.instrument_id == instrument_id);
638 }
639
640 if let Some(start) = cmd.start {
641 reports.retain(|report| report.ts_last >= start);
642 }
643
644 if let Some(end) = cmd.end {
645 reports.retain(|report| report.ts_last <= end);
646 }
647
648 Self::log_report_receipt(reports.len(), "PositionStatusReport", cmd.log_receipt_level);
649
650 Ok(reports)
651 }
652
653 async fn generate_mass_status(
654 &self,
655 lookback_mins: Option<u64>,
656 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
657 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
658
659 let ts_now = self.clock.get_time_ns();
660 let start = lookback_mins.map(|mins| {
661 let lookback_ns = mins.saturating_mul(60).saturating_mul(1_000_000_000);
662 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
663 });
664
665 let order_cmd = GenerateOrderStatusReportsBuilder::default()
666 .ts_init(ts_now)
667 .open_only(false)
668 .start(start)
669 .build()
670 .map_err(|e| anyhow::anyhow!("{e}"))?;
671
672 let fill_cmd = GenerateFillReportsBuilder::default()
673 .ts_init(ts_now)
674 .start(start)
675 .build()
676 .map_err(|e| anyhow::anyhow!("{e}"))?;
677
678 let position_cmd = GeneratePositionStatusReportsBuilder::default()
679 .ts_init(ts_now)
680 .start(start)
681 .build()
682 .map_err(|e| anyhow::anyhow!("{e}"))?;
683
684 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
685 self.generate_order_status_reports(&order_cmd),
686 self.generate_fill_reports(fill_cmd),
687 self.generate_position_status_reports(&position_cmd),
688 )?;
689
690 let mut mass_status = ExecutionMassStatus::new(
691 self.core.client_id,
692 self.core.account_id,
693 self.core.venue,
694 ts_now,
695 None,
696 );
697 mass_status.add_order_reports(order_reports);
698 mass_status.add_fill_reports(fill_reports);
699 mass_status.add_position_reports(position_reports);
700
701 Ok(Some(mass_status))
702 }
703
704 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
705 let http_client = self.http_client.clone();
706 let emitter = self.emitter.clone();
707 let account_id = self.core.account_id;
708
709 self.spawn_task("query_account", async move {
710 match http_client.request_account_state(account_id).await {
711 Ok(account_state) => emitter.send_account_state(account_state),
712 Err(e) => log::error!("BitMEX query account failed: {e:?}"),
713 }
714 Ok(())
715 });
716
717 Ok(())
718 }
719
720 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
721 let http_client = self.http_client.clone();
722 let instrument_id = cmd.instrument_id;
723 let client_order_id = Some(cmd.client_order_id);
724 let venue_order_id = cmd.venue_order_id;
725 let emitter = self.emitter.clone();
726
727 self.spawn_task("query_order", async move {
728 match http_client
729 .request_order_status_report(instrument_id, client_order_id, venue_order_id)
730 .await
731 {
732 Ok(report) => emitter.send_order_status_report(report),
733 Err(e) => log::error!("BitMEX query order failed: {e:?}"),
734 }
735 Ok(())
736 });
737
738 Ok(())
739 }
740
741 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
742 let submit_tries = cmd
743 .params
744 .as_ref()
745 .and_then(|params| params.get("submit_tries"))
746 .and_then(|s| s.parse::<usize>().ok())
747 .filter(|&n| n > 0);
748
749 let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
750 let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
751
752 let order = self
753 .core
754 .cache()
755 .order(&cmd.client_order_id)
756 .cloned()
757 .ok_or_else(|| {
758 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
759 })?;
760
761 self.submit_cached_order(
762 order,
763 submit_tries,
764 peg_price_type,
765 peg_offset_value,
766 "submit_order",
767 )
768 }
769
770 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
771 if cmd.order_list.client_order_ids.is_empty() {
772 log::debug!("submit_order_list called with empty order list");
773 return Ok(());
774 }
775
776 let submit_tries = cmd
777 .params
778 .as_ref()
779 .and_then(|params| params.get("submit_tries"))
780 .and_then(|s| s.parse::<usize>().ok())
781 .filter(|&n| n > 0);
782
783 let peg_price_type = parse_peg_price_type(cmd.params.as_ref())?;
784 let peg_offset_value = parse_peg_offset_value(cmd.params.as_ref())?;
785
786 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
787
788 log::info!(
789 "Submitting BitMEX order list: order_list_id={}, count={}",
790 cmd.order_list.id,
791 orders.len(),
792 );
793
794 for order in orders {
795 self.submit_cached_order(
796 order,
797 submit_tries,
798 peg_price_type,
799 peg_offset_value,
800 "submit_order_list_item",
801 )?;
802 }
803
804 Ok(())
805 }
806
807 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
808 let http_client = self.http_client.clone();
809 let emitter = self.emitter.clone();
810 let instrument_id = cmd.instrument_id;
811 let client_order_id = Some(cmd.client_order_id);
812 let venue_order_id = cmd.venue_order_id;
813 let quantity = cmd.quantity;
814 let price = cmd.price;
815 let trigger_price = cmd.trigger_price;
816
817 self.spawn_task("modify_order", async move {
818 match http_client
819 .modify_order(
820 instrument_id,
821 client_order_id,
822 venue_order_id,
823 quantity,
824 price,
825 trigger_price,
826 )
827 .await
828 {
829 Ok(report) => emitter.send_order_status_report(report),
830 Err(e) => log::error!("BitMEX modify order failed: {e:?}"),
831 }
832 Ok(())
833 });
834
835 Ok(())
836 }
837
838 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
839 let canceller = self._canceller.clone_for_async();
840 let emitter = self.emitter.clone();
841 let instrument_id = cmd.instrument_id;
842 let client_order_id = Some(cmd.client_order_id);
843 let venue_order_id = cmd.venue_order_id;
844
845 self.spawn_task("cancel_order", async move {
846 match canceller
847 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
848 .await
849 {
850 Ok(Some(report)) => emitter.send_order_status_report(report),
851 Ok(None) => {
852 log::debug!("Order already cancelled: {client_order_id:?}");
854 }
855 Err(e) => log::error!("BitMEX cancel order failed: {e:?}"),
856 }
857 Ok(())
858 });
859
860 Ok(())
861 }
862
863 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
864 let canceller = self._canceller.clone_for_async();
865 let emitter = self.emitter.clone();
866 let instrument_id = cmd.instrument_id;
867 let order_side = if cmd.order_side == OrderSide::NoOrderSide {
868 log::debug!(
869 "BitMEX cancel_all_orders received NoOrderSide for {instrument_id}, using unfiltered cancel-all",
870 );
871 None
872 } else {
873 Some(cmd.order_side)
874 };
875
876 self.spawn_task("cancel_all_orders", async move {
877 match canceller
878 .broadcast_cancel_all(instrument_id, order_side)
879 .await
880 {
881 Ok(reports) => {
882 for report in reports {
883 emitter.send_order_status_report(report);
884 }
885 }
886 Err(e) => log::error!("BitMEX cancel all failed: {e:?}"),
887 }
888 Ok(())
889 });
890
891 Ok(())
892 }
893
894 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
895 let canceller = self._canceller.clone_for_async();
896 let emitter = self.emitter.clone();
897 let instrument_id = cmd.instrument_id;
898
899 let client_ids: Vec<ClientOrderId> = cmd
900 .cancels
901 .iter()
902 .map(|cancel| cancel.client_order_id)
903 .collect();
904
905 let venue_ids: Vec<VenueOrderId> = cmd
906 .cancels
907 .iter()
908 .filter_map(|cancel| cancel.venue_order_id)
909 .collect();
910
911 let client_ids_opt = if client_ids.is_empty() {
912 None
913 } else {
914 Some(client_ids)
915 };
916
917 let venue_ids_opt = if venue_ids.is_empty() {
918 None
919 } else {
920 Some(venue_ids)
921 };
922
923 self.spawn_task("batch_cancel_orders", async move {
924 match canceller
925 .broadcast_batch_cancel(instrument_id, client_ids_opt, venue_ids_opt)
926 .await
927 {
928 Ok(reports) => {
929 for report in reports {
930 emitter.send_order_status_report(report);
931 }
932 }
933 Err(e) => log::error!("BitMEX batch cancel failed: {e:?}"),
934 }
935 Ok(())
936 });
937
938 Ok(())
939 }
940}
941
942fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
944 match message {
945 NautilusWsMessage::OrderStatusReports(reports) => {
946 for report in reports {
947 emitter.send_order_status_report(report);
948 }
949 }
950 NautilusWsMessage::FillReports(reports) => {
951 for report in reports {
952 emitter.send_fill_report(report);
953 }
954 }
955 NautilusWsMessage::PositionStatusReports(reports) => {
956 for report in reports {
957 emitter.send_position_report(report);
958 }
959 }
960 NautilusWsMessage::AccountStates(states) => {
961 for state in states {
962 emitter.send_account_state(state);
963 }
964 }
965 NautilusWsMessage::OrderUpdated(event) => {
966 emitter.send_order_event(OrderEventAny::Updated(*event));
967 }
968 NautilusWsMessage::OrderUpdates(events) => {
969 for event in events {
970 emitter.send_order_event(OrderEventAny::Updated(event));
971 }
972 }
973 NautilusWsMessage::Data(_)
974 | NautilusWsMessage::Instruments(_)
975 | NautilusWsMessage::FundingRateUpdates(_) => {
976 log::debug!("Ignoring BitMEX data message on execution stream");
977 }
978 NautilusWsMessage::Reconnected => {
979 log::info!("BitMEX execution websocket reconnected");
980 }
981 NautilusWsMessage::Authenticated => {
982 log::debug!("BitMEX execution websocket authenticated");
983 }
984 }
985}
986
987fn parse_peg_price_type(
988 params: Option<&IndexMap<String, String>>,
989) -> anyhow::Result<Option<BitmexPegPriceType>> {
990 let value = params.and_then(|p| p.get("peg_price_type"));
991 match value {
992 Some(s) => BitmexPegPriceType::from_str(s)
993 .map(Some)
994 .map_err(|_| anyhow::anyhow!("Invalid peg_price_type: {s}")),
995 None => Ok(None),
996 }
997}
998
999fn parse_peg_offset_value(
1000 params: Option<&IndexMap<String, String>>,
1001) -> anyhow::Result<Option<f64>> {
1002 let value = params.and_then(|p| p.get("peg_offset_value"));
1003 match value {
1004 Some(s) => s
1005 .parse::<f64>()
1006 .map(Some)
1007 .map_err(|_| anyhow::anyhow!("Invalid peg_offset_value: {s}")),
1008 None => Ok(None),
1009 }
1010}