nautilus_architect_ax/websocket/orders/
handler.rs1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_model::instruments::{Instrument, InstrumentAny};
28use nautilus_network::websocket::{AuthTracker, WebSocketClient};
29use tokio_tungstenite::tungstenite::Message;
30use ustr::Ustr;
31
32use crate::websocket::messages::{
33 AxOrdersWsMessage, AxWsCancelOrder, AxWsCancelOrderResponse, AxWsCancelRejected, AxWsError,
34 AxWsGetOpenOrders, AxWsOpenOrdersResponse, AxWsOrderAcknowledged, AxWsOrderCanceled,
35 AxWsOrderDoneForDay, AxWsOrderExpired, AxWsOrderFilled, AxWsOrderPartiallyFilled,
36 AxWsOrderRejected, AxWsOrderReplaced, AxWsPlaceOrder, AxWsPlaceOrderResponse, OrderMetadata,
37};
38
39#[derive(Debug)]
41pub enum HandlerCommand {
42 SetClient(WebSocketClient),
44 Disconnect,
46 Authenticate {
48 token: String,
50 },
51 PlaceOrder {
53 request_id: i64,
55 order: AxWsPlaceOrder,
57 metadata: OrderMetadata,
59 },
60 CancelOrder {
62 request_id: i64,
64 order_id: String,
66 },
67 GetOpenOrders {
69 request_id: i64,
71 },
72 InitializeInstruments(Vec<InstrumentAny>),
74 UpdateInstrument(Box<InstrumentAny>),
76}
77
78pub(crate) struct FeedHandler {
82 signal: Arc<AtomicBool>,
83 client: Option<WebSocketClient>,
84 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
85 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
86 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<AxOrdersWsMessage>,
88 auth_tracker: AuthTracker,
89 instruments: AHashMap<Ustr, InstrumentAny>,
90 pending_orders: AHashMap<i64, OrderMetadata>,
91 message_queue: VecDeque<AxOrdersWsMessage>,
92}
93
94impl FeedHandler {
95 #[must_use]
97 pub fn new(
98 signal: Arc<AtomicBool>,
99 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
100 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
101 out_tx: tokio::sync::mpsc::UnboundedSender<AxOrdersWsMessage>,
102 auth_tracker: AuthTracker,
103 ) -> Self {
104 Self {
105 signal,
106 client: None,
107 cmd_rx,
108 raw_rx,
109 out_tx,
110 auth_tracker,
111 instruments: AHashMap::new(),
112 pending_orders: AHashMap::new(),
113 message_queue: VecDeque::new(),
114 }
115 }
116
117 pub async fn next(&mut self) -> Option<AxOrdersWsMessage> {
121 loop {
122 if let Some(msg) = self.message_queue.pop_front() {
123 return Some(msg);
124 }
125
126 tokio::select! {
127 Some(cmd) = self.cmd_rx.recv() => {
128 self.handle_command(cmd).await;
129 }
130
131 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
132 if self.signal.load(Ordering::Relaxed) {
133 log::debug!("Stop signal received during idle period");
134 return None;
135 }
136 continue;
137 }
138
139 msg = self.raw_rx.recv() => {
140 let msg = match msg {
141 Some(msg) => msg,
142 None => {
143 log::debug!("WebSocket stream closed");
144 return None;
145 }
146 };
147
148 if let Message::Ping(data) = &msg {
149 log::trace!("Received ping frame with {} bytes", data.len());
150 if let Some(client) = &self.client
151 && let Err(e) = client.send_pong(data.to_vec()).await
152 {
153 log::warn!("Failed to send pong frame: {e}");
154 }
155 continue;
156 }
157
158 if let Some(messages) = self.parse_raw_message(msg) {
159 self.message_queue.extend(messages);
160 }
161
162 if self.signal.load(Ordering::Relaxed) {
163 log::debug!("Stop signal received");
164 return None;
165 }
166 }
167 }
168 }
169 }
170
171 async fn handle_command(&mut self, cmd: HandlerCommand) {
172 match cmd {
173 HandlerCommand::SetClient(client) => {
174 log::debug!("WebSocketClient received by handler");
175 self.client = Some(client);
176 }
177 HandlerCommand::Disconnect => {
178 log::debug!("Disconnect command received");
179 if let Some(client) = self.client.take() {
180 client.disconnect().await;
181 }
182 }
183 HandlerCommand::Authenticate { token: _ } => {
184 log::debug!("Authenticate command received");
185 self.auth_tracker.succeed();
188 self.message_queue
189 .push_back(AxOrdersWsMessage::Authenticated);
190 }
191 HandlerCommand::PlaceOrder {
192 request_id,
193 order,
194 metadata,
195 } => {
196 log::debug!(
197 "PlaceOrder command received: request_id={request_id}, symbol={}",
198 order.s
199 );
200 self.pending_orders.insert(request_id, metadata);
201
202 if let Err(e) = self.send_json(&order).await {
203 log::error!("Failed to send place order message: {e}");
204 self.pending_orders.remove(&request_id);
205 }
206 }
207 HandlerCommand::CancelOrder {
208 request_id,
209 order_id,
210 } => {
211 log::debug!(
212 "CancelOrder command received: request_id={request_id}, order_id={order_id}"
213 );
214 self.send_cancel_order(request_id, &order_id).await;
215 }
216 HandlerCommand::GetOpenOrders { request_id } => {
217 log::debug!("GetOpenOrders command received: request_id={request_id}");
218 self.send_get_open_orders(request_id).await;
219 }
220 HandlerCommand::InitializeInstruments(instruments) => {
221 for inst in instruments {
222 self.instruments.insert(inst.symbol().inner(), inst);
223 }
224 }
225 HandlerCommand::UpdateInstrument(inst) => {
226 self.instruments.insert(inst.symbol().inner(), *inst);
227 }
228 }
229 }
230
231 async fn send_cancel_order(&self, request_id: i64, order_id: &str) {
232 let msg = AxWsCancelOrder {
233 rid: request_id,
234 t: "x".to_string(),
235 oid: order_id.to_string(),
236 };
237
238 if let Err(e) = self.send_json(&msg).await {
239 log::error!("Failed to send cancel order message: {e}");
240 }
241 }
242
243 async fn send_get_open_orders(&self, request_id: i64) {
244 let msg = AxWsGetOpenOrders {
245 rid: request_id,
246 t: "o".to_string(),
247 };
248
249 if let Err(e) = self.send_json(&msg).await {
250 log::error!("Failed to send get open orders message: {e}");
251 }
252 }
253
254 async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
255 let Some(client) = &self.client else {
256 return Err("No WebSocket client available".to_string());
257 };
258
259 let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
260 log::trace!("Sending: {payload}");
261
262 client
263 .send_text(payload, None)
264 .await
265 .map_err(|e| e.to_string())
266 }
267
268 fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<AxOrdersWsMessage>> {
269 match msg {
270 Message::Text(text) => {
271 if text == nautilus_network::RECONNECTED {
272 log::info!("Received WebSocket reconnected signal");
273 return Some(vec![AxOrdersWsMessage::Reconnected]);
274 }
275
276 log::trace!("Raw websocket message: {text}");
277
278 let value: serde_json::Value = match serde_json::from_str(&text) {
279 Ok(v) => v,
280 Err(e) => {
281 log::error!("Failed to parse WebSocket message: {e}: {text}");
282 return None;
283 }
284 };
285
286 self.classify_and_parse_message(value)
287 }
288 Message::Binary(data) => {
289 log::debug!("Received binary message with {} bytes", data.len());
290 None
291 }
292 Message::Close(_) => {
293 log::debug!("Received close message, waiting for reconnection");
294 None
295 }
296 _ => None,
297 }
298 }
299
300 fn classify_and_parse_message(
301 &mut self,
302 value: serde_json::Value,
303 ) -> Option<Vec<AxOrdersWsMessage>> {
304 let obj = value.as_object()?;
305
306 if obj.contains_key("rid") && obj.contains_key("res") {
308 return self.parse_response_message(value);
309 }
310
311 let msg_type = obj.get("t").and_then(|v| v.as_str())?;
312
313 match msg_type {
314 "h" => {
315 log::trace!("Received heartbeat");
316 None
317 }
318 "n" => match serde_json::from_value::<AxWsOrderAcknowledged>(value) {
319 Ok(msg) => {
320 log::debug!("Order acknowledged: {} {}", msg.o.oid, msg.o.s);
321 Some(vec![AxOrdersWsMessage::OrderAcknowledged(msg)])
322 }
323 Err(e) => {
324 log::error!("Failed to parse order acknowledged: {e}");
325 None
326 }
327 },
328 "p" => match serde_json::from_value::<AxWsOrderPartiallyFilled>(value) {
329 Ok(msg) => {
330 log::debug!(
331 "Order partially filled: {} {} @ {}",
332 msg.o.oid,
333 msg.xs.q,
334 msg.xs.p
335 );
336 Some(vec![AxOrdersWsMessage::OrderPartiallyFilled(msg)])
337 }
338 Err(e) => {
339 log::error!("Failed to parse order partially filled: {e}");
340 None
341 }
342 },
343 "f" => match serde_json::from_value::<AxWsOrderFilled>(value) {
344 Ok(msg) => {
345 log::debug!("Order filled: {} {} @ {}", msg.o.oid, msg.xs.q, msg.xs.p);
346 Some(vec![AxOrdersWsMessage::OrderFilled(msg)])
347 }
348 Err(e) => {
349 log::error!("Failed to parse order filled: {e}");
350 None
351 }
352 },
353 "c" => match serde_json::from_value::<AxWsOrderCanceled>(value) {
354 Ok(msg) => {
355 log::debug!("Order canceled: {} reason={}", msg.o.oid, msg.xr);
356 Some(vec![AxOrdersWsMessage::OrderCanceled(msg)])
357 }
358 Err(e) => {
359 log::error!("Failed to parse order canceled: {e}");
360 None
361 }
362 },
363 "j" => match serde_json::from_value::<AxWsOrderRejected>(value) {
364 Ok(msg) => {
365 log::debug!("Order rejected: {} reason={}", msg.o.oid, msg.r);
366 Some(vec![AxOrdersWsMessage::OrderRejectedRaw(msg)])
367 }
368 Err(e) => {
369 log::error!("Failed to parse order rejected: {e}");
370 None
371 }
372 },
373 "x" => match serde_json::from_value::<AxWsOrderExpired>(value) {
374 Ok(msg) => {
375 log::debug!("Order expired: {}", msg.o.oid);
376 Some(vec![AxOrdersWsMessage::OrderExpired(msg)])
377 }
378 Err(e) => {
379 log::error!("Failed to parse order expired: {e}");
380 None
381 }
382 },
383 "r" => match serde_json::from_value::<AxWsOrderReplaced>(value) {
384 Ok(msg) => {
385 log::debug!("Order replaced: {}", msg.o.oid);
386 Some(vec![AxOrdersWsMessage::OrderReplaced(msg)])
387 }
388 Err(e) => {
389 log::error!("Failed to parse order replaced: {e}");
390 None
391 }
392 },
393 "d" => match serde_json::from_value::<AxWsOrderDoneForDay>(value) {
394 Ok(msg) => {
395 log::debug!("Order done for day: {}", msg.o.oid);
396 Some(vec![AxOrdersWsMessage::OrderDoneForDay(msg)])
397 }
398 Err(e) => {
399 log::error!("Failed to parse order done for day: {e}");
400 None
401 }
402 },
403 "e" => match serde_json::from_value::<AxWsCancelRejected>(value) {
404 Ok(msg) => {
405 log::debug!("Cancel rejected: {} reason={}", msg.oid, msg.r);
406 Some(vec![AxOrdersWsMessage::CancelRejected(msg)])
407 }
408 Err(e) => {
409 log::error!("Failed to parse cancel rejected: {e}");
410 None
411 }
412 },
413 _ => {
414 log::warn!("Unknown message type: {msg_type}");
415 Some(vec![AxOrdersWsMessage::Error(AxWsError::new(format!(
416 "Unknown message type: {msg_type}"
417 )))])
418 }
419 }
420 }
421
422 fn parse_response_message(
423 &mut self,
424 value: serde_json::Value,
425 ) -> Option<Vec<AxOrdersWsMessage>> {
426 let obj = value.as_object()?;
427 let res = obj.get("res")?;
428
429 if res.is_object() {
430 if res.get("oid").is_some() {
431 match serde_json::from_value::<AxWsPlaceOrderResponse>(value) {
432 Ok(msg) => {
433 log::debug!("Place order response: rid={} oid={}", msg.rid, msg.res.oid);
434 Some(vec![AxOrdersWsMessage::PlaceOrderResponse(msg)])
435 }
436 Err(e) => {
437 log::error!("Failed to parse place order response: {e}");
438 None
439 }
440 }
441 } else if res.get("cxl_rx").is_some() {
442 match serde_json::from_value::<AxWsCancelOrderResponse>(value) {
443 Ok(msg) => {
444 log::debug!(
445 "Cancel order response: rid={} cxl_rx={}",
446 msg.rid,
447 msg.res.cxl_rx
448 );
449 Some(vec![AxOrdersWsMessage::CancelOrderResponse(msg)])
450 }
451 Err(e) => {
452 log::error!("Failed to parse cancel order response: {e}");
453 None
454 }
455 }
456 } else {
457 log::warn!("Unknown response object format");
458 None
459 }
460 } else if res.is_array() {
461 match serde_json::from_value::<AxWsOpenOrdersResponse>(value) {
462 Ok(msg) => {
463 log::debug!(
464 "Open orders response: rid={} count={}",
465 msg.rid,
466 msg.res.len()
467 );
468 Some(vec![AxOrdersWsMessage::OpenOrdersResponse(msg)])
469 }
470 Err(e) => {
471 log::error!("Failed to parse open orders response: {e}");
472 None
473 }
474 }
475 } else {
476 log::warn!("Unknown response format");
477 None
478 }
479 }
480}