Skip to main content

nautilus_architect_ax/websocket/orders/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Orders WebSocket message handler for Ax.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_core::{
29    UUID4,
30    nanos::UnixNanos,
31    time::{AtomicTime, get_atomic_clock_realtime},
32};
33use nautilus_model::{
34    enums::{LiquiditySide, OrderSide as NautilusOrderSide, OrderStatus, OrderType, TimeInForce},
35    events::{
36        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderExpired, OrderFilled, OrderRejected,
37    },
38    identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
39    instruments::{Instrument, InstrumentAny},
40    reports::{FillReport, OrderStatusReport},
41    types::{Money, Price, Quantity},
42};
43use nautilus_network::websocket::{AuthTracker, WebSocketClient};
44use tokio_tungstenite::tungstenite::Message;
45use ustr::Ustr;
46
47use crate::{
48    common::{
49        consts::AX_POST_ONLY_REJECT,
50        enums::{AxOrderRequestType, AxOrderSide, AxTimeInForce},
51        parse::{ax_timestamp_s_to_unix_nanos, cid_to_client_order_id},
52    },
53    websocket::{
54        messages::{
55            AxOrdersWsMessage, AxWsCancelOrder, AxWsCancelRejected, AxWsGetOpenOrders, AxWsOrder,
56            AxWsOrderAcknowledged, AxWsOrderCanceled, AxWsOrderDoneForDay, AxWsOrderEvent,
57            AxWsOrderExpired, AxWsOrderFilled, AxWsOrderPartiallyFilled, AxWsOrderRejected,
58            AxWsOrderReplaced, AxWsOrderResponse, AxWsPlaceOrder, AxWsRawMessage,
59            AxWsTradeExecution, NautilusExecWsMessage, OrderMetadata,
60        },
61        parse::parse_order_message,
62    },
63};
64
65fn map_time_in_force(tif: AxTimeInForce) -> TimeInForce {
66    match tif {
67        AxTimeInForce::Gtc => TimeInForce::Gtc,
68        AxTimeInForce::Ioc => TimeInForce::Ioc,
69        AxTimeInForce::Fok => TimeInForce::Fok,
70        AxTimeInForce::Day => TimeInForce::Day,
71        AxTimeInForce::Gtd => TimeInForce::Gtd,
72        AxTimeInForce::Ato => TimeInForce::AtTheOpen,
73        AxTimeInForce::Atc => TimeInForce::AtTheClose,
74    }
75}
76
77fn map_order_side(side: AxOrderSide) -> NautilusOrderSide {
78    match side {
79        AxOrderSide::Buy => NautilusOrderSide::Buy,
80        AxOrderSide::Sell => NautilusOrderSide::Sell,
81    }
82}
83
84/// Simple tracking info for pending WebSocket orders.
85#[derive(Clone, Debug)]
86pub struct WsOrderInfo {
87    /// Client order ID for correlation.
88    pub client_order_id: ClientOrderId,
89    /// Instrument symbol.
90    pub symbol: Ustr,
91}
92
93/// Commands sent from the outer client to the inner orders handler.
94#[derive(Debug)]
95pub enum HandlerCommand {
96    /// Set the WebSocket client for this handler.
97    SetClient(WebSocketClient),
98    /// Disconnect the WebSocket connection.
99    Disconnect,
100    /// Authenticate with the provided token.
101    Authenticate {
102        /// Bearer token for authentication.
103        token: String,
104    },
105    /// Place an order.
106    PlaceOrder {
107        /// Request ID for correlation.
108        request_id: i64,
109        /// Order placement message.
110        order: AxWsPlaceOrder,
111        /// Order info for tracking.
112        order_info: WsOrderInfo,
113    },
114    /// Cancel an order.
115    CancelOrder {
116        /// Request ID for correlation.
117        request_id: i64,
118        /// Order ID to cancel.
119        order_id: String,
120    },
121    /// Get open orders.
122    GetOpenOrders {
123        /// Request ID for correlation.
124        request_id: i64,
125    },
126    /// Initialize the instrument cache with instruments.
127    InitializeInstruments(Vec<InstrumentAny>),
128    /// Update a single instrument in the cache.
129    UpdateInstrument(Box<InstrumentAny>),
130    /// Store order metadata for a pending order.
131    StoreOrderMetadata {
132        /// Client order ID.
133        client_order_id: ClientOrderId,
134        /// Order metadata.
135        metadata: OrderMetadata,
136    },
137}
138
139/// Orders feed handler that processes WebSocket messages and produces domain events.
140///
141/// Runs in a dedicated Tokio task and owns the WebSocket client exclusively.
142pub(crate) struct FeedHandler {
143    clock: &'static AtomicTime,
144    signal: Arc<AtomicBool>,
145    client: Option<WebSocketClient>,
146    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
147    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
148    auth_tracker: AuthTracker,
149    account_id: AccountId,
150    instruments: AHashMap<Ustr, InstrumentAny>,
151    pending_orders: AHashMap<i64, WsOrderInfo>,
152    message_queue: VecDeque<AxOrdersWsMessage>,
153    orders_metadata: Arc<DashMap<ClientOrderId, OrderMetadata>>,
154    venue_to_client_id: Arc<DashMap<VenueOrderId, ClientOrderId>>,
155    cid_to_client_order_id: Arc<DashMap<u64, ClientOrderId>>,
156    bearer_token: Option<String>,
157    needs_reauthentication: bool,
158}
159
160impl FeedHandler {
161    /// Creates a new [`FeedHandler`] instance.
162    #[must_use]
163    #[allow(clippy::too_many_arguments)]
164    pub fn new(
165        signal: Arc<AtomicBool>,
166        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
167        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
168        auth_tracker: AuthTracker,
169        account_id: AccountId,
170        orders_metadata: Arc<DashMap<ClientOrderId, OrderMetadata>>,
171        venue_to_client_id: Arc<DashMap<VenueOrderId, ClientOrderId>>,
172        cid_to_client_order_id: Arc<DashMap<u64, ClientOrderId>>,
173    ) -> Self {
174        Self {
175            clock: get_atomic_clock_realtime(),
176            signal,
177            client: None,
178            cmd_rx,
179            raw_rx,
180            auth_tracker,
181            account_id,
182            instruments: AHashMap::new(),
183            pending_orders: AHashMap::new(),
184            message_queue: VecDeque::new(),
185            orders_metadata,
186            venue_to_client_id,
187            cid_to_client_order_id,
188            bearer_token: None,
189            needs_reauthentication: false,
190        }
191    }
192
193    fn generate_ts_init(&self) -> UnixNanos {
194        self.clock.get_time_ns()
195    }
196
197    async fn reauthenticate(&mut self) {
198        if self.bearer_token.is_some() {
199            log::info!("Re-authenticating after reconnection");
200
201            // Ax uses Bearer token in connection headers which persist across reconnect
202            self.auth_tracker.succeed();
203            self.message_queue
204                .push_back(AxOrdersWsMessage::Authenticated);
205            log::info!("Re-authentication completed");
206        } else {
207            log::warn!("Cannot re-authenticate: no bearer token stored");
208        }
209    }
210
211    /// Returns the next message from the handler.
212    ///
213    /// This method blocks until a message is available or the handler is stopped.
214    pub async fn next(&mut self) -> Option<AxOrdersWsMessage> {
215        loop {
216            if self.needs_reauthentication && self.message_queue.is_empty() {
217                self.needs_reauthentication = false;
218                self.reauthenticate().await;
219            }
220
221            if let Some(msg) = self.message_queue.pop_front() {
222                return Some(msg);
223            }
224
225            tokio::select! {
226                Some(cmd) = self.cmd_rx.recv() => {
227                    self.handle_command(cmd).await;
228                }
229
230                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
231                    if self.signal.load(Ordering::Acquire) {
232                        log::debug!("Stop signal received during idle period");
233                        return None;
234                    }
235                    continue;
236                }
237
238                msg = self.raw_rx.recv() => {
239                    let msg = match msg {
240                        Some(msg) => msg,
241                        None => {
242                            log::debug!("WebSocket stream closed");
243                            return None;
244                        }
245                    };
246
247                    if let Message::Ping(data) = &msg {
248                        log::trace!("Received ping frame with {} bytes", data.len());
249                        if let Some(client) = &self.client
250                            && let Err(e) = client.send_pong(data.to_vec()).await
251                        {
252                            log::warn!("Failed to send pong frame: {e}");
253                        }
254                        continue;
255                    }
256
257                    if let Some(messages) = self.parse_raw_message(msg) {
258                        self.message_queue.extend(messages);
259                    }
260
261                    if self.signal.load(Ordering::Acquire) {
262                        log::debug!("Stop signal received");
263                        return None;
264                    }
265                }
266            }
267        }
268    }
269
270    async fn handle_command(&mut self, cmd: HandlerCommand) {
271        match cmd {
272            HandlerCommand::SetClient(client) => {
273                log::debug!("WebSocketClient received by handler");
274                self.client = Some(client);
275            }
276            HandlerCommand::Disconnect => {
277                log::debug!("Disconnect command received");
278                self.auth_tracker.fail("Disconnected");
279                if let Some(client) = self.client.take() {
280                    client.disconnect().await;
281                }
282            }
283            HandlerCommand::Authenticate { token } => {
284                log::debug!("Authenticate command received");
285                self.bearer_token = Some(token);
286
287                // Ax uses Bearer token in connection headers (handled at connect time)
288                self.auth_tracker.succeed();
289                self.message_queue
290                    .push_back(AxOrdersWsMessage::Authenticated);
291            }
292            HandlerCommand::PlaceOrder {
293                request_id,
294                order,
295                order_info,
296            } => {
297                log::debug!(
298                    "PlaceOrder command received: request_id={request_id}, symbol={}",
299                    order.s
300                );
301                self.pending_orders.insert(request_id, order_info);
302
303                if let Err(e) = self.send_json(&order).await {
304                    log::error!("Failed to send place order message: {e}");
305                    self.pending_orders.remove(&request_id);
306                }
307            }
308            HandlerCommand::CancelOrder {
309                request_id,
310                order_id,
311            } => {
312                log::debug!(
313                    "CancelOrder command received: request_id={request_id}, order_id={order_id}"
314                );
315                self.send_cancel_order(request_id, &order_id).await;
316            }
317            HandlerCommand::GetOpenOrders { request_id } => {
318                log::debug!("GetOpenOrders command received: request_id={request_id}");
319                self.send_get_open_orders(request_id).await;
320            }
321            HandlerCommand::InitializeInstruments(instruments) => {
322                for inst in instruments {
323                    self.instruments.insert(inst.symbol().inner(), inst);
324                }
325            }
326            HandlerCommand::UpdateInstrument(inst) => {
327                self.instruments.insert(inst.symbol().inner(), *inst);
328            }
329            HandlerCommand::StoreOrderMetadata {
330                client_order_id,
331                metadata,
332            } => {
333                self.orders_metadata.insert(client_order_id, metadata);
334            }
335        }
336    }
337
338    async fn send_cancel_order(&self, request_id: i64, order_id: &str) {
339        let msg = AxWsCancelOrder {
340            rid: request_id,
341            t: AxOrderRequestType::CancelOrder,
342            oid: order_id.to_string(),
343        };
344
345        if let Err(e) = self.send_json(&msg).await {
346            log::error!("Failed to send cancel order message: {e}");
347        }
348    }
349
350    async fn send_get_open_orders(&self, request_id: i64) {
351        let msg = AxWsGetOpenOrders {
352            rid: request_id,
353            t: AxOrderRequestType::GetOpenOrders,
354        };
355
356        if let Err(e) = self.send_json(&msg).await {
357            log::error!("Failed to send get open orders message: {e}");
358        }
359    }
360
361    async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
362        let Some(client) = &self.client else {
363            return Err("No WebSocket client available".to_string());
364        };
365
366        let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
367        log::trace!("Sending: {payload}");
368
369        client
370            .send_text(payload, None)
371            .await
372            .map_err(|e| e.to_string())
373    }
374
375    fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<AxOrdersWsMessage>> {
376        match msg {
377            Message::Text(text) => {
378                if text == nautilus_network::RECONNECTED {
379                    log::info!("Received WebSocket reconnected signal");
380                    self.auth_tracker.fail("Reconnecting");
381                    self.needs_reauthentication = true;
382                    return Some(vec![AxOrdersWsMessage::Reconnected]);
383                }
384
385                log::trace!("Raw websocket message: {text}");
386
387                let raw_msg: AxWsRawMessage = match parse_order_message(&text) {
388                    Ok(v) => v,
389                    Err(e) => {
390                        log::error!("Failed to parse WebSocket message: {e}: {text}");
391                        return None;
392                    }
393                };
394
395                self.handle_raw_message(raw_msg)
396            }
397            Message::Binary(data) => {
398                log::debug!("Received binary message with {} bytes", data.len());
399                None
400            }
401            Message::Close(_) => {
402                log::debug!("Received close message, waiting for reconnection");
403                None
404            }
405            _ => None,
406        }
407    }
408
409    fn handle_raw_message(&mut self, raw_msg: AxWsRawMessage) -> Option<Vec<AxOrdersWsMessage>> {
410        match raw_msg {
411            AxWsRawMessage::Error(err) => {
412                log::warn!(
413                    "Order error response: rid={} code={} msg={}",
414                    err.rid,
415                    err.err.code,
416                    err.err.msg
417                );
418
419                if let Some(order_info) = self.pending_orders.remove(&err.rid) {
420                    self.orders_metadata.remove(&order_info.client_order_id);
421                    log::debug!(
422                        "Cleaned up metadata for failed order: {}",
423                        order_info.client_order_id
424                    );
425                }
426
427                Some(vec![AxOrdersWsMessage::Error(err.into())])
428            }
429            AxWsRawMessage::Response(resp) => self.handle_response(resp),
430            AxWsRawMessage::Event(event) => self.handle_event(*event),
431        }
432    }
433
434    fn handle_response(&mut self, resp: AxWsOrderResponse) -> Option<Vec<AxOrdersWsMessage>> {
435        match resp {
436            AxWsOrderResponse::PlaceOrder(msg) => {
437                log::debug!("Place order response: rid={} oid={}", msg.rid, msg.res.oid);
438                self.pending_orders.remove(&msg.rid);
439                Some(vec![AxOrdersWsMessage::PlaceOrderResponse(msg)])
440            }
441            AxWsOrderResponse::CancelOrder(msg) => {
442                log::debug!(
443                    "Cancel order response: rid={} accepted={}",
444                    msg.rid,
445                    msg.res.cxl_rx
446                );
447                Some(vec![AxOrdersWsMessage::CancelOrderResponse(msg)])
448            }
449            AxWsOrderResponse::OpenOrders(msg) => {
450                log::debug!("Open orders response: {} orders", msg.res.len());
451                Some(vec![AxOrdersWsMessage::OpenOrdersResponse(msg)])
452            }
453            AxWsOrderResponse::List(msg) => {
454                let order_count = msg.res.o.as_ref().map_or(0, |o| o.len());
455                log::debug!(
456                    "List subscription response: rid={} li={} orders={}",
457                    msg.rid,
458                    msg.res.li,
459                    order_count
460                );
461                None
462            }
463        }
464    }
465
466    fn handle_event(&mut self, event: AxWsOrderEvent) -> Option<Vec<AxOrdersWsMessage>> {
467        match event {
468            AxWsOrderEvent::Heartbeat => {
469                log::trace!("Received heartbeat");
470                None
471            }
472            AxWsOrderEvent::Acknowledged(msg) => self.handle_order_acknowledged(msg),
473            AxWsOrderEvent::PartiallyFilled(msg) => self.handle_order_partially_filled(msg),
474            AxWsOrderEvent::Filled(msg) => self.handle_order_filled(msg),
475            AxWsOrderEvent::Canceled(msg) => self.handle_order_canceled(msg),
476            AxWsOrderEvent::Rejected(msg) => self.handle_order_rejected(msg),
477            AxWsOrderEvent::Expired(msg) => self.handle_order_expired(msg),
478            AxWsOrderEvent::Replaced(msg) => self.handle_order_replaced(msg),
479            AxWsOrderEvent::DoneForDay(msg) => self.handle_order_done_for_day(msg),
480            AxWsOrderEvent::CancelRejected(msg) => self.handle_cancel_rejected(msg),
481        }
482    }
483
484    fn handle_order_acknowledged(
485        &mut self,
486        msg: AxWsOrderAcknowledged,
487    ) -> Option<Vec<AxOrdersWsMessage>> {
488        log::debug!("Order acknowledged: {} {}", msg.o.oid, msg.o.s);
489
490        if let Some(event) = self.create_order_accepted(&msg.o, msg.ts) {
491            Some(vec![AxOrdersWsMessage::Nautilus(
492                NautilusExecWsMessage::OrderAccepted(event),
493            )])
494        } else if let Some(report) =
495            self.create_order_status_report(&msg.o, OrderStatus::Accepted, msg.ts)
496        {
497            log::debug!("Created OrderStatusReport for external order {}", msg.o.oid);
498            Some(vec![AxOrdersWsMessage::Nautilus(
499                NautilusExecWsMessage::OrderStatusReports(vec![report]),
500            )])
501        } else {
502            log::warn!(
503                "Could not create OrderAccepted event for order {}",
504                msg.o.oid
505            );
506            None
507        }
508    }
509
510    fn handle_order_partially_filled(
511        &mut self,
512        msg: AxWsOrderPartiallyFilled,
513    ) -> Option<Vec<AxOrdersWsMessage>> {
514        log::debug!(
515            "Order partially filled: {} {} @ {}",
516            msg.o.oid,
517            msg.xs.q,
518            msg.xs.p
519        );
520
521        if let Some(event) = self.create_order_filled(&msg.o, &msg.xs, msg.ts) {
522            Some(vec![AxOrdersWsMessage::Nautilus(
523                NautilusExecWsMessage::OrderFilled(Box::new(event)),
524            )])
525        } else if let Some(report) = self.create_fill_report(&msg.o, &msg.xs, msg.ts) {
526            log::debug!("Created FillReport for external order {}", msg.o.oid);
527            Some(vec![AxOrdersWsMessage::Nautilus(
528                NautilusExecWsMessage::FillReports(vec![report]),
529            )])
530        } else {
531            log::warn!("Could not create OrderFilled event for order {}", msg.o.oid);
532            None
533        }
534    }
535
536    fn handle_order_filled(&mut self, msg: AxWsOrderFilled) -> Option<Vec<AxOrdersWsMessage>> {
537        log::debug!("Order filled: {} {} @ {}", msg.o.oid, msg.xs.q, msg.xs.p);
538
539        let message = if let Some(event) = self.create_order_filled(&msg.o, &msg.xs, msg.ts) {
540            Some(vec![AxOrdersWsMessage::Nautilus(
541                NautilusExecWsMessage::OrderFilled(Box::new(event)),
542            )])
543        } else if let Some(report) = self.create_fill_report(&msg.o, &msg.xs, msg.ts) {
544            log::debug!("Created FillReport for external order {}", msg.o.oid);
545            Some(vec![AxOrdersWsMessage::Nautilus(
546                NautilusExecWsMessage::FillReports(vec![report]),
547            )])
548        } else {
549            log::warn!("Could not create OrderFilled event for order {}", msg.o.oid);
550            None
551        };
552
553        self.cleanup_terminal_order_tracking(&msg.o);
554        message
555    }
556
557    fn handle_order_canceled(&mut self, msg: AxWsOrderCanceled) -> Option<Vec<AxOrdersWsMessage>> {
558        log::debug!("Order canceled: {} reason={}", msg.o.oid, msg.xr);
559
560        let message = if let Some(event) = self.create_order_canceled(&msg.o, msg.ts) {
561            Some(vec![AxOrdersWsMessage::Nautilus(
562                NautilusExecWsMessage::OrderCanceled(event),
563            )])
564        } else if let Some(report) =
565            self.create_order_status_report(&msg.o, OrderStatus::Canceled, msg.ts)
566        {
567            log::debug!("Created OrderStatusReport for external order {}", msg.o.oid);
568            Some(vec![AxOrdersWsMessage::Nautilus(
569                NautilusExecWsMessage::OrderStatusReports(vec![report]),
570            )])
571        } else {
572            log::warn!(
573                "Could not create OrderCanceled event for order {}",
574                msg.o.oid
575            );
576            None
577        };
578
579        self.cleanup_terminal_order_tracking(&msg.o);
580        message
581    }
582
583    fn handle_order_rejected(&mut self, msg: AxWsOrderRejected) -> Option<Vec<AxOrdersWsMessage>> {
584        // Use r, or txt, or "UNKNOWN" as fallback
585        let reason = msg.r.as_deref().or(msg.txt.as_deref()).unwrap_or("UNKNOWN");
586
587        let message = if let Some(event) = self.create_order_rejected(&msg.o, reason, msg.ts) {
588            Some(vec![AxOrdersWsMessage::Nautilus(
589                NautilusExecWsMessage::OrderRejected(event),
590            )])
591        } else {
592            log::warn!(
593                "Could not create OrderRejected event for order {}",
594                msg.o.oid
595            );
596            None
597        };
598
599        self.cleanup_terminal_order_tracking(&msg.o);
600        message
601    }
602
603    fn handle_order_expired(&mut self, msg: AxWsOrderExpired) -> Option<Vec<AxOrdersWsMessage>> {
604        log::debug!("Order expired: {}", msg.o.oid);
605
606        let message = if let Some(event) = self.create_order_expired(&msg.o, msg.ts) {
607            Some(vec![AxOrdersWsMessage::Nautilus(
608                NautilusExecWsMessage::OrderExpired(event),
609            )])
610        } else if let Some(report) =
611            self.create_order_status_report(&msg.o, OrderStatus::Expired, msg.ts)
612        {
613            log::debug!("Created OrderStatusReport for external order {}", msg.o.oid);
614            Some(vec![AxOrdersWsMessage::Nautilus(
615                NautilusExecWsMessage::OrderStatusReports(vec![report]),
616            )])
617        } else {
618            log::warn!(
619                "Could not create OrderExpired event for order {}",
620                msg.o.oid
621            );
622            None
623        };
624
625        self.cleanup_terminal_order_tracking(&msg.o);
626        message
627    }
628
629    fn handle_order_replaced(&mut self, msg: AxWsOrderReplaced) -> Option<Vec<AxOrdersWsMessage>> {
630        log::debug!("Order replaced: {}", msg.o.oid);
631
632        // Order replaced is treated as accepted with new parameters
633        if let Some(event) = self.create_order_accepted(&msg.o, msg.ts) {
634            Some(vec![AxOrdersWsMessage::Nautilus(
635                NautilusExecWsMessage::OrderAccepted(event),
636            )])
637        } else if let Some(report) =
638            self.create_order_status_report(&msg.o, OrderStatus::Accepted, msg.ts)
639        {
640            log::debug!(
641                "Created OrderStatusReport for external replaced order {}",
642                msg.o.oid
643            );
644            Some(vec![AxOrdersWsMessage::Nautilus(
645                NautilusExecWsMessage::OrderStatusReports(vec![report]),
646            )])
647        } else {
648            log::warn!(
649                "Could not create OrderAccepted event for replaced order {}",
650                msg.o.oid
651            );
652            None
653        }
654    }
655
656    fn handle_order_done_for_day(
657        &mut self,
658        msg: AxWsOrderDoneForDay,
659    ) -> Option<Vec<AxOrdersWsMessage>> {
660        log::debug!("Order done for day: {}", msg.o.oid);
661
662        let message = if let Some(event) = self.create_order_expired(&msg.o, msg.ts) {
663            Some(vec![AxOrdersWsMessage::Nautilus(
664                NautilusExecWsMessage::OrderExpired(event),
665            )])
666        } else if let Some(report) =
667            self.create_order_status_report(&msg.o, OrderStatus::Expired, msg.ts)
668        {
669            log::debug!(
670                "Created OrderStatusReport for external done-for-day order {}",
671                msg.o.oid
672            );
673            Some(vec![AxOrdersWsMessage::Nautilus(
674                NautilusExecWsMessage::OrderStatusReports(vec![report]),
675            )])
676        } else {
677            log::warn!(
678                "Could not create OrderExpired event for done-for-day order {}",
679                msg.o.oid
680            );
681            None
682        };
683
684        self.cleanup_terminal_order_tracking(&msg.o);
685        message
686    }
687
688    fn handle_cancel_rejected(
689        &mut self,
690        msg: AxWsCancelRejected,
691    ) -> Option<Vec<AxOrdersWsMessage>> {
692        log::warn!("Cancel rejected: {} reason={}", msg.oid, msg.r);
693
694        let venue_order_id = VenueOrderId::new(&msg.oid);
695        if let Some(client_order_id) = self.venue_to_client_id.get(&venue_order_id)
696            && let Some(metadata) = self.orders_metadata.get(&client_order_id)
697        {
698            let event = OrderCancelRejected::new(
699                metadata.trader_id,
700                metadata.strategy_id,
701                metadata.instrument_id,
702                metadata.client_order_id,
703                Ustr::from(msg.r.as_ref()),
704                UUID4::new(),
705                self.generate_ts_init(),
706                metadata.ts_init,
707                false,
708                Some(venue_order_id),
709                Some(self.account_id),
710            );
711            Some(vec![AxOrdersWsMessage::Nautilus(
712                NautilusExecWsMessage::OrderCancelRejected(event),
713            )])
714        } else {
715            log::warn!(
716                "Could not find metadata for cancel rejected order {}",
717                msg.oid
718            );
719            None
720        }
721    }
722
723    fn lookup_order_metadata(
724        &self,
725        order: &AxWsOrder,
726    ) -> Option<dashmap::mapref::one::Ref<'_, ClientOrderId, OrderMetadata>> {
727        let venue_order_id = VenueOrderId::new(&order.oid);
728
729        // Try venue_order_id mapping first
730        if let Some(client_order_id) = self.venue_to_client_id.get(&venue_order_id)
731            && let Some(metadata) = self.orders_metadata.get(&*client_order_id)
732        {
733            return Some(metadata);
734        }
735
736        // Try cid mapping second
737        if let Some(cid) = order.cid
738            && let Some(client_order_id) = self.cid_to_client_order_id.get(&cid)
739            && let Some(metadata) = self.orders_metadata.get(&*client_order_id)
740        {
741            return Some(metadata);
742        }
743
744        None
745    }
746
747    fn create_order_accepted(&mut self, order: &AxWsOrder, event_ts: i64) -> Option<OrderAccepted> {
748        let venue_order_id = VenueOrderId::new(&order.oid);
749        let metadata = self.lookup_order_metadata(order)?;
750
751        // Extract values before dropping the read guard
752        let client_order_id = metadata.client_order_id;
753        let trader_id = metadata.trader_id;
754        let strategy_id = metadata.strategy_id;
755        let instrument_id = metadata.instrument_id;
756
757        // Drop the read guard before acquiring write lock
758        drop(metadata);
759
760        // Update venue_order_id mapping
761        self.venue_to_client_id
762            .insert(venue_order_id, client_order_id);
763
764        // Update metadata with venue_order_id
765        if let Some(mut entry) = self.orders_metadata.get_mut(&client_order_id) {
766            entry.venue_order_id = Some(venue_order_id);
767        }
768
769        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
770
771        Some(OrderAccepted::new(
772            trader_id,
773            strategy_id,
774            instrument_id,
775            client_order_id,
776            venue_order_id,
777            self.account_id,
778            UUID4::new(),
779            ts_event,
780            self.generate_ts_init(),
781            false,
782        ))
783    }
784
785    fn create_order_filled(
786        &self,
787        order: &AxWsOrder,
788        execution: &AxWsTradeExecution,
789        event_ts: i64,
790    ) -> Option<OrderFilled> {
791        let venue_order_id = VenueOrderId::new(&order.oid);
792        let metadata = self.lookup_order_metadata(order)?;
793
794        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
795
796        // AX uses u64 contracts - use instrument precision from metadata
797        let last_qty = Quantity::new(execution.q as f64, metadata.size_precision);
798        let last_px = Price::from_decimal_dp(execution.p, metadata.price_precision).ok()?;
799
800        let order_side = match order.d {
801            AxOrderSide::Buy => NautilusOrderSide::Buy,
802            AxOrderSide::Sell => NautilusOrderSide::Sell,
803        };
804
805        // AX primarily uses limit orders
806        let order_type = OrderType::Limit;
807
808        // agg=true means aggressor (taker), agg=false means maker
809        let liquidity_side = if execution.agg {
810            LiquiditySide::Taker
811        } else {
812            LiquiditySide::Maker
813        };
814
815        Some(OrderFilled::new(
816            metadata.trader_id,
817            metadata.strategy_id,
818            metadata.instrument_id,
819            metadata.client_order_id,
820            venue_order_id,
821            self.account_id,
822            TradeId::new(&execution.tid),
823            order_side,
824            order_type,
825            last_qty,
826            last_px,
827            metadata.quote_currency,
828            liquidity_side,
829            UUID4::new(),
830            ts_event,
831            self.generate_ts_init(),
832            false,
833            None, // position_id
834            None, // commission
835        ))
836    }
837
838    fn create_order_canceled(&self, order: &AxWsOrder, event_ts: i64) -> Option<OrderCanceled> {
839        let venue_order_id = VenueOrderId::new(&order.oid);
840        let metadata = self.lookup_order_metadata(order)?;
841
842        let client_order_id = metadata.client_order_id;
843        let trader_id = metadata.trader_id;
844        let strategy_id = metadata.strategy_id;
845        let instrument_id = metadata.instrument_id;
846
847        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
848
849        Some(OrderCanceled::new(
850            trader_id,
851            strategy_id,
852            instrument_id,
853            client_order_id,
854            UUID4::new(),
855            ts_event,
856            self.generate_ts_init(),
857            false,
858            Some(venue_order_id),
859            Some(self.account_id),
860        ))
861    }
862
863    fn create_order_expired(&self, order: &AxWsOrder, event_ts: i64) -> Option<OrderExpired> {
864        let venue_order_id = VenueOrderId::new(&order.oid);
865        let metadata = self.lookup_order_metadata(order)?;
866
867        let client_order_id = metadata.client_order_id;
868        let trader_id = metadata.trader_id;
869        let strategy_id = metadata.strategy_id;
870        let instrument_id = metadata.instrument_id;
871
872        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
873
874        Some(OrderExpired::new(
875            trader_id,
876            strategy_id,
877            instrument_id,
878            client_order_id,
879            UUID4::new(),
880            ts_event,
881            self.generate_ts_init(),
882            false,
883            Some(venue_order_id),
884            Some(self.account_id),
885        ))
886    }
887
888    fn create_order_rejected(
889        &self,
890        order: &AxWsOrder,
891        reason: &str,
892        event_ts: i64,
893    ) -> Option<OrderRejected> {
894        let metadata = self.lookup_order_metadata(order)?;
895
896        let client_order_id = metadata.client_order_id;
897        let trader_id = metadata.trader_id;
898        let strategy_id = metadata.strategy_id;
899        let instrument_id = metadata.instrument_id;
900
901        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
902        let due_post_only = reason.contains(AX_POST_ONLY_REJECT);
903
904        Some(OrderRejected::new(
905            trader_id,
906            strategy_id,
907            instrument_id,
908            client_order_id,
909            self.account_id,
910            Ustr::from(reason),
911            UUID4::new(),
912            ts_event,
913            self.generate_ts_init(),
914            false,
915            due_post_only,
916        ))
917    }
918
919    fn cleanup_terminal_order_tracking(&mut self, order: &AxWsOrder) {
920        let venue_order_id = VenueOrderId::new(&order.oid);
921        let client_order_id = self
922            .venue_to_client_id
923            .remove(&venue_order_id)
924            .map(|(_, v)| v)
925            .or_else(|| {
926                order
927                    .cid
928                    .and_then(|cid| self.cid_to_client_order_id.remove(&cid).map(|(_, v)| v))
929            });
930
931        if let Some(client_order_id) = client_order_id {
932            self.orders_metadata.remove(&client_order_id);
933        }
934
935        if let Some(cid) = order.cid {
936            self.cid_to_client_order_id.remove(&cid);
937        }
938    }
939
940    fn create_order_status_report(
941        &self,
942        order: &AxWsOrder,
943        order_status: OrderStatus,
944        event_ts: i64,
945    ) -> Option<OrderStatusReport> {
946        let instrument = self.instruments.get(&order.s)?;
947        let venue_order_id = VenueOrderId::new(&order.oid);
948        let instrument_id = instrument.id();
949        let order_side = map_order_side(order.d);
950        let time_in_force = map_time_in_force(order.tif);
951
952        let quantity = Quantity::new(order.q as f64, instrument.size_precision());
953        let filled_qty = Quantity::new(order.xq as f64, instrument.size_precision());
954
955        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
956        let ts_init = self.generate_ts_init();
957
958        let client_order_id = order.cid.map(|cid| {
959            self.cid_to_client_order_id
960                .get(&cid)
961                .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
962        });
963
964        let mut report = OrderStatusReport::new(
965            self.account_id,
966            instrument_id,
967            client_order_id,
968            venue_order_id,
969            order_side,
970            OrderType::Limit, // AX primarily uses limit orders
971            time_in_force,
972            order_status,
973            quantity,
974            filled_qty,
975            ts_event, // ts_accepted
976            ts_event, // ts_last
977            ts_init,
978            Some(UUID4::new()),
979        );
980
981        if let Ok(price) = Price::from_decimal_dp(order.p, instrument.price_precision()) {
982            report = report.with_price(price);
983        }
984
985        Some(report)
986    }
987
988    fn create_fill_report(
989        &self,
990        order: &AxWsOrder,
991        execution: &AxWsTradeExecution,
992        event_ts: i64,
993    ) -> Option<FillReport> {
994        let instrument = self.instruments.get(&order.s)?;
995        let venue_order_id = VenueOrderId::new(&order.oid);
996        let instrument_id = instrument.id();
997        let order_side = map_order_side(order.d);
998
999        let last_qty = Quantity::new(execution.q as f64, instrument.size_precision());
1000        let last_px = Price::from_decimal_dp(execution.p, instrument.price_precision()).ok()?;
1001
1002        // agg=true means aggressor (taker), agg=false means maker
1003        let liquidity_side = if execution.agg {
1004            LiquiditySide::Taker
1005        } else {
1006            LiquiditySide::Maker
1007        };
1008
1009        let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
1010        let ts_init = self.generate_ts_init();
1011
1012        let client_order_id = order.cid.map(|cid| {
1013            self.cid_to_client_order_id
1014                .get(&cid)
1015                .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
1016        });
1017
1018        // AX doesn't provide commission in WebSocket fill events
1019        let commission = Money::new(0.0, instrument.quote_currency());
1020
1021        Some(FillReport::new(
1022            self.account_id,
1023            instrument_id,
1024            venue_order_id,
1025            TradeId::new(&execution.tid),
1026            order_side,
1027            last_qty,
1028            last_px,
1029            commission,
1030            liquidity_side,
1031            client_order_id,
1032            None, // venue_position_id
1033            ts_event,
1034            ts_init,
1035            Some(UUID4::new()),
1036        ))
1037    }
1038}
1039
#[cfg(test)]
mod tests {
    use std::sync::{Arc, atomic::AtomicBool};

    use dashmap::DashMap;
    use nautilus_model::{
        identifiers::{InstrumentId, StrategyId, TraderId},
        types::Currency,
    };
    use nautilus_network::websocket::AuthTracker;
    use rstest::rstest;
    use rust_decimal_macros::dec;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{AxOrderSide, AxOrderStatus, AxTimeInForce},
        websocket::messages::{AxWsOrderRejected, AxWsPlaceOrderResponse, AxWsPlaceOrderResult},
    };

    /// Creates a handler with fresh channels and empty shared maps.
    ///
    /// The sender halves of both channels are dropped when this returns.
    fn test_handler() -> FeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(false));

        FeedHandler::new(
            signal,
            cmd_rx,
            raw_rx,
            AuthTracker::default(),
            AccountId::from("AX-001"),
            Arc::new(DashMap::new()),
            Arc::new(DashMap::new()),
            Arc::new(DashMap::new()),
        )
    }

    /// Returns a fully filled buy order with venue order ID `OID-1` and the given `cid`.
    fn sample_order(cid: u64) -> AxWsOrder {
        AxWsOrder {
            oid: String::from("OID-1"),
            u: String::from("user-1"),
            s: Ustr::from("BTCUSD-PERP"),
            p: dec!(50000),
            q: 100,
            xq: 100,
            rq: 0,
            o: AxOrderStatus::Filled,
            d: AxOrderSide::Buy,
            tif: AxTimeInForce::Gtc,
            ts: 1_700_000_000,
            tn: 1,
            cid: Some(cid),
            tag: None,
            txt: None,
        }
    }

    /// Returns metadata for `client_order_id` with `venue_order_id` already assigned.
    fn sample_metadata(
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> OrderMetadata {
        OrderMetadata {
            trader_id: TraderId::from("TRADER-001"),
            strategy_id: StrategyId::from("S-001"),
            instrument_id: InstrumentId::from("BTCUSD-PERP.AX"),
            client_order_id,
            venue_order_id: Some(venue_order_id),
            ts_init: UnixNanos::from(1_700_000_000_000_000_000u64),
            size_precision: 8,
            price_precision: 2,
            quote_currency: Currency::USD(),
        }
    }

    /// Registers the order in the metadata, venue ID and `cid` tracking maps.
    fn track_order(
        handler: &mut FeedHandler,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
        cid: u64,
    ) {
        handler.orders_metadata.insert(
            client_order_id,
            sample_metadata(client_order_id, venue_order_id),
        );
        handler
            .venue_to_client_id
            .insert(venue_order_id, client_order_id);
        handler.cid_to_client_order_id.insert(cid, client_order_id);
    }

    #[rstest]
    fn test_place_order_response_cleans_pending_order() {
        let mut handler = test_handler();
        let request_id = 11;
        let pending = WsOrderInfo {
            client_order_id: ClientOrderId::from("CID-11"),
            symbol: Ustr::from("BTCUSD-PERP"),
        };
        handler.pending_orders.insert(request_id, pending);

        let response = AxWsOrderResponse::PlaceOrder(AxWsPlaceOrderResponse {
            rid: request_id,
            res: AxWsPlaceOrderResult {
                oid: String::from("OID-11"),
            },
        });

        let messages = handler.handle_response(response).unwrap();

        assert_eq!(messages.len(), 1);
        assert!(handler.pending_orders.get(&request_id).is_none());
    }

    #[rstest]
    fn test_handle_order_filled_cleans_tracking_maps() {
        let mut handler = test_handler();

        let client_order_id = ClientOrderId::from("CID-22");
        let venue_order_id = VenueOrderId::new("OID-1");
        let cid = 22_u64;
        track_order(&mut handler, client_order_id, venue_order_id, cid);

        let execution = AxWsTradeExecution {
            tid: String::from("T-1"),
            s: Ustr::from("BTCUSD-PERP"),
            q: 100,
            p: dec!(50000),
            d: AxOrderSide::Buy,
            agg: true,
        };
        let msg = AxWsOrderFilled {
            ts: 1_700_000_001,
            tn: 2,
            eid: String::from("EID-1"),
            o: sample_order(cid),
            xs: execution,
        };

        let messages = handler.handle_order_filled(msg).unwrap();

        assert_eq!(messages.len(), 1);
        assert!(!handler.orders_metadata.contains_key(&client_order_id));
        assert!(!handler.venue_to_client_id.contains_key(&venue_order_id));
        assert!(!handler.cid_to_client_order_id.contains_key(&cid));
    }

    #[rstest]
    fn test_handle_order_rejected_cleans_tracking_maps() {
        let mut handler = test_handler();

        let client_order_id = ClientOrderId::from("CID-33");
        let venue_order_id = VenueOrderId::new("OID-1");
        let cid = 33_u64;
        track_order(&mut handler, client_order_id, venue_order_id, cid);

        let msg = AxWsOrderRejected {
            ts: 1_700_000_002,
            tn: 3,
            eid: String::from("EID-3"),
            o: sample_order(cid),
            r: Some(String::from("rejected")),
            txt: None,
        };

        let messages = handler.handle_order_rejected(msg);

        assert!(messages.is_some());
        assert!(!handler.orders_metadata.contains_key(&client_order_id));
        assert!(!handler.venue_to_client_id.contains_key(&venue_order_id));
        assert!(!handler.cid_to_client_order_id.contains_key(&cid));
    }
}