nautilus_tardis/machine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16pub mod client;
17pub mod message;
18pub mod parse;
19pub mod types;
20
21use std::{
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26    time::Duration,
27};
28
29use async_stream::stream;
30use futures_util::{SinkExt, Stream, StreamExt, stream::SplitSink};
31use message::WsMessage;
32use nautilus_common::live::get_runtime;
33use tokio::net::TcpStream;
34use tokio_tungstenite::{
35    MaybeTlsStream, WebSocketStream, connect_async,
36    tungstenite::{self, protocol::frame::coding::CloseCode},
37};
38use types::{ReplayNormalizedRequestOptions, StreamNormalizedRequestOptions};
39
40pub use crate::machine::client::TardisMachineClient;
41
42pub type Result<T> = std::result::Result<T, Error>;
43
44/// The error that could happen while interacting with Tardis Machine Server.
45#[derive(Debug, thiserror::Error)]
46pub enum Error {
47    /// An error that could happen when an empty options array was given.
48    #[error("Options cannot be empty")]
49    EmptyOptions,
50    /// An error when failed to connect to Tardis' websocket connection.
51    #[error("Failed to connect: {0}")]
52    ConnectFailed(#[from] tungstenite::Error),
53    /// An error when WS connection to the machine server was rejected.
54    #[error("Connection rejected: {reason}")]
55    ConnectRejected {
56        /// The status code for the initial WS connection.
57        status: tungstenite::http::StatusCode,
58        /// The reason why the connection was rejected.
59        reason: String,
60    },
61    /// An error where the websocket connection was closed unexpectedly by Tardis.
62    #[error("Connection closed: {reason}")]
63    ConnectionClosed {
64        /// The reason why the connection was closed.
65        reason: String,
66    },
67    /// An error when deserializing the response from Tardis.
68    #[error("Failed to deserialize message: {0}")]
69    Deserialization(#[from] serde_json::Error),
70}
71
72/// Connects to the Tardis Machine WS replay endpoint and returns a stream of WebSocket messages.
73///
74/// # Errors
75///
76/// Returns `Error::EmptyOptions` if no options provided,
77/// or `Error::ConnectFailed`/`Error::ConnectRejected` if connection fails.
78pub async fn replay_normalized(
79    base_url: &str,
80    options: Vec<ReplayNormalizedRequestOptions>,
81    signal: Arc<AtomicBool>,
82) -> Result<impl Stream<Item = Result<WsMessage>>> {
83    if options.is_empty() {
84        return Err(Error::EmptyOptions);
85    }
86
87    let path = format!("{base_url}/ws-replay-normalized?options=");
88    let options = serde_json::to_string(&options)?;
89
90    let plain_url = format!("{path}{options}");
91    tracing::debug!("Connecting to {plain_url}");
92
93    let url = format!("{path}{}", urlencoding::encode(&options));
94    stream_from_websocket(base_url, url, signal).await
95}
96
97/// Connects to the Tardis Machine WS streaming endpoint and returns a stream of WebSocket messages.
98///
99/// # Errors
100///
101/// Returns `Error::EmptyOptions` if no options provided,
102/// or `Error::ConnectFailed`/`Error::ConnectRejected` if connection fails.
103pub async fn stream_normalized(
104    base_url: &str,
105    options: Vec<StreamNormalizedRequestOptions>,
106    signal: Arc<AtomicBool>,
107) -> Result<impl Stream<Item = Result<WsMessage>>> {
108    if options.is_empty() {
109        return Err(Error::EmptyOptions);
110    }
111
112    let path = format!("{base_url}/ws-stream-normalized?options=");
113    let options = serde_json::to_string(&options)?;
114
115    let plain_url = format!("{path}{options}");
116    tracing::debug!("Connecting to {plain_url}");
117
118    let url = format!("{path}{}", urlencoding::encode(&options));
119    stream_from_websocket(base_url, url, signal).await
120}
121
122async fn stream_from_websocket(
123    base_url: &str,
124    url: String,
125    signal: Arc<AtomicBool>,
126) -> Result<impl Stream<Item = Result<WsMessage>>> {
127    let (ws_stream, ws_resp) = connect_async(url).await?;
128
129    handle_connection_response(ws_resp)?;
130    tracing::info!("Connected to {base_url}");
131
132    Ok(stream! {
133        let (writer, mut reader) = ws_stream.split();
134        get_runtime().spawn(heartbeat(writer));
135
136        // Timeout awaiting the next record before checking signal
137        let timeout = Duration::from_millis(10);
138
139        tracing::info!("Streaming from websocket...");
140
141        loop {
142            if signal.load(Ordering::Relaxed) {
143                tracing::debug!("Shutdown signal received");
144                break;
145            }
146
147            let result = tokio::time::timeout(timeout, reader.next()).await;
148            let msg = match result {
149                Ok(msg) => msg,
150                Err(_) => continue, // Timeout
151            };
152
153            match msg {
154                Some(Ok(msg)) => match msg {
155                    tungstenite::Message::Frame(_)
156                    | tungstenite::Message::Binary(_)
157                    | tungstenite::Message::Pong(_)
158                    | tungstenite::Message::Ping(_) => {
159                        tracing::trace!("Received {msg:?}");
160                        continue; // Skip and continue to the next message
161                    }
162                    tungstenite::Message::Close(Some(frame)) => {
163                        let reason = frame.reason.to_string();
164                        if frame.code == CloseCode::Normal {
165                            tracing::debug!("Connection closed normally: {reason}");
166                        } else {
167                            tracing::error!(
168                                "Connection closed abnormally with code: {:?}, reason: {reason}", frame.code
169                            );
170                            yield Err(Error::ConnectionClosed { reason });
171                        }
172                        break;
173                    }
174                    tungstenite::Message::Close(None) => {
175                        tracing::error!("Connection closed without a frame");
176                        yield Err(Error::ConnectionClosed {
177                            reason: "No close frame provided".to_string()
178                        });
179                        break;
180                    }
181                    tungstenite::Message::Text(msg) => {
182                        match serde_json::from_str::<WsMessage>(&msg) {
183                            Ok(parsed_msg) => yield Ok(parsed_msg),
184                            Err(e) => {
185                                tracing::error!("Failed to deserialize message: {msg}. Error: {e}");
186                                yield Err(Error::Deserialization(e));
187                            }
188                        }
189                    }
190                },
191                Some(Err(e)) => {
192                    tracing::error!("WebSocket error: {e}");
193                    yield Err(Error::ConnectFailed(e));
194                    break;
195                }
196                None => {
197                    tracing::error!("Connection closed unexpectedly");
198                    yield Err(Error::ConnectionClosed {
199                        reason: "Unexpected connection close".to_string(),
200                    });
201                    break;
202                }
203            }
204        }
205
206        tracing::info!("Shutdown stream");
207    })
208}
209
210#[allow(clippy::result_large_err)]
211fn handle_connection_response(ws_resp: tungstenite::http::Response<Option<Vec<u8>>>) -> Result<()> {
212    if ws_resp.status() != tungstenite::http::StatusCode::SWITCHING_PROTOCOLS {
213        return match ws_resp.body() {
214            Some(resp) => Err(Error::ConnectRejected {
215                status: ws_resp.status(),
216                reason: String::from_utf8_lossy(resp).to_string(),
217            }),
218            None => Err(Error::ConnectRejected {
219                status: ws_resp.status(),
220                reason: "Unknown reason".to_string(),
221            }),
222        };
223    }
224    Ok(())
225}
226
227async fn heartbeat(
228    mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
229) {
230    let mut heartbeat_interval = tokio::time::interval(Duration::from_secs(10));
231
232    loop {
233        heartbeat_interval.tick().await;
234        tracing::trace!("Sending PING");
235
236        if let Err(e) = sender.send(tungstenite::Message::Ping(vec![].into())).await {
237            tracing::debug!("Heartbeat send failed (connection closed): {e}");
238            break;
239        }
240    }
241
242    tracing::debug!("Heartbeat task exiting");
243}