Skip to main content

nautilus_architect_ax/python/
websocket.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for Ax WebSocket clients.
17
18use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22    data::{BarType, Data, OrderBookDeltas_API},
23    enums::{OrderSide, OrderType, TimeInForce},
24    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
25    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
26    types::{Price, Quantity},
27};
28use pyo3::{IntoPyObjectExt, prelude::*};
29
30use crate::{
31    common::enums::{AxCandleWidth, AxMarketDataLevel},
32    websocket::{
33        data::AxMdWebSocketClient,
34        messages::{AxOrdersWsMessage, NautilusDataWsMessage, NautilusExecWsMessage},
35        orders::AxOrdersWebSocketClient,
36    },
37};
38
39#[pymethods]
40impl AxMdWebSocketClient {
41    #[new]
42    #[pyo3(signature = (url, auth_token, heartbeat=None))]
43    fn py_new(url: String, auth_token: String, heartbeat: Option<u64>) -> Self {
44        Self::new(url, auth_token, heartbeat)
45    }
46
47    #[staticmethod]
48    #[pyo3(name = "without_auth")]
49    #[pyo3(signature = (url, heartbeat=None))]
50    fn py_without_auth(url: String, heartbeat: Option<u64>) -> Self {
51        Self::without_auth(url, heartbeat)
52    }
53
54    #[getter]
55    #[pyo3(name = "url")]
56    #[must_use]
57    pub fn py_url(&self) -> &str {
58        self.url()
59    }
60
61    #[pyo3(name = "is_active")]
62    #[must_use]
63    pub fn py_is_active(&self) -> bool {
64        self.is_active()
65    }
66
67    #[pyo3(name = "is_closed")]
68    #[must_use]
69    pub fn py_is_closed(&self) -> bool {
70        self.is_closed()
71    }
72
73    #[pyo3(name = "subscription_count")]
74    #[must_use]
75    pub fn py_subscription_count(&self) -> usize {
76        self.subscription_count()
77    }
78
79    #[pyo3(name = "set_auth_token")]
80    fn py_set_auth_token(&mut self, token: String) {
81        self.set_auth_token(token);
82    }
83
84    #[pyo3(name = "cache_instrument")]
85    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
86        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
87        Ok(())
88    }
89
90    #[pyo3(name = "connect")]
91    fn py_connect<'py>(
92        &mut self,
93        py: Python<'py>,
94        callback: Py<PyAny>,
95    ) -> PyResult<Bound<'py, PyAny>> {
96        let mut client = self.clone();
97
98        pyo3_async_runtimes::tokio::future_into_py(py, async move {
99            client.connect().await.map_err(to_pyruntime_err)?;
100
101            let stream = client.stream();
102
103            get_runtime().spawn(async move {
104                tokio::pin!(stream);
105
106                while let Some(msg) = stream.next().await {
107                    match msg {
108                        NautilusDataWsMessage::Data(data_vec) => {
109                            Python::attach(|py| {
110                                for data in data_vec {
111                                    let py_obj = data_to_pycapsule(py, data);
112                                    call_python(py, &callback, py_obj);
113                                }
114                            });
115                        }
116                        NautilusDataWsMessage::Deltas(deltas) => {
117                            Python::attach(|py| {
118                                let py_obj = data_to_pycapsule(
119                                    py,
120                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
121                                );
122                                call_python(py, &callback, py_obj);
123                            });
124                        }
125                        NautilusDataWsMessage::Bar(bar) => {
126                            Python::attach(|py| {
127                                let py_obj = data_to_pycapsule(py, Data::Bar(bar));
128                                call_python(py, &callback, py_obj);
129                            });
130                        }
131                        NautilusDataWsMessage::Heartbeat => {
132                            // Heartbeats are handled internally, no need to forward
133                        }
134                        NautilusDataWsMessage::Error(err) => {
135                            log::error!("AX WebSocket error: {err:?}");
136                        }
137                        NautilusDataWsMessage::Reconnected => {
138                            log::info!("AX WebSocket reconnected");
139                        }
140                    }
141                }
142            });
143
144            Ok(())
145        })
146    }
147
148    #[pyo3(name = "subscribe_book_deltas")]
149    fn py_subscribe_book_deltas<'py>(
150        &self,
151        py: Python<'py>,
152        instrument_id: InstrumentId,
153        level: AxMarketDataLevel,
154    ) -> PyResult<Bound<'py, PyAny>> {
155        let client = self.clone();
156        let symbol = instrument_id.symbol.to_string();
157
158        pyo3_async_runtimes::tokio::future_into_py(py, async move {
159            client
160                .subscribe_book_deltas(&symbol, level)
161                .await
162                .map_err(to_pyruntime_err)
163        })
164    }
165
166    #[pyo3(name = "subscribe_quotes")]
167    fn py_subscribe_quotes<'py>(
168        &self,
169        py: Python<'py>,
170        instrument_id: InstrumentId,
171    ) -> PyResult<Bound<'py, PyAny>> {
172        let client = self.clone();
173        let symbol = instrument_id.symbol.to_string();
174
175        pyo3_async_runtimes::tokio::future_into_py(py, async move {
176            client
177                .subscribe_quotes(&symbol)
178                .await
179                .map_err(to_pyruntime_err)
180        })
181    }
182
183    #[pyo3(name = "subscribe_trades")]
184    fn py_subscribe_trades<'py>(
185        &self,
186        py: Python<'py>,
187        instrument_id: InstrumentId,
188    ) -> PyResult<Bound<'py, PyAny>> {
189        let client = self.clone();
190        let symbol = instrument_id.symbol.to_string();
191
192        pyo3_async_runtimes::tokio::future_into_py(py, async move {
193            client
194                .subscribe_trades(&symbol)
195                .await
196                .map_err(to_pyruntime_err)
197        })
198    }
199
200    #[pyo3(name = "unsubscribe_book_deltas")]
201    fn py_unsubscribe_book_deltas<'py>(
202        &self,
203        py: Python<'py>,
204        instrument_id: InstrumentId,
205    ) -> PyResult<Bound<'py, PyAny>> {
206        let client = self.clone();
207        let symbol = instrument_id.symbol.to_string();
208
209        pyo3_async_runtimes::tokio::future_into_py(py, async move {
210            client
211                .unsubscribe_book_deltas(&symbol)
212                .await
213                .map_err(to_pyruntime_err)
214        })
215    }
216
217    #[pyo3(name = "subscribe_bars")]
218    fn py_subscribe_bars<'py>(
219        &self,
220        py: Python<'py>,
221        bar_type: BarType,
222    ) -> PyResult<Bound<'py, PyAny>> {
223        let client = self.clone();
224        let symbol = bar_type.instrument_id().symbol.to_string();
225        let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
226
227        pyo3_async_runtimes::tokio::future_into_py(py, async move {
228            client
229                .subscribe_candles(&symbol, width)
230                .await
231                .map_err(to_pyruntime_err)
232        })
233    }
234
235    #[pyo3(name = "unsubscribe_quotes")]
236    fn py_unsubscribe_quotes<'py>(
237        &self,
238        py: Python<'py>,
239        instrument_id: InstrumentId,
240    ) -> PyResult<Bound<'py, PyAny>> {
241        let client = self.clone();
242        let symbol = instrument_id.symbol.to_string();
243
244        pyo3_async_runtimes::tokio::future_into_py(py, async move {
245            client
246                .unsubscribe_quotes(&symbol)
247                .await
248                .map_err(to_pyruntime_err)
249        })
250    }
251
252    #[pyo3(name = "unsubscribe_trades")]
253    fn py_unsubscribe_trades<'py>(
254        &self,
255        py: Python<'py>,
256        instrument_id: InstrumentId,
257    ) -> PyResult<Bound<'py, PyAny>> {
258        let client = self.clone();
259        let symbol = instrument_id.symbol.to_string();
260
261        pyo3_async_runtimes::tokio::future_into_py(py, async move {
262            client
263                .unsubscribe_trades(&symbol)
264                .await
265                .map_err(to_pyruntime_err)
266        })
267    }
268
269    #[pyo3(name = "unsubscribe_bars")]
270    fn py_unsubscribe_bars<'py>(
271        &self,
272        py: Python<'py>,
273        bar_type: BarType,
274    ) -> PyResult<Bound<'py, PyAny>> {
275        let client = self.clone();
276        let symbol = bar_type.instrument_id().symbol.to_string();
277        let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
278
279        pyo3_async_runtimes::tokio::future_into_py(py, async move {
280            client
281                .unsubscribe_candles(&symbol, width)
282                .await
283                .map_err(to_pyruntime_err)
284        })
285    }
286
287    #[pyo3(name = "disconnect")]
288    fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
289        let client = self.clone();
290
291        pyo3_async_runtimes::tokio::future_into_py(py, async move {
292            client.disconnect().await;
293            Ok(())
294        })
295    }
296
297    #[pyo3(name = "close")]
298    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
299        let mut client = self.clone();
300
301        pyo3_async_runtimes::tokio::future_into_py(py, async move {
302            client.close().await;
303            Ok(())
304        })
305    }
306}
307
308#[pymethods]
309impl AxOrdersWebSocketClient {
310    #[new]
311    #[pyo3(signature = (url, account_id, trader_id, heartbeat=None))]
312    fn py_new(
313        url: String,
314        account_id: AccountId,
315        trader_id: TraderId,
316        heartbeat: Option<u64>,
317    ) -> Self {
318        Self::new(url, account_id, trader_id, heartbeat)
319    }
320
321    #[getter]
322    #[pyo3(name = "url")]
323    #[must_use]
324    pub fn py_url(&self) -> &str {
325        self.url()
326    }
327
328    #[getter]
329    #[pyo3(name = "account_id")]
330    #[must_use]
331    pub fn py_account_id(&self) -> AccountId {
332        self.account_id()
333    }
334
335    #[pyo3(name = "is_active")]
336    #[must_use]
337    pub fn py_is_active(&self) -> bool {
338        self.is_active()
339    }
340
341    #[pyo3(name = "is_closed")]
342    #[must_use]
343    pub fn py_is_closed(&self) -> bool {
344        self.is_closed()
345    }
346
347    #[pyo3(name = "cache_instrument")]
348    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
349        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
350        Ok(())
351    }
352
353    #[pyo3(name = "register_external_order")]
354    fn py_register_external_order(
355        &self,
356        client_order_id: ClientOrderId,
357        venue_order_id: VenueOrderId,
358        instrument_id: InstrumentId,
359        strategy_id: StrategyId,
360    ) -> bool {
361        self.register_external_order(client_order_id, venue_order_id, instrument_id, strategy_id)
362    }
363
364    #[pyo3(name = "connect")]
365    fn py_connect<'py>(
366        &mut self,
367        py: Python<'py>,
368        callback: Py<PyAny>,
369        bearer_token: String,
370    ) -> PyResult<Bound<'py, PyAny>> {
371        let mut client = self.clone();
372
373        pyo3_async_runtimes::tokio::future_into_py(py, async move {
374            client
375                .connect(&bearer_token)
376                .await
377                .map_err(to_pyruntime_err)?;
378
379            let stream = client.stream();
380
381            get_runtime().spawn(async move {
382                tokio::pin!(stream);
383
384                while let Some(msg) = stream.next().await {
385                    match msg {
386                        AxOrdersWsMessage::Nautilus(exec_msg) => {
387                            handle_exec_message(&callback, exec_msg);
388                        }
389                        AxOrdersWsMessage::PlaceOrderResponse(resp) => {
390                            log::debug!(
391                                "Place order response: rid={}, oid={}",
392                                resp.rid,
393                                resp.res.oid
394                            );
395                        }
396                        AxOrdersWsMessage::CancelOrderResponse(resp) => {
397                            log::debug!(
398                                "Cancel order response: rid={}, received={}",
399                                resp.rid,
400                                resp.res.cxl_rx
401                            );
402                        }
403                        AxOrdersWsMessage::OpenOrdersResponse(resp) => {
404                            log::debug!(
405                                "Open orders response: rid={}, count={}",
406                                resp.rid,
407                                resp.res.len()
408                            );
409                        }
410                        AxOrdersWsMessage::Error(err) => {
411                            log::error!(
412                                "AX orders WebSocket error: code={:?}, message={}, rid={:?}",
413                                err.code,
414                                err.message,
415                                err.request_id
416                            );
417                        }
418                        AxOrdersWsMessage::Reconnected => {
419                            log::info!("AX orders WebSocket reconnected");
420                        }
421                        AxOrdersWsMessage::Authenticated => {
422                            log::info!("AX orders WebSocket authenticated");
423                        }
424                    }
425                }
426            });
427
428            Ok(())
429        })
430    }
431
432    #[pyo3(name = "submit_order")]
433    #[pyo3(signature = (
434        trader_id,
435        strategy_id,
436        instrument_id,
437        client_order_id,
438        order_side,
439        order_type,
440        quantity,
441        time_in_force,
442        price=None,
443        trigger_price=None,
444        post_only=false,
445    ))]
446    #[allow(clippy::too_many_arguments)]
447    fn py_submit_order<'py>(
448        &self,
449        py: Python<'py>,
450        trader_id: TraderId,
451        strategy_id: StrategyId,
452        instrument_id: InstrumentId,
453        client_order_id: ClientOrderId,
454        order_side: OrderSide,
455        order_type: OrderType,
456        quantity: Quantity,
457        time_in_force: TimeInForce,
458        price: Option<Price>,
459        trigger_price: Option<Price>,
460        post_only: bool,
461    ) -> PyResult<Bound<'py, PyAny>> {
462        let client = self.clone();
463
464        pyo3_async_runtimes::tokio::future_into_py(py, async move {
465            client
466                .submit_order(
467                    trader_id,
468                    strategy_id,
469                    instrument_id,
470                    client_order_id,
471                    order_side,
472                    order_type,
473                    quantity,
474                    time_in_force,
475                    price,
476                    trigger_price,
477                    post_only,
478                )
479                .await
480                .map_err(to_pyruntime_err)?;
481            Ok(())
482        })
483    }
484
485    #[pyo3(name = "cancel_order")]
486    #[pyo3(signature = (client_order_id, venue_order_id=None))]
487    fn py_cancel_order<'py>(
488        &self,
489        py: Python<'py>,
490        client_order_id: ClientOrderId,
491        venue_order_id: Option<VenueOrderId>,
492    ) -> PyResult<Bound<'py, PyAny>> {
493        let client = self.clone();
494
495        pyo3_async_runtimes::tokio::future_into_py(py, async move {
496            client
497                .cancel_order(client_order_id, venue_order_id)
498                .await
499                .map_err(to_pyruntime_err)?;
500            Ok(())
501        })
502    }
503
504    #[pyo3(name = "get_open_orders")]
505    fn py_get_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
506        let client = self.clone();
507
508        pyo3_async_runtimes::tokio::future_into_py(py, async move {
509            client.get_open_orders().await.map_err(to_pyruntime_err)?;
510            Ok(())
511        })
512    }
513
514    #[pyo3(name = "disconnect")]
515    fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
516        let client = self.clone();
517
518        pyo3_async_runtimes::tokio::future_into_py(py, async move {
519            client.disconnect().await;
520            Ok(())
521        })
522    }
523
524    #[pyo3(name = "close")]
525    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
526        let mut client = self.clone();
527
528        pyo3_async_runtimes::tokio::future_into_py(py, async move {
529            client.close().await;
530            Ok(())
531        })
532    }
533}
534
535fn handle_exec_message(callback: &Py<PyAny>, msg: NautilusExecWsMessage) {
536    match msg {
537        NautilusExecWsMessage::OrderAccepted(event) => {
538            call_python_with_event(callback, move |py| {
539                event.into_py_any(py).map(|obj| obj.into_bound(py))
540            });
541        }
542        NautilusExecWsMessage::OrderFilled(event) => {
543            call_python_with_event(callback, move |py| {
544                event.into_py_any(py).map(|obj| obj.into_bound(py))
545            });
546        }
547        NautilusExecWsMessage::OrderCanceled(event) => {
548            call_python_with_event(callback, move |py| {
549                event.into_py_any(py).map(|obj| obj.into_bound(py))
550            });
551        }
552        NautilusExecWsMessage::OrderExpired(event) => {
553            call_python_with_event(callback, move |py| {
554                event.into_py_any(py).map(|obj| obj.into_bound(py))
555            });
556        }
557        NautilusExecWsMessage::OrderRejected(event) => {
558            call_python_with_event(callback, move |py| {
559                event.into_py_any(py).map(|obj| obj.into_bound(py))
560            });
561        }
562        NautilusExecWsMessage::OrderCancelRejected(event) => {
563            call_python_with_event(callback, move |py| {
564                event.into_py_any(py).map(|obj| obj.into_bound(py))
565            });
566        }
567        NautilusExecWsMessage::OrderStatusReports(reports) => {
568            for report in reports {
569                call_python_with_event(callback, move |py| {
570                    report.into_py_any(py).map(|obj| obj.into_bound(py))
571                });
572            }
573        }
574        NautilusExecWsMessage::FillReports(reports) => {
575            for report in reports {
576                call_python_with_event(callback, move |py| {
577                    report.into_py_any(py).map(|obj| obj.into_bound(py))
578                });
579            }
580        }
581    }
582}
583
584fn call_python_with_event<F>(callback: &Py<PyAny>, event_fn: F)
585where
586    F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
587{
588    Python::attach(|py| match event_fn(py) {
589        Ok(event) => {
590            if let Err(e) = callback.call1(py, (event,)) {
591                log::error!("Error calling Python callback: {e}");
592            }
593        }
594        Err(e) => {
595            log::error!("Error converting event to Python: {e}");
596        }
597    });
598}