nautilus_coinbase_intx/python/
websocket.rs1use futures_util::StreamExt;
45use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::BarType,
48 identifiers::InstrumentId,
49 python::{
50 data::data_to_pycapsule,
51 events::order::order_event_to_pyobject,
52 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53 },
54};
55use pyo3::{exceptions::PyRuntimeError, prelude::*};
56
57use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl CoinbaseIntxWebSocketClient {
61 #[new]
62 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
63 fn py_new(
64 url: Option<String>,
65 api_key: Option<String>,
66 api_secret: Option<String>,
67 api_passphrase: Option<String>,
68 heartbeat: Option<u64>,
69 ) -> PyResult<Self> {
70 Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
71 }
72
73 #[getter]
74 #[pyo3(name = "url")]
75 #[must_use]
76 pub const fn py_url(&self) -> &str {
77 self.url()
78 }
79
80 #[getter]
81 #[pyo3(name = "api_key")]
82 #[must_use]
83 pub fn py_api_key(&self) -> &str {
84 self.api_key()
85 }
86
87 #[getter]
88 #[pyo3(name = "api_key_masked")]
89 #[must_use]
90 pub fn py_api_key_masked(&self) -> String {
91 self.api_key_masked()
92 }
93
94 #[pyo3(name = "is_active")]
95 fn py_is_active(&mut self) -> bool {
96 self.is_active()
97 }
98
99 #[pyo3(name = "is_closed")]
100 fn py_is_closed(&mut self) -> bool {
101 self.is_closed()
102 }
103
104 #[pyo3(name = "get_subscriptions")]
105 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
106 let channels = self.get_subscriptions(instrument_id);
107
108 channels
110 .iter()
111 .map(|c| {
112 serde_json::to_value(c)
113 .ok()
114 .and_then(|v| v.as_str().map(String::from))
115 .unwrap_or_else(|| c.to_string())
116 })
117 .collect()
118 }
119
120 #[pyo3(name = "connect")]
121 fn py_connect<'py>(
122 &mut self,
123 py: Python<'py>,
124 instruments: Vec<Py<PyAny>>,
125 callback: Py<PyAny>,
126 ) -> PyResult<Bound<'py, PyAny>> {
127 let mut instruments_any = Vec::new();
128 for inst in instruments {
129 let inst_any = pyobject_to_instrument_any(py, inst)?;
130 instruments_any.push(inst_any);
131 }
132
133 self.cache_instruments(instruments_any);
134
135 let mut client = self.clone();
136
137 pyo3_async_runtimes::tokio::future_into_py(py, async move {
138 client.connect().await.map_err(to_pyruntime_err)?;
139
140 let stream = client.stream();
141
142 tokio::spawn(async move {
143 tokio::pin!(stream);
144
145 while let Some(msg) = stream.next().await {
146 match msg {
147 NautilusWsMessage::Instrument(inst) => Python::attach(|py| {
148 let py_obj = instrument_any_to_pyobject(py, inst)
149 .expect("Failed to create instrument");
150 call_python(py, &callback, py_obj);
151 }),
152 NautilusWsMessage::Data(data) => Python::attach(|py| {
153 let py_obj = data_to_pycapsule(py, data);
154 call_python(py, &callback, py_obj);
155 }),
156 NautilusWsMessage::DataVec(data_vec) => Python::attach(|py| {
157 for data in data_vec {
158 let py_obj = data_to_pycapsule(py, data);
159 call_python(py, &callback, py_obj);
160 }
161 }),
162 NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
163 call_python(py, &callback, deltas.into_py_any_unwrap(py));
164 }),
165 NautilusWsMessage::MarkPrice(mark_price) => Python::attach(|py| {
166 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
167 }),
168 NautilusWsMessage::IndexPrice(index_price) => Python::attach(|py| {
169 call_python(py, &callback, index_price.into_py_any_unwrap(py));
170 }),
171 NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
172 Python::attach(|py| {
173 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
174 call_python(py, &callback, index_price.into_py_any_unwrap(py));
175 });
176 }
177 NautilusWsMessage::OrderEvent(msg) => Python::attach(|py| {
178 let py_obj =
179 order_event_to_pyobject(py, msg).expect("Failed to create event");
180 call_python(py, &callback, py_obj);
181 }),
182 }
183 }
184 });
185
186 Ok(())
187 })
188 }
189
190 #[pyo3(name = "wait_until_active")]
191 fn py_wait_until_active<'py>(
192 &self,
193 py: Python<'py>,
194 timeout_secs: f64,
195 ) -> PyResult<Bound<'py, PyAny>> {
196 let client = self.clone();
197
198 pyo3_async_runtimes::tokio::future_into_py(py, async move {
199 client
200 .wait_until_active(timeout_secs)
201 .await
202 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
203 Ok(())
204 })
205 }
206
207 #[pyo3(name = "close")]
208 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
209 let mut client = self.clone();
210
211 pyo3_async_runtimes::tokio::future_into_py(py, async move {
212 if let Err(e) = client.close().await {
213 log::error!("Error on close: {e}");
214 }
215 Ok(())
216 })
217 }
218
219 #[pyo3(name = "subscribe_instruments")]
220 #[pyo3(signature = (instrument_ids=None))]
221 fn py_subscribe_instruments<'py>(
222 &self,
223 py: Python<'py>,
224 instrument_ids: Option<Vec<InstrumentId>>,
225 ) -> PyResult<Bound<'py, PyAny>> {
226 let client = self.clone();
227 let instrument_ids = instrument_ids.unwrap_or_default();
228
229 pyo3_async_runtimes::tokio::future_into_py(py, async move {
230 if let Err(e) = client.subscribe_instruments(instrument_ids).await {
231 log::error!("Failed to subscribe to instruments: {e}");
232 }
233 Ok(())
234 })
235 }
236
237 #[pyo3(name = "subscribe_book")]
238 fn py_subscribe_book<'py>(
239 &self,
240 py: Python<'py>,
241 instrument_ids: Vec<InstrumentId>,
242 ) -> PyResult<Bound<'py, PyAny>> {
243 let client = self.clone();
244
245 pyo3_async_runtimes::tokio::future_into_py(py, async move {
246 if let Err(e) = client.subscribe_book(instrument_ids).await {
247 log::error!("Failed to subscribe to order book: {e}");
248 }
249 Ok(())
250 })
251 }
252
253 #[pyo3(name = "subscribe_quotes")]
254 fn py_subscribe_quotes<'py>(
255 &self,
256 py: Python<'py>,
257 instrument_ids: Vec<InstrumentId>,
258 ) -> PyResult<Bound<'py, PyAny>> {
259 let client = self.clone();
260
261 pyo3_async_runtimes::tokio::future_into_py(py, async move {
262 if let Err(e) = client.subscribe_quotes(instrument_ids).await {
263 log::error!("Failed to subscribe to quotes: {e}");
264 }
265 Ok(())
266 })
267 }
268
269 #[pyo3(name = "subscribe_trades")]
270 fn py_subscribe_trades<'py>(
271 &self,
272 py: Python<'py>,
273 instrument_ids: Vec<InstrumentId>,
274 ) -> PyResult<Bound<'py, PyAny>> {
275 let client = self.clone();
276
277 pyo3_async_runtimes::tokio::future_into_py(py, async move {
278 if let Err(e) = client.subscribe_trades(instrument_ids).await {
279 log::error!("Failed to subscribe to trades: {e}");
280 }
281 Ok(())
282 })
283 }
284
285 #[pyo3(name = "subscribe_mark_prices")]
286 fn py_subscribe_mark_prices<'py>(
287 &self,
288 py: Python<'py>,
289 instrument_ids: Vec<InstrumentId>,
290 ) -> PyResult<Bound<'py, PyAny>> {
291 let client = self.clone();
292
293 pyo3_async_runtimes::tokio::future_into_py(py, async move {
294 if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
295 log::error!("Failed to subscribe to mark prices: {e}");
296 }
297 Ok(())
298 })
299 }
300
301 #[pyo3(name = "subscribe_index_prices")]
302 fn py_subscribe_index_prices<'py>(
303 &self,
304 py: Python<'py>,
305 instrument_ids: Vec<InstrumentId>,
306 ) -> PyResult<Bound<'py, PyAny>> {
307 let client = self.clone();
308
309 pyo3_async_runtimes::tokio::future_into_py(py, async move {
310 if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
311 log::error!("Failed to subscribe to index prices: {e}");
312 }
313 Ok(())
314 })
315 }
316
317 #[pyo3(name = "subscribe_bars")]
318 fn py_subscribe_bars<'py>(
319 &self,
320 py: Python<'py>,
321 bar_type: BarType,
322 ) -> PyResult<Bound<'py, PyAny>> {
323 let client = self.clone();
324
325 pyo3_async_runtimes::tokio::future_into_py(py, async move {
326 if let Err(e) = client.subscribe_bars(bar_type).await {
327 log::error!("Failed to subscribe to bars: {e}");
328 }
329 Ok(())
330 })
331 }
332
333 #[pyo3(name = "unsubscribe_instruments")]
334 fn py_unsubscribe_instruments<'py>(
335 &self,
336 py: Python<'py>,
337 instrument_ids: Vec<InstrumentId>,
338 ) -> PyResult<Bound<'py, PyAny>> {
339 let client = self.clone();
340
341 pyo3_async_runtimes::tokio::future_into_py(py, async move {
342 if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
343 log::error!("Failed to unsubscribe from order book: {e}");
344 }
345 Ok(())
346 })
347 }
348
349 #[pyo3(name = "unsubscribe_book")]
350 fn py_unsubscribe_book<'py>(
351 &self,
352 py: Python<'py>,
353 instrument_ids: Vec<InstrumentId>,
354 ) -> PyResult<Bound<'py, PyAny>> {
355 let client = self.clone();
356
357 pyo3_async_runtimes::tokio::future_into_py(py, async move {
358 if let Err(e) = client.unsubscribe_book(instrument_ids).await {
359 log::error!("Failed to unsubscribe from order book: {e}");
360 }
361 Ok(())
362 })
363 }
364
365 #[pyo3(name = "unsubscribe_quotes")]
366 fn py_unsubscribe_quotes<'py>(
367 &self,
368 py: Python<'py>,
369 instrument_ids: Vec<InstrumentId>,
370 ) -> PyResult<Bound<'py, PyAny>> {
371 let client = self.clone();
372
373 pyo3_async_runtimes::tokio::future_into_py(py, async move {
374 if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
375 log::error!("Failed to unsubscribe from quotes: {e}");
376 }
377 Ok(())
378 })
379 }
380
381 #[pyo3(name = "unsubscribe_trades")]
382 fn py_unsubscribe_trades<'py>(
383 &self,
384 py: Python<'py>,
385 instrument_ids: Vec<InstrumentId>,
386 ) -> PyResult<Bound<'py, PyAny>> {
387 let client = self.clone();
388
389 pyo3_async_runtimes::tokio::future_into_py(py, async move {
390 if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
391 log::error!("Failed to unsubscribe from trades: {e}");
392 }
393 Ok(())
394 })
395 }
396
397 #[pyo3(name = "unsubscribe_mark_prices")]
398 fn py_unsubscribe_mark_prices<'py>(
399 &self,
400 py: Python<'py>,
401 instrument_ids: Vec<InstrumentId>,
402 ) -> PyResult<Bound<'py, PyAny>> {
403 let client = self.clone();
404
405 pyo3_async_runtimes::tokio::future_into_py(py, async move {
406 if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
407 log::error!("Failed to unsubscribe from mark prices: {e}");
408 }
409 Ok(())
410 })
411 }
412
413 #[pyo3(name = "unsubscribe_index_prices")]
414 fn py_unsubscribe_index_prices<'py>(
415 &self,
416 py: Python<'py>,
417 instrument_ids: Vec<InstrumentId>,
418 ) -> PyResult<Bound<'py, PyAny>> {
419 let client = self.clone();
420
421 pyo3_async_runtimes::tokio::future_into_py(py, async move {
422 if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
423 log::error!("Failed to unsubscribe from index prices: {e}");
424 }
425 Ok(())
426 })
427 }
428
429 #[pyo3(name = "unsubscribe_bars")]
430 fn py_unsubscribe_bars<'py>(
431 &self,
432 py: Python<'py>,
433 bar_type: BarType,
434 ) -> PyResult<Bound<'py, PyAny>> {
435 let client = self.clone();
436
437 pyo3_async_runtimes::tokio::future_into_py(py, async move {
438 if let Err(e) = client.unsubscribe_bars(bar_type).await {
439 log::error!("Failed to unsubscribe from bars: {e}");
440 }
441 Ok(())
442 })
443 }
444}
445
446pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
447 if let Err(e) = callback.call1(py, (py_obj,)) {
448 tracing::error!("Error calling Python: {e}");
449 }
450}