nautilus_tardis/machine/
mod.rspub mod client;
pub mod message;
pub mod parse;
pub mod types;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use async_stream::stream;
use futures_util::{stream::SplitSink, SinkExt, Stream, StreamExt};
use message::WsMessage;
use tokio::{net::TcpStream, time::timeout};
use tokio_tungstenite::{
connect_async,
tungstenite::{self, protocol::frame::coding::CloseCode},
MaybeTlsStream, WebSocketStream,
};
use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
pub use crate::machine::client::TardisMachineClient;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Options cannot be empty")]
EmptyOptions,
#[error("Failed to connect: {0}")]
ConnectFailed(#[from] tungstenite::Error),
#[error("Connection rejected: {reason}")]
ConnectRejected {
status: tungstenite::http::StatusCode,
reason: String,
},
#[error("Connection closed: {reason}")]
ConnectionClosed {
reason: String,
},
#[error("Failed to deserialize message: {0}")]
Deserialization(#[from] serde_json::Error),
}
pub async fn replay_normalized(
base_url: &str,
options: Vec<ReplayNormalizedRequestOptions>,
signal: Arc<AtomicBool>,
) -> Result<impl Stream<Item = Result<WsMessage>>> {
if options.is_empty() {
return Err(Error::EmptyOptions);
}
let path = format!("{base_url}/ws-replay-normalized?options=");
let options = serde_json::to_string(&options)?;
let plain_url = format!("{path}{options}");
tracing::debug!("Connecting to {plain_url}");
let url = format!("{path}{}", urlencoding::encode(&options));
stream_from_websocket(base_url, &url, signal).await
}
pub async fn stream_normalized(
base_url: &str,
options: Vec<StreamNormalizedRequestOptions>,
signal: Arc<AtomicBool>,
) -> Result<impl Stream<Item = Result<WsMessage>>> {
if options.is_empty() {
return Err(Error::EmptyOptions);
}
let path = format!("{base_url}/ws-stream-normalized?options=");
let options = serde_json::to_string(&options)?;
let plain_url = format!("{path}{options}");
tracing::debug!("Connecting to {plain_url}");
let url = format!("{path}{}", urlencoding::encode(&options));
stream_from_websocket(base_url, &url, signal).await
}
async fn stream_from_websocket(
base_url: &str,
url: &str,
signal: Arc<AtomicBool>,
) -> Result<impl Stream<Item = Result<WsMessage>>> {
let (ws_stream, ws_resp) = connect_async(url).await?;
handle_connection_response(ws_resp)?;
tracing::info!("Connected to {base_url}");
Ok(stream! {
let (writer, mut reader) = ws_stream.split();
tokio::spawn(heartbeat(writer));
let timeout_duration = Duration::from_millis(10);
tracing::info!("Streaming from websocket...");
loop {
if signal.load(Ordering::Relaxed) {
tracing::debug!("Shutdown signal received");
break;
}
let result = timeout(timeout_duration, reader.next()).await;
let msg = match result {
Ok(msg) => msg,
Err(_) => continue, };
match msg {
Some(Ok(msg)) => match msg {
tungstenite::Message::Frame(_)
| tungstenite::Message::Binary(_)
| tungstenite::Message::Pong(_)
| tungstenite::Message::Ping(_) => {
tracing::trace!("Received {msg:?}");
continue; }
tungstenite::Message::Close(Some(frame)) => {
let reason = frame.reason.to_string();
if frame.code != CloseCode::Normal {
tracing::error!(
"Connection closed abnormally with code: {:?}, reason: {reason}",
frame.code
);
yield Err(Error::ConnectionClosed { reason });
} else {
tracing::debug!("Connection closed normally: {reason}");
}
break;
}
tungstenite::Message::Close(None) => {
tracing::error!("Connection closed without a frame");
yield Err(Error::ConnectionClosed {
reason: "No close frame provided".to_string()
});
break;
}
tungstenite::Message::Text(msg) => {
match serde_json::from_str::<WsMessage>(&msg) {
Ok(parsed_msg) => yield Ok(parsed_msg),
Err(e) => {
tracing::error!("Failed to deserialize message: {msg}. Error: {e}");
yield Err(Error::Deserialization(e));
}
}
}
},
Some(Err(e)) => {
tracing::error!("WebSocket error: {e}");
yield Err(Error::ConnectFailed(e));
break;
}
None => {
tracing::error!("Connection closed unexpectedly");
yield Err(Error::ConnectionClosed {
reason: "Unexpected connection close".to_string(),
});
break;
}
}
}
tracing::info!("Shutdown stream");
})
}
fn handle_connection_response(ws_resp: tungstenite::http::Response<Option<Vec<u8>>>) -> Result<()> {
if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
return match ws_resp.body() {
Some(resp) => Err(Error::ConnectRejected {
status: ws_resp.status(),
reason: String::from_utf8_lossy(resp).to_string(),
}),
None => Err(Error::ConnectRejected {
status: ws_resp.status(),
reason: "Unknown reason".to_string(),
}),
};
}
Ok(())
}
async fn heartbeat(
mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
) {
let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
let retry_interval = Duration::from_secs(1);
loop {
heartbeat_interval.tick().await;
tracing::trace!("Sending PING");
let mut count = 3;
let mut retry_interval = tokio::time::interval(retry_interval);
while count > 0 {
retry_interval.tick().await;
let _ = sender.send(tungstenite::Message::Ping(vec![])).await;
count -= 1;
}
}
}