1use std::{
19 collections::HashMap,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26
27use ahash::AHashSet;
28use futures_util::{SinkExt, StreamExt};
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::DataEvent,
33};
34use nautilus_core::{
35 parsing::precision_from_str,
36 time::{AtomicTime, get_atomic_clock_realtime},
37};
38use nautilus_model::identifiers::{ClientId, Venue};
39use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
40use tokio_tungstenite::{connect_async, tungstenite};
41use tokio_util::sync::CancellationToken;
42use ustr::Ustr;
43
44use crate::{
45 common::consts::TARDIS_MACHINE_WS_URL,
46 config::TardisDataClientConfig,
47 http::{TardisHttpClient, parse::parse_instrument_any},
48 machine::{
49 client::determine_instrument_info,
50 message::WsMessage,
51 parse::{parse_tardis_ws_message, parse_tardis_ws_message_funding_rate},
52 types::{TardisInstrumentKey, TardisInstrumentMiniInfo},
53 },
54 parse::{normalize_instrument_id, parse_instrument_id},
55};
56
57#[derive(Debug)]
59pub struct TardisDataClient {
60 client_id: ClientId,
61 config: TardisDataClientConfig,
62 is_connected: Arc<AtomicBool>,
63 cancellation_token: CancellationToken,
64 tasks: Vec<JoinHandle<()>>,
65 data_sender: UnboundedSender<DataEvent>,
66 #[allow(dead_code)]
67 clock: &'static AtomicTime,
68}
69
70impl TardisDataClient {
71 pub fn new(client_id: ClientId, config: TardisDataClientConfig) -> anyhow::Result<Self> {
77 let clock = get_atomic_clock_realtime();
78 let data_sender = get_data_event_sender();
79
80 Ok(Self {
81 client_id,
82 config,
83 is_connected: Arc::new(AtomicBool::new(false)),
84 cancellation_token: CancellationToken::new(),
85 tasks: Vec::new(),
86 data_sender,
87 clock,
88 })
89 }
90}
91
92#[async_trait::async_trait(?Send)]
93impl DataClient for TardisDataClient {
94 fn client_id(&self) -> ClientId {
95 self.client_id
96 }
97
98 fn venue(&self) -> Option<Venue> {
99 None }
101
102 fn start(&mut self) -> anyhow::Result<()> {
103 log::info!("Starting {}", self.client_id);
104 Ok(())
105 }
106
107 fn stop(&mut self) -> anyhow::Result<()> {
108 log::info!("Stopping {}", self.client_id);
109 self.cancellation_token.cancel();
110 self.tasks.clear();
111 self.is_connected.store(false, Ordering::Release);
112 Ok(())
113 }
114
115 fn reset(&mut self) -> anyhow::Result<()> {
116 self.cancellation_token.cancel();
117 for handle in self.tasks.drain(..) {
118 handle.abort();
119 }
120 self.cancellation_token = CancellationToken::new();
121 self.is_connected.store(false, Ordering::Release);
122 Ok(())
123 }
124
125 fn dispose(&mut self) -> anyhow::Result<()> {
126 self.stop()
127 }
128
129 fn is_connected(&self) -> bool {
130 self.is_connected.load(Ordering::Acquire)
131 }
132
133 fn is_disconnected(&self) -> bool {
134 !self.is_connected()
135 }
136
137 async fn connect(&mut self) -> anyhow::Result<()> {
138 if self.is_connected() {
139 return Ok(());
140 }
141
142 if self.config.options.is_empty() {
143 anyhow::bail!("Replay options cannot be empty");
144 }
145
146 let normalize_symbols = self.config.normalize_symbols;
147 let book_snapshot_output = self.config.book_snapshot_output.clone();
148
149 let http_client = TardisHttpClient::new(
150 self.config.api_key.as_deref(),
151 None, None, normalize_symbols,
154 )?;
155
156 let base_url = self
157 .config
158 .tardis_ws_url
159 .clone()
160 .or_else(|| std::env::var(TARDIS_MACHINE_WS_URL).ok())
161 .ok_or_else(|| {
162 anyhow::anyhow!(
163 "Tardis Machine `tardis_ws_url` must be provided or \
164 set in the '{TARDIS_MACHINE_WS_URL}' environment variable"
165 )
166 })?;
167
168 let exchanges: AHashSet<_> = self.config.options.iter().map(|opt| opt.exchange).collect();
169 let mut instrument_map: HashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>> =
170 HashMap::new();
171
172 for exchange in &exchanges {
173 log::info!("Fetching instruments for {exchange}");
174
175 let instruments_info = http_client
176 .instruments_info(*exchange, None, None)
177 .await
178 .map_err(|e| {
179 anyhow::anyhow!("Failed to fetch instrument info for {exchange}: {e}")
180 })?;
181
182 log::info!(
183 "Received {} instruments for {exchange}",
184 instruments_info.len()
185 );
186
187 for inst in &instruments_info {
188 let instrument_type = inst.instrument_type;
189 let price_precision = precision_from_str(&inst.price_increment.to_string());
190 let size_precision = precision_from_str(&inst.amount_increment.to_string());
191
192 let instrument_id = if normalize_symbols {
193 normalize_instrument_id(exchange, inst.id, &instrument_type, inst.inverse)
194 } else {
195 parse_instrument_id(exchange, inst.id)
196 };
197
198 let info = TardisInstrumentMiniInfo::new(
199 instrument_id,
200 Some(Ustr::from(&inst.id)),
201 *exchange,
202 price_precision,
203 size_precision,
204 );
205 let key = info.as_tardis_instrument_key();
206 instrument_map.insert(key, Arc::new(info));
207 }
208
209 for inst in instruments_info {
211 for instrument in parse_instrument_any(inst, None, None, normalize_symbols) {
212 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
213 log::error!("Failed to send instrument event: {e}");
214 }
215 }
216 }
217 }
218
219 let options_json = serde_json::to_string(&self.config.options)?;
220 let url = format!(
221 "{base_url}/ws-replay-normalized?options={}",
222 urlencoding::encode(&options_json)
223 );
224
225 log::info!("Connecting to Tardis Machine replay");
226 log::debug!("URL: {base_url}/ws-replay-normalized?options={options_json}");
227
228 let (ws_stream, _) = connect_async(&url)
229 .await
230 .map_err(|e| anyhow::anyhow!("Failed to connect to Tardis Machine: {e}"))?;
231
232 log::info!("Connected to Tardis Machine");
233
234 self.cancellation_token = CancellationToken::new();
236
237 let sender = self.data_sender.clone();
238 let cancel = self.cancellation_token.clone();
239 let connected = self.is_connected.clone();
240
241 let handle = get_runtime().spawn(async move {
242 let (mut writer, mut reader) = ws_stream.split();
243
244 let heartbeat_token = cancel.child_token();
247 let heartbeat_signal = heartbeat_token.clone();
248 get_runtime().spawn(async move {
249 let mut interval = tokio::time::interval(Duration::from_secs(10));
250 loop {
251 tokio::select! {
252 _ = interval.tick() => {
253 log::trace!("Sending PING");
254 if let Err(e) = writer.send(tungstenite::Message::Ping(vec![].into())).await {
255 log::debug!("Heartbeat send failed: {e}");
256 break;
257 }
258 }
259 () = heartbeat_signal.cancelled() => break,
260 }
261 }
262 });
263
264 loop {
265 let msg = tokio::select! {
266 msg = reader.next() => msg,
267 () = cancel.cancelled() => {
268 log::debug!("Replay stream task cancelled");
269 break;
270 }
271 };
272
273 match msg {
274 Some(Ok(tungstenite::Message::Text(text))) => {
275 match serde_json::from_str::<WsMessage>(&text) {
276 Ok(ws_msg) => {
277 if matches!(ws_msg, WsMessage::Disconnect(_)) {
278 log::debug!("Received disconnect message");
279 continue;
280 }
281
282 let info = determine_instrument_info(&ws_msg, &instrument_map);
283 if let Some(info) = info {
284 let event = if matches!(ws_msg, WsMessage::DerivativeTicker(_))
285 {
286 parse_tardis_ws_message_funding_rate(ws_msg, info)
287 .map(DataEvent::FundingRate)
288 } else {
289 parse_tardis_ws_message(ws_msg, info, &book_snapshot_output)
290 .map(DataEvent::Data)
291 };
292
293 if let Some(event) = event
294 && let Err(e) = sender.send(event)
295 {
296 log::error!("Failed to send data event: {e}");
297 break;
298 }
299 }
300 }
301 Err(e) => {
302 log::error!("Failed to deserialize message: {e}");
303 }
304 }
305 }
306 Some(Ok(tungstenite::Message::Close(frame))) => {
307 if let Some(frame) = frame {
308 log::info!("WebSocket closed: {} {}", frame.code, frame.reason);
309 } else {
310 log::info!("WebSocket closed");
311 }
312 break;
313 }
314 Some(Ok(_)) => continue, Some(Err(e)) => {
316 log::error!("WebSocket error: {e}");
317 break;
318 }
319 None => {
320 log::info!("Replay stream ended");
321 break;
322 }
323 }
324 }
325
326 heartbeat_token.cancel();
327 connected.store(false, Ordering::Release);
328 });
329
330 self.tasks.push(handle);
331 self.is_connected.store(true, Ordering::Release);
332 log::info!("Connected: {}", self.client_id);
333
334 Ok(())
335 }
336
337 async fn disconnect(&mut self) -> anyhow::Result<()> {
338 if self.is_disconnected() {
339 return Ok(());
340 }
341
342 self.cancellation_token.cancel();
343 self.cancellation_token = CancellationToken::new();
344
345 let handles: Vec<_> = self.tasks.drain(..).collect();
346 for handle in handles {
347 if let Err(e) = handle.await {
348 log::error!("Error joining replay task: {e}");
349 }
350 }
351
352 self.is_connected.store(false, Ordering::Release);
353 log::info!("Disconnected: {}", self.client_id);
354
355 Ok(())
356 }
357}