1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
21 python::instruments::pyobject_to_instrument_any,
22};
23use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
24
25use crate::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
26
27#[pymethods]
28impl CancelBroadcaster {
29 #[new]
30 #[pyo3(signature = (
31 pool_size,
32 api_key=None,
33 api_secret=None,
34 base_url=None,
35 testnet=false,
36 timeout_secs=None,
37 max_retries=None,
38 retry_delay_ms=None,
39 retry_delay_max_ms=None,
40 recv_window_ms=None,
41 max_requests_per_second=None,
42 max_requests_per_minute=None,
43 health_check_interval_secs=30,
44 health_check_timeout_secs=5,
45 expected_reject_patterns=None,
46 idempotent_success_patterns=None
47 ))]
48 #[allow(clippy::too_many_arguments)]
49 fn py_new(
50 pool_size: usize,
51 api_key: Option<String>,
52 api_secret: Option<String>,
53 base_url: Option<String>,
54 testnet: bool,
55 timeout_secs: Option<u64>,
56 max_retries: Option<u32>,
57 retry_delay_ms: Option<u64>,
58 retry_delay_max_ms: Option<u64>,
59 recv_window_ms: Option<u64>,
60 max_requests_per_second: Option<u32>,
61 max_requests_per_minute: Option<u32>,
62 health_check_interval_secs: u64,
63 health_check_timeout_secs: u64,
64 expected_reject_patterns: Option<Vec<String>>,
65 idempotent_success_patterns: Option<Vec<String>>,
66 ) -> PyResult<Self> {
67 let config = CancelBroadcasterConfig {
68 pool_size,
69 api_key,
70 api_secret,
71 base_url,
72 testnet,
73 timeout_secs,
74 max_retries,
75 retry_delay_ms,
76 retry_delay_max_ms,
77 recv_window_ms,
78 max_requests_per_second,
79 max_requests_per_minute,
80 health_check_interval_secs,
81 health_check_timeout_secs,
82 expected_reject_patterns: expected_reject_patterns
83 .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
84 idempotent_success_patterns: idempotent_success_patterns
85 .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
86 };
87
88 Self::new(config).map_err(to_pyvalue_err)
89 }
90
91 #[pyo3(name = "start")]
92 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
93 let broadcaster = self.clone_for_async();
94 pyo3_async_runtimes::tokio::future_into_py(py, async move {
95 broadcaster.start().await.map_err(to_pyvalue_err)
96 })
97 }
98
99 #[pyo3(name = "stop")]
100 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
101 let broadcaster = self.clone_for_async();
102 pyo3_async_runtimes::tokio::future_into_py(py, async move {
103 broadcaster.stop().await;
104 Ok(())
105 })
106 }
107
108 #[pyo3(name = "broadcast_cancel")]
109 fn py_broadcast_cancel<'py>(
110 &self,
111 py: Python<'py>,
112 instrument_id: InstrumentId,
113 client_order_id: Option<ClientOrderId>,
114 venue_order_id: Option<VenueOrderId>,
115 ) -> PyResult<Bound<'py, PyAny>> {
116 let broadcaster = self.clone_for_async();
117 pyo3_async_runtimes::tokio::future_into_py(py, async move {
118 let report = broadcaster
119 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
120 .await
121 .map_err(to_pyvalue_err)?;
122
123 Python::attach(|py| match report {
124 Some(r) => r.into_py_any(py),
125 None => Ok(py.None()),
126 })
127 })
128 }
129
130 #[pyo3(name = "broadcast_batch_cancel")]
131 fn py_broadcast_batch_cancel<'py>(
132 &self,
133 py: Python<'py>,
134 instrument_id: InstrumentId,
135 client_order_ids: Option<Vec<ClientOrderId>>,
136 venue_order_ids: Option<Vec<VenueOrderId>>,
137 ) -> PyResult<Bound<'py, PyAny>> {
138 let broadcaster = self.clone_for_async();
139 pyo3_async_runtimes::tokio::future_into_py(py, async move {
140 let reports = broadcaster
141 .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
142 .await
143 .map_err(to_pyvalue_err)?;
144
145 Python::attach(|py| {
146 let py_reports: PyResult<Vec<_>> = reports
147 .into_iter()
148 .map(|report| report.into_py_any(py))
149 .collect();
150 let pylist = pyo3::types::PyList::new(py, py_reports?)
151 .unwrap()
152 .into_any()
153 .unbind();
154 Ok(pylist)
155 })
156 })
157 }
158
159 #[pyo3(name = "broadcast_cancel_all")]
160 fn py_broadcast_cancel_all<'py>(
161 &self,
162 py: Python<'py>,
163 instrument_id: InstrumentId,
164 order_side: Option<nautilus_model::enums::OrderSide>,
165 ) -> PyResult<Bound<'py, PyAny>> {
166 let broadcaster = self.clone_for_async();
167 pyo3_async_runtimes::tokio::future_into_py(py, async move {
168 let reports = broadcaster
169 .broadcast_cancel_all(instrument_id, order_side)
170 .await
171 .map_err(to_pyvalue_err)?;
172
173 Python::attach(|py| {
174 let py_reports: PyResult<Vec<_>> = reports
175 .into_iter()
176 .map(|report| report.into_py_any(py))
177 .collect();
178 let pylist = pyo3::types::PyList::new(py, py_reports?)
179 .unwrap()
180 .into_any()
181 .unbind();
182 Ok(pylist)
183 })
184 })
185 }
186
187 #[pyo3(name = "get_metrics")]
188 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
189 let metrics = self.get_metrics();
190 let dict = PyDict::new(py);
191 dict.set_item("total_cancels", metrics.total_cancels)?;
192 dict.set_item("successful_cancels", metrics.successful_cancels)?;
193 dict.set_item("failed_cancels", metrics.failed_cancels)?;
194 dict.set_item("expected_rejects", metrics.expected_rejects)?;
195 dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
196 dict.set_item("healthy_clients", metrics.healthy_clients)?;
197 dict.set_item("total_clients", metrics.total_clients)?;
198 Ok(dict.into())
199 }
200
201 #[pyo3(name = "get_client_stats")]
202 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
203 let stats = self.get_client_stats();
204 let list = pyo3::types::PyList::empty(py);
205 for stat in stats {
206 let dict = PyDict::new(py);
207 dict.set_item("client_id", stat.client_id.to_string())?;
208 dict.set_item("healthy", stat.healthy)?;
209 dict.set_item("cancel_count", stat.cancel_count)?;
210 dict.set_item("error_count", stat.error_count)?;
211 list.append(dict)?;
212 }
213 Ok(list.into())
214 }
215
216 #[pyo3(name = "add_instrument")]
217 fn py_add_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
218 let inst_any = pyobject_to_instrument_any(py, instrument)?;
219 self.add_instrument(inst_any);
220 Ok(())
221 }
222}