1use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{IntoPyObjectExt, prelude::*};
26
27use crate::{
28 common::{
29 credential::KrakenCredential,
30 enums::{KrakenEnvironment, KrakenProductType},
31 urls::get_kraken_ws_public_url,
32 },
33 websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
34};
35
36#[pymethods]
37impl KrakenFuturesWebSocketClient {
38 #[new]
39 #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
40 fn py_new(
41 environment: Option<KrakenEnvironment>,
42 base_url: Option<String>,
43 heartbeat_secs: Option<u64>,
44 api_key: Option<String>,
45 api_secret: Option<String>,
46 ) -> PyResult<Self> {
47 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
48 let demo = env == KrakenEnvironment::Demo;
49 let url = base_url.unwrap_or_else(|| {
50 get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
51 });
52 let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
53
54 Ok(Self::with_credentials(url, heartbeat_secs, credential))
55 }
56
57 #[getter]
58 #[pyo3(name = "has_credentials")]
59 #[must_use]
60 pub fn py_has_credentials(&self) -> bool {
61 self.has_credentials()
62 }
63
64 #[getter]
65 #[pyo3(name = "url")]
66 #[must_use]
67 pub fn py_url(&self) -> &str {
68 self.url()
69 }
70
71 #[pyo3(name = "is_closed")]
72 fn py_is_closed(&self) -> bool {
73 self.is_closed()
74 }
75
76 #[pyo3(name = "is_active")]
77 fn py_is_active(&self) -> bool {
78 self.is_active()
79 }
80
81 #[pyo3(name = "wait_until_active")]
82 fn py_wait_until_active<'py>(
83 &self,
84 py: Python<'py>,
85 timeout_secs: f64,
86 ) -> PyResult<Bound<'py, PyAny>> {
87 let client = self.clone();
88
89 pyo3_async_runtimes::tokio::future_into_py(py, async move {
90 client
91 .wait_until_active(timeout_secs)
92 .await
93 .map_err(to_pyruntime_err)?;
94 Ok(())
95 })
96 }
97
98 #[pyo3(name = "authenticate")]
99 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100 let client = self.clone();
101
102 pyo3_async_runtimes::tokio::future_into_py(py, async move {
103 client.authenticate().await.map_err(to_pyruntime_err)?;
104 Ok(())
105 })
106 }
107
108 #[pyo3(name = "cache_instruments")]
109 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
110 let mut instruments_any = Vec::new();
111 for inst in instruments {
112 let inst_any = pyobject_to_instrument_any(py, inst)?;
113 instruments_any.push(inst_any);
114 }
115 self.cache_instruments(instruments_any);
116 Ok(())
117 }
118
119 #[pyo3(name = "cache_instrument")]
120 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
121 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
122 Ok(())
123 }
124
125 #[pyo3(name = "connect")]
126 fn py_connect<'py>(
127 &mut self,
128 py: Python<'py>,
129 instruments: Vec<Py<PyAny>>,
130 callback: Py<PyAny>,
131 ) -> PyResult<Bound<'py, PyAny>> {
132 let mut instruments_any = Vec::new();
133 for inst in instruments {
134 let inst_any = pyobject_to_instrument_any(py, inst)?;
135 instruments_any.push(inst_any);
136 }
137
138 let mut client = self.clone();
139
140 pyo3_async_runtimes::tokio::future_into_py(py, async move {
141 client.connect().await.map_err(to_pyruntime_err)?;
142
143 client.cache_instruments(instruments_any);
145
146 if let Some(mut rx) = client.take_output_rx() {
148 get_runtime().spawn(async move {
149 while let Some(msg) = rx.recv().await {
150 Python::attach(|py| match msg {
151 KrakenFuturesWsMessage::MarkPrice(update) => {
152 let py_obj = data_to_pycapsule(py, Data::from(update));
153 if let Err(e) = callback.call1(py, (py_obj,)) {
154 log::error!("Error calling Python callback: {e}");
155 }
156 }
157 KrakenFuturesWsMessage::IndexPrice(update) => {
158 let py_obj = data_to_pycapsule(py, Data::from(update));
159 if let Err(e) = callback.call1(py, (py_obj,)) {
160 log::error!("Error calling Python callback: {e}");
161 }
162 }
163 KrakenFuturesWsMessage::FundingRate(update) => {
164 match update.into_py_any(py) {
165 Ok(py_obj) => {
166 if let Err(e) = callback.call1(py, (py_obj,)) {
167 log::error!("Error calling Python callback: {e}");
168 }
169 }
170 Err(e) => {
171 log::error!(
172 "Failed to convert FundingRateUpdate to Python: {e}"
173 );
174 }
175 }
176 }
177 KrakenFuturesWsMessage::Quote(quote) => {
178 let py_obj = data_to_pycapsule(py, Data::from(quote));
179 if let Err(e) = callback.call1(py, (py_obj,)) {
180 log::error!("Error calling Python callback: {e}");
181 }
182 }
183 KrakenFuturesWsMessage::Trade(trade) => {
184 let py_obj = data_to_pycapsule(py, Data::from(trade));
185 if let Err(e) = callback.call1(py, (py_obj,)) {
186 log::error!("Error calling Python callback: {e}");
187 }
188 }
189 KrakenFuturesWsMessage::BookDeltas(deltas) => {
190 let py_obj = data_to_pycapsule(
191 py,
192 Data::Deltas(OrderBookDeltas_API::new(deltas)),
193 );
194 if let Err(e) = callback.call1(py, (py_obj,)) {
195 log::error!("Error calling Python callback: {e}");
196 }
197 }
198 KrakenFuturesWsMessage::OrderAccepted(event) => {
199 match event.into_py_any(py) {
200 Ok(py_obj) => {
201 if let Err(e) = callback.call1(py, (py_obj,)) {
202 log::error!("Error calling Python callback: {e}");
203 }
204 }
205 Err(e) => {
206 log::error!(
207 "Failed to convert OrderAccepted to Python: {e}"
208 );
209 }
210 }
211 }
212 KrakenFuturesWsMessage::OrderCanceled(event) => {
213 match event.into_py_any(py) {
214 Ok(py_obj) => {
215 if let Err(e) = callback.call1(py, (py_obj,)) {
216 log::error!("Error calling Python callback: {e}");
217 }
218 }
219 Err(e) => {
220 log::error!(
221 "Failed to convert OrderCanceled to Python: {e}"
222 );
223 }
224 }
225 }
226 KrakenFuturesWsMessage::OrderExpired(event) => {
227 match event.into_py_any(py) {
228 Ok(py_obj) => {
229 if let Err(e) = callback.call1(py, (py_obj,)) {
230 log::error!("Error calling Python callback: {e}");
231 }
232 }
233 Err(e) => {
234 log::error!(
235 "Failed to convert OrderExpired to Python: {e}"
236 );
237 }
238 }
239 }
240 KrakenFuturesWsMessage::OrderUpdated(event) => {
241 match event.into_py_any(py) {
242 Ok(py_obj) => {
243 if let Err(e) = callback.call1(py, (py_obj,)) {
244 log::error!("Error calling Python callback: {e}");
245 }
246 }
247 Err(e) => {
248 log::error!(
249 "Failed to convert OrderUpdated to Python: {e}"
250 );
251 }
252 }
253 }
254 KrakenFuturesWsMessage::OrderStatusReport(report) => {
255 match (*report).into_py_any(py) {
256 Ok(py_obj) => {
257 if let Err(e) = callback.call1(py, (py_obj,)) {
258 log::error!("Error calling Python callback: {e}");
259 }
260 }
261 Err(e) => {
262 log::error!(
263 "Failed to convert OrderStatusReport to Python: {e}"
264 );
265 }
266 }
267 }
268 KrakenFuturesWsMessage::FillReport(report) => {
269 match (*report).into_py_any(py) {
270 Ok(py_obj) => {
271 if let Err(e) = callback.call1(py, (py_obj,)) {
272 log::error!("Error calling Python callback: {e}");
273 }
274 }
275 Err(e) => {
276 log::error!("Failed to convert FillReport to Python: {e}");
277 }
278 }
279 }
280 KrakenFuturesWsMessage::Reconnected => {
281 log::info!("WebSocket reconnected");
282 }
283 });
284 }
285 });
286 }
287
288 Ok(())
289 })
290 }
291
292 #[pyo3(name = "disconnect")]
293 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
294 let mut client = self.clone();
295
296 pyo3_async_runtimes::tokio::future_into_py(py, async move {
297 client.disconnect().await.map_err(to_pyruntime_err)?;
298 Ok(())
299 })
300 }
301
302 #[pyo3(name = "close")]
303 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
304 let mut client = self.clone();
305
306 pyo3_async_runtimes::tokio::future_into_py(py, async move {
307 client.close().await.map_err(to_pyruntime_err)?;
308 Ok(())
309 })
310 }
311
312 #[pyo3(name = "subscribe_book")]
313 #[pyo3(signature = (instrument_id, depth=None))]
314 fn py_subscribe_book<'py>(
315 &self,
316 py: Python<'py>,
317 instrument_id: InstrumentId,
318 depth: Option<u32>,
319 ) -> PyResult<Bound<'py, PyAny>> {
320 let client = self.clone();
321
322 pyo3_async_runtimes::tokio::future_into_py(py, async move {
323 client
324 .subscribe_book(instrument_id, depth)
325 .await
326 .map_err(to_pyruntime_err)?;
327 Ok(())
328 })
329 }
330
331 #[pyo3(name = "subscribe_quotes")]
332 fn py_subscribe_quotes<'py>(
333 &self,
334 py: Python<'py>,
335 instrument_id: InstrumentId,
336 ) -> PyResult<Bound<'py, PyAny>> {
337 let client = self.clone();
338
339 pyo3_async_runtimes::tokio::future_into_py(py, async move {
340 client
341 .subscribe_quotes(instrument_id)
342 .await
343 .map_err(to_pyruntime_err)?;
344 Ok(())
345 })
346 }
347
348 #[pyo3(name = "subscribe_trades")]
349 fn py_subscribe_trades<'py>(
350 &self,
351 py: Python<'py>,
352 instrument_id: InstrumentId,
353 ) -> PyResult<Bound<'py, PyAny>> {
354 let client = self.clone();
355
356 pyo3_async_runtimes::tokio::future_into_py(py, async move {
357 client
358 .subscribe_trades(instrument_id)
359 .await
360 .map_err(to_pyruntime_err)?;
361 Ok(())
362 })
363 }
364
365 #[pyo3(name = "subscribe_mark_price")]
366 fn py_subscribe_mark_price<'py>(
367 &self,
368 py: Python<'py>,
369 instrument_id: InstrumentId,
370 ) -> PyResult<Bound<'py, PyAny>> {
371 let client = self.clone();
372
373 pyo3_async_runtimes::tokio::future_into_py(py, async move {
374 client
375 .subscribe_mark_price(instrument_id)
376 .await
377 .map_err(to_pyruntime_err)?;
378 Ok(())
379 })
380 }
381
382 #[pyo3(name = "subscribe_index_price")]
383 fn py_subscribe_index_price<'py>(
384 &self,
385 py: Python<'py>,
386 instrument_id: InstrumentId,
387 ) -> PyResult<Bound<'py, PyAny>> {
388 let client = self.clone();
389
390 pyo3_async_runtimes::tokio::future_into_py(py, async move {
391 client
392 .subscribe_index_price(instrument_id)
393 .await
394 .map_err(to_pyruntime_err)?;
395 Ok(())
396 })
397 }
398
399 #[pyo3(name = "subscribe_funding_rate")]
400 fn py_subscribe_funding_rate<'py>(
401 &self,
402 py: Python<'py>,
403 instrument_id: InstrumentId,
404 ) -> PyResult<Bound<'py, PyAny>> {
405 let client = self.clone();
406
407 pyo3_async_runtimes::tokio::future_into_py(py, async move {
408 client
409 .subscribe_funding_rate(instrument_id)
410 .await
411 .map_err(to_pyruntime_err)?;
412 Ok(())
413 })
414 }
415
416 #[pyo3(name = "unsubscribe_book")]
417 fn py_unsubscribe_book<'py>(
418 &self,
419 py: Python<'py>,
420 instrument_id: InstrumentId,
421 ) -> PyResult<Bound<'py, PyAny>> {
422 let client = self.clone();
423
424 pyo3_async_runtimes::tokio::future_into_py(py, async move {
425 client
426 .unsubscribe_book(instrument_id)
427 .await
428 .map_err(to_pyruntime_err)?;
429 Ok(())
430 })
431 }
432
433 #[pyo3(name = "unsubscribe_quotes")]
434 fn py_unsubscribe_quotes<'py>(
435 &self,
436 py: Python<'py>,
437 instrument_id: InstrumentId,
438 ) -> PyResult<Bound<'py, PyAny>> {
439 let client = self.clone();
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 client
443 .unsubscribe_quotes(instrument_id)
444 .await
445 .map_err(to_pyruntime_err)?;
446 Ok(())
447 })
448 }
449
450 #[pyo3(name = "unsubscribe_trades")]
451 fn py_unsubscribe_trades<'py>(
452 &self,
453 py: Python<'py>,
454 instrument_id: InstrumentId,
455 ) -> PyResult<Bound<'py, PyAny>> {
456 let client = self.clone();
457
458 pyo3_async_runtimes::tokio::future_into_py(py, async move {
459 client
460 .unsubscribe_trades(instrument_id)
461 .await
462 .map_err(to_pyruntime_err)?;
463 Ok(())
464 })
465 }
466
467 #[pyo3(name = "unsubscribe_mark_price")]
468 fn py_unsubscribe_mark_price<'py>(
469 &self,
470 py: Python<'py>,
471 instrument_id: InstrumentId,
472 ) -> PyResult<Bound<'py, PyAny>> {
473 let client = self.clone();
474
475 pyo3_async_runtimes::tokio::future_into_py(py, async move {
476 client
477 .unsubscribe_mark_price(instrument_id)
478 .await
479 .map_err(to_pyruntime_err)?;
480 Ok(())
481 })
482 }
483
484 #[pyo3(name = "unsubscribe_index_price")]
485 fn py_unsubscribe_index_price<'py>(
486 &self,
487 py: Python<'py>,
488 instrument_id: InstrumentId,
489 ) -> PyResult<Bound<'py, PyAny>> {
490 let client = self.clone();
491
492 pyo3_async_runtimes::tokio::future_into_py(py, async move {
493 client
494 .unsubscribe_index_price(instrument_id)
495 .await
496 .map_err(to_pyruntime_err)?;
497 Ok(())
498 })
499 }
500
501 #[pyo3(name = "unsubscribe_funding_rate")]
502 fn py_unsubscribe_funding_rate<'py>(
503 &self,
504 py: Python<'py>,
505 instrument_id: InstrumentId,
506 ) -> PyResult<Bound<'py, PyAny>> {
507 let client = self.clone();
508
509 pyo3_async_runtimes::tokio::future_into_py(py, async move {
510 client
511 .unsubscribe_funding_rate(instrument_id)
512 .await
513 .map_err(to_pyruntime_err)?;
514 Ok(())
515 })
516 }
517
518 #[pyo3(name = "set_account_id")]
519 fn py_set_account_id(&self, account_id: AccountId) {
520 self.set_account_id(account_id);
521 }
522
523 #[pyo3(name = "cache_client_order")]
524 fn py_cache_client_order(
525 &self,
526 client_order_id: ClientOrderId,
527 venue_order_id: Option<VenueOrderId>,
528 instrument_id: InstrumentId,
529 trader_id: TraderId,
530 strategy_id: StrategyId,
531 ) {
532 self.cache_client_order(
533 client_order_id,
534 venue_order_id,
535 instrument_id,
536 trader_id,
537 strategy_id,
538 );
539 }
540
541 #[pyo3(name = "sign_challenge")]
542 fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
543 self.sign_challenge(challenge).map_err(to_pyruntime_err)
544 }
545
546 #[pyo3(name = "authenticate_with_challenge")]
547 fn py_authenticate_with_challenge<'py>(
548 &self,
549 py: Python<'py>,
550 challenge: String,
551 ) -> PyResult<Bound<'py, PyAny>> {
552 let client = self.clone();
553
554 pyo3_async_runtimes::tokio::future_into_py(py, async move {
555 client
556 .authenticate_with_challenge(&challenge)
557 .await
558 .map_err(to_pyruntime_err)?;
559 Ok(())
560 })
561 }
562
563 #[pyo3(name = "set_auth_credentials")]
564 fn py_set_auth_credentials<'py>(
565 &self,
566 py: Python<'py>,
567 original_challenge: String,
568 signed_challenge: String,
569 ) -> PyResult<Bound<'py, PyAny>> {
570 let client = self.clone();
571
572 pyo3_async_runtimes::tokio::future_into_py(py, async move {
573 client
574 .set_auth_credentials(original_challenge, signed_challenge)
575 .await
576 .map_err(to_pyruntime_err)?;
577 Ok(())
578 })
579 }
580
581 #[pyo3(name = "subscribe_open_orders")]
582 fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
583 let client = self.clone();
584
585 pyo3_async_runtimes::tokio::future_into_py(py, async move {
586 client
587 .subscribe_open_orders()
588 .await
589 .map_err(to_pyruntime_err)?;
590 Ok(())
591 })
592 }
593
594 #[pyo3(name = "subscribe_fills")]
595 fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
596 let client = self.clone();
597
598 pyo3_async_runtimes::tokio::future_into_py(py, async move {
599 client.subscribe_fills().await.map_err(to_pyruntime_err)?;
600 Ok(())
601 })
602 }
603
604 #[pyo3(name = "subscribe_executions")]
605 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
606 let client = self.clone();
607
608 pyo3_async_runtimes::tokio::future_into_py(py, async move {
609 client
610 .subscribe_executions()
611 .await
612 .map_err(to_pyruntime_err)?;
613 Ok(())
614 })
615 }
616}