nautilus_network/python/
socket.rs
1use std::{
17 sync::{Arc, atomic::Ordering},
18 time::Duration,
19};
20
21use nautilus_core::python::to_pyruntime_err;
22use pyo3::prelude::*;
23use tokio_tungstenite::tungstenite::stream::Mode;
24
25use crate::{
26 mode::ConnectionMode,
27 socket::{SocketClient, SocketConfig, WriterCommand},
28};
29
30#[pymethods]
31impl SocketConfig {
32 #[new]
33 #[allow(clippy::too_many_arguments)]
34 #[pyo3(signature = (url, ssl, suffix, handler, heartbeat=None, reconnect_timeout_ms=10_000, reconnect_delay_initial_ms=2_000, reconnect_delay_max_ms=30_000, reconnect_backoff_factor=1.5, reconnect_jitter_ms=100, certs_dir=None))]
35 fn py_new(
36 url: String,
37 ssl: bool,
38 suffix: Vec<u8>,
39 handler: PyObject,
40 heartbeat: Option<(u64, Vec<u8>)>,
41 reconnect_timeout_ms: Option<u64>,
42 reconnect_delay_initial_ms: Option<u64>,
43 reconnect_delay_max_ms: Option<u64>,
44 reconnect_backoff_factor: Option<f64>,
45 reconnect_jitter_ms: Option<u64>,
46 certs_dir: Option<String>,
47 ) -> Self {
48 let mode = if ssl { Mode::Tls } else { Mode::Plain };
49 Self {
50 url,
51 mode,
52 suffix,
53 py_handler: Some(Arc::new(handler)),
54 heartbeat,
55 reconnect_timeout_ms,
56 reconnect_delay_initial_ms,
57 reconnect_delay_max_ms,
58 reconnect_backoff_factor,
59 reconnect_jitter_ms,
60 certs_dir,
61 }
62 }
63}
64
65#[pymethods]
66impl SocketClient {
67 #[staticmethod]
73 #[pyo3(name = "connect")]
74 #[pyo3(signature = (config, post_connection=None, post_reconnection=None, post_disconnection=None))]
75 fn py_connect(
76 config: SocketConfig,
77 post_connection: Option<PyObject>,
78 post_reconnection: Option<PyObject>,
79 post_disconnection: Option<PyObject>,
80 py: Python<'_>,
81 ) -> PyResult<Bound<PyAny>> {
82 pyo3_async_runtimes::tokio::future_into_py(py, async move {
83 Self::connect(
84 config,
85 None, post_connection,
87 post_reconnection,
88 post_disconnection,
89 )
90 .await
91 .map_err(to_pyruntime_err)
92 })
93 }
94
95 #[pyo3(name = "is_active")]
105 fn py_is_active(slf: PyRef<'_, Self>) -> bool {
106 slf.is_active()
107 }
108
109 #[pyo3(name = "is_reconnecting")]
110 fn py_is_reconnecting(slf: PyRef<'_, Self>) -> bool {
111 slf.is_reconnecting()
112 }
113
114 #[pyo3(name = "is_disconnecting")]
115 fn py_is_disconnecting(slf: PyRef<'_, Self>) -> bool {
116 slf.is_disconnecting()
117 }
118
119 #[pyo3(name = "is_closed")]
120 fn py_is_closed(slf: PyRef<'_, Self>) -> bool {
121 slf.is_closed()
122 }
123
124 #[pyo3(name = "mode")]
125 fn py_mode(slf: PyRef<'_, Self>) -> String {
126 slf.connection_mode().to_string()
127 }
128
129 #[pyo3(name = "reconnect")]
131 fn py_reconnect<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
132 let mode = slf.connection_mode.clone();
133 let mode_str = ConnectionMode::from_atomic(&mode).to_string();
134 tracing::debug!("Reconnect from mode {mode_str}");
135
136 pyo3_async_runtimes::tokio::future_into_py(py, async move {
137 match ConnectionMode::from_atomic(&mode) {
138 ConnectionMode::Reconnect => {
139 tracing::warn!("Cannot reconnect - socket already reconnecting");
140 }
141 ConnectionMode::Disconnect => {
142 tracing::warn!("Cannot reconnect - socket disconnecting");
143 }
144 ConnectionMode::Closed => {
145 tracing::warn!("Cannot reconnect - socket closed");
146 }
147 _ => {
148 mode.store(ConnectionMode::Reconnect.as_u8(), Ordering::SeqCst);
149 while !ConnectionMode::from_atomic(&mode).is_active() {
150 tokio::time::sleep(Duration::from_millis(10)).await;
151 }
152 }
153 }
154
155 Ok(())
156 })
157 }
158
159 #[pyo3(name = "close")]
169 fn py_close<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
170 let mode = slf.connection_mode.clone();
171 let mode_str = ConnectionMode::from_atomic(&mode).to_string();
172 tracing::debug!("Close from mode {mode_str}");
173
174 pyo3_async_runtimes::tokio::future_into_py(py, async move {
175 match ConnectionMode::from_atomic(&mode) {
176 ConnectionMode::Closed => {
177 tracing::warn!("Socket already closed");
178 }
179 ConnectionMode::Disconnect => {
180 tracing::warn!("Socket already disconnecting");
181 }
182 _ => {
183 mode.store(ConnectionMode::Disconnect.as_u8(), Ordering::SeqCst);
184 while !ConnectionMode::from_atomic(&mode).is_closed() {
185 tokio::time::sleep(Duration::from_millis(10)).await;
186 }
187 }
188 }
189
190 Ok(())
191 })
192 }
193
194 #[pyo3(name = "send")]
200 fn py_send<'py>(
201 slf: PyRef<'_, Self>,
202 data: Vec<u8>,
203 py: Python<'py>,
204 ) -> PyResult<Bound<'py, PyAny>> {
205 tracing::trace!("Sending {}", String::from_utf8_lossy(&data));
206
207 let mode = slf.connection_mode.clone();
208 let writer_tx = slf.writer_tx.clone();
209
210 pyo3_async_runtimes::tokio::future_into_py(py, async move {
211 if ConnectionMode::from_atomic(&mode).is_closed() {
212 let msg = format!(
213 "Cannot send data ({}): socket closed",
214 String::from_utf8_lossy(&data)
215 );
216 log::error!("{msg}");
217 return Ok(());
218 }
219
220 let timeout = Duration::from_secs(2);
221 let check_interval = Duration::from_millis(1);
222
223 if !ConnectionMode::from_atomic(&mode).is_active() {
224 tracing::debug!("Waiting for client to become ACTIVE before sending (2s)...");
225 match tokio::time::timeout(timeout, async {
226 while !ConnectionMode::from_atomic(&mode).is_active() {
227 if matches!(
228 ConnectionMode::from_atomic(&mode),
229 ConnectionMode::Disconnect | ConnectionMode::Closed
230 ) {
231 return Err("Client disconnected waiting to send");
232 }
233
234 tokio::time::sleep(check_interval).await;
235 }
236
237 Ok(())
238 })
239 .await
240 {
241 Ok(Ok(())) => tracing::debug!("Client now active"),
242 Ok(Err(e)) => {
243 tracing::error!(
244 "Failed sending data ({}): {e}",
245 String::from_utf8_lossy(&data)
246 );
247 return Ok(());
248 }
249 Err(_) => {
250 tracing::error!(
251 "Failed sending data ({}): timeout waiting to become ACTIVE",
252 String::from_utf8_lossy(&data)
253 );
254 return Ok(());
255 }
256 }
257 }
258
259 let msg = WriterCommand::Send(data.into());
260 if let Err(e) = writer_tx.send(msg) {
261 tracing::error!("{e}");
262 }
263 Ok(())
264 })
265 }
266}