nautilus_coinbase_intx/python/
websocket.rs1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{
47 IntoPyObjectNautilusExt, call_python, to_pyruntime_err, to_pyvalue_err,
48};
49use nautilus_model::{
50 data::BarType,
51 identifiers::InstrumentId,
52 python::{
53 data::data_to_pycapsule,
54 events::order::order_event_to_pyobject,
55 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
56 },
57};
58use pyo3::{exceptions::PyRuntimeError, prelude::*};
59
60use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
61
62#[pymethods]
63impl CoinbaseIntxWebSocketClient {
64 #[new]
65 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
66 fn py_new(
67 url: Option<String>,
68 api_key: Option<String>,
69 api_secret: Option<String>,
70 api_passphrase: Option<String>,
71 heartbeat: Option<u64>,
72 ) -> PyResult<Self> {
73 Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
74 }
75
76 #[getter]
77 #[pyo3(name = "url")]
78 #[must_use]
79 pub const fn py_url(&self) -> &str {
80 self.url()
81 }
82
83 #[getter]
84 #[pyo3(name = "api_key")]
85 #[must_use]
86 pub fn py_api_key(&self) -> &str {
87 self.api_key()
88 }
89
90 #[getter]
91 #[pyo3(name = "api_key_masked")]
92 #[must_use]
93 pub fn py_api_key_masked(&self) -> String {
94 self.api_key_masked()
95 }
96
97 #[pyo3(name = "is_active")]
98 fn py_is_active(&mut self) -> bool {
99 self.is_active()
100 }
101
102 #[pyo3(name = "is_closed")]
103 fn py_is_closed(&mut self) -> bool {
104 self.is_closed()
105 }
106
107 #[pyo3(name = "get_subscriptions")]
108 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
109 let channels = self.get_subscriptions(instrument_id);
110
111 channels
113 .iter()
114 .map(|c| {
115 serde_json::to_value(c)
116 .ok()
117 .and_then(|v| v.as_str().map(String::from))
118 .unwrap_or_else(|| c.to_string())
119 })
120 .collect()
121 }
122
123 #[pyo3(name = "connect")]
124 fn py_connect<'py>(
125 &mut self,
126 py: Python<'py>,
127 instruments: Vec<Py<PyAny>>,
128 callback: Py<PyAny>,
129 ) -> PyResult<Bound<'py, PyAny>> {
130 let mut instruments_any = Vec::new();
131 for inst in instruments {
132 let inst_any = pyobject_to_instrument_any(py, inst)?;
133 instruments_any.push(inst_any);
134 }
135
136 self.cache_instruments(instruments_any);
137
138 let mut client = self.clone();
139
140 pyo3_async_runtimes::tokio::future_into_py(py, async move {
141 client.connect().await.map_err(to_pyruntime_err)?;
142
143 let stream = client.stream();
144
145 get_runtime().spawn(async move {
146 tokio::pin!(stream);
147
148 while let Some(msg) = stream.next().await {
149 match msg {
150 NautilusWsMessage::Instrument(inst) => Python::attach(|py| {
151 let py_obj = instrument_any_to_pyobject(py, inst)
152 .expect("Failed to create instrument");
153 call_python(py, &callback, py_obj);
154 }),
155 NautilusWsMessage::Data(data) => Python::attach(|py| {
156 let py_obj = data_to_pycapsule(py, data);
157 call_python(py, &callback, py_obj);
158 }),
159 NautilusWsMessage::DataVec(data_vec) => Python::attach(|py| {
160 for data in data_vec {
161 let py_obj = data_to_pycapsule(py, data);
162 call_python(py, &callback, py_obj);
163 }
164 }),
165 NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
166 call_python(py, &callback, deltas.into_py_any_unwrap(py));
167 }),
168 NautilusWsMessage::MarkPrice(mark_price) => Python::attach(|py| {
169 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
170 }),
171 NautilusWsMessage::IndexPrice(index_price) => Python::attach(|py| {
172 call_python(py, &callback, index_price.into_py_any_unwrap(py));
173 }),
174 NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
175 Python::attach(|py| {
176 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
177 call_python(py, &callback, index_price.into_py_any_unwrap(py));
178 });
179 }
180 NautilusWsMessage::OrderEvent(msg) => Python::attach(|py| {
181 let py_obj =
182 order_event_to_pyobject(py, msg).expect("Failed to create event");
183 call_python(py, &callback, py_obj);
184 }),
185 }
186 }
187 });
188
189 Ok(())
190 })
191 }
192
193 #[pyo3(name = "wait_until_active")]
194 fn py_wait_until_active<'py>(
195 &self,
196 py: Python<'py>,
197 timeout_secs: f64,
198 ) -> PyResult<Bound<'py, PyAny>> {
199 let client = self.clone();
200
201 pyo3_async_runtimes::tokio::future_into_py(py, async move {
202 client
203 .wait_until_active(timeout_secs)
204 .await
205 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
206 Ok(())
207 })
208 }
209
210 #[pyo3(name = "close")]
211 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
212 let mut client = self.clone();
213
214 pyo3_async_runtimes::tokio::future_into_py(py, async move {
215 if let Err(e) = client.close().await {
216 log::error!("Error on close: {e}");
217 }
218 Ok(())
219 })
220 }
221
222 #[pyo3(name = "subscribe_instruments")]
223 #[pyo3(signature = (instrument_ids=None))]
224 fn py_subscribe_instruments<'py>(
225 &self,
226 py: Python<'py>,
227 instrument_ids: Option<Vec<InstrumentId>>,
228 ) -> PyResult<Bound<'py, PyAny>> {
229 let client = self.clone();
230 let instrument_ids = instrument_ids.unwrap_or_default();
231
232 pyo3_async_runtimes::tokio::future_into_py(py, async move {
233 if let Err(e) = client.subscribe_instruments(instrument_ids).await {
234 log::error!("Failed to subscribe to instruments: {e}");
235 }
236 Ok(())
237 })
238 }
239
240 #[pyo3(name = "subscribe_book")]
241 fn py_subscribe_book<'py>(
242 &self,
243 py: Python<'py>,
244 instrument_ids: Vec<InstrumentId>,
245 ) -> PyResult<Bound<'py, PyAny>> {
246 let client = self.clone();
247
248 pyo3_async_runtimes::tokio::future_into_py(py, async move {
249 if let Err(e) = client.subscribe_book(instrument_ids).await {
250 log::error!("Failed to subscribe to order book: {e}");
251 }
252 Ok(())
253 })
254 }
255
256 #[pyo3(name = "subscribe_quotes")]
257 fn py_subscribe_quotes<'py>(
258 &self,
259 py: Python<'py>,
260 instrument_ids: Vec<InstrumentId>,
261 ) -> PyResult<Bound<'py, PyAny>> {
262 let client = self.clone();
263
264 pyo3_async_runtimes::tokio::future_into_py(py, async move {
265 if let Err(e) = client.subscribe_quotes(instrument_ids).await {
266 log::error!("Failed to subscribe to quotes: {e}");
267 }
268 Ok(())
269 })
270 }
271
272 #[pyo3(name = "subscribe_trades")]
273 fn py_subscribe_trades<'py>(
274 &self,
275 py: Python<'py>,
276 instrument_ids: Vec<InstrumentId>,
277 ) -> PyResult<Bound<'py, PyAny>> {
278 let client = self.clone();
279
280 pyo3_async_runtimes::tokio::future_into_py(py, async move {
281 if let Err(e) = client.subscribe_trades(instrument_ids).await {
282 log::error!("Failed to subscribe to trades: {e}");
283 }
284 Ok(())
285 })
286 }
287
288 #[pyo3(name = "subscribe_mark_prices")]
289 fn py_subscribe_mark_prices<'py>(
290 &self,
291 py: Python<'py>,
292 instrument_ids: Vec<InstrumentId>,
293 ) -> PyResult<Bound<'py, PyAny>> {
294 let client = self.clone();
295
296 pyo3_async_runtimes::tokio::future_into_py(py, async move {
297 if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
298 log::error!("Failed to subscribe to mark prices: {e}");
299 }
300 Ok(())
301 })
302 }
303
304 #[pyo3(name = "subscribe_index_prices")]
305 fn py_subscribe_index_prices<'py>(
306 &self,
307 py: Python<'py>,
308 instrument_ids: Vec<InstrumentId>,
309 ) -> PyResult<Bound<'py, PyAny>> {
310 let client = self.clone();
311
312 pyo3_async_runtimes::tokio::future_into_py(py, async move {
313 if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
314 log::error!("Failed to subscribe to index prices: {e}");
315 }
316 Ok(())
317 })
318 }
319
320 #[pyo3(name = "subscribe_bars")]
321 fn py_subscribe_bars<'py>(
322 &self,
323 py: Python<'py>,
324 bar_type: BarType,
325 ) -> PyResult<Bound<'py, PyAny>> {
326 let client = self.clone();
327
328 pyo3_async_runtimes::tokio::future_into_py(py, async move {
329 if let Err(e) = client.subscribe_bars(bar_type).await {
330 log::error!("Failed to subscribe to bars: {e}");
331 }
332 Ok(())
333 })
334 }
335
336 #[pyo3(name = "unsubscribe_instruments")]
337 fn py_unsubscribe_instruments<'py>(
338 &self,
339 py: Python<'py>,
340 instrument_ids: Vec<InstrumentId>,
341 ) -> PyResult<Bound<'py, PyAny>> {
342 let client = self.clone();
343
344 pyo3_async_runtimes::tokio::future_into_py(py, async move {
345 if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
346 log::error!("Failed to unsubscribe from order book: {e}");
347 }
348 Ok(())
349 })
350 }
351
352 #[pyo3(name = "unsubscribe_book")]
353 fn py_unsubscribe_book<'py>(
354 &self,
355 py: Python<'py>,
356 instrument_ids: Vec<InstrumentId>,
357 ) -> PyResult<Bound<'py, PyAny>> {
358 let client = self.clone();
359
360 pyo3_async_runtimes::tokio::future_into_py(py, async move {
361 if let Err(e) = client.unsubscribe_book(instrument_ids).await {
362 log::error!("Failed to unsubscribe from order book: {e}");
363 }
364 Ok(())
365 })
366 }
367
368 #[pyo3(name = "unsubscribe_quotes")]
369 fn py_unsubscribe_quotes<'py>(
370 &self,
371 py: Python<'py>,
372 instrument_ids: Vec<InstrumentId>,
373 ) -> PyResult<Bound<'py, PyAny>> {
374 let client = self.clone();
375
376 pyo3_async_runtimes::tokio::future_into_py(py, async move {
377 if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
378 log::error!("Failed to unsubscribe from quotes: {e}");
379 }
380 Ok(())
381 })
382 }
383
384 #[pyo3(name = "unsubscribe_trades")]
385 fn py_unsubscribe_trades<'py>(
386 &self,
387 py: Python<'py>,
388 instrument_ids: Vec<InstrumentId>,
389 ) -> PyResult<Bound<'py, PyAny>> {
390 let client = self.clone();
391
392 pyo3_async_runtimes::tokio::future_into_py(py, async move {
393 if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
394 log::error!("Failed to unsubscribe from trades: {e}");
395 }
396 Ok(())
397 })
398 }
399
400 #[pyo3(name = "unsubscribe_mark_prices")]
401 fn py_unsubscribe_mark_prices<'py>(
402 &self,
403 py: Python<'py>,
404 instrument_ids: Vec<InstrumentId>,
405 ) -> PyResult<Bound<'py, PyAny>> {
406 let client = self.clone();
407
408 pyo3_async_runtimes::tokio::future_into_py(py, async move {
409 if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
410 log::error!("Failed to unsubscribe from mark prices: {e}");
411 }
412 Ok(())
413 })
414 }
415
416 #[pyo3(name = "unsubscribe_index_prices")]
417 fn py_unsubscribe_index_prices<'py>(
418 &self,
419 py: Python<'py>,
420 instrument_ids: Vec<InstrumentId>,
421 ) -> PyResult<Bound<'py, PyAny>> {
422 let client = self.clone();
423
424 pyo3_async_runtimes::tokio::future_into_py(py, async move {
425 if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
426 log::error!("Failed to unsubscribe from index prices: {e}");
427 }
428 Ok(())
429 })
430 }
431
432 #[pyo3(name = "unsubscribe_bars")]
433 fn py_unsubscribe_bars<'py>(
434 &self,
435 py: Python<'py>,
436 bar_type: BarType,
437 ) -> PyResult<Bound<'py, PyAny>> {
438 let client = self.clone();
439
440 pyo3_async_runtimes::tokio::future_into_py(py, async move {
441 if let Err(e) = client.unsubscribe_bars(bar_type).await {
442 log::error!("Failed to unsubscribe from bars: {e}");
443 }
444 Ok(())
445 })
446 }
447}