1use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::bar::BarType,
48 identifiers::{AccountId, InstrumentId},
49 python::{
50 data::data_to_pycapsule,
51 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
52 },
53};
54use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
55
56use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
57
58#[pymethods]
59impl BitmexWebSocketClient {
60 #[new]
61 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
62 fn py_new(
63 url: Option<String>,
64 api_key: Option<String>,
65 api_secret: Option<String>,
66 account_id: Option<AccountId>,
67 heartbeat: Option<u64>,
68 testnet: bool,
69 ) -> PyResult<Self> {
70 Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
71 .map_err(to_pyvalue_err)
72 }
73
74 #[staticmethod]
75 #[pyo3(name = "from_env")]
76 fn py_from_env() -> PyResult<Self> {
77 Self::from_env().map_err(to_pyvalue_err)
78 }
79
80 #[getter]
81 #[pyo3(name = "url")]
82 #[must_use]
83 pub const fn py_url(&self) -> &str {
84 self.url()
85 }
86
87 #[getter]
88 #[pyo3(name = "api_key")]
89 #[must_use]
90 pub fn py_api_key(&self) -> Option<&str> {
91 self.api_key()
92 }
93
94 #[getter]
95 #[pyo3(name = "api_key_masked")]
96 #[must_use]
97 pub fn py_api_key_masked(&self) -> Option<String> {
98 self.api_key_masked()
99 }
100
101 #[pyo3(name = "is_active")]
102 fn py_is_active(&mut self) -> bool {
103 self.is_active()
104 }
105
106 #[pyo3(name = "is_closed")]
107 fn py_is_closed(&mut self) -> bool {
108 self.is_closed()
109 }
110
111 #[pyo3(name = "get_subscriptions")]
112 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
113 self.get_subscriptions(instrument_id)
114 }
115
116 #[pyo3(name = "set_account_id")]
117 pub fn py_set_account_id(&mut self, account_id: AccountId) {
118 self.set_account_id(account_id);
119 }
120
121 #[pyo3(name = "cache_instrument")]
122 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
123 let inst_any = pyobject_to_instrument_any(py, instrument)?;
124 self.cache_instrument(inst_any);
125 Ok(())
126 }
127
128 #[pyo3(name = "connect")]
129 fn py_connect<'py>(
130 &mut self,
131 py: Python<'py>,
132 instruments: Vec<Py<PyAny>>,
133 callback: Py<PyAny>,
134 ) -> PyResult<Bound<'py, PyAny>> {
135 let mut instruments_any = Vec::new();
136 for inst in instruments {
137 let inst_any = pyobject_to_instrument_any(py, inst)?;
138 instruments_any.push(inst_any);
139 }
140
141 self.cache_instruments(instruments_any);
142
143 let mut client = self.clone();
146
147 pyo3_async_runtimes::tokio::future_into_py(py, async move {
148 client.connect().await.map_err(to_pyruntime_err)?;
149
150 let stream = client.stream();
151
152 tokio::spawn(async move {
153 let _client = client; tokio::pin!(stream);
155
156 while let Some(msg) = stream.next().await {
157 Python::attach(|py| match msg {
158 NautilusWsMessage::Data(data_vec) => {
159 for data in data_vec {
160 let py_obj = data_to_pycapsule(py, data);
161 call_python(py, &callback, py_obj);
162 }
163 }
164 NautilusWsMessage::Instruments(instruments) => {
165 for instrument in instruments {
166 if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
167 call_python(py, &callback, py_obj);
168 }
169 }
170 }
171 NautilusWsMessage::OrderStatusReports(reports) => {
172 for report in reports {
173 if let Ok(py_obj) = report.into_py_any(py) {
174 call_python(py, &callback, py_obj);
175 }
176 }
177 }
178 NautilusWsMessage::FillReports(reports) => {
179 for report in reports {
180 if let Ok(py_obj) = report.into_py_any(py) {
181 call_python(py, &callback, py_obj);
182 }
183 }
184 }
185 NautilusWsMessage::PositionStatusReport(report) => {
186 if let Ok(py_obj) = report.into_py_any(py) {
187 call_python(py, &callback, py_obj);
188 }
189 }
190 NautilusWsMessage::FundingRateUpdates(updates) => {
191 for update in updates {
192 if let Ok(py_obj) = update.into_py_any(py) {
193 call_python(py, &callback, py_obj);
194 }
195 }
196 }
197 NautilusWsMessage::AccountState(account_state) => {
198 if let Ok(py_obj) = account_state.into_py_any(py) {
199 call_python(py, &callback, py_obj);
200 }
201 }
202 NautilusWsMessage::OrderUpdated(event) => {
203 if let Ok(py_obj) = event.into_py_any(py) {
204 call_python(py, &callback, py_obj);
205 }
206 }
207 NautilusWsMessage::Reconnected => {}
208 NautilusWsMessage::Authenticated => {}
209 });
210 }
211 });
212
213 Ok(())
214 })
215 }
216
217 #[pyo3(name = "wait_until_active")]
218 fn py_wait_until_active<'py>(
219 &self,
220 py: Python<'py>,
221 timeout_secs: f64,
222 ) -> PyResult<Bound<'py, PyAny>> {
223 let client = self.clone();
224
225 pyo3_async_runtimes::tokio::future_into_py(py, async move {
226 client
227 .wait_until_active(timeout_secs)
228 .await
229 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
230 Ok(())
231 })
232 }
233
234 #[pyo3(name = "close")]
235 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
236 let mut client = self.clone();
237
238 pyo3_async_runtimes::tokio::future_into_py(py, async move {
239 if let Err(e) = client.close().await {
240 log::error!("Error on close: {e}");
241 }
242 Ok(())
243 })
244 }
245
246 #[pyo3(name = "subscribe_instruments")]
247 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
248 let client = self.clone();
249
250 pyo3_async_runtimes::tokio::future_into_py(py, async move {
251 if let Err(e) = client.subscribe_instruments().await {
252 log::error!("Failed to subscribe to instruments: {e}");
253 }
254 Ok(())
255 })
256 }
257
258 #[pyo3(name = "subscribe_instrument")]
259 fn py_subscribe_instrument<'py>(
260 &self,
261 py: Python<'py>,
262 instrument_id: InstrumentId,
263 ) -> PyResult<Bound<'py, PyAny>> {
264 let client = self.clone();
265
266 pyo3_async_runtimes::tokio::future_into_py(py, async move {
267 if let Err(e) = client.subscribe_instrument(instrument_id).await {
268 log::error!("Failed to subscribe to instrument: {e}");
269 }
270 Ok(())
271 })
272 }
273
274 #[pyo3(name = "subscribe_book")]
275 fn py_subscribe_book<'py>(
276 &self,
277 py: Python<'py>,
278 instrument_id: InstrumentId,
279 ) -> PyResult<Bound<'py, PyAny>> {
280 let client = self.clone();
281
282 pyo3_async_runtimes::tokio::future_into_py(py, async move {
283 if let Err(e) = client.subscribe_book(instrument_id).await {
284 log::error!("Failed to subscribe to order book: {e}");
285 }
286 Ok(())
287 })
288 }
289
290 #[pyo3(name = "subscribe_book_25")]
291 fn py_subscribe_book_25<'py>(
292 &self,
293 py: Python<'py>,
294 instrument_id: InstrumentId,
295 ) -> PyResult<Bound<'py, PyAny>> {
296 let client = self.clone();
297
298 pyo3_async_runtimes::tokio::future_into_py(py, async move {
299 if let Err(e) = client.subscribe_book_25(instrument_id).await {
300 log::error!("Failed to subscribe to order book 25: {e}");
301 }
302 Ok(())
303 })
304 }
305
306 #[pyo3(name = "subscribe_book_depth10")]
307 fn py_subscribe_book_depth10<'py>(
308 &self,
309 py: Python<'py>,
310 instrument_id: InstrumentId,
311 ) -> PyResult<Bound<'py, PyAny>> {
312 let client = self.clone();
313
314 pyo3_async_runtimes::tokio::future_into_py(py, async move {
315 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
316 log::error!("Failed to subscribe to order book depth 10: {e}");
317 }
318 Ok(())
319 })
320 }
321
322 #[pyo3(name = "subscribe_quotes")]
323 fn py_subscribe_quotes<'py>(
324 &self,
325 py: Python<'py>,
326 instrument_id: InstrumentId,
327 ) -> PyResult<Bound<'py, PyAny>> {
328 let client = self.clone();
329
330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
331 if let Err(e) = client.subscribe_quotes(instrument_id).await {
332 log::error!("Failed to subscribe to quotes: {e}");
333 }
334 Ok(())
335 })
336 }
337
338 #[pyo3(name = "subscribe_trades")]
339 fn py_subscribe_trades<'py>(
340 &self,
341 py: Python<'py>,
342 instrument_id: InstrumentId,
343 ) -> PyResult<Bound<'py, PyAny>> {
344 let client = self.clone();
345
346 pyo3_async_runtimes::tokio::future_into_py(py, async move {
347 if let Err(e) = client.subscribe_trades(instrument_id).await {
348 log::error!("Failed to subscribe to trades: {e}");
349 }
350 Ok(())
351 })
352 }
353
354 #[pyo3(name = "subscribe_mark_prices")]
355 fn py_subscribe_mark_prices<'py>(
356 &self,
357 py: Python<'py>,
358 instrument_id: InstrumentId,
359 ) -> PyResult<Bound<'py, PyAny>> {
360 let client = self.clone();
361
362 pyo3_async_runtimes::tokio::future_into_py(py, async move {
363 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
364 log::error!("Failed to subscribe to mark prices: {e}");
365 }
366 Ok(())
367 })
368 }
369
370 #[pyo3(name = "subscribe_index_prices")]
371 fn py_subscribe_index_prices<'py>(
372 &self,
373 py: Python<'py>,
374 instrument_id: InstrumentId,
375 ) -> PyResult<Bound<'py, PyAny>> {
376 let client = self.clone();
377
378 pyo3_async_runtimes::tokio::future_into_py(py, async move {
379 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
380 log::error!("Failed to subscribe to index prices: {e}");
381 }
382 Ok(())
383 })
384 }
385
386 #[pyo3(name = "subscribe_funding_rates")]
387 fn py_subscribe_funding_rates<'py>(
388 &self,
389 py: Python<'py>,
390 instrument_id: InstrumentId,
391 ) -> PyResult<Bound<'py, PyAny>> {
392 let client = self.clone();
393
394 pyo3_async_runtimes::tokio::future_into_py(py, async move {
395 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
396 log::error!("Failed to subscribe to funding: {e}");
397 }
398 Ok(())
399 })
400 }
401
402 #[pyo3(name = "subscribe_bars")]
403 fn py_subscribe_bars<'py>(
404 &self,
405 py: Python<'py>,
406 bar_type: BarType,
407 ) -> PyResult<Bound<'py, PyAny>> {
408 let client = self.clone();
409
410 pyo3_async_runtimes::tokio::future_into_py(py, async move {
411 if let Err(e) = client.subscribe_bars(bar_type).await {
412 log::error!("Failed to subscribe to bars: {e}");
413 }
414 Ok(())
415 })
416 }
417
418 #[pyo3(name = "unsubscribe_instruments")]
419 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
420 let client = self.clone();
421
422 pyo3_async_runtimes::tokio::future_into_py(py, async move {
423 if let Err(e) = client.unsubscribe_instruments().await {
424 log::error!("Failed to unsubscribe from instruments: {e}");
425 }
426 Ok(())
427 })
428 }
429
430 #[pyo3(name = "unsubscribe_instrument")]
431 fn py_unsubscribe_instrument<'py>(
432 &self,
433 py: Python<'py>,
434 instrument_id: InstrumentId,
435 ) -> PyResult<Bound<'py, PyAny>> {
436 let client = self.clone();
437
438 pyo3_async_runtimes::tokio::future_into_py(py, async move {
439 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
440 log::error!("Failed to unsubscribe from instrument: {e}");
441 }
442 Ok(())
443 })
444 }
445
446 #[pyo3(name = "unsubscribe_book")]
447 fn py_unsubscribe_book<'py>(
448 &self,
449 py: Python<'py>,
450 instrument_id: InstrumentId,
451 ) -> PyResult<Bound<'py, PyAny>> {
452 let client = self.clone();
453
454 pyo3_async_runtimes::tokio::future_into_py(py, async move {
455 if let Err(e) = client.unsubscribe_book(instrument_id).await {
456 log::error!("Failed to unsubscribe from order book: {e}");
457 }
458 Ok(())
459 })
460 }
461
462 #[pyo3(name = "unsubscribe_book_25")]
463 fn py_unsubscribe_book_25<'py>(
464 &self,
465 py: Python<'py>,
466 instrument_id: InstrumentId,
467 ) -> PyResult<Bound<'py, PyAny>> {
468 let client = self.clone();
469
470 pyo3_async_runtimes::tokio::future_into_py(py, async move {
471 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
472 log::error!("Failed to unsubscribe from order book 25: {e}");
473 }
474 Ok(())
475 })
476 }
477
478 #[pyo3(name = "unsubscribe_book_depth10")]
479 fn py_unsubscribe_book_depth10<'py>(
480 &self,
481 py: Python<'py>,
482 instrument_id: InstrumentId,
483 ) -> PyResult<Bound<'py, PyAny>> {
484 let client = self.clone();
485
486 pyo3_async_runtimes::tokio::future_into_py(py, async move {
487 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
488 log::error!("Failed to unsubscribe from order book depth 10: {e}");
489 }
490 Ok(())
491 })
492 }
493
494 #[pyo3(name = "unsubscribe_quotes")]
495 fn py_unsubscribe_quotes<'py>(
496 &self,
497 py: Python<'py>,
498 instrument_id: InstrumentId,
499 ) -> PyResult<Bound<'py, PyAny>> {
500 let client = self.clone();
501
502 pyo3_async_runtimes::tokio::future_into_py(py, async move {
503 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
504 log::error!("Failed to unsubscribe from quotes: {e}");
505 }
506 Ok(())
507 })
508 }
509
510 #[pyo3(name = "unsubscribe_trades")]
511 fn py_unsubscribe_trades<'py>(
512 &self,
513 py: Python<'py>,
514 instrument_id: InstrumentId,
515 ) -> PyResult<Bound<'py, PyAny>> {
516 let client = self.clone();
517
518 pyo3_async_runtimes::tokio::future_into_py(py, async move {
519 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
520 log::error!("Failed to unsubscribe from trades: {e}");
521 }
522 Ok(())
523 })
524 }
525
526 #[pyo3(name = "unsubscribe_mark_prices")]
527 fn py_unsubscribe_mark_prices<'py>(
528 &self,
529 py: Python<'py>,
530 instrument_id: InstrumentId,
531 ) -> PyResult<Bound<'py, PyAny>> {
532 let client = self.clone();
533
534 pyo3_async_runtimes::tokio::future_into_py(py, async move {
535 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
536 log::error!("Failed to unsubscribe from mark prices: {e}");
537 }
538 Ok(())
539 })
540 }
541
542 #[pyo3(name = "unsubscribe_index_prices")]
543 fn py_unsubscribe_index_prices<'py>(
544 &self,
545 py: Python<'py>,
546 instrument_id: InstrumentId,
547 ) -> PyResult<Bound<'py, PyAny>> {
548 let client = self.clone();
549
550 pyo3_async_runtimes::tokio::future_into_py(py, async move {
551 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
552 log::error!("Failed to unsubscribe from index prices: {e}");
553 }
554 Ok(())
555 })
556 }
557
558 #[pyo3(name = "unsubscribe_funding_rates")]
559 fn py_unsubscribe_funding_rates<'py>(
560 &self,
561 py: Python<'py>,
562 instrument_id: InstrumentId,
563 ) -> PyResult<Bound<'py, PyAny>> {
564 let client = self.clone();
565 pyo3_async_runtimes::tokio::future_into_py(py, async move {
566 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
567 log::error!("Failed to unsubscribe from funding rates: {e}");
568 }
569 Ok(())
570 })
571 }
572
573 #[pyo3(name = "unsubscribe_bars")]
574 fn py_unsubscribe_bars<'py>(
575 &self,
576 py: Python<'py>,
577 bar_type: BarType,
578 ) -> PyResult<Bound<'py, PyAny>> {
579 let client = self.clone();
580
581 pyo3_async_runtimes::tokio::future_into_py(py, async move {
582 if let Err(e) = client.unsubscribe_bars(bar_type).await {
583 log::error!("Failed to unsubscribe from bars: {e}");
584 }
585 Ok(())
586 })
587 }
588
589 #[pyo3(name = "subscribe_orders")]
590 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
591 let client = self.clone();
592
593 pyo3_async_runtimes::tokio::future_into_py(py, async move {
594 if let Err(e) = client.subscribe_orders().await {
595 log::error!("Failed to subscribe to orders: {e}");
596 }
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "subscribe_executions")]
602 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
603 let client = self.clone();
604
605 pyo3_async_runtimes::tokio::future_into_py(py, async move {
606 if let Err(e) = client.subscribe_executions().await {
607 log::error!("Failed to subscribe to executions: {e}");
608 }
609 Ok(())
610 })
611 }
612
613 #[pyo3(name = "subscribe_positions")]
614 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
615 let client = self.clone();
616
617 pyo3_async_runtimes::tokio::future_into_py(py, async move {
618 if let Err(e) = client.subscribe_positions().await {
619 log::error!("Failed to subscribe to positions: {e}");
620 }
621 Ok(())
622 })
623 }
624
625 #[pyo3(name = "subscribe_margin")]
626 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
627 let client = self.clone();
628
629 pyo3_async_runtimes::tokio::future_into_py(py, async move {
630 if let Err(e) = client.subscribe_margin().await {
631 log::error!("Failed to subscribe to margin: {e}");
632 }
633 Ok(())
634 })
635 }
636
637 #[pyo3(name = "subscribe_wallet")]
638 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
639 let client = self.clone();
640
641 pyo3_async_runtimes::tokio::future_into_py(py, async move {
642 if let Err(e) = client.subscribe_wallet().await {
643 log::error!("Failed to subscribe to wallet: {e}");
644 }
645 Ok(())
646 })
647 }
648
649 #[pyo3(name = "unsubscribe_orders")]
650 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
651 let client = self.clone();
652
653 pyo3_async_runtimes::tokio::future_into_py(py, async move {
654 if let Err(e) = client.unsubscribe_orders().await {
655 log::error!("Failed to unsubscribe from orders: {e}");
656 }
657 Ok(())
658 })
659 }
660
661 #[pyo3(name = "unsubscribe_executions")]
662 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
663 let client = self.clone();
664
665 pyo3_async_runtimes::tokio::future_into_py(py, async move {
666 if let Err(e) = client.unsubscribe_executions().await {
667 log::error!("Failed to unsubscribe from executions: {e}");
668 }
669 Ok(())
670 })
671 }
672
673 #[pyo3(name = "unsubscribe_positions")]
674 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
675 let client = self.clone();
676
677 pyo3_async_runtimes::tokio::future_into_py(py, async move {
678 if let Err(e) = client.unsubscribe_positions().await {
679 log::error!("Failed to unsubscribe from positions: {e}");
680 }
681 Ok(())
682 })
683 }
684
685 #[pyo3(name = "unsubscribe_margin")]
686 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 if let Err(e) = client.unsubscribe_margin().await {
691 log::error!("Failed to unsubscribe from margin: {e}");
692 }
693 Ok(())
694 })
695 }
696
697 #[pyo3(name = "unsubscribe_wallet")]
698 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
699 let client = self.clone();
700
701 pyo3_async_runtimes::tokio::future_into_py(py, async move {
702 if let Err(e) = client.unsubscribe_wallet().await {
703 log::error!("Failed to unsubscribe from wallet: {e}");
704 }
705 Ok(())
706 })
707 }
708}
709
710pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
711 if let Err(e) = callback.call1(py, (py_obj,)) {
712 tracing::error!("Error calling Python: {e}");
713 }
714}