Skip to main content

nautilus_architect_ax/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the AX Exchange adapter.
17
18use std::{
19    future::Future,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28    clients::ExecutionClient,
29    live::{get_runtime, runner::get_exec_event_sender},
30    messages::execution::{
31        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    MUTEX_POISONED, UUID4, UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce},
44    events::OrderEventAny,
45    identifiers::{
46        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
47    },
48    orders::Order,
49    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
50    types::{AccountBalance, MarginBalance, Price},
51};
52use tokio::task::JoinHandle;
53
54use crate::{
55    common::{consts::AX_VENUE, enums::AxOrderSide, parse::quantity_to_contracts},
56    config::AxExecClientConfig,
57    http::{client::AxHttpClient, models::PreviewAggressiveLimitOrderRequest},
58    websocket::{AxOrdersWsMessage, NautilusExecWsMessage, orders::AxOrdersWebSocketClient},
59};
60
61/// Live execution client for the AX Exchange.
62#[derive(Debug)]
63pub struct AxExecutionClient {
64    core: ExecutionClientCore,
65    clock: &'static AtomicTime,
66    config: AxExecClientConfig,
67    emitter: ExecutionEventEmitter,
68    http_client: AxHttpClient,
69    ws_orders: AxOrdersWebSocketClient,
70    ws_stream_handle: Option<JoinHandle<()>>,
71    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
72}
73
74impl AxExecutionClient {
75    /// Creates a new [`AxExecutionClient`].
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the client fails to initialize.
80    pub fn new(core: ExecutionClientCore, config: AxExecClientConfig) -> anyhow::Result<Self> {
81        let http_client = AxHttpClient::with_credentials(
82            config.api_key.clone().unwrap_or_default(),
83            config.api_secret.clone().unwrap_or_default(),
84            Some(config.http_base_url()),
85            Some(config.orders_base_url()),
86            config.http_timeout_secs,
87            config.max_retries,
88            config.retry_delay_initial_ms,
89            config.retry_delay_max_ms,
90            config.http_proxy_url.clone(),
91        )?;
92
93        let clock = get_atomic_clock_realtime();
94        let trader_id = core.trader_id;
95        let account_id = core.account_id;
96        let emitter =
97            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
98        let ws_orders = AxOrdersWebSocketClient::new(
99            config.ws_private_url(),
100            account_id,
101            trader_id,
102            config.heartbeat_interval_secs,
103        );
104
105        Ok(Self {
106            core,
107            clock,
108            config,
109            emitter,
110            http_client,
111            ws_orders,
112            ws_stream_handle: None,
113            pending_tasks: Mutex::new(Vec::new()),
114        })
115    }
116
117    async fn authenticate(&self) -> anyhow::Result<String> {
118        let api_key = self
119            .config
120            .api_key
121            .clone()
122            .or_else(|| std::env::var("AX_API_KEY").ok())
123            .context("AX_API_KEY not configured")?;
124
125        let api_secret = self
126            .config
127            .api_secret
128            .clone()
129            .or_else(|| std::env::var("AX_API_SECRET").ok())
130            .context("AX_API_SECRET not configured")?;
131
132        self.http_client
133            .authenticate(&api_key, &api_secret, 3600)
134            .await
135            .map_err(|e| anyhow::anyhow!("Authentication failed: {e}"))
136    }
137
138    async fn refresh_account_state(&self) -> anyhow::Result<()> {
139        let account_state = self
140            .http_client
141            .request_account_state(self.core.account_id)
142            .await
143            .context("failed to request AX account state")?;
144
145        let ts_event = self.clock.get_time_ns();
146        self.emitter.emit_account_state(
147            account_state.balances.clone(),
148            account_state.margins.clone(),
149            account_state.is_reported,
150            ts_event,
151        );
152        Ok(())
153    }
154
155    fn update_account_state(&self) -> anyhow::Result<()> {
156        let runtime = get_runtime();
157        runtime.block_on(self.refresh_account_state())
158    }
159
160    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
161        let (
162            client_order_id,
163            strategy_id,
164            instrument_id,
165            order_side,
166            order_type,
167            quantity,
168            trigger_price,
169            time_in_force,
170            is_post_only,
171            limit_price,
172        ) = {
173            let cache = self.core.cache();
174            let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
175                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
176            })?;
177            (
178                order.client_order_id(),
179                order.strategy_id(),
180                order.instrument_id(),
181                order.order_side(),
182                order.order_type(),
183                order.quantity(),
184                order.trigger_price(),
185                order.time_in_force(),
186                order.is_post_only(),
187                order.price(),
188            )
189        };
190
191        let ws_orders = self.ws_orders.clone();
192        let emitter = self.emitter.clone();
193        let clock = self.clock;
194        let trader_id = self.core.trader_id;
195
196        let http_client = if order_type == OrderType::Market {
197            Some(self.http_client.clone())
198        } else {
199            None
200        };
201
202        self.spawn_task("submit_order", async move {
203            let result: anyhow::Result<()> = async {
204                // For market orders, get the take-through price from AX
205                let price = if order_type == OrderType::Market {
206                    let symbol = instrument_id.symbol.inner();
207                    let ax_side = AxOrderSide::try_from(order_side)
208                        .map_err(|e| anyhow::anyhow!("Invalid order side: {e}"))?;
209                    let qty_contracts = quantity_to_contracts(quantity)?;
210
211                    let request =
212                        PreviewAggressiveLimitOrderRequest::new(symbol, qty_contracts, ax_side);
213                    let response = http_client
214                        .expect("HTTP client should be set for market orders")
215                        .inner
216                        .preview_aggressive_limit_order(&request)
217                        .await
218                        .map_err(|e| {
219                            anyhow::anyhow!("Failed to preview aggressive limit order: {e}")
220                        })?;
221
222                    if response.remaining_quantity > 0 {
223                        log::warn!(
224                            "Market order book depth insufficient: \
225                             filled_qty={} remaining_qty={} for {instrument_id}",
226                            response.filled_quantity,
227                            response.remaining_quantity,
228                        );
229                    }
230
231                    let limit_price_decimal = response.limit_price.ok_or_else(|| {
232                        anyhow::anyhow!(
233                            "No liquidity available for market order on {instrument_id}"
234                        )
235                    })?;
236
237                    let price = Price::from(limit_price_decimal.to_string().as_str());
238                    log::info!("Market order take-through price: {price} for {instrument_id}",);
239                    Some(price)
240                } else {
241                    limit_price
242                };
243
244                ws_orders
245                    .submit_order(
246                        trader_id,
247                        strategy_id,
248                        instrument_id,
249                        client_order_id,
250                        order_side,
251                        order_type,
252                        quantity,
253                        time_in_force,
254                        price,
255                        trigger_price,
256                        is_post_only,
257                    )
258                    .await
259                    .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"))?;
260
261                Ok(())
262            }
263            .await;
264
265            if let Err(e) = result {
266                let ts_event = clock.get_time_ns();
267                emitter.emit_order_rejected_event(
268                    strategy_id,
269                    instrument_id,
270                    client_order_id,
271                    &format!("submit-order-error: {e}"),
272                    ts_event,
273                    false,
274                );
275                anyhow::bail!("{e}");
276            }
277
278            Ok(())
279        });
280
281        Ok(())
282    }
283
284    fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
285        let ws_orders = self.ws_orders.clone();
286
287        let emitter = self.emitter.clone();
288        let clock = self.clock;
289        let instrument_id = cmd.instrument_id;
290        let client_order_id = cmd.client_order_id;
291        let venue_order_id = cmd.venue_order_id;
292        let strategy_id = cmd.strategy_id;
293
294        self.spawn_task("cancel_order", async move {
295            let result = ws_orders
296                .cancel_order(client_order_id, venue_order_id)
297                .await
298                .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
299
300            if let Err(e) = &result {
301                let ts_event = clock.get_time_ns();
302                emitter.emit_order_cancel_rejected_event(
303                    strategy_id,
304                    instrument_id,
305                    client_order_id,
306                    venue_order_id,
307                    &format!("cancel-order-error: {e}"),
308                    ts_event,
309                );
310                anyhow::bail!("{e}");
311            }
312
313            Ok(())
314        });
315
316        Ok(())
317    }
318
319    fn spawn_task<F>(&self, description: &'static str, fut: F)
320    where
321        F: Future<Output = anyhow::Result<()>> + Send + 'static,
322    {
323        let runtime = get_runtime();
324        let handle = runtime.spawn(async move {
325            if let Err(e) = fut.await {
326                log::warn!("{description} failed: {e}");
327            }
328        });
329
330        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
331        tasks.retain(|handle| !handle.is_finished());
332        tasks.push(handle);
333    }
334
335    fn abort_pending_tasks(&self) {
336        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
337        for handle in tasks.drain(..) {
338            handle.abort();
339        }
340    }
341
342    /// Polls the cache until the account is registered or timeout is reached.
343    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
344        let account_id = self.core.account_id;
345
346        if self.core.cache().account(&account_id).is_some() {
347            log::info!("Account {account_id} registered");
348            return Ok(());
349        }
350
351        let start = Instant::now();
352        let timeout = Duration::from_secs_f64(timeout_secs);
353        let interval = Duration::from_millis(10);
354
355        loop {
356            tokio::time::sleep(interval).await;
357
358            if self.core.cache().account(&account_id).is_some() {
359                log::info!("Account {account_id} registered");
360                return Ok(());
361            }
362
363            if start.elapsed() >= timeout {
364                anyhow::bail!(
365                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
366                );
367            }
368        }
369    }
370}
371
372#[async_trait(?Send)]
373impl ExecutionClient for AxExecutionClient {
374    fn is_connected(&self) -> bool {
375        self.core.is_connected()
376    }
377
378    fn client_id(&self) -> ClientId {
379        self.core.client_id
380    }
381
382    fn account_id(&self) -> AccountId {
383        self.core.account_id
384    }
385
386    fn venue(&self) -> Venue {
387        *AX_VENUE
388    }
389
390    fn oms_type(&self) -> OmsType {
391        self.core.oms_type
392    }
393
394    fn get_account(&self) -> Option<AccountAny> {
395        self.core.cache().account(&self.core.account_id).cloned()
396    }
397
398    async fn connect(&mut self) -> anyhow::Result<()> {
399        if self.core.is_connected() {
400            return Ok(());
401        }
402
403        // Reset so requests work after a previous disconnect
404        self.http_client.reset_cancellation_token();
405
406        if !self.core.instruments_initialized() {
407            let instruments = self
408                .http_client
409                .request_instruments(None, None)
410                .await
411                .context("failed to request AX instruments")?;
412
413            if instruments.is_empty() {
414                log::warn!("No instruments returned from AX");
415            } else {
416                log::info!("Loaded {} instruments", instruments.len());
417                self.http_client.cache_instruments(instruments.clone());
418
419                for instrument in instruments {
420                    self.ws_orders.cache_instrument(instrument);
421                }
422            }
423            self.core.set_instruments_initialized();
424        }
425
426        let token = self.authenticate().await?;
427        self.ws_orders.connect(&token).await?;
428        log::info!("Connected to orders WebSocket");
429
430        if self.ws_stream_handle.is_none() {
431            let stream = self.ws_orders.stream();
432            let emitter = self.emitter.clone();
433
434            let handle = get_runtime().spawn(async move {
435                pin_mut!(stream);
436                while let Some(message) = stream.next().await {
437                    dispatch_ws_message(message, &emitter);
438                }
439            });
440            self.ws_stream_handle = Some(handle);
441        }
442
443        let account_state = self
444            .http_client
445            .request_account_state(self.core.account_id)
446            .await
447            .context("failed to request AX account state")?;
448
449        if !account_state.balances.is_empty() {
450            log::info!(
451                "Received account state with {} balance(s)",
452                account_state.balances.len()
453            );
454        }
455        self.emitter.send_account_state(account_state);
456
457        self.await_account_registered(30.0).await?;
458
459        self.core.set_connected();
460        log::info!("Connected: client_id={}", self.core.client_id);
461        Ok(())
462    }
463
464    async fn disconnect(&mut self) -> anyhow::Result<()> {
465        if self.core.is_disconnected() {
466            return Ok(());
467        }
468
469        self.abort_pending_tasks();
470        self.http_client.cancel_all_requests();
471
472        self.ws_orders.close().await;
473
474        if let Some(handle) = self.ws_stream_handle.take() {
475            handle.abort();
476        }
477
478        self.core.set_disconnected();
479        log::info!("Disconnected: client_id={}", self.core.client_id);
480        Ok(())
481    }
482
483    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
484        self.update_account_state()
485    }
486
487    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
488        let http_client = self.http_client.clone();
489        let account_id = self.core.account_id;
490        let client_order_id = cmd.client_order_id;
491        let venue_order_id = cmd.venue_order_id;
492        let instrument_id = cmd.instrument_id;
493        let emitter = self.emitter.clone();
494
495        // Read immutable order fields from cache before spawning
496        let (order_side, order_type, time_in_force) = {
497            let cache = self.core.cache();
498            match cache.order(&client_order_id) {
499                Some(order) => (
500                    order.order_side(),
501                    order.order_type(),
502                    order.time_in_force(),
503                ),
504                None => (OrderSide::NoOrderSide, OrderType::Limit, TimeInForce::Gtc),
505            }
506        };
507
508        self.spawn_task("query_order", async move {
509            match http_client
510                .request_order_status(
511                    account_id,
512                    instrument_id,
513                    Some(client_order_id),
514                    venue_order_id,
515                    order_side,
516                    order_type,
517                    time_in_force,
518                )
519                .await
520            {
521                Ok(report) => emitter.send_order_status_report(report),
522                Err(e) => log::error!("AX query order failed: {e}"),
523            }
524            Ok(())
525        });
526
527        Ok(())
528    }
529
530    fn generate_account_state(
531        &self,
532        balances: Vec<AccountBalance>,
533        margins: Vec<MarginBalance>,
534        reported: bool,
535        ts_event: UnixNanos,
536    ) -> anyhow::Result<()> {
537        self.emitter
538            .emit_account_state(balances, margins, reported, ts_event);
539        Ok(())
540    }
541
542    fn start(&mut self) -> anyhow::Result<()> {
543        if self.core.is_started() {
544            return Ok(());
545        }
546
547        self.emitter.set_sender(get_exec_event_sender());
548        self.core.set_started();
549        log::info!(
550            "Started: client_id={}, account_id={}, is_sandbox={}",
551            self.core.client_id,
552            self.core.account_id,
553            self.config.is_sandbox,
554        );
555        Ok(())
556    }
557
558    fn stop(&mut self) -> anyhow::Result<()> {
559        if self.core.is_stopped() {
560            return Ok(());
561        }
562
563        self.core.set_stopped();
564        self.core.set_disconnected();
565        if let Some(handle) = self.ws_stream_handle.take() {
566            handle.abort();
567        }
568        self.abort_pending_tasks();
569        log::info!("Stopped: client_id={}", self.core.client_id);
570        Ok(())
571    }
572
573    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
574        {
575            let cache = self.core.cache();
576            let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
577                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
578            })?;
579
580            if order.is_closed() {
581                log::warn!("Cannot submit closed order {}", order.client_order_id());
582                return Ok(());
583            }
584
585            if !matches!(
586                order.order_type(),
587                OrderType::Market | OrderType::Limit | OrderType::StopLimit
588            ) {
589                self.emitter.emit_order_denied(
590                    order,
591                    &format!(
592                        "Unsupported order type: {:?}. \
593                         AX supports MARKET, LIMIT and STOP_LIMIT.",
594                        order.order_type(),
595                    ),
596                );
597                return Ok(());
598            }
599
600            log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
601            self.emitter.emit_order_submitted(order);
602        }
603
604        self.submit_order_internal(cmd)
605    }
606
607    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
608        for (client_order_id, order_init) in cmd
609            .order_list
610            .client_order_ids
611            .iter()
612            .zip(cmd.order_inits.iter())
613        {
614            let submit_cmd = SubmitOrder::new(
615                cmd.trader_id,
616                cmd.client_id,
617                cmd.strategy_id,
618                cmd.instrument_id,
619                *client_order_id,
620                order_init.clone(),
621                cmd.exec_algorithm_id,
622                cmd.position_id,
623                cmd.params.clone(),
624                UUID4::new(),
625                cmd.ts_init,
626            );
627            self.submit_order(&submit_cmd)?;
628        }
629        Ok(())
630    }
631
632    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
633        let reason = "AX does not support order modification. Use cancel and resubmit instead.";
634        log::error!("{reason}");
635
636        let ts_event = self.clock.get_time_ns();
637        self.emitter.emit_order_modify_rejected_event(
638            cmd.strategy_id,
639            cmd.instrument_id,
640            cmd.client_order_id,
641            cmd.venue_order_id,
642            reason,
643            ts_event,
644        );
645        Ok(())
646    }
647
648    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
649        self.cancel_order_internal(cmd)
650    }
651
652    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
653        let cache = self.core.cache();
654        let open_orders = cache.orders_open(None, Some(&cmd.instrument_id), None, None, None);
655
656        if open_orders.is_empty() {
657            log::debug!("No open orders to cancel for {}", cmd.instrument_id);
658            return Ok(());
659        }
660
661        log::debug!(
662            "Canceling {} open orders for {}",
663            open_orders.len(),
664            cmd.instrument_id
665        );
666
667        let ts_init = self.clock.get_time_ns();
668
669        for order in open_orders {
670            let cancel_cmd = CancelOrder {
671                trader_id: cmd.trader_id,
672                client_id: cmd.client_id,
673                strategy_id: cmd.strategy_id,
674                instrument_id: order.instrument_id(),
675                client_order_id: order.client_order_id(),
676                venue_order_id: order.venue_order_id(),
677                command_id: UUID4::new(),
678                ts_init,
679                params: None,
680            };
681            self.cancel_order_internal(&cancel_cmd)?;
682        }
683
684        Ok(())
685    }
686
687    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
688        for cancel in &cmd.cancels {
689            self.cancel_order_internal(cancel)?;
690        }
691        Ok(())
692    }
693
694    async fn generate_order_status_report(
695        &self,
696        cmd: &GenerateOrderStatusReport,
697    ) -> anyhow::Result<Option<OrderStatusReport>> {
698        let cid_map = self.ws_orders.cid_to_client_order_id().clone();
699        let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
700
701        let mut reports = self
702            .http_client
703            .request_order_status_reports(self.core.account_id, Some(cid_resolver))
704            .await?;
705
706        if let Some(instrument_id) = cmd.instrument_id {
707            reports.retain(|report| report.instrument_id == instrument_id);
708        }
709
710        if let Some(client_order_id) = cmd.client_order_id {
711            reports.retain(|report| report.client_order_id == Some(client_order_id));
712        }
713
714        if let Some(venue_order_id) = cmd.venue_order_id {
715            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
716        }
717
718        Ok(reports.into_iter().next())
719    }
720
721    async fn generate_order_status_reports(
722        &self,
723        cmd: &GenerateOrderStatusReports,
724    ) -> anyhow::Result<Vec<OrderStatusReport>> {
725        let cid_map = self.ws_orders.cid_to_client_order_id().clone();
726        let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
727
728        let mut reports = self
729            .http_client
730            .request_order_status_reports(self.core.account_id, Some(cid_resolver))
731            .await?;
732
733        if let Some(instrument_id) = cmd.instrument_id {
734            reports.retain(|report| report.instrument_id == instrument_id);
735        }
736
737        if cmd.open_only {
738            reports.retain(|r| r.order_status.is_open());
739        }
740
741        if let Some(start) = cmd.start {
742            reports.retain(|r| r.ts_last >= start);
743        }
744        if let Some(end) = cmd.end {
745            reports.retain(|r| r.ts_last <= end);
746        }
747
748        Ok(reports)
749    }
750
751    async fn generate_fill_reports(
752        &self,
753        cmd: GenerateFillReports,
754    ) -> anyhow::Result<Vec<FillReport>> {
755        let mut reports = self
756            .http_client
757            .request_fill_reports(self.core.account_id)
758            .await?;
759
760        if let Some(instrument_id) = cmd.instrument_id {
761            reports.retain(|report| report.instrument_id == instrument_id);
762        }
763
764        if let Some(venue_order_id) = cmd.venue_order_id {
765            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
766        }
767
768        Ok(reports)
769    }
770
771    async fn generate_position_status_reports(
772        &self,
773        cmd: &GeneratePositionStatusReports,
774    ) -> anyhow::Result<Vec<PositionStatusReport>> {
775        let mut reports = self
776            .http_client
777            .request_position_reports(self.core.account_id)
778            .await?;
779
780        if let Some(instrument_id) = cmd.instrument_id {
781            reports.retain(|report| report.instrument_id == instrument_id);
782        }
783
784        Ok(reports)
785    }
786
787    async fn generate_mass_status(
788        &self,
789        lookback_mins: Option<u64>,
790    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
791        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
792
793        let ts_now = get_atomic_clock_realtime().get_time_ns();
794
795        let start = lookback_mins.map(|mins| {
796            let lookback_ns = mins * 60 * 1_000_000_000;
797            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
798        });
799
800        let order_cmd = GenerateOrderStatusReports::new(
801            UUID4::new(),
802            ts_now,
803            false, // open_only
804            None,  // instrument_id
805            start,
806            None, // end
807            None, // params
808            None, // correlation_id
809        );
810
811        let fill_cmd = GenerateFillReports::new(
812            UUID4::new(),
813            ts_now,
814            None, // instrument_id
815            None, // venue_order_id
816            start,
817            None, // end
818            None, // params
819            None, // correlation_id
820        );
821
822        let position_cmd = GeneratePositionStatusReports::new(
823            UUID4::new(),
824            ts_now,
825            None, // instrument_id
826            start,
827            None, // end
828            None, // params
829            None, // correlation_id
830        );
831
832        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
833            self.generate_order_status_reports(&order_cmd),
834            self.generate_fill_reports(fill_cmd),
835            self.generate_position_status_reports(&position_cmd),
836        )?;
837
838        log::info!("Received {} OrderStatusReports", order_reports.len());
839        log::info!("Received {} FillReports", fill_reports.len());
840        log::info!("Received {} PositionReports", position_reports.len());
841
842        let mut mass_status = ExecutionMassStatus::new(
843            self.core.client_id,
844            self.core.account_id,
845            *AX_VENUE,
846            ts_now,
847            None,
848        );
849
850        mass_status.add_order_reports(order_reports);
851        mass_status.add_fill_reports(fill_reports);
852        mass_status.add_position_reports(position_reports);
853
854        Ok(Some(mass_status))
855    }
856
857    fn register_external_order(
858        &self,
859        client_order_id: ClientOrderId,
860        venue_order_id: VenueOrderId,
861        instrument_id: InstrumentId,
862        strategy_id: StrategyId,
863        _ts_init: UnixNanos,
864    ) {
865        self.ws_orders.register_external_order(
866            client_order_id,
867            venue_order_id,
868            instrument_id,
869            strategy_id,
870        );
871    }
872}
873
874/// Dispatches a WebSocket message using the event emitter.
875fn dispatch_ws_message(message: AxOrdersWsMessage, emitter: &ExecutionEventEmitter) {
876    match message {
877        AxOrdersWsMessage::Nautilus(message) => match message {
878            NautilusExecWsMessage::OrderAccepted(event) => {
879                log::debug!(
880                    "Order accepted: {} {}",
881                    event.client_order_id,
882                    event.venue_order_id
883                );
884                emitter.send_order_event(OrderEventAny::Accepted(event));
885            }
886            NautilusExecWsMessage::OrderFilled(event) => {
887                log::debug!(
888                    "Order filled: {} {} @ {}",
889                    event.client_order_id,
890                    event.last_qty,
891                    event.last_px
892                );
893                emitter.send_order_event(OrderEventAny::Filled(*event));
894            }
895            NautilusExecWsMessage::OrderCanceled(event) => {
896                log::debug!("Order canceled: {}", event.client_order_id);
897                emitter.send_order_event(OrderEventAny::Canceled(event));
898            }
899            NautilusExecWsMessage::OrderExpired(event) => {
900                log::debug!("Order expired: {}", event.client_order_id);
901                emitter.send_order_event(OrderEventAny::Expired(event));
902            }
903            NautilusExecWsMessage::OrderRejected(event) => {
904                log::warn!("Order rejected: {}", event.client_order_id);
905                emitter.send_order_event(OrderEventAny::Rejected(event));
906            }
907            NautilusExecWsMessage::OrderCancelRejected(event) => {
908                log::warn!("Cancel rejected: {}", event.client_order_id);
909                emitter.send_order_event(OrderEventAny::CancelRejected(event));
910            }
911            NautilusExecWsMessage::OrderStatusReports(reports) => {
912                log::debug!("Order status reports: {}", reports.len());
913                for report in reports {
914                    emitter.send_order_status_report(report);
915                }
916            }
917            NautilusExecWsMessage::FillReports(reports) => {
918                log::debug!("Fill reports: {}", reports.len());
919                for report in reports {
920                    emitter.send_fill_report(report);
921                }
922            }
923        },
924        AxOrdersWsMessage::PlaceOrderResponse(resp) => {
925            log::debug!(
926                "Place order response: rid={} oid={}",
927                resp.rid,
928                resp.res.oid
929            );
930        }
931        AxOrdersWsMessage::CancelOrderResponse(resp) => {
932            log::debug!(
933                "Cancel order response: rid={} accepted={}",
934                resp.rid,
935                resp.res.cxl_rx
936            );
937        }
938        AxOrdersWsMessage::OpenOrdersResponse(resp) => {
939            log::debug!("Open orders response: {} orders", resp.res.len());
940        }
941        AxOrdersWsMessage::Error(err) => {
942            log::error!("WebSocket error: {}", err.message);
943        }
944        AxOrdersWsMessage::Reconnected => {
945            log::info!("WebSocket reconnected");
946        }
947        AxOrdersWsMessage::Authenticated => {
948            log::debug!("WebSocket authenticated");
949        }
950    }
951}