1use std::{
19 future::Future,
20 sync::{
21 Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::{Duration, Instant},
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use chrono::{DateTime, Utc};
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32 live::{runner::get_exec_event_sender, runtime::get_runtime},
33 messages::{
34 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
35 execution::{
36 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
37 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
38 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
39 },
40 },
41};
42use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
43use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{AccountType, OmsType, OrderType},
47 events::{AccountState, OrderEventAny, OrderRejected, OrderSubmitted},
48 identifiers::{AccountId, ClientId, InstrumentId, Venue},
49 orders::Order,
50 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51 types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54
55use crate::{
56 common::{
57 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
58 enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
59 },
60 config::OKXExecClientConfig,
61 http::client::OKXHttpClient,
62 websocket::{
63 client::OKXWebSocketClient,
64 messages::{ExecutionReport, NautilusWsMessage},
65 },
66};
67
68#[derive(Debug)]
69pub struct OKXExecutionClient {
70 core: ExecutionClientCore,
71 config: OKXExecClientConfig,
72 http_client: OKXHttpClient,
73 ws_private: OKXWebSocketClient,
74 ws_business: OKXWebSocketClient,
75 trade_mode: OKXTradeMode,
76 exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
77 started: bool,
78 connected: AtomicBool,
79 instruments_initialized: AtomicBool,
80 ws_stream_handle: Option<JoinHandle<()>>,
81 ws_business_stream_handle: Option<JoinHandle<()>>,
82 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
83}
84
85impl OKXExecutionClient {
86 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
92 let http_client = OKXHttpClient::with_credentials(
94 config.api_key.clone(),
95 config.api_secret.clone(),
96 config.api_passphrase.clone(),
97 config.base_url_http.clone(),
98 config.http_timeout_secs,
99 config.max_retries,
100 config.retry_delay_initial_ms,
101 config.retry_delay_max_ms,
102 config.is_demo,
103 config.http_proxy_url.clone(),
104 )?;
105
106 let account_id = core.account_id;
107 let ws_private = OKXWebSocketClient::with_credentials(
108 Some(config.ws_private_url()),
109 config.api_key.clone(),
110 config.api_secret.clone(),
111 config.api_passphrase.clone(),
112 Some(account_id),
113 Some(20), )
115 .context("failed to construct OKX private websocket client")?;
116
117 let ws_business = OKXWebSocketClient::with_credentials(
118 Some(config.ws_business_url()),
119 config.api_key.clone(),
120 config.api_secret.clone(),
121 config.api_passphrase.clone(),
122 Some(account_id),
123 Some(20), )
125 .context("failed to construct OKX business websocket client")?;
126
127 let trade_mode = Self::derive_trade_mode(core.account_type, &config);
128
129 Ok(Self {
130 core,
131 config,
132 http_client,
133 ws_private,
134 ws_business,
135 trade_mode,
136 exec_event_sender: None,
137 started: false,
138 connected: AtomicBool::new(false),
139 instruments_initialized: AtomicBool::new(false),
140 ws_stream_handle: None,
141 ws_business_stream_handle: None,
142 pending_tasks: Mutex::new(Vec::new()),
143 })
144 }
145
146 fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
147 let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
148
149 if account_type == AccountType::Cash {
150 if !config.use_spot_margin {
151 return OKXTradeMode::Cash;
152 }
153 return if is_cross_margin {
154 OKXTradeMode::Cross
155 } else {
156 OKXTradeMode::Isolated
157 };
158 }
159
160 if is_cross_margin {
161 OKXTradeMode::Cross
162 } else {
163 OKXTradeMode::Isolated
164 }
165 }
166
167 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
168 if self.config.instrument_types.is_empty() {
169 vec![OKXInstrumentType::Spot]
170 } else {
171 self.config.instrument_types.clone()
172 }
173 }
174
175 async fn refresh_account_state(&self) -> anyhow::Result<()> {
176 let account_state = self
177 .http_client
178 .request_account_state(self.core.account_id)
179 .await
180 .context("failed to request OKX account state")?;
181
182 self.core.generate_account_state(
183 account_state.balances.clone(),
184 account_state.margins.clone(),
185 account_state.is_reported,
186 account_state.ts_event,
187 )
188 }
189
190 fn update_account_state(&self) -> anyhow::Result<()> {
191 let runtime = get_runtime();
192 runtime.block_on(self.refresh_account_state())
193 }
194
195 fn is_conditional_order(&self, order_type: OrderType) -> bool {
196 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
197 }
198
199 fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
200 let order = cmd.order.clone();
201 let ws_private = self.ws_private.clone();
202 let trade_mode = self.trade_mode;
203
204 self.spawn_task("submit_order", async move {
205 ws_private
206 .submit_order(
207 order.trader_id(),
208 order.strategy_id(),
209 order.instrument_id(),
210 trade_mode,
211 order.client_order_id(),
212 order.order_side(),
213 order.order_type(),
214 order.quantity(),
215 Some(order.time_in_force()),
216 order.price(),
217 order.trigger_price(),
218 Some(order.is_post_only()),
219 Some(order.is_reduce_only()),
220 Some(order.is_quote_quantity()),
221 None,
222 )
223 .await?;
224 Ok(())
225 });
226
227 Ok(())
228 }
229
230 fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
231 let order = cmd.order.clone();
232 let trigger_price = order
233 .trigger_price()
234 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
235 let http_client = self.http_client.clone();
236 let trade_mode = self.trade_mode;
237
238 self.spawn_task("submit_algo_order", async move {
239 http_client
240 .place_algo_order_with_domain_types(
241 order.instrument_id(),
242 trade_mode,
243 order.client_order_id(),
244 order.order_side(),
245 order.order_type(),
246 order.quantity(),
247 trigger_price,
248 order.trigger_type(),
249 order.price(),
250 Some(order.is_reduce_only()),
251 )
252 .await?;
253 Ok(())
254 });
255
256 Ok(())
257 }
258
259 fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
260 let ws_private = self.ws_private.clone();
261 let command = cmd.clone();
262
263 self.spawn_task("cancel_order", async move {
264 ws_private
265 .cancel_order(
266 command.trader_id,
267 command.strategy_id,
268 command.instrument_id,
269 Some(command.client_order_id),
270 command.venue_order_id,
271 )
272 .await?;
273 Ok(())
274 });
275
276 Ok(())
277 }
278
279 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
280 let ws_private = self.ws_private.clone();
281 self.spawn_task("mass_cancel_orders", async move {
282 ws_private.mass_cancel_orders(instrument_id).await?;
283 Ok(())
284 });
285 Ok(())
286 }
287
288 fn spawn_task<F>(&self, description: &'static str, fut: F)
289 where
290 F: Future<Output = anyhow::Result<()>> + Send + 'static,
291 {
292 let runtime = get_runtime();
293 let handle = runtime.spawn(async move {
294 if let Err(e) = fut.await {
295 tracing::warn!("{description} failed: {e:?}");
296 }
297 });
298
299 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
300 tasks.retain(|handle| !handle.is_finished());
301 tasks.push(handle);
302 }
303
304 fn abort_pending_tasks(&self) {
305 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
306 for handle in tasks.drain(..) {
307 handle.abort();
308 }
309 }
310
311 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
313 let account_id = self.core.account_id;
314
315 if self.core.cache().borrow().account(&account_id).is_some() {
316 tracing::info!("Account {account_id} registered");
317 return Ok(());
318 }
319
320 let start = Instant::now();
321 let timeout = Duration::from_secs_f64(timeout_secs);
322 let interval = Duration::from_millis(10);
323
324 loop {
325 tokio::time::sleep(interval).await;
326
327 if self.core.cache().borrow().account(&account_id).is_some() {
328 tracing::info!("Account {account_id} registered");
329 return Ok(());
330 }
331
332 if start.elapsed() >= timeout {
333 anyhow::bail!(
334 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
335 );
336 }
337 }
338 }
339}
340
341#[async_trait(?Send)]
342impl ExecutionClient for OKXExecutionClient {
343 fn is_connected(&self) -> bool {
344 self.connected.load(Ordering::Acquire)
345 }
346
347 fn client_id(&self) -> ClientId {
348 self.core.client_id
349 }
350
351 fn account_id(&self) -> AccountId {
352 self.core.account_id
353 }
354
355 fn venue(&self) -> Venue {
356 *OKX_VENUE
357 }
358
359 fn oms_type(&self) -> OmsType {
360 self.core.oms_type
361 }
362
363 fn get_account(&self) -> Option<AccountAny> {
364 self.core.get_account()
365 }
366
367 async fn connect(&mut self) -> anyhow::Result<()> {
368 if self.connected.load(Ordering::Acquire) {
369 return Ok(());
370 }
371
372 if self.exec_event_sender.is_none() {
374 self.exec_event_sender = Some(get_exec_event_sender());
375 }
376
377 let instrument_types = self.instrument_types();
378
379 if !self.instruments_initialized.load(Ordering::Acquire) {
380 let mut all_instruments = Vec::new();
381 for instrument_type in &instrument_types {
382 let instruments = self
383 .http_client
384 .request_instruments(*instrument_type, None)
385 .await
386 .with_context(|| {
387 format!("failed to request OKX instruments for {instrument_type:?}")
388 })?;
389
390 if instruments.is_empty() {
391 tracing::warn!("No instruments returned for {instrument_type:?}");
392 continue;
393 }
394
395 tracing::info!(
396 "Loaded {} {instrument_type:?} instruments",
397 instruments.len()
398 );
399
400 self.http_client.cache_instruments(instruments.clone());
401 all_instruments.extend(instruments);
402 }
403
404 {
406 let mut cache = self.core.cache().borrow_mut();
407 for instrument in &all_instruments {
408 if let Err(e) = cache.add_instrument(instrument.clone()) {
409 tracing::debug!("Instrument already in cache: {e}");
410 }
411 }
412 }
413
414 if !all_instruments.is_empty() {
415 self.ws_private.cache_instruments(all_instruments);
416 }
417 self.instruments_initialized.store(true, Ordering::Release);
418 }
419
420 let Some(sender) = self.exec_event_sender.as_ref() else {
421 tracing::error!("Execution event sender not initialized");
422 anyhow::bail!("Execution event sender not initialized");
423 };
424
425 self.ws_private.connect().await?;
426 self.ws_private.wait_until_active(10.0).await?;
427 tracing::info!("Connected to private WebSocket");
428
429 if self.ws_stream_handle.is_none() {
430 let stream = self.ws_private.stream();
431 let sender = sender.clone();
432 let handle = get_runtime().spawn(async move {
433 pin_mut!(stream);
434 while let Some(message) = stream.next().await {
435 dispatch_ws_message(message, &sender);
436 }
437 });
438 self.ws_stream_handle = Some(handle);
439 }
440
441 self.ws_business.connect().await?;
442 self.ws_business.wait_until_active(10.0).await?;
443 tracing::info!("Connected to business WebSocket");
444
445 if self.ws_business_stream_handle.is_none() {
446 let stream = self.ws_business.stream();
447 let sender = sender.clone();
448 let handle = get_runtime().spawn(async move {
449 pin_mut!(stream);
450 while let Some(message) = stream.next().await {
451 dispatch_ws_message(message, &sender);
452 }
453 });
454 self.ws_business_stream_handle = Some(handle);
455 }
456
457 for inst_type in &instrument_types {
458 tracing::info!("Subscribing to orders channel for {inst_type:?}");
459 self.ws_private.subscribe_orders(*inst_type).await?;
460
461 if self.config.use_fills_channel {
462 tracing::info!("Subscribing to fills channel for {inst_type:?}");
463 if let Err(e) = self.ws_private.subscribe_fills(*inst_type).await {
464 tracing::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
465 }
466 }
467 }
468
469 self.ws_private.subscribe_account().await?;
470
471 for inst_type in &instrument_types {
473 if *inst_type != OKXInstrumentType::Option {
474 self.ws_business.subscribe_orders_algo(*inst_type).await?;
475 }
476 }
477
478 let account_state = self
479 .http_client
480 .request_account_state(self.core.account_id)
481 .await
482 .context("failed to request OKX account state")?;
483
484 if !account_state.balances.is_empty() {
485 tracing::info!(
486 "Received account state with {} balance(s)",
487 account_state.balances.len()
488 );
489 }
490 dispatch_account_state(account_state, sender);
491
492 self.await_account_registered(30.0).await?;
494
495 self.connected.store(true, Ordering::Release);
496 tracing::info!(client_id = %self.core.client_id, "Connected");
497 Ok(())
498 }
499
500 async fn disconnect(&mut self) -> anyhow::Result<()> {
501 if !self.connected.load(Ordering::Acquire) {
502 return Ok(());
503 }
504
505 self.abort_pending_tasks();
506 self.http_client.cancel_all_requests();
507
508 if let Err(e) = self.ws_private.close().await {
509 tracing::warn!("Error closing private websocket: {e:?}");
510 }
511
512 if let Err(e) = self.ws_business.close().await {
513 tracing::warn!("Error closing business websocket: {e:?}");
514 }
515
516 if let Some(handle) = self.ws_stream_handle.take() {
517 handle.abort();
518 }
519
520 if let Some(handle) = self.ws_business_stream_handle.take() {
521 handle.abort();
522 }
523
524 self.connected.store(false, Ordering::Release);
525 tracing::info!(client_id = %self.core.client_id, "Disconnected");
526 Ok(())
527 }
528
529 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
530 self.update_account_state()
531 }
532
533 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
534 tracing::debug!(
535 "query_order not implemented for OKX execution client (client_order_id={})",
536 cmd.client_order_id
537 );
538 Ok(())
539 }
540
541 fn generate_account_state(
542 &self,
543 balances: Vec<AccountBalance>,
544 margins: Vec<MarginBalance>,
545 reported: bool,
546 ts_event: UnixNanos,
547 ) -> anyhow::Result<()> {
548 self.core
549 .generate_account_state(balances, margins, reported, ts_event)
550 }
551
552 fn start(&mut self) -> anyhow::Result<()> {
553 if self.started {
554 return Ok(());
555 }
556
557 self.started = true;
558
559 let http_client = self.http_client.clone();
561 let ws_private = self.ws_private.clone();
562 let instrument_types = self.config.instrument_types.clone();
563
564 get_runtime().spawn(async move {
565 let mut all_instruments = Vec::new();
566 for instrument_type in instrument_types {
567 match http_client.request_instruments(instrument_type, None).await {
568 Ok(instruments) => {
569 if instruments.is_empty() {
570 tracing::warn!("No instruments returned for {instrument_type:?}");
571 continue;
572 }
573 http_client.cache_instruments(instruments.clone());
574 all_instruments.extend(instruments);
575 }
576 Err(e) => {
577 tracing::error!(
578 "Failed to request instruments for {instrument_type:?}: {e}"
579 );
580 }
581 }
582 }
583
584 if all_instruments.is_empty() {
585 tracing::warn!(
586 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
587 );
588 } else {
589 ws_private.cache_instruments(all_instruments);
590 tracing::info!("Instruments initialized");
591 }
592 });
593
594 tracing::info!(
595 client_id = %self.core.client_id,
596 account_id = %self.core.account_id,
597 account_type = ?self.core.account_type,
598 trade_mode = ?self.trade_mode,
599 instrument_types = ?self.config.instrument_types,
600 use_fills_channel = self.config.use_fills_channel,
601 is_demo = self.config.is_demo,
602 http_proxy_url = ?self.config.http_proxy_url,
603 ws_proxy_url = ?self.config.ws_proxy_url,
604 "Started"
605 );
606 Ok(())
607 }
608
609 fn stop(&mut self) -> anyhow::Result<()> {
610 if !self.started {
611 return Ok(());
612 }
613
614 self.started = false;
615 self.connected.store(false, Ordering::Release);
616 if let Some(handle) = self.ws_stream_handle.take() {
617 handle.abort();
618 }
619 self.abort_pending_tasks();
620 tracing::info!(client_id = %self.core.client_id, "Stopped");
621 Ok(())
622 }
623
624 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
625 let order = &cmd.order;
626
627 if order.is_closed() {
628 let client_order_id = order.client_order_id();
629 tracing::warn!("Cannot submit closed order {client_order_id}");
630 return Ok(());
631 }
632
633 let event = OrderSubmitted::new(
634 self.core.trader_id,
635 order.strategy_id(),
636 order.instrument_id(),
637 order.client_order_id(),
638 self.core.account_id,
639 UUID4::new(),
640 cmd.ts_init,
641 get_atomic_clock_realtime().get_time_ns(),
642 );
643 if let Some(sender) = &self.exec_event_sender {
644 tracing::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
645 if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
646 tracing::warn!("Failed to send OrderSubmitted event: {e}");
647 }
648 } else {
649 tracing::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
650 }
651
652 let result = if self.is_conditional_order(order.order_type()) {
653 self.submit_conditional_order(cmd)
654 } else {
655 self.submit_regular_order(cmd)
656 };
657
658 if let Err(e) = result {
659 let rejected_event = OrderRejected::new(
660 self.core.trader_id,
661 order.strategy_id(),
662 order.instrument_id(),
663 order.client_order_id(),
664 self.core.account_id,
665 format!("submit-order-error: {e}").into(),
666 UUID4::new(),
667 cmd.ts_init,
668 get_atomic_clock_realtime().get_time_ns(),
669 false,
670 false,
671 );
672 if let Some(sender) = &self.exec_event_sender {
673 if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(
674 rejected_event,
675 ))) {
676 tracing::warn!("Failed to send OrderRejected event: {e}");
677 }
678 } else {
679 tracing::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
680 }
681 return Err(e);
682 }
683
684 Ok(())
685 }
686
687 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
688 tracing::warn!(
689 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
690 cmd.order_list.orders.len()
691 );
692 Ok(())
693 }
694
695 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
696 let ws_private = self.ws_private.clone();
697 let command = cmd.clone();
698
699 self.spawn_task("modify_order", async move {
700 ws_private
701 .modify_order(
702 command.trader_id,
703 command.strategy_id,
704 command.instrument_id,
705 Some(command.client_order_id),
706 command.price,
707 command.quantity,
708 command.venue_order_id,
709 )
710 .await?;
711 Ok(())
712 });
713
714 Ok(())
715 }
716
717 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
718 self.cancel_ws_order(cmd)
719 }
720
721 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
722 if self.config.use_mm_mass_cancel {
723 self.mass_cancel_instrument(cmd.instrument_id)
725 } else {
726 let cache = self.core.cache().borrow();
728 let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None);
729
730 if open_orders.is_empty() {
731 tracing::debug!("No open orders to cancel for {}", cmd.instrument_id);
732 return Ok(());
733 }
734
735 let mut payload = Vec::with_capacity(open_orders.len());
736 for order in open_orders {
737 payload.push((
738 order.instrument_id(),
739 Some(order.client_order_id()),
740 order.venue_order_id(),
741 ));
742 }
743 drop(cache);
744
745 tracing::debug!(
746 "Canceling {} open orders for {} via batch cancel",
747 payload.len(),
748 cmd.instrument_id
749 );
750
751 let ws_private = self.ws_private.clone();
752 self.spawn_task("batch_cancel_orders", async move {
753 ws_private.batch_cancel_orders(payload).await?;
754 Ok(())
755 });
756
757 Ok(())
758 }
759 }
760
761 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
762 let mut payload = Vec::with_capacity(cmd.cancels.len());
763
764 for cancel in &cmd.cancels {
765 payload.push((
766 cancel.instrument_id,
767 Some(cancel.client_order_id),
768 cancel.venue_order_id,
769 ));
770 }
771
772 let ws_private = self.ws_private.clone();
773 self.spawn_task("batch_cancel_orders", async move {
774 ws_private.batch_cancel_orders(payload).await?;
775 Ok(())
776 });
777
778 Ok(())
779 }
780
781 async fn generate_order_status_report(
782 &self,
783 cmd: &GenerateOrderStatusReport,
784 ) -> anyhow::Result<Option<OrderStatusReport>> {
785 let Some(instrument_id) = cmd.instrument_id else {
786 tracing::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
787 return Ok(None);
788 };
789
790 let mut reports = self
791 .http_client
792 .request_order_status_reports(
793 self.core.account_id,
794 None,
795 Some(instrument_id),
796 None,
797 None,
798 false,
799 None,
800 )
801 .await?;
802
803 if let Some(client_order_id) = cmd.client_order_id {
804 reports.retain(|report| report.client_order_id == Some(client_order_id));
805 }
806
807 if let Some(venue_order_id) = cmd.venue_order_id {
808 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
809 }
810
811 Ok(reports.into_iter().next())
812 }
813
814 async fn generate_order_status_reports(
815 &self,
816 cmd: &GenerateOrderStatusReports,
817 ) -> anyhow::Result<Vec<OrderStatusReport>> {
818 let mut reports = Vec::new();
819
820 if let Some(instrument_id) = cmd.instrument_id {
821 let mut fetched = self
822 .http_client
823 .request_order_status_reports(
824 self.core.account_id,
825 None,
826 Some(instrument_id),
827 None,
828 None,
829 false,
830 None,
831 )
832 .await?;
833 reports.append(&mut fetched);
834 } else {
835 for inst_type in self.instrument_types() {
836 let mut fetched = self
837 .http_client
838 .request_order_status_reports(
839 self.core.account_id,
840 Some(inst_type),
841 None,
842 None,
843 None,
844 false,
845 None,
846 )
847 .await?;
848 reports.append(&mut fetched);
849 }
850 }
851
852 if cmd.open_only {
854 reports.retain(|r| r.order_status.is_open());
855 }
856
857 if let Some(start) = cmd.start {
859 reports.retain(|r| r.ts_last >= start);
860 }
861 if let Some(end) = cmd.end {
862 reports.retain(|r| r.ts_last <= end);
863 }
864
865 Ok(reports)
866 }
867
868 async fn generate_fill_reports(
869 &self,
870 cmd: GenerateFillReports,
871 ) -> anyhow::Result<Vec<FillReport>> {
872 let start_dt = nanos_to_datetime(cmd.start);
873 let end_dt = nanos_to_datetime(cmd.end);
874 let mut reports = Vec::new();
875
876 if let Some(instrument_id) = cmd.instrument_id {
877 let mut fetched = self
878 .http_client
879 .request_fill_reports(
880 self.core.account_id,
881 None,
882 Some(instrument_id),
883 start_dt,
884 end_dt,
885 None,
886 )
887 .await?;
888 reports.append(&mut fetched);
889 } else {
890 for inst_type in self.instrument_types() {
891 let mut fetched = self
892 .http_client
893 .request_fill_reports(
894 self.core.account_id,
895 Some(inst_type),
896 None,
897 start_dt,
898 end_dt,
899 None,
900 )
901 .await?;
902 reports.append(&mut fetched);
903 }
904 }
905
906 if let Some(venue_order_id) = cmd.venue_order_id {
907 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
908 }
909
910 Ok(reports)
911 }
912
913 async fn generate_position_status_reports(
914 &self,
915 cmd: &GeneratePositionStatusReports,
916 ) -> anyhow::Result<Vec<PositionStatusReport>> {
917 let mut reports = Vec::new();
918
919 if let Some(instrument_id) = cmd.instrument_id {
922 let mut fetched = self
923 .http_client
924 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
925 .await?;
926 reports.append(&mut fetched);
927 } else {
928 for inst_type in self.instrument_types() {
929 if inst_type == OKXInstrumentType::Spot || inst_type == OKXInstrumentType::Margin {
931 continue;
932 }
933 let mut fetched = self
934 .http_client
935 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
936 .await?;
937 reports.append(&mut fetched);
938 }
939 }
940
941 let mut margin_reports = self
944 .http_client
945 .request_spot_margin_position_reports(self.core.account_id)
946 .await?;
947
948 if let Some(instrument_id) = cmd.instrument_id {
949 margin_reports.retain(|report| report.instrument_id == instrument_id);
950 }
951
952 reports.append(&mut margin_reports);
953
954 let _ = nanos_to_datetime(cmd.start);
955 let _ = nanos_to_datetime(cmd.end);
956
957 Ok(reports)
958 }
959
960 async fn generate_mass_status(
961 &self,
962 lookback_mins: Option<u64>,
963 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
964 tracing::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
965
966 let ts_now = get_atomic_clock_realtime().get_time_ns();
967
968 let start = lookback_mins.map(|mins| {
969 let lookback_ns = mins * 60 * 1_000_000_000;
970 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
971 });
972
973 let order_cmd = GenerateOrderStatusReports::new(
974 UUID4::new(),
975 ts_now,
976 false, None, start, None, None, None, );
983
984 let fill_cmd = GenerateFillReports::new(
985 UUID4::new(),
986 ts_now,
987 None, None, start,
990 None, None, None, );
994
995 let position_cmd = GeneratePositionStatusReports::new(
996 UUID4::new(),
997 ts_now,
998 None, start,
1000 None, None, None, );
1004
1005 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1006 self.generate_order_status_reports(&order_cmd),
1007 self.generate_fill_reports(fill_cmd),
1008 self.generate_position_status_reports(&position_cmd),
1009 )?;
1010
1011 tracing::info!("Received {} OrderStatusReports", order_reports.len());
1012 tracing::info!("Received {} FillReports", fill_reports.len());
1013 tracing::info!("Received {} PositionReports", position_reports.len());
1014
1015 let mut mass_status = ExecutionMassStatus::new(
1016 self.core.client_id,
1017 self.core.account_id,
1018 *OKX_VENUE,
1019 ts_now,
1020 None,
1021 );
1022
1023 mass_status.add_order_reports(order_reports);
1024 mass_status.add_fill_reports(fill_reports);
1025 mass_status.add_position_reports(position_reports);
1026
1027 Ok(Some(mass_status))
1028 }
1029}
1030
1031fn dispatch_ws_message(
1032 message: NautilusWsMessage,
1033 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1034) {
1035 match message {
1036 NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state, sender),
1037 NautilusWsMessage::PositionUpdate(report) => {
1038 dispatch_position_status_report(report, sender);
1039 }
1040 NautilusWsMessage::ExecutionReports(reports) => {
1041 tracing::debug!("Processing {} execution report(s)", reports.len());
1042 for report in reports {
1043 dispatch_execution_report(report, sender);
1044 }
1045 }
1046 NautilusWsMessage::OrderAccepted(event) => {
1047 dispatch_order_event(OrderEventAny::Accepted(event), sender);
1048 }
1049 NautilusWsMessage::OrderCanceled(event) => {
1050 dispatch_order_event(OrderEventAny::Canceled(event), sender);
1051 }
1052 NautilusWsMessage::OrderExpired(event) => {
1053 dispatch_order_event(OrderEventAny::Expired(event), sender);
1054 }
1055 NautilusWsMessage::OrderRejected(event) => {
1056 dispatch_order_event(OrderEventAny::Rejected(event), sender);
1057 }
1058 NautilusWsMessage::OrderCancelRejected(event) => {
1059 dispatch_order_event(OrderEventAny::CancelRejected(event), sender);
1060 }
1061 NautilusWsMessage::OrderModifyRejected(event) => {
1062 dispatch_order_event(OrderEventAny::ModifyRejected(event), sender);
1063 }
1064 NautilusWsMessage::OrderTriggered(event) => {
1065 dispatch_order_event(OrderEventAny::Triggered(event), sender);
1066 }
1067 NautilusWsMessage::OrderUpdated(event) => {
1068 dispatch_order_event(OrderEventAny::Updated(event), sender);
1069 }
1070 NautilusWsMessage::Error(e) => {
1071 tracing::warn!(
1072 "Websocket error: code={} message={} conn_id={:?}",
1073 e.code,
1074 e.message,
1075 e.conn_id
1076 );
1077 }
1078 NautilusWsMessage::Reconnected => {
1079 tracing::info!("Websocket reconnected");
1080 }
1081 NautilusWsMessage::Authenticated => {
1082 tracing::debug!("Websocket authenticated");
1083 }
1084 NautilusWsMessage::Deltas(_)
1085 | NautilusWsMessage::Raw(_)
1086 | NautilusWsMessage::Data(_)
1087 | NautilusWsMessage::FundingRates(_)
1088 | NautilusWsMessage::Instrument(_) => {
1089 tracing::debug!("Ignoring websocket data message");
1090 }
1091 }
1092}
1093
1094fn dispatch_account_state(
1095 state: AccountState,
1096 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1097) {
1098 if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
1099 tracing::warn!("Failed to send account state: {e}");
1100 }
1101}
1102
1103fn dispatch_position_status_report(
1104 report: PositionStatusReport,
1105 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1106) {
1107 let exec_report = NautilusExecutionReport::Position(Box::new(report));
1108 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1109 tracing::warn!("Failed to send position status report: {e}");
1110 }
1111}
1112
1113fn dispatch_execution_report(
1114 report: ExecutionReport,
1115 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1116) {
1117 match report {
1118 ExecutionReport::Order(order_report) => {
1119 let exec_report = NautilusExecutionReport::OrderStatus(Box::new(order_report));
1120 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1121 tracing::warn!("Failed to send order status report: {e}");
1122 }
1123 }
1124 ExecutionReport::Fill(fill_report) => {
1125 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1126 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1127 tracing::warn!("Failed to send fill report: {e}");
1128 }
1129 }
1130 }
1131}
1132
1133fn dispatch_order_event(
1134 event: OrderEventAny,
1135 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1136) {
1137 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
1138 tracing::warn!("Failed to send order event: {e}");
1139 }
1140}
1141
1142fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1143 value.map(|nanos| nanos.to_datetime_utc())
1144}
1145
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };
    use rstest::rstest;

    /// Tuple shape used for batch cancel payloads.
    type CancelTuple = (InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>);

    /// Builds the payload the same way the client's batch cancel does: one tuple per cancel.
    fn payload_from(cmd: &BatchCancelOrders) -> Vec<CancelTuple> {
        cmd.cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    cancel.venue_order_id,
                )
            })
            .collect()
    }

    /// Two cancels must produce two tuples, in order, carrying the original identifiers.
    #[rstest]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = Some(ClientId::from("OKX"));
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        let make_cancel = |client_order_id: ClientOrderId, venue_order_id: VenueOrderId| {
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id,
                venue_order_id: Some(venue_order_id),
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
                params: None,
            }
        };

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels: vec![
                make_cancel(client_order_id1, venue_order_id1),
                make_cancel(client_order_id2, venue_order_id2),
            ],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = payload_from(&cmd);

        assert_eq!(payload.len(), 2);

        let (first_instrument, first_client, first_venue) = payload[0];
        assert_eq!(first_instrument, instrument_id);
        assert_eq!(first_client, Some(client_order_id1));
        assert_eq!(first_venue, Some(venue_order_id1));

        let (second_instrument, second_client, second_venue) = payload[1];
        assert_eq!(second_instrument, instrument_id);
        assert_eq!(second_client, Some(client_order_id2));
        assert_eq!(second_venue, Some(venue_order_id2));
    }

    /// A command without cancels must produce an empty payload.
    #[rstest]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: Some(ClientId::from("OKX")),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: vec![],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload: Vec<CancelTuple> = Vec::with_capacity(cmd.cancels.len());
        assert_eq!(payload.len(), 0);
    }
}