nautilus_coinbase_intx/python/
websocket.rs1use futures_util::StreamExt;
45use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::BarType,
48 identifiers::InstrumentId,
49 python::{
50 data::data_to_pycapsule,
51 events::order::order_event_to_pyobject,
52 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53 },
54};
55use pyo3::{exceptions::PyRuntimeError, prelude::*};
56
57use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl CoinbaseIntxWebSocketClient {
61 #[new]
62 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
63 fn py_new(
64 url: Option<String>,
65 api_key: Option<String>,
66 api_secret: Option<String>,
67 api_passphrase: Option<String>,
68 heartbeat: Option<u64>,
69 ) -> PyResult<Self> {
70 Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
71 }
72
73 #[getter]
74 #[pyo3(name = "url")]
75 #[must_use]
76 pub const fn py_url(&self) -> &str {
77 self.url()
78 }
79
80 #[getter]
81 #[pyo3(name = "api_key")]
82 #[must_use]
83 pub fn py_api_key(&self) -> &str {
84 self.api_key()
85 }
86
87 #[pyo3(name = "is_active")]
88 fn py_is_active(&mut self) -> bool {
89 self.is_active()
90 }
91
92 #[pyo3(name = "is_closed")]
93 fn py_is_closed(&mut self) -> bool {
94 self.is_closed()
95 }
96
97 #[pyo3(name = "get_subscriptions")]
98 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
99 let channels = self.get_subscriptions(instrument_id);
100
101 channels
103 .iter()
104 .map(|c| {
105 serde_json::to_value(c)
106 .ok()
107 .and_then(|v| v.as_str().map(String::from))
108 .unwrap_or_else(|| c.to_string())
109 })
110 .collect()
111 }
112
113 #[pyo3(name = "connect")]
114 fn py_connect<'py>(
115 &mut self,
116 py: Python<'py>,
117 instruments: Vec<PyObject>,
118 callback: PyObject,
119 ) -> PyResult<Bound<'py, PyAny>> {
120 let mut instruments_any = Vec::new();
121 for inst in instruments {
122 let inst_any = pyobject_to_instrument_any(py, inst)?;
123 instruments_any.push(inst_any);
124 }
125
126 self.initialize_instruments_cache(instruments_any);
127
128 let mut client = self.clone();
129
130 pyo3_async_runtimes::tokio::future_into_py(py, async move {
131 client.connect().await.map_err(to_pyruntime_err)?;
132
133 let stream = client.stream();
134
135 tokio::spawn(async move {
136 tokio::pin!(stream);
137
138 while let Some(msg) = stream.next().await {
139 match msg {
140 NautilusWsMessage::Instrument(inst) => Python::with_gil(|py| {
141 let py_obj = instrument_any_to_pyobject(py, inst)
142 .expect("Failed to create instrument");
143 call_python(py, &callback, py_obj);
144 }),
145 NautilusWsMessage::Data(data) => Python::with_gil(|py| {
146 let py_obj = data_to_pycapsule(py, data);
147 call_python(py, &callback, py_obj);
148 }),
149 NautilusWsMessage::DataVec(data_vec) => Python::with_gil(|py| {
150 for data in data_vec {
151 let py_obj = data_to_pycapsule(py, data);
152 call_python(py, &callback, py_obj);
153 }
154 }),
155 NautilusWsMessage::Deltas(deltas) => Python::with_gil(|py| {
156 call_python(py, &callback, deltas.into_py_any_unwrap(py));
157 }),
158 NautilusWsMessage::MarkPrice(mark_price) => Python::with_gil(|py| {
159 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
160 }),
161 NautilusWsMessage::IndexPrice(index_price) => Python::with_gil(|py| {
162 call_python(py, &callback, index_price.into_py_any_unwrap(py));
163 }),
164 NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
165 Python::with_gil(|py| {
166 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
167 call_python(py, &callback, index_price.into_py_any_unwrap(py));
168 });
169 }
170 NautilusWsMessage::OrderEvent(msg) => Python::with_gil(|py| {
171 let py_obj =
172 order_event_to_pyobject(py, msg).expect("Failed to create event");
173 call_python(py, &callback, py_obj);
174 }),
175 }
176 }
177 });
178
179 Ok(())
180 })
181 }
182
183 #[pyo3(name = "wait_until_active")]
184 fn py_wait_until_active<'py>(
185 &self,
186 py: Python<'py>,
187 timeout_secs: f64,
188 ) -> PyResult<Bound<'py, PyAny>> {
189 let client = self.clone();
190
191 pyo3_async_runtimes::tokio::future_into_py(py, async move {
192 client
193 .wait_until_active(timeout_secs)
194 .await
195 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
196 Ok(())
197 })
198 }
199
200 #[pyo3(name = "close")]
201 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
202 let mut client = self.clone();
203
204 pyo3_async_runtimes::tokio::future_into_py(py, async move {
205 if let Err(e) = client.close().await {
206 log::error!("Error on close: {e}");
207 }
208 Ok(())
209 })
210 }
211
212 #[pyo3(name = "subscribe_instruments")]
213 #[pyo3(signature = (instrument_ids=None))]
214 fn py_subscribe_instruments<'py>(
215 &self,
216 py: Python<'py>,
217 instrument_ids: Option<Vec<InstrumentId>>,
218 ) -> PyResult<Bound<'py, PyAny>> {
219 let client = self.clone();
220 let instrument_ids = instrument_ids.unwrap_or_default();
221
222 pyo3_async_runtimes::tokio::future_into_py(py, async move {
223 if let Err(e) = client.subscribe_instruments(instrument_ids).await {
224 log::error!("Failed to subscribe to instruments: {e}");
225 }
226 Ok(())
227 })
228 }
229
230 #[pyo3(name = "subscribe_book")]
231 fn py_subscribe_book<'py>(
232 &self,
233 py: Python<'py>,
234 instrument_ids: Vec<InstrumentId>,
235 ) -> PyResult<Bound<'py, PyAny>> {
236 let client = self.clone();
237
238 pyo3_async_runtimes::tokio::future_into_py(py, async move {
239 if let Err(e) = client.subscribe_book(instrument_ids).await {
240 log::error!("Failed to subscribe to order book: {e}");
241 }
242 Ok(())
243 })
244 }
245
246 #[pyo3(name = "subscribe_quotes")]
247 fn py_subscribe_quotes<'py>(
248 &self,
249 py: Python<'py>,
250 instrument_ids: Vec<InstrumentId>,
251 ) -> PyResult<Bound<'py, PyAny>> {
252 let client = self.clone();
253
254 pyo3_async_runtimes::tokio::future_into_py(py, async move {
255 if let Err(e) = client.subscribe_quotes(instrument_ids).await {
256 log::error!("Failed to subscribe to quotes: {e}");
257 }
258 Ok(())
259 })
260 }
261
262 #[pyo3(name = "subscribe_trades")]
263 fn py_subscribe_trades<'py>(
264 &self,
265 py: Python<'py>,
266 instrument_ids: Vec<InstrumentId>,
267 ) -> PyResult<Bound<'py, PyAny>> {
268 let client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 if let Err(e) = client.subscribe_trades(instrument_ids).await {
272 log::error!("Failed to subscribe to trades: {e}");
273 }
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "subscribe_mark_prices")]
279 fn py_subscribe_mark_prices<'py>(
280 &self,
281 py: Python<'py>,
282 instrument_ids: Vec<InstrumentId>,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
288 log::error!("Failed to subscribe to mark prices: {e}");
289 }
290 Ok(())
291 })
292 }
293
294 #[pyo3(name = "subscribe_index_prices")]
295 fn py_subscribe_index_prices<'py>(
296 &self,
297 py: Python<'py>,
298 instrument_ids: Vec<InstrumentId>,
299 ) -> PyResult<Bound<'py, PyAny>> {
300 let client = self.clone();
301
302 pyo3_async_runtimes::tokio::future_into_py(py, async move {
303 if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
304 log::error!("Failed to subscribe to index prices: {e}");
305 }
306 Ok(())
307 })
308 }
309
310 #[pyo3(name = "subscribe_bars")]
311 fn py_subscribe_bars<'py>(
312 &self,
313 py: Python<'py>,
314 bar_type: BarType,
315 ) -> PyResult<Bound<'py, PyAny>> {
316 let client = self.clone();
317
318 pyo3_async_runtimes::tokio::future_into_py(py, async move {
319 if let Err(e) = client.subscribe_bars(bar_type).await {
320 log::error!("Failed to subscribe to bars: {e}");
321 }
322 Ok(())
323 })
324 }
325
326 #[pyo3(name = "unsubscribe_instruments")]
327 fn py_unsubscribe_instruments<'py>(
328 &self,
329 py: Python<'py>,
330 instrument_ids: Vec<InstrumentId>,
331 ) -> PyResult<Bound<'py, PyAny>> {
332 let client = self.clone();
333
334 pyo3_async_runtimes::tokio::future_into_py(py, async move {
335 if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
336 log::error!("Failed to unsubscribe from order book: {e}");
337 }
338 Ok(())
339 })
340 }
341
342 #[pyo3(name = "unsubscribe_book")]
343 fn py_unsubscribe_book<'py>(
344 &self,
345 py: Python<'py>,
346 instrument_ids: Vec<InstrumentId>,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 if let Err(e) = client.unsubscribe_book(instrument_ids).await {
352 log::error!("Failed to unsubscribe from order book: {e}");
353 }
354 Ok(())
355 })
356 }
357
358 #[pyo3(name = "unsubscribe_quotes")]
359 fn py_unsubscribe_quotes<'py>(
360 &self,
361 py: Python<'py>,
362 instrument_ids: Vec<InstrumentId>,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
368 log::error!("Failed to unsubscribe from quotes: {e}");
369 }
370 Ok(())
371 })
372 }
373
374 #[pyo3(name = "unsubscribe_trades")]
375 fn py_unsubscribe_trades<'py>(
376 &self,
377 py: Python<'py>,
378 instrument_ids: Vec<InstrumentId>,
379 ) -> PyResult<Bound<'py, PyAny>> {
380 let client = self.clone();
381
382 pyo3_async_runtimes::tokio::future_into_py(py, async move {
383 if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
384 log::error!("Failed to unsubscribe from trades: {e}");
385 }
386 Ok(())
387 })
388 }
389
390 #[pyo3(name = "unsubscribe_mark_prices")]
391 fn py_unsubscribe_mark_prices<'py>(
392 &self,
393 py: Python<'py>,
394 instrument_ids: Vec<InstrumentId>,
395 ) -> PyResult<Bound<'py, PyAny>> {
396 let client = self.clone();
397
398 pyo3_async_runtimes::tokio::future_into_py(py, async move {
399 if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
400 log::error!("Failed to unsubscribe from mark prices: {e}");
401 }
402 Ok(())
403 })
404 }
405
406 #[pyo3(name = "unsubscribe_index_prices")]
407 fn py_unsubscribe_index_prices<'py>(
408 &self,
409 py: Python<'py>,
410 instrument_ids: Vec<InstrumentId>,
411 ) -> PyResult<Bound<'py, PyAny>> {
412 let client = self.clone();
413
414 pyo3_async_runtimes::tokio::future_into_py(py, async move {
415 if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
416 log::error!("Failed to unsubscribe from index prices: {e}");
417 }
418 Ok(())
419 })
420 }
421
422 #[pyo3(name = "unsubscribe_bars")]
423 fn py_unsubscribe_bars<'py>(
424 &self,
425 py: Python<'py>,
426 bar_type: BarType,
427 ) -> PyResult<Bound<'py, PyAny>> {
428 let client = self.clone();
429
430 pyo3_async_runtimes::tokio::future_into_py(py, async move {
431 if let Err(e) = client.unsubscribe_bars(bar_type).await {
432 log::error!("Failed to unsubscribe from bars: {e}");
433 }
434 Ok(())
435 })
436 }
437}
438
439pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
440 if let Err(e) = callback.call1(py, (py_obj,)) {
441 tracing::error!("Error calling Python: {e}");
442 }
443}