1use std::{
19 future::Future,
20 sync::{
21 Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use anyhow::Context;
27use async_trait::async_trait;
28use chrono::{DateTime, Utc};
29use futures_util::{StreamExt, pin_mut};
30use nautilus_common::{
31 live::{runner::get_exec_event_sender, runtime::get_runtime},
32 messages::{
33 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
34 execution::{
35 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
37 QueryOrder, SubmitOrder, SubmitOrderList,
38 },
39 },
40};
41use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
43use nautilus_live::execution::client::LiveExecutionClient;
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{AccountType, OmsType, OrderType},
47 events::{AccountState, OrderEventAny, OrderRejected, OrderSubmitted},
48 identifiers::{AccountId, ClientId, InstrumentId, Venue},
49 orders::Order,
50 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51 types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54
55use crate::{
56 common::{
57 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
58 enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
59 },
60 config::OKXExecClientConfig,
61 http::client::OKXHttpClient,
62 websocket::{
63 client::OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage},
65 },
66};
67
68#[derive(Debug)]
69pub struct OKXExecutionClient {
70 core: ExecutionClientCore,
71 config: OKXExecClientConfig,
72 http_client: OKXHttpClient,
73 ws_private: OKXWebSocketClient,
74 ws_business: OKXWebSocketClient,
75 trade_mode: OKXTradeMode,
76 exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
77 started: bool,
78 connected: AtomicBool,
79 instruments_initialized: AtomicBool,
80 ws_stream_handle: Option<JoinHandle<()>>,
81 ws_business_stream_handle: Option<JoinHandle<()>>,
82 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
83}
84
85impl OKXExecutionClient {
86 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
92 let http_client = OKXHttpClient::with_credentials(
94 config.api_key.clone(),
95 config.api_secret.clone(),
96 config.api_passphrase.clone(),
97 config.base_url_http.clone(),
98 config.http_timeout_secs,
99 config.max_retries,
100 config.retry_delay_initial_ms,
101 config.retry_delay_max_ms,
102 config.is_demo,
103 config.http_proxy_url.clone(),
104 )?;
105
106 let account_id = core.account_id;
107 let ws_private = OKXWebSocketClient::with_credentials(
108 Some(config.ws_private_url()),
109 config.api_key.clone(),
110 config.api_secret.clone(),
111 config.api_passphrase.clone(),
112 Some(account_id),
113 Some(20), )
115 .context("failed to construct OKX private websocket client")?;
116
117 let ws_business = OKXWebSocketClient::with_credentials(
118 Some(config.ws_business_url()),
119 config.api_key.clone(),
120 config.api_secret.clone(),
121 config.api_passphrase.clone(),
122 Some(account_id),
123 Some(20), )
125 .context("failed to construct OKX business websocket client")?;
126
127 let trade_mode = Self::derive_trade_mode(core.account_type, &config);
128
129 Ok(Self {
130 core,
131 config,
132 http_client,
133 ws_private,
134 ws_business,
135 trade_mode,
136 exec_event_sender: None,
137 started: false,
138 connected: AtomicBool::new(false),
139 instruments_initialized: AtomicBool::new(false),
140 ws_stream_handle: None,
141 ws_business_stream_handle: None,
142 pending_tasks: Mutex::new(Vec::new()),
143 })
144 }
145
146 fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
147 let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
148
149 if account_type == AccountType::Cash {
150 if !config.use_spot_margin {
151 return OKXTradeMode::Cash;
152 }
153 return if is_cross_margin {
154 OKXTradeMode::Cross
155 } else {
156 OKXTradeMode::Isolated
157 };
158 }
159
160 if is_cross_margin {
161 OKXTradeMode::Cross
162 } else {
163 OKXTradeMode::Isolated
164 }
165 }
166
167 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
168 if self.config.instrument_types.is_empty() {
169 vec![OKXInstrumentType::Spot]
170 } else {
171 self.config.instrument_types.clone()
172 }
173 }
174
175 async fn refresh_account_state(&self) -> anyhow::Result<()> {
176 let account_state = self
177 .http_client
178 .request_account_state(self.core.account_id)
179 .await
180 .context("failed to request OKX account state")?;
181
182 self.core.generate_account_state(
183 account_state.balances.clone(),
184 account_state.margins.clone(),
185 account_state.is_reported,
186 account_state.ts_event,
187 )
188 }
189
190 fn update_account_state(&self) -> anyhow::Result<()> {
191 let runtime = get_runtime();
192 runtime.block_on(self.refresh_account_state())
193 }
194
195 fn is_conditional_order(&self, order_type: OrderType) -> bool {
196 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
197 }
198
199 fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
200 let order = cmd.order.clone();
201 let ws_private = self.ws_private.clone();
202 let trade_mode = self.trade_mode;
203
204 self.spawn_task("submit_order", async move {
205 ws_private
206 .submit_order(
207 order.trader_id(),
208 order.strategy_id(),
209 order.instrument_id(),
210 trade_mode,
211 order.client_order_id(),
212 order.order_side(),
213 order.order_type(),
214 order.quantity(),
215 Some(order.time_in_force()),
216 order.price(),
217 order.trigger_price(),
218 Some(order.is_post_only()),
219 Some(order.is_reduce_only()),
220 Some(order.is_quote_quantity()),
221 None,
222 )
223 .await?;
224 Ok(())
225 });
226
227 Ok(())
228 }
229
230 fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
231 let order = cmd.order.clone();
232 let trigger_price = order
233 .trigger_price()
234 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
235 let http_client = self.http_client.clone();
236 let trade_mode = self.trade_mode;
237
238 self.spawn_task("submit_algo_order", async move {
239 http_client
240 .place_algo_order_with_domain_types(
241 order.instrument_id(),
242 trade_mode,
243 order.client_order_id(),
244 order.order_side(),
245 order.order_type(),
246 order.quantity(),
247 trigger_price,
248 order.trigger_type(),
249 order.price(),
250 Some(order.is_reduce_only()),
251 )
252 .await?;
253 Ok(())
254 });
255
256 Ok(())
257 }
258
259 fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
260 let ws_private = self.ws_private.clone();
261 let command = cmd.clone();
262
263 self.spawn_task("cancel_order", async move {
264 ws_private
265 .cancel_order(
266 command.trader_id,
267 command.strategy_id,
268 command.instrument_id,
269 Some(command.client_order_id),
270 Some(command.venue_order_id),
271 )
272 .await?;
273 Ok(())
274 });
275
276 Ok(())
277 }
278
279 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
280 let ws_private = self.ws_private.clone();
281 self.spawn_task("mass_cancel_orders", async move {
282 ws_private.mass_cancel_orders(instrument_id).await?;
283 Ok(())
284 });
285 Ok(())
286 }
287
288 fn spawn_task<F>(&self, description: &'static str, fut: F)
289 where
290 F: Future<Output = anyhow::Result<()>> + Send + 'static,
291 {
292 let runtime = get_runtime();
293 let handle = runtime.spawn(async move {
294 if let Err(e) = fut.await {
295 tracing::warn!("{description} failed: {e:?}");
296 }
297 });
298
299 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
300 tasks.retain(|handle| !handle.is_finished());
301 tasks.push(handle);
302 }
303
304 fn abort_pending_tasks(&self) {
305 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
306 for handle in tasks.drain(..) {
307 handle.abort();
308 }
309 }
310}
311
312#[async_trait(?Send)]
313impl ExecutionClient for OKXExecutionClient {
314 fn is_connected(&self) -> bool {
315 self.connected.load(Ordering::Acquire)
316 }
317
318 fn client_id(&self) -> ClientId {
319 self.core.client_id
320 }
321
322 fn account_id(&self) -> AccountId {
323 self.core.account_id
324 }
325
326 fn venue(&self) -> Venue {
327 *OKX_VENUE
328 }
329
330 fn oms_type(&self) -> OmsType {
331 self.core.oms_type
332 }
333
334 fn get_account(&self) -> Option<AccountAny> {
335 self.core.get_account()
336 }
337
338 async fn connect(&mut self) -> anyhow::Result<()> {
339 if self.connected.load(Ordering::Acquire) {
340 return Ok(());
341 }
342
343 if self.exec_event_sender.is_none() {
345 self.exec_event_sender = Some(get_exec_event_sender());
346 }
347
348 let instrument_types = self.instrument_types();
349
350 if !self.instruments_initialized.load(Ordering::Acquire) {
351 let mut all_instruments = Vec::new();
352 for instrument_type in &instrument_types {
353 let instruments = self
354 .http_client
355 .request_instruments(*instrument_type, None)
356 .await
357 .with_context(|| {
358 format!("failed to request OKX instruments for {instrument_type:?}")
359 })?;
360
361 if instruments.is_empty() {
362 tracing::warn!("No instruments returned for {instrument_type:?}");
363 continue;
364 }
365
366 self.http_client.cache_instruments(instruments.clone());
367 all_instruments.extend(instruments);
368 }
369
370 if !all_instruments.is_empty() {
371 self.ws_private.cache_instruments(all_instruments);
372 }
373 self.instruments_initialized.store(true, Ordering::Release);
374 }
375
376 let Some(sender) = self.exec_event_sender.as_ref() else {
377 tracing::error!("Execution event sender not initialized");
378 anyhow::bail!("Execution event sender not initialized");
379 };
380
381 self.ws_private.connect().await?;
382 self.ws_private.wait_until_active(10.0).await?;
383
384 if self.ws_stream_handle.is_none() {
385 let stream = self.ws_private.stream();
386 let sender = sender.clone();
387 let handle = tokio::spawn(async move {
388 pin_mut!(stream);
389 while let Some(message) = stream.next().await {
390 dispatch_ws_message(message, &sender);
391 }
392 });
393 self.ws_stream_handle = Some(handle);
394 }
395
396 self.ws_business.connect().await?;
397 self.ws_business.wait_until_active(10.0).await?;
398
399 if self.ws_business_stream_handle.is_none() {
400 let stream = self.ws_business.stream();
401 let sender = sender.clone();
402 let handle = tokio::spawn(async move {
403 pin_mut!(stream);
404 while let Some(message) = stream.next().await {
405 dispatch_ws_message(message, &sender);
406 }
407 });
408 self.ws_business_stream_handle = Some(handle);
409 }
410
411 for inst_type in &instrument_types {
412 tracing::debug!(
413 "Subscribing to channels for instrument type: {:?}",
414 inst_type
415 );
416 self.ws_private.subscribe_orders(*inst_type).await?;
417
418 if self.config.use_fills_channel
419 && let Err(e) = self.ws_private.subscribe_fills(*inst_type).await
420 {
421 tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
422 }
423 }
424
425 self.ws_private.subscribe_account().await?;
426
427 for inst_type in &instrument_types {
429 if *inst_type != OKXInstrumentType::Option {
430 self.ws_business.subscribe_orders_algo(*inst_type).await?;
431 }
432 }
433
434 let account_state = self
435 .http_client
436 .request_account_state(self.core.account_id)
437 .await
438 .context("failed to request OKX account state")?;
439
440 dispatch_account_state(account_state, sender);
441
442 self.connected.store(true, Ordering::Release);
443 tracing::info!(client_id = %self.core.client_id, "Connected");
444 Ok(())
445 }
446
447 async fn disconnect(&mut self) -> anyhow::Result<()> {
448 if !self.connected.load(Ordering::Acquire) {
449 return Ok(());
450 }
451
452 self.abort_pending_tasks();
453 self.http_client.cancel_all_requests();
454
455 if let Err(e) = self.ws_private.close().await {
456 tracing::warn!("Error closing private websocket: {e:?}");
457 }
458
459 if let Err(e) = self.ws_business.close().await {
460 tracing::warn!("Error closing business websocket: {e:?}");
461 }
462
463 if let Some(handle) = self.ws_stream_handle.take() {
464 handle.abort();
465 }
466
467 if let Some(handle) = self.ws_business_stream_handle.take() {
468 handle.abort();
469 }
470
471 self.connected.store(false, Ordering::Release);
472 tracing::info!(client_id = %self.core.client_id, "Disconnected");
473 Ok(())
474 }
475
476 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
477 self.update_account_state()
478 }
479
480 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
481 tracing::debug!(
482 "query_order not implemented for OKX execution client (client_order_id={})",
483 cmd.client_order_id
484 );
485 Ok(())
486 }
487
488 fn generate_account_state(
489 &self,
490 balances: Vec<AccountBalance>,
491 margins: Vec<MarginBalance>,
492 reported: bool,
493 ts_event: UnixNanos,
494 ) -> anyhow::Result<()> {
495 self.core
496 .generate_account_state(balances, margins, reported, ts_event)
497 }
498
499 fn start(&mut self) -> anyhow::Result<()> {
500 if self.started {
501 return Ok(());
502 }
503
504 self.started = true;
505
506 let http_client = self.http_client.clone();
508 let ws_private = self.ws_private.clone();
509 let instrument_types = self.config.instrument_types.clone();
510
511 get_runtime().spawn(async move {
512 let mut all_instruments = Vec::new();
513 for instrument_type in instrument_types {
514 match http_client.request_instruments(instrument_type, None).await {
515 Ok(instruments) => {
516 if instruments.is_empty() {
517 tracing::warn!("No instruments returned for {instrument_type:?}");
518 continue;
519 }
520 http_client.cache_instruments(instruments.clone());
521 all_instruments.extend(instruments);
522 }
523 Err(e) => {
524 tracing::error!(
525 "Failed to request instruments for {instrument_type:?}: {e}"
526 );
527 }
528 }
529 }
530
531 if all_instruments.is_empty() {
532 tracing::warn!(
533 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
534 );
535 } else {
536 ws_private.cache_instruments(all_instruments);
537 tracing::info!("Instruments initialized");
538 }
539 });
540
541 tracing::info!(
542 client_id = %self.core.client_id,
543 account_id = %self.core.account_id,
544 account_type = ?self.core.account_type,
545 trade_mode = ?self.trade_mode,
546 instrument_types = ?self.config.instrument_types,
547 use_fills_channel = self.config.use_fills_channel,
548 is_demo = self.config.is_demo,
549 http_proxy_url = ?self.config.http_proxy_url,
550 ws_proxy_url = ?self.config.ws_proxy_url,
551 "Started"
552 );
553 Ok(())
554 }
555
556 fn stop(&mut self) -> anyhow::Result<()> {
557 if !self.started {
558 return Ok(());
559 }
560
561 self.started = false;
562 self.connected.store(false, Ordering::Release);
563 if let Some(handle) = self.ws_stream_handle.take() {
564 handle.abort();
565 }
566 self.abort_pending_tasks();
567 tracing::info!(client_id = %self.core.client_id, "Stopped");
568 Ok(())
569 }
570
571 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
572 let order = &cmd.order;
573
574 if order.is_closed() {
575 let client_order_id = order.client_order_id();
576 tracing::warn!("Cannot submit closed order {client_order_id}");
577 return Ok(());
578 }
579
580 let event = OrderSubmitted::new(
581 self.core.trader_id,
582 order.strategy_id(),
583 order.instrument_id(),
584 order.client_order_id(),
585 self.core.account_id,
586 UUID4::new(),
587 cmd.ts_init,
588 get_atomic_clock_realtime().get_time_ns(),
589 );
590 if let Some(sender) = &self.exec_event_sender {
591 tracing::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
592 if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
593 tracing::warn!("Failed to send OrderSubmitted event: {e}");
594 }
595 } else {
596 tracing::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
597 }
598
599 let result = if self.is_conditional_order(order.order_type()) {
600 self.submit_conditional_order(cmd)
601 } else {
602 self.submit_regular_order(cmd)
603 };
604
605 if let Err(e) = result {
606 let rejected_event = OrderRejected::new(
607 self.core.trader_id,
608 order.strategy_id(),
609 order.instrument_id(),
610 order.client_order_id(),
611 self.core.account_id,
612 format!("submit-order-error: {e}").into(),
613 UUID4::new(),
614 cmd.ts_init,
615 get_atomic_clock_realtime().get_time_ns(),
616 false,
617 false,
618 );
619 if let Some(sender) = &self.exec_event_sender {
620 if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(
621 rejected_event,
622 ))) {
623 tracing::warn!("Failed to send OrderRejected event: {e}");
624 }
625 } else {
626 tracing::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
627 }
628 return Err(e);
629 }
630
631 Ok(())
632 }
633
634 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
635 tracing::warn!(
636 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
637 cmd.order_list.orders.len()
638 );
639 Ok(())
640 }
641
642 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
643 let ws_private = self.ws_private.clone();
644 let command = cmd.clone();
645
646 self.spawn_task("modify_order", async move {
647 ws_private
648 .modify_order(
649 command.trader_id,
650 command.strategy_id,
651 command.instrument_id,
652 Some(command.client_order_id),
653 command.price,
654 command.quantity,
655 Some(command.venue_order_id),
656 )
657 .await?;
658 Ok(())
659 });
660
661 Ok(())
662 }
663
664 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
665 self.cancel_ws_order(cmd)
666 }
667
668 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
669 if self.config.use_mm_mass_cancel {
670 self.mass_cancel_instrument(cmd.instrument_id)
672 } else {
673 let cache = self.core.cache().borrow();
675 let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None);
676
677 if open_orders.is_empty() {
678 tracing::debug!("No open orders to cancel for {}", cmd.instrument_id);
679 return Ok(());
680 }
681
682 let mut payload = Vec::with_capacity(open_orders.len());
683 for order in open_orders {
684 payload.push((
685 order.instrument_id(),
686 Some(order.client_order_id()),
687 order.venue_order_id(),
688 ));
689 }
690 drop(cache);
691
692 tracing::debug!(
693 "Canceling {} open orders for {} via batch cancel",
694 payload.len(),
695 cmd.instrument_id
696 );
697
698 let ws_private = self.ws_private.clone();
699 self.spawn_task("batch_cancel_orders", async move {
700 ws_private.batch_cancel_orders(payload).await?;
701 Ok(())
702 });
703
704 Ok(())
705 }
706 }
707
708 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
709 let mut payload = Vec::with_capacity(cmd.cancels.len());
710
711 for cancel in &cmd.cancels {
712 payload.push((
713 cancel.instrument_id,
714 Some(cancel.client_order_id),
715 Some(cancel.venue_order_id),
716 ));
717 }
718
719 let ws_private = self.ws_private.clone();
720 self.spawn_task("batch_cancel_orders", async move {
721 ws_private.batch_cancel_orders(payload).await?;
722 Ok(())
723 });
724
725 Ok(())
726 }
727}
728
729#[async_trait(?Send)]
730impl LiveExecutionClient for OKXExecutionClient {
731 async fn generate_order_status_report(
732 &self,
733 cmd: &GenerateOrderStatusReport,
734 ) -> anyhow::Result<Option<OrderStatusReport>> {
735 let Some(instrument_id) = cmd.instrument_id else {
736 tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
737 return Ok(None);
738 };
739
740 let mut reports = self
741 .http_client
742 .request_order_status_reports(
743 self.core.account_id,
744 None,
745 Some(instrument_id),
746 None,
747 None,
748 false,
749 None,
750 )
751 .await?;
752
753 if let Some(client_order_id) = cmd.client_order_id {
754 reports.retain(|report| report.client_order_id == Some(client_order_id));
755 }
756
757 if let Some(venue_order_id) = cmd.venue_order_id {
758 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
759 }
760
761 Ok(reports.into_iter().next())
762 }
763
764 async fn generate_order_status_reports(
765 &self,
766 cmd: &GenerateOrderStatusReport,
767 ) -> anyhow::Result<Vec<OrderStatusReport>> {
768 let mut reports = Vec::new();
769
770 if let Some(instrument_id) = cmd.instrument_id {
771 let mut fetched = self
772 .http_client
773 .request_order_status_reports(
774 self.core.account_id,
775 None,
776 Some(instrument_id),
777 None,
778 None,
779 false,
780 None,
781 )
782 .await?;
783 reports.append(&mut fetched);
784 } else {
785 for inst_type in self.instrument_types() {
786 let mut fetched = self
787 .http_client
788 .request_order_status_reports(
789 self.core.account_id,
790 Some(inst_type),
791 None,
792 None,
793 None,
794 false,
795 None,
796 )
797 .await?;
798 reports.append(&mut fetched);
799 }
800 }
801
802 if let Some(client_order_id) = cmd.client_order_id {
803 reports.retain(|report| report.client_order_id == Some(client_order_id));
804 }
805
806 if let Some(venue_order_id) = cmd.venue_order_id {
807 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
808 }
809
810 Ok(reports)
811 }
812
813 async fn generate_fill_reports(
814 &self,
815 cmd: GenerateFillReports,
816 ) -> anyhow::Result<Vec<FillReport>> {
817 let start_dt = nanos_to_datetime(cmd.start);
818 let end_dt = nanos_to_datetime(cmd.end);
819 let mut reports = Vec::new();
820
821 if let Some(instrument_id) = cmd.instrument_id {
822 let mut fetched = self
823 .http_client
824 .request_fill_reports(
825 self.core.account_id,
826 None,
827 Some(instrument_id),
828 start_dt,
829 end_dt,
830 None,
831 )
832 .await?;
833 reports.append(&mut fetched);
834 } else {
835 for inst_type in self.instrument_types() {
836 let mut fetched = self
837 .http_client
838 .request_fill_reports(
839 self.core.account_id,
840 Some(inst_type),
841 None,
842 start_dt,
843 end_dt,
844 None,
845 )
846 .await?;
847 reports.append(&mut fetched);
848 }
849 }
850
851 if let Some(venue_order_id) = cmd.venue_order_id {
852 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
853 }
854
855 Ok(reports)
856 }
857
858 async fn generate_position_status_reports(
859 &self,
860 cmd: &GeneratePositionReports,
861 ) -> anyhow::Result<Vec<PositionStatusReport>> {
862 let mut reports = Vec::new();
863
864 if let Some(instrument_id) = cmd.instrument_id {
866 let mut fetched = self
867 .http_client
868 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
869 .await?;
870 reports.append(&mut fetched);
871 } else {
872 for inst_type in self.instrument_types() {
873 let mut fetched = self
874 .http_client
875 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
876 .await?;
877 reports.append(&mut fetched);
878 }
879 }
880
881 let mut margin_reports = self
884 .http_client
885 .request_spot_margin_position_reports(self.core.account_id)
886 .await?;
887
888 if let Some(instrument_id) = cmd.instrument_id {
889 margin_reports.retain(|report| report.instrument_id == instrument_id);
890 }
891
892 reports.append(&mut margin_reports);
893
894 let _ = nanos_to_datetime(cmd.start);
895 let _ = nanos_to_datetime(cmd.end);
896
897 Ok(reports)
898 }
899
900 async fn generate_mass_status(
901 &self,
902 lookback_mins: Option<u64>,
903 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
904 tracing::warn!(
905 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
906 );
907 Ok(None)
908 }
909}
910
911fn dispatch_ws_message(
912 message: NautilusWsMessage,
913 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
914) {
915 match message {
916 NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state, sender),
917 NautilusWsMessage::PositionUpdate(report) => {
918 dispatch_position_status_report(report, sender);
919 }
920 NautilusWsMessage::ExecutionReports(reports) => {
921 tracing::debug!("Processing {} execution report(s)", reports.len());
922 for report in reports {
923 dispatch_execution_report(report, sender);
924 }
925 }
926 NautilusWsMessage::OrderAccepted(event) => {
927 dispatch_order_event(OrderEventAny::Accepted(event), sender);
928 }
929 NautilusWsMessage::OrderCanceled(event) => {
930 dispatch_order_event(OrderEventAny::Canceled(event), sender);
931 }
932 NautilusWsMessage::OrderExpired(event) => {
933 dispatch_order_event(OrderEventAny::Expired(event), sender);
934 }
935 NautilusWsMessage::OrderRejected(event) => {
936 dispatch_order_event(OrderEventAny::Rejected(event), sender);
937 }
938 NautilusWsMessage::OrderCancelRejected(event) => {
939 dispatch_order_event(OrderEventAny::CancelRejected(event), sender);
940 }
941 NautilusWsMessage::OrderModifyRejected(event) => {
942 dispatch_order_event(OrderEventAny::ModifyRejected(event), sender);
943 }
944 NautilusWsMessage::OrderTriggered(event) => {
945 dispatch_order_event(OrderEventAny::Triggered(event), sender);
946 }
947 NautilusWsMessage::OrderUpdated(event) => {
948 dispatch_order_event(OrderEventAny::Updated(event), sender);
949 }
950 NautilusWsMessage::Error(e) => {
951 tracing::warn!(
952 "Websocket error: code={} message={} conn_id={:?}",
953 e.code,
954 e.message,
955 e.conn_id
956 );
957 }
958 NautilusWsMessage::Reconnected => {
959 tracing::info!("Websocket reconnected");
960 }
961 NautilusWsMessage::Authenticated => {
962 tracing::debug!("Websocket authenticated");
963 }
964 NautilusWsMessage::Deltas(_)
965 | NautilusWsMessage::Raw(_)
966 | NautilusWsMessage::Data(_)
967 | NautilusWsMessage::FundingRates(_)
968 | NautilusWsMessage::Instrument(_) => {
969 tracing::debug!("Ignoring websocket data message");
970 }
971 }
972}
973
974fn dispatch_account_state(
975 state: AccountState,
976 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
977) {
978 if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
979 tracing::warn!("Failed to send account state: {e}");
980 }
981}
982
983fn dispatch_position_status_report(
984 report: PositionStatusReport,
985 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
986) {
987 let exec_report = NautilusExecutionReport::Position(Box::new(report));
988 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
989 tracing::warn!("Failed to send position status report: {e}");
990 }
991}
992
993fn dispatch_execution_report(
994 report: ExecutionReport,
995 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
996) {
997 match report {
998 ExecutionReport::Order(order_report) => {
999 let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1000 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1001 tracing::warn!("Failed to send order status report: {e}");
1002 }
1003 }
1004 ExecutionReport::Fill(fill_report) => {
1005 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1006 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1007 tracing::warn!("Failed to send fill report: {e}");
1008 }
1009 }
1010 }
1011}
1012
1013fn dispatch_order_event(
1014 event: OrderEventAny,
1015 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1016) {
1017 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
1018 tracing::warn!("Failed to send order event: {e}");
1019 }
1020}
1021
1022fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1023 value.map(|nanos| nanos.to_datetime_utc())
1024}
1025
1026#[cfg(test)]
1027mod tests {
1028 use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
1029 use nautilus_core::UnixNanos;
1030 use nautilus_model::identifiers::{
1031 ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
1032 };
1033 use rstest::rstest;
1034
1035 #[rstest]
1036 fn test_batch_cancel_orders_builds_payload() {
1037 let trader_id = TraderId::from("TRADER-001");
1038 let strategy_id = StrategyId::from("STRATEGY-001");
1039 let client_id = ClientId::from("OKX");
1040 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1041 let client_order_id1 = ClientOrderId::new("order1");
1042 let client_order_id2 = ClientOrderId::new("order2");
1043 let venue_order_id1 = VenueOrderId::new("venue1");
1044 let venue_order_id2 = VenueOrderId::new("venue2");
1045
1046 let cmd = BatchCancelOrders {
1047 trader_id,
1048 client_id,
1049 strategy_id,
1050 instrument_id,
1051 cancels: vec![
1052 CancelOrder {
1053 trader_id,
1054 client_id,
1055 strategy_id,
1056 instrument_id,
1057 client_order_id: client_order_id1,
1058 venue_order_id: venue_order_id1,
1059 command_id: Default::default(),
1060 ts_init: UnixNanos::default(),
1061 params: None,
1062 },
1063 CancelOrder {
1064 trader_id,
1065 client_id,
1066 strategy_id,
1067 instrument_id,
1068 client_order_id: client_order_id2,
1069 venue_order_id: venue_order_id2,
1070 command_id: Default::default(),
1071 ts_init: UnixNanos::default(),
1072 params: None,
1073 },
1074 ],
1075 command_id: Default::default(),
1076 ts_init: UnixNanos::default(),
1077 params: None,
1078 };
1079
1080 let mut payload = Vec::with_capacity(cmd.cancels.len());
1082 for cancel in &cmd.cancels {
1083 payload.push((
1084 cancel.instrument_id,
1085 Some(cancel.client_order_id),
1086 Some(cancel.venue_order_id),
1087 ));
1088 }
1089
1090 assert_eq!(payload.len(), 2);
1091 assert_eq!(payload[0].0, instrument_id);
1092 assert_eq!(payload[0].1, Some(client_order_id1));
1093 assert_eq!(payload[0].2, Some(venue_order_id1));
1094 assert_eq!(payload[1].0, instrument_id);
1095 assert_eq!(payload[1].1, Some(client_order_id2));
1096 assert_eq!(payload[1].2, Some(venue_order_id2));
1097 }
1098
1099 #[rstest]
1100 fn test_batch_cancel_orders_with_empty_cancels() {
1101 let cmd = BatchCancelOrders {
1102 trader_id: TraderId::from("TRADER-001"),
1103 client_id: ClientId::from("OKX"),
1104 strategy_id: StrategyId::from("STRATEGY-001"),
1105 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
1106 cancels: vec![],
1107 command_id: Default::default(),
1108 ts_init: UnixNanos::default(),
1109 params: None,
1110 };
1111
1112 let payload: Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)> =
1113 Vec::with_capacity(cmd.cancels.len());
1114 assert_eq!(payload.len(), 0);
1115 }
1116}