1use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48 data::bar::BarType,
49 identifiers::{AccountId, InstrumentId},
50 python::{
51 data::data_to_pycapsule,
52 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53 },
54};
55use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
56
57use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl BitmexWebSocketClient {
61 #[new]
62 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
63 fn py_new(
64 url: Option<String>,
65 api_key: Option<String>,
66 api_secret: Option<String>,
67 account_id: Option<AccountId>,
68 heartbeat: Option<u64>,
69 testnet: bool,
70 ) -> PyResult<Self> {
71 Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
72 .map_err(to_pyvalue_err)
73 }
74
75 #[staticmethod]
76 #[pyo3(name = "from_env")]
77 fn py_from_env() -> PyResult<Self> {
78 Self::from_env().map_err(to_pyvalue_err)
79 }
80
81 #[getter]
82 #[pyo3(name = "url")]
83 #[must_use]
84 pub const fn py_url(&self) -> &str {
85 self.url()
86 }
87
88 #[getter]
89 #[pyo3(name = "api_key")]
90 #[must_use]
91 pub fn py_api_key(&self) -> Option<&str> {
92 self.api_key()
93 }
94
95 #[getter]
96 #[pyo3(name = "api_key_masked")]
97 #[must_use]
98 pub fn py_api_key_masked(&self) -> Option<String> {
99 self.api_key_masked()
100 }
101
102 #[pyo3(name = "is_active")]
103 fn py_is_active(&mut self) -> bool {
104 self.is_active()
105 }
106
107 #[pyo3(name = "is_closed")]
108 fn py_is_closed(&mut self) -> bool {
109 self.is_closed()
110 }
111
112 #[pyo3(name = "get_subscriptions")]
113 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
114 self.get_subscriptions(instrument_id)
115 }
116
117 #[pyo3(name = "set_account_id")]
118 pub fn py_set_account_id(&mut self, account_id: AccountId) {
119 self.set_account_id(account_id);
120 }
121
122 #[pyo3(name = "cache_instrument")]
123 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
124 let inst_any = pyobject_to_instrument_any(py, instrument)?;
125 self.cache_instrument(inst_any);
126 Ok(())
127 }
128
129 #[pyo3(name = "connect")]
130 fn py_connect<'py>(
131 &mut self,
132 py: Python<'py>,
133 instruments: Vec<Py<PyAny>>,
134 callback: Py<PyAny>,
135 ) -> PyResult<Bound<'py, PyAny>> {
136 let mut instruments_any = Vec::new();
137 for inst in instruments {
138 let inst_any = pyobject_to_instrument_any(py, inst)?;
139 instruments_any.push(inst_any);
140 }
141
142 self.cache_instruments(instruments_any);
143
144 let mut client = self.clone();
147
148 pyo3_async_runtimes::tokio::future_into_py(py, async move {
149 client.connect().await.map_err(to_pyruntime_err)?;
150
151 let stream = client.stream();
152
153 get_runtime().spawn(async move {
154 let _client = client; tokio::pin!(stream);
156
157 while let Some(msg) = stream.next().await {
158 Python::attach(|py| match msg {
159 NautilusWsMessage::Data(data_vec) => {
160 for data in data_vec {
161 let py_obj = data_to_pycapsule(py, data);
162 call_python(py, &callback, py_obj);
163 }
164 }
165 NautilusWsMessage::Instruments(instruments) => {
166 for instrument in instruments {
167 if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
168 call_python(py, &callback, py_obj);
169 }
170 }
171 }
172 NautilusWsMessage::OrderStatusReports(reports) => {
173 for report in reports {
174 if let Ok(py_obj) = report.into_py_any(py) {
175 call_python(py, &callback, py_obj);
176 }
177 }
178 }
179 NautilusWsMessage::FillReports(reports) => {
180 for report in reports {
181 if let Ok(py_obj) = report.into_py_any(py) {
182 call_python(py, &callback, py_obj);
183 }
184 }
185 }
186 NautilusWsMessage::PositionStatusReport(report) => {
187 if let Ok(py_obj) = report.into_py_any(py) {
188 call_python(py, &callback, py_obj);
189 }
190 }
191 NautilusWsMessage::FundingRateUpdates(updates) => {
192 for update in updates {
193 if let Ok(py_obj) = update.into_py_any(py) {
194 call_python(py, &callback, py_obj);
195 }
196 }
197 }
198 NautilusWsMessage::AccountState(account_state) => {
199 if let Ok(py_obj) = account_state.into_py_any(py) {
200 call_python(py, &callback, py_obj);
201 }
202 }
203 NautilusWsMessage::OrderUpdated(event) => {
204 if let Ok(py_obj) = event.into_py_any(py) {
205 call_python(py, &callback, py_obj);
206 }
207 }
208 NautilusWsMessage::Reconnected => {}
209 NautilusWsMessage::Authenticated => {}
210 });
211 }
212 });
213
214 Ok(())
215 })
216 }
217
218 #[pyo3(name = "wait_until_active")]
219 fn py_wait_until_active<'py>(
220 &self,
221 py: Python<'py>,
222 timeout_secs: f64,
223 ) -> PyResult<Bound<'py, PyAny>> {
224 let client = self.clone();
225
226 pyo3_async_runtimes::tokio::future_into_py(py, async move {
227 client
228 .wait_until_active(timeout_secs)
229 .await
230 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
231 Ok(())
232 })
233 }
234
235 #[pyo3(name = "close")]
236 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
237 let mut client = self.clone();
238
239 pyo3_async_runtimes::tokio::future_into_py(py, async move {
240 if let Err(e) = client.close().await {
241 log::error!("Error on close: {e}");
242 }
243 Ok(())
244 })
245 }
246
247 #[pyo3(name = "subscribe_instruments")]
248 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
249 let client = self.clone();
250
251 pyo3_async_runtimes::tokio::future_into_py(py, async move {
252 if let Err(e) = client.subscribe_instruments().await {
253 log::error!("Failed to subscribe to instruments: {e}");
254 }
255 Ok(())
256 })
257 }
258
259 #[pyo3(name = "subscribe_instrument")]
260 fn py_subscribe_instrument<'py>(
261 &self,
262 py: Python<'py>,
263 instrument_id: InstrumentId,
264 ) -> PyResult<Bound<'py, PyAny>> {
265 let client = self.clone();
266
267 pyo3_async_runtimes::tokio::future_into_py(py, async move {
268 if let Err(e) = client.subscribe_instrument(instrument_id).await {
269 log::error!("Failed to subscribe to instrument: {e}");
270 }
271 Ok(())
272 })
273 }
274
275 #[pyo3(name = "subscribe_book")]
276 fn py_subscribe_book<'py>(
277 &self,
278 py: Python<'py>,
279 instrument_id: InstrumentId,
280 ) -> PyResult<Bound<'py, PyAny>> {
281 let client = self.clone();
282
283 pyo3_async_runtimes::tokio::future_into_py(py, async move {
284 if let Err(e) = client.subscribe_book(instrument_id).await {
285 log::error!("Failed to subscribe to order book: {e}");
286 }
287 Ok(())
288 })
289 }
290
291 #[pyo3(name = "subscribe_book_25")]
292 fn py_subscribe_book_25<'py>(
293 &self,
294 py: Python<'py>,
295 instrument_id: InstrumentId,
296 ) -> PyResult<Bound<'py, PyAny>> {
297 let client = self.clone();
298
299 pyo3_async_runtimes::tokio::future_into_py(py, async move {
300 if let Err(e) = client.subscribe_book_25(instrument_id).await {
301 log::error!("Failed to subscribe to order book 25: {e}");
302 }
303 Ok(())
304 })
305 }
306
307 #[pyo3(name = "subscribe_book_depth10")]
308 fn py_subscribe_book_depth10<'py>(
309 &self,
310 py: Python<'py>,
311 instrument_id: InstrumentId,
312 ) -> PyResult<Bound<'py, PyAny>> {
313 let client = self.clone();
314
315 pyo3_async_runtimes::tokio::future_into_py(py, async move {
316 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
317 log::error!("Failed to subscribe to order book depth 10: {e}");
318 }
319 Ok(())
320 })
321 }
322
323 #[pyo3(name = "subscribe_quotes")]
324 fn py_subscribe_quotes<'py>(
325 &self,
326 py: Python<'py>,
327 instrument_id: InstrumentId,
328 ) -> PyResult<Bound<'py, PyAny>> {
329 let client = self.clone();
330
331 pyo3_async_runtimes::tokio::future_into_py(py, async move {
332 if let Err(e) = client.subscribe_quotes(instrument_id).await {
333 log::error!("Failed to subscribe to quotes: {e}");
334 }
335 Ok(())
336 })
337 }
338
339 #[pyo3(name = "subscribe_trades")]
340 fn py_subscribe_trades<'py>(
341 &self,
342 py: Python<'py>,
343 instrument_id: InstrumentId,
344 ) -> PyResult<Bound<'py, PyAny>> {
345 let client = self.clone();
346
347 pyo3_async_runtimes::tokio::future_into_py(py, async move {
348 if let Err(e) = client.subscribe_trades(instrument_id).await {
349 log::error!("Failed to subscribe to trades: {e}");
350 }
351 Ok(())
352 })
353 }
354
355 #[pyo3(name = "subscribe_mark_prices")]
356 fn py_subscribe_mark_prices<'py>(
357 &self,
358 py: Python<'py>,
359 instrument_id: InstrumentId,
360 ) -> PyResult<Bound<'py, PyAny>> {
361 let client = self.clone();
362
363 pyo3_async_runtimes::tokio::future_into_py(py, async move {
364 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
365 log::error!("Failed to subscribe to mark prices: {e}");
366 }
367 Ok(())
368 })
369 }
370
371 #[pyo3(name = "subscribe_index_prices")]
372 fn py_subscribe_index_prices<'py>(
373 &self,
374 py: Python<'py>,
375 instrument_id: InstrumentId,
376 ) -> PyResult<Bound<'py, PyAny>> {
377 let client = self.clone();
378
379 pyo3_async_runtimes::tokio::future_into_py(py, async move {
380 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
381 log::error!("Failed to subscribe to index prices: {e}");
382 }
383 Ok(())
384 })
385 }
386
387 #[pyo3(name = "subscribe_funding_rates")]
388 fn py_subscribe_funding_rates<'py>(
389 &self,
390 py: Python<'py>,
391 instrument_id: InstrumentId,
392 ) -> PyResult<Bound<'py, PyAny>> {
393 let client = self.clone();
394
395 pyo3_async_runtimes::tokio::future_into_py(py, async move {
396 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
397 log::error!("Failed to subscribe to funding: {e}");
398 }
399 Ok(())
400 })
401 }
402
403 #[pyo3(name = "subscribe_bars")]
404 fn py_subscribe_bars<'py>(
405 &self,
406 py: Python<'py>,
407 bar_type: BarType,
408 ) -> PyResult<Bound<'py, PyAny>> {
409 let client = self.clone();
410
411 pyo3_async_runtimes::tokio::future_into_py(py, async move {
412 if let Err(e) = client.subscribe_bars(bar_type).await {
413 log::error!("Failed to subscribe to bars: {e}");
414 }
415 Ok(())
416 })
417 }
418
419 #[pyo3(name = "unsubscribe_instruments")]
420 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
421 let client = self.clone();
422
423 pyo3_async_runtimes::tokio::future_into_py(py, async move {
424 if let Err(e) = client.unsubscribe_instruments().await {
425 log::error!("Failed to unsubscribe from instruments: {e}");
426 }
427 Ok(())
428 })
429 }
430
431 #[pyo3(name = "unsubscribe_instrument")]
432 fn py_unsubscribe_instrument<'py>(
433 &self,
434 py: Python<'py>,
435 instrument_id: InstrumentId,
436 ) -> PyResult<Bound<'py, PyAny>> {
437 let client = self.clone();
438
439 pyo3_async_runtimes::tokio::future_into_py(py, async move {
440 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
441 log::error!("Failed to unsubscribe from instrument: {e}");
442 }
443 Ok(())
444 })
445 }
446
447 #[pyo3(name = "unsubscribe_book")]
448 fn py_unsubscribe_book<'py>(
449 &self,
450 py: Python<'py>,
451 instrument_id: InstrumentId,
452 ) -> PyResult<Bound<'py, PyAny>> {
453 let client = self.clone();
454
455 pyo3_async_runtimes::tokio::future_into_py(py, async move {
456 if let Err(e) = client.unsubscribe_book(instrument_id).await {
457 log::error!("Failed to unsubscribe from order book: {e}");
458 }
459 Ok(())
460 })
461 }
462
463 #[pyo3(name = "unsubscribe_book_25")]
464 fn py_unsubscribe_book_25<'py>(
465 &self,
466 py: Python<'py>,
467 instrument_id: InstrumentId,
468 ) -> PyResult<Bound<'py, PyAny>> {
469 let client = self.clone();
470
471 pyo3_async_runtimes::tokio::future_into_py(py, async move {
472 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
473 log::error!("Failed to unsubscribe from order book 25: {e}");
474 }
475 Ok(())
476 })
477 }
478
479 #[pyo3(name = "unsubscribe_book_depth10")]
480 fn py_unsubscribe_book_depth10<'py>(
481 &self,
482 py: Python<'py>,
483 instrument_id: InstrumentId,
484 ) -> PyResult<Bound<'py, PyAny>> {
485 let client = self.clone();
486
487 pyo3_async_runtimes::tokio::future_into_py(py, async move {
488 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
489 log::error!("Failed to unsubscribe from order book depth 10: {e}");
490 }
491 Ok(())
492 })
493 }
494
495 #[pyo3(name = "unsubscribe_quotes")]
496 fn py_unsubscribe_quotes<'py>(
497 &self,
498 py: Python<'py>,
499 instrument_id: InstrumentId,
500 ) -> PyResult<Bound<'py, PyAny>> {
501 let client = self.clone();
502
503 pyo3_async_runtimes::tokio::future_into_py(py, async move {
504 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
505 log::error!("Failed to unsubscribe from quotes: {e}");
506 }
507 Ok(())
508 })
509 }
510
511 #[pyo3(name = "unsubscribe_trades")]
512 fn py_unsubscribe_trades<'py>(
513 &self,
514 py: Python<'py>,
515 instrument_id: InstrumentId,
516 ) -> PyResult<Bound<'py, PyAny>> {
517 let client = self.clone();
518
519 pyo3_async_runtimes::tokio::future_into_py(py, async move {
520 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
521 log::error!("Failed to unsubscribe from trades: {e}");
522 }
523 Ok(())
524 })
525 }
526
527 #[pyo3(name = "unsubscribe_mark_prices")]
528 fn py_unsubscribe_mark_prices<'py>(
529 &self,
530 py: Python<'py>,
531 instrument_id: InstrumentId,
532 ) -> PyResult<Bound<'py, PyAny>> {
533 let client = self.clone();
534
535 pyo3_async_runtimes::tokio::future_into_py(py, async move {
536 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
537 log::error!("Failed to unsubscribe from mark prices: {e}");
538 }
539 Ok(())
540 })
541 }
542
543 #[pyo3(name = "unsubscribe_index_prices")]
544 fn py_unsubscribe_index_prices<'py>(
545 &self,
546 py: Python<'py>,
547 instrument_id: InstrumentId,
548 ) -> PyResult<Bound<'py, PyAny>> {
549 let client = self.clone();
550
551 pyo3_async_runtimes::tokio::future_into_py(py, async move {
552 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
553 log::error!("Failed to unsubscribe from index prices: {e}");
554 }
555 Ok(())
556 })
557 }
558
559 #[pyo3(name = "unsubscribe_funding_rates")]
560 fn py_unsubscribe_funding_rates<'py>(
561 &self,
562 py: Python<'py>,
563 instrument_id: InstrumentId,
564 ) -> PyResult<Bound<'py, PyAny>> {
565 let client = self.clone();
566 pyo3_async_runtimes::tokio::future_into_py(py, async move {
567 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
568 log::error!("Failed to unsubscribe from funding rates: {e}");
569 }
570 Ok(())
571 })
572 }
573
574 #[pyo3(name = "unsubscribe_bars")]
575 fn py_unsubscribe_bars<'py>(
576 &self,
577 py: Python<'py>,
578 bar_type: BarType,
579 ) -> PyResult<Bound<'py, PyAny>> {
580 let client = self.clone();
581
582 pyo3_async_runtimes::tokio::future_into_py(py, async move {
583 if let Err(e) = client.unsubscribe_bars(bar_type).await {
584 log::error!("Failed to unsubscribe from bars: {e}");
585 }
586 Ok(())
587 })
588 }
589
590 #[pyo3(name = "subscribe_orders")]
591 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
592 let client = self.clone();
593
594 pyo3_async_runtimes::tokio::future_into_py(py, async move {
595 if let Err(e) = client.subscribe_orders().await {
596 log::error!("Failed to subscribe to orders: {e}");
597 }
598 Ok(())
599 })
600 }
601
602 #[pyo3(name = "subscribe_executions")]
603 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
604 let client = self.clone();
605
606 pyo3_async_runtimes::tokio::future_into_py(py, async move {
607 if let Err(e) = client.subscribe_executions().await {
608 log::error!("Failed to subscribe to executions: {e}");
609 }
610 Ok(())
611 })
612 }
613
614 #[pyo3(name = "subscribe_positions")]
615 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
616 let client = self.clone();
617
618 pyo3_async_runtimes::tokio::future_into_py(py, async move {
619 if let Err(e) = client.subscribe_positions().await {
620 log::error!("Failed to subscribe to positions: {e}");
621 }
622 Ok(())
623 })
624 }
625
626 #[pyo3(name = "subscribe_margin")]
627 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
628 let client = self.clone();
629
630 pyo3_async_runtimes::tokio::future_into_py(py, async move {
631 if let Err(e) = client.subscribe_margin().await {
632 log::error!("Failed to subscribe to margin: {e}");
633 }
634 Ok(())
635 })
636 }
637
638 #[pyo3(name = "subscribe_wallet")]
639 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
640 let client = self.clone();
641
642 pyo3_async_runtimes::tokio::future_into_py(py, async move {
643 if let Err(e) = client.subscribe_wallet().await {
644 log::error!("Failed to subscribe to wallet: {e}");
645 }
646 Ok(())
647 })
648 }
649
650 #[pyo3(name = "unsubscribe_orders")]
651 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
652 let client = self.clone();
653
654 pyo3_async_runtimes::tokio::future_into_py(py, async move {
655 if let Err(e) = client.unsubscribe_orders().await {
656 log::error!("Failed to unsubscribe from orders: {e}");
657 }
658 Ok(())
659 })
660 }
661
662 #[pyo3(name = "unsubscribe_executions")]
663 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
664 let client = self.clone();
665
666 pyo3_async_runtimes::tokio::future_into_py(py, async move {
667 if let Err(e) = client.unsubscribe_executions().await {
668 log::error!("Failed to unsubscribe from executions: {e}");
669 }
670 Ok(())
671 })
672 }
673
674 #[pyo3(name = "unsubscribe_positions")]
675 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676 let client = self.clone();
677
678 pyo3_async_runtimes::tokio::future_into_py(py, async move {
679 if let Err(e) = client.unsubscribe_positions().await {
680 log::error!("Failed to unsubscribe from positions: {e}");
681 }
682 Ok(())
683 })
684 }
685
686 #[pyo3(name = "unsubscribe_margin")]
687 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
688 let client = self.clone();
689
690 pyo3_async_runtimes::tokio::future_into_py(py, async move {
691 if let Err(e) = client.unsubscribe_margin().await {
692 log::error!("Failed to unsubscribe from margin: {e}");
693 }
694 Ok(())
695 })
696 }
697
698 #[pyo3(name = "unsubscribe_wallet")]
699 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
700 let client = self.clone();
701
702 pyo3_async_runtimes::tokio::future_into_py(py, async move {
703 if let Err(e) = client.unsubscribe_wallet().await {
704 log::error!("Failed to unsubscribe from wallet: {e}");
705 }
706 Ok(())
707 })
708 }
709}
710
711pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
712 if let Err(e) = callback.call1(py, (py_obj,)) {
713 tracing::error!("Error calling Python: {e}");
714 }
715}