nautilus_bitmex/python/
canceller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX cancel broadcaster.
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
21    python::instruments::pyobject_to_instrument_any,
22};
23use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
24
25use crate::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
26
27#[pymethods]
28impl CancelBroadcaster {
29    #[new]
30    #[pyo3(signature = (
31        pool_size,
32        api_key=None,
33        api_secret=None,
34        base_url=None,
35        testnet=false,
36        timeout_secs=None,
37        max_retries=None,
38        retry_delay_ms=None,
39        retry_delay_max_ms=None,
40        recv_window_ms=None,
41        max_requests_per_second=None,
42        max_requests_per_minute=None,
43        health_check_interval_secs=30,
44        health_check_timeout_secs=5,
45        expected_reject_patterns=None,
46        idempotent_success_patterns=None
47    ))]
48    #[allow(clippy::too_many_arguments)]
49    fn py_new(
50        pool_size: usize,
51        api_key: Option<String>,
52        api_secret: Option<String>,
53        base_url: Option<String>,
54        testnet: bool,
55        timeout_secs: Option<u64>,
56        max_retries: Option<u32>,
57        retry_delay_ms: Option<u64>,
58        retry_delay_max_ms: Option<u64>,
59        recv_window_ms: Option<u64>,
60        max_requests_per_second: Option<u32>,
61        max_requests_per_minute: Option<u32>,
62        health_check_interval_secs: u64,
63        health_check_timeout_secs: u64,
64        expected_reject_patterns: Option<Vec<String>>,
65        idempotent_success_patterns: Option<Vec<String>>,
66    ) -> PyResult<Self> {
67        let config = CancelBroadcasterConfig {
68            pool_size,
69            api_key,
70            api_secret,
71            base_url,
72            testnet,
73            timeout_secs,
74            max_retries,
75            retry_delay_ms,
76            retry_delay_max_ms,
77            recv_window_ms,
78            max_requests_per_second,
79            max_requests_per_minute,
80            health_check_interval_secs,
81            health_check_timeout_secs,
82            expected_reject_patterns: expected_reject_patterns
83                .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
84            idempotent_success_patterns: idempotent_success_patterns
85                .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
86        };
87
88        Self::new(config).map_err(to_pyvalue_err)
89    }
90
91    #[pyo3(name = "start")]
92    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
93        let broadcaster = self.clone_for_async();
94        pyo3_async_runtimes::tokio::future_into_py(py, async move {
95            broadcaster.start().await.map_err(to_pyvalue_err)
96        })
97    }
98
99    #[pyo3(name = "stop")]
100    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
101        let broadcaster = self.clone_for_async();
102        pyo3_async_runtimes::tokio::future_into_py(py, async move {
103            broadcaster.stop().await;
104            Ok(())
105        })
106    }
107
108    #[pyo3(name = "broadcast_cancel")]
109    fn py_broadcast_cancel<'py>(
110        &self,
111        py: Python<'py>,
112        instrument_id: InstrumentId,
113        client_order_id: Option<ClientOrderId>,
114        venue_order_id: Option<VenueOrderId>,
115    ) -> PyResult<Bound<'py, PyAny>> {
116        let broadcaster = self.clone_for_async();
117        pyo3_async_runtimes::tokio::future_into_py(py, async move {
118            let report = broadcaster
119                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
120                .await
121                .map_err(to_pyvalue_err)?;
122
123            Python::attach(|py| match report {
124                Some(r) => r.into_py_any(py),
125                None => Ok(py.None()),
126            })
127        })
128    }
129
130    #[pyo3(name = "broadcast_batch_cancel")]
131    fn py_broadcast_batch_cancel<'py>(
132        &self,
133        py: Python<'py>,
134        instrument_id: InstrumentId,
135        client_order_ids: Option<Vec<ClientOrderId>>,
136        venue_order_ids: Option<Vec<VenueOrderId>>,
137    ) -> PyResult<Bound<'py, PyAny>> {
138        let broadcaster = self.clone_for_async();
139        pyo3_async_runtimes::tokio::future_into_py(py, async move {
140            let reports = broadcaster
141                .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
142                .await
143                .map_err(to_pyvalue_err)?;
144
145            Python::attach(|py| {
146                let py_reports: PyResult<Vec<_>> = reports
147                    .into_iter()
148                    .map(|report| report.into_py_any(py))
149                    .collect();
150                let pylist = pyo3::types::PyList::new(py, py_reports?)
151                    .unwrap()
152                    .into_any()
153                    .unbind();
154                Ok(pylist)
155            })
156        })
157    }
158
159    #[pyo3(name = "broadcast_cancel_all")]
160    fn py_broadcast_cancel_all<'py>(
161        &self,
162        py: Python<'py>,
163        instrument_id: InstrumentId,
164        order_side: Option<nautilus_model::enums::OrderSide>,
165    ) -> PyResult<Bound<'py, PyAny>> {
166        let broadcaster = self.clone_for_async();
167        pyo3_async_runtimes::tokio::future_into_py(py, async move {
168            let reports = broadcaster
169                .broadcast_cancel_all(instrument_id, order_side)
170                .await
171                .map_err(to_pyvalue_err)?;
172
173            Python::attach(|py| {
174                let py_reports: PyResult<Vec<_>> = reports
175                    .into_iter()
176                    .map(|report| report.into_py_any(py))
177                    .collect();
178                let pylist = pyo3::types::PyList::new(py, py_reports?)
179                    .unwrap()
180                    .into_any()
181                    .unbind();
182                Ok(pylist)
183            })
184        })
185    }
186
187    #[pyo3(name = "get_metrics")]
188    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
189        let metrics = self.get_metrics();
190        let dict = PyDict::new(py);
191        dict.set_item("total_cancels", metrics.total_cancels)?;
192        dict.set_item("successful_cancels", metrics.successful_cancels)?;
193        dict.set_item("failed_cancels", metrics.failed_cancels)?;
194        dict.set_item("expected_rejects", metrics.expected_rejects)?;
195        dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
196        dict.set_item("healthy_clients", metrics.healthy_clients)?;
197        dict.set_item("total_clients", metrics.total_clients)?;
198        Ok(dict.into())
199    }
200
201    #[pyo3(name = "get_client_stats")]
202    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
203        let stats = self.get_client_stats();
204        let list = pyo3::types::PyList::empty(py);
205        for stat in stats {
206            let dict = PyDict::new(py);
207            dict.set_item("client_id", stat.client_id.to_string())?;
208            dict.set_item("healthy", stat.healthy)?;
209            dict.set_item("cancel_count", stat.cancel_count)?;
210            dict.set_item("error_count", stat.error_count)?;
211            list.append(dict)?;
212        }
213        Ok(list.into())
214    }
215
216    #[pyo3(name = "add_instrument")]
217    fn py_add_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
218        let inst_any = pyobject_to_instrument_any(py, instrument)?;
219        self.add_instrument(inst_any);
220        Ok(())
221    }
222}