nautilus_bitmex/python/
canceller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX cancel broadcaster.
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
21    python::instruments::pyobject_to_instrument_any,
22};
23use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
24
25use crate::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
26
27#[pymethods]
28impl CancelBroadcaster {
29    #[new]
30    #[pyo3(signature = (
31        pool_size,
32        api_key=None,
33        api_secret=None,
34        base_url=None,
35        testnet=false,
36        timeout_secs=None,
37        max_retries=None,
38        retry_delay_ms=None,
39        retry_delay_max_ms=None,
40        recv_window_ms=None,
41        max_requests_per_second=None,
42        max_requests_per_minute=None,
43        health_check_interval_secs=30,
44        health_check_timeout_secs=5,
45        expected_reject_patterns=None,
46        idempotent_success_patterns=None,
47        proxy_urls=None
48    ))]
49    #[allow(clippy::too_many_arguments)]
50    fn py_new(
51        pool_size: usize,
52        api_key: Option<String>,
53        api_secret: Option<String>,
54        base_url: Option<String>,
55        testnet: bool,
56        timeout_secs: Option<u64>,
57        max_retries: Option<u32>,
58        retry_delay_ms: Option<u64>,
59        retry_delay_max_ms: Option<u64>,
60        recv_window_ms: Option<u64>,
61        max_requests_per_second: Option<u32>,
62        max_requests_per_minute: Option<u32>,
63        health_check_interval_secs: u64,
64        health_check_timeout_secs: u64,
65        expected_reject_patterns: Option<Vec<String>>,
66        idempotent_success_patterns: Option<Vec<String>>,
67        proxy_urls: Option<Vec<Option<String>>>,
68    ) -> PyResult<Self> {
69        let config = CancelBroadcasterConfig {
70            pool_size,
71            api_key,
72            api_secret,
73            base_url,
74            testnet,
75            timeout_secs,
76            max_retries,
77            retry_delay_ms,
78            retry_delay_max_ms,
79            recv_window_ms,
80            max_requests_per_second,
81            max_requests_per_minute,
82            health_check_interval_secs,
83            health_check_timeout_secs,
84            expected_reject_patterns: expected_reject_patterns
85                .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
86            idempotent_success_patterns: idempotent_success_patterns
87                .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
88            proxy_urls: proxy_urls.unwrap_or_default(),
89        };
90
91        Self::new(config).map_err(to_pyvalue_err)
92    }
93
94    #[pyo3(name = "cache_instrument")]
95    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
96        let inst_any = pyobject_to_instrument_any(py, instrument)?;
97        self.cache_instrument(inst_any);
98        Ok(())
99    }
100
101    #[pyo3(name = "start")]
102    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
103        let broadcaster = self.clone_for_async();
104        pyo3_async_runtimes::tokio::future_into_py(py, async move {
105            broadcaster.start().await.map_err(to_pyvalue_err)
106        })
107    }
108
109    #[pyo3(name = "stop")]
110    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
111        let broadcaster = self.clone_for_async();
112        pyo3_async_runtimes::tokio::future_into_py(py, async move {
113            broadcaster.stop().await;
114            Ok(())
115        })
116    }
117
118    #[pyo3(name = "broadcast_cancel")]
119    fn py_broadcast_cancel<'py>(
120        &self,
121        py: Python<'py>,
122        instrument_id: InstrumentId,
123        client_order_id: Option<ClientOrderId>,
124        venue_order_id: Option<VenueOrderId>,
125    ) -> PyResult<Bound<'py, PyAny>> {
126        let broadcaster = self.clone_for_async();
127        pyo3_async_runtimes::tokio::future_into_py(py, async move {
128            let report = broadcaster
129                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
130                .await
131                .map_err(to_pyvalue_err)?;
132
133            Python::attach(|py| match report {
134                Some(r) => r.into_py_any(py),
135                None => Ok(py.None()),
136            })
137        })
138    }
139
140    #[pyo3(name = "broadcast_batch_cancel")]
141    fn py_broadcast_batch_cancel<'py>(
142        &self,
143        py: Python<'py>,
144        instrument_id: InstrumentId,
145        client_order_ids: Option<Vec<ClientOrderId>>,
146        venue_order_ids: Option<Vec<VenueOrderId>>,
147    ) -> PyResult<Bound<'py, PyAny>> {
148        let broadcaster = self.clone_for_async();
149        pyo3_async_runtimes::tokio::future_into_py(py, async move {
150            let reports = broadcaster
151                .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
152                .await
153                .map_err(to_pyvalue_err)?;
154
155            Python::attach(|py| {
156                let py_reports: PyResult<Vec<_>> = reports
157                    .into_iter()
158                    .map(|report| report.into_py_any(py))
159                    .collect();
160                let pylist = pyo3::types::PyList::new(py, py_reports?)
161                    .unwrap()
162                    .into_any()
163                    .unbind();
164                Ok(pylist)
165            })
166        })
167    }
168
169    #[pyo3(name = "broadcast_cancel_all")]
170    fn py_broadcast_cancel_all<'py>(
171        &self,
172        py: Python<'py>,
173        instrument_id: InstrumentId,
174        order_side: Option<nautilus_model::enums::OrderSide>,
175    ) -> PyResult<Bound<'py, PyAny>> {
176        let broadcaster = self.clone_for_async();
177        pyo3_async_runtimes::tokio::future_into_py(py, async move {
178            let reports = broadcaster
179                .broadcast_cancel_all(instrument_id, order_side)
180                .await
181                .map_err(to_pyvalue_err)?;
182
183            Python::attach(|py| {
184                let py_reports: PyResult<Vec<_>> = reports
185                    .into_iter()
186                    .map(|report| report.into_py_any(py))
187                    .collect();
188                let pylist = pyo3::types::PyList::new(py, py_reports?)
189                    .unwrap()
190                    .into_any()
191                    .unbind();
192                Ok(pylist)
193            })
194        })
195    }
196
197    #[pyo3(name = "get_metrics")]
198    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
199        let metrics = self.get_metrics();
200        let dict = PyDict::new(py);
201        dict.set_item("total_cancels", metrics.total_cancels)?;
202        dict.set_item("successful_cancels", metrics.successful_cancels)?;
203        dict.set_item("failed_cancels", metrics.failed_cancels)?;
204        dict.set_item("expected_rejects", metrics.expected_rejects)?;
205        dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
206        dict.set_item("healthy_clients", metrics.healthy_clients)?;
207        dict.set_item("total_clients", metrics.total_clients)?;
208        Ok(dict.into())
209    }
210
211    #[pyo3(name = "get_client_stats")]
212    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
213        let stats = self.get_client_stats();
214        let list = pyo3::types::PyList::empty(py);
215        for stat in stats {
216            let dict = PyDict::new(py);
217            dict.set_item("client_id", stat.client_id.clone())?;
218            dict.set_item("healthy", stat.healthy)?;
219            dict.set_item("cancel_count", stat.cancel_count)?;
220            dict.set_item("error_count", stat.error_count)?;
221            list.append(dict)?;
222        }
223        Ok(list.into())
224    }
225}