1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TriggerType},
21 identifiers::{ClientOrderId, InstrumentId, OrderListId},
22 python::instruments::pyobject_to_instrument_any,
23 types::{Price, Quantity},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
26
27use crate::execution::submitter::{SubmitBroadcaster, SubmitBroadcasterConfig};
28
29#[pymethods]
30impl SubmitBroadcaster {
31 #[new]
32 #[pyo3(signature = (
33 pool_size,
34 api_key=None,
35 api_secret=None,
36 base_url=None,
37 testnet=false,
38 timeout_secs=None,
39 max_retries=None,
40 retry_delay_ms=None,
41 retry_delay_max_ms=None,
42 recv_window_ms=None,
43 max_requests_per_second=None,
44 max_requests_per_minute=None,
45 health_check_interval_secs=30,
46 health_check_timeout_secs=5,
47 expected_reject_patterns=None
48 ))]
49 #[allow(clippy::too_many_arguments)]
50 fn py_new(
51 pool_size: usize,
52 api_key: Option<String>,
53 api_secret: Option<String>,
54 base_url: Option<String>,
55 testnet: bool,
56 timeout_secs: Option<u64>,
57 max_retries: Option<u32>,
58 retry_delay_ms: Option<u64>,
59 retry_delay_max_ms: Option<u64>,
60 recv_window_ms: Option<u64>,
61 max_requests_per_second: Option<u32>,
62 max_requests_per_minute: Option<u32>,
63 health_check_interval_secs: u64,
64 health_check_timeout_secs: u64,
65 expected_reject_patterns: Option<Vec<String>>,
66 ) -> PyResult<Self> {
67 let config = SubmitBroadcasterConfig {
68 pool_size,
69 api_key,
70 api_secret,
71 base_url,
72 testnet,
73 timeout_secs,
74 max_retries,
75 retry_delay_ms,
76 retry_delay_max_ms,
77 recv_window_ms,
78 max_requests_per_second,
79 max_requests_per_minute,
80 health_check_interval_secs,
81 health_check_timeout_secs,
82 expected_reject_patterns: expected_reject_patterns
83 .unwrap_or_else(|| SubmitBroadcasterConfig::default().expected_reject_patterns),
84 proxy_urls: vec![], };
86
87 Self::new(config).map_err(to_pyvalue_err)
88 }
89
90 #[pyo3(name = "start")]
91 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
92 let broadcaster = self.clone_for_async();
93 pyo3_async_runtimes::tokio::future_into_py(py, async move {
94 broadcaster.start().await.map_err(to_pyvalue_err)
95 })
96 }
97
98 #[pyo3(name = "stop")]
99 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
100 let broadcaster = self.clone_for_async();
101 pyo3_async_runtimes::tokio::future_into_py(py, async move {
102 broadcaster.stop().await;
103 Ok(())
104 })
105 }
106
107 #[pyo3(name = "broadcast_submit")]
108 #[pyo3(signature = (
109 instrument_id,
110 client_order_id,
111 order_side,
112 order_type,
113 quantity,
114 time_in_force,
115 price=None,
116 trigger_price=None,
117 trigger_type=None,
118 display_qty=None,
119 post_only=false,
120 reduce_only=false,
121 order_list_id=None,
122 contingency_type=None,
123 submit_tries=None
124 ))]
125 #[allow(clippy::too_many_arguments)]
126 fn py_broadcast_submit<'py>(
127 &self,
128 py: Python<'py>,
129 instrument_id: InstrumentId,
130 client_order_id: ClientOrderId,
131 order_side: OrderSide,
132 order_type: OrderType,
133 quantity: Quantity,
134 time_in_force: TimeInForce,
135 price: Option<Price>,
136 trigger_price: Option<Price>,
137 trigger_type: Option<TriggerType>,
138 display_qty: Option<Quantity>,
139 post_only: bool,
140 reduce_only: bool,
141 order_list_id: Option<OrderListId>,
142 contingency_type: Option<ContingencyType>,
143 submit_tries: Option<usize>,
144 ) -> PyResult<Bound<'py, PyAny>> {
145 let broadcaster = self.clone_for_async();
146 pyo3_async_runtimes::tokio::future_into_py(py, async move {
147 let report = broadcaster
148 .broadcast_submit(
149 instrument_id,
150 client_order_id,
151 order_side,
152 order_type,
153 quantity,
154 time_in_force,
155 price,
156 trigger_price,
157 trigger_type,
158 display_qty,
159 post_only,
160 reduce_only,
161 order_list_id,
162 contingency_type,
163 submit_tries,
164 )
165 .await
166 .map_err(to_pyvalue_err)?;
167
168 Python::attach(|py| report.into_py_any(py))
169 })
170 }
171
172 #[pyo3(name = "get_metrics")]
173 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
174 let metrics = self.get_metrics();
175 let dict = PyDict::new(py);
176 dict.set_item("total_submits", metrics.total_submits)?;
177 dict.set_item("successful_submits", metrics.successful_submits)?;
178 dict.set_item("failed_submits", metrics.failed_submits)?;
179 dict.set_item("expected_rejects", metrics.expected_rejects)?;
180 dict.set_item("healthy_clients", metrics.healthy_clients)?;
181 dict.set_item("total_clients", metrics.total_clients)?;
182 Ok(dict.into())
183 }
184
185 #[pyo3(name = "get_client_stats")]
186 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
187 let stats = self.get_client_stats();
188 let list = pyo3::types::PyList::empty(py);
189 for stat in stats {
190 let dict = PyDict::new(py);
191 dict.set_item("client_id", stat.client_id.clone())?;
192 dict.set_item("healthy", stat.healthy)?;
193 dict.set_item("submit_count", stat.submit_count)?;
194 dict.set_item("error_count", stat.error_count)?;
195 list.append(dict)?;
196 }
197 Ok(list.into())
198 }
199
200 #[pyo3(name = "cache_instrument")]
201 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
202 let inst_any = pyobject_to_instrument_any(py, instrument)?;
203 self.cache_instrument(inst_any);
204 Ok(())
205 }
206}