Skip to main content

nautilus_dydx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the dYdX WebSocket client.
17
18use std::{
19    sync::atomic::Ordering,
20    time::{Duration, Instant},
21};
22
23use dashmap::DashMap;
24use nautilus_common::live::get_runtime;
25use nautilus_core::{UUID4, python::to_pyvalue_err, time::get_atomic_clock_realtime};
26use nautilus_model::{
27    data::BarType,
28    enums::AccountType,
29    events::AccountState,
30    identifiers::{AccountId, InstrumentId},
31    python::instruments::pyobject_to_instrument_any,
32    types::{AccountBalance, Currency, Money},
33};
34use nautilus_network::mode::ConnectionMode;
35use pyo3::{IntoPyObjectExt, prelude::*};
36
37use crate::{
38    common::{credential::DydxCredential, enums::DydxCandleResolution, parse::extract_raw_symbol},
39    execution::types::OrderContext,
40    http::{client::DydxHttpClient, parse::parse_account_state},
41    python::encoder::PyDydxClientOrderIdEncoder,
42    websocket::{
43        client::DydxWebSocketClient,
44        enums::NautilusWsMessage,
45        handler::HandlerCommand,
46        parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
47    },
48};
49
50#[pymethods]
51impl DydxWebSocketClient {
52    #[staticmethod]
53    #[pyo3(name = "new_public")]
54    fn py_new_public(url: String, heartbeat: Option<u64>) -> Self {
55        Self::new_public(url, heartbeat)
56    }
57
58    #[staticmethod]
59    #[pyo3(name = "new_private")]
60    fn py_new_private(
61        url: String,
62        private_key: String,
63        authenticator_ids: Vec<u64>,
64        account_id: AccountId,
65        heartbeat: Option<u64>,
66    ) -> PyResult<Self> {
67        let credential = DydxCredential::from_private_key(&private_key, authenticator_ids)
68            .map_err(to_pyvalue_err)?;
69        Ok(Self::new_private(url, credential, account_id, heartbeat))
70    }
71
72    #[pyo3(name = "is_connected")]
73    fn py_is_connected(&self) -> bool {
74        self.is_connected()
75    }
76
77    #[pyo3(name = "set_bars_timestamp_on_close")]
78    fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
79        self.set_bars_timestamp_on_close(value);
80    }
81
82    #[pyo3(name = "set_account_id")]
83    fn py_set_account_id(&mut self, account_id: AccountId) {
84        self.set_account_id(account_id);
85    }
86
87    /// Share the HTTP client's instrument cache with this WebSocket client.
88    ///
89    /// The HTTP client's cache includes CLOB pair ID and market ticker indices
90    /// needed for parsing SubaccountsChannelData into typed execution events.
91    /// Must be called before `connect()`.
92    #[pyo3(name = "share_instrument_cache")]
93    fn py_share_instrument_cache(&mut self, http_client: &DydxHttpClient) {
94        self.set_instrument_cache(http_client.instrument_cache().clone());
95    }
96
97    #[pyo3(name = "account_id")]
98    fn py_account_id(&self) -> Option<AccountId> {
99        self.account_id()
100    }
101
102    /// Returns the shared client order ID encoder.
103    #[pyo3(name = "encoder")]
104    fn py_encoder(&self) -> PyDydxClientOrderIdEncoder {
105        PyDydxClientOrderIdEncoder::from_arc(self.encoder().clone())
106    }
107
108    #[getter]
109    fn py_url(&self) -> String {
110        self.url().to_string()
111    }
112
113    #[pyo3(name = "connect")]
114    fn py_connect<'py>(
115        &mut self,
116        py: Python<'py>,
117        instruments: Vec<Py<PyAny>>,
118        callback: Py<PyAny>,
119    ) -> PyResult<Bound<'py, PyAny>> {
120        // Convert Python instruments to Rust InstrumentAny
121        let mut instruments_any = Vec::new();
122        for inst in instruments {
123            let inst_any = pyobject_to_instrument_any(py, inst)?;
124            instruments_any.push(inst_any);
125        }
126
127        // Cache instruments first
128        self.cache_instruments(instruments_any);
129
130        let mut client = self.clone();
131
132        pyo3_async_runtimes::tokio::future_into_py(py, async move {
133            // Connect the WebSocket client
134            client.connect().await.map_err(to_pyvalue_err)?;
135
136            // Take the receiver for messages
137            if let Some(mut rx) = client.take_receiver() {
138                // Spawn task to process messages and call Python callback
139                get_runtime().spawn(async move {
140                    let _client = client; // Keep client alive in spawned task
141                    let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
142                    let order_id_map: DashMap<String, (u32, u32)> = DashMap::new();
143
144                    while let Some(msg) = rx.recv().await {
145                        match msg {
146                            NautilusWsMessage::Data(items) => {
147                                Python::attach(|py| {
148                                    for data in items {
149                                        use nautilus_model::python::data::data_to_pycapsule;
150                                        let py_obj = data_to_pycapsule(py, data);
151                                        if let Err(e) = callback.call1(py, (py_obj,)) {
152                                            log::error!("Error calling Python callback: {e}");
153                                        }
154                                    }
155                                });
156                            }
157                            NautilusWsMessage::Deltas(deltas) => {
158                                Python::attach(|py| {
159                                    use nautilus_model::{
160                                        data::{Data, OrderBookDeltas_API},
161                                        python::data::data_to_pycapsule,
162                                    };
163                                    let data = Data::Deltas(OrderBookDeltas_API::new(*deltas));
164                                    let py_obj = data_to_pycapsule(py, data);
165                                    if let Err(e) = callback.call1(py, (py_obj,)) {
166                                        log::error!("Error calling Python callback: {e}");
167                                    }
168                                });
169                            }
170                            NautilusWsMessage::BlockHeight { height, time } => {
171                                Python::attach(|py| {
172                                    use pyo3::types::PyDict;
173                                    let dict = PyDict::new(py);
174                                    let _ = dict.set_item("type", "block_height");
175                                    let _ = dict.set_item("height", height);
176                                    let _ = dict.set_item("time", time.to_rfc3339());
177                                    if let Err(e) = callback.call1(py, (dict,)) {
178                                        log::error!("Error calling Python callback for block_height: {e}");
179                                    }
180                                });
181                            }
182                            NautilusWsMessage::SubaccountSubscribed(data) => {
183                                // Get account_id from the client
184                                let Some(account_id) = _client.account_id() else {
185                                    log::warn!("Cannot parse subaccount subscription: account_id not set");
186                                    continue;
187                                };
188
189                                let instrument_cache = _client.instrument_cache();
190                                let ts_init = get_atomic_clock_realtime().get_time_ns();
191
192                                // Build maps from instrument cache
193                                let inst_map = instrument_cache.to_instrument_id_map();
194                                let oracle_map = instrument_cache.to_oracle_prices_map();
195
196                                // Parse and emit AccountState + PositionStatusReports
197                                if let Some(ref subaccount) = data.contents.subaccount {
198                                    match parse_account_state(
199                                        subaccount,
200                                        account_id,
201                                        &inst_map,
202                                        &oracle_map,
203                                        ts_init,
204                                        ts_init,
205                                    ) {
206                                        Ok(account_state) => {
207                                            Python::attach(|py| {
208                                                match account_state.into_py_any(py) {
209                                                    Ok(py_obj) => {
210                                                        if let Err(e) = callback.call1(py, (py_obj,)) {
211                                                            log::error!("Error calling Python callback for AccountState: {e}");
212                                                        }
213                                                    }
214                                                    Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
215                                                }
216                                            });
217                                        }
218                                    Err(e) => log::error!("Failed to parse account state: {e}"),
219                                }
220
221                                // Parse and emit PositionStatusReports
222                                if let Some(ref positions) = subaccount.open_perpetual_positions {
223                                    for (market, ws_position) in positions {
224                                        match parse_ws_position_report(
225                                            ws_position,
226                                            instrument_cache,
227                                            account_id,
228                                            ts_init,
229                                        ) {
230                                            Ok(report) => {
231                                                Python::attach(|py| {
232                                                    match pyo3::Py::new(py, report) {
233                                                        Ok(py_obj) => {
234                                                            if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
235                                                                log::error!("Error calling Python callback for PositionStatusReport: {e}");
236                                                            }
237                                                        }
238                                                        Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
239                                                    }
240                                                });
241                                            }
242                                            Err(e) => log::error!("Failed to parse position for {market}: {e}"),
243                                        }
244                                    }
245                                }
246                                } else {
247                                    log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
248
249                                    // Emit zero-balance account state so account gets registered
250                                    let currency = Currency::get_or_create_crypto_with_context("USDC", None);
251                                    let zero = Money::zero(currency);
252                                    let balance = AccountBalance::new_checked(zero, zero, zero)
253                                        .expect("zero balance should always be valid");
254                                    let account_state = AccountState::new(
255                                        account_id,
256                                        AccountType::Margin,
257                                        vec![balance],
258                                        vec![],
259                                        true,
260                                        UUID4::new(),
261                                        ts_init,
262                                        ts_init,
263                                        None,
264                                    );
265                                    Python::attach(|py| {
266                                        match account_state.into_py_any(py) {
267                                            Ok(py_obj) => {
268                                                if let Err(e) = callback.call1(py, (py_obj,)) {
269                                                    log::error!("Error calling Python callback for AccountState: {e}");
270                                                }
271                                            }
272                                            Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
273                                        }
274                                    });
275                                }
276                            }
277                            NautilusWsMessage::SubaccountsChannelData(data) => {
278                                let Some(account_id) = _client.account_id() else {
279                                    log::warn!("Cannot parse SubaccountsChannelData: account_id not set");
280                                    continue;
281                                };
282
283                                let instrument_cache = _client.instrument_cache();
284                                let encoder = _client.encoder();
285                                let ts_init = get_atomic_clock_realtime().get_time_ns();
286
287                                let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
288
289                                // Phase 1: Parse orders and build order_id_map (needed for fill correlation)
290                                // but DON'T send order reports yet — fills must be sent first
291                                // to prevent reconciliation from inferring fills at the limit price.
292                                let mut pending_order_reports = Vec::new();
293                                if let Some(ref orders) = data.contents.orders {
294                                    for ws_order in orders {
295                                        // Build order_id → (client_id, client_metadata) for fill correlation
296                                        if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
297                                            let client_meta = ws_order.client_metadata
298                                                .as_ref()
299                                                .and_then(|s| s.parse::<u32>().ok())
300                                                .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
301                                            order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
302                                        }
303
304                                        match parse_ws_order_report(
305                                            ws_order,
306                                            instrument_cache,
307                                            &order_contexts,
308                                            encoder,
309                                            account_id,
310                                            ts_init,
311                                        ) {
312                                            Ok(report) => {
313                                                if !report.order_status.is_open()
314                                                    && let Ok(cid) = ws_order.client_id.parse::<u32>()
315                                                {
316                                                    let meta = ws_order.client_metadata
317                                                        .as_ref()
318                                                        .and_then(|s| s.parse::<u32>().ok())
319                                                        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
320                                                    terminal_orders.push((cid, meta, ws_order.id.clone()));
321                                                }
322                                                pending_order_reports.push(report);
323                                            }
324                                            Err(e) => log::error!("Failed to parse WS order: {e}"),
325                                        }
326                                    }
327                                }
328
329                                // Phase 2: Send fills FIRST so reconciliation sees them before
330                                // the terminal order status (prevents inferred fills at limit price)
331                                if let Some(ref fills) = data.contents.fills {
332                                    for ws_fill in fills {
333                                        match parse_ws_fill_report(
334                                            ws_fill,
335                                            instrument_cache,
336                                            &order_id_map,
337                                            &order_contexts,
338                                            encoder,
339                                            account_id,
340                                            ts_init,
341                                        ) {
342                                            Ok(report) => {
343                                                Python::attach(|py| {
344                                                    match pyo3::Py::new(py, report) {
345                                                        Ok(py_obj) => {
346                                                            if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
347                                                                log::error!("Error calling callback for FillReport: {e}");
348                                                            }
349                                                        }
350                                                        Err(e) => log::error!("Failed to convert FillReport: {e}"),
351                                                    }
352                                                });
353                                            }
354                                            Err(e) => log::error!("Failed to parse WS fill: {e}"),
355                                        }
356                                    }
357                                }
358
359                                // Phase 3: Now send order status reports
360                                for report in pending_order_reports {
361                                    Python::attach(|py| {
362                                        match pyo3::Py::new(py, report) {
363                                            Ok(py_obj) => {
364                                                if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
365                                                    log::error!("Error calling callback for OrderStatusReport: {e}");
366                                                }
367                                            }
368                                            Err(e) => log::error!("Failed to convert OrderStatusReport: {e}"),
369                                        }
370                                    });
371                                }
372
373                                // Deferred cleanup after fills are correlated
374                                for (client_id, client_metadata, order_id) in terminal_orders {
375                                    order_contexts.remove(&client_id);
376                                    encoder.remove(client_id, client_metadata);
377                                    order_id_map.remove(&order_id);
378                                }
379                            }
380                            NautilusWsMessage::MarkPrice(mark_price) => {
381                                Python::attach(|py| {
382                                    match mark_price.into_py_any(py) {
383                                        Ok(py_obj) => {
384                                            if let Err(e) = callback.call1(py, (py_obj,)) {
385                                                log::error!("Error calling Python callback for MarkPriceUpdate: {e}");
386                                            }
387                                        }
388                                        Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
389                                    }
390                                });
391                            }
392                            NautilusWsMessage::IndexPrice(index_price) => {
393                                Python::attach(|py| {
394                                    match index_price.into_py_any(py) {
395                                        Ok(py_obj) => {
396                                            if let Err(e) = callback.call1(py, (py_obj,)) {
397                                                log::error!("Error calling Python callback for IndexPriceUpdate: {e}");
398                                            }
399                                        }
400                                        Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
401                                    }
402                                });
403                            }
404                            NautilusWsMessage::FundingRate(funding_rate) => {
405                                Python::attach(|py| {
406                                    match funding_rate.into_py_any(py) {
407                                        Ok(py_obj) => {
408                                            if let Err(e) = callback.call1(py, (py_obj,)) {
409                                                log::error!("Error calling Python callback for FundingRateUpdate: {e}");
410                                            }
411                                        }
412                                        Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
413                                    }
414                                });
415                            }
416                            NautilusWsMessage::Error(err) => {
417                                log::error!("dYdX WebSocket error: {err}");
418                            }
419                            NautilusWsMessage::Reconnected => {
420                                log::info!("dYdX WebSocket reconnected");
421                            }
422                            NautilusWsMessage::AccountState(state) => {
423                                Python::attach(|py| {
424                                    match state.into_py_any(py) {
425                                        Ok(py_obj) => {
426                                            if let Err(e) = callback.call1(py, (py_obj,)) {
427                                                log::error!("Error calling Python callback for AccountState: {e}");
428                                            }
429                                        }
430                                        Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
431                                    }
432                                });
433                            }
434                            NautilusWsMessage::Position(report) => {
435                                Python::attach(|py| {
436                                    match pyo3::Py::new(py, *report) {
437                                        Ok(py_obj) => {
438                                            if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
439                                                log::error!("Error calling Python callback for PositionStatusReport: {e}");
440                                            }
441                                        }
442                                        Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
443                                    }
444                                });
445                            }
446                            NautilusWsMessage::Order(report) => {
447                                Python::attach(|py| {
448                                    match pyo3::Py::new(py, *report) {
449                                        Ok(py_obj) => {
450                                            if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
451                                                log::error!("Error calling Python callback for OrderStatusReport: {e}");
452                                            }
453                                        }
454                                        Err(e) => log::error!("Failed to convert OrderStatusReport to Python: {e}"),
455                                    }
456                                });
457                            }
458                            NautilusWsMessage::Fill(report) => {
459                                Python::attach(|py| {
460                                    match pyo3::Py::new(py, *report) {
461                                        Ok(py_obj) => {
462                                            if let Err(e) = callback.call1(py, (py_obj.into_any(),)) {
463                                                log::error!("Error calling Python callback for FillReport: {e}");
464                                            }
465                                        }
466                                        Err(e) => log::error!("Failed to convert FillReport to Python: {e}"),
467                                    }
468                                });
469                            }
470                            NautilusWsMessage::NewInstrumentDiscovered { ticker } => {
471                                log::info!("New instrument discovered via WebSocket: {ticker}");
472                                Python::attach(|py| {
473                                    use pyo3::types::PyDict;
474                                    let dict = PyDict::new(py);
475                                    let _ = dict.set_item("type", "new_instrument_discovered");
476                                    let _ = dict.set_item("ticker", &ticker);
477                                    if let Err(e) = callback.call1(py, (dict,)) {
478                                        log::error!("Error calling Python callback for new_instrument_discovered: {e}");
479                                    }
480                                });
481                            }
482                        }
483                    }
484                });
485            }
486
487            Ok(())
488        })
489    }
490
491    #[pyo3(name = "disconnect")]
492    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
493        let mut client = self.clone();
494        pyo3_async_runtimes::tokio::future_into_py(py, async move {
495            client.disconnect().await.map_err(to_pyvalue_err)?;
496            Ok(())
497        })
498    }
499
500    #[pyo3(name = "wait_until_active")]
501    fn py_wait_until_active<'py>(
502        &self,
503        py: Python<'py>,
504        timeout_secs: f64,
505    ) -> PyResult<Bound<'py, PyAny>> {
506        let connection_mode = self.connection_mode_atomic();
507
508        pyo3_async_runtimes::tokio::future_into_py(py, async move {
509            let timeout = Duration::from_secs_f64(timeout_secs);
510            let start = Instant::now();
511
512            loop {
513                let mode = connection_mode.load();
514                let mode_u8 = mode.load(Ordering::Relaxed);
515                let is_connected = matches!(
516                    mode_u8,
517                    x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
518                );
519
520                if is_connected {
521                    break;
522                }
523
524                if start.elapsed() > timeout {
525                    return Err(to_pyvalue_err(std::io::Error::new(
526                        std::io::ErrorKind::TimedOut,
527                        format!("Client did not become active within {timeout_secs}s"),
528                    )));
529                }
530                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
531            }
532
533            Ok(())
534        })
535    }
536
537    #[pyo3(name = "cache_instrument")]
538    fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
539        let inst_any = pyobject_to_instrument_any(py, instrument)?;
540        self.cache_instrument(inst_any);
541        Ok(())
542    }
543
544    #[pyo3(name = "cache_instruments")]
545    fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
546        let mut instruments_any = Vec::new();
547        for inst in instruments {
548            let inst_any = pyobject_to_instrument_any(py, inst)?;
549            instruments_any.push(inst_any);
550        }
551        self.cache_instruments(instruments_any);
552        Ok(())
553    }
554
555    #[pyo3(name = "is_closed")]
556    fn py_is_closed(&self) -> bool {
557        !self.is_connected()
558    }
559
560    #[pyo3(name = "subscribe_trades")]
561    fn py_subscribe_trades<'py>(
562        &self,
563        py: Python<'py>,
564        instrument_id: InstrumentId,
565    ) -> PyResult<Bound<'py, PyAny>> {
566        let client = self.clone();
567        pyo3_async_runtimes::tokio::future_into_py(py, async move {
568            client
569                .subscribe_trades(instrument_id)
570                .await
571                .map_err(to_pyvalue_err)?;
572            Ok(())
573        })
574    }
575
576    #[pyo3(name = "unsubscribe_trades")]
577    fn py_unsubscribe_trades<'py>(
578        &self,
579        py: Python<'py>,
580        instrument_id: InstrumentId,
581    ) -> PyResult<Bound<'py, PyAny>> {
582        let client = self.clone();
583        pyo3_async_runtimes::tokio::future_into_py(py, async move {
584            client
585                .unsubscribe_trades(instrument_id)
586                .await
587                .map_err(to_pyvalue_err)?;
588            Ok(())
589        })
590    }
591
592    #[pyo3(name = "subscribe_orderbook")]
593    fn py_subscribe_orderbook<'py>(
594        &self,
595        py: Python<'py>,
596        instrument_id: InstrumentId,
597    ) -> PyResult<Bound<'py, PyAny>> {
598        let client = self.clone();
599        pyo3_async_runtimes::tokio::future_into_py(py, async move {
600            client
601                .subscribe_orderbook(instrument_id)
602                .await
603                .map_err(to_pyvalue_err)?;
604            Ok(())
605        })
606    }
607
608    #[pyo3(name = "unsubscribe_orderbook")]
609    fn py_unsubscribe_orderbook<'py>(
610        &self,
611        py: Python<'py>,
612        instrument_id: InstrumentId,
613    ) -> PyResult<Bound<'py, PyAny>> {
614        let client = self.clone();
615        pyo3_async_runtimes::tokio::future_into_py(py, async move {
616            client
617                .unsubscribe_orderbook(instrument_id)
618                .await
619                .map_err(to_pyvalue_err)?;
620            Ok(())
621        })
622    }
623
624    #[pyo3(name = "subscribe_bars")]
625    fn py_subscribe_bars<'py>(
626        &self,
627        py: Python<'py>,
628        bar_type: BarType,
629    ) -> PyResult<Bound<'py, PyAny>> {
630        let spec = bar_type.spec();
631        let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
632        let resolution = resolution.to_string();
633
634        let client = self.clone();
635        let instrument_id = bar_type.instrument_id();
636
637        // Build topic for bar type registration (e.g., "ETH-USD/1MIN")
638        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
639        let topic = format!("{ticker}/{resolution}");
640
641        pyo3_async_runtimes::tokio::future_into_py(py, async move {
642            // Register bar type in handler before subscribing
643            client
644                .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
645                .map_err(to_pyvalue_err)?;
646
647            // Brief delay to ensure handler processes registration
648            tokio::time::sleep(Duration::from_millis(50)).await;
649
650            client
651                .subscribe_candles(instrument_id, &resolution)
652                .await
653                .map_err(to_pyvalue_err)?;
654            Ok(())
655        })
656    }
657
658    #[pyo3(name = "unsubscribe_bars")]
659    fn py_unsubscribe_bars<'py>(
660        &self,
661        py: Python<'py>,
662        bar_type: BarType,
663    ) -> PyResult<Bound<'py, PyAny>> {
664        let spec = bar_type.spec();
665        let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
666        let resolution = resolution.to_string();
667
668        let client = self.clone();
669        let instrument_id = bar_type.instrument_id();
670
671        // Build topic for unregistration
672        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
673        let topic = format!("{ticker}/{resolution}");
674
675        pyo3_async_runtimes::tokio::future_into_py(py, async move {
676            client
677                .unsubscribe_candles(instrument_id, &resolution)
678                .await
679                .map_err(to_pyvalue_err)?;
680
681            // Unregister bar type after unsubscribing
682            client
683                .send_command(HandlerCommand::UnregisterBarType { topic })
684                .map_err(to_pyvalue_err)?;
685
686            Ok(())
687        })
688    }
689
690    #[pyo3(name = "subscribe_markets")]
691    fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
692        let client = self.clone();
693        pyo3_async_runtimes::tokio::future_into_py(py, async move {
694            client.subscribe_markets().await.map_err(to_pyvalue_err)?;
695            Ok(())
696        })
697    }
698
699    #[pyo3(name = "unsubscribe_markets")]
700    fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
701        let client = self.clone();
702        pyo3_async_runtimes::tokio::future_into_py(py, async move {
703            client.unsubscribe_markets().await.map_err(to_pyvalue_err)?;
704            Ok(())
705        })
706    }
707
708    #[pyo3(name = "subscribe_subaccount")]
709    fn py_subscribe_subaccount<'py>(
710        &self,
711        py: Python<'py>,
712        address: String,
713        subaccount_number: u32,
714    ) -> PyResult<Bound<'py, PyAny>> {
715        let client = self.clone();
716        pyo3_async_runtimes::tokio::future_into_py(py, async move {
717            client
718                .subscribe_subaccount(&address, subaccount_number)
719                .await
720                .map_err(to_pyvalue_err)?;
721            Ok(())
722        })
723    }
724
725    #[pyo3(name = "unsubscribe_subaccount")]
726    fn py_unsubscribe_subaccount<'py>(
727        &self,
728        py: Python<'py>,
729        address: String,
730        subaccount_number: u32,
731    ) -> PyResult<Bound<'py, PyAny>> {
732        let client = self.clone();
733        pyo3_async_runtimes::tokio::future_into_py(py, async move {
734            client
735                .unsubscribe_subaccount(&address, subaccount_number)
736                .await
737                .map_err(to_pyvalue_err)?;
738            Ok(())
739        })
740    }
741
742    #[pyo3(name = "subscribe_block_height")]
743    fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
744        let client = self.clone();
745        pyo3_async_runtimes::tokio::future_into_py(py, async move {
746            client
747                .subscribe_block_height()
748                .await
749                .map_err(to_pyvalue_err)?;
750            Ok(())
751        })
752    }
753
754    #[pyo3(name = "unsubscribe_block_height")]
755    fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
756        let client = self.clone();
757        pyo3_async_runtimes::tokio::future_into_py(py, async move {
758            client
759                .unsubscribe_block_height()
760                .await
761                .map_err(to_pyvalue_err)?;
762            Ok(())
763        })
764    }
765}