nautilus_tardis/machine/
mod.rs1pub mod client;
17pub mod message;
18pub mod parse;
19pub mod types;
20
21use std::{
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26 time::Duration,
27};
28
29use async_stream::stream;
30use futures_util::{SinkExt, Stream, StreamExt, stream::SplitSink};
31use message::WsMessage;
32use tokio::net::TcpStream;
33use tokio_tungstenite::{
34 MaybeTlsStream, WebSocketStream, connect_async,
35 tungstenite::{self, protocol::frame::coding::CloseCode},
36};
37use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
38
39pub use crate::machine::client::TardisMachineClient;
40
41pub type Result<T> = std::result::Result<T, Error>;
42
43#[derive(Debug, thiserror::Error)]
45pub enum Error {
46 #[error("Options cannot be empty")]
48 EmptyOptions,
49 #[error("Failed to connect: {0}")]
51 ConnectFailed(#[from] tungstenite::Error),
52 #[error("Connection rejected: {reason}")]
54 ConnectRejected {
55 status: tungstenite::http::StatusCode,
57 reason: String,
59 },
60 #[error("Connection closed: {reason}")]
62 ConnectionClosed {
63 reason: String,
65 },
66 #[error("Failed to deserialize message: {0}")]
68 Deserialization(#[from] serde_json::Error),
69}
70
71pub async fn replay_normalized(
72 base_url: &str,
73 options: Vec<ReplayNormalizedRequestOptions>,
74 signal: Arc<AtomicBool>,
75) -> Result<impl Stream<Item = Result<WsMessage>>> {
76 if options.is_empty() {
77 return Err(Error::EmptyOptions);
78 }
79
80 let path = format!("{base_url}/ws-replay-normalized?options=");
81 let options = serde_json::to_string(&options)?;
82
83 let plain_url = format!("{path}{options}");
84 tracing::debug!("Connecting to {plain_url}");
85
86 let url = format!("{path}{}", urlencoding::encode(&options));
87 stream_from_websocket(base_url, url, signal).await
88}
89
90pub async fn stream_normalized(
91 base_url: &str,
92 options: Vec<StreamNormalizedRequestOptions>,
93 signal: Arc<AtomicBool>,
94) -> Result<impl Stream<Item = Result<WsMessage>>> {
95 if options.is_empty() {
96 return Err(Error::EmptyOptions);
97 }
98
99 let path = format!("{base_url}/ws-stream-normalized?options=");
100 let options = serde_json::to_string(&options)?;
101
102 let plain_url = format!("{path}{options}");
103 tracing::debug!("Connecting to {plain_url}");
104
105 let url = format!("{path}{}", urlencoding::encode(&options));
106 stream_from_websocket(base_url, url, signal).await
107}
108
109async fn stream_from_websocket(
110 base_url: &str,
111 url: String,
112 signal: Arc<AtomicBool>,
113) -> Result<impl Stream<Item = Result<WsMessage>>> {
114 let (ws_stream, ws_resp) = connect_async(url).await?;
115
116 handle_connection_response(ws_resp)?;
117 tracing::info!("Connected to {base_url}");
118
119 Ok(stream! {
120 let (writer, mut reader) = ws_stream.split();
121 tokio::spawn(heartbeat(writer));
122
123 let timeout = Duration::from_millis(10);
125
126 tracing::info!("Streaming from websocket...");
127
128 loop {
129 if signal.load(Ordering::Relaxed) {
130 tracing::debug!("Shutdown signal received");
131 break;
132 }
133
134 let result = tokio::time::timeout(timeout, reader.next()).await;
135 let msg = match result {
136 Ok(msg) => msg,
137 Err(_) => continue, };
139
140 match msg {
141 Some(Ok(msg)) => match msg {
142 tungstenite::Message::Frame(_)
143 | tungstenite::Message::Binary(_)
144 | tungstenite::Message::Pong(_)
145 | tungstenite::Message::Ping(_) => {
146 tracing::trace!("Received {msg:?}");
147 continue; }
149 tungstenite::Message::Close(Some(frame)) => {
150 let reason = frame.reason.to_string();
151 if frame.code == CloseCode::Normal {
152 tracing::debug!("Connection closed normally: {reason}");
153 } else {
154 tracing::error!(
155 "Connection closed abnormally with code: {:?}, reason: {reason}",
156 frame.code
157 );
158 yield Err(Error::ConnectionClosed { reason });
159 }
160 break;
161 }
162 tungstenite::Message::Close(None) => {
163 tracing::error!("Connection closed without a frame");
164 yield Err(Error::ConnectionClosed {
165 reason: "No close frame provided".to_string()
166 });
167 break;
168 }
169 tungstenite::Message::Text(msg) => {
170 match serde_json::from_str::<WsMessage>(&msg) {
171 Ok(parsed_msg) => yield Ok(parsed_msg),
172 Err(e) => {
173 tracing::error!("Failed to deserialize message: {msg}. Error: {e}");
174 yield Err(Error::Deserialization(e));
175 }
176 }
177 }
178 },
179 Some(Err(e)) => {
180 tracing::error!("WebSocket error: {e}");
181 yield Err(Error::ConnectFailed(e));
182 break;
183 }
184 None => {
185 tracing::error!("Connection closed unexpectedly");
186 yield Err(Error::ConnectionClosed {
187 reason: "Unexpected connection close".to_string(),
188 });
189 break;
190 }
191 }
192 }
193
194 tracing::info!("Shutdown stream");
195 })
196}
197
198fn handle_connection_response(ws_resp: tungstenite::http::Response<Option<Vec<u8>>>) -> Result<()> {
199 if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
200 return match ws_resp.body() {
201 Some(resp) => Err(Error::ConnectRejected {
202 status: ws_resp.status(),
203 reason: String::from_utf8_lossy(resp).to_string(),
204 }),
205 None => Err(Error::ConnectRejected {
206 status: ws_resp.status(),
207 reason: "Unknown reason".to_string(),
208 }),
209 };
210 }
211 Ok(())
212}
213
214async fn heartbeat(
215 mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
216) {
217 let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
218 let retry_interval = Duration::from_secs(1);
219
220 loop {
221 heartbeat_interval.tick().await;
222 tracing::trace!("Sending PING");
223
224 let mut count = 3;
225 let mut retry_interval = tokio::time::interval(retry_interval);
226
227 while count > 0 {
228 retry_interval.tick().await;
229 let _ = sender.send(tungstenite::Message::Ping(vec![].into())).await;
230 count -= 1;
231 }
232 }
233}