nautilus_coinbase_intx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Coinbase Intx WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47    data::BarType,
48    identifiers::InstrumentId,
49    python::{
50        data::data_to_pycapsule,
51        events::order::order_event_to_pyobject,
52        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53    },
54};
55use pyo3::{exceptions::PyRuntimeError, prelude::*};
56
57use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl CoinbaseIntxWebSocketClient {
61    #[new]
62    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
63    fn py_new(
64        url: Option<String>,
65        api_key: Option<String>,
66        api_secret: Option<String>,
67        api_passphrase: Option<String>,
68        heartbeat: Option<u64>,
69    ) -> PyResult<Self> {
70        Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
71    }
72
73    #[getter]
74    #[pyo3(name = "url")]
75    #[must_use]
76    pub const fn py_url(&self) -> &str {
77        self.url()
78    }
79
80    #[getter]
81    #[pyo3(name = "api_key")]
82    #[must_use]
83    pub fn py_api_key(&self) -> &str {
84        self.api_key()
85    }
86
87    #[getter]
88    #[pyo3(name = "api_key_masked")]
89    #[must_use]
90    pub fn py_api_key_masked(&self) -> String {
91        self.api_key_masked()
92    }
93
94    #[pyo3(name = "is_active")]
95    fn py_is_active(&mut self) -> bool {
96        self.is_active()
97    }
98
99    #[pyo3(name = "is_closed")]
100    fn py_is_closed(&mut self) -> bool {
101        self.is_closed()
102    }
103
104    #[pyo3(name = "get_subscriptions")]
105    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
106        let channels = self.get_subscriptions(instrument_id);
107
108        // Convert to Coinbase channel names
109        channels
110            .iter()
111            .map(|c| {
112                serde_json::to_value(c)
113                    .ok()
114                    .and_then(|v| v.as_str().map(String::from))
115                    .unwrap_or_else(|| c.to_string())
116            })
117            .collect()
118    }
119
120    #[pyo3(name = "connect")]
121    fn py_connect<'py>(
122        &mut self,
123        py: Python<'py>,
124        instruments: Vec<Py<PyAny>>,
125        callback: Py<PyAny>,
126    ) -> PyResult<Bound<'py, PyAny>> {
127        let mut instruments_any = Vec::new();
128        for inst in instruments {
129            let inst_any = pyobject_to_instrument_any(py, inst)?;
130            instruments_any.push(inst_any);
131        }
132
133        self.cache_instruments(instruments_any);
134
135        let mut client = self.clone();
136
137        pyo3_async_runtimes::tokio::future_into_py(py, async move {
138            client.connect().await.map_err(to_pyruntime_err)?;
139
140            let stream = client.stream();
141
142            tokio::spawn(async move {
143                tokio::pin!(stream);
144
145                while let Some(msg) = stream.next().await {
146                    match msg {
147                        NautilusWsMessage::Instrument(inst) => Python::attach(|py| {
148                            let py_obj = instrument_any_to_pyobject(py, inst)
149                                .expect("Failed to create instrument");
150                            call_python(py, &callback, py_obj);
151                        }),
152                        NautilusWsMessage::Data(data) => Python::attach(|py| {
153                            let py_obj = data_to_pycapsule(py, data);
154                            call_python(py, &callback, py_obj);
155                        }),
156                        NautilusWsMessage::DataVec(data_vec) => Python::attach(|py| {
157                            for data in data_vec {
158                                let py_obj = data_to_pycapsule(py, data);
159                                call_python(py, &callback, py_obj);
160                            }
161                        }),
162                        NautilusWsMessage::Deltas(deltas) => Python::attach(|py| {
163                            call_python(py, &callback, deltas.into_py_any_unwrap(py));
164                        }),
165                        NautilusWsMessage::MarkPrice(mark_price) => Python::attach(|py| {
166                            call_python(py, &callback, mark_price.into_py_any_unwrap(py));
167                        }),
168                        NautilusWsMessage::IndexPrice(index_price) => Python::attach(|py| {
169                            call_python(py, &callback, index_price.into_py_any_unwrap(py));
170                        }),
171                        NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
172                            Python::attach(|py| {
173                                call_python(py, &callback, mark_price.into_py_any_unwrap(py));
174                                call_python(py, &callback, index_price.into_py_any_unwrap(py));
175                            });
176                        }
177                        NautilusWsMessage::OrderEvent(msg) => Python::attach(|py| {
178                            let py_obj =
179                                order_event_to_pyobject(py, msg).expect("Failed to create event");
180                            call_python(py, &callback, py_obj);
181                        }),
182                    }
183                }
184            });
185
186            Ok(())
187        })
188    }
189
190    #[pyo3(name = "wait_until_active")]
191    fn py_wait_until_active<'py>(
192        &self,
193        py: Python<'py>,
194        timeout_secs: f64,
195    ) -> PyResult<Bound<'py, PyAny>> {
196        let client = self.clone();
197
198        pyo3_async_runtimes::tokio::future_into_py(py, async move {
199            client
200                .wait_until_active(timeout_secs)
201                .await
202                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
203            Ok(())
204        })
205    }
206
207    #[pyo3(name = "close")]
208    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
209        let mut client = self.clone();
210
211        pyo3_async_runtimes::tokio::future_into_py(py, async move {
212            if let Err(e) = client.close().await {
213                log::error!("Error on close: {e}");
214            }
215            Ok(())
216        })
217    }
218
219    #[pyo3(name = "subscribe_instruments")]
220    #[pyo3(signature = (instrument_ids=None))]
221    fn py_subscribe_instruments<'py>(
222        &self,
223        py: Python<'py>,
224        instrument_ids: Option<Vec<InstrumentId>>,
225    ) -> PyResult<Bound<'py, PyAny>> {
226        let client = self.clone();
227        let instrument_ids = instrument_ids.unwrap_or_default();
228
229        pyo3_async_runtimes::tokio::future_into_py(py, async move {
230            if let Err(e) = client.subscribe_instruments(instrument_ids).await {
231                log::error!("Failed to subscribe to instruments: {e}");
232            }
233            Ok(())
234        })
235    }
236
237    #[pyo3(name = "subscribe_book")]
238    fn py_subscribe_book<'py>(
239        &self,
240        py: Python<'py>,
241        instrument_ids: Vec<InstrumentId>,
242    ) -> PyResult<Bound<'py, PyAny>> {
243        let client = self.clone();
244
245        pyo3_async_runtimes::tokio::future_into_py(py, async move {
246            if let Err(e) = client.subscribe_book(instrument_ids).await {
247                log::error!("Failed to subscribe to order book: {e}");
248            }
249            Ok(())
250        })
251    }
252
253    #[pyo3(name = "subscribe_quotes")]
254    fn py_subscribe_quotes<'py>(
255        &self,
256        py: Python<'py>,
257        instrument_ids: Vec<InstrumentId>,
258    ) -> PyResult<Bound<'py, PyAny>> {
259        let client = self.clone();
260
261        pyo3_async_runtimes::tokio::future_into_py(py, async move {
262            if let Err(e) = client.subscribe_quotes(instrument_ids).await {
263                log::error!("Failed to subscribe to quotes: {e}");
264            }
265            Ok(())
266        })
267    }
268
269    #[pyo3(name = "subscribe_trades")]
270    fn py_subscribe_trades<'py>(
271        &self,
272        py: Python<'py>,
273        instrument_ids: Vec<InstrumentId>,
274    ) -> PyResult<Bound<'py, PyAny>> {
275        let client = self.clone();
276
277        pyo3_async_runtimes::tokio::future_into_py(py, async move {
278            if let Err(e) = client.subscribe_trades(instrument_ids).await {
279                log::error!("Failed to subscribe to trades: {e}");
280            }
281            Ok(())
282        })
283    }
284
285    #[pyo3(name = "subscribe_mark_prices")]
286    fn py_subscribe_mark_prices<'py>(
287        &self,
288        py: Python<'py>,
289        instrument_ids: Vec<InstrumentId>,
290    ) -> PyResult<Bound<'py, PyAny>> {
291        let client = self.clone();
292
293        pyo3_async_runtimes::tokio::future_into_py(py, async move {
294            if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
295                log::error!("Failed to subscribe to mark prices: {e}");
296            }
297            Ok(())
298        })
299    }
300
301    #[pyo3(name = "subscribe_index_prices")]
302    fn py_subscribe_index_prices<'py>(
303        &self,
304        py: Python<'py>,
305        instrument_ids: Vec<InstrumentId>,
306    ) -> PyResult<Bound<'py, PyAny>> {
307        let client = self.clone();
308
309        pyo3_async_runtimes::tokio::future_into_py(py, async move {
310            if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
311                log::error!("Failed to subscribe to index prices: {e}");
312            }
313            Ok(())
314        })
315    }
316
317    #[pyo3(name = "subscribe_bars")]
318    fn py_subscribe_bars<'py>(
319        &self,
320        py: Python<'py>,
321        bar_type: BarType,
322    ) -> PyResult<Bound<'py, PyAny>> {
323        let client = self.clone();
324
325        pyo3_async_runtimes::tokio::future_into_py(py, async move {
326            if let Err(e) = client.subscribe_bars(bar_type).await {
327                log::error!("Failed to subscribe to bars: {e}");
328            }
329            Ok(())
330        })
331    }
332
333    #[pyo3(name = "unsubscribe_instruments")]
334    fn py_unsubscribe_instruments<'py>(
335        &self,
336        py: Python<'py>,
337        instrument_ids: Vec<InstrumentId>,
338    ) -> PyResult<Bound<'py, PyAny>> {
339        let client = self.clone();
340
341        pyo3_async_runtimes::tokio::future_into_py(py, async move {
342            if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
343                log::error!("Failed to unsubscribe from order book: {e}");
344            }
345            Ok(())
346        })
347    }
348
349    #[pyo3(name = "unsubscribe_book")]
350    fn py_unsubscribe_book<'py>(
351        &self,
352        py: Python<'py>,
353        instrument_ids: Vec<InstrumentId>,
354    ) -> PyResult<Bound<'py, PyAny>> {
355        let client = self.clone();
356
357        pyo3_async_runtimes::tokio::future_into_py(py, async move {
358            if let Err(e) = client.unsubscribe_book(instrument_ids).await {
359                log::error!("Failed to unsubscribe from order book: {e}");
360            }
361            Ok(())
362        })
363    }
364
365    #[pyo3(name = "unsubscribe_quotes")]
366    fn py_unsubscribe_quotes<'py>(
367        &self,
368        py: Python<'py>,
369        instrument_ids: Vec<InstrumentId>,
370    ) -> PyResult<Bound<'py, PyAny>> {
371        let client = self.clone();
372
373        pyo3_async_runtimes::tokio::future_into_py(py, async move {
374            if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
375                log::error!("Failed to unsubscribe from quotes: {e}");
376            }
377            Ok(())
378        })
379    }
380
381    #[pyo3(name = "unsubscribe_trades")]
382    fn py_unsubscribe_trades<'py>(
383        &self,
384        py: Python<'py>,
385        instrument_ids: Vec<InstrumentId>,
386    ) -> PyResult<Bound<'py, PyAny>> {
387        let client = self.clone();
388
389        pyo3_async_runtimes::tokio::future_into_py(py, async move {
390            if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
391                log::error!("Failed to unsubscribe from trades: {e}");
392            }
393            Ok(())
394        })
395    }
396
397    #[pyo3(name = "unsubscribe_mark_prices")]
398    fn py_unsubscribe_mark_prices<'py>(
399        &self,
400        py: Python<'py>,
401        instrument_ids: Vec<InstrumentId>,
402    ) -> PyResult<Bound<'py, PyAny>> {
403        let client = self.clone();
404
405        pyo3_async_runtimes::tokio::future_into_py(py, async move {
406            if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
407                log::error!("Failed to unsubscribe from mark prices: {e}");
408            }
409            Ok(())
410        })
411    }
412
413    #[pyo3(name = "unsubscribe_index_prices")]
414    fn py_unsubscribe_index_prices<'py>(
415        &self,
416        py: Python<'py>,
417        instrument_ids: Vec<InstrumentId>,
418    ) -> PyResult<Bound<'py, PyAny>> {
419        let client = self.clone();
420
421        pyo3_async_runtimes::tokio::future_into_py(py, async move {
422            if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
423                log::error!("Failed to unsubscribe from index prices: {e}");
424            }
425            Ok(())
426        })
427    }
428
429    #[pyo3(name = "unsubscribe_bars")]
430    fn py_unsubscribe_bars<'py>(
431        &self,
432        py: Python<'py>,
433        bar_type: BarType,
434    ) -> PyResult<Bound<'py, PyAny>> {
435        let client = self.clone();
436
437        pyo3_async_runtimes::tokio::future_into_py(py, async move {
438            if let Err(e) = client.unsubscribe_bars(bar_type).await {
439                log::error!("Failed to unsubscribe from bars: {e}");
440            }
441            Ok(())
442        })
443    }
444}
445
446pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
447    if let Err(e) = callback.call1(py, (py_obj,)) {
448        tracing::error!("Error calling Python: {e}");
449    }
450}