nautilus_network/python/
socket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, atomic::Ordering},
18    time::Duration,
19};
20
21use nautilus_core::python::to_pyruntime_err;
22use pyo3::prelude::*;
23use tokio_tungstenite::tungstenite::stream::Mode;
24
25use crate::{
26    mode::ConnectionMode,
27    socket::{SocketClient, SocketConfig, WriterCommand},
28};
29
30#[pymethods]
31impl SocketConfig {
32    #[new]
33    #[allow(clippy::too_many_arguments)]
34    #[pyo3(signature = (url, ssl, suffix, handler, heartbeat=None, reconnect_timeout_ms=10_000, reconnect_delay_initial_ms=2_000, reconnect_delay_max_ms=30_000, reconnect_backoff_factor=1.5, reconnect_jitter_ms=100, certs_dir=None))]
35    fn py_new(
36        url: String,
37        ssl: bool,
38        suffix: Vec<u8>,
39        handler: PyObject,
40        heartbeat: Option<(u64, Vec<u8>)>,
41        reconnect_timeout_ms: Option<u64>,
42        reconnect_delay_initial_ms: Option<u64>,
43        reconnect_delay_max_ms: Option<u64>,
44        reconnect_backoff_factor: Option<f64>,
45        reconnect_jitter_ms: Option<u64>,
46        certs_dir: Option<String>,
47    ) -> Self {
48        let mode = if ssl { Mode::Tls } else { Mode::Plain };
49        Self {
50            url,
51            mode,
52            suffix,
53            py_handler: Some(Arc::new(handler)),
54            heartbeat,
55            reconnect_timeout_ms,
56            reconnect_delay_initial_ms,
57            reconnect_delay_max_ms,
58            reconnect_backoff_factor,
59            reconnect_jitter_ms,
60            certs_dir,
61        }
62    }
63}
64
65#[pymethods]
66impl SocketClient {
67    /// Create a socket client.
68    ///
69    /// # Errors
70    ///
71    /// - Throws an Exception if it is unable to make socket connection.
72    #[staticmethod]
73    #[pyo3(name = "connect")]
74    #[pyo3(signature = (config, post_connection=None, post_reconnection=None, post_disconnection=None))]
75    fn py_connect(
76        config: SocketConfig,
77        post_connection: Option<PyObject>,
78        post_reconnection: Option<PyObject>,
79        post_disconnection: Option<PyObject>,
80        py: Python<'_>,
81    ) -> PyResult<Bound<PyAny>> {
82        pyo3_async_runtimes::tokio::future_into_py(py, async move {
83            Self::connect(
84                config,
85                None, // Rust handler
86                post_connection,
87                post_reconnection,
88                post_disconnection,
89            )
90            .await
91            .map_err(to_pyruntime_err)
92        })
93    }
94
95    /// Check if the client is still alive.
96    ///
97    /// Even if the connection is disconnected the client will still be alive
98    /// and trying to reconnect.
99    ///
100    /// This is particularly useful for check why a `send` failed. It could
101    /// be because the connection disconnected and the client is still alive
102    /// and reconnecting. In such cases the send can be retried after some
103    /// delay
104    #[pyo3(name = "is_active")]
105    fn py_is_active(slf: PyRef<'_, Self>) -> bool {
106        slf.is_active()
107    }
108
109    #[pyo3(name = "is_reconnecting")]
110    fn py_is_reconnecting(slf: PyRef<'_, Self>) -> bool {
111        slf.is_reconnecting()
112    }
113
114    #[pyo3(name = "is_disconnecting")]
115    fn py_is_disconnecting(slf: PyRef<'_, Self>) -> bool {
116        slf.is_disconnecting()
117    }
118
119    #[pyo3(name = "is_closed")]
120    fn py_is_closed(slf: PyRef<'_, Self>) -> bool {
121        slf.is_closed()
122    }
123
124    #[pyo3(name = "mode")]
125    fn py_mode(slf: PyRef<'_, Self>) -> String {
126        slf.connection_mode().to_string()
127    }
128
129    /// Reconnect the client.
130    #[pyo3(name = "reconnect")]
131    fn py_reconnect<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
132        let mode = slf.connection_mode.clone();
133        let mode_str = ConnectionMode::from_atomic(&mode).to_string();
134        tracing::debug!("Reconnect from mode {mode_str}");
135
136        pyo3_async_runtimes::tokio::future_into_py(py, async move {
137            match ConnectionMode::from_atomic(&mode) {
138                ConnectionMode::Reconnect => {
139                    tracing::warn!("Cannot reconnect - socket already reconnecting");
140                }
141                ConnectionMode::Disconnect => {
142                    tracing::warn!("Cannot reconnect - socket disconnecting");
143                }
144                ConnectionMode::Closed => {
145                    tracing::warn!("Cannot reconnect - socket closed");
146                }
147                _ => {
148                    mode.store(ConnectionMode::Reconnect.as_u8(), Ordering::SeqCst);
149                    while !ConnectionMode::from_atomic(&mode).is_active() {
150                        tokio::time::sleep(Duration::from_millis(10)).await;
151                    }
152                }
153            }
154
155            Ok(())
156        })
157    }
158
159    /// Close the client.
160    ///
161    /// The connection is not completely closed until all references
162    /// to the client are gone and the client is dropped.
163    ///
164    /// # Safety
165    ///
166    /// - The client should not be used after closing it
167    /// - Any auto-reconnect job should be aborted before closing the client
168    #[pyo3(name = "close")]
169    fn py_close<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
170        let mode = slf.connection_mode.clone();
171        let mode_str = ConnectionMode::from_atomic(&mode).to_string();
172        tracing::debug!("Close from mode {mode_str}");
173
174        pyo3_async_runtimes::tokio::future_into_py(py, async move {
175            match ConnectionMode::from_atomic(&mode) {
176                ConnectionMode::Closed => {
177                    tracing::warn!("Socket already closed");
178                }
179                ConnectionMode::Disconnect => {
180                    tracing::warn!("Socket already disconnecting");
181                }
182                _ => {
183                    mode.store(ConnectionMode::Disconnect.as_u8(), Ordering::SeqCst);
184                    while !ConnectionMode::from_atomic(&mode).is_closed() {
185                        tokio::time::sleep(Duration::from_millis(10)).await;
186                    }
187                }
188            }
189
190            Ok(())
191        })
192    }
193
194    /// Send bytes data to the connection.
195    ///
196    /// # Errors
197    ///
198    /// - Throws an Exception if it is not able to send data.
199    #[pyo3(name = "send")]
200    fn py_send<'py>(
201        slf: PyRef<'_, Self>,
202        data: Vec<u8>,
203        py: Python<'py>,
204    ) -> PyResult<Bound<'py, PyAny>> {
205        tracing::trace!("Sending {}", String::from_utf8_lossy(&data));
206
207        let mode = slf.connection_mode.clone();
208        let writer_tx = slf.writer_tx.clone();
209
210        pyo3_async_runtimes::tokio::future_into_py(py, async move {
211            if ConnectionMode::from_atomic(&mode).is_closed() {
212                let msg = format!(
213                    "Cannot send data ({}): socket closed",
214                    String::from_utf8_lossy(&data)
215                );
216                log::error!("{msg}");
217                return Ok(());
218            }
219
220            let timeout = Duration::from_secs(2);
221            let check_interval = Duration::from_millis(1);
222
223            if !ConnectionMode::from_atomic(&mode).is_active() {
224                tracing::debug!("Waiting for client to become ACTIVE before sending (2s)...");
225                match tokio::time::timeout(timeout, async {
226                    while !ConnectionMode::from_atomic(&mode).is_active() {
227                        if matches!(
228                            ConnectionMode::from_atomic(&mode),
229                            ConnectionMode::Disconnect | ConnectionMode::Closed
230                        ) {
231                            return Err("Client disconnected waiting to send");
232                        }
233
234                        tokio::time::sleep(check_interval).await;
235                    }
236
237                    Ok(())
238                })
239                .await
240                {
241                    Ok(Ok(())) => tracing::debug!("Client now active"),
242                    Ok(Err(e)) => {
243                        tracing::error!(
244                            "Failed sending data ({}): {e}",
245                            String::from_utf8_lossy(&data)
246                        );
247                        return Ok(());
248                    }
249                    Err(_) => {
250                        tracing::error!(
251                            "Failed sending data ({}): timeout waiting to become ACTIVE",
252                            String::from_utf8_lossy(&data)
253                        );
254                        return Ok(());
255                    }
256                }
257            }
258
259            let msg = WriterCommand::Send(data.into());
260            if let Err(e) = writer_tx.send(msg) {
261                tracing::error!("{e}");
262            }
263            Ok(())
264        })
265    }
266}