1use std::{
19 future::Future,
20 sync::{
21 Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::{Duration, Instant},
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use chrono::{DateTime, Utc};
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32 clients::ExecutionClient,
33 live::{runner::get_exec_event_sender, runtime::get_runtime},
34 messages::{
35 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
36 execution::{
37 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
38 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
39 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
40 },
41 },
42};
43use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
44use nautilus_live::ExecutionClientCore;
45use nautilus_model::{
46 accounts::AccountAny,
47 enums::{AccountType, OmsType, OrderType},
48 events::{
49 AccountState, OrderCancelRejected, OrderEventAny, OrderModifyRejected, OrderRejected,
50 OrderSubmitted,
51 },
52 identifiers::{AccountId, ClientId, InstrumentId, Venue},
53 orders::Order,
54 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
55 types::{AccountBalance, MarginBalance},
56};
57use tokio::task::JoinHandle;
58
59use crate::{
60 common::{
61 consts::{OKX_CONDITIONAL_ORDER_TYPES, OKX_VENUE},
62 enums::{OKXInstrumentType, OKXMarginMode, OKXTradeMode},
63 },
64 config::OKXExecClientConfig,
65 http::client::OKXHttpClient,
66 websocket::{
67 client::OKXWebSocketClient,
68 messages::{ExecutionReport, NautilusWsMessage},
69 },
70};
71
/// Execution client for the OKX venue.
///
/// Combines an HTTP client with two WebSocket clients (private and business) and keeps
/// track of the background tasks it spawns so they can be aborted later.
#[derive(Debug)]
pub struct OKXExecutionClient {
    /// Shared core exposing identifiers, cache access and account-state generation.
    core: ExecutionClientCore,
    /// Client configuration (credentials, URLs, instrument types, feature flags).
    config: OKXExecClientConfig,
    /// HTTP client used for instrument, account-state, report and algo-order requests.
    http_client: OKXHttpClient,
    /// WebSocket client built from `config.ws_private_url()`; used for order submission,
    /// modification, cancellation and the orders/fills/account subscriptions.
    ws_private: OKXWebSocketClient,
    /// WebSocket client built from `config.ws_business_url()`; used for the algo-orders
    /// subscription.
    ws_business: OKXWebSocketClient,
    /// Trade mode derived once in `new` from the account type and configuration.
    trade_mode: OKXTradeMode,
    /// Channel for emitting execution events; `None` until `connect` populates it.
    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Set at the end of a successful `connect`; cleared by `disconnect` and `stop`.
    connected: AtomicBool,
    /// Set once `connect` has loaded and cached instruments.
    instruments_initialized: AtomicBool,
    /// Task forwarding messages from the private WebSocket stream.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Task forwarding messages from the business WebSocket stream.
    ws_business_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
88
89impl OKXExecutionClient {
90 pub fn new(core: ExecutionClientCore, config: OKXExecClientConfig) -> anyhow::Result<Self> {
96 let http_client = OKXHttpClient::with_credentials(
98 config.api_key.clone(),
99 config.api_secret.clone(),
100 config.api_passphrase.clone(),
101 config.base_url_http.clone(),
102 config.http_timeout_secs,
103 config.max_retries,
104 config.retry_delay_initial_ms,
105 config.retry_delay_max_ms,
106 config.is_demo,
107 config.http_proxy_url.clone(),
108 )?;
109
110 let account_id = core.account_id;
111 let ws_private = OKXWebSocketClient::with_credentials(
112 Some(config.ws_private_url()),
113 config.api_key.clone(),
114 config.api_secret.clone(),
115 config.api_passphrase.clone(),
116 Some(account_id),
117 Some(20), )
119 .context("failed to construct OKX private websocket client")?;
120
121 let ws_business = OKXWebSocketClient::with_credentials(
122 Some(config.ws_business_url()),
123 config.api_key.clone(),
124 config.api_secret.clone(),
125 config.api_passphrase.clone(),
126 Some(account_id),
127 Some(20), )
129 .context("failed to construct OKX business websocket client")?;
130
131 let trade_mode = Self::derive_trade_mode(core.account_type, &config);
132
133 Ok(Self {
134 core,
135 config,
136 http_client,
137 ws_private,
138 ws_business,
139 trade_mode,
140 exec_event_sender: None,
141 started: false,
142 connected: AtomicBool::new(false),
143 instruments_initialized: AtomicBool::new(false),
144 ws_stream_handle: None,
145 ws_business_stream_handle: None,
146 pending_tasks: Mutex::new(Vec::new()),
147 })
148 }
149
150 fn derive_trade_mode(account_type: AccountType, config: &OKXExecClientConfig) -> OKXTradeMode {
151 let is_cross_margin = config.margin_mode == Some(OKXMarginMode::Cross);
152
153 if account_type == AccountType::Cash {
154 if !config.use_spot_margin {
155 return OKXTradeMode::Cash;
156 }
157 return if is_cross_margin {
158 OKXTradeMode::Cross
159 } else {
160 OKXTradeMode::Isolated
161 };
162 }
163
164 if is_cross_margin {
165 OKXTradeMode::Cross
166 } else {
167 OKXTradeMode::Isolated
168 }
169 }
170
171 fn instrument_types(&self) -> Vec<OKXInstrumentType> {
172 if self.config.instrument_types.is_empty() {
173 vec![OKXInstrumentType::Spot]
174 } else {
175 self.config.instrument_types.clone()
176 }
177 }
178
179 async fn refresh_account_state(&self) -> anyhow::Result<()> {
180 let account_state = self
181 .http_client
182 .request_account_state(self.core.account_id)
183 .await
184 .context("failed to request OKX account state")?;
185
186 self.core.generate_account_state(
187 account_state.balances.clone(),
188 account_state.margins.clone(),
189 account_state.is_reported,
190 account_state.ts_event,
191 )
192 }
193
194 fn update_account_state(&self) -> anyhow::Result<()> {
195 let runtime = get_runtime();
196 runtime.block_on(self.refresh_account_state())
197 }
198
199 fn is_conditional_order(&self, order_type: OrderType) -> bool {
200 OKX_CONDITIONAL_ORDER_TYPES.contains(&order_type)
201 }
202
203 fn submit_regular_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
204 let order = cmd.order.clone();
205 let ws_private = self.ws_private.clone();
206 let trade_mode = self.trade_mode;
207
208 let exec_event_sender = self.exec_event_sender.clone();
209 let trader_id = self.core.trader_id;
210 let account_id = self.core.account_id;
211 let ts_init = cmd.ts_init;
212 let client_order_id = order.client_order_id();
213 let strategy_id = order.strategy_id();
214 let instrument_id = order.instrument_id();
215 let order_side = order.order_side();
216 let order_type = order.order_type();
217 let quantity = order.quantity();
218 let time_in_force = order.time_in_force();
219 let price = order.price();
220 let trigger_price = order.trigger_price();
221 let is_post_only = order.is_post_only();
222 let is_reduce_only = order.is_reduce_only();
223 let is_quote_quantity = order.is_quote_quantity();
224
225 self.spawn_task("submit_order", async move {
226 let result = ws_private
227 .submit_order(
228 trader_id,
229 strategy_id,
230 instrument_id,
231 trade_mode,
232 client_order_id,
233 order_side,
234 order_type,
235 quantity,
236 Some(time_in_force),
237 price,
238 trigger_price,
239 Some(is_post_only),
240 Some(is_reduce_only),
241 Some(is_quote_quantity),
242 None,
243 )
244 .await
245 .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
246
247 if let Err(e) = result {
248 let rejected_event = OrderRejected::new(
249 trader_id,
250 strategy_id,
251 instrument_id,
252 client_order_id,
253 account_id,
254 format!("submit-order-error: {e}").into(),
255 UUID4::new(),
256 ts_init,
257 get_atomic_clock_realtime().get_time_ns(),
258 false,
259 false,
260 );
261
262 if let Some(sender) = &exec_event_sender {
263 if let Err(send_err) = sender.send(ExecutionEvent::Order(
264 OrderEventAny::Rejected(rejected_event),
265 )) {
266 log::warn!("Failed to send OrderRejected event: {send_err}");
267 }
268 } else {
269 log::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
270 }
271
272 return Err(e);
273 }
274
275 Ok(())
276 });
277
278 Ok(())
279 }
280
281 fn submit_conditional_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
282 let order = cmd.order.clone();
283 let trigger_price = order
284 .trigger_price()
285 .ok_or_else(|| anyhow::anyhow!("conditional order requires a trigger price"))?;
286 let http_client = self.http_client.clone();
287 let trade_mode = self.trade_mode;
288
289 let exec_event_sender = self.exec_event_sender.clone();
290 let trader_id = self.core.trader_id;
291 let account_id = self.core.account_id;
292 let ts_init = cmd.ts_init;
293 let client_order_id = order.client_order_id();
294 let strategy_id = order.strategy_id();
295 let instrument_id = order.instrument_id();
296 let order_side = order.order_side();
297 let order_type = order.order_type();
298 let quantity = order.quantity();
299 let trigger_type = order.trigger_type();
300 let price = order.price();
301 let is_reduce_only = order.is_reduce_only();
302
303 self.spawn_task("submit_algo_order", async move {
304 let result = http_client
305 .place_algo_order_with_domain_types(
306 instrument_id,
307 trade_mode,
308 client_order_id,
309 order_side,
310 order_type,
311 quantity,
312 trigger_price,
313 trigger_type,
314 price,
315 Some(is_reduce_only),
316 )
317 .await
318 .map_err(|e| anyhow::anyhow!("Submit algo order failed: {e}"));
319
320 if let Err(e) = result {
321 let rejected_event = OrderRejected::new(
322 trader_id,
323 strategy_id,
324 instrument_id,
325 client_order_id,
326 account_id,
327 format!("submit-order-error: {e}").into(),
328 UUID4::new(),
329 ts_init,
330 get_atomic_clock_realtime().get_time_ns(),
331 false,
332 false,
333 );
334
335 if let Some(sender) = &exec_event_sender {
336 if let Err(send_err) = sender.send(ExecutionEvent::Order(
337 OrderEventAny::Rejected(rejected_event),
338 )) {
339 log::warn!("Failed to send OrderRejected event: {send_err}");
340 }
341 } else {
342 log::warn!("Cannot send OrderRejected: exec_event_sender not initialized");
343 }
344
345 return Err(e);
346 }
347
348 Ok(())
349 });
350
351 Ok(())
352 }
353
354 fn cancel_ws_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
355 let ws_private = self.ws_private.clone();
356 let command = cmd.clone();
357
358 let exec_event_sender = self.exec_event_sender.clone();
359 let trader_id = self.core.trader_id;
360 let account_id = self.core.account_id;
361 let ts_init = cmd.ts_init;
362
363 self.spawn_task("cancel_order", async move {
364 let result = ws_private
365 .cancel_order(
366 command.trader_id,
367 command.strategy_id,
368 command.instrument_id,
369 Some(command.client_order_id),
370 command.venue_order_id,
371 )
372 .await
373 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
374
375 if let Err(e) = result {
376 let rejected_event = OrderCancelRejected::new(
377 trader_id,
378 command.strategy_id,
379 command.instrument_id,
380 command.client_order_id,
381 format!("cancel-order-error: {e}").into(),
382 UUID4::new(),
383 get_atomic_clock_realtime().get_time_ns(),
384 ts_init,
385 false,
386 command.venue_order_id,
387 Some(account_id),
388 );
389
390 if let Some(sender) = &exec_event_sender {
391 if let Err(send_err) = sender.send(ExecutionEvent::Order(
392 OrderEventAny::CancelRejected(rejected_event),
393 )) {
394 log::warn!("Failed to send OrderCancelRejected event: {send_err}");
395 }
396 } else {
397 log::warn!(
398 "Cannot send OrderCancelRejected: exec_event_sender not initialized"
399 );
400 }
401
402 return Err(e);
403 }
404
405 Ok(())
406 });
407
408 Ok(())
409 }
410
411 fn mass_cancel_instrument(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
412 let ws_private = self.ws_private.clone();
413 self.spawn_task("mass_cancel_orders", async move {
414 ws_private.mass_cancel_orders(instrument_id).await?;
415 Ok(())
416 });
417 Ok(())
418 }
419
420 fn spawn_task<F>(&self, description: &'static str, fut: F)
421 where
422 F: Future<Output = anyhow::Result<()>> + Send + 'static,
423 {
424 let runtime = get_runtime();
425 let handle = runtime.spawn(async move {
426 if let Err(e) = fut.await {
427 log::warn!("{description} failed: {e:?}");
428 }
429 });
430
431 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
432 tasks.retain(|handle| !handle.is_finished());
433 tasks.push(handle);
434 }
435
436 fn abort_pending_tasks(&self) {
437 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
438 for handle in tasks.drain(..) {
439 handle.abort();
440 }
441 }
442
443 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
445 let account_id = self.core.account_id;
446
447 if self.core.cache().borrow().account(&account_id).is_some() {
448 log::info!("Account {account_id} registered");
449 return Ok(());
450 }
451
452 let start = Instant::now();
453 let timeout = Duration::from_secs_f64(timeout_secs);
454 let interval = Duration::from_millis(10);
455
456 loop {
457 tokio::time::sleep(interval).await;
458
459 if self.core.cache().borrow().account(&account_id).is_some() {
460 log::info!("Account {account_id} registered");
461 return Ok(());
462 }
463
464 if start.elapsed() >= timeout {
465 anyhow::bail!(
466 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
467 );
468 }
469 }
470 }
471}
472
473#[async_trait(?Send)]
474impl ExecutionClient for OKXExecutionClient {
    /// Returns the current value of the `connected` flag.
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }

    /// Returns the client ID held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }

    /// Returns the account ID held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }

    /// Returns the OKX venue identifier.
    fn venue(&self) -> Venue {
        *OKX_VENUE
    }

    /// Returns the OMS type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }

    /// Returns the account as reported by the core, if any.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.get_account()
    }
498
    /// Connects the client: loads instruments, opens both WebSockets, subscribes to the
    /// execution channels and publishes the initial account state.
    ///
    /// Returns immediately when the client is already connected. The `connected` flag is
    /// only set after the account has been observed in the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if an instrument request fails, either WebSocket fails to connect
    /// or become active, a required subscription fails, the account state request fails,
    /// or the account is not registered within 30 seconds.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.connected.load(Ordering::Acquire) {
            return Ok(());
        }

        // Obtain the execution event sender on first use.
        if self.exec_event_sender.is_none() {
            self.exec_event_sender = Some(get_exec_event_sender());
        }

        let instrument_types = self.instrument_types();

        // Instruments are only loaded on the first successful pass through this branch.
        if !self.instruments_initialized.load(Ordering::Acquire) {
            let mut all_instruments = Vec::new();
            for instrument_type in &instrument_types {
                let instruments = self
                    .http_client
                    .request_instruments(*instrument_type, None)
                    .await
                    .with_context(|| {
                        format!("failed to request OKX instruments for {instrument_type:?}")
                    })?;

                if instruments.is_empty() {
                    log::warn!("No instruments returned for {instrument_type:?}");
                    continue;
                }

                log::info!(
                    "Loaded {} {instrument_type:?} instruments",
                    instruments.len()
                );

                self.http_client.cache_instruments(instruments.clone());
                all_instruments.extend(instruments);
            }

            {
                // Scoped so the mutable cache borrow is released before continuing.
                let mut cache = self.core.cache().borrow_mut();
                for instrument in &all_instruments {
                    if let Err(e) = cache.add_instrument(instrument.clone()) {
                        log::debug!("Instrument already in cache: {e}");
                    }
                }
            }

            if !all_instruments.is_empty() {
                self.ws_private.cache_instruments(all_instruments);
            }
            self.instruments_initialized.store(true, Ordering::Release);
        }

        let Some(sender) = self.exec_event_sender.as_ref() else {
            log::error!("Execution event sender not initialized");
            anyhow::bail!("Execution event sender not initialized");
        };

        self.ws_private.connect().await?;
        self.ws_private.wait_until_active(10.0).await?;
        log::info!("Connected to private WebSocket");

        // Forward private-stream messages to the execution event channel; the task is
        // only spawned when no handle is currently stored.
        if self.ws_stream_handle.is_none() {
            let stream = self.ws_private.stream();
            let sender = sender.clone();
            let handle = get_runtime().spawn(async move {
                pin_mut!(stream);
                while let Some(message) = stream.next().await {
                    dispatch_ws_message(message, &sender);
                }
            });
            self.ws_stream_handle = Some(handle);
        }

        self.ws_business.connect().await?;
        self.ws_business.wait_until_active(10.0).await?;
        log::info!("Connected to business WebSocket");

        // Same forwarding arrangement for the business stream.
        if self.ws_business_stream_handle.is_none() {
            let stream = self.ws_business.stream();
            let sender = sender.clone();
            let handle = get_runtime().spawn(async move {
                pin_mut!(stream);
                while let Some(message) = stream.next().await {
                    dispatch_ws_message(message, &sender);
                }
            });
            self.ws_business_stream_handle = Some(handle);
        }

        for inst_type in &instrument_types {
            log::info!("Subscribing to orders channel for {inst_type:?}");
            self.ws_private.subscribe_orders(*inst_type).await?;

            if self.config.use_fills_channel {
                log::info!("Subscribing to fills channel for {inst_type:?}");
                // Best effort: a failed fills subscription is logged, not propagated.
                if let Err(e) = self.ws_private.subscribe_fills(*inst_type).await {
                    log::warn!("Failed to subscribe to fills channel ({inst_type:?}): {e}");
                }
            }
        }

        self.ws_private.subscribe_account().await?;

        // Algo-order subscriptions are skipped for the Option instrument type.
        for inst_type in &instrument_types {
            if *inst_type != OKXInstrumentType::Option {
                self.ws_business.subscribe_orders_algo(*inst_type).await?;
            }
        }

        let account_state = self
            .http_client
            .request_account_state(self.core.account_id)
            .await
            .context("failed to request OKX account state")?;

        if !account_state.balances.is_empty() {
            log::info!(
                "Received account state with {} balance(s)",
                account_state.balances.len()
            );
        }
        dispatch_account_state(account_state, sender);

        // Wait for the dispatched account state to result in a registered account.
        self.await_account_registered(30.0).await?;

        self.connected.store(true, Ordering::Release);
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
631
632 async fn disconnect(&mut self) -> anyhow::Result<()> {
633 if !self.connected.load(Ordering::Acquire) {
634 return Ok(());
635 }
636
637 self.abort_pending_tasks();
638 self.http_client.cancel_all_requests();
639
640 if let Err(e) = self.ws_private.close().await {
641 log::warn!("Error closing private websocket: {e:?}");
642 }
643
644 if let Err(e) = self.ws_business.close().await {
645 log::warn!("Error closing business websocket: {e:?}");
646 }
647
648 if let Some(handle) = self.ws_stream_handle.take() {
649 handle.abort();
650 }
651
652 if let Some(handle) = self.ws_business_stream_handle.take() {
653 handle.abort();
654 }
655
656 self.connected.store(false, Ordering::Release);
657 log::info!("Disconnected: client_id={}", self.core.client_id);
658 Ok(())
659 }
660
    /// Refreshes the account state; the command itself is not inspected.
    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
        self.update_account_state()
    }

    /// Not implemented: logs the request at debug level and returns `Ok(())`.
    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
        log::debug!(
            "query_order not implemented for OKX execution client (client_order_id={})",
            cmd.client_order_id
        );
        Ok(())
    }

    /// Delegates account-state generation to the core with the arguments unchanged.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.core
            .generate_account_state(balances, margins, reported, ts_event)
    }
683
684 fn start(&mut self) -> anyhow::Result<()> {
685 if self.started {
686 return Ok(());
687 }
688
689 self.started = true;
690
691 let http_client = self.http_client.clone();
693 let ws_private = self.ws_private.clone();
694 let instrument_types = self.config.instrument_types.clone();
695
696 get_runtime().spawn(async move {
697 let mut all_instruments = Vec::new();
698 for instrument_type in instrument_types {
699 match http_client.request_instruments(instrument_type, None).await {
700 Ok(instruments) => {
701 if instruments.is_empty() {
702 log::warn!("No instruments returned for {instrument_type:?}");
703 continue;
704 }
705 http_client.cache_instruments(instruments.clone());
706 all_instruments.extend(instruments);
707 }
708 Err(e) => {
709 log::error!("Failed to request instruments for {instrument_type:?}: {e}");
710 }
711 }
712 }
713
714 if all_instruments.is_empty() {
715 log::warn!(
716 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
717 );
718 } else {
719 ws_private.cache_instruments(all_instruments);
720 log::info!("Instruments initialized");
721 }
722 });
723
724 log::info!(
725 "Started: client_id={}, account_id={}, account_type={:?}, trade_mode={:?}, instrument_types={:?}, use_fills_channel={}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
726 self.core.client_id,
727 self.core.account_id,
728 self.core.account_type,
729 self.trade_mode,
730 self.config.instrument_types,
731 self.config.use_fills_channel,
732 self.config.is_demo,
733 self.config.http_proxy_url,
734 self.config.ws_proxy_url,
735 );
736 Ok(())
737 }
738
739 fn stop(&mut self) -> anyhow::Result<()> {
740 if !self.started {
741 return Ok(());
742 }
743
744 self.started = false;
745 self.connected.store(false, Ordering::Release);
746 if let Some(handle) = self.ws_stream_handle.take() {
747 handle.abort();
748 }
749 self.abort_pending_tasks();
750 log::info!("Stopped: client_id={}", self.core.client_id);
751 Ok(())
752 }
753
754 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
755 let order = &cmd.order;
756
757 if order.is_closed() {
758 let client_order_id = order.client_order_id();
759 log::warn!("Cannot submit closed order {client_order_id}");
760 return Ok(());
761 }
762
763 let event = OrderSubmitted::new(
764 self.core.trader_id,
765 order.strategy_id(),
766 order.instrument_id(),
767 order.client_order_id(),
768 self.core.account_id,
769 UUID4::new(),
770 cmd.ts_init,
771 get_atomic_clock_realtime().get_time_ns(),
772 );
773 if let Some(sender) = &self.exec_event_sender {
774 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
775 if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
776 log::warn!("Failed to send OrderSubmitted event: {e}");
777 }
778 } else {
779 log::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
780 }
781
782 if self.is_conditional_order(order.order_type()) {
783 self.submit_conditional_order(cmd)
784 } else {
785 self.submit_regular_order(cmd)
786 }
787 }
788
789 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
790 log::warn!(
791 "submit_order_list not yet implemented for OKX execution client (got {} orders)",
792 cmd.order_list.orders.len()
793 );
794 Ok(())
795 }
796
797 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
798 let ws_private = self.ws_private.clone();
799 let command = cmd.clone();
800
801 let exec_event_sender = self.exec_event_sender.clone();
803 let trader_id = self.core.trader_id;
804 let account_id = self.core.account_id;
805 let ts_init = cmd.ts_init;
806
807 self.spawn_task("modify_order", async move {
808 let result = ws_private
809 .modify_order(
810 command.trader_id,
811 command.strategy_id,
812 command.instrument_id,
813 Some(command.client_order_id),
814 command.price,
815 command.quantity,
816 command.venue_order_id,
817 )
818 .await
819 .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
820
821 if let Err(e) = result {
822 let rejected_event = OrderModifyRejected::new(
823 trader_id,
824 command.strategy_id,
825 command.instrument_id,
826 command.client_order_id,
827 format!("modify-order-error: {e}").into(),
828 UUID4::new(),
829 get_atomic_clock_realtime().get_time_ns(),
830 ts_init,
831 false,
832 command.venue_order_id,
833 Some(account_id),
834 );
835
836 if let Some(sender) = &exec_event_sender {
837 if let Err(send_err) = sender.send(ExecutionEvent::Order(
838 OrderEventAny::ModifyRejected(rejected_event),
839 )) {
840 log::warn!("Failed to send OrderModifyRejected event: {send_err}");
841 }
842 } else {
843 log::warn!(
844 "Cannot send OrderModifyRejected: exec_event_sender not initialized"
845 );
846 }
847
848 return Err(e);
849 }
850
851 Ok(())
852 });
853
854 Ok(())
855 }
856
    /// Cancels a single order by delegating to `cancel_ws_order`.
    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
        self.cancel_ws_order(cmd)
    }
860
861 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
862 if self.config.use_mm_mass_cancel {
863 self.mass_cancel_instrument(cmd.instrument_id)
865 } else {
866 let cache = self.core.cache().borrow();
868 let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None);
869
870 if open_orders.is_empty() {
871 log::debug!("No open orders to cancel for {}", cmd.instrument_id);
872 return Ok(());
873 }
874
875 let mut payload = Vec::with_capacity(open_orders.len());
876 for order in open_orders {
877 payload.push((
878 order.instrument_id(),
879 Some(order.client_order_id()),
880 order.venue_order_id(),
881 ));
882 }
883 drop(cache);
884
885 log::debug!(
886 "Canceling {} open orders for {} via batch cancel",
887 payload.len(),
888 cmd.instrument_id
889 );
890
891 let ws_private = self.ws_private.clone();
892 self.spawn_task("batch_cancel_orders", async move {
893 ws_private.batch_cancel_orders(payload).await?;
894 Ok(())
895 });
896
897 Ok(())
898 }
899 }
900
901 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
902 let mut payload = Vec::with_capacity(cmd.cancels.len());
903
904 for cancel in &cmd.cancels {
905 payload.push((
906 cancel.instrument_id,
907 Some(cancel.client_order_id),
908 cancel.venue_order_id,
909 ));
910 }
911
912 let ws_private = self.ws_private.clone();
913 self.spawn_task("batch_cancel_orders", async move {
914 ws_private.batch_cancel_orders(payload).await?;
915 Ok(())
916 });
917
918 Ok(())
919 }
920
921 async fn generate_order_status_report(
922 &self,
923 cmd: &GenerateOrderStatusReport,
924 ) -> anyhow::Result<Option<OrderStatusReport>> {
925 let Some(instrument_id) = cmd.instrument_id else {
926 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
927 return Ok(None);
928 };
929
930 let mut reports = self
931 .http_client
932 .request_order_status_reports(
933 self.core.account_id,
934 None,
935 Some(instrument_id),
936 None,
937 None,
938 false,
939 None,
940 )
941 .await?;
942
943 if let Some(client_order_id) = cmd.client_order_id {
944 reports.retain(|report| report.client_order_id == Some(client_order_id));
945 }
946
947 if let Some(venue_order_id) = cmd.venue_order_id {
948 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
949 }
950
951 Ok(reports.into_iter().next())
952 }
953
954 async fn generate_order_status_reports(
955 &self,
956 cmd: &GenerateOrderStatusReports,
957 ) -> anyhow::Result<Vec<OrderStatusReport>> {
958 let mut reports = Vec::new();
959
960 if let Some(instrument_id) = cmd.instrument_id {
961 let mut fetched = self
962 .http_client
963 .request_order_status_reports(
964 self.core.account_id,
965 None,
966 Some(instrument_id),
967 None,
968 None,
969 false,
970 None,
971 )
972 .await?;
973 reports.append(&mut fetched);
974 } else {
975 for inst_type in self.instrument_types() {
976 let mut fetched = self
977 .http_client
978 .request_order_status_reports(
979 self.core.account_id,
980 Some(inst_type),
981 None,
982 None,
983 None,
984 false,
985 None,
986 )
987 .await?;
988 reports.append(&mut fetched);
989 }
990 }
991
992 if cmd.open_only {
994 reports.retain(|r| r.order_status.is_open());
995 }
996
997 if let Some(start) = cmd.start {
999 reports.retain(|r| r.ts_last >= start);
1000 }
1001 if let Some(end) = cmd.end {
1002 reports.retain(|r| r.ts_last <= end);
1003 }
1004
1005 Ok(reports)
1006 }
1007
1008 async fn generate_fill_reports(
1009 &self,
1010 cmd: GenerateFillReports,
1011 ) -> anyhow::Result<Vec<FillReport>> {
1012 let start_dt = nanos_to_datetime(cmd.start);
1013 let end_dt = nanos_to_datetime(cmd.end);
1014 let mut reports = Vec::new();
1015
1016 if let Some(instrument_id) = cmd.instrument_id {
1017 let mut fetched = self
1018 .http_client
1019 .request_fill_reports(
1020 self.core.account_id,
1021 None,
1022 Some(instrument_id),
1023 start_dt,
1024 end_dt,
1025 None,
1026 )
1027 .await?;
1028 reports.append(&mut fetched);
1029 } else {
1030 for inst_type in self.instrument_types() {
1031 let mut fetched = self
1032 .http_client
1033 .request_fill_reports(
1034 self.core.account_id,
1035 Some(inst_type),
1036 None,
1037 start_dt,
1038 end_dt,
1039 None,
1040 )
1041 .await?;
1042 reports.append(&mut fetched);
1043 }
1044 }
1045
1046 if let Some(venue_order_id) = cmd.venue_order_id {
1047 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1048 }
1049
1050 Ok(reports)
1051 }
1052
1053 async fn generate_position_status_reports(
1054 &self,
1055 cmd: &GeneratePositionStatusReports,
1056 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1057 let mut reports = Vec::new();
1058
1059 if let Some(instrument_id) = cmd.instrument_id {
1062 let mut fetched = self
1063 .http_client
1064 .request_position_status_reports(self.core.account_id, None, Some(instrument_id))
1065 .await?;
1066 reports.append(&mut fetched);
1067 } else {
1068 for inst_type in self.instrument_types() {
1069 if inst_type == OKXInstrumentType::Spot || inst_type == OKXInstrumentType::Margin {
1071 continue;
1072 }
1073 let mut fetched = self
1074 .http_client
1075 .request_position_status_reports(self.core.account_id, Some(inst_type), None)
1076 .await?;
1077 reports.append(&mut fetched);
1078 }
1079 }
1080
1081 let mut margin_reports = self
1084 .http_client
1085 .request_spot_margin_position_reports(self.core.account_id)
1086 .await?;
1087
1088 if let Some(instrument_id) = cmd.instrument_id {
1089 margin_reports.retain(|report| report.instrument_id == instrument_id);
1090 }
1091
1092 reports.append(&mut margin_reports);
1093
1094 let _ = nanos_to_datetime(cmd.start);
1095 let _ = nanos_to_datetime(cmd.end);
1096
1097 Ok(reports)
1098 }
1099
1100 async fn generate_mass_status(
1101 &self,
1102 lookback_mins: Option<u64>,
1103 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1104 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1105
1106 let ts_now = get_atomic_clock_realtime().get_time_ns();
1107
1108 let start = lookback_mins.map(|mins| {
1109 let lookback_ns = mins * 60 * 1_000_000_000;
1110 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1111 });
1112
1113 let order_cmd = GenerateOrderStatusReports::new(
1114 UUID4::new(),
1115 ts_now,
1116 false, None, start, None, None, None, );
1123
1124 let fill_cmd = GenerateFillReports::new(
1125 UUID4::new(),
1126 ts_now,
1127 None, None, start,
1130 None, None, None, );
1134
1135 let position_cmd = GeneratePositionStatusReports::new(
1136 UUID4::new(),
1137 ts_now,
1138 None, start,
1140 None, None, None, );
1144
1145 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1146 self.generate_order_status_reports(&order_cmd),
1147 self.generate_fill_reports(fill_cmd),
1148 self.generate_position_status_reports(&position_cmd),
1149 )?;
1150
1151 log::info!("Received {} OrderStatusReports", order_reports.len());
1152 log::info!("Received {} FillReports", fill_reports.len());
1153 log::info!("Received {} PositionReports", position_reports.len());
1154
1155 let mut mass_status = ExecutionMassStatus::new(
1156 self.core.client_id,
1157 self.core.account_id,
1158 *OKX_VENUE,
1159 ts_now,
1160 None,
1161 );
1162
1163 mass_status.add_order_reports(order_reports);
1164 mass_status.add_fill_reports(fill_reports);
1165 mass_status.add_position_reports(position_reports);
1166
1167 Ok(Some(mass_status))
1168 }
1169}
1170
1171fn dispatch_ws_message(
1172 message: NautilusWsMessage,
1173 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1174) {
1175 match message {
1176 NautilusWsMessage::AccountUpdate(state) => dispatch_account_state(state, sender),
1177 NautilusWsMessage::PositionUpdate(report) => {
1178 dispatch_position_status_report(report, sender);
1179 }
1180 NautilusWsMessage::ExecutionReports(reports) => {
1181 log::debug!("Processing {} execution report(s)", reports.len());
1182 for report in reports {
1183 dispatch_execution_report(report, sender);
1184 }
1185 }
1186 NautilusWsMessage::OrderAccepted(event) => {
1187 dispatch_order_event(OrderEventAny::Accepted(event), sender);
1188 }
1189 NautilusWsMessage::OrderCanceled(event) => {
1190 dispatch_order_event(OrderEventAny::Canceled(event), sender);
1191 }
1192 NautilusWsMessage::OrderExpired(event) => {
1193 dispatch_order_event(OrderEventAny::Expired(event), sender);
1194 }
1195 NautilusWsMessage::OrderRejected(event) => {
1196 dispatch_order_event(OrderEventAny::Rejected(event), sender);
1197 }
1198 NautilusWsMessage::OrderCancelRejected(event) => {
1199 dispatch_order_event(OrderEventAny::CancelRejected(event), sender);
1200 }
1201 NautilusWsMessage::OrderModifyRejected(event) => {
1202 dispatch_order_event(OrderEventAny::ModifyRejected(event), sender);
1203 }
1204 NautilusWsMessage::OrderTriggered(event) => {
1205 dispatch_order_event(OrderEventAny::Triggered(event), sender);
1206 }
1207 NautilusWsMessage::OrderUpdated(event) => {
1208 dispatch_order_event(OrderEventAny::Updated(event), sender);
1209 }
1210 NautilusWsMessage::Error(e) => {
1211 log::warn!(
1212 "Websocket error: code={} message={} conn_id={:?}",
1213 e.code,
1214 e.message,
1215 e.conn_id
1216 );
1217 }
1218 NautilusWsMessage::Reconnected => {
1219 log::info!("Websocket reconnected");
1220 }
1221 NautilusWsMessage::Authenticated => {
1222 log::debug!("Websocket authenticated");
1223 }
1224 NautilusWsMessage::Deltas(_)
1225 | NautilusWsMessage::Raw(_)
1226 | NautilusWsMessage::Data(_)
1227 | NautilusWsMessage::FundingRates(_)
1228 | NautilusWsMessage::Instrument(_) => {
1229 log::debug!("Ignoring websocket data message");
1230 }
1231 }
1232}
1233
1234fn dispatch_account_state(
1235 state: AccountState,
1236 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1237) {
1238 if let Err(e) = sender.send(ExecutionEvent::Account(state)) {
1239 log::warn!("Failed to send account state: {e}");
1240 }
1241}
1242
1243fn dispatch_position_status_report(
1244 report: PositionStatusReport,
1245 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1246) {
1247 let exec_report = NautilusExecutionReport::Position(Box::new(report));
1248 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1249 log::warn!("Failed to send position status report: {e}");
1250 }
1251}
1252
1253fn dispatch_execution_report(
1254 report: ExecutionReport,
1255 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1256) {
1257 match report {
1258 ExecutionReport::Order(order_report) => {
1259 let exec_report = NautilusExecutionReport::Order(Box::new(order_report));
1260 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1261 log::warn!("Failed to send order status report: {e}");
1262 }
1263 }
1264 ExecutionReport::Fill(fill_report) => {
1265 let exec_report = NautilusExecutionReport::Fill(Box::new(fill_report));
1266 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1267 log::warn!("Failed to send fill report: {e}");
1268 }
1269 }
1270 }
1271}
1272
1273fn dispatch_order_event(
1274 event: OrderEventAny,
1275 sender: &tokio::sync::mpsc::UnboundedSender<ExecutionEvent>,
1276) {
1277 if let Err(e) = sender.send(ExecutionEvent::Order(event)) {
1278 log::warn!("Failed to send order event: {e}");
1279 }
1280}
1281
1282fn nanos_to_datetime(value: Option<UnixNanos>) -> Option<DateTime<Utc>> {
1283 value.map(|nanos| nanos.to_datetime_utc())
1284}
1285
#[cfg(test)]
mod tests {
    use nautilus_common::messages::execution::{BatchCancelOrders, CancelOrder};
    use nautilus_core::UnixNanos;
    use nautilus_model::identifiers::{
        ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId,
    };
    use rstest::rstest;

    /// One batch-cancel payload entry: instrument, client order ID, venue order ID.
    type CancelEntry = (InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>);

    /// Builds the batch-cancel payload from the individual cancels of `cmd`,
    /// one entry per cancel in the same order.
    fn build_payload(cmd: &BatchCancelOrders) -> Vec<CancelEntry> {
        cmd.cancels
            .iter()
            .map(|cancel| {
                (
                    cancel.instrument_id,
                    Some(cancel.client_order_id),
                    cancel.venue_order_id,
                )
            })
            .collect()
    }

    /// A command with two cancels yields two payload entries carrying the
    /// instrument and both order identifiers of each cancel, in order.
    #[rstest]
    fn test_batch_cancel_orders_builds_payload() {
        let trader_id = TraderId::from("TRADER-001");
        let strategy_id = StrategyId::from("STRATEGY-001");
        let client_id = Some(ClientId::from("OKX"));
        let instrument_id = InstrumentId::from("BTC-USDT.OKX");
        let client_order_id1 = ClientOrderId::new("order1");
        let client_order_id2 = ClientOrderId::new("order2");
        let venue_order_id1 = VenueOrderId::new("venue1");
        let venue_order_id2 = VenueOrderId::new("venue2");

        // Both cancels differ only in their order identifiers.
        let make_cancel = |client_order_id: ClientOrderId, venue_order_id: VenueOrderId| {
            CancelOrder {
                trader_id,
                client_id,
                strategy_id,
                instrument_id,
                client_order_id,
                venue_order_id: Some(venue_order_id),
                command_id: Default::default(),
                ts_init: UnixNanos::default(),
                params: None,
            }
        };

        let cmd = BatchCancelOrders {
            trader_id,
            client_id,
            strategy_id,
            instrument_id,
            cancels: vec![
                make_cancel(client_order_id1, venue_order_id1),
                make_cancel(client_order_id2, venue_order_id2),
            ],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = build_payload(&cmd);

        let expected = [
            (instrument_id, Some(client_order_id1), Some(venue_order_id1)),
            (instrument_id, Some(client_order_id2), Some(venue_order_id2)),
        ];
        assert_eq!(payload.len(), 2);
        assert_eq!(payload, expected);
    }

    /// A command without any cancels yields an empty payload.
    #[rstest]
    fn test_batch_cancel_orders_with_empty_cancels() {
        let cmd = BatchCancelOrders {
            trader_id: TraderId::from("TRADER-001"),
            client_id: Some(ClientId::from("OKX")),
            strategy_id: StrategyId::from("STRATEGY-001"),
            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
            cancels: vec![],
            command_id: Default::default(),
            ts_init: UnixNanos::default(),
            params: None,
        };

        let payload = build_payload(&cmd);
        assert!(payload.is_empty());
    }
}