Skip to main content

nautilus_bitmex/python/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the BitMEX submit broadcaster.
17
18use nautilus_core::python::to_pyvalue_err;
19use nautilus_model::{
20    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
21    identifiers::{ClientOrderId, InstrumentId, OrderListId},
22    python::instruments::pyobject_to_instrument_any,
23    types::{Price, Quantity},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*, types::PyDict};
26
27use crate::{
28    broadcast::submitter::{SubmitBroadcaster, SubmitBroadcasterConfig},
29    common::enums::BitmexPegPriceType,
30};
31
32#[pymethods]
33impl SubmitBroadcaster {
34    #[new]
35    #[pyo3(signature = (
36        pool_size,
37        api_key=None,
38        api_secret=None,
39        base_url=None,
40        testnet=false,
41        timeout_secs=None,
42        max_retries=None,
43        retry_delay_ms=None,
44        retry_delay_max_ms=None,
45        recv_window_ms=None,
46        max_requests_per_second=None,
47        max_requests_per_minute=None,
48        health_check_interval_secs=30,
49        health_check_timeout_secs=5,
50        expected_reject_patterns=None
51    ))]
52    #[allow(clippy::too_many_arguments)]
53    fn py_new(
54        pool_size: usize,
55        api_key: Option<String>,
56        api_secret: Option<String>,
57        base_url: Option<String>,
58        testnet: bool,
59        timeout_secs: Option<u64>,
60        max_retries: Option<u32>,
61        retry_delay_ms: Option<u64>,
62        retry_delay_max_ms: Option<u64>,
63        recv_window_ms: Option<u64>,
64        max_requests_per_second: Option<u32>,
65        max_requests_per_minute: Option<u32>,
66        health_check_interval_secs: u64,
67        health_check_timeout_secs: u64,
68        expected_reject_patterns: Option<Vec<String>>,
69    ) -> PyResult<Self> {
70        let config = SubmitBroadcasterConfig {
71            pool_size,
72            api_key,
73            api_secret,
74            base_url,
75            testnet,
76            timeout_secs,
77            max_retries,
78            retry_delay_ms,
79            retry_delay_max_ms,
80            recv_window_ms,
81            max_requests_per_second,
82            max_requests_per_minute,
83            health_check_interval_secs,
84            health_check_timeout_secs,
85            expected_reject_patterns: expected_reject_patterns
86                .unwrap_or_else(|| SubmitBroadcasterConfig::default().expected_reject_patterns),
87            proxy_urls: vec![], // TODO: Add proxy_urls parameter to Python API when needed
88        };
89
90        Self::new(config).map_err(to_pyvalue_err)
91    }
92
93    #[pyo3(name = "start")]
94    fn py_start<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
95        let broadcaster = self.clone_for_async();
96        pyo3_async_runtimes::tokio::future_into_py(py, async move {
97            broadcaster.start().await.map_err(to_pyvalue_err)
98        })
99    }
100
101    #[pyo3(name = "stop")]
102    fn py_stop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
103        let broadcaster = self.clone_for_async();
104        pyo3_async_runtimes::tokio::future_into_py(py, async move {
105            broadcaster.stop().await;
106            Ok(())
107        })
108    }
109
110    #[pyo3(name = "broadcast_submit")]
111    #[pyo3(signature = (
112        instrument_id,
113        client_order_id,
114        order_side,
115        order_type,
116        quantity,
117        time_in_force,
118        price=None,
119        trigger_price=None,
120        trigger_type=None,
121        trailing_offset=None,
122        trailing_offset_type=None,
123        display_qty=None,
124        post_only=false,
125        reduce_only=false,
126        order_list_id=None,
127        contingency_type=None,
128        submit_tries=None,
129        peg_price_type=None,
130        peg_offset_value=None
131    ))]
132    #[allow(clippy::too_many_arguments)]
133    fn py_broadcast_submit<'py>(
134        &self,
135        py: Python<'py>,
136        instrument_id: InstrumentId,
137        client_order_id: ClientOrderId,
138        order_side: OrderSide,
139        order_type: OrderType,
140        quantity: Quantity,
141        time_in_force: TimeInForce,
142        price: Option<Price>,
143        trigger_price: Option<Price>,
144        trigger_type: Option<TriggerType>,
145        trailing_offset: Option<f64>,
146        trailing_offset_type: Option<TrailingOffsetType>,
147        display_qty: Option<Quantity>,
148        post_only: bool,
149        reduce_only: bool,
150        order_list_id: Option<OrderListId>,
151        contingency_type: Option<ContingencyType>,
152        submit_tries: Option<usize>,
153        peg_price_type: Option<String>,
154        peg_offset_value: Option<f64>,
155    ) -> PyResult<Bound<'py, PyAny>> {
156        let broadcaster = self.clone_for_async();
157
158        let peg_price_type: Option<BitmexPegPriceType> = peg_price_type
159            .map(|s| {
160                s.parse::<BitmexPegPriceType>()
161                    .map_err(|_| to_pyvalue_err(format!("Invalid peg_price_type: {s}")))
162            })
163            .transpose()?;
164
165        pyo3_async_runtimes::tokio::future_into_py(py, async move {
166            let report = broadcaster
167                .broadcast_submit(
168                    instrument_id,
169                    client_order_id,
170                    order_side,
171                    order_type,
172                    quantity,
173                    time_in_force,
174                    price,
175                    trigger_price,
176                    trigger_type,
177                    trailing_offset,
178                    trailing_offset_type,
179                    display_qty,
180                    post_only,
181                    reduce_only,
182                    order_list_id,
183                    contingency_type,
184                    submit_tries,
185                    peg_price_type,
186                    peg_offset_value,
187                )
188                .await
189                .map_err(to_pyvalue_err)?;
190
191            Python::attach(|py| report.into_py_any(py))
192        })
193    }
194
195    #[pyo3(name = "get_metrics")]
196    fn py_get_metrics(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
197        let metrics = self.get_metrics();
198        let dict = PyDict::new(py);
199        dict.set_item("total_submits", metrics.total_submits)?;
200        dict.set_item("successful_submits", metrics.successful_submits)?;
201        dict.set_item("failed_submits", metrics.failed_submits)?;
202        dict.set_item("expected_rejects", metrics.expected_rejects)?;
203        dict.set_item("healthy_clients", metrics.healthy_clients)?;
204        dict.set_item("total_clients", metrics.total_clients)?;
205        Ok(dict.into())
206    }
207
208    #[pyo3(name = "get_client_stats")]
209    fn py_get_client_stats(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
210        let stats = self.get_client_stats();
211        let list = pyo3::types::PyList::empty(py);
212        for stat in stats {
213            let dict = PyDict::new(py);
214            dict.set_item("client_id", stat.client_id.clone())?;
215            dict.set_item("healthy", stat.healthy)?;
216            dict.set_item("submit_count", stat.submit_count)?;
217            dict.set_item("error_count", stat.error_count)?;
218            list.append(dict)?;
219        }
220        Ok(list.into())
221    }
222
223    #[pyo3(name = "cache_instrument")]
224    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
225        let inst_any = pyobject_to_instrument_any(py, instrument)?;
226        self.cache_instrument(inst_any);
227        Ok(())
228    }
229}