1use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
21 identifiers::{ClientOrderId, InstrumentId, OrderListId},
22 python::instruments::pyobject_to_instrument_any,
23 types::{Price, Quantity},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
26
27use crate::{
28 broadcast::submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
29 common::enums::BitmexPegPriceType,
30};
31
32#[pymethods]
33impl SubmitBroadcaster {
34 #[new]
35 #[pyo3(signature = (
36 pool_size,
37 api_key=None,
38 api_secret=None,
39 base_url=None,
40 testnet=false,
41 timeout_secs=None,
42 max_retries=None,
43 retry_delay_ms=None,
44 retry_delay_max_ms=None,
45 recv_window_ms=None,
46 max_requests_per_second=None,
47 max_requests_per_minute=None,
48 health_check_interval_secs=30,
49 health_check_timeout_secs=5,
50 expected_reject_patterns=None
51 ))]
52 #[allow(clippy::too_many_arguments)]
53 fn py_new(
54 pool_size: usize,
55 api_key: Option<String>,
56 api_secret: Option<String>,
57 base_url: Option<String>,
58 testnet: bool,
59 timeout_secs: Option<u64>,
60 max_retries: Option<u32>,
61 retry_delay_ms: Option<u64>,
62 retry_delay_max_ms: Option<u64>,
63 recv_window_ms: Option<u64>,
64 max_requests_per_second: Option<u32>,
65 max_requests_per_minute: Option<u32>,
66 health_check_interval_secs: u64,
67 health_check_timeout_secs: u64,
68 expected_reject_patterns: Option<Vec<String>>,
69 ) -> PyResult<Self> {
70 let config = SubmitBroadcasterConfig {
71 pool_size,
72 api_key,
73 api_secret,
74 base_url,
75 testnet,
76 timeout_secs,
77 max_retries,
78 retry_delay_ms,
79 retry_delay_max_ms,
80 recv_window_ms,
81 max_requests_per_second,
82 max_requests_per_minute,
83 health_check_interval_secs,
84 health_check_timeout_secs,
85 expected_reject_patterns: expected_reject_patterns
86 .unwrap_or_else(|| SubmitBroadcasterConfig::default().expected_reject_patterns),
87 proxy_urls: vec![], };
89
90 Self::new(config).map_err(to_pyvalue_err)
91 }
92
93 #[pyo3(name = "start")]
94 fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
95 let broadcaster = self.clone_for_async();
96 pyo3_async_runtimes::tokio::future_into_py(py, async move {
97 broadcaster.start().await.map_err(to_pyvalue_err)
98 })
99 }
100
101 #[pyo3(name = "stop")]
102 fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
103 let broadcaster = self.clone_for_async();
104 pyo3_async_runtimes::tokio::future_into_py(py, async move {
105 broadcaster.stop().await;
106 Ok(())
107 })
108 }
109
110 #[pyo3(name = "broadcast_submit")]
111 #[pyo3(signature = (
112 instrument_id,
113 client_order_id,
114 order_side,
115 order_type,
116 quantity,
117 time_in_force,
118 price=None,
119 trigger_price=None,
120 trigger_type=None,
121 trailing_offset=None,
122 trailing_offset_type=None,
123 display_qty=None,
124 post_only=false,
125 reduce_only=false,
126 order_list_id=None,
127 contingency_type=None,
128 submit_tries=None,
129 peg_price_type=None,
130 peg_offset_value=None
131 ))]
132 #[allow(clippy::too_many_arguments)]
133 fn py_broadcast_submit<'py>(
134 &self,
135 py: Python<'py>,
136 instrument_id: InstrumentId,
137 client_order_id: ClientOrderId,
138 order_side: OrderSide,
139 order_type: OrderType,
140 quantity: Quantity,
141 time_in_force: TimeInForce,
142 price: Option<Price>,
143 trigger_price: Option<Price>,
144 trigger_type: Option<TriggerType>,
145 trailing_offset: Option<f64>,
146 trailing_offset_type: Option<TrailingOffsetType>,
147 display_qty: Option<Quantity>,
148 post_only: bool,
149 reduce_only: bool,
150 order_list_id: Option<OrderListId>,
151 contingency_type: Option<ContingencyType>,
152 submit_tries: Option<usize>,
153 peg_price_type: Option<String>,
154 peg_offset_value: Option<f64>,
155 ) -> PyResult<Bound<'py, PyAny>> {
156 let broadcaster = self.clone_for_async();
157
158 let peg_price_type: Option<BitmexPegPriceType> = peg_price_type
159 .map(|s| {
160 s.parse::<BitmexPegPriceType>()
161 .map_err(|_| to_pyvalue_err(format!("Invalid peg_price_type: {s}")))
162 })
163 .transpose()?;
164
165 pyo3_async_runtimes::tokio::future_into_py(py, async move {
166 let report = broadcaster
167 .broadcast_submit(
168 instrument_id,
169 client_order_id,
170 order_side,
171 order_type,
172 quantity,
173 time_in_force,
174 price,
175 trigger_price,
176 trigger_type,
177 trailing_offset,
178 trailing_offset_type,
179 display_qty,
180 post_only,
181 reduce_only,
182 order_list_id,
183 contingency_type,
184 submit_tries,
185 peg_price_type,
186 peg_offset_value,
187 )
188 .await
189 .map_err(to_pyvalue_err)?;
190
191 Python::attach(|py| report.into_py_any(py))
192 })
193 }
194
195 #[pyo3(name = "get_metrics")]
196 fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
197 let metrics = self.get_metrics();
198 let dict = PyDict::new(py);
199 dict.set_item("total_submits", metrics.total_submits)?;
200 dict.set_item("successful_submits", metrics.successful_submits)?;
201 dict.set_item("failed_submits", metrics.failed_submits)?;
202 dict.set_item("expected_rejects", metrics.expected_rejects)?;
203 dict.set_item("healthy_clients", metrics.healthy_clients)?;
204 dict.set_item("total_clients", metrics.total_clients)?;
205 Ok(dict.into())
206 }
207
208 #[pyo3(name = "get_client_stats")]
209 fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
210 let stats = self.get_client_stats();
211 let list = pyo3::types::PyList::empty(py);
212 for stat in stats {
213 let dict = PyDict::new(py);
214 dict.set_item("client_id", stat.client_id.clone())?;
215 dict.set_item("healthy", stat.healthy)?;
216 dict.set_item("submit_count", stat.submit_count)?;
217 dict.set_item("error_count", stat.error_count)?;
218 list.append(dict)?;
219 }
220 Ok(list.into())
221 }
222
223 #[pyo3(name = "cache_instrument")]
224 fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
225 let inst_any = pyobject_to_instrument_any(py, instrument)?;
226 self.cache_instrument(inst_any);
227 Ok(())
228 }
229}