1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 enums::OrderSide,
21 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
22 python::instruments::pyobject_to_instrument_any,
23};
24use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
25
26use crate::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
27
28#[pymethods]
29impl CancelBroadcaster {
30 #[new]
31 #[pyo3(signature = (
32 pool_size,
33 api_key=None,
34 api_secret=None,
35 base_url=None,
36 testnet=false,
37 timeout_secs=None,
38 max_retries=None,
39 retry_delay_ms=None,
40 retry_delay_max_ms=None,
41 recv_window_ms=None,
42 max_requests_per_second=None,
43 max_requests_per_minute=None,
44 health_check_interval_secs=30,
45 health_check_timeout_secs=5,
46 expected_reject_patterns=None,
47 idempotent_success_patterns=None,
48 proxy_urls=None
49 ))]
50 #[allow(clippy::too_many_arguments)]
51 fn py_new(
52 pool_size: usize,
53 api_key: Option<String>,
54 api_secret: Option<String>,
55 base_url: Option<String>,
56 testnet: bool,
57 timeout_secs: Option<u64>,
58 max_retries: Option<u32>,
59 retry_delay_ms: Option<u64>,
60 retry_delay_max_ms: Option<u64>,
61 recv_window_ms: Option<u64>,
62 max_requests_per_second: Option<u32>,
63 max_requests_per_minute: Option<u32>,
64 health_check_interval_secs: u64,
65 health_check_timeout_secs: u64,
66 expected_reject_patterns: Option<Vec<String>>,
67 idempotent_success_patterns: Option<Vec<String>>,
68 proxy_urls: Option<Vec<Option<String>>>,
69 ) -> PyResult<Self> {
70 let config = CancelBroadcasterConfig {
71 pool_size,
72 api_key,
73 api_secret,
74 base_url,
75 testnet,
76 timeout_secs,
77 max_retries,
78 retry_delay_ms,
79 retry_delay_max_ms,
80 recv_window_ms,
81 max_requests_per_second,
82 max_requests_per_minute,
83 health_check_interval_secs,
84 health_check_timeout_secs,
85 expected_reject_patterns: expected_reject_patterns
86 .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
87 idempotent_success_patterns: idempotent_success_patterns
88 .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
89 proxy_urls: proxy_urls.unwrap_or_default(),
90 };
91
92 Self::new(config).map_err(to_pyvalue_err)
93 }
94
95 #[pyo3(name = "cache_instrument")]
96 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
97 let inst_any = pyobject_to_instrument_any(py, instrument)?;
98 self.cache_instrument(inst_any);
99 Ok(())
100 }
101
102 #[pyo3(name = "start")]
103 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
104 let broadcaster = self.clone_for_async();
105 pyo3_async_runtimes::tokio::future_into_py(py, async move {
106 broadcaster.start().await.map_err(to_pyvalue_err)
107 })
108 }
109
110 #[pyo3(name = "stop")]
111 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
112 let broadcaster = self.clone_for_async();
113 pyo3_async_runtimes::tokio::future_into_py(py, async move {
114 broadcaster.stop().await;
115 Ok(())
116 })
117 }
118
119 #[pyo3(name = "broadcast_cancel")]
120 fn py_broadcast_cancel<'py>(
121 &self,
122 py: Python<'py>,
123 instrument_id: InstrumentId,
124 client_order_id: Option<ClientOrderId>,
125 venue_order_id: Option<VenueOrderId>,
126 ) -> PyResult<Bound<'py, PyAny>> {
127 let broadcaster = self.clone_for_async();
128 pyo3_async_runtimes::tokio::future_into_py(py, async move {
129 let report = broadcaster
130 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
131 .await
132 .map_err(to_pyvalue_err)?;
133
134 Python::attach(|py| match report {
135 Some(r) => r.into_py_any(py),
136 None => Ok(py.None()),
137 })
138 })
139 }
140
141 #[pyo3(name = "broadcast_batch_cancel")]
142 fn py_broadcast_batch_cancel<'py>(
143 &self,
144 py: Python<'py>,
145 instrument_id: InstrumentId,
146 client_order_ids: Option<Vec<ClientOrderId>>,
147 venue_order_ids: Option<Vec<VenueOrderId>>,
148 ) -> PyResult<Bound<'py, PyAny>> {
149 let broadcaster = self.clone_for_async();
150 pyo3_async_runtimes::tokio::future_into_py(py, async move {
151 let reports = broadcaster
152 .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
153 .await
154 .map_err(to_pyvalue_err)?;
155
156 Python::attach(|py| {
157 let py_reports: PyResult<Vec<_>> = reports
158 .into_iter()
159 .map(|report| report.into_py_any(py))
160 .collect();
161 let pylist = pyo3::types::PyList::new(py, py_reports?)
162 .unwrap()
163 .into_any()
164 .unbind();
165 Ok(pylist)
166 })
167 })
168 }
169
170 #[pyo3(name = "broadcast_cancel_all")]
171 fn py_broadcast_cancel_all<'py>(
172 &self,
173 py: Python<'py>,
174 instrument_id: InstrumentId,
175 order_side: Option<OrderSide>,
176 ) -> PyResult<Bound<'py, PyAny>> {
177 let broadcaster = self.clone_for_async();
178 pyo3_async_runtimes::tokio::future_into_py(py, async move {
179 let reports = broadcaster
180 .broadcast_cancel_all(instrument_id, order_side)
181 .await
182 .map_err(to_pyvalue_err)?;
183
184 Python::attach(|py| {
185 let py_reports: PyResult<Vec<_>> = reports
186 .into_iter()
187 .map(|report| report.into_py_any(py))
188 .collect();
189 let pylist = pyo3::types::PyList::new(py, py_reports?)
190 .unwrap()
191 .into_any()
192 .unbind();
193 Ok(pylist)
194 })
195 })
196 }
197
198 #[pyo3(name = "get_metrics")]
199 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
200 let metrics = self.get_metrics();
201 let dict = PyDict::new(py);
202 dict.set_item("total_cancels", metrics.total_cancels)?;
203 dict.set_item("successful_cancels", metrics.successful_cancels)?;
204 dict.set_item("failed_cancels", metrics.failed_cancels)?;
205 dict.set_item("expected_rejects", metrics.expected_rejects)?;
206 dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
207 dict.set_item("healthy_clients", metrics.healthy_clients)?;
208 dict.set_item("total_clients", metrics.total_clients)?;
209 Ok(dict.into())
210 }
211
212 #[pyo3(name = "get_client_stats")]
213 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
214 let stats = self.get_client_stats();
215 let list = pyo3::types::PyList::empty(py);
216 for stat in stats {
217 let dict = PyDict::new(py);
218 dict.set_item("client_id", stat.client_id.clone())?;
219 dict.set_item("healthy", stat.healthy)?;
220 dict.set_item("cancel_count", stat.cancel_count)?;
221 dict.set_item("error_count", stat.error_count)?;
222 list.append(dict)?;
223 }
224 Ok(list.into())
225 }
226}