nautilus_kraken/python/
websocket_spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc`:
23//!
24//! - `ws_client: Option<Arc<WebSocketClient>>` - The WebSocket connection.
25//! - `subscriptions: Arc<DashMap<String, KrakenWsChannel>>` - Subscription tracking.
26//!
27//! Without shared state, clones would be independent, causing:
28//! - Lost WebSocket messages.
29//! - Missing subscription data.
30//! - Connection state desynchronization.
31//!
32//! ## Connection Flow
33//!
34//! 1. Clone the client for async operation.
35//! 2. Connect and populate shared state on the clone.
36//! 3. Spawn stream handler as background task.
37//! 4. Return immediately (non-blocking).
38//!
39//! ## Important Notes
40//!
41//! - Never use `block_on()` - it blocks the runtime.
42//! - Always clone before async blocks for lifetime requirements.
43
44use futures_util::StreamExt;
45use nautilus_core::python::to_pyruntime_err;
46use nautilus_model::{
47    data::{BarType, Data, OrderBookDeltas_API},
48    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
49    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{IntoPyObjectExt, prelude::*};
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::{
56        enums::{KrakenEnvironment, KrakenProductType},
57        urls::get_kraken_ws_private_url,
58    },
59    config::KrakenDataClientConfig,
60    websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
61};
62
63#[pymethods]
64impl KrakenSpotWebSocketClient {
65    #[new]
66    #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
67    fn py_new(
68        environment: Option<KrakenEnvironment>,
69        private: bool,
70        base_url: Option<String>,
71        heartbeat_secs: Option<u64>,
72        api_key: Option<String>,
73        api_secret: Option<String>,
74    ) -> PyResult<Self> {
75        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
76
77        let (resolved_api_key, resolved_api_secret) =
78            crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
79                .map(|c| c.into_parts())
80                .map(|(k, s)| (Some(k), Some(s)))
81                .unwrap_or((None, None));
82
83        let (ws_public_url, ws_private_url) = if private {
84            // Use provided URL or default to the private endpoint
85            let private_url = base_url.unwrap_or_else(|| {
86                get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
87            });
88            (None, Some(private_url))
89        } else {
90            (base_url, None)
91        };
92
93        let config = KrakenDataClientConfig {
94            environment: env,
95            ws_public_url,
96            ws_private_url,
97            heartbeat_interval_secs: heartbeat_secs,
98            api_key: resolved_api_key,
99            api_secret: resolved_api_secret,
100            ..Default::default()
101        };
102
103        let token = CancellationToken::new();
104
105        Ok(KrakenSpotWebSocketClient::new(config, token))
106    }
107
108    #[getter]
109    #[pyo3(name = "url")]
110    #[must_use]
111    pub fn py_url(&self) -> &str {
112        self.url()
113    }
114
115    #[pyo3(name = "is_connected")]
116    fn py_is_connected(&self) -> bool {
117        self.is_connected()
118    }
119
120    #[pyo3(name = "is_active")]
121    fn py_is_active(&self) -> bool {
122        self.is_active()
123    }
124
125    #[pyo3(name = "is_closed")]
126    fn py_is_closed(&self) -> bool {
127        self.is_closed()
128    }
129
130    #[pyo3(name = "get_subscriptions")]
131    fn py_get_subscriptions(&self) -> Vec<String> {
132        self.get_subscriptions()
133    }
134
135    #[pyo3(name = "cache_instrument")]
136    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
137        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
138        Ok(())
139    }
140
141    #[pyo3(name = "set_account_id")]
142    fn py_set_account_id(&self, account_id: AccountId) {
143        self.set_account_id(account_id);
144    }
145
146    #[pyo3(name = "cache_client_order")]
147    fn py_cache_client_order(
148        &self,
149        client_order_id: ClientOrderId,
150        _venue_order_id: Option<VenueOrderId>,
151        instrument_id: InstrumentId,
152        trader_id: TraderId,
153        strategy_id: StrategyId,
154    ) {
155        // Note: venue_order_id not used for spot yet, but kept for API consistency
156        self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
157    }
158
159    #[pyo3(name = "cancel_all_requests")]
160    fn py_cancel_all_requests(&self) {
161        self.cancel_all_requests();
162    }
163
164    #[pyo3(name = "connect")]
165    fn py_connect<'py>(
166        &mut self,
167        py: Python<'py>,
168        instruments: Vec<Py<PyAny>>,
169        callback: Py<PyAny>,
170    ) -> PyResult<Bound<'py, PyAny>> {
171        let mut instruments_any = Vec::new();
172        for inst in instruments {
173            let inst_any = pyobject_to_instrument_any(py, inst)?;
174            instruments_any.push(inst_any);
175        }
176
177        let mut client = self.clone();
178
179        pyo3_async_runtimes::tokio::future_into_py(py, async move {
180            client.connect().await.map_err(to_pyruntime_err)?;
181
182            // Cache instruments after connection is established
183            client.cache_instruments(instruments_any);
184
185            let stream = client.stream();
186
187            tokio::spawn(async move {
188                tokio::pin!(stream);
189
190                while let Some(msg) = stream.next().await {
191                    match msg {
192                        NautilusWsMessage::Data(data_vec) => {
193                            Python::attach(|py| {
194                                for data in data_vec {
195                                    let py_obj = data_to_pycapsule(py, data);
196                                    call_python(py, &callback, py_obj);
197                                }
198                            });
199                        }
200                        NautilusWsMessage::Deltas(deltas) => {
201                            Python::attach(|py| {
202                                let py_obj = data_to_pycapsule(
203                                    py,
204                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
205                                );
206                                call_python(py, &callback, py_obj);
207                            });
208                        }
209                        NautilusWsMessage::OrderRejected(event) => {
210                            Python::attach(|py| match event.into_py_any(py) {
211                                Ok(py_obj) => call_python(py, &callback, py_obj),
212                                Err(e) => {
213                                    tracing::error!(
214                                        "Failed to convert OrderRejected to Python: {e}"
215                                    );
216                                }
217                            });
218                        }
219                        NautilusWsMessage::OrderAccepted(event) => {
220                            Python::attach(|py| match event.into_py_any(py) {
221                                Ok(py_obj) => call_python(py, &callback, py_obj),
222                                Err(e) => {
223                                    tracing::error!(
224                                        "Failed to convert OrderAccepted to Python: {e}"
225                                    );
226                                }
227                            });
228                        }
229                        NautilusWsMessage::OrderCanceled(event) => {
230                            Python::attach(|py| match event.into_py_any(py) {
231                                Ok(py_obj) => call_python(py, &callback, py_obj),
232                                Err(e) => {
233                                    tracing::error!(
234                                        "Failed to convert OrderCanceled to Python: {e}"
235                                    );
236                                }
237                            });
238                        }
239                        NautilusWsMessage::OrderExpired(event) => {
240                            Python::attach(|py| match event.into_py_any(py) {
241                                Ok(py_obj) => call_python(py, &callback, py_obj),
242                                Err(e) => {
243                                    tracing::error!(
244                                        "Failed to convert OrderExpired to Python: {e}"
245                                    );
246                                }
247                            });
248                        }
249                        NautilusWsMessage::OrderUpdated(event) => {
250                            Python::attach(|py| match event.into_py_any(py) {
251                                Ok(py_obj) => call_python(py, &callback, py_obj),
252                                Err(e) => {
253                                    tracing::error!(
254                                        "Failed to convert OrderUpdated to Python: {e}"
255                                    );
256                                }
257                            });
258                        }
259                        NautilusWsMessage::OrderStatusReport(report) => {
260                            Python::attach(|py| match (*report).into_py_any(py) {
261                                Ok(py_obj) => call_python(py, &callback, py_obj),
262                                Err(e) => {
263                                    tracing::error!(
264                                        "Failed to convert OrderStatusReport to Python: {e}"
265                                    );
266                                }
267                            });
268                        }
269                        NautilusWsMessage::FillReport(report) => {
270                            Python::attach(|py| match (*report).into_py_any(py) {
271                                Ok(py_obj) => call_python(py, &callback, py_obj),
272                                Err(e) => {
273                                    tracing::error!("Failed to convert FillReport to Python: {e}");
274                                }
275                            });
276                        }
277                        NautilusWsMessage::Reconnected => {
278                            tracing::info!("WebSocket reconnected");
279                        }
280                    }
281                }
282            });
283
284            Ok(())
285        })
286    }
287
288    #[pyo3(name = "wait_until_active")]
289    fn py_wait_until_active<'py>(
290        &self,
291        py: Python<'py>,
292        timeout_secs: f64,
293    ) -> PyResult<Bound<'py, PyAny>> {
294        let client = self.clone();
295
296        pyo3_async_runtimes::tokio::future_into_py(py, async move {
297            client
298                .wait_until_active(timeout_secs)
299                .await
300                .map_err(to_pyruntime_err)?;
301            Ok(())
302        })
303    }
304
305    #[pyo3(name = "authenticate")]
306    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
307        let client = self.clone();
308
309        pyo3_async_runtimes::tokio::future_into_py(py, async move {
310            client.authenticate().await.map_err(to_pyruntime_err)?;
311            Ok(())
312        })
313    }
314
315    #[pyo3(name = "disconnect")]
316    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317        let mut client = self.clone();
318
319        pyo3_async_runtimes::tokio::future_into_py(py, async move {
320            client.disconnect().await.map_err(to_pyruntime_err)?;
321            Ok(())
322        })
323    }
324
325    #[pyo3(name = "send_ping")]
326    fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
327        let client = self.clone();
328
329        pyo3_async_runtimes::tokio::future_into_py(py, async move {
330            client.send_ping().await.map_err(to_pyruntime_err)?;
331            Ok(())
332        })
333    }
334
335    #[pyo3(name = "close")]
336    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
337        let mut client = self.clone();
338
339        pyo3_async_runtimes::tokio::future_into_py(py, async move {
340            client.close().await.map_err(to_pyruntime_err)?;
341            Ok(())
342        })
343    }
344
345    #[pyo3(name = "subscribe_book")]
346    fn py_subscribe_book<'py>(
347        &self,
348        py: Python<'py>,
349        instrument_id: InstrumentId,
350        depth: Option<u32>,
351    ) -> PyResult<Bound<'py, PyAny>> {
352        let client = self.clone();
353
354        pyo3_async_runtimes::tokio::future_into_py(py, async move {
355            client
356                .subscribe_book(instrument_id, depth)
357                .await
358                .map_err(to_pyruntime_err)?;
359            Ok(())
360        })
361    }
362
363    #[pyo3(name = "subscribe_quotes")]
364    fn py_subscribe_quotes<'py>(
365        &self,
366        py: Python<'py>,
367        instrument_id: InstrumentId,
368    ) -> PyResult<Bound<'py, PyAny>> {
369        let client = self.clone();
370
371        pyo3_async_runtimes::tokio::future_into_py(py, async move {
372            client
373                .subscribe_quotes(instrument_id)
374                .await
375                .map_err(to_pyruntime_err)?;
376            Ok(())
377        })
378    }
379
380    #[pyo3(name = "subscribe_trades")]
381    fn py_subscribe_trades<'py>(
382        &self,
383        py: Python<'py>,
384        instrument_id: InstrumentId,
385    ) -> PyResult<Bound<'py, PyAny>> {
386        let client = self.clone();
387
388        pyo3_async_runtimes::tokio::future_into_py(py, async move {
389            client
390                .subscribe_trades(instrument_id)
391                .await
392                .map_err(to_pyruntime_err)?;
393            Ok(())
394        })
395    }
396
397    #[pyo3(name = "subscribe_bars")]
398    fn py_subscribe_bars<'py>(
399        &self,
400        py: Python<'py>,
401        bar_type: BarType,
402    ) -> PyResult<Bound<'py, PyAny>> {
403        let client = self.clone();
404
405        pyo3_async_runtimes::tokio::future_into_py(py, async move {
406            client
407                .subscribe_bars(bar_type)
408                .await
409                .map_err(to_pyruntime_err)?;
410            Ok(())
411        })
412    }
413
414    #[pyo3(name = "subscribe_executions")]
415    #[pyo3(signature = (snap_orders=true, snap_trades=true))]
416    fn py_subscribe_executions<'py>(
417        &self,
418        py: Python<'py>,
419        snap_orders: bool,
420        snap_trades: bool,
421    ) -> PyResult<Bound<'py, PyAny>> {
422        let client = self.clone();
423
424        pyo3_async_runtimes::tokio::future_into_py(py, async move {
425            client
426                .subscribe_executions(snap_orders, snap_trades)
427                .await
428                .map_err(to_pyruntime_err)?;
429            Ok(())
430        })
431    }
432
433    #[pyo3(name = "unsubscribe_book")]
434    fn py_unsubscribe_book<'py>(
435        &self,
436        py: Python<'py>,
437        instrument_id: InstrumentId,
438    ) -> PyResult<Bound<'py, PyAny>> {
439        let client = self.clone();
440
441        pyo3_async_runtimes::tokio::future_into_py(py, async move {
442            client
443                .unsubscribe_book(instrument_id)
444                .await
445                .map_err(to_pyruntime_err)?;
446            Ok(())
447        })
448    }
449
450    #[pyo3(name = "unsubscribe_quotes")]
451    fn py_unsubscribe_quotes<'py>(
452        &self,
453        py: Python<'py>,
454        instrument_id: InstrumentId,
455    ) -> PyResult<Bound<'py, PyAny>> {
456        let client = self.clone();
457
458        pyo3_async_runtimes::tokio::future_into_py(py, async move {
459            client
460                .unsubscribe_quotes(instrument_id)
461                .await
462                .map_err(to_pyruntime_err)?;
463            Ok(())
464        })
465    }
466
467    #[pyo3(name = "unsubscribe_trades")]
468    fn py_unsubscribe_trades<'py>(
469        &self,
470        py: Python<'py>,
471        instrument_id: InstrumentId,
472    ) -> PyResult<Bound<'py, PyAny>> {
473        let client = self.clone();
474
475        pyo3_async_runtimes::tokio::future_into_py(py, async move {
476            client
477                .unsubscribe_trades(instrument_id)
478                .await
479                .map_err(to_pyruntime_err)?;
480            Ok(())
481        })
482    }
483
484    #[pyo3(name = "unsubscribe_bars")]
485    fn py_unsubscribe_bars<'py>(
486        &self,
487        py: Python<'py>,
488        bar_type: BarType,
489    ) -> PyResult<Bound<'py, PyAny>> {
490        let client = self.clone();
491
492        pyo3_async_runtimes::tokio::future_into_py(py, async move {
493            client
494                .unsubscribe_bars(bar_type)
495                .await
496                .map_err(to_pyruntime_err)?;
497            Ok(())
498        })
499    }
500}
501
502pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
503    if let Err(e) = callback.call1(py, (py_obj,)) {
504        tracing::error!("Error calling Python: {e}");
505    }
506}