nautilus_tardis/machine/
client.rs1use std::{
17 collections::HashMap,
18 env,
19 sync::{
20 Arc,
21 atomic::{AtomicBool, Ordering},
22 },
23};
24
25use futures_util::{Stream, StreamExt, pin_mut};
26use nautilus_model::data::Data;
27use ustr::Ustr;
28
29use super::{
30 Error,
31 message::WsMessage,
32 replay_normalized, stream_normalized,
33 types::{
34 ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions, TardisInstrumentKey,
35 TardisInstrumentMiniInfo,
36 },
37};
38use crate::machine::parse::parse_tardis_ws_message;
39
40#[cfg_attr(
42 feature = "python",
43 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
44)]
45#[derive(Debug, Clone)]
46pub struct TardisMachineClient {
47 pub base_url: String,
48 pub replay_signal: Arc<AtomicBool>,
49 pub stream_signal: Arc<AtomicBool>,
50 pub instruments: HashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
51 pub normalize_symbols: bool,
52}
53
54impl TardisMachineClient {
55 pub fn new(base_url: Option<&str>, normalize_symbols: bool) -> anyhow::Result<Self> {
61 let base_url = base_url
62 .map(ToString::to_string)
63 .or_else(|| env::var("TARDIS_MACHINE_WS_URL").ok())
64 .ok_or_else(|| {
65 anyhow::anyhow!(
66 "Tardis Machine `base_url` must be provided or set in the 'TARDIS_MACHINE_WS_URL' environment variable"
67 )
68 })?;
69
70 Ok(Self {
71 base_url,
72 replay_signal: Arc::new(AtomicBool::new(false)),
73 stream_signal: Arc::new(AtomicBool::new(false)),
74 instruments: HashMap::new(),
75 normalize_symbols,
76 })
77 }
78
79 pub fn add_instrument_info(&mut self, info: TardisInstrumentMiniInfo) {
80 let key = info.as_tardis_instrument_key();
81 self.instruments.insert(key, Arc::new(info));
82 }
83
84 #[must_use]
85 pub fn is_closed(&self) -> bool {
86 self.replay_signal.load(Ordering::Relaxed) || self.stream_signal.load(Ordering::Relaxed)
87 }
88
89 pub fn close(&mut self) {
90 tracing::debug!("Closing");
91
92 self.replay_signal.store(true, Ordering::Relaxed);
93 self.stream_signal.store(true, Ordering::Relaxed);
94
95 tracing::debug!("Closed");
96 }
97
98 pub async fn replay(
104 &self,
105 options: Vec<ReplayNormalizedRequestOptions>,
106 ) -> Result<impl Stream<Item = Result<Data, Error>>, Error> {
107 let stream = replay_normalized(&self.base_url, options, self.replay_signal.clone()).await?;
108
109 Ok(handle_ws_stream(
112 Box::pin(stream),
113 None,
114 Some(self.instruments.clone()),
115 ))
116 }
117
118 pub async fn stream(
124 &self,
125 instrument: TardisInstrumentMiniInfo,
126 options: Vec<StreamNormalizedRequestOptions>,
127 ) -> Result<impl Stream<Item = Result<Data, Error>>, Error> {
128 let stream = stream_normalized(&self.base_url, options, self.stream_signal.clone()).await?;
129
130 Ok(handle_ws_stream(
133 Box::pin(stream),
134 Some(Arc::new(instrument)),
135 None,
136 ))
137 }
138}
139
140fn handle_ws_stream<S>(
141 stream: S,
142 instrument: Option<Arc<TardisInstrumentMiniInfo>>,
143 instrument_map: Option<HashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>>,
144) -> impl Stream<Item = Result<Data, Error>>
145where
146 S: Stream<Item = Result<WsMessage, Error>> + Unpin,
147{
148 assert!(
149 instrument.is_some() || instrument_map.is_some(),
150 "Either `instrument` or `instrument_map` must be provided"
151 );
152
153 async_stream::stream! {
154 pin_mut!(stream);
155
156 while let Some(result) = stream.next().await {
157 match result {
158 Ok(msg) => {
159 if matches!(msg, WsMessage::Disconnect(_)) {
160 tracing::debug!("Received disconnect message: {msg:?}");
161 continue;
162 }
163
164 let info = instrument.clone().or_else(|| {
165 instrument_map
166 .as_ref()
167 .and_then(|map| determine_instrument_info(&msg, map))
168 });
169
170 if let Some(info) = info {
171 if let Some(data) = parse_tardis_ws_message(msg, info) {
172 yield Ok(data);
173 }
174 } else {
175 tracing::error!("Missing instrument info for message: {msg:?}");
176 yield Err(Error::ConnectionClosed {
177 reason: "Missing instrument definition info".to_string()
178 });
179 break;
180 }
181 }
182 Err(e) => {
183 tracing::error!("Error in WebSocket stream: {e:?}");
184 yield Err(e);
185 break;
186 }
187 }
188 }
189 }
190}
191
192pub fn determine_instrument_info(
193 msg: &WsMessage,
194 instrument_map: &HashMap<TardisInstrumentKey, Arc<TardisInstrumentMiniInfo>>,
195) -> Option<Arc<TardisInstrumentMiniInfo>> {
196 let key = match msg {
197 WsMessage::BookChange(msg) => {
198 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange)
199 }
200 WsMessage::BookSnapshot(msg) => {
201 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange)
202 }
203 WsMessage::Trade(msg) => TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange),
204 WsMessage::TradeBar(msg) => TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange),
205 WsMessage::DerivativeTicker(msg) => {
206 TardisInstrumentKey::new(Ustr::from(&msg.symbol), msg.exchange)
207 }
208 WsMessage::Disconnect(_) => return None,
209 };
210 if let Some(inst) = instrument_map.get(&key) {
211 Some(inst.clone())
212 } else {
213 tracing::error!("Instrument definition info not available for {key:?}");
214 None
215 }
216}