nautilus_network/python/
socket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{Arc, atomic::Ordering},
18    time::Duration,
19};
20
21use nautilus_core::python::to_pyruntime_err;
22use pyo3::prelude::*;
23use tokio::io::AsyncWriteExt;
24use tokio_tungstenite::tungstenite::stream::Mode;
25
26use crate::{
27    mode::ConnectionMode,
28    socket::{SocketClient, SocketConfig},
29};
30
31#[pymethods]
32impl SocketConfig {
33    #[new]
34    #[pyo3(signature = (url, ssl, suffix, handler, heartbeat=None, reconnect_timeout_ms=10_000, reconnect_delay_initial_ms=2_000, reconnect_delay_max_ms=30_000, reconnect_backoff_factor=1.5, reconnect_jitter_ms=100, certs_dir=None))]
35    #[allow(clippy::too_many_arguments)]
36    fn py_new(
37        url: String,
38        ssl: bool,
39        suffix: Vec<u8>,
40        handler: PyObject,
41        heartbeat: Option<(u64, Vec<u8>)>,
42        reconnect_timeout_ms: Option<u64>,
43        reconnect_delay_initial_ms: Option<u64>,
44        reconnect_delay_max_ms: Option<u64>,
45        reconnect_backoff_factor: Option<f64>,
46        reconnect_jitter_ms: Option<u64>,
47        certs_dir: Option<String>,
48    ) -> Self {
49        let mode = if ssl { Mode::Tls } else { Mode::Plain };
50        Self {
51            url,
52            mode,
53            suffix,
54            handler: Arc::new(handler),
55            heartbeat,
56            reconnect_timeout_ms,
57            reconnect_delay_initial_ms,
58            reconnect_delay_max_ms,
59            reconnect_backoff_factor,
60            reconnect_jitter_ms,
61            certs_dir,
62        }
63    }
64}
65
66#[pymethods]
67impl SocketClient {
68    /// Create a socket client.
69    ///
70    /// # Errors
71    ///
72    /// - Throws an Exception if it is unable to make socket connection.
73    #[staticmethod]
74    #[pyo3(name = "connect")]
75    #[pyo3(signature = (config, post_connection=None, post_reconnection=None, post_disconnection=None))]
76    fn py_connect(
77        config: SocketConfig,
78        post_connection: Option<PyObject>,
79        post_reconnection: Option<PyObject>,
80        post_disconnection: Option<PyObject>,
81        py: Python<'_>,
82    ) -> PyResult<Bound<PyAny>> {
83        pyo3_async_runtimes::tokio::future_into_py(py, async move {
84            Self::connect(
85                config,
86                post_connection,
87                post_reconnection,
88                post_disconnection,
89            )
90            .await
91            .map_err(to_pyruntime_err)
92        })
93    }
94
95    /// Check if the client is still alive.
96    ///
97    /// Even if the connection is disconnected the client will still be alive
98    /// and trying to reconnect.
99    ///
100    /// This is particularly useful for check why a `send` failed. It could
101    /// be because the connection disconnected and the client is still alive
102    /// and reconnecting. In such cases the send can be retried after some
103    /// delay
104    #[pyo3(name = "is_active")]
105    fn py_is_active(slf: PyRef<'_, Self>) -> bool {
106        slf.is_active()
107    }
108
109    #[pyo3(name = "is_reconnecting")]
110    fn py_is_reconnecting(slf: PyRef<'_, Self>) -> bool {
111        slf.is_reconnecting()
112    }
113
114    #[pyo3(name = "is_disconnecting")]
115    fn py_is_disconnecting(slf: PyRef<'_, Self>) -> bool {
116        slf.is_disconnecting()
117    }
118
119    #[pyo3(name = "is_closed")]
120    fn py_is_closed(slf: PyRef<'_, Self>) -> bool {
121        slf.is_closed()
122    }
123
124    #[pyo3(name = "mode")]
125    fn py_mode(slf: PyRef<'_, Self>) -> String {
126        slf.connection_mode().to_string()
127    }
128
129    /// Reconnect the client.
130    #[pyo3(name = "reconnect")]
131    fn py_reconnect<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
132        let mode = slf.connection_mode.clone();
133        let mode_str = ConnectionMode::from_atomic(&mode).to_string();
134        tracing::debug!("Reconnect from mode {mode_str}");
135
136        pyo3_async_runtimes::tokio::future_into_py(py, async move {
137            match ConnectionMode::from_atomic(&mode) {
138                ConnectionMode::Reconnect => {
139                    tracing::warn!("Cannot reconnect: socket already reconnecting");
140                }
141                ConnectionMode::Disconnect => {
142                    tracing::warn!("Cannot reconnect: socket disconnecting");
143                }
144                ConnectionMode::Closed => {
145                    tracing::warn!("Cannot reconnect: socket closed");
146                }
147                _ => {
148                    mode.store(ConnectionMode::Reconnect.as_u8(), Ordering::SeqCst);
149                    while !ConnectionMode::from_atomic(&mode).is_active() {
150                        tokio::time::sleep(Duration::from_millis(10)).await;
151                    }
152                }
153            }
154
155            Ok(())
156        })
157    }
158
159    /// Close the client.
160    ///
161    /// The connection is not completely closed until all references
162    /// to the client are gone and the client is dropped.
163    ///
164    /// # Safety
165    ///
166    /// - The client should not be used after closing it
167    /// - Any auto-reconnect job should be aborted before closing the client
168    #[pyo3(name = "close")]
169    fn py_close<'py>(slf: PyRef<'_, Self>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
170        let mode = slf.connection_mode.clone();
171        let mode_str = ConnectionMode::from_atomic(&mode).to_string();
172        tracing::debug!("Close from mode {mode_str}");
173
174        pyo3_async_runtimes::tokio::future_into_py(py, async move {
175            match ConnectionMode::from_atomic(&mode) {
176                ConnectionMode::Closed => {
177                    tracing::warn!("Socket already closed");
178                }
179                ConnectionMode::Disconnect => {
180                    tracing::warn!("Socket already disconnecting");
181                }
182                _ => {
183                    mode.store(ConnectionMode::Disconnect.as_u8(), Ordering::SeqCst);
184                    while !ConnectionMode::from_atomic(&mode).is_closed() {
185                        tokio::time::sleep(Duration::from_millis(10)).await;
186                    }
187                }
188            }
189
190            Ok(())
191        })
192    }
193
194    /// Send bytes data to the connection.
195    ///
196    /// # Errors
197    ///
198    /// - Throws an Exception if it is not able to send data.
199    #[pyo3(name = "send")]
200    fn py_send<'py>(
201        slf: PyRef<'_, Self>,
202        mut data: Vec<u8>,
203        py: Python<'py>,
204    ) -> PyResult<Bound<'py, PyAny>> {
205        data.extend(&slf.suffix);
206        tracing::trace!("Sending {}", String::from_utf8_lossy(&data));
207
208        let writer = slf.writer.clone();
209        let mode = slf.connection_mode.clone();
210
211        pyo3_async_runtimes::tokio::future_into_py(py, async move {
212            if ConnectionMode::from_atomic(&mode).is_closed() {
213                let msg = format!(
214                    "Cannot send data ({}): socket closed",
215                    String::from_utf8_lossy(&data)
216                );
217                log::error!("{}", msg);
218                return Ok(());
219            }
220
221            let timeout = Duration::from_secs(2);
222            let check_interval = Duration::from_millis(1);
223
224            if !ConnectionMode::from_atomic(&mode).is_active() {
225                tracing::debug!("Waiting for client to become ACTIVE before sending (2s)...");
226                match tokio::time::timeout(timeout, async {
227                    while !ConnectionMode::from_atomic(&mode).is_active() {
228                        if matches!(
229                            ConnectionMode::from_atomic(&mode),
230                            ConnectionMode::Disconnect | ConnectionMode::Closed
231                        ) {
232                            return Err("Client disconnected waiting to send");
233                        }
234
235                        tokio::time::sleep(check_interval).await;
236                    }
237
238                    Ok(())
239                })
240                .await
241                {
242                    Ok(Ok(())) => tracing::debug!("Client now active"),
243                    Ok(Err(e)) => {
244                        tracing::error!(
245                            "Cannot send data ({}): {e}",
246                            String::from_utf8_lossy(&data)
247                        );
248                        return Ok(());
249                    }
250                    Err(_) => {
251                        tracing::error!(
252                            "Cannot send data ({}): timeout waiting to become ACTIVE",
253                            String::from_utf8_lossy(&data)
254                        );
255                        return Ok(());
256                    }
257                }
258            }
259
260            let mut writer = writer.lock().await;
261            writer.write_all(&data).await?;
262            Ok(())
263        })
264    }
265}