1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::to_pyruntime_err;
47use nautilus_model::{
48 data::{BarType, Data, OrderBookDeltas_API},
49 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
50 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
51};
52use pyo3::{IntoPyObjectExt, prelude::*};
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56 common::{
57 enums::{KrakenEnvironment, KrakenProductType},
58 urls::get_kraken_ws_private_url,
59 },
60 config::KrakenDataClientConfig,
61 websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[pymethods]
65impl KrakenSpotWebSocketClient {
66 #[new]
67 #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
68 fn py_new(
69 environment: Option<KrakenEnvironment>,
70 private: bool,
71 base_url: Option<String>,
72 heartbeat_secs: Option<u64>,
73 api_key: Option<String>,
74 api_secret: Option<String>,
75 ) -> PyResult<Self> {
76 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
77
78 let (resolved_api_key, resolved_api_secret) =
79 crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
80 .map(|c| c.into_parts())
81 .map(|(k, s)| (Some(k), Some(s)))
82 .unwrap_or((None, None));
83
84 let (ws_public_url, ws_private_url) = if private {
85 let private_url = base_url.unwrap_or_else(|| {
87 get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
88 });
89 (None, Some(private_url))
90 } else {
91 (base_url, None)
92 };
93
94 let config = KrakenDataClientConfig {
95 environment: env,
96 ws_public_url,
97 ws_private_url,
98 heartbeat_interval_secs: heartbeat_secs,
99 api_key: resolved_api_key,
100 api_secret: resolved_api_secret,
101 ..Default::default()
102 };
103
104 let token = CancellationToken::new();
105
106 Ok(KrakenSpotWebSocketClient::new(config, token))
107 }
108
109 #[getter]
110 #[pyo3(name = "url")]
111 #[must_use]
112 pub fn py_url(&self) -> &str {
113 self.url()
114 }
115
116 #[pyo3(name = "is_connected")]
117 fn py_is_connected(&self) -> bool {
118 self.is_connected()
119 }
120
121 #[pyo3(name = "is_active")]
122 fn py_is_active(&self) -> bool {
123 self.is_active()
124 }
125
126 #[pyo3(name = "is_closed")]
127 fn py_is_closed(&self) -> bool {
128 self.is_closed()
129 }
130
131 #[pyo3(name = "get_subscriptions")]
132 fn py_get_subscriptions(&self) -> Vec<String> {
133 self.get_subscriptions()
134 }
135
136 #[pyo3(name = "cache_instrument")]
137 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
138 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
139 Ok(())
140 }
141
142 #[pyo3(name = "set_account_id")]
143 fn py_set_account_id(&self, account_id: AccountId) {
144 self.set_account_id(account_id);
145 }
146
147 #[pyo3(name = "cache_client_order")]
148 fn py_cache_client_order(
149 &self,
150 client_order_id: ClientOrderId,
151 _venue_order_id: Option<VenueOrderId>,
152 instrument_id: InstrumentId,
153 trader_id: TraderId,
154 strategy_id: StrategyId,
155 ) {
156 self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
158 }
159
160 #[pyo3(name = "cancel_all_requests")]
161 fn py_cancel_all_requests(&self) {
162 self.cancel_all_requests();
163 }
164
165 #[pyo3(name = "connect")]
166 fn py_connect<'py>(
167 &mut self,
168 py: Python<'py>,
169 instruments: Vec<Py<PyAny>>,
170 callback: Py<PyAny>,
171 ) -> PyResult<Bound<'py, PyAny>> {
172 let mut instruments_any = Vec::new();
173 for inst in instruments {
174 let inst_any = pyobject_to_instrument_any(py, inst)?;
175 instruments_any.push(inst_any);
176 }
177
178 let mut client = self.clone();
179
180 pyo3_async_runtimes::tokio::future_into_py(py, async move {
181 client.connect().await.map_err(to_pyruntime_err)?;
182
183 client.cache_instruments(instruments_any);
185
186 let stream = client.stream();
187
188 get_runtime().spawn(async move {
189 tokio::pin!(stream);
190
191 while let Some(msg) = stream.next().await {
192 match msg {
193 NautilusWsMessage::Data(data_vec) => {
194 Python::attach(|py| {
195 for data in data_vec {
196 let py_obj = data_to_pycapsule(py, data);
197 call_python(py, &callback, py_obj);
198 }
199 });
200 }
201 NautilusWsMessage::Deltas(deltas) => {
202 Python::attach(|py| {
203 let py_obj = data_to_pycapsule(
204 py,
205 Data::Deltas(OrderBookDeltas_API::new(deltas)),
206 );
207 call_python(py, &callback, py_obj);
208 });
209 }
210 NautilusWsMessage::OrderRejected(event) => {
211 Python::attach(|py| match event.into_py_any(py) {
212 Ok(py_obj) => call_python(py, &callback, py_obj),
213 Err(e) => {
214 tracing::error!(
215 "Failed to convert OrderRejected to Python: {e}"
216 );
217 }
218 });
219 }
220 NautilusWsMessage::OrderAccepted(event) => {
221 Python::attach(|py| match event.into_py_any(py) {
222 Ok(py_obj) => call_python(py, &callback, py_obj),
223 Err(e) => {
224 tracing::error!(
225 "Failed to convert OrderAccepted to Python: {e}"
226 );
227 }
228 });
229 }
230 NautilusWsMessage::OrderCanceled(event) => {
231 Python::attach(|py| match event.into_py_any(py) {
232 Ok(py_obj) => call_python(py, &callback, py_obj),
233 Err(e) => {
234 tracing::error!(
235 "Failed to convert OrderCanceled to Python: {e}"
236 );
237 }
238 });
239 }
240 NautilusWsMessage::OrderExpired(event) => {
241 Python::attach(|py| match event.into_py_any(py) {
242 Ok(py_obj) => call_python(py, &callback, py_obj),
243 Err(e) => {
244 tracing::error!(
245 "Failed to convert OrderExpired to Python: {e}"
246 );
247 }
248 });
249 }
250 NautilusWsMessage::OrderUpdated(event) => {
251 Python::attach(|py| match event.into_py_any(py) {
252 Ok(py_obj) => call_python(py, &callback, py_obj),
253 Err(e) => {
254 tracing::error!(
255 "Failed to convert OrderUpdated to Python: {e}"
256 );
257 }
258 });
259 }
260 NautilusWsMessage::OrderStatusReport(report) => {
261 Python::attach(|py| match (*report).into_py_any(py) {
262 Ok(py_obj) => call_python(py, &callback, py_obj),
263 Err(e) => {
264 tracing::error!(
265 "Failed to convert OrderStatusReport to Python: {e}"
266 );
267 }
268 });
269 }
270 NautilusWsMessage::FillReport(report) => {
271 Python::attach(|py| match (*report).into_py_any(py) {
272 Ok(py_obj) => call_python(py, &callback, py_obj),
273 Err(e) => {
274 tracing::error!("Failed to convert FillReport to Python: {e}");
275 }
276 });
277 }
278 NautilusWsMessage::Reconnected => {
279 tracing::info!("WebSocket reconnected");
280 }
281 }
282 }
283 });
284
285 Ok(())
286 })
287 }
288
289 #[pyo3(name = "wait_until_active")]
290 fn py_wait_until_active<'py>(
291 &self,
292 py: Python<'py>,
293 timeout_secs: f64,
294 ) -> PyResult<Bound<'py, PyAny>> {
295 let client = self.clone();
296
297 pyo3_async_runtimes::tokio::future_into_py(py, async move {
298 client
299 .wait_until_active(timeout_secs)
300 .await
301 .map_err(to_pyruntime_err)?;
302 Ok(())
303 })
304 }
305
306 #[pyo3(name = "authenticate")]
307 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
308 let client = self.clone();
309
310 pyo3_async_runtimes::tokio::future_into_py(py, async move {
311 client.authenticate().await.map_err(to_pyruntime_err)?;
312 Ok(())
313 })
314 }
315
316 #[pyo3(name = "disconnect")]
317 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
318 let mut client = self.clone();
319
320 pyo3_async_runtimes::tokio::future_into_py(py, async move {
321 client.disconnect().await.map_err(to_pyruntime_err)?;
322 Ok(())
323 })
324 }
325
326 #[pyo3(name = "send_ping")]
327 fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
328 let client = self.clone();
329
330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
331 client.send_ping().await.map_err(to_pyruntime_err)?;
332 Ok(())
333 })
334 }
335
336 #[pyo3(name = "close")]
337 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
338 let mut client = self.clone();
339
340 pyo3_async_runtimes::tokio::future_into_py(py, async move {
341 client.close().await.map_err(to_pyruntime_err)?;
342 Ok(())
343 })
344 }
345
346 #[pyo3(name = "subscribe_book")]
347 fn py_subscribe_book<'py>(
348 &self,
349 py: Python<'py>,
350 instrument_id: InstrumentId,
351 depth: Option<u32>,
352 ) -> PyResult<Bound<'py, PyAny>> {
353 let client = self.clone();
354
355 pyo3_async_runtimes::tokio::future_into_py(py, async move {
356 client
357 .subscribe_book(instrument_id, depth)
358 .await
359 .map_err(to_pyruntime_err)?;
360 Ok(())
361 })
362 }
363
364 #[pyo3(name = "subscribe_quotes")]
365 fn py_subscribe_quotes<'py>(
366 &self,
367 py: Python<'py>,
368 instrument_id: InstrumentId,
369 ) -> PyResult<Bound<'py, PyAny>> {
370 let client = self.clone();
371
372 pyo3_async_runtimes::tokio::future_into_py(py, async move {
373 client
374 .subscribe_quotes(instrument_id)
375 .await
376 .map_err(to_pyruntime_err)?;
377 Ok(())
378 })
379 }
380
381 #[pyo3(name = "subscribe_trades")]
382 fn py_subscribe_trades<'py>(
383 &self,
384 py: Python<'py>,
385 instrument_id: InstrumentId,
386 ) -> PyResult<Bound<'py, PyAny>> {
387 let client = self.clone();
388
389 pyo3_async_runtimes::tokio::future_into_py(py, async move {
390 client
391 .subscribe_trades(instrument_id)
392 .await
393 .map_err(to_pyruntime_err)?;
394 Ok(())
395 })
396 }
397
398 #[pyo3(name = "subscribe_bars")]
399 fn py_subscribe_bars<'py>(
400 &self,
401 py: Python<'py>,
402 bar_type: BarType,
403 ) -> PyResult<Bound<'py, PyAny>> {
404 let client = self.clone();
405
406 pyo3_async_runtimes::tokio::future_into_py(py, async move {
407 client
408 .subscribe_bars(bar_type)
409 .await
410 .map_err(to_pyruntime_err)?;
411 Ok(())
412 })
413 }
414
415 #[pyo3(name = "subscribe_executions")]
416 #[pyo3(signature = (snap_orders=true, snap_trades=true))]
417 fn py_subscribe_executions<'py>(
418 &self,
419 py: Python<'py>,
420 snap_orders: bool,
421 snap_trades: bool,
422 ) -> PyResult<Bound<'py, PyAny>> {
423 let client = self.clone();
424
425 pyo3_async_runtimes::tokio::future_into_py(py, async move {
426 client
427 .subscribe_executions(snap_orders, snap_trades)
428 .await
429 .map_err(to_pyruntime_err)?;
430 Ok(())
431 })
432 }
433
434 #[pyo3(name = "unsubscribe_book")]
435 fn py_unsubscribe_book<'py>(
436 &self,
437 py: Python<'py>,
438 instrument_id: InstrumentId,
439 ) -> PyResult<Bound<'py, PyAny>> {
440 let client = self.clone();
441
442 pyo3_async_runtimes::tokio::future_into_py(py, async move {
443 client
444 .unsubscribe_book(instrument_id)
445 .await
446 .map_err(to_pyruntime_err)?;
447 Ok(())
448 })
449 }
450
451 #[pyo3(name = "unsubscribe_quotes")]
452 fn py_unsubscribe_quotes<'py>(
453 &self,
454 py: Python<'py>,
455 instrument_id: InstrumentId,
456 ) -> PyResult<Bound<'py, PyAny>> {
457 let client = self.clone();
458
459 pyo3_async_runtimes::tokio::future_into_py(py, async move {
460 client
461 .unsubscribe_quotes(instrument_id)
462 .await
463 .map_err(to_pyruntime_err)?;
464 Ok(())
465 })
466 }
467
468 #[pyo3(name = "unsubscribe_trades")]
469 fn py_unsubscribe_trades<'py>(
470 &self,
471 py: Python<'py>,
472 instrument_id: InstrumentId,
473 ) -> PyResult<Bound<'py, PyAny>> {
474 let client = self.clone();
475
476 pyo3_async_runtimes::tokio::future_into_py(py, async move {
477 client
478 .unsubscribe_trades(instrument_id)
479 .await
480 .map_err(to_pyruntime_err)?;
481 Ok(())
482 })
483 }
484
485 #[pyo3(name = "unsubscribe_bars")]
486 fn py_unsubscribe_bars<'py>(
487 &self,
488 py: Python<'py>,
489 bar_type: BarType,
490 ) -> PyResult<Bound<'py, PyAny>> {
491 let client = self.clone();
492
493 pyo3_async_runtimes::tokio::future_into_py(py, async move {
494 client
495 .unsubscribe_bars(bar_type)
496 .await
497 .map_err(to_pyruntime_err)?;
498 Ok(())
499 })
500 }
501}
502
503pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
504 if let Err(e) = callback.call1(py, (py_obj,)) {
505 tracing::error!("Error calling Python: {e}");
506 }
507}