Skip to main content

nautilus_tardis/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Tardis data client for streaming replay data into the live engine.
17
18use std::{
19    collections::HashMap,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26
27use ahash::AHashSet;
28use futures_util::{SinkExt, StreamExt};
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::DataEvent,
33};
34use nautilus_core::{
35    parsing::precision_from_str,
36    time::{AtomicTime, get_atomic_clock_realtime},
37};
38use nautilus_model::identifiers::{ClientId, Venue};
39use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
40use tokio_tungstenite::{connect_async, tungstenite};
41use tokio_util::sync::CancellationToken;
42use ustr::Ustr;
43
44use crate::{
45    common::consts::TARDIS_MACHINE_WS_URL,
46    config::TardisDataClientConfig,
47    http::{TardisHttpClient, parse::parse_instrument_any},
48    machine::{
49        client::determine_instrument_info,
50        message::WsMessage,
51        parse::{parse_tardis_ws_message, parse_tardis_ws_message_funding_rate},
52        types::{TardisInstrumentKey, TardisInstrumentMiniInfo},
53    },
54    parse::{normalize_instrument_id, parse_instrument_id},
55};
56
57/// Tardis data client for streaming replay or live data into the platform.
58#[derive(Debug)]
59pub struct TardisDataClient {
60    client_id: ClientId,
61    config: TardisDataClientConfig,
62    is_connected: Arc<AtomicBool>,
63    cancellation_token: CancellationToken,
64    tasks: Vec<JoinHandle<()>>,
65    data_sender: UnboundedSender<DataEvent>,
66    #[allow(dead_code)]
67    clock: &'static AtomicTime,
68}
69
70impl TardisDataClient {
71    /// Creates a new [`TardisDataClient`] instance.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the data event sender is not initialized.
76    pub fn new(client_id: ClientId, config: TardisDataClientConfig) -> anyhow::Result<Self> {
77        let clock = get_atomic_clock_realtime();
78        let data_sender = get_data_event_sender();
79
80        Ok(Self {
81            client_id,
82            config,
83            is_connected: Arc::new(AtomicBool::new(false)),
84            cancellation_token: CancellationToken::new(),
85            tasks: Vec::new(),
86            data_sender,
87            clock,
88        })
89    }
90}
91
92#[async_trait::async_trait(?Send)]
93impl DataClient for TardisDataClient {
94    fn client_id(&self) -> ClientId {
95        self.client_id
96    }
97
98    fn venue(&self) -> Option<Venue> {
99        None // Tardis is multi-venue
100    }
101
102    fn start(&mut self) -> anyhow::Result<()> {
103        log::info!("Starting {}", self.client_id);
104        Ok(())
105    }
106
107    fn stop(&mut self) -> anyhow::Result<()> {
108        log::info!("Stopping {}", self.client_id);
109        self.cancellation_token.cancel();
110        self.tasks.clear();
111        self.is_connected.store(false, Ordering::Release);
112        Ok(())
113    }
114
115    fn reset(&mut self) -> anyhow::Result<()> {
116        self.cancellation_token.cancel();
117        for handle in self.tasks.drain(..) {
118            handle.abort();
119        }
120        self.cancellation_token = CancellationToken::new();
121        self.is_connected.store(false, Ordering::Release);
122        Ok(())
123    }
124
125    fn dispose(&mut self) -> anyhow::Result<()> {
126        self.stop()
127    }
128
129    fn is_connected(&self) -> bool {
130        self.is_connected.load(Ordering::Acquire)
131    }
132
133    fn is_disconnected(&self) -> bool {
134        !self.is_connected()
135    }
136
137    async fn connect(&mut self) -> anyhow::Result<()> {
138        if self.is_connected() {
139            return Ok(());
140        }
141
142        if self.config.options.is_empty() {
143            anyhow::bail!("Replay options cannot be empty");
144        }
145
146        let normalize_symbols = self.config.normalize_symbols;
147        let book_snapshot_output = self.config.book_snapshot_output.clone();
148
149        let http_client = TardisHttpClient::new(
150            self.config.api_key.as_deref(),
151            None, // base_url
152            None, // timeout_secs
153            normalize_symbols,
154        )?;
155
156        let base_url = self
157            .config
158            .tardis_ws_url
159            .clone()
160            .or_else(|| std::env::var(TARDIS_MACHINE_WS_URL).ok())
161            .ok_or_else(|| {
162                anyhow::anyhow!(
163                    "Tardis Machine `tardis_ws_url` must be provided or \
164                     set in the '{TARDIS_MACHINE_WS_URL}' environment variable"
165                )
166            })?;
167
168        let exchanges: AHashSet<_> = self.config.options.iter().map(|opt| opt.exchange).collect();
169        let mut instrument_map: HashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>> =
170            HashMap::new();
171
172        for exchange in &exchanges {
173            log::info!("Fetching instruments for {exchange}");
174
175            let instruments_info = http_client
176                .instruments_info(*exchange, None, None)
177                .await
178                .map_err(|e| {
179                    anyhow::anyhow!("Failed to fetch instrument info for {exchange}: {e}")
180                })?;
181
182            log::info!(
183                "Received {} instruments for {exchange}",
184                instruments_info.len()
185            );
186
187            for inst in &instruments_info {
188                let instrument_type = inst.instrument_type;
189                let price_precision = precision_from_str(&inst.price_increment.to_string());
190                let size_precision = precision_from_str(&inst.amount_increment.to_string());
191
192                let instrument_id = if normalize_symbols {
193                    normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
194                } else {
195                    parse_instrument_id(exchange, inst.id)
196                };
197
198                let info = TardisInstrumentMiniInfo::new(
199                    instrument_id,
200                    Some(Ustr::from(&inst.id)),
201                    *exchange,
202                    price_precision,
203                    size_precision,
204                );
205                let key = info.as_tardis_instrument_key();
206                instrument_map.insert(key, Arc::new(info));
207            }
208
209            // Parse and emit Nautilus instrument definitions
210            for inst in instruments_info {
211                for instrument in parse_instrument_any(inst, None, None, normalize_symbols) {
212                    if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
213                        log::error!("Failed to send instrument event: {e}");
214                    }
215                }
216            }
217        }
218
219        let options_json = serde_json::to_string(&self.config.options)?;
220        let url = format!(
221            "{base_url}/ws-replay-normalized?options={}",
222            urlencoding::encode(&options_json)
223        );
224
225        log::info!("Connecting to Tardis Machine replay");
226        log::debug!("URL: {base_url}/ws-replay-normalized?options={options_json}");
227
228        let (ws_stream, _) = connect_async(&url)
229            .await
230            .map_err(|e| anyhow::anyhow!("Failed to connect to Tardis Machine: {e}"))?;
231
232        log::info!("Connected to Tardis Machine");
233
234        // Ensure a fresh token so reconnect after stop() works
235        self.cancellation_token = CancellationToken::new();
236
237        let sender = self.data_sender.clone();
238        let cancel = self.cancellation_token.clone();
239        let connected = self.is_connected.clone();
240
241        let handle = get_runtime().spawn(async move {
242            let (mut writer, mut reader) = ws_stream.split();
243
244            // Child token inherits cancellation from parent `cancel`, so
245            // reset()/stop() cancelling the main token also stops the heartbeat
246            let heartbeat_token = cancel.child_token();
247            let heartbeat_signal = heartbeat_token.clone();
248            get_runtime().spawn(async move {
249                let mut interval = tokio::time::interval(Duration::from_secs(10));
250                loop {
251                    tokio::select! {
252                        _ = interval.tick() => {
253                            log::trace!("Sending PING");
254                            if let Err(e) = writer.send(tungstenite::Message::Ping(vec![].into())).await {
255                                log::debug!("Heartbeat send failed: {e}");
256                                break;
257                            }
258                        }
259                        () = heartbeat_signal.cancelled() => break,
260                    }
261                }
262            });
263
264            loop {
265                let msg = tokio::select! {
266                    msg = reader.next() => msg,
267                    () = cancel.cancelled() => {
268                        log::debug!("Replay stream task cancelled");
269                        break;
270                    }
271                };
272
273                match msg {
274                    Some(Ok(tungstenite::Message::Text(text))) => {
275                        match serde_json::from_str::<WsMessage>(&text) {
276                            Ok(ws_msg) => {
277                                if matches!(ws_msg, WsMessage::Disconnect(_)) {
278                                    log::debug!("Received disconnect message");
279                                    continue;
280                                }
281
282                                let info = determine_instrument_info(&ws_msg, &instrument_map);
283                                if let Some(info) = info {
284                                    let event = if matches!(ws_msg, WsMessage::DerivativeTicker(_))
285                                    {
286                                        parse_tardis_ws_message_funding_rate(ws_msg, info)
287                                            .map(DataEvent::FundingRate)
288                                    } else {
289                                        parse_tardis_ws_message(ws_msg, info, &book_snapshot_output)
290                                            .map(DataEvent::Data)
291                                    };
292
293                                    if let Some(event) = event
294                                        && let Err(e) = sender.send(event)
295                                    {
296                                        log::error!("Failed to send data event: {e}");
297                                        break;
298                                    }
299                                }
300                            }
301                            Err(e) => {
302                                log::error!("Failed to deserialize message: {e}");
303                            }
304                        }
305                    }
306                    Some(Ok(tungstenite::Message::Close(frame))) => {
307                        if let Some(frame) = frame {
308                            log::info!("WebSocket closed: {} {}", frame.code, frame.reason);
309                        } else {
310                            log::info!("WebSocket closed");
311                        }
312                        break;
313                    }
314                    Some(Ok(_)) => continue, // Skip ping/pong/binary/frame
315                    Some(Err(e)) => {
316                        log::error!("WebSocket error: {e}");
317                        break;
318                    }
319                    None => {
320                        log::info!("Replay stream ended");
321                        break;
322                    }
323                }
324            }
325
326            heartbeat_token.cancel();
327            connected.store(false, Ordering::Release);
328        });
329
330        self.tasks.push(handle);
331        self.is_connected.store(true, Ordering::Release);
332        log::info!("Connected: {}", self.client_id);
333
334        Ok(())
335    }
336
337    async fn disconnect(&mut self) -> anyhow::Result<()> {
338        if self.is_disconnected() {
339            return Ok(());
340        }
341
342        self.cancellation_token.cancel();
343        self.cancellation_token = CancellationToken::new();
344
345        let handles: Vec<_> = self.tasks.drain(..).collect();
346        for handle in handles {
347            if let Err(e) = handle.await {
348                log::error!("Error joining replay task: {e}");
349            }
350        }
351
352        self.is_connected.store(false, Ordering::Release);
353        log::info!("Disconnected: {}", self.client_id);
354
355        Ok(())
356    }
357}