1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
21 python::instruments::pyobject_to_instrument_any,
22};
23use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
24
25use crate::execution::canceller::{CancelBroadcaster, CancelBroadcasterConfig};
26
27#[pymethods]
28impl CancelBroadcaster {
29 #[new]
30 #[pyo3(signature = (
31 pool_size,
32 api_key=None,
33 api_secret=None,
34 base_url=None,
35 testnet=false,
36 timeout_secs=None,
37 max_retries=None,
38 retry_delay_ms=None,
39 retry_delay_max_ms=None,
40 recv_window_ms=None,
41 max_requests_per_second=None,
42 max_requests_per_minute=None,
43 health_check_interval_secs=30,
44 health_check_timeout_secs=5,
45 expected_reject_patterns=None,
46 idempotent_success_patterns=None,
47 proxy_urls=None
48 ))]
49 #[allow(clippy::too_many_arguments)]
50 fn py_new(
51 pool_size: usize,
52 api_key: Option<String>,
53 api_secret: Option<String>,
54 base_url: Option<String>,
55 testnet: bool,
56 timeout_secs: Option<u64>,
57 max_retries: Option<u32>,
58 retry_delay_ms: Option<u64>,
59 retry_delay_max_ms: Option<u64>,
60 recv_window_ms: Option<u64>,
61 max_requests_per_second: Option<u32>,
62 max_requests_per_minute: Option<u32>,
63 health_check_interval_secs: u64,
64 health_check_timeout_secs: u64,
65 expected_reject_patterns: Option<Vec<String>>,
66 idempotent_success_patterns: Option<Vec<String>>,
67 proxy_urls: Option<Vec<Option<String>>>,
68 ) -> PyResult<Self> {
69 let config = CancelBroadcasterConfig {
70 pool_size,
71 api_key,
72 api_secret,
73 base_url,
74 testnet,
75 timeout_secs,
76 max_retries,
77 retry_delay_ms,
78 retry_delay_max_ms,
79 recv_window_ms,
80 max_requests_per_second,
81 max_requests_per_minute,
82 health_check_interval_secs,
83 health_check_timeout_secs,
84 expected_reject_patterns: expected_reject_patterns
85 .unwrap_or_else(|| CancelBroadcasterConfig::default().expected_reject_patterns),
86 idempotent_success_patterns: idempotent_success_patterns
87 .unwrap_or_else(|| CancelBroadcasterConfig::default().idempotent_success_patterns),
88 proxy_urls: proxy_urls.unwrap_or_default(),
89 };
90
91 Self::new(config).map_err(to_pyvalue_err)
92 }
93
94 #[pyo3(name = "cache_instrument")]
95 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
96 let inst_any = pyobject_to_instrument_any(py, instrument)?;
97 self.cache_instrument(inst_any);
98 Ok(())
99 }
100
101 #[pyo3(name = "start")]
102 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
103 let broadcaster = self.clone_for_async();
104 pyo3_async_runtimes::tokio::future_into_py(py, async move {
105 broadcaster.start().await.map_err(to_pyvalue_err)
106 })
107 }
108
109 #[pyo3(name = "stop")]
110 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
111 let broadcaster = self.clone_for_async();
112 pyo3_async_runtimes::tokio::future_into_py(py, async move {
113 broadcaster.stop().await;
114 Ok(())
115 })
116 }
117
118 #[pyo3(name = "broadcast_cancel")]
119 fn py_broadcast_cancel<'py>(
120 &self,
121 py: Python<'py>,
122 instrument_id: InstrumentId,
123 client_order_id: Option<ClientOrderId>,
124 venue_order_id: Option<VenueOrderId>,
125 ) -> PyResult<Bound<'py, PyAny>> {
126 let broadcaster = self.clone_for_async();
127 pyo3_async_runtimes::tokio::future_into_py(py, async move {
128 let report = broadcaster
129 .broadcast_cancel(instrument_id, client_order_id, venue_order_id)
130 .await
131 .map_err(to_pyvalue_err)?;
132
133 Python::attach(|py| match report {
134 Some(r) => r.into_py_any(py),
135 None => Ok(py.None()),
136 })
137 })
138 }
139
140 #[pyo3(name = "broadcast_batch_cancel")]
141 fn py_broadcast_batch_cancel<'py>(
142 &self,
143 py: Python<'py>,
144 instrument_id: InstrumentId,
145 client_order_ids: Option<Vec<ClientOrderId>>,
146 venue_order_ids: Option<Vec<VenueOrderId>>,
147 ) -> PyResult<Bound<'py, PyAny>> {
148 let broadcaster = self.clone_for_async();
149 pyo3_async_runtimes::tokio::future_into_py(py, async move {
150 let reports = broadcaster
151 .broadcast_batch_cancel(instrument_id, client_order_ids, venue_order_ids)
152 .await
153 .map_err(to_pyvalue_err)?;
154
155 Python::attach(|py| {
156 let py_reports: PyResult<Vec<_>> = reports
157 .into_iter()
158 .map(|report| report.into_py_any(py))
159 .collect();
160 let pylist = pyo3::types::PyList::new(py, py_reports?)
161 .unwrap()
162 .into_any()
163 .unbind();
164 Ok(pylist)
165 })
166 })
167 }
168
169 #[pyo3(name = "broadcast_cancel_all")]
170 fn py_broadcast_cancel_all<'py>(
171 &self,
172 py: Python<'py>,
173 instrument_id: InstrumentId,
174 order_side: Option<nautilus_model::enums::OrderSide>,
175 ) -> PyResult<Bound<'py, PyAny>> {
176 let broadcaster = self.clone_for_async();
177 pyo3_async_runtimes::tokio::future_into_py(py, async move {
178 let reports = broadcaster
179 .broadcast_cancel_all(instrument_id, order_side)
180 .await
181 .map_err(to_pyvalue_err)?;
182
183 Python::attach(|py| {
184 let py_reports: PyResult<Vec<_>> = reports
185 .into_iter()
186 .map(|report| report.into_py_any(py))
187 .collect();
188 let pylist = pyo3::types::PyList::new(py, py_reports?)
189 .unwrap()
190 .into_any()
191 .unbind();
192 Ok(pylist)
193 })
194 })
195 }
196
197 #[pyo3(name = "get_metrics")]
198 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
199 let metrics = self.get_metrics();
200 let dict = PyDict::new(py);
201 dict.set_item("total_cancels", metrics.total_cancels)?;
202 dict.set_item("successful_cancels", metrics.successful_cancels)?;
203 dict.set_item("failed_cancels", metrics.failed_cancels)?;
204 dict.set_item("expected_rejects", metrics.expected_rejects)?;
205 dict.set_item("idempotent_successes", metrics.idempotent_successes)?;
206 dict.set_item("healthy_clients", metrics.healthy_clients)?;
207 dict.set_item("total_clients", metrics.total_clients)?;
208 Ok(dict.into())
209 }
210
211 #[pyo3(name = "get_client_stats")]
212 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
213 let stats = self.get_client_stats();
214 let list = pyo3::types::PyList::empty(py);
215 for stat in stats {
216 let dict = PyDict::new(py);
217 dict.set_item("client_id", stat.client_id.clone())?;
218 dict.set_item("healthy", stat.healthy)?;
219 dict.set_item("cancel_count", stat.cancel_count)?;
220 dict.set_item("error_count", stat.error_count)?;
221 list.append(dict)?;
222 }
223 Ok(list.into())
224 }
225}