nautilus_kraken/python/
websocket_futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken Futures WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{IntoPyObjectExt, prelude::*};
26
27use crate::{
28    common::{
29        credential::KrakenCredential,
30        enums::{KrakenEnvironment, KrakenProductType},
31        urls::get_kraken_ws_public_url,
32    },
33    websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
34};
35
36#[pymethods]
37impl KrakenFuturesWebSocketClient {
38    #[new]
39    #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
40    fn py_new(
41        environment: Option<KrakenEnvironment>,
42        base_url: Option<String>,
43        heartbeat_secs: Option<u64>,
44        api_key: Option<String>,
45        api_secret: Option<String>,
46    ) -> PyResult<Self> {
47        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
48        let demo = env == KrakenEnvironment::Demo;
49        let url = base_url.unwrap_or_else(|| {
50            get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
51        });
52        let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
53
54        Ok(KrakenFuturesWebSocketClient::with_credentials(
55            url,
56            heartbeat_secs,
57            credential,
58        ))
59    }
60
61    #[getter]
62    #[pyo3(name = "has_credentials")]
63    #[must_use]
64    pub fn py_has_credentials(&self) -> bool {
65        self.has_credentials()
66    }
67
68    #[getter]
69    #[pyo3(name = "url")]
70    #[must_use]
71    pub fn py_url(&self) -> &str {
72        self.url()
73    }
74
75    #[pyo3(name = "is_closed")]
76    fn py_is_closed(&self) -> bool {
77        self.is_closed()
78    }
79
80    #[pyo3(name = "is_active")]
81    fn py_is_active(&self) -> bool {
82        self.is_active()
83    }
84
85    #[pyo3(name = "wait_until_active")]
86    fn py_wait_until_active<'py>(
87        &self,
88        py: Python<'py>,
89        timeout_secs: f64,
90    ) -> PyResult<Bound<'py, PyAny>> {
91        let client = self.clone();
92
93        pyo3_async_runtimes::tokio::future_into_py(py, async move {
94            client
95                .wait_until_active(timeout_secs)
96                .await
97                .map_err(to_pyruntime_err)?;
98            Ok(())
99        })
100    }
101
102    #[pyo3(name = "authenticate")]
103    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
104        let client = self.clone();
105
106        pyo3_async_runtimes::tokio::future_into_py(py, async move {
107            client.authenticate().await.map_err(to_pyruntime_err)?;
108            Ok(())
109        })
110    }
111
112    #[pyo3(name = "cache_instruments")]
113    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
114        let mut instruments_any = Vec::new();
115        for inst in instruments {
116            let inst_any = pyobject_to_instrument_any(py, inst)?;
117            instruments_any.push(inst_any);
118        }
119        self.cache_instruments(instruments_any);
120        Ok(())
121    }
122
123    #[pyo3(name = "cache_instrument")]
124    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
125        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
126        Ok(())
127    }
128
129    #[pyo3(name = "connect")]
130    fn py_connect<'py>(
131        &mut self,
132        py: Python<'py>,
133        instruments: Vec<Py<PyAny>>,
134        callback: Py<PyAny>,
135    ) -> PyResult<Bound<'py, PyAny>> {
136        let mut instruments_any = Vec::new();
137        for inst in instruments {
138            let inst_any = pyobject_to_instrument_any(py, inst)?;
139            instruments_any.push(inst_any);
140        }
141
142        let mut client = self.clone();
143
144        pyo3_async_runtimes::tokio::future_into_py(py, async move {
145            client.connect().await.map_err(to_pyruntime_err)?;
146
147            // Cache instruments after connection is established
148            client.cache_instruments(instruments_any);
149
150            // Take ownership of the receiver
151            if let Some(mut rx) = client.take_output_rx() {
152                get_runtime().spawn(async move {
153                    while let Some(msg) = rx.recv().await {
154                        Python::attach(|py| match msg {
155                            KrakenFuturesWsMessage::MarkPrice(update) => {
156                                let py_obj = data_to_pycapsule(py, Data::from(update));
157                                if let Err(e) = callback.call1(py, (py_obj,)) {
158                                    tracing::error!("Error calling Python callback: {e}");
159                                }
160                            }
161                            KrakenFuturesWsMessage::IndexPrice(update) => {
162                                let py_obj = data_to_pycapsule(py, Data::from(update));
163                                if let Err(e) = callback.call1(py, (py_obj,)) {
164                                    tracing::error!("Error calling Python callback: {e}");
165                                }
166                            }
167                            KrakenFuturesWsMessage::Quote(quote) => {
168                                let py_obj = data_to_pycapsule(py, Data::from(quote));
169                                if let Err(e) = callback.call1(py, (py_obj,)) {
170                                    tracing::error!("Error calling Python callback: {e}");
171                                }
172                            }
173                            KrakenFuturesWsMessage::Trade(trade) => {
174                                let py_obj = data_to_pycapsule(py, Data::from(trade));
175                                if let Err(e) = callback.call1(py, (py_obj,)) {
176                                    tracing::error!("Error calling Python callback: {e}");
177                                }
178                            }
179                            KrakenFuturesWsMessage::BookDeltas(deltas) => {
180                                let py_obj = data_to_pycapsule(
181                                    py,
182                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
183                                );
184                                if let Err(e) = callback.call1(py, (py_obj,)) {
185                                    tracing::error!("Error calling Python callback: {e}");
186                                }
187                            }
188                            KrakenFuturesWsMessage::OrderAccepted(event) => {
189                                match event.into_py_any(py) {
190                                    Ok(py_obj) => {
191                                        if let Err(e) = callback.call1(py, (py_obj,)) {
192                                            tracing::error!("Error calling Python callback: {e}");
193                                        }
194                                    }
195                                    Err(e) => {
196                                        tracing::error!(
197                                            "Failed to convert OrderAccepted to Python: {e}"
198                                        );
199                                    }
200                                }
201                            }
202                            KrakenFuturesWsMessage::OrderCanceled(event) => {
203                                match event.into_py_any(py) {
204                                    Ok(py_obj) => {
205                                        if let Err(e) = callback.call1(py, (py_obj,)) {
206                                            tracing::error!("Error calling Python callback: {e}");
207                                        }
208                                    }
209                                    Err(e) => {
210                                        tracing::error!(
211                                            "Failed to convert OrderCanceled to Python: {e}"
212                                        );
213                                    }
214                                }
215                            }
216                            KrakenFuturesWsMessage::OrderExpired(event) => {
217                                match event.into_py_any(py) {
218                                    Ok(py_obj) => {
219                                        if let Err(e) = callback.call1(py, (py_obj,)) {
220                                            tracing::error!("Error calling Python callback: {e}");
221                                        }
222                                    }
223                                    Err(e) => {
224                                        tracing::error!(
225                                            "Failed to convert OrderExpired to Python: {e}"
226                                        );
227                                    }
228                                }
229                            }
230                            KrakenFuturesWsMessage::OrderUpdated(event) => {
231                                match event.into_py_any(py) {
232                                    Ok(py_obj) => {
233                                        if let Err(e) = callback.call1(py, (py_obj,)) {
234                                            tracing::error!("Error calling Python callback: {e}");
235                                        }
236                                    }
237                                    Err(e) => {
238                                        tracing::error!(
239                                            "Failed to convert OrderUpdated to Python: {e}"
240                                        );
241                                    }
242                                }
243                            }
244                            KrakenFuturesWsMessage::OrderStatusReport(report) => {
245                                match (*report).into_py_any(py) {
246                                    Ok(py_obj) => {
247                                        if let Err(e) = callback.call1(py, (py_obj,)) {
248                                            tracing::error!("Error calling Python callback: {e}");
249                                        }
250                                    }
251                                    Err(e) => {
252                                        tracing::error!(
253                                            "Failed to convert OrderStatusReport to Python: {e}"
254                                        );
255                                    }
256                                }
257                            }
258                            KrakenFuturesWsMessage::FillReport(report) => {
259                                match (*report).into_py_any(py) {
260                                    Ok(py_obj) => {
261                                        if let Err(e) = callback.call1(py, (py_obj,)) {
262                                            tracing::error!("Error calling Python callback: {e}");
263                                        }
264                                    }
265                                    Err(e) => {
266                                        tracing::error!(
267                                            "Failed to convert FillReport to Python: {e}"
268                                        );
269                                    }
270                                }
271                            }
272                            KrakenFuturesWsMessage::Reconnected => {
273                                tracing::info!("WebSocket reconnected");
274                            }
275                        });
276                    }
277                });
278            }
279
280            Ok(())
281        })
282    }
283
284    #[pyo3(name = "disconnect")]
285    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
286        let mut client = self.clone();
287
288        pyo3_async_runtimes::tokio::future_into_py(py, async move {
289            client.disconnect().await.map_err(to_pyruntime_err)?;
290            Ok(())
291        })
292    }
293
294    #[pyo3(name = "close")]
295    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
296        let mut client = self.clone();
297
298        pyo3_async_runtimes::tokio::future_into_py(py, async move {
299            client.close().await.map_err(to_pyruntime_err)?;
300            Ok(())
301        })
302    }
303
304    #[pyo3(name = "subscribe_book")]
305    #[pyo3(signature = (instrument_id, depth=None))]
306    fn py_subscribe_book<'py>(
307        &self,
308        py: Python<'py>,
309        instrument_id: InstrumentId,
310        depth: Option<u32>,
311    ) -> PyResult<Bound<'py, PyAny>> {
312        let client = self.clone();
313
314        pyo3_async_runtimes::tokio::future_into_py(py, async move {
315            client
316                .subscribe_book(instrument_id, depth)
317                .await
318                .map_err(to_pyruntime_err)?;
319            Ok(())
320        })
321    }
322
323    #[pyo3(name = "subscribe_quotes")]
324    fn py_subscribe_quotes<'py>(
325        &self,
326        py: Python<'py>,
327        instrument_id: InstrumentId,
328    ) -> PyResult<Bound<'py, PyAny>> {
329        let client = self.clone();
330
331        pyo3_async_runtimes::tokio::future_into_py(py, async move {
332            client
333                .subscribe_quotes(instrument_id)
334                .await
335                .map_err(to_pyruntime_err)?;
336            Ok(())
337        })
338    }
339
340    #[pyo3(name = "subscribe_trades")]
341    fn py_subscribe_trades<'py>(
342        &self,
343        py: Python<'py>,
344        instrument_id: InstrumentId,
345    ) -> PyResult<Bound<'py, PyAny>> {
346        let client = self.clone();
347
348        pyo3_async_runtimes::tokio::future_into_py(py, async move {
349            client
350                .subscribe_trades(instrument_id)
351                .await
352                .map_err(to_pyruntime_err)?;
353            Ok(())
354        })
355    }
356
357    #[pyo3(name = "subscribe_mark_price")]
358    fn py_subscribe_mark_price<'py>(
359        &self,
360        py: Python<'py>,
361        instrument_id: InstrumentId,
362    ) -> PyResult<Bound<'py, PyAny>> {
363        let client = self.clone();
364
365        pyo3_async_runtimes::tokio::future_into_py(py, async move {
366            client
367                .subscribe_mark_price(instrument_id)
368                .await
369                .map_err(to_pyruntime_err)?;
370            Ok(())
371        })
372    }
373
374    #[pyo3(name = "subscribe_index_price")]
375    fn py_subscribe_index_price<'py>(
376        &self,
377        py: Python<'py>,
378        instrument_id: InstrumentId,
379    ) -> PyResult<Bound<'py, PyAny>> {
380        let client = self.clone();
381
382        pyo3_async_runtimes::tokio::future_into_py(py, async move {
383            client
384                .subscribe_index_price(instrument_id)
385                .await
386                .map_err(to_pyruntime_err)?;
387            Ok(())
388        })
389    }
390
391    #[pyo3(name = "unsubscribe_book")]
392    fn py_unsubscribe_book<'py>(
393        &self,
394        py: Python<'py>,
395        instrument_id: InstrumentId,
396    ) -> PyResult<Bound<'py, PyAny>> {
397        let client = self.clone();
398
399        pyo3_async_runtimes::tokio::future_into_py(py, async move {
400            client
401                .unsubscribe_book(instrument_id)
402                .await
403                .map_err(to_pyruntime_err)?;
404            Ok(())
405        })
406    }
407
408    #[pyo3(name = "unsubscribe_quotes")]
409    fn py_unsubscribe_quotes<'py>(
410        &self,
411        py: Python<'py>,
412        instrument_id: InstrumentId,
413    ) -> PyResult<Bound<'py, PyAny>> {
414        let client = self.clone();
415
416        pyo3_async_runtimes::tokio::future_into_py(py, async move {
417            client
418                .unsubscribe_quotes(instrument_id)
419                .await
420                .map_err(to_pyruntime_err)?;
421            Ok(())
422        })
423    }
424
425    #[pyo3(name = "unsubscribe_trades")]
426    fn py_unsubscribe_trades<'py>(
427        &self,
428        py: Python<'py>,
429        instrument_id: InstrumentId,
430    ) -> PyResult<Bound<'py, PyAny>> {
431        let client = self.clone();
432
433        pyo3_async_runtimes::tokio::future_into_py(py, async move {
434            client
435                .unsubscribe_trades(instrument_id)
436                .await
437                .map_err(to_pyruntime_err)?;
438            Ok(())
439        })
440    }
441
442    #[pyo3(name = "unsubscribe_mark_price")]
443    fn py_unsubscribe_mark_price<'py>(
444        &self,
445        py: Python<'py>,
446        instrument_id: InstrumentId,
447    ) -> PyResult<Bound<'py, PyAny>> {
448        let client = self.clone();
449
450        pyo3_async_runtimes::tokio::future_into_py(py, async move {
451            client
452                .unsubscribe_mark_price(instrument_id)
453                .await
454                .map_err(to_pyruntime_err)?;
455            Ok(())
456        })
457    }
458
459    #[pyo3(name = "unsubscribe_index_price")]
460    fn py_unsubscribe_index_price<'py>(
461        &self,
462        py: Python<'py>,
463        instrument_id: InstrumentId,
464    ) -> PyResult<Bound<'py, PyAny>> {
465        let client = self.clone();
466
467        pyo3_async_runtimes::tokio::future_into_py(py, async move {
468            client
469                .unsubscribe_index_price(instrument_id)
470                .await
471                .map_err(to_pyruntime_err)?;
472            Ok(())
473        })
474    }
475
476    #[pyo3(name = "set_account_id")]
477    fn py_set_account_id(&self, account_id: AccountId) {
478        self.set_account_id(account_id);
479    }
480
481    #[pyo3(name = "cache_client_order")]
482    fn py_cache_client_order(
483        &self,
484        client_order_id: ClientOrderId,
485        venue_order_id: Option<VenueOrderId>,
486        instrument_id: InstrumentId,
487        trader_id: TraderId,
488        strategy_id: StrategyId,
489    ) {
490        self.cache_client_order(
491            client_order_id,
492            venue_order_id,
493            instrument_id,
494            trader_id,
495            strategy_id,
496        );
497    }
498
499    #[pyo3(name = "sign_challenge")]
500    fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
501        self.sign_challenge(challenge).map_err(to_pyruntime_err)
502    }
503
504    #[pyo3(name = "authenticate_with_challenge")]
505    fn py_authenticate_with_challenge<'py>(
506        &self,
507        py: Python<'py>,
508        challenge: String,
509    ) -> PyResult<Bound<'py, PyAny>> {
510        let client = self.clone();
511
512        pyo3_async_runtimes::tokio::future_into_py(py, async move {
513            client
514                .authenticate_with_challenge(&challenge)
515                .await
516                .map_err(to_pyruntime_err)?;
517            Ok(())
518        })
519    }
520
521    #[pyo3(name = "set_auth_credentials")]
522    fn py_set_auth_credentials<'py>(
523        &self,
524        py: Python<'py>,
525        original_challenge: String,
526        signed_challenge: String,
527    ) -> PyResult<Bound<'py, PyAny>> {
528        let client = self.clone();
529
530        pyo3_async_runtimes::tokio::future_into_py(py, async move {
531            client
532                .set_auth_credentials(original_challenge, signed_challenge)
533                .await
534                .map_err(to_pyruntime_err)?;
535            Ok(())
536        })
537    }
538
539    #[pyo3(name = "subscribe_open_orders")]
540    fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
541        let client = self.clone();
542
543        pyo3_async_runtimes::tokio::future_into_py(py, async move {
544            client
545                .subscribe_open_orders()
546                .await
547                .map_err(to_pyruntime_err)?;
548            Ok(())
549        })
550    }
551
552    #[pyo3(name = "subscribe_fills")]
553    fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
554        let client = self.clone();
555
556        pyo3_async_runtimes::tokio::future_into_py(py, async move {
557            client.subscribe_fills().await.map_err(to_pyruntime_err)?;
558            Ok(())
559        })
560    }
561
562    #[pyo3(name = "subscribe_executions")]
563    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
564        let client = self.clone();
565
566        pyo3_async_runtimes::tokio::future_into_py(py, async move {
567            client
568                .subscribe_executions()
569                .await
570                .map_err(to_pyruntime_err)?;
571            Ok(())
572        })
573    }
574}