nautilus_bitmex/python/
canceller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX cancel broadcaster.
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    enums::OrderSide,
21    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
22    python::instruments::pyobject_to_instrument_any,
23};
24use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
25
26use crate::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
27
28#[pymethods]
29impl CancelBroadcaster {
30    #[new]
31    #[pyo3(signature = (
32        pool_size,
33        api_key=None,
34        api_secret=None,
35        base_url=None,
36        testnet=false,
37        timeout_secs=None,
38        max_retries=None,
39        retry_delay_ms=None,
40        retry_delay_max_ms=None,
41        recv_window_ms=None,
42        max_requests_per_second=None,
43        max_requests_per_minute=None,
44        health_check_interval_secs=30,
45        health_check_timeout_secs=5,
46        expected_reject_patterns=None,
47        idempotent_success_patterns=None,
48        proxy_urls=None
49    ))]
50    #[allow(clippy::too_many_arguments)]
51    fn py_new(
52        pool_size: usize,
53        api_key: Option<String>,
54        api_secret: Option<String>,
55        base_url: Option<String>,
56        testnet: bool,
57        timeout_secs: Option<u64>,
58        max_retries: Option<u32>,
59        retry_delay_ms: Option<u64>,
60        retry_delay_max_ms: Option<u64>,
61        recv_window_ms: Option<u64>,
62        max_requests_per_second: Option<u32>,
63        max_requests_per_minute: Option<u32>,
64        health_check_interval_secs: u64,
65        health_check_timeout_secs: u64,
66        expected_reject_patterns: Option<Vec<String>>,
67        idempotent_success_patterns: Option<Vec<String>>,
68        proxy_urls: Option<Vec<Option<String>>>,
69    ) -> PyResult<Self> {
70        let config = CancelBroadcasterConfig {
71            pool_size,
72            api_key,
73            api_secret,
74            base_url,
75            testnet,
76            timeout_secs,
77            max_retries,
78            retry_delay_ms,
79            retry_delay_max_ms,
80            recv_window_ms,
81            max_requests_per_second,
82            max_requests_per_minute,
83            health_check_interval_secs,
84            health_check_timeout_secs,
85            expected_reject_patterns: expected_reject_patterns
86                .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
87            idempotent_success_patterns: idempotent_success_patterns
88                .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
89            proxy_urls: proxy_urls.unwrap_or_default(),
90        };
91
92        Self::new(config).map_err(to_pyvalue_err)
93    }
94
95    #[pyo3(name = "cache_instrument")]
96    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
97        let inst_any = pyobject_to_instrument_any(py, instrument)?;
98        self.cache_instrument(inst_any);
99        Ok(())
100    }
101
102    #[pyo3(name = "start")]
103    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
104        let broadcaster = self.clone_for_async();
105        pyo3_async_runtimes::tokio::future_into_py(py, async move {
106            broadcaster.start().await.map_err(to_pyvalue_err)
107        })
108    }
109
110    #[pyo3(name = "stop")]
111    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
112        let broadcaster = self.clone_for_async();
113        pyo3_async_runtimes::tokio::future_into_py(py, async move {
114            broadcaster.stop().await;
115            Ok(())
116        })
117    }
118
119    #[pyo3(name = "broadcast_cancel")]
120    fn py_broadcast_cancel<'py>(
121        &self,
122        py: Python<'py>,
123        instrument_id: InstrumentId,
124        client_order_id: Option<ClientOrderId>,
125        venue_order_id: Option<VenueOrderId>,
126    ) -> PyResult<Bound<'py, PyAny>> {
127        let broadcaster = self.clone_for_async();
128        pyo3_async_runtimes::tokio::future_into_py(py, async move {
129            let report = broadcaster
130                .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
131                .await
132                .map_err(to_pyvalue_err)?;
133
134            Python::attach(|py| match report {
135                Some(r) => r.into_py_any(py),
136                None => Ok(py.None()),
137            })
138        })
139    }
140
141    #[pyo3(name = "broadcast_batch_cancel")]
142    fn py_broadcast_batch_cancel<'py>(
143        &self,
144        py: Python<'py>,
145        instrument_id: InstrumentId,
146        client_order_ids: Option<Vec<ClientOrderId>>,
147        venue_order_ids: Option<Vec<VenueOrderId>>,
148    ) -> PyResult<Bound<'py, PyAny>> {
149        let broadcaster = self.clone_for_async();
150        pyo3_async_runtimes::tokio::future_into_py(py, async move {
151            let reports = broadcaster
152                .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
153                .await
154                .map_err(to_pyvalue_err)?;
155
156            Python::attach(|py| {
157                let py_reports: PyResult<Vec<_>> = reports
158                    .into_iter()
159                    .map(|report| report.into_py_any(py))
160                    .collect();
161                let pylist = pyo3::types::PyList::new(py, py_reports?)
162                    .unwrap()
163                    .into_any()
164                    .unbind();
165                Ok(pylist)
166            })
167        })
168    }
169
170    #[pyo3(name = "broadcast_cancel_all")]
171    fn py_broadcast_cancel_all<'py>(
172        &self,
173        py: Python<'py>,
174        instrument_id: InstrumentId,
175        order_side: Option<OrderSide>,
176    ) -> PyResult<Bound<'py, PyAny>> {
177        let broadcaster = self.clone_for_async();
178        pyo3_async_runtimes::tokio::future_into_py(py, async move {
179            let reports = broadcaster
180                .broadcast_cancel_all(instrument_id, order_side)
181                .await
182                .map_err(to_pyvalue_err)?;
183
184            Python::attach(|py| {
185                let py_reports: PyResult<Vec<_>> = reports
186                    .into_iter()
187                    .map(|report| report.into_py_any(py))
188                    .collect();
189                let pylist = pyo3::types::PyList::new(py, py_reports?)
190                    .unwrap()
191                    .into_any()
192                    .unbind();
193                Ok(pylist)
194            })
195        })
196    }
197
198    #[pyo3(name = "get_metrics")]
199    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
200        let metrics = self.get_metrics();
201        let dict = PyDict::new(py);
202        dict.set_item("total_cancels", metrics.total_cancels)?;
203        dict.set_item("successful_cancels", metrics.successful_cancels)?;
204        dict.set_item("failed_cancels", metrics.failed_cancels)?;
205        dict.set_item("expected_rejects", metrics.expected_rejects)?;
206        dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
207        dict.set_item("healthy_clients", metrics.healthy_clients)?;
208        dict.set_item("total_clients", metrics.total_clients)?;
209        Ok(dict.into())
210    }
211
212    #[pyo3(name = "get_client_stats")]
213    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
214        let stats = self.get_client_stats();
215        let list = pyo3::types::PyList::empty(py);
216        for stat in stats {
217            let dict = PyDict::new(py);
218            dict.set_item("client_id", stat.client_id.clone())?;
219            dict.set_item("healthy", stat.healthy)?;
220            dict.set_item("cancel_count", stat.cancel_count)?;
221            dict.set_item("error_count", stat.error_count)?;
222            list.append(dict)?;
223        }
224        Ok(list.into())
225    }
226}