nautilus_coinbase_intx/python/
websocket.rs
1use futures_util::StreamExt;
17use nautilus_core::python::{IntoPyObjectNautilusExt, to_pyvalue_err};
18use nautilus_model::{
19 data::BarType,
20 identifiers::InstrumentId,
21 python::{
22 data::data_to_pycapsule,
23 events::order::order_event_to_pyobject,
24 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
25 },
26};
27use pyo3::{exceptions::PyRuntimeError, prelude::*};
28use pyo3_async_runtimes::tokio::get_runtime;
29
30use crate::websocket::{CoinbaseIntxWebSocketClient, messages::NautilusWsMessage};
31
32#[pymethods]
33impl CoinbaseIntxWebSocketClient {
34 #[new]
35 #[pyo3(signature = (url=None, api_key=None, api_secret=None, api_passphrase=None, heartbeat=None))]
36 fn py_new(
37 url: Option<String>,
38 api_key: Option<String>,
39 api_secret: Option<String>,
40 api_passphrase: Option<String>,
41 heartbeat: Option<u64>,
42 ) -> PyResult<Self> {
43 Self::new(url, api_key, api_secret, api_passphrase, heartbeat).map_err(to_pyvalue_err)
44 }
45
46 #[getter]
47 #[pyo3(name = "url")]
48 pub fn py_url(&self) -> &str {
49 self.url()
50 }
51
52 #[getter]
53 #[pyo3(name = "api_key")]
54 pub fn py_api_key(&self) -> &str {
55 self.api_key()
56 }
57
58 #[pyo3(name = "is_active")]
59 fn py_is_active(&mut self) -> bool {
60 self.is_active()
61 }
62
63 #[pyo3(name = "is_closed")]
64 fn py_is_closed(&mut self) -> bool {
65 self.is_closed()
66 }
67
68 #[pyo3(name = "connect")]
69 fn py_connect<'py>(
70 &mut self,
71 py: Python<'py>,
72 instruments: Vec<PyObject>,
73 callback: PyObject,
74 ) -> PyResult<Bound<'py, PyAny>> {
75 let mut instruments_any = Vec::new();
76 for inst in instruments {
77 let inst_any = pyobject_to_instrument_any(py, inst)?;
78 instruments_any.push(inst_any);
79 }
80
81 get_runtime().block_on(async {
82 self.connect(instruments_any)
83 .await
84 .map_err(|e| PyRuntimeError::new_err(e.to_string()))
85 })?;
86
87 let stream = self.stream();
88
89 pyo3_async_runtimes::tokio::future_into_py(py, async move {
90 tokio::pin!(stream);
91
92 while let Some(msg) = stream.next().await {
93 match msg {
94 NautilusWsMessage::Instrument(inst) => Python::with_gil(|py| {
95 let py_obj = instrument_any_to_pyobject(py, inst)
96 .expect("Failed to create instrument");
97 call_python(py, &callback, py_obj);
98 }),
99 NautilusWsMessage::Data(data) => Python::with_gil(|py| {
100 let py_obj = data_to_pycapsule(py, data);
101 call_python(py, &callback, py_obj);
102 }),
103 NautilusWsMessage::DataVec(data_vec) => Python::with_gil(|py| {
104 for data in data_vec {
105 let py_obj = data_to_pycapsule(py, data);
106 call_python(py, &callback, py_obj);
107 }
108 }),
109 NautilusWsMessage::Deltas(deltas) => Python::with_gil(|py| {
110 call_python(py, &callback, deltas.into_py_any_unwrap(py));
111 }),
112 NautilusWsMessage::MarkPrice(mark_price) => Python::with_gil(|py| {
113 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
114 }),
115 NautilusWsMessage::IndexPrice(index_price) => Python::with_gil(|py| {
116 call_python(py, &callback, index_price.into_py_any_unwrap(py));
117 }),
118 NautilusWsMessage::MarkAndIndex((mark_price, index_price)) => {
119 Python::with_gil(|py| {
120 call_python(py, &callback, mark_price.into_py_any_unwrap(py));
121 call_python(py, &callback, index_price.into_py_any_unwrap(py));
122 })
123 }
124 NautilusWsMessage::OrderEvent(msg) => Python::with_gil(|py| {
125 let py_obj =
126 order_event_to_pyobject(py, msg).expect("Failed to create event");
127 call_python(py, &callback, py_obj);
128 }),
129 }
130 }
131
132 Ok(())
133 })
134 }
135
136 #[pyo3(name = "close")]
137 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
138 let mut client = self.clone();
139
140 pyo3_async_runtimes::tokio::future_into_py(py, async move {
141 if let Err(e) = client.close().await {
142 log::error!("Error on close: {e}");
143 }
144 Ok(())
145 })
146 }
147
148 #[pyo3(name = "subscribe_instruments")]
149 #[pyo3(signature = (instrument_ids=None))]
150 fn py_subscribe_instruments<'py>(
151 &self,
152 py: Python<'py>,
153 instrument_ids: Option<Vec<InstrumentId>>,
154 ) -> PyResult<Bound<'py, PyAny>> {
155 let client = self.clone();
156 let instrument_ids = instrument_ids.unwrap_or_default();
157
158 pyo3_async_runtimes::tokio::future_into_py(py, async move {
159 if let Err(e) = client.subscribe_instruments(instrument_ids).await {
160 log::error!("Failed to subscribe to instruments: {e}");
161 }
162 Ok(())
163 })
164 }
165
166 #[pyo3(name = "subscribe_order_book")]
167 fn py_subscribe_order_book<'py>(
168 &self,
169 py: Python<'py>,
170 instrument_ids: Vec<InstrumentId>,
171 ) -> PyResult<Bound<'py, PyAny>> {
172 let client = self.clone();
173
174 pyo3_async_runtimes::tokio::future_into_py(py, async move {
175 if let Err(e) = client.subscribe_order_book(instrument_ids).await {
176 log::error!("Failed to subscribe to order book: {e}");
177 }
178 Ok(())
179 })
180 }
181
182 #[pyo3(name = "subscribe_quotes")]
183 fn py_subscribe_quotes<'py>(
184 &self,
185 py: Python<'py>,
186 instrument_ids: Vec<InstrumentId>,
187 ) -> PyResult<Bound<'py, PyAny>> {
188 let client = self.clone();
189
190 pyo3_async_runtimes::tokio::future_into_py(py, async move {
191 if let Err(e) = client.subscribe_quotes(instrument_ids).await {
192 log::error!("Failed to subscribe to quotes: {e}");
193 }
194 Ok(())
195 })
196 }
197
198 #[pyo3(name = "subscribe_trades")]
199 fn py_subscribe_trades<'py>(
200 &self,
201 py: Python<'py>,
202 instrument_ids: Vec<InstrumentId>,
203 ) -> PyResult<Bound<'py, PyAny>> {
204 let client = self.clone();
205
206 pyo3_async_runtimes::tokio::future_into_py(py, async move {
207 if let Err(e) = client.subscribe_trades(instrument_ids).await {
208 log::error!("Failed to subscribe to trades: {e}");
209 }
210 Ok(())
211 })
212 }
213
214 #[pyo3(name = "subscribe_mark_prices")]
215 fn py_subscribe_mark_prices<'py>(
216 &self,
217 py: Python<'py>,
218 instrument_ids: Vec<InstrumentId>,
219 ) -> PyResult<Bound<'py, PyAny>> {
220 let client = self.clone();
221
222 pyo3_async_runtimes::tokio::future_into_py(py, async move {
223 if let Err(e) = client.subscribe_mark_prices(instrument_ids).await {
224 log::error!("Failed to subscribe to mark prices: {e}");
225 }
226 Ok(())
227 })
228 }
229
230 #[pyo3(name = "subscribe_index_prices")]
231 fn py_subscribe_index_prices<'py>(
232 &self,
233 py: Python<'py>,
234 instrument_ids: Vec<InstrumentId>,
235 ) -> PyResult<Bound<'py, PyAny>> {
236 let client = self.clone();
237
238 pyo3_async_runtimes::tokio::future_into_py(py, async move {
239 if let Err(e) = client.subscribe_index_prices(instrument_ids).await {
240 log::error!("Failed to subscribe to index prices: {e}");
241 }
242 Ok(())
243 })
244 }
245
246 #[pyo3(name = "subscribe_bars")]
247 fn py_subscribe_bars<'py>(
248 &self,
249 py: Python<'py>,
250 bar_type: BarType,
251 ) -> PyResult<Bound<'py, PyAny>> {
252 let client = self.clone();
253
254 pyo3_async_runtimes::tokio::future_into_py(py, async move {
255 if let Err(e) = client.subscribe_bars(bar_type).await {
256 log::error!("Failed to subscribe to bars: {e}");
257 }
258 Ok(())
259 })
260 }
261
262 #[pyo3(name = "unsubscribe_instruments")]
263 fn py_unsubscribe_instruments<'py>(
264 &self,
265 py: Python<'py>,
266 instrument_ids: Vec<InstrumentId>,
267 ) -> PyResult<Bound<'py, PyAny>> {
268 let client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 if let Err(e) = client.unsubscribe_instruments(instrument_ids).await {
272 log::error!("Failed to unsubscribe from order book: {e}");
273 }
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "unsubscribe_order_book")]
279 fn py_unsubscribe_order_book<'py>(
280 &self,
281 py: Python<'py>,
282 instrument_ids: Vec<InstrumentId>,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 if let Err(e) = client.unsubscribe_order_book(instrument_ids).await {
288 log::error!("Failed to unsubscribe from order book: {e}");
289 }
290 Ok(())
291 })
292 }
293
294 #[pyo3(name = "unsubscribe_quotes")]
295 fn py_unsubscribe_quotes<'py>(
296 &self,
297 py: Python<'py>,
298 instrument_ids: Vec<InstrumentId>,
299 ) -> PyResult<Bound<'py, PyAny>> {
300 let client = self.clone();
301
302 pyo3_async_runtimes::tokio::future_into_py(py, async move {
303 if let Err(e) = client.unsubscribe_quotes(instrument_ids).await {
304 log::error!("Failed to unsubscribe from quotes: {e}");
305 }
306 Ok(())
307 })
308 }
309
310 #[pyo3(name = "unsubscribe_trades")]
311 fn py_unsubscribe_trades<'py>(
312 &self,
313 py: Python<'py>,
314 instrument_ids: Vec<InstrumentId>,
315 ) -> PyResult<Bound<'py, PyAny>> {
316 let client = self.clone();
317
318 pyo3_async_runtimes::tokio::future_into_py(py, async move {
319 if let Err(e) = client.unsubscribe_trades(instrument_ids).await {
320 log::error!("Failed to unsubscribe from trades: {e}");
321 }
322 Ok(())
323 })
324 }
325
326 #[pyo3(name = "unsubscribe_mark_prices")]
327 fn py_unsubscribe_mark_prices<'py>(
328 &self,
329 py: Python<'py>,
330 instrument_ids: Vec<InstrumentId>,
331 ) -> PyResult<Bound<'py, PyAny>> {
332 let client = self.clone();
333
334 pyo3_async_runtimes::tokio::future_into_py(py, async move {
335 if let Err(e) = client.unsubscribe_mark_prices(instrument_ids).await {
336 log::error!("Failed to unsubscribe from mark prices: {e}");
337 }
338 Ok(())
339 })
340 }
341
342 #[pyo3(name = "unsubscribe_index_prices")]
343 fn py_unsubscribe_index_prices<'py>(
344 &self,
345 py: Python<'py>,
346 instrument_ids: Vec<InstrumentId>,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 if let Err(e) = client.unsubscribe_index_prices(instrument_ids).await {
352 log::error!("Failed to unsubscribe from index prices: {e}");
353 }
354 Ok(())
355 })
356 }
357
358 #[pyo3(name = "unsubscribe_bars")]
359 fn py_unsubscribe_bars<'py>(
360 &self,
361 py: Python<'py>,
362 bar_type: BarType,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 if let Err(e) = client.unsubscribe_bars(bar_type).await {
368 log::error!("Failed to unsubscribe from bars: {e}");
369 }
370 Ok(())
371 })
372 }
373}
374
375pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
376 if let Err(e) = callback.call1(py, (py_obj,)) {
377 tracing::error!("Error calling Python: {e}");
378 }
379}