1use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47 data::bar::BarType,
48 identifiers::{AccountId, InstrumentId},
49 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
52
53use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
54
55#[pymethods]
56impl BitmexWebSocketClient {
57 #[new]
58 #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
59 fn py_new(
60 url: Option<String>,
61 api_key: Option<String>,
62 api_secret: Option<String>,
63 account_id: Option<AccountId>,
64 heartbeat: Option<u64>,
65 testnet: bool,
66 ) -> PyResult<Self> {
67 let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
69 let (key_var, secret_var) = if testnet {
71 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
72 } else {
73 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
74 };
75
76 let env_key = std::env::var(key_var).ok();
77 let env_secret = std::env::var(secret_var).ok();
78 (env_key, env_secret)
79 } else {
80 (api_key, api_secret)
81 };
82
83 Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
84 .map_err(to_pyvalue_err)
85 }
86
87 #[staticmethod]
88 #[pyo3(name = "from_env")]
89 fn py_from_env() -> PyResult<Self> {
90 Self::from_env().map_err(to_pyvalue_err)
91 }
92
93 #[getter]
94 #[pyo3(name = "url")]
95 #[must_use]
96 pub const fn py_url(&self) -> &str {
97 self.url()
98 }
99
100 #[getter]
101 #[pyo3(name = "api_key")]
102 #[must_use]
103 pub fn py_api_key(&self) -> Option<&str> {
104 self.api_key()
105 }
106
107 #[pyo3(name = "is_active")]
108 fn py_is_active(&mut self) -> bool {
109 self.is_active()
110 }
111
112 #[pyo3(name = "is_closed")]
113 fn py_is_closed(&mut self) -> bool {
114 self.is_closed()
115 }
116
117 #[pyo3(name = "get_subscriptions")]
118 fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
119 self.get_subscriptions(instrument_id)
120 }
121
122 #[pyo3(name = "set_account_id")]
123 pub fn py_set_account_id(&mut self, account_id: AccountId) {
124 self.set_account_id(account_id);
125 }
126
127 #[pyo3(name = "connect")]
128 fn py_connect<'py>(
129 &mut self,
130 py: Python<'py>,
131 instruments: Vec<Py<PyAny>>,
132 callback: Py<PyAny>,
133 ) -> PyResult<Bound<'py, PyAny>> {
134 let mut instruments_any = Vec::new();
135 for inst in instruments {
136 let inst_any = pyobject_to_instrument_any(py, inst)?;
137 instruments_any.push(inst_any);
138 }
139
140 self.initialize_instruments_cache(instruments_any);
141
142 let mut client = self.clone();
143
144 pyo3_async_runtimes::tokio::future_into_py(py, async move {
145 client.connect().await.map_err(to_pyruntime_err)?;
146
147 let stream = client.stream();
148
149 tokio::spawn(async move {
150 tokio::pin!(stream);
151
152 while let Some(msg) = stream.next().await {
153 Python::attach(|py| match msg {
154 NautilusWsMessage::Data(data_vec) => {
155 for data in data_vec {
156 let py_obj = data_to_pycapsule(py, data);
157 call_python(py, &callback, py_obj);
158 }
159 }
160 NautilusWsMessage::OrderStatusReports(reports) => {
161 for report in reports {
162 if let Ok(py_obj) = report.into_py_any(py) {
163 call_python(py, &callback, py_obj);
164 }
165 }
166 }
167 NautilusWsMessage::FillReports(reports) => {
168 for report in reports {
169 if let Ok(py_obj) = report.into_py_any(py) {
170 call_python(py, &callback, py_obj);
171 }
172 }
173 }
174 NautilusWsMessage::PositionStatusReport(report) => {
175 if let Ok(py_obj) = report.into_py_any(py) {
176 call_python(py, &callback, py_obj);
177 }
178 }
179 NautilusWsMessage::FundingRateUpdates(updates) => {
180 for update in updates {
181 if let Ok(py_obj) = update.into_py_any(py) {
182 call_python(py, &callback, py_obj);
183 }
184 }
185 }
186 NautilusWsMessage::AccountState(account_state) => {
187 if let Ok(py_obj) = account_state.into_py_any(py) {
188 call_python(py, &callback, py_obj);
189 }
190 }
191 NautilusWsMessage::OrderUpdated(event) => {
192 if let Ok(py_obj) = event.into_py_any(py) {
193 call_python(py, &callback, py_obj);
194 }
195 }
196 NautilusWsMessage::Reconnected => {} });
198 }
199 });
200
201 Ok(())
202 })
203 }
204
205 #[pyo3(name = "wait_until_active")]
206 fn py_wait_until_active<'py>(
207 &self,
208 py: Python<'py>,
209 timeout_secs: f64,
210 ) -> PyResult<Bound<'py, PyAny>> {
211 let client = self.clone();
212
213 pyo3_async_runtimes::tokio::future_into_py(py, async move {
214 client
215 .wait_until_active(timeout_secs)
216 .await
217 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
218 Ok(())
219 })
220 }
221
222 #[pyo3(name = "close")]
223 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
224 let mut client = self.clone();
225
226 pyo3_async_runtimes::tokio::future_into_py(py, async move {
227 if let Err(e) = client.close().await {
228 log::error!("Error on close: {e}");
229 }
230 Ok(())
231 })
232 }
233
234 #[pyo3(name = "subscribe_instruments")]
235 fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
236 let client = self.clone();
237
238 pyo3_async_runtimes::tokio::future_into_py(py, async move {
239 if let Err(e) = client.subscribe_instruments().await {
240 log::error!("Failed to subscribe to instruments: {e}");
241 }
242 Ok(())
243 })
244 }
245
246 #[pyo3(name = "subscribe_instrument")]
247 fn py_subscribe_instrument<'py>(
248 &self,
249 py: Python<'py>,
250 instrument_id: InstrumentId,
251 ) -> PyResult<Bound<'py, PyAny>> {
252 let client = self.clone();
253
254 pyo3_async_runtimes::tokio::future_into_py(py, async move {
255 if let Err(e) = client.subscribe_instrument(instrument_id).await {
256 log::error!("Failed to subscribe to instrument: {e}");
257 }
258 Ok(())
259 })
260 }
261
262 #[pyo3(name = "subscribe_book")]
263 fn py_subscribe_book<'py>(
264 &self,
265 py: Python<'py>,
266 instrument_id: InstrumentId,
267 ) -> PyResult<Bound<'py, PyAny>> {
268 let client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 if let Err(e) = client.subscribe_book(instrument_id).await {
272 log::error!("Failed to subscribe to order book: {e}");
273 }
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "subscribe_book_25")]
279 fn py_subscribe_book_25<'py>(
280 &self,
281 py: Python<'py>,
282 instrument_id: InstrumentId,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 if let Err(e) = client.subscribe_book_25(instrument_id).await {
288 log::error!("Failed to subscribe to order book 25: {e}");
289 }
290 Ok(())
291 })
292 }
293
294 #[pyo3(name = "subscribe_book_depth10")]
295 fn py_subscribe_book_depth10<'py>(
296 &self,
297 py: Python<'py>,
298 instrument_id: InstrumentId,
299 ) -> PyResult<Bound<'py, PyAny>> {
300 let client = self.clone();
301
302 pyo3_async_runtimes::tokio::future_into_py(py, async move {
303 if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
304 log::error!("Failed to subscribe to order book depth 10: {e}");
305 }
306 Ok(())
307 })
308 }
309
310 #[pyo3(name = "subscribe_quotes")]
311 fn py_subscribe_quotes<'py>(
312 &self,
313 py: Python<'py>,
314 instrument_id: InstrumentId,
315 ) -> PyResult<Bound<'py, PyAny>> {
316 let client = self.clone();
317
318 pyo3_async_runtimes::tokio::future_into_py(py, async move {
319 if let Err(e) = client.subscribe_quotes(instrument_id).await {
320 log::error!("Failed to subscribe to quotes: {e}");
321 }
322 Ok(())
323 })
324 }
325
326 #[pyo3(name = "subscribe_trades")]
327 fn py_subscribe_trades<'py>(
328 &self,
329 py: Python<'py>,
330 instrument_id: InstrumentId,
331 ) -> PyResult<Bound<'py, PyAny>> {
332 let client = self.clone();
333
334 pyo3_async_runtimes::tokio::future_into_py(py, async move {
335 if let Err(e) = client.subscribe_trades(instrument_id).await {
336 log::error!("Failed to subscribe to trades: {e}");
337 }
338 Ok(())
339 })
340 }
341
342 #[pyo3(name = "subscribe_mark_prices")]
343 fn py_subscribe_mark_prices<'py>(
344 &self,
345 py: Python<'py>,
346 instrument_id: InstrumentId,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
352 log::error!("Failed to subscribe to mark prices: {e}");
353 }
354 Ok(())
355 })
356 }
357
358 #[pyo3(name = "subscribe_index_prices")]
359 fn py_subscribe_index_prices<'py>(
360 &self,
361 py: Python<'py>,
362 instrument_id: InstrumentId,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 if let Err(e) = client.subscribe_index_prices(instrument_id).await {
368 log::error!("Failed to subscribe to index prices: {e}");
369 }
370 Ok(())
371 })
372 }
373
374 #[pyo3(name = "subscribe_funding_rates")]
375 fn py_subscribe_funding_rates<'py>(
376 &self,
377 py: Python<'py>,
378 instrument_id: InstrumentId,
379 ) -> PyResult<Bound<'py, PyAny>> {
380 let client = self.clone();
381
382 pyo3_async_runtimes::tokio::future_into_py(py, async move {
383 if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
384 log::error!("Failed to subscribe to funding: {e}");
385 }
386 Ok(())
387 })
388 }
389
390 #[pyo3(name = "subscribe_bars")]
391 fn py_subscribe_bars<'py>(
392 &self,
393 py: Python<'py>,
394 bar_type: BarType,
395 ) -> PyResult<Bound<'py, PyAny>> {
396 let client = self.clone();
397
398 pyo3_async_runtimes::tokio::future_into_py(py, async move {
399 if let Err(e) = client.subscribe_bars(bar_type).await {
400 log::error!("Failed to subscribe to bars: {e}");
401 }
402 Ok(())
403 })
404 }
405
406 #[pyo3(name = "unsubscribe_instruments")]
407 fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
408 let client = self.clone();
409
410 pyo3_async_runtimes::tokio::future_into_py(py, async move {
411 if let Err(e) = client.unsubscribe_instruments().await {
412 log::error!("Failed to unsubscribe from instruments: {e}");
413 }
414 Ok(())
415 })
416 }
417
418 #[pyo3(name = "unsubscribe_instrument")]
419 fn py_unsubscribe_instrument<'py>(
420 &self,
421 py: Python<'py>,
422 instrument_id: InstrumentId,
423 ) -> PyResult<Bound<'py, PyAny>> {
424 let client = self.clone();
425
426 pyo3_async_runtimes::tokio::future_into_py(py, async move {
427 if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
428 log::error!("Failed to unsubscribe from instrument: {e}");
429 }
430 Ok(())
431 })
432 }
433
434 #[pyo3(name = "unsubscribe_book")]
435 fn py_unsubscribe_book<'py>(
436 &self,
437 py: Python<'py>,
438 instrument_id: InstrumentId,
439 ) -> PyResult<Bound<'py, PyAny>> {
440 let client = self.clone();
441
442 pyo3_async_runtimes::tokio::future_into_py(py, async move {
443 if let Err(e) = client.unsubscribe_book(instrument_id).await {
444 log::error!("Failed to unsubscribe from order book: {e}");
445 }
446 Ok(())
447 })
448 }
449
450 #[pyo3(name = "unsubscribe_book_25")]
451 fn py_unsubscribe_book_25<'py>(
452 &self,
453 py: Python<'py>,
454 instrument_id: InstrumentId,
455 ) -> PyResult<Bound<'py, PyAny>> {
456 let client = self.clone();
457
458 pyo3_async_runtimes::tokio::future_into_py(py, async move {
459 if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
460 log::error!("Failed to unsubscribe from order book 25: {e}");
461 }
462 Ok(())
463 })
464 }
465
466 #[pyo3(name = "unsubscribe_book_depth10")]
467 fn py_unsubscribe_book_depth10<'py>(
468 &self,
469 py: Python<'py>,
470 instrument_id: InstrumentId,
471 ) -> PyResult<Bound<'py, PyAny>> {
472 let client = self.clone();
473
474 pyo3_async_runtimes::tokio::future_into_py(py, async move {
475 if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
476 log::error!("Failed to unsubscribe from order book depth 10: {e}");
477 }
478 Ok(())
479 })
480 }
481
482 #[pyo3(name = "unsubscribe_quotes")]
483 fn py_unsubscribe_quotes<'py>(
484 &self,
485 py: Python<'py>,
486 instrument_id: InstrumentId,
487 ) -> PyResult<Bound<'py, PyAny>> {
488 let client = self.clone();
489
490 pyo3_async_runtimes::tokio::future_into_py(py, async move {
491 if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
492 log::error!("Failed to unsubscribe from quotes: {e}");
493 }
494 Ok(())
495 })
496 }
497
498 #[pyo3(name = "unsubscribe_trades")]
499 fn py_unsubscribe_trades<'py>(
500 &self,
501 py: Python<'py>,
502 instrument_id: InstrumentId,
503 ) -> PyResult<Bound<'py, PyAny>> {
504 let client = self.clone();
505
506 pyo3_async_runtimes::tokio::future_into_py(py, async move {
507 if let Err(e) = client.unsubscribe_trades(instrument_id).await {
508 log::error!("Failed to unsubscribe from trades: {e}");
509 }
510 Ok(())
511 })
512 }
513
514 #[pyo3(name = "unsubscribe_mark_prices")]
515 fn py_unsubscribe_mark_prices<'py>(
516 &self,
517 py: Python<'py>,
518 instrument_id: InstrumentId,
519 ) -> PyResult<Bound<'py, PyAny>> {
520 let client = self.clone();
521
522 pyo3_async_runtimes::tokio::future_into_py(py, async move {
523 if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
524 log::error!("Failed to unsubscribe from mark prices: {e}");
525 }
526 Ok(())
527 })
528 }
529
530 #[pyo3(name = "unsubscribe_index_prices")]
531 fn py_unsubscribe_index_prices<'py>(
532 &self,
533 py: Python<'py>,
534 instrument_id: InstrumentId,
535 ) -> PyResult<Bound<'py, PyAny>> {
536 let client = self.clone();
537
538 pyo3_async_runtimes::tokio::future_into_py(py, async move {
539 if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
540 log::error!("Failed to unsubscribe from index prices: {e}");
541 }
542 Ok(())
543 })
544 }
545
546 #[pyo3(name = "unsubscribe_funding_rates")]
547 fn py_unsubscribe_funding_rates<'py>(
548 &self,
549 py: Python<'py>,
550 instrument_id: InstrumentId,
551 ) -> PyResult<Bound<'py, PyAny>> {
552 let client = self.clone();
553 pyo3_async_runtimes::tokio::future_into_py(py, async move {
554 if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
555 log::error!("Failed to unsubscribe from funding rates: {e}");
556 }
557 Ok(())
558 })
559 }
560
561 #[pyo3(name = "unsubscribe_bars")]
562 fn py_unsubscribe_bars<'py>(
563 &self,
564 py: Python<'py>,
565 bar_type: BarType,
566 ) -> PyResult<Bound<'py, PyAny>> {
567 let client = self.clone();
568
569 pyo3_async_runtimes::tokio::future_into_py(py, async move {
570 if let Err(e) = client.unsubscribe_bars(bar_type).await {
571 log::error!("Failed to unsubscribe from bars: {e}");
572 }
573 Ok(())
574 })
575 }
576
577 #[pyo3(name = "subscribe_orders")]
578 fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
579 let client = self.clone();
580
581 pyo3_async_runtimes::tokio::future_into_py(py, async move {
582 if let Err(e) = client.subscribe_orders().await {
583 log::error!("Failed to subscribe to orders: {e}");
584 }
585 Ok(())
586 })
587 }
588
589 #[pyo3(name = "subscribe_executions")]
590 fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
591 let client = self.clone();
592
593 pyo3_async_runtimes::tokio::future_into_py(py, async move {
594 if let Err(e) = client.subscribe_executions().await {
595 log::error!("Failed to subscribe to executions: {e}");
596 }
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "subscribe_positions")]
602 fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
603 let client = self.clone();
604
605 pyo3_async_runtimes::tokio::future_into_py(py, async move {
606 if let Err(e) = client.subscribe_positions().await {
607 log::error!("Failed to subscribe to positions: {e}");
608 }
609 Ok(())
610 })
611 }
612
613 #[pyo3(name = "subscribe_margin")]
614 fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
615 let client = self.clone();
616
617 pyo3_async_runtimes::tokio::future_into_py(py, async move {
618 if let Err(e) = client.subscribe_margin().await {
619 log::error!("Failed to subscribe to margin: {e}");
620 }
621 Ok(())
622 })
623 }
624
625 #[pyo3(name = "subscribe_wallet")]
626 fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
627 let client = self.clone();
628
629 pyo3_async_runtimes::tokio::future_into_py(py, async move {
630 if let Err(e) = client.subscribe_wallet().await {
631 log::error!("Failed to subscribe to wallet: {e}");
632 }
633 Ok(())
634 })
635 }
636
637 #[pyo3(name = "unsubscribe_orders")]
638 fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
639 let client = self.clone();
640
641 pyo3_async_runtimes::tokio::future_into_py(py, async move {
642 if let Err(e) = client.unsubscribe_orders().await {
643 log::error!("Failed to unsubscribe from orders: {e}");
644 }
645 Ok(())
646 })
647 }
648
649 #[pyo3(name = "unsubscribe_executions")]
650 fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
651 let client = self.clone();
652
653 pyo3_async_runtimes::tokio::future_into_py(py, async move {
654 if let Err(e) = client.unsubscribe_executions().await {
655 log::error!("Failed to unsubscribe from executions: {e}");
656 }
657 Ok(())
658 })
659 }
660
661 #[pyo3(name = "unsubscribe_positions")]
662 fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
663 let client = self.clone();
664
665 pyo3_async_runtimes::tokio::future_into_py(py, async move {
666 if let Err(e) = client.unsubscribe_positions().await {
667 log::error!("Failed to unsubscribe from positions: {e}");
668 }
669 Ok(())
670 })
671 }
672
673 #[pyo3(name = "unsubscribe_margin")]
674 fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
675 let client = self.clone();
676
677 pyo3_async_runtimes::tokio::future_into_py(py, async move {
678 if let Err(e) = client.unsubscribe_margin().await {
679 log::error!("Failed to unsubscribe from margin: {e}");
680 }
681 Ok(())
682 })
683 }
684
685 #[pyo3(name = "unsubscribe_wallet")]
686 fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687 let client = self.clone();
688
689 pyo3_async_runtimes::tokio::future_into_py(py, async move {
690 if let Err(e) = client.unsubscribe_wallet().await {
691 log::error!("Failed to unsubscribe from wallet: {e}");
692 }
693 Ok(())
694 })
695 }
696}
697
698pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
699 if let Err(e) = callback.call1(py, (py_obj,)) {
700 tracing::error!("Error calling Python: {e}");
701 }
702}