nautilus_tardis/machine/
mod.rs1pub mod client;
17pub mod message;
18pub mod parse;
19pub mod types;
20
21use std::{
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26 time::Duration,
27};
28
29use async_stream::stream;
30use futures_util::{SinkExt, Stream, StreamExt, stream::SplitSink};
31use message::WsMessage;
32use nautilus_common::live::get_runtime;
33use tokio::net::TcpStream;
34use tokio_tungstenite::{
35 MaybeTlsStream, WebSocketStream, connect_async,
36 tungstenite::{self, protocol::frame::coding::CloseCode},
37};
38use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
39
40pub use crate::machine::client::TardisMachineClient;
41
42pub type Result<T> = std::result::Result<T, Error>;
43
44#[derive(Debug, thiserror::Error)]
46pub enum Error {
47 #[error("Options cannot be empty")]
49 EmptyOptions,
50 #[error("Failed to connect: {0}")]
52 ConnectFailed(#[from] tungstenite::Error),
53 #[error("Connection rejected: {reason}")]
55 ConnectRejected {
56 status: tungstenite::http::StatusCode,
58 reason: String,
60 },
61 #[error("Connection closed: {reason}")]
63 ConnectionClosed {
64 reason: String,
66 },
67 #[error("Failed to deserialize message: {0}")]
69 Deserialization(#[from] serde_json::Error),
70}
71
72pub async fn replay_normalized(
79 base_url: &str,
80 options: Vec<ReplayNormalizedRequestOptions>,
81 signal: Arc<AtomicBool>,
82) -> Result<impl Stream<Item = Result<WsMessage>>> {
83 if options.is_empty() {
84 return Err(Error::EmptyOptions);
85 }
86
87 let path = format!("{base_url}/ws-replay-normalized?options=");
88 let options = serde_json::to_string(&options)?;
89
90 let plain_url = format!("{path}{options}");
91 tracing::debug!("Connecting to {plain_url}");
92
93 let url = format!("{path}{}", urlencoding::encode(&options));
94 stream_from_websocket(base_url, url, signal).await
95}
96
97pub async fn stream_normalized(
104 base_url: &str,
105 options: Vec<StreamNormalizedRequestOptions>,
106 signal: Arc<AtomicBool>,
107) -> Result<impl Stream<Item = Result<WsMessage>>> {
108 if options.is_empty() {
109 return Err(Error::EmptyOptions);
110 }
111
112 let path = format!("{base_url}/ws-stream-normalized?options=");
113 let options = serde_json::to_string(&options)?;
114
115 let plain_url = format!("{path}{options}");
116 tracing::debug!("Connecting to {plain_url}");
117
118 let url = format!("{path}{}", urlencoding::encode(&options));
119 stream_from_websocket(base_url, url, signal).await
120}
121
122async fn stream_from_websocket(
123 base_url: &str,
124 url: String,
125 signal: Arc<AtomicBool>,
126) -> Result<impl Stream<Item = Result<WsMessage>>> {
127 let (ws_stream, ws_resp) = connect_async(url).await?;
128
129 handle_connection_response(ws_resp)?;
130 tracing::info!("Connected to {base_url}");
131
132 Ok(stream! {
133 let (writer, mut reader) = ws_stream.split();
134 get_runtime().spawn(heartbeat(writer));
135
136 let timeout = Duration::from_millis(10);
138
139 tracing::info!("Streaming from websocket...");
140
141 loop {
142 if signal.load(Ordering::Relaxed) {
143 tracing::debug!("Shutdown signal received");
144 break;
145 }
146
147 let result = tokio::time::timeout(timeout, reader.next()).await;
148 let msg = match result {
149 Ok(msg) => msg,
150 Err(_) => continue, };
152
153 match msg {
154 Some(Ok(msg)) => match msg {
155 tungstenite::Message::Frame(_)
156 | tungstenite::Message::Binary(_)
157 | tungstenite::Message::Pong(_)
158 | tungstenite::Message::Ping(_) => {
159 tracing::trace!("Received {msg:?}");
160 continue; }
162 tungstenite::Message::Close(Some(frame)) => {
163 let reason = frame.reason.to_string();
164 if frame.code == CloseCode::Normal {
165 tracing::debug!("Connection closed normally: {reason}");
166 } else {
167 tracing::error!(
168 "Connection closed abnormally with code: {:?}, reason: {reason}", frame.code
169 );
170 yield Err(Error::ConnectionClosed { reason });
171 }
172 break;
173 }
174 tungstenite::Message::Close(None) => {
175 tracing::error!("Connection closed without a frame");
176 yield Err(Error::ConnectionClosed {
177 reason: "No close frame provided".to_string()
178 });
179 break;
180 }
181 tungstenite::Message::Text(msg) => {
182 match serde_json::from_str::<WsMessage>(&msg) {
183 Ok(parsed_msg) => yield Ok(parsed_msg),
184 Err(e) => {
185 tracing::error!("Failed to deserialize message: {msg}. Error: {e}");
186 yield Err(Error::Deserialization(e));
187 }
188 }
189 }
190 },
191 Some(Err(e)) => {
192 tracing::error!("WebSocket error: {e}");
193 yield Err(Error::ConnectFailed(e));
194 break;
195 }
196 None => {
197 tracing::error!("Connection closed unexpectedly");
198 yield Err(Error::ConnectionClosed {
199 reason: "Unexpected connection close".to_string(),
200 });
201 break;
202 }
203 }
204 }
205
206 tracing::info!("Shutdown stream");
207 })
208}
209
210#[allow(clippy::result_large_err)]
211fn handle_connection_response(ws_resp: tungstenite::http::Response<Option<Vec<u8>>>) -> Result<()> {
212 if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
213 return match ws_resp.body() {
214 Some(resp) => Err(Error::ConnectRejected {
215 status: ws_resp.status(),
216 reason: String::from_utf8_lossy(resp).to_string(),
217 }),
218 None => Err(Error::ConnectRejected {
219 status: ws_resp.status(),
220 reason: "Unknown reason".to_string(),
221 }),
222 };
223 }
224 Ok(())
225}
226
227async fn heartbeat(
228 mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
229) {
230 let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
231
232 loop {
233 heartbeat_interval.tick().await;
234 tracing::trace!("Sending PING");
235
236 if let Err(e) = sender.send(tungstenite::Message::Ping(vec![].into())).await {
237 tracing::debug!("Heartbeat send failed (connection closed): {e}");
238 break;
239 }
240 }
241
242 tracing::debug!("Heartbeat task exiting");
243}