1use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{IntoPyObjectExt, prelude::*};
26
27use crate::{
28 common::{
29 credential::KrakenCredential,
30 enums::{KrakenEnvironment, KrakenProductType},
31 urls::get_kraken_ws_public_url,
32 },
33 websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
34};
35
36#[pymethods]
37impl KrakenFuturesWebSocketClient {
38 #[new]
39 #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
40 fn py_new(
41 environment: Option<KrakenEnvironment>,
42 base_url: Option<String>,
43 heartbeat_secs: Option<u64>,
44 api_key: Option<String>,
45 api_secret: Option<String>,
46 ) -> PyResult<Self> {
47 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
48 let demo = env == KrakenEnvironment::Demo;
49 let url = base_url.unwrap_or_else(|| {
50 get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
51 });
52 let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
53
54 Ok(Self::with_credentials(url, heartbeat_secs, credential))
55 }
56
57 #[getter]
58 #[pyo3(name = "has_credentials")]
59 #[must_use]
60 pub fn py_has_credentials(&self) -> bool {
61 self.has_credentials()
62 }
63
64 #[getter]
65 #[pyo3(name = "url")]
66 #[must_use]
67 pub fn py_url(&self) -> &str {
68 self.url()
69 }
70
71 #[pyo3(name = "is_closed")]
72 fn py_is_closed(&self) -> bool {
73 self.is_closed()
74 }
75
76 #[pyo3(name = "is_active")]
77 fn py_is_active(&self) -> bool {
78 self.is_active()
79 }
80
81 #[pyo3(name = "wait_until_active")]
82 fn py_wait_until_active<'py>(
83 &self,
84 py: Python<'py>,
85 timeout_secs: f64,
86 ) -> PyResult<Bound<'py, PyAny>> {
87 let client = self.clone();
88
89 pyo3_async_runtimes::tokio::future_into_py(py, async move {
90 client
91 .wait_until_active(timeout_secs)
92 .await
93 .map_err(to_pyruntime_err)?;
94 Ok(())
95 })
96 }
97
98 #[pyo3(name = "authenticate")]
99 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100 let client = self.clone();
101
102 pyo3_async_runtimes::tokio::future_into_py(py, async move {
103 client.authenticate().await.map_err(to_pyruntime_err)?;
104 Ok(())
105 })
106 }
107
108 #[pyo3(name = "cache_instruments")]
109 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
110 let mut instruments_any = Vec::new();
111 for inst in instruments {
112 let inst_any = pyobject_to_instrument_any(py, inst)?;
113 instruments_any.push(inst_any);
114 }
115 self.cache_instruments(instruments_any);
116 Ok(())
117 }
118
119 #[pyo3(name = "cache_instrument")]
120 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
121 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
122 Ok(())
123 }
124
125 #[pyo3(name = "connect")]
126 fn py_connect<'py>(
127 &mut self,
128 py: Python<'py>,
129 instruments: Vec<Py<PyAny>>,
130 callback: Py<PyAny>,
131 ) -> PyResult<Bound<'py, PyAny>> {
132 let mut instruments_any = Vec::new();
133 for inst in instruments {
134 let inst_any = pyobject_to_instrument_any(py, inst)?;
135 instruments_any.push(inst_any);
136 }
137
138 let mut client = self.clone();
139
140 pyo3_async_runtimes::tokio::future_into_py(py, async move {
141 client.connect().await.map_err(to_pyruntime_err)?;
142
143 client.cache_instruments(instruments_any);
145
146 if let Some(mut rx) = client.take_output_rx() {
148 get_runtime().spawn(async move {
149 while let Some(msg) = rx.recv().await {
150 Python::attach(|py| match msg {
151 KrakenFuturesWsMessage::MarkPrice(update) => {
152 let py_obj = data_to_pycapsule(py, Data::from(update));
153 if let Err(e) = callback.call1(py, (py_obj,)) {
154 log::error!("Error calling Python callback: {e}");
155 }
156 }
157 KrakenFuturesWsMessage::IndexPrice(update) => {
158 let py_obj = data_to_pycapsule(py, Data::from(update));
159 if let Err(e) = callback.call1(py, (py_obj,)) {
160 log::error!("Error calling Python callback: {e}");
161 }
162 }
163 KrakenFuturesWsMessage::Quote(quote) => {
164 let py_obj = data_to_pycapsule(py, Data::from(quote));
165 if let Err(e) = callback.call1(py, (py_obj,)) {
166 log::error!("Error calling Python callback: {e}");
167 }
168 }
169 KrakenFuturesWsMessage::Trade(trade) => {
170 let py_obj = data_to_pycapsule(py, Data::from(trade));
171 if let Err(e) = callback.call1(py, (py_obj,)) {
172 log::error!("Error calling Python callback: {e}");
173 }
174 }
175 KrakenFuturesWsMessage::BookDeltas(deltas) => {
176 let py_obj = data_to_pycapsule(
177 py,
178 Data::Deltas(OrderBookDeltas_API::new(deltas)),
179 );
180 if let Err(e) = callback.call1(py, (py_obj,)) {
181 log::error!("Error calling Python callback: {e}");
182 }
183 }
184 KrakenFuturesWsMessage::OrderAccepted(event) => {
185 match event.into_py_any(py) {
186 Ok(py_obj) => {
187 if let Err(e) = callback.call1(py, (py_obj,)) {
188 log::error!("Error calling Python callback: {e}");
189 }
190 }
191 Err(e) => {
192 log::error!(
193 "Failed to convert OrderAccepted to Python: {e}"
194 );
195 }
196 }
197 }
198 KrakenFuturesWsMessage::OrderCanceled(event) => {
199 match event.into_py_any(py) {
200 Ok(py_obj) => {
201 if let Err(e) = callback.call1(py, (py_obj,)) {
202 log::error!("Error calling Python callback: {e}");
203 }
204 }
205 Err(e) => {
206 log::error!(
207 "Failed to convert OrderCanceled to Python: {e}"
208 );
209 }
210 }
211 }
212 KrakenFuturesWsMessage::OrderExpired(event) => {
213 match event.into_py_any(py) {
214 Ok(py_obj) => {
215 if let Err(e) = callback.call1(py, (py_obj,)) {
216 log::error!("Error calling Python callback: {e}");
217 }
218 }
219 Err(e) => {
220 log::error!(
221 "Failed to convert OrderExpired to Python: {e}"
222 );
223 }
224 }
225 }
226 KrakenFuturesWsMessage::OrderUpdated(event) => {
227 match event.into_py_any(py) {
228 Ok(py_obj) => {
229 if let Err(e) = callback.call1(py, (py_obj,)) {
230 log::error!("Error calling Python callback: {e}");
231 }
232 }
233 Err(e) => {
234 log::error!(
235 "Failed to convert OrderUpdated to Python: {e}"
236 );
237 }
238 }
239 }
240 KrakenFuturesWsMessage::OrderStatusReport(report) => {
241 match (*report).into_py_any(py) {
242 Ok(py_obj) => {
243 if let Err(e) = callback.call1(py, (py_obj,)) {
244 log::error!("Error calling Python callback: {e}");
245 }
246 }
247 Err(e) => {
248 log::error!(
249 "Failed to convert OrderStatusReport to Python: {e}"
250 );
251 }
252 }
253 }
254 KrakenFuturesWsMessage::FillReport(report) => {
255 match (*report).into_py_any(py) {
256 Ok(py_obj) => {
257 if let Err(e) = callback.call1(py, (py_obj,)) {
258 log::error!("Error calling Python callback: {e}");
259 }
260 }
261 Err(e) => {
262 log::error!("Failed to convert FillReport to Python: {e}");
263 }
264 }
265 }
266 KrakenFuturesWsMessage::Reconnected => {
267 log::info!("WebSocket reconnected");
268 }
269 });
270 }
271 });
272 }
273
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "disconnect")]
279 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
280 let mut client = self.clone();
281
282 pyo3_async_runtimes::tokio::future_into_py(py, async move {
283 client.disconnect().await.map_err(to_pyruntime_err)?;
284 Ok(())
285 })
286 }
287
288 #[pyo3(name = "close")]
289 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
290 let mut client = self.clone();
291
292 pyo3_async_runtimes::tokio::future_into_py(py, async move {
293 client.close().await.map_err(to_pyruntime_err)?;
294 Ok(())
295 })
296 }
297
298 #[pyo3(name = "subscribe_book")]
299 #[pyo3(signature = (instrument_id, depth=None))]
300 fn py_subscribe_book<'py>(
301 &self,
302 py: Python<'py>,
303 instrument_id: InstrumentId,
304 depth: Option<u32>,
305 ) -> PyResult<Bound<'py, PyAny>> {
306 let client = self.clone();
307
308 pyo3_async_runtimes::tokio::future_into_py(py, async move {
309 client
310 .subscribe_book(instrument_id, depth)
311 .await
312 .map_err(to_pyruntime_err)?;
313 Ok(())
314 })
315 }
316
317 #[pyo3(name = "subscribe_quotes")]
318 fn py_subscribe_quotes<'py>(
319 &self,
320 py: Python<'py>,
321 instrument_id: InstrumentId,
322 ) -> PyResult<Bound<'py, PyAny>> {
323 let client = self.clone();
324
325 pyo3_async_runtimes::tokio::future_into_py(py, async move {
326 client
327 .subscribe_quotes(instrument_id)
328 .await
329 .map_err(to_pyruntime_err)?;
330 Ok(())
331 })
332 }
333
334 #[pyo3(name = "subscribe_trades")]
335 fn py_subscribe_trades<'py>(
336 &self,
337 py: Python<'py>,
338 instrument_id: InstrumentId,
339 ) -> PyResult<Bound<'py, PyAny>> {
340 let client = self.clone();
341
342 pyo3_async_runtimes::tokio::future_into_py(py, async move {
343 client
344 .subscribe_trades(instrument_id)
345 .await
346 .map_err(to_pyruntime_err)?;
347 Ok(())
348 })
349 }
350
351 #[pyo3(name = "subscribe_mark_price")]
352 fn py_subscribe_mark_price<'py>(
353 &self,
354 py: Python<'py>,
355 instrument_id: InstrumentId,
356 ) -> PyResult<Bound<'py, PyAny>> {
357 let client = self.clone();
358
359 pyo3_async_runtimes::tokio::future_into_py(py, async move {
360 client
361 .subscribe_mark_price(instrument_id)
362 .await
363 .map_err(to_pyruntime_err)?;
364 Ok(())
365 })
366 }
367
368 #[pyo3(name = "subscribe_index_price")]
369 fn py_subscribe_index_price<'py>(
370 &self,
371 py: Python<'py>,
372 instrument_id: InstrumentId,
373 ) -> PyResult<Bound<'py, PyAny>> {
374 let client = self.clone();
375
376 pyo3_async_runtimes::tokio::future_into_py(py, async move {
377 client
378 .subscribe_index_price(instrument_id)
379 .await
380 .map_err(to_pyruntime_err)?;
381 Ok(())
382 })
383 }
384
385 #[pyo3(name = "unsubscribe_book")]
386 fn py_unsubscribe_book<'py>(
387 &self,
388 py: Python<'py>,
389 instrument_id: InstrumentId,
390 ) -> PyResult<Bound<'py, PyAny>> {
391 let client = self.clone();
392
393 pyo3_async_runtimes::tokio::future_into_py(py, async move {
394 client
395 .unsubscribe_book(instrument_id)
396 .await
397 .map_err(to_pyruntime_err)?;
398 Ok(())
399 })
400 }
401
402 #[pyo3(name = "unsubscribe_quotes")]
403 fn py_unsubscribe_quotes<'py>(
404 &self,
405 py: Python<'py>,
406 instrument_id: InstrumentId,
407 ) -> PyResult<Bound<'py, PyAny>> {
408 let client = self.clone();
409
410 pyo3_async_runtimes::tokio::future_into_py(py, async move {
411 client
412 .unsubscribe_quotes(instrument_id)
413 .await
414 .map_err(to_pyruntime_err)?;
415 Ok(())
416 })
417 }
418
419 #[pyo3(name = "unsubscribe_trades")]
420 fn py_unsubscribe_trades<'py>(
421 &self,
422 py: Python<'py>,
423 instrument_id: InstrumentId,
424 ) -> PyResult<Bound<'py, PyAny>> {
425 let client = self.clone();
426
427 pyo3_async_runtimes::tokio::future_into_py(py, async move {
428 client
429 .unsubscribe_trades(instrument_id)
430 .await
431 .map_err(to_pyruntime_err)?;
432 Ok(())
433 })
434 }
435
436 #[pyo3(name = "unsubscribe_mark_price")]
437 fn py_unsubscribe_mark_price<'py>(
438 &self,
439 py: Python<'py>,
440 instrument_id: InstrumentId,
441 ) -> PyResult<Bound<'py, PyAny>> {
442 let client = self.clone();
443
444 pyo3_async_runtimes::tokio::future_into_py(py, async move {
445 client
446 .unsubscribe_mark_price(instrument_id)
447 .await
448 .map_err(to_pyruntime_err)?;
449 Ok(())
450 })
451 }
452
453 #[pyo3(name = "unsubscribe_index_price")]
454 fn py_unsubscribe_index_price<'py>(
455 &self,
456 py: Python<'py>,
457 instrument_id: InstrumentId,
458 ) -> PyResult<Bound<'py, PyAny>> {
459 let client = self.clone();
460
461 pyo3_async_runtimes::tokio::future_into_py(py, async move {
462 client
463 .unsubscribe_index_price(instrument_id)
464 .await
465 .map_err(to_pyruntime_err)?;
466 Ok(())
467 })
468 }
469
470 #[pyo3(name = "set_account_id")]
471 fn py_set_account_id(&self, account_id: AccountId) {
472 self.set_account_id(account_id);
473 }
474
475 #[pyo3(name = "cache_client_order")]
476 fn py_cache_client_order(
477 &self,
478 client_order_id: ClientOrderId,
479 venue_order_id: Option<VenueOrderId>,
480 instrument_id: InstrumentId,
481 trader_id: TraderId,
482 strategy_id: StrategyId,
483 ) {
484 self.cache_client_order(
485 client_order_id,
486 venue_order_id,
487 instrument_id,
488 trader_id,
489 strategy_id,
490 );
491 }
492
493 #[pyo3(name = "sign_challenge")]
494 fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
495 self.sign_challenge(challenge).map_err(to_pyruntime_err)
496 }
497
498 #[pyo3(name = "authenticate_with_challenge")]
499 fn py_authenticate_with_challenge<'py>(
500 &self,
501 py: Python<'py>,
502 challenge: String,
503 ) -> PyResult<Bound<'py, PyAny>> {
504 let client = self.clone();
505
506 pyo3_async_runtimes::tokio::future_into_py(py, async move {
507 client
508 .authenticate_with_challenge(&challenge)
509 .await
510 .map_err(to_pyruntime_err)?;
511 Ok(())
512 })
513 }
514
515 #[pyo3(name = "set_auth_credentials")]
516 fn py_set_auth_credentials<'py>(
517 &self,
518 py: Python<'py>,
519 original_challenge: String,
520 signed_challenge: String,
521 ) -> PyResult<Bound<'py, PyAny>> {
522 let client = self.clone();
523
524 pyo3_async_runtimes::tokio::future_into_py(py, async move {
525 client
526 .set_auth_credentials(original_challenge, signed_challenge)
527 .await
528 .map_err(to_pyruntime_err)?;
529 Ok(())
530 })
531 }
532
533 #[pyo3(name = "subscribe_open_orders")]
534 fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
535 let client = self.clone();
536
537 pyo3_async_runtimes::tokio::future_into_py(py, async move {
538 client
539 .subscribe_open_orders()
540 .await
541 .map_err(to_pyruntime_err)?;
542 Ok(())
543 })
544 }
545
546 #[pyo3(name = "subscribe_fills")]
547 fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
548 let client = self.clone();
549
550 pyo3_async_runtimes::tokio::future_into_py(py, async move {
551 client.subscribe_fills().await.map_err(to_pyruntime_err)?;
552 Ok(())
553 })
554 }
555
556 #[pyo3(name = "subscribe_executions")]
557 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
558 let client = self.clone();
559
560 pyo3_async_runtimes::tokio::future_into_py(py, async move {
561 client
562 .subscribe_executions()
563 .await
564 .map_err(to_pyruntime_err)?;
565 Ok(())
566 })
567 }
568}