nautilus_kraken/python/
websocket_futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Kraken Futures WebSocket client.
17
18use nautilus_core::python::to_pyruntime_err;
19use nautilus_model::{
20    data::{Data, OrderBookDeltas_API},
21    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
22    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
23};
24use pyo3::{IntoPyObjectExt, prelude::*};
25
26use crate::{
27    common::{
28        credential::KrakenCredential,
29        enums::{KrakenEnvironment, KrakenProductType},
30        urls::get_kraken_ws_public_url,
31    },
32    websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
33};
34
35#[pymethods]
36impl KrakenFuturesWebSocketClient {
37    #[new]
38    #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
39    fn py_new(
40        environment: Option<KrakenEnvironment>,
41        base_url: Option<String>,
42        heartbeat_secs: Option<u64>,
43        api_key: Option<String>,
44        api_secret: Option<String>,
45    ) -> PyResult<Self> {
46        let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
47        let demo = env == KrakenEnvironment::Demo;
48        let url = base_url.unwrap_or_else(|| {
49            get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
50        });
51        let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
52
53        Ok(KrakenFuturesWebSocketClient::with_credentials(
54            url,
55            heartbeat_secs,
56            credential,
57        ))
58    }
59
60    #[getter]
61    #[pyo3(name = "has_credentials")]
62    #[must_use]
63    pub fn py_has_credentials(&self) -> bool {
64        self.has_credentials()
65    }
66
67    #[getter]
68    #[pyo3(name = "url")]
69    #[must_use]
70    pub fn py_url(&self) -> &str {
71        self.url()
72    }
73
74    #[pyo3(name = "is_closed")]
75    fn py_is_closed(&self) -> bool {
76        self.is_closed()
77    }
78
79    #[pyo3(name = "is_active")]
80    fn py_is_active(&self) -> bool {
81        self.is_active()
82    }
83
84    #[pyo3(name = "wait_until_active")]
85    fn py_wait_until_active<'py>(
86        &self,
87        py: Python<'py>,
88        timeout_secs: f64,
89    ) -> PyResult<Bound<'py, PyAny>> {
90        let client = self.clone();
91
92        pyo3_async_runtimes::tokio::future_into_py(py, async move {
93            client
94                .wait_until_active(timeout_secs)
95                .await
96                .map_err(to_pyruntime_err)?;
97            Ok(())
98        })
99    }
100
101    #[pyo3(name = "authenticate")]
102    fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
103        let client = self.clone();
104
105        pyo3_async_runtimes::tokio::future_into_py(py, async move {
106            client.authenticate().await.map_err(to_pyruntime_err)?;
107            Ok(())
108        })
109    }
110
111    #[pyo3(name = "cache_instruments")]
112    fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
113        let mut instruments_any = Vec::new();
114        for inst in instruments {
115            let inst_any = pyobject_to_instrument_any(py, inst)?;
116            instruments_any.push(inst_any);
117        }
118        self.cache_instruments(instruments_any);
119        Ok(())
120    }
121
122    #[pyo3(name = "cache_instrument")]
123    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
124        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
125        Ok(())
126    }
127
128    #[pyo3(name = "connect")]
129    fn py_connect<'py>(
130        &mut self,
131        py: Python<'py>,
132        instruments: Vec<Py<PyAny>>,
133        callback: Py<PyAny>,
134    ) -> PyResult<Bound<'py, PyAny>> {
135        let mut instruments_any = Vec::new();
136        for inst in instruments {
137            let inst_any = pyobject_to_instrument_any(py, inst)?;
138            instruments_any.push(inst_any);
139        }
140
141        let mut client = self.clone();
142
143        pyo3_async_runtimes::tokio::future_into_py(py, async move {
144            client.connect().await.map_err(to_pyruntime_err)?;
145
146            // Cache instruments after connection is established
147            client.cache_instruments(instruments_any);
148
149            // Take ownership of the receiver
150            if let Some(mut rx) = client.take_output_rx() {
151                tokio::spawn(async move {
152                    while let Some(msg) = rx.recv().await {
153                        Python::attach(|py| match msg {
154                            KrakenFuturesWsMessage::MarkPrice(update) => {
155                                let py_obj = data_to_pycapsule(py, Data::from(update));
156                                if let Err(e) = callback.call1(py, (py_obj,)) {
157                                    tracing::error!("Error calling Python callback: {e}");
158                                }
159                            }
160                            KrakenFuturesWsMessage::IndexPrice(update) => {
161                                let py_obj = data_to_pycapsule(py, Data::from(update));
162                                if let Err(e) = callback.call1(py, (py_obj,)) {
163                                    tracing::error!("Error calling Python callback: {e}");
164                                }
165                            }
166                            KrakenFuturesWsMessage::Quote(quote) => {
167                                let py_obj = data_to_pycapsule(py, Data::from(quote));
168                                if let Err(e) = callback.call1(py, (py_obj,)) {
169                                    tracing::error!("Error calling Python callback: {e}");
170                                }
171                            }
172                            KrakenFuturesWsMessage::Trade(trade) => {
173                                let py_obj = data_to_pycapsule(py, Data::from(trade));
174                                if let Err(e) = callback.call1(py, (py_obj,)) {
175                                    tracing::error!("Error calling Python callback: {e}");
176                                }
177                            }
178                            KrakenFuturesWsMessage::BookDeltas(deltas) => {
179                                let py_obj = data_to_pycapsule(
180                                    py,
181                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
182                                );
183                                if let Err(e) = callback.call1(py, (py_obj,)) {
184                                    tracing::error!("Error calling Python callback: {e}");
185                                }
186                            }
187                            KrakenFuturesWsMessage::OrderAccepted(event) => {
188                                match event.into_py_any(py) {
189                                    Ok(py_obj) => {
190                                        if let Err(e) = callback.call1(py, (py_obj,)) {
191                                            tracing::error!("Error calling Python callback: {e}");
192                                        }
193                                    }
194                                    Err(e) => {
195                                        tracing::error!(
196                                            "Failed to convert OrderAccepted to Python: {e}"
197                                        );
198                                    }
199                                }
200                            }
201                            KrakenFuturesWsMessage::OrderCanceled(event) => {
202                                match event.into_py_any(py) {
203                                    Ok(py_obj) => {
204                                        if let Err(e) = callback.call1(py, (py_obj,)) {
205                                            tracing::error!("Error calling Python callback: {e}");
206                                        }
207                                    }
208                                    Err(e) => {
209                                        tracing::error!(
210                                            "Failed to convert OrderCanceled to Python: {e}"
211                                        );
212                                    }
213                                }
214                            }
215                            KrakenFuturesWsMessage::OrderExpired(event) => {
216                                match event.into_py_any(py) {
217                                    Ok(py_obj) => {
218                                        if let Err(e) = callback.call1(py, (py_obj,)) {
219                                            tracing::error!("Error calling Python callback: {e}");
220                                        }
221                                    }
222                                    Err(e) => {
223                                        tracing::error!(
224                                            "Failed to convert OrderExpired to Python: {e}"
225                                        );
226                                    }
227                                }
228                            }
229                            KrakenFuturesWsMessage::OrderUpdated(event) => {
230                                match event.into_py_any(py) {
231                                    Ok(py_obj) => {
232                                        if let Err(e) = callback.call1(py, (py_obj,)) {
233                                            tracing::error!("Error calling Python callback: {e}");
234                                        }
235                                    }
236                                    Err(e) => {
237                                        tracing::error!(
238                                            "Failed to convert OrderUpdated to Python: {e}"
239                                        );
240                                    }
241                                }
242                            }
243                            KrakenFuturesWsMessage::OrderStatusReport(report) => {
244                                match (*report).into_py_any(py) {
245                                    Ok(py_obj) => {
246                                        if let Err(e) = callback.call1(py, (py_obj,)) {
247                                            tracing::error!("Error calling Python callback: {e}");
248                                        }
249                                    }
250                                    Err(e) => {
251                                        tracing::error!(
252                                            "Failed to convert OrderStatusReport to Python: {e}"
253                                        );
254                                    }
255                                }
256                            }
257                            KrakenFuturesWsMessage::FillReport(report) => {
258                                match (*report).into_py_any(py) {
259                                    Ok(py_obj) => {
260                                        if let Err(e) = callback.call1(py, (py_obj,)) {
261                                            tracing::error!("Error calling Python callback: {e}");
262                                        }
263                                    }
264                                    Err(e) => {
265                                        tracing::error!(
266                                            "Failed to convert FillReport to Python: {e}"
267                                        );
268                                    }
269                                }
270                            }
271                            KrakenFuturesWsMessage::Reconnected => {
272                                tracing::info!("WebSocket reconnected");
273                            }
274                        });
275                    }
276                });
277            }
278
279            Ok(())
280        })
281    }
282
283    #[pyo3(name = "disconnect")]
284    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
285        let mut client = self.clone();
286
287        pyo3_async_runtimes::tokio::future_into_py(py, async move {
288            client.disconnect().await.map_err(to_pyruntime_err)?;
289            Ok(())
290        })
291    }
292
293    #[pyo3(name = "close")]
294    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
295        let mut client = self.clone();
296
297        pyo3_async_runtimes::tokio::future_into_py(py, async move {
298            client.close().await.map_err(to_pyruntime_err)?;
299            Ok(())
300        })
301    }
302
303    #[pyo3(name = "subscribe_book")]
304    #[pyo3(signature = (instrument_id, depth=None))]
305    fn py_subscribe_book<'py>(
306        &self,
307        py: Python<'py>,
308        instrument_id: InstrumentId,
309        depth: Option<u32>,
310    ) -> PyResult<Bound<'py, PyAny>> {
311        let client = self.clone();
312
313        pyo3_async_runtimes::tokio::future_into_py(py, async move {
314            client
315                .subscribe_book(instrument_id, depth)
316                .await
317                .map_err(to_pyruntime_err)?;
318            Ok(())
319        })
320    }
321
322    #[pyo3(name = "subscribe_quotes")]
323    fn py_subscribe_quotes<'py>(
324        &self,
325        py: Python<'py>,
326        instrument_id: InstrumentId,
327    ) -> PyResult<Bound<'py, PyAny>> {
328        let client = self.clone();
329
330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
331            client
332                .subscribe_quotes(instrument_id)
333                .await
334                .map_err(to_pyruntime_err)?;
335            Ok(())
336        })
337    }
338
339    #[pyo3(name = "subscribe_trades")]
340    fn py_subscribe_trades<'py>(
341        &self,
342        py: Python<'py>,
343        instrument_id: InstrumentId,
344    ) -> PyResult<Bound<'py, PyAny>> {
345        let client = self.clone();
346
347        pyo3_async_runtimes::tokio::future_into_py(py, async move {
348            client
349                .subscribe_trades(instrument_id)
350                .await
351                .map_err(to_pyruntime_err)?;
352            Ok(())
353        })
354    }
355
356    #[pyo3(name = "subscribe_mark_price")]
357    fn py_subscribe_mark_price<'py>(
358        &self,
359        py: Python<'py>,
360        instrument_id: InstrumentId,
361    ) -> PyResult<Bound<'py, PyAny>> {
362        let client = self.clone();
363
364        pyo3_async_runtimes::tokio::future_into_py(py, async move {
365            client
366                .subscribe_mark_price(instrument_id)
367                .await
368                .map_err(to_pyruntime_err)?;
369            Ok(())
370        })
371    }
372
373    #[pyo3(name = "subscribe_index_price")]
374    fn py_subscribe_index_price<'py>(
375        &self,
376        py: Python<'py>,
377        instrument_id: InstrumentId,
378    ) -> PyResult<Bound<'py, PyAny>> {
379        let client = self.clone();
380
381        pyo3_async_runtimes::tokio::future_into_py(py, async move {
382            client
383                .subscribe_index_price(instrument_id)
384                .await
385                .map_err(to_pyruntime_err)?;
386            Ok(())
387        })
388    }
389
390    #[pyo3(name = "unsubscribe_book")]
391    fn py_unsubscribe_book<'py>(
392        &self,
393        py: Python<'py>,
394        instrument_id: InstrumentId,
395    ) -> PyResult<Bound<'py, PyAny>> {
396        let client = self.clone();
397
398        pyo3_async_runtimes::tokio::future_into_py(py, async move {
399            client
400                .unsubscribe_book(instrument_id)
401                .await
402                .map_err(to_pyruntime_err)?;
403            Ok(())
404        })
405    }
406
407    #[pyo3(name = "unsubscribe_quotes")]
408    fn py_unsubscribe_quotes<'py>(
409        &self,
410        py: Python<'py>,
411        instrument_id: InstrumentId,
412    ) -> PyResult<Bound<'py, PyAny>> {
413        let client = self.clone();
414
415        pyo3_async_runtimes::tokio::future_into_py(py, async move {
416            client
417                .unsubscribe_quotes(instrument_id)
418                .await
419                .map_err(to_pyruntime_err)?;
420            Ok(())
421        })
422    }
423
424    #[pyo3(name = "unsubscribe_trades")]
425    fn py_unsubscribe_trades<'py>(
426        &self,
427        py: Python<'py>,
428        instrument_id: InstrumentId,
429    ) -> PyResult<Bound<'py, PyAny>> {
430        let client = self.clone();
431
432        pyo3_async_runtimes::tokio::future_into_py(py, async move {
433            client
434                .unsubscribe_trades(instrument_id)
435                .await
436                .map_err(to_pyruntime_err)?;
437            Ok(())
438        })
439    }
440
441    #[pyo3(name = "unsubscribe_mark_price")]
442    fn py_unsubscribe_mark_price<'py>(
443        &self,
444        py: Python<'py>,
445        instrument_id: InstrumentId,
446    ) -> PyResult<Bound<'py, PyAny>> {
447        let client = self.clone();
448
449        pyo3_async_runtimes::tokio::future_into_py(py, async move {
450            client
451                .unsubscribe_mark_price(instrument_id)
452                .await
453                .map_err(to_pyruntime_err)?;
454            Ok(())
455        })
456    }
457
458    #[pyo3(name = "unsubscribe_index_price")]
459    fn py_unsubscribe_index_price<'py>(
460        &self,
461        py: Python<'py>,
462        instrument_id: InstrumentId,
463    ) -> PyResult<Bound<'py, PyAny>> {
464        let client = self.clone();
465
466        pyo3_async_runtimes::tokio::future_into_py(py, async move {
467            client
468                .unsubscribe_index_price(instrument_id)
469                .await
470                .map_err(to_pyruntime_err)?;
471            Ok(())
472        })
473    }
474
475    #[pyo3(name = "set_account_id")]
476    fn py_set_account_id(&self, account_id: AccountId) {
477        self.set_account_id(account_id);
478    }
479
480    #[pyo3(name = "cache_client_order")]
481    fn py_cache_client_order(
482        &self,
483        client_order_id: ClientOrderId,
484        venue_order_id: Option<VenueOrderId>,
485        instrument_id: InstrumentId,
486        trader_id: TraderId,
487        strategy_id: StrategyId,
488    ) {
489        self.cache_client_order(
490            client_order_id,
491            venue_order_id,
492            instrument_id,
493            trader_id,
494            strategy_id,
495        );
496    }
497
498    #[pyo3(name = "sign_challenge")]
499    fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
500        self.sign_challenge(challenge).map_err(to_pyruntime_err)
501    }
502
503    #[pyo3(name = "authenticate_with_challenge")]
504    fn py_authenticate_with_challenge<'py>(
505        &self,
506        py: Python<'py>,
507        challenge: String,
508    ) -> PyResult<Bound<'py, PyAny>> {
509        let client = self.clone();
510
511        pyo3_async_runtimes::tokio::future_into_py(py, async move {
512            client
513                .authenticate_with_challenge(&challenge)
514                .await
515                .map_err(to_pyruntime_err)?;
516            Ok(())
517        })
518    }
519
520    #[pyo3(name = "set_auth_credentials")]
521    fn py_set_auth_credentials<'py>(
522        &self,
523        py: Python<'py>,
524        original_challenge: String,
525        signed_challenge: String,
526    ) -> PyResult<Bound<'py, PyAny>> {
527        let client = self.clone();
528
529        pyo3_async_runtimes::tokio::future_into_py(py, async move {
530            client
531                .set_auth_credentials(original_challenge, signed_challenge)
532                .await
533                .map_err(to_pyruntime_err)?;
534            Ok(())
535        })
536    }
537
538    #[pyo3(name = "subscribe_open_orders")]
539    fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
540        let client = self.clone();
541
542        pyo3_async_runtimes::tokio::future_into_py(py, async move {
543            client
544                .subscribe_open_orders()
545                .await
546                .map_err(to_pyruntime_err)?;
547            Ok(())
548        })
549    }
550
551    #[pyo3(name = "subscribe_fills")]
552    fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
553        let client = self.clone();
554
555        pyo3_async_runtimes::tokio::future_into_py(py, async move {
556            client.subscribe_fills().await.map_err(to_pyruntime_err)?;
557            Ok(())
558        })
559    }
560
561    #[pyo3(name = "subscribe_executions")]
562    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
563        let client = self.clone();
564
565        pyo3_async_runtimes::tokio::future_into_py(py, async move {
566            client
567                .subscribe_executions()
568                .await
569                .map_err(to_pyruntime_err)?;
570            Ok(())
571        })
572    }
573}