1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_core::{
29 UUID4,
30 nanos::UnixNanos,
31 time::{AtomicTime, get_atomic_clock_realtime},
32};
33use nautilus_model::{
34 enums::{LiquiditySide, OrderSide as NautilusOrderSide, OrderStatus, OrderType, TimeInForce},
35 events::{
36 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderExpired, OrderFilled, OrderRejected,
37 },
38 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport},
41 types::{Money, Price, Quantity},
42};
43use nautilus_network::websocket::{AuthTracker, WebSocketClient};
44use tokio_tungstenite::tungstenite::Message;
45use ustr::Ustr;
46
47use crate::{
48 common::{
49 consts::AX_POST_ONLY_REJECT,
50 enums::{AxOrderRequestType, AxOrderSide, AxTimeInForce},
51 parse::{ax_timestamp_s_to_unix_nanos, cid_to_client_order_id},
52 },
53 websocket::{
54 messages::{
55 AxOrdersWsMessage, AxWsCancelOrder, AxWsCancelRejected, AxWsGetOpenOrders, AxWsOrder,
56 AxWsOrderAcknowledged, AxWsOrderCanceled, AxWsOrderDoneForDay, AxWsOrderEvent,
57 AxWsOrderExpired, AxWsOrderFilled, AxWsOrderPartiallyFilled, AxWsOrderRejected,
58 AxWsOrderReplaced, AxWsOrderResponse, AxWsPlaceOrder, AxWsRawMessage,
59 AxWsTradeExecution, NautilusExecWsMessage, OrderMetadata,
60 },
61 parse::parse_order_message,
62 },
63};
64
65fn map_time_in_force(tif: AxTimeInForce) -> TimeInForce {
66 match tif {
67 AxTimeInForce::Gtc => TimeInForce::Gtc,
68 AxTimeInForce::Ioc => TimeInForce::Ioc,
69 AxTimeInForce::Fok => TimeInForce::Fok,
70 AxTimeInForce::Day => TimeInForce::Day,
71 AxTimeInForce::Gtd => TimeInForce::Gtd,
72 AxTimeInForce::Ato => TimeInForce::AtTheOpen,
73 AxTimeInForce::Atc => TimeInForce::AtTheClose,
74 }
75}
76
77fn map_order_side(side: AxOrderSide) -> NautilusOrderSide {
78 match side {
79 AxOrderSide::Buy => NautilusOrderSide::Buy,
80 AxOrderSide::Sell => NautilusOrderSide::Sell,
81 }
82}
83
84#[derive(Clone, Debug)]
86pub struct WsOrderInfo {
87 pub client_order_id: ClientOrderId,
89 pub symbol: Ustr,
91}
92
93#[derive(Debug)]
95pub enum HandlerCommand {
96 SetClient(WebSocketClient),
98 Disconnect,
100 Authenticate {
102 token: String,
104 },
105 PlaceOrder {
107 request_id: i64,
109 order: AxWsPlaceOrder,
111 order_info: WsOrderInfo,
113 },
114 CancelOrder {
116 request_id: i64,
118 order_id: String,
120 },
121 GetOpenOrders {
123 request_id: i64,
125 },
126 InitializeInstruments(Vec<InstrumentAny>),
128 UpdateInstrument(Box<InstrumentAny>),
130 StoreOrderMetadata {
132 client_order_id: ClientOrderId,
134 metadata: OrderMetadata,
136 },
137}
138
139pub(crate) struct FeedHandler {
143 clock: &'static AtomicTime,
144 signal: Arc<AtomicBool>,
145 client: Option<WebSocketClient>,
146 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
147 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
148 auth_tracker: AuthTracker,
149 account_id: AccountId,
150 instruments: AHashMap<Ustr, InstrumentAny>,
151 pending_orders: AHashMap<i64, WsOrderInfo>,
152 message_queue: VecDeque<AxOrdersWsMessage>,
153 orders_metadata: Arc<DashMap<ClientOrderId, OrderMetadata>>,
154 venue_to_client_id: Arc<DashMap<VenueOrderId, ClientOrderId>>,
155 cid_to_client_order_id: Arc<DashMap<u64, ClientOrderId>>,
156 bearer_token: Option<String>,
157 needs_reauthentication: bool,
158}
159
160impl FeedHandler {
161 #[must_use]
163 #[allow(clippy::too_many_arguments)]
164 pub fn new(
165 signal: Arc<AtomicBool>,
166 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
167 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
168 auth_tracker: AuthTracker,
169 account_id: AccountId,
170 orders_metadata: Arc<DashMap<ClientOrderId, OrderMetadata>>,
171 venue_to_client_id: Arc<DashMap<VenueOrderId, ClientOrderId>>,
172 cid_to_client_order_id: Arc<DashMap<u64, ClientOrderId>>,
173 ) -> Self {
174 Self {
175 clock: get_atomic_clock_realtime(),
176 signal,
177 client: None,
178 cmd_rx,
179 raw_rx,
180 auth_tracker,
181 account_id,
182 instruments: AHashMap::new(),
183 pending_orders: AHashMap::new(),
184 message_queue: VecDeque::new(),
185 orders_metadata,
186 venue_to_client_id,
187 cid_to_client_order_id,
188 bearer_token: None,
189 needs_reauthentication: false,
190 }
191 }
192
193 fn generate_ts_init(&self) -> UnixNanos {
194 self.clock.get_time_ns()
195 }
196
197 async fn reauthenticate(&mut self) {
198 if self.bearer_token.is_some() {
199 log::info!("Re-authenticating after reconnection");
200
201 self.auth_tracker.succeed();
203 self.message_queue
204 .push_back(AxOrdersWsMessage::Authenticated);
205 log::info!("Re-authentication completed");
206 } else {
207 log::warn!("Cannot re-authenticate: no bearer token stored");
208 }
209 }
210
211 pub async fn next(&mut self) -> Option<AxOrdersWsMessage> {
215 loop {
216 if self.needs_reauthentication && self.message_queue.is_empty() {
217 self.needs_reauthentication = false;
218 self.reauthenticate().await;
219 }
220
221 if let Some(msg) = self.message_queue.pop_front() {
222 return Some(msg);
223 }
224
225 tokio::select! {
226 Some(cmd) = self.cmd_rx.recv() => {
227 self.handle_command(cmd).await;
228 }
229
230 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
231 if self.signal.load(Ordering::Acquire) {
232 log::debug!("Stop signal received during idle period");
233 return None;
234 }
235 continue;
236 }
237
238 msg = self.raw_rx.recv() => {
239 let msg = match msg {
240 Some(msg) => msg,
241 None => {
242 log::debug!("WebSocket stream closed");
243 return None;
244 }
245 };
246
247 if let Message::Ping(data) = &msg {
248 log::trace!("Received ping frame with {} bytes", data.len());
249 if let Some(client) = &self.client
250 && let Err(e) = client.send_pong(data.to_vec()).await
251 {
252 log::warn!("Failed to send pong frame: {e}");
253 }
254 continue;
255 }
256
257 if let Some(messages) = self.parse_raw_message(msg) {
258 self.message_queue.extend(messages);
259 }
260
261 if self.signal.load(Ordering::Acquire) {
262 log::debug!("Stop signal received");
263 return None;
264 }
265 }
266 }
267 }
268 }
269
270 async fn handle_command(&mut self, cmd: HandlerCommand) {
271 match cmd {
272 HandlerCommand::SetClient(client) => {
273 log::debug!("WebSocketClient received by handler");
274 self.client = Some(client);
275 }
276 HandlerCommand::Disconnect => {
277 log::debug!("Disconnect command received");
278 self.auth_tracker.fail("Disconnected");
279 if let Some(client) = self.client.take() {
280 client.disconnect().await;
281 }
282 }
283 HandlerCommand::Authenticate { token } => {
284 log::debug!("Authenticate command received");
285 self.bearer_token = Some(token);
286
287 self.auth_tracker.succeed();
289 self.message_queue
290 .push_back(AxOrdersWsMessage::Authenticated);
291 }
292 HandlerCommand::PlaceOrder {
293 request_id,
294 order,
295 order_info,
296 } => {
297 log::debug!(
298 "PlaceOrder command received: request_id={request_id}, symbol={}",
299 order.s
300 );
301 self.pending_orders.insert(request_id, order_info);
302
303 if let Err(e) = self.send_json(&order).await {
304 log::error!("Failed to send place order message: {e}");
305 self.pending_orders.remove(&request_id);
306 }
307 }
308 HandlerCommand::CancelOrder {
309 request_id,
310 order_id,
311 } => {
312 log::debug!(
313 "CancelOrder command received: request_id={request_id}, order_id={order_id}"
314 );
315 self.send_cancel_order(request_id, &order_id).await;
316 }
317 HandlerCommand::GetOpenOrders { request_id } => {
318 log::debug!("GetOpenOrders command received: request_id={request_id}");
319 self.send_get_open_orders(request_id).await;
320 }
321 HandlerCommand::InitializeInstruments(instruments) => {
322 for inst in instruments {
323 self.instruments.insert(inst.symbol().inner(), inst);
324 }
325 }
326 HandlerCommand::UpdateInstrument(inst) => {
327 self.instruments.insert(inst.symbol().inner(), *inst);
328 }
329 HandlerCommand::StoreOrderMetadata {
330 client_order_id,
331 metadata,
332 } => {
333 self.orders_metadata.insert(client_order_id, metadata);
334 }
335 }
336 }
337
338 async fn send_cancel_order(&self, request_id: i64, order_id: &str) {
339 let msg = AxWsCancelOrder {
340 rid: request_id,
341 t: AxOrderRequestType::CancelOrder,
342 oid: order_id.to_string(),
343 };
344
345 if let Err(e) = self.send_json(&msg).await {
346 log::error!("Failed to send cancel order message: {e}");
347 }
348 }
349
350 async fn send_get_open_orders(&self, request_id: i64) {
351 let msg = AxWsGetOpenOrders {
352 rid: request_id,
353 t: AxOrderRequestType::GetOpenOrders,
354 };
355
356 if let Err(e) = self.send_json(&msg).await {
357 log::error!("Failed to send get open orders message: {e}");
358 }
359 }
360
361 async fn send_json<T: serde::Serialize>(&self, msg: &T) -> Result<(), String> {
362 let Some(client) = &self.client else {
363 return Err("No WebSocket client available".to_string());
364 };
365
366 let payload = serde_json::to_string(msg).map_err(|e| e.to_string())?;
367 log::trace!("Sending: {payload}");
368
369 client
370 .send_text(payload, None)
371 .await
372 .map_err(|e| e.to_string())
373 }
374
375 fn parse_raw_message(&mut self, msg: Message) -> Option<Vec<AxOrdersWsMessage>> {
376 match msg {
377 Message::Text(text) => {
378 if text == nautilus_network::RECONNECTED {
379 log::info!("Received WebSocket reconnected signal");
380 self.auth_tracker.fail("Reconnecting");
381 self.needs_reauthentication = true;
382 return Some(vec![AxOrdersWsMessage::Reconnected]);
383 }
384
385 log::trace!("Raw websocket message: {text}");
386
387 let raw_msg: AxWsRawMessage = match parse_order_message(&text) {
388 Ok(v) => v,
389 Err(e) => {
390 log::error!("Failed to parse WebSocket message: {e}: {text}");
391 return None;
392 }
393 };
394
395 self.handle_raw_message(raw_msg)
396 }
397 Message::Binary(data) => {
398 log::debug!("Received binary message with {} bytes", data.len());
399 None
400 }
401 Message::Close(_) => {
402 log::debug!("Received close message, waiting for reconnection");
403 None
404 }
405 _ => None,
406 }
407 }
408
409 fn handle_raw_message(&mut self, raw_msg: AxWsRawMessage) -> Option<Vec<AxOrdersWsMessage>> {
410 match raw_msg {
411 AxWsRawMessage::Error(err) => {
412 log::warn!(
413 "Order error response: rid={} code={} msg={}",
414 err.rid,
415 err.err.code,
416 err.err.msg
417 );
418
419 if let Some(order_info) = self.pending_orders.remove(&err.rid) {
420 self.orders_metadata.remove(&order_info.client_order_id);
421 log::debug!(
422 "Cleaned up metadata for failed order: {}",
423 order_info.client_order_id
424 );
425 }
426
427 Some(vec![AxOrdersWsMessage::Error(err.into())])
428 }
429 AxWsRawMessage::Response(resp) => self.handle_response(resp),
430 AxWsRawMessage::Event(event) => self.handle_event(*event),
431 }
432 }
433
434 fn handle_response(&mut self, resp: AxWsOrderResponse) -> Option<Vec<AxOrdersWsMessage>> {
435 match resp {
436 AxWsOrderResponse::PlaceOrder(msg) => {
437 log::debug!("Place order response: rid={} oid={}", msg.rid, msg.res.oid);
438 self.pending_orders.remove(&msg.rid);
439 Some(vec![AxOrdersWsMessage::PlaceOrderResponse(msg)])
440 }
441 AxWsOrderResponse::CancelOrder(msg) => {
442 log::debug!(
443 "Cancel order response: rid={} accepted={}",
444 msg.rid,
445 msg.res.cxl_rx
446 );
447 Some(vec![AxOrdersWsMessage::CancelOrderResponse(msg)])
448 }
449 AxWsOrderResponse::OpenOrders(msg) => {
450 log::debug!("Open orders response: {} orders", msg.res.len());
451 Some(vec![AxOrdersWsMessage::OpenOrdersResponse(msg)])
452 }
453 AxWsOrderResponse::List(msg) => {
454 let order_count = msg.res.o.as_ref().map_or(0, |o| o.len());
455 log::debug!(
456 "List subscription response: rid={} li={} orders={}",
457 msg.rid,
458 msg.res.li,
459 order_count
460 );
461 None
462 }
463 }
464 }
465
466 fn handle_event(&mut self, event: AxWsOrderEvent) -> Option<Vec<AxOrdersWsMessage>> {
467 match event {
468 AxWsOrderEvent::Heartbeat => {
469 log::trace!("Received heartbeat");
470 None
471 }
472 AxWsOrderEvent::Acknowledged(msg) => self.handle_order_acknowledged(msg),
473 AxWsOrderEvent::PartiallyFilled(msg) => self.handle_order_partially_filled(msg),
474 AxWsOrderEvent::Filled(msg) => self.handle_order_filled(msg),
475 AxWsOrderEvent::Canceled(msg) => self.handle_order_canceled(msg),
476 AxWsOrderEvent::Rejected(msg) => self.handle_order_rejected(msg),
477 AxWsOrderEvent::Expired(msg) => self.handle_order_expired(msg),
478 AxWsOrderEvent::Replaced(msg) => self.handle_order_replaced(msg),
479 AxWsOrderEvent::DoneForDay(msg) => self.handle_order_done_for_day(msg),
480 AxWsOrderEvent::CancelRejected(msg) => self.handle_cancel_rejected(msg),
481 }
482 }
483
484 fn handle_order_acknowledged(
485 &mut self,
486 msg: AxWsOrderAcknowledged,
487 ) -> Option<Vec<AxOrdersWsMessage>> {
488 log::debug!("Order acknowledged: {} {}", msg.o.oid, msg.o.s);
489
490 if let Some(event) = self.create_order_accepted(&msg.o, msg.ts) {
491 Some(vec![AxOrdersWsMessage::Nautilus(
492 NautilusExecWsMessage::OrderAccepted(event),
493 )])
494 } else if let Some(report) =
495 self.create_order_status_report(&msg.o, OrderStatus::Accepted, msg.ts)
496 {
497 log::debug!("Created OrderStatusReport for external order {}", msg.o.oid);
498 Some(vec![AxOrdersWsMessage::Nautilus(
499 NautilusExecWsMessage::OrderStatusReports(vec![report]),
500 )])
501 } else {
502 log::warn!(
503 "Could not create OrderAccepted event for order {}",
504 msg.o.oid
505 );
506 None
507 }
508 }
509
510 fn handle_order_partially_filled(
511 &mut self,
512 msg: AxWsOrderPartiallyFilled,
513 ) -> Option<Vec<AxOrdersWsMessage>> {
514 log::debug!(
515 "Order partially filled: {} {} @ {}",
516 msg.o.oid,
517 msg.xs.q,
518 msg.xs.p
519 );
520
521 if let Some(event) = self.create_order_filled(&msg.o, &msg.xs, msg.ts) {
522 Some(vec![AxOrdersWsMessage::Nautilus(
523 NautilusExecWsMessage::OrderFilled(Box::new(event)),
524 )])
525 } else if let Some(report) = self.create_fill_report(&msg.o, &msg.xs, msg.ts) {
526 log::debug!("Created FillReport for external order {}", msg.o.oid);
527 Some(vec![AxOrdersWsMessage::Nautilus(
528 NautilusExecWsMessage::FillReports(vec![report]),
529 )])
530 } else {
531 log::warn!("Could not create OrderFilled event for order {}", msg.o.oid);
532 None
533 }
534 }
535
536 fn handle_order_filled(&mut self, msg: AxWsOrderFilled) -> Option<Vec<AxOrdersWsMessage>> {
537 log::debug!("Order filled: {} {} @ {}", msg.o.oid, msg.xs.q, msg.xs.p);
538
539 let message = if let Some(event) = self.create_order_filled(&msg.o, &msg.xs, msg.ts) {
540 Some(vec![AxOrdersWsMessage::Nautilus(
541 NautilusExecWsMessage::OrderFilled(Box::new(event)),
542 )])
543 } else if let Some(report) = self.create_fill_report(&msg.o, &msg.xs, msg.ts) {
544 log::debug!("Created FillReport for external order {}", msg.o.oid);
545 Some(vec![AxOrdersWsMessage::Nautilus(
546 NautilusExecWsMessage::FillReports(vec![report]),
547 )])
548 } else {
549 log::warn!("Could not create OrderFilled event for order {}", msg.o.oid);
550 None
551 };
552
553 self.cleanup_terminal_order_tracking(&msg.o);
554 message
555 }
556
557 fn handle_order_canceled(&mut self, msg: AxWsOrderCanceled) -> Option<Vec<AxOrdersWsMessage>> {
558 log::debug!("Order canceled: {} reason={}", msg.o.oid, msg.xr);
559
560 let message = if let Some(event) = self.create_order_canceled(&msg.o, msg.ts) {
561 Some(vec![AxOrdersWsMessage::Nautilus(
562 NautilusExecWsMessage::OrderCanceled(event),
563 )])
564 } else if let Some(report) =
565 self.create_order_status_report(&msg.o, OrderStatus::Canceled, msg.ts)
566 {
567 log::debug!("Created OrderStatusReport for external order {}", msg.o.oid);
568 Some(vec![AxOrdersWsMessage::Nautilus(
569 NautilusExecWsMessage::OrderStatusReports(vec![report]),
570 )])
571 } else {
572 log::warn!(
573 "Could not create OrderCanceled event for order {}",
574 msg.o.oid
575 );
576 None
577 };
578
579 self.cleanup_terminal_order_tracking(&msg.o);
580 message
581 }
582
583 fn handle_order_rejected(&mut self, msg: AxWsOrderRejected) -> Option<Vec<AxOrdersWsMessage>> {
584 let reason = msg.r.as_deref().or(msg.txt.as_deref()).unwrap_or("UNKNOWN");
586
587 let message = if let Some(event) = self.create_order_rejected(&msg.o, reason, msg.ts) {
588 Some(vec![AxOrdersWsMessage::Nautilus(
589 NautilusExecWsMessage::OrderRejected(event),
590 )])
591 } else {
592 log::warn!(
593 "Could not create OrderRejected event for order {}",
594 msg.o.oid
595 );
596 None
597 };
598
599 self.cleanup_terminal_order_tracking(&msg.o);
600 message
601 }
602
603 fn handle_order_expired(&mut self, msg: AxWsOrderExpired) -> Option<Vec<AxOrdersWsMessage>> {
604 log::debug!("Order expired: {}", msg.o.oid);
605
606 let message = if let Some(event) = self.create_order_expired(&msg.o, msg.ts) {
607 Some(vec![AxOrdersWsMessage::Nautilus(
608 NautilusExecWsMessage::OrderExpired(event),
609 )])
610 } else if let Some(report) =
611 self.create_order_status_report(&msg.o, OrderStatus::Expired, msg.ts)
612 {
613 log::debug!("Created OrderStatusReport for external order {}", msg.o.oid);
614 Some(vec![AxOrdersWsMessage::Nautilus(
615 NautilusExecWsMessage::OrderStatusReports(vec![report]),
616 )])
617 } else {
618 log::warn!(
619 "Could not create OrderExpired event for order {}",
620 msg.o.oid
621 );
622 None
623 };
624
625 self.cleanup_terminal_order_tracking(&msg.o);
626 message
627 }
628
629 fn handle_order_replaced(&mut self, msg: AxWsOrderReplaced) -> Option<Vec<AxOrdersWsMessage>> {
630 log::debug!("Order replaced: {}", msg.o.oid);
631
632 if let Some(event) = self.create_order_accepted(&msg.o, msg.ts) {
634 Some(vec![AxOrdersWsMessage::Nautilus(
635 NautilusExecWsMessage::OrderAccepted(event),
636 )])
637 } else if let Some(report) =
638 self.create_order_status_report(&msg.o, OrderStatus::Accepted, msg.ts)
639 {
640 log::debug!(
641 "Created OrderStatusReport for external replaced order {}",
642 msg.o.oid
643 );
644 Some(vec![AxOrdersWsMessage::Nautilus(
645 NautilusExecWsMessage::OrderStatusReports(vec![report]),
646 )])
647 } else {
648 log::warn!(
649 "Could not create OrderAccepted event for replaced order {}",
650 msg.o.oid
651 );
652 None
653 }
654 }
655
656 fn handle_order_done_for_day(
657 &mut self,
658 msg: AxWsOrderDoneForDay,
659 ) -> Option<Vec<AxOrdersWsMessage>> {
660 log::debug!("Order done for day: {}", msg.o.oid);
661
662 let message = if let Some(event) = self.create_order_expired(&msg.o, msg.ts) {
663 Some(vec![AxOrdersWsMessage::Nautilus(
664 NautilusExecWsMessage::OrderExpired(event),
665 )])
666 } else if let Some(report) =
667 self.create_order_status_report(&msg.o, OrderStatus::Expired, msg.ts)
668 {
669 log::debug!(
670 "Created OrderStatusReport for external done-for-day order {}",
671 msg.o.oid
672 );
673 Some(vec![AxOrdersWsMessage::Nautilus(
674 NautilusExecWsMessage::OrderStatusReports(vec![report]),
675 )])
676 } else {
677 log::warn!(
678 "Could not create OrderExpired event for done-for-day order {}",
679 msg.o.oid
680 );
681 None
682 };
683
684 self.cleanup_terminal_order_tracking(&msg.o);
685 message
686 }
687
688 fn handle_cancel_rejected(
689 &mut self,
690 msg: AxWsCancelRejected,
691 ) -> Option<Vec<AxOrdersWsMessage>> {
692 log::warn!("Cancel rejected: {} reason={}", msg.oid, msg.r);
693
694 let venue_order_id = VenueOrderId::new(&msg.oid);
695 if let Some(client_order_id) = self.venue_to_client_id.get(&venue_order_id)
696 && let Some(metadata) = self.orders_metadata.get(&client_order_id)
697 {
698 let event = OrderCancelRejected::new(
699 metadata.trader_id,
700 metadata.strategy_id,
701 metadata.instrument_id,
702 metadata.client_order_id,
703 Ustr::from(msg.r.as_ref()),
704 UUID4::new(),
705 self.generate_ts_init(),
706 metadata.ts_init,
707 false,
708 Some(venue_order_id),
709 Some(self.account_id),
710 );
711 Some(vec![AxOrdersWsMessage::Nautilus(
712 NautilusExecWsMessage::OrderCancelRejected(event),
713 )])
714 } else {
715 log::warn!(
716 "Could not find metadata for cancel rejected order {}",
717 msg.oid
718 );
719 None
720 }
721 }
722
723 fn lookup_order_metadata(
724 &self,
725 order: &AxWsOrder,
726 ) -> Option<dashmap::mapref::one::Ref<'_, ClientOrderId, OrderMetadata>> {
727 let venue_order_id = VenueOrderId::new(&order.oid);
728
729 if let Some(client_order_id) = self.venue_to_client_id.get(&venue_order_id)
731 && let Some(metadata) = self.orders_metadata.get(&*client_order_id)
732 {
733 return Some(metadata);
734 }
735
736 if let Some(cid) = order.cid
738 && let Some(client_order_id) = self.cid_to_client_order_id.get(&cid)
739 && let Some(metadata) = self.orders_metadata.get(&*client_order_id)
740 {
741 return Some(metadata);
742 }
743
744 None
745 }
746
747 fn create_order_accepted(&mut self, order: &AxWsOrder, event_ts: i64) -> Option<OrderAccepted> {
748 let venue_order_id = VenueOrderId::new(&order.oid);
749 let metadata = self.lookup_order_metadata(order)?;
750
751 let client_order_id = metadata.client_order_id;
753 let trader_id = metadata.trader_id;
754 let strategy_id = metadata.strategy_id;
755 let instrument_id = metadata.instrument_id;
756
757 drop(metadata);
759
760 self.venue_to_client_id
762 .insert(venue_order_id, client_order_id);
763
764 if let Some(mut entry) = self.orders_metadata.get_mut(&client_order_id) {
766 entry.venue_order_id = Some(venue_order_id);
767 }
768
769 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
770
771 Some(OrderAccepted::new(
772 trader_id,
773 strategy_id,
774 instrument_id,
775 client_order_id,
776 venue_order_id,
777 self.account_id,
778 UUID4::new(),
779 ts_event,
780 self.generate_ts_init(),
781 false,
782 ))
783 }
784
785 fn create_order_filled(
786 &self,
787 order: &AxWsOrder,
788 execution: &AxWsTradeExecution,
789 event_ts: i64,
790 ) -> Option<OrderFilled> {
791 let venue_order_id = VenueOrderId::new(&order.oid);
792 let metadata = self.lookup_order_metadata(order)?;
793
794 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
795
796 let last_qty = Quantity::new(execution.q as f64, metadata.size_precision);
798 let last_px = Price::from_decimal_dp(execution.p, metadata.price_precision).ok()?;
799
800 let order_side = match order.d {
801 AxOrderSide::Buy => NautilusOrderSide::Buy,
802 AxOrderSide::Sell => NautilusOrderSide::Sell,
803 };
804
805 let order_type = OrderType::Limit;
807
808 let liquidity_side = if execution.agg {
810 LiquiditySide::Taker
811 } else {
812 LiquiditySide::Maker
813 };
814
815 Some(OrderFilled::new(
816 metadata.trader_id,
817 metadata.strategy_id,
818 metadata.instrument_id,
819 metadata.client_order_id,
820 venue_order_id,
821 self.account_id,
822 TradeId::new(&execution.tid),
823 order_side,
824 order_type,
825 last_qty,
826 last_px,
827 metadata.quote_currency,
828 liquidity_side,
829 UUID4::new(),
830 ts_event,
831 self.generate_ts_init(),
832 false,
833 None, None, ))
836 }
837
838 fn create_order_canceled(&self, order: &AxWsOrder, event_ts: i64) -> Option<OrderCanceled> {
839 let venue_order_id = VenueOrderId::new(&order.oid);
840 let metadata = self.lookup_order_metadata(order)?;
841
842 let client_order_id = metadata.client_order_id;
843 let trader_id = metadata.trader_id;
844 let strategy_id = metadata.strategy_id;
845 let instrument_id = metadata.instrument_id;
846
847 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
848
849 Some(OrderCanceled::new(
850 trader_id,
851 strategy_id,
852 instrument_id,
853 client_order_id,
854 UUID4::new(),
855 ts_event,
856 self.generate_ts_init(),
857 false,
858 Some(venue_order_id),
859 Some(self.account_id),
860 ))
861 }
862
863 fn create_order_expired(&self, order: &AxWsOrder, event_ts: i64) -> Option<OrderExpired> {
864 let venue_order_id = VenueOrderId::new(&order.oid);
865 let metadata = self.lookup_order_metadata(order)?;
866
867 let client_order_id = metadata.client_order_id;
868 let trader_id = metadata.trader_id;
869 let strategy_id = metadata.strategy_id;
870 let instrument_id = metadata.instrument_id;
871
872 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
873
874 Some(OrderExpired::new(
875 trader_id,
876 strategy_id,
877 instrument_id,
878 client_order_id,
879 UUID4::new(),
880 ts_event,
881 self.generate_ts_init(),
882 false,
883 Some(venue_order_id),
884 Some(self.account_id),
885 ))
886 }
887
888 fn create_order_rejected(
889 &self,
890 order: &AxWsOrder,
891 reason: &str,
892 event_ts: i64,
893 ) -> Option<OrderRejected> {
894 let metadata = self.lookup_order_metadata(order)?;
895
896 let client_order_id = metadata.client_order_id;
897 let trader_id = metadata.trader_id;
898 let strategy_id = metadata.strategy_id;
899 let instrument_id = metadata.instrument_id;
900
901 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
902 let due_post_only = reason.contains(AX_POST_ONLY_REJECT);
903
904 Some(OrderRejected::new(
905 trader_id,
906 strategy_id,
907 instrument_id,
908 client_order_id,
909 self.account_id,
910 Ustr::from(reason),
911 UUID4::new(),
912 ts_event,
913 self.generate_ts_init(),
914 false,
915 due_post_only,
916 ))
917 }
918
919 fn cleanup_terminal_order_tracking(&mut self, order: &AxWsOrder) {
920 let venue_order_id = VenueOrderId::new(&order.oid);
921 let client_order_id = self
922 .venue_to_client_id
923 .remove(&venue_order_id)
924 .map(|(_, v)| v)
925 .or_else(|| {
926 order
927 .cid
928 .and_then(|cid| self.cid_to_client_order_id.remove(&cid).map(|(_, v)| v))
929 });
930
931 if let Some(client_order_id) = client_order_id {
932 self.orders_metadata.remove(&client_order_id);
933 }
934
935 if let Some(cid) = order.cid {
936 self.cid_to_client_order_id.remove(&cid);
937 }
938 }
939
940 fn create_order_status_report(
941 &self,
942 order: &AxWsOrder,
943 order_status: OrderStatus,
944 event_ts: i64,
945 ) -> Option<OrderStatusReport> {
946 let instrument = self.instruments.get(&order.s)?;
947 let venue_order_id = VenueOrderId::new(&order.oid);
948 let instrument_id = instrument.id();
949 let order_side = map_order_side(order.d);
950 let time_in_force = map_time_in_force(order.tif);
951
952 let quantity = Quantity::new(order.q as f64, instrument.size_precision());
953 let filled_qty = Quantity::new(order.xq as f64, instrument.size_precision());
954
955 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
956 let ts_init = self.generate_ts_init();
957
958 let client_order_id = order.cid.map(|cid| {
959 self.cid_to_client_order_id
960 .get(&cid)
961 .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
962 });
963
964 let mut report = OrderStatusReport::new(
965 self.account_id,
966 instrument_id,
967 client_order_id,
968 venue_order_id,
969 order_side,
970 OrderType::Limit, time_in_force,
972 order_status,
973 quantity,
974 filled_qty,
975 ts_event, ts_event, ts_init,
978 Some(UUID4::new()),
979 );
980
981 if let Ok(price) = Price::from_decimal_dp(order.p, instrument.price_precision()) {
982 report = report.with_price(price);
983 }
984
985 Some(report)
986 }
987
988 fn create_fill_report(
989 &self,
990 order: &AxWsOrder,
991 execution: &AxWsTradeExecution,
992 event_ts: i64,
993 ) -> Option<FillReport> {
994 let instrument = self.instruments.get(&order.s)?;
995 let venue_order_id = VenueOrderId::new(&order.oid);
996 let instrument_id = instrument.id();
997 let order_side = map_order_side(order.d);
998
999 let last_qty = Quantity::new(execution.q as f64, instrument.size_precision());
1000 let last_px = Price::from_decimal_dp(execution.p, instrument.price_precision()).ok()?;
1001
1002 let liquidity_side = if execution.agg {
1004 LiquiditySide::Taker
1005 } else {
1006 LiquiditySide::Maker
1007 };
1008
1009 let ts_event = ax_timestamp_s_to_unix_nanos(event_ts);
1010 let ts_init = self.generate_ts_init();
1011
1012 let client_order_id = order.cid.map(|cid| {
1013 self.cid_to_client_order_id
1014 .get(&cid)
1015 .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
1016 });
1017
1018 let commission = Money::new(0.0, instrument.quote_currency());
1020
1021 Some(FillReport::new(
1022 self.account_id,
1023 instrument_id,
1024 venue_order_id,
1025 TradeId::new(&execution.tid),
1026 order_side,
1027 last_qty,
1028 last_px,
1029 commission,
1030 liquidity_side,
1031 client_order_id,
1032 None, ts_event,
1034 ts_init,
1035 Some(UUID4::new()),
1036 ))
1037 }
1038}
1039
#[cfg(test)]
mod tests {
    use std::sync::{Arc, atomic::AtomicBool};

    use dashmap::DashMap;
    use nautilus_model::{
        identifiers::{InstrumentId, StrategyId, TraderId},
        types::Currency,
    };
    use nautilus_network::websocket::AuthTracker;
    use rstest::rstest;
    use rust_decimal_macros::dec;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{AxOrderSide, AxOrderStatus, AxTimeInForce},
        websocket::messages::{AxWsOrderRejected, AxWsPlaceOrderResponse, AxWsPlaceOrderResult},
    };

    /// Builds a handler with fresh channels and empty tracking maps.
    fn test_handler() -> FeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let signal = Arc::new(AtomicBool::new(false));

        FeedHandler::new(
            signal,
            cmd_rx,
            raw_rx,
            AuthTracker::default(),
            AccountId::from("AX-001"),
            Arc::new(DashMap::new()),
            Arc::new(DashMap::new()),
            Arc::new(DashMap::new()),
        )
    }

    /// A filled buy order with venue order ID `OID-1` and the given `cid`.
    fn sample_order(cid: u64) -> AxWsOrder {
        AxWsOrder {
            oid: "OID-1".to_string(),
            u: "user-1".to_string(),
            s: Ustr::from("BTCUSD-PERP"),
            p: dec!(50000),
            q: 100,
            xq: 100,
            rq: 0,
            o: AxOrderStatus::Filled,
            d: AxOrderSide::Buy,
            tif: AxTimeInForce::Gtc,
            ts: 1_700_000_000,
            tn: 1,
            cid: Some(cid),
            tag: None,
            txt: None,
        }
    }

    /// Metadata for an order that is already linked to its venue order ID.
    fn sample_metadata(
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
    ) -> OrderMetadata {
        OrderMetadata {
            trader_id: TraderId::from("TRADER-001"),
            strategy_id: StrategyId::from("S-001"),
            instrument_id: InstrumentId::from("BTCUSD-PERP.AX"),
            client_order_id,
            venue_order_id: Some(venue_order_id),
            ts_init: UnixNanos::from(1_700_000_000_000_000_000u64),
            size_precision: 8,
            price_precision: 2,
            quote_currency: Currency::USD(),
        }
    }

    /// Registers an order in all three tracking maps of `handler`.
    fn track_order(
        handler: &FeedHandler,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
        cid: u64,
    ) {
        handler.orders_metadata.insert(
            client_order_id,
            sample_metadata(client_order_id, venue_order_id),
        );
        handler
            .venue_to_client_id
            .insert(venue_order_id, client_order_id);
        handler.cid_to_client_order_id.insert(cid, client_order_id);
    }

    /// Asserts that no tracking map still holds an entry for the order.
    fn assert_tracking_cleared(
        handler: &FeedHandler,
        client_order_id: ClientOrderId,
        venue_order_id: VenueOrderId,
        cid: u64,
    ) {
        assert!(handler.orders_metadata.get(&client_order_id).is_none());
        assert!(handler.venue_to_client_id.get(&venue_order_id).is_none());
        assert!(handler.cid_to_client_order_id.get(&cid).is_none());
    }

    #[rstest]
    fn test_place_order_response_cleans_pending_order() {
        let mut handler = test_handler();
        let request_id = 11;
        let order_info = WsOrderInfo {
            client_order_id: ClientOrderId::from("CID-11"),
            symbol: Ustr::from("BTCUSD-PERP"),
        };
        handler.pending_orders.insert(request_id, order_info);

        let response = AxWsOrderResponse::PlaceOrder(AxWsPlaceOrderResponse {
            rid: request_id,
            res: AxWsPlaceOrderResult {
                oid: "OID-11".to_string(),
            },
        });

        let messages = handler.handle_response(response).unwrap();

        assert_eq!(messages.len(), 1);
        assert!(handler.pending_orders.get(&request_id).is_none());
    }

    #[rstest]
    fn test_handle_order_filled_cleans_tracking_maps() {
        let mut handler = test_handler();

        let client_order_id = ClientOrderId::from("CID-22");
        let venue_order_id = VenueOrderId::new("OID-1");
        let cid = 22_u64;
        track_order(&handler, client_order_id, venue_order_id, cid);

        let execution = AxWsTradeExecution {
            tid: "T-1".to_string(),
            s: Ustr::from("BTCUSD-PERP"),
            q: 100,
            p: dec!(50000),
            d: AxOrderSide::Buy,
            agg: true,
        };
        let msg = AxWsOrderFilled {
            ts: 1_700_000_001,
            tn: 2,
            eid: "EID-1".to_string(),
            o: sample_order(cid),
            xs: execution,
        };

        let messages = handler.handle_order_filled(msg).unwrap();

        assert_eq!(messages.len(), 1);
        assert_tracking_cleared(&handler, client_order_id, venue_order_id, cid);
    }

    #[rstest]
    fn test_handle_order_rejected_cleans_tracking_maps() {
        let mut handler = test_handler();

        let client_order_id = ClientOrderId::from("CID-33");
        let venue_order_id = VenueOrderId::new("OID-1");
        let cid = 33_u64;
        track_order(&handler, client_order_id, venue_order_id, cid);

        let msg = AxWsOrderRejected {
            ts: 1_700_000_002,
            tn: 3,
            eid: "EID-3".to_string(),
            o: sample_order(cid),
            r: Some("rejected".to_string()),
            txt: None,
        };

        let messages = handler.handle_order_rejected(msg);

        assert!(messages.is_some());
        assert_tracking_cleared(&handler, client_order_id, venue_order_id, cid);
    }
}
1207}