nautilus_coinbase_intx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Coinbase Intx WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47    data::BarType,
48    identifiers::InstrumentId,
49    python::{
50        data::data_to_pycapsule,
51        events::order::order_event_to_pyobject,
52        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53    },
54};
55use pyo3::{exceptions::PyRuntimeError, prelude::*};
56
57use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl CoinbaseIntxWebSocketClient {
61    #[new]
62    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
63    fn py_new(
64        url: Option<String>,
65        api_key: Option<String>,
66        api_secret: Option<String>,
67        api_passphrase: Option<String>,
68        heartbeat: Option<u64>,
69    ) -> PyResult<Self> {
70        Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
71    }
72
73    #[getter]
74    #[pyo3(name = "url")]
75    #[must_use]
76    pub const fn py_url(&self) -> &str {
77        self.url()
78    }
79
80    #[getter]
81    #[pyo3(name = "api_key")]
82    #[must_use]
83    pub fn py_api_key(&self) -> &str {
84        self.api_key()
85    }
86
87    #[pyo3(name = "is_active")]
88    fn py_is_active(&mut self) -> bool {
89        self.is_active()
90    }
91
92    #[pyo3(name = "is_closed")]
93    fn py_is_closed(&mut self) -> bool {
94        self.is_closed()
95    }
96
97    #[pyo3(name = "get_subscriptions")]
98    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
99        let channels = self.get_subscriptions(instrument_id);
100
101        // Convert to Coinbase channel names
102        channels
103            .iter()
104            .map(|c| {
105                serde_json::to_value(c)
106                    .ok()
107                    .and_then(|v| v.as_str().map(String::from))
108                    .unwrap_or_else(|| c.to_string())
109            })
110            .collect()
111    }
112
113    #[pyo3(name = "connect")]
114    fn py_connect<'py>(
115        &mut self,
116        py: Python<'py>,
117        instruments: Vec<PyObject>,
118        callback: PyObject,
119    ) -> PyResult<Bound<'py, PyAny>> {
120        let mut instruments_any = Vec::new();
121        for inst in instruments {
122            let inst_any = pyobject_to_instrument_any(py, inst)?;
123            instruments_any.push(inst_any);
124        }
125
126        self.initialize_instruments_cache(instruments_any);
127
128        let mut client = self.clone();
129
130        pyo3_async_runtimes::tokio::future_into_py(py, async move {
131            client.connect().await.map_err(to_pyruntime_err)?;
132
133            let stream = client.stream();
134
135            tokio::spawn(async move {
136                tokio::pin!(stream);
137
138                while let Some(msg) = stream.next().await {
139                    match msg {
140                        NautilusWsMessage::Instrument(inst) => Python::with_gil(|py| {
141                            let py_obj = instrument_any_to_pyobject(py, inst)
142                                .expect("Failed to create instrument");
143                            call_python(py, &callback, py_obj);
144                        }),
145                        NautilusWsMessage::Data(data) => Python::with_gil(|py| {
146                            let py_obj = data_to_pycapsule(py, data);
147                            call_python(py, &callback, py_obj);
148                        }),
149                        NautilusWsMessage::DataVec(data_vec) => Python::with_gil(|py| {
150                            for data in data_vec {
151                                let py_obj = data_to_pycapsule(py, data);
152                                call_python(py, &callback, py_obj);
153                            }
154                        }),
155                        NautilusWsMessage::Deltas(deltas) => Python::with_gil(|py| {
156                            call_python(py, &callback, deltas.into_py_any_unwrap(py));
157                        }),
158                        NautilusWsMessage::MarkPrice(mark_price) => Python::with_gil(|py| {
159                            call_python(py, &callback, mark_price.into_py_any_unwrap(py));
160                        }),
161                        NautilusWsMessage::IndexPrice(index_price) => Python::with_gil(|py| {
162                            call_python(py, &callback, index_price.into_py_any_unwrap(py));
163                        }),
164                        NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
165                            Python::with_gil(|py| {
166                                call_python(py, &callback, mark_price.into_py_any_unwrap(py));
167                                call_python(py, &callback, index_price.into_py_any_unwrap(py));
168                            });
169                        }
170                        NautilusWsMessage::OrderEvent(msg) => Python::with_gil(|py| {
171                            let py_obj =
172                                order_event_to_pyobject(py, msg).expect("Failed to create event");
173                            call_python(py, &callback, py_obj);
174                        }),
175                    }
176                }
177            });
178
179            Ok(())
180        })
181    }
182
183    #[pyo3(name = "wait_until_active")]
184    fn py_wait_until_active<'py>(
185        &self,
186        py: Python<'py>,
187        timeout_secs: f64,
188    ) -> PyResult<Bound<'py, PyAny>> {
189        let client = self.clone();
190
191        pyo3_async_runtimes::tokio::future_into_py(py, async move {
192            client
193                .wait_until_active(timeout_secs)
194                .await
195                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
196            Ok(())
197        })
198    }
199
200    #[pyo3(name = "close")]
201    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
202        let mut client = self.clone();
203
204        pyo3_async_runtimes::tokio::future_into_py(py, async move {
205            if let Err(e) = client.close().await {
206                log::error!("Error on close: {e}");
207            }
208            Ok(())
209        })
210    }
211
212    #[pyo3(name = "subscribe_instruments")]
213    #[pyo3(signature = (instrument_ids=None))]
214    fn py_subscribe_instruments<'py>(
215        &self,
216        py: Python<'py>,
217        instrument_ids: Option<Vec<InstrumentId>>,
218    ) -> PyResult<Bound<'py, PyAny>> {
219        let client = self.clone();
220        let instrument_ids = instrument_ids.unwrap_or_default();
221
222        pyo3_async_runtimes::tokio::future_into_py(py, async move {
223            if let Err(e) = client.subscribe_instruments(instrument_ids).await {
224                log::error!("Failed to subscribe to instruments: {e}");
225            }
226            Ok(())
227        })
228    }
229
230    #[pyo3(name = "subscribe_book")]
231    fn py_subscribe_book<'py>(
232        &self,
233        py: Python<'py>,
234        instrument_ids: Vec<InstrumentId>,
235    ) -> PyResult<Bound<'py, PyAny>> {
236        let client = self.clone();
237
238        pyo3_async_runtimes::tokio::future_into_py(py, async move {
239            if let Err(e) = client.subscribe_book(instrument_ids).await {
240                log::error!("Failed to subscribe to order book: {e}");
241            }
242            Ok(())
243        })
244    }
245
246    #[pyo3(name = "subscribe_quotes")]
247    fn py_subscribe_quotes<'py>(
248        &self,
249        py: Python<'py>,
250        instrument_ids: Vec<InstrumentId>,
251    ) -> PyResult<Bound<'py, PyAny>> {
252        let client = self.clone();
253
254        pyo3_async_runtimes::tokio::future_into_py(py, async move {
255            if let Err(e) = client.subscribe_quotes(instrument_ids).await {
256                log::error!("Failed to subscribe to quotes: {e}");
257            }
258            Ok(())
259        })
260    }
261
262    #[pyo3(name = "subscribe_trades")]
263    fn py_subscribe_trades<'py>(
264        &self,
265        py: Python<'py>,
266        instrument_ids: Vec<InstrumentId>,
267    ) -> PyResult<Bound<'py, PyAny>> {
268        let client = self.clone();
269
270        pyo3_async_runtimes::tokio::future_into_py(py, async move {
271            if let Err(e) = client.subscribe_trades(instrument_ids).await {
272                log::error!("Failed to subscribe to trades: {e}");
273            }
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "subscribe_mark_prices")]
279    fn py_subscribe_mark_prices<'py>(
280        &self,
281        py: Python<'py>,
282        instrument_ids: Vec<InstrumentId>,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
288                log::error!("Failed to subscribe to mark prices: {e}");
289            }
290            Ok(())
291        })
292    }
293
294    #[pyo3(name = "subscribe_index_prices")]
295    fn py_subscribe_index_prices<'py>(
296        &self,
297        py: Python<'py>,
298        instrument_ids: Vec<InstrumentId>,
299    ) -> PyResult<Bound<'py, PyAny>> {
300        let client = self.clone();
301
302        pyo3_async_runtimes::tokio::future_into_py(py, async move {
303            if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
304                log::error!("Failed to subscribe to index prices: {e}");
305            }
306            Ok(())
307        })
308    }
309
310    #[pyo3(name = "subscribe_bars")]
311    fn py_subscribe_bars<'py>(
312        &self,
313        py: Python<'py>,
314        bar_type: BarType,
315    ) -> PyResult<Bound<'py, PyAny>> {
316        let client = self.clone();
317
318        pyo3_async_runtimes::tokio::future_into_py(py, async move {
319            if let Err(e) = client.subscribe_bars(bar_type).await {
320                log::error!("Failed to subscribe to bars: {e}");
321            }
322            Ok(())
323        })
324    }
325
326    #[pyo3(name = "unsubscribe_instruments")]
327    fn py_unsubscribe_instruments<'py>(
328        &self,
329        py: Python<'py>,
330        instrument_ids: Vec<InstrumentId>,
331    ) -> PyResult<Bound<'py, PyAny>> {
332        let client = self.clone();
333
334        pyo3_async_runtimes::tokio::future_into_py(py, async move {
335            if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
336                log::error!("Failed to unsubscribe from order book: {e}");
337            }
338            Ok(())
339        })
340    }
341
342    #[pyo3(name = "unsubscribe_book")]
343    fn py_unsubscribe_book<'py>(
344        &self,
345        py: Python<'py>,
346        instrument_ids: Vec<InstrumentId>,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            if let Err(e) = client.unsubscribe_book(instrument_ids).await {
352                log::error!("Failed to unsubscribe from order book: {e}");
353            }
354            Ok(())
355        })
356    }
357
358    #[pyo3(name = "unsubscribe_quotes")]
359    fn py_unsubscribe_quotes<'py>(
360        &self,
361        py: Python<'py>,
362        instrument_ids: Vec<InstrumentId>,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
368                log::error!("Failed to unsubscribe from quotes: {e}");
369            }
370            Ok(())
371        })
372    }
373
374    #[pyo3(name = "unsubscribe_trades")]
375    fn py_unsubscribe_trades<'py>(
376        &self,
377        py: Python<'py>,
378        instrument_ids: Vec<InstrumentId>,
379    ) -> PyResult<Bound<'py, PyAny>> {
380        let client = self.clone();
381
382        pyo3_async_runtimes::tokio::future_into_py(py, async move {
383            if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
384                log::error!("Failed to unsubscribe from trades: {e}");
385            }
386            Ok(())
387        })
388    }
389
390    #[pyo3(name = "unsubscribe_mark_prices")]
391    fn py_unsubscribe_mark_prices<'py>(
392        &self,
393        py: Python<'py>,
394        instrument_ids: Vec<InstrumentId>,
395    ) -> PyResult<Bound<'py, PyAny>> {
396        let client = self.clone();
397
398        pyo3_async_runtimes::tokio::future_into_py(py, async move {
399            if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
400                log::error!("Failed to unsubscribe from mark prices: {e}");
401            }
402            Ok(())
403        })
404    }
405
406    #[pyo3(name = "unsubscribe_index_prices")]
407    fn py_unsubscribe_index_prices<'py>(
408        &self,
409        py: Python<'py>,
410        instrument_ids: Vec<InstrumentId>,
411    ) -> PyResult<Bound<'py, PyAny>> {
412        let client = self.clone();
413
414        pyo3_async_runtimes::tokio::future_into_py(py, async move {
415            if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
416                log::error!("Failed to unsubscribe from index prices: {e}");
417            }
418            Ok(())
419        })
420    }
421
422    #[pyo3(name = "unsubscribe_bars")]
423    fn py_unsubscribe_bars<'py>(
424        &self,
425        py: Python<'py>,
426        bar_type: BarType,
427    ) -> PyResult<Bound<'py, PyAny>> {
428        let client = self.clone();
429
430        pyo3_async_runtimes::tokio::future_into_py(py, async move {
431            if let Err(e) = client.unsubscribe_bars(bar_type).await {
432                log::error!("Failed to unsubscribe from bars: {e}");
433            }
434            Ok(())
435        })
436    }
437}
438
439pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
440    if let Err(e) = callback.call1(py, (py_obj,)) {
441        tracing::error!("Error calling Python: {e}");
442    }
443}