1use futures_util::StreamExt;
45use nautilus_core::python::to_pyruntime_err;
46use nautilus_model::{
47 data::{BarType, Data, OrderBookDeltas_API},
48 identifiers::InstrumentId,
49 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::prelude::*;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 config::KrakenDataClientConfig,
56 websocket::{client::KrakenWebSocketClient, messages::NautilusWsMessage},
57};
58
59#[pymethods]
60impl KrakenWebSocketClient {
61 #[new]
62 fn py_new(url: String) -> PyResult<Self> {
63 let config = KrakenDataClientConfig {
64 ws_public_url: Some(url),
65 ..Default::default()
66 };
67
68 let token = CancellationToken::new();
69
70 Ok(KrakenWebSocketClient::new(config, token))
71 }
72
73 #[getter]
74 #[pyo3(name = "url")]
75 #[must_use]
76 pub fn py_url(&self) -> &str {
77 self.url()
78 }
79
80 #[pyo3(name = "is_connected")]
81 fn py_is_connected(&self) -> bool {
82 self.is_connected()
83 }
84
85 #[pyo3(name = "is_active")]
86 fn py_is_active(&self) -> bool {
87 self.is_active()
88 }
89
90 #[pyo3(name = "is_closed")]
91 fn py_is_closed(&self) -> bool {
92 self.is_closed()
93 }
94
95 #[pyo3(name = "get_subscriptions")]
96 fn py_get_subscriptions(&self) -> Vec<String> {
97 self.get_subscriptions()
98 }
99
100 #[pyo3(name = "cache_instrument")]
101 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
102 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
103 Ok(())
104 }
105
106 #[pyo3(name = "cancel_all_requests")]
107 fn py_cancel_all_requests(&self) {
108 self.cancel_all_requests();
109 }
110
111 #[pyo3(name = "connect")]
112 fn py_connect<'py>(
113 &mut self,
114 py: Python<'py>,
115 instruments: Vec<Py<PyAny>>,
116 callback: Py<PyAny>,
117 ) -> PyResult<Bound<'py, PyAny>> {
118 let mut instruments_any = Vec::new();
119 for inst in instruments {
120 let inst_any = pyobject_to_instrument_any(py, inst)?;
121 instruments_any.push(inst_any);
122 }
123
124 let mut client = self.clone();
125
126 pyo3_async_runtimes::tokio::future_into_py(py, async move {
127 client.connect().await.map_err(to_pyruntime_err)?;
128
129 client.cache_instruments(instruments_any);
131
132 let stream = client.stream();
133
134 tokio::spawn(async move {
135 tokio::pin!(stream);
136
137 while let Some(msg) = stream.next().await {
138 match msg {
139 NautilusWsMessage::Data(data_vec) => {
140 Python::attach(|py| {
141 for data in data_vec {
142 let py_obj = data_to_pycapsule(py, data);
143 call_python(py, &callback, py_obj);
144 }
145 });
146 }
147 NautilusWsMessage::Deltas(deltas) => {
148 Python::attach(|py| {
149 let py_obj = data_to_pycapsule(
150 py,
151 Data::Deltas(OrderBookDeltas_API::new(deltas)),
152 );
153 call_python(py, &callback, py_obj);
154 });
155 }
156 }
157 }
158 });
159
160 Ok(())
161 })
162 }
163
164 #[pyo3(name = "wait_until_active")]
165 fn py_wait_until_active<'py>(
166 &self,
167 py: Python<'py>,
168 timeout_secs: f64,
169 ) -> PyResult<Bound<'py, PyAny>> {
170 let client = self.clone();
171
172 pyo3_async_runtimes::tokio::future_into_py(py, async move {
173 client
174 .wait_until_active(timeout_secs)
175 .await
176 .map_err(to_pyruntime_err)?;
177 Ok(())
178 })
179 }
180
181 #[pyo3(name = "authenticate")]
182 fn py_authenticate<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
183 let client = self.clone();
184
185 pyo3_async_runtimes::tokio::future_into_py(py, async move {
186 client.authenticate().await.map_err(to_pyruntime_err)?;
187 Ok(())
188 })
189 }
190
191 #[pyo3(name = "disconnect")]
192 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
193 let mut client = self.clone();
194
195 pyo3_async_runtimes::tokio::future_into_py(py, async move {
196 client.disconnect().await.map_err(to_pyruntime_err)?;
197 Ok(())
198 })
199 }
200
201 #[pyo3(name = "close")]
202 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
203 let mut client = self.clone();
204
205 pyo3_async_runtimes::tokio::future_into_py(py, async move {
206 client.close().await.map_err(to_pyruntime_err)?;
207 Ok(())
208 })
209 }
210
211 #[pyo3(name = "subscribe_book")]
212 fn py_subscribe_book<'py>(
213 &self,
214 py: Python<'py>,
215 instrument_id: InstrumentId,
216 depth: Option<u32>,
217 ) -> PyResult<Bound<'py, PyAny>> {
218 let client = self.clone();
219
220 pyo3_async_runtimes::tokio::future_into_py(py, async move {
221 client
222 .subscribe_book(instrument_id, depth)
223 .await
224 .map_err(to_pyruntime_err)?;
225 Ok(())
226 })
227 }
228
229 #[pyo3(name = "subscribe_quotes")]
230 fn py_subscribe_quotes<'py>(
231 &self,
232 py: Python<'py>,
233 instrument_id: InstrumentId,
234 ) -> PyResult<Bound<'py, PyAny>> {
235 let client = self.clone();
236
237 pyo3_async_runtimes::tokio::future_into_py(py, async move {
238 client
239 .subscribe_quotes(instrument_id)
240 .await
241 .map_err(to_pyruntime_err)?;
242 Ok(())
243 })
244 }
245
246 #[pyo3(name = "subscribe_trades")]
247 fn py_subscribe_trades<'py>(
248 &self,
249 py: Python<'py>,
250 instrument_id: InstrumentId,
251 ) -> PyResult<Bound<'py, PyAny>> {
252 let client = self.clone();
253
254 pyo3_async_runtimes::tokio::future_into_py(py, async move {
255 client
256 .subscribe_trades(instrument_id)
257 .await
258 .map_err(to_pyruntime_err)?;
259 Ok(())
260 })
261 }
262
263 #[pyo3(name = "subscribe_bars")]
264 fn py_subscribe_bars<'py>(
265 &self,
266 py: Python<'py>,
267 bar_type: BarType,
268 ) -> PyResult<Bound<'py, PyAny>> {
269 let client = self.clone();
270
271 pyo3_async_runtimes::tokio::future_into_py(py, async move {
272 client
273 .subscribe_bars(bar_type)
274 .await
275 .map_err(to_pyruntime_err)?;
276 Ok(())
277 })
278 }
279
280 #[pyo3(name = "unsubscribe_book")]
281 fn py_unsubscribe_book<'py>(
282 &self,
283 py: Python<'py>,
284 instrument_id: InstrumentId,
285 ) -> PyResult<Bound<'py, PyAny>> {
286 let client = self.clone();
287
288 pyo3_async_runtimes::tokio::future_into_py(py, async move {
289 client
290 .unsubscribe_book(instrument_id)
291 .await
292 .map_err(to_pyruntime_err)?;
293 Ok(())
294 })
295 }
296
297 #[pyo3(name = "unsubscribe_quotes")]
298 fn py_unsubscribe_quotes<'py>(
299 &self,
300 py: Python<'py>,
301 instrument_id: InstrumentId,
302 ) -> PyResult<Bound<'py, PyAny>> {
303 let client = self.clone();
304
305 pyo3_async_runtimes::tokio::future_into_py(py, async move {
306 client
307 .unsubscribe_quotes(instrument_id)
308 .await
309 .map_err(to_pyruntime_err)?;
310 Ok(())
311 })
312 }
313
314 #[pyo3(name = "unsubscribe_trades")]
315 fn py_unsubscribe_trades<'py>(
316 &self,
317 py: Python<'py>,
318 instrument_id: InstrumentId,
319 ) -> PyResult<Bound<'py, PyAny>> {
320 let client = self.clone();
321
322 pyo3_async_runtimes::tokio::future_into_py(py, async move {
323 client
324 .unsubscribe_trades(instrument_id)
325 .await
326 .map_err(to_pyruntime_err)?;
327 Ok(())
328 })
329 }
330
331 #[pyo3(name = "unsubscribe_bars")]
332 fn py_unsubscribe_bars<'py>(
333 &self,
334 py: Python<'py>,
335 bar_type: BarType,
336 ) -> PyResult<Bound<'py, PyAny>> {
337 let client = self.clone();
338
339 pyo3_async_runtimes::tokio::future_into_py(py, async move {
340 client
341 .unsubscribe_bars(bar_type)
342 .await
343 .map_err(to_pyruntime_err)?;
344 Ok(())
345 })
346 }
347
348 #[pyo3(name = "send_ping")]
349 fn py_send_ping<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
350 let client = self.clone();
351
352 pyo3_async_runtimes::tokio::future_into_py(py, async move {
353 client.send_ping().await.map_err(to_pyruntime_err)?;
354 Ok(())
355 })
356 }
357}
358
359pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
360 if let Err(e) = callback.call1(py, (py_obj,)) {
361 tracing::error!("Error calling Python: {e}");
362 }
363}