1use std::{
19 sync::atomic::Ordering,
20 time::{Duration, Instant},
21};
22
23use dashmap::DashMap;
24use nautilus_common::live::get_runtime;
25use nautilus_core::{UUID4, python::to_pyvalue_err, time::get_atomic_clock_realtime};
26use nautilus_model::{
27 data::BarType,
28 enums::AccountType,
29 events::AccountState,
30 identifiers::{AccountId, InstrumentId},
31 python::instruments::pyobject_to_instrument_any,
32 types::{AccountBalance, Currency, Money},
33};
34use nautilus_network::mode::ConnectionMode;
35use pyo3::{IntoPyObjectExt, prelude::*};
36
37use crate::{
38 common::{credential::DydxCredential, enums::DydxCandleResolution, parse::extract_raw_symbol},
39 execution::types::OrderContext,
40 http::{client::DydxHttpClient, parse::parse_account_state},
41 python::encoder::PyDydxClientOrderIdEncoder,
42 websocket::{
43 client::DydxWebSocketClient,
44 enums::NautilusWsMessage,
45 handler::HandlerCommand,
46 parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
47 },
48};
49
50#[pymethods]
51impl DydxWebSocketClient {
52 #[staticmethod]
53 #[pyo3(name = "new_public")]
54 fn py_new_public(url: String, heartbeat: Option<u64>) -> Self {
55 Self::new_public(url, heartbeat)
56 }
57
58 #[staticmethod]
59 #[pyo3(name = "new_private")]
60 fn py_new_private(
61 url: String,
62 private_key: String,
63 authenticator_ids: Vec<u64>,
64 account_id: AccountId,
65 heartbeat: Option<u64>,
66 ) -> PyResult<Self> {
67 let credential = DydxCredential::from_private_key(&private_key, authenticator_ids)
68 .map_err(to_pyvalue_err)?;
69 Ok(Self::new_private(url, credential, account_id, heartbeat))
70 }
71
72 #[pyo3(name = "is_connected")]
73 fn py_is_connected(&self) -> bool {
74 self.is_connected()
75 }
76
77 #[pyo3(name = "set_bars_timestamp_on_close")]
78 fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
79 self.set_bars_timestamp_on_close(value);
80 }
81
82 #[pyo3(name = "set_account_id")]
83 fn py_set_account_id(&mut self, account_id: AccountId) {
84 self.set_account_id(account_id);
85 }
86
87 #[pyo3(name = "share_instrument_cache")]
93 fn py_share_instrument_cache(&mut self, http_client: &DydxHttpClient) {
94 self.set_instrument_cache(http_client.instrument_cache().clone());
95 }
96
97 #[pyo3(name = "account_id")]
98 fn py_account_id(&self) -> Option<AccountId> {
99 self.account_id()
100 }
101
102 #[pyo3(name = "encoder")]
104 fn py_encoder(&self) -> PyDydxClientOrderIdEncoder {
105 PyDydxClientOrderIdEncoder::from_arc(self.encoder().clone())
106 }
107
108 #[getter]
109 fn py_url(&self) -> String {
110 self.url().to_string()
111 }
112
113 #[pyo3(name = "connect")]
114 fn py_connect<'py>(
115 &mut self,
116 py: Python<'py>,
117 instruments: Vec<Py<PyAny>>,
118 callback: Py<PyAny>,
119 ) -> PyResult<Bound<'py, PyAny>> {
120 let mut instruments_any = Vec::new();
122 for inst in instruments {
123 let inst_any = pyobject_to_instrument_any(py, inst)?;
124 instruments_any.push(inst_any);
125 }
126
127 self.cache_instruments(instruments_any);
129
130 let mut client = self.clone();
131
132 pyo3_async_runtimes::tokio::future_into_py(py, async move {
133 client.connect().await.map_err(to_pyvalue_err)?;
135
136 if let Some(mut rx) = client.take_receiver() {
138 get_runtime().spawn(async move {
140 let _client = client; let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
142 let order_id_map: DashMap<String, (u32, u32)> = DashMap::new();
143
144 while let Some(msg) = rx.recv().await {
145 match msg {
146 NautilusWsMessage::Data(items) => {
147 Python::attach(|py| {
148 for data in items {
149 use nautilus_model::python::data::data_to_pycapsule;
150 let py_obj = data_to_pycapsule(py, data);
151 if let Err(e) = callback.call1(py, (py_obj,)) {
152 log::error!("Error calling Python callback: {e}");
153 }
154 }
155 });
156 }
157 NautilusWsMessage::Deltas(deltas) => {
158 Python::attach(|py| {
159 use nautilus_model::{
160 data::{Data, OrderBookDeltas_API},
161 python::data::data_to_pycapsule,
162 };
163 let data = Data::Deltas(OrderBookDeltas_API::new(*deltas));
164 let py_obj = data_to_pycapsule(py, data);
165 if let Err(e) = callback.call1(py, (py_obj,)) {
166 log::error!("Error calling Python callback: {e}");
167 }
168 });
169 }
170 NautilusWsMessage::BlockHeight { height, time } => {
171 Python::attach(|py| {
172 use pyo3::types::PyDict;
173 let dict = PyDict::new(py);
174 let _ = dict.set_item("type", "block_height");
175 let _ = dict.set_item("height", height);
176 let _ = dict.set_item("time", time.to_rfc3339());
177 if let Err(e) = callback.call1(py, (dict,)) {
178 log::error!("Error calling Python callback for block_height: {e}");
179 }
180 });
181 }
182 NautilusWsMessage::SubaccountSubscribed(data) => {
183 let Some(account_id) = _client.account_id() else {
185 log::warn!("Cannot parse subaccount subscription: account_id not set");
186 continue;
187 };
188
189 let instrument_cache = _client.instrument_cache();
190 let ts_init = get_atomic_clock_realtime().get_time_ns();
191
192 let inst_map = instrument_cache.to_instrument_id_map();
194 let oracle_map = instrument_cache.to_oracle_prices_map();
195
196 if let Some(ref subaccount) = data.contents.subaccount {
198 match parse_account_state(
199 subaccount,
200 account_id,
201 &inst_map,
202 &oracle_map,
203 ts_init,
204 ts_init,
205 ) {
206 Ok(account_state) => {
207 Python::attach(|py| {
208 match account_state.into_py_any(py) {
209 Ok(py_obj) => {
210 if let Err(e) = callback.call1(py, (py_obj,)) {
211 log::error!("Error calling Python callback for AccountState: {e}");
212 }
213 }
214 Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
215 }
216 });
217 }
218 Err(e) => log::error!("Failed to parse account state: {e}"),
219 }
220
221 if let Some(ref positions) = subaccount.open_perpetual_positions {
223 for (market, ws_position) in positions {
224 match parse_ws_position_report(
225 ws_position,
226 instrument_cache,
227 account_id,
228 ts_init,
229 ) {
230 Ok(report) => {
231 Python::attach(|py| {
232 match pyo3::Py::new(py, report) {
233 Ok(py_obj) => {
234 if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
235 log::error!("Error calling Python callback for PositionStatusReport: {e}");
236 }
237 }
238 Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
239 }
240 });
241 }
242 Err(e) => log::error!("Failed to parse position for {market}: {e}"),
243 }
244 }
245 }
246 } else {
247 log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
248
249 let currency = Currency::get_or_create_crypto_with_context("USDC", None);
251 let zero = Money::zero(currency);
252 let balance = AccountBalance::new_checked(zero, zero, zero)
253 .expect("zero balance should always be valid");
254 let account_state = AccountState::new(
255 account_id,
256 AccountType::Margin,
257 vec![balance],
258 vec![],
259 true,
260 UUID4::new(),
261 ts_init,
262 ts_init,
263 None,
264 );
265 Python::attach(|py| {
266 match account_state.into_py_any(py) {
267 Ok(py_obj) => {
268 if let Err(e) = callback.call1(py, (py_obj,)) {
269 log::error!("Error calling Python callback for AccountState: {e}");
270 }
271 }
272 Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
273 }
274 });
275 }
276 }
277 NautilusWsMessage::SubaccountsChannelData(data) => {
278 let Some(account_id) = _client.account_id() else {
279 log::warn!("Cannot parse SubaccountsChannelData: account_id not set");
280 continue;
281 };
282
283 let instrument_cache = _client.instrument_cache();
284 let encoder = _client.encoder();
285 let ts_init = get_atomic_clock_realtime().get_time_ns();
286
287 let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
288
289 let mut pending_order_reports = Vec::new();
293 if let Some(ref orders) = data.contents.orders {
294 for ws_order in orders {
295 if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
297 let client_meta = ws_order.client_metadata
298 .as_ref()
299 .and_then(|s| s.parse::<u32>().ok())
300 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
301 order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
302 }
303
304 match parse_ws_order_report(
305 ws_order,
306 instrument_cache,
307 &order_contexts,
308 encoder,
309 account_id,
310 ts_init,
311 ) {
312 Ok(report) => {
313 if !report.order_status.is_open()
314 && let Ok(cid) = ws_order.client_id.parse::<u32>()
315 {
316 let meta = ws_order.client_metadata
317 .as_ref()
318 .and_then(|s| s.parse::<u32>().ok())
319 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
320 terminal_orders.push((cid, meta, ws_order.id.clone()));
321 }
322 pending_order_reports.push(report);
323 }
324 Err(e) => log::error!("Failed to parse WS order: {e}"),
325 }
326 }
327 }
328
329 if let Some(ref fills) = data.contents.fills {
332 for ws_fill in fills {
333 match parse_ws_fill_report(
334 ws_fill,
335 instrument_cache,
336 &order_id_map,
337 &order_contexts,
338 encoder,
339 account_id,
340 ts_init,
341 ) {
342 Ok(report) => {
343 Python::attach(|py| {
344 match pyo3::Py::new(py, report) {
345 Ok(py_obj) => {
346 if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
347 log::error!("Error calling callback for FillReport: {e}");
348 }
349 }
350 Err(e) => log::error!("Failed to convert FillReport: {e}"),
351 }
352 });
353 }
354 Err(e) => log::error!("Failed to parse WS fill: {e}"),
355 }
356 }
357 }
358
359 for report in pending_order_reports {
361 Python::attach(|py| {
362 match pyo3::Py::new(py, report) {
363 Ok(py_obj) => {
364 if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
365 log::error!("Error calling callback for OrderStatusReport: {e}");
366 }
367 }
368 Err(e) => log::error!("Failed to convert OrderStatusReport: {e}"),
369 }
370 });
371 }
372
373 for (client_id, client_metadata, order_id) in terminal_orders {
375 order_contexts.remove(&client_id);
376 encoder.remove(client_id, client_metadata);
377 order_id_map.remove(&order_id);
378 }
379 }
380 NautilusWsMessage::MarkPrice(mark_price) => {
381 Python::attach(|py| {
382 match mark_price.into_py_any(py) {
383 Ok(py_obj) => {
384 if let Err(e) = callback.call1(py, (py_obj,)) {
385 log::error!("Error calling Python callback for MarkPriceUpdate: {e}");
386 }
387 }
388 Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
389 }
390 });
391 }
392 NautilusWsMessage::IndexPrice(index_price) => {
393 Python::attach(|py| {
394 match index_price.into_py_any(py) {
395 Ok(py_obj) => {
396 if let Err(e) = callback.call1(py, (py_obj,)) {
397 log::error!("Error calling Python callback for IndexPriceUpdate: {e}");
398 }
399 }
400 Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
401 }
402 });
403 }
404 NautilusWsMessage::FundingRate(funding_rate) => {
405 Python::attach(|py| {
406 match funding_rate.into_py_any(py) {
407 Ok(py_obj) => {
408 if let Err(e) = callback.call1(py, (py_obj,)) {
409 log::error!("Error calling Python callback for FundingRateUpdate: {e}");
410 }
411 }
412 Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
413 }
414 });
415 }
416 NautilusWsMessage::Error(err) => {
417 log::error!("dYdX WebSocket error: {err}");
418 }
419 NautilusWsMessage::Reconnected => {
420 log::info!("dYdX WebSocket reconnected");
421 }
422 NautilusWsMessage::AccountState(state) => {
423 Python::attach(|py| {
424 match state.into_py_any(py) {
425 Ok(py_obj) => {
426 if let Err(e) = callback.call1(py, (py_obj,)) {
427 log::error!("Error calling Python callback for AccountState: {e}");
428 }
429 }
430 Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
431 }
432 });
433 }
434 NautilusWsMessage::Position(report) => {
435 Python::attach(|py| {
436 match pyo3::Py::new(py, *report) {
437 Ok(py_obj) => {
438 if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
439 log::error!("Error calling Python callback for PositionStatusReport: {e}");
440 }
441 }
442 Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
443 }
444 });
445 }
446 NautilusWsMessage::Order(report) => {
447 Python::attach(|py| {
448 match pyo3::Py::new(py, *report) {
449 Ok(py_obj) => {
450 if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
451 log::error!("Error calling Python callback for OrderStatusReport: {e}");
452 }
453 }
454 Err(e) => log::error!("Failed to convert OrderStatusReport to Python: {e}"),
455 }
456 });
457 }
458 NautilusWsMessage::Fill(report) => {
459 Python::attach(|py| {
460 match pyo3::Py::new(py, *report) {
461 Ok(py_obj) => {
462 if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
463 log::error!("Error calling Python callback for FillReport: {e}");
464 }
465 }
466 Err(e) => log::error!("Failed to convert FillReport to Python: {e}"),
467 }
468 });
469 }
470 NautilusWsMessage::NewInstrumentDiscovered { ticker } => {
471 log::info!("New instrument discovered via WebSocket: {ticker}");
472 Python::attach(|py| {
473 use pyo3::types::PyDict;
474 let dict = PyDict::new(py);
475 let _ = dict.set_item("type", "new_instrument_discovered");
476 let _ = dict.set_item("ticker", &ticker);
477 if let Err(e) = callback.call1(py, (dict,)) {
478 log::error!("Error calling Python callback for new_instrument_discovered: {e}");
479 }
480 });
481 }
482 }
483 }
484 });
485 }
486
487 Ok(())
488 })
489 }
490
491 #[pyo3(name = "disconnect")]
492 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
493 let mut client = self.clone();
494 pyo3_async_runtimes::tokio::future_into_py(py, async move {
495 client.disconnect().await.map_err(to_pyvalue_err)?;
496 Ok(())
497 })
498 }
499
500 #[pyo3(name = "wait_until_active")]
501 fn py_wait_until_active<'py>(
502 &self,
503 py: Python<'py>,
504 timeout_secs: f64,
505 ) -> PyResult<Bound<'py, PyAny>> {
506 let connection_mode = self.connection_mode_atomic();
507
508 pyo3_async_runtimes::tokio::future_into_py(py, async move {
509 let timeout = Duration::from_secs_f64(timeout_secs);
510 let start = Instant::now();
511
512 loop {
513 let mode = connection_mode.load();
514 let mode_u8 = mode.load(Ordering::Relaxed);
515 let is_connected = matches!(
516 mode_u8,
517 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
518 );
519
520 if is_connected {
521 break;
522 }
523
524 if start.elapsed() > timeout {
525 return Err(to_pyvalue_err(std::io::Error::new(
526 std::io::ErrorKind::TimedOut,
527 format!("Client did not become active within {timeout_secs}s"),
528 )));
529 }
530 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
531 }
532
533 Ok(())
534 })
535 }
536
537 #[pyo3(name = "cache_instrument")]
538 fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
539 let inst_any = pyobject_to_instrument_any(py, instrument)?;
540 self.cache_instrument(inst_any);
541 Ok(())
542 }
543
544 #[pyo3(name = "cache_instruments")]
545 fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
546 let mut instruments_any = Vec::new();
547 for inst in instruments {
548 let inst_any = pyobject_to_instrument_any(py, inst)?;
549 instruments_any.push(inst_any);
550 }
551 self.cache_instruments(instruments_any);
552 Ok(())
553 }
554
555 #[pyo3(name = "is_closed")]
556 fn py_is_closed(&self) -> bool {
557 !self.is_connected()
558 }
559
560 #[pyo3(name = "subscribe_trades")]
561 fn py_subscribe_trades<'py>(
562 &self,
563 py: Python<'py>,
564 instrument_id: InstrumentId,
565 ) -> PyResult<Bound<'py, PyAny>> {
566 let client = self.clone();
567 pyo3_async_runtimes::tokio::future_into_py(py, async move {
568 client
569 .subscribe_trades(instrument_id)
570 .await
571 .map_err(to_pyvalue_err)?;
572 Ok(())
573 })
574 }
575
576 #[pyo3(name = "unsubscribe_trades")]
577 fn py_unsubscribe_trades<'py>(
578 &self,
579 py: Python<'py>,
580 instrument_id: InstrumentId,
581 ) -> PyResult<Bound<'py, PyAny>> {
582 let client = self.clone();
583 pyo3_async_runtimes::tokio::future_into_py(py, async move {
584 client
585 .unsubscribe_trades(instrument_id)
586 .await
587 .map_err(to_pyvalue_err)?;
588 Ok(())
589 })
590 }
591
592 #[pyo3(name = "subscribe_orderbook")]
593 fn py_subscribe_orderbook<'py>(
594 &self,
595 py: Python<'py>,
596 instrument_id: InstrumentId,
597 ) -> PyResult<Bound<'py, PyAny>> {
598 let client = self.clone();
599 pyo3_async_runtimes::tokio::future_into_py(py, async move {
600 client
601 .subscribe_orderbook(instrument_id)
602 .await
603 .map_err(to_pyvalue_err)?;
604 Ok(())
605 })
606 }
607
608 #[pyo3(name = "unsubscribe_orderbook")]
609 fn py_unsubscribe_orderbook<'py>(
610 &self,
611 py: Python<'py>,
612 instrument_id: InstrumentId,
613 ) -> PyResult<Bound<'py, PyAny>> {
614 let client = self.clone();
615 pyo3_async_runtimes::tokio::future_into_py(py, async move {
616 client
617 .unsubscribe_orderbook(instrument_id)
618 .await
619 .map_err(to_pyvalue_err)?;
620 Ok(())
621 })
622 }
623
624 #[pyo3(name = "subscribe_bars")]
625 fn py_subscribe_bars<'py>(
626 &self,
627 py: Python<'py>,
628 bar_type: BarType,
629 ) -> PyResult<Bound<'py, PyAny>> {
630 let spec = bar_type.spec();
631 let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
632 let resolution = resolution.to_string();
633
634 let client = self.clone();
635 let instrument_id = bar_type.instrument_id();
636
637 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
639 let topic = format!("{ticker}/{resolution}");
640
641 pyo3_async_runtimes::tokio::future_into_py(py, async move {
642 client
644 .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
645 .map_err(to_pyvalue_err)?;
646
647 tokio::time::sleep(Duration::from_millis(50)).await;
649
650 client
651 .subscribe_candles(instrument_id, &resolution)
652 .await
653 .map_err(to_pyvalue_err)?;
654 Ok(())
655 })
656 }
657
658 #[pyo3(name = "unsubscribe_bars")]
659 fn py_unsubscribe_bars<'py>(
660 &self,
661 py: Python<'py>,
662 bar_type: BarType,
663 ) -> PyResult<Bound<'py, PyAny>> {
664 let spec = bar_type.spec();
665 let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
666 let resolution = resolution.to_string();
667
668 let client = self.clone();
669 let instrument_id = bar_type.instrument_id();
670
671 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
673 let topic = format!("{ticker}/{resolution}");
674
675 pyo3_async_runtimes::tokio::future_into_py(py, async move {
676 client
677 .unsubscribe_candles(instrument_id, &resolution)
678 .await
679 .map_err(to_pyvalue_err)?;
680
681 client
683 .send_command(HandlerCommand::UnregisterBarType { topic })
684 .map_err(to_pyvalue_err)?;
685
686 Ok(())
687 })
688 }
689
690 #[pyo3(name = "subscribe_markets")]
691 fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
692 let client = self.clone();
693 pyo3_async_runtimes::tokio::future_into_py(py, async move {
694 client.subscribe_markets().await.map_err(to_pyvalue_err)?;
695 Ok(())
696 })
697 }
698
699 #[pyo3(name = "unsubscribe_markets")]
700 fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
701 let client = self.clone();
702 pyo3_async_runtimes::tokio::future_into_py(py, async move {
703 client.unsubscribe_markets().await.map_err(to_pyvalue_err)?;
704 Ok(())
705 })
706 }
707
708 #[pyo3(name = "subscribe_subaccount")]
709 fn py_subscribe_subaccount<'py>(
710 &self,
711 py: Python<'py>,
712 address: String,
713 subaccount_number: u32,
714 ) -> PyResult<Bound<'py, PyAny>> {
715 let client = self.clone();
716 pyo3_async_runtimes::tokio::future_into_py(py, async move {
717 client
718 .subscribe_subaccount(&address, subaccount_number)
719 .await
720 .map_err(to_pyvalue_err)?;
721 Ok(())
722 })
723 }
724
725 #[pyo3(name = "unsubscribe_subaccount")]
726 fn py_unsubscribe_subaccount<'py>(
727 &self,
728 py: Python<'py>,
729 address: String,
730 subaccount_number: u32,
731 ) -> PyResult<Bound<'py, PyAny>> {
732 let client = self.clone();
733 pyo3_async_runtimes::tokio::future_into_py(py, async move {
734 client
735 .unsubscribe_subaccount(&address, subaccount_number)
736 .await
737 .map_err(to_pyvalue_err)?;
738 Ok(())
739 })
740 }
741
742 #[pyo3(name = "subscribe_block_height")]
743 fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
744 let client = self.clone();
745 pyo3_async_runtimes::tokio::future_into_py(py, async move {
746 client
747 .subscribe_block_height()
748 .await
749 .map_err(to_pyvalue_err)?;
750 Ok(())
751 })
752 }
753
754 #[pyo3(name = "unsubscribe_block_height")]
755 fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
756 let client = self.clone();
757 pyo3_async_runtimes::tokio::future_into_py(py, async move {
758 client
759 .unsubscribe_block_height()
760 .await
761 .map_err(to_pyvalue_err)?;
762 Ok(())
763 })
764 }
765}