Skip to main content

nautilus_coinbase_intx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Coinbase Intx WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48    data::BarType,
49    identifiers::InstrumentId,
50    python::{
51        data::data_to_pycapsule,
52        events::order::order_event_to_pyobject,
53        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
54    },
55};
56use pyo3::{exceptions::PyRuntimeError, prelude::*};
57
58use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
59
60#[pymethods]
61impl CoinbaseIntxWebSocketClient {
62    #[new]
63    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
64    fn py_new(
65        url: Option<String>,
66        api_key: Option<String>,
67        api_secret: Option<String>,
68        api_passphrase: Option<String>,
69        heartbeat: Option<u64>,
70    ) -> PyResult<Self> {
71        Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
72    }
73
74    #[getter]
75    #[pyo3(name = "url")]
76    #[must_use]
77    pub const fn py_url(&self) -> &str {
78        self.url()
79    }
80
81    #[getter]
82    #[pyo3(name = "api_key")]
83    #[must_use]
84    pub fn py_api_key(&self) -> &str {
85        self.api_key()
86    }
87
88    #[getter]
89    #[pyo3(name = "api_key_masked")]
90    #[must_use]
91    pub fn py_api_key_masked(&self) -> String {
92        self.api_key_masked()
93    }
94
95    #[pyo3(name = "is_active")]
96    fn py_is_active(&mut self) -> bool {
97        self.is_active()
98    }
99
100    #[pyo3(name = "is_closed")]
101    fn py_is_closed(&mut self) -> bool {
102        self.is_closed()
103    }
104
105    #[pyo3(name = "get_subscriptions")]
106    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
107        let channels = self.get_subscriptions(instrument_id);
108
109        // Convert to Coinbase channel names
110        channels
111            .iter()
112            .map(|c| {
113                serde_json::to_value(c)
114                    .ok()
115                    .and_then(|v| v.as_str().map(String::from))
116                    .unwrap_or_else(|| c.to_string())
117            })
118            .collect()
119    }
120
121    #[pyo3(name = "connect")]
122    fn py_connect<'py>(
123        &mut self,
124        py: Python<'py>,
125        instruments: Vec<Py<PyAny>>,
126        callback: Py<PyAny>,
127    ) -> PyResult<Bound<'py, PyAny>> {
128        let mut instruments_any = Vec::new();
129        for inst in instruments {
130            let inst_any = pyobject_to_instrument_any(py, inst)?;
131            instruments_any.push(inst_any);
132        }
133
134        self.cache_instruments(instruments_any);
135
136        let mut client = self.clone();
137
138        pyo3_async_runtimes::tokio::future_into_py(py, async move {
139            client.connect().await.map_err(to_pyruntime_err)?;
140
141            let stream = client.stream();
142
143            get_runtime().spawn(async move {
144                tokio::pin!(stream);
145
146                while let Some(msg) = stream.next().await {
147                    match msg {
148                        NautilusWsMessage::Instrument(inst) => Python::attach(|py| {
149                            let py_obj = instrument_any_to_pyobject(py, inst)
150                                .expect("Failed to create instrument");
151                            call_python(py, &callback, py_obj);
152                        }),
153                        NautilusWsMessage::Data(data) => Python::attach(|py| {
154                            let py_obj = data_to_pycapsule(py, data);
155                            call_python(py, &callback, py_obj);
156                        }),
157                        NautilusWsMessage::DataVec(data_vec) => Python::attach(|py| {
158                            for data in data_vec {
159                                let py_obj = data_to_pycapsule(py, data);
160                                call_python(py, &callback, py_obj);
161                            }
162                        }),
163                        NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
164                            call_python(py, &callback, deltas.into_py_any_unwrap(py));
165                        }),
166                        NautilusWsMessage::MarkPrice(mark_price) => Python::attach(|py| {
167                            call_python(py, &callback, mark_price.into_py_any_unwrap(py));
168                        }),
169                        NautilusWsMessage::IndexPrice(index_price) => Python::attach(|py| {
170                            call_python(py, &callback, index_price.into_py_any_unwrap(py));
171                        }),
172                        NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
173                            Python::attach(|py| {
174                                call_python(py, &callback, mark_price.into_py_any_unwrap(py));
175                                call_python(py, &callback, index_price.into_py_any_unwrap(py));
176                            });
177                        }
178                        NautilusWsMessage::OrderEvent(msg) => Python::attach(|py| {
179                            let py_obj =
180                                order_event_to_pyobject(py, msg).expect("Failed to create event");
181                            call_python(py, &callback, py_obj);
182                        }),
183                    }
184                }
185            });
186
187            Ok(())
188        })
189    }
190
191    #[pyo3(name = "wait_until_active")]
192    fn py_wait_until_active<'py>(
193        &self,
194        py: Python<'py>,
195        timeout_secs: f64,
196    ) -> PyResult<Bound<'py, PyAny>> {
197        let client = self.clone();
198
199        pyo3_async_runtimes::tokio::future_into_py(py, async move {
200            client
201                .wait_until_active(timeout_secs)
202                .await
203                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
204            Ok(())
205        })
206    }
207
208    #[pyo3(name = "close")]
209    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
210        let mut client = self.clone();
211
212        pyo3_async_runtimes::tokio::future_into_py(py, async move {
213            if let Err(e) = client.close().await {
214                log::error!("Error on close: {e}");
215            }
216            Ok(())
217        })
218    }
219
220    #[pyo3(name = "subscribe_instruments")]
221    #[pyo3(signature = (instrument_ids=None))]
222    fn py_subscribe_instruments<'py>(
223        &self,
224        py: Python<'py>,
225        instrument_ids: Option<Vec<InstrumentId>>,
226    ) -> PyResult<Bound<'py, PyAny>> {
227        let client = self.clone();
228        let instrument_ids = instrument_ids.unwrap_or_default();
229
230        pyo3_async_runtimes::tokio::future_into_py(py, async move {
231            if let Err(e) = client.subscribe_instruments(instrument_ids).await {
232                log::error!("Failed to subscribe to instruments: {e}");
233            }
234            Ok(())
235        })
236    }
237
238    #[pyo3(name = "subscribe_book")]
239    fn py_subscribe_book<'py>(
240        &self,
241        py: Python<'py>,
242        instrument_ids: Vec<InstrumentId>,
243    ) -> PyResult<Bound<'py, PyAny>> {
244        let client = self.clone();
245
246        pyo3_async_runtimes::tokio::future_into_py(py, async move {
247            if let Err(e) = client.subscribe_book(instrument_ids).await {
248                log::error!("Failed to subscribe to order book: {e}");
249            }
250            Ok(())
251        })
252    }
253
254    #[pyo3(name = "subscribe_quotes")]
255    fn py_subscribe_quotes<'py>(
256        &self,
257        py: Python<'py>,
258        instrument_ids: Vec<InstrumentId>,
259    ) -> PyResult<Bound<'py, PyAny>> {
260        let client = self.clone();
261
262        pyo3_async_runtimes::tokio::future_into_py(py, async move {
263            if let Err(e) = client.subscribe_quotes(instrument_ids).await {
264                log::error!("Failed to subscribe to quotes: {e}");
265            }
266            Ok(())
267        })
268    }
269
270    #[pyo3(name = "subscribe_trades")]
271    fn py_subscribe_trades<'py>(
272        &self,
273        py: Python<'py>,
274        instrument_ids: Vec<InstrumentId>,
275    ) -> PyResult<Bound<'py, PyAny>> {
276        let client = self.clone();
277
278        pyo3_async_runtimes::tokio::future_into_py(py, async move {
279            if let Err(e) = client.subscribe_trades(instrument_ids).await {
280                log::error!("Failed to subscribe to trades: {e}");
281            }
282            Ok(())
283        })
284    }
285
286    #[pyo3(name = "subscribe_mark_prices")]
287    fn py_subscribe_mark_prices<'py>(
288        &self,
289        py: Python<'py>,
290        instrument_ids: Vec<InstrumentId>,
291    ) -> PyResult<Bound<'py, PyAny>> {
292        let client = self.clone();
293
294        pyo3_async_runtimes::tokio::future_into_py(py, async move {
295            if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
296                log::error!("Failed to subscribe to mark prices: {e}");
297            }
298            Ok(())
299        })
300    }
301
302    #[pyo3(name = "subscribe_index_prices")]
303    fn py_subscribe_index_prices<'py>(
304        &self,
305        py: Python<'py>,
306        instrument_ids: Vec<InstrumentId>,
307    ) -> PyResult<Bound<'py, PyAny>> {
308        let client = self.clone();
309
310        pyo3_async_runtimes::tokio::future_into_py(py, async move {
311            if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
312                log::error!("Failed to subscribe to index prices: {e}");
313            }
314            Ok(())
315        })
316    }
317
318    #[pyo3(name = "subscribe_bars")]
319    fn py_subscribe_bars<'py>(
320        &self,
321        py: Python<'py>,
322        bar_type: BarType,
323    ) -> PyResult<Bound<'py, PyAny>> {
324        let client = self.clone();
325
326        pyo3_async_runtimes::tokio::future_into_py(py, async move {
327            if let Err(e) = client.subscribe_bars(bar_type).await {
328                log::error!("Failed to subscribe to bars: {e}");
329            }
330            Ok(())
331        })
332    }
333
334    #[pyo3(name = "unsubscribe_instruments")]
335    fn py_unsubscribe_instruments<'py>(
336        &self,
337        py: Python<'py>,
338        instrument_ids: Vec<InstrumentId>,
339    ) -> PyResult<Bound<'py, PyAny>> {
340        let client = self.clone();
341
342        pyo3_async_runtimes::tokio::future_into_py(py, async move {
343            if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
344                log::error!("Failed to unsubscribe from order book: {e}");
345            }
346            Ok(())
347        })
348    }
349
350    #[pyo3(name = "unsubscribe_book")]
351    fn py_unsubscribe_book<'py>(
352        &self,
353        py: Python<'py>,
354        instrument_ids: Vec<InstrumentId>,
355    ) -> PyResult<Bound<'py, PyAny>> {
356        let client = self.clone();
357
358        pyo3_async_runtimes::tokio::future_into_py(py, async move {
359            if let Err(e) = client.unsubscribe_book(instrument_ids).await {
360                log::error!("Failed to unsubscribe from order book: {e}");
361            }
362            Ok(())
363        })
364    }
365
366    #[pyo3(name = "unsubscribe_quotes")]
367    fn py_unsubscribe_quotes<'py>(
368        &self,
369        py: Python<'py>,
370        instrument_ids: Vec<InstrumentId>,
371    ) -> PyResult<Bound<'py, PyAny>> {
372        let client = self.clone();
373
374        pyo3_async_runtimes::tokio::future_into_py(py, async move {
375            if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
376                log::error!("Failed to unsubscribe from quotes: {e}");
377            }
378            Ok(())
379        })
380    }
381
382    #[pyo3(name = "unsubscribe_trades")]
383    fn py_unsubscribe_trades<'py>(
384        &self,
385        py: Python<'py>,
386        instrument_ids: Vec<InstrumentId>,
387    ) -> PyResult<Bound<'py, PyAny>> {
388        let client = self.clone();
389
390        pyo3_async_runtimes::tokio::future_into_py(py, async move {
391            if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
392                log::error!("Failed to unsubscribe from trades: {e}");
393            }
394            Ok(())
395        })
396    }
397
398    #[pyo3(name = "unsubscribe_mark_prices")]
399    fn py_unsubscribe_mark_prices<'py>(
400        &self,
401        py: Python<'py>,
402        instrument_ids: Vec<InstrumentId>,
403    ) -> PyResult<Bound<'py, PyAny>> {
404        let client = self.clone();
405
406        pyo3_async_runtimes::tokio::future_into_py(py, async move {
407            if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
408                log::error!("Failed to unsubscribe from mark prices: {e}");
409            }
410            Ok(())
411        })
412    }
413
414    #[pyo3(name = "unsubscribe_index_prices")]
415    fn py_unsubscribe_index_prices<'py>(
416        &self,
417        py: Python<'py>,
418        instrument_ids: Vec<InstrumentId>,
419    ) -> PyResult<Bound<'py, PyAny>> {
420        let client = self.clone();
421
422        pyo3_async_runtimes::tokio::future_into_py(py, async move {
423            if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
424                log::error!("Failed to unsubscribe from index prices: {e}");
425            }
426            Ok(())
427        })
428    }
429
430    #[pyo3(name = "unsubscribe_bars")]
431    fn py_unsubscribe_bars<'py>(
432        &self,
433        py: Python<'py>,
434        bar_type: BarType,
435    ) -> PyResult<Bound<'py, PyAny>> {
436        let client = self.clone();
437
438        pyo3_async_runtimes::tokio::future_into_py(py, async move {
439            if let Err(e) = client.unsubscribe_bars(bar_type).await {
440                log::error!("Failed to unsubscribe from bars: {e}");
441            }
442            Ok(())
443        })
444    }
445}
446
447pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
448    if let Err(e) = callback.call1(py, (py_obj,)) {
449        tracing::error!("Error calling Python: {e}");
450    }
451}