nautilus_architect_ax/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the AX Exchange adapter.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use dashmap::DashMap;
26use futures_util::StreamExt;
27use nautilus_common::{
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    messages::{
31        DataEvent, DataResponse,
32        data::{
33            BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
34            RequestInstruments, SubscribeBars, SubscribeBookDeltas, SubscribeQuotes,
35            SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeQuotes,
36            UnsubscribeTrades,
37        },
38    },
39};
40use nautilus_core::{
41    datetime::datetime_to_unix_nanos,
42    time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45    data::{BarType, Data, OrderBookDeltas_API},
46    identifiers::{ClientId, InstrumentId, Venue},
47    instruments::InstrumentAny,
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::{
55        consts::AX_VENUE,
56        enums::{AxCandleWidth, AxMarketDataLevel},
57        parse::map_bar_spec_to_candle_width,
58    },
59    config::AxDataClientConfig,
60    http::client::AxHttpClient,
61    websocket::{data::client::AxMdWebSocketClient, messages::NautilusWsMessage},
62};
63
64/// AX Exchange data client for live market data streaming and historical data requests.
65///
66/// This client integrates with the Nautilus DataEngine to provide:
67/// - Real-time market data via WebSocket subscriptions
68/// - Historical data via REST API requests
69/// - Automatic instrument discovery and caching
70/// - Connection lifecycle management
71#[derive(Debug)]
72pub struct AxDataClient {
73    /// The client ID for this data client.
74    client_id: ClientId,
75    /// Configuration for the data client.
76    config: AxDataClientConfig,
77    /// HTTP client for REST API requests.
78    http_client: AxHttpClient,
79    /// WebSocket client for real-time data streaming.
80    ws_client: AxMdWebSocketClient,
81    /// Whether the client is currently connected.
82    is_connected: AtomicBool,
83    /// Cancellation token for async operations.
84    cancellation_token: CancellationToken,
85    /// Background task handles.
86    tasks: Vec<JoinHandle<()>>,
87    /// Channel sender for emitting data events to the DataEngine.
88    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
89    /// Cached instruments by symbol (shared with HTTP client).
90    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
91    /// High-resolution clock for timestamps.
92    clock: &'static AtomicTime,
93    /// Active quote subscriptions.
94    active_quote_subs: Arc<DashMap<InstrumentId, ()>>,
95    /// Active trade subscriptions.
96    active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
97    /// Active order book subscriptions (maps instrument to level).
98    active_book_subs: Arc<DashMap<InstrumentId, AxMarketDataLevel>>,
99    /// Active bar subscriptions (maps instrument to candle width).
100    active_bar_subs: Arc<DashMap<InstrumentId, (BarType, AxCandleWidth)>>,
101}
102
103impl AxDataClient {
104    /// Creates a new [`AxDataClient`] instance.
105    ///
106    /// # Errors
107    ///
108    /// Returns an error if the data event sender cannot be obtained.
109    pub fn new(
110        client_id: ClientId,
111        config: AxDataClientConfig,
112        http_client: AxHttpClient,
113        ws_client: AxMdWebSocketClient,
114    ) -> anyhow::Result<Self> {
115        let clock = get_atomic_clock_realtime();
116        let data_sender = get_data_event_sender();
117
118        // Share instruments cache with HTTP client
119        let instruments = http_client.instruments_cache.clone();
120
121        Ok(Self {
122            client_id,
123            config,
124            http_client,
125            ws_client,
126            is_connected: AtomicBool::new(false),
127            cancellation_token: CancellationToken::new(),
128            tasks: Vec::new(),
129            data_sender,
130            instruments,
131            clock,
132            active_quote_subs: Arc::new(DashMap::new()),
133            active_trade_subs: Arc::new(DashMap::new()),
134            active_book_subs: Arc::new(DashMap::new()),
135            active_bar_subs: Arc::new(DashMap::new()),
136        })
137    }
138
139    /// Returns the venue for this data client.
140    #[must_use]
141    pub fn venue(&self) -> Venue {
142        *AX_VENUE
143    }
144
145    /// Returns a reference to the instruments cache.
146    #[must_use]
147    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
148        &self.instruments
149    }
150
151    /// Spawns a message handler task to forward WebSocket data to the DataEngine.
152    fn spawn_message_handler(&mut self) {
153        let mut ws_client = self.ws_client.clone();
154        let data_sender = self.data_sender.clone();
155        let cancellation_token = self.cancellation_token.clone();
156
157        let handle = get_runtime().spawn(async move {
158            let stream = ws_client.stream();
159            tokio::pin!(stream);
160
161            loop {
162                tokio::select! {
163                    () = cancellation_token.cancelled() => {
164                        log::debug!("Message handler cancelled");
165                        break;
166                    }
167                    msg = stream.next() => {
168                        match msg {
169                            Some(ws_msg) => {
170                                Self::handle_ws_message(ws_msg, &data_sender);
171                            }
172                            None => {
173                                log::debug!("WebSocket stream ended");
174                                break;
175                            }
176                        }
177                    }
178                }
179            }
180        });
181
182        self.tasks.push(handle);
183    }
184
185    /// Handles a WebSocket message and forwards data to the DataEngine.
186    fn handle_ws_message(
187        msg: NautilusWsMessage,
188        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
189    ) {
190        match msg {
191            NautilusWsMessage::Data(data_vec) => {
192                for data in data_vec {
193                    if let Err(e) = sender.send(DataEvent::Data(data)) {
194                        log::error!("Failed to send data event: {e}");
195                    }
196                }
197            }
198            NautilusWsMessage::Deltas(deltas) => {
199                let api_deltas = OrderBookDeltas_API::new(deltas);
200                if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
201                    log::error!("Failed to send deltas event: {e}");
202                }
203            }
204            NautilusWsMessage::Bar(bar) => {
205                if let Err(e) = sender.send(DataEvent::Data(Data::Bar(bar))) {
206                    log::error!("Failed to send bar event: {e}");
207                }
208            }
209            NautilusWsMessage::Heartbeat => {
210                log::trace!("Received heartbeat");
211            }
212            NautilusWsMessage::Reconnected => {
213                log::info!("WebSocket reconnected");
214            }
215            NautilusWsMessage::Error(err) => {
216                log::error!("WebSocket error: {err:?}");
217            }
218        }
219    }
220}
221
222#[async_trait(?Send)]
223impl DataClient for AxDataClient {
224    fn client_id(&self) -> ClientId {
225        self.client_id
226    }
227
228    fn venue(&self) -> Option<Venue> {
229        Some(*AX_VENUE)
230    }
231
232    fn start(&mut self) -> anyhow::Result<()> {
233        log::debug!("Starting {}", self.client_id);
234        Ok(())
235    }
236
237    fn stop(&mut self) -> anyhow::Result<()> {
238        log::debug!("Stopping {}", self.client_id);
239        self.cancellation_token.cancel();
240        Ok(())
241    }
242
243    fn reset(&mut self) -> anyhow::Result<()> {
244        log::debug!("Resetting {}", self.client_id);
245        self.cancellation_token.cancel();
246        self.tasks.clear();
247        self.cancellation_token = CancellationToken::new();
248        self.active_quote_subs.clear();
249        self.active_trade_subs.clear();
250        self.active_book_subs.clear();
251        self.active_bar_subs.clear();
252        Ok(())
253    }
254
255    fn dispose(&mut self) -> anyhow::Result<()> {
256        log::debug!("Disposing {}", self.client_id);
257        self.cancellation_token.cancel();
258        Ok(())
259    }
260
261    fn is_connected(&self) -> bool {
262        self.is_connected.load(Ordering::Acquire)
263    }
264
265    fn is_disconnected(&self) -> bool {
266        !self.is_connected()
267    }
268
269    async fn connect(&mut self) -> anyhow::Result<()> {
270        log::info!("Connecting {}", self.client_id);
271
272        if self.config.has_api_credentials() {
273            let api_key = self.config.api_key.as_ref().unwrap();
274            let api_secret = self.config.api_secret.as_ref().unwrap();
275            let token = self
276                .http_client
277                .authenticate(api_key, api_secret, 86400) // 24 hour token
278                .await
279                .context("Failed to authenticate with Ax")?;
280            log::debug!("Authenticated with Ax");
281            self.ws_client.set_auth_token(token);
282        }
283
284        let instruments = self
285            .http_client
286            .request_instruments(None, None)
287            .await
288            .context("Failed to fetch instruments")?;
289        self.http_client.cache_instruments(instruments);
290        log::info!(
291            "Cached {} instruments",
292            self.http_client.get_cached_symbols().len()
293        );
294
295        self.ws_client
296            .connect()
297            .await
298            .context("Failed to connect WebSocket")?;
299        log::info!("WebSocket connected");
300        self.spawn_message_handler();
301
302        self.is_connected.store(true, Ordering::Release);
303        log::info!("Connected {}", self.client_id);
304
305        Ok(())
306    }
307
308    async fn disconnect(&mut self) -> anyhow::Result<()> {
309        log::info!("Disconnecting {}", self.client_id);
310        self.cancellation_token.cancel();
311        self.ws_client.close().await;
312
313        for task in self.tasks.drain(..) {
314            task.abort();
315        }
316
317        self.is_connected.store(false, Ordering::Release);
318        log::info!("Disconnected {}", self.client_id);
319
320        Ok(())
321    }
322
323    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
324        let instrument_id = cmd.instrument_id;
325        let symbol = instrument_id.symbol.as_str();
326
327        if self.active_quote_subs.contains_key(&instrument_id) {
328            log::debug!("Already subscribed to quotes for {symbol}");
329            return Ok(());
330        }
331
332        log::debug!("Subscribing to quotes for {symbol}");
333
334        get_runtime().block_on(async {
335            self.ws_client
336                .subscribe(symbol, AxMarketDataLevel::Level1)
337                .await
338        })?;
339
340        self.active_quote_subs.insert(instrument_id, ());
341        Ok(())
342    }
343
344    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
345        let instrument_id = cmd.instrument_id;
346        let symbol = instrument_id.symbol.as_str();
347
348        if !self.active_quote_subs.contains_key(&instrument_id) {
349            log::debug!("Not subscribed to quotes for {symbol}");
350            return Ok(());
351        }
352
353        log::debug!("Unsubscribing from quotes for {symbol}");
354
355        get_runtime().block_on(async { self.ws_client.unsubscribe(symbol).await })?;
356
357        self.active_quote_subs.remove(&instrument_id);
358        Ok(())
359    }
360
361    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
362        let instrument_id = cmd.instrument_id;
363        let symbol = instrument_id.symbol.as_str();
364
365        if self.active_trade_subs.contains_key(&instrument_id) {
366            log::debug!("Already subscribed to trades for {symbol}");
367            return Ok(());
368        }
369
370        log::debug!("Subscribing to trades for {symbol}");
371
372        // Trades come with Level1 subscription
373        get_runtime().block_on(async {
374            self.ws_client
375                .subscribe(symbol, AxMarketDataLevel::Level1)
376                .await
377        })?;
378
379        self.active_trade_subs.insert(instrument_id, ());
380        Ok(())
381    }
382
383    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
384        let instrument_id = cmd.instrument_id;
385        let symbol = instrument_id.symbol.as_str();
386
387        if !self.active_trade_subs.contains_key(&instrument_id) {
388            log::debug!("Not subscribed to trades for {symbol}");
389            return Ok(());
390        }
391
392        log::debug!("Unsubscribing from trades for {symbol}");
393
394        get_runtime().block_on(async { self.ws_client.unsubscribe(symbol).await })?;
395
396        self.active_trade_subs.remove(&instrument_id);
397        Ok(())
398    }
399
400    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
401        let instrument_id = cmd.instrument_id;
402        let symbol = instrument_id.symbol.as_str();
403
404        if self.active_book_subs.contains_key(&instrument_id) {
405            log::debug!("Already subscribed to book deltas for {symbol}");
406            return Ok(());
407        }
408
409        // Use Level2 for order book deltas
410        let level = AxMarketDataLevel::Level2;
411        log::debug!("Subscribing to book deltas for {symbol} at {level:?}");
412
413        get_runtime().block_on(async { self.ws_client.subscribe(symbol, level).await })?;
414
415        self.active_book_subs.insert(instrument_id, level);
416        Ok(())
417    }
418
419    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
420        let instrument_id = cmd.instrument_id;
421        let symbol = instrument_id.symbol.as_str();
422
423        if !self.active_book_subs.contains_key(&instrument_id) {
424            log::debug!("Not subscribed to book deltas for {symbol}");
425            return Ok(());
426        }
427
428        log::debug!("Unsubscribing from book deltas for {symbol}");
429
430        get_runtime().block_on(async { self.ws_client.unsubscribe(symbol).await })?;
431
432        self.active_book_subs.remove(&instrument_id);
433        Ok(())
434    }
435
436    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
437        let bar_type = cmd.bar_type;
438        let instrument_id = bar_type.instrument_id();
439        let symbol = instrument_id.symbol.as_str();
440
441        if self.active_bar_subs.contains_key(&instrument_id) {
442            log::debug!("Already subscribed to bars for {symbol}");
443            return Ok(());
444        }
445
446        let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
447        log::debug!("Subscribing to bars for {bar_type} (width: {width:?})");
448
449        get_runtime().block_on(async { self.ws_client.subscribe_candles(symbol, width).await })?;
450
451        self.active_bar_subs
452            .insert(instrument_id, (bar_type, width));
453        Ok(())
454    }
455
456    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
457        let bar_type = cmd.bar_type;
458        let instrument_id = bar_type.instrument_id();
459        let symbol = instrument_id.symbol.as_str();
460
461        let width = match self.active_bar_subs.get(&instrument_id) {
462            Some(entry) => entry.value().1,
463            None => {
464                log::debug!("Not subscribed to bars for {symbol}");
465                return Ok(());
466            }
467        };
468
469        log::debug!("Unsubscribing from bars for {bar_type}");
470
471        get_runtime()
472            .block_on(async { self.ws_client.unsubscribe_candles(symbol, width).await })?;
473
474        self.active_bar_subs.remove(&instrument_id);
475        Ok(())
476    }
477
478    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
479        let http = self.http_client.clone();
480        let sender = self.data_sender.clone();
481        let request_id = request.request_id;
482        let client_id = request.client_id.unwrap_or(self.client_id);
483        let venue = *AX_VENUE;
484        let start_nanos = datetime_to_unix_nanos(request.start);
485        let end_nanos = datetime_to_unix_nanos(request.end);
486        let params = request.params.clone();
487        let clock = self.clock;
488
489        get_runtime().spawn(async move {
490            match http.request_instruments(None, None).await {
491                Ok(instruments) => {
492                    log::info!("Fetched {} instruments from Ax", instruments.len());
493                    http.cache_instruments(instruments.clone());
494
495                    let response = DataResponse::Instruments(InstrumentsResponse::new(
496                        request_id,
497                        client_id,
498                        venue,
499                        instruments,
500                        start_nanos,
501                        end_nanos,
502                        clock.get_time_ns(),
503                        params,
504                    ));
505
506                    if let Err(e) = sender.send(DataEvent::Response(response)) {
507                        log::error!("Failed to send instruments response: {e}");
508                    }
509                }
510                Err(e) => {
511                    log::error!("Failed to request instruments: {e}");
512                }
513            }
514        });
515
516        Ok(())
517    }
518
519    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
520        let http = self.http_client.clone();
521        let sender = self.data_sender.clone();
522        let request_id = request.request_id;
523        let client_id = request.client_id.unwrap_or(self.client_id);
524        let instrument_id = request.instrument_id;
525        let symbol = instrument_id.symbol.to_string();
526        let start_nanos = datetime_to_unix_nanos(request.start);
527        let end_nanos = datetime_to_unix_nanos(request.end);
528        let params = request.params.clone();
529        let clock = self.clock;
530
531        get_runtime().spawn(async move {
532            match http.request_instrument(&symbol, None, None).await {
533                Ok(instrument) => {
534                    log::debug!("Fetched instrument {symbol} from Ax");
535                    http.cache_instrument(instrument.clone());
536
537                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
538                        request_id,
539                        client_id,
540                        instrument_id,
541                        instrument,
542                        start_nanos,
543                        end_nanos,
544                        clock.get_time_ns(),
545                        params,
546                    )));
547
548                    if let Err(e) = sender.send(DataEvent::Response(response)) {
549                        log::error!("Failed to send instrument response: {e}");
550                    }
551                }
552                Err(e) => {
553                    log::error!("Failed to request instrument {symbol}: {e}");
554                }
555            }
556        });
557
558        Ok(())
559    }
560
561    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
562        let http = self.http_client.clone();
563        let sender = self.data_sender.clone();
564        let request_id = request.request_id;
565        let client_id = request.client_id.unwrap_or(self.client_id);
566        let bar_type = request.bar_type;
567        let symbol = bar_type.instrument_id().symbol.to_string();
568        let start_nanos = datetime_to_unix_nanos(request.start);
569        let end_nanos = datetime_to_unix_nanos(request.end);
570        let params = request.params.clone();
571        let clock = self.clock;
572        let width = match map_bar_spec_to_candle_width(&bar_type.spec()) {
573            Ok(w) => w,
574            Err(e) => {
575                log::error!("Failed to map bar type {bar_type}: {e}");
576                return Err(e);
577            }
578        };
579
580        get_runtime().spawn(async move {
581            let start_ns = start_nanos.map_or(0, |n| n.as_i64());
582            let end_ns = end_nanos.map_or(clock.get_time_ns().as_i64(), |n| n.as_i64());
583
584            match http.request_bars(&symbol, start_ns, end_ns, width).await {
585                Ok(bars) => {
586                    log::debug!("Fetched {} bars for {symbol}", bars.len());
587
588                    let response = DataResponse::Bars(BarsResponse::new(
589                        request_id,
590                        client_id,
591                        bar_type,
592                        bars,
593                        start_nanos,
594                        end_nanos,
595                        clock.get_time_ns(),
596                        params,
597                    ));
598
599                    if let Err(e) = sender.send(DataEvent::Response(response)) {
600                        log::error!("Failed to send bars response: {e}");
601                    }
602                }
603                Err(e) => {
604                    log::error!("Failed to request bars for {symbol}: {e}");
605                }
606            }
607        });
608
609        Ok(())
610    }
611}