1use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::bar::BarType,
48 identifiers::{AccountId, InstrumentId},
49 python::{
50 data::data_to_pycapsule,
51 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
52 },
53};
54use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
55
56use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
57
58#[pymethods]
59impl BitmexWebSocketClient {
60 #[new]
61 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
62 fn py_new(
63 url: Option<String>,
64 api_key: Option<String>,
65 api_secret: Option<String>,
66 account_id: Option<AccountId>,
67 heartbeat: Option<u64>,
68 testnet: bool,
69 ) -> PyResult<Self> {
70 let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
72 let (key_var, secret_var) = if testnet {
74 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
75 } else {
76 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
77 };
78
79 let env_key = std::env::var(key_var).ok();
80 let env_secret = std::env::var(secret_var).ok();
81 (env_key, env_secret)
82 } else {
83 (api_key, api_secret)
84 };
85
86 Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
87 .map_err(to_pyvalue_err)
88 }
89
90 #[staticmethod]
91 #[pyo3(name = "from_env")]
92 fn py_from_env() -> PyResult<Self> {
93 Self::from_env().map_err(to_pyvalue_err)
94 }
95
96 #[getter]
97 #[pyo3(name = "url")]
98 #[must_use]
99 pub const fn py_url(&self) -> &str {
100 self.url()
101 }
102
103 #[getter]
104 #[pyo3(name = "api_key")]
105 #[must_use]
106 pub fn py_api_key(&self) -> Option<&str> {
107 self.api_key()
108 }
109
110 #[getter]
111 #[pyo3(name = "api_key_masked")]
112 #[must_use]
113 pub fn py_api_key_masked(&self) -> Option<String> {
114 self.api_key_masked()
115 }
116
117 #[pyo3(name = "is_active")]
118 fn py_is_active(&mut self) -> bool {
119 self.is_active()
120 }
121
122 #[pyo3(name = "is_closed")]
123 fn py_is_closed(&mut self) -> bool {
124 self.is_closed()
125 }
126
127 #[pyo3(name = "get_subscriptions")]
128 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
129 self.get_subscriptions(instrument_id)
130 }
131
132 #[pyo3(name = "set_account_id")]
133 pub fn py_set_account_id(&mut self, account_id: AccountId) {
134 self.set_account_id(account_id);
135 }
136
137 #[pyo3(name = "cache_instrument")]
138 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
139 let inst_any = pyobject_to_instrument_any(py, instrument)?;
140 self.cache_instrument(inst_any);
141 Ok(())
142 }
143
144 #[pyo3(name = "connect")]
145 fn py_connect<'py>(
146 &mut self,
147 py: Python<'py>,
148 instruments: Vec<Py<PyAny>>,
149 callback: Py<PyAny>,
150 ) -> PyResult<Bound<'py, PyAny>> {
151 let mut instruments_any = Vec::new();
152 for inst in instruments {
153 let inst_any = pyobject_to_instrument_any(py, inst)?;
154 instruments_any.push(inst_any);
155 }
156
157 self.cache_instruments(instruments_any);
158
159 let mut client = self.clone();
162
163 pyo3_async_runtimes::tokio::future_into_py(py, async move {
164 client.connect().await.map_err(to_pyruntime_err)?;
165
166 let stream = client.stream();
167
168 tokio::spawn(async move {
169 let _client = client; tokio::pin!(stream);
171
172 while let Some(msg) = stream.next().await {
173 Python::attach(|py| match msg {
174 NautilusWsMessage::Data(data_vec) => {
175 for data in data_vec {
176 let py_obj = data_to_pycapsule(py, data);
177 call_python(py, &callback, py_obj);
178 }
179 }
180 NautilusWsMessage::Instruments(instruments) => {
181 for instrument in instruments {
182 if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
183 call_python(py, &callback, py_obj);
184 }
185 }
186 }
187 NautilusWsMessage::OrderStatusReports(reports) => {
188 for report in reports {
189 if let Ok(py_obj) = report.into_py_any(py) {
190 call_python(py, &callback, py_obj);
191 }
192 }
193 }
194 NautilusWsMessage::FillReports(reports) => {
195 for report in reports {
196 if let Ok(py_obj) = report.into_py_any(py) {
197 call_python(py, &callback, py_obj);
198 }
199 }
200 }
201 NautilusWsMessage::PositionStatusReport(report) => {
202 if let Ok(py_obj) = report.into_py_any(py) {
203 call_python(py, &callback, py_obj);
204 }
205 }
206 NautilusWsMessage::FundingRateUpdates(updates) => {
207 for update in updates {
208 if let Ok(py_obj) = update.into_py_any(py) {
209 call_python(py, &callback, py_obj);
210 }
211 }
212 }
213 NautilusWsMessage::AccountState(account_state) => {
214 if let Ok(py_obj) = account_state.into_py_any(py) {
215 call_python(py, &callback, py_obj);
216 }
217 }
218 NautilusWsMessage::OrderUpdated(event) => {
219 if let Ok(py_obj) = event.into_py_any(py) {
220 call_python(py, &callback, py_obj);
221 }
222 }
223 NautilusWsMessage::Reconnected => {}
224 NautilusWsMessage::Authenticated => {}
225 });
226 }
227 });
228
229 Ok(())
230 })
231 }
232
233 #[pyo3(name = "wait_until_active")]
234 fn py_wait_until_active<'py>(
235 &self,
236 py: Python<'py>,
237 timeout_secs: f64,
238 ) -> PyResult<Bound<'py, PyAny>> {
239 let client = self.clone();
240
241 pyo3_async_runtimes::tokio::future_into_py(py, async move {
242 client
243 .wait_until_active(timeout_secs)
244 .await
245 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
246 Ok(())
247 })
248 }
249
250 #[pyo3(name = "close")]
251 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
252 let mut client = self.clone();
253
254 pyo3_async_runtimes::tokio::future_into_py(py, async move {
255 if let Err(e) = client.close().await {
256 log::error!("Error on close: {e}");
257 }
258 Ok(())
259 })
260 }
261
262 #[pyo3(name = "subscribe_instruments")]
263 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
264 let client = self.clone();
265
266 pyo3_async_runtimes::tokio::future_into_py(py, async move {
267 if let Err(e) = client.subscribe_instruments().await {
268 log::error!("Failed to subscribe to instruments: {e}");
269 }
270 Ok(())
271 })
272 }
273
274 #[pyo3(name = "subscribe_instrument")]
275 fn py_subscribe_instrument<'py>(
276 &self,
277 py: Python<'py>,
278 instrument_id: InstrumentId,
279 ) -> PyResult<Bound<'py, PyAny>> {
280 let client = self.clone();
281
282 pyo3_async_runtimes::tokio::future_into_py(py, async move {
283 if let Err(e) = client.subscribe_instrument(instrument_id).await {
284 log::error!("Failed to subscribe to instrument: {e}");
285 }
286 Ok(())
287 })
288 }
289
290 #[pyo3(name = "subscribe_book")]
291 fn py_subscribe_book<'py>(
292 &self,
293 py: Python<'py>,
294 instrument_id: InstrumentId,
295 ) -> PyResult<Bound<'py, PyAny>> {
296 let client = self.clone();
297
298 pyo3_async_runtimes::tokio::future_into_py(py, async move {
299 if let Err(e) = client.subscribe_book(instrument_id).await {
300 log::error!("Failed to subscribe to order book: {e}");
301 }
302 Ok(())
303 })
304 }
305
306 #[pyo3(name = "subscribe_book_25")]
307 fn py_subscribe_book_25<'py>(
308 &self,
309 py: Python<'py>,
310 instrument_id: InstrumentId,
311 ) -> PyResult<Bound<'py, PyAny>> {
312 let client = self.clone();
313
314 pyo3_async_runtimes::tokio::future_into_py(py, async move {
315 if let Err(e) = client.subscribe_book_25(instrument_id).await {
316 log::error!("Failed to subscribe to order book 25: {e}");
317 }
318 Ok(())
319 })
320 }
321
322 #[pyo3(name = "subscribe_book_depth10")]
323 fn py_subscribe_book_depth10<'py>(
324 &self,
325 py: Python<'py>,
326 instrument_id: InstrumentId,
327 ) -> PyResult<Bound<'py, PyAny>> {
328 let client = self.clone();
329
330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
331 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
332 log::error!("Failed to subscribe to order book depth 10: {e}");
333 }
334 Ok(())
335 })
336 }
337
338 #[pyo3(name = "subscribe_quotes")]
339 fn py_subscribe_quotes<'py>(
340 &self,
341 py: Python<'py>,
342 instrument_id: InstrumentId,
343 ) -> PyResult<Bound<'py, PyAny>> {
344 let client = self.clone();
345
346 pyo3_async_runtimes::tokio::future_into_py(py, async move {
347 if let Err(e) = client.subscribe_quotes(instrument_id).await {
348 log::error!("Failed to subscribe to quotes: {e}");
349 }
350 Ok(())
351 })
352 }
353
354 #[pyo3(name = "subscribe_trades")]
355 fn py_subscribe_trades<'py>(
356 &self,
357 py: Python<'py>,
358 instrument_id: InstrumentId,
359 ) -> PyResult<Bound<'py, PyAny>> {
360 let client = self.clone();
361
362 pyo3_async_runtimes::tokio::future_into_py(py, async move {
363 if let Err(e) = client.subscribe_trades(instrument_id).await {
364 log::error!("Failed to subscribe to trades: {e}");
365 }
366 Ok(())
367 })
368 }
369
370 #[pyo3(name = "subscribe_mark_prices")]
371 fn py_subscribe_mark_prices<'py>(
372 &self,
373 py: Python<'py>,
374 instrument_id: InstrumentId,
375 ) -> PyResult<Bound<'py, PyAny>> {
376 let client = self.clone();
377
378 pyo3_async_runtimes::tokio::future_into_py(py, async move {
379 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
380 log::error!("Failed to subscribe to mark prices: {e}");
381 }
382 Ok(())
383 })
384 }
385
386 #[pyo3(name = "subscribe_index_prices")]
387 fn py_subscribe_index_prices<'py>(
388 &self,
389 py: Python<'py>,
390 instrument_id: InstrumentId,
391 ) -> PyResult<Bound<'py, PyAny>> {
392 let client = self.clone();
393
394 pyo3_async_runtimes::tokio::future_into_py(py, async move {
395 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
396 log::error!("Failed to subscribe to index prices: {e}");
397 }
398 Ok(())
399 })
400 }
401
402 #[pyo3(name = "subscribe_funding_rates")]
403 fn py_subscribe_funding_rates<'py>(
404 &self,
405 py: Python<'py>,
406 instrument_id: InstrumentId,
407 ) -> PyResult<Bound<'py, PyAny>> {
408 let client = self.clone();
409
410 pyo3_async_runtimes::tokio::future_into_py(py, async move {
411 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
412 log::error!("Failed to subscribe to funding: {e}");
413 }
414 Ok(())
415 })
416 }
417
418 #[pyo3(name = "subscribe_bars")]
419 fn py_subscribe_bars<'py>(
420 &self,
421 py: Python<'py>,
422 bar_type: BarType,
423 ) -> PyResult<Bound<'py, PyAny>> {
424 let client = self.clone();
425
426 pyo3_async_runtimes::tokio::future_into_py(py, async move {
427 if let Err(e) = client.subscribe_bars(bar_type).await {
428 log::error!("Failed to subscribe to bars: {e}");
429 }
430 Ok(())
431 })
432 }
433
434 #[pyo3(name = "unsubscribe_instruments")]
435 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
436 let client = self.clone();
437
438 pyo3_async_runtimes::tokio::future_into_py(py, async move {
439 if let Err(e) = client.unsubscribe_instruments().await {
440 log::error!("Failed to unsubscribe from instruments: {e}");
441 }
442 Ok(())
443 })
444 }
445
446 #[pyo3(name = "unsubscribe_instrument")]
447 fn py_unsubscribe_instrument<'py>(
448 &self,
449 py: Python<'py>,
450 instrument_id: InstrumentId,
451 ) -> PyResult<Bound<'py, PyAny>> {
452 let client = self.clone();
453
454 pyo3_async_runtimes::tokio::future_into_py(py, async move {
455 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
456 log::error!("Failed to unsubscribe from instrument: {e}");
457 }
458 Ok(())
459 })
460 }
461
462 #[pyo3(name = "unsubscribe_book")]
463 fn py_unsubscribe_book<'py>(
464 &self,
465 py: Python<'py>,
466 instrument_id: InstrumentId,
467 ) -> PyResult<Bound<'py, PyAny>> {
468 let client = self.clone();
469
470 pyo3_async_runtimes::tokio::future_into_py(py, async move {
471 if let Err(e) = client.unsubscribe_book(instrument_id).await {
472 log::error!("Failed to unsubscribe from order book: {e}");
473 }
474 Ok(())
475 })
476 }
477
478 #[pyo3(name = "unsubscribe_book_25")]
479 fn py_unsubscribe_book_25<'py>(
480 &self,
481 py: Python<'py>,
482 instrument_id: InstrumentId,
483 ) -> PyResult<Bound<'py, PyAny>> {
484 let client = self.clone();
485
486 pyo3_async_runtimes::tokio::future_into_py(py, async move {
487 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
488 log::error!("Failed to unsubscribe from order book 25: {e}");
489 }
490 Ok(())
491 })
492 }
493
494 #[pyo3(name = "unsubscribe_book_depth10")]
495 fn py_unsubscribe_book_depth10<'py>(
496 &self,
497 py: Python<'py>,
498 instrument_id: InstrumentId,
499 ) -> PyResult<Bound<'py, PyAny>> {
500 let client = self.clone();
501
502 pyo3_async_runtimes::tokio::future_into_py(py, async move {
503 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
504 log::error!("Failed to unsubscribe from order book depth 10: {e}");
505 }
506 Ok(())
507 })
508 }
509
510 #[pyo3(name = "unsubscribe_quotes")]
511 fn py_unsubscribe_quotes<'py>(
512 &self,
513 py: Python<'py>,
514 instrument_id: InstrumentId,
515 ) -> PyResult<Bound<'py, PyAny>> {
516 let client = self.clone();
517
518 pyo3_async_runtimes::tokio::future_into_py(py, async move {
519 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
520 log::error!("Failed to unsubscribe from quotes: {e}");
521 }
522 Ok(())
523 })
524 }
525
526 #[pyo3(name = "unsubscribe_trades")]
527 fn py_unsubscribe_trades<'py>(
528 &self,
529 py: Python<'py>,
530 instrument_id: InstrumentId,
531 ) -> PyResult<Bound<'py, PyAny>> {
532 let client = self.clone();
533
534 pyo3_async_runtimes::tokio::future_into_py(py, async move {
535 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
536 log::error!("Failed to unsubscribe from trades: {e}");
537 }
538 Ok(())
539 })
540 }
541
542 #[pyo3(name = "unsubscribe_mark_prices")]
543 fn py_unsubscribe_mark_prices<'py>(
544 &self,
545 py: Python<'py>,
546 instrument_id: InstrumentId,
547 ) -> PyResult<Bound<'py, PyAny>> {
548 let client = self.clone();
549
550 pyo3_async_runtimes::tokio::future_into_py(py, async move {
551 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
552 log::error!("Failed to unsubscribe from mark prices: {e}");
553 }
554 Ok(())
555 })
556 }
557
558 #[pyo3(name = "unsubscribe_index_prices")]
559 fn py_unsubscribe_index_prices<'py>(
560 &self,
561 py: Python<'py>,
562 instrument_id: InstrumentId,
563 ) -> PyResult<Bound<'py, PyAny>> {
564 let client = self.clone();
565
566 pyo3_async_runtimes::tokio::future_into_py(py, async move {
567 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
568 log::error!("Failed to unsubscribe from index prices: {e}");
569 }
570 Ok(())
571 })
572 }
573
574 #[pyo3(name = "unsubscribe_funding_rates")]
575 fn py_unsubscribe_funding_rates<'py>(
576 &self,
577 py: Python<'py>,
578 instrument_id: InstrumentId,
579 ) -> PyResult<Bound<'py, PyAny>> {
580 let client = self.clone();
581 pyo3_async_runtimes::tokio::future_into_py(py, async move {
582 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
583 log::error!("Failed to unsubscribe from funding rates: {e}");
584 }
585 Ok(())
586 })
587 }
588
589 #[pyo3(name = "unsubscribe_bars")]
590 fn py_unsubscribe_bars<'py>(
591 &self,
592 py: Python<'py>,
593 bar_type: BarType,
594 ) -> PyResult<Bound<'py, PyAny>> {
595 let client = self.clone();
596
597 pyo3_async_runtimes::tokio::future_into_py(py, async move {
598 if let Err(e) = client.unsubscribe_bars(bar_type).await {
599 log::error!("Failed to unsubscribe from bars: {e}");
600 }
601 Ok(())
602 })
603 }
604
605 #[pyo3(name = "subscribe_orders")]
606 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
607 let client = self.clone();
608
609 pyo3_async_runtimes::tokio::future_into_py(py, async move {
610 if let Err(e) = client.subscribe_orders().await {
611 log::error!("Failed to subscribe to orders: {e}");
612 }
613 Ok(())
614 })
615 }
616
617 #[pyo3(name = "subscribe_executions")]
618 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
619 let client = self.clone();
620
621 pyo3_async_runtimes::tokio::future_into_py(py, async move {
622 if let Err(e) = client.subscribe_executions().await {
623 log::error!("Failed to subscribe to executions: {e}");
624 }
625 Ok(())
626 })
627 }
628
629 #[pyo3(name = "subscribe_positions")]
630 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
631 let client = self.clone();
632
633 pyo3_async_runtimes::tokio::future_into_py(py, async move {
634 if let Err(e) = client.subscribe_positions().await {
635 log::error!("Failed to subscribe to positions: {e}");
636 }
637 Ok(())
638 })
639 }
640
641 #[pyo3(name = "subscribe_margin")]
642 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
643 let client = self.clone();
644
645 pyo3_async_runtimes::tokio::future_into_py(py, async move {
646 if let Err(e) = client.subscribe_margin().await {
647 log::error!("Failed to subscribe to margin: {e}");
648 }
649 Ok(())
650 })
651 }
652
653 #[pyo3(name = "subscribe_wallet")]
654 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
655 let client = self.clone();
656
657 pyo3_async_runtimes::tokio::future_into_py(py, async move {
658 if let Err(e) = client.subscribe_wallet().await {
659 log::error!("Failed to subscribe to wallet: {e}");
660 }
661 Ok(())
662 })
663 }
664
665 #[pyo3(name = "unsubscribe_orders")]
666 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
667 let client = self.clone();
668
669 pyo3_async_runtimes::tokio::future_into_py(py, async move {
670 if let Err(e) = client.unsubscribe_orders().await {
671 log::error!("Failed to unsubscribe from orders: {e}");
672 }
673 Ok(())
674 })
675 }
676
677 #[pyo3(name = "unsubscribe_executions")]
678 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
679 let client = self.clone();
680
681 pyo3_async_runtimes::tokio::future_into_py(py, async move {
682 if let Err(e) = client.unsubscribe_executions().await {
683 log::error!("Failed to unsubscribe from executions: {e}");
684 }
685 Ok(())
686 })
687 }
688
689 #[pyo3(name = "unsubscribe_positions")]
690 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
691 let client = self.clone();
692
693 pyo3_async_runtimes::tokio::future_into_py(py, async move {
694 if let Err(e) = client.unsubscribe_positions().await {
695 log::error!("Failed to unsubscribe from positions: {e}");
696 }
697 Ok(())
698 })
699 }
700
701 #[pyo3(name = "unsubscribe_margin")]
702 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
703 let client = self.clone();
704
705 pyo3_async_runtimes::tokio::future_into_py(py, async move {
706 if let Err(e) = client.unsubscribe_margin().await {
707 log::error!("Failed to unsubscribe from margin: {e}");
708 }
709 Ok(())
710 })
711 }
712
713 #[pyo3(name = "unsubscribe_wallet")]
714 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
715 let client = self.clone();
716
717 pyo3_async_runtimes::tokio::future_into_py(py, async move {
718 if let Err(e) = client.unsubscribe_wallet().await {
719 log::error!("Failed to unsubscribe from wallet: {e}");
720 }
721 Ok(())
722 })
723 }
724}
725
726pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
727 if let Err(e) = callback.call1(py, (py_obj,)) {
728 tracing::error!("Error calling Python: {e}");
729 }
730}