nautilus_kraken/python/
websocket_spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc`:
23//!
24//! - `ws_client: Option<Arc<WebSocketClient>>` - The WebSocket connection.
25//! - `subscriptions: Arc<DashMap<String, KrakenWsChannel>>` - Subscription tracking.
26//!
27//! Without shared state, clones would be independent, causing:
28//! - Lost WebSocket messages.
29//! - Missing subscription data.
30//! - Connection state desynchronization.
31//!
32//! ## Connection Flow
33//!
34//! 1. Clone the client for async operation.
35//! 2. Connect and populate shared state on the clone.
36//! 3. Spawn stream handler as background task.
37//! 4. Return immediately (non-blocking).
38//!
39//! ## Important Notes
40//!
41//! - Never use `block_on()` - it blocks the runtime.
42//! - Always clone before async blocks for lifetime requirements.
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::to_pyruntime_err;
47use nautilus_model::{
48    data::{BarType, Data, OrderBookDeltas_API},
49    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
50    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
51};
52use pyo3::{IntoPyObjectExt, prelude::*};
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56    common::{
57        enums::{KrakenEnvironment, KrakenProductType},
58        urls::get_kraken_ws_private_url,
59    },
60    config::KrakenDataClientConfig,
61    websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[pymethods]
65impl KrakenSpotWebSocketClient {
66    #[new]
67    #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
68    fn py_new(
69        environment: Option<KrakenEnvironment>,
70        private: bool,
71        base_url: Option<String>,
72        heartbeat_secs: Option<u64>,
73        api_key: Option<String>,
74        api_secret: Option<String>,
75    ) -> PyResult<Self> {
76        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
77
78        let (resolved_api_key, resolved_api_secret) =
79            crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
80                .map(|c| c.into_parts())
81                .map(|(k, s)| (Some(k), Some(s)))
82                .unwrap_or((None, None));
83
84        let (ws_public_url, ws_private_url) = if private {
85            // Use provided URL or default to the private endpoint
86            let private_url = base_url.unwrap_or_else(|| {
87                get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
88            });
89            (None, Some(private_url))
90        } else {
91            (base_url, None)
92        };
93
94        let config = KrakenDataClientConfig {
95            environment: env,
96            ws_public_url,
97            ws_private_url,
98            heartbeat_interval_secs: heartbeat_secs,
99            api_key: resolved_api_key,
100            api_secret: resolved_api_secret,
101            ..Default::default()
102        };
103
104        let token = CancellationToken::new();
105
106        Ok(KrakenSpotWebSocketClient::new(config, token))
107    }
108
    /// Returns the URL reported by the client's `url()` accessor.
    ///
    /// Exposed to Python as the read-only `url` property.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    pub fn py_url(&self) -> &str {
        self.url()
    }
115
    /// Returns the result of the client's `is_connected()` check.
    #[pyo3(name = "is_connected")]
    fn py_is_connected(&self) -> bool {
        self.is_connected()
    }
120
    /// Returns the result of the client's `is_active()` check.
    #[pyo3(name = "is_active")]
    fn py_is_active(&self) -> bool {
        self.is_active()
    }
125
    /// Returns the result of the client's `is_closed()` check.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&self) -> bool {
        self.is_closed()
    }
130
    /// Returns the subscription list reported by the client's `get_subscriptions()`.
    #[pyo3(name = "get_subscriptions")]
    fn py_get_subscriptions(&self) -> Vec<String> {
        self.get_subscriptions()
    }
135
136    #[pyo3(name = "cache_instrument")]
137    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
138        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
139        Ok(())
140    }
141
    /// Forwards `account_id` to the client's `set_account_id`.
    #[pyo3(name = "set_account_id")]
    fn py_set_account_id(&self, account_id: AccountId) {
        self.set_account_id(account_id);
    }
146
    /// Forwards the order identifiers to the client's `cache_client_order`.
    ///
    /// `_venue_order_id` is accepted but ignored: it is not passed to the underlying call and
    /// only exists so the Python signature stays consistent with other clients.
    #[pyo3(name = "cache_client_order")]
    fn py_cache_client_order(
        &self,
        client_order_id: ClientOrderId,
        _venue_order_id: Option<VenueOrderId>,
        instrument_id: InstrumentId,
        trader_id: TraderId,
        strategy_id: StrategyId,
    ) {
        // Note: venue_order_id not used for spot yet, but kept for API consistency
        self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
    }
159
    /// Invokes the client's `cancel_all_requests`.
    #[pyo3(name = "cancel_all_requests")]
    fn py_cancel_all_requests(&self) {
        self.cancel_all_requests();
    }
164
165    #[pyo3(name = "connect")]
166    fn py_connect<'py>(
167        &mut self,
168        py: Python<'py>,
169        instruments: Vec<Py<PyAny>>,
170        callback: Py<PyAny>,
171    ) -> PyResult<Bound<'py, PyAny>> {
172        let mut instruments_any = Vec::new();
173        for inst in instruments {
174            let inst_any = pyobject_to_instrument_any(py, inst)?;
175            instruments_any.push(inst_any);
176        }
177
178        let mut client = self.clone();
179
180        pyo3_async_runtimes::tokio::future_into_py(py, async move {
181            client.connect().await.map_err(to_pyruntime_err)?;
182
183            // Cache instruments after connection is established
184            client.cache_instruments(instruments_any);
185
186            let stream = client.stream();
187
188            get_runtime().spawn(async move {
189                tokio::pin!(stream);
190
191                while let Some(msg) = stream.next().await {
192                    match msg {
193                        NautilusWsMessage::Data(data_vec) => {
194                            Python::attach(|py| {
195                                for data in data_vec {
196                                    let py_obj = data_to_pycapsule(py, data);
197                                    call_python(py, &callback, py_obj);
198                                }
199                            });
200                        }
201                        NautilusWsMessage::Deltas(deltas) => {
202                            Python::attach(|py| {
203                                let py_obj = data_to_pycapsule(
204                                    py,
205                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
206                                );
207                                call_python(py, &callback, py_obj);
208                            });
209                        }
210                        NautilusWsMessage::OrderRejected(event) => {
211                            Python::attach(|py| match event.into_py_any(py) {
212                                Ok(py_obj) => call_python(py, &callback, py_obj),
213                                Err(e) => {
214                                    tracing::error!(
215                                        "Failed to convert OrderRejected to Python: {e}"
216                                    );
217                                }
218                            });
219                        }
220                        NautilusWsMessage::OrderAccepted(event) => {
221                            Python::attach(|py| match event.into_py_any(py) {
222                                Ok(py_obj) => call_python(py, &callback, py_obj),
223                                Err(e) => {
224                                    tracing::error!(
225                                        "Failed to convert OrderAccepted to Python: {e}"
226                                    );
227                                }
228                            });
229                        }
230                        NautilusWsMessage::OrderCanceled(event) => {
231                            Python::attach(|py| match event.into_py_any(py) {
232                                Ok(py_obj) => call_python(py, &callback, py_obj),
233                                Err(e) => {
234                                    tracing::error!(
235                                        "Failed to convert OrderCanceled to Python: {e}"
236                                    );
237                                }
238                            });
239                        }
240                        NautilusWsMessage::OrderExpired(event) => {
241                            Python::attach(|py| match event.into_py_any(py) {
242                                Ok(py_obj) => call_python(py, &callback, py_obj),
243                                Err(e) => {
244                                    tracing::error!(
245                                        "Failed to convert OrderExpired to Python: {e}"
246                                    );
247                                }
248                            });
249                        }
250                        NautilusWsMessage::OrderUpdated(event) => {
251                            Python::attach(|py| match event.into_py_any(py) {
252                                Ok(py_obj) => call_python(py, &callback, py_obj),
253                                Err(e) => {
254                                    tracing::error!(
255                                        "Failed to convert OrderUpdated to Python: {e}"
256                                    );
257                                }
258                            });
259                        }
260                        NautilusWsMessage::OrderStatusReport(report) => {
261                            Python::attach(|py| match (*report).into_py_any(py) {
262                                Ok(py_obj) => call_python(py, &callback, py_obj),
263                                Err(e) => {
264                                    tracing::error!(
265                                        "Failed to convert OrderStatusReport to Python: {e}"
266                                    );
267                                }
268                            });
269                        }
270                        NautilusWsMessage::FillReport(report) => {
271                            Python::attach(|py| match (*report).into_py_any(py) {
272                                Ok(py_obj) => call_python(py, &callback, py_obj),
273                                Err(e) => {
274                                    tracing::error!("Failed to convert FillReport to Python: {e}");
275                                }
276                            });
277                        }
278                        NautilusWsMessage::Reconnected => {
279                            tracing::info!("WebSocket reconnected");
280                        }
281                    }
282                }
283            });
284
285            Ok(())
286        })
287    }
288
289    #[pyo3(name = "wait_until_active")]
290    fn py_wait_until_active<'py>(
291        &self,
292        py: Python<'py>,
293        timeout_secs: f64,
294    ) -> PyResult<Bound<'py, PyAny>> {
295        let client = self.clone();
296
297        pyo3_async_runtimes::tokio::future_into_py(py, async move {
298            client
299                .wait_until_active(timeout_secs)
300                .await
301                .map_err(to_pyruntime_err)?;
302            Ok(())
303        })
304    }
305
306    #[pyo3(name = "authenticate")]
307    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
308        let client = self.clone();
309
310        pyo3_async_runtimes::tokio::future_into_py(py, async move {
311            client.authenticate().await.map_err(to_pyruntime_err)?;
312            Ok(())
313        })
314    }
315
316    #[pyo3(name = "disconnect")]
317    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
318        let mut client = self.clone();
319
320        pyo3_async_runtimes::tokio::future_into_py(py, async move {
321            client.disconnect().await.map_err(to_pyruntime_err)?;
322            Ok(())
323        })
324    }
325
326    #[pyo3(name = "send_ping")]
327    fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
328        let client = self.clone();
329
330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
331            client.send_ping().await.map_err(to_pyruntime_err)?;
332            Ok(())
333        })
334    }
335
336    #[pyo3(name = "close")]
337    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
338        let mut client = self.clone();
339
340        pyo3_async_runtimes::tokio::future_into_py(py, async move {
341            client.close().await.map_err(to_pyruntime_err)?;
342            Ok(())
343        })
344    }
345
346    #[pyo3(name = "subscribe_book")]
347    fn py_subscribe_book<'py>(
348        &self,
349        py: Python<'py>,
350        instrument_id: InstrumentId,
351        depth: Option<u32>,
352    ) -> PyResult<Bound<'py, PyAny>> {
353        let client = self.clone();
354
355        pyo3_async_runtimes::tokio::future_into_py(py, async move {
356            client
357                .subscribe_book(instrument_id, depth)
358                .await
359                .map_err(to_pyruntime_err)?;
360            Ok(())
361        })
362    }
363
364    #[pyo3(name = "subscribe_quotes")]
365    fn py_subscribe_quotes<'py>(
366        &self,
367        py: Python<'py>,
368        instrument_id: InstrumentId,
369    ) -> PyResult<Bound<'py, PyAny>> {
370        let client = self.clone();
371
372        pyo3_async_runtimes::tokio::future_into_py(py, async move {
373            client
374                .subscribe_quotes(instrument_id)
375                .await
376                .map_err(to_pyruntime_err)?;
377            Ok(())
378        })
379    }
380
381    #[pyo3(name = "subscribe_trades")]
382    fn py_subscribe_trades<'py>(
383        &self,
384        py: Python<'py>,
385        instrument_id: InstrumentId,
386    ) -> PyResult<Bound<'py, PyAny>> {
387        let client = self.clone();
388
389        pyo3_async_runtimes::tokio::future_into_py(py, async move {
390            client
391                .subscribe_trades(instrument_id)
392                .await
393                .map_err(to_pyruntime_err)?;
394            Ok(())
395        })
396    }
397
398    #[pyo3(name = "subscribe_bars")]
399    fn py_subscribe_bars<'py>(
400        &self,
401        py: Python<'py>,
402        bar_type: BarType,
403    ) -> PyResult<Bound<'py, PyAny>> {
404        let client = self.clone();
405
406        pyo3_async_runtimes::tokio::future_into_py(py, async move {
407            client
408                .subscribe_bars(bar_type)
409                .await
410                .map_err(to_pyruntime_err)?;
411            Ok(())
412        })
413    }
414
415    #[pyo3(name = "subscribe_executions")]
416    #[pyo3(signature = (snap_orders=true, snap_trades=true))]
417    fn py_subscribe_executions<'py>(
418        &self,
419        py: Python<'py>,
420        snap_orders: bool,
421        snap_trades: bool,
422    ) -> PyResult<Bound<'py, PyAny>> {
423        let client = self.clone();
424
425        pyo3_async_runtimes::tokio::future_into_py(py, async move {
426            client
427                .subscribe_executions(snap_orders, snap_trades)
428                .await
429                .map_err(to_pyruntime_err)?;
430            Ok(())
431        })
432    }
433
434    #[pyo3(name = "unsubscribe_book")]
435    fn py_unsubscribe_book<'py>(
436        &self,
437        py: Python<'py>,
438        instrument_id: InstrumentId,
439    ) -> PyResult<Bound<'py, PyAny>> {
440        let client = self.clone();
441
442        pyo3_async_runtimes::tokio::future_into_py(py, async move {
443            client
444                .unsubscribe_book(instrument_id)
445                .await
446                .map_err(to_pyruntime_err)?;
447            Ok(())
448        })
449    }
450
451    #[pyo3(name = "unsubscribe_quotes")]
452    fn py_unsubscribe_quotes<'py>(
453        &self,
454        py: Python<'py>,
455        instrument_id: InstrumentId,
456    ) -> PyResult<Bound<'py, PyAny>> {
457        let client = self.clone();
458
459        pyo3_async_runtimes::tokio::future_into_py(py, async move {
460            client
461                .unsubscribe_quotes(instrument_id)
462                .await
463                .map_err(to_pyruntime_err)?;
464            Ok(())
465        })
466    }
467
468    #[pyo3(name = "unsubscribe_trades")]
469    fn py_unsubscribe_trades<'py>(
470        &self,
471        py: Python<'py>,
472        instrument_id: InstrumentId,
473    ) -> PyResult<Bound<'py, PyAny>> {
474        let client = self.clone();
475
476        pyo3_async_runtimes::tokio::future_into_py(py, async move {
477            client
478                .unsubscribe_trades(instrument_id)
479                .await
480                .map_err(to_pyruntime_err)?;
481            Ok(())
482        })
483    }
484
485    #[pyo3(name = "unsubscribe_bars")]
486    fn py_unsubscribe_bars<'py>(
487        &self,
488        py: Python<'py>,
489        bar_type: BarType,
490    ) -> PyResult<Bound<'py, PyAny>> {
491        let client = self.clone();
492
493        pyo3_async_runtimes::tokio::future_into_py(py, async move {
494            client
495                .unsubscribe_bars(bar_type)
496                .await
497                .map_err(to_pyruntime_err)?;
498            Ok(())
499        })
500    }
501}
502
503pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
504    if let Err(e) = callback.call1(py, (py_obj,)) {
505        tracing::error!("Error calling Python: {e}");
506    }
507}