Skip to main content

nautilus_kraken/python/
websocket_futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken Futures WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{IntoPyObjectExt, prelude::*};
26
27use crate::{
28    common::{
29        credential::KrakenCredential,
30        enums::{KrakenEnvironment, KrakenProductType},
31        urls::get_kraken_ws_public_url,
32    },
33    websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
34};
35
36#[pymethods]
37impl KrakenFuturesWebSocketClient {
38    #[new]
39    #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
40    fn py_new(
41        environment: Option<KrakenEnvironment>,
42        base_url: Option<String>,
43        heartbeat_secs: Option<u64>,
44        api_key: Option<String>,
45        api_secret: Option<String>,
46    ) -> PyResult<Self> {
47        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
48        let demo = env == KrakenEnvironment::Demo;
49        let url = base_url.unwrap_or_else(|| {
50            get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
51        });
52        let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
53
54        Ok(Self::with_credentials(url, heartbeat_secs, credential))
55    }
56
57    #[getter]
58    #[pyo3(name = "has_credentials")]
59    #[must_use]
60    pub fn py_has_credentials(&self) -> bool {
61        self.has_credentials()
62    }
63
64    #[getter]
65    #[pyo3(name = "url")]
66    #[must_use]
67    pub fn py_url(&self) -> &str {
68        self.url()
69    }
70
71    #[pyo3(name = "is_closed")]
72    fn py_is_closed(&self) -> bool {
73        self.is_closed()
74    }
75
76    #[pyo3(name = "is_active")]
77    fn py_is_active(&self) -> bool {
78        self.is_active()
79    }
80
81    #[pyo3(name = "wait_until_active")]
82    fn py_wait_until_active<'py>(
83        &self,
84        py: Python<'py>,
85        timeout_secs: f64,
86    ) -> PyResult<Bound<'py, PyAny>> {
87        let client = self.clone();
88
89        pyo3_async_runtimes::tokio::future_into_py(py, async move {
90            client
91                .wait_until_active(timeout_secs)
92                .await
93                .map_err(to_pyruntime_err)?;
94            Ok(())
95        })
96    }
97
98    #[pyo3(name = "authenticate")]
99    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100        let client = self.clone();
101
102        pyo3_async_runtimes::tokio::future_into_py(py, async move {
103            client.authenticate().await.map_err(to_pyruntime_err)?;
104            Ok(())
105        })
106    }
107
108    #[pyo3(name = "cache_instruments")]
109    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
110        let mut instruments_any = Vec::new();
111        for inst in instruments {
112            let inst_any = pyobject_to_instrument_any(py, inst)?;
113            instruments_any.push(inst_any);
114        }
115        self.cache_instruments(instruments_any);
116        Ok(())
117    }
118
119    #[pyo3(name = "cache_instrument")]
120    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
121        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
122        Ok(())
123    }
124
125    #[pyo3(name = "connect")]
126    fn py_connect<'py>(
127        &mut self,
128        py: Python<'py>,
129        instruments: Vec<Py<PyAny>>,
130        callback: Py<PyAny>,
131    ) -> PyResult<Bound<'py, PyAny>> {
132        let mut instruments_any = Vec::new();
133        for inst in instruments {
134            let inst_any = pyobject_to_instrument_any(py, inst)?;
135            instruments_any.push(inst_any);
136        }
137
138        let mut client = self.clone();
139
140        pyo3_async_runtimes::tokio::future_into_py(py, async move {
141            client.connect().await.map_err(to_pyruntime_err)?;
142
143            // Cache instruments after connection is established
144            client.cache_instruments(instruments_any);
145
146            // Take ownership of the receiver
147            if let Some(mut rx) = client.take_output_rx() {
148                get_runtime().spawn(async move {
149                    while let Some(msg) = rx.recv().await {
150                        Python::attach(|py| match msg {
151                            KrakenFuturesWsMessage::MarkPrice(update) => {
152                                let py_obj = data_to_pycapsule(py, Data::from(update));
153                                if let Err(e) = callback.call1(py, (py_obj,)) {
154                                    log::error!("Error calling Python callback: {e}");
155                                }
156                            }
157                            KrakenFuturesWsMessage::IndexPrice(update) => {
158                                let py_obj = data_to_pycapsule(py, Data::from(update));
159                                if let Err(e) = callback.call1(py, (py_obj,)) {
160                                    log::error!("Error calling Python callback: {e}");
161                                }
162                            }
163                            KrakenFuturesWsMessage::FundingRate(update) => {
164                                match update.into_py_any(py) {
165                                    Ok(py_obj) => {
166                                        if let Err(e) = callback.call1(py, (py_obj,)) {
167                                            log::error!("Error calling Python callback: {e}");
168                                        }
169                                    }
170                                    Err(e) => {
171                                        log::error!(
172                                            "Failed to convert FundingRateUpdate to Python: {e}"
173                                        );
174                                    }
175                                }
176                            }
177                            KrakenFuturesWsMessage::Quote(quote) => {
178                                let py_obj = data_to_pycapsule(py, Data::from(quote));
179                                if let Err(e) = callback.call1(py, (py_obj,)) {
180                                    log::error!("Error calling Python callback: {e}");
181                                }
182                            }
183                            KrakenFuturesWsMessage::Trade(trade) => {
184                                let py_obj = data_to_pycapsule(py, Data::from(trade));
185                                if let Err(e) = callback.call1(py, (py_obj,)) {
186                                    log::error!("Error calling Python callback: {e}");
187                                }
188                            }
189                            KrakenFuturesWsMessage::BookDeltas(deltas) => {
190                                let py_obj = data_to_pycapsule(
191                                    py,
192                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
193                                );
194                                if let Err(e) = callback.call1(py, (py_obj,)) {
195                                    log::error!("Error calling Python callback: {e}");
196                                }
197                            }
198                            KrakenFuturesWsMessage::OrderAccepted(event) => {
199                                match event.into_py_any(py) {
200                                    Ok(py_obj) => {
201                                        if let Err(e) = callback.call1(py, (py_obj,)) {
202                                            log::error!("Error calling Python callback: {e}");
203                                        }
204                                    }
205                                    Err(e) => {
206                                        log::error!(
207                                            "Failed to convert OrderAccepted to Python: {e}"
208                                        );
209                                    }
210                                }
211                            }
212                            KrakenFuturesWsMessage::OrderCanceled(event) => {
213                                match event.into_py_any(py) {
214                                    Ok(py_obj) => {
215                                        if let Err(e) = callback.call1(py, (py_obj,)) {
216                                            log::error!("Error calling Python callback: {e}");
217                                        }
218                                    }
219                                    Err(e) => {
220                                        log::error!(
221                                            "Failed to convert OrderCanceled to Python: {e}"
222                                        );
223                                    }
224                                }
225                            }
226                            KrakenFuturesWsMessage::OrderExpired(event) => {
227                                match event.into_py_any(py) {
228                                    Ok(py_obj) => {
229                                        if let Err(e) = callback.call1(py, (py_obj,)) {
230                                            log::error!("Error calling Python callback: {e}");
231                                        }
232                                    }
233                                    Err(e) => {
234                                        log::error!(
235                                            "Failed to convert OrderExpired to Python: {e}"
236                                        );
237                                    }
238                                }
239                            }
240                            KrakenFuturesWsMessage::OrderUpdated(event) => {
241                                match event.into_py_any(py) {
242                                    Ok(py_obj) => {
243                                        if let Err(e) = callback.call1(py, (py_obj,)) {
244                                            log::error!("Error calling Python callback: {e}");
245                                        }
246                                    }
247                                    Err(e) => {
248                                        log::error!(
249                                            "Failed to convert OrderUpdated to Python: {e}"
250                                        );
251                                    }
252                                }
253                            }
254                            KrakenFuturesWsMessage::OrderStatusReport(report) => {
255                                match (*report).into_py_any(py) {
256                                    Ok(py_obj) => {
257                                        if let Err(e) = callback.call1(py, (py_obj,)) {
258                                            log::error!("Error calling Python callback: {e}");
259                                        }
260                                    }
261                                    Err(e) => {
262                                        log::error!(
263                                            "Failed to convert OrderStatusReport to Python: {e}"
264                                        );
265                                    }
266                                }
267                            }
268                            KrakenFuturesWsMessage::FillReport(report) => {
269                                match (*report).into_py_any(py) {
270                                    Ok(py_obj) => {
271                                        if let Err(e) = callback.call1(py, (py_obj,)) {
272                                            log::error!("Error calling Python callback: {e}");
273                                        }
274                                    }
275                                    Err(e) => {
276                                        log::error!("Failed to convert FillReport to Python: {e}");
277                                    }
278                                }
279                            }
280                            KrakenFuturesWsMessage::Reconnected => {
281                                log::info!("WebSocket reconnected");
282                            }
283                        });
284                    }
285                });
286            }
287
288            Ok(())
289        })
290    }
291
292    #[pyo3(name = "disconnect")]
293    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
294        let mut client = self.clone();
295
296        pyo3_async_runtimes::tokio::future_into_py(py, async move {
297            client.disconnect().await.map_err(to_pyruntime_err)?;
298            Ok(())
299        })
300    }
301
302    #[pyo3(name = "close")]
303    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
304        let mut client = self.clone();
305
306        pyo3_async_runtimes::tokio::future_into_py(py, async move {
307            client.close().await.map_err(to_pyruntime_err)?;
308            Ok(())
309        })
310    }
311
312    #[pyo3(name = "subscribe_book")]
313    #[pyo3(signature = (instrument_id, depth=None))]
314    fn py_subscribe_book<'py>(
315        &self,
316        py: Python<'py>,
317        instrument_id: InstrumentId,
318        depth: Option<u32>,
319    ) -> PyResult<Bound<'py, PyAny>> {
320        let client = self.clone();
321
322        pyo3_async_runtimes::tokio::future_into_py(py, async move {
323            client
324                .subscribe_book(instrument_id, depth)
325                .await
326                .map_err(to_pyruntime_err)?;
327            Ok(())
328        })
329    }
330
331    #[pyo3(name = "subscribe_quotes")]
332    fn py_subscribe_quotes<'py>(
333        &self,
334        py: Python<'py>,
335        instrument_id: InstrumentId,
336    ) -> PyResult<Bound<'py, PyAny>> {
337        let client = self.clone();
338
339        pyo3_async_runtimes::tokio::future_into_py(py, async move {
340            client
341                .subscribe_quotes(instrument_id)
342                .await
343                .map_err(to_pyruntime_err)?;
344            Ok(())
345        })
346    }
347
348    #[pyo3(name = "subscribe_trades")]
349    fn py_subscribe_trades<'py>(
350        &self,
351        py: Python<'py>,
352        instrument_id: InstrumentId,
353    ) -> PyResult<Bound<'py, PyAny>> {
354        let client = self.clone();
355
356        pyo3_async_runtimes::tokio::future_into_py(py, async move {
357            client
358                .subscribe_trades(instrument_id)
359                .await
360                .map_err(to_pyruntime_err)?;
361            Ok(())
362        })
363    }
364
365    #[pyo3(name = "subscribe_mark_price")]
366    fn py_subscribe_mark_price<'py>(
367        &self,
368        py: Python<'py>,
369        instrument_id: InstrumentId,
370    ) -> PyResult<Bound<'py, PyAny>> {
371        let client = self.clone();
372
373        pyo3_async_runtimes::tokio::future_into_py(py, async move {
374            client
375                .subscribe_mark_price(instrument_id)
376                .await
377                .map_err(to_pyruntime_err)?;
378            Ok(())
379        })
380    }
381
382    #[pyo3(name = "subscribe_index_price")]
383    fn py_subscribe_index_price<'py>(
384        &self,
385        py: Python<'py>,
386        instrument_id: InstrumentId,
387    ) -> PyResult<Bound<'py, PyAny>> {
388        let client = self.clone();
389
390        pyo3_async_runtimes::tokio::future_into_py(py, async move {
391            client
392                .subscribe_index_price(instrument_id)
393                .await
394                .map_err(to_pyruntime_err)?;
395            Ok(())
396        })
397    }
398
399    #[pyo3(name = "subscribe_funding_rate")]
400    fn py_subscribe_funding_rate<'py>(
401        &self,
402        py: Python<'py>,
403        instrument_id: InstrumentId,
404    ) -> PyResult<Bound<'py, PyAny>> {
405        let client = self.clone();
406
407        pyo3_async_runtimes::tokio::future_into_py(py, async move {
408            client
409                .subscribe_funding_rate(instrument_id)
410                .await
411                .map_err(to_pyruntime_err)?;
412            Ok(())
413        })
414    }
415
416    #[pyo3(name = "unsubscribe_book")]
417    fn py_unsubscribe_book<'py>(
418        &self,
419        py: Python<'py>,
420        instrument_id: InstrumentId,
421    ) -> PyResult<Bound<'py, PyAny>> {
422        let client = self.clone();
423
424        pyo3_async_runtimes::tokio::future_into_py(py, async move {
425            client
426                .unsubscribe_book(instrument_id)
427                .await
428                .map_err(to_pyruntime_err)?;
429            Ok(())
430        })
431    }
432
433    #[pyo3(name = "unsubscribe_quotes")]
434    fn py_unsubscribe_quotes<'py>(
435        &self,
436        py: Python<'py>,
437        instrument_id: InstrumentId,
438    ) -> PyResult<Bound<'py, PyAny>> {
439        let client = self.clone();
440
441        pyo3_async_runtimes::tokio::future_into_py(py, async move {
442            client
443                .unsubscribe_quotes(instrument_id)
444                .await
445                .map_err(to_pyruntime_err)?;
446            Ok(())
447        })
448    }
449
450    #[pyo3(name = "unsubscribe_trades")]
451    fn py_unsubscribe_trades<'py>(
452        &self,
453        py: Python<'py>,
454        instrument_id: InstrumentId,
455    ) -> PyResult<Bound<'py, PyAny>> {
456        let client = self.clone();
457
458        pyo3_async_runtimes::tokio::future_into_py(py, async move {
459            client
460                .unsubscribe_trades(instrument_id)
461                .await
462                .map_err(to_pyruntime_err)?;
463            Ok(())
464        })
465    }
466
467    #[pyo3(name = "unsubscribe_mark_price")]
468    fn py_unsubscribe_mark_price<'py>(
469        &self,
470        py: Python<'py>,
471        instrument_id: InstrumentId,
472    ) -> PyResult<Bound<'py, PyAny>> {
473        let client = self.clone();
474
475        pyo3_async_runtimes::tokio::future_into_py(py, async move {
476            client
477                .unsubscribe_mark_price(instrument_id)
478                .await
479                .map_err(to_pyruntime_err)?;
480            Ok(())
481        })
482    }
483
484    #[pyo3(name = "unsubscribe_index_price")]
485    fn py_unsubscribe_index_price<'py>(
486        &self,
487        py: Python<'py>,
488        instrument_id: InstrumentId,
489    ) -> PyResult<Bound<'py, PyAny>> {
490        let client = self.clone();
491
492        pyo3_async_runtimes::tokio::future_into_py(py, async move {
493            client
494                .unsubscribe_index_price(instrument_id)
495                .await
496                .map_err(to_pyruntime_err)?;
497            Ok(())
498        })
499    }
500
501    #[pyo3(name = "unsubscribe_funding_rate")]
502    fn py_unsubscribe_funding_rate<'py>(
503        &self,
504        py: Python<'py>,
505        instrument_id: InstrumentId,
506    ) -> PyResult<Bound<'py, PyAny>> {
507        let client = self.clone();
508
509        pyo3_async_runtimes::tokio::future_into_py(py, async move {
510            client
511                .unsubscribe_funding_rate(instrument_id)
512                .await
513                .map_err(to_pyruntime_err)?;
514            Ok(())
515        })
516    }
517
518    #[pyo3(name = "set_account_id")]
519    fn py_set_account_id(&self, account_id: AccountId) {
520        self.set_account_id(account_id);
521    }
522
523    #[pyo3(name = "cache_client_order")]
524    fn py_cache_client_order(
525        &self,
526        client_order_id: ClientOrderId,
527        venue_order_id: Option<VenueOrderId>,
528        instrument_id: InstrumentId,
529        trader_id: TraderId,
530        strategy_id: StrategyId,
531    ) {
532        self.cache_client_order(
533            client_order_id,
534            venue_order_id,
535            instrument_id,
536            trader_id,
537            strategy_id,
538        );
539    }
540
541    #[pyo3(name = "sign_challenge")]
542    fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
543        self.sign_challenge(challenge).map_err(to_pyruntime_err)
544    }
545
546    #[pyo3(name = "authenticate_with_challenge")]
547    fn py_authenticate_with_challenge<'py>(
548        &self,
549        py: Python<'py>,
550        challenge: String,
551    ) -> PyResult<Bound<'py, PyAny>> {
552        let client = self.clone();
553
554        pyo3_async_runtimes::tokio::future_into_py(py, async move {
555            client
556                .authenticate_with_challenge(&challenge)
557                .await
558                .map_err(to_pyruntime_err)?;
559            Ok(())
560        })
561    }
562
563    #[pyo3(name = "set_auth_credentials")]
564    fn py_set_auth_credentials<'py>(
565        &self,
566        py: Python<'py>,
567        original_challenge: String,
568        signed_challenge: String,
569    ) -> PyResult<Bound<'py, PyAny>> {
570        let client = self.clone();
571
572        pyo3_async_runtimes::tokio::future_into_py(py, async move {
573            client
574                .set_auth_credentials(original_challenge, signed_challenge)
575                .await
576                .map_err(to_pyruntime_err)?;
577            Ok(())
578        })
579    }
580
581    #[pyo3(name = "subscribe_open_orders")]
582    fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
583        let client = self.clone();
584
585        pyo3_async_runtimes::tokio::future_into_py(py, async move {
586            client
587                .subscribe_open_orders()
588                .await
589                .map_err(to_pyruntime_err)?;
590            Ok(())
591        })
592    }
593
594    #[pyo3(name = "subscribe_fills")]
595    fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
596        let client = self.clone();
597
598        pyo3_async_runtimes::tokio::future_into_py(py, async move {
599            client.subscribe_fills().await.map_err(to_pyruntime_err)?;
600            Ok(())
601        })
602    }
603
604    #[pyo3(name = "subscribe_executions")]
605    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
606        let client = self.clone();
607
608        pyo3_async_runtimes::tokio::future_into_py(py, async move {
609            client
610                .subscribe_executions()
611                .await
612                .map_err(to_pyruntime_err)?;
613            Ok(())
614        })
615    }
616}