1use nautilus_core::python::to_pyruntime_err;
19use nautilus_model::{
20 data::{Data, OrderBookDeltas_API},
21 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
22 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
23};
24use pyo3::{IntoPyObjectExt, prelude::*};
25
26use crate::{
27 common::{
28 credential::KrakenCredential,
29 enums::{KrakenEnvironment, KrakenProductType},
30 urls::get_kraken_ws_public_url,
31 },
32 websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
33};
34
35#[pymethods]
36impl KrakenFuturesWebSocketClient {
37 #[new]
38 #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
39 fn py_new(
40 environment: Option<KrakenEnvironment>,
41 base_url: Option<String>,
42 heartbeat_secs: Option<u64>,
43 api_key: Option<String>,
44 api_secret: Option<String>,
45 ) -> PyResult<Self> {
46 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
47 let demo = env == KrakenEnvironment::Demo;
48 let url = base_url.unwrap_or_else(|| {
49 get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
50 });
51 let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
52
53 Ok(KrakenFuturesWebSocketClient::with_credentials(
54 url,
55 heartbeat_secs,
56 credential,
57 ))
58 }
59
60 #[getter]
61 #[pyo3(name = "has_credentials")]
62 #[must_use]
63 pub fn py_has_credentials(&self) -> bool {
64 self.has_credentials()
65 }
66
67 #[getter]
68 #[pyo3(name = "url")]
69 #[must_use]
70 pub fn py_url(&self) -> &str {
71 self.url()
72 }
73
74 #[pyo3(name = "is_closed")]
75 fn py_is_closed(&self) -> bool {
76 self.is_closed()
77 }
78
79 #[pyo3(name = "is_active")]
80 fn py_is_active(&self) -> bool {
81 self.is_active()
82 }
83
84 #[pyo3(name = "wait_until_active")]
85 fn py_wait_until_active<'py>(
86 &self,
87 py: Python<'py>,
88 timeout_secs: f64,
89 ) -> PyResult<Bound<'py, PyAny>> {
90 let client = self.clone();
91
92 pyo3_async_runtimes::tokio::future_into_py(py, async move {
93 client
94 .wait_until_active(timeout_secs)
95 .await
96 .map_err(to_pyruntime_err)?;
97 Ok(())
98 })
99 }
100
101 #[pyo3(name = "authenticate")]
102 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
103 let client = self.clone();
104
105 pyo3_async_runtimes::tokio::future_into_py(py, async move {
106 client.authenticate().await.map_err(to_pyruntime_err)?;
107 Ok(())
108 })
109 }
110
111 #[pyo3(name = "cache_instruments")]
112 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
113 let mut instruments_any = Vec::new();
114 for inst in instruments {
115 let inst_any = pyobject_to_instrument_any(py, inst)?;
116 instruments_any.push(inst_any);
117 }
118 self.cache_instruments(instruments_any);
119 Ok(())
120 }
121
122 #[pyo3(name = "cache_instrument")]
123 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
124 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
125 Ok(())
126 }
127
128 #[pyo3(name = "connect")]
129 fn py_connect<'py>(
130 &mut self,
131 py: Python<'py>,
132 instruments: Vec<Py<PyAny>>,
133 callback: Py<PyAny>,
134 ) -> PyResult<Bound<'py, PyAny>> {
135 let mut instruments_any = Vec::new();
136 for inst in instruments {
137 let inst_any = pyobject_to_instrument_any(py, inst)?;
138 instruments_any.push(inst_any);
139 }
140
141 let mut client = self.clone();
142
143 pyo3_async_runtimes::tokio::future_into_py(py, async move {
144 client.connect().await.map_err(to_pyruntime_err)?;
145
146 client.cache_instruments(instruments_any);
148
149 if let Some(mut rx) = client.take_output_rx() {
151 tokio::spawn(async move {
152 while let Some(msg) = rx.recv().await {
153 Python::attach(|py| match msg {
154 KrakenFuturesWsMessage::MarkPrice(update) => {
155 let py_obj = data_to_pycapsule(py, Data::from(update));
156 if let Err(e) = callback.call1(py, (py_obj,)) {
157 tracing::error!("Error calling Python callback: {e}");
158 }
159 }
160 KrakenFuturesWsMessage::IndexPrice(update) => {
161 let py_obj = data_to_pycapsule(py, Data::from(update));
162 if let Err(e) = callback.call1(py, (py_obj,)) {
163 tracing::error!("Error calling Python callback: {e}");
164 }
165 }
166 KrakenFuturesWsMessage::Quote(quote) => {
167 let py_obj = data_to_pycapsule(py, Data::from(quote));
168 if let Err(e) = callback.call1(py, (py_obj,)) {
169 tracing::error!("Error calling Python callback: {e}");
170 }
171 }
172 KrakenFuturesWsMessage::Trade(trade) => {
173 let py_obj = data_to_pycapsule(py, Data::from(trade));
174 if let Err(e) = callback.call1(py, (py_obj,)) {
175 tracing::error!("Error calling Python callback: {e}");
176 }
177 }
178 KrakenFuturesWsMessage::BookDeltas(deltas) => {
179 let py_obj = data_to_pycapsule(
180 py,
181 Data::Deltas(OrderBookDeltas_API::new(deltas)),
182 );
183 if let Err(e) = callback.call1(py, (py_obj,)) {
184 tracing::error!("Error calling Python callback: {e}");
185 }
186 }
187 KrakenFuturesWsMessage::OrderAccepted(event) => {
188 match event.into_py_any(py) {
189 Ok(py_obj) => {
190 if let Err(e) = callback.call1(py, (py_obj,)) {
191 tracing::error!("Error calling Python callback: {e}");
192 }
193 }
194 Err(e) => {
195 tracing::error!(
196 "Failed to convert OrderAccepted to Python: {e}"
197 );
198 }
199 }
200 }
201 KrakenFuturesWsMessage::OrderCanceled(event) => {
202 match event.into_py_any(py) {
203 Ok(py_obj) => {
204 if let Err(e) = callback.call1(py, (py_obj,)) {
205 tracing::error!("Error calling Python callback: {e}");
206 }
207 }
208 Err(e) => {
209 tracing::error!(
210 "Failed to convert OrderCanceled to Python: {e}"
211 );
212 }
213 }
214 }
215 KrakenFuturesWsMessage::OrderExpired(event) => {
216 match event.into_py_any(py) {
217 Ok(py_obj) => {
218 if let Err(e) = callback.call1(py, (py_obj,)) {
219 tracing::error!("Error calling Python callback: {e}");
220 }
221 }
222 Err(e) => {
223 tracing::error!(
224 "Failed to convert OrderExpired to Python: {e}"
225 );
226 }
227 }
228 }
229 KrakenFuturesWsMessage::OrderUpdated(event) => {
230 match event.into_py_any(py) {
231 Ok(py_obj) => {
232 if let Err(e) = callback.call1(py, (py_obj,)) {
233 tracing::error!("Error calling Python callback: {e}");
234 }
235 }
236 Err(e) => {
237 tracing::error!(
238 "Failed to convert OrderUpdated to Python: {e}"
239 );
240 }
241 }
242 }
243 KrakenFuturesWsMessage::OrderStatusReport(report) => {
244 match (*report).into_py_any(py) {
245 Ok(py_obj) => {
246 if let Err(e) = callback.call1(py, (py_obj,)) {
247 tracing::error!("Error calling Python callback: {e}");
248 }
249 }
250 Err(e) => {
251 tracing::error!(
252 "Failed to convert OrderStatusReport to Python: {e}"
253 );
254 }
255 }
256 }
257 KrakenFuturesWsMessage::FillReport(report) => {
258 match (*report).into_py_any(py) {
259 Ok(py_obj) => {
260 if let Err(e) = callback.call1(py, (py_obj,)) {
261 tracing::error!("Error calling Python callback: {e}");
262 }
263 }
264 Err(e) => {
265 tracing::error!(
266 "Failed to convert FillReport to Python: {e}"
267 );
268 }
269 }
270 }
271 KrakenFuturesWsMessage::Reconnected => {
272 tracing::info!("WebSocket reconnected");
273 }
274 });
275 }
276 });
277 }
278
279 Ok(())
280 })
281 }
282
283 #[pyo3(name = "disconnect")]
284 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
285 let mut client = self.clone();
286
287 pyo3_async_runtimes::tokio::future_into_py(py, async move {
288 client.disconnect().await.map_err(to_pyruntime_err)?;
289 Ok(())
290 })
291 }
292
293 #[pyo3(name = "close")]
294 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
295 let mut client = self.clone();
296
297 pyo3_async_runtimes::tokio::future_into_py(py, async move {
298 client.close().await.map_err(to_pyruntime_err)?;
299 Ok(())
300 })
301 }
302
303 #[pyo3(name = "subscribe_book")]
304 #[pyo3(signature = (instrument_id, depth=None))]
305 fn py_subscribe_book<'py>(
306 &self,
307 py: Python<'py>,
308 instrument_id: InstrumentId,
309 depth: Option<u32>,
310 ) -> PyResult<Bound<'py, PyAny>> {
311 let client = self.clone();
312
313 pyo3_async_runtimes::tokio::future_into_py(py, async move {
314 client
315 .subscribe_book(instrument_id, depth)
316 .await
317 .map_err(to_pyruntime_err)?;
318 Ok(())
319 })
320 }
321
322 #[pyo3(name = "subscribe_quotes")]
323 fn py_subscribe_quotes<'py>(
324 &self,
325 py: Python<'py>,
326 instrument_id: InstrumentId,
327 ) -> PyResult<Bound<'py, PyAny>> {
328 let client = self.clone();
329
330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
331 client
332 .subscribe_quotes(instrument_id)
333 .await
334 .map_err(to_pyruntime_err)?;
335 Ok(())
336 })
337 }
338
339 #[pyo3(name = "subscribe_trades")]
340 fn py_subscribe_trades<'py>(
341 &self,
342 py: Python<'py>,
343 instrument_id: InstrumentId,
344 ) -> PyResult<Bound<'py, PyAny>> {
345 let client = self.clone();
346
347 pyo3_async_runtimes::tokio::future_into_py(py, async move {
348 client
349 .subscribe_trades(instrument_id)
350 .await
351 .map_err(to_pyruntime_err)?;
352 Ok(())
353 })
354 }
355
356 #[pyo3(name = "subscribe_mark_price")]
357 fn py_subscribe_mark_price<'py>(
358 &self,
359 py: Python<'py>,
360 instrument_id: InstrumentId,
361 ) -> PyResult<Bound<'py, PyAny>> {
362 let client = self.clone();
363
364 pyo3_async_runtimes::tokio::future_into_py(py, async move {
365 client
366 .subscribe_mark_price(instrument_id)
367 .await
368 .map_err(to_pyruntime_err)?;
369 Ok(())
370 })
371 }
372
373 #[pyo3(name = "subscribe_index_price")]
374 fn py_subscribe_index_price<'py>(
375 &self,
376 py: Python<'py>,
377 instrument_id: InstrumentId,
378 ) -> PyResult<Bound<'py, PyAny>> {
379 let client = self.clone();
380
381 pyo3_async_runtimes::tokio::future_into_py(py, async move {
382 client
383 .subscribe_index_price(instrument_id)
384 .await
385 .map_err(to_pyruntime_err)?;
386 Ok(())
387 })
388 }
389
390 #[pyo3(name = "unsubscribe_book")]
391 fn py_unsubscribe_book<'py>(
392 &self,
393 py: Python<'py>,
394 instrument_id: InstrumentId,
395 ) -> PyResult<Bound<'py, PyAny>> {
396 let client = self.clone();
397
398 pyo3_async_runtimes::tokio::future_into_py(py, async move {
399 client
400 .unsubscribe_book(instrument_id)
401 .await
402 .map_err(to_pyruntime_err)?;
403 Ok(())
404 })
405 }
406
407 #[pyo3(name = "unsubscribe_quotes")]
408 fn py_unsubscribe_quotes<'py>(
409 &self,
410 py: Python<'py>,
411 instrument_id: InstrumentId,
412 ) -> PyResult<Bound<'py, PyAny>> {
413 let client = self.clone();
414
415 pyo3_async_runtimes::tokio::future_into_py(py, async move {
416 client
417 .unsubscribe_quotes(instrument_id)
418 .await
419 .map_err(to_pyruntime_err)?;
420 Ok(())
421 })
422 }
423
424 #[pyo3(name = "unsubscribe_trades")]
425 fn py_unsubscribe_trades<'py>(
426 &self,
427 py: Python<'py>,
428 instrument_id: InstrumentId,
429 ) -> PyResult<Bound<'py, PyAny>> {
430 let client = self.clone();
431
432 pyo3_async_runtimes::tokio::future_into_py(py, async move {
433 client
434 .unsubscribe_trades(instrument_id)
435 .await
436 .map_err(to_pyruntime_err)?;
437 Ok(())
438 })
439 }
440
441 #[pyo3(name = "unsubscribe_mark_price")]
442 fn py_unsubscribe_mark_price<'py>(
443 &self,
444 py: Python<'py>,
445 instrument_id: InstrumentId,
446 ) -> PyResult<Bound<'py, PyAny>> {
447 let client = self.clone();
448
449 pyo3_async_runtimes::tokio::future_into_py(py, async move {
450 client
451 .unsubscribe_mark_price(instrument_id)
452 .await
453 .map_err(to_pyruntime_err)?;
454 Ok(())
455 })
456 }
457
458 #[pyo3(name = "unsubscribe_index_price")]
459 fn py_unsubscribe_index_price<'py>(
460 &self,
461 py: Python<'py>,
462 instrument_id: InstrumentId,
463 ) -> PyResult<Bound<'py, PyAny>> {
464 let client = self.clone();
465
466 pyo3_async_runtimes::tokio::future_into_py(py, async move {
467 client
468 .unsubscribe_index_price(instrument_id)
469 .await
470 .map_err(to_pyruntime_err)?;
471 Ok(())
472 })
473 }
474
475 #[pyo3(name = "set_account_id")]
476 fn py_set_account_id(&self, account_id: AccountId) {
477 self.set_account_id(account_id);
478 }
479
480 #[pyo3(name = "cache_client_order")]
481 fn py_cache_client_order(
482 &self,
483 client_order_id: ClientOrderId,
484 venue_order_id: Option<VenueOrderId>,
485 instrument_id: InstrumentId,
486 trader_id: TraderId,
487 strategy_id: StrategyId,
488 ) {
489 self.cache_client_order(
490 client_order_id,
491 venue_order_id,
492 instrument_id,
493 trader_id,
494 strategy_id,
495 );
496 }
497
498 #[pyo3(name = "sign_challenge")]
499 fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
500 self.sign_challenge(challenge).map_err(to_pyruntime_err)
501 }
502
503 #[pyo3(name = "authenticate_with_challenge")]
504 fn py_authenticate_with_challenge<'py>(
505 &self,
506 py: Python<'py>,
507 challenge: String,
508 ) -> PyResult<Bound<'py, PyAny>> {
509 let client = self.clone();
510
511 pyo3_async_runtimes::tokio::future_into_py(py, async move {
512 client
513 .authenticate_with_challenge(&challenge)
514 .await
515 .map_err(to_pyruntime_err)?;
516 Ok(())
517 })
518 }
519
520 #[pyo3(name = "set_auth_credentials")]
521 fn py_set_auth_credentials<'py>(
522 &self,
523 py: Python<'py>,
524 original_challenge: String,
525 signed_challenge: String,
526 ) -> PyResult<Bound<'py, PyAny>> {
527 let client = self.clone();
528
529 pyo3_async_runtimes::tokio::future_into_py(py, async move {
530 client
531 .set_auth_credentials(original_challenge, signed_challenge)
532 .await
533 .map_err(to_pyruntime_err)?;
534 Ok(())
535 })
536 }
537
538 #[pyo3(name = "subscribe_open_orders")]
539 fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
540 let client = self.clone();
541
542 pyo3_async_runtimes::tokio::future_into_py(py, async move {
543 client
544 .subscribe_open_orders()
545 .await
546 .map_err(to_pyruntime_err)?;
547 Ok(())
548 })
549 }
550
551 #[pyo3(name = "subscribe_fills")]
552 fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
553 let client = self.clone();
554
555 pyo3_async_runtimes::tokio::future_into_py(py, async move {
556 client.subscribe_fills().await.map_err(to_pyruntime_err)?;
557 Ok(())
558 })
559 }
560
561 #[pyo3(name = "subscribe_executions")]
562 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
563 let client = self.clone();
564
565 pyo3_async_runtimes::tokio::future_into_py(py, async move {
566 client
567 .subscribe_executions()
568 .await
569 .map_err(to_pyruntime_err)?;
570 Ok(())
571 })
572 }
573}