1use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::bar::BarType,
48 identifiers::{AccountId, InstrumentId},
49 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
52
53use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
54
55#[pymethods]
56impl BitmexWebSocketClient {
57 #[new]
58 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None))]
59 fn py_new(
60 url: Option<String>,
61 api_key: Option<String>,
62 api_secret: Option<String>,
63 account_id: Option<AccountId>,
64 heartbeat: Option<u64>,
65 ) -> PyResult<Self> {
66 let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
68 let env_key = std::env::var("BITMEX_API_KEY").ok();
70 let env_secret = std::env::var("BITMEX_API_SECRET").ok();
71 (env_key, env_secret)
72 } else {
73 (api_key, api_secret)
74 };
75
76 Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
77 .map_err(to_pyvalue_err)
78 }
79
80 #[staticmethod]
81 #[pyo3(name = "from_env")]
82 fn py_from_env() -> PyResult<Self> {
83 Self::from_env().map_err(to_pyvalue_err)
84 }
85
86 #[getter]
87 #[pyo3(name = "url")]
88 #[must_use]
89 pub const fn py_url(&self) -> &str {
90 self.url()
91 }
92
93 #[getter]
94 #[pyo3(name = "api_key")]
95 #[must_use]
96 pub fn py_api_key(&self) -> Option<&str> {
97 self.api_key()
98 }
99
100 #[pyo3(name = "is_active")]
101 fn py_is_active(&mut self) -> bool {
102 self.is_active()
103 }
104
105 #[pyo3(name = "is_closed")]
106 fn py_is_closed(&mut self) -> bool {
107 self.is_closed()
108 }
109
110 #[pyo3(name = "get_subscriptions")]
111 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
112 self.get_subscriptions(instrument_id)
113 }
114
115 #[pyo3(name = "connect")]
116 fn py_connect<'py>(
117 &mut self,
118 py: Python<'py>,
119 instruments: Vec<PyObject>,
120 callback: PyObject,
121 ) -> PyResult<Bound<'py, PyAny>> {
122 let mut instruments_any = Vec::new();
123 for inst in instruments {
124 let inst_any = pyobject_to_instrument_any(py, inst)?;
125 instruments_any.push(inst_any);
126 }
127
128 self.initialize_instruments_cache(instruments_any);
129
130 let mut client = self.clone();
131
132 pyo3_async_runtimes::tokio::future_into_py(py, async move {
133 client.connect().await.map_err(to_pyruntime_err)?;
134
135 let stream = client.stream();
136
137 tokio::spawn(async move {
138 tokio::pin!(stream);
139
140 while let Some(msg) = stream.next().await {
141 Python::with_gil(|py| match msg {
142 NautilusWsMessage::Data(data_vec) => {
143 for data in data_vec {
144 let py_obj = data_to_pycapsule(py, data);
145 call_python(py, &callback, py_obj);
146 }
147 }
148 NautilusWsMessage::OrderStatusReports(reports) => {
149 for report in reports {
150 if let Ok(py_obj) = report.into_py_any(py) {
151 call_python(py, &callback, py_obj);
152 }
153 }
154 }
155 NautilusWsMessage::FillReports(reports) => {
156 for report in reports {
157 if let Ok(py_obj) = report.into_py_any(py) {
158 call_python(py, &callback, py_obj);
159 }
160 }
161 }
162 NautilusWsMessage::PositionStatusReport(report) => {
163 if let Ok(py_obj) = (*report).into_py_any(py) {
164 call_python(py, &callback, py_obj);
165 }
166 }
167 NautilusWsMessage::FundingRateUpdates(updates) => {
168 for update in updates {
169 if let Ok(py_obj) = update.into_py_any(py) {
170 call_python(py, &callback, py_obj);
171 }
172 }
173 }
174 NautilusWsMessage::AccountState(account_state) => {
175 if let Ok(py_obj) = (*account_state).into_py_any(py) {
176 call_python(py, &callback, py_obj);
177 }
178 }
179 NautilusWsMessage::Reconnected => {} });
181 }
182 });
183
184 Ok(())
185 })
186 }
187
188 #[pyo3(name = "wait_until_active")]
189 fn py_wait_until_active<'py>(
190 &self,
191 py: Python<'py>,
192 timeout_secs: f64,
193 ) -> PyResult<Bound<'py, PyAny>> {
194 let client = self.clone();
195
196 pyo3_async_runtimes::tokio::future_into_py(py, async move {
197 client
198 .wait_until_active(timeout_secs)
199 .await
200 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
201 Ok(())
202 })
203 }
204
205 #[pyo3(name = "close")]
206 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
207 let mut client = self.clone();
208
209 pyo3_async_runtimes::tokio::future_into_py(py, async move {
210 if let Err(e) = client.close().await {
211 log::error!("Error on close: {e}");
212 }
213 Ok(())
214 })
215 }
216
217 #[pyo3(name = "subscribe_instruments")]
218 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
219 let client = self.clone();
220
221 pyo3_async_runtimes::tokio::future_into_py(py, async move {
222 if let Err(e) = client.subscribe_instruments().await {
223 log::error!("Failed to subscribe to instruments: {e}");
224 }
225 Ok(())
226 })
227 }
228
229 #[pyo3(name = "subscribe_instrument")]
230 fn py_subscribe_instrument<'py>(
231 &self,
232 py: Python<'py>,
233 instrument_id: InstrumentId,
234 ) -> PyResult<Bound<'py, PyAny>> {
235 let client = self.clone();
236
237 pyo3_async_runtimes::tokio::future_into_py(py, async move {
238 if let Err(e) = client.subscribe_instrument(instrument_id).await {
239 log::error!("Failed to subscribe to instrument: {e}");
240 }
241 Ok(())
242 })
243 }
244
245 #[pyo3(name = "subscribe_book")]
246 fn py_subscribe_book<'py>(
247 &self,
248 py: Python<'py>,
249 instrument_id: InstrumentId,
250 ) -> PyResult<Bound<'py, PyAny>> {
251 let client = self.clone();
252
253 pyo3_async_runtimes::tokio::future_into_py(py, async move {
254 if let Err(e) = client.subscribe_book(instrument_id).await {
255 log::error!("Failed to subscribe to order book: {e}");
256 }
257 Ok(())
258 })
259 }
260
261 #[pyo3(name = "subscribe_book_25")]
262 fn py_subscribe_book_25<'py>(
263 &self,
264 py: Python<'py>,
265 instrument_id: InstrumentId,
266 ) -> PyResult<Bound<'py, PyAny>> {
267 let client = self.clone();
268
269 pyo3_async_runtimes::tokio::future_into_py(py, async move {
270 if let Err(e) = client.subscribe_book_25(instrument_id).await {
271 log::error!("Failed to subscribe to order book 25: {e}");
272 }
273 Ok(())
274 })
275 }
276
277 #[pyo3(name = "subscribe_book_depth10")]
278 fn py_subscribe_book_depth10<'py>(
279 &self,
280 py: Python<'py>,
281 instrument_id: InstrumentId,
282 ) -> PyResult<Bound<'py, PyAny>> {
283 let client = self.clone();
284
285 pyo3_async_runtimes::tokio::future_into_py(py, async move {
286 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
287 log::error!("Failed to subscribe to order book depth 10: {e}");
288 }
289 Ok(())
290 })
291 }
292
293 #[pyo3(name = "subscribe_quotes")]
294 fn py_subscribe_quotes<'py>(
295 &self,
296 py: Python<'py>,
297 instrument_id: InstrumentId,
298 ) -> PyResult<Bound<'py, PyAny>> {
299 let client = self.clone();
300
301 pyo3_async_runtimes::tokio::future_into_py(py, async move {
302 if let Err(e) = client.subscribe_quotes(instrument_id).await {
303 log::error!("Failed to subscribe to quotes: {e}");
304 }
305 Ok(())
306 })
307 }
308
309 #[pyo3(name = "subscribe_trades")]
310 fn py_subscribe_trades<'py>(
311 &self,
312 py: Python<'py>,
313 instrument_id: InstrumentId,
314 ) -> PyResult<Bound<'py, PyAny>> {
315 let client = self.clone();
316
317 pyo3_async_runtimes::tokio::future_into_py(py, async move {
318 if let Err(e) = client.subscribe_trades(instrument_id).await {
319 log::error!("Failed to subscribe to trades: {e}");
320 }
321 Ok(())
322 })
323 }
324
325 #[pyo3(name = "subscribe_mark_prices")]
326 fn py_subscribe_mark_prices<'py>(
327 &self,
328 py: Python<'py>,
329 instrument_id: InstrumentId,
330 ) -> PyResult<Bound<'py, PyAny>> {
331 let client = self.clone();
332
333 pyo3_async_runtimes::tokio::future_into_py(py, async move {
334 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
335 log::error!("Failed to subscribe to mark prices: {e}");
336 }
337 Ok(())
338 })
339 }
340
341 #[pyo3(name = "subscribe_index_prices")]
342 fn py_subscribe_index_prices<'py>(
343 &self,
344 py: Python<'py>,
345 instrument_id: InstrumentId,
346 ) -> PyResult<Bound<'py, PyAny>> {
347 let client = self.clone();
348
349 pyo3_async_runtimes::tokio::future_into_py(py, async move {
350 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
351 log::error!("Failed to subscribe to index prices: {e}");
352 }
353 Ok(())
354 })
355 }
356
357 #[pyo3(name = "subscribe_funding_rates")]
358 fn py_subscribe_funding_rates<'py>(
359 &self,
360 py: Python<'py>,
361 instrument_id: InstrumentId,
362 ) -> PyResult<Bound<'py, PyAny>> {
363 let client = self.clone();
364
365 pyo3_async_runtimes::tokio::future_into_py(py, async move {
366 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
367 log::error!("Failed to subscribe to funding: {e}");
368 }
369 Ok(())
370 })
371 }
372
373 #[pyo3(name = "subscribe_bars")]
374 fn py_subscribe_bars<'py>(
375 &self,
376 py: Python<'py>,
377 bar_type: BarType,
378 ) -> PyResult<Bound<'py, PyAny>> {
379 let client = self.clone();
380
381 pyo3_async_runtimes::tokio::future_into_py(py, async move {
382 if let Err(e) = client.subscribe_bars(bar_type).await {
383 log::error!("Failed to subscribe to bars: {e}");
384 }
385 Ok(())
386 })
387 }
388
389 #[pyo3(name = "unsubscribe_instruments")]
390 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
391 let client = self.clone();
392
393 pyo3_async_runtimes::tokio::future_into_py(py, async move {
394 if let Err(e) = client.unsubscribe_instruments().await {
395 log::error!("Failed to unsubscribe from instruments: {e}");
396 }
397 Ok(())
398 })
399 }
400
401 #[pyo3(name = "unsubscribe_instrument")]
402 fn py_unsubscribe_instrument<'py>(
403 &self,
404 py: Python<'py>,
405 instrument_id: InstrumentId,
406 ) -> PyResult<Bound<'py, PyAny>> {
407 let client = self.clone();
408
409 pyo3_async_runtimes::tokio::future_into_py(py, async move {
410 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
411 log::error!("Failed to unsubscribe from instrument: {e}");
412 }
413 Ok(())
414 })
415 }
416
417 #[pyo3(name = "unsubscribe_book")]
418 fn py_unsubscribe_book<'py>(
419 &self,
420 py: Python<'py>,
421 instrument_id: InstrumentId,
422 ) -> PyResult<Bound<'py, PyAny>> {
423 let client = self.clone();
424
425 pyo3_async_runtimes::tokio::future_into_py(py, async move {
426 if let Err(e) = client.unsubscribe_book(instrument_id).await {
427 log::error!("Failed to unsubscribe from order book: {e}");
428 }
429 Ok(())
430 })
431 }
432
433 #[pyo3(name = "unsubscribe_book_25")]
434 fn py_unsubscribe_book_25<'py>(
435 &self,
436 py: Python<'py>,
437 instrument_id: InstrumentId,
438 ) -> PyResult<Bound<'py, PyAny>> {
439 let client = self.clone();
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
443 log::error!("Failed to unsubscribe from order book 25: {e}");
444 }
445 Ok(())
446 })
447 }
448
449 #[pyo3(name = "unsubscribe_book_depth10")]
450 fn py_unsubscribe_book_depth10<'py>(
451 &self,
452 py: Python<'py>,
453 instrument_id: InstrumentId,
454 ) -> PyResult<Bound<'py, PyAny>> {
455 let client = self.clone();
456
457 pyo3_async_runtimes::tokio::future_into_py(py, async move {
458 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
459 log::error!("Failed to unsubscribe from order book depth 10: {e}");
460 }
461 Ok(())
462 })
463 }
464
465 #[pyo3(name = "unsubscribe_quotes")]
466 fn py_unsubscribe_quotes<'py>(
467 &self,
468 py: Python<'py>,
469 instrument_id: InstrumentId,
470 ) -> PyResult<Bound<'py, PyAny>> {
471 let client = self.clone();
472
473 pyo3_async_runtimes::tokio::future_into_py(py, async move {
474 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
475 log::error!("Failed to unsubscribe from quotes: {e}");
476 }
477 Ok(())
478 })
479 }
480
481 #[pyo3(name = "unsubscribe_trades")]
482 fn py_unsubscribe_trades<'py>(
483 &self,
484 py: Python<'py>,
485 instrument_id: InstrumentId,
486 ) -> PyResult<Bound<'py, PyAny>> {
487 let client = self.clone();
488
489 pyo3_async_runtimes::tokio::future_into_py(py, async move {
490 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
491 log::error!("Failed to unsubscribe from trades: {e}");
492 }
493 Ok(())
494 })
495 }
496
497 #[pyo3(name = "unsubscribe_mark_prices")]
498 fn py_unsubscribe_mark_prices<'py>(
499 &self,
500 py: Python<'py>,
501 instrument_id: InstrumentId,
502 ) -> PyResult<Bound<'py, PyAny>> {
503 let client = self.clone();
504
505 pyo3_async_runtimes::tokio::future_into_py(py, async move {
506 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
507 log::error!("Failed to unsubscribe from mark prices: {e}");
508 }
509 Ok(())
510 })
511 }
512
513 #[pyo3(name = "unsubscribe_index_prices")]
514 fn py_unsubscribe_index_prices<'py>(
515 &self,
516 py: Python<'py>,
517 instrument_id: InstrumentId,
518 ) -> PyResult<Bound<'py, PyAny>> {
519 let client = self.clone();
520
521 pyo3_async_runtimes::tokio::future_into_py(py, async move {
522 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
523 log::error!("Failed to unsubscribe from index prices: {e}");
524 }
525 Ok(())
526 })
527 }
528
529 #[pyo3(name = "unsubscribe_funding_rates")]
530 fn py_unsubscribe_funding_rates<'py>(
531 &self,
532 py: Python<'py>,
533 instrument_id: InstrumentId,
534 ) -> PyResult<Bound<'py, PyAny>> {
535 let client = self.clone();
536 pyo3_async_runtimes::tokio::future_into_py(py, async move {
537 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
538 log::error!("Failed to unsubscribe from funding rates: {e}");
539 }
540 Ok(())
541 })
542 }
543
544 #[pyo3(name = "unsubscribe_bars")]
545 fn py_unsubscribe_bars<'py>(
546 &self,
547 py: Python<'py>,
548 bar_type: BarType,
549 ) -> PyResult<Bound<'py, PyAny>> {
550 let client = self.clone();
551
552 pyo3_async_runtimes::tokio::future_into_py(py, async move {
553 if let Err(e) = client.unsubscribe_bars(bar_type).await {
554 log::error!("Failed to unsubscribe from bars: {e}");
555 }
556 Ok(())
557 })
558 }
559
560 #[pyo3(name = "subscribe_orders")]
561 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
562 let client = self.clone();
563
564 pyo3_async_runtimes::tokio::future_into_py(py, async move {
565 if let Err(e) = client.subscribe_orders().await {
566 log::error!("Failed to subscribe to orders: {e}");
567 }
568 Ok(())
569 })
570 }
571
572 #[pyo3(name = "subscribe_executions")]
573 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
574 let client = self.clone();
575
576 pyo3_async_runtimes::tokio::future_into_py(py, async move {
577 if let Err(e) = client.subscribe_executions().await {
578 log::error!("Failed to subscribe to executions: {e}");
579 }
580 Ok(())
581 })
582 }
583
584 #[pyo3(name = "subscribe_positions")]
585 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
586 let client = self.clone();
587
588 pyo3_async_runtimes::tokio::future_into_py(py, async move {
589 if let Err(e) = client.subscribe_positions().await {
590 log::error!("Failed to subscribe to positions: {e}");
591 }
592 Ok(())
593 })
594 }
595
596 #[pyo3(name = "subscribe_margin")]
597 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
598 let client = self.clone();
599
600 pyo3_async_runtimes::tokio::future_into_py(py, async move {
601 if let Err(e) = client.subscribe_margin().await {
602 log::error!("Failed to subscribe to margin: {e}");
603 }
604 Ok(())
605 })
606 }
607
608 #[pyo3(name = "subscribe_wallet")]
609 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
610 let client = self.clone();
611
612 pyo3_async_runtimes::tokio::future_into_py(py, async move {
613 if let Err(e) = client.subscribe_wallet().await {
614 log::error!("Failed to subscribe to wallet: {e}");
615 }
616 Ok(())
617 })
618 }
619
620 #[pyo3(name = "unsubscribe_orders")]
621 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
622 let client = self.clone();
623
624 pyo3_async_runtimes::tokio::future_into_py(py, async move {
625 if let Err(e) = client.unsubscribe_orders().await {
626 log::error!("Failed to unsubscribe from orders: {e}");
627 }
628 Ok(())
629 })
630 }
631
632 #[pyo3(name = "unsubscribe_executions")]
633 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
634 let client = self.clone();
635
636 pyo3_async_runtimes::tokio::future_into_py(py, async move {
637 if let Err(e) = client.unsubscribe_executions().await {
638 log::error!("Failed to unsubscribe from executions: {e}");
639 }
640 Ok(())
641 })
642 }
643
644 #[pyo3(name = "unsubscribe_positions")]
645 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
646 let client = self.clone();
647
648 pyo3_async_runtimes::tokio::future_into_py(py, async move {
649 if let Err(e) = client.unsubscribe_positions().await {
650 log::error!("Failed to unsubscribe from positions: {e}");
651 }
652 Ok(())
653 })
654 }
655
656 #[pyo3(name = "unsubscribe_margin")]
657 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
658 let client = self.clone();
659
660 pyo3_async_runtimes::tokio::future_into_py(py, async move {
661 if let Err(e) = client.unsubscribe_margin().await {
662 log::error!("Failed to unsubscribe from margin: {e}");
663 }
664 Ok(())
665 })
666 }
667
668 #[pyo3(name = "unsubscribe_wallet")]
669 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
670 let client = self.clone();
671
672 pyo3_async_runtimes::tokio::future_into_py(py, async move {
673 if let Err(e) = client.unsubscribe_wallet().await {
674 log::error!("Failed to unsubscribe from wallet: {e}");
675 }
676 Ok(())
677 })
678 }
679}
680
681pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
682 if let Err(e) = callback.call1(py, (py_obj,)) {
683 tracing::error!("Error calling Python: {e}");
684 }
685}