nautilus_kraken/python/
websocket_spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc`:
23//!
24//! - `ws_client: Option<Arc<WebSocketClient>>` - The WebSocket connection.
25//! - `subscriptions: Arc<DashMap<String, KrakenWsChannel>>` - Subscription tracking.
26//!
27//! Without shared state, clones would be independent, causing:
28//! - Lost WebSocket messages.
29//! - Missing subscription data.
30//! - Connection state desynchronization.
31//!
32//! ## Connection Flow
33//!
34//! 1. Clone the client for async operation.
35//! 2. Connect and populate shared state on the clone.
36//! 3. Spawn stream handler as background task.
37//! 4. Return immediately (non-blocking).
38//!
39//! ## Important Notes
40//!
41//! - Never use `block_on()` - it blocks the runtime.
42//! - Always clone before async blocks for lifetime requirements.
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python, to_pyruntime_err};
47use nautilus_model::{
48    data::{BarType, Data, OrderBookDeltas_API},
49    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
50    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
51};
52use pyo3::{IntoPyObjectExt, prelude::*};
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56    common::{
57        enums::{KrakenEnvironment, KrakenProductType},
58        urls::get_kraken_ws_private_url,
59    },
60    config::KrakenDataClientConfig,
61    websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[pymethods]
65impl KrakenSpotWebSocketClient {
66    #[new]
67    #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
68    fn py_new(
69        environment: Option<KrakenEnvironment>,
70        private: bool,
71        base_url: Option<String>,
72        heartbeat_secs: Option<u64>,
73        api_key: Option<String>,
74        api_secret: Option<String>,
75    ) -> PyResult<Self> {
76        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
77
78        let (resolved_api_key, resolved_api_secret) =
79            crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
80                .map(|c| c.into_parts())
81                .map_or((None, None), |(k, s)| (Some(k), Some(s)));
82
83        let (ws_public_url, ws_private_url) = if private {
84            // Use provided URL or default to the private endpoint
85            let private_url = base_url.unwrap_or_else(|| {
86                get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
87            });
88            (None, Some(private_url))
89        } else {
90            (base_url, None)
91        };
92
93        let config = KrakenDataClientConfig {
94            environment: env,
95            ws_public_url,
96            ws_private_url,
97            heartbeat_interval_secs: heartbeat_secs,
98            api_key: resolved_api_key,
99            api_secret: resolved_api_secret,
100            ..Default::default()
101        };
102
103        let token = CancellationToken::new();
104
105        Ok(Self::new(config, token))
106    }
107
108    #[getter]
109    #[pyo3(name = "url")]
110    #[must_use]
111    pub fn py_url(&self) -> &str {
112        self.url()
113    }
114
115    #[pyo3(name = "is_connected")]
116    fn py_is_connected(&self) -> bool {
117        self.is_connected()
118    }
119
120    #[pyo3(name = "is_active")]
121    fn py_is_active(&self) -> bool {
122        self.is_active()
123    }
124
125    #[pyo3(name = "is_closed")]
126    fn py_is_closed(&self) -> bool {
127        self.is_closed()
128    }
129
130    #[pyo3(name = "get_subscriptions")]
131    fn py_get_subscriptions(&self) -> Vec<String> {
132        self.get_subscriptions()
133    }
134
135    #[pyo3(name = "cache_instrument")]
136    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
137        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
138        Ok(())
139    }
140
141    #[pyo3(name = "set_account_id")]
142    fn py_set_account_id(&self, account_id: AccountId) {
143        self.set_account_id(account_id);
144    }
145
146    #[pyo3(name = "cache_client_order")]
147    fn py_cache_client_order(
148        &self,
149        client_order_id: ClientOrderId,
150        _venue_order_id: Option<VenueOrderId>,
151        instrument_id: InstrumentId,
152        trader_id: TraderId,
153        strategy_id: StrategyId,
154    ) {
155        // Note: venue_order_id not used for spot yet, but kept for API consistency
156        self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
157    }
158
159    #[pyo3(name = "cancel_all_requests")]
160    fn py_cancel_all_requests(&self) {
161        self.cancel_all_requests();
162    }
163
164    #[pyo3(name = "connect")]
165    fn py_connect<'py>(
166        &mut self,
167        py: Python<'py>,
168        instruments: Vec<Py<PyAny>>,
169        callback: Py<PyAny>,
170    ) -> PyResult<Bound<'py, PyAny>> {
171        let mut instruments_any = Vec::new();
172        for inst in instruments {
173            let inst_any = pyobject_to_instrument_any(py, inst)?;
174            instruments_any.push(inst_any);
175        }
176
177        let mut client = self.clone();
178
179        pyo3_async_runtimes::tokio::future_into_py(py, async move {
180            client.connect().await.map_err(to_pyruntime_err)?;
181
182            // Cache instruments after connection is established
183            client.cache_instruments(instruments_any);
184
185            let stream = client.stream();
186
187            get_runtime().spawn(async move {
188                tokio::pin!(stream);
189
190                while let Some(msg) = stream.next().await {
191                    match msg {
192                        NautilusWsMessage::Data(data_vec) => {
193                            Python::attach(|py| {
194                                for data in data_vec {
195                                    let py_obj = data_to_pycapsule(py, data);
196                                    call_python(py, &callback, py_obj);
197                                }
198                            });
199                        }
200                        NautilusWsMessage::Deltas(deltas) => {
201                            Python::attach(|py| {
202                                let py_obj = data_to_pycapsule(
203                                    py,
204                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
205                                );
206                                call_python(py, &callback, py_obj);
207                            });
208                        }
209                        NautilusWsMessage::OrderRejected(event) => {
210                            Python::attach(|py| match event.into_py_any(py) {
211                                Ok(py_obj) => call_python(py, &callback, py_obj),
212                                Err(e) => {
213                                    log::error!("Failed to convert OrderRejected to Python: {e}");
214                                }
215                            });
216                        }
217                        NautilusWsMessage::OrderAccepted(event) => {
218                            Python::attach(|py| match event.into_py_any(py) {
219                                Ok(py_obj) => call_python(py, &callback, py_obj),
220                                Err(e) => {
221                                    log::error!("Failed to convert OrderAccepted to Python: {e}");
222                                }
223                            });
224                        }
225                        NautilusWsMessage::OrderCanceled(event) => {
226                            Python::attach(|py| match event.into_py_any(py) {
227                                Ok(py_obj) => call_python(py, &callback, py_obj),
228                                Err(e) => {
229                                    log::error!("Failed to convert OrderCanceled to Python: {e}");
230                                }
231                            });
232                        }
233                        NautilusWsMessage::OrderExpired(event) => {
234                            Python::attach(|py| match event.into_py_any(py) {
235                                Ok(py_obj) => call_python(py, &callback, py_obj),
236                                Err(e) => {
237                                    log::error!("Failed to convert OrderExpired to Python: {e}");
238                                }
239                            });
240                        }
241                        NautilusWsMessage::OrderUpdated(event) => {
242                            Python::attach(|py| match event.into_py_any(py) {
243                                Ok(py_obj) => call_python(py, &callback, py_obj),
244                                Err(e) => {
245                                    log::error!("Failed to convert OrderUpdated to Python: {e}");
246                                }
247                            });
248                        }
249                        NautilusWsMessage::OrderStatusReport(report) => {
250                            Python::attach(|py| match (*report).into_py_any(py) {
251                                Ok(py_obj) => call_python(py, &callback, py_obj),
252                                Err(e) => {
253                                    log::error!(
254                                        "Failed to convert OrderStatusReport to Python: {e}"
255                                    );
256                                }
257                            });
258                        }
259                        NautilusWsMessage::FillReport(report) => {
260                            Python::attach(|py| match (*report).into_py_any(py) {
261                                Ok(py_obj) => call_python(py, &callback, py_obj),
262                                Err(e) => {
263                                    log::error!("Failed to convert FillReport to Python: {e}");
264                                }
265                            });
266                        }
267                        NautilusWsMessage::Reconnected => {
268                            log::info!("WebSocket reconnected");
269                        }
270                    }
271                }
272            });
273
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "wait_until_active")]
279    fn py_wait_until_active<'py>(
280        &self,
281        py: Python<'py>,
282        timeout_secs: f64,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            client
288                .wait_until_active(timeout_secs)
289                .await
290                .map_err(to_pyruntime_err)?;
291            Ok(())
292        })
293    }
294
295    #[pyo3(name = "authenticate")]
296    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
297        let client = self.clone();
298
299        pyo3_async_runtimes::tokio::future_into_py(py, async move {
300            client.authenticate().await.map_err(to_pyruntime_err)?;
301            Ok(())
302        })
303    }
304
305    #[pyo3(name = "disconnect")]
306    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
307        let mut client = self.clone();
308
309        pyo3_async_runtimes::tokio::future_into_py(py, async move {
310            client.disconnect().await.map_err(to_pyruntime_err)?;
311            Ok(())
312        })
313    }
314
315    #[pyo3(name = "send_ping")]
316    fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317        let client = self.clone();
318
319        pyo3_async_runtimes::tokio::future_into_py(py, async move {
320            client.send_ping().await.map_err(to_pyruntime_err)?;
321            Ok(())
322        })
323    }
324
325    #[pyo3(name = "close")]
326    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
327        let mut client = self.clone();
328
329        pyo3_async_runtimes::tokio::future_into_py(py, async move {
330            client.close().await.map_err(to_pyruntime_err)?;
331            Ok(())
332        })
333    }
334
335    #[pyo3(name = "subscribe_book")]
336    fn py_subscribe_book<'py>(
337        &self,
338        py: Python<'py>,
339        instrument_id: InstrumentId,
340        depth: Option<u32>,
341    ) -> PyResult<Bound<'py, PyAny>> {
342        let client = self.clone();
343
344        pyo3_async_runtimes::tokio::future_into_py(py, async move {
345            client
346                .subscribe_book(instrument_id, depth)
347                .await
348                .map_err(to_pyruntime_err)?;
349            Ok(())
350        })
351    }
352
353    #[pyo3(name = "subscribe_quotes")]
354    fn py_subscribe_quotes<'py>(
355        &self,
356        py: Python<'py>,
357        instrument_id: InstrumentId,
358    ) -> PyResult<Bound<'py, PyAny>> {
359        let client = self.clone();
360
361        pyo3_async_runtimes::tokio::future_into_py(py, async move {
362            client
363                .subscribe_quotes(instrument_id)
364                .await
365                .map_err(to_pyruntime_err)?;
366            Ok(())
367        })
368    }
369
370    #[pyo3(name = "subscribe_trades")]
371    fn py_subscribe_trades<'py>(
372        &self,
373        py: Python<'py>,
374        instrument_id: InstrumentId,
375    ) -> PyResult<Bound<'py, PyAny>> {
376        let client = self.clone();
377
378        pyo3_async_runtimes::tokio::future_into_py(py, async move {
379            client
380                .subscribe_trades(instrument_id)
381                .await
382                .map_err(to_pyruntime_err)?;
383            Ok(())
384        })
385    }
386
387    #[pyo3(name = "subscribe_bars")]
388    fn py_subscribe_bars<'py>(
389        &self,
390        py: Python<'py>,
391        bar_type: BarType,
392    ) -> PyResult<Bound<'py, PyAny>> {
393        let client = self.clone();
394
395        pyo3_async_runtimes::tokio::future_into_py(py, async move {
396            client
397                .subscribe_bars(bar_type)
398                .await
399                .map_err(to_pyruntime_err)?;
400            Ok(())
401        })
402    }
403
404    #[pyo3(name = "subscribe_executions")]
405    #[pyo3(signature = (snap_orders=true, snap_trades=true))]
406    fn py_subscribe_executions<'py>(
407        &self,
408        py: Python<'py>,
409        snap_orders: bool,
410        snap_trades: bool,
411    ) -> PyResult<Bound<'py, PyAny>> {
412        let client = self.clone();
413
414        pyo3_async_runtimes::tokio::future_into_py(py, async move {
415            client
416                .subscribe_executions(snap_orders, snap_trades)
417                .await
418                .map_err(to_pyruntime_err)?;
419            Ok(())
420        })
421    }
422
423    #[pyo3(name = "unsubscribe_book")]
424    fn py_unsubscribe_book<'py>(
425        &self,
426        py: Python<'py>,
427        instrument_id: InstrumentId,
428    ) -> PyResult<Bound<'py, PyAny>> {
429        let client = self.clone();
430
431        pyo3_async_runtimes::tokio::future_into_py(py, async move {
432            client
433                .unsubscribe_book(instrument_id)
434                .await
435                .map_err(to_pyruntime_err)?;
436            Ok(())
437        })
438    }
439
440    #[pyo3(name = "unsubscribe_quotes")]
441    fn py_unsubscribe_quotes<'py>(
442        &self,
443        py: Python<'py>,
444        instrument_id: InstrumentId,
445    ) -> PyResult<Bound<'py, PyAny>> {
446        let client = self.clone();
447
448        pyo3_async_runtimes::tokio::future_into_py(py, async move {
449            client
450                .unsubscribe_quotes(instrument_id)
451                .await
452                .map_err(to_pyruntime_err)?;
453            Ok(())
454        })
455    }
456
457    #[pyo3(name = "unsubscribe_trades")]
458    fn py_unsubscribe_trades<'py>(
459        &self,
460        py: Python<'py>,
461        instrument_id: InstrumentId,
462    ) -> PyResult<Bound<'py, PyAny>> {
463        let client = self.clone();
464
465        pyo3_async_runtimes::tokio::future_into_py(py, async move {
466            client
467                .unsubscribe_trades(instrument_id)
468                .await
469                .map_err(to_pyruntime_err)?;
470            Ok(())
471        })
472    }
473
474    #[pyo3(name = "unsubscribe_bars")]
475    fn py_unsubscribe_bars<'py>(
476        &self,
477        py: Python<'py>,
478        bar_type: BarType,
479    ) -> PyResult<Bound<'py, PyAny>> {
480        let client = self.clone();
481
482        pyo3_async_runtimes::tokio::future_into_py(py, async move {
483            client
484                .unsubscribe_bars(bar_type)
485                .await
486                .map_err(to_pyruntime_err)?;
487            Ok(())
488        })
489    }
490}