nautilus_kraken/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc`:
23//!
24//! - `ws_client: Option<Arc<WebSocketClient>>` - The WebSocket connection.
25//! - `subscriptions: Arc<DashMap<String, KrakenWsChannel>>` - Subscription tracking.
26//!
27//! Without shared state, clones would be independent, causing:
28//! - Lost WebSocket messages.
29//! - Missing subscription data.
30//! - Connection state desynchronization.
31//!
32//! ## Connection Flow
33//!
34//! 1. Clone the client for async operation.
35//! 2. Connect and populate shared state on the clone.
36//! 3. Spawn stream handler as background task.
37//! 4. Return immediately (non-blocking).
38//!
39//! ## Important Notes
40//!
41//! - Never use `block_on()` - it blocks the runtime.
42//! - Always clone before async blocks for lifetime requirements.
43
44use futures_util::StreamExt;
45use nautilus_core::python::to_pyruntime_err;
46use nautilus_model::{
47    data::{BarType, Data, OrderBookDeltas_API},
48    identifiers::InstrumentId,
49    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::prelude::*;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    config::KrakenDataClientConfig,
56    websocket::{client::KrakenWebSocketClient, messages::NautilusWsMessage},
57};
58
59#[pymethods]
60impl KrakenWebSocketClient {
61    #[new]
62    fn py_new(url: String) -> PyResult<Self> {
63        let config = KrakenDataClientConfig {
64            ws_public_url: Some(url),
65            ..Default::default()
66        };
67
68        let token = CancellationToken::new();
69
70        Ok(KrakenWebSocketClient::new(config, token))
71    }
72
73    #[getter]
74    #[pyo3(name = "url")]
75    #[must_use]
76    pub fn py_url(&self) -> &str {
77        self.url()
78    }
79
80    #[pyo3(name = "is_connected")]
81    fn py_is_connected(&self) -> bool {
82        self.is_connected()
83    }
84
85    #[pyo3(name = "is_active")]
86    fn py_is_active(&self) -> bool {
87        self.is_active()
88    }
89
90    #[pyo3(name = "is_closed")]
91    fn py_is_closed(&self) -> bool {
92        self.is_closed()
93    }
94
95    #[pyo3(name = "get_subscriptions")]
96    fn py_get_subscriptions(&self) -> Vec<String> {
97        self.get_subscriptions()
98    }
99
100    #[pyo3(name = "cache_instrument")]
101    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
102        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
103        Ok(())
104    }
105
106    #[pyo3(name = "cancel_all_requests")]
107    fn py_cancel_all_requests(&self) {
108        self.cancel_all_requests();
109    }
110
111    #[pyo3(name = "connect")]
112    fn py_connect<'py>(
113        &mut self,
114        py: Python<'py>,
115        instruments: Vec<Py<PyAny>>,
116        callback: Py<PyAny>,
117    ) -> PyResult<Bound<'py, PyAny>> {
118        let mut instruments_any = Vec::new();
119        for inst in instruments {
120            let inst_any = pyobject_to_instrument_any(py, inst)?;
121            instruments_any.push(inst_any);
122        }
123
124        let mut client = self.clone();
125
126        pyo3_async_runtimes::tokio::future_into_py(py, async move {
127            client.connect().await.map_err(to_pyruntime_err)?;
128
129            // Cache instruments after connection is established
130            client.cache_instruments(instruments_any);
131
132            let stream = client.stream();
133
134            tokio::spawn(async move {
135                tokio::pin!(stream);
136
137                while let Some(msg) = stream.next().await {
138                    match msg {
139                        NautilusWsMessage::Data(data_vec) => {
140                            Python::attach(|py| {
141                                for data in data_vec {
142                                    let py_obj = data_to_pycapsule(py, data);
143                                    call_python(py, &callback, py_obj);
144                                }
145                            });
146                        }
147                        NautilusWsMessage::Deltas(deltas) => {
148                            Python::attach(|py| {
149                                let py_obj = data_to_pycapsule(
150                                    py,
151                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
152                                );
153                                call_python(py, &callback, py_obj);
154                            });
155                        }
156                    }
157                }
158            });
159
160            Ok(())
161        })
162    }
163
164    #[pyo3(name = "wait_until_active")]
165    fn py_wait_until_active<'py>(
166        &self,
167        py: Python<'py>,
168        timeout_secs: f64,
169    ) -> PyResult<Bound<'py, PyAny>> {
170        let client = self.clone();
171
172        pyo3_async_runtimes::tokio::future_into_py(py, async move {
173            client
174                .wait_until_active(timeout_secs)
175                .await
176                .map_err(to_pyruntime_err)?;
177            Ok(())
178        })
179    }
180
181    #[pyo3(name = "authenticate")]
182    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
183        let client = self.clone();
184
185        pyo3_async_runtimes::tokio::future_into_py(py, async move {
186            client.authenticate().await.map_err(to_pyruntime_err)?;
187            Ok(())
188        })
189    }
190
191    #[pyo3(name = "disconnect")]
192    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
193        let mut client = self.clone();
194
195        pyo3_async_runtimes::tokio::future_into_py(py, async move {
196            client.disconnect().await.map_err(to_pyruntime_err)?;
197            Ok(())
198        })
199    }
200
201    #[pyo3(name = "close")]
202    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
203        let mut client = self.clone();
204
205        pyo3_async_runtimes::tokio::future_into_py(py, async move {
206            client.close().await.map_err(to_pyruntime_err)?;
207            Ok(())
208        })
209    }
210
211    #[pyo3(name = "subscribe_book")]
212    fn py_subscribe_book<'py>(
213        &self,
214        py: Python<'py>,
215        instrument_id: InstrumentId,
216        depth: Option<u32>,
217    ) -> PyResult<Bound<'py, PyAny>> {
218        let client = self.clone();
219
220        pyo3_async_runtimes::tokio::future_into_py(py, async move {
221            client
222                .subscribe_book(instrument_id, depth)
223                .await
224                .map_err(to_pyruntime_err)?;
225            Ok(())
226        })
227    }
228
229    #[pyo3(name = "subscribe_quotes")]
230    fn py_subscribe_quotes<'py>(
231        &self,
232        py: Python<'py>,
233        instrument_id: InstrumentId,
234    ) -> PyResult<Bound<'py, PyAny>> {
235        let client = self.clone();
236
237        pyo3_async_runtimes::tokio::future_into_py(py, async move {
238            client
239                .subscribe_quotes(instrument_id)
240                .await
241                .map_err(to_pyruntime_err)?;
242            Ok(())
243        })
244    }
245
246    #[pyo3(name = "subscribe_trades")]
247    fn py_subscribe_trades<'py>(
248        &self,
249        py: Python<'py>,
250        instrument_id: InstrumentId,
251    ) -> PyResult<Bound<'py, PyAny>> {
252        let client = self.clone();
253
254        pyo3_async_runtimes::tokio::future_into_py(py, async move {
255            client
256                .subscribe_trades(instrument_id)
257                .await
258                .map_err(to_pyruntime_err)?;
259            Ok(())
260        })
261    }
262
263    #[pyo3(name = "subscribe_bars")]
264    fn py_subscribe_bars<'py>(
265        &self,
266        py: Python<'py>,
267        bar_type: BarType,
268    ) -> PyResult<Bound<'py, PyAny>> {
269        let client = self.clone();
270
271        pyo3_async_runtimes::tokio::future_into_py(py, async move {
272            client
273                .subscribe_bars(bar_type)
274                .await
275                .map_err(to_pyruntime_err)?;
276            Ok(())
277        })
278    }
279
280    #[pyo3(name = "unsubscribe_book")]
281    fn py_unsubscribe_book<'py>(
282        &self,
283        py: Python<'py>,
284        instrument_id: InstrumentId,
285    ) -> PyResult<Bound<'py, PyAny>> {
286        let client = self.clone();
287
288        pyo3_async_runtimes::tokio::future_into_py(py, async move {
289            client
290                .unsubscribe_book(instrument_id)
291                .await
292                .map_err(to_pyruntime_err)?;
293            Ok(())
294        })
295    }
296
297    #[pyo3(name = "unsubscribe_quotes")]
298    fn py_unsubscribe_quotes<'py>(
299        &self,
300        py: Python<'py>,
301        instrument_id: InstrumentId,
302    ) -> PyResult<Bound<'py, PyAny>> {
303        let client = self.clone();
304
305        pyo3_async_runtimes::tokio::future_into_py(py, async move {
306            client
307                .unsubscribe_quotes(instrument_id)
308                .await
309                .map_err(to_pyruntime_err)?;
310            Ok(())
311        })
312    }
313
314    #[pyo3(name = "unsubscribe_trades")]
315    fn py_unsubscribe_trades<'py>(
316        &self,
317        py: Python<'py>,
318        instrument_id: InstrumentId,
319    ) -> PyResult<Bound<'py, PyAny>> {
320        let client = self.clone();
321
322        pyo3_async_runtimes::tokio::future_into_py(py, async move {
323            client
324                .unsubscribe_trades(instrument_id)
325                .await
326                .map_err(to_pyruntime_err)?;
327            Ok(())
328        })
329    }
330
331    #[pyo3(name = "unsubscribe_bars")]
332    fn py_unsubscribe_bars<'py>(
333        &self,
334        py: Python<'py>,
335        bar_type: BarType,
336    ) -> PyResult<Bound<'py, PyAny>> {
337        let client = self.clone();
338
339        pyo3_async_runtimes::tokio::future_into_py(py, async move {
340            client
341                .unsubscribe_bars(bar_type)
342                .await
343                .map_err(to_pyruntime_err)?;
344            Ok(())
345        })
346    }
347
348    #[pyo3(name = "send_ping")]
349    fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
350        let client = self.clone();
351
352        pyo3_async_runtimes::tokio::future_into_py(py, async move {
353            client.send_ping().await.map_err(to_pyruntime_err)?;
354            Ok(())
355        })
356    }
357}
358
359pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
360    if let Err(e) = callback.call1(py, (py_obj,)) {
361        tracing::error!("Error calling Python: {e}");
362    }
363}