1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48 data::bar::BarType,
49 identifiers::{AccountId, InstrumentId},
50 python::{
51 data::data_to_pycapsule,
52 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53 },
54};
55use pyo3::{conversion::IntoPyObjectExt, prelude::*};
56
57use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl BitmexWebSocketClient {
61 #[new]
62 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
63 fn py_new(
64 url: Option<String>,
65 api_key: Option<String>,
66 api_secret: Option<String>,
67 account_id: Option<AccountId>,
68 heartbeat: Option<u64>,
69 testnet: bool,
70 ) -> PyResult<Self> {
71 Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
72 .map_err(to_pyvalue_err)
73 }
74
75 #[staticmethod]
76 #[pyo3(name = "from_env")]
77 fn py_from_env() -> PyResult<Self> {
78 Self::from_env().map_err(to_pyvalue_err)
79 }
80
81 #[getter]
82 #[pyo3(name = "url")]
83 #[must_use]
84 pub const fn py_url(&self) -> &str {
85 self.url()
86 }
87
88 #[getter]
89 #[pyo3(name = "api_key")]
90 #[must_use]
91 pub fn py_api_key(&self) -> Option<&str> {
92 self.api_key()
93 }
94
95 #[getter]
96 #[pyo3(name = "api_key_masked")]
97 #[must_use]
98 pub fn py_api_key_masked(&self) -> Option<String> {
99 self.api_key_masked()
100 }
101
102 #[pyo3(name = "is_active")]
103 fn py_is_active(&mut self) -> bool {
104 self.is_active()
105 }
106
107 #[pyo3(name = "is_closed")]
108 fn py_is_closed(&mut self) -> bool {
109 self.is_closed()
110 }
111
112 #[pyo3(name = "get_subscriptions")]
113 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
114 self.get_subscriptions(instrument_id)
115 }
116
117 #[pyo3(name = "set_account_id")]
118 pub fn py_set_account_id(&mut self, account_id: AccountId) {
119 self.set_account_id(account_id);
120 }
121
122 #[pyo3(name = "cache_instrument")]
123 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
124 let inst_any = pyobject_to_instrument_any(py, instrument)?;
125 self.cache_instrument(inst_any);
126 Ok(())
127 }
128
129 #[pyo3(name = "connect")]
130 fn py_connect<'py>(
131 &mut self,
132 py: Python<'py>,
133 instruments: Vec<Py<PyAny>>,
134 callback: Py<PyAny>,
135 ) -> PyResult<Bound<'py, PyAny>> {
136 let mut instruments_any = Vec::new();
137 for inst in instruments {
138 let inst_any = pyobject_to_instrument_any(py, inst)?;
139 instruments_any.push(inst_any);
140 }
141
142 self.cache_instruments(instruments_any);
143
144 let mut client = self.clone();
147
148 pyo3_async_runtimes::tokio::future_into_py(py, async move {
149 client.connect().await.map_err(to_pyruntime_err)?;
150
151 let stream = client.stream();
152
153 get_runtime().spawn(async move {
154 let _client = client; tokio::pin!(stream);
156
157 while let Some(msg) = stream.next().await {
158 Python::attach(|py| match msg {
159 NautilusWsMessage::Data(data_vec) => {
160 for data in data_vec {
161 let py_obj = data_to_pycapsule(py, data);
162 call_python(py, &callback, py_obj);
163 }
164 }
165 NautilusWsMessage::Instruments(instruments) => {
166 for instrument in instruments {
167 if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
168 call_python(py, &callback, py_obj);
169 }
170 }
171 }
172 NautilusWsMessage::OrderStatusReports(reports) => {
173 for report in reports {
174 if let Ok(py_obj) = report.into_py_any(py) {
175 call_python(py, &callback, py_obj);
176 }
177 }
178 }
179 NautilusWsMessage::FillReports(reports) => {
180 for report in reports {
181 if let Ok(py_obj) = report.into_py_any(py) {
182 call_python(py, &callback, py_obj);
183 }
184 }
185 }
186 NautilusWsMessage::PositionStatusReports(reports) => {
187 for report in reports {
188 if let Ok(py_obj) = report.into_py_any(py) {
189 call_python(py, &callback, py_obj);
190 }
191 }
192 }
193 NautilusWsMessage::FundingRateUpdates(updates) => {
194 for update in updates {
195 if let Ok(py_obj) = update.into_py_any(py) {
196 call_python(py, &callback, py_obj);
197 }
198 }
199 }
200 NautilusWsMessage::AccountStates(states) => {
201 for state in states {
202 if let Ok(py_obj) = state.into_py_any(py) {
203 call_python(py, &callback, py_obj);
204 }
205 }
206 }
207 NautilusWsMessage::OrderUpdated(event) => {
208 if let Ok(py_obj) = (*event).into_py_any(py) {
209 call_python(py, &callback, py_obj);
210 }
211 }
212 NautilusWsMessage::OrderUpdates(events) => {
213 for event in events {
214 if let Ok(py_obj) = event.into_py_any(py) {
215 call_python(py, &callback, py_obj);
216 }
217 }
218 }
219 NautilusWsMessage::Reconnected => {}
220 NautilusWsMessage::Authenticated => {}
221 });
222 }
223 });
224
225 Ok(())
226 })
227 }
228
229 #[pyo3(name = "wait_until_active")]
230 fn py_wait_until_active<'py>(
231 &self,
232 py: Python<'py>,
233 timeout_secs: f64,
234 ) -> PyResult<Bound<'py, PyAny>> {
235 let client = self.clone();
236
237 pyo3_async_runtimes::tokio::future_into_py(py, async move {
238 client
239 .wait_until_active(timeout_secs)
240 .await
241 .map_err(to_pyruntime_err)?;
242 Ok(())
243 })
244 }
245
246 #[pyo3(name = "close")]
247 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
248 let mut client = self.clone();
249
250 pyo3_async_runtimes::tokio::future_into_py(py, async move {
251 if let Err(e) = client.close().await {
252 log::error!("Error on close: {e}");
253 }
254 Ok(())
255 })
256 }
257
258 #[pyo3(name = "subscribe_instruments")]
259 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
260 let client = self.clone();
261
262 pyo3_async_runtimes::tokio::future_into_py(py, async move {
263 if let Err(e) = client.subscribe_instruments().await {
264 log::error!("Failed to subscribe to instruments: {e}");
265 }
266 Ok(())
267 })
268 }
269
270 #[pyo3(name = "subscribe_instrument")]
271 fn py_subscribe_instrument<'py>(
272 &self,
273 py: Python<'py>,
274 instrument_id: InstrumentId,
275 ) -> PyResult<Bound<'py, PyAny>> {
276 let client = self.clone();
277
278 pyo3_async_runtimes::tokio::future_into_py(py, async move {
279 if let Err(e) = client.subscribe_instrument(instrument_id).await {
280 log::error!("Failed to subscribe to instrument: {e}");
281 }
282 Ok(())
283 })
284 }
285
286 #[pyo3(name = "subscribe_book")]
287 fn py_subscribe_book<'py>(
288 &self,
289 py: Python<'py>,
290 instrument_id: InstrumentId,
291 ) -> PyResult<Bound<'py, PyAny>> {
292 let client = self.clone();
293
294 pyo3_async_runtimes::tokio::future_into_py(py, async move {
295 if let Err(e) = client.subscribe_book(instrument_id).await {
296 log::error!("Failed to subscribe to order book: {e}");
297 }
298 Ok(())
299 })
300 }
301
302 #[pyo3(name = "subscribe_book_25")]
303 fn py_subscribe_book_25<'py>(
304 &self,
305 py: Python<'py>,
306 instrument_id: InstrumentId,
307 ) -> PyResult<Bound<'py, PyAny>> {
308 let client = self.clone();
309
310 pyo3_async_runtimes::tokio::future_into_py(py, async move {
311 if let Err(e) = client.subscribe_book_25(instrument_id).await {
312 log::error!("Failed to subscribe to order book 25: {e}");
313 }
314 Ok(())
315 })
316 }
317
318 #[pyo3(name = "subscribe_book_depth10")]
319 fn py_subscribe_book_depth10<'py>(
320 &self,
321 py: Python<'py>,
322 instrument_id: InstrumentId,
323 ) -> PyResult<Bound<'py, PyAny>> {
324 let client = self.clone();
325
326 pyo3_async_runtimes::tokio::future_into_py(py, async move {
327 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
328 log::error!("Failed to subscribe to order book depth 10: {e}");
329 }
330 Ok(())
331 })
332 }
333
334 #[pyo3(name = "subscribe_quotes")]
335 fn py_subscribe_quotes<'py>(
336 &self,
337 py: Python<'py>,
338 instrument_id: InstrumentId,
339 ) -> PyResult<Bound<'py, PyAny>> {
340 let client = self.clone();
341
342 pyo3_async_runtimes::tokio::future_into_py(py, async move {
343 if let Err(e) = client.subscribe_quotes(instrument_id).await {
344 log::error!("Failed to subscribe to quotes: {e}");
345 }
346 Ok(())
347 })
348 }
349
350 #[pyo3(name = "subscribe_trades")]
351 fn py_subscribe_trades<'py>(
352 &self,
353 py: Python<'py>,
354 instrument_id: InstrumentId,
355 ) -> PyResult<Bound<'py, PyAny>> {
356 let client = self.clone();
357
358 pyo3_async_runtimes::tokio::future_into_py(py, async move {
359 if let Err(e) = client.subscribe_trades(instrument_id).await {
360 log::error!("Failed to subscribe to trades: {e}");
361 }
362 Ok(())
363 })
364 }
365
366 #[pyo3(name = "subscribe_mark_prices")]
367 fn py_subscribe_mark_prices<'py>(
368 &self,
369 py: Python<'py>,
370 instrument_id: InstrumentId,
371 ) -> PyResult<Bound<'py, PyAny>> {
372 let client = self.clone();
373
374 pyo3_async_runtimes::tokio::future_into_py(py, async move {
375 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
376 log::error!("Failed to subscribe to mark prices: {e}");
377 }
378 Ok(())
379 })
380 }
381
382 #[pyo3(name = "subscribe_index_prices")]
383 fn py_subscribe_index_prices<'py>(
384 &self,
385 py: Python<'py>,
386 instrument_id: InstrumentId,
387 ) -> PyResult<Bound<'py, PyAny>> {
388 let client = self.clone();
389
390 pyo3_async_runtimes::tokio::future_into_py(py, async move {
391 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
392 log::error!("Failed to subscribe to index prices: {e}");
393 }
394 Ok(())
395 })
396 }
397
398 #[pyo3(name = "subscribe_funding_rates")]
399 fn py_subscribe_funding_rates<'py>(
400 &self,
401 py: Python<'py>,
402 instrument_id: InstrumentId,
403 ) -> PyResult<Bound<'py, PyAny>> {
404 let client = self.clone();
405
406 pyo3_async_runtimes::tokio::future_into_py(py, async move {
407 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
408 log::error!("Failed to subscribe to funding: {e}");
409 }
410 Ok(())
411 })
412 }
413
414 #[pyo3(name = "subscribe_bars")]
415 fn py_subscribe_bars<'py>(
416 &self,
417 py: Python<'py>,
418 bar_type: BarType,
419 ) -> PyResult<Bound<'py, PyAny>> {
420 let client = self.clone();
421
422 pyo3_async_runtimes::tokio::future_into_py(py, async move {
423 if let Err(e) = client.subscribe_bars(bar_type).await {
424 log::error!("Failed to subscribe to bars: {e}");
425 }
426 Ok(())
427 })
428 }
429
430 #[pyo3(name = "unsubscribe_instruments")]
431 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
432 let client = self.clone();
433
434 pyo3_async_runtimes::tokio::future_into_py(py, async move {
435 if let Err(e) = client.unsubscribe_instruments().await {
436 log::error!("Failed to unsubscribe from instruments: {e}");
437 }
438 Ok(())
439 })
440 }
441
442 #[pyo3(name = "unsubscribe_instrument")]
443 fn py_unsubscribe_instrument<'py>(
444 &self,
445 py: Python<'py>,
446 instrument_id: InstrumentId,
447 ) -> PyResult<Bound<'py, PyAny>> {
448 let client = self.clone();
449
450 pyo3_async_runtimes::tokio::future_into_py(py, async move {
451 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
452 log::error!("Failed to unsubscribe from instrument: {e}");
453 }
454 Ok(())
455 })
456 }
457
458 #[pyo3(name = "unsubscribe_book")]
459 fn py_unsubscribe_book<'py>(
460 &self,
461 py: Python<'py>,
462 instrument_id: InstrumentId,
463 ) -> PyResult<Bound<'py, PyAny>> {
464 let client = self.clone();
465
466 pyo3_async_runtimes::tokio::future_into_py(py, async move {
467 if let Err(e) = client.unsubscribe_book(instrument_id).await {
468 log::error!("Failed to unsubscribe from order book: {e}");
469 }
470 Ok(())
471 })
472 }
473
474 #[pyo3(name = "unsubscribe_book_25")]
475 fn py_unsubscribe_book_25<'py>(
476 &self,
477 py: Python<'py>,
478 instrument_id: InstrumentId,
479 ) -> PyResult<Bound<'py, PyAny>> {
480 let client = self.clone();
481
482 pyo3_async_runtimes::tokio::future_into_py(py, async move {
483 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
484 log::error!("Failed to unsubscribe from order book 25: {e}");
485 }
486 Ok(())
487 })
488 }
489
490 #[pyo3(name = "unsubscribe_book_depth10")]
491 fn py_unsubscribe_book_depth10<'py>(
492 &self,
493 py: Python<'py>,
494 instrument_id: InstrumentId,
495 ) -> PyResult<Bound<'py, PyAny>> {
496 let client = self.clone();
497
498 pyo3_async_runtimes::tokio::future_into_py(py, async move {
499 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
500 log::error!("Failed to unsubscribe from order book depth 10: {e}");
501 }
502 Ok(())
503 })
504 }
505
506 #[pyo3(name = "unsubscribe_quotes")]
507 fn py_unsubscribe_quotes<'py>(
508 &self,
509 py: Python<'py>,
510 instrument_id: InstrumentId,
511 ) -> PyResult<Bound<'py, PyAny>> {
512 let client = self.clone();
513
514 pyo3_async_runtimes::tokio::future_into_py(py, async move {
515 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
516 log::error!("Failed to unsubscribe from quotes: {e}");
517 }
518 Ok(())
519 })
520 }
521
522 #[pyo3(name = "unsubscribe_trades")]
523 fn py_unsubscribe_trades<'py>(
524 &self,
525 py: Python<'py>,
526 instrument_id: InstrumentId,
527 ) -> PyResult<Bound<'py, PyAny>> {
528 let client = self.clone();
529
530 pyo3_async_runtimes::tokio::future_into_py(py, async move {
531 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
532 log::error!("Failed to unsubscribe from trades: {e}");
533 }
534 Ok(())
535 })
536 }
537
538 #[pyo3(name = "unsubscribe_mark_prices")]
539 fn py_unsubscribe_mark_prices<'py>(
540 &self,
541 py: Python<'py>,
542 instrument_id: InstrumentId,
543 ) -> PyResult<Bound<'py, PyAny>> {
544 let client = self.clone();
545
546 pyo3_async_runtimes::tokio::future_into_py(py, async move {
547 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
548 log::error!("Failed to unsubscribe from mark prices: {e}");
549 }
550 Ok(())
551 })
552 }
553
554 #[pyo3(name = "unsubscribe_index_prices")]
555 fn py_unsubscribe_index_prices<'py>(
556 &self,
557 py: Python<'py>,
558 instrument_id: InstrumentId,
559 ) -> PyResult<Bound<'py, PyAny>> {
560 let client = self.clone();
561
562 pyo3_async_runtimes::tokio::future_into_py(py, async move {
563 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
564 log::error!("Failed to unsubscribe from index prices: {e}");
565 }
566 Ok(())
567 })
568 }
569
570 #[pyo3(name = "unsubscribe_funding_rates")]
571 fn py_unsubscribe_funding_rates<'py>(
572 &self,
573 py: Python<'py>,
574 instrument_id: InstrumentId,
575 ) -> PyResult<Bound<'py, PyAny>> {
576 let client = self.clone();
577 pyo3_async_runtimes::tokio::future_into_py(py, async move {
578 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
579 log::error!("Failed to unsubscribe from funding rates: {e}");
580 }
581 Ok(())
582 })
583 }
584
585 #[pyo3(name = "unsubscribe_bars")]
586 fn py_unsubscribe_bars<'py>(
587 &self,
588 py: Python<'py>,
589 bar_type: BarType,
590 ) -> PyResult<Bound<'py, PyAny>> {
591 let client = self.clone();
592
593 pyo3_async_runtimes::tokio::future_into_py(py, async move {
594 if let Err(e) = client.unsubscribe_bars(bar_type).await {
595 log::error!("Failed to unsubscribe from bars: {e}");
596 }
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "subscribe_orders")]
602 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
603 let client = self.clone();
604
605 pyo3_async_runtimes::tokio::future_into_py(py, async move {
606 if let Err(e) = client.subscribe_orders().await {
607 log::error!("Failed to subscribe to orders: {e}");
608 }
609 Ok(())
610 })
611 }
612
613 #[pyo3(name = "subscribe_executions")]
614 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
615 let client = self.clone();
616
617 pyo3_async_runtimes::tokio::future_into_py(py, async move {
618 if let Err(e) = client.subscribe_executions().await {
619 log::error!("Failed to subscribe to executions: {e}");
620 }
621 Ok(())
622 })
623 }
624
625 #[pyo3(name = "subscribe_positions")]
626 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
627 let client = self.clone();
628
629 pyo3_async_runtimes::tokio::future_into_py(py, async move {
630 if let Err(e) = client.subscribe_positions().await {
631 log::error!("Failed to subscribe to positions: {e}");
632 }
633 Ok(())
634 })
635 }
636
637 #[pyo3(name = "subscribe_margin")]
638 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
639 let client = self.clone();
640
641 pyo3_async_runtimes::tokio::future_into_py(py, async move {
642 if let Err(e) = client.subscribe_margin().await {
643 log::error!("Failed to subscribe to margin: {e}");
644 }
645 Ok(())
646 })
647 }
648
649 #[pyo3(name = "subscribe_wallet")]
650 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
651 let client = self.clone();
652
653 pyo3_async_runtimes::tokio::future_into_py(py, async move {
654 if let Err(e) = client.subscribe_wallet().await {
655 log::error!("Failed to subscribe to wallet: {e}");
656 }
657 Ok(())
658 })
659 }
660
661 #[pyo3(name = "unsubscribe_orders")]
662 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
663 let client = self.clone();
664
665 pyo3_async_runtimes::tokio::future_into_py(py, async move {
666 if let Err(e) = client.unsubscribe_orders().await {
667 log::error!("Failed to unsubscribe from orders: {e}");
668 }
669 Ok(())
670 })
671 }
672
673 #[pyo3(name = "unsubscribe_executions")]
674 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
675 let client = self.clone();
676
677 pyo3_async_runtimes::tokio::future_into_py(py, async move {
678 if let Err(e) = client.unsubscribe_executions().await {
679 log::error!("Failed to unsubscribe from executions: {e}");
680 }
681 Ok(())
682 })
683 }
684
685 #[pyo3(name = "unsubscribe_positions")]
686 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 if let Err(e) = client.unsubscribe_positions().await {
691 log::error!("Failed to unsubscribe from positions: {e}");
692 }
693 Ok(())
694 })
695 }
696
697 #[pyo3(name = "unsubscribe_margin")]
698 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
699 let client = self.clone();
700
701 pyo3_async_runtimes::tokio::future_into_py(py, async move {
702 if let Err(e) = client.unsubscribe_margin().await {
703 log::error!("Failed to unsubscribe from margin: {e}");
704 }
705 Ok(())
706 })
707 }
708
709 #[pyo3(name = "unsubscribe_wallet")]
710 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
711 let client = self.clone();
712
713 pyo3_async_runtimes::tokio::future_into_py(py, async move {
714 if let Err(e) = client.unsubscribe_wallet().await {
715 log::error!("Failed to unsubscribe from wallet: {e}");
716 }
717 Ok(())
718 })
719 }
720}