nautilus_model/orderbook/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! A performant, generic, multi-purpose order book.
17
18use std::{collections::HashSet, fmt::Display};
19
20use indexmap::IndexMap;
21use nautilus_core::UnixNanos;
22use rust_decimal::Decimal;
23
24use super::{
25    aggregation::pre_process_order, analysis, display::pprint_book, level::BookLevel,
26    own::OwnOrderBook,
27};
28use crate::{
29    data::{BookOrder, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick},
30    enums::{BookAction, BookType, OrderSide, OrderSideSpecified, OrderStatus},
31    identifiers::InstrumentId,
32    orderbook::{InvalidBookOperation, ladder::BookLadder},
33    types::{
34        Price, Quantity,
35        price::{PRICE_ERROR, PRICE_UNDEF},
36    },
37};
38
39/// Provides a high-performance, versatile order book.
40///
41/// Maintains buy (bid) and sell (ask) orders in price-time priority, supporting multiple
42/// market data formats:
43/// - L3 (MBO): Market By Order - tracks individual orders with unique IDs.
44/// - L2 (MBP): Market By Price - aggregates orders at each price level.
45/// - L1 (MBP): Top-of-Book - maintains only the best bid and ask prices.
46#[derive(Clone, Debug)]
47#[cfg_attr(
48    feature = "python",
49    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
50)]
51pub struct OrderBook {
52    /// The instrument ID for the order book.
53    pub instrument_id: InstrumentId,
54    /// The order book type (MBP types will aggregate orders).
55    pub book_type: BookType,
56    /// The last event sequence number for the order book.
57    pub sequence: u64,
58    /// The timestamp of the last event applied to the order book.
59    pub ts_last: UnixNanos,
60    /// The current count of updates applied to the order book.
61    pub update_count: u64,
62    pub(crate) bids: BookLadder,
63    pub(crate) asks: BookLadder,
64}
65
66impl PartialEq for OrderBook {
67    fn eq(&self, other: &Self) -> bool {
68        self.instrument_id == other.instrument_id && self.book_type == other.book_type
69    }
70}
71
72impl Eq for OrderBook {}
73
74impl Display for OrderBook {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        write!(
77            f,
78            "{}(instrument_id={}, book_type={}, update_count={})",
79            stringify!(OrderBook),
80            self.instrument_id,
81            self.book_type,
82            self.update_count,
83        )
84    }
85}
86
87impl OrderBook {
88    /// Creates a new [`OrderBook`] instance.
89    #[must_use]
90    pub fn new(instrument_id: InstrumentId, book_type: BookType) -> Self {
91        Self {
92            instrument_id,
93            book_type,
94            sequence: 0,
95            ts_last: UnixNanos::default(),
96            update_count: 0,
97            bids: BookLadder::new(OrderSideSpecified::Buy, book_type),
98            asks: BookLadder::new(OrderSideSpecified::Sell, book_type),
99        }
100    }
101
102    /// Resets the order book to its initial empty state.
103    pub fn reset(&mut self) {
104        self.bids.clear();
105        self.asks.clear();
106        self.sequence = 0;
107        self.ts_last = UnixNanos::default();
108        self.update_count = 0;
109    }
110
111    /// Adds an order to the book after preprocessing based on book type.
112    pub fn add(&mut self, order: BookOrder, flags: u8, sequence: u64, ts_event: UnixNanos) {
113        let order = pre_process_order(self.book_type, order, flags);
114        match order.side.as_specified() {
115            OrderSideSpecified::Buy => self.bids.add(order),
116            OrderSideSpecified::Sell => self.asks.add(order),
117        }
118
119        self.increment(sequence, ts_event);
120    }
121
122    /// Updates an existing order in the book after preprocessing based on book type.
123    pub fn update(&mut self, order: BookOrder, flags: u8, sequence: u64, ts_event: UnixNanos) {
124        let order = pre_process_order(self.book_type, order, flags);
125        match order.side.as_specified() {
126            OrderSideSpecified::Buy => self.bids.update(order),
127            OrderSideSpecified::Sell => self.asks.update(order),
128        }
129
130        self.increment(sequence, ts_event);
131    }
132
133    /// Deletes an order from the book after preprocessing based on book type.
134    pub fn delete(&mut self, order: BookOrder, flags: u8, sequence: u64, ts_event: UnixNanos) {
135        let order = pre_process_order(self.book_type, order, flags);
136        match order.side.as_specified() {
137            OrderSideSpecified::Buy => self.bids.delete(order, sequence, ts_event),
138            OrderSideSpecified::Sell => self.asks.delete(order, sequence, ts_event),
139        }
140
141        self.increment(sequence, ts_event);
142    }
143
144    /// Clears all orders from both sides of the book.
145    pub fn clear(&mut self, sequence: u64, ts_event: UnixNanos) {
146        self.bids.clear();
147        self.asks.clear();
148        self.increment(sequence, ts_event);
149    }
150
151    /// Clears all bid orders from the book.
152    pub fn clear_bids(&mut self, sequence: u64, ts_event: UnixNanos) {
153        self.bids.clear();
154        self.increment(sequence, ts_event);
155    }
156
157    /// Clears all ask orders from the book.
158    pub fn clear_asks(&mut self, sequence: u64, ts_event: UnixNanos) {
159        self.asks.clear();
160        self.increment(sequence, ts_event);
161    }
162
163    /// Removes overlapped bid/ask levels when the book is strictly crossed (best bid > best ask)
164    ///
165    /// - Acts only when both sides exist and the book is crossed.
166    /// - Deletes by removing whole price levels via the ladder API to preserve invariants.
167    /// - `side=None` or `NoOrderSide` clears both overlapped ranges (conservative, may widen spread).
168    /// - `side=Buy` clears crossed bids only; side=Sell clears crossed asks only.
169    /// - Returns removed price levels (crossed bids first, then crossed asks), or None if nothing removed.
170    pub fn clear_stale_levels(&mut self, side: Option<OrderSide>) -> Option<Vec<BookLevel>> {
171        if self.book_type == BookType::L1_MBP {
172            // L1_MBP maintains a single top-of-book price per side; nothing to do
173            return None;
174        }
175
176        let (Some(best_bid), Some(best_ask)) = (self.best_bid_price(), self.best_ask_price())
177        else {
178            return None;
179        };
180
181        if best_bid <= best_ask {
182            return None;
183        }
184
185        let mut removed_levels = Vec::new();
186        let mut clear_bids = false;
187        let mut clear_asks = false;
188
189        match side {
190            Some(OrderSide::Buy) => clear_bids = true,
191            Some(OrderSide::Sell) => clear_asks = true,
192            _ => {
193                clear_bids = true;
194                clear_asks = true;
195            }
196        }
197
198        // Collect prices to remove for asks (prices <= best_bid)
199        let mut ask_prices_to_remove = Vec::new();
200        if clear_asks {
201            for (bp, _level) in self.asks.levels.iter() {
202                if bp.value <= best_bid {
203                    ask_prices_to_remove.push(*bp);
204                } else {
205                    break;
206                }
207            }
208        }
209
210        // Collect prices to remove for bids (prices >= best_ask)
211        let mut bid_prices_to_remove = Vec::new();
212        if clear_bids {
213            for (bp, _level) in self.bids.levels.iter() {
214                if bp.value >= best_ask {
215                    bid_prices_to_remove.push(*bp);
216                } else {
217                    break;
218                }
219            }
220        }
221
222        if ask_prices_to_remove.is_empty() && bid_prices_to_remove.is_empty() {
223            return None;
224        }
225
226        let bid_count = bid_prices_to_remove.len();
227        let ask_count = ask_prices_to_remove.len();
228
229        // Remove and collect bid levels
230        for price in bid_prices_to_remove {
231            if let Some(level) = self.bids.remove_level(price) {
232                removed_levels.push(level);
233            }
234        }
235
236        // Remove and collect ask levels
237        for price in ask_prices_to_remove {
238            if let Some(level) = self.asks.remove_level(price) {
239                removed_levels.push(level);
240            }
241        }
242
243        self.increment(self.sequence, self.ts_last);
244
245        if removed_levels.is_empty() {
246            None
247        } else {
248            let total_orders: usize = removed_levels.iter().map(|level| level.orders.len()).sum();
249
250            log::warn!(
251                "Removed {} stale/crossed levels (instrument_id={}, bid_levels={}, ask_levels={}, total_orders={}), book was crossed with best_bid={} > best_ask={}",
252                removed_levels.len(),
253                self.instrument_id,
254                bid_count,
255                ask_count,
256                total_orders,
257                best_bid,
258                best_ask
259            );
260
261            Some(removed_levels)
262        }
263    }
264
265    /// Applies a single order book delta operation.
266    pub fn apply_delta(&mut self, delta: &OrderBookDelta) {
267        let order = delta.order;
268        let flags = delta.flags;
269        let sequence = delta.sequence;
270        let ts_event = delta.ts_event;
271        match delta.action {
272            BookAction::Add => self.add(order, flags, sequence, ts_event),
273            BookAction::Update => self.update(order, flags, sequence, ts_event),
274            BookAction::Delete => self.delete(order, flags, sequence, ts_event),
275            BookAction::Clear => self.clear(sequence, ts_event),
276        }
277    }
278
279    /// Applies multiple order book delta operations.
280    pub fn apply_deltas(&mut self, deltas: &OrderBookDeltas) {
281        for delta in &deltas.deltas {
282            self.apply_delta(delta);
283        }
284    }
285
286    /// Replaces current book state with a depth snapshot.
287    pub fn apply_depth(&mut self, depth: &OrderBookDepth10) {
288        self.bids.clear();
289        self.asks.clear();
290
291        for order in depth.bids {
292            // Skip padding entries
293            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
294                continue;
295            }
296
297            debug_assert_eq!(
298                order.side,
299                OrderSide::Buy,
300                "Bid order must have Buy side, was {:?}",
301                order.side
302            );
303
304            let order = pre_process_order(self.book_type, order, depth.flags);
305            self.bids.add(order);
306        }
307
308        for order in depth.asks {
309            // Skip padding entries
310            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
311                continue;
312            }
313
314            debug_assert_eq!(
315                order.side,
316                OrderSide::Sell,
317                "Ask order must have Sell side, was {:?}",
318                order.side
319            );
320
321            let order = pre_process_order(self.book_type, order, depth.flags);
322            self.asks.add(order);
323        }
324
325        self.increment(depth.sequence, depth.ts_event);
326    }
327
328    /// Returns an iterator over bid price levels.
329    pub fn bids(&self, depth: Option<usize>) -> impl Iterator<Item = &BookLevel> {
330        self.bids.levels.values().take(depth.unwrap_or(usize::MAX))
331    }
332
333    /// Returns an iterator over ask price levels.
334    pub fn asks(&self, depth: Option<usize>) -> impl Iterator<Item = &BookLevel> {
335        self.asks.levels.values().take(depth.unwrap_or(usize::MAX))
336    }
337
338    /// Returns bid price levels as a map of price to size.
339    pub fn bids_as_map(&self, depth: Option<usize>) -> IndexMap<Decimal, Decimal> {
340        self.bids(depth)
341            .map(|level| (level.price.value.as_decimal(), level.size_decimal()))
342            .collect()
343    }
344
345    /// Returns ask price levels as a map of price to size.
346    pub fn asks_as_map(&self, depth: Option<usize>) -> IndexMap<Decimal, Decimal> {
347        self.asks(depth)
348            .map(|level| (level.price.value.as_decimal(), level.size_decimal()))
349            .collect()
350    }
351
352    /// Groups bid quantities by price into buckets, limited by depth.
353    pub fn group_bids(
354        &self,
355        group_size: Decimal,
356        depth: Option<usize>,
357    ) -> IndexMap<Decimal, Decimal> {
358        group_levels(self.bids(None), group_size, depth, true)
359    }
360
361    /// Groups ask quantities by price into buckets, limited by depth.
362    pub fn group_asks(
363        &self,
364        group_size: Decimal,
365        depth: Option<usize>,
366    ) -> IndexMap<Decimal, Decimal> {
367        group_levels(self.asks(None), group_size, depth, false)
368    }
369
370    /// Maps bid prices to total public size per level, excluding own orders up to a depth limit.
371    ///
372    /// With `own_book`, subtracts own order sizes, filtered by `status` if provided.
373    /// Uses `accepted_buffer_ns` to include only orders accepted at least that many
374    /// nanoseconds before `now` (defaults to now).
375    pub fn bids_filtered_as_map(
376        &self,
377        depth: Option<usize>,
378        own_book: Option<&OwnOrderBook>,
379        status: Option<HashSet<OrderStatus>>,
380        accepted_buffer_ns: Option<u64>,
381        now: Option<u64>,
382    ) -> IndexMap<Decimal, Decimal> {
383        let mut public_map = self
384            .bids(depth)
385            .map(|level| (level.price.value.as_decimal(), level.size_decimal()))
386            .collect::<IndexMap<Decimal, Decimal>>();
387
388        if let Some(own_book) = own_book {
389            filter_quantities(
390                &mut public_map,
391                own_book.bid_quantity(status, None, None, accepted_buffer_ns, now),
392            );
393        }
394
395        public_map
396    }
397
398    /// Maps ask prices to total public size per level, excluding own orders up to a depth limit.
399    ///
400    /// With `own_book`, subtracts own order sizes, filtered by `status` if provided.
401    /// Uses `accepted_buffer_ns` to include only orders accepted at least that many
402    /// nanoseconds before `now` (defaults to now).
403    pub fn asks_filtered_as_map(
404        &self,
405        depth: Option<usize>,
406        own_book: Option<&OwnOrderBook>,
407        status: Option<HashSet<OrderStatus>>,
408        accepted_buffer_ns: Option<u64>,
409        now: Option<u64>,
410    ) -> IndexMap<Decimal, Decimal> {
411        let mut public_map = self
412            .asks(depth)
413            .map(|level| (level.price.value.as_decimal(), level.size_decimal()))
414            .collect::<IndexMap<Decimal, Decimal>>();
415
416        if let Some(own_book) = own_book {
417            filter_quantities(
418                &mut public_map,
419                own_book.ask_quantity(status, None, None, accepted_buffer_ns, now),
420            );
421        }
422
423        public_map
424    }
425
426    /// Groups bid quantities into price buckets, truncating to a maximum depth, excluding own orders.
427    ///
428    /// With `own_book`, subtracts own order sizes, filtered by `status` if provided.
429    /// Uses `accepted_buffer_ns` to include only orders accepted at least that many
430    /// nanoseconds before `now` (defaults to now).
431    pub fn group_bids_filtered(
432        &self,
433        group_size: Decimal,
434        depth: Option<usize>,
435        own_book: Option<&OwnOrderBook>,
436        status: Option<HashSet<OrderStatus>>,
437        accepted_buffer_ns: Option<u64>,
438        now: Option<u64>,
439    ) -> IndexMap<Decimal, Decimal> {
440        let mut public_map = group_levels(self.bids(None), group_size, depth, true);
441
442        if let Some(own_book) = own_book {
443            filter_quantities(
444                &mut public_map,
445                own_book.bid_quantity(status, depth, Some(group_size), accepted_buffer_ns, now),
446            );
447        }
448
449        public_map
450    }
451
452    /// Groups ask quantities into price buckets, truncating to a maximum depth, excluding own orders.
453    ///
454    /// With `own_book`, subtracts own order sizes, filtered by `status` if provided.
455    /// Uses `accepted_buffer_ns` to include only orders accepted at least that many
456    /// nanoseconds before `now` (defaults to now).
457    pub fn group_asks_filtered(
458        &self,
459        group_size: Decimal,
460        depth: Option<usize>,
461        own_book: Option<&OwnOrderBook>,
462        status: Option<HashSet<OrderStatus>>,
463        accepted_buffer_ns: Option<u64>,
464        now: Option<u64>,
465    ) -> IndexMap<Decimal, Decimal> {
466        let mut public_map = group_levels(self.asks(None), group_size, depth, false);
467
468        if let Some(own_book) = own_book {
469            filter_quantities(
470                &mut public_map,
471                own_book.ask_quantity(status, depth, Some(group_size), accepted_buffer_ns, now),
472            );
473        }
474
475        public_map
476    }
477
478    /// Returns true if the book has any bid orders.
479    #[must_use]
480    pub fn has_bid(&self) -> bool {
481        self.bids.top().is_some_and(|top| !top.orders.is_empty())
482    }
483
484    /// Returns true if the book has any ask orders.
485    #[must_use]
486    pub fn has_ask(&self) -> bool {
487        self.asks.top().is_some_and(|top| !top.orders.is_empty())
488    }
489
490    /// Returns the best bid price if available.
491    #[must_use]
492    pub fn best_bid_price(&self) -> Option<Price> {
493        self.bids.top().map(|top| top.price.value)
494    }
495
496    /// Returns the best ask price if available.
497    #[must_use]
498    pub fn best_ask_price(&self) -> Option<Price> {
499        self.asks.top().map(|top| top.price.value)
500    }
501
502    /// Returns the size at the best bid price if available.
503    #[must_use]
504    pub fn best_bid_size(&self) -> Option<Quantity> {
505        self.bids
506            .top()
507            .and_then(|top| top.first().map(|order| order.size))
508    }
509
510    /// Returns the size at the best ask price if available.
511    #[must_use]
512    pub fn best_ask_size(&self) -> Option<Quantity> {
513        self.asks
514            .top()
515            .and_then(|top| top.first().map(|order| order.size))
516    }
517
518    /// Returns the spread between best ask and bid prices if both exist.
519    #[must_use]
520    pub fn spread(&self) -> Option<f64> {
521        match (self.best_ask_price(), self.best_bid_price()) {
522            (Some(ask), Some(bid)) => Some(ask.as_f64() - bid.as_f64()),
523            _ => None,
524        }
525    }
526
527    /// Returns the midpoint between best ask and bid prices if both exist.
528    #[must_use]
529    pub fn midpoint(&self) -> Option<f64> {
530        match (self.best_ask_price(), self.best_bid_price()) {
531            (Some(ask), Some(bid)) => Some((ask.as_f64() + bid.as_f64()) / 2.0),
532            _ => None,
533        }
534    }
535
536    /// Calculates the average price to fill the specified quantity.
537    #[must_use]
538    pub fn get_avg_px_for_quantity(&self, qty: Quantity, order_side: OrderSide) -> f64 {
539        let levels = match order_side.as_specified() {
540            OrderSideSpecified::Buy => &self.asks.levels,
541            OrderSideSpecified::Sell => &self.bids.levels,
542        };
543
544        analysis::get_avg_px_for_quantity(qty, levels)
545    }
546
547    /// Calculates average price and quantity for target exposure. Returns (price, quantity, executed_exposure).
548    #[must_use]
549    pub fn get_avg_px_qty_for_exposure(
550        &self,
551        target_exposure: Quantity,
552        order_side: OrderSide,
553    ) -> (f64, f64, f64) {
554        let levels = match order_side.as_specified() {
555            OrderSideSpecified::Buy => &self.asks.levels,
556            OrderSideSpecified::Sell => &self.bids.levels,
557        };
558
559        analysis::get_avg_px_qty_for_exposure(target_exposure, levels)
560    }
561
562    /// Returns the total quantity available at specified price level.
563    #[must_use]
564    pub fn get_quantity_for_price(&self, price: Price, order_side: OrderSide) -> f64 {
565        let levels = match order_side.as_specified() {
566            OrderSideSpecified::Buy => &self.asks.levels,
567            OrderSideSpecified::Sell => &self.bids.levels,
568        };
569
570        analysis::get_quantity_for_price(price, order_side, levels)
571    }
572
573    /// Simulates fills for an order, returning list of (price, quantity) tuples.
574    #[must_use]
575    pub fn simulate_fills(&self, order: &BookOrder) -> Vec<(Price, Quantity)> {
576        match order.side.as_specified() {
577            OrderSideSpecified::Buy => self.asks.simulate_fills(order),
578            OrderSideSpecified::Sell => self.bids.simulate_fills(order),
579        }
580    }
581
582    /// Return a formatted string representation of the order book.
583    #[must_use]
584    pub fn pprint(&self, num_levels: usize, group_size: Option<Decimal>) -> String {
585        pprint_book(self, num_levels, group_size)
586    }
587
588    fn increment(&mut self, sequence: u64, ts_event: UnixNanos) {
589        // Critical invariant checks: panic in debug, warn in release
590        if sequence < self.sequence {
591            let msg = format!(
592                "Sequence number should not go backwards: old={}, new={}",
593                self.sequence, sequence
594            );
595            debug_assert!(sequence >= self.sequence, "{}", msg);
596            log::warn!("{}", msg);
597        }
598
599        if ts_event < self.ts_last {
600            let msg = format!(
601                "Timestamp should not go backwards: old={}, new={}",
602                self.ts_last, ts_event
603            );
604            debug_assert!(ts_event >= self.ts_last, "{}", msg);
605            log::warn!("{}", msg);
606        }
607
608        if self.update_count == u64::MAX {
609            // Debug assert to catch in development
610            debug_assert!(
611                self.update_count < u64::MAX,
612                "Update count at u64::MAX limit (about to overflow): {}",
613                self.update_count
614            );
615
616            // Spam warnings in production when at/near u64::MAX
617            log::warn!(
618                "Update count at u64::MAX: {} (instrument_id={})",
619                self.update_count,
620                self.instrument_id
621            );
622        }
623
624        self.sequence = sequence;
625        self.ts_last = ts_event;
626        self.update_count = self.update_count.saturating_add(1);
627    }
628
629    /// Updates L1 book state from a quote tick. Only valid for L1_MBP book type.
630    ///
631    /// # Errors
632    ///
633    /// Returns an error if the book type is not `L1_MBP` (operation is invalid).
634    pub fn update_quote_tick(&mut self, quote: &QuoteTick) -> Result<(), InvalidBookOperation> {
635        if self.book_type != BookType::L1_MBP {
636            return Err(InvalidBookOperation::Update(self.book_type));
637        }
638
639        // Note: Crossed quotes (bid > ask) can occur temporarily in volatile markets or during updates
640        // This is more of a data quality warning than a hard invariant
641        if cfg!(debug_assertions) && quote.bid_price > quote.ask_price {
642            log::warn!(
643                "Quote has crossed prices: bid={}, ask={} for {}",
644                quote.bid_price,
645                quote.ask_price,
646                self.instrument_id
647            );
648        }
649        debug_assert!(
650            quote.bid_size.is_positive() && quote.ask_size.is_positive(),
651            "Quote has non-positive sizes: bid_size={}, ask_size={}",
652            quote.bid_size,
653            quote.ask_size
654        );
655
656        let bid = BookOrder::new(
657            OrderSide::Buy,
658            quote.bid_price,
659            quote.bid_size,
660            OrderSide::Buy as u64,
661        );
662
663        let ask = BookOrder::new(
664            OrderSide::Sell,
665            quote.ask_price,
666            quote.ask_size,
667            OrderSide::Sell as u64,
668        );
669
670        self.update_book_bid(bid, quote.ts_event);
671        self.update_book_ask(ask, quote.ts_event);
672
673        self.increment(self.sequence.saturating_add(1), quote.ts_event);
674
675        Ok(())
676    }
677
678    /// Updates L1 book state from a trade tick. Only valid for L1_MBP book type.
679    ///
680    /// # Errors
681    ///
682    /// Returns an error if the book type is not `L1_MBP` (operation is invalid).
683    pub fn update_trade_tick(&mut self, trade: &TradeTick) -> Result<(), InvalidBookOperation> {
684        if self.book_type != BookType::L1_MBP {
685            return Err(InvalidBookOperation::Update(self.book_type));
686        }
687
688        // Note: Prices can be zero or negative for certain instruments (options, commodities, spreads)
689        debug_assert!(
690            trade.price.raw != PRICE_UNDEF && trade.price.raw != PRICE_ERROR,
691            "Trade has invalid/uninitialized price: {}",
692            trade.price
693        );
694        debug_assert!(
695            trade.size.is_positive(),
696            "Trade has non-positive size: {}",
697            trade.size
698        );
699
700        let bid = BookOrder::new(
701            OrderSide::Buy,
702            trade.price,
703            trade.size,
704            OrderSide::Buy as u64,
705        );
706
707        let ask = BookOrder::new(
708            OrderSide::Sell,
709            trade.price,
710            trade.size,
711            OrderSide::Sell as u64,
712        );
713
714        self.update_book_bid(bid, trade.ts_event);
715        self.update_book_ask(ask, trade.ts_event);
716
717        self.increment(self.sequence.saturating_add(1), trade.ts_event);
718
719        Ok(())
720    }
721
722    fn update_book_bid(&mut self, order: BookOrder, ts_event: UnixNanos) {
723        if let Some(top_bids) = self.bids.top()
724            && let Some(top_bid) = top_bids.first()
725        {
726            self.bids.remove_order(top_bid.order_id, 0, ts_event);
727        }
728        self.bids.add(order);
729    }
730
731    fn update_book_ask(&mut self, order: BookOrder, ts_event: UnixNanos) {
732        if let Some(top_asks) = self.asks.top()
733            && let Some(top_ask) = top_asks.first()
734        {
735            self.asks.remove_order(top_ask.order_id, 0, ts_event);
736        }
737        self.asks.add(order);
738    }
739}
740
741fn filter_quantities(
742    public_map: &mut IndexMap<Decimal, Decimal>,
743    own_map: IndexMap<Decimal, Decimal>,
744) {
745    for (price, own_size) in own_map {
746        if let Some(public_size) = public_map.get_mut(&price) {
747            *public_size = (*public_size - own_size).max(Decimal::ZERO);
748
749            if *public_size == Decimal::ZERO {
750                public_map.shift_remove(&price);
751            }
752        }
753    }
754}
755
756fn group_levels<'a>(
757    levels_iter: impl Iterator<Item = &'a BookLevel>,
758    group_size: Decimal,
759    depth: Option<usize>,
760    is_bid: bool,
761) -> IndexMap<Decimal, Decimal> {
762    if group_size <= Decimal::ZERO {
763        log::error!("Invalid group_size: {group_size}, must be positive; returning empty map");
764        return IndexMap::new();
765    }
766
767    let mut levels = IndexMap::new();
768    let depth = depth.unwrap_or(usize::MAX);
769
770    for level in levels_iter {
771        let price = level.price.value.as_decimal();
772        let grouped_price = if is_bid {
773            (price / group_size).floor() * group_size
774        } else {
775            (price / group_size).ceil() * group_size
776        };
777        let size = level.size_decimal();
778
779        levels
780            .entry(grouped_price)
781            .and_modify(|total| *total += size)
782            .or_insert(size);
783
784        if levels.len() > depth {
785            levels.pop();
786            break;
787        }
788    }
789
790    levels
791}