nautilus_architect_ax/data/
mod.rs1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use dashmap::DashMap;
26use futures_util::StreamExt;
27use nautilus_common::{
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 messages::{
31 DataEvent, DataResponse,
32 data::{
33 BarsResponse, InstrumentResponse, InstrumentsResponse, RequestBars, RequestInstrument,
34 RequestInstruments, SubscribeBars, SubscribeBookDeltas, SubscribeQuotes,
35 SubscribeTrades, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeQuotes,
36 UnsubscribeTrades,
37 },
38 },
39};
40use nautilus_core::{
41 datetime::datetime_to_unix_nanos,
42 time::{AtomicTime, get_atomic_clock_realtime},
43};
44use nautilus_model::{
45 data::{BarType, Data, OrderBookDeltas_API},
46 identifiers::{ClientId, InstrumentId, Venue},
47 instruments::InstrumentAny,
48};
49use tokio::task::JoinHandle;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::{
55 consts::AX_VENUE,
56 enums::{AxCandleWidth, AxMarketDataLevel},
57 parse::map_bar_spec_to_candle_width,
58 },
59 config::AxDataClientConfig,
60 http::client::AxHttpClient,
61 websocket::{data::client::AxMdWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[derive(Debug)]
72pub struct AxDataClient {
73 client_id: ClientId,
75 config: AxDataClientConfig,
77 http_client: AxHttpClient,
79 ws_client: AxMdWebSocketClient,
81 is_connected: AtomicBool,
83 cancellation_token: CancellationToken,
85 tasks: Vec<JoinHandle<()>>,
87 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
89 instruments: Arc<DashMap<Ustr, InstrumentAny>>,
91 clock: &'static AtomicTime,
93 active_quote_subs: Arc<DashMap<InstrumentId, ()>>,
95 active_trade_subs: Arc<DashMap<InstrumentId, ()>>,
97 active_book_subs: Arc<DashMap<InstrumentId, AxMarketDataLevel>>,
99 active_bar_subs: Arc<DashMap<InstrumentId, (BarType, AxCandleWidth)>>,
101}
102
103impl AxDataClient {
104 pub fn new(
110 client_id: ClientId,
111 config: AxDataClientConfig,
112 http_client: AxHttpClient,
113 ws_client: AxMdWebSocketClient,
114 ) -> anyhow::Result<Self> {
115 let clock = get_atomic_clock_realtime();
116 let data_sender = get_data_event_sender();
117
118 let instruments = http_client.instruments_cache.clone();
120
121 Ok(Self {
122 client_id,
123 config,
124 http_client,
125 ws_client,
126 is_connected: AtomicBool::new(false),
127 cancellation_token: CancellationToken::new(),
128 tasks: Vec::new(),
129 data_sender,
130 instruments,
131 clock,
132 active_quote_subs: Arc::new(DashMap::new()),
133 active_trade_subs: Arc::new(DashMap::new()),
134 active_book_subs: Arc::new(DashMap::new()),
135 active_bar_subs: Arc::new(DashMap::new()),
136 })
137 }
138
139 #[must_use]
141 pub fn venue(&self) -> Venue {
142 *AX_VENUE
143 }
144
145 #[must_use]
147 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
148 &self.instruments
149 }
150
151 fn spawn_message_handler(&mut self) {
153 let mut ws_client = self.ws_client.clone();
154 let data_sender = self.data_sender.clone();
155 let cancellation_token = self.cancellation_token.clone();
156
157 let handle = get_runtime().spawn(async move {
158 let stream = ws_client.stream();
159 tokio::pin!(stream);
160
161 loop {
162 tokio::select! {
163 () = cancellation_token.cancelled() => {
164 log::debug!("Message handler cancelled");
165 break;
166 }
167 msg = stream.next() => {
168 match msg {
169 Some(ws_msg) => {
170 Self::handle_ws_message(ws_msg, &data_sender);
171 }
172 None => {
173 log::debug!("WebSocket stream ended");
174 break;
175 }
176 }
177 }
178 }
179 }
180 });
181
182 self.tasks.push(handle);
183 }
184
185 fn handle_ws_message(
187 msg: NautilusWsMessage,
188 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
189 ) {
190 match msg {
191 NautilusWsMessage::Data(data_vec) => {
192 for data in data_vec {
193 if let Err(e) = sender.send(DataEvent::Data(data)) {
194 log::error!("Failed to send data event: {e}");
195 }
196 }
197 }
198 NautilusWsMessage::Deltas(deltas) => {
199 let api_deltas = OrderBookDeltas_API::new(deltas);
200 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
201 log::error!("Failed to send deltas event: {e}");
202 }
203 }
204 NautilusWsMessage::Bar(bar) => {
205 if let Err(e) = sender.send(DataEvent::Data(Data::Bar(bar))) {
206 log::error!("Failed to send bar event: {e}");
207 }
208 }
209 NautilusWsMessage::Heartbeat => {
210 log::trace!("Received heartbeat");
211 }
212 NautilusWsMessage::Reconnected => {
213 log::info!("WebSocket reconnected");
214 }
215 NautilusWsMessage::Error(err) => {
216 log::error!("WebSocket error: {err:?}");
217 }
218 }
219 }
220}
221
222#[async_trait(?Send)]
223impl DataClient for AxDataClient {
224 fn client_id(&self) -> ClientId {
225 self.client_id
226 }
227
228 fn venue(&self) -> Option<Venue> {
229 Some(*AX_VENUE)
230 }
231
232 fn start(&mut self) -> anyhow::Result<()> {
233 log::debug!("Starting {}", self.client_id);
234 Ok(())
235 }
236
237 fn stop(&mut self) -> anyhow::Result<()> {
238 log::debug!("Stopping {}", self.client_id);
239 self.cancellation_token.cancel();
240 Ok(())
241 }
242
243 fn reset(&mut self) -> anyhow::Result<()> {
244 log::debug!("Resetting {}", self.client_id);
245 self.cancellation_token.cancel();
246 self.tasks.clear();
247 self.cancellation_token = CancellationToken::new();
248 self.active_quote_subs.clear();
249 self.active_trade_subs.clear();
250 self.active_book_subs.clear();
251 self.active_bar_subs.clear();
252 Ok(())
253 }
254
255 fn dispose(&mut self) -> anyhow::Result<()> {
256 log::debug!("Disposing {}", self.client_id);
257 self.cancellation_token.cancel();
258 Ok(())
259 }
260
261 fn is_connected(&self) -> bool {
262 self.is_connected.load(Ordering::Acquire)
263 }
264
265 fn is_disconnected(&self) -> bool {
266 !self.is_connected()
267 }
268
269 async fn connect(&mut self) -> anyhow::Result<()> {
270 log::info!("Connecting {}", self.client_id);
271
272 if self.config.has_api_credentials() {
273 let api_key = self.config.api_key.as_ref().unwrap();
274 let api_secret = self.config.api_secret.as_ref().unwrap();
275 let token = self
276 .http_client
277 .authenticate(api_key, api_secret, 86400) .await
279 .context("Failed to authenticate with Ax")?;
280 log::debug!("Authenticated with Ax");
281 self.ws_client.set_auth_token(token);
282 }
283
284 let instruments = self
285 .http_client
286 .request_instruments(None, None)
287 .await
288 .context("Failed to fetch instruments")?;
289 self.http_client.cache_instruments(instruments);
290 log::info!(
291 "Cached {} instruments",
292 self.http_client.get_cached_symbols().len()
293 );
294
295 self.ws_client
296 .connect()
297 .await
298 .context("Failed to connect WebSocket")?;
299 log::info!("WebSocket connected");
300 self.spawn_message_handler();
301
302 self.is_connected.store(true, Ordering::Release);
303 log::info!("Connected {}", self.client_id);
304
305 Ok(())
306 }
307
308 async fn disconnect(&mut self) -> anyhow::Result<()> {
309 log::info!("Disconnecting {}", self.client_id);
310 self.cancellation_token.cancel();
311 self.ws_client.close().await;
312
313 for task in self.tasks.drain(..) {
314 task.abort();
315 }
316
317 self.is_connected.store(false, Ordering::Release);
318 log::info!("Disconnected {}", self.client_id);
319
320 Ok(())
321 }
322
323 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
324 let instrument_id = cmd.instrument_id;
325 let symbol = instrument_id.symbol.as_str();
326
327 if self.active_quote_subs.contains_key(&instrument_id) {
328 log::debug!("Already subscribed to quotes for {symbol}");
329 return Ok(());
330 }
331
332 log::debug!("Subscribing to quotes for {symbol}");
333
334 get_runtime().block_on(async {
335 self.ws_client
336 .subscribe(symbol, AxMarketDataLevel::Level1)
337 .await
338 })?;
339
340 self.active_quote_subs.insert(instrument_id, ());
341 Ok(())
342 }
343
344 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
345 let instrument_id = cmd.instrument_id;
346 let symbol = instrument_id.symbol.as_str();
347
348 if !self.active_quote_subs.contains_key(&instrument_id) {
349 log::debug!("Not subscribed to quotes for {symbol}");
350 return Ok(());
351 }
352
353 log::debug!("Unsubscribing from quotes for {symbol}");
354
355 get_runtime().block_on(async { self.ws_client.unsubscribe(symbol).await })?;
356
357 self.active_quote_subs.remove(&instrument_id);
358 Ok(())
359 }
360
361 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
362 let instrument_id = cmd.instrument_id;
363 let symbol = instrument_id.symbol.as_str();
364
365 if self.active_trade_subs.contains_key(&instrument_id) {
366 log::debug!("Already subscribed to trades for {symbol}");
367 return Ok(());
368 }
369
370 log::debug!("Subscribing to trades for {symbol}");
371
372 get_runtime().block_on(async {
374 self.ws_client
375 .subscribe(symbol, AxMarketDataLevel::Level1)
376 .await
377 })?;
378
379 self.active_trade_subs.insert(instrument_id, ());
380 Ok(())
381 }
382
383 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
384 let instrument_id = cmd.instrument_id;
385 let symbol = instrument_id.symbol.as_str();
386
387 if !self.active_trade_subs.contains_key(&instrument_id) {
388 log::debug!("Not subscribed to trades for {symbol}");
389 return Ok(());
390 }
391
392 log::debug!("Unsubscribing from trades for {symbol}");
393
394 get_runtime().block_on(async { self.ws_client.unsubscribe(symbol).await })?;
395
396 self.active_trade_subs.remove(&instrument_id);
397 Ok(())
398 }
399
400 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
401 let instrument_id = cmd.instrument_id;
402 let symbol = instrument_id.symbol.as_str();
403
404 if self.active_book_subs.contains_key(&instrument_id) {
405 log::debug!("Already subscribed to book deltas for {symbol}");
406 return Ok(());
407 }
408
409 let level = AxMarketDataLevel::Level2;
411 log::debug!("Subscribing to book deltas for {symbol} at {level:?}");
412
413 get_runtime().block_on(async { self.ws_client.subscribe(symbol, level).await })?;
414
415 self.active_book_subs.insert(instrument_id, level);
416 Ok(())
417 }
418
419 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
420 let instrument_id = cmd.instrument_id;
421 let symbol = instrument_id.symbol.as_str();
422
423 if !self.active_book_subs.contains_key(&instrument_id) {
424 log::debug!("Not subscribed to book deltas for {symbol}");
425 return Ok(());
426 }
427
428 log::debug!("Unsubscribing from book deltas for {symbol}");
429
430 get_runtime().block_on(async { self.ws_client.unsubscribe(symbol).await })?;
431
432 self.active_book_subs.remove(&instrument_id);
433 Ok(())
434 }
435
436 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
437 let bar_type = cmd.bar_type;
438 let instrument_id = bar_type.instrument_id();
439 let symbol = instrument_id.symbol.as_str();
440
441 if self.active_bar_subs.contains_key(&instrument_id) {
442 log::debug!("Already subscribed to bars for {symbol}");
443 return Ok(());
444 }
445
446 let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
447 log::debug!("Subscribing to bars for {bar_type} (width: {width:?})");
448
449 get_runtime().block_on(async { self.ws_client.subscribe_candles(symbol, width).await })?;
450
451 self.active_bar_subs
452 .insert(instrument_id, (bar_type, width));
453 Ok(())
454 }
455
456 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
457 let bar_type = cmd.bar_type;
458 let instrument_id = bar_type.instrument_id();
459 let symbol = instrument_id.symbol.as_str();
460
461 let width = match self.active_bar_subs.get(&instrument_id) {
462 Some(entry) => entry.value().1,
463 None => {
464 log::debug!("Not subscribed to bars for {symbol}");
465 return Ok(());
466 }
467 };
468
469 log::debug!("Unsubscribing from bars for {bar_type}");
470
471 get_runtime()
472 .block_on(async { self.ws_client.unsubscribe_candles(symbol, width).await })?;
473
474 self.active_bar_subs.remove(&instrument_id);
475 Ok(())
476 }
477
478 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
479 let http = self.http_client.clone();
480 let sender = self.data_sender.clone();
481 let request_id = request.request_id;
482 let client_id = request.client_id.unwrap_or(self.client_id);
483 let venue = *AX_VENUE;
484 let start_nanos = datetime_to_unix_nanos(request.start);
485 let end_nanos = datetime_to_unix_nanos(request.end);
486 let params = request.params.clone();
487 let clock = self.clock;
488
489 get_runtime().spawn(async move {
490 match http.request_instruments(None, None).await {
491 Ok(instruments) => {
492 log::info!("Fetched {} instruments from Ax", instruments.len());
493 http.cache_instruments(instruments.clone());
494
495 let response = DataResponse::Instruments(InstrumentsResponse::new(
496 request_id,
497 client_id,
498 venue,
499 instruments,
500 start_nanos,
501 end_nanos,
502 clock.get_time_ns(),
503 params,
504 ));
505
506 if let Err(e) = sender.send(DataEvent::Response(response)) {
507 log::error!("Failed to send instruments response: {e}");
508 }
509 }
510 Err(e) => {
511 log::error!("Failed to request instruments: {e}");
512 }
513 }
514 });
515
516 Ok(())
517 }
518
519 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
520 let http = self.http_client.clone();
521 let sender = self.data_sender.clone();
522 let request_id = request.request_id;
523 let client_id = request.client_id.unwrap_or(self.client_id);
524 let instrument_id = request.instrument_id;
525 let symbol = instrument_id.symbol.to_string();
526 let start_nanos = datetime_to_unix_nanos(request.start);
527 let end_nanos = datetime_to_unix_nanos(request.end);
528 let params = request.params.clone();
529 let clock = self.clock;
530
531 get_runtime().spawn(async move {
532 match http.request_instrument(&symbol, None, None).await {
533 Ok(instrument) => {
534 log::debug!("Fetched instrument {symbol} from Ax");
535 http.cache_instrument(instrument.clone());
536
537 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
538 request_id,
539 client_id,
540 instrument_id,
541 instrument,
542 start_nanos,
543 end_nanos,
544 clock.get_time_ns(),
545 params,
546 )));
547
548 if let Err(e) = sender.send(DataEvent::Response(response)) {
549 log::error!("Failed to send instrument response: {e}");
550 }
551 }
552 Err(e) => {
553 log::error!("Failed to request instrument {symbol}: {e}");
554 }
555 }
556 });
557
558 Ok(())
559 }
560
561 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
562 let http = self.http_client.clone();
563 let sender = self.data_sender.clone();
564 let request_id = request.request_id;
565 let client_id = request.client_id.unwrap_or(self.client_id);
566 let bar_type = request.bar_type;
567 let symbol = bar_type.instrument_id().symbol.to_string();
568 let start_nanos = datetime_to_unix_nanos(request.start);
569 let end_nanos = datetime_to_unix_nanos(request.end);
570 let params = request.params.clone();
571 let clock = self.clock;
572 let width = match map_bar_spec_to_candle_width(&bar_type.spec()) {
573 Ok(w) => w,
574 Err(e) => {
575 log::error!("Failed to map bar type {bar_type}: {e}");
576 return Err(e);
577 }
578 };
579
580 get_runtime().spawn(async move {
581 let start_ns = start_nanos.map_or(0, |n| n.as_i64());
582 let end_ns = end_nanos.map_or(clock.get_time_ns().as_i64(), |n| n.as_i64());
583
584 match http.request_bars(&symbol, start_ns, end_ns, width).await {
585 Ok(bars) => {
586 log::debug!("Fetched {} bars for {symbol}", bars.len());
587
588 let response = DataResponse::Bars(BarsResponse::new(
589 request_id,
590 client_id,
591 bar_type,
592 bars,
593 start_nanos,
594 end_nanos,
595 clock.get_time_ns(),
596 params,
597 ));
598
599 if let Err(e) = sender.send(DataEvent::Response(response)) {
600 log::error!("Failed to send bars response: {e}");
601 }
602 }
603 Err(e) => {
604 log::error!("Failed to request bars for {symbol}: {e}");
605 }
606 }
607 });
608
609 Ok(())
610 }
611}