nautilus_network/python/
socket.rs1use std::{
17 sync::{Arc, atomic::Ordering},
18 time::Duration,
19};
20
21use nautilus_core::python::to_pyruntime_err;
22use pyo3::prelude::*;
23use tokio::io::AsyncWriteExt;
24use tokio_tungstenite::tungstenite::stream::Mode;
25
26use crate::{
27 mode::ConnectionMode,
28 socket::{SocketClient, SocketConfig},
29};
30
31#[pymethods]
32impl SocketConfig {
33 #[new]
34 #[pyo3(signature = (url, ssl, suffix, handler, heartbeat=None, reconnect_timeout_ms=10_000, reconnect_delay_initial_ms=2_000, reconnect_delay_max_ms=30_000, reconnect_backoff_factor=1.5, reconnect_jitter_ms=100, certs_dir=None))]
35 #[allow(clippy::too_many_arguments)]
36 fn py_new(
37 url: String,
38 ssl: bool,
39 suffix: Vec<u8>,
40 handler: PyObject,
41 heartbeat: Option<(u64, Vec<u8>)>,
42 reconnect_timeout_ms: Option<u64>,
43 reconnect_delay_initial_ms: Option<u64>,
44 reconnect_delay_max_ms: Option<u64>,
45 reconnect_backoff_factor: Option<f64>,
46 reconnect_jitter_ms: Option<u64>,
47 certs_dir: Option<String>,
48 ) -> Self {
49 let mode = if ssl { Mode::Tls } else { Mode::Plain };
50 Self {
51 url,
52 mode,
53 suffix,
54 handler: Arc::new(handler),
55 heartbeat,
56 reconnect_timeout_ms,
57 reconnect_delay_initial_ms,
58 reconnect_delay_max_ms,
59 reconnect_backoff_factor,
60 reconnect_jitter_ms,
61 certs_dir,
62 }
63 }
64}
65
66#[pymethods]
67impl SocketClient {
68 #[staticmethod]
74 #[pyo3(name = "connect")]
75 #[pyo3(signature = (config, post_connection=None, post_reconnection=None, post_disconnection=None))]
76 fn py_connect(
77 config: SocketConfig,
78 post_connection: Option<PyObject>,
79 post_reconnection: Option<PyObject>,
80 post_disconnection: Option<PyObject>,
81 py: Python<'_>,
82 ) -> PyResult<Bound<PyAny>> {
83 pyo3_async_runtimes::tokio::future_into_py(py, async move {
84 Self::connect(
85 config,
86 post_connection,
87 post_reconnection,
88 post_disconnection,
89 )
90 .await
91 .map_err(to_pyruntime_err)
92 })
93 }
94
95 #[pyo3(name = "is_active")]
105 fn py_is_active(slf: PyRef<'_, Self>) -> bool {
106 slf.is_active()
107 }
108
109 #[pyo3(name = "is_reconnecting")]
110 fn py_is_reconnecting(slf: PyRef<'_, Self>) -> bool {
111 slf.is_reconnecting()
112 }
113
114 #[pyo3(name = "is_disconnecting")]
115 fn py_is_disconnecting(slf: PyRef<'_, Self>) -> bool {
116 slf.is_disconnecting()
117 }
118
119 #[pyo3(name = "is_closed")]
120 fn py_is_closed(slf: PyRef<'_, Self>) -> bool {
121 slf.is_closed()
122 }
123
124 #[pyo3(name = "mode")]
125 fn py_mode(slf: PyRef<'_, Self>) -> String {
126 slf.connection_mode().to_string()
127 }
128
129 #[pyo3(name = "reconnect")]
131 fn py_reconnect<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
132 let mode = slf.connection_mode.clone();
133 let mode_str = ConnectionMode::from_atomic(&mode).to_string();
134 tracing::debug!("Reconnect from mode {mode_str}");
135
136 pyo3_async_runtimes::tokio::future_into_py(py, async move {
137 match ConnectionMode::from_atomic(&mode) {
138 ConnectionMode::Reconnect => {
139 tracing::warn!("Cannot reconnect: socket already reconnecting");
140 }
141 ConnectionMode::Disconnect => {
142 tracing::warn!("Cannot reconnect: socket disconnecting");
143 }
144 ConnectionMode::Closed => {
145 tracing::warn!("Cannot reconnect: socket closed");
146 }
147 _ => {
148 mode.store(ConnectionMode::Reconnect.as_u8(), Ordering::SeqCst);
149 while !ConnectionMode::from_atomic(&mode).is_active() {
150 tokio::time::sleep(Duration::from_millis(10)).await;
151 }
152 }
153 }
154
155 Ok(())
156 })
157 }
158
159 #[pyo3(name = "close")]
169 fn py_close<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
170 let mode = slf.connection_mode.clone();
171 let mode_str = ConnectionMode::from_atomic(&mode).to_string();
172 tracing::debug!("Close from mode {mode_str}");
173
174 pyo3_async_runtimes::tokio::future_into_py(py, async move {
175 match ConnectionMode::from_atomic(&mode) {
176 ConnectionMode::Closed => {
177 tracing::warn!("Socket already closed");
178 }
179 ConnectionMode::Disconnect => {
180 tracing::warn!("Socket already disconnecting");
181 }
182 _ => {
183 mode.store(ConnectionMode::Disconnect.as_u8(), Ordering::SeqCst);
184 while !ConnectionMode::from_atomic(&mode).is_closed() {
185 tokio::time::sleep(Duration::from_millis(10)).await;
186 }
187 }
188 }
189
190 Ok(())
191 })
192 }
193
194 #[pyo3(name = "send")]
200 fn py_send<'py>(
201 slf: PyRef<'_, Self>,
202 mut data: Vec<u8>,
203 py: Python<'py>,
204 ) -> PyResult<Bound<'py, PyAny>> {
205 data.extend(&slf.suffix);
206 tracing::trace!("Sending {}", String::from_utf8_lossy(&data));
207
208 let writer = slf.writer.clone();
209 let mode = slf.connection_mode.clone();
210
211 pyo3_async_runtimes::tokio::future_into_py(py, async move {
212 if ConnectionMode::from_atomic(&mode).is_closed() {
213 let msg = format!(
214 "Cannot send data ({}): socket closed",
215 String::from_utf8_lossy(&data)
216 );
217 log::error!("{}", msg);
218 return Ok(());
219 }
220
221 let timeout = Duration::from_secs(2);
222 let check_interval = Duration::from_millis(1);
223
224 if !ConnectionMode::from_atomic(&mode).is_active() {
225 tracing::debug!("Waiting for client to become ACTIVE before sending (2s)...");
226 match tokio::time::timeout(timeout, async {
227 while !ConnectionMode::from_atomic(&mode).is_active() {
228 if matches!(
229 ConnectionMode::from_atomic(&mode),
230 ConnectionMode::Disconnect | ConnectionMode::Closed
231 ) {
232 return Err("Client disconnected waiting to send");
233 }
234
235 tokio::time::sleep(check_interval).await;
236 }
237
238 Ok(())
239 })
240 .await
241 {
242 Ok(Ok(())) => tracing::debug!("Client now active"),
243 Ok(Err(e)) => {
244 tracing::error!(
245 "Cannot send data ({}): {e}",
246 String::from_utf8_lossy(&data)
247 );
248 return Ok(());
249 }
250 Err(_) => {
251 tracing::error!(
252 "Cannot send data ({}): timeout waiting to become ACTIVE",
253 String::from_utf8_lossy(&data)
254 );
255 return Ok(());
256 }
257 }
258 }
259
260 let mut writer = writer.lock().await;
261 writer.write_all(&data).await?;
262 Ok(())
263 })
264 }
265}