1use std::{
19 future::Future,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28 clients::ExecutionClient,
29 live::{get_runtime, runner::get_exec_event_sender},
30 messages::execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 MUTEX_POISONED, UUID4, UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce},
44 events::OrderEventAny,
45 identifiers::{
46 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
47 },
48 orders::Order,
49 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
50 types::{AccountBalance, MarginBalance, Price},
51};
52use tokio::task::JoinHandle;
53
54use crate::{
55 common::{consts::AX_VENUE, enums::AxOrderSide, parse::quantity_to_contracts},
56 config::AxExecClientConfig,
57 http::{client::AxHttpClient, models::PreviewAggressiveLimitOrderRequest},
58 websocket::{AxOrdersWsMessage, NautilusExecWsMessage, orders::AxOrdersWebSocketClient},
59};
60
61#[derive(Debug)]
63pub struct AxExecutionClient {
64 core: ExecutionClientCore,
65 clock: &'static AtomicTime,
66 config: AxExecClientConfig,
67 emitter: ExecutionEventEmitter,
68 http_client: AxHttpClient,
69 ws_orders: AxOrdersWebSocketClient,
70 ws_stream_handle: Option<JoinHandle<()>>,
71 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
72}
73
74impl AxExecutionClient {
75 pub fn new(core: ExecutionClientCore, config: AxExecClientConfig) -> anyhow::Result<Self> {
81 let http_client = AxHttpClient::with_credentials(
82 config.api_key.clone().unwrap_or_default(),
83 config.api_secret.clone().unwrap_or_default(),
84 Some(config.http_base_url()),
85 Some(config.orders_base_url()),
86 config.http_timeout_secs,
87 config.max_retries,
88 config.retry_delay_initial_ms,
89 config.retry_delay_max_ms,
90 config.http_proxy_url.clone(),
91 )?;
92
93 let clock = get_atomic_clock_realtime();
94 let trader_id = core.trader_id;
95 let account_id = core.account_id;
96 let emitter =
97 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
98 let ws_orders = AxOrdersWebSocketClient::new(
99 config.ws_private_url(),
100 account_id,
101 trader_id,
102 config.heartbeat_interval_secs,
103 );
104
105 Ok(Self {
106 core,
107 clock,
108 config,
109 emitter,
110 http_client,
111 ws_orders,
112 ws_stream_handle: None,
113 pending_tasks: Mutex::new(Vec::new()),
114 })
115 }
116
117 async fn authenticate(&self) -> anyhow::Result<String> {
118 let api_key = self
119 .config
120 .api_key
121 .clone()
122 .or_else(|| std::env::var("AX_API_KEY").ok())
123 .context("AX_API_KEY not configured")?;
124
125 let api_secret = self
126 .config
127 .api_secret
128 .clone()
129 .or_else(|| std::env::var("AX_API_SECRET").ok())
130 .context("AX_API_SECRET not configured")?;
131
132 self.http_client
133 .authenticate(&api_key, &api_secret, 3600)
134 .await
135 .map_err(|e| anyhow::anyhow!("Authentication failed: {e}"))
136 }
137
138 async fn refresh_account_state(&self) -> anyhow::Result<()> {
139 let account_state = self
140 .http_client
141 .request_account_state(self.core.account_id)
142 .await
143 .context("failed to request AX account state")?;
144
145 let ts_event = self.clock.get_time_ns();
146 self.emitter.emit_account_state(
147 account_state.balances.clone(),
148 account_state.margins.clone(),
149 account_state.is_reported,
150 ts_event,
151 );
152 Ok(())
153 }
154
155 fn update_account_state(&self) -> anyhow::Result<()> {
156 let runtime = get_runtime();
157 runtime.block_on(self.refresh_account_state())
158 }
159
160 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
161 let (
162 client_order_id,
163 strategy_id,
164 instrument_id,
165 order_side,
166 order_type,
167 quantity,
168 trigger_price,
169 time_in_force,
170 is_post_only,
171 limit_price,
172 ) = {
173 let cache = self.core.cache();
174 let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
175 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
176 })?;
177 (
178 order.client_order_id(),
179 order.strategy_id(),
180 order.instrument_id(),
181 order.order_side(),
182 order.order_type(),
183 order.quantity(),
184 order.trigger_price(),
185 order.time_in_force(),
186 order.is_post_only(),
187 order.price(),
188 )
189 };
190
191 let ws_orders = self.ws_orders.clone();
192 let emitter = self.emitter.clone();
193 let clock = self.clock;
194 let trader_id = self.core.trader_id;
195
196 let http_client = if order_type == OrderType::Market {
197 Some(self.http_client.clone())
198 } else {
199 None
200 };
201
202 self.spawn_task("submit_order", async move {
203 let result: anyhow::Result<()> = async {
204 let price = if order_type == OrderType::Market {
206 let symbol = instrument_id.symbol.inner();
207 let ax_side = AxOrderSide::try_from(order_side)
208 .map_err(|e| anyhow::anyhow!("Invalid order side: {e}"))?;
209 let qty_contracts = quantity_to_contracts(quantity)?;
210
211 let request =
212 PreviewAggressiveLimitOrderRequest::new(symbol, qty_contracts, ax_side);
213 let response = http_client
214 .expect("HTTP client should be set for market orders")
215 .inner
216 .preview_aggressive_limit_order(&request)
217 .await
218 .map_err(|e| {
219 anyhow::anyhow!("Failed to preview aggressive limit order: {e}")
220 })?;
221
222 if response.remaining_quantity > 0 {
223 log::warn!(
224 "Market order book depth insufficient: \
225 filled_qty={} remaining_qty={} for {instrument_id}",
226 response.filled_quantity,
227 response.remaining_quantity,
228 );
229 }
230
231 let limit_price_decimal = response.limit_price.ok_or_else(|| {
232 anyhow::anyhow!(
233 "No liquidity available for market order on {instrument_id}"
234 )
235 })?;
236
237 let price = Price::from(limit_price_decimal.to_string().as_str());
238 log::info!("Market order take-through price: {price} for {instrument_id}",);
239 Some(price)
240 } else {
241 limit_price
242 };
243
244 ws_orders
245 .submit_order(
246 trader_id,
247 strategy_id,
248 instrument_id,
249 client_order_id,
250 order_side,
251 order_type,
252 quantity,
253 time_in_force,
254 price,
255 trigger_price,
256 is_post_only,
257 )
258 .await
259 .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"))?;
260
261 Ok(())
262 }
263 .await;
264
265 if let Err(e) = result {
266 let ts_event = clock.get_time_ns();
267 emitter.emit_order_rejected_event(
268 strategy_id,
269 instrument_id,
270 client_order_id,
271 &format!("submit-order-error: {e}"),
272 ts_event,
273 false,
274 );
275 anyhow::bail!("{e}");
276 }
277
278 Ok(())
279 });
280
281 Ok(())
282 }
283
284 fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
285 let ws_orders = self.ws_orders.clone();
286
287 let emitter = self.emitter.clone();
288 let clock = self.clock;
289 let instrument_id = cmd.instrument_id;
290 let client_order_id = cmd.client_order_id;
291 let venue_order_id = cmd.venue_order_id;
292 let strategy_id = cmd.strategy_id;
293
294 self.spawn_task("cancel_order", async move {
295 let result = ws_orders
296 .cancel_order(client_order_id, venue_order_id)
297 .await
298 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
299
300 if let Err(e) = &result {
301 let ts_event = clock.get_time_ns();
302 emitter.emit_order_cancel_rejected_event(
303 strategy_id,
304 instrument_id,
305 client_order_id,
306 venue_order_id,
307 &format!("cancel-order-error: {e}"),
308 ts_event,
309 );
310 anyhow::bail!("{e}");
311 }
312
313 Ok(())
314 });
315
316 Ok(())
317 }
318
319 fn spawn_task<F>(&self, description: &'static str, fut: F)
320 where
321 F: Future<Output = anyhow::Result<()>> + Send + 'static,
322 {
323 let runtime = get_runtime();
324 let handle = runtime.spawn(async move {
325 if let Err(e) = fut.await {
326 log::warn!("{description} failed: {e}");
327 }
328 });
329
330 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
331 tasks.retain(|handle| !handle.is_finished());
332 tasks.push(handle);
333 }
334
335 fn abort_pending_tasks(&self) {
336 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
337 for handle in tasks.drain(..) {
338 handle.abort();
339 }
340 }
341
342 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
344 let account_id = self.core.account_id;
345
346 if self.core.cache().account(&account_id).is_some() {
347 log::info!("Account {account_id} registered");
348 return Ok(());
349 }
350
351 let start = Instant::now();
352 let timeout = Duration::from_secs_f64(timeout_secs);
353 let interval = Duration::from_millis(10);
354
355 loop {
356 tokio::time::sleep(interval).await;
357
358 if self.core.cache().account(&account_id).is_some() {
359 log::info!("Account {account_id} registered");
360 return Ok(());
361 }
362
363 if start.elapsed() >= timeout {
364 anyhow::bail!(
365 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
366 );
367 }
368 }
369 }
370}
371
372#[async_trait(?Send)]
373impl ExecutionClient for AxExecutionClient {
374 fn is_connected(&self) -> bool {
375 self.core.is_connected()
376 }
377
378 fn client_id(&self) -> ClientId {
379 self.core.client_id
380 }
381
382 fn account_id(&self) -> AccountId {
383 self.core.account_id
384 }
385
386 fn venue(&self) -> Venue {
387 *AX_VENUE
388 }
389
390 fn oms_type(&self) -> OmsType {
391 self.core.oms_type
392 }
393
394 fn get_account(&self) -> Option<AccountAny> {
395 self.core.cache().account(&self.core.account_id).cloned()
396 }
397
398 async fn connect(&mut self) -> anyhow::Result<()> {
399 if self.core.is_connected() {
400 return Ok(());
401 }
402
403 self.http_client.reset_cancellation_token();
405
406 if !self.core.instruments_initialized() {
407 let instruments = self
408 .http_client
409 .request_instruments(None, None)
410 .await
411 .context("failed to request AX instruments")?;
412
413 if instruments.is_empty() {
414 log::warn!("No instruments returned from AX");
415 } else {
416 log::info!("Loaded {} instruments", instruments.len());
417 self.http_client.cache_instruments(instruments.clone());
418
419 for instrument in instruments {
420 self.ws_orders.cache_instrument(instrument);
421 }
422 }
423 self.core.set_instruments_initialized();
424 }
425
426 let token = self.authenticate().await?;
427 self.ws_orders.connect(&token).await?;
428 log::info!("Connected to orders WebSocket");
429
430 if self.ws_stream_handle.is_none() {
431 let stream = self.ws_orders.stream();
432 let emitter = self.emitter.clone();
433
434 let handle = get_runtime().spawn(async move {
435 pin_mut!(stream);
436 while let Some(message) = stream.next().await {
437 dispatch_ws_message(message, &emitter);
438 }
439 });
440 self.ws_stream_handle = Some(handle);
441 }
442
443 let account_state = self
444 .http_client
445 .request_account_state(self.core.account_id)
446 .await
447 .context("failed to request AX account state")?;
448
449 if !account_state.balances.is_empty() {
450 log::info!(
451 "Received account state with {} balance(s)",
452 account_state.balances.len()
453 );
454 }
455 self.emitter.send_account_state(account_state);
456
457 self.await_account_registered(30.0).await?;
458
459 self.core.set_connected();
460 log::info!("Connected: client_id={}", self.core.client_id);
461 Ok(())
462 }
463
464 async fn disconnect(&mut self) -> anyhow::Result<()> {
465 if self.core.is_disconnected() {
466 return Ok(());
467 }
468
469 self.abort_pending_tasks();
470 self.http_client.cancel_all_requests();
471
472 self.ws_orders.close().await;
473
474 if let Some(handle) = self.ws_stream_handle.take() {
475 handle.abort();
476 }
477
478 self.core.set_disconnected();
479 log::info!("Disconnected: client_id={}", self.core.client_id);
480 Ok(())
481 }
482
483 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
484 self.update_account_state()
485 }
486
487 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
488 let http_client = self.http_client.clone();
489 let account_id = self.core.account_id;
490 let client_order_id = cmd.client_order_id;
491 let venue_order_id = cmd.venue_order_id;
492 let instrument_id = cmd.instrument_id;
493 let emitter = self.emitter.clone();
494
495 let (order_side, order_type, time_in_force) = {
497 let cache = self.core.cache();
498 match cache.order(&client_order_id) {
499 Some(order) => (
500 order.order_side(),
501 order.order_type(),
502 order.time_in_force(),
503 ),
504 None => (OrderSide::NoOrderSide, OrderType::Limit, TimeInForce::Gtc),
505 }
506 };
507
508 self.spawn_task("query_order", async move {
509 match http_client
510 .request_order_status(
511 account_id,
512 instrument_id,
513 Some(client_order_id),
514 venue_order_id,
515 order_side,
516 order_type,
517 time_in_force,
518 )
519 .await
520 {
521 Ok(report) => emitter.send_order_status_report(report),
522 Err(e) => log::error!("AX query order failed: {e}"),
523 }
524 Ok(())
525 });
526
527 Ok(())
528 }
529
530 fn generate_account_state(
531 &self,
532 balances: Vec<AccountBalance>,
533 margins: Vec<MarginBalance>,
534 reported: bool,
535 ts_event: UnixNanos,
536 ) -> anyhow::Result<()> {
537 self.emitter
538 .emit_account_state(balances, margins, reported, ts_event);
539 Ok(())
540 }
541
542 fn start(&mut self) -> anyhow::Result<()> {
543 if self.core.is_started() {
544 return Ok(());
545 }
546
547 self.emitter.set_sender(get_exec_event_sender());
548 self.core.set_started();
549 log::info!(
550 "Started: client_id={}, account_id={}, is_sandbox={}",
551 self.core.client_id,
552 self.core.account_id,
553 self.config.is_sandbox,
554 );
555 Ok(())
556 }
557
558 fn stop(&mut self) -> anyhow::Result<()> {
559 if self.core.is_stopped() {
560 return Ok(());
561 }
562
563 self.core.set_stopped();
564 self.core.set_disconnected();
565 if let Some(handle) = self.ws_stream_handle.take() {
566 handle.abort();
567 }
568 self.abort_pending_tasks();
569 log::info!("Stopped: client_id={}", self.core.client_id);
570 Ok(())
571 }
572
573 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
574 {
575 let cache = self.core.cache();
576 let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
577 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
578 })?;
579
580 if order.is_closed() {
581 log::warn!("Cannot submit closed order {}", order.client_order_id());
582 return Ok(());
583 }
584
585 if !matches!(
586 order.order_type(),
587 OrderType::Market | OrderType::Limit | OrderType::StopLimit
588 ) {
589 self.emitter.emit_order_denied(
590 order,
591 &format!(
592 "Unsupported order type: {:?}. \
593 AX supports MARKET, LIMIT and STOP_LIMIT.",
594 order.order_type(),
595 ),
596 );
597 return Ok(());
598 }
599
600 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
601 self.emitter.emit_order_submitted(order);
602 }
603
604 self.submit_order_internal(cmd)
605 }
606
607 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
608 for (client_order_id, order_init) in cmd
609 .order_list
610 .client_order_ids
611 .iter()
612 .zip(cmd.order_inits.iter())
613 {
614 let submit_cmd = SubmitOrder::new(
615 cmd.trader_id,
616 cmd.client_id,
617 cmd.strategy_id,
618 cmd.instrument_id,
619 *client_order_id,
620 order_init.clone(),
621 cmd.exec_algorithm_id,
622 cmd.position_id,
623 cmd.params.clone(),
624 UUID4::new(),
625 cmd.ts_init,
626 );
627 self.submit_order(&submit_cmd)?;
628 }
629 Ok(())
630 }
631
632 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
633 let reason = "AX does not support order modification. Use cancel and resubmit instead.";
634 log::error!("{reason}");
635
636 let ts_event = self.clock.get_time_ns();
637 self.emitter.emit_order_modify_rejected_event(
638 cmd.strategy_id,
639 cmd.instrument_id,
640 cmd.client_order_id,
641 cmd.venue_order_id,
642 reason,
643 ts_event,
644 );
645 Ok(())
646 }
647
648 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
649 self.cancel_order_internal(cmd)
650 }
651
652 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
653 let cache = self.core.cache();
654 let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None, None);
655
656 if open_orders.is_empty() {
657 log::debug!("No open orders to cancel for {}", cmd.instrument_id);
658 return Ok(());
659 }
660
661 log::debug!(
662 "Canceling {} open orders for {}",
663 open_orders.len(),
664 cmd.instrument_id
665 );
666
667 let ts_init = self.clock.get_time_ns();
668
669 for order in open_orders {
670 let cancel_cmd = CancelOrder {
671 trader_id: cmd.trader_id,
672 client_id: cmd.client_id,
673 strategy_id: cmd.strategy_id,
674 instrument_id: order.instrument_id(),
675 client_order_id: order.client_order_id(),
676 venue_order_id: order.venue_order_id(),
677 command_id: UUID4::new(),
678 ts_init,
679 params: None,
680 };
681 self.cancel_order_internal(&cancel_cmd)?;
682 }
683
684 Ok(())
685 }
686
687 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
688 for cancel in &cmd.cancels {
689 self.cancel_order_internal(cancel)?;
690 }
691 Ok(())
692 }
693
694 async fn generate_order_status_report(
695 &self,
696 cmd: &GenerateOrderStatusReport,
697 ) -> anyhow::Result<Option<OrderStatusReport>> {
698 let cid_map = self.ws_orders.cid_to_client_order_id().clone();
699 let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
700
701 let mut reports = self
702 .http_client
703 .request_order_status_reports(self.core.account_id, Some(cid_resolver))
704 .await?;
705
706 if let Some(instrument_id) = cmd.instrument_id {
707 reports.retain(|report| report.instrument_id == instrument_id);
708 }
709
710 if let Some(client_order_id) = cmd.client_order_id {
711 reports.retain(|report| report.client_order_id == Some(client_order_id));
712 }
713
714 if let Some(venue_order_id) = cmd.venue_order_id {
715 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
716 }
717
718 Ok(reports.into_iter().next())
719 }
720
721 async fn generate_order_status_reports(
722 &self,
723 cmd: &GenerateOrderStatusReports,
724 ) -> anyhow::Result<Vec<OrderStatusReport>> {
725 let cid_map = self.ws_orders.cid_to_client_order_id().clone();
726 let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
727
728 let mut reports = self
729 .http_client
730 .request_order_status_reports(self.core.account_id, Some(cid_resolver))
731 .await?;
732
733 if let Some(instrument_id) = cmd.instrument_id {
734 reports.retain(|report| report.instrument_id == instrument_id);
735 }
736
737 if cmd.open_only {
738 reports.retain(|r| r.order_status.is_open());
739 }
740
741 if let Some(start) = cmd.start {
742 reports.retain(|r| r.ts_last >= start);
743 }
744 if let Some(end) = cmd.end {
745 reports.retain(|r| r.ts_last <= end);
746 }
747
748 Ok(reports)
749 }
750
751 async fn generate_fill_reports(
752 &self,
753 cmd: GenerateFillReports,
754 ) -> anyhow::Result<Vec<FillReport>> {
755 let mut reports = self
756 .http_client
757 .request_fill_reports(self.core.account_id)
758 .await?;
759
760 if let Some(instrument_id) = cmd.instrument_id {
761 reports.retain(|report| report.instrument_id == instrument_id);
762 }
763
764 if let Some(venue_order_id) = cmd.venue_order_id {
765 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
766 }
767
768 Ok(reports)
769 }
770
771 async fn generate_position_status_reports(
772 &self,
773 cmd: &GeneratePositionStatusReports,
774 ) -> anyhow::Result<Vec<PositionStatusReport>> {
775 let mut reports = self
776 .http_client
777 .request_position_reports(self.core.account_id)
778 .await?;
779
780 if let Some(instrument_id) = cmd.instrument_id {
781 reports.retain(|report| report.instrument_id == instrument_id);
782 }
783
784 Ok(reports)
785 }
786
787 async fn generate_mass_status(
788 &self,
789 lookback_mins: Option<u64>,
790 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
791 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
792
793 let ts_now = get_atomic_clock_realtime().get_time_ns();
794
795 let start = lookback_mins.map(|mins| {
796 let lookback_ns = mins * 60 * 1_000_000_000;
797 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
798 });
799
800 let order_cmd = GenerateOrderStatusReports::new(
801 UUID4::new(),
802 ts_now,
803 false, None, start,
806 None, None, None, );
810
811 let fill_cmd = GenerateFillReports::new(
812 UUID4::new(),
813 ts_now,
814 None, None, start,
817 None, None, None, );
821
822 let position_cmd = GeneratePositionStatusReports::new(
823 UUID4::new(),
824 ts_now,
825 None, start,
827 None, None, None, );
831
832 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
833 self.generate_order_status_reports(&order_cmd),
834 self.generate_fill_reports(fill_cmd),
835 self.generate_position_status_reports(&position_cmd),
836 )?;
837
838 log::info!("Received {} OrderStatusReports", order_reports.len());
839 log::info!("Received {} FillReports", fill_reports.len());
840 log::info!("Received {} PositionReports", position_reports.len());
841
842 let mut mass_status = ExecutionMassStatus::new(
843 self.core.client_id,
844 self.core.account_id,
845 *AX_VENUE,
846 ts_now,
847 None,
848 );
849
850 mass_status.add_order_reports(order_reports);
851 mass_status.add_fill_reports(fill_reports);
852 mass_status.add_position_reports(position_reports);
853
854 Ok(Some(mass_status))
855 }
856
857 fn register_external_order(
858 &self,
859 client_order_id: ClientOrderId,
860 venue_order_id: VenueOrderId,
861 instrument_id: InstrumentId,
862 strategy_id: StrategyId,
863 _ts_init: UnixNanos,
864 ) {
865 self.ws_orders.register_external_order(
866 client_order_id,
867 venue_order_id,
868 instrument_id,
869 strategy_id,
870 );
871 }
872}
873
874fn dispatch_ws_message(message: AxOrdersWsMessage, emitter: &ExecutionEventEmitter) {
876 match message {
877 AxOrdersWsMessage::Nautilus(message) => match message {
878 NautilusExecWsMessage::OrderAccepted(event) => {
879 log::debug!(
880 "Order accepted: {} {}",
881 event.client_order_id,
882 event.venue_order_id
883 );
884 emitter.send_order_event(OrderEventAny::Accepted(event));
885 }
886 NautilusExecWsMessage::OrderFilled(event) => {
887 log::debug!(
888 "Order filled: {} {} @ {}",
889 event.client_order_id,
890 event.last_qty,
891 event.last_px
892 );
893 emitter.send_order_event(OrderEventAny::Filled(*event));
894 }
895 NautilusExecWsMessage::OrderCanceled(event) => {
896 log::debug!("Order canceled: {}", event.client_order_id);
897 emitter.send_order_event(OrderEventAny::Canceled(event));
898 }
899 NautilusExecWsMessage::OrderExpired(event) => {
900 log::debug!("Order expired: {}", event.client_order_id);
901 emitter.send_order_event(OrderEventAny::Expired(event));
902 }
903 NautilusExecWsMessage::OrderRejected(event) => {
904 log::warn!("Order rejected: {}", event.client_order_id);
905 emitter.send_order_event(OrderEventAny::Rejected(event));
906 }
907 NautilusExecWsMessage::OrderCancelRejected(event) => {
908 log::warn!("Cancel rejected: {}", event.client_order_id);
909 emitter.send_order_event(OrderEventAny::CancelRejected(event));
910 }
911 NautilusExecWsMessage::OrderStatusReports(reports) => {
912 log::debug!("Order status reports: {}", reports.len());
913 for report in reports {
914 emitter.send_order_status_report(report);
915 }
916 }
917 NautilusExecWsMessage::FillReports(reports) => {
918 log::debug!("Fill reports: {}", reports.len());
919 for report in reports {
920 emitter.send_fill_report(report);
921 }
922 }
923 },
924 AxOrdersWsMessage::PlaceOrderResponse(resp) => {
925 log::debug!(
926 "Place order response: rid={} oid={}",
927 resp.rid,
928 resp.res.oid
929 );
930 }
931 AxOrdersWsMessage::CancelOrderResponse(resp) => {
932 log::debug!(
933 "Cancel order response: rid={} accepted={}",
934 resp.rid,
935 resp.res.cxl_rx
936 );
937 }
938 AxOrdersWsMessage::OpenOrdersResponse(resp) => {
939 log::debug!("Open orders response: {} orders", resp.res.len());
940 }
941 AxOrdersWsMessage::Error(err) => {
942 log::error!("WebSocket error: {}", err.message);
943 }
944 AxOrdersWsMessage::Reconnected => {
945 log::info!("WebSocket reconnected");
946 }
947 AxOrdersWsMessage::Authenticated => {
948 log::debug!("WebSocket authenticated");
949 }
950 }
951}