nautilus_architect_ax/websocket/orders/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Orders WebSocket message handler for Ax.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_model::instruments::{Instrument, InstrumentAny};
28use nautilus_network::websocket::{AuthTracker, WebSocketClient};
29use tokio_tungstenite::tungstenite::Message;
30use ustr::Ustr;
31
32use crate::websocket::messages::{
33    AxOrdersWsMessage, AxWsCancelOrder, AxWsCancelOrderResponse, AxWsCancelRejected, AxWsError,
34    AxWsGetOpenOrders, AxWsOpenOrdersResponse, AxWsOrderAcknowledged, AxWsOrderCanceled,
35    AxWsOrderDoneForDay, AxWsOrderExpired, AxWsOrderFilled, AxWsOrderPartiallyFilled,
36    AxWsOrderRejected, AxWsOrderReplaced, AxWsPlaceOrder, AxWsPlaceOrderResponse, OrderMetadata,
37};
38
39/// Commands sent from the outer client to the inner orders handler.
40#[derive(Debug)]
41pub enum HandlerCommand {
42    /// Set the WebSocket client for this handler.
43    SetClient(WebSocketClient),
44    /// Disconnect the WebSocket connection.
45    Disconnect,
46    /// Authenticate with the provided token.
47    Authenticate {
48        /// Bearer token for authentication.
49        token: String,
50    },
51    /// Place an order.
52    PlaceOrder {
53        /// Request ID for correlation.
54        request_id: i64,
55        /// Order placement message.
56        order: AxWsPlaceOrder,
57        /// Metadata for response correlation.
58        metadata: OrderMetadata,
59    },
60    /// Cancel an order.
61    CancelOrder {
62        /// Request ID for correlation.
63        request_id: i64,
64        /// Order ID to cancel.
65        order_id: String,
66    },
67    /// Get open orders.
68    GetOpenOrders {
69        /// Request ID for correlation.
70        request_id: i64,
71    },
72    /// Initialize the instrument cache with instruments.
73    InitializeInstruments(Vec<InstrumentAny>),
74    /// Update a single instrument in the cache.
75    UpdateInstrument(Box<InstrumentAny>),
76}
77
78/// Orders feed handler that processes WebSocket messages.
79///
80/// Runs in a dedicated Tokio task and owns the WebSocket client exclusively.
81pub(crate) struct FeedHandler {
82    signal: Arc<AtomicBool>,
83    client: Option<WebSocketClient>,
84    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
85    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
86    #[allow(dead_code)] // TODO: Use for sending parsed messages
87    out_tx: tokio::sync::mpsc::UnboundedSender<AxOrdersWsMessage>,
88    auth_tracker: AuthTracker,
89    instruments: AHashMap<Ustr, InstrumentAny>,
90    pending_orders: AHashMap<i64, OrderMetadata>,
91    message_queue: VecDeque<AxOrdersWsMessage>,
92}
93
94impl FeedHandler {
95    /// Creates a new [`FeedHandler`] instance.
96    #[must_use]
97    pub fn new(
98        signal: Arc<AtomicBool>,
99        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
100        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
101        out_tx: tokio::sync::mpsc::UnboundedSender<AxOrdersWsMessage>,
102        auth_tracker: AuthTracker,
103    ) -> Self {
104        Self {
105            signal,
106            client: None,
107            cmd_rx,
108            raw_rx,
109            out_tx,
110            auth_tracker,
111            instruments: AHashMap::new(),
112            pending_orders: AHashMap::new(),
113            message_queue: VecDeque::new(),
114        }
115    }
116
117    /// Returns the next message from the handler.
118    ///
119    /// This method blocks until a message is available or the handler is stopped.
120    pub async fn next(&mut self) -> Option<AxOrdersWsMessage> {
121        loop {
122            if let Some(msg) = self.message_queue.pop_front() {
123                return Some(msg);
124            }
125
126            tokio::select! {
127                Some(cmd) = self.cmd_rx.recv() => {
128                    self.handle_command(cmd).await;
129                }
130
131                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
132                    if self.signal.load(Ordering::Relaxed) {
133                        log::debug!("Stop signal received during idle period");
134                        return None;
135                    }
136                    continue;
137                }
138
139                msg = self.raw_rx.recv() => {
140                    let msg = match msg {
141                        Some(msg) => msg,
142                        None => {
143                            log::debug!("WebSocket stream closed");
144                            return None;
145                        }
146                    };
147
148                    if let Message::Ping(data) = &msg {
149                        log::trace!("Received ping frame with {} bytes", data.len());
150                        if let Some(client) = &self.client
151                            && let Err(e) = client.send_pong(data.to_vec()).await
152                        {
153                            log::warn!("Failed to send pong frame: {e}");
154                        }
155                        continue;
156                    }
157
158                    if let Some(messages) = self.parse_raw_message(msg) {
159                        self.message_queue.extend(messages);
160                    }
161
162                    if self.signal.load(Ordering::Relaxed) {
163                        log::debug!("Stop signal received");
164                        return None;
165                    }
166                }
167            }
168        }
169    }
170
171    async fn handle_command(&mut self, cmd: HandlerCommand) {
172        match cmd {
173            HandlerCommand::SetClient(client) => {
174                log::debug!("WebSocketClient received by handler");
175                self.client = Some(client);
176            }
177            HandlerCommand::Disconnect => {
178                log::debug!("Disconnect command received");
179                if let Some(client) = self.client.take() {
180                    client.disconnect().await;
181                }
182            }
183            HandlerCommand::Authenticate { token: _ } => {
184                log::debug!("Authenticate command received");
185                // Ax uses Bearer token in connection headers, not a message
186                // This is handled at connection time, so we just mark as authenticated
187                self.auth_tracker.succeed();
188                self.message_queue
189                    .push_back(AxOrdersWsMessage::Authenticated);
190            }
191            HandlerCommand::PlaceOrder {
192                request_id,
193                order,
194                metadata,
195            } => {
196                log::debug!(
197                    "PlaceOrder command received: request_id={request_id}, symbol={}",
198                    order.s
199                );
200                self.pending_orders.insert(request_id, metadata);
201
202                if let Err(e) = self.send_json(&order).await {
203                    log::error!("Failed to send place order message: {e}");
204                    self.pending_orders.remove(&request_id);
205                }
206            }
207            HandlerCommand::CancelOrder {
208                request_id,
209                order_id,
210            } => {
211                log::debug!(
212                    "CancelOrder command received: request_id={request_id}, order_id={order_id}"
213                );
214                self.send_cancel_order(request_id, &order_id).await;
215            }
216            HandlerCommand::GetOpenOrders { request_id } => {
217                log::debug!("GetOpenOrders command received: request_id={request_id}");
218                self.send_get_open_orders(request_id).await;
219            }
220            HandlerCommand::InitializeInstruments(instruments) => {
221                for inst in instruments {
222                    self.instruments.insert(inst.symbol().inner(), inst);
223                }
224            }
225            HandlerCommand::UpdateInstrument(inst) => {
226                self.instruments.insert(inst.symbol().inner(), *inst);
227            }
228        }
229    }
230
231    async fn send_cancel_order(&self, request_id: i64, order_id: &str) {
232        let msg = AxWsCancelOrder {
233            rid: request_id,
234            t: "x".to_string(),
235            oid: order_id.to_string(),
236        };
237
238        if let Err(e) = self.send_json(&msg).await {
239            log::error!("Failed to send cancel order message: {e}");
240        }
241    }
242
243    async fn send_get_open_orders(&self, request_id: i64) {
244        let msg = AxWsGetOpenOrders {
245            rid: request_id,
246            t: "o".to_string(),
247        };
248
249        if let Err(e) = self.send_json(&msg).await {
250            log::error!("Failed to send get open orders message: {e}");
251        }
252    }
253
254    async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
255        let Some(client) = &self.client else {
256            return Err("No WebSocket client available".to_string());
257        };
258
259        let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
260        log::trace!("Sending: {payload}");
261
262        client
263            .send_text(payload, None)
264            .await
265            .map_err(|e| e.to_string())
266    }
267
268    fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<AxOrdersWsMessage>> {
269        match msg {
270            Message::Text(text) => {
271                if text == nautilus_network::RECONNECTED {
272                    log::info!("Received WebSocket reconnected signal");
273                    return Some(vec![AxOrdersWsMessage::Reconnected]);
274                }
275
276                log::trace!("Raw websocket message: {text}");
277
278                let value: serde_json::Value = match serde_json::from_str(&text) {
279                    Ok(v) => v,
280                    Err(e) => {
281                        log::error!("Failed to parse WebSocket message: {e}: {text}");
282                        return None;
283                    }
284                };
285
286                self.classify_and_parse_message(value)
287            }
288            Message::Binary(data) => {
289                log::debug!("Received binary message with {} bytes", data.len());
290                None
291            }
292            Message::Close(_) => {
293                log::debug!("Received close message, waiting for reconnection");
294                None
295            }
296            _ => None,
297        }
298    }
299
300    fn classify_and_parse_message(
301        &mut self,
302        value: serde_json::Value,
303    ) -> Option<Vec<AxOrdersWsMessage>> {
304        let obj = value.as_object()?;
305
306        // Response messages have "rid" + "res", event messages have "t"
307        if obj.contains_key("rid") && obj.contains_key("res") {
308            return self.parse_response_message(value);
309        }
310
311        let msg_type = obj.get("t").and_then(|v| v.as_str())?;
312
313        match msg_type {
314            "h" => {
315                log::trace!("Received heartbeat");
316                None
317            }
318            "n" => match serde_json::from_value::<AxWsOrderAcknowledged>(value) {
319                Ok(msg) => {
320                    log::debug!("Order acknowledged: {} {}", msg.o.oid, msg.o.s);
321                    Some(vec![AxOrdersWsMessage::OrderAcknowledged(msg)])
322                }
323                Err(e) => {
324                    log::error!("Failed to parse order acknowledged: {e}");
325                    None
326                }
327            },
328            "p" => match serde_json::from_value::<AxWsOrderPartiallyFilled>(value) {
329                Ok(msg) => {
330                    log::debug!(
331                        "Order partially filled: {} {} @ {}",
332                        msg.o.oid,
333                        msg.xs.q,
334                        msg.xs.p
335                    );
336                    Some(vec![AxOrdersWsMessage::OrderPartiallyFilled(msg)])
337                }
338                Err(e) => {
339                    log::error!("Failed to parse order partially filled: {e}");
340                    None
341                }
342            },
343            "f" => match serde_json::from_value::<AxWsOrderFilled>(value) {
344                Ok(msg) => {
345                    log::debug!("Order filled: {} {} @ {}", msg.o.oid, msg.xs.q, msg.xs.p);
346                    Some(vec![AxOrdersWsMessage::OrderFilled(msg)])
347                }
348                Err(e) => {
349                    log::error!("Failed to parse order filled: {e}");
350                    None
351                }
352            },
353            "c" => match serde_json::from_value::<AxWsOrderCanceled>(value) {
354                Ok(msg) => {
355                    log::debug!("Order canceled: {} reason={}", msg.o.oid, msg.xr);
356                    Some(vec![AxOrdersWsMessage::OrderCanceled(msg)])
357                }
358                Err(e) => {
359                    log::error!("Failed to parse order canceled: {e}");
360                    None
361                }
362            },
363            "j" => match serde_json::from_value::<AxWsOrderRejected>(value) {
364                Ok(msg) => {
365                    log::debug!("Order rejected: {} reason={}", msg.o.oid, msg.r);
366                    Some(vec![AxOrdersWsMessage::OrderRejectedRaw(msg)])
367                }
368                Err(e) => {
369                    log::error!("Failed to parse order rejected: {e}");
370                    None
371                }
372            },
373            "x" => match serde_json::from_value::<AxWsOrderExpired>(value) {
374                Ok(msg) => {
375                    log::debug!("Order expired: {}", msg.o.oid);
376                    Some(vec![AxOrdersWsMessage::OrderExpired(msg)])
377                }
378                Err(e) => {
379                    log::error!("Failed to parse order expired: {e}");
380                    None
381                }
382            },
383            "r" => match serde_json::from_value::<AxWsOrderReplaced>(value) {
384                Ok(msg) => {
385                    log::debug!("Order replaced: {}", msg.o.oid);
386                    Some(vec![AxOrdersWsMessage::OrderReplaced(msg)])
387                }
388                Err(e) => {
389                    log::error!("Failed to parse order replaced: {e}");
390                    None
391                }
392            },
393            "d" => match serde_json::from_value::<AxWsOrderDoneForDay>(value) {
394                Ok(msg) => {
395                    log::debug!("Order done for day: {}", msg.o.oid);
396                    Some(vec![AxOrdersWsMessage::OrderDoneForDay(msg)])
397                }
398                Err(e) => {
399                    log::error!("Failed to parse order done for day: {e}");
400                    None
401                }
402            },
403            "e" => match serde_json::from_value::<AxWsCancelRejected>(value) {
404                Ok(msg) => {
405                    log::debug!("Cancel rejected: {} reason={}", msg.oid, msg.r);
406                    Some(vec![AxOrdersWsMessage::CancelRejected(msg)])
407                }
408                Err(e) => {
409                    log::error!("Failed to parse cancel rejected: {e}");
410                    None
411                }
412            },
413            _ => {
414                log::warn!("Unknown message type: {msg_type}");
415                Some(vec![AxOrdersWsMessage::Error(AxWsError::new(format!(
416                    "Unknown message type: {msg_type}"
417                )))])
418            }
419        }
420    }
421
422    fn parse_response_message(
423        &mut self,
424        value: serde_json::Value,
425    ) -> Option<Vec<AxOrdersWsMessage>> {
426        let obj = value.as_object()?;
427        let res = obj.get("res")?;
428
429        if res.is_object() {
430            if res.get("oid").is_some() {
431                match serde_json::from_value::<AxWsPlaceOrderResponse>(value) {
432                    Ok(msg) => {
433                        log::debug!("Place order response: rid={} oid={}", msg.rid, msg.res.oid);
434                        Some(vec![AxOrdersWsMessage::PlaceOrderResponse(msg)])
435                    }
436                    Err(e) => {
437                        log::error!("Failed to parse place order response: {e}");
438                        None
439                    }
440                }
441            } else if res.get("cxl_rx").is_some() {
442                match serde_json::from_value::<AxWsCancelOrderResponse>(value) {
443                    Ok(msg) => {
444                        log::debug!(
445                            "Cancel order response: rid={} cxl_rx={}",
446                            msg.rid,
447                            msg.res.cxl_rx
448                        );
449                        Some(vec![AxOrdersWsMessage::CancelOrderResponse(msg)])
450                    }
451                    Err(e) => {
452                        log::error!("Failed to parse cancel order response: {e}");
453                        None
454                    }
455                }
456            } else {
457                log::warn!("Unknown response object format");
458                None
459            }
460        } else if res.is_array() {
461            match serde_json::from_value::<AxWsOpenOrdersResponse>(value) {
462                Ok(msg) => {
463                    log::debug!(
464                        "Open orders response: rid={} count={}",
465                        msg.rid,
466                        msg.res.len()
467                    );
468                    Some(vec![AxOrdersWsMessage::OpenOrdersResponse(msg)])
469                }
470                Err(e) => {
471                    log::error!("Failed to parse open orders response: {e}");
472                    None
473                }
474            }
475        } else {
476            log::warn!("Unknown response format");
477            None
478        }
479    }
480}