1use std::{
19 future::Future,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use futures_util::{StreamExt, pin_mut};
28use nautilus_common::{
29 clients::ExecutionClient,
30 live::{get_runtime, runner::get_exec_event_sender},
31 messages::execution::{
32 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
34 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
35 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
36 SubmitOrderList,
37 },
38};
39use nautilus_core::{
40 MUTEX_POISONED, UnixNanos,
41 time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{AccountType, OmsType, OrderType},
47 events::OrderEventAny,
48 identifiers::{
49 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, Venue, VenueOrderId,
50 },
51 orders::Order,
52 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
53 types::{AccountBalance, MarginBalance},
54};
55use tokio::task::JoinHandle;
56
57use crate::{
58 common::{
59 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
60 enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
61 },
62 config::OKXExecClientConfig,
63 http::{client::OKXHttpClient, models::OKXCancelAlgoOrderRequest},
64 websocket::{
65 client::OKXWebSocketClient,
66 messages::{ExecutionReport, NautilusWsMessage},
67 },
68};
69
70#[derive(Debug)]
71pub struct OKXExecutionClient {
72 core: ExecutionClientCore,
73 clock: &'static AtomicTime,
74 config: OKXExecClientConfig,
75 emitter: ExecutionEventEmitter,
76 http_client: OKXHttpClient,
77 ws_private: OKXWebSocketClient,
78 ws_business: OKXWebSocketClient,
79 trade_mode: OKXTradeMode,
80 ws_stream_handle: Option<JoinHandle<()>>,
81 ws_business_stream_handle: Option<JoinHandle<()>>,
82 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
83}
84
85impl OKXExecutionClient {
86 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
92 let http_client = OKXHttpClient::with_credentials(
94 config.api_key.clone(),
95 config.api_secret.clone(),
96 config.api_passphrase.clone(),
97 config.base_url_http.clone(),
98 config.http_timeout_secs,
99 config.max_retries,
100 config.retry_delay_initial_ms,
101 config.retry_delay_max_ms,
102 config.is_demo,
103 config.http_proxy_url.clone(),
104 )?;
105
106 let account_id = core.account_id;
107
108 let ws_private = OKXWebSocketClient::with_credentials(
109 Some(config.ws_private_url()),
110 config.api_key.clone(),
111 config.api_secret.clone(),
112 config.api_passphrase.clone(),
113 Some(account_id),
114 Some(20), )
116 .context("failed to construct OKX private websocket client")?;
117
118 let ws_business = OKXWebSocketClient::with_credentials(
119 Some(config.ws_business_url()),
120 config.api_key.clone(),
121 config.api_secret.clone(),
122 config.api_passphrase.clone(),
123 Some(account_id),
124 Some(20), )
126 .context("failed to construct OKX business websocket client")?;
127
128 let trade_mode = Self::derive_trade_mode(core.account_type, &config);
129 let clock = get_atomic_clock_realtime();
130 let emitter = ExecutionEventEmitter::new(
131 clock,
132 core.trader_id,
133 core.account_id,
134 core.account_type,
135 None,
136 );
137
138 Ok(Self {
139 core,
140 clock,
141 config,
142 emitter,
143 http_client,
144 ws_private,
145 ws_business,
146 trade_mode,
147 ws_stream_handle: None,
148 ws_business_stream_handle: None,
149 pending_tasks: Mutex::new(Vec::new()),
150 })
151 }
152
153 fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
154 let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
155
156 if account_type == AccountType::Cash {
157 if !config.use_spot_margin {
158 return OKXTradeMode::Cash;
159 }
160 return if is_cross_margin {
161 OKXTradeMode::Cross
162 } else {
163 OKXTradeMode::Isolated
164 };
165 }
166
167 if is_cross_margin {
168 OKXTradeMode::Cross
169 } else {
170 OKXTradeMode::Isolated
171 }
172 }
173
174 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
175 if self.config.instrument_types.is_empty() {
176 vec![OKXInstrumentType::Spot]
177 } else {
178 self.config.instrument_types.clone()
179 }
180 }
181
182 async fn refresh_account_state(&self) -> anyhow::Result<()> {
183 let account_state = self
184 .http_client
185 .request_account_state(self.core.account_id)
186 .await
187 .context("failed to request OKX account state")?;
188
189 self.emitter.send_account_state(account_state);
190 Ok(())
191 }
192
193 fn update_account_state(&self) -> anyhow::Result<()> {
194 let runtime = get_runtime();
195 runtime.block_on(self.refresh_account_state())
196 }
197
198 fn is_conditional_order(&self, order_type: OrderType) -> bool {
199 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
200 }
201
202 fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
203 let order = {
204 let cache = self.core.cache();
205 cache
206 .order(&cmd.client_order_id)
207 .cloned()
208 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?
209 };
210 let ws_private = self.ws_private.clone();
211 let trade_mode = self.trade_mode;
212
213 let emitter = self.emitter.clone();
214 let clock = self.clock;
215 let trader_id = self.core.trader_id;
216 let client_order_id = order.client_order_id();
217 let strategy_id = order.strategy_id();
218 let instrument_id = order.instrument_id();
219 let order_side = order.order_side();
220 let order_type = order.order_type();
221 let quantity = order.quantity();
222 let time_in_force = order.time_in_force();
223 let price = order.price();
224 let trigger_price = order.trigger_price();
225 let is_post_only = order.is_post_only();
226 let is_reduce_only = order.is_reduce_only();
227 let is_quote_quantity = order.is_quote_quantity();
228
229 self.spawn_task("submit_order", async move {
230 let result = ws_private
231 .submit_order(
232 trader_id,
233 strategy_id,
234 instrument_id,
235 trade_mode,
236 client_order_id,
237 order_side,
238 order_type,
239 quantity,
240 Some(time_in_force),
241 price,
242 trigger_price,
243 Some(is_post_only),
244 Some(is_reduce_only),
245 Some(is_quote_quantity),
246 None,
247 )
248 .await
249 .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
250
251 if let Err(e) = result {
252 let ts_event = clock.get_time_ns();
253 emitter.emit_order_rejected_event(
254 strategy_id,
255 instrument_id,
256 client_order_id,
257 &format!("submit-order-error: {e}"),
258 ts_event,
259 false,
260 );
261 return Err(e);
262 }
263
264 Ok(())
265 });
266
267 Ok(())
268 }
269
270 fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
271 let order = {
272 let cache = self.core.cache();
273 cache
274 .order(&cmd.client_order_id)
275 .cloned()
276 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?
277 };
278 let trigger_price = order
279 .trigger_price()
280 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
281 let http_client = self.http_client.clone();
282 let trade_mode = self.trade_mode;
283
284 let emitter = self.emitter.clone();
285 let clock = self.clock;
286 let client_order_id = order.client_order_id();
287 let strategy_id = order.strategy_id();
288 let instrument_id = order.instrument_id();
289 let order_side = order.order_side();
290 let order_type = order.order_type();
291 let quantity = order.quantity();
292 let trigger_type = order.trigger_type();
293 let price = order.price();
294 let is_reduce_only = order.is_reduce_only();
295
296 self.spawn_task("submit_algo_order", async move {
297 let result = http_client
298 .place_algo_order_with_domain_types(
299 instrument_id,
300 trade_mode,
301 client_order_id,
302 order_side,
303 order_type,
304 quantity,
305 trigger_price,
306 trigger_type,
307 price,
308 Some(is_reduce_only),
309 )
310 .await
311 .map_err(|e| anyhow::anyhow!("Submit algo order failed: {e}"));
312
313 if let Err(e) = result {
314 let ts_event = clock.get_time_ns();
315 emitter.emit_order_rejected_event(
316 strategy_id,
317 instrument_id,
318 client_order_id,
319 &format!("submit-order-error: {e}"),
320 ts_event,
321 false,
322 );
323 return Err(e);
324 }
325
326 Ok(())
327 });
328
329 Ok(())
330 }
331
332 fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
333 let ws_private = self.ws_private.clone();
334 let command = cmd.clone();
335
336 let emitter = self.emitter.clone();
337 let clock = self.clock;
338
339 self.spawn_task("cancel_order", async move {
340 let result = ws_private
341 .cancel_order(
342 command.trader_id,
343 command.strategy_id,
344 command.instrument_id,
345 Some(command.client_order_id),
346 command.venue_order_id,
347 )
348 .await
349 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
350
351 if let Err(e) = result {
352 let ts_event = clock.get_time_ns();
353 emitter.emit_order_cancel_rejected_event(
354 command.strategy_id,
355 command.instrument_id,
356 command.client_order_id,
357 command.venue_order_id,
358 &format!("cancel-order-error: {e}"),
359 ts_event,
360 );
361 return Err(e);
362 }
363
364 Ok(())
365 });
366
367 Ok(())
368 }
369
370 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
371 let ws_private = self.ws_private.clone();
372 self.spawn_task("mass_cancel_orders", async move {
373 ws_private.mass_cancel_orders(instrument_id).await?;
374 Ok(())
375 });
376 Ok(())
377 }
378
379 fn spawn_task<F>(&self, description: &'static str, fut: F)
380 where
381 F: Future<Output = anyhow::Result<()>> + Send + 'static,
382 {
383 let runtime = get_runtime();
384 let handle = runtime.spawn(async move {
385 if let Err(e) = fut.await {
386 log::warn!("{description} failed: {e:?}");
387 }
388 });
389
390 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
391 tasks.retain(|handle| !handle.is_finished());
392 tasks.push(handle);
393 }
394
395 fn abort_pending_tasks(&self) {
396 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
397 for handle in tasks.drain(..) {
398 handle.abort();
399 }
400 }
401
402 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
404 let account_id = self.core.account_id;
405
406 if self.core.cache().account(&account_id).is_some() {
407 log::info!("Account {account_id} registered");
408 return Ok(());
409 }
410
411 let start = Instant::now();
412 let timeout = Duration::from_secs_f64(timeout_secs);
413 let interval = Duration::from_millis(10);
414
415 loop {
416 tokio::time::sleep(interval).await;
417
418 if self.core.cache().account(&account_id).is_some() {
419 log::info!("Account {account_id} registered");
420 return Ok(());
421 }
422
423 if start.elapsed() >= timeout {
424 anyhow::bail!(
425 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
426 );
427 }
428 }
429 }
430}
431
432#[async_trait(?Send)]
433impl ExecutionClient for OKXExecutionClient {
434 fn is_connected(&self) -> bool {
435 self.core.is_connected()
436 }
437
438 fn client_id(&self) -> ClientId {
439 self.core.client_id
440 }
441
442 fn account_id(&self) -> AccountId {
443 self.core.account_id
444 }
445
446 fn venue(&self) -> Venue {
447 *OKX_VENUE
448 }
449
450 fn oms_type(&self) -> OmsType {
451 self.core.oms_type
452 }
453
454 fn get_account(&self) -> Option<AccountAny> {
455 self.core.cache().account(&self.core.account_id).cloned()
456 }
457
458 async fn connect(&mut self) -> anyhow::Result<()> {
459 if self.core.is_connected() {
460 return Ok(());
461 }
462
463 let instrument_types = self.instrument_types();
464
465 if !self.core.instruments_initialized() {
466 let mut all_instruments = Vec::new();
467 let mut all_inst_id_codes = Vec::new();
468
469 for instrument_type in &instrument_types {
470 let (instruments, inst_id_codes) = self
471 .http_client
472 .request_instruments(*instrument_type, None)
473 .await
474 .with_context(|| {
475 format!("failed to request OKX instruments for {instrument_type:?}")
476 })?;
477
478 if instruments.is_empty() {
479 log::warn!("No instruments returned for {instrument_type:?}");
480 continue;
481 }
482
483 log::info!(
484 "Loaded {} {instrument_type:?} instruments",
485 instruments.len()
486 );
487
488 self.http_client.cache_instruments(instruments.clone());
489 all_instruments.extend(instruments);
490 all_inst_id_codes.extend(inst_id_codes);
491 }
492
493 if !all_instruments.is_empty() {
494 self.ws_private.cache_instruments(all_instruments);
495 self.ws_private.cache_inst_id_codes(all_inst_id_codes);
496 }
497 self.core.set_instruments_initialized();
498 }
499
500 self.ws_private.connect().await?;
501 self.ws_private.wait_until_active(10.0).await?;
502 log::info!("Connected to private WebSocket");
503
504 if self.ws_stream_handle.is_none() {
505 let stream = self.ws_private.stream();
506 let emitter = self.emitter.clone();
507 let handle = get_runtime().spawn(async move {
508 pin_mut!(stream);
509 while let Some(message) = stream.next().await {
510 dispatch_ws_message(message, &emitter);
511 }
512 });
513 self.ws_stream_handle = Some(handle);
514 }
515
516 self.ws_business.connect().await?;
517 self.ws_business.wait_until_active(10.0).await?;
518 log::info!("Connected to business WebSocket");
519
520 if self.ws_business_stream_handle.is_none() {
521 let stream = self.ws_business.stream();
522 let emitter = self.emitter.clone();
523 let handle = get_runtime().spawn(async move {
524 pin_mut!(stream);
525 while let Some(message) = stream.next().await {
526 dispatch_ws_message(message, &emitter);
527 }
528 });
529 self.ws_business_stream_handle = Some(handle);
530 }
531
532 for inst_type in &instrument_types {
533 log::info!("Subscribing to orders channel for {inst_type:?}");
534 self.ws_private.subscribe_orders(*inst_type).await?;
535
536 if self.config.use_fills_channel {
537 log::info!("Subscribing to fills channel for {inst_type:?}");
538 if let Err(e) = self.ws_private.subscribe_fills(*inst_type).await {
539 log::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
540 }
541 }
542 }
543
544 self.ws_private.subscribe_account().await?;
545
546 for inst_type in &instrument_types {
548 if *inst_type != OKXInstrumentType::Option {
549 self.ws_business.subscribe_orders_algo(*inst_type).await?;
550 }
551 }
552
553 let account_state = self
554 .http_client
555 .request_account_state(self.core.account_id)
556 .await
557 .context("failed to request OKX account state")?;
558
559 if !account_state.balances.is_empty() {
560 log::info!(
561 "Received account state with {} balance(s)",
562 account_state.balances.len()
563 );
564 }
565 self.emitter.send_account_state(account_state);
566
567 self.await_account_registered(30.0).await?;
569
570 self.core.set_connected();
571 log::info!("Connected: client_id={}", self.core.client_id);
572 Ok(())
573 }
574
575 async fn disconnect(&mut self) -> anyhow::Result<()> {
576 if self.core.is_disconnected() {
577 return Ok(());
578 }
579
580 self.abort_pending_tasks();
581 self.http_client.cancel_all_requests();
582
583 if let Err(e) = self.ws_private.close().await {
584 log::warn!("Error closing private websocket: {e:?}");
585 }
586
587 if let Err(e) = self.ws_business.close().await {
588 log::warn!("Error closing business websocket: {e:?}");
589 }
590
591 if let Some(handle) = self.ws_stream_handle.take() {
592 handle.abort();
593 }
594
595 if let Some(handle) = self.ws_business_stream_handle.take() {
596 handle.abort();
597 }
598
599 self.core.set_disconnected();
600 log::info!("Disconnected: client_id={}", self.core.client_id);
601 Ok(())
602 }
603
604 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
605 self.update_account_state()
606 }
607
608 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
609 log::debug!(
610 "query_order not implemented for OKX execution client (client_order_id={})",
611 cmd.client_order_id
612 );
613 Ok(())
614 }
615
616 fn generate_account_state(
617 &self,
618 balances: Vec<AccountBalance>,
619 margins: Vec<MarginBalance>,
620 reported: bool,
621 ts_event: UnixNanos,
622 ) -> anyhow::Result<()> {
623 self.emitter
624 .emit_account_state(balances, margins, reported, ts_event);
625 Ok(())
626 }
627
628 fn start(&mut self) -> anyhow::Result<()> {
629 if self.core.is_started() {
630 return Ok(());
631 }
632
633 let sender = get_exec_event_sender();
634 self.emitter.set_sender(sender);
635 self.core.set_started();
636
637 let http_client = self.http_client.clone();
639 let ws_private = self.ws_private.clone();
640 let instrument_types = self.config.instrument_types.clone();
641
642 get_runtime().spawn(async move {
643 let mut all_instruments = Vec::new();
644 let mut all_inst_id_codes = Vec::new();
645
646 for instrument_type in instrument_types {
647 match http_client.request_instruments(instrument_type, None).await {
648 Ok((instruments, inst_id_codes)) => {
649 if instruments.is_empty() {
650 log::warn!("No instruments returned for {instrument_type:?}");
651 continue;
652 }
653 http_client.cache_instruments(instruments.clone());
654 all_instruments.extend(instruments);
655 all_inst_id_codes.extend(inst_id_codes);
656 }
657 Err(e) => {
658 log::error!("Failed to request instruments for {instrument_type:?}: {e}");
659 }
660 }
661 }
662
663 if all_instruments.is_empty() {
664 log::warn!(
665 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
666 );
667 } else {
668 ws_private.cache_instruments(all_instruments);
669 ws_private.cache_inst_id_codes(all_inst_id_codes);
670 log::info!("Instruments initialized");
671 }
672 });
673
674 log::info!(
675 "Started: client_id={}, account_id={}, account_type={:?}, trade_mode={:?}, instrument_types={:?}, use_fills_channel={}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
676 self.core.client_id,
677 self.core.account_id,
678 self.core.account_type,
679 self.trade_mode,
680 self.config.instrument_types,
681 self.config.use_fills_channel,
682 self.config.is_demo,
683 self.config.http_proxy_url,
684 self.config.ws_proxy_url,
685 );
686 Ok(())
687 }
688
689 fn stop(&mut self) -> anyhow::Result<()> {
690 if self.core.is_stopped() {
691 return Ok(());
692 }
693
694 self.core.set_stopped();
695 self.core.set_disconnected();
696 if let Some(handle) = self.ws_stream_handle.take() {
697 handle.abort();
698 }
699 self.abort_pending_tasks();
700 log::info!("Stopped: client_id={}", self.core.client_id);
701 Ok(())
702 }
703
704 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
705 let order_type = {
706 let cache = self.core.cache();
707 let order = cache
708 .order(&cmd.client_order_id)
709 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
710
711 if order.is_closed() {
712 log::warn!("Cannot submit closed order {}", order.client_order_id());
713 return Ok(());
714 }
715
716 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
717 self.emitter.emit_order_submitted(order);
718
719 order.order_type()
720 };
721
722 if self.is_conditional_order(order_type) {
723 self.submit_conditional_order(cmd)
724 } else {
725 self.submit_regular_order(cmd)
726 }
727 }
728
729 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
730 log::warn!(
731 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
732 cmd.order_list.client_order_ids.len()
733 );
734 Ok(())
735 }
736
737 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
738 let ws_private = self.ws_private.clone();
739 let command = cmd.clone();
740
741 let emitter = self.emitter.clone();
742 let clock = self.clock;
743
744 self.spawn_task("modify_order", async move {
745 let result = ws_private
746 .modify_order(
747 command.trader_id,
748 command.strategy_id,
749 command.instrument_id,
750 Some(command.client_order_id),
751 command.price,
752 command.quantity,
753 command.venue_order_id,
754 )
755 .await
756 .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
757
758 if let Err(e) = result {
759 let ts_event = clock.get_time_ns();
760 emitter.emit_order_modify_rejected_event(
761 command.strategy_id,
762 command.instrument_id,
763 command.client_order_id,
764 command.venue_order_id,
765 &format!("modify-order-error: {e}"),
766 ts_event,
767 );
768 return Err(e);
769 }
770
771 Ok(())
772 });
773
774 Ok(())
775 }
776
777 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
778 self.cancel_ws_order(cmd)
779 }
780
781 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
782 if self.config.use_mm_mass_cancel {
783 self.mass_cancel_instrument(cmd.instrument_id)
785 } else {
786 let cache = self.core.cache();
788 let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None, None);
789
790 if open_orders.is_empty() {
791 log::debug!("No open orders to cancel for {}", cmd.instrument_id);
792 return Ok(());
793 }
794
795 let mut regular_payload = Vec::new();
796 let mut algo_orders: Vec<(
797 InstrumentId,
798 ClientOrderId,
799 Option<VenueOrderId>,
800 TraderId,
801 StrategyId,
802 )> = Vec::new();
803
804 for order in &open_orders {
805 let is_pending_algo = self.is_conditional_order(order.order_type())
807 && order.is_triggered() != Some(true);
808
809 if is_pending_algo {
810 algo_orders.push((
811 order.instrument_id(),
812 order.client_order_id(),
813 order.venue_order_id(),
814 order.trader_id(),
815 order.strategy_id(),
816 ));
817 } else {
818 regular_payload.push((
819 order.instrument_id(),
820 Some(order.client_order_id()),
821 order.venue_order_id(),
822 ));
823 }
824 }
825 drop(cache);
826
827 log::debug!(
828 "Canceling {} regular orders and {} algo orders for {}",
829 regular_payload.len(),
830 algo_orders.len(),
831 cmd.instrument_id
832 );
833
834 if !regular_payload.is_empty() {
835 let ws_private = self.ws_private.clone();
836 self.spawn_task("batch_cancel_orders", async move {
837 ws_private.batch_cancel_orders(regular_payload).await?;
838 Ok(())
839 });
840 }
841
842 if !algo_orders.is_empty() {
844 let http_client = self.http_client.clone();
845 let requests: Vec<OKXCancelAlgoOrderRequest> = algo_orders
846 .into_iter()
847 .map(
848 |(
849 instrument_id,
850 client_order_id,
851 venue_order_id,
852 _trader_id,
853 _strategy_id,
854 )| {
855 OKXCancelAlgoOrderRequest {
856 inst_id: instrument_id.symbol.to_string(),
857 inst_id_code: None,
858 algo_id: venue_order_id.map(|id| id.to_string()),
859 algo_cl_ord_id: if venue_order_id.is_none() {
860 Some(client_order_id.to_string())
861 } else {
862 None
863 },
864 }
865 },
866 )
867 .collect();
868
869 self.spawn_task("cancel_algo_orders", async move {
870 http_client.cancel_algo_orders(requests).await?;
871 Ok(())
872 });
873 }
874
875 Ok(())
876 }
877 }
878
879 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
880 let cache = self.core.cache();
881
882 let mut regular_payload = Vec::new();
883 let mut algo_orders = Vec::new();
884
885 for cancel in &cmd.cancels {
886 let is_pending_algo = cache.order(&cancel.client_order_id).is_some_and(|o| {
888 self.is_conditional_order(o.order_type()) && o.is_triggered() != Some(true)
889 });
890
891 if is_pending_algo {
892 algo_orders.push(cancel.clone());
893 } else {
894 regular_payload.push((
895 cancel.instrument_id,
896 Some(cancel.client_order_id),
897 cancel.venue_order_id,
898 ));
899 }
900 }
901 drop(cache);
902
903 if !regular_payload.is_empty() {
904 let ws_private = self.ws_private.clone();
905 self.spawn_task("batch_cancel_orders", async move {
906 ws_private.batch_cancel_orders(regular_payload).await?;
907 Ok(())
908 });
909 }
910
911 if !algo_orders.is_empty() {
913 let http_client = self.http_client.clone();
914 let requests: Vec<OKXCancelAlgoOrderRequest> = algo_orders
915 .into_iter()
916 .map(|cancel| OKXCancelAlgoOrderRequest {
917 inst_id: cancel.instrument_id.symbol.to_string(),
918 inst_id_code: None,
919 algo_id: cancel.venue_order_id.map(|id| id.to_string()),
920 algo_cl_ord_id: if cancel.venue_order_id.is_none() {
921 Some(cancel.client_order_id.to_string())
922 } else {
923 None
924 },
925 })
926 .collect();
927
928 self.spawn_task("cancel_algo_orders", async move {
929 http_client.cancel_algo_orders(requests).await?;
930 Ok(())
931 });
932 }
933
934 Ok(())
935 }
936
937 async fn generate_order_status_report(
938 &self,
939 cmd: &GenerateOrderStatusReport,
940 ) -> anyhow::Result<Option<OrderStatusReport>> {
941 let Some(instrument_id) = cmd.instrument_id else {
942 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
943 return Ok(None);
944 };
945
946 let mut reports = self
947 .http_client
948 .request_order_status_reports(
949 self.core.account_id,
950 None,
951 Some(instrument_id),
952 None,
953 None,
954 false,
955 None,
956 )
957 .await?;
958
959 if let Some(client_order_id) = cmd.client_order_id {
960 reports.retain(|report| report.client_order_id == Some(client_order_id));
961 }
962
963 if let Some(venue_order_id) = cmd.venue_order_id {
964 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
965 }
966
967 Ok(reports.into_iter().next())
968 }
969
970 async fn generate_order_status_reports(
971 &self,
972 cmd: &GenerateOrderStatusReports,
973 ) -> anyhow::Result<Vec<OrderStatusReport>> {
974 let mut reports = Vec::new();
975
976 if let Some(instrument_id) = cmd.instrument_id {
977 let mut fetched = self
978 .http_client
979 .request_order_status_reports(
980 self.core.account_id,
981 None,
982 Some(instrument_id),
983 None,
984 None,
985 false,
986 None,
987 )
988 .await?;
989 reports.append(&mut fetched);
990 } else {
991 for inst_type in self.instrument_types() {
992 let mut fetched = self
993 .http_client
994 .request_order_status_reports(
995 self.core.account_id,
996 Some(inst_type),
997 None,
998 None,
999 None,
1000 false,
1001 None,
1002 )
1003 .await?;
1004 reports.append(&mut fetched);
1005 }
1006 }
1007
1008 if cmd.open_only {
1010 reports.retain(|r| r.order_status.is_open());
1011 }
1012
1013 if let Some(start) = cmd.start {
1015 reports.retain(|r| r.ts_last >= start);
1016 }
1017 if let Some(end) = cmd.end {
1018 reports.retain(|r| r.ts_last <= end);
1019 }
1020
1021 Ok(reports)
1022 }
1023
1024 async fn generate_fill_reports(
1025 &self,
1026 cmd: GenerateFillReports,
1027 ) -> anyhow::Result<Vec<FillReport>> {
1028 let start_dt = nanos_to_datetime(cmd.start);
1029 let end_dt = nanos_to_datetime(cmd.end);
1030 let mut reports = Vec::new();
1031
1032 if let Some(instrument_id) = cmd.instrument_id {
1033 let mut fetched = self
1034 .http_client
1035 .request_fill_reports(
1036 self.core.account_id,
1037 None,
1038 Some(instrument_id),
1039 start_dt,
1040 end_dt,
1041 None,
1042 )
1043 .await?;
1044 reports.append(&mut fetched);
1045 } else {
1046 for inst_type in self.instrument_types() {
1047 let mut fetched = self
1048 .http_client
1049 .request_fill_reports(
1050 self.core.account_id,
1051 Some(inst_type),
1052 None,
1053 start_dt,
1054 end_dt,
1055 None,
1056 )
1057 .await?;
1058 reports.append(&mut fetched);
1059 }
1060 }
1061
1062 if let Some(venue_order_id) = cmd.venue_order_id {
1063 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1064 }
1065
1066 Ok(reports)
1067 }
1068
1069 async fn generate_position_status_reports(
1070 &self,
1071 cmd: &GeneratePositionStatusReports,
1072 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1073 let mut reports = Vec::new();
1074
1075 if let Some(instrument_id) = cmd.instrument_id {
1078 let mut fetched = self
1079 .http_client
1080 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
1081 .await?;
1082 reports.append(&mut fetched);
1083 } else {
1084 for inst_type in self.instrument_types() {
1085 if inst_type == OKXInstrumentType::Spot || inst_type == OKXInstrumentType::Margin {
1087 continue;
1088 }
1089 let mut fetched = self
1090 .http_client
1091 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
1092 .await?;
1093 reports.append(&mut fetched);
1094 }
1095 }
1096
1097 let mut margin_reports = self
1100 .http_client
1101 .request_spot_margin_position_reports(self.core.account_id)
1102 .await?;
1103
1104 if let Some(instrument_id) = cmd.instrument_id {
1105 margin_reports.retain(|report| report.instrument_id == instrument_id);
1106 }
1107
1108 reports.append(&mut margin_reports);
1109
1110 let _ = nanos_to_datetime(cmd.start);
1111 let _ = nanos_to_datetime(cmd.end);
1112
1113 Ok(reports)
1114 }
1115
1116 async fn generate_mass_status(
1117 &self,
1118 lookback_mins: Option<u64>,
1119 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1120 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1121
1122 let ts_now = self.clock.get_time_ns();
1123
1124 let start = lookback_mins.map(|mins| {
1125 let lookback_ns = mins * 60 * 1_000_000_000;
1126 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1127 });
1128
1129 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1130 .ts_init(ts_now)
1131 .open_only(false) .start(start)
1133 .build()
1134 .map_err(|e| anyhow::anyhow!("{e}"))?;
1135
1136 let fill_cmd = GenerateFillReportsBuilder::default()
1137 .ts_init(ts_now)
1138 .start(start)
1139 .build()
1140 .map_err(|e| anyhow::anyhow!("{e}"))?;
1141
1142 let position_cmd = GeneratePositionStatusReportsBuilder::default()
1143 .ts_init(ts_now)
1144 .start(start)
1145 .build()
1146 .map_err(|e| anyhow::anyhow!("{e}"))?;
1147
1148 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1149 self.generate_order_status_reports(&order_cmd),
1150 self.generate_fill_reports(fill_cmd),
1151 self.generate_position_status_reports(&position_cmd),
1152 )?;
1153
1154 log::info!("Received {} OrderStatusReports", order_reports.len());
1155 log::info!("Received {} FillReports", fill_reports.len());
1156 log::info!("Received {} PositionReports", position_reports.len());
1157
1158 let mut mass_status = ExecutionMassStatus::new(
1159 self.core.client_id,
1160 self.core.account_id,
1161 *OKX_VENUE,
1162 ts_now,
1163 None,
1164 );
1165
1166 mass_status.add_order_reports(order_reports);
1167 mass_status.add_fill_reports(fill_reports);
1168 mass_status.add_position_reports(position_reports);
1169
1170 Ok(Some(mass_status))
1171 }
1172}
1173
1174fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1176 match message {
1177 NautilusWsMessage::AccountUpdate(state) => {
1178 emitter.send_account_state(state);
1179 }
1180 NautilusWsMessage::PositionUpdate(report) => {
1181 emitter.send_position_report(report);
1182 }
1183 NautilusWsMessage::ExecutionReports(reports) => {
1184 log::debug!("Processing {} execution report(s)", reports.len());
1185 for report in reports {
1186 match report {
1187 ExecutionReport::Order(order_report) => {
1188 emitter.send_order_status_report(order_report);
1189 }
1190 ExecutionReport::Fill(fill_report) => {
1191 emitter.send_fill_report(fill_report);
1192 }
1193 }
1194 }
1195 }
1196 NautilusWsMessage::OrderAccepted(event) => {
1197 emitter.send_order_event(OrderEventAny::Accepted(event));
1198 }
1199 NautilusWsMessage::OrderCanceled(event) => {
1200 emitter.send_order_event(OrderEventAny::Canceled(event));
1201 }
1202 NautilusWsMessage::OrderExpired(event) => {
1203 emitter.send_order_event(OrderEventAny::Expired(event));
1204 }
1205 NautilusWsMessage::OrderRejected(event) => {
1206 emitter.send_order_event(OrderEventAny::Rejected(event));
1207 }
1208 NautilusWsMessage::OrderCancelRejected(event) => {
1209 emitter.send_order_event(OrderEventAny::CancelRejected(event));
1210 }
1211 NautilusWsMessage::OrderModifyRejected(event) => {
1212 emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1213 }
1214 NautilusWsMessage::OrderTriggered(event) => {
1215 emitter.send_order_event(OrderEventAny::Triggered(event));
1216 }
1217 NautilusWsMessage::OrderUpdated(event) => {
1218 emitter.send_order_event(OrderEventAny::Updated(event));
1219 }
1220 NautilusWsMessage::Error(e) => {
1221 log::warn!(
1222 "Websocket error: code={} message={} conn_id={:?}",
1223 e.code,
1224 e.message,
1225 e.conn_id
1226 );
1227 }
1228 NautilusWsMessage::Reconnected => {
1229 log::info!("Websocket reconnected");
1230 }
1231 NautilusWsMessage::Authenticated => {
1232 log::debug!("Websocket authenticated");
1233 }
1234 NautilusWsMessage::Deltas(_)
1235 | NautilusWsMessage::Raw(_)
1236 | NautilusWsMessage::Data(_)
1237 | NautilusWsMessage::FundingRates(_)
1238 | NautilusWsMessage::Instrument(_) => {
1239 log::debug!("Ignoring websocket data message");
1240 }
1241 }
1242}
1243
1244fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1245 value.map(|nanos| nanos.to_datetime_utc())
1246}
1247
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };
    use rstest::rstest;

    /// Shape of the batch-cancel payload these tests check.
    type CancelPayload = Vec<(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>)>;

    /// Builds the payload for a batch-cancel command.
    ///
    /// NOTE: this repeats the tuple construction locally; it does not call
    /// the execution client itself.
    fn build_payload(cmd: &BatchCancelOrders) -> CancelPayload {
        cmd.cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    cancel.venue_order_id,
                )
            })
            .collect()
    }

    #[rstest]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = Some(ClientId::from("OKX"));
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        // The two cancels differ only in their order identifiers.
        let make_cancel = |client_order_id: ClientOrderId, venue_order_id: VenueOrderId| {
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id,
                venue_order_id: Some(venue_order_id),
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
                params: None,
            }
        };

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels: vec![
                make_cancel(client_order_id1, venue_order_id1),
                make_cancel(client_order_id2, venue_order_id2),
            ],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = build_payload(&cmd);

        assert_eq!(payload.len(), 2);
        assert_eq!(payload[0].0, instrument_id);
        assert_eq!(payload[0].1, Some(client_order_id1));
        assert_eq!(payload[0].2, Some(venue_order_id1));
        assert_eq!(payload[1].0, instrument_id);
        assert_eq!(payload[1].1, Some(client_order_id2));
        assert_eq!(payload[1].2, Some(venue_order_id2));
    }

    #[rstest]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: Some(ClientId::from("OKX")),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: vec![],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = build_payload(&cmd);
        assert_eq!(payload.len(), 0);
    }
}