nautilus_coinbase_intx/python/
websocket.rs1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48 data::BarType,
49 identifiers::InstrumentId,
50 python::{
51 data::data_to_pycapsule,
52 events::order::order_event_to_pyobject,
53 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
54 },
55};
56use pyo3::{exceptions::PyRuntimeError, prelude::*};
57
58use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
59
60#[pymethods]
61impl CoinbaseIntxWebSocketClient {
62 #[new]
63 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
64 fn py_new(
65 url: Option<String>,
66 api_key: Option<String>,
67 api_secret: Option<String>,
68 api_passphrase: Option<String>,
69 heartbeat: Option<u64>,
70 ) -> PyResult<Self> {
71 Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
72 }
73
74 #[getter]
75 #[pyo3(name = "url")]
76 #[must_use]
77 pub const fn py_url(&self) -> &str {
78 self.url()
79 }
80
81 #[getter]
82 #[pyo3(name = "api_key")]
83 #[must_use]
84 pub fn py_api_key(&self) -> &str {
85 self.api_key()
86 }
87
88 #[getter]
89 #[pyo3(name = "api_key_masked")]
90 #[must_use]
91 pub fn py_api_key_masked(&self) -> String {
92 self.api_key_masked()
93 }
94
95 #[pyo3(name = "is_active")]
96 fn py_is_active(&mut self) -> bool {
97 self.is_active()
98 }
99
100 #[pyo3(name = "is_closed")]
101 fn py_is_closed(&mut self) -> bool {
102 self.is_closed()
103 }
104
105 #[pyo3(name = "get_subscriptions")]
106 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
107 let channels = self.get_subscriptions(instrument_id);
108
109 channels
111 .iter()
112 .map(|c| {
113 serde_json::to_value(c)
114 .ok()
115 .and_then(|v| v.as_str().map(String::from))
116 .unwrap_or_else(|| c.to_string())
117 })
118 .collect()
119 }
120
121 #[pyo3(name = "connect")]
122 fn py_connect<'py>(
123 &mut self,
124 py: Python<'py>,
125 instruments: Vec<Py<PyAny>>,
126 callback: Py<PyAny>,
127 ) -> PyResult<Bound<'py, PyAny>> {
128 let mut instruments_any = Vec::new();
129 for inst in instruments {
130 let inst_any = pyobject_to_instrument_any(py, inst)?;
131 instruments_any.push(inst_any);
132 }
133
134 self.cache_instruments(instruments_any);
135
136 let mut client = self.clone();
137
138 pyo3_async_runtimes::tokio::future_into_py(py, async move {
139 client.connect().await.map_err(to_pyruntime_err)?;
140
141 let stream = client.stream();
142
143 get_runtime().spawn(async move {
144 tokio::pin!(stream);
145
146 while let Some(msg) = stream.next().await {
147 match msg {
148 NautilusWsMessage::Instrument(inst) => Python::attach(|py| {
149 let py_obj = instrument_any_to_pyobject(py, inst)
150 .expect("Failed to create instrument");
151 call_python(py, &callback, py_obj);
152 }),
153 NautilusWsMessage::Data(data) => Python::attach(|py| {
154 let py_obj = data_to_pycapsule(py, data);
155 call_python(py, &callback, py_obj);
156 }),
157 NautilusWsMessage::DataVec(data_vec) => Python::attach(|py| {
158 for data in data_vec {
159 let py_obj = data_to_pycapsule(py, data);
160 call_python(py, &callback, py_obj);
161 }
162 }),
163 NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
164 call_python(py, &callback, deltas.into_py_any_unwrap(py));
165 }),
166 NautilusWsMessage::MarkPrice(mark_price) => Python::attach(|py| {
167 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
168 }),
169 NautilusWsMessage::IndexPrice(index_price) => Python::attach(|py| {
170 call_python(py, &callback, index_price.into_py_any_unwrap(py));
171 }),
172 NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
173 Python::attach(|py| {
174 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
175 call_python(py, &callback, index_price.into_py_any_unwrap(py));
176 });
177 }
178 NautilusWsMessage::OrderEvent(msg) => Python::attach(|py| {
179 let py_obj =
180 order_event_to_pyobject(py, msg).expect("Failed to create event");
181 call_python(py, &callback, py_obj);
182 }),
183 }
184 }
185 });
186
187 Ok(())
188 })
189 }
190
191 #[pyo3(name = "wait_until_active")]
192 fn py_wait_until_active<'py>(
193 &self,
194 py: Python<'py>,
195 timeout_secs: f64,
196 ) -> PyResult<Bound<'py, PyAny>> {
197 let client = self.clone();
198
199 pyo3_async_runtimes::tokio::future_into_py(py, async move {
200 client
201 .wait_until_active(timeout_secs)
202 .await
203 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
204 Ok(())
205 })
206 }
207
208 #[pyo3(name = "close")]
209 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
210 let mut client = self.clone();
211
212 pyo3_async_runtimes::tokio::future_into_py(py, async move {
213 if let Err(e) = client.close().await {
214 log::error!("Error on close: {e}");
215 }
216 Ok(())
217 })
218 }
219
220 #[pyo3(name = "subscribe_instruments")]
221 #[pyo3(signature = (instrument_ids=None))]
222 fn py_subscribe_instruments<'py>(
223 &self,
224 py: Python<'py>,
225 instrument_ids: Option<Vec<InstrumentId>>,
226 ) -> PyResult<Bound<'py, PyAny>> {
227 let client = self.clone();
228 let instrument_ids = instrument_ids.unwrap_or_default();
229
230 pyo3_async_runtimes::tokio::future_into_py(py, async move {
231 if let Err(e) = client.subscribe_instruments(instrument_ids).await {
232 log::error!("Failed to subscribe to instruments: {e}");
233 }
234 Ok(())
235 })
236 }
237
238 #[pyo3(name = "subscribe_book")]
239 fn py_subscribe_book<'py>(
240 &self,
241 py: Python<'py>,
242 instrument_ids: Vec<InstrumentId>,
243 ) -> PyResult<Bound<'py, PyAny>> {
244 let client = self.clone();
245
246 pyo3_async_runtimes::tokio::future_into_py(py, async move {
247 if let Err(e) = client.subscribe_book(instrument_ids).await {
248 log::error!("Failed to subscribe to order book: {e}");
249 }
250 Ok(())
251 })
252 }
253
254 #[pyo3(name = "subscribe_quotes")]
255 fn py_subscribe_quotes<'py>(
256 &self,
257 py: Python<'py>,
258 instrument_ids: Vec<InstrumentId>,
259 ) -> PyResult<Bound<'py, PyAny>> {
260 let client = self.clone();
261
262 pyo3_async_runtimes::tokio::future_into_py(py, async move {
263 if let Err(e) = client.subscribe_quotes(instrument_ids).await {
264 log::error!("Failed to subscribe to quotes: {e}");
265 }
266 Ok(())
267 })
268 }
269
270 #[pyo3(name = "subscribe_trades")]
271 fn py_subscribe_trades<'py>(
272 &self,
273 py: Python<'py>,
274 instrument_ids: Vec<InstrumentId>,
275 ) -> PyResult<Bound<'py, PyAny>> {
276 let client = self.clone();
277
278 pyo3_async_runtimes::tokio::future_into_py(py, async move {
279 if let Err(e) = client.subscribe_trades(instrument_ids).await {
280 log::error!("Failed to subscribe to trades: {e}");
281 }
282 Ok(())
283 })
284 }
285
286 #[pyo3(name = "subscribe_mark_prices")]
287 fn py_subscribe_mark_prices<'py>(
288 &self,
289 py: Python<'py>,
290 instrument_ids: Vec<InstrumentId>,
291 ) -> PyResult<Bound<'py, PyAny>> {
292 let client = self.clone();
293
294 pyo3_async_runtimes::tokio::future_into_py(py, async move {
295 if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
296 log::error!("Failed to subscribe to mark prices: {e}");
297 }
298 Ok(())
299 })
300 }
301
302 #[pyo3(name = "subscribe_index_prices")]
303 fn py_subscribe_index_prices<'py>(
304 &self,
305 py: Python<'py>,
306 instrument_ids: Vec<InstrumentId>,
307 ) -> PyResult<Bound<'py, PyAny>> {
308 let client = self.clone();
309
310 pyo3_async_runtimes::tokio::future_into_py(py, async move {
311 if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
312 log::error!("Failed to subscribe to index prices: {e}");
313 }
314 Ok(())
315 })
316 }
317
318 #[pyo3(name = "subscribe_bars")]
319 fn py_subscribe_bars<'py>(
320 &self,
321 py: Python<'py>,
322 bar_type: BarType,
323 ) -> PyResult<Bound<'py, PyAny>> {
324 let client = self.clone();
325
326 pyo3_async_runtimes::tokio::future_into_py(py, async move {
327 if let Err(e) = client.subscribe_bars(bar_type).await {
328 log::error!("Failed to subscribe to bars: {e}");
329 }
330 Ok(())
331 })
332 }
333
334 #[pyo3(name = "unsubscribe_instruments")]
335 fn py_unsubscribe_instruments<'py>(
336 &self,
337 py: Python<'py>,
338 instrument_ids: Vec<InstrumentId>,
339 ) -> PyResult<Bound<'py, PyAny>> {
340 let client = self.clone();
341
342 pyo3_async_runtimes::tokio::future_into_py(py, async move {
343 if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
344 log::error!("Failed to unsubscribe from order book: {e}");
345 }
346 Ok(())
347 })
348 }
349
350 #[pyo3(name = "unsubscribe_book")]
351 fn py_unsubscribe_book<'py>(
352 &self,
353 py: Python<'py>,
354 instrument_ids: Vec<InstrumentId>,
355 ) -> PyResult<Bound<'py, PyAny>> {
356 let client = self.clone();
357
358 pyo3_async_runtimes::tokio::future_into_py(py, async move {
359 if let Err(e) = client.unsubscribe_book(instrument_ids).await {
360 log::error!("Failed to unsubscribe from order book: {e}");
361 }
362 Ok(())
363 })
364 }
365
366 #[pyo3(name = "unsubscribe_quotes")]
367 fn py_unsubscribe_quotes<'py>(
368 &self,
369 py: Python<'py>,
370 instrument_ids: Vec<InstrumentId>,
371 ) -> PyResult<Bound<'py, PyAny>> {
372 let client = self.clone();
373
374 pyo3_async_runtimes::tokio::future_into_py(py, async move {
375 if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
376 log::error!("Failed to unsubscribe from quotes: {e}");
377 }
378 Ok(())
379 })
380 }
381
382 #[pyo3(name = "unsubscribe_trades")]
383 fn py_unsubscribe_trades<'py>(
384 &self,
385 py: Python<'py>,
386 instrument_ids: Vec<InstrumentId>,
387 ) -> PyResult<Bound<'py, PyAny>> {
388 let client = self.clone();
389
390 pyo3_async_runtimes::tokio::future_into_py(py, async move {
391 if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
392 log::error!("Failed to unsubscribe from trades: {e}");
393 }
394 Ok(())
395 })
396 }
397
398 #[pyo3(name = "unsubscribe_mark_prices")]
399 fn py_unsubscribe_mark_prices<'py>(
400 &self,
401 py: Python<'py>,
402 instrument_ids: Vec<InstrumentId>,
403 ) -> PyResult<Bound<'py, PyAny>> {
404 let client = self.clone();
405
406 pyo3_async_runtimes::tokio::future_into_py(py, async move {
407 if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
408 log::error!("Failed to unsubscribe from mark prices: {e}");
409 }
410 Ok(())
411 })
412 }
413
414 #[pyo3(name = "unsubscribe_index_prices")]
415 fn py_unsubscribe_index_prices<'py>(
416 &self,
417 py: Python<'py>,
418 instrument_ids: Vec<InstrumentId>,
419 ) -> PyResult<Bound<'py, PyAny>> {
420 let client = self.clone();
421
422 pyo3_async_runtimes::tokio::future_into_py(py, async move {
423 if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
424 log::error!("Failed to unsubscribe from index prices: {e}");
425 }
426 Ok(())
427 })
428 }
429
430 #[pyo3(name = "unsubscribe_bars")]
431 fn py_unsubscribe_bars<'py>(
432 &self,
433 py: Python<'py>,
434 bar_type: BarType,
435 ) -> PyResult<Bound<'py, PyAny>> {
436 let client = self.clone();
437
438 pyo3_async_runtimes::tokio::future_into_py(py, async move {
439 if let Err(e) = client.unsubscribe_bars(bar_type).await {
440 log::error!("Failed to unsubscribe from bars: {e}");
441 }
442 Ok(())
443 })
444 }
445}
446
447pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
448 if let Err(e) = callback.call1(py, (py_obj,)) {
449 tracing::error!("Error calling Python: {e}");
450 }
451}