1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python, to_pyruntime_err};
47use nautilus_model::{
48 data::{BarType, Data, OrderBookDeltas_API},
49 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
50 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
51};
52use pyo3::{IntoPyObjectExt, prelude::*};
53use tokio_util::sync::CancellationToken;
54
55use crate::{
56 common::{
57 enums::{KrakenEnvironment, KrakenProductType},
58 urls::get_kraken_ws_private_url,
59 },
60 config::KrakenDataClientConfig,
61 websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
62};
63
64#[pymethods]
65impl KrakenSpotWebSocketClient {
66 #[new]
67 #[pyo3(signature = (environment=None, private=false, base_url=None, heartbeat_secs=None, api_key=None, api_secret=None))]
68 fn py_new(
69 environment: Option<KrakenEnvironment>,
70 private: bool,
71 base_url: Option<String>,
72 heartbeat_secs: Option<u64>,
73 api_key: Option<String>,
74 api_secret: Option<String>,
75 ) -> PyResult<Self> {
76 let env = environment.unwrap_or(KrakenEnvironment::Mainnet);
77
78 let (resolved_api_key, resolved_api_secret) =
79 crate::common::credential::KrakenCredential::resolve_spot(api_key, api_secret)
80 .map(|c| c.into_parts())
81 .map_or((None, None), |(k, s)| (Some(k), Some(s)));
82
83 let (ws_public_url, ws_private_url) = if private {
84 let private_url = base_url.unwrap_or_else(|| {
86 get_kraken_ws_private_url(KrakenProductType::Spot, env).to_string()
87 });
88 (None, Some(private_url))
89 } else {
90 (base_url, None)
91 };
92
93 let config = KrakenDataClientConfig {
94 environment: env,
95 ws_public_url,
96 ws_private_url,
97 heartbeat_interval_secs: heartbeat_secs,
98 api_key: resolved_api_key,
99 api_secret: resolved_api_secret,
100 ..Default::default()
101 };
102
103 let token = CancellationToken::new();
104
105 Ok(Self::new(config, token))
106 }
107
108 #[getter]
109 #[pyo3(name = "url")]
110 #[must_use]
111 pub fn py_url(&self) -> &str {
112 self.url()
113 }
114
115 #[pyo3(name = "is_connected")]
116 fn py_is_connected(&self) -> bool {
117 self.is_connected()
118 }
119
120 #[pyo3(name = "is_active")]
121 fn py_is_active(&self) -> bool {
122 self.is_active()
123 }
124
125 #[pyo3(name = "is_closed")]
126 fn py_is_closed(&self) -> bool {
127 self.is_closed()
128 }
129
130 #[pyo3(name = "get_subscriptions")]
131 fn py_get_subscriptions(&self) -> Vec<String> {
132 self.get_subscriptions()
133 }
134
135 #[pyo3(name = "cache_instrument")]
136 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
137 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
138 Ok(())
139 }
140
141 #[pyo3(name = "set_account_id")]
142 fn py_set_account_id(&self, account_id: AccountId) {
143 self.set_account_id(account_id);
144 }
145
146 #[pyo3(name = "cache_client_order")]
147 fn py_cache_client_order(
148 &self,
149 client_order_id: ClientOrderId,
150 _venue_order_id: Option<VenueOrderId>,
151 instrument_id: InstrumentId,
152 trader_id: TraderId,
153 strategy_id: StrategyId,
154 ) {
155 self.cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
157 }
158
159 #[pyo3(name = "cancel_all_requests")]
160 fn py_cancel_all_requests(&self) {
161 self.cancel_all_requests();
162 }
163
164 #[pyo3(name = "connect")]
165 fn py_connect<'py>(
166 &mut self,
167 py: Python<'py>,
168 instruments: Vec<Py<PyAny>>,
169 callback: Py<PyAny>,
170 ) -> PyResult<Bound<'py, PyAny>> {
171 let mut instruments_any = Vec::new();
172 for inst in instruments {
173 let inst_any = pyobject_to_instrument_any(py, inst)?;
174 instruments_any.push(inst_any);
175 }
176
177 let mut client = self.clone();
178
179 pyo3_async_runtimes::tokio::future_into_py(py, async move {
180 client.connect().await.map_err(to_pyruntime_err)?;
181
182 client.cache_instruments(instruments_any);
184
185 let stream = client.stream();
186
187 get_runtime().spawn(async move {
188 tokio::pin!(stream);
189
190 while let Some(msg) = stream.next().await {
191 match msg {
192 NautilusWsMessage::Data(data_vec) => {
193 Python::attach(|py| {
194 for data in data_vec {
195 let py_obj = data_to_pycapsule(py, data);
196 call_python(py, &callback, py_obj);
197 }
198 });
199 }
200 NautilusWsMessage::Deltas(deltas) => {
201 Python::attach(|py| {
202 let py_obj = data_to_pycapsule(
203 py,
204 Data::Deltas(OrderBookDeltas_API::new(deltas)),
205 );
206 call_python(py, &callback, py_obj);
207 });
208 }
209 NautilusWsMessage::OrderRejected(event) => {
210 Python::attach(|py| match event.into_py_any(py) {
211 Ok(py_obj) => call_python(py, &callback, py_obj),
212 Err(e) => {
213 log::error!("Failed to convert OrderRejected to Python: {e}");
214 }
215 });
216 }
217 NautilusWsMessage::OrderAccepted(event) => {
218 Python::attach(|py| match event.into_py_any(py) {
219 Ok(py_obj) => call_python(py, &callback, py_obj),
220 Err(e) => {
221 log::error!("Failed to convert OrderAccepted to Python: {e}");
222 }
223 });
224 }
225 NautilusWsMessage::OrderCanceled(event) => {
226 Python::attach(|py| match event.into_py_any(py) {
227 Ok(py_obj) => call_python(py, &callback, py_obj),
228 Err(e) => {
229 log::error!("Failed to convert OrderCanceled to Python: {e}");
230 }
231 });
232 }
233 NautilusWsMessage::OrderExpired(event) => {
234 Python::attach(|py| match event.into_py_any(py) {
235 Ok(py_obj) => call_python(py, &callback, py_obj),
236 Err(e) => {
237 log::error!("Failed to convert OrderExpired to Python: {e}");
238 }
239 });
240 }
241 NautilusWsMessage::OrderUpdated(event) => {
242 Python::attach(|py| match event.into_py_any(py) {
243 Ok(py_obj) => call_python(py, &callback, py_obj),
244 Err(e) => {
245 log::error!("Failed to convert OrderUpdated to Python: {e}");
246 }
247 });
248 }
249 NautilusWsMessage::OrderStatusReport(report) => {
250 Python::attach(|py| match (*report).into_py_any(py) {
251 Ok(py_obj) => call_python(py, &callback, py_obj),
252 Err(e) => {
253 log::error!(
254 "Failed to convert OrderStatusReport to Python: {e}"
255 );
256 }
257 });
258 }
259 NautilusWsMessage::FillReport(report) => {
260 Python::attach(|py| match (*report).into_py_any(py) {
261 Ok(py_obj) => call_python(py, &callback, py_obj),
262 Err(e) => {
263 log::error!("Failed to convert FillReport to Python: {e}");
264 }
265 });
266 }
267 NautilusWsMessage::Reconnected => {
268 log::info!("WebSocket reconnected");
269 }
270 }
271 }
272 });
273
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "wait_until_active")]
279 fn py_wait_until_active<'py>(
280 &self,
281 py: Python<'py>,
282 timeout_secs: f64,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 client
288 .wait_until_active(timeout_secs)
289 .await
290 .map_err(to_pyruntime_err)?;
291 Ok(())
292 })
293 }
294
295 #[pyo3(name = "authenticate")]
296 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
297 let client = self.clone();
298
299 pyo3_async_runtimes::tokio::future_into_py(py, async move {
300 client.authenticate().await.map_err(to_pyruntime_err)?;
301 Ok(())
302 })
303 }
304
305 #[pyo3(name = "disconnect")]
306 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
307 let mut client = self.clone();
308
309 pyo3_async_runtimes::tokio::future_into_py(py, async move {
310 client.disconnect().await.map_err(to_pyruntime_err)?;
311 Ok(())
312 })
313 }
314
315 #[pyo3(name = "send_ping")]
316 fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317 let client = self.clone();
318
319 pyo3_async_runtimes::tokio::future_into_py(py, async move {
320 client.send_ping().await.map_err(to_pyruntime_err)?;
321 Ok(())
322 })
323 }
324
325 #[pyo3(name = "close")]
326 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
327 let mut client = self.clone();
328
329 pyo3_async_runtimes::tokio::future_into_py(py, async move {
330 client.close().await.map_err(to_pyruntime_err)?;
331 Ok(())
332 })
333 }
334
335 #[pyo3(name = "subscribe_book")]
336 fn py_subscribe_book<'py>(
337 &self,
338 py: Python<'py>,
339 instrument_id: InstrumentId,
340 depth: Option<u32>,
341 ) -> PyResult<Bound<'py, PyAny>> {
342 let client = self.clone();
343
344 pyo3_async_runtimes::tokio::future_into_py(py, async move {
345 client
346 .subscribe_book(instrument_id, depth)
347 .await
348 .map_err(to_pyruntime_err)?;
349 Ok(())
350 })
351 }
352
353 #[pyo3(name = "subscribe_quotes")]
354 fn py_subscribe_quotes<'py>(
355 &self,
356 py: Python<'py>,
357 instrument_id: InstrumentId,
358 ) -> PyResult<Bound<'py, PyAny>> {
359 let client = self.clone();
360
361 pyo3_async_runtimes::tokio::future_into_py(py, async move {
362 client
363 .subscribe_quotes(instrument_id)
364 .await
365 .map_err(to_pyruntime_err)?;
366 Ok(())
367 })
368 }
369
370 #[pyo3(name = "subscribe_trades")]
371 fn py_subscribe_trades<'py>(
372 &self,
373 py: Python<'py>,
374 instrument_id: InstrumentId,
375 ) -> PyResult<Bound<'py, PyAny>> {
376 let client = self.clone();
377
378 pyo3_async_runtimes::tokio::future_into_py(py, async move {
379 client
380 .subscribe_trades(instrument_id)
381 .await
382 .map_err(to_pyruntime_err)?;
383 Ok(())
384 })
385 }
386
387 #[pyo3(name = "subscribe_bars")]
388 fn py_subscribe_bars<'py>(
389 &self,
390 py: Python<'py>,
391 bar_type: BarType,
392 ) -> PyResult<Bound<'py, PyAny>> {
393 let client = self.clone();
394
395 pyo3_async_runtimes::tokio::future_into_py(py, async move {
396 client
397 .subscribe_bars(bar_type)
398 .await
399 .map_err(to_pyruntime_err)?;
400 Ok(())
401 })
402 }
403
404 #[pyo3(name = "subscribe_executions")]
405 #[pyo3(signature = (snap_orders=true, snap_trades=true))]
406 fn py_subscribe_executions<'py>(
407 &self,
408 py: Python<'py>,
409 snap_orders: bool,
410 snap_trades: bool,
411 ) -> PyResult<Bound<'py, PyAny>> {
412 let client = self.clone();
413
414 pyo3_async_runtimes::tokio::future_into_py(py, async move {
415 client
416 .subscribe_executions(snap_orders, snap_trades)
417 .await
418 .map_err(to_pyruntime_err)?;
419 Ok(())
420 })
421 }
422
423 #[pyo3(name = "unsubscribe_book")]
424 fn py_unsubscribe_book<'py>(
425 &self,
426 py: Python<'py>,
427 instrument_id: InstrumentId,
428 ) -> PyResult<Bound<'py, PyAny>> {
429 let client = self.clone();
430
431 pyo3_async_runtimes::tokio::future_into_py(py, async move {
432 client
433 .unsubscribe_book(instrument_id)
434 .await
435 .map_err(to_pyruntime_err)?;
436 Ok(())
437 })
438 }
439
440 #[pyo3(name = "unsubscribe_quotes")]
441 fn py_unsubscribe_quotes<'py>(
442 &self,
443 py: Python<'py>,
444 instrument_id: InstrumentId,
445 ) -> PyResult<Bound<'py, PyAny>> {
446 let client = self.clone();
447
448 pyo3_async_runtimes::tokio::future_into_py(py, async move {
449 client
450 .unsubscribe_quotes(instrument_id)
451 .await
452 .map_err(to_pyruntime_err)?;
453 Ok(())
454 })
455 }
456
457 #[pyo3(name = "unsubscribe_trades")]
458 fn py_unsubscribe_trades<'py>(
459 &self,
460 py: Python<'py>,
461 instrument_id: InstrumentId,
462 ) -> PyResult<Bound<'py, PyAny>> {
463 let client = self.clone();
464
465 pyo3_async_runtimes::tokio::future_into_py(py, async move {
466 client
467 .unsubscribe_trades(instrument_id)
468 .await
469 .map_err(to_pyruntime_err)?;
470 Ok(())
471 })
472 }
473
474 #[pyo3(name = "unsubscribe_bars")]
475 fn py_unsubscribe_bars<'py>(
476 &self,
477 py: Python<'py>,
478 bar_type: BarType,
479 ) -> PyResult<Bound<'py, PyAny>> {
480 let client = self.clone();
481
482 pyo3_async_runtimes::tokio::future_into_py(py, async move {
483 client
484 .unsubscribe_bars(bar_type)
485 .await
486 .map_err(to_pyruntime_err)?;
487 Ok(())
488 })
489 }
490}