1use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::bar::BarType,
48 identifiers::{AccountId, InstrumentId},
49 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
52
53use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
54
55#[pymethods]
56impl BitmexWebSocketClient {
57 #[new]
58 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None))]
59 fn py_new(
60 url: Option<String>,
61 api_key: Option<String>,
62 api_secret: Option<String>,
63 account_id: Option<AccountId>,
64 heartbeat: Option<u64>,
65 ) -> PyResult<Self> {
66 let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
68 let env_key = std::env::var("BITMEX_API_KEY").ok();
70 let env_secret = std::env::var("BITMEX_API_SECRET").ok();
71 (env_key, env_secret)
72 } else {
73 (api_key, api_secret)
74 };
75
76 Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
77 .map_err(to_pyvalue_err)
78 }
79
80 #[staticmethod]
81 #[pyo3(name = "from_env")]
82 fn py_from_env() -> PyResult<Self> {
83 Self::from_env().map_err(to_pyvalue_err)
84 }
85
86 #[getter]
87 #[pyo3(name = "url")]
88 #[must_use]
89 pub const fn py_url(&self) -> &str {
90 self.url()
91 }
92
93 #[getter]
94 #[pyo3(name = "api_key")]
95 #[must_use]
96 pub fn py_api_key(&self) -> Option<&str> {
97 self.api_key()
98 }
99
100 #[pyo3(name = "is_active")]
101 fn py_is_active(&mut self) -> bool {
102 self.is_active()
103 }
104
105 #[pyo3(name = "is_closed")]
106 fn py_is_closed(&mut self) -> bool {
107 self.is_closed()
108 }
109
110 #[pyo3(name = "get_subscriptions")]
111 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
112 self.get_subscriptions(instrument_id)
113 }
114
115 #[pyo3(name = "set_account_id")]
116 pub fn py_set_account_id(&mut self, account_id: AccountId) {
117 self.set_account_id(account_id);
118 }
119
120 #[pyo3(name = "connect")]
121 fn py_connect<'py>(
122 &mut self,
123 py: Python<'py>,
124 instruments: Vec<PyObject>,
125 callback: PyObject,
126 ) -> PyResult<Bound<'py, PyAny>> {
127 let mut instruments_any = Vec::new();
128 for inst in instruments {
129 let inst_any = pyobject_to_instrument_any(py, inst)?;
130 instruments_any.push(inst_any);
131 }
132
133 self.initialize_instruments_cache(instruments_any);
134
135 let mut client = self.clone();
136
137 pyo3_async_runtimes::tokio::future_into_py(py, async move {
138 client.connect().await.map_err(to_pyruntime_err)?;
139
140 let stream = client.stream();
141
142 tokio::spawn(async move {
143 tokio::pin!(stream);
144
145 while let Some(msg) = stream.next().await {
146 Python::with_gil(|py| match msg {
147 NautilusWsMessage::Data(data_vec) => {
148 for data in data_vec {
149 let py_obj = data_to_pycapsule(py, data);
150 call_python(py, &callback, py_obj);
151 }
152 }
153 NautilusWsMessage::OrderStatusReports(reports) => {
154 for report in reports {
155 if let Ok(py_obj) = report.into_py_any(py) {
156 call_python(py, &callback, py_obj);
157 }
158 }
159 }
160 NautilusWsMessage::FillReports(reports) => {
161 for report in reports {
162 if let Ok(py_obj) = report.into_py_any(py) {
163 call_python(py, &callback, py_obj);
164 }
165 }
166 }
167 NautilusWsMessage::PositionStatusReport(report) => {
168 if let Ok(py_obj) = report.into_py_any(py) {
169 call_python(py, &callback, py_obj);
170 }
171 }
172 NautilusWsMessage::FundingRateUpdates(updates) => {
173 for update in updates {
174 if let Ok(py_obj) = update.into_py_any(py) {
175 call_python(py, &callback, py_obj);
176 }
177 }
178 }
179 NautilusWsMessage::AccountState(account_state) => {
180 if let Ok(py_obj) = account_state.into_py_any(py) {
181 call_python(py, &callback, py_obj);
182 }
183 }
184 NautilusWsMessage::OrderUpdated(event) => {
185 if let Ok(py_obj) = event.into_py_any(py) {
186 call_python(py, &callback, py_obj);
187 }
188 }
189 NautilusWsMessage::Reconnected => {} });
191 }
192 });
193
194 Ok(())
195 })
196 }
197
198 #[pyo3(name = "wait_until_active")]
199 fn py_wait_until_active<'py>(
200 &self,
201 py: Python<'py>,
202 timeout_secs: f64,
203 ) -> PyResult<Bound<'py, PyAny>> {
204 let client = self.clone();
205
206 pyo3_async_runtimes::tokio::future_into_py(py, async move {
207 client
208 .wait_until_active(timeout_secs)
209 .await
210 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
211 Ok(())
212 })
213 }
214
215 #[pyo3(name = "close")]
216 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
217 let mut client = self.clone();
218
219 pyo3_async_runtimes::tokio::future_into_py(py, async move {
220 if let Err(e) = client.close().await {
221 log::error!("Error on close: {e}");
222 }
223 Ok(())
224 })
225 }
226
227 #[pyo3(name = "subscribe_instruments")]
228 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
229 let client = self.clone();
230
231 pyo3_async_runtimes::tokio::future_into_py(py, async move {
232 if let Err(e) = client.subscribe_instruments().await {
233 log::error!("Failed to subscribe to instruments: {e}");
234 }
235 Ok(())
236 })
237 }
238
239 #[pyo3(name = "subscribe_instrument")]
240 fn py_subscribe_instrument<'py>(
241 &self,
242 py: Python<'py>,
243 instrument_id: InstrumentId,
244 ) -> PyResult<Bound<'py, PyAny>> {
245 let client = self.clone();
246
247 pyo3_async_runtimes::tokio::future_into_py(py, async move {
248 if let Err(e) = client.subscribe_instrument(instrument_id).await {
249 log::error!("Failed to subscribe to instrument: {e}");
250 }
251 Ok(())
252 })
253 }
254
255 #[pyo3(name = "subscribe_book")]
256 fn py_subscribe_book<'py>(
257 &self,
258 py: Python<'py>,
259 instrument_id: InstrumentId,
260 ) -> PyResult<Bound<'py, PyAny>> {
261 let client = self.clone();
262
263 pyo3_async_runtimes::tokio::future_into_py(py, async move {
264 if let Err(e) = client.subscribe_book(instrument_id).await {
265 log::error!("Failed to subscribe to order book: {e}");
266 }
267 Ok(())
268 })
269 }
270
271 #[pyo3(name = "subscribe_book_25")]
272 fn py_subscribe_book_25<'py>(
273 &self,
274 py: Python<'py>,
275 instrument_id: InstrumentId,
276 ) -> PyResult<Bound<'py, PyAny>> {
277 let client = self.clone();
278
279 pyo3_async_runtimes::tokio::future_into_py(py, async move {
280 if let Err(e) = client.subscribe_book_25(instrument_id).await {
281 log::error!("Failed to subscribe to order book 25: {e}");
282 }
283 Ok(())
284 })
285 }
286
287 #[pyo3(name = "subscribe_book_depth10")]
288 fn py_subscribe_book_depth10<'py>(
289 &self,
290 py: Python<'py>,
291 instrument_id: InstrumentId,
292 ) -> PyResult<Bound<'py, PyAny>> {
293 let client = self.clone();
294
295 pyo3_async_runtimes::tokio::future_into_py(py, async move {
296 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
297 log::error!("Failed to subscribe to order book depth 10: {e}");
298 }
299 Ok(())
300 })
301 }
302
303 #[pyo3(name = "subscribe_quotes")]
304 fn py_subscribe_quotes<'py>(
305 &self,
306 py: Python<'py>,
307 instrument_id: InstrumentId,
308 ) -> PyResult<Bound<'py, PyAny>> {
309 let client = self.clone();
310
311 pyo3_async_runtimes::tokio::future_into_py(py, async move {
312 if let Err(e) = client.subscribe_quotes(instrument_id).await {
313 log::error!("Failed to subscribe to quotes: {e}");
314 }
315 Ok(())
316 })
317 }
318
319 #[pyo3(name = "subscribe_trades")]
320 fn py_subscribe_trades<'py>(
321 &self,
322 py: Python<'py>,
323 instrument_id: InstrumentId,
324 ) -> PyResult<Bound<'py, PyAny>> {
325 let client = self.clone();
326
327 pyo3_async_runtimes::tokio::future_into_py(py, async move {
328 if let Err(e) = client.subscribe_trades(instrument_id).await {
329 log::error!("Failed to subscribe to trades: {e}");
330 }
331 Ok(())
332 })
333 }
334
335 #[pyo3(name = "subscribe_mark_prices")]
336 fn py_subscribe_mark_prices<'py>(
337 &self,
338 py: Python<'py>,
339 instrument_id: InstrumentId,
340 ) -> PyResult<Bound<'py, PyAny>> {
341 let client = self.clone();
342
343 pyo3_async_runtimes::tokio::future_into_py(py, async move {
344 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
345 log::error!("Failed to subscribe to mark prices: {e}");
346 }
347 Ok(())
348 })
349 }
350
351 #[pyo3(name = "subscribe_index_prices")]
352 fn py_subscribe_index_prices<'py>(
353 &self,
354 py: Python<'py>,
355 instrument_id: InstrumentId,
356 ) -> PyResult<Bound<'py, PyAny>> {
357 let client = self.clone();
358
359 pyo3_async_runtimes::tokio::future_into_py(py, async move {
360 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
361 log::error!("Failed to subscribe to index prices: {e}");
362 }
363 Ok(())
364 })
365 }
366
367 #[pyo3(name = "subscribe_funding_rates")]
368 fn py_subscribe_funding_rates<'py>(
369 &self,
370 py: Python<'py>,
371 instrument_id: InstrumentId,
372 ) -> PyResult<Bound<'py, PyAny>> {
373 let client = self.clone();
374
375 pyo3_async_runtimes::tokio::future_into_py(py, async move {
376 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
377 log::error!("Failed to subscribe to funding: {e}");
378 }
379 Ok(())
380 })
381 }
382
383 #[pyo3(name = "subscribe_bars")]
384 fn py_subscribe_bars<'py>(
385 &self,
386 py: Python<'py>,
387 bar_type: BarType,
388 ) -> PyResult<Bound<'py, PyAny>> {
389 let client = self.clone();
390
391 pyo3_async_runtimes::tokio::future_into_py(py, async move {
392 if let Err(e) = client.subscribe_bars(bar_type).await {
393 log::error!("Failed to subscribe to bars: {e}");
394 }
395 Ok(())
396 })
397 }
398
399 #[pyo3(name = "unsubscribe_instruments")]
400 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
401 let client = self.clone();
402
403 pyo3_async_runtimes::tokio::future_into_py(py, async move {
404 if let Err(e) = client.unsubscribe_instruments().await {
405 log::error!("Failed to unsubscribe from instruments: {e}");
406 }
407 Ok(())
408 })
409 }
410
411 #[pyo3(name = "unsubscribe_instrument")]
412 fn py_unsubscribe_instrument<'py>(
413 &self,
414 py: Python<'py>,
415 instrument_id: InstrumentId,
416 ) -> PyResult<Bound<'py, PyAny>> {
417 let client = self.clone();
418
419 pyo3_async_runtimes::tokio::future_into_py(py, async move {
420 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
421 log::error!("Failed to unsubscribe from instrument: {e}");
422 }
423 Ok(())
424 })
425 }
426
427 #[pyo3(name = "unsubscribe_book")]
428 fn py_unsubscribe_book<'py>(
429 &self,
430 py: Python<'py>,
431 instrument_id: InstrumentId,
432 ) -> PyResult<Bound<'py, PyAny>> {
433 let client = self.clone();
434
435 pyo3_async_runtimes::tokio::future_into_py(py, async move {
436 if let Err(e) = client.unsubscribe_book(instrument_id).await {
437 log::error!("Failed to unsubscribe from order book: {e}");
438 }
439 Ok(())
440 })
441 }
442
443 #[pyo3(name = "unsubscribe_book_25")]
444 fn py_unsubscribe_book_25<'py>(
445 &self,
446 py: Python<'py>,
447 instrument_id: InstrumentId,
448 ) -> PyResult<Bound<'py, PyAny>> {
449 let client = self.clone();
450
451 pyo3_async_runtimes::tokio::future_into_py(py, async move {
452 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
453 log::error!("Failed to unsubscribe from order book 25: {e}");
454 }
455 Ok(())
456 })
457 }
458
459 #[pyo3(name = "unsubscribe_book_depth10")]
460 fn py_unsubscribe_book_depth10<'py>(
461 &self,
462 py: Python<'py>,
463 instrument_id: InstrumentId,
464 ) -> PyResult<Bound<'py, PyAny>> {
465 let client = self.clone();
466
467 pyo3_async_runtimes::tokio::future_into_py(py, async move {
468 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
469 log::error!("Failed to unsubscribe from order book depth 10: {e}");
470 }
471 Ok(())
472 })
473 }
474
475 #[pyo3(name = "unsubscribe_quotes")]
476 fn py_unsubscribe_quotes<'py>(
477 &self,
478 py: Python<'py>,
479 instrument_id: InstrumentId,
480 ) -> PyResult<Bound<'py, PyAny>> {
481 let client = self.clone();
482
483 pyo3_async_runtimes::tokio::future_into_py(py, async move {
484 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
485 log::error!("Failed to unsubscribe from quotes: {e}");
486 }
487 Ok(())
488 })
489 }
490
491 #[pyo3(name = "unsubscribe_trades")]
492 fn py_unsubscribe_trades<'py>(
493 &self,
494 py: Python<'py>,
495 instrument_id: InstrumentId,
496 ) -> PyResult<Bound<'py, PyAny>> {
497 let client = self.clone();
498
499 pyo3_async_runtimes::tokio::future_into_py(py, async move {
500 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
501 log::error!("Failed to unsubscribe from trades: {e}");
502 }
503 Ok(())
504 })
505 }
506
507 #[pyo3(name = "unsubscribe_mark_prices")]
508 fn py_unsubscribe_mark_prices<'py>(
509 &self,
510 py: Python<'py>,
511 instrument_id: InstrumentId,
512 ) -> PyResult<Bound<'py, PyAny>> {
513 let client = self.clone();
514
515 pyo3_async_runtimes::tokio::future_into_py(py, async move {
516 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
517 log::error!("Failed to unsubscribe from mark prices: {e}");
518 }
519 Ok(())
520 })
521 }
522
523 #[pyo3(name = "unsubscribe_index_prices")]
524 fn py_unsubscribe_index_prices<'py>(
525 &self,
526 py: Python<'py>,
527 instrument_id: InstrumentId,
528 ) -> PyResult<Bound<'py, PyAny>> {
529 let client = self.clone();
530
531 pyo3_async_runtimes::tokio::future_into_py(py, async move {
532 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
533 log::error!("Failed to unsubscribe from index prices: {e}");
534 }
535 Ok(())
536 })
537 }
538
539 #[pyo3(name = "unsubscribe_funding_rates")]
540 fn py_unsubscribe_funding_rates<'py>(
541 &self,
542 py: Python<'py>,
543 instrument_id: InstrumentId,
544 ) -> PyResult<Bound<'py, PyAny>> {
545 let client = self.clone();
546 pyo3_async_runtimes::tokio::future_into_py(py, async move {
547 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
548 log::error!("Failed to unsubscribe from funding rates: {e}");
549 }
550 Ok(())
551 })
552 }
553
554 #[pyo3(name = "unsubscribe_bars")]
555 fn py_unsubscribe_bars<'py>(
556 &self,
557 py: Python<'py>,
558 bar_type: BarType,
559 ) -> PyResult<Bound<'py, PyAny>> {
560 let client = self.clone();
561
562 pyo3_async_runtimes::tokio::future_into_py(py, async move {
563 if let Err(e) = client.unsubscribe_bars(bar_type).await {
564 log::error!("Failed to unsubscribe from bars: {e}");
565 }
566 Ok(())
567 })
568 }
569
570 #[pyo3(name = "subscribe_orders")]
571 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
572 let client = self.clone();
573
574 pyo3_async_runtimes::tokio::future_into_py(py, async move {
575 if let Err(e) = client.subscribe_orders().await {
576 log::error!("Failed to subscribe to orders: {e}");
577 }
578 Ok(())
579 })
580 }
581
582 #[pyo3(name = "subscribe_executions")]
583 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
584 let client = self.clone();
585
586 pyo3_async_runtimes::tokio::future_into_py(py, async move {
587 if let Err(e) = client.subscribe_executions().await {
588 log::error!("Failed to subscribe to executions: {e}");
589 }
590 Ok(())
591 })
592 }
593
594 #[pyo3(name = "subscribe_positions")]
595 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
596 let client = self.clone();
597
598 pyo3_async_runtimes::tokio::future_into_py(py, async move {
599 if let Err(e) = client.subscribe_positions().await {
600 log::error!("Failed to subscribe to positions: {e}");
601 }
602 Ok(())
603 })
604 }
605
606 #[pyo3(name = "subscribe_margin")]
607 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
608 let client = self.clone();
609
610 pyo3_async_runtimes::tokio::future_into_py(py, async move {
611 if let Err(e) = client.subscribe_margin().await {
612 log::error!("Failed to subscribe to margin: {e}");
613 }
614 Ok(())
615 })
616 }
617
618 #[pyo3(name = "subscribe_wallet")]
619 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
620 let client = self.clone();
621
622 pyo3_async_runtimes::tokio::future_into_py(py, async move {
623 if let Err(e) = client.subscribe_wallet().await {
624 log::error!("Failed to subscribe to wallet: {e}");
625 }
626 Ok(())
627 })
628 }
629
630 #[pyo3(name = "unsubscribe_orders")]
631 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
632 let client = self.clone();
633
634 pyo3_async_runtimes::tokio::future_into_py(py, async move {
635 if let Err(e) = client.unsubscribe_orders().await {
636 log::error!("Failed to unsubscribe from orders: {e}");
637 }
638 Ok(())
639 })
640 }
641
642 #[pyo3(name = "unsubscribe_executions")]
643 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
644 let client = self.clone();
645
646 pyo3_async_runtimes::tokio::future_into_py(py, async move {
647 if let Err(e) = client.unsubscribe_executions().await {
648 log::error!("Failed to unsubscribe from executions: {e}");
649 }
650 Ok(())
651 })
652 }
653
654 #[pyo3(name = "unsubscribe_positions")]
655 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
656 let client = self.clone();
657
658 pyo3_async_runtimes::tokio::future_into_py(py, async move {
659 if let Err(e) = client.unsubscribe_positions().await {
660 log::error!("Failed to unsubscribe from positions: {e}");
661 }
662 Ok(())
663 })
664 }
665
666 #[pyo3(name = "unsubscribe_margin")]
667 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
668 let client = self.clone();
669
670 pyo3_async_runtimes::tokio::future_into_py(py, async move {
671 if let Err(e) = client.unsubscribe_margin().await {
672 log::error!("Failed to unsubscribe from margin: {e}");
673 }
674 Ok(())
675 })
676 }
677
678 #[pyo3(name = "unsubscribe_wallet")]
679 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
680 let client = self.clone();
681
682 pyo3_async_runtimes::tokio::future_into_py(py, async move {
683 if let Err(e) = client.unsubscribe_wallet().await {
684 log::error!("Failed to unsubscribe from wallet: {e}");
685 }
686 Ok(())
687 })
688 }
689}
690
691pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
692 if let Err(e) = callback.call1(py, (py_obj,)) {
693 tracing::error!("Error calling Python: {e}");
694 }
695}