1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22 data::{BarType, Data, OrderBookDeltas_API},
23 enums::{OrderSide, OrderType, TimeInForce},
24 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
25 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
26 types::{Price, Quantity},
27};
28use pyo3::{IntoPyObjectExt, prelude::*};
29
30use crate::{
31 common::enums::{AxCandleWidth, AxMarketDataLevel},
32 websocket::{
33 data::AxMdWebSocketClient,
34 messages::{AxOrdersWsMessage, NautilusDataWsMessage, NautilusExecWsMessage},
35 orders::AxOrdersWebSocketClient,
36 },
37};
38
39#[pymethods]
40impl AxMdWebSocketClient {
41 #[new]
42 #[pyo3(signature = (url, auth_token, heartbeat=None))]
43 fn py_new(url: String, auth_token: String, heartbeat: Option<u64>) -> Self {
44 Self::new(url, auth_token, heartbeat)
45 }
46
47 #[staticmethod]
48 #[pyo3(name = "without_auth")]
49 #[pyo3(signature = (url, heartbeat=None))]
50 fn py_without_auth(url: String, heartbeat: Option<u64>) -> Self {
51 Self::without_auth(url, heartbeat)
52 }
53
54 #[getter]
55 #[pyo3(name = "url")]
56 #[must_use]
57 pub fn py_url(&self) -> &str {
58 self.url()
59 }
60
61 #[pyo3(name = "is_active")]
62 #[must_use]
63 pub fn py_is_active(&self) -> bool {
64 self.is_active()
65 }
66
67 #[pyo3(name = "is_closed")]
68 #[must_use]
69 pub fn py_is_closed(&self) -> bool {
70 self.is_closed()
71 }
72
73 #[pyo3(name = "subscription_count")]
74 #[must_use]
75 pub fn py_subscription_count(&self) -> usize {
76 self.subscription_count()
77 }
78
79 #[pyo3(name = "set_auth_token")]
80 fn py_set_auth_token(&mut self, token: String) {
81 self.set_auth_token(token);
82 }
83
84 #[pyo3(name = "cache_instrument")]
85 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
86 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
87 Ok(())
88 }
89
90 #[pyo3(name = "connect")]
91 fn py_connect<'py>(
92 &mut self,
93 py: Python<'py>,
94 callback: Py<PyAny>,
95 ) -> PyResult<Bound<'py, PyAny>> {
96 let mut client = self.clone();
97
98 pyo3_async_runtimes::tokio::future_into_py(py, async move {
99 client.connect().await.map_err(to_pyruntime_err)?;
100
101 let stream = client.stream();
102
103 get_runtime().spawn(async move {
104 tokio::pin!(stream);
105
106 while let Some(msg) = stream.next().await {
107 match msg {
108 NautilusDataWsMessage::Data(data_vec) => {
109 Python::attach(|py| {
110 for data in data_vec {
111 let py_obj = data_to_pycapsule(py, data);
112 call_python(py, &callback, py_obj);
113 }
114 });
115 }
116 NautilusDataWsMessage::Deltas(deltas) => {
117 Python::attach(|py| {
118 let py_obj = data_to_pycapsule(
119 py,
120 Data::Deltas(OrderBookDeltas_API::new(deltas)),
121 );
122 call_python(py, &callback, py_obj);
123 });
124 }
125 NautilusDataWsMessage::Bar(bar) => {
126 Python::attach(|py| {
127 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
128 call_python(py, &callback, py_obj);
129 });
130 }
131 NautilusDataWsMessage::Heartbeat => {
132 }
134 NautilusDataWsMessage::Error(err) => {
135 log::error!("AX WebSocket error: {err:?}");
136 }
137 NautilusDataWsMessage::Reconnected => {
138 log::info!("AX WebSocket reconnected");
139 }
140 }
141 }
142 });
143
144 Ok(())
145 })
146 }
147
148 #[pyo3(name = "subscribe_book_deltas")]
149 fn py_subscribe_book_deltas<'py>(
150 &self,
151 py: Python<'py>,
152 instrument_id: InstrumentId,
153 level: AxMarketDataLevel,
154 ) -> PyResult<Bound<'py, PyAny>> {
155 let client = self.clone();
156 let symbol = instrument_id.symbol.to_string();
157
158 pyo3_async_runtimes::tokio::future_into_py(py, async move {
159 client
160 .subscribe_book_deltas(&symbol, level)
161 .await
162 .map_err(to_pyruntime_err)
163 })
164 }
165
166 #[pyo3(name = "subscribe_quotes")]
167 fn py_subscribe_quotes<'py>(
168 &self,
169 py: Python<'py>,
170 instrument_id: InstrumentId,
171 ) -> PyResult<Bound<'py, PyAny>> {
172 let client = self.clone();
173 let symbol = instrument_id.symbol.to_string();
174
175 pyo3_async_runtimes::tokio::future_into_py(py, async move {
176 client
177 .subscribe_quotes(&symbol)
178 .await
179 .map_err(to_pyruntime_err)
180 })
181 }
182
183 #[pyo3(name = "subscribe_trades")]
184 fn py_subscribe_trades<'py>(
185 &self,
186 py: Python<'py>,
187 instrument_id: InstrumentId,
188 ) -> PyResult<Bound<'py, PyAny>> {
189 let client = self.clone();
190 let symbol = instrument_id.symbol.to_string();
191
192 pyo3_async_runtimes::tokio::future_into_py(py, async move {
193 client
194 .subscribe_trades(&symbol)
195 .await
196 .map_err(to_pyruntime_err)
197 })
198 }
199
200 #[pyo3(name = "unsubscribe_book_deltas")]
201 fn py_unsubscribe_book_deltas<'py>(
202 &self,
203 py: Python<'py>,
204 instrument_id: InstrumentId,
205 ) -> PyResult<Bound<'py, PyAny>> {
206 let client = self.clone();
207 let symbol = instrument_id.symbol.to_string();
208
209 pyo3_async_runtimes::tokio::future_into_py(py, async move {
210 client
211 .unsubscribe_book_deltas(&symbol)
212 .await
213 .map_err(to_pyruntime_err)
214 })
215 }
216
217 #[pyo3(name = "subscribe_bars")]
218 fn py_subscribe_bars<'py>(
219 &self,
220 py: Python<'py>,
221 bar_type: BarType,
222 ) -> PyResult<Bound<'py, PyAny>> {
223 let client = self.clone();
224 let symbol = bar_type.instrument_id().symbol.to_string();
225 let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
226
227 pyo3_async_runtimes::tokio::future_into_py(py, async move {
228 client
229 .subscribe_candles(&symbol, width)
230 .await
231 .map_err(to_pyruntime_err)
232 })
233 }
234
235 #[pyo3(name = "unsubscribe_quotes")]
236 fn py_unsubscribe_quotes<'py>(
237 &self,
238 py: Python<'py>,
239 instrument_id: InstrumentId,
240 ) -> PyResult<Bound<'py, PyAny>> {
241 let client = self.clone();
242 let symbol = instrument_id.symbol.to_string();
243
244 pyo3_async_runtimes::tokio::future_into_py(py, async move {
245 client
246 .unsubscribe_quotes(&symbol)
247 .await
248 .map_err(to_pyruntime_err)
249 })
250 }
251
252 #[pyo3(name = "unsubscribe_trades")]
253 fn py_unsubscribe_trades<'py>(
254 &self,
255 py: Python<'py>,
256 instrument_id: InstrumentId,
257 ) -> PyResult<Bound<'py, PyAny>> {
258 let client = self.clone();
259 let symbol = instrument_id.symbol.to_string();
260
261 pyo3_async_runtimes::tokio::future_into_py(py, async move {
262 client
263 .unsubscribe_trades(&symbol)
264 .await
265 .map_err(to_pyruntime_err)
266 })
267 }
268
269 #[pyo3(name = "unsubscribe_bars")]
270 fn py_unsubscribe_bars<'py>(
271 &self,
272 py: Python<'py>,
273 bar_type: BarType,
274 ) -> PyResult<Bound<'py, PyAny>> {
275 let client = self.clone();
276 let symbol = bar_type.instrument_id().symbol.to_string();
277 let width = AxCandleWidth::try_from(&bar_type.spec()).map_err(to_pyruntime_err)?;
278
279 pyo3_async_runtimes::tokio::future_into_py(py, async move {
280 client
281 .unsubscribe_candles(&symbol, width)
282 .await
283 .map_err(to_pyruntime_err)
284 })
285 }
286
287 #[pyo3(name = "disconnect")]
288 fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
289 let client = self.clone();
290
291 pyo3_async_runtimes::tokio::future_into_py(py, async move {
292 client.disconnect().await;
293 Ok(())
294 })
295 }
296
297 #[pyo3(name = "close")]
298 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
299 let mut client = self.clone();
300
301 pyo3_async_runtimes::tokio::future_into_py(py, async move {
302 client.close().await;
303 Ok(())
304 })
305 }
306}
307
308#[pymethods]
309impl AxOrdersWebSocketClient {
310 #[new]
311 #[pyo3(signature = (url, account_id, trader_id, heartbeat=None))]
312 fn py_new(
313 url: String,
314 account_id: AccountId,
315 trader_id: TraderId,
316 heartbeat: Option<u64>,
317 ) -> Self {
318 Self::new(url, account_id, trader_id, heartbeat)
319 }
320
321 #[getter]
322 #[pyo3(name = "url")]
323 #[must_use]
324 pub fn py_url(&self) -> &str {
325 self.url()
326 }
327
328 #[getter]
329 #[pyo3(name = "account_id")]
330 #[must_use]
331 pub fn py_account_id(&self) -> AccountId {
332 self.account_id()
333 }
334
335 #[pyo3(name = "is_active")]
336 #[must_use]
337 pub fn py_is_active(&self) -> bool {
338 self.is_active()
339 }
340
341 #[pyo3(name = "is_closed")]
342 #[must_use]
343 pub fn py_is_closed(&self) -> bool {
344 self.is_closed()
345 }
346
347 #[pyo3(name = "cache_instrument")]
348 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
349 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
350 Ok(())
351 }
352
353 #[pyo3(name = "register_external_order")]
354 fn py_register_external_order(
355 &self,
356 client_order_id: ClientOrderId,
357 venue_order_id: VenueOrderId,
358 instrument_id: InstrumentId,
359 strategy_id: StrategyId,
360 ) -> bool {
361 self.register_external_order(client_order_id, venue_order_id, instrument_id, strategy_id)
362 }
363
364 #[pyo3(name = "connect")]
365 fn py_connect<'py>(
366 &mut self,
367 py: Python<'py>,
368 callback: Py<PyAny>,
369 bearer_token: String,
370 ) -> PyResult<Bound<'py, PyAny>> {
371 let mut client = self.clone();
372
373 pyo3_async_runtimes::tokio::future_into_py(py, async move {
374 client
375 .connect(&bearer_token)
376 .await
377 .map_err(to_pyruntime_err)?;
378
379 let stream = client.stream();
380
381 get_runtime().spawn(async move {
382 tokio::pin!(stream);
383
384 while let Some(msg) = stream.next().await {
385 match msg {
386 AxOrdersWsMessage::Nautilus(exec_msg) => {
387 handle_exec_message(&callback, exec_msg);
388 }
389 AxOrdersWsMessage::PlaceOrderResponse(resp) => {
390 log::debug!(
391 "Place order response: rid={}, oid={}",
392 resp.rid,
393 resp.res.oid
394 );
395 }
396 AxOrdersWsMessage::CancelOrderResponse(resp) => {
397 log::debug!(
398 "Cancel order response: rid={}, received={}",
399 resp.rid,
400 resp.res.cxl_rx
401 );
402 }
403 AxOrdersWsMessage::OpenOrdersResponse(resp) => {
404 log::debug!(
405 "Open orders response: rid={}, count={}",
406 resp.rid,
407 resp.res.len()
408 );
409 }
410 AxOrdersWsMessage::Error(err) => {
411 log::error!(
412 "AX orders WebSocket error: code={:?}, message={}, rid={:?}",
413 err.code,
414 err.message,
415 err.request_id
416 );
417 }
418 AxOrdersWsMessage::Reconnected => {
419 log::info!("AX orders WebSocket reconnected");
420 }
421 AxOrdersWsMessage::Authenticated => {
422 log::info!("AX orders WebSocket authenticated");
423 }
424 }
425 }
426 });
427
428 Ok(())
429 })
430 }
431
432 #[pyo3(name = "submit_order")]
433 #[pyo3(signature = (
434 trader_id,
435 strategy_id,
436 instrument_id,
437 client_order_id,
438 order_side,
439 order_type,
440 quantity,
441 time_in_force,
442 price=None,
443 trigger_price=None,
444 post_only=false,
445 ))]
446 #[allow(clippy::too_many_arguments)]
447 fn py_submit_order<'py>(
448 &self,
449 py: Python<'py>,
450 trader_id: TraderId,
451 strategy_id: StrategyId,
452 instrument_id: InstrumentId,
453 client_order_id: ClientOrderId,
454 order_side: OrderSide,
455 order_type: OrderType,
456 quantity: Quantity,
457 time_in_force: TimeInForce,
458 price: Option<Price>,
459 trigger_price: Option<Price>,
460 post_only: bool,
461 ) -> PyResult<Bound<'py, PyAny>> {
462 let client = self.clone();
463
464 pyo3_async_runtimes::tokio::future_into_py(py, async move {
465 client
466 .submit_order(
467 trader_id,
468 strategy_id,
469 instrument_id,
470 client_order_id,
471 order_side,
472 order_type,
473 quantity,
474 time_in_force,
475 price,
476 trigger_price,
477 post_only,
478 )
479 .await
480 .map_err(to_pyruntime_err)?;
481 Ok(())
482 })
483 }
484
485 #[pyo3(name = "cancel_order")]
486 #[pyo3(signature = (client_order_id, venue_order_id=None))]
487 fn py_cancel_order<'py>(
488 &self,
489 py: Python<'py>,
490 client_order_id: ClientOrderId,
491 venue_order_id: Option<VenueOrderId>,
492 ) -> PyResult<Bound<'py, PyAny>> {
493 let client = self.clone();
494
495 pyo3_async_runtimes::tokio::future_into_py(py, async move {
496 client
497 .cancel_order(client_order_id, venue_order_id)
498 .await
499 .map_err(to_pyruntime_err)?;
500 Ok(())
501 })
502 }
503
504 #[pyo3(name = "get_open_orders")]
505 fn py_get_open_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
506 let client = self.clone();
507
508 pyo3_async_runtimes::tokio::future_into_py(py, async move {
509 client.get_open_orders().await.map_err(to_pyruntime_err)?;
510 Ok(())
511 })
512 }
513
514 #[pyo3(name = "disconnect")]
515 fn py_disconnect<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
516 let client = self.clone();
517
518 pyo3_async_runtimes::tokio::future_into_py(py, async move {
519 client.disconnect().await;
520 Ok(())
521 })
522 }
523
524 #[pyo3(name = "close")]
525 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
526 let mut client = self.clone();
527
528 pyo3_async_runtimes::tokio::future_into_py(py, async move {
529 client.close().await;
530 Ok(())
531 })
532 }
533}
534
535fn handle_exec_message(callback: &Py<PyAny>, msg: NautilusExecWsMessage) {
536 match msg {
537 NautilusExecWsMessage::OrderAccepted(event) => {
538 call_python_with_event(callback, move |py| {
539 event.into_py_any(py).map(|obj| obj.into_bound(py))
540 });
541 }
542 NautilusExecWsMessage::OrderFilled(event) => {
543 call_python_with_event(callback, move |py| {
544 event.into_py_any(py).map(|obj| obj.into_bound(py))
545 });
546 }
547 NautilusExecWsMessage::OrderCanceled(event) => {
548 call_python_with_event(callback, move |py| {
549 event.into_py_any(py).map(|obj| obj.into_bound(py))
550 });
551 }
552 NautilusExecWsMessage::OrderExpired(event) => {
553 call_python_with_event(callback, move |py| {
554 event.into_py_any(py).map(|obj| obj.into_bound(py))
555 });
556 }
557 NautilusExecWsMessage::OrderRejected(event) => {
558 call_python_with_event(callback, move |py| {
559 event.into_py_any(py).map(|obj| obj.into_bound(py))
560 });
561 }
562 NautilusExecWsMessage::OrderCancelRejected(event) => {
563 call_python_with_event(callback, move |py| {
564 event.into_py_any(py).map(|obj| obj.into_bound(py))
565 });
566 }
567 NautilusExecWsMessage::OrderStatusReports(reports) => {
568 for report in reports {
569 call_python_with_event(callback, move |py| {
570 report.into_py_any(py).map(|obj| obj.into_bound(py))
571 });
572 }
573 }
574 NautilusExecWsMessage::FillReports(reports) => {
575 for report in reports {
576 call_python_with_event(callback, move |py| {
577 report.into_py_any(py).map(|obj| obj.into_bound(py))
578 });
579 }
580 }
581 }
582}
583
584fn call_python_with_event<F>(callback: &Py<PyAny>, event_fn: F)
585where
586 F: FnOnce(Python<'_>) -> PyResult<Bound<'_, PyAny>> + Send + 'static,
587{
588 Python::attach(|py| match event_fn(py) {
589 Ok(event) => {
590 if let Err(e) = callback.call1(py, (event,)) {
591 log::error!("Error calling Python callback: {e}");
592 }
593 }
594 Err(e) => {
595 log::error!("Error converting event to Python: {e}");
596 }
597 });
598}