nautilus_coinbase_intx/python/
websocket.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use futures_util::StreamExt;
17use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyvalue_err};
18use nautilus_model::{
19    data::BarType,
20    identifiers::InstrumentId,
21    python::{
22        data::data_to_pycapsule,
23        events::order::order_event_to_pyobject,
24        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
25    },
26};
27use pyo3::{exceptions::PyRuntimeError, prelude::*};
28use pyo3_async_runtimes::tokio::get_runtime;
29
30use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
31
32#[pymethods]
33impl CoinbaseIntxWebSocketClient {
34    #[new]
35    #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
36    fn py_new(
37        url: Option<String>,
38        api_key: Option<String>,
39        api_secret: Option<String>,
40        api_passphrase: Option<String>,
41        heartbeat: Option<u64>,
42    ) -> PyResult<Self> {
43        Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
44    }
45
46    #[getter]
47    #[pyo3(name = "url")]
48    pub fn py_url(&self) -> &str {
49        self.url()
50    }
51
52    #[getter]
53    #[pyo3(name = "api_key")]
54    pub fn py_api_key(&self) -> &str {
55        self.api_key()
56    }
57
58    #[pyo3(name = "is_active")]
59    fn py_is_active(&mut self) -> bool {
60        self.is_active()
61    }
62
63    #[pyo3(name = "is_closed")]
64    fn py_is_closed(&mut self) -> bool {
65        self.is_closed()
66    }
67
68    #[pyo3(name = "connect")]
69    fn py_connect<'py>(
70        &mut self,
71        py: Python<'py>,
72        instruments: Vec<PyObject>,
73        callback: PyObject,
74    ) -> PyResult<Bound<'py, PyAny>> {
75        let mut instruments_any = Vec::new();
76        for inst in instruments {
77            let inst_any = pyobject_to_instrument_any(py, inst)?;
78            instruments_any.push(inst_any);
79        }
80
81        get_runtime().block_on(async {
82            self.connect(instruments_any)
83                .await
84                .map_err(|e| PyRuntimeError::new_err(e.to_string()))
85        })?;
86
87        let stream = self.stream();
88
89        pyo3_async_runtimes::tokio::future_into_py(py, async move {
90            tokio::pin!(stream);
91
92            while let Some(msg) = stream.next().await {
93                match msg {
94                    NautilusWsMessage::Instrument(inst) => Python::with_gil(|py| {
95                        let py_obj = instrument_any_to_pyobject(py, inst)
96                            .expect("Failed to create instrument");
97                        call_python(py, &callback, py_obj);
98                    }),
99                    NautilusWsMessage::Data(data) => Python::with_gil(|py| {
100                        let py_obj = data_to_pycapsule(py, data);
101                        call_python(py, &callback, py_obj);
102                    }),
103                    NautilusWsMessage::DataVec(data_vec) => Python::with_gil(|py| {
104                        for data in data_vec {
105                            let py_obj = data_to_pycapsule(py, data);
106                            call_python(py, &callback, py_obj);
107                        }
108                    }),
109                    NautilusWsMessage::Deltas(deltas) => Python::with_gil(|py| {
110                        call_python(py, &callback, deltas.into_py_any_unwrap(py));
111                    }),
112                    NautilusWsMessage::MarkPrice(mark_price) => Python::with_gil(|py| {
113                        call_python(py, &callback, mark_price.into_py_any_unwrap(py));
114                    }),
115                    NautilusWsMessage::IndexPrice(index_price) => Python::with_gil(|py| {
116                        call_python(py, &callback, index_price.into_py_any_unwrap(py));
117                    }),
118                    NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
119                        Python::with_gil(|py| {
120                            call_python(py, &callback, mark_price.into_py_any_unwrap(py));
121                            call_python(py, &callback, index_price.into_py_any_unwrap(py));
122                        })
123                    }
124                    NautilusWsMessage::OrderEvent(msg) => Python::with_gil(|py| {
125                        let py_obj =
126                            order_event_to_pyobject(py, msg).expect("Failed to create event");
127                        call_python(py, &callback, py_obj);
128                    }),
129                }
130            }
131
132            Ok(())
133        })
134    }
135
136    #[pyo3(name = "close")]
137    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
138        let mut client = self.clone();
139
140        pyo3_async_runtimes::tokio::future_into_py(py, async move {
141            if let Err(e) = client.close().await {
142                log::error!("Error on close: {e}");
143            }
144            Ok(())
145        })
146    }
147
148    #[pyo3(name = "subscribe_instruments")]
149    #[pyo3(signature = (instrument_ids=None))]
150    fn py_subscribe_instruments<'py>(
151        &self,
152        py: Python<'py>,
153        instrument_ids: Option<Vec<InstrumentId>>,
154    ) -> PyResult<Bound<'py, PyAny>> {
155        let client = self.clone();
156        let instrument_ids = instrument_ids.unwrap_or_default();
157
158        pyo3_async_runtimes::tokio::future_into_py(py, async move {
159            if let Err(e) = client.subscribe_instruments(instrument_ids).await {
160                log::error!("Failed to subscribe to instruments: {e}");
161            }
162            Ok(())
163        })
164    }
165
166    #[pyo3(name = "subscribe_order_book")]
167    fn py_subscribe_order_book<'py>(
168        &self,
169        py: Python<'py>,
170        instrument_ids: Vec<InstrumentId>,
171    ) -> PyResult<Bound<'py, PyAny>> {
172        let client = self.clone();
173
174        pyo3_async_runtimes::tokio::future_into_py(py, async move {
175            if let Err(e) = client.subscribe_order_book(instrument_ids).await {
176                log::error!("Failed to subscribe to order book: {e}");
177            }
178            Ok(())
179        })
180    }
181
182    #[pyo3(name = "subscribe_quotes")]
183    fn py_subscribe_quotes<'py>(
184        &self,
185        py: Python<'py>,
186        instrument_ids: Vec<InstrumentId>,
187    ) -> PyResult<Bound<'py, PyAny>> {
188        let client = self.clone();
189
190        pyo3_async_runtimes::tokio::future_into_py(py, async move {
191            if let Err(e) = client.subscribe_quotes(instrument_ids).await {
192                log::error!("Failed to subscribe to quotes: {e}");
193            }
194            Ok(())
195        })
196    }
197
198    #[pyo3(name = "subscribe_trades")]
199    fn py_subscribe_trades<'py>(
200        &self,
201        py: Python<'py>,
202        instrument_ids: Vec<InstrumentId>,
203    ) -> PyResult<Bound<'py, PyAny>> {
204        let client = self.clone();
205
206        pyo3_async_runtimes::tokio::future_into_py(py, async move {
207            if let Err(e) = client.subscribe_trades(instrument_ids).await {
208                log::error!("Failed to subscribe to trades: {e}");
209            }
210            Ok(())
211        })
212    }
213
214    #[pyo3(name = "subscribe_mark_prices")]
215    fn py_subscribe_mark_prices<'py>(
216        &self,
217        py: Python<'py>,
218        instrument_ids: Vec<InstrumentId>,
219    ) -> PyResult<Bound<'py, PyAny>> {
220        let client = self.clone();
221
222        pyo3_async_runtimes::tokio::future_into_py(py, async move {
223            if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
224                log::error!("Failed to subscribe to mark prices: {e}");
225            }
226            Ok(())
227        })
228    }
229
230    #[pyo3(name = "subscribe_index_prices")]
231    fn py_subscribe_index_prices<'py>(
232        &self,
233        py: Python<'py>,
234        instrument_ids: Vec<InstrumentId>,
235    ) -> PyResult<Bound<'py, PyAny>> {
236        let client = self.clone();
237
238        pyo3_async_runtimes::tokio::future_into_py(py, async move {
239            if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
240                log::error!("Failed to subscribe to index prices: {e}");
241            }
242            Ok(())
243        })
244    }
245
246    #[pyo3(name = "subscribe_bars")]
247    fn py_subscribe_bars<'py>(
248        &self,
249        py: Python<'py>,
250        bar_type: BarType,
251    ) -> PyResult<Bound<'py, PyAny>> {
252        let client = self.clone();
253
254        pyo3_async_runtimes::tokio::future_into_py(py, async move {
255            if let Err(e) = client.subscribe_bars(bar_type).await {
256                log::error!("Failed to subscribe to bars: {e}");
257            }
258            Ok(())
259        })
260    }
261
262    #[pyo3(name = "unsubscribe_instruments")]
263    fn py_unsubscribe_instruments<'py>(
264        &self,
265        py: Python<'py>,
266        instrument_ids: Vec<InstrumentId>,
267    ) -> PyResult<Bound<'py, PyAny>> {
268        let client = self.clone();
269
270        pyo3_async_runtimes::tokio::future_into_py(py, async move {
271            if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
272                log::error!("Failed to unsubscribe from order book: {e}");
273            }
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "unsubscribe_order_book")]
279    fn py_unsubscribe_order_book<'py>(
280        &self,
281        py: Python<'py>,
282        instrument_ids: Vec<InstrumentId>,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            if let Err(e) = client.unsubscribe_order_book(instrument_ids).await {
288                log::error!("Failed to unsubscribe from order book: {e}");
289            }
290            Ok(())
291        })
292    }
293
294    #[pyo3(name = "unsubscribe_quotes")]
295    fn py_unsubscribe_quotes<'py>(
296        &self,
297        py: Python<'py>,
298        instrument_ids: Vec<InstrumentId>,
299    ) -> PyResult<Bound<'py, PyAny>> {
300        let client = self.clone();
301
302        pyo3_async_runtimes::tokio::future_into_py(py, async move {
303            if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
304                log::error!("Failed to unsubscribe from quotes: {e}");
305            }
306            Ok(())
307        })
308    }
309
310    #[pyo3(name = "unsubscribe_trades")]
311    fn py_unsubscribe_trades<'py>(
312        &self,
313        py: Python<'py>,
314        instrument_ids: Vec<InstrumentId>,
315    ) -> PyResult<Bound<'py, PyAny>> {
316        let client = self.clone();
317
318        pyo3_async_runtimes::tokio::future_into_py(py, async move {
319            if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
320                log::error!("Failed to unsubscribe from trades: {e}");
321            }
322            Ok(())
323        })
324    }
325
326    #[pyo3(name = "unsubscribe_mark_prices")]
327    fn py_unsubscribe_mark_prices<'py>(
328        &self,
329        py: Python<'py>,
330        instrument_ids: Vec<InstrumentId>,
331    ) -> PyResult<Bound<'py, PyAny>> {
332        let client = self.clone();
333
334        pyo3_async_runtimes::tokio::future_into_py(py, async move {
335            if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
336                log::error!("Failed to unsubscribe from mark prices: {e}");
337            }
338            Ok(())
339        })
340    }
341
342    #[pyo3(name = "unsubscribe_index_prices")]
343    fn py_unsubscribe_index_prices<'py>(
344        &self,
345        py: Python<'py>,
346        instrument_ids: Vec<InstrumentId>,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
352                log::error!("Failed to unsubscribe from index prices: {e}");
353            }
354            Ok(())
355        })
356    }
357
358    #[pyo3(name = "unsubscribe_bars")]
359    fn py_unsubscribe_bars<'py>(
360        &self,
361        py: Python<'py>,
362        bar_type: BarType,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            if let Err(e) = client.unsubscribe_bars(bar_type).await {
368                log::error!("Failed to unsubscribe from bars: {e}");
369            }
370            Ok(())
371        })
372    }
373}
374
375pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
376    if let Err(e) = callback.call1(py, (py_obj,)) {
377        tracing::error!("Error calling Python: {e}");
378    }
379}