Skip to main content

nautilus_binance/futures/websocket/
handler_exec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures execution WebSocket handler.
17//!
18//! Implements the two-tier architecture with pending order maps for correlating
19//! WebSocket order updates with the original order context (strategy_id, etc.).
20
21use std::{
22    collections::VecDeque,
23    fmt::Debug,
24    sync::{
25        Arc, RwLock,
26        atomic::{AtomicBool, Ordering},
27    },
28};
29
30use ahash::{AHashMap, AHashSet};
31use nautilus_core::{UUID4, nanos::UnixNanos, time::AtomicTime};
32use nautilus_model::{
33    enums::{AccountType, LiquiditySide},
34    events::{
35        AccountState, OrderAccepted, OrderCanceled, OrderFilled, OrderRejected, OrderUpdated,
36    },
37    identifiers::{
38        AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
39    },
40    types::{AccountBalance, Currency, Money, Price, Quantity},
41};
42use ustr::Ustr;
43
44use super::messages::{
45    BinanceExecutionType, BinanceFuturesAccountUpdateMsg, BinanceFuturesAlgoUpdateMsg,
46    BinanceFuturesExecWsMessage, BinanceFuturesOrderUpdateMsg, ExecHandlerCommand,
47    NautilusExecWsMessage, NautilusWsMessage,
48};
49use crate::{
50    common::{
51        enums::{BinanceAlgoStatus, BinanceProductType},
52        symbol::format_instrument_id,
53    },
54    futures::http::BinanceFuturesInstrument,
55};
56
57/// Data cached for pending place requests to correlate with responses.
58pub type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
59
60/// Data cached for pending cancel requests to correlate with responses.
61pub type CancelRequestData = (
62    ClientOrderId,
63    TraderId,
64    StrategyId,
65    InstrumentId,
66    Option<VenueOrderId>,
67);
68
69/// Data cached for pending modify requests to correlate with responses.
70pub type ModifyRequestData = (
71    ClientOrderId,
72    TraderId,
73    StrategyId,
74    InstrumentId,
75    Option<VenueOrderId>,
76);
77
78/// Binance Futures execution WebSocket handler.
79///
80/// Processes user data stream messages and maintains pending order state
81/// to correlate WebSocket updates with the original order context.
82pub struct BinanceFuturesExecWsFeedHandler {
83    clock: &'static AtomicTime,
84    trader_id: TraderId,
85    account_id: AccountId,
86    account_type: AccountType,
87    product_type: BinanceProductType,
88    signal: Arc<AtomicBool>,
89    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<ExecHandlerCommand>,
90    msg_rx: tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
91    pending_place_requests: AHashMap<ClientOrderId, PlaceRequestData>,
92    pending_cancel_requests: AHashMap<ClientOrderId, CancelRequestData>,
93    pending_modify_requests: AHashMap<ClientOrderId, ModifyRequestData>,
94    active_orders: AHashMap<ClientOrderId, (TraderId, StrategyId, InstrumentId)>,
95    algo_client_order_ids: AHashSet<ClientOrderId>,
96    triggered_algo_order_ids: Arc<RwLock<AHashSet<ClientOrderId>>>,
97    instruments_cache: AHashMap<Ustr, BinanceFuturesInstrument>,
98    message_queue: VecDeque<NautilusExecWsMessage>,
99}
100
101impl Debug for BinanceFuturesExecWsFeedHandler {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct(stringify!(BinanceFuturesExecHandler))
104            .field("trader_id", &self.trader_id)
105            .field("account_id", &self.account_id)
106            .field("pending_place_requests", &self.pending_place_requests.len())
107            .field(
108                "pending_cancel_requests",
109                &self.pending_cancel_requests.len(),
110            )
111            .field("active_orders", &self.active_orders.len())
112            .field("instruments_cache", &self.instruments_cache.len())
113            .finish_non_exhaustive()
114    }
115}
116
117impl BinanceFuturesExecWsFeedHandler {
118    /// Creates a new [`BinanceFuturesExecWsFeedHandler`] instance.
119    #[allow(clippy::too_many_arguments)]
120    pub fn new(
121        clock: &'static AtomicTime,
122        trader_id: TraderId,
123        account_id: AccountId,
124        account_type: AccountType,
125        product_type: BinanceProductType,
126        signal: Arc<AtomicBool>,
127        triggered_algo_order_ids: Arc<RwLock<AHashSet<ClientOrderId>>>,
128        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<ExecHandlerCommand>,
129        msg_rx: tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>,
130    ) -> Self {
131        Self {
132            clock,
133            trader_id,
134            account_id,
135            account_type,
136            product_type,
137            signal,
138            cmd_rx,
139            msg_rx,
140            pending_place_requests: AHashMap::new(),
141            pending_cancel_requests: AHashMap::new(),
142            pending_modify_requests: AHashMap::new(),
143            active_orders: AHashMap::new(),
144            algo_client_order_ids: AHashSet::new(),
145            triggered_algo_order_ids,
146            instruments_cache: AHashMap::new(),
147            message_queue: VecDeque::new(),
148        }
149    }
150
151    /// Processes commands and messages, returning the next output event.
152    pub async fn next(&mut self) -> Option<NautilusExecWsMessage> {
153        loop {
154            if self.signal.load(Ordering::Relaxed) {
155                return None;
156            }
157
158            // Return queued messages first
159            if let Some(msg) = self.message_queue.pop_front() {
160                return Some(msg);
161            }
162
163            tokio::select! {
164                Some(cmd) = self.cmd_rx.recv() => {
165                    self.handle_command(cmd);
166                }
167                Some(msg) = self.msg_rx.recv() => {
168                    if let Some(event) = self.handle_message(msg) {
169                        return Some(event);
170                    }
171                }
172                else => {
173                    return None;
174                }
175            }
176        }
177    }
178
179    fn handle_command(&mut self, cmd: ExecHandlerCommand) {
180        match cmd {
181            ExecHandlerCommand::SetClient(_) => {
182                // WebSocket client is managed by the outer layer
183            }
184            ExecHandlerCommand::Disconnect => {
185                // WebSocket client is managed by the outer layer
186            }
187            ExecHandlerCommand::InitializeInstruments(instruments) => {
188                for inst in instruments {
189                    self.instruments_cache.insert(inst.symbol(), inst);
190                }
191            }
192            ExecHandlerCommand::UpdateInstrument(instrument) => {
193                self.instruments_cache
194                    .insert(instrument.symbol(), instrument);
195            }
196            ExecHandlerCommand::Subscribe { .. } => {
197                // Subscriptions are managed by the outer layer
198            }
199            ExecHandlerCommand::RegisterOrder {
200                client_order_id,
201                trader_id,
202                strategy_id,
203                instrument_id,
204            } => {
205                let data = (client_order_id, trader_id, strategy_id, instrument_id);
206                self.pending_place_requests.insert(client_order_id, data);
207                self.active_orders
208                    .insert(client_order_id, (trader_id, strategy_id, instrument_id));
209            }
210            ExecHandlerCommand::RegisterCancel {
211                client_order_id,
212                trader_id,
213                strategy_id,
214                instrument_id,
215                venue_order_id,
216            } => {
217                let data = (
218                    client_order_id,
219                    trader_id,
220                    strategy_id,
221                    instrument_id,
222                    venue_order_id,
223                );
224                self.pending_cancel_requests.insert(client_order_id, data);
225            }
226            ExecHandlerCommand::RegisterModify {
227                client_order_id,
228                trader_id,
229                strategy_id,
230                instrument_id,
231                venue_order_id,
232            } => {
233                let data = (
234                    client_order_id,
235                    trader_id,
236                    strategy_id,
237                    instrument_id,
238                    venue_order_id,
239                );
240                self.pending_modify_requests.insert(client_order_id, data);
241            }
242        }
243    }
244
245    fn handle_message(&mut self, msg: NautilusWsMessage) -> Option<NautilusExecWsMessage> {
246        match msg {
247            NautilusWsMessage::ExecRaw(exec_msg) => self.handle_exec_message(exec_msg),
248            NautilusWsMessage::Reconnected => Some(NautilusExecWsMessage::Reconnected),
249            NautilusWsMessage::Error(err) => {
250                log::error!(
251                    "User data stream WebSocket error: code={}, msg={}",
252                    err.code,
253                    err.msg
254                );
255                None
256            }
257            NautilusWsMessage::Data(_) | NautilusWsMessage::Exec(_) => None,
258        }
259    }
260
261    fn handle_exec_message(
262        &mut self,
263        msg: BinanceFuturesExecWsMessage,
264    ) -> Option<NautilusExecWsMessage> {
265        match msg {
266            BinanceFuturesExecWsMessage::OrderUpdate(update) => self.handle_order_update(&update),
267            BinanceFuturesExecWsMessage::AlgoUpdate(update) => self.handle_algo_update(&update),
268            BinanceFuturesExecWsMessage::AccountUpdate(update) => {
269                self.handle_account_update(&update)
270            }
271            BinanceFuturesExecWsMessage::MarginCall(mc) => {
272                log::warn!(
273                    "Margin call: cross_wallet_balance={}, positions_at_risk={}",
274                    mc.cross_wallet_balance,
275                    mc.positions.len()
276                );
277                None
278            }
279            BinanceFuturesExecWsMessage::AccountConfigUpdate(cfg) => {
280                if let Some(ref lc) = cfg.leverage_config {
281                    log::info!(
282                        "Account config update: symbol={}, leverage={}",
283                        lc.symbol,
284                        lc.leverage
285                    );
286                }
287                None
288            }
289            BinanceFuturesExecWsMessage::ListenKeyExpired => {
290                log::warn!("Listen key expired");
291                Some(NautilusExecWsMessage::ListenKeyExpired)
292            }
293        }
294    }
295
296    fn handle_order_update(
297        &mut self,
298        msg: &BinanceFuturesOrderUpdateMsg,
299    ) -> Option<NautilusExecWsMessage> {
300        let order_data = &msg.order;
301        let ts_event = UnixNanos::from((msg.event_time * 1_000_000) as u64);
302        let ts_init = self.clock.get_time_ns();
303
304        let client_order_id = ClientOrderId::new(&order_data.client_order_id);
305        let venue_order_id = VenueOrderId::new(order_data.order_id.to_string());
306
307        // Look up order context from pending/active maps, falling back to EXTERNAL
308        let (trader_id, strategy_id, instrument_id) =
309            self.get_order_context(&client_order_id, &order_data.symbol);
310
311        match order_data.execution_type {
312            BinanceExecutionType::New => {
313                // Algo orders emit OrderAccepted via ALGO_UPDATE NEW, skip here to avoid duplicate
314                if self.algo_client_order_ids.contains(&client_order_id) {
315                    log::debug!(
316                        "Skipping OrderAccepted for algo order: client_order_id={client_order_id}"
317                    );
318                    return None;
319                }
320
321                // Move from pending to active on acceptance
322                self.pending_place_requests.remove(&client_order_id);
323
324                let event = OrderAccepted::new(
325                    trader_id,
326                    strategy_id,
327                    instrument_id,
328                    client_order_id,
329                    venue_order_id,
330                    self.account_id,
331                    UUID4::new(),
332                    ts_event,
333                    ts_init,
334                    false,
335                );
336
337                Some(NautilusExecWsMessage::OrderAccepted(event))
338            }
339            BinanceExecutionType::Canceled | BinanceExecutionType::Expired => {
340                // Clean up tracking maps
341                self.pending_cancel_requests.remove(&client_order_id);
342                self.active_orders.remove(&client_order_id);
343
344                let event = OrderCanceled::new(
345                    trader_id,
346                    strategy_id,
347                    instrument_id,
348                    client_order_id,
349                    UUID4::new(),
350                    ts_event,
351                    ts_init,
352                    false,
353                    Some(venue_order_id),
354                    Some(self.account_id),
355                );
356
357                Some(NautilusExecWsMessage::OrderCanceled(event))
358            }
359            BinanceExecutionType::Trade => self.handle_trade_fill(
360                msg,
361                trader_id,
362                strategy_id,
363                instrument_id,
364                ts_event,
365                ts_init,
366            ),
367            BinanceExecutionType::Amendment => {
368                self.pending_modify_requests.remove(&client_order_id);
369
370                // Look up precision from instrument cache
371                let symbol_key = Ustr::from(&order_data.symbol);
372                let (price_precision, size_precision) =
373                    if let Some(inst) = self.instruments_cache.get(&symbol_key) {
374                        (inst.price_precision(), inst.quantity_precision())
375                    } else {
376                        log::warn!(
377                            "Instrument not found for amendment: {}, using default precision",
378                            order_data.symbol
379                        );
380                        (8, 8)
381                    };
382
383                let quantity: f64 = order_data.original_qty.parse().unwrap_or(0.0);
384                let price: f64 = order_data.original_price.parse().unwrap_or(0.0);
385
386                let event = OrderUpdated::new(
387                    trader_id,
388                    strategy_id,
389                    instrument_id,
390                    client_order_id,
391                    Quantity::new(quantity, size_precision as u8),
392                    UUID4::new(),
393                    ts_event,
394                    ts_init,
395                    false,
396                    Some(venue_order_id),
397                    Some(self.account_id),
398                    Some(Price::new(price, price_precision as u8)),
399                    None,
400                    None,
401                );
402
403                Some(NautilusExecWsMessage::OrderUpdated(event))
404            }
405            BinanceExecutionType::Calculated => {
406                log::warn!(
407                    "Calculated execution (liquidation/ADL): symbol={}, client_order_id={}",
408                    order_data.symbol,
409                    order_data.client_order_id
410                );
411                None
412            }
413        }
414    }
415
416    fn handle_trade_fill(
417        &mut self,
418        msg: &BinanceFuturesOrderUpdateMsg,
419        trader_id: TraderId,
420        strategy_id: StrategyId,
421        instrument_id: InstrumentId,
422        ts_event: UnixNanos,
423        ts_init: UnixNanos,
424    ) -> Option<NautilusExecWsMessage> {
425        let order_data = &msg.order;
426        let client_order_id = ClientOrderId::new(&order_data.client_order_id);
427        let venue_order_id = VenueOrderId::new(order_data.order_id.to_string());
428
429        // Look up precision from instrument cache
430        let symbol_key = Ustr::from(&order_data.symbol);
431        let Some(inst) = self.instruments_cache.get(&symbol_key) else {
432            log::error!(
433                "Instrument not found for fill: {}, skipping to avoid precision mismatch",
434                order_data.symbol
435            );
436            return None;
437        };
438        let price_precision = inst.price_precision();
439        let size_precision = inst.quantity_precision();
440
441        let last_qty: f64 = order_data.last_filled_qty.parse().unwrap_or(0.0);
442        let last_px: f64 = order_data.last_filled_price.parse().unwrap_or(0.0);
443        let cum_qty: f64 = order_data.cumulative_filled_qty.parse().unwrap_or(0.0);
444        let original_qty: f64 = order_data.original_qty.parse().unwrap_or(0.0);
445        let leaves_qty = original_qty - cum_qty;
446        let commission: f64 = order_data
447            .commission
448            .as_deref()
449            .unwrap_or("0")
450            .parse()
451            .unwrap_or(0.0);
452
453        let commission_currency = order_data
454            .commission_asset
455            .as_ref()
456            .map_or_else(Currency::USDT, |a| Currency::from(a.as_str()));
457
458        let liquidity_side = if order_data.is_maker {
459            LiquiditySide::Maker
460        } else {
461            LiquiditySide::Taker
462        };
463
464        let event = OrderFilled::new(
465            trader_id,
466            strategy_id,
467            instrument_id,
468            client_order_id,
469            venue_order_id,
470            self.account_id,
471            TradeId::new(order_data.trade_id.to_string()),
472            order_data.side.into(),
473            order_data.order_type.into(),
474            Quantity::new(last_qty, size_precision as u8),
475            Price::new(last_px, price_precision as u8),
476            commission_currency,
477            liquidity_side,
478            UUID4::new(),
479            ts_event,
480            ts_init,
481            false,
482            None,
483            Some(Money::new(commission, commission_currency)),
484        );
485
486        // Clean up if fully filled
487        if leaves_qty <= 0.0 {
488            self.active_orders.remove(&client_order_id);
489            log::debug!(
490                "Order fully filled: client_order_id={client_order_id}, venue_order_id={venue_order_id}"
491            );
492        }
493
494        Some(NautilusExecWsMessage::OrderFilled(event))
495    }
496
497    fn handle_account_update(
498        &mut self,
499        msg: &BinanceFuturesAccountUpdateMsg,
500    ) -> Option<NautilusExecWsMessage> {
501        let ts_event = UnixNanos::from((msg.event_time * 1_000_000) as u64);
502
503        let balances: Vec<AccountBalance> = msg
504            .account
505            .balances
506            .iter()
507            .filter_map(|b| {
508                let wallet_balance: f64 = b.wallet_balance.parse().unwrap_or(0.0);
509                let cross_wallet: f64 = b.cross_wallet_balance.parse().unwrap_or(0.0);
510                let locked = wallet_balance - cross_wallet;
511
512                if wallet_balance == 0.0 {
513                    return None;
514                }
515
516                let currency = Currency::from(&b.asset);
517                Some(AccountBalance::new(
518                    Money::new(wallet_balance, currency),
519                    Money::new(locked.max(0.0), currency),
520                    Money::new(cross_wallet, currency),
521                ))
522            })
523            .collect();
524
525        if balances.is_empty() {
526            return None;
527        }
528
529        let event = AccountState::new(
530            self.account_id,
531            self.account_type,
532            balances,
533            vec![], // Margins handled separately
534            true,   // is_reported
535            UUID4::new(),
536            ts_event,
537            self.clock.get_time_ns(),
538            None, // base_currency
539        );
540
541        Some(NautilusExecWsMessage::AccountUpdate(event))
542    }
543
544    fn handle_algo_update(
545        &mut self,
546        msg: &BinanceFuturesAlgoUpdateMsg,
547    ) -> Option<NautilusExecWsMessage> {
548        let algo_data = &msg.algo_order;
549        let ts_event = UnixNanos::from((msg.event_time * 1_000_000) as u64);
550        let ts_init = self.clock.get_time_ns();
551
552        let client_order_id = ClientOrderId::new(&algo_data.client_algo_id);
553        let venue_order_id = algo_data.actual_order_id.as_ref().map_or_else(
554            || VenueOrderId::new(algo_data.algo_id.to_string()),
555            |id| VenueOrderId::new(id.clone()),
556        );
557        let (trader_id, strategy_id, instrument_id) =
558            self.get_order_context(&client_order_id, algo_data.symbol.as_str());
559
560        match algo_data.algo_status {
561            BinanceAlgoStatus::New => {
562                // Track so ORDER_TRADE_UPDATE NEW skips duplicate OrderAccepted
563                self.algo_client_order_ids.insert(client_order_id);
564                self.pending_place_requests.remove(&client_order_id);
565
566                let event = OrderAccepted::new(
567                    trader_id,
568                    strategy_id,
569                    instrument_id,
570                    client_order_id,
571                    venue_order_id,
572                    self.account_id,
573                    UUID4::new(),
574                    ts_event,
575                    ts_init,
576                    false,
577                );
578
579                Some(NautilusExecWsMessage::OrderAccepted(event))
580            }
581            BinanceAlgoStatus::Triggering => {
582                log::info!(
583                    "Algo order triggering: client_order_id={}, algo_id={}, symbol={}",
584                    algo_data.client_algo_id,
585                    algo_data.algo_id,
586                    algo_data.symbol
587                );
588                None
589            }
590            BinanceAlgoStatus::Triggered => {
591                // Track for cancel routing: triggered orders use regular cancel endpoint
592                self.triggered_algo_order_ids
593                    .write()
594                    .expect("triggered_algo_order_ids lock poisoned")
595                    .insert(client_order_id);
596
597                log::info!(
598                    "Algo order triggered: client_order_id={}, algo_id={}, actual_order_id={:?}, symbol={}",
599                    algo_data.client_algo_id,
600                    algo_data.algo_id,
601                    algo_data.actual_order_id,
602                    algo_data.symbol
603                );
604
605                let Some(actual_order_id) = &algo_data.actual_order_id else {
606                    log::warn!(
607                        "Algo order triggered but no actual_order_id: client_order_id={client_order_id}"
608                    );
609                    return None;
610                };
611
612                let new_venue_order_id = VenueOrderId::new(actual_order_id.clone());
613
614                let symbol_key = Ustr::from(algo_data.symbol.as_str());
615                let size_precision =
616                    self.instruments_cache
617                        .get(&symbol_key)
618                        .map_or(8, |inst| inst.quantity_precision()) as u8;
619
620                let quantity: f64 = algo_data.quantity.parse().unwrap_or(0.0);
621
622                let event = OrderUpdated::new(
623                    trader_id,
624                    strategy_id,
625                    instrument_id,
626                    client_order_id,
627                    Quantity::new(quantity, size_precision),
628                    UUID4::new(),
629                    ts_event,
630                    ts_init,
631                    false,
632                    Some(new_venue_order_id),
633                    Some(self.account_id),
634                    None,
635                    None,
636                    None,
637                );
638
639                Some(NautilusExecWsMessage::OrderUpdated(event))
640            }
641            BinanceAlgoStatus::Canceled => {
642                self.algo_client_order_ids.remove(&client_order_id);
643                self.triggered_algo_order_ids
644                    .write()
645                    .expect("triggered_algo_order_ids lock poisoned")
646                    .remove(&client_order_id);
647                self.pending_cancel_requests.remove(&client_order_id);
648                self.active_orders.remove(&client_order_id);
649
650                let event = OrderCanceled::new(
651                    trader_id,
652                    strategy_id,
653                    instrument_id,
654                    client_order_id,
655                    UUID4::new(),
656                    ts_event,
657                    ts_init,
658                    false,
659                    Some(venue_order_id),
660                    Some(self.account_id),
661                );
662
663                Some(NautilusExecWsMessage::OrderCanceled(event))
664            }
665            BinanceAlgoStatus::Expired => {
666                self.algo_client_order_ids.remove(&client_order_id);
667                self.triggered_algo_order_ids
668                    .write()
669                    .expect("triggered_algo_order_ids lock poisoned")
670                    .remove(&client_order_id);
671                self.active_orders.remove(&client_order_id);
672
673                log::info!(
674                    "Algo order expired: client_order_id={}, algo_id={}",
675                    algo_data.client_algo_id,
676                    algo_data.algo_id
677                );
678
679                // Treat expired as canceled for Nautilus
680                let event = OrderCanceled::new(
681                    trader_id,
682                    strategy_id,
683                    instrument_id,
684                    client_order_id,
685                    UUID4::new(),
686                    ts_event,
687                    ts_init,
688                    false,
689                    Some(venue_order_id),
690                    Some(self.account_id),
691                );
692
693                Some(NautilusExecWsMessage::OrderCanceled(event))
694            }
695            BinanceAlgoStatus::Rejected => {
696                self.algo_client_order_ids.remove(&client_order_id);
697                self.triggered_algo_order_ids
698                    .write()
699                    .expect("triggered_algo_order_ids lock poisoned")
700                    .remove(&client_order_id);
701                self.pending_place_requests.remove(&client_order_id);
702                self.active_orders.remove(&client_order_id);
703
704                log::warn!(
705                    "Algo order rejected: client_order_id={}, algo_id={}",
706                    algo_data.client_algo_id,
707                    algo_data.algo_id
708                );
709
710                // Binance doesn't provide rejection reason in ALGO_UPDATE
711                let event = OrderRejected::new(
712                    trader_id,
713                    strategy_id,
714                    instrument_id,
715                    client_order_id,
716                    self.account_id,
717                    Ustr::from("REJECTED"),
718                    UUID4::new(),
719                    ts_event,
720                    ts_init,
721                    false,
722                    false,
723                );
724
725                Some(NautilusExecWsMessage::OrderRejected(event))
726            }
727            BinanceAlgoStatus::Finished => {
728                self.algo_client_order_ids.remove(&client_order_id);
729                self.triggered_algo_order_ids
730                    .write()
731                    .expect("triggered_algo_order_ids lock poisoned")
732                    .remove(&client_order_id);
733                self.active_orders.remove(&client_order_id);
734
735                // Check if the order was filled or just finished (canceled after trigger)
736                let executed_qty: f64 = algo_data
737                    .executed_qty
738                    .as_ref()
739                    .and_then(|q| q.parse().ok())
740                    .unwrap_or(0.0);
741
742                if executed_qty > 0.0 {
743                    log::debug!(
744                        "Algo order finished with fills: client_order_id={}, executed_qty={}",
745                        algo_data.client_algo_id,
746                        executed_qty
747                    );
748                    // Fill events are emitted via ORDER_TRADE_UPDATE, not ALGO_UPDATE
749                } else {
750                    log::debug!(
751                        "Algo order finished without fills: client_order_id={}",
752                        algo_data.client_algo_id
753                    );
754                }
755                None
756            }
757            BinanceAlgoStatus::Unknown => {
758                log::warn!(
759                    "Unknown algo status received: client_order_id={}, algo_id={}",
760                    algo_data.client_algo_id,
761                    algo_data.algo_id
762                );
763                None
764            }
765        }
766    }
767
768    /// Returns whether a client order ID is a triggered algo order.
769    ///
770    /// This is used to determine which cancel endpoint to use:
771    /// - Non-triggered algo orders use `DELETE /fapi/v1/algoOrder`
772    /// - Triggered algo orders use regular `DELETE /fapi/v1/order`
773    ///
774    /// # Panics
775    ///
776    /// Panics if `triggered_algo_order_ids` lock is poisoned.
777    #[must_use]
778    pub fn is_triggered_algo_order(&self, client_order_id: &ClientOrderId) -> bool {
779        self.triggered_algo_order_ids
780            .read()
781            .expect("triggered_algo_order_ids lock poisoned")
782            .contains(client_order_id)
783    }
784
785    /// Looks up order context from pending/active maps.
786    ///
787    /// Falls back to EXTERNAL strategy for untracked orders (e.g., orders from before
788    /// restart or created externally). Uses instruments cache when available, otherwise
789    /// constructs instrument ID from the symbol using the configured product type.
790    fn get_order_context(
791        &self,
792        client_order_id: &ClientOrderId,
793        symbol: &str,
794    ) -> (TraderId, StrategyId, InstrumentId) {
795        // First check pending place requests
796        if let Some((_, trader_id, strategy_id, instrument_id)) =
797            self.pending_place_requests.get(client_order_id)
798        {
799            return (*trader_id, *strategy_id, *instrument_id);
800        }
801
802        // Then check active orders
803        if let Some((trader_id, strategy_id, instrument_id)) =
804            self.active_orders.get(client_order_id)
805        {
806            return (*trader_id, *strategy_id, *instrument_id);
807        }
808
809        // Fall back to EXTERNAL for untracked orders (restart, external creation)
810        let symbol_ustr = Ustr::from(symbol);
811
812        // Prefer instruments cache for correct instrument ID
813        let instrument_id = if let Some(instrument) = self.instruments_cache.get(&symbol_ustr) {
814            instrument.id()
815        } else {
816            // Ultimate fallback: construct from symbol using product type
817            log::warn!("Instrument not in cache for {symbol}, constructing ID from product type");
818            format_instrument_id(&symbol_ustr, self.product_type)
819        };
820
821        log::debug!(
822            "Order context not found for {client_order_id}, using EXTERNAL with {instrument_id}"
823        );
824        (self.trader_id, StrategyId::new("EXTERNAL"), instrument_id)
825    }
826}