nautilus_kraken/python/
websocket_futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken Futures WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{IntoPyObjectExt, prelude::*};
26
27use crate::{
28    common::{
29        credential::KrakenCredential,
30        enums::{KrakenEnvironment, KrakenProductType},
31        urls::get_kraken_ws_public_url,
32    },
33    websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
34};
35
36#[pymethods]
37impl KrakenFuturesWebSocketClient {
38    #[new]
39    #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
40    fn py_new(
41        environment: Option<KrakenEnvironment>,
42        base_url: Option<String>,
43        heartbeat_secs: Option<u64>,
44        api_key: Option<String>,
45        api_secret: Option<String>,
46    ) -> PyResult<Self> {
47        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
48        let demo = env == KrakenEnvironment::Demo;
49        let url = base_url.unwrap_or_else(|| {
50            get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
51        });
52        let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
53
54        Ok(Self::with_credentials(url, heartbeat_secs, credential))
55    }
56
57    #[getter]
58    #[pyo3(name = "has_credentials")]
59    #[must_use]
60    pub fn py_has_credentials(&self) -> bool {
61        self.has_credentials()
62    }
63
64    #[getter]
65    #[pyo3(name = "url")]
66    #[must_use]
67    pub fn py_url(&self) -> &str {
68        self.url()
69    }
70
71    #[pyo3(name = "is_closed")]
72    fn py_is_closed(&self) -> bool {
73        self.is_closed()
74    }
75
76    #[pyo3(name = "is_active")]
77    fn py_is_active(&self) -> bool {
78        self.is_active()
79    }
80
81    #[pyo3(name = "wait_until_active")]
82    fn py_wait_until_active<'py>(
83        &self,
84        py: Python<'py>,
85        timeout_secs: f64,
86    ) -> PyResult<Bound<'py, PyAny>> {
87        let client = self.clone();
88
89        pyo3_async_runtimes::tokio::future_into_py(py, async move {
90            client
91                .wait_until_active(timeout_secs)
92                .await
93                .map_err(to_pyruntime_err)?;
94            Ok(())
95        })
96    }
97
98    #[pyo3(name = "authenticate")]
99    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100        let client = self.clone();
101
102        pyo3_async_runtimes::tokio::future_into_py(py, async move {
103            client.authenticate().await.map_err(to_pyruntime_err)?;
104            Ok(())
105        })
106    }
107
108    #[pyo3(name = "cache_instruments")]
109    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
110        let mut instruments_any = Vec::new();
111        for inst in instruments {
112            let inst_any = pyobject_to_instrument_any(py, inst)?;
113            instruments_any.push(inst_any);
114        }
115        self.cache_instruments(instruments_any);
116        Ok(())
117    }
118
119    #[pyo3(name = "cache_instrument")]
120    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
121        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
122        Ok(())
123    }
124
125    #[pyo3(name = "connect")]
126    fn py_connect<'py>(
127        &mut self,
128        py: Python<'py>,
129        instruments: Vec<Py<PyAny>>,
130        callback: Py<PyAny>,
131    ) -> PyResult<Bound<'py, PyAny>> {
132        let mut instruments_any = Vec::new();
133        for inst in instruments {
134            let inst_any = pyobject_to_instrument_any(py, inst)?;
135            instruments_any.push(inst_any);
136        }
137
138        let mut client = self.clone();
139
140        pyo3_async_runtimes::tokio::future_into_py(py, async move {
141            client.connect().await.map_err(to_pyruntime_err)?;
142
143            // Cache instruments after connection is established
144            client.cache_instruments(instruments_any);
145
146            // Take ownership of the receiver
147            if let Some(mut rx) = client.take_output_rx() {
148                get_runtime().spawn(async move {
149                    while let Some(msg) = rx.recv().await {
150                        Python::attach(|py| match msg {
151                            KrakenFuturesWsMessage::MarkPrice(update) => {
152                                let py_obj = data_to_pycapsule(py, Data::from(update));
153                                if let Err(e) = callback.call1(py, (py_obj,)) {
154                                    log::error!("Error calling Python callback: {e}");
155                                }
156                            }
157                            KrakenFuturesWsMessage::IndexPrice(update) => {
158                                let py_obj = data_to_pycapsule(py, Data::from(update));
159                                if let Err(e) = callback.call1(py, (py_obj,)) {
160                                    log::error!("Error calling Python callback: {e}");
161                                }
162                            }
163                            KrakenFuturesWsMessage::Quote(quote) => {
164                                let py_obj = data_to_pycapsule(py, Data::from(quote));
165                                if let Err(e) = callback.call1(py, (py_obj,)) {
166                                    log::error!("Error calling Python callback: {e}");
167                                }
168                            }
169                            KrakenFuturesWsMessage::Trade(trade) => {
170                                let py_obj = data_to_pycapsule(py, Data::from(trade));
171                                if let Err(e) = callback.call1(py, (py_obj,)) {
172                                    log::error!("Error calling Python callback: {e}");
173                                }
174                            }
175                            KrakenFuturesWsMessage::BookDeltas(deltas) => {
176                                let py_obj = data_to_pycapsule(
177                                    py,
178                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
179                                );
180                                if let Err(e) = callback.call1(py, (py_obj,)) {
181                                    log::error!("Error calling Python callback: {e}");
182                                }
183                            }
184                            KrakenFuturesWsMessage::OrderAccepted(event) => {
185                                match event.into_py_any(py) {
186                                    Ok(py_obj) => {
187                                        if let Err(e) = callback.call1(py, (py_obj,)) {
188                                            log::error!("Error calling Python callback: {e}");
189                                        }
190                                    }
191                                    Err(e) => {
192                                        log::error!(
193                                            "Failed to convert OrderAccepted to Python: {e}"
194                                        );
195                                    }
196                                }
197                            }
198                            KrakenFuturesWsMessage::OrderCanceled(event) => {
199                                match event.into_py_any(py) {
200                                    Ok(py_obj) => {
201                                        if let Err(e) = callback.call1(py, (py_obj,)) {
202                                            log::error!("Error calling Python callback: {e}");
203                                        }
204                                    }
205                                    Err(e) => {
206                                        log::error!(
207                                            "Failed to convert OrderCanceled to Python: {e}"
208                                        );
209                                    }
210                                }
211                            }
212                            KrakenFuturesWsMessage::OrderExpired(event) => {
213                                match event.into_py_any(py) {
214                                    Ok(py_obj) => {
215                                        if let Err(e) = callback.call1(py, (py_obj,)) {
216                                            log::error!("Error calling Python callback: {e}");
217                                        }
218                                    }
219                                    Err(e) => {
220                                        log::error!(
221                                            "Failed to convert OrderExpired to Python: {e}"
222                                        );
223                                    }
224                                }
225                            }
226                            KrakenFuturesWsMessage::OrderUpdated(event) => {
227                                match event.into_py_any(py) {
228                                    Ok(py_obj) => {
229                                        if let Err(e) = callback.call1(py, (py_obj,)) {
230                                            log::error!("Error calling Python callback: {e}");
231                                        }
232                                    }
233                                    Err(e) => {
234                                        log::error!(
235                                            "Failed to convert OrderUpdated to Python: {e}"
236                                        );
237                                    }
238                                }
239                            }
240                            KrakenFuturesWsMessage::OrderStatusReport(report) => {
241                                match (*report).into_py_any(py) {
242                                    Ok(py_obj) => {
243                                        if let Err(e) = callback.call1(py, (py_obj,)) {
244                                            log::error!("Error calling Python callback: {e}");
245                                        }
246                                    }
247                                    Err(e) => {
248                                        log::error!(
249                                            "Failed to convert OrderStatusReport to Python: {e}"
250                                        );
251                                    }
252                                }
253                            }
254                            KrakenFuturesWsMessage::FillReport(report) => {
255                                match (*report).into_py_any(py) {
256                                    Ok(py_obj) => {
257                                        if let Err(e) = callback.call1(py, (py_obj,)) {
258                                            log::error!("Error calling Python callback: {e}");
259                                        }
260                                    }
261                                    Err(e) => {
262                                        log::error!("Failed to convert FillReport to Python: {e}");
263                                    }
264                                }
265                            }
266                            KrakenFuturesWsMessage::Reconnected => {
267                                log::info!("WebSocket reconnected");
268                            }
269                        });
270                    }
271                });
272            }
273
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "disconnect")]
279    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
280        let mut client = self.clone();
281
282        pyo3_async_runtimes::tokio::future_into_py(py, async move {
283            client.disconnect().await.map_err(to_pyruntime_err)?;
284            Ok(())
285        })
286    }
287
288    #[pyo3(name = "close")]
289    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
290        let mut client = self.clone();
291
292        pyo3_async_runtimes::tokio::future_into_py(py, async move {
293            client.close().await.map_err(to_pyruntime_err)?;
294            Ok(())
295        })
296    }
297
298    #[pyo3(name = "subscribe_book")]
299    #[pyo3(signature = (instrument_id, depth=None))]
300    fn py_subscribe_book<'py>(
301        &self,
302        py: Python<'py>,
303        instrument_id: InstrumentId,
304        depth: Option<u32>,
305    ) -> PyResult<Bound<'py, PyAny>> {
306        let client = self.clone();
307
308        pyo3_async_runtimes::tokio::future_into_py(py, async move {
309            client
310                .subscribe_book(instrument_id, depth)
311                .await
312                .map_err(to_pyruntime_err)?;
313            Ok(())
314        })
315    }
316
317    #[pyo3(name = "subscribe_quotes")]
318    fn py_subscribe_quotes<'py>(
319        &self,
320        py: Python<'py>,
321        instrument_id: InstrumentId,
322    ) -> PyResult<Bound<'py, PyAny>> {
323        let client = self.clone();
324
325        pyo3_async_runtimes::tokio::future_into_py(py, async move {
326            client
327                .subscribe_quotes(instrument_id)
328                .await
329                .map_err(to_pyruntime_err)?;
330            Ok(())
331        })
332    }
333
334    #[pyo3(name = "subscribe_trades")]
335    fn py_subscribe_trades<'py>(
336        &self,
337        py: Python<'py>,
338        instrument_id: InstrumentId,
339    ) -> PyResult<Bound<'py, PyAny>> {
340        let client = self.clone();
341
342        pyo3_async_runtimes::tokio::future_into_py(py, async move {
343            client
344                .subscribe_trades(instrument_id)
345                .await
346                .map_err(to_pyruntime_err)?;
347            Ok(())
348        })
349    }
350
351    #[pyo3(name = "subscribe_mark_price")]
352    fn py_subscribe_mark_price<'py>(
353        &self,
354        py: Python<'py>,
355        instrument_id: InstrumentId,
356    ) -> PyResult<Bound<'py, PyAny>> {
357        let client = self.clone();
358
359        pyo3_async_runtimes::tokio::future_into_py(py, async move {
360            client
361                .subscribe_mark_price(instrument_id)
362                .await
363                .map_err(to_pyruntime_err)?;
364            Ok(())
365        })
366    }
367
368    #[pyo3(name = "subscribe_index_price")]
369    fn py_subscribe_index_price<'py>(
370        &self,
371        py: Python<'py>,
372        instrument_id: InstrumentId,
373    ) -> PyResult<Bound<'py, PyAny>> {
374        let client = self.clone();
375
376        pyo3_async_runtimes::tokio::future_into_py(py, async move {
377            client
378                .subscribe_index_price(instrument_id)
379                .await
380                .map_err(to_pyruntime_err)?;
381            Ok(())
382        })
383    }
384
385    #[pyo3(name = "unsubscribe_book")]
386    fn py_unsubscribe_book<'py>(
387        &self,
388        py: Python<'py>,
389        instrument_id: InstrumentId,
390    ) -> PyResult<Bound<'py, PyAny>> {
391        let client = self.clone();
392
393        pyo3_async_runtimes::tokio::future_into_py(py, async move {
394            client
395                .unsubscribe_book(instrument_id)
396                .await
397                .map_err(to_pyruntime_err)?;
398            Ok(())
399        })
400    }
401
402    #[pyo3(name = "unsubscribe_quotes")]
403    fn py_unsubscribe_quotes<'py>(
404        &self,
405        py: Python<'py>,
406        instrument_id: InstrumentId,
407    ) -> PyResult<Bound<'py, PyAny>> {
408        let client = self.clone();
409
410        pyo3_async_runtimes::tokio::future_into_py(py, async move {
411            client
412                .unsubscribe_quotes(instrument_id)
413                .await
414                .map_err(to_pyruntime_err)?;
415            Ok(())
416        })
417    }
418
419    #[pyo3(name = "unsubscribe_trades")]
420    fn py_unsubscribe_trades<'py>(
421        &self,
422        py: Python<'py>,
423        instrument_id: InstrumentId,
424    ) -> PyResult<Bound<'py, PyAny>> {
425        let client = self.clone();
426
427        pyo3_async_runtimes::tokio::future_into_py(py, async move {
428            client
429                .unsubscribe_trades(instrument_id)
430                .await
431                .map_err(to_pyruntime_err)?;
432            Ok(())
433        })
434    }
435
436    #[pyo3(name = "unsubscribe_mark_price")]
437    fn py_unsubscribe_mark_price<'py>(
438        &self,
439        py: Python<'py>,
440        instrument_id: InstrumentId,
441    ) -> PyResult<Bound<'py, PyAny>> {
442        let client = self.clone();
443
444        pyo3_async_runtimes::tokio::future_into_py(py, async move {
445            client
446                .unsubscribe_mark_price(instrument_id)
447                .await
448                .map_err(to_pyruntime_err)?;
449            Ok(())
450        })
451    }
452
453    #[pyo3(name = "unsubscribe_index_price")]
454    fn py_unsubscribe_index_price<'py>(
455        &self,
456        py: Python<'py>,
457        instrument_id: InstrumentId,
458    ) -> PyResult<Bound<'py, PyAny>> {
459        let client = self.clone();
460
461        pyo3_async_runtimes::tokio::future_into_py(py, async move {
462            client
463                .unsubscribe_index_price(instrument_id)
464                .await
465                .map_err(to_pyruntime_err)?;
466            Ok(())
467        })
468    }
469
470    #[pyo3(name = "set_account_id")]
471    fn py_set_account_id(&self, account_id: AccountId) {
472        self.set_account_id(account_id);
473    }
474
475    #[pyo3(name = "cache_client_order")]
476    fn py_cache_client_order(
477        &self,
478        client_order_id: ClientOrderId,
479        venue_order_id: Option<VenueOrderId>,
480        instrument_id: InstrumentId,
481        trader_id: TraderId,
482        strategy_id: StrategyId,
483    ) {
484        self.cache_client_order(
485            client_order_id,
486            venue_order_id,
487            instrument_id,
488            trader_id,
489            strategy_id,
490        );
491    }
492
493    #[pyo3(name = "sign_challenge")]
494    fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
495        self.sign_challenge(challenge).map_err(to_pyruntime_err)
496    }
497
498    #[pyo3(name = "authenticate_with_challenge")]
499    fn py_authenticate_with_challenge<'py>(
500        &self,
501        py: Python<'py>,
502        challenge: String,
503    ) -> PyResult<Bound<'py, PyAny>> {
504        let client = self.clone();
505
506        pyo3_async_runtimes::tokio::future_into_py(py, async move {
507            client
508                .authenticate_with_challenge(&challenge)
509                .await
510                .map_err(to_pyruntime_err)?;
511            Ok(())
512        })
513    }
514
515    #[pyo3(name = "set_auth_credentials")]
516    fn py_set_auth_credentials<'py>(
517        &self,
518        py: Python<'py>,
519        original_challenge: String,
520        signed_challenge: String,
521    ) -> PyResult<Bound<'py, PyAny>> {
522        let client = self.clone();
523
524        pyo3_async_runtimes::tokio::future_into_py(py, async move {
525            client
526                .set_auth_credentials(original_challenge, signed_challenge)
527                .await
528                .map_err(to_pyruntime_err)?;
529            Ok(())
530        })
531    }
532
533    #[pyo3(name = "subscribe_open_orders")]
534    fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
535        let client = self.clone();
536
537        pyo3_async_runtimes::tokio::future_into_py(py, async move {
538            client
539                .subscribe_open_orders()
540                .await
541                .map_err(to_pyruntime_err)?;
542            Ok(())
543        })
544    }
545
546    #[pyo3(name = "subscribe_fills")]
547    fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
548        let client = self.clone();
549
550        pyo3_async_runtimes::tokio::future_into_py(py, async move {
551            client.subscribe_fills().await.map_err(to_pyruntime_err)?;
552            Ok(())
553        })
554    }
555
556    #[pyo3(name = "subscribe_executions")]
557    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
558        let client = self.clone();
559
560        pyo3_async_runtimes::tokio::future_into_py(py, async move {
561            client
562                .subscribe_executions()
563                .await
564                .map_err(to_pyruntime_err)?;
565            Ok(())
566        })
567    }
568}