nautilus_coinbase_intx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Coinbase Intx WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{
47    IntoPyObjectNautilusExt, call_python, to_pyruntime_err, to_pyvalue_err,
48};
49use nautilus_model::{
50    data::BarType,
51    identifiers::InstrumentId,
52    python::{
53        data::data_to_pycapsule,
54        events::order::order_event_to_pyobject,
55        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
56    },
57};
58use pyo3::{exceptions::PyRuntimeError, prelude::*};
59
60use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
61
62#[pymethods]
63impl CoinbaseIntxWebSocketClient {
64    #[new]
65    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
66    fn py_new(
67        url: Option<String>,
68        api_key: Option<String>,
69        api_secret: Option<String>,
70        api_passphrase: Option<String>,
71        heartbeat: Option<u64>,
72    ) -> PyResult<Self> {
73        Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
74    }
75
76    #[getter]
77    #[pyo3(name = "url")]
78    #[must_use]
79    pub const fn py_url(&self) -> &str {
80        self.url()
81    }
82
83    #[getter]
84    #[pyo3(name = "api_key")]
85    #[must_use]
86    pub fn py_api_key(&self) -> &str {
87        self.api_key()
88    }
89
90    #[getter]
91    #[pyo3(name = "api_key_masked")]
92    #[must_use]
93    pub fn py_api_key_masked(&self) -> String {
94        self.api_key_masked()
95    }
96
97    #[pyo3(name = "is_active")]
98    fn py_is_active(&mut self) -> bool {
99        self.is_active()
100    }
101
102    #[pyo3(name = "is_closed")]
103    fn py_is_closed(&mut self) -> bool {
104        self.is_closed()
105    }
106
107    #[pyo3(name = "get_subscriptions")]
108    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
109        let channels = self.get_subscriptions(instrument_id);
110
111        // Convert to Coinbase channel names
112        channels
113            .iter()
114            .map(|c| {
115                serde_json::to_value(c)
116                    .ok()
117                    .and_then(|v| v.as_str().map(String::from))
118                    .unwrap_or_else(|| c.to_string())
119            })
120            .collect()
121    }
122
123    #[pyo3(name = "connect")]
124    fn py_connect<'py>(
125        &mut self,
126        py: Python<'py>,
127        instruments: Vec<Py<PyAny>>,
128        callback: Py<PyAny>,
129    ) -> PyResult<Bound<'py, PyAny>> {
130        let mut instruments_any = Vec::new();
131        for inst in instruments {
132            let inst_any = pyobject_to_instrument_any(py, inst)?;
133            instruments_any.push(inst_any);
134        }
135
136        self.cache_instruments(instruments_any);
137
138        let mut client = self.clone();
139
140        pyo3_async_runtimes::tokio::future_into_py(py, async move {
141            client.connect().await.map_err(to_pyruntime_err)?;
142
143            let stream = client.stream();
144
145            get_runtime().spawn(async move {
146                tokio::pin!(stream);
147
148                while let Some(msg) = stream.next().await {
149                    match msg {
150                        NautilusWsMessage::Instrument(inst) => Python::attach(|py| {
151                            let py_obj = instrument_any_to_pyobject(py, inst)
152                                .expect("Failed to create instrument");
153                            call_python(py, &callback, py_obj);
154                        }),
155                        NautilusWsMessage::Data(data) => Python::attach(|py| {
156                            let py_obj = data_to_pycapsule(py, data);
157                            call_python(py, &callback, py_obj);
158                        }),
159                        NautilusWsMessage::DataVec(data_vec) => Python::attach(|py| {
160                            for data in data_vec {
161                                let py_obj = data_to_pycapsule(py, data);
162                                call_python(py, &callback, py_obj);
163                            }
164                        }),
165                        NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
166                            call_python(py, &callback, deltas.into_py_any_unwrap(py));
167                        }),
168                        NautilusWsMessage::MarkPrice(mark_price) => Python::attach(|py| {
169                            call_python(py, &callback, mark_price.into_py_any_unwrap(py));
170                        }),
171                        NautilusWsMessage::IndexPrice(index_price) => Python::attach(|py| {
172                            call_python(py, &callback, index_price.into_py_any_unwrap(py));
173                        }),
174                        NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
175                            Python::attach(|py| {
176                                call_python(py, &callback, mark_price.into_py_any_unwrap(py));
177                                call_python(py, &callback, index_price.into_py_any_unwrap(py));
178                            });
179                        }
180                        NautilusWsMessage::OrderEvent(msg) => Python::attach(|py| {
181                            let py_obj =
182                                order_event_to_pyobject(py, msg).expect("Failed to create event");
183                            call_python(py, &callback, py_obj);
184                        }),
185                    }
186                }
187            });
188
189            Ok(())
190        })
191    }
192
193    #[pyo3(name = "wait_until_active")]
194    fn py_wait_until_active<'py>(
195        &self,
196        py: Python<'py>,
197        timeout_secs: f64,
198    ) -> PyResult<Bound<'py, PyAny>> {
199        let client = self.clone();
200
201        pyo3_async_runtimes::tokio::future_into_py(py, async move {
202            client
203                .wait_until_active(timeout_secs)
204                .await
205                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
206            Ok(())
207        })
208    }
209
210    #[pyo3(name = "close")]
211    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
212        let mut client = self.clone();
213
214        pyo3_async_runtimes::tokio::future_into_py(py, async move {
215            if let Err(e) = client.close().await {
216                log::error!("Error on close: {e}");
217            }
218            Ok(())
219        })
220    }
221
222    #[pyo3(name = "subscribe_instruments")]
223    #[pyo3(signature = (instrument_ids=None))]
224    fn py_subscribe_instruments<'py>(
225        &self,
226        py: Python<'py>,
227        instrument_ids: Option<Vec<InstrumentId>>,
228    ) -> PyResult<Bound<'py, PyAny>> {
229        let client = self.clone();
230        let instrument_ids = instrument_ids.unwrap_or_default();
231
232        pyo3_async_runtimes::tokio::future_into_py(py, async move {
233            if let Err(e) = client.subscribe_instruments(instrument_ids).await {
234                log::error!("Failed to subscribe to instruments: {e}");
235            }
236            Ok(())
237        })
238    }
239
240    #[pyo3(name = "subscribe_book")]
241    fn py_subscribe_book<'py>(
242        &self,
243        py: Python<'py>,
244        instrument_ids: Vec<InstrumentId>,
245    ) -> PyResult<Bound<'py, PyAny>> {
246        let client = self.clone();
247
248        pyo3_async_runtimes::tokio::future_into_py(py, async move {
249            if let Err(e) = client.subscribe_book(instrument_ids).await {
250                log::error!("Failed to subscribe to order book: {e}");
251            }
252            Ok(())
253        })
254    }
255
256    #[pyo3(name = "subscribe_quotes")]
257    fn py_subscribe_quotes<'py>(
258        &self,
259        py: Python<'py>,
260        instrument_ids: Vec<InstrumentId>,
261    ) -> PyResult<Bound<'py, PyAny>> {
262        let client = self.clone();
263
264        pyo3_async_runtimes::tokio::future_into_py(py, async move {
265            if let Err(e) = client.subscribe_quotes(instrument_ids).await {
266                log::error!("Failed to subscribe to quotes: {e}");
267            }
268            Ok(())
269        })
270    }
271
272    #[pyo3(name = "subscribe_trades")]
273    fn py_subscribe_trades<'py>(
274        &self,
275        py: Python<'py>,
276        instrument_ids: Vec<InstrumentId>,
277    ) -> PyResult<Bound<'py, PyAny>> {
278        let client = self.clone();
279
280        pyo3_async_runtimes::tokio::future_into_py(py, async move {
281            if let Err(e) = client.subscribe_trades(instrument_ids).await {
282                log::error!("Failed to subscribe to trades: {e}");
283            }
284            Ok(())
285        })
286    }
287
288    #[pyo3(name = "subscribe_mark_prices")]
289    fn py_subscribe_mark_prices<'py>(
290        &self,
291        py: Python<'py>,
292        instrument_ids: Vec<InstrumentId>,
293    ) -> PyResult<Bound<'py, PyAny>> {
294        let client = self.clone();
295
296        pyo3_async_runtimes::tokio::future_into_py(py, async move {
297            if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
298                log::error!("Failed to subscribe to mark prices: {e}");
299            }
300            Ok(())
301        })
302    }
303
304    #[pyo3(name = "subscribe_index_prices")]
305    fn py_subscribe_index_prices<'py>(
306        &self,
307        py: Python<'py>,
308        instrument_ids: Vec<InstrumentId>,
309    ) -> PyResult<Bound<'py, PyAny>> {
310        let client = self.clone();
311
312        pyo3_async_runtimes::tokio::future_into_py(py, async move {
313            if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
314                log::error!("Failed to subscribe to index prices: {e}");
315            }
316            Ok(())
317        })
318    }
319
320    #[pyo3(name = "subscribe_bars")]
321    fn py_subscribe_bars<'py>(
322        &self,
323        py: Python<'py>,
324        bar_type: BarType,
325    ) -> PyResult<Bound<'py, PyAny>> {
326        let client = self.clone();
327
328        pyo3_async_runtimes::tokio::future_into_py(py, async move {
329            if let Err(e) = client.subscribe_bars(bar_type).await {
330                log::error!("Failed to subscribe to bars: {e}");
331            }
332            Ok(())
333        })
334    }
335
336    #[pyo3(name = "unsubscribe_instruments")]
337    fn py_unsubscribe_instruments<'py>(
338        &self,
339        py: Python<'py>,
340        instrument_ids: Vec<InstrumentId>,
341    ) -> PyResult<Bound<'py, PyAny>> {
342        let client = self.clone();
343
344        pyo3_async_runtimes::tokio::future_into_py(py, async move {
345            if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
346                log::error!("Failed to unsubscribe from order book: {e}");
347            }
348            Ok(())
349        })
350    }
351
352    #[pyo3(name = "unsubscribe_book")]
353    fn py_unsubscribe_book<'py>(
354        &self,
355        py: Python<'py>,
356        instrument_ids: Vec<InstrumentId>,
357    ) -> PyResult<Bound<'py, PyAny>> {
358        let client = self.clone();
359
360        pyo3_async_runtimes::tokio::future_into_py(py, async move {
361            if let Err(e) = client.unsubscribe_book(instrument_ids).await {
362                log::error!("Failed to unsubscribe from order book: {e}");
363            }
364            Ok(())
365        })
366    }
367
368    #[pyo3(name = "unsubscribe_quotes")]
369    fn py_unsubscribe_quotes<'py>(
370        &self,
371        py: Python<'py>,
372        instrument_ids: Vec<InstrumentId>,
373    ) -> PyResult<Bound<'py, PyAny>> {
374        let client = self.clone();
375
376        pyo3_async_runtimes::tokio::future_into_py(py, async move {
377            if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
378                log::error!("Failed to unsubscribe from quotes: {e}");
379            }
380            Ok(())
381        })
382    }
383
384    #[pyo3(name = "unsubscribe_trades")]
385    fn py_unsubscribe_trades<'py>(
386        &self,
387        py: Python<'py>,
388        instrument_ids: Vec<InstrumentId>,
389    ) -> PyResult<Bound<'py, PyAny>> {
390        let client = self.clone();
391
392        pyo3_async_runtimes::tokio::future_into_py(py, async move {
393            if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
394                log::error!("Failed to unsubscribe from trades: {e}");
395            }
396            Ok(())
397        })
398    }
399
400    #[pyo3(name = "unsubscribe_mark_prices")]
401    fn py_unsubscribe_mark_prices<'py>(
402        &self,
403        py: Python<'py>,
404        instrument_ids: Vec<InstrumentId>,
405    ) -> PyResult<Bound<'py, PyAny>> {
406        let client = self.clone();
407
408        pyo3_async_runtimes::tokio::future_into_py(py, async move {
409            if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
410                log::error!("Failed to unsubscribe from mark prices: {e}");
411            }
412            Ok(())
413        })
414    }
415
416    #[pyo3(name = "unsubscribe_index_prices")]
417    fn py_unsubscribe_index_prices<'py>(
418        &self,
419        py: Python<'py>,
420        instrument_ids: Vec<InstrumentId>,
421    ) -> PyResult<Bound<'py, PyAny>> {
422        let client = self.clone();
423
424        pyo3_async_runtimes::tokio::future_into_py(py, async move {
425            if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
426                log::error!("Failed to unsubscribe from index prices: {e}");
427            }
428            Ok(())
429        })
430    }
431
432    #[pyo3(name = "unsubscribe_bars")]
433    fn py_unsubscribe_bars<'py>(
434        &self,
435        py: Python<'py>,
436        bar_type: BarType,
437    ) -> PyResult<Bound<'py, PyAny>> {
438        let client = self.clone();
439
440        pyo3_async_runtimes::tokio::future_into_py(py, async move {
441            if let Err(e) = client.unsubscribe_bars(bar_type).await {
442                log::error!("Failed to unsubscribe from bars: {e}");
443            }
444            Ok(())
445        })
446    }
447}