1use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{IntoPyObjectExt, prelude::*};
26
27use crate::{
28 common::{
29 credential::KrakenCredential,
30 enums::{KrakenEnvironment, KrakenProductType},
31 urls::get_kraken_ws_public_url,
32 },
33 websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
34};
35
36#[pymethods]
37impl KrakenFuturesWebSocketClient {
38 #[new]
39 #[pyo3(signature = (environment=None, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
40 fn py_new(
41 environment: Option<KrakenEnvironment>,
42 base_url: Option<String>,
43 heartbeat_secs: Option<u64>,
44 api_key: Option<String>,
45 api_secret: Option<String>,
46 ) -> PyResult<Self> {
47 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
48 let demo = env == KrakenEnvironment::Demo;
49 let url = base_url.unwrap_or_else(|| {
50 get_kraken_ws_public_url(KrakenProductType::Futures, env).to_string()
51 });
52 let credential = KrakenCredential::resolve_futures(api_key, api_secret, demo);
53
54 Ok(KrakenFuturesWebSocketClient::with_credentials(
55 url,
56 heartbeat_secs,
57 credential,
58 ))
59 }
60
61 #[getter]
62 #[pyo3(name = "has_credentials")]
63 #[must_use]
64 pub fn py_has_credentials(&self) -> bool {
65 self.has_credentials()
66 }
67
68 #[getter]
69 #[pyo3(name = "url")]
70 #[must_use]
71 pub fn py_url(&self) -> &str {
72 self.url()
73 }
74
75 #[pyo3(name = "is_closed")]
76 fn py_is_closed(&self) -> bool {
77 self.is_closed()
78 }
79
80 #[pyo3(name = "is_active")]
81 fn py_is_active(&self) -> bool {
82 self.is_active()
83 }
84
85 #[pyo3(name = "wait_until_active")]
86 fn py_wait_until_active<'py>(
87 &self,
88 py: Python<'py>,
89 timeout_secs: f64,
90 ) -> PyResult<Bound<'py, PyAny>> {
91 let client = self.clone();
92
93 pyo3_async_runtimes::tokio::future_into_py(py, async move {
94 client
95 .wait_until_active(timeout_secs)
96 .await
97 .map_err(to_pyruntime_err)?;
98 Ok(())
99 })
100 }
101
102 #[pyo3(name = "authenticate")]
103 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
104 let client = self.clone();
105
106 pyo3_async_runtimes::tokio::future_into_py(py, async move {
107 client.authenticate().await.map_err(to_pyruntime_err)?;
108 Ok(())
109 })
110 }
111
112 #[pyo3(name = "cache_instruments")]
113 fn py_cache_instruments(&self, py: Python<'_>, instruments: Vec<Py<PyAny>>) -> PyResult<()> {
114 let mut instruments_any = Vec::new();
115 for inst in instruments {
116 let inst_any = pyobject_to_instrument_any(py, inst)?;
117 instruments_any.push(inst_any);
118 }
119 self.cache_instruments(instruments_any);
120 Ok(())
121 }
122
123 #[pyo3(name = "cache_instrument")]
124 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
125 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
126 Ok(())
127 }
128
129 #[pyo3(name = "connect")]
130 fn py_connect<'py>(
131 &mut self,
132 py: Python<'py>,
133 instruments: Vec<Py<PyAny>>,
134 callback: Py<PyAny>,
135 ) -> PyResult<Bound<'py, PyAny>> {
136 let mut instruments_any = Vec::new();
137 for inst in instruments {
138 let inst_any = pyobject_to_instrument_any(py, inst)?;
139 instruments_any.push(inst_any);
140 }
141
142 let mut client = self.clone();
143
144 pyo3_async_runtimes::tokio::future_into_py(py, async move {
145 client.connect().await.map_err(to_pyruntime_err)?;
146
147 client.cache_instruments(instruments_any);
149
150 if let Some(mut rx) = client.take_output_rx() {
152 get_runtime().spawn(async move {
153 while let Some(msg) = rx.recv().await {
154 Python::attach(|py| match msg {
155 KrakenFuturesWsMessage::MarkPrice(update) => {
156 let py_obj = data_to_pycapsule(py, Data::from(update));
157 if let Err(e) = callback.call1(py, (py_obj,)) {
158 tracing::error!("Error calling Python callback: {e}");
159 }
160 }
161 KrakenFuturesWsMessage::IndexPrice(update) => {
162 let py_obj = data_to_pycapsule(py, Data::from(update));
163 if let Err(e) = callback.call1(py, (py_obj,)) {
164 tracing::error!("Error calling Python callback: {e}");
165 }
166 }
167 KrakenFuturesWsMessage::Quote(quote) => {
168 let py_obj = data_to_pycapsule(py, Data::from(quote));
169 if let Err(e) = callback.call1(py, (py_obj,)) {
170 tracing::error!("Error calling Python callback: {e}");
171 }
172 }
173 KrakenFuturesWsMessage::Trade(trade) => {
174 let py_obj = data_to_pycapsule(py, Data::from(trade));
175 if let Err(e) = callback.call1(py, (py_obj,)) {
176 tracing::error!("Error calling Python callback: {e}");
177 }
178 }
179 KrakenFuturesWsMessage::BookDeltas(deltas) => {
180 let py_obj = data_to_pycapsule(
181 py,
182 Data::Deltas(OrderBookDeltas_API::new(deltas)),
183 );
184 if let Err(e) = callback.call1(py, (py_obj,)) {
185 tracing::error!("Error calling Python callback: {e}");
186 }
187 }
188 KrakenFuturesWsMessage::OrderAccepted(event) => {
189 match event.into_py_any(py) {
190 Ok(py_obj) => {
191 if let Err(e) = callback.call1(py, (py_obj,)) {
192 tracing::error!("Error calling Python callback: {e}");
193 }
194 }
195 Err(e) => {
196 tracing::error!(
197 "Failed to convert OrderAccepted to Python: {e}"
198 );
199 }
200 }
201 }
202 KrakenFuturesWsMessage::OrderCanceled(event) => {
203 match event.into_py_any(py) {
204 Ok(py_obj) => {
205 if let Err(e) = callback.call1(py, (py_obj,)) {
206 tracing::error!("Error calling Python callback: {e}");
207 }
208 }
209 Err(e) => {
210 tracing::error!(
211 "Failed to convert OrderCanceled to Python: {e}"
212 );
213 }
214 }
215 }
216 KrakenFuturesWsMessage::OrderExpired(event) => {
217 match event.into_py_any(py) {
218 Ok(py_obj) => {
219 if let Err(e) = callback.call1(py, (py_obj,)) {
220 tracing::error!("Error calling Python callback: {e}");
221 }
222 }
223 Err(e) => {
224 tracing::error!(
225 "Failed to convert OrderExpired to Python: {e}"
226 );
227 }
228 }
229 }
230 KrakenFuturesWsMessage::OrderUpdated(event) => {
231 match event.into_py_any(py) {
232 Ok(py_obj) => {
233 if let Err(e) = callback.call1(py, (py_obj,)) {
234 tracing::error!("Error calling Python callback: {e}");
235 }
236 }
237 Err(e) => {
238 tracing::error!(
239 "Failed to convert OrderUpdated to Python: {e}"
240 );
241 }
242 }
243 }
244 KrakenFuturesWsMessage::OrderStatusReport(report) => {
245 match (*report).into_py_any(py) {
246 Ok(py_obj) => {
247 if let Err(e) = callback.call1(py, (py_obj,)) {
248 tracing::error!("Error calling Python callback: {e}");
249 }
250 }
251 Err(e) => {
252 tracing::error!(
253 "Failed to convert OrderStatusReport to Python: {e}"
254 );
255 }
256 }
257 }
258 KrakenFuturesWsMessage::FillReport(report) => {
259 match (*report).into_py_any(py) {
260 Ok(py_obj) => {
261 if let Err(e) = callback.call1(py, (py_obj,)) {
262 tracing::error!("Error calling Python callback: {e}");
263 }
264 }
265 Err(e) => {
266 tracing::error!(
267 "Failed to convert FillReport to Python: {e}"
268 );
269 }
270 }
271 }
272 KrakenFuturesWsMessage::Reconnected => {
273 tracing::info!("WebSocket reconnected");
274 }
275 });
276 }
277 });
278 }
279
280 Ok(())
281 })
282 }
283
284 #[pyo3(name = "disconnect")]
285 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
286 let mut client = self.clone();
287
288 pyo3_async_runtimes::tokio::future_into_py(py, async move {
289 client.disconnect().await.map_err(to_pyruntime_err)?;
290 Ok(())
291 })
292 }
293
294 #[pyo3(name = "close")]
295 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
296 let mut client = self.clone();
297
298 pyo3_async_runtimes::tokio::future_into_py(py, async move {
299 client.close().await.map_err(to_pyruntime_err)?;
300 Ok(())
301 })
302 }
303
304 #[pyo3(name = "subscribe_book")]
305 #[pyo3(signature = (instrument_id, depth=None))]
306 fn py_subscribe_book<'py>(
307 &self,
308 py: Python<'py>,
309 instrument_id: InstrumentId,
310 depth: Option<u32>,
311 ) -> PyResult<Bound<'py, PyAny>> {
312 let client = self.clone();
313
314 pyo3_async_runtimes::tokio::future_into_py(py, async move {
315 client
316 .subscribe_book(instrument_id, depth)
317 .await
318 .map_err(to_pyruntime_err)?;
319 Ok(())
320 })
321 }
322
323 #[pyo3(name = "subscribe_quotes")]
324 fn py_subscribe_quotes<'py>(
325 &self,
326 py: Python<'py>,
327 instrument_id: InstrumentId,
328 ) -> PyResult<Bound<'py, PyAny>> {
329 let client = self.clone();
330
331 pyo3_async_runtimes::tokio::future_into_py(py, async move {
332 client
333 .subscribe_quotes(instrument_id)
334 .await
335 .map_err(to_pyruntime_err)?;
336 Ok(())
337 })
338 }
339
340 #[pyo3(name = "subscribe_trades")]
341 fn py_subscribe_trades<'py>(
342 &self,
343 py: Python<'py>,
344 instrument_id: InstrumentId,
345 ) -> PyResult<Bound<'py, PyAny>> {
346 let client = self.clone();
347
348 pyo3_async_runtimes::tokio::future_into_py(py, async move {
349 client
350 .subscribe_trades(instrument_id)
351 .await
352 .map_err(to_pyruntime_err)?;
353 Ok(())
354 })
355 }
356
357 #[pyo3(name = "subscribe_mark_price")]
358 fn py_subscribe_mark_price<'py>(
359 &self,
360 py: Python<'py>,
361 instrument_id: InstrumentId,
362 ) -> PyResult<Bound<'py, PyAny>> {
363 let client = self.clone();
364
365 pyo3_async_runtimes::tokio::future_into_py(py, async move {
366 client
367 .subscribe_mark_price(instrument_id)
368 .await
369 .map_err(to_pyruntime_err)?;
370 Ok(())
371 })
372 }
373
374 #[pyo3(name = "subscribe_index_price")]
375 fn py_subscribe_index_price<'py>(
376 &self,
377 py: Python<'py>,
378 instrument_id: InstrumentId,
379 ) -> PyResult<Bound<'py, PyAny>> {
380 let client = self.clone();
381
382 pyo3_async_runtimes::tokio::future_into_py(py, async move {
383 client
384 .subscribe_index_price(instrument_id)
385 .await
386 .map_err(to_pyruntime_err)?;
387 Ok(())
388 })
389 }
390
391 #[pyo3(name = "unsubscribe_book")]
392 fn py_unsubscribe_book<'py>(
393 &self,
394 py: Python<'py>,
395 instrument_id: InstrumentId,
396 ) -> PyResult<Bound<'py, PyAny>> {
397 let client = self.clone();
398
399 pyo3_async_runtimes::tokio::future_into_py(py, async move {
400 client
401 .unsubscribe_book(instrument_id)
402 .await
403 .map_err(to_pyruntime_err)?;
404 Ok(())
405 })
406 }
407
408 #[pyo3(name = "unsubscribe_quotes")]
409 fn py_unsubscribe_quotes<'py>(
410 &self,
411 py: Python<'py>,
412 instrument_id: InstrumentId,
413 ) -> PyResult<Bound<'py, PyAny>> {
414 let client = self.clone();
415
416 pyo3_async_runtimes::tokio::future_into_py(py, async move {
417 client
418 .unsubscribe_quotes(instrument_id)
419 .await
420 .map_err(to_pyruntime_err)?;
421 Ok(())
422 })
423 }
424
425 #[pyo3(name = "unsubscribe_trades")]
426 fn py_unsubscribe_trades<'py>(
427 &self,
428 py: Python<'py>,
429 instrument_id: InstrumentId,
430 ) -> PyResult<Bound<'py, PyAny>> {
431 let client = self.clone();
432
433 pyo3_async_runtimes::tokio::future_into_py(py, async move {
434 client
435 .unsubscribe_trades(instrument_id)
436 .await
437 .map_err(to_pyruntime_err)?;
438 Ok(())
439 })
440 }
441
442 #[pyo3(name = "unsubscribe_mark_price")]
443 fn py_unsubscribe_mark_price<'py>(
444 &self,
445 py: Python<'py>,
446 instrument_id: InstrumentId,
447 ) -> PyResult<Bound<'py, PyAny>> {
448 let client = self.clone();
449
450 pyo3_async_runtimes::tokio::future_into_py(py, async move {
451 client
452 .unsubscribe_mark_price(instrument_id)
453 .await
454 .map_err(to_pyruntime_err)?;
455 Ok(())
456 })
457 }
458
459 #[pyo3(name = "unsubscribe_index_price")]
460 fn py_unsubscribe_index_price<'py>(
461 &self,
462 py: Python<'py>,
463 instrument_id: InstrumentId,
464 ) -> PyResult<Bound<'py, PyAny>> {
465 let client = self.clone();
466
467 pyo3_async_runtimes::tokio::future_into_py(py, async move {
468 client
469 .unsubscribe_index_price(instrument_id)
470 .await
471 .map_err(to_pyruntime_err)?;
472 Ok(())
473 })
474 }
475
476 #[pyo3(name = "set_account_id")]
477 fn py_set_account_id(&self, account_id: AccountId) {
478 self.set_account_id(account_id);
479 }
480
481 #[pyo3(name = "cache_client_order")]
482 fn py_cache_client_order(
483 &self,
484 client_order_id: ClientOrderId,
485 venue_order_id: Option<VenueOrderId>,
486 instrument_id: InstrumentId,
487 trader_id: TraderId,
488 strategy_id: StrategyId,
489 ) {
490 self.cache_client_order(
491 client_order_id,
492 venue_order_id,
493 instrument_id,
494 trader_id,
495 strategy_id,
496 );
497 }
498
499 #[pyo3(name = "sign_challenge")]
500 fn py_sign_challenge(&self, challenge: &str) -> PyResult<String> {
501 self.sign_challenge(challenge).map_err(to_pyruntime_err)
502 }
503
504 #[pyo3(name = "authenticate_with_challenge")]
505 fn py_authenticate_with_challenge<'py>(
506 &self,
507 py: Python<'py>,
508 challenge: String,
509 ) -> PyResult<Bound<'py, PyAny>> {
510 let client = self.clone();
511
512 pyo3_async_runtimes::tokio::future_into_py(py, async move {
513 client
514 .authenticate_with_challenge(&challenge)
515 .await
516 .map_err(to_pyruntime_err)?;
517 Ok(())
518 })
519 }
520
521 #[pyo3(name = "set_auth_credentials")]
522 fn py_set_auth_credentials<'py>(
523 &self,
524 py: Python<'py>,
525 original_challenge: String,
526 signed_challenge: String,
527 ) -> PyResult<Bound<'py, PyAny>> {
528 let client = self.clone();
529
530 pyo3_async_runtimes::tokio::future_into_py(py, async move {
531 client
532 .set_auth_credentials(original_challenge, signed_challenge)
533 .await
534 .map_err(to_pyruntime_err)?;
535 Ok(())
536 })
537 }
538
539 #[pyo3(name = "subscribe_open_orders")]
540 fn py_subscribe_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
541 let client = self.clone();
542
543 pyo3_async_runtimes::tokio::future_into_py(py, async move {
544 client
545 .subscribe_open_orders()
546 .await
547 .map_err(to_pyruntime_err)?;
548 Ok(())
549 })
550 }
551
552 #[pyo3(name = "subscribe_fills")]
553 fn py_subscribe_fills<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
554 let client = self.clone();
555
556 pyo3_async_runtimes::tokio::future_into_py(py, async move {
557 client.subscribe_fills().await.map_err(to_pyruntime_err)?;
558 Ok(())
559 })
560 }
561
562 #[pyo3(name = "subscribe_executions")]
563 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
564 let client = self.clone();
565
566 pyo3_async_runtimes::tokio::future_into_py(py, async move {
567 client
568 .subscribe_executions()
569 .await
570 .map_err(to_pyruntime_err)?;
571 Ok(())
572 })
573 }
574}