nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{
22    any::Any,
23    cell::RefCell,
24    fmt::Debug,
25    ops::Add,
26    rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31    clock::{Clock, TestClock},
32    timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35    UnixNanos,
36    correctness::{self, FAILED},
37    datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40    data::{
41        QuoteTick, TradeTick,
42        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43    },
44    enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45    types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
48/// Type alias for bar handler to reduce type complexity.
49type BarHandler = Box<dyn FnMut(Bar)>;
50
51/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
52///
53/// Implementors receive updates and produce completed bars via handlers.
54pub trait BarAggregator: Any + Debug {
55    /// The [`BarType`] to be aggregated.
56    fn bar_type(&self) -> BarType;
57    /// If the aggregator is running and will receive data from the message bus.
58    fn is_running(&self) -> bool;
59    /// Sets the running state of the aggregator (receiving updates when `true`).
60    fn set_is_running(&mut self, value: bool);
61    /// Updates the aggregator  with the given price and size.
62    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63    /// Updates the aggregator with the given quote.
64    fn handle_quote(&mut self, quote: QuoteTick) {
65        let spec = self.bar_type().spec();
66        self.update(
67            quote.extract_price(spec.price_type),
68            quote.extract_size(spec.price_type),
69            quote.ts_init,
70        );
71    }
72    /// Updates the aggregator with the given trade.
73    fn handle_trade(&mut self, trade: TradeTick) {
74        self.update(trade.price, trade.size, trade.ts_init);
75    }
76    /// Updates the aggregator with the given bar.
77    fn handle_bar(&mut self, bar: Bar) {
78        self.update_bar(bar, bar.volume, bar.ts_init);
79    }
80    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
82    fn stop(&mut self) {}
83    /// Sets historical mode (default implementation does nothing, TimeBarAggregator overrides)
84    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85    /// Sets historical events (default implementation does nothing, TimeBarAggregator overrides)
86    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87    /// Sets clock for time bar aggregators (default implementation does nothing, TimeBarAggregator overrides)
88    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89    /// Builds a bar from a time event (default implementation does nothing, TimeBarAggregator overrides)
90    fn build_bar(&mut self, _event: TimeEvent) {}
91    /// Starts the timer for time bar aggregators.
92    /// Default implementation does nothing, TimeBarAggregator overrides.
93    /// Takes an optional Rc to create weak reference internally.
94    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95    /// Sets the weak reference to the aggregator wrapper (for historical mode).
96    /// Default implementation does nothing, TimeBarAggregator overrides.
97    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101    /// Returns a reference to this aggregator as `Any` for downcasting.
102    pub fn as_any(&self) -> &dyn Any {
103        self
104    }
105    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
106    pub fn as_any_mut(&mut self) -> &mut dyn Any {
107        self
108    }
109}
110
111/// Provides a generic bar builder for aggregation.
112#[derive(Debug)]
113pub struct BarBuilder {
114    bar_type: BarType,
115    price_precision: u8,
116    size_precision: u8,
117    initialized: bool,
118    ts_last: UnixNanos,
119    count: usize,
120    last_close: Option<Price>,
121    open: Option<Price>,
122    high: Option<Price>,
123    low: Option<Price>,
124    close: Option<Price>,
125    volume: Quantity,
126}
127
128impl BarBuilder {
129    /// Creates a new [`BarBuilder`] instance.
130    ///
131    /// # Panics
132    ///
133    /// This function panics if:
134    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
135    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
136    #[must_use]
137    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
138        correctness::check_equal(
139            &bar_type.aggregation_source(),
140            &AggregationSource::Internal,
141            "bar_type.aggregation_source",
142            "AggregationSource::Internal",
143        )
144        .expect(FAILED);
145
146        Self {
147            bar_type,
148            price_precision,
149            size_precision,
150            initialized: false,
151            ts_last: UnixNanos::default(),
152            count: 0,
153            last_close: None,
154            open: None,
155            high: None,
156            low: None,
157            close: None,
158            volume: Quantity::zero(size_precision),
159        }
160    }
161
162    /// Updates the builder state with the given price, size, and init timestamp.
163    ///
164    /// # Panics
165    ///
166    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
167    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
168        if ts_init < self.ts_last {
169            return; // Not applicable
170        }
171
172        if self.open.is_none() {
173            self.open = Some(price);
174            self.high = Some(price);
175            self.low = Some(price);
176            self.initialized = true;
177        } else {
178            if price > self.high.unwrap() {
179                self.high = Some(price);
180            }
181            if price < self.low.unwrap() {
182                self.low = Some(price);
183            }
184        }
185
186        self.close = Some(price);
187        self.volume = self.volume.add(size);
188        self.count += 1;
189        self.ts_last = ts_init;
190    }
191
192    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
193    ///
194    /// # Panics
195    ///
196    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
197    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
198        if ts_init < self.ts_last {
199            return; // Not applicable
200        }
201
202        if self.open.is_none() {
203            self.open = Some(bar.open);
204            self.high = Some(bar.high);
205            self.low = Some(bar.low);
206            self.initialized = true;
207        } else {
208            if bar.high > self.high.unwrap() {
209                self.high = Some(bar.high);
210            }
211            if bar.low < self.low.unwrap() {
212                self.low = Some(bar.low);
213            }
214        }
215
216        self.close = Some(bar.close);
217        self.volume = self.volume.add(volume);
218        self.count += 1;
219        self.ts_last = ts_init;
220    }
221
222    /// Reset the bar builder.
223    ///
224    /// All stateful fields are reset to their initial value.
225    pub fn reset(&mut self) {
226        self.open = None;
227        self.high = None;
228        self.low = None;
229        self.volume = Quantity::zero(self.size_precision);
230        self.count = 0;
231    }
232
233    /// Return the aggregated bar and reset.
234    pub fn build_now(&mut self) -> Bar {
235        self.build(self.ts_last, self.ts_last)
236    }
237
238    /// Returns the aggregated bar for the given timestamps, then resets the builder.
239    ///
240    /// # Panics
241    ///
242    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
243    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244        if self.open.is_none() {
245            self.open = self.last_close;
246            self.high = self.last_close;
247            self.low = self.last_close;
248            self.close = self.last_close;
249        }
250
251        if let (Some(close), Some(low)) = (self.close, self.low)
252            && close < low
253        {
254            self.low = Some(close);
255        }
256
257        if let (Some(close), Some(high)) = (self.close, self.high)
258            && close > high
259        {
260            self.high = Some(close);
261        }
262
263        // SAFETY: The open was checked, so we can assume all prices are Some
264        let bar = Bar::new(
265            self.bar_type,
266            self.open.unwrap(),
267            self.high.unwrap(),
268            self.low.unwrap(),
269            self.close.unwrap(),
270            self.volume,
271            ts_event,
272            ts_init,
273        );
274
275        self.last_close = self.close;
276        self.reset();
277        bar
278    }
279}
280
281/// Provides a means of aggregating specified bar types and sending to a registered handler.
282pub struct BarAggregatorCore {
283    bar_type: BarType,
284    builder: BarBuilder,
285    handler: BarHandler,
286    is_running: bool,
287}
288
289impl Debug for BarAggregatorCore {
290    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291        f.debug_struct(stringify!(BarAggregatorCore))
292            .field("bar_type", &self.bar_type)
293            .field("builder", &self.builder)
294            .field("is_running", &self.is_running)
295            .finish()
296    }
297}
298
299impl BarAggregatorCore {
300    /// Creates a new [`BarAggregatorCore`] instance.
301    ///
302    /// # Panics
303    ///
304    /// This function panics if:
305    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
306    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
307    pub fn new<H: FnMut(Bar) + 'static>(
308        bar_type: BarType,
309        price_precision: u8,
310        size_precision: u8,
311        handler: H,
312    ) -> Self {
313        Self {
314            bar_type,
315            builder: BarBuilder::new(bar_type, price_precision, size_precision),
316            handler: Box::new(handler),
317            is_running: false,
318        }
319    }
320
321    /// Sets the running state of the aggregator (receives updates when `true`).
322    pub const fn set_is_running(&mut self, value: bool) {
323        self.is_running = value;
324    }
325    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
326        self.builder.update(price, size, ts_init);
327    }
328
329    fn build_now_and_send(&mut self) {
330        let bar = self.builder.build_now();
331        (self.handler)(bar);
332    }
333
334    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
335        let bar = self.builder.build(ts_event, ts_init);
336        (self.handler)(bar);
337    }
338}
339
340/// Provides a means of building tick bars aggregated from quote and trades.
341///
342/// When received tick count reaches the step threshold of the bar
343/// specification, then a bar is created and sent to the handler.
344pub struct TickBarAggregator {
345    core: BarAggregatorCore,
346}
347
348impl Debug for TickBarAggregator {
349    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        f.debug_struct(stringify!(TickBarAggregator))
351            .field("core", &self.core)
352            .finish()
353    }
354}
355
356impl TickBarAggregator {
357    /// Creates a new [`TickBarAggregator`] instance.
358    ///
359    /// # Panics
360    ///
361    /// This function panics if:
362    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
363    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
364    pub fn new<H: FnMut(Bar) + 'static>(
365        bar_type: BarType,
366        price_precision: u8,
367        size_precision: u8,
368        handler: H,
369    ) -> Self {
370        Self {
371            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
372        }
373    }
374}
375
376impl BarAggregator for TickBarAggregator {
377    fn bar_type(&self) -> BarType {
378        self.core.bar_type
379    }
380
381    fn is_running(&self) -> bool {
382        self.core.is_running
383    }
384
385    fn set_is_running(&mut self, value: bool) {
386        self.core.set_is_running(value);
387    }
388
389    /// Apply the given update to the aggregator.
390    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
391        self.core.apply_update(price, size, ts_init);
392        let spec = self.core.bar_type.spec();
393
394        if self.core.builder.count >= spec.step.get() {
395            self.core.build_now_and_send();
396        }
397    }
398
399    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
400        self.core.builder.update_bar(bar, volume, ts_init);
401        let spec = self.core.bar_type.spec();
402
403        if self.core.builder.count >= spec.step.get() {
404            self.core.build_now_and_send();
405        }
406    }
407}
408
409/// Aggregates bars based on tick buy/sell imbalance.
410///
411/// Increments imbalance by +1 for buyer-aggressed trades and -1 for seller-aggressed trades.
412/// Emits a bar when the absolute imbalance reaches the step threshold.
413pub struct TickImbalanceBarAggregator {
414    core: BarAggregatorCore,
415    imbalance: isize,
416}
417
418impl Debug for TickImbalanceBarAggregator {
419    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420        f.debug_struct(stringify!(TickImbalanceBarAggregator))
421            .field("core", &self.core)
422            .field("imbalance", &self.imbalance)
423            .finish()
424    }
425}
426
427impl TickImbalanceBarAggregator {
428    /// Creates a new [`TickImbalanceBarAggregator`] instance.
429    ///
430    /// # Panics
431    ///
432    /// This function panics if:
433    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
434    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
435    pub fn new<H: FnMut(Bar) + 'static>(
436        bar_type: BarType,
437        price_precision: u8,
438        size_precision: u8,
439        handler: H,
440    ) -> Self {
441        Self {
442            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443            imbalance: 0,
444        }
445    }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449    fn bar_type(&self) -> BarType {
450        self.core.bar_type
451    }
452
453    fn is_running(&self) -> bool {
454        self.core.is_running
455    }
456
457    fn set_is_running(&mut self, value: bool) {
458        self.core.set_is_running(value);
459    }
460
461    /// Apply the given update to the aggregator.
462    ///
463    /// Note: side-aware logic lives in `handle_trade`. This method is used for
464    /// quote/bar updates where no aggressor side is available.
465    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466        self.core.apply_update(price, size, ts_init);
467    }
468
469    fn handle_trade(&mut self, trade: TradeTick) {
470        self.core
471            .apply_update(trade.price, trade.size, trade.ts_init);
472
473        let delta = match trade.aggressor_side {
474            AggressorSide::Buyer => 1,
475            AggressorSide::Seller => -1,
476            AggressorSide::NoAggressor => 0,
477        };
478
479        if delta == 0 {
480            return;
481        }
482
483        self.imbalance += delta;
484        let threshold = self.core.bar_type.spec().step.get();
485        if self.imbalance.unsigned_abs() >= threshold {
486            self.core.build_now_and_send();
487            self.imbalance = 0;
488        }
489    }
490
491    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492        self.core.builder.update_bar(bar, volume, ts_init);
493    }
494}
495
496/// Aggregates bars based on consecutive buy/sell tick runs.
497pub struct TickRunsBarAggregator {
498    core: BarAggregatorCore,
499    current_run_side: Option<AggressorSide>,
500    run_count: usize,
501}
502
503impl Debug for TickRunsBarAggregator {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        f.debug_struct(stringify!(TickRunsBarAggregator))
506            .field("core", &self.core)
507            .field("current_run_side", &self.current_run_side)
508            .field("run_count", &self.run_count)
509            .finish()
510    }
511}
512
513impl TickRunsBarAggregator {
514    /// Creates a new [`TickRunsBarAggregator`] instance.
515    ///
516    /// # Panics
517    ///
518    /// This function panics if:
519    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
520    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
521    pub fn new<H: FnMut(Bar) + 'static>(
522        bar_type: BarType,
523        price_precision: u8,
524        size_precision: u8,
525        handler: H,
526    ) -> Self {
527        Self {
528            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
529            current_run_side: None,
530            run_count: 0,
531        }
532    }
533}
534
535impl BarAggregator for TickRunsBarAggregator {
536    fn bar_type(&self) -> BarType {
537        self.core.bar_type
538    }
539
540    fn is_running(&self) -> bool {
541        self.core.is_running
542    }
543
544    fn set_is_running(&mut self, value: bool) {
545        self.core.set_is_running(value);
546    }
547
548    /// Apply the given update to the aggregator.
549    ///
550    /// Note: side-aware logic lives in `handle_trade`. This method is used for
551    /// quote/bar updates where no aggressor side is available.
552    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
553        self.core.apply_update(price, size, ts_init);
554    }
555
556    fn handle_trade(&mut self, trade: TradeTick) {
557        let side = match trade.aggressor_side {
558            AggressorSide::Buyer => Some(AggressorSide::Buyer),
559            AggressorSide::Seller => Some(AggressorSide::Seller),
560            AggressorSide::NoAggressor => None,
561        };
562
563        if let Some(side) = side {
564            if self.current_run_side != Some(side) {
565                self.current_run_side = Some(side);
566                self.run_count = 0;
567                self.core.builder.reset();
568            }
569
570            self.core
571                .apply_update(trade.price, trade.size, trade.ts_init);
572            self.run_count += 1;
573
574            let threshold = self.core.bar_type.spec().step.get();
575            if self.run_count >= threshold {
576                self.core.build_now_and_send();
577                self.run_count = 0;
578                self.current_run_side = None;
579            }
580        } else {
581            self.core
582                .apply_update(trade.price, trade.size, trade.ts_init);
583        }
584    }
585
586    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
587        self.core.builder.update_bar(bar, volume, ts_init);
588    }
589}
590
591/// Provides a means of building volume bars aggregated from quote and trades.
592pub struct VolumeBarAggregator {
593    core: BarAggregatorCore,
594}
595
596impl Debug for VolumeBarAggregator {
597    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
598        f.debug_struct(stringify!(VolumeBarAggregator))
599            .field("core", &self.core)
600            .finish()
601    }
602}
603
604impl VolumeBarAggregator {
605    /// Creates a new [`VolumeBarAggregator`] instance.
606    ///
607    /// # Panics
608    ///
609    /// This function panics if:
610    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
611    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
612    pub fn new<H: FnMut(Bar) + 'static>(
613        bar_type: BarType,
614        price_precision: u8,
615        size_precision: u8,
616        handler: H,
617    ) -> Self {
618        Self {
619            core: BarAggregatorCore::new(
620                bar_type.standard(),
621                price_precision,
622                size_precision,
623                handler,
624            ),
625        }
626    }
627}
628
629impl BarAggregator for VolumeBarAggregator {
630    fn bar_type(&self) -> BarType {
631        self.core.bar_type
632    }
633
634    fn is_running(&self) -> bool {
635        self.core.is_running
636    }
637
638    fn set_is_running(&mut self, value: bool) {
639        self.core.set_is_running(value);
640    }
641
642    /// Apply the given update to the aggregator.
643    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
644        let mut raw_size_update = size.raw;
645        let spec = self.core.bar_type.spec();
646        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
647
648        while raw_size_update > 0 {
649            if self.core.builder.volume.raw + raw_size_update < raw_step {
650                self.core.apply_update(
651                    price,
652                    Quantity::from_raw(raw_size_update, size.precision),
653                    ts_init,
654                );
655                break;
656            }
657
658            let raw_size_diff = raw_step - self.core.builder.volume.raw;
659            self.core.apply_update(
660                price,
661                Quantity::from_raw(raw_size_diff, size.precision),
662                ts_init,
663            );
664
665            self.core.build_now_and_send();
666            raw_size_update -= raw_size_diff;
667        }
668    }
669
670    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671        let mut raw_volume_update = volume.raw;
672        let spec = self.core.bar_type.spec();
673        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
674
675        while raw_volume_update > 0 {
676            if self.core.builder.volume.raw + raw_volume_update < raw_step {
677                self.core.builder.update_bar(
678                    bar,
679                    Quantity::from_raw(raw_volume_update, volume.precision),
680                    ts_init,
681                );
682                break;
683            }
684
685            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
686            self.core.builder.update_bar(
687                bar,
688                Quantity::from_raw(raw_volume_diff, volume.precision),
689                ts_init,
690            );
691
692            self.core.build_now_and_send();
693            raw_volume_update -= raw_volume_diff;
694        }
695    }
696}
697
698/// Aggregates bars based on buy/sell volume imbalance.
699pub struct VolumeImbalanceBarAggregator {
700    core: BarAggregatorCore,
701    imbalance_raw: i128,
702    raw_step: i128,
703}
704
705impl Debug for VolumeImbalanceBarAggregator {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
708            .field("core", &self.core)
709            .field("imbalance_raw", &self.imbalance_raw)
710            .field("raw_step", &self.raw_step)
711            .finish()
712    }
713}
714
715impl VolumeImbalanceBarAggregator {
716    /// Creates a new [`VolumeImbalanceBarAggregator`] instance.
717    ///
718    /// # Panics
719    ///
720    /// This function panics if:
721    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
722    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
723    pub fn new<H: FnMut(Bar) + 'static>(
724        bar_type: BarType,
725        price_precision: u8,
726        size_precision: u8,
727        handler: H,
728    ) -> Self {
729        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
730        Self {
731            core: BarAggregatorCore::new(
732                bar_type.standard(),
733                price_precision,
734                size_precision,
735                handler,
736            ),
737            imbalance_raw: 0,
738            raw_step,
739        }
740    }
741}
742
743impl BarAggregator for VolumeImbalanceBarAggregator {
744    fn bar_type(&self) -> BarType {
745        self.core.bar_type
746    }
747
748    fn is_running(&self) -> bool {
749        self.core.is_running
750    }
751
752    fn set_is_running(&mut self, value: bool) {
753        self.core.set_is_running(value);
754    }
755
756    /// Apply the given update to the aggregator.
757    ///
758    /// Note: side-aware logic lives in `handle_trade`. This method is used for
759    /// quote/bar updates where no aggressor side is available.
760    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
761        self.core.apply_update(price, size, ts_init);
762    }
763
764    fn handle_trade(&mut self, trade: TradeTick) {
765        let side = match trade.aggressor_side {
766            AggressorSide::Buyer => 1,
767            AggressorSide::Seller => -1,
768            AggressorSide::NoAggressor => {
769                self.core
770                    .apply_update(trade.price, trade.size, trade.ts_init);
771                return;
772            }
773        };
774
775        let mut raw_remaining = trade.size.raw as i128;
776        while raw_remaining > 0 {
777            let imbalance_abs = self.imbalance_raw.abs();
778            let needed = (self.raw_step - imbalance_abs).max(1);
779            let raw_chunk = raw_remaining.min(needed);
780            let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
781
782            self.core
783                .apply_update(trade.price, qty_chunk, trade.ts_init);
784
785            self.imbalance_raw += side * raw_chunk;
786            raw_remaining -= raw_chunk;
787
788            if self.imbalance_raw.abs() >= self.raw_step {
789                self.core.build_now_and_send();
790                self.imbalance_raw = 0;
791            }
792        }
793    }
794
795    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
796        self.core.builder.update_bar(bar, volume, ts_init);
797    }
798}
799
800/// Aggregates bars based on consecutive buy/sell volume runs.
801pub struct VolumeRunsBarAggregator {
802    core: BarAggregatorCore,
803    current_run_side: Option<AggressorSide>,
804    run_volume_raw: QuantityRaw,
805    raw_step: QuantityRaw,
806}
807
808impl Debug for VolumeRunsBarAggregator {
809    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810        f.debug_struct(stringify!(VolumeRunsBarAggregator))
811            .field("core", &self.core)
812            .field("current_run_side", &self.current_run_side)
813            .field("run_volume_raw", &self.run_volume_raw)
814            .field("raw_step", &self.raw_step)
815            .finish()
816    }
817}
818
819impl VolumeRunsBarAggregator {
820    /// Creates a new [`VolumeRunsBarAggregator`] instance.
821    ///
822    /// # Panics
823    ///
824    /// This function panics if:
825    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
826    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
827    pub fn new<H: FnMut(Bar) + 'static>(
828        bar_type: BarType,
829        price_precision: u8,
830        size_precision: u8,
831        handler: H,
832    ) -> Self {
833        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
834        Self {
835            core: BarAggregatorCore::new(
836                bar_type.standard(),
837                price_precision,
838                size_precision,
839                handler,
840            ),
841            current_run_side: None,
842            run_volume_raw: 0,
843            raw_step,
844        }
845    }
846}
847
848impl BarAggregator for VolumeRunsBarAggregator {
849    fn bar_type(&self) -> BarType {
850        self.core.bar_type
851    }
852
853    fn is_running(&self) -> bool {
854        self.core.is_running
855    }
856
857    fn set_is_running(&mut self, value: bool) {
858        self.core.set_is_running(value);
859    }
860
861    /// Apply the given update to the aggregator.
862    ///
863    /// Note: side-aware logic lives in `handle_trade`. This method is used for
864    /// quote/bar updates where no aggressor side is available.
865    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
866        self.core.apply_update(price, size, ts_init);
867    }
868
869    fn handle_trade(&mut self, trade: TradeTick) {
870        let side = match trade.aggressor_side {
871            AggressorSide::Buyer => Some(AggressorSide::Buyer),
872            AggressorSide::Seller => Some(AggressorSide::Seller),
873            AggressorSide::NoAggressor => None,
874        };
875
876        let Some(side) = side else {
877            self.core
878                .apply_update(trade.price, trade.size, trade.ts_init);
879            return;
880        };
881
882        if self.current_run_side != Some(side) {
883            self.current_run_side = Some(side);
884            self.run_volume_raw = 0;
885            self.core.builder.reset();
886        }
887
888        let mut raw_remaining = trade.size.raw;
889        while raw_remaining > 0 {
890            let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
891            let raw_chunk = raw_remaining.min(needed);
892
893            self.core.apply_update(
894                trade.price,
895                Quantity::from_raw(raw_chunk, trade.size.precision),
896                trade.ts_init,
897            );
898
899            self.run_volume_raw += raw_chunk;
900            raw_remaining -= raw_chunk;
901
902            if self.run_volume_raw >= self.raw_step {
903                self.core.build_now_and_send();
904                self.run_volume_raw = 0;
905                self.current_run_side = None;
906            }
907        }
908    }
909
    /// Passes the bar update straight to the core builder; no run tracking is applied.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
        self.core.builder.update_bar(bar, volume, ts_init);
    }
913}
914
/// Provides a means of building value bars aggregated from quotes and trades.
///
/// When the received value reaches the step threshold of the bar
/// specification, a bar is created and sent to the handler.
pub struct ValueBarAggregator {
    /// Shared aggregation core (bar type, builder and handler).
    core: BarAggregatorCore,
    /// Value accumulated toward the step threshold since the last bar was built.
    cum_value: f64,
}
923
924impl Debug for ValueBarAggregator {
925    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926        f.debug_struct(stringify!(ValueBarAggregator))
927            .field("core", &self.core)
928            .field("cum_value", &self.cum_value)
929            .finish()
930    }
931}
932
933impl ValueBarAggregator {
934    /// Creates a new [`ValueBarAggregator`] instance.
935    ///
936    /// # Panics
937    ///
938    /// This function panics if:
939    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
940    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
941    pub fn new<H: FnMut(Bar) + 'static>(
942        bar_type: BarType,
943        price_precision: u8,
944        size_precision: u8,
945        handler: H,
946    ) -> Self {
947        Self {
948            core: BarAggregatorCore::new(
949                bar_type.standard(),
950                price_precision,
951                size_precision,
952                handler,
953            ),
954            cum_value: 0.0,
955        }
956    }
957
958    #[must_use]
959    /// Returns the cumulative value for the aggregator.
960    pub const fn get_cumulative_value(&self) -> f64 {
961        self.cum_value
962    }
963}
964
965impl BarAggregator for ValueBarAggregator {
966    fn bar_type(&self) -> BarType {
967        self.core.bar_type
968    }
969
970    fn is_running(&self) -> bool {
971        self.core.is_running
972    }
973
974    fn set_is_running(&mut self, value: bool) {
975        self.core.set_is_running(value);
976    }
977
978    /// Apply the given update to the aggregator.
979    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
980        let mut size_update = size.as_f64();
981        let spec = self.core.bar_type.spec();
982
983        while size_update > 0.0 {
984            let value_update = price.as_f64() * size_update;
985            if value_update == 0.0 {
986                // Prevent division by zero - apply remaining size without triggering bar
987                self.core
988                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
989                break;
990            }
991
992            if self.cum_value + value_update < spec.step.get() as f64 {
993                self.cum_value += value_update;
994                self.core
995                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
996                break;
997            }
998
999            let value_diff = spec.step.get() as f64 - self.cum_value;
1000            let size_diff = size_update * (value_diff / value_update);
1001            self.core
1002                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1003
1004            self.core.build_now_and_send();
1005            self.cum_value = 0.0;
1006            size_update -= size_diff;
1007        }
1008    }
1009
1010    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1011        let mut volume_update = volume;
1012        let average_price = Price::new(
1013            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1014            self.core.builder.price_precision,
1015        );
1016
1017        while volume_update.as_f64() > 0.0 {
1018            let value_update = average_price.as_f64() * volume_update.as_f64();
1019            if value_update == 0.0 {
1020                // Prevent division by zero - apply remaining volume without triggering bar
1021                self.core.builder.update_bar(bar, volume_update, ts_init);
1022                break;
1023            }
1024
1025            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1026                self.cum_value += value_update;
1027                self.core.builder.update_bar(bar, volume_update, ts_init);
1028                break;
1029            }
1030
1031            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1032            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
1033            self.core.builder.update_bar(
1034                bar,
1035                Quantity::new(volume_diff, volume_update.precision),
1036                ts_init,
1037            );
1038
1039            self.core.build_now_and_send();
1040            self.cum_value = 0.0;
1041            volume_update = Quantity::new(
1042                volume_update.as_f64() - volume_diff,
1043                volume_update.precision,
1044            );
1045        }
1046    }
1047}
1048
/// Aggregates bars based on buy/sell notional imbalance.
pub struct ValueImbalanceBarAggregator {
    /// Shared aggregation core (bar type, builder and handler).
    core: BarAggregatorCore,
    /// Signed notional imbalance: buyer-initiated value adds, seller-initiated value
    /// subtracts. Reset to zero whenever a bar is built.
    imbalance_value: f64,
    /// Bar specification step as `f64`; a bar is built once the absolute imbalance
    /// reaches this value.
    step_value: f64,
}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058        f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059            .field("core", &self.core)
1060            .field("imbalance_value", &self.imbalance_value)
1061            .field("step_value", &self.step_value)
1062            .finish()
1063    }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067    /// Creates a new [`ValueImbalanceBarAggregator`] instance.
1068    ///
1069    /// # Panics
1070    ///
1071    /// This function panics if:
1072    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1073    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1074    pub fn new<H: FnMut(Bar) + 'static>(
1075        bar_type: BarType,
1076        price_precision: u8,
1077        size_precision: u8,
1078        handler: H,
1079    ) -> Self {
1080        Self {
1081            core: BarAggregatorCore::new(
1082                bar_type.standard(),
1083                price_precision,
1084                size_precision,
1085                handler,
1086            ),
1087            imbalance_value: 0.0,
1088            step_value: bar_type.spec().step.get() as f64,
1089        }
1090    }
1091}
1092
1093impl BarAggregator for ValueImbalanceBarAggregator {
1094    fn bar_type(&self) -> BarType {
1095        self.core.bar_type
1096    }
1097
1098    fn is_running(&self) -> bool {
1099        self.core.is_running
1100    }
1101
1102    fn set_is_running(&mut self, value: bool) {
1103        self.core.set_is_running(value);
1104    }
1105
1106    /// Apply the given update to the aggregator.
1107    ///
1108    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1109    /// quote/bar updates where no aggressor side is available.
1110    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1111        self.core.apply_update(price, size, ts_init);
1112    }
1113
1114    fn handle_trade(&mut self, trade: TradeTick) {
1115        let price_f64 = trade.price.as_f64();
1116        if price_f64 == 0.0 {
1117            self.core
1118                .apply_update(trade.price, trade.size, trade.ts_init);
1119            return;
1120        }
1121
1122        let side_sign = match trade.aggressor_side {
1123            AggressorSide::Buyer => 1.0,
1124            AggressorSide::Seller => -1.0,
1125            AggressorSide::NoAggressor => {
1126                self.core
1127                    .apply_update(trade.price, trade.size, trade.ts_init);
1128                return;
1129            }
1130        };
1131
1132        let mut size_remaining = trade.size.as_f64();
1133        while size_remaining > 0.0 {
1134            let value_remaining = price_f64 * size_remaining;
1135            let current_sign = self.imbalance_value.signum();
1136
1137            if current_sign == 0.0 || current_sign == side_sign {
1138                let needed = self.step_value - self.imbalance_value.abs();
1139                if value_remaining <= needed {
1140                    self.imbalance_value += side_sign * value_remaining;
1141                    self.core.apply_update(
1142                        trade.price,
1143                        Quantity::new(size_remaining, trade.size.precision),
1144                        trade.ts_init,
1145                    );
1146                    break;
1147                }
1148
1149                let value_chunk = needed;
1150                let size_chunk = value_chunk / price_f64;
1151                self.core.apply_update(
1152                    trade.price,
1153                    Quantity::new(size_chunk, trade.size.precision),
1154                    trade.ts_init,
1155                );
1156                self.imbalance_value += side_sign * value_chunk;
1157                size_remaining -= size_chunk;
1158
1159                if self.imbalance_value.abs() >= self.step_value {
1160                    self.core.build_now_and_send();
1161                    self.imbalance_value = 0.0;
1162                }
1163            } else {
1164                // Opposing side: first neutralize existing imbalance
1165                let value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1166                let size_chunk = value_to_flatten / price_f64;
1167                self.core.apply_update(
1168                    trade.price,
1169                    Quantity::new(size_chunk, trade.size.precision),
1170                    trade.ts_init,
1171                );
1172                self.imbalance_value += side_sign * value_to_flatten;
1173                size_remaining -= size_chunk;
1174            }
1175        }
1176    }
1177
1178    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1179        self.core.builder.update_bar(bar, volume, ts_init);
1180    }
1181}
1182
/// Aggregates bars based on consecutive buy/sell notional runs.
pub struct ValueRunsBarAggregator {
    /// Shared aggregation core (bar type, builder and handler).
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress; `None` initially and after a bar is built.
    current_run_side: Option<AggressorSide>,
    /// Notional accumulated by the current run.
    run_value: f64,
    /// Bar specification step as `f64`; a bar is built once `run_value` reaches it.
    step_value: f64,
}
1190
1191impl Debug for ValueRunsBarAggregator {
1192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193        f.debug_struct(stringify!(ValueRunsBarAggregator))
1194            .field("core", &self.core)
1195            .field("current_run_side", &self.current_run_side)
1196            .field("run_value", &self.run_value)
1197            .field("step_value", &self.step_value)
1198            .finish()
1199    }
1200}
1201
1202impl ValueRunsBarAggregator {
1203    /// Creates a new [`ValueRunsBarAggregator`] instance.
1204    ///
1205    /// # Panics
1206    ///
1207    /// This function panics if:
1208    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1209    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1210    pub fn new<H: FnMut(Bar) + 'static>(
1211        bar_type: BarType,
1212        price_precision: u8,
1213        size_precision: u8,
1214        handler: H,
1215    ) -> Self {
1216        Self {
1217            core: BarAggregatorCore::new(
1218                bar_type.standard(),
1219                price_precision,
1220                size_precision,
1221                handler,
1222            ),
1223            current_run_side: None,
1224            run_value: 0.0,
1225            step_value: bar_type.spec().step.get() as f64,
1226        }
1227    }
1228}
1229
1230impl BarAggregator for ValueRunsBarAggregator {
1231    fn bar_type(&self) -> BarType {
1232        self.core.bar_type
1233    }
1234
1235    fn is_running(&self) -> bool {
1236        self.core.is_running
1237    }
1238
1239    fn set_is_running(&mut self, value: bool) {
1240        self.core.set_is_running(value);
1241    }
1242
1243    /// Apply the given update to the aggregator.
1244    ///
1245    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1246    /// quote/bar updates where no aggressor side is available.
1247    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1248        self.core.apply_update(price, size, ts_init);
1249    }
1250
1251    fn handle_trade(&mut self, trade: TradeTick) {
1252        let price_f64 = trade.price.as_f64();
1253        if price_f64 == 0.0 {
1254            self.core
1255                .apply_update(trade.price, trade.size, trade.ts_init);
1256            return;
1257        }
1258
1259        let side = match trade.aggressor_side {
1260            AggressorSide::Buyer => Some(AggressorSide::Buyer),
1261            AggressorSide::Seller => Some(AggressorSide::Seller),
1262            AggressorSide::NoAggressor => None,
1263        };
1264
1265        let Some(side) = side else {
1266            self.core
1267                .apply_update(trade.price, trade.size, trade.ts_init);
1268            return;
1269        };
1270
1271        if self.current_run_side != Some(side) {
1272            self.current_run_side = Some(side);
1273            self.run_value = 0.0;
1274            self.core.builder.reset();
1275        }
1276
1277        let mut size_remaining = trade.size.as_f64();
1278        while size_remaining > 0.0 {
1279            let value_update = price_f64 * size_remaining;
1280            if self.run_value + value_update < self.step_value {
1281                self.run_value += value_update;
1282                self.core.apply_update(
1283                    trade.price,
1284                    Quantity::new(size_remaining, trade.size.precision),
1285                    trade.ts_init,
1286                );
1287                break;
1288            }
1289
1290            let value_needed = self.step_value - self.run_value;
1291            let size_chunk = value_needed / price_f64;
1292            self.core.apply_update(
1293                trade.price,
1294                Quantity::new(size_chunk, trade.size.precision),
1295                trade.ts_init,
1296            );
1297
1298            self.core.build_now_and_send();
1299            self.run_value = 0.0;
1300            self.current_run_side = None;
1301            size_remaining -= size_chunk;
1302        }
1303    }
1304
1305    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1306        self.core.builder.update_bar(bar, volume, ts_init);
1307    }
1308}
1309
/// Provides a means of building Renko bars aggregated from quotes and trades.
///
/// Renko bars are created when the price moves by a fixed amount (brick size)
/// regardless of time or volume. Each bar represents a price movement equal
/// to the step size in the bar specification.
pub struct RenkoBarAggregator {
    /// Shared aggregation core (bar type, builder and handler).
    core: BarAggregatorCore,
    /// Brick size in raw price units (bar step multiplied by the raw price increment).
    pub brick_size: PriceRaw,
    /// Close of the most recently built brick, or the first observed price while no
    /// brick has been built yet; `None` before the first update.
    last_close: Option<Price>,
}
1320
1321impl Debug for RenkoBarAggregator {
1322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1323        f.debug_struct(stringify!(RenkoBarAggregator))
1324            .field("core", &self.core)
1325            .field("brick_size", &self.brick_size)
1326            .field("last_close", &self.last_close)
1327            .finish()
1328    }
1329}
1330
1331impl RenkoBarAggregator {
1332    /// Creates a new [`RenkoBarAggregator`] instance.
1333    ///
1334    /// # Panics
1335    ///
1336    /// This function panics if:
1337    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1338    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1339    pub fn new<H: FnMut(Bar) + 'static>(
1340        bar_type: BarType,
1341        price_precision: u8,
1342        size_precision: u8,
1343        price_increment: Price,
1344        handler: H,
1345    ) -> Self {
1346        // Calculate brick size in raw price units (step * price_increment.raw)
1347        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1348
1349        Self {
1350            core: BarAggregatorCore::new(
1351                bar_type.standard(),
1352                price_precision,
1353                size_precision,
1354                handler,
1355            ),
1356            brick_size,
1357            last_close: None,
1358        }
1359    }
1360}
1361
1362impl BarAggregator for RenkoBarAggregator {
1363    fn bar_type(&self) -> BarType {
1364        self.core.bar_type
1365    }
1366
1367    fn is_running(&self) -> bool {
1368        self.core.is_running
1369    }
1370
1371    fn set_is_running(&mut self, value: bool) {
1372        self.core.set_is_running(value);
1373    }
1374
1375    /// Apply the given update to the aggregator.
1376    ///
1377    /// For Renko bars, we check if the price movement from the last close
1378    /// is greater than or equal to the brick size. If so, we create new bars.
1379    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1380        // Always update the builder with the current tick
1381        self.core.apply_update(price, size, ts_init);
1382
1383        // Initialize last_close if this is the first update
1384        if self.last_close.is_none() {
1385            self.last_close = Some(price);
1386            return;
1387        }
1388
1389        let last_close = self.last_close.unwrap();
1390
1391        // Convert prices to raw units (integers) to avoid floating point precision issues
1392        let current_raw = price.raw;
1393        let last_close_raw = last_close.raw;
1394        let price_diff_raw = current_raw - last_close_raw;
1395        let abs_price_diff_raw = price_diff_raw.abs();
1396
1397        // Check if we need to create one or more Renko bars
1398        if abs_price_diff_raw >= self.brick_size {
1399            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1400            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1401            let mut current_close = last_close;
1402
1403            // Store the current builder volume to distribute across bricks
1404            let total_volume = self.core.builder.volume;
1405
1406            for _i in 0..num_bricks {
1407                // Calculate the close price for this brick using raw price units
1408                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1409                let brick_close = Price::from_raw(brick_close_raw, price.precision);
1410
1411                // For Renko bars: open = previous close, high/low depend on direction
1412                let (brick_high, brick_low) = if direction > 0.0 {
1413                    (brick_close, current_close)
1414                } else {
1415                    (current_close, brick_close)
1416                };
1417
1418                // Reset builder for this brick
1419                self.core.builder.reset();
1420                self.core.builder.open = Some(current_close);
1421                self.core.builder.high = Some(brick_high);
1422                self.core.builder.low = Some(brick_low);
1423                self.core.builder.close = Some(brick_close);
1424                self.core.builder.volume = total_volume; // Each brick gets the full volume
1425                self.core.builder.count = 1;
1426                self.core.builder.ts_last = ts_init;
1427                self.core.builder.initialized = true;
1428
1429                // Build and send the bar
1430                self.core.build_and_send(ts_init, ts_init);
1431
1432                // Update for the next brick
1433                current_close = brick_close;
1434                self.last_close = Some(brick_close);
1435            }
1436        }
1437    }
1438
1439    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1440        // Always update the builder with the current bar
1441        self.core.builder.update_bar(bar, volume, ts_init);
1442
1443        // Initialize last_close if this is the first update
1444        if self.last_close.is_none() {
1445            self.last_close = Some(bar.close);
1446            return;
1447        }
1448
1449        let last_close = self.last_close.unwrap();
1450
1451        // Convert prices to raw units (integers) to avoid floating point precision issues
1452        let current_raw = bar.close.raw;
1453        let last_close_raw = last_close.raw;
1454        let price_diff_raw = current_raw - last_close_raw;
1455        let abs_price_diff_raw = price_diff_raw.abs();
1456
1457        // Check if we need to create one or more Renko bars
1458        if abs_price_diff_raw >= self.brick_size {
1459            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1460            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1461            let mut current_close = last_close;
1462
1463            // Store the current builder volume to distribute across bricks
1464            let total_volume = self.core.builder.volume;
1465
1466            for _i in 0..num_bricks {
1467                // Calculate the close price for this brick using raw price units
1468                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1469                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1470
1471                // For Renko bars: open = previous close, high/low depend on direction
1472                let (brick_high, brick_low) = if direction > 0.0 {
1473                    (brick_close, current_close)
1474                } else {
1475                    (current_close, brick_close)
1476                };
1477
1478                // Reset builder for this brick
1479                self.core.builder.reset();
1480                self.core.builder.open = Some(current_close);
1481                self.core.builder.high = Some(brick_high);
1482                self.core.builder.low = Some(brick_low);
1483                self.core.builder.close = Some(brick_close);
1484                self.core.builder.volume = total_volume; // Each brick gets the full volume
1485                self.core.builder.count = 1;
1486                self.core.builder.ts_last = ts_init;
1487                self.core.builder.initialized = true;
1488
1489                // Build and send the bar
1490                self.core.build_and_send(ts_init, ts_init);
1491
1492                // Update for the next brick
1493                current_close = brick_close;
1494                self.last_close = Some(brick_close);
1495            }
1496        }
1497    }
1498}
1499
/// Provides a means of building time bars aggregated from quotes and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator {
    /// Shared aggregation core (bar type, builder and handler).
    core: BarAggregatorCore,
    /// Clock on which the bar timer / time alerts are registered.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, no bar is built at a timer event if the builder saw no updates.
    build_with_no_updates: bool,
    /// For left-open intervals, stamps bars with the closing event time instead of the
    /// stored open time.
    timestamp_on_close: bool,
    /// `true` when the interval type is `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar in progress; set to the event time after each build.
    stored_open_ns: UnixNanos,
    /// Name under which the timer is registered on the clock (the bar type as a string).
    timer_name: String,
    /// Bar interval in nanoseconds, as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Time at which the bar in progress is scheduled to close.
    next_close_ns: UnixNanos,
    /// Delay added to the computed start time, applied as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When set, the first bar build is discarded (builder reset) instead of sent.
    skip_first_non_full_bar: bool,
    /// Historical-mode flag; initialized to `false`.
    /// NOTE(review): how it is consumed is outside this section - confirm.
    pub historical_mode: bool,
    /// Time events buffered for historical processing.
    /// NOTE(review): usage is outside this section - confirm.
    historical_events: Vec<TimeEvent>,
    /// Weak handle to the boxed aggregator, used by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1520
1521impl Debug for TimeBarAggregator {
1522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1523        f.debug_struct(stringify!(TimeBarAggregator))
1524            .field("core", &self.core)
1525            .field("build_with_no_updates", &self.build_with_no_updates)
1526            .field("timestamp_on_close", &self.timestamp_on_close)
1527            .field("is_left_open", &self.is_left_open)
1528            .field("timer_name", &self.timer_name)
1529            .field("interval_ns", &self.interval_ns)
1530            .field("bar_build_delay", &self.bar_build_delay)
1531            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1532            .finish()
1533    }
1534}
1535
1536impl TimeBarAggregator {
1537    /// Creates a new [`TimeBarAggregator`] instance.
1538    ///
1539    /// # Panics
1540    ///
1541    /// This function panics if:
1542    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1543    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1544    #[allow(clippy::too_many_arguments)]
1545    pub fn new<H: FnMut(Bar) + 'static>(
1546        bar_type: BarType,
1547        price_precision: u8,
1548        size_precision: u8,
1549        clock: Rc<RefCell<dyn Clock>>,
1550        handler: H,
1551        build_with_no_updates: bool,
1552        timestamp_on_close: bool,
1553        interval_type: BarIntervalType,
1554        time_bars_origin_offset: Option<TimeDelta>,
1555        bar_build_delay: u64,
1556        skip_first_non_full_bar: bool,
1557    ) -> Self {
1558        let is_left_open = match interval_type {
1559            BarIntervalType::LeftOpen => true,
1560            BarIntervalType::RightOpen => false,
1561        };
1562
1563        let core = BarAggregatorCore::new(
1564            bar_type.standard(),
1565            price_precision,
1566            size_precision,
1567            handler,
1568        );
1569
1570        Self {
1571            core,
1572            clock,
1573            build_with_no_updates,
1574            timestamp_on_close,
1575            is_left_open,
1576            stored_open_ns: UnixNanos::default(),
1577            timer_name: bar_type.to_string(),
1578            interval_ns: get_bar_interval_ns(&bar_type),
1579            next_close_ns: UnixNanos::default(),
1580            bar_build_delay,
1581            time_bars_origin_offset,
1582            skip_first_non_full_bar,
1583            historical_mode: false,
1584            historical_events: Vec::new(),
1585            aggregator_weak: None,
1586        }
1587    }
1588
    /// Replaces the clock used by the aggregator (internal method).
    ///
    /// Only the stored handle is swapped; nothing is registered on or cancelled from
    /// either clock by this call.
    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
        self.clock = clock;
    }
1593
1594    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
1595    ///
1596    /// This matches the Cython `start_timer()` method exactly.
1597    /// Creates a callback to `build_bar` using a weak reference to the aggregator.
1598    ///
1599    /// # Panics
1600    ///
1601    /// Panics if aggregator_rc is None and aggregator_weak hasn't been set, or if timer registration fails.
1602    pub fn start_timer_internal(
1603        &mut self,
1604        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1605    ) {
1606        // Create callback that calls build_bar through the weak reference
1607        let aggregator_weak = if let Some(rc) = aggregator_rc {
1608            // Store weak reference for future use (e.g., in build_bar for month/year)
1609            let weak = Rc::downgrade(&rc);
1610            self.aggregator_weak = Some(weak.clone());
1611            weak
1612        } else {
1613            // Use existing weak reference (for historical mode where it was set earlier)
1614            self.aggregator_weak
1615                .as_ref()
1616                .expect("Aggregator weak reference must be set before calling start_timer()")
1617                .clone()
1618        };
1619
1620        let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1621            if let Some(agg) = aggregator_weak.upgrade() {
1622                agg.borrow_mut().build_bar(event);
1623            }
1624        }));
1625
1626        // Computing start_time
1627        let now = self.clock.borrow().utc_now();
1628        let mut start_time =
1629            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1630        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1631
1632        // Closing a partial bar at the transition from historical to backtest data
1633        let fire_immediately = start_time == now;
1634
1635        self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1636
1637        let spec = &self.bar_type().spec();
1638        let start_time_ns = UnixNanos::from(start_time);
1639
1640        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1641            self.clock
1642                .borrow_mut()
1643                .set_timer_ns(
1644                    &self.timer_name,
1645                    self.interval_ns.as_u64(),
1646                    Some(start_time_ns),
1647                    None,
1648                    Some(callback),
1649                    Some(true), // allow_past
1650                    Some(fire_immediately),
1651                )
1652                .expect(FAILED);
1653
1654            if fire_immediately {
1655                self.next_close_ns = start_time_ns;
1656            } else {
1657                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1658                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1659            }
1660
1661            self.stored_open_ns = self.next_close_ns - self.interval_ns;
1662        } else {
1663            // The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
1664            let alert_time = if fire_immediately {
1665                start_time
1666            } else {
1667                let step = spec.step.get() as u32;
1668                if spec.aggregation == BarAggregation::Month {
1669                    add_n_months(start_time, step).expect(FAILED)
1670                } else {
1671                    // Year aggregation
1672                    add_n_years(start_time, step).expect(FAILED)
1673                }
1674            };
1675
1676            self.clock
1677                .borrow_mut()
1678                .set_time_alert_ns(
1679                    &self.timer_name,
1680                    UnixNanos::from(alert_time),
1681                    Some(callback),
1682                    Some(true), // allow_past
1683                )
1684                .expect(FAILED);
1685
1686            self.next_close_ns = UnixNanos::from(alert_time);
1687            self.stored_open_ns = UnixNanos::from(start_time);
1688        }
1689
1690        log::debug!(
1691            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1692            self.timer_name,
1693            start_time,
1694            self.historical_mode,
1695            fire_immediately,
1696            now,
1697            self.bar_build_delay
1698        );
1699    }
1700
    /// Stops the time bar aggregator by cancelling its named timer on the clock.
    pub fn stop(&mut self) {
        self.clock.borrow_mut().cancel_timer(&self.timer_name);
    }
1705
1706    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1707        if self.skip_first_non_full_bar {
1708            self.core.builder.reset();
1709            self.skip_first_non_full_bar = false;
1710        } else {
1711            self.core.build_and_send(ts_event, ts_init);
1712        }
1713    }
1714
1715    fn build_bar(&mut self, event: TimeEvent) {
1716        if !self.core.builder.initialized {
1717            return;
1718        }
1719
1720        if !self.build_with_no_updates && self.core.builder.count == 0 {
1721            return; // Do not build bar when no update
1722        }
1723
1724        let ts_init = event.ts_event;
1725        let ts_event = if self.is_left_open {
1726            if self.timestamp_on_close {
1727                event.ts_event
1728            } else {
1729                self.stored_open_ns
1730            }
1731        } else {
1732            self.stored_open_ns
1733        };
1734
1735        self.build_and_send(ts_event, ts_init);
1736
1737        // Close time becomes the next open time
1738        self.stored_open_ns = event.ts_event;
1739
1740        if self.bar_type().spec().aggregation == BarAggregation::Month {
1741            let step = self.bar_type().spec().step.get() as u32;
1742            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1743
1744            self.clock
1745                .borrow_mut()
1746                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1747                .expect(FAILED);
1748
1749            self.next_close_ns = alert_time_ns;
1750        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1751            let step = self.bar_type().spec().step.get() as u32;
1752            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1753
1754            self.clock
1755                .borrow_mut()
1756                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1757                .expect(FAILED);
1758
1759            self.next_close_ns = alert_time_ns;
1760        } else {
1761            // On receiving this event, timer should now have a new `next_time_ns`
1762            self.next_close_ns = self
1763                .clock
1764                .borrow()
1765                .next_time_ns(&self.timer_name)
1766                .unwrap_or_default();
1767        }
1768    }
1769
1770    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1771        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1772            // In historical mode, clock is always a TestClock (set by data engine)
1773            {
1774                let mut clock_borrow = self.clock.borrow_mut();
1775                let test_clock = clock_borrow
1776                    .as_any_mut()
1777                    .downcast_mut::<TestClock>()
1778                    .expect("Expected TestClock in historical mode");
1779                test_clock.set_time(ts_init);
1780            }
1781            // In historical mode, weak reference should already be set
1782            self.start_timer_internal(None);
1783        }
1784
1785        // Advance this aggregator's independent clock and collect timer events
1786        {
1787            let mut clock_borrow = self.clock.borrow_mut();
1788            let test_clock = clock_borrow
1789                .as_any_mut()
1790                .downcast_mut::<TestClock>()
1791                .expect("Expected TestClock in historical mode");
1792            self.historical_events = test_clock.advance_time(ts_init, true);
1793        }
1794    }
1795
1796    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1797        // Process timer events after data processing
1798        // Collect events first to avoid borrow checker issues
1799        let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1800        for event in events {
1801            self.build_bar(event);
1802        }
1803    }
1804
1805    /// Sets historical events (called by data engine after advancing clock)
1806    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1807        self.historical_events = events;
1808    }
1809}
1810
1811impl BarAggregator for TimeBarAggregator {
1812    fn bar_type(&self) -> BarType {
1813        self.core.bar_type
1814    }
1815
1816    fn is_running(&self) -> bool {
1817        self.core.is_running
1818    }
1819
1820    fn set_is_running(&mut self, value: bool) {
1821        self.core.set_is_running(value);
1822    }
1823
1824    /// Stop time-based aggregator by canceling its timer.
1825    fn stop(&mut self) {
1826        Self::stop(self);
1827    }
1828
1829    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1830        if self.historical_mode {
1831            self.preprocess_historical_events(ts_init);
1832        }
1833
1834        self.core.apply_update(price, size, ts_init);
1835
1836        if self.historical_mode {
1837            self.postprocess_historical_events(ts_init);
1838        }
1839    }
1840
1841    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1842        if self.historical_mode {
1843            self.preprocess_historical_events(ts_init);
1844        }
1845
1846        self.core.builder.update_bar(bar, volume, ts_init);
1847
1848        if self.historical_mode {
1849            self.postprocess_historical_events(ts_init);
1850        }
1851    }
1852
1853    fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1854        self.historical_mode = historical_mode;
1855        self.core.handler = handler;
1856    }
1857
1858    fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1859        self.set_historical_events_internal(events);
1860    }
1861
1862    fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1863        self.set_clock_internal(clock);
1864    }
1865
1866    fn build_bar(&mut self, event: TimeEvent) {
1867        // Delegate to the implementation method
1868        // We use the struct name here to disambiguate from the trait method
1869        {
1870            #[allow(clippy::use_self)]
1871            TimeBarAggregator::build_bar(self, event);
1872        }
1873    }
1874
1875    fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1876        self.aggregator_weak = Some(weak);
1877    }
1878
1879    fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1880        self.start_timer_internal(aggregator_rc);
1881    }
1882}
1883
1884#[cfg(test)]
1885mod tests {
1886    use std::sync::{Arc, Mutex};
1887
1888    use nautilus_common::clock::TestClock;
1889    use nautilus_core::{MUTEX_POISONED, UUID4};
1890    use nautilus_model::{
1891        data::{BarSpecification, BarType},
1892        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1893        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1894        types::{Price, Quantity},
1895    };
1896    use rstest::rstest;
1897    use ustr::Ustr;
1898
1899    use super::*;
1900
1901    #[rstest]
1902    fn test_bar_builder_initialization(equity_aapl: Equity) {
1903        let instrument = InstrumentAny::Equity(equity_aapl);
1904        let bar_type = BarType::new(
1905            instrument.id(),
1906            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1907            AggregationSource::Internal,
1908        );
1909        let builder = BarBuilder::new(
1910            bar_type,
1911            instrument.price_precision(),
1912            instrument.size_precision(),
1913        );
1914
1915        assert!(!builder.initialized);
1916        assert_eq!(builder.ts_last, 0);
1917        assert_eq!(builder.count, 0);
1918    }
1919
1920    #[rstest]
1921    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1922        let instrument = InstrumentAny::Equity(equity_aapl);
1923        let bar_type = BarType::new(
1924            instrument.id(),
1925            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1926            AggregationSource::Internal,
1927        );
1928        let mut builder = BarBuilder::new(
1929            bar_type,
1930            instrument.price_precision(),
1931            instrument.size_precision(),
1932        );
1933
1934        builder.update(
1935            Price::from("100.00"),
1936            Quantity::from(1),
1937            UnixNanos::from(1000),
1938        );
1939        builder.update(
1940            Price::from("95.00"),
1941            Quantity::from(1),
1942            UnixNanos::from(2000),
1943        );
1944        builder.update(
1945            Price::from("105.00"),
1946            Quantity::from(1),
1947            UnixNanos::from(3000),
1948        );
1949
1950        let bar = builder.build_now();
1951        assert!(bar.high > bar.low);
1952        assert_eq!(bar.open, Price::from("100.00"));
1953        assert_eq!(bar.high, Price::from("105.00"));
1954        assert_eq!(bar.low, Price::from("95.00"));
1955        assert_eq!(bar.close, Price::from("105.00"));
1956    }
1957
1958    #[rstest]
1959    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1960        let instrument = InstrumentAny::Equity(equity_aapl);
1961        let bar_type = BarType::new(
1962            instrument.id(),
1963            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1964            AggregationSource::Internal,
1965        );
1966        let mut builder = BarBuilder::new(
1967            bar_type,
1968            instrument.price_precision(),
1969            instrument.size_precision(),
1970        );
1971
1972        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1973        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1974
1975        assert_eq!(builder.ts_last, 1_000);
1976        assert_eq!(builder.count, 1);
1977    }
1978
1979    #[rstest]
1980    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1981        let instrument = InstrumentAny::Equity(equity_aapl);
1982        let bar_type = BarType::new(
1983            instrument.id(),
1984            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1985            AggregationSource::Internal,
1986        );
1987        let mut builder = BarBuilder::new(
1988            bar_type,
1989            instrument.price_precision(),
1990            instrument.size_precision(),
1991        );
1992
1993        builder.update(
1994            Price::from("1.00000"),
1995            Quantity::from(1),
1996            UnixNanos::default(),
1997        );
1998
1999        assert!(builder.initialized);
2000        assert_eq!(builder.ts_last, 0);
2001        assert_eq!(builder.count, 1);
2002    }
2003
2004    #[rstest]
2005    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2006        equity_aapl: Equity,
2007    ) {
2008        let instrument = InstrumentAny::Equity(equity_aapl);
2009        let bar_type = BarType::new(
2010            instrument.id(),
2011            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2012            AggregationSource::Internal,
2013        );
2014        let mut builder = BarBuilder::new(bar_type, 2, 0);
2015
2016        builder.update(
2017            Price::from("1.00000"),
2018            Quantity::from(1),
2019            UnixNanos::from(1_000),
2020        );
2021        builder.update(
2022            Price::from("1.00001"),
2023            Quantity::from(1),
2024            UnixNanos::from(500),
2025        );
2026
2027        assert!(builder.initialized);
2028        assert_eq!(builder.ts_last, 1_000);
2029        assert_eq!(builder.count, 1);
2030    }
2031
2032    #[rstest]
2033    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2034        let instrument = InstrumentAny::Equity(equity_aapl);
2035        let bar_type = BarType::new(
2036            instrument.id(),
2037            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2038            AggregationSource::Internal,
2039        );
2040        let mut builder = BarBuilder::new(
2041            bar_type,
2042            instrument.price_precision(),
2043            instrument.size_precision(),
2044        );
2045
2046        for _ in 0..5 {
2047            builder.update(
2048                Price::from("1.00000"),
2049                Quantity::from(1),
2050                UnixNanos::from(1_000),
2051            );
2052        }
2053
2054        assert_eq!(builder.count, 5);
2055    }
2056
2057    #[rstest]
2058    #[should_panic]
2059    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2060        let instrument = InstrumentAny::Equity(equity_aapl);
2061        let bar_type = BarType::new(
2062            instrument.id(),
2063            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2064            AggregationSource::Internal,
2065        );
2066        let mut builder = BarBuilder::new(
2067            bar_type,
2068            instrument.price_precision(),
2069            instrument.size_precision(),
2070        );
2071        let _ = builder.build_now();
2072    }
2073
2074    #[rstest]
2075    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2076        let instrument = InstrumentAny::Equity(equity_aapl);
2077        let bar_type = BarType::new(
2078            instrument.id(),
2079            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2080            AggregationSource::Internal,
2081        );
2082        let mut builder = BarBuilder::new(
2083            bar_type,
2084            instrument.price_precision(),
2085            instrument.size_precision(),
2086        );
2087
2088        builder.update(
2089            Price::from("1.00001"),
2090            Quantity::from(2),
2091            UnixNanos::default(),
2092        );
2093        builder.update(
2094            Price::from("1.00002"),
2095            Quantity::from(2),
2096            UnixNanos::default(),
2097        );
2098        builder.update(
2099            Price::from("1.00000"),
2100            Quantity::from(1),
2101            UnixNanos::from(1_000_000_000),
2102        );
2103
2104        let bar = builder.build_now();
2105
2106        assert_eq!(bar.open, Price::from("1.00001"));
2107        assert_eq!(bar.high, Price::from("1.00002"));
2108        assert_eq!(bar.low, Price::from("1.00000"));
2109        assert_eq!(bar.close, Price::from("1.00000"));
2110        assert_eq!(bar.volume, Quantity::from(5));
2111        assert_eq!(bar.ts_init, 1_000_000_000);
2112        assert_eq!(builder.ts_last, 1_000_000_000);
2113        assert_eq!(builder.count, 0);
2114    }
2115
2116    #[rstest]
2117    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2118        let instrument = InstrumentAny::Equity(equity_aapl);
2119        let bar_type = BarType::new(
2120            instrument.id(),
2121            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2122            AggregationSource::Internal,
2123        );
2124        let mut builder = BarBuilder::new(bar_type, 2, 0);
2125
2126        builder.update(
2127            Price::from("1.00001"),
2128            Quantity::from(1),
2129            UnixNanos::default(),
2130        );
2131        builder.build_now();
2132
2133        builder.update(
2134            Price::from("1.00000"),
2135            Quantity::from(1),
2136            UnixNanos::default(),
2137        );
2138        builder.update(
2139            Price::from("1.00003"),
2140            Quantity::from(1),
2141            UnixNanos::default(),
2142        );
2143        builder.update(
2144            Price::from("1.00002"),
2145            Quantity::from(1),
2146            UnixNanos::default(),
2147        );
2148
2149        let bar = builder.build_now();
2150
2151        assert_eq!(bar.open, Price::from("1.00000"));
2152        assert_eq!(bar.high, Price::from("1.00003"));
2153        assert_eq!(bar.low, Price::from("1.00000"));
2154        assert_eq!(bar.close, Price::from("1.00002"));
2155        assert_eq!(bar.volume, Quantity::from(3));
2156    }
2157
2158    #[rstest]
2159    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2160        let instrument = InstrumentAny::Equity(equity_aapl);
2161        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2162        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2163        let handler = Arc::new(Mutex::new(Vec::new()));
2164        let handler_clone = Arc::clone(&handler);
2165
2166        let mut aggregator = TickBarAggregator::new(
2167            bar_type,
2168            instrument.price_precision(),
2169            instrument.size_precision(),
2170            move |bar: Bar| {
2171                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2172                handler_guard.push(bar);
2173            },
2174        );
2175
2176        let trade = TradeTick::default();
2177        aggregator.handle_trade(trade);
2178
2179        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2180        assert_eq!(handler_guard.len(), 0);
2181    }
2182
2183    #[rstest]
2184    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2185        let instrument = InstrumentAny::Equity(equity_aapl);
2186        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2187        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2188        let handler = Arc::new(Mutex::new(Vec::new()));
2189        let handler_clone = Arc::clone(&handler);
2190
2191        let mut aggregator = TickBarAggregator::new(
2192            bar_type,
2193            instrument.price_precision(),
2194            instrument.size_precision(),
2195            move |bar: Bar| {
2196                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2197                handler_guard.push(bar);
2198            },
2199        );
2200
2201        let trade = TradeTick::default();
2202        aggregator.handle_trade(trade);
2203        aggregator.handle_trade(trade);
2204        aggregator.handle_trade(trade);
2205
2206        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2207        let bar = handler_guard.first().unwrap();
2208        assert_eq!(handler_guard.len(), 1);
2209        assert_eq!(bar.open, trade.price);
2210        assert_eq!(bar.high, trade.price);
2211        assert_eq!(bar.low, trade.price);
2212        assert_eq!(bar.close, trade.price);
2213        assert_eq!(bar.volume, Quantity::from(300000));
2214        assert_eq!(bar.ts_event, trade.ts_event);
2215        assert_eq!(bar.ts_init, trade.ts_init);
2216    }
2217
2218    #[rstest]
2219    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2220        let instrument = InstrumentAny::Equity(equity_aapl);
2221        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2222        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2223        let handler = Arc::new(Mutex::new(Vec::new()));
2224        let handler_clone = Arc::clone(&handler);
2225
2226        let mut aggregator = TickBarAggregator::new(
2227            bar_type,
2228            instrument.price_precision(),
2229            instrument.size_precision(),
2230            move |bar: Bar| {
2231                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2232                handler_guard.push(bar);
2233            },
2234        );
2235
2236        aggregator.update(
2237            Price::from("1.00001"),
2238            Quantity::from(1),
2239            UnixNanos::default(),
2240        );
2241        aggregator.update(
2242            Price::from("1.00002"),
2243            Quantity::from(1),
2244            UnixNanos::from(1000),
2245        );
2246        aggregator.update(
2247            Price::from("1.00003"),
2248            Quantity::from(1),
2249            UnixNanos::from(2000),
2250        );
2251
2252        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2253        assert_eq!(handler_guard.len(), 1);
2254
2255        let bar = handler_guard.first().unwrap();
2256        assert_eq!(bar.open, Price::from("1.00001"));
2257        assert_eq!(bar.high, Price::from("1.00003"));
2258        assert_eq!(bar.low, Price::from("1.00001"));
2259        assert_eq!(bar.close, Price::from("1.00003"));
2260        assert_eq!(bar.volume, Quantity::from(3));
2261    }
2262
2263    #[rstest]
2264    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2265        let instrument = InstrumentAny::Equity(equity_aapl);
2266        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2267        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2268        let handler = Arc::new(Mutex::new(Vec::new()));
2269        let handler_clone = Arc::clone(&handler);
2270
2271        let mut aggregator = TickBarAggregator::new(
2272            bar_type,
2273            instrument.price_precision(),
2274            instrument.size_precision(),
2275            move |bar: Bar| {
2276                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2277                handler_guard.push(bar);
2278            },
2279        );
2280
2281        aggregator.update(
2282            Price::from("1.00001"),
2283            Quantity::from(1),
2284            UnixNanos::default(),
2285        );
2286        aggregator.update(
2287            Price::from("1.00002"),
2288            Quantity::from(1),
2289            UnixNanos::from(1000),
2290        );
2291        aggregator.update(
2292            Price::from("1.00003"),
2293            Quantity::from(1),
2294            UnixNanos::from(2000),
2295        );
2296        aggregator.update(
2297            Price::from("1.00004"),
2298            Quantity::from(1),
2299            UnixNanos::from(3000),
2300        );
2301
2302        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2303        assert_eq!(handler_guard.len(), 2);
2304
2305        let bar1 = &handler_guard[0];
2306        assert_eq!(bar1.open, Price::from("1.00001"));
2307        assert_eq!(bar1.close, Price::from("1.00002"));
2308        assert_eq!(bar1.volume, Quantity::from(2));
2309
2310        let bar2 = &handler_guard[1];
2311        assert_eq!(bar2.open, Price::from("1.00003"));
2312        assert_eq!(bar2.close, Price::from("1.00004"));
2313        assert_eq!(bar2.volume, Quantity::from(2));
2314    }
2315
2316    #[rstest]
2317    fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2318        let instrument = InstrumentAny::Equity(equity_aapl);
2319        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2320        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2321        let handler = Arc::new(Mutex::new(Vec::new()));
2322        let handler_clone = Arc::clone(&handler);
2323
2324        let mut aggregator = TickImbalanceBarAggregator::new(
2325            bar_type,
2326            instrument.price_precision(),
2327            instrument.size_precision(),
2328            move |bar: Bar| {
2329                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2330                handler_guard.push(bar);
2331            },
2332        );
2333
2334        let trade = TradeTick::default();
2335        aggregator.handle_trade(trade);
2336        aggregator.handle_trade(trade);
2337
2338        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2339        assert_eq!(handler_guard.len(), 1);
2340        let bar = handler_guard.first().unwrap();
2341        assert_eq!(bar.volume, Quantity::from(200000));
2342    }
2343
2344    #[rstest]
2345    fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2346        let instrument = InstrumentAny::Equity(equity_aapl);
2347        let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2348        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2349        let handler = Arc::new(Mutex::new(Vec::new()));
2350        let handler_clone = Arc::clone(&handler);
2351
2352        let mut aggregator = TickImbalanceBarAggregator::new(
2353            bar_type,
2354            instrument.price_precision(),
2355            instrument.size_precision(),
2356            move |bar: Bar| {
2357                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2358                handler_guard.push(bar);
2359            },
2360        );
2361
2362        let sell = TradeTick {
2363            aggressor_side: AggressorSide::Seller,
2364            ..TradeTick::default()
2365        };
2366
2367        aggregator.handle_trade(sell);
2368
2369        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2370        assert_eq!(handler_guard.len(), 1);
2371    }
2372
2373    #[rstest]
2374    fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2375        let instrument = InstrumentAny::Equity(equity_aapl);
2376        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2377        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2378        let handler = Arc::new(Mutex::new(Vec::new()));
2379        let handler_clone = Arc::clone(&handler);
2380
2381        let mut aggregator = TickRunsBarAggregator::new(
2382            bar_type,
2383            instrument.price_precision(),
2384            instrument.size_precision(),
2385            move |bar: Bar| {
2386                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2387                handler_guard.push(bar);
2388            },
2389        );
2390
2391        let buy = TradeTick::default();
2392        let sell = TradeTick {
2393            aggressor_side: AggressorSide::Seller,
2394            ..buy
2395        };
2396
2397        aggregator.handle_trade(buy);
2398        aggregator.handle_trade(buy);
2399        aggregator.handle_trade(sell);
2400        aggregator.handle_trade(sell);
2401
2402        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2403        assert_eq!(handler_guard.len(), 2);
2404    }
2405
2406    #[rstest]
2407    fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2408        let instrument = InstrumentAny::Equity(equity_aapl);
2409        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2410        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2411        let handler = Arc::new(Mutex::new(Vec::new()));
2412        let handler_clone = Arc::clone(&handler);
2413
2414        let mut aggregator = TickRunsBarAggregator::new(
2415            bar_type,
2416            instrument.price_precision(),
2417            instrument.size_precision(),
2418            move |bar: Bar| {
2419                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2420                handler_guard.push(bar);
2421            },
2422        );
2423
2424        let buy = TradeTick {
2425            size: Quantity::from(1),
2426            ..TradeTick::default()
2427        };
2428        let sell = TradeTick {
2429            aggressor_side: AggressorSide::Seller,
2430            size: Quantity::from(1),
2431            ..buy
2432        };
2433
2434        aggregator.handle_trade(buy);
2435        aggregator.handle_trade(buy);
2436        aggregator.handle_trade(sell);
2437        aggregator.handle_trade(sell);
2438
2439        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2440        assert_eq!(handler_guard.len(), 2);
2441        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2442        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2443    }
2444
2445    #[rstest]
2446    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2447        let instrument = InstrumentAny::Equity(equity_aapl);
2448        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2449        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2450        let handler = Arc::new(Mutex::new(Vec::new()));
2451        let handler_clone = Arc::clone(&handler);
2452
2453        let mut aggregator = VolumeBarAggregator::new(
2454            bar_type,
2455            instrument.price_precision(),
2456            instrument.size_precision(),
2457            move |bar: Bar| {
2458                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2459                handler_guard.push(bar);
2460            },
2461        );
2462
2463        aggregator.update(
2464            Price::from("1.00001"),
2465            Quantity::from(25),
2466            UnixNanos::default(),
2467        );
2468
2469        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2470        assert_eq!(handler_guard.len(), 2);
2471        let bar1 = &handler_guard[0];
2472        assert_eq!(bar1.volume, Quantity::from(10));
2473        let bar2 = &handler_guard[1];
2474        assert_eq!(bar2.volume, Quantity::from(10));
2475    }
2476
2477    #[rstest]
2478    fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2479        let instrument = InstrumentAny::Equity(equity_aapl);
2480        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2481        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2482        let handler = Arc::new(Mutex::new(Vec::new()));
2483        let handler_clone = Arc::clone(&handler);
2484
2485        let mut aggregator = VolumeRunsBarAggregator::new(
2486            bar_type,
2487            instrument.price_precision(),
2488            instrument.size_precision(),
2489            move |bar: Bar| {
2490                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2491                handler_guard.push(bar);
2492            },
2493        );
2494
2495        let buy = TradeTick {
2496            instrument_id: instrument.id(),
2497            price: Price::from("1.0"),
2498            size: Quantity::from(1),
2499            ..TradeTick::default()
2500        };
2501        let sell = TradeTick {
2502            aggressor_side: AggressorSide::Seller,
2503            ..buy
2504        };
2505
2506        aggregator.handle_trade(buy);
2507        aggregator.handle_trade(buy); // emit first bar at 2
2508        aggregator.handle_trade(sell);
2509        aggregator.handle_trade(sell); // emit second bar at 2 sell-side
2510
2511        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2512        assert!(handler_guard.len() >= 2);
2513        assert!(
2514            (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2515                < f64::EPSILON
2516        );
2517    }
2518
2519    #[rstest]
2520    fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2521        let instrument = InstrumentAny::Equity(equity_aapl);
2522        let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2523        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2524        let handler = Arc::new(Mutex::new(Vec::new()));
2525        let handler_clone = Arc::clone(&handler);
2526
2527        let mut aggregator = VolumeRunsBarAggregator::new(
2528            bar_type,
2529            instrument.price_precision(),
2530            instrument.size_precision(),
2531            move |bar: Bar| {
2532                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2533                handler_guard.push(bar);
2534            },
2535        );
2536
2537        let trade = TradeTick {
2538            instrument_id: instrument.id(),
2539            price: Price::from("1.0"),
2540            size: Quantity::from(5),
2541            ..TradeTick::default()
2542        };
2543
2544        aggregator.handle_trade(trade);
2545
2546        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2547        assert!(!handler_guard.is_empty());
2548        assert!(handler_guard[0].volume.as_f64() > 0.0);
2549        assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2550    }
2551
2552    #[rstest]
2553    fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2554        let instrument = InstrumentAny::Equity(equity_aapl);
2555        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2556        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2557        let handler = Arc::new(Mutex::new(Vec::new()));
2558        let handler_clone = Arc::clone(&handler);
2559
2560        let mut aggregator = VolumeImbalanceBarAggregator::new(
2561            bar_type,
2562            instrument.price_precision(),
2563            instrument.size_precision(),
2564            move |bar: Bar| {
2565                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2566                handler_guard.push(bar);
2567            },
2568        );
2569
2570        let trade_small = TradeTick {
2571            instrument_id: instrument.id(),
2572            price: Price::from("1.0"),
2573            size: Quantity::from(1),
2574            ..TradeTick::default()
2575        };
2576        let trade_large = TradeTick {
2577            size: Quantity::from(3),
2578            ..trade_small
2579        };
2580
2581        aggregator.handle_trade(trade_small);
2582        aggregator.handle_trade(trade_large);
2583
2584        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2585        assert_eq!(handler_guard.len(), 2);
2586        let total_output = handler_guard
2587            .iter()
2588            .map(|bar| bar.volume.as_f64())
2589            .sum::<f64>();
2590        let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2591        assert!((total_output - total_input).abs() < f64::EPSILON);
2592    }
2593
2594    #[rstest]
2595    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2596        let instrument = InstrumentAny::Equity(equity_aapl);
2597        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
2598        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2599        let handler = Arc::new(Mutex::new(Vec::new()));
2600        let handler_clone = Arc::clone(&handler);
2601
2602        let mut aggregator = ValueBarAggregator::new(
2603            bar_type,
2604            instrument.price_precision(),
2605            instrument.size_precision(),
2606            move |bar: Bar| {
2607                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2608                handler_guard.push(bar);
2609            },
2610        );
2611
2612        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
2613        aggregator.update(
2614            Price::from("100.00"),
2615            Quantity::from(5),
2616            UnixNanos::default(),
2617        );
2618        aggregator.update(
2619            Price::from("100.00"),
2620            Quantity::from(5),
2621            UnixNanos::from(1000),
2622        );
2623
2624        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2625        assert_eq!(handler_guard.len(), 1);
2626        let bar = handler_guard.first().unwrap();
2627        assert_eq!(bar.volume, Quantity::from(10));
2628    }
2629
2630    #[rstest]
2631    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2632        let instrument = InstrumentAny::Equity(equity_aapl);
2633        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2634        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2635        let handler = Arc::new(Mutex::new(Vec::new()));
2636        let handler_clone = Arc::clone(&handler);
2637
2638        let mut aggregator = ValueBarAggregator::new(
2639            bar_type,
2640            instrument.price_precision(),
2641            instrument.size_precision(),
2642            move |bar: Bar| {
2643                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2644                handler_guard.push(bar);
2645            },
2646        );
2647
2648        // Single large update: $100 * 25 = $2500 (should create 2 bars)
2649        aggregator.update(
2650            Price::from("100.00"),
2651            Quantity::from(25),
2652            UnixNanos::default(),
2653        );
2654
2655        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2656        assert_eq!(handler_guard.len(), 2);
2657        let remaining_value = aggregator.get_cumulative_value();
2658        assert!(remaining_value < 1000.0); // Should be less than threshold
2659    }
2660
2661    #[rstest]
2662    fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2663        let instrument = InstrumentAny::Equity(equity_aapl);
2664        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2665        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2666        let handler = Arc::new(Mutex::new(Vec::new()));
2667        let handler_clone = Arc::clone(&handler);
2668
2669        let mut aggregator = ValueBarAggregator::new(
2670            bar_type,
2671            instrument.price_precision(),
2672            instrument.size_precision(),
2673            move |bar: Bar| {
2674                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2675                handler_guard.push(bar);
2676            },
2677        );
2678
2679        // Update with zero price should not cause division by zero
2680        aggregator.update(
2681            Price::from("0.00"),
2682            Quantity::from(100),
2683            UnixNanos::default(),
2684        );
2685
2686        // No bars should be emitted since value is zero
2687        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2688        assert_eq!(handler_guard.len(), 0);
2689
2690        // Cumulative value should remain zero
2691        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2692    }
2693
2694    #[rstest]
2695    fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2696        let instrument = InstrumentAny::Equity(equity_aapl);
2697        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2698        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2699        let handler = Arc::new(Mutex::new(Vec::new()));
2700        let handler_clone = Arc::clone(&handler);
2701
2702        let mut aggregator = ValueBarAggregator::new(
2703            bar_type,
2704            instrument.price_precision(),
2705            instrument.size_precision(),
2706            move |bar: Bar| {
2707                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2708                handler_guard.push(bar);
2709            },
2710        );
2711
2712        // Update with zero size should not cause issues
2713        aggregator.update(
2714            Price::from("100.00"),
2715            Quantity::from(0),
2716            UnixNanos::default(),
2717        );
2718
2719        // No bars should be emitted
2720        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2721        assert_eq!(handler_guard.len(), 0);
2722
2723        // Cumulative value should remain zero
2724        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2725    }
2726
2727    #[rstest]
2728    fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2729        let instrument = InstrumentAny::Equity(equity_aapl);
2730        let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2731        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2732        let handler = Arc::new(Mutex::new(Vec::new()));
2733        let handler_clone = Arc::clone(&handler);
2734
2735        let mut aggregator = ValueImbalanceBarAggregator::new(
2736            bar_type,
2737            instrument.price_precision(),
2738            instrument.size_precision(),
2739            move |bar: Bar| {
2740                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2741                handler_guard.push(bar);
2742            },
2743        );
2744
2745        let buy = TradeTick {
2746            price: Price::from("5.0"),
2747            size: Quantity::from(2), // value 10, should emit one bar
2748            instrument_id: instrument.id(),
2749            ..TradeTick::default()
2750        };
2751        let sell = TradeTick {
2752            price: Price::from("5.0"),
2753            size: Quantity::from(2), // value 10, should emit another bar
2754            aggressor_side: AggressorSide::Seller,
2755            instrument_id: instrument.id(),
2756            ..buy
2757        };
2758
2759        aggregator.handle_trade(buy);
2760        aggregator.handle_trade(sell);
2761
2762        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2763        assert!(handler_guard.is_empty());
2764    }
2765
2766    #[rstest]
2767    fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2768        let instrument = InstrumentAny::Equity(equity_aapl);
2769        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2770        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2771        let handler = Arc::new(Mutex::new(Vec::new()));
2772        let handler_clone = Arc::clone(&handler);
2773
2774        let mut aggregator = ValueRunsBarAggregator::new(
2775            bar_type,
2776            instrument.price_precision(),
2777            instrument.size_precision(),
2778            move |bar: Bar| {
2779                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2780                handler_guard.push(bar);
2781            },
2782        );
2783
2784        let trade = TradeTick {
2785            price: Price::from("10.0"),
2786            size: Quantity::from(5),
2787            instrument_id: instrument.id(),
2788            ..TradeTick::default()
2789        };
2790
2791        aggregator.handle_trade(trade);
2792        aggregator.handle_trade(trade);
2793
2794        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2795        assert_eq!(handler_guard.len(), 1);
2796        let bar = handler_guard.first().unwrap();
2797        assert_eq!(bar.volume, Quantity::from(10));
2798    }
2799
2800    #[rstest]
2801    fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2802        let instrument = InstrumentAny::Equity(equity_aapl);
2803        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2804        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2805        let handler = Arc::new(Mutex::new(Vec::new()));
2806        let handler_clone = Arc::clone(&handler);
2807
2808        let mut aggregator = ValueRunsBarAggregator::new(
2809            bar_type,
2810            instrument.price_precision(),
2811            instrument.size_precision(),
2812            move |bar: Bar| {
2813                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2814                handler_guard.push(bar);
2815            },
2816        );
2817
2818        let buy = TradeTick {
2819            price: Price::from("10.0"),
2820            size: Quantity::from(5),
2821            instrument_id: instrument.id(),
2822            ..TradeTick::default()
2823        }; // value 50
2824        let sell = TradeTick {
2825            price: Price::from("10.0"),
2826            size: Quantity::from(10),
2827            aggressor_side: AggressorSide::Seller,
2828            ..buy
2829        }; // value 100
2830
2831        aggregator.handle_trade(buy);
2832        aggregator.handle_trade(sell);
2833
2834        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2835        assert_eq!(handler_guard.len(), 1);
2836        assert_eq!(handler_guard[0].volume, Quantity::from(10));
2837    }
2838
2839    #[rstest]
2840    fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2841        let instrument = InstrumentAny::Equity(equity_aapl);
2842        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2843        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2844        let handler = Arc::new(Mutex::new(Vec::new()));
2845        let handler_clone = Arc::clone(&handler);
2846
2847        let mut aggregator = TickRunsBarAggregator::new(
2848            bar_type,
2849            instrument.price_precision(),
2850            instrument.size_precision(),
2851            move |bar: Bar| {
2852                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2853                handler_guard.push(bar);
2854            },
2855        );
2856
2857        let buy = TradeTick::default();
2858
2859        aggregator.handle_trade(buy);
2860        aggregator.handle_trade(buy); // Emit bar 1 (run complete)
2861        aggregator.handle_trade(buy); // Start new run
2862        aggregator.handle_trade(buy); // Emit bar 2 (new run complete)
2863
2864        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2865        assert_eq!(handler_guard.len(), 2);
2866    }
2867
2868    #[rstest]
2869    fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2870        let instrument = InstrumentAny::Equity(equity_aapl);
2871        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2872        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2873        let handler = Arc::new(Mutex::new(Vec::new()));
2874        let handler_clone = Arc::clone(&handler);
2875
2876        let mut aggregator = TickRunsBarAggregator::new(
2877            bar_type,
2878            instrument.price_precision(),
2879            instrument.size_precision(),
2880            move |bar: Bar| {
2881                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2882                handler_guard.push(bar);
2883            },
2884        );
2885
2886        let buy = TradeTick::default();
2887        let no_aggressor = TradeTick {
2888            aggressor_side: AggressorSide::NoAggressor,
2889            ..buy
2890        };
2891
2892        aggregator.handle_trade(buy);
2893        aggregator.handle_trade(no_aggressor); // Should not affect run count
2894        aggregator.handle_trade(no_aggressor); // Should not affect run count
2895        aggregator.handle_trade(buy); // Continue run to threshold
2896
2897        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2898        assert_eq!(handler_guard.len(), 1);
2899    }
2900
2901    #[rstest]
2902    fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2903        let instrument = InstrumentAny::Equity(equity_aapl);
2904        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2905        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2906        let handler = Arc::new(Mutex::new(Vec::new()));
2907        let handler_clone = Arc::clone(&handler);
2908
2909        let mut aggregator = VolumeRunsBarAggregator::new(
2910            bar_type,
2911            instrument.price_precision(),
2912            instrument.size_precision(),
2913            move |bar: Bar| {
2914                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2915                handler_guard.push(bar);
2916            },
2917        );
2918
2919        let buy = TradeTick {
2920            instrument_id: instrument.id(),
2921            price: Price::from("1.0"),
2922            size: Quantity::from(1),
2923            ..TradeTick::default()
2924        };
2925
2926        aggregator.handle_trade(buy);
2927        aggregator.handle_trade(buy); // Emit bar 1 (2.0 volume reached)
2928        aggregator.handle_trade(buy); // Start new run
2929        aggregator.handle_trade(buy); // Emit bar 2 (new 2.0 volume reached)
2930
2931        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2932        assert_eq!(handler_guard.len(), 2);
2933        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2934        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2935    }
2936
2937    #[rstest]
2938    fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2939        let instrument = InstrumentAny::Equity(equity_aapl);
2940        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2941        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2942        let handler = Arc::new(Mutex::new(Vec::new()));
2943        let handler_clone = Arc::clone(&handler);
2944
2945        let mut aggregator = ValueRunsBarAggregator::new(
2946            bar_type,
2947            instrument.price_precision(),
2948            instrument.size_precision(),
2949            move |bar: Bar| {
2950                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2951                handler_guard.push(bar);
2952            },
2953        );
2954
2955        let buy = TradeTick {
2956            instrument_id: instrument.id(),
2957            price: Price::from("10.0"),
2958            size: Quantity::from(5),
2959            ..TradeTick::default()
2960        }; // value 50 per trade
2961
2962        aggregator.handle_trade(buy);
2963        aggregator.handle_trade(buy); // Emit bar 1 (100 value reached)
2964        aggregator.handle_trade(buy); // Start new run
2965        aggregator.handle_trade(buy); // Emit bar 2 (new 100 value reached)
2966
2967        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2968        assert_eq!(handler_guard.len(), 2);
2969        assert_eq!(handler_guard[0].volume, Quantity::from(10));
2970        assert_eq!(handler_guard[1].volume, Quantity::from(10));
2971    }
2972
2973    #[rstest]
2974    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
2975        let instrument = InstrumentAny::Equity(equity_aapl);
2976        // One second bars
2977        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2978        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2979        let handler = Arc::new(Mutex::new(Vec::new()));
2980        let handler_clone = Arc::clone(&handler);
2981        let clock = Rc::new(RefCell::new(TestClock::new()));
2982
2983        let mut aggregator = TimeBarAggregator::new(
2984            bar_type,
2985            instrument.price_precision(),
2986            instrument.size_precision(),
2987            clock.clone(),
2988            move |bar: Bar| {
2989                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2990                handler_guard.push(bar);
2991            },
2992            true,  // build_with_no_updates
2993            false, // timestamp_on_close
2994            BarIntervalType::LeftOpen,
2995            None,  // time_bars_origin_offset
2996            15,    // bar_build_delay
2997            false, // skip_first_non_full_bar
2998        );
2999
3000        aggregator.update(
3001            Price::from("100.00"),
3002            Quantity::from(1),
3003            UnixNanos::default(),
3004        );
3005
3006        let next_sec = UnixNanos::from(1_000_000_000);
3007        clock.borrow_mut().set_time(next_sec);
3008
3009        let event = TimeEvent::new(
3010            Ustr::from("1-SECOND-LAST"),
3011            UUID4::new(),
3012            next_sec,
3013            next_sec,
3014        );
3015        aggregator.build_bar(event);
3016
3017        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3018        assert_eq!(handler_guard.len(), 1);
3019        let bar = handler_guard.first().unwrap();
3020        assert_eq!(bar.ts_event, UnixNanos::default());
3021        assert_eq!(bar.ts_init, next_sec);
3022    }
3023
3024    #[rstest]
3025    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3026        let instrument = InstrumentAny::Equity(equity_aapl);
3027        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3028        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3029        let handler = Arc::new(Mutex::new(Vec::new()));
3030        let handler_clone = Arc::clone(&handler);
3031        let clock = Rc::new(RefCell::new(TestClock::new()));
3032
3033        let mut aggregator = TimeBarAggregator::new(
3034            bar_type,
3035            instrument.price_precision(),
3036            instrument.size_precision(),
3037            clock.clone(),
3038            move |bar: Bar| {
3039                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3040                handler_guard.push(bar);
3041            },
3042            true, // build_with_no_updates
3043            true, // timestamp_on_close - changed to true to verify left-open behavior
3044            BarIntervalType::LeftOpen,
3045            None,
3046            15,
3047            false, // skip_first_non_full_bar
3048        );
3049
3050        // Update in first interval
3051        aggregator.update(
3052            Price::from("100.00"),
3053            Quantity::from(1),
3054            UnixNanos::default(),
3055        );
3056
3057        // First interval close
3058        let ts1 = UnixNanos::from(1_000_000_000);
3059        clock.borrow_mut().set_time(ts1);
3060        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3061        aggregator.build_bar(event);
3062
3063        // Update in second interval
3064        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3065
3066        // Second interval close
3067        let ts2 = UnixNanos::from(2_000_000_000);
3068        clock.borrow_mut().set_time(ts2);
3069        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3070        aggregator.build_bar(event);
3071
3072        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3073        assert_eq!(handler_guard.len(), 2);
3074
3075        let bar1 = &handler_guard[0];
3076        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
3077        assert_eq!(bar1.ts_init, ts1);
3078        assert_eq!(bar1.close, Price::from("100.00"));
3079        let bar2 = &handler_guard[1];
3080        assert_eq!(bar2.ts_event, ts2);
3081        assert_eq!(bar2.ts_init, ts2);
3082        assert_eq!(bar2.close, Price::from("101.00"));
3083    }
3084
3085    #[rstest]
3086    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3087        let instrument = InstrumentAny::Equity(equity_aapl);
3088        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3089        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3090        let handler = Arc::new(Mutex::new(Vec::new()));
3091        let handler_clone = Arc::clone(&handler);
3092        let clock = Rc::new(RefCell::new(TestClock::new()));
3093        let mut aggregator = TimeBarAggregator::new(
3094            bar_type,
3095            instrument.price_precision(),
3096            instrument.size_precision(),
3097            clock.clone(),
3098            move |bar: Bar| {
3099                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3100                handler_guard.push(bar);
3101            },
3102            true, // build_with_no_updates
3103            true, // timestamp_on_close
3104            BarIntervalType::RightOpen,
3105            None,
3106            15,
3107            false, // skip_first_non_full_bar
3108        );
3109
3110        // Update in first interval
3111        aggregator.update(
3112            Price::from("100.00"),
3113            Quantity::from(1),
3114            UnixNanos::default(),
3115        );
3116
3117        // First interval close
3118        let ts1 = UnixNanos::from(1_000_000_000);
3119        clock.borrow_mut().set_time(ts1);
3120        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3121        aggregator.build_bar(event);
3122
3123        // Update in second interval
3124        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3125
3126        // Second interval close
3127        let ts2 = UnixNanos::from(2_000_000_000);
3128        clock.borrow_mut().set_time(ts2);
3129        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3130        aggregator.build_bar(event);
3131
3132        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3133        assert_eq!(handler_guard.len(), 2);
3134
3135        let bar1 = &handler_guard[0];
3136        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
3137        assert_eq!(bar1.ts_init, ts1);
3138        assert_eq!(bar1.close, Price::from("100.00"));
3139
3140        let bar2 = &handler_guard[1];
3141        assert_eq!(bar2.ts_event, ts1);
3142        assert_eq!(bar2.ts_init, ts2);
3143        assert_eq!(bar2.close, Price::from("101.00"));
3144    }
3145
3146    #[rstest]
3147    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3148        let instrument = InstrumentAny::Equity(equity_aapl);
3149        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3150        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3151        let handler = Arc::new(Mutex::new(Vec::new()));
3152        let handler_clone = Arc::clone(&handler);
3153        let clock = Rc::new(RefCell::new(TestClock::new()));
3154
3155        // First test with build_with_no_updates = false
3156        let mut aggregator = TimeBarAggregator::new(
3157            bar_type,
3158            instrument.price_precision(),
3159            instrument.size_precision(),
3160            clock.clone(),
3161            move |bar: Bar| {
3162                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3163                handler_guard.push(bar);
3164            },
3165            false, // build_with_no_updates disabled
3166            true,  // timestamp_on_close
3167            BarIntervalType::LeftOpen,
3168            None,
3169            15,
3170            false, // skip_first_non_full_bar
3171        );
3172
3173        // No updates, just interval close
3174        let ts1 = UnixNanos::from(1_000_000_000);
3175        clock.borrow_mut().set_time(ts1);
3176        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3177        aggregator.build_bar(event);
3178
3179        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3180        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
3181        drop(handler_guard);
3182
3183        // Now test with build_with_no_updates = true
3184        let handler = Arc::new(Mutex::new(Vec::new()));
3185        let handler_clone = Arc::clone(&handler);
3186        let mut aggregator = TimeBarAggregator::new(
3187            bar_type,
3188            instrument.price_precision(),
3189            instrument.size_precision(),
3190            clock.clone(),
3191            move |bar: Bar| {
3192                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3193                handler_guard.push(bar);
3194            },
3195            true, // build_with_no_updates enabled
3196            true, // timestamp_on_close
3197            BarIntervalType::LeftOpen,
3198            None,
3199            15,
3200            false, // skip_first_non_full_bar
3201        );
3202
3203        aggregator.update(
3204            Price::from("100.00"),
3205            Quantity::from(1),
3206            UnixNanos::default(),
3207        );
3208
3209        // First interval with update
3210        let ts1 = UnixNanos::from(1_000_000_000);
3211        clock.borrow_mut().set_time(ts1);
3212        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3213        aggregator.build_bar(event);
3214
3215        // Second interval without updates
3216        let ts2 = UnixNanos::from(2_000_000_000);
3217        clock.borrow_mut().set_time(ts2);
3218        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3219        aggregator.build_bar(event);
3220
3221        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3222        assert_eq!(handler_guard.len(), 2); // Both bars should be built
3223        let bar1 = &handler_guard[0];
3224        assert_eq!(bar1.close, Price::from("100.00"));
3225        let bar2 = &handler_guard[1];
3226        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
3227    }
3228
3229    #[rstest]
3230    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3231        let instrument = InstrumentAny::Equity(equity_aapl);
3232        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3233        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3234        let clock = Rc::new(RefCell::new(TestClock::new()));
3235        let handler = Arc::new(Mutex::new(Vec::new()));
3236        let handler_clone = Arc::clone(&handler);
3237
3238        let mut aggregator = TimeBarAggregator::new(
3239            bar_type,
3240            instrument.price_precision(),
3241            instrument.size_precision(),
3242            clock.clone(),
3243            move |bar: Bar| {
3244                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3245                handler_guard.push(bar);
3246            },
3247            true, // build_with_no_updates
3248            true, // timestamp_on_close
3249            BarIntervalType::RightOpen,
3250            None,
3251            15,
3252            false, // skip_first_non_full_bar
3253        );
3254
3255        let ts1 = UnixNanos::from(1_000_000_000);
3256        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3257
3258        let ts2 = UnixNanos::from(2_000_000_000);
3259        clock.borrow_mut().set_time(ts2);
3260
3261        // Simulate timestamp on close
3262        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3263        aggregator.build_bar(event);
3264
3265        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3266        let bar = handler_guard.first().unwrap();
3267        assert_eq!(bar.ts_event, UnixNanos::default());
3268        assert_eq!(bar.ts_init, ts2);
3269    }
3270
3271    // ========================================================================
3272    // RenkoBarAggregator Tests
3273    // ========================================================================
3274
3275    #[rstest]
3276    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3277        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3278        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3279        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3280        let handler = Arc::new(Mutex::new(Vec::new()));
3281        let handler_clone = Arc::clone(&handler);
3282
3283        let aggregator = RenkoBarAggregator::new(
3284            bar_type,
3285            instrument.price_precision(),
3286            instrument.size_precision(),
3287            instrument.price_increment(),
3288            move |bar: Bar| {
3289                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3290                handler_guard.push(bar);
3291            },
3292        );
3293
3294        assert_eq!(aggregator.bar_type(), bar_type);
3295        assert!(!aggregator.is_running());
3296        // 10 pips * price_increment.raw (depends on precision mode)
3297        let expected_brick_size = 10 * instrument.price_increment().raw;
3298        assert_eq!(aggregator.brick_size, expected_brick_size);
3299    }
3300
3301    #[rstest]
3302    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3303        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3304        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3305        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3306        let handler = Arc::new(Mutex::new(Vec::new()));
3307        let handler_clone = Arc::clone(&handler);
3308
3309        let mut aggregator = RenkoBarAggregator::new(
3310            bar_type,
3311            instrument.price_precision(),
3312            instrument.size_precision(),
3313            instrument.price_increment(),
3314            move |bar: Bar| {
3315                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3316                handler_guard.push(bar);
3317            },
3318        );
3319
3320        // Small price movement (5 pips, less than 10 pip brick size)
3321        aggregator.update(
3322            Price::from("1.00000"),
3323            Quantity::from(1),
3324            UnixNanos::default(),
3325        );
3326        aggregator.update(
3327            Price::from("1.00005"),
3328            Quantity::from(1),
3329            UnixNanos::from(1000),
3330        );
3331
3332        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3333        assert_eq!(handler_guard.len(), 0); // No bar created yet
3334    }
3335
3336    #[rstest]
3337    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3338        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3339        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3340        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3341        let handler = Arc::new(Mutex::new(Vec::new()));
3342        let handler_clone = Arc::clone(&handler);
3343
3344        let mut aggregator = RenkoBarAggregator::new(
3345            bar_type,
3346            instrument.price_precision(),
3347            instrument.size_precision(),
3348            instrument.price_increment(),
3349            move |bar: Bar| {
3350                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3351                handler_guard.push(bar);
3352            },
3353        );
3354
3355        // Price movement exceeding brick size (15 pips)
3356        aggregator.update(
3357            Price::from("1.00000"),
3358            Quantity::from(1),
3359            UnixNanos::default(),
3360        );
3361        aggregator.update(
3362            Price::from("1.00015"),
3363            Quantity::from(1),
3364            UnixNanos::from(1000),
3365        );
3366
3367        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3368        assert_eq!(handler_guard.len(), 1);
3369
3370        let bar = handler_guard.first().unwrap();
3371        assert_eq!(bar.open, Price::from("1.00000"));
3372        assert_eq!(bar.high, Price::from("1.00010"));
3373        assert_eq!(bar.low, Price::from("1.00000"));
3374        assert_eq!(bar.close, Price::from("1.00010"));
3375        assert_eq!(bar.volume, Quantity::from(2));
3376        assert_eq!(bar.ts_event, UnixNanos::from(1000));
3377        assert_eq!(bar.ts_init, UnixNanos::from(1000));
3378    }
3379
3380    #[rstest]
3381    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3382        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3383        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3384        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3385        let handler = Arc::new(Mutex::new(Vec::new()));
3386        let handler_clone = Arc::clone(&handler);
3387
3388        let mut aggregator = RenkoBarAggregator::new(
3389            bar_type,
3390            instrument.price_precision(),
3391            instrument.size_precision(),
3392            instrument.price_increment(),
3393            move |bar: Bar| {
3394                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3395                handler_guard.push(bar);
3396            },
3397        );
3398
3399        // Large price movement creating multiple bricks (25 pips = 2 bricks)
3400        aggregator.update(
3401            Price::from("1.00000"),
3402            Quantity::from(1),
3403            UnixNanos::default(),
3404        );
3405        aggregator.update(
3406            Price::from("1.00025"),
3407            Quantity::from(1),
3408            UnixNanos::from(1000),
3409        );
3410
3411        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3412        assert_eq!(handler_guard.len(), 2);
3413
3414        let bar1 = &handler_guard[0];
3415        assert_eq!(bar1.open, Price::from("1.00000"));
3416        assert_eq!(bar1.high, Price::from("1.00010"));
3417        assert_eq!(bar1.low, Price::from("1.00000"));
3418        assert_eq!(bar1.close, Price::from("1.00010"));
3419
3420        let bar2 = &handler_guard[1];
3421        assert_eq!(bar2.open, Price::from("1.00010"));
3422        assert_eq!(bar2.high, Price::from("1.00020"));
3423        assert_eq!(bar2.low, Price::from("1.00010"));
3424        assert_eq!(bar2.close, Price::from("1.00020"));
3425    }
3426
3427    #[rstest]
3428    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3429        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3430        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3431        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3432        let handler = Arc::new(Mutex::new(Vec::new()));
3433        let handler_clone = Arc::clone(&handler);
3434
3435        let mut aggregator = RenkoBarAggregator::new(
3436            bar_type,
3437            instrument.price_precision(),
3438            instrument.size_precision(),
3439            instrument.price_increment(),
3440            move |bar: Bar| {
3441                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3442                handler_guard.push(bar);
3443            },
3444        );
3445
3446        // Start at higher price and move down
3447        aggregator.update(
3448            Price::from("1.00020"),
3449            Quantity::from(1),
3450            UnixNanos::default(),
3451        );
3452        aggregator.update(
3453            Price::from("1.00005"),
3454            Quantity::from(1),
3455            UnixNanos::from(1000),
3456        );
3457
3458        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3459        assert_eq!(handler_guard.len(), 1);
3460
3461        let bar = handler_guard.first().unwrap();
3462        assert_eq!(bar.open, Price::from("1.00020"));
3463        assert_eq!(bar.high, Price::from("1.00020"));
3464        assert_eq!(bar.low, Price::from("1.00010"));
3465        assert_eq!(bar.close, Price::from("1.00010"));
3466        assert_eq!(bar.volume, Quantity::from(2));
3467    }
3468
3469    #[rstest]
3470    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3471        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3472        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3473        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3474        let handler = Arc::new(Mutex::new(Vec::new()));
3475        let handler_clone = Arc::clone(&handler);
3476
3477        let mut aggregator = RenkoBarAggregator::new(
3478            bar_type,
3479            instrument.price_precision(),
3480            instrument.size_precision(),
3481            instrument.price_increment(),
3482            move |bar: Bar| {
3483                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3484                handler_guard.push(bar);
3485            },
3486        );
3487
3488        // Create a bar with small price movement (5 pips)
3489        let input_bar = Bar::new(
3490            BarType::new(
3491                instrument.id(),
3492                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3493                AggregationSource::Internal,
3494            ),
3495            Price::from("1.00000"),
3496            Price::from("1.00005"),
3497            Price::from("0.99995"),
3498            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
3499            Quantity::from(100),
3500            UnixNanos::default(),
3501            UnixNanos::from(1000),
3502        );
3503
3504        aggregator.handle_bar(input_bar);
3505
3506        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3507        assert_eq!(handler_guard.len(), 0); // No bar created yet
3508    }
3509
3510    #[rstest]
3511    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3512        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3513        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3514        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3515        let handler = Arc::new(Mutex::new(Vec::new()));
3516        let handler_clone = Arc::clone(&handler);
3517
3518        let mut aggregator = RenkoBarAggregator::new(
3519            bar_type,
3520            instrument.price_precision(),
3521            instrument.size_precision(),
3522            instrument.price_increment(),
3523            move |bar: Bar| {
3524                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3525                handler_guard.push(bar);
3526            },
3527        );
3528
3529        // First bar to establish baseline
3530        let bar1 = Bar::new(
3531            BarType::new(
3532                instrument.id(),
3533                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3534                AggregationSource::Internal,
3535            ),
3536            Price::from("1.00000"),
3537            Price::from("1.00005"),
3538            Price::from("0.99995"),
3539            Price::from("1.00000"),
3540            Quantity::from(100),
3541            UnixNanos::default(),
3542            UnixNanos::default(),
3543        );
3544
3545        // Second bar with price movement exceeding brick size (10 pips)
3546        let bar2 = Bar::new(
3547            BarType::new(
3548                instrument.id(),
3549                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3550                AggregationSource::Internal,
3551            ),
3552            Price::from("1.00000"),
3553            Price::from("1.00015"),
3554            Price::from("0.99995"),
3555            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
3556            Quantity::from(50),
3557            UnixNanos::from(60_000_000_000),
3558            UnixNanos::from(60_000_000_000),
3559        );
3560
3561        aggregator.handle_bar(bar1);
3562        aggregator.handle_bar(bar2);
3563
3564        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3565        assert_eq!(handler_guard.len(), 1);
3566
3567        let bar = handler_guard.first().unwrap();
3568        assert_eq!(bar.open, Price::from("1.00000"));
3569        assert_eq!(bar.high, Price::from("1.00010"));
3570        assert_eq!(bar.low, Price::from("1.00000"));
3571        assert_eq!(bar.close, Price::from("1.00010"));
3572        assert_eq!(bar.volume, Quantity::from(150));
3573    }
3574
3575    #[rstest]
3576    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3577        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3578        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3579        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3580        let handler = Arc::new(Mutex::new(Vec::new()));
3581        let handler_clone = Arc::clone(&handler);
3582
3583        let mut aggregator = RenkoBarAggregator::new(
3584            bar_type,
3585            instrument.price_precision(),
3586            instrument.size_precision(),
3587            instrument.price_increment(),
3588            move |bar: Bar| {
3589                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3590                handler_guard.push(bar);
3591            },
3592        );
3593
3594        // First bar to establish baseline
3595        let bar1 = Bar::new(
3596            BarType::new(
3597                instrument.id(),
3598                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3599                AggregationSource::Internal,
3600            ),
3601            Price::from("1.00000"),
3602            Price::from("1.00005"),
3603            Price::from("0.99995"),
3604            Price::from("1.00000"),
3605            Quantity::from(100),
3606            UnixNanos::default(),
3607            UnixNanos::default(),
3608        );
3609
3610        // Second bar with large price movement (30 pips = 3 bricks)
3611        let bar2 = Bar::new(
3612            BarType::new(
3613                instrument.id(),
3614                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3615                AggregationSource::Internal,
3616            ),
3617            Price::from("1.00000"),
3618            Price::from("1.00035"),
3619            Price::from("0.99995"),
3620            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
3621            Quantity::from(50),
3622            UnixNanos::from(60_000_000_000),
3623            UnixNanos::from(60_000_000_000),
3624        );
3625
3626        aggregator.handle_bar(bar1);
3627        aggregator.handle_bar(bar2);
3628
3629        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3630        assert_eq!(handler_guard.len(), 3);
3631
3632        let bar1 = &handler_guard[0];
3633        assert_eq!(bar1.open, Price::from("1.00000"));
3634        assert_eq!(bar1.close, Price::from("1.00010"));
3635
3636        let bar2 = &handler_guard[1];
3637        assert_eq!(bar2.open, Price::from("1.00010"));
3638        assert_eq!(bar2.close, Price::from("1.00020"));
3639
3640        let bar3 = &handler_guard[2];
3641        assert_eq!(bar3.open, Price::from("1.00020"));
3642        assert_eq!(bar3.close, Price::from("1.00030"));
3643    }
3644
3645    #[rstest]
3646    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3647        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3648        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3649        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3650        let handler = Arc::new(Mutex::new(Vec::new()));
3651        let handler_clone = Arc::clone(&handler);
3652
3653        let mut aggregator = RenkoBarAggregator::new(
3654            bar_type,
3655            instrument.price_precision(),
3656            instrument.size_precision(),
3657            instrument.price_increment(),
3658            move |bar: Bar| {
3659                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3660                handler_guard.push(bar);
3661            },
3662        );
3663
3664        // First bar to establish baseline
3665        let bar1 = Bar::new(
3666            BarType::new(
3667                instrument.id(),
3668                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3669                AggregationSource::Internal,
3670            ),
3671            Price::from("1.00020"),
3672            Price::from("1.00025"),
3673            Price::from("1.00015"),
3674            Price::from("1.00020"),
3675            Quantity::from(100),
3676            UnixNanos::default(),
3677            UnixNanos::default(),
3678        );
3679
3680        // Second bar with downward price movement (10 pips down)
3681        let bar2 = Bar::new(
3682            BarType::new(
3683                instrument.id(),
3684                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3685                AggregationSource::Internal,
3686            ),
3687            Price::from("1.00020"),
3688            Price::from("1.00025"),
3689            Price::from("1.00005"),
3690            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
3691            Quantity::from(50),
3692            UnixNanos::from(60_000_000_000),
3693            UnixNanos::from(60_000_000_000),
3694        );
3695
3696        aggregator.handle_bar(bar1);
3697        aggregator.handle_bar(bar2);
3698
3699        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3700        assert_eq!(handler_guard.len(), 1);
3701
3702        let bar = handler_guard.first().unwrap();
3703        assert_eq!(bar.open, Price::from("1.00020"));
3704        assert_eq!(bar.high, Price::from("1.00020"));
3705        assert_eq!(bar.low, Price::from("1.00010"));
3706        assert_eq!(bar.close, Price::from("1.00010"));
3707        assert_eq!(bar.volume, Quantity::from(150));
3708    }
3709
3710    #[rstest]
3711    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3712        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3713
3714        // Test different brick sizes
3715        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
3716        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3717        let handler = Arc::new(Mutex::new(Vec::new()));
3718        let handler_clone = Arc::clone(&handler);
3719
3720        let aggregator_5 = RenkoBarAggregator::new(
3721            bar_type_5,
3722            instrument.price_precision(),
3723            instrument.size_precision(),
3724            instrument.price_increment(),
3725            move |_bar: Bar| {
3726                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3727                handler_guard.push(_bar);
3728            },
3729        );
3730
3731        // 5 pips * price_increment.raw (depends on precision mode)
3732        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3733        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3734
3735        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
3736        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3737        let handler2 = Arc::new(Mutex::new(Vec::new()));
3738        let handler2_clone = Arc::clone(&handler2);
3739
3740        let aggregator_20 = RenkoBarAggregator::new(
3741            bar_type_20,
3742            instrument.price_precision(),
3743            instrument.size_precision(),
3744            instrument.price_increment(),
3745            move |_bar: Bar| {
3746                let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3747                handler_guard.push(_bar);
3748            },
3749        );
3750
3751        // 20 pips * price_increment.raw (depends on precision mode)
3752        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3753        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3754    }
3755
3756    #[rstest]
3757    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3758        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3759        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3760        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3761        let handler = Arc::new(Mutex::new(Vec::new()));
3762        let handler_clone = Arc::clone(&handler);
3763
3764        let mut aggregator = RenkoBarAggregator::new(
3765            bar_type,
3766            instrument.price_precision(),
3767            instrument.size_precision(),
3768            instrument.price_increment(),
3769            move |bar: Bar| {
3770                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3771                handler_guard.push(bar);
3772            },
3773        );
3774
3775        // Sequential updates creating multiple bars
3776        aggregator.update(
3777            Price::from("1.00000"),
3778            Quantity::from(1),
3779            UnixNanos::from(1000),
3780        );
3781        aggregator.update(
3782            Price::from("1.00010"),
3783            Quantity::from(1),
3784            UnixNanos::from(2000),
3785        ); // First brick
3786        aggregator.update(
3787            Price::from("1.00020"),
3788            Quantity::from(1),
3789            UnixNanos::from(3000),
3790        ); // Second brick
3791        aggregator.update(
3792            Price::from("1.00025"),
3793            Quantity::from(1),
3794            UnixNanos::from(4000),
3795        ); // Partial third brick
3796        aggregator.update(
3797            Price::from("1.00030"),
3798            Quantity::from(1),
3799            UnixNanos::from(5000),
3800        ); // Complete third brick
3801
3802        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3803        assert_eq!(handler_guard.len(), 3);
3804
3805        let bar1 = &handler_guard[0];
3806        assert_eq!(bar1.open, Price::from("1.00000"));
3807        assert_eq!(bar1.close, Price::from("1.00010"));
3808
3809        let bar2 = &handler_guard[1];
3810        assert_eq!(bar2.open, Price::from("1.00010"));
3811        assert_eq!(bar2.close, Price::from("1.00020"));
3812
3813        let bar3 = &handler_guard[2];
3814        assert_eq!(bar3.open, Price::from("1.00020"));
3815        assert_eq!(bar3.close, Price::from("1.00030"));
3816    }
3817
3818    #[rstest]
3819    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3820        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3821        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3822        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3823        let handler = Arc::new(Mutex::new(Vec::new()));
3824        let handler_clone = Arc::clone(&handler);
3825
3826        let mut aggregator = RenkoBarAggregator::new(
3827            bar_type,
3828            instrument.price_precision(),
3829            instrument.size_precision(),
3830            instrument.price_increment(),
3831            move |bar: Bar| {
3832                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3833                handler_guard.push(bar);
3834            },
3835        );
3836
3837        // Mixed direction movement: up then down
3838        aggregator.update(
3839            Price::from("1.00000"),
3840            Quantity::from(1),
3841            UnixNanos::from(1000),
3842        );
3843        aggregator.update(
3844            Price::from("1.00010"),
3845            Quantity::from(1),
3846            UnixNanos::from(2000),
3847        ); // Up brick
3848        aggregator.update(
3849            Price::from("0.99990"),
3850            Quantity::from(1),
3851            UnixNanos::from(3000),
3852        ); // Down 2 bricks (20 pips)
3853
3854        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3855        assert_eq!(handler_guard.len(), 3);
3856
3857        let bar1 = &handler_guard[0]; // Up brick
3858        assert_eq!(bar1.open, Price::from("1.00000"));
3859        assert_eq!(bar1.high, Price::from("1.00010"));
3860        assert_eq!(bar1.low, Price::from("1.00000"));
3861        assert_eq!(bar1.close, Price::from("1.00010"));
3862
3863        let bar2 = &handler_guard[1]; // First down brick
3864        assert_eq!(bar2.open, Price::from("1.00010"));
3865        assert_eq!(bar2.high, Price::from("1.00010"));
3866        assert_eq!(bar2.low, Price::from("1.00000"));
3867        assert_eq!(bar2.close, Price::from("1.00000"));
3868
3869        let bar3 = &handler_guard[2]; // Second down brick
3870        assert_eq!(bar3.open, Price::from("1.00000"));
3871        assert_eq!(bar3.high, Price::from("1.00000"));
3872        assert_eq!(bar3.low, Price::from("0.99990"));
3873        assert_eq!(bar3.close, Price::from("0.99990"));
3874    }
3875
3876    #[rstest]
3877    fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3878        let instrument = InstrumentAny::Equity(equity_aapl);
3879        let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3880        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3881        let handler = Arc::new(Mutex::new(Vec::new()));
3882        let handler_clone = Arc::clone(&handler);
3883
3884        let mut aggregator = TickImbalanceBarAggregator::new(
3885            bar_type,
3886            instrument.price_precision(),
3887            instrument.size_precision(),
3888            move |bar: Bar| {
3889                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3890                handler_guard.push(bar);
3891            },
3892        );
3893
3894        let buy = TradeTick {
3895            aggressor_side: AggressorSide::Buyer,
3896            ..TradeTick::default()
3897        };
3898        let sell = TradeTick {
3899            aggressor_side: AggressorSide::Seller,
3900            ..TradeTick::default()
3901        };
3902
3903        aggregator.handle_trade(buy);
3904        aggregator.handle_trade(sell);
3905        aggregator.handle_trade(buy);
3906
3907        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3908        assert_eq!(handler_guard.len(), 0);
3909    }
3910
3911    #[rstest]
3912    fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3913        let instrument = InstrumentAny::Equity(equity_aapl);
3914        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3915        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3916        let handler = Arc::new(Mutex::new(Vec::new()));
3917        let handler_clone = Arc::clone(&handler);
3918
3919        let mut aggregator = TickImbalanceBarAggregator::new(
3920            bar_type,
3921            instrument.price_precision(),
3922            instrument.size_precision(),
3923            move |bar: Bar| {
3924                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3925                handler_guard.push(bar);
3926            },
3927        );
3928
3929        let buy = TradeTick {
3930            aggressor_side: AggressorSide::Buyer,
3931            ..TradeTick::default()
3932        };
3933        let no_aggressor = TradeTick {
3934            aggressor_side: AggressorSide::NoAggressor,
3935            ..TradeTick::default()
3936        };
3937
3938        aggregator.handle_trade(buy);
3939        aggregator.handle_trade(no_aggressor);
3940        aggregator.handle_trade(buy);
3941
3942        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3943        assert_eq!(handler_guard.len(), 1);
3944    }
3945
3946    #[rstest]
3947    fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3948        let instrument = InstrumentAny::Equity(equity_aapl);
3949        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3950        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3951        let handler = Arc::new(Mutex::new(Vec::new()));
3952        let handler_clone = Arc::clone(&handler);
3953
3954        let mut aggregator = TickRunsBarAggregator::new(
3955            bar_type,
3956            instrument.price_precision(),
3957            instrument.size_precision(),
3958            move |bar: Bar| {
3959                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3960                handler_guard.push(bar);
3961            },
3962        );
3963
3964        let buy = TradeTick {
3965            aggressor_side: AggressorSide::Buyer,
3966            ..TradeTick::default()
3967        };
3968        let sell = TradeTick {
3969            aggressor_side: AggressorSide::Seller,
3970            ..TradeTick::default()
3971        };
3972
3973        aggregator.handle_trade(buy);
3974        aggregator.handle_trade(buy);
3975        aggregator.handle_trade(sell);
3976        aggregator.handle_trade(sell);
3977
3978        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3979        assert_eq!(handler_guard.len(), 2);
3980    }
3981
3982    #[rstest]
3983    fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
3984        let instrument = InstrumentAny::Equity(equity_aapl);
3985        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
3986        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3987        let handler = Arc::new(Mutex::new(Vec::new()));
3988        let handler_clone = Arc::clone(&handler);
3989
3990        let mut aggregator = VolumeImbalanceBarAggregator::new(
3991            bar_type,
3992            instrument.price_precision(),
3993            instrument.size_precision(),
3994            move |bar: Bar| {
3995                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3996                handler_guard.push(bar);
3997            },
3998        );
3999
4000        let large_trade = TradeTick {
4001            size: Quantity::from(25),
4002            aggressor_side: AggressorSide::Buyer,
4003            ..TradeTick::default()
4004        };
4005
4006        aggregator.handle_trade(large_trade);
4007
4008        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4009        assert_eq!(handler_guard.len(), 2);
4010    }
4011
4012    #[rstest]
4013    fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4014        equity_aapl: Equity,
4015    ) {
4016        let instrument = InstrumentAny::Equity(equity_aapl);
4017        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4018        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4019        let handler = Arc::new(Mutex::new(Vec::new()));
4020        let handler_clone = Arc::clone(&handler);
4021
4022        let mut aggregator = VolumeImbalanceBarAggregator::new(
4023            bar_type,
4024            instrument.price_precision(),
4025            instrument.size_precision(),
4026            move |bar: Bar| {
4027                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4028                handler_guard.push(bar);
4029            },
4030        );
4031
4032        let buy = TradeTick {
4033            size: Quantity::from(5),
4034            aggressor_side: AggressorSide::Buyer,
4035            ..TradeTick::default()
4036        };
4037        let no_aggressor = TradeTick {
4038            size: Quantity::from(3),
4039            aggressor_side: AggressorSide::NoAggressor,
4040            ..TradeTick::default()
4041        };
4042
4043        aggregator.handle_trade(buy);
4044        aggregator.handle_trade(no_aggressor);
4045        aggregator.handle_trade(buy);
4046
4047        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4048        assert_eq!(handler_guard.len(), 1);
4049    }
4050
4051    #[rstest]
4052    fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4053        let instrument = InstrumentAny::Equity(equity_aapl);
4054        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4055        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4056        let handler = Arc::new(Mutex::new(Vec::new()));
4057        let handler_clone = Arc::clone(&handler);
4058
4059        let mut aggregator = VolumeRunsBarAggregator::new(
4060            bar_type,
4061            instrument.price_precision(),
4062            instrument.size_precision(),
4063            move |bar: Bar| {
4064                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4065                handler_guard.push(bar);
4066            },
4067        );
4068
4069        let large_trade = TradeTick {
4070            size: Quantity::from(25),
4071            aggressor_side: AggressorSide::Buyer,
4072            ..TradeTick::default()
4073        };
4074
4075        aggregator.handle_trade(large_trade);
4076
4077        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4078        assert_eq!(handler_guard.len(), 2);
4079    }
4080
4081    #[rstest]
4082    fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4083        let instrument = InstrumentAny::Equity(equity_aapl);
4084        let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4085        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4086        let handler = Arc::new(Mutex::new(Vec::new()));
4087        let handler_clone = Arc::clone(&handler);
4088
4089        let mut aggregator = ValueRunsBarAggregator::new(
4090            bar_type,
4091            instrument.price_precision(),
4092            instrument.size_precision(),
4093            move |bar: Bar| {
4094                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4095                handler_guard.push(bar);
4096            },
4097        );
4098
4099        let large_trade = TradeTick {
4100            price: Price::from("5.00"),
4101            size: Quantity::from(25),
4102            aggressor_side: AggressorSide::Buyer,
4103            ..TradeTick::default()
4104        };
4105
4106        aggregator.handle_trade(large_trade);
4107
4108        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4109        assert_eq!(handler_guard.len(), 2);
4110    }
4111}