nautilus_tardis/machine/
mod.rs1pub mod client;
17pub mod message;
18pub mod parse;
19pub mod types;
20
21use std::{
22 sync::{
23 atomic::{AtomicBool, Ordering},
24 Arc,
25 },
26 time::Duration,
27};
28
29use async_stream::stream;
30use futures_util::{stream::SplitSink, SinkExt, Stream, StreamExt};
31use message::WsMessage;
32use tokio::net::TcpStream;
33use tokio_tungstenite::{
34 connect_async,
35 tungstenite::{self, protocol::frame::coding::CloseCode},
36 MaybeTlsStream, WebSocketStream,
37};
38use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
39
40pub use crate::machine::client::TardisMachineClient;
41
42pub type Result<T> = std::result::Result<T, Error>;
43
44#[derive(Debug, thiserror::Error)]
46pub enum Error {
47 #[error("Options cannot be empty")]
49 EmptyOptions,
50 #[error("Failed to connect: {0}")]
52 ConnectFailed(#[from] tungstenite::Error),
53 #[error("Connection rejected: {reason}")]
55 ConnectRejected {
56 status: tungstenite::http::StatusCode,
58 reason: String,
60 },
61 #[error("Connection closed: {reason}")]
63 ConnectionClosed {
64 reason: String,
66 },
67 #[error("Failed to deserialize message: {0}")]
69 Deserialization(#[from] serde_json::Error),
70}
71
72pub async fn replay_normalized(
73 base_url: &str,
74 options: Vec<ReplayNormalizedRequestOptions>,
75 signal: Arc<AtomicBool>,
76) -> Result<impl Stream<Item = Result<WsMessage>>> {
77 if options.is_empty() {
78 return Err(Error::EmptyOptions);
79 }
80
81 let path = format!("{base_url}/ws-replay-normalized?options=");
82 let options = serde_json::to_string(&options)?;
83
84 let plain_url = format!("{path}{options}");
85 tracing::debug!("Connecting to {plain_url}");
86
87 let url = format!("{path}{}", urlencoding::encode(&options));
88 stream_from_websocket(base_url, &url, signal).await
89}
90
91pub async fn stream_normalized(
92 base_url: &str,
93 options: Vec<StreamNormalizedRequestOptions>,
94 signal: Arc<AtomicBool>,
95) -> Result<impl Stream<Item = Result<WsMessage>>> {
96 if options.is_empty() {
97 return Err(Error::EmptyOptions);
98 }
99
100 let path = format!("{base_url}/ws-stream-normalized?options=");
101 let options = serde_json::to_string(&options)?;
102
103 let plain_url = format!("{path}{options}");
104 tracing::debug!("Connecting to {plain_url}");
105
106 let url = format!("{path}{}", urlencoding::encode(&options));
107 stream_from_websocket(base_url, &url, signal).await
108}
109
110async fn stream_from_websocket(
111 base_url: &str,
112 url: &str,
113 signal: Arc<AtomicBool>,
114) -> Result<impl Stream<Item = Result<WsMessage>>> {
115 let (ws_stream, ws_resp) = connect_async(url).await?;
116
117 handle_connection_response(ws_resp)?;
118 tracing::info!("Connected to {base_url}");
119
120 Ok(stream! {
121 let (writer, mut reader) = ws_stream.split();
122 tokio::spawn(heartbeat(writer));
123
124 let timeout = Duration::from_millis(10);
126
127 tracing::info!("Streaming from websocket...");
128
129 loop {
130 if signal.load(Ordering::Relaxed) {
131 tracing::debug!("Shutdown signal received");
132 break;
133 }
134
135 let result = tokio::time::timeout(timeout, reader.next()).await;
136 let msg = match result {
137 Ok(msg) => msg,
138 Err(_) => continue, };
140
141 match msg {
142 Some(Ok(msg)) => match msg {
143 tungstenite::Message::Frame(_)
144 | tungstenite::Message::Binary(_)
145 | tungstenite::Message::Pong(_)
146 | tungstenite::Message::Ping(_) => {
147 tracing::trace!("Received {msg:?}");
148 continue; }
150 tungstenite::Message::Close(Some(frame)) => {
151 let reason = frame.reason.to_string();
152 if frame.code == CloseCode::Normal {
153 tracing::debug!("Connection closed normally: {reason}");
154 } else {
155 tracing::error!(
156 "Connection closed abnormally with code: {:?}, reason: {reason}",
157 frame.code
158 );
159 yield Err(Error::ConnectionClosed { reason });
160 }
161 break;
162 }
163 tungstenite::Message::Close(None) => {
164 tracing::error!("Connection closed without a frame");
165 yield Err(Error::ConnectionClosed {
166 reason: "No close frame provided".to_string()
167 });
168 break;
169 }
170 tungstenite::Message::Text(msg) => {
171 match serde_json::from_str::<WsMessage>(&msg) {
172 Ok(parsed_msg) => yield Ok(parsed_msg),
173 Err(e) => {
174 tracing::error!("Failed to deserialize message: {msg}. Error: {e}");
175 yield Err(Error::Deserialization(e));
176 }
177 }
178 }
179 },
180 Some(Err(e)) => {
181 tracing::error!("WebSocket error: {e}");
182 yield Err(Error::ConnectFailed(e));
183 break;
184 }
185 None => {
186 tracing::error!("Connection closed unexpectedly");
187 yield Err(Error::ConnectionClosed {
188 reason: "Unexpected connection close".to_string(),
189 });
190 break;
191 }
192 }
193 }
194
195 tracing::info!("Shutdown stream");
196 })
197}
198
199fn handle_connection_response(ws_resp: tungstenite::http::Response<Option<Vec<u8>>>) -> Result<()> {
200 if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
201 return match ws_resp.body() {
202 Some(resp) => Err(Error::ConnectRejected {
203 status: ws_resp.status(),
204 reason: String::from_utf8_lossy(resp).to_string(),
205 }),
206 None => Err(Error::ConnectRejected {
207 status: ws_resp.status(),
208 reason: "Unknown reason".to_string(),
209 }),
210 };
211 }
212 Ok(())
213}
214
215async fn heartbeat(
216 mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
217) {
218 let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
219 let retry_interval = Duration::from_secs(1);
220
221 loop {
222 heartbeat_interval.tick().await;
223 tracing::trace!("Sending PING");
224
225 let mut count = 3;
226 let mut retry_interval = tokio::time::interval(retry_interval);
227
228 while count > 0 {
229 retry_interval.tick().await;
230 let _ = sender.send(tungstenite::Message::Ping(vec![].into())).await;
231 count -= 1;
232 }
233 }
234}