1use futures_util::StreamExt;
45use nautilus_core::python::to_pyruntime_err;
46use nautilus_model::{
47 data::{BarType, Data, OrderBookDeltas_API},
48 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
49 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{IntoPyObjectExt, prelude::*};
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 common::{
56 enums::{KrakenEnvironment, KrakenProductType},
57 urls::get_kraken_ws_private_url,
58 },
59 config::KrakenDataClientConfig,
60 websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
61};
62
63#[pymethods]
64impl KrakenSpotWebSocketClient {
65 #[new]
66 #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
67 fn py_new(
68 environment: Option<KrakenEnvironment>,
69 private: bool,
70 base_url: Option<String>,
71 heartbeat_secs: Option<u64>,
72 api_key: Option<String>,
73 api_secret: Option<String>,
74 ) -> PyResult<Self> {
75 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
76
77 let (resolved_api_key, resolved_api_secret) =
78 crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
79 .map(|c| c.into_parts())
80 .map(|(k, s)| (Some(k), Some(s)))
81 .unwrap_or((None, None));
82
83 let (ws_public_url, ws_private_url) = if private {
84 let private_url = base_url.unwrap_or_else(|| {
86 get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
87 });
88 (None, Some(private_url))
89 } else {
90 (base_url, None)
91 };
92
93 let config = KrakenDataClientConfig {
94 environment: env,
95 ws_public_url,
96 ws_private_url,
97 heartbeat_interval_secs: heartbeat_secs,
98 api_key: resolved_api_key,
99 api_secret: resolved_api_secret,
100 ..Default::default()
101 };
102
103 let token = CancellationToken::new();
104
105 Ok(KrakenSpotWebSocketClient::new(config, token))
106 }
107
108 #[getter]
109 #[pyo3(name = "url")]
110 #[must_use]
111 pub fn py_url(&self) -> &str {
112 self.url()
113 }
114
115 #[pyo3(name = "is_connected")]
116 fn py_is_connected(&self) -> bool {
117 self.is_connected()
118 }
119
120 #[pyo3(name = "is_active")]
121 fn py_is_active(&self) -> bool {
122 self.is_active()
123 }
124
125 #[pyo3(name = "is_closed")]
126 fn py_is_closed(&self) -> bool {
127 self.is_closed()
128 }
129
130 #[pyo3(name = "get_subscriptions")]
131 fn py_get_subscriptions(&self) -> Vec<String> {
132 self.get_subscriptions()
133 }
134
135 #[pyo3(name = "cache_instrument")]
136 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
137 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
138 Ok(())
139 }
140
141 #[pyo3(name = "set_account_id")]
142 fn py_set_account_id(&self, account_id: AccountId) {
143 self.set_account_id(account_id);
144 }
145
146 #[pyo3(name = "cache_client_order")]
147 fn py_cache_client_order(
148 &self,
149 client_order_id: ClientOrderId,
150 _venue_order_id: Option<VenueOrderId>,
151 instrument_id: InstrumentId,
152 trader_id: TraderId,
153 strategy_id: StrategyId,
154 ) {
155 self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
157 }
158
159 #[pyo3(name = "cancel_all_requests")]
160 fn py_cancel_all_requests(&self) {
161 self.cancel_all_requests();
162 }
163
164 #[pyo3(name = "connect")]
165 fn py_connect<'py>(
166 &mut self,
167 py: Python<'py>,
168 instruments: Vec<Py<PyAny>>,
169 callback: Py<PyAny>,
170 ) -> PyResult<Bound<'py, PyAny>> {
171 let mut instruments_any = Vec::new();
172 for inst in instruments {
173 let inst_any = pyobject_to_instrument_any(py, inst)?;
174 instruments_any.push(inst_any);
175 }
176
177 let mut client = self.clone();
178
179 pyo3_async_runtimes::tokio::future_into_py(py, async move {
180 client.connect().await.map_err(to_pyruntime_err)?;
181
182 client.cache_instruments(instruments_any);
184
185 let stream = client.stream();
186
187 tokio::spawn(async move {
188 tokio::pin!(stream);
189
190 while let Some(msg) = stream.next().await {
191 match msg {
192 NautilusWsMessage::Data(data_vec) => {
193 Python::attach(|py| {
194 for data in data_vec {
195 let py_obj = data_to_pycapsule(py, data);
196 call_python(py, &callback, py_obj);
197 }
198 });
199 }
200 NautilusWsMessage::Deltas(deltas) => {
201 Python::attach(|py| {
202 let py_obj = data_to_pycapsule(
203 py,
204 Data::Deltas(OrderBookDeltas_API::new(deltas)),
205 );
206 call_python(py, &callback, py_obj);
207 });
208 }
209 NautilusWsMessage::OrderRejected(event) => {
210 Python::attach(|py| match event.into_py_any(py) {
211 Ok(py_obj) => call_python(py, &callback, py_obj),
212 Err(e) => {
213 tracing::error!(
214 "Failed to convert OrderRejected to Python: {e}"
215 );
216 }
217 });
218 }
219 NautilusWsMessage::OrderAccepted(event) => {
220 Python::attach(|py| match event.into_py_any(py) {
221 Ok(py_obj) => call_python(py, &callback, py_obj),
222 Err(e) => {
223 tracing::error!(
224 "Failed to convert OrderAccepted to Python: {e}"
225 );
226 }
227 });
228 }
229 NautilusWsMessage::OrderCanceled(event) => {
230 Python::attach(|py| match event.into_py_any(py) {
231 Ok(py_obj) => call_python(py, &callback, py_obj),
232 Err(e) => {
233 tracing::error!(
234 "Failed to convert OrderCanceled to Python: {e}"
235 );
236 }
237 });
238 }
239 NautilusWsMessage::OrderExpired(event) => {
240 Python::attach(|py| match event.into_py_any(py) {
241 Ok(py_obj) => call_python(py, &callback, py_obj),
242 Err(e) => {
243 tracing::error!(
244 "Failed to convert OrderExpired to Python: {e}"
245 );
246 }
247 });
248 }
249 NautilusWsMessage::OrderUpdated(event) => {
250 Python::attach(|py| match event.into_py_any(py) {
251 Ok(py_obj) => call_python(py, &callback, py_obj),
252 Err(e) => {
253 tracing::error!(
254 "Failed to convert OrderUpdated to Python: {e}"
255 );
256 }
257 });
258 }
259 NautilusWsMessage::OrderStatusReport(report) => {
260 Python::attach(|py| match (*report).into_py_any(py) {
261 Ok(py_obj) => call_python(py, &callback, py_obj),
262 Err(e) => {
263 tracing::error!(
264 "Failed to convert OrderStatusReport to Python: {e}"
265 );
266 }
267 });
268 }
269 NautilusWsMessage::FillReport(report) => {
270 Python::attach(|py| match (*report).into_py_any(py) {
271 Ok(py_obj) => call_python(py, &callback, py_obj),
272 Err(e) => {
273 tracing::error!("Failed to convert FillReport to Python: {e}");
274 }
275 });
276 }
277 NautilusWsMessage::Reconnected => {
278 tracing::info!("WebSocket reconnected");
279 }
280 }
281 }
282 });
283
284 Ok(())
285 })
286 }
287
288 #[pyo3(name = "wait_until_active")]
289 fn py_wait_until_active<'py>(
290 &self,
291 py: Python<'py>,
292 timeout_secs: f64,
293 ) -> PyResult<Bound<'py, PyAny>> {
294 let client = self.clone();
295
296 pyo3_async_runtimes::tokio::future_into_py(py, async move {
297 client
298 .wait_until_active(timeout_secs)
299 .await
300 .map_err(to_pyruntime_err)?;
301 Ok(())
302 })
303 }
304
305 #[pyo3(name = "authenticate")]
306 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
307 let client = self.clone();
308
309 pyo3_async_runtimes::tokio::future_into_py(py, async move {
310 client.authenticate().await.map_err(to_pyruntime_err)?;
311 Ok(())
312 })
313 }
314
315 #[pyo3(name = "disconnect")]
316 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317 let mut client = self.clone();
318
319 pyo3_async_runtimes::tokio::future_into_py(py, async move {
320 client.disconnect().await.map_err(to_pyruntime_err)?;
321 Ok(())
322 })
323 }
324
325 #[pyo3(name = "send_ping")]
326 fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
327 let client = self.clone();
328
329 pyo3_async_runtimes::tokio::future_into_py(py, async move {
330 client.send_ping().await.map_err(to_pyruntime_err)?;
331 Ok(())
332 })
333 }
334
335 #[pyo3(name = "close")]
336 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
337 let mut client = self.clone();
338
339 pyo3_async_runtimes::tokio::future_into_py(py, async move {
340 client.close().await.map_err(to_pyruntime_err)?;
341 Ok(())
342 })
343 }
344
345 #[pyo3(name = "subscribe_book")]
346 fn py_subscribe_book<'py>(
347 &self,
348 py: Python<'py>,
349 instrument_id: InstrumentId,
350 depth: Option<u32>,
351 ) -> PyResult<Bound<'py, PyAny>> {
352 let client = self.clone();
353
354 pyo3_async_runtimes::tokio::future_into_py(py, async move {
355 client
356 .subscribe_book(instrument_id, depth)
357 .await
358 .map_err(to_pyruntime_err)?;
359 Ok(())
360 })
361 }
362
363 #[pyo3(name = "subscribe_quotes")]
364 fn py_subscribe_quotes<'py>(
365 &self,
366 py: Python<'py>,
367 instrument_id: InstrumentId,
368 ) -> PyResult<Bound<'py, PyAny>> {
369 let client = self.clone();
370
371 pyo3_async_runtimes::tokio::future_into_py(py, async move {
372 client
373 .subscribe_quotes(instrument_id)
374 .await
375 .map_err(to_pyruntime_err)?;
376 Ok(())
377 })
378 }
379
380 #[pyo3(name = "subscribe_trades")]
381 fn py_subscribe_trades<'py>(
382 &self,
383 py: Python<'py>,
384 instrument_id: InstrumentId,
385 ) -> PyResult<Bound<'py, PyAny>> {
386 let client = self.clone();
387
388 pyo3_async_runtimes::tokio::future_into_py(py, async move {
389 client
390 .subscribe_trades(instrument_id)
391 .await
392 .map_err(to_pyruntime_err)?;
393 Ok(())
394 })
395 }
396
397 #[pyo3(name = "subscribe_bars")]
398 fn py_subscribe_bars<'py>(
399 &self,
400 py: Python<'py>,
401 bar_type: BarType,
402 ) -> PyResult<Bound<'py, PyAny>> {
403 let client = self.clone();
404
405 pyo3_async_runtimes::tokio::future_into_py(py, async move {
406 client
407 .subscribe_bars(bar_type)
408 .await
409 .map_err(to_pyruntime_err)?;
410 Ok(())
411 })
412 }
413
414 #[pyo3(name = "subscribe_executions")]
415 #[pyo3(signature = (snap_orders=true, snap_trades=true))]
416 fn py_subscribe_executions<'py>(
417 &self,
418 py: Python<'py>,
419 snap_orders: bool,
420 snap_trades: bool,
421 ) -> PyResult<Bound<'py, PyAny>> {
422 let client = self.clone();
423
424 pyo3_async_runtimes::tokio::future_into_py(py, async move {
425 client
426 .subscribe_executions(snap_orders, snap_trades)
427 .await
428 .map_err(to_pyruntime_err)?;
429 Ok(())
430 })
431 }
432
433 #[pyo3(name = "unsubscribe_book")]
434 fn py_unsubscribe_book<'py>(
435 &self,
436 py: Python<'py>,
437 instrument_id: InstrumentId,
438 ) -> PyResult<Bound<'py, PyAny>> {
439 let client = self.clone();
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 client
443 .unsubscribe_book(instrument_id)
444 .await
445 .map_err(to_pyruntime_err)?;
446 Ok(())
447 })
448 }
449
450 #[pyo3(name = "unsubscribe_quotes")]
451 fn py_unsubscribe_quotes<'py>(
452 &self,
453 py: Python<'py>,
454 instrument_id: InstrumentId,
455 ) -> PyResult<Bound<'py, PyAny>> {
456 let client = self.clone();
457
458 pyo3_async_runtimes::tokio::future_into_py(py, async move {
459 client
460 .unsubscribe_quotes(instrument_id)
461 .await
462 .map_err(to_pyruntime_err)?;
463 Ok(())
464 })
465 }
466
467 #[pyo3(name = "unsubscribe_trades")]
468 fn py_unsubscribe_trades<'py>(
469 &self,
470 py: Python<'py>,
471 instrument_id: InstrumentId,
472 ) -> PyResult<Bound<'py, PyAny>> {
473 let client = self.clone();
474
475 pyo3_async_runtimes::tokio::future_into_py(py, async move {
476 client
477 .unsubscribe_trades(instrument_id)
478 .await
479 .map_err(to_pyruntime_err)?;
480 Ok(())
481 })
482 }
483
484 #[pyo3(name = "unsubscribe_bars")]
485 fn py_unsubscribe_bars<'py>(
486 &self,
487 py: Python<'py>,
488 bar_type: BarType,
489 ) -> PyResult<Bound<'py, PyAny>> {
490 let client = self.clone();
491
492 pyo3_async_runtimes::tokio::future_into_py(py, async move {
493 client
494 .unsubscribe_bars(bar_type)
495 .await
496 .map_err(to_pyruntime_err)?;
497 Ok(())
498 })
499 }
500}
501
502pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
503 if let Err(e) = callback.call1(py, (py_obj,)) {
504 tracing::error!("Error calling Python: {e}");
505 }
506}