nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{
22    any::Any,
23    cell::RefCell,
24    fmt::Debug,
25    ops::Add,
26    rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31    clock::{Clock, TestClock},
32    timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35    UnixNanos,
36    correctness::{self, FAILED},
37    datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40    data::{
41        QuoteTick, TradeTick,
42        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43    },
44    enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45    types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
48/// Type alias for bar handler to reduce type complexity.
49type BarHandler = Box<dyn FnMut(Bar)>;
50
51/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
52///
53/// Implementors receive updates and produce completed bars via handlers.
54pub trait BarAggregator: Any + Debug {
55    /// The [`BarType`] to be aggregated.
56    fn bar_type(&self) -> BarType;
57    /// If the aggregator is running and will receive data from the message bus.
58    fn is_running(&self) -> bool;
59    /// Sets the running state of the aggregator (receiving updates when `true`).
60    fn set_is_running(&mut self, value: bool);
61    /// Updates the aggregator  with the given price and size.
62    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63    /// Updates the aggregator with the given quote.
64    fn handle_quote(&mut self, quote: QuoteTick) {
65        let spec = self.bar_type().spec();
66        self.update(
67            quote.extract_price(spec.price_type),
68            quote.extract_size(spec.price_type),
69            quote.ts_init,
70        );
71    }
72    /// Updates the aggregator with the given trade.
73    fn handle_trade(&mut self, trade: TradeTick) {
74        self.update(trade.price, trade.size, trade.ts_init);
75    }
76    /// Updates the aggregator with the given bar.
77    fn handle_bar(&mut self, bar: Bar) {
78        self.update_bar(bar, bar.volume, bar.ts_init);
79    }
80    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
82    fn stop(&mut self) {}
83    /// Sets historical mode (default implementation does nothing, TimeBarAggregator overrides)
84    fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85    /// Sets historical events (default implementation does nothing, TimeBarAggregator overrides)
86    fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87    /// Sets clock for time bar aggregators (default implementation does nothing, TimeBarAggregator overrides)
88    fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89    /// Builds a bar from a time event (default implementation does nothing, TimeBarAggregator overrides)
90    fn build_bar(&mut self, _event: TimeEvent) {}
91    /// Starts the timer for time bar aggregators.
92    /// Default implementation does nothing, TimeBarAggregator overrides.
93    /// Takes an optional Rc to create weak reference internally.
94    fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95    /// Sets the weak reference to the aggregator wrapper (for historical mode).
96    /// Default implementation does nothing, TimeBarAggregator overrides.
97    fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101    /// Returns a reference to this aggregator as `Any` for downcasting.
102    pub fn as_any(&self) -> &dyn Any {
103        self
104    }
105    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
106    pub fn as_any_mut(&mut self) -> &mut dyn Any {
107        self
108    }
109}
110
111/// Provides a generic bar builder for aggregation.
112#[derive(Debug)]
113pub struct BarBuilder {
114    bar_type: BarType,
115    price_precision: u8,
116    size_precision: u8,
117    initialized: bool,
118    ts_last: UnixNanos,
119    count: usize,
120    last_close: Option<Price>,
121    open: Option<Price>,
122    high: Option<Price>,
123    low: Option<Price>,
124    close: Option<Price>,
125    volume: Quantity,
126}
127
128impl BarBuilder {
129    /// Creates a new [`BarBuilder`] instance.
130    ///
131    /// # Panics
132    ///
133    /// This function panics if:
134    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
135    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
136    #[must_use]
137    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
138        correctness::check_equal(
139            &bar_type.aggregation_source(),
140            &AggregationSource::Internal,
141            "bar_type.aggregation_source",
142            "AggregationSource::Internal",
143        )
144        .expect(FAILED);
145
146        Self {
147            bar_type,
148            price_precision,
149            size_precision,
150            initialized: false,
151            ts_last: UnixNanos::default(),
152            count: 0,
153            last_close: None,
154            open: None,
155            high: None,
156            low: None,
157            close: None,
158            volume: Quantity::zero(size_precision),
159        }
160    }
161
162    /// Updates the builder state with the given price, size, and init timestamp.
163    ///
164    /// # Panics
165    ///
166    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
167    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
168        if ts_init < self.ts_last {
169            return; // Not applicable
170        }
171
172        if self.open.is_none() {
173            self.open = Some(price);
174            self.high = Some(price);
175            self.low = Some(price);
176            self.initialized = true;
177        } else {
178            if price > self.high.unwrap() {
179                self.high = Some(price);
180            }
181            if price < self.low.unwrap() {
182                self.low = Some(price);
183            }
184        }
185
186        self.close = Some(price);
187        self.volume = self.volume.add(size);
188        self.count += 1;
189        self.ts_last = ts_init;
190    }
191
192    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
193    ///
194    /// # Panics
195    ///
196    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
197    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
198        if ts_init < self.ts_last {
199            return; // Not applicable
200        }
201
202        if self.open.is_none() {
203            self.open = Some(bar.open);
204            self.high = Some(bar.high);
205            self.low = Some(bar.low);
206            self.initialized = true;
207        } else {
208            if bar.high > self.high.unwrap() {
209                self.high = Some(bar.high);
210            }
211            if bar.low < self.low.unwrap() {
212                self.low = Some(bar.low);
213            }
214        }
215
216        self.close = Some(bar.close);
217        self.volume = self.volume.add(volume);
218        self.count += 1;
219        self.ts_last = ts_init;
220    }
221
222    /// Reset the bar builder.
223    ///
224    /// All stateful fields are reset to their initial value.
225    pub fn reset(&mut self) {
226        self.open = None;
227        self.high = None;
228        self.low = None;
229        self.volume = Quantity::zero(self.size_precision);
230        self.count = 0;
231    }
232
233    /// Return the aggregated bar and reset.
234    pub fn build_now(&mut self) -> Bar {
235        self.build(self.ts_last, self.ts_last)
236    }
237
238    /// Returns the aggregated bar for the given timestamps, then resets the builder.
239    ///
240    /// # Panics
241    ///
242    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
243    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244        if self.open.is_none() {
245            self.open = self.last_close;
246            self.high = self.last_close;
247            self.low = self.last_close;
248            self.close = self.last_close;
249        }
250
251        if let (Some(close), Some(low)) = (self.close, self.low)
252            && close < low
253        {
254            self.low = Some(close);
255        }
256
257        if let (Some(close), Some(high)) = (self.close, self.high)
258            && close > high
259        {
260            self.high = Some(close);
261        }
262
263        // SAFETY: The open was checked, so we can assume all prices are Some
264        let bar = Bar::new(
265            self.bar_type,
266            self.open.unwrap(),
267            self.high.unwrap(),
268            self.low.unwrap(),
269            self.close.unwrap(),
270            self.volume,
271            ts_event,
272            ts_init,
273        );
274
275        self.last_close = self.close;
276        self.reset();
277        bar
278    }
279}
280
281/// Provides a means of aggregating specified bar types and sending to a registered handler.
282pub struct BarAggregatorCore {
283    bar_type: BarType,
284    builder: BarBuilder,
285    handler: BarHandler,
286    is_running: bool,
287}
288
289impl Debug for BarAggregatorCore {
290    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291        f.debug_struct(stringify!(BarAggregatorCore))
292            .field("bar_type", &self.bar_type)
293            .field("builder", &self.builder)
294            .field("is_running", &self.is_running)
295            .finish()
296    }
297}
298
299impl BarAggregatorCore {
300    /// Creates a new [`BarAggregatorCore`] instance.
301    ///
302    /// # Panics
303    ///
304    /// This function panics if:
305    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
306    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
307    pub fn new<H: FnMut(Bar) + 'static>(
308        bar_type: BarType,
309        price_precision: u8,
310        size_precision: u8,
311        handler: H,
312    ) -> Self {
313        Self {
314            bar_type,
315            builder: BarBuilder::new(bar_type, price_precision, size_precision),
316            handler: Box::new(handler),
317            is_running: false,
318        }
319    }
320
321    /// Sets the running state of the aggregator (receives updates when `true`).
322    pub const fn set_is_running(&mut self, value: bool) {
323        self.is_running = value;
324    }
325    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
326        self.builder.update(price, size, ts_init);
327    }
328
329    fn build_now_and_send(&mut self) {
330        let bar = self.builder.build_now();
331        (self.handler)(bar);
332    }
333
334    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
335        let bar = self.builder.build(ts_event, ts_init);
336        (self.handler)(bar);
337    }
338}
339
340/// Provides a means of building tick bars aggregated from quote and trades.
341///
342/// When received tick count reaches the step threshold of the bar
343/// specification, then a bar is created and sent to the handler.
344pub struct TickBarAggregator {
345    core: BarAggregatorCore,
346}
347
348impl Debug for TickBarAggregator {
349    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        f.debug_struct(stringify!(TickBarAggregator))
351            .field("core", &self.core)
352            .finish()
353    }
354}
355
356impl TickBarAggregator {
357    /// Creates a new [`TickBarAggregator`] instance.
358    ///
359    /// # Panics
360    ///
361    /// This function panics if:
362    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
363    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
364    pub fn new<H: FnMut(Bar) + 'static>(
365        bar_type: BarType,
366        price_precision: u8,
367        size_precision: u8,
368        handler: H,
369    ) -> Self {
370        Self {
371            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
372        }
373    }
374}
375
376impl BarAggregator for TickBarAggregator {
377    fn bar_type(&self) -> BarType {
378        self.core.bar_type
379    }
380
381    fn is_running(&self) -> bool {
382        self.core.is_running
383    }
384
385    fn set_is_running(&mut self, value: bool) {
386        self.core.set_is_running(value);
387    }
388
389    /// Apply the given update to the aggregator.
390    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
391        self.core.apply_update(price, size, ts_init);
392        let spec = self.core.bar_type.spec();
393
394        if self.core.builder.count >= spec.step.get() {
395            self.core.build_now_and_send();
396        }
397    }
398
399    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
400        self.core.builder.update_bar(bar, volume, ts_init);
401        let spec = self.core.bar_type.spec();
402
403        if self.core.builder.count >= spec.step.get() {
404            self.core.build_now_and_send();
405        }
406    }
407}
408
409/// Aggregates bars based on tick buy/sell imbalance.
410///
411/// Increments imbalance by +1 for buyer-aggressed trades and -1 for seller-aggressed trades.
412/// Emits a bar when the absolute imbalance reaches the step threshold.
413pub struct TickImbalanceBarAggregator {
414    core: BarAggregatorCore,
415    imbalance: isize,
416}
417
418impl Debug for TickImbalanceBarAggregator {
419    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420        f.debug_struct(stringify!(TickImbalanceBarAggregator))
421            .field("core", &self.core)
422            .field("imbalance", &self.imbalance)
423            .finish()
424    }
425}
426
427impl TickImbalanceBarAggregator {
428    /// Creates a new [`TickImbalanceBarAggregator`] instance.
429    ///
430    /// # Panics
431    ///
432    /// This function panics if:
433    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
434    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
435    pub fn new<H: FnMut(Bar) + 'static>(
436        bar_type: BarType,
437        price_precision: u8,
438        size_precision: u8,
439        handler: H,
440    ) -> Self {
441        Self {
442            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443            imbalance: 0,
444        }
445    }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449    fn bar_type(&self) -> BarType {
450        self.core.bar_type
451    }
452
453    fn is_running(&self) -> bool {
454        self.core.is_running
455    }
456
457    fn set_is_running(&mut self, value: bool) {
458        self.core.set_is_running(value);
459    }
460
461    /// Apply the given update to the aggregator.
462    ///
463    /// Note: side-aware logic lives in `handle_trade`. This method is used for
464    /// quote/bar updates where no aggressor side is available.
465    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466        self.core.apply_update(price, size, ts_init);
467    }
468
469    fn handle_trade(&mut self, trade: TradeTick) {
470        self.core
471            .apply_update(trade.price, trade.size, trade.ts_init);
472
473        let delta = match trade.aggressor_side {
474            AggressorSide::Buyer => 1,
475            AggressorSide::Seller => -1,
476            AggressorSide::NoAggressor => 0,
477        };
478
479        if delta == 0 {
480            return;
481        }
482
483        self.imbalance += delta;
484        let threshold = self.core.bar_type.spec().step.get();
485        if self.imbalance.unsigned_abs() >= threshold {
486            self.core.build_now_and_send();
487            self.imbalance = 0;
488        }
489    }
490
491    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492        self.core.builder.update_bar(bar, volume, ts_init);
493    }
494}
495
496/// Aggregates bars based on consecutive buy/sell tick runs.
497pub struct TickRunsBarAggregator {
498    core: BarAggregatorCore,
499    current_run_side: Option<AggressorSide>,
500    run_count: usize,
501}
502
503impl Debug for TickRunsBarAggregator {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        f.debug_struct(stringify!(TickRunsBarAggregator))
506            .field("core", &self.core)
507            .field("current_run_side", &self.current_run_side)
508            .field("run_count", &self.run_count)
509            .finish()
510    }
511}
512
513impl TickRunsBarAggregator {
514    /// Creates a new [`TickRunsBarAggregator`] instance.
515    ///
516    /// # Panics
517    ///
518    /// This function panics if:
519    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
520    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
521    pub fn new<H: FnMut(Bar) + 'static>(
522        bar_type: BarType,
523        price_precision: u8,
524        size_precision: u8,
525        handler: H,
526    ) -> Self {
527        Self {
528            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
529            current_run_side: None,
530            run_count: 0,
531        }
532    }
533}
534
535impl BarAggregator for TickRunsBarAggregator {
536    fn bar_type(&self) -> BarType {
537        self.core.bar_type
538    }
539
540    fn is_running(&self) -> bool {
541        self.core.is_running
542    }
543
544    fn set_is_running(&mut self, value: bool) {
545        self.core.set_is_running(value);
546    }
547
548    /// Apply the given update to the aggregator.
549    ///
550    /// Note: side-aware logic lives in `handle_trade`. This method is used for
551    /// quote/bar updates where no aggressor side is available.
552    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
553        self.core.apply_update(price, size, ts_init);
554    }
555
556    fn handle_trade(&mut self, trade: TradeTick) {
557        let side = match trade.aggressor_side {
558            AggressorSide::Buyer => Some(AggressorSide::Buyer),
559            AggressorSide::Seller => Some(AggressorSide::Seller),
560            AggressorSide::NoAggressor => None,
561        };
562
563        if let Some(side) = side {
564            if self.current_run_side != Some(side) {
565                self.current_run_side = Some(side);
566                self.run_count = 0;
567                self.core.builder.reset();
568            }
569
570            self.core
571                .apply_update(trade.price, trade.size, trade.ts_init);
572            self.run_count += 1;
573
574            let threshold = self.core.bar_type.spec().step.get();
575            if self.run_count >= threshold {
576                self.core.build_now_and_send();
577                self.run_count = 0;
578                self.current_run_side = None;
579            }
580        } else {
581            self.core
582                .apply_update(trade.price, trade.size, trade.ts_init);
583        }
584    }
585
586    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
587        self.core.builder.update_bar(bar, volume, ts_init);
588    }
589}
590
591/// Provides a means of building volume bars aggregated from quote and trades.
592pub struct VolumeBarAggregator {
593    core: BarAggregatorCore,
594}
595
596impl Debug for VolumeBarAggregator {
597    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
598        f.debug_struct(stringify!(VolumeBarAggregator))
599            .field("core", &self.core)
600            .finish()
601    }
602}
603
604impl VolumeBarAggregator {
605    /// Creates a new [`VolumeBarAggregator`] instance.
606    ///
607    /// # Panics
608    ///
609    /// This function panics if:
610    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
611    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
612    pub fn new<H: FnMut(Bar) + 'static>(
613        bar_type: BarType,
614        price_precision: u8,
615        size_precision: u8,
616        handler: H,
617    ) -> Self {
618        Self {
619            core: BarAggregatorCore::new(
620                bar_type.standard(),
621                price_precision,
622                size_precision,
623                handler,
624            ),
625        }
626    }
627}
628
629impl BarAggregator for VolumeBarAggregator {
630    fn bar_type(&self) -> BarType {
631        self.core.bar_type
632    }
633
634    fn is_running(&self) -> bool {
635        self.core.is_running
636    }
637
638    fn set_is_running(&mut self, value: bool) {
639        self.core.set_is_running(value);
640    }
641
642    /// Apply the given update to the aggregator.
643    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
644        let mut raw_size_update = size.raw;
645        let spec = self.core.bar_type.spec();
646        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
647
648        while raw_size_update > 0 {
649            if self.core.builder.volume.raw + raw_size_update < raw_step {
650                self.core.apply_update(
651                    price,
652                    Quantity::from_raw(raw_size_update, size.precision),
653                    ts_init,
654                );
655                break;
656            }
657
658            let raw_size_diff = raw_step - self.core.builder.volume.raw;
659            self.core.apply_update(
660                price,
661                Quantity::from_raw(raw_size_diff, size.precision),
662                ts_init,
663            );
664
665            self.core.build_now_and_send();
666            raw_size_update -= raw_size_diff;
667        }
668    }
669
670    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671        let mut raw_volume_update = volume.raw;
672        let spec = self.core.bar_type.spec();
673        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
674
675        while raw_volume_update > 0 {
676            if self.core.builder.volume.raw + raw_volume_update < raw_step {
677                self.core.builder.update_bar(
678                    bar,
679                    Quantity::from_raw(raw_volume_update, volume.precision),
680                    ts_init,
681                );
682                break;
683            }
684
685            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
686            self.core.builder.update_bar(
687                bar,
688                Quantity::from_raw(raw_volume_diff, volume.precision),
689                ts_init,
690            );
691
692            self.core.build_now_and_send();
693            raw_volume_update -= raw_volume_diff;
694        }
695    }
696}
697
698/// Aggregates bars based on buy/sell volume imbalance.
699pub struct VolumeImbalanceBarAggregator {
700    core: BarAggregatorCore,
701    imbalance_raw: i128,
702    raw_step: i128,
703}
704
705impl Debug for VolumeImbalanceBarAggregator {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
708            .field("core", &self.core)
709            .field("imbalance_raw", &self.imbalance_raw)
710            .field("raw_step", &self.raw_step)
711            .finish()
712    }
713}
714
715impl VolumeImbalanceBarAggregator {
716    /// Creates a new [`VolumeImbalanceBarAggregator`] instance.
717    ///
718    /// # Panics
719    ///
720    /// This function panics if:
721    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
722    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
723    pub fn new<H: FnMut(Bar) + 'static>(
724        bar_type: BarType,
725        price_precision: u8,
726        size_precision: u8,
727        handler: H,
728    ) -> Self {
729        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
730        Self {
731            core: BarAggregatorCore::new(
732                bar_type.standard(),
733                price_precision,
734                size_precision,
735                handler,
736            ),
737            imbalance_raw: 0,
738            raw_step,
739        }
740    }
741}
742
743impl BarAggregator for VolumeImbalanceBarAggregator {
744    fn bar_type(&self) -> BarType {
745        self.core.bar_type
746    }
747
748    fn is_running(&self) -> bool {
749        self.core.is_running
750    }
751
752    fn set_is_running(&mut self, value: bool) {
753        self.core.set_is_running(value);
754    }
755
756    /// Apply the given update to the aggregator.
757    ///
758    /// Note: side-aware logic lives in `handle_trade`. This method is used for
759    /// quote/bar updates where no aggressor side is available.
760    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
761        self.core.apply_update(price, size, ts_init);
762    }
763
764    fn handle_trade(&mut self, trade: TradeTick) {
765        let side = match trade.aggressor_side {
766            AggressorSide::Buyer => 1,
767            AggressorSide::Seller => -1,
768            AggressorSide::NoAggressor => {
769                self.core
770                    .apply_update(trade.price, trade.size, trade.ts_init);
771                return;
772            }
773        };
774
775        let mut raw_remaining = trade.size.raw as i128;
776        while raw_remaining > 0 {
777            let imbalance_abs = self.imbalance_raw.abs();
778            let needed = (self.raw_step - imbalance_abs).max(1);
779            let raw_chunk = raw_remaining.min(needed);
780            let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
781
782            self.core
783                .apply_update(trade.price, qty_chunk, trade.ts_init);
784
785            self.imbalance_raw += side * raw_chunk;
786            raw_remaining -= raw_chunk;
787
788            if self.imbalance_raw.abs() >= self.raw_step {
789                self.core.build_now_and_send();
790                self.imbalance_raw = 0;
791            }
792        }
793    }
794
795    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
796        self.core.builder.update_bar(bar, volume, ts_init);
797    }
798}
799
800/// Aggregates bars based on consecutive buy/sell volume runs.
801pub struct VolumeRunsBarAggregator {
802    core: BarAggregatorCore,
803    current_run_side: Option<AggressorSide>,
804    run_volume_raw: QuantityRaw,
805    raw_step: QuantityRaw,
806}
807
808impl Debug for VolumeRunsBarAggregator {
809    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810        f.debug_struct(stringify!(VolumeRunsBarAggregator))
811            .field("core", &self.core)
812            .field("current_run_side", &self.current_run_side)
813            .field("run_volume_raw", &self.run_volume_raw)
814            .field("raw_step", &self.raw_step)
815            .finish()
816    }
817}
818
819impl VolumeRunsBarAggregator {
820    /// Creates a new [`VolumeRunsBarAggregator`] instance.
821    ///
822    /// # Panics
823    ///
824    /// This function panics if:
825    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
826    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
827    pub fn new<H: FnMut(Bar) + 'static>(
828        bar_type: BarType,
829        price_precision: u8,
830        size_precision: u8,
831        handler: H,
832    ) -> Self {
833        let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
834        Self {
835            core: BarAggregatorCore::new(
836                bar_type.standard(),
837                price_precision,
838                size_precision,
839                handler,
840            ),
841            current_run_side: None,
842            run_volume_raw: 0,
843            raw_step,
844        }
845    }
846}
847
848impl BarAggregator for VolumeRunsBarAggregator {
849    fn bar_type(&self) -> BarType {
850        self.core.bar_type
851    }
852
853    fn is_running(&self) -> bool {
854        self.core.is_running
855    }
856
857    fn set_is_running(&mut self, value: bool) {
858        self.core.set_is_running(value);
859    }
860
861    /// Apply the given update to the aggregator.
862    ///
863    /// Note: side-aware logic lives in `handle_trade`. This method is used for
864    /// quote/bar updates where no aggressor side is available.
865    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
866        self.core.apply_update(price, size, ts_init);
867    }
868
869    fn handle_trade(&mut self, trade: TradeTick) {
870        let side = match trade.aggressor_side {
871            AggressorSide::Buyer => Some(AggressorSide::Buyer),
872            AggressorSide::Seller => Some(AggressorSide::Seller),
873            AggressorSide::NoAggressor => None,
874        };
875
876        let Some(side) = side else {
877            self.core
878                .apply_update(trade.price, trade.size, trade.ts_init);
879            return;
880        };
881
882        if self.current_run_side != Some(side) {
883            self.current_run_side = Some(side);
884            self.run_volume_raw = 0;
885            self.core.builder.reset();
886        }
887
888        let mut raw_remaining = trade.size.raw;
889        while raw_remaining > 0 {
890            let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
891            let raw_chunk = raw_remaining.min(needed);
892
893            self.core.apply_update(
894                trade.price,
895                Quantity::from_raw(raw_chunk, trade.size.precision),
896                trade.ts_init,
897            );
898
899            self.run_volume_raw += raw_chunk;
900            raw_remaining -= raw_chunk;
901
902            if self.run_volume_raw >= self.raw_step {
903                self.core.build_now_and_send();
904                self.run_volume_raw = 0;
905                self.current_run_side = None;
906            }
907        }
908    }
909
910    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
911        self.core.builder.update_bar(bar, volume, ts_init);
912    }
913}
914
915/// Provides a means of building value bars aggregated from quote and trades.
916///
917/// When received value reaches the step threshold of the bar
918/// specification, then a bar is created and sent to the handler.
919pub struct ValueBarAggregator {
920    core: BarAggregatorCore,
921    cum_value: f64,
922}
923
924impl Debug for ValueBarAggregator {
925    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926        f.debug_struct(stringify!(ValueBarAggregator))
927            .field("core", &self.core)
928            .field("cum_value", &self.cum_value)
929            .finish()
930    }
931}
932
933impl ValueBarAggregator {
934    /// Creates a new [`ValueBarAggregator`] instance.
935    ///
936    /// # Panics
937    ///
938    /// This function panics if:
939    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
940    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
941    pub fn new<H: FnMut(Bar) + 'static>(
942        bar_type: BarType,
943        price_precision: u8,
944        size_precision: u8,
945        handler: H,
946    ) -> Self {
947        Self {
948            core: BarAggregatorCore::new(
949                bar_type.standard(),
950                price_precision,
951                size_precision,
952                handler,
953            ),
954            cum_value: 0.0,
955        }
956    }
957
958    #[must_use]
959    /// Returns the cumulative value for the aggregator.
960    pub const fn get_cumulative_value(&self) -> f64 {
961        self.cum_value
962    }
963}
964
965impl BarAggregator for ValueBarAggregator {
966    fn bar_type(&self) -> BarType {
967        self.core.bar_type
968    }
969
970    fn is_running(&self) -> bool {
971        self.core.is_running
972    }
973
974    fn set_is_running(&mut self, value: bool) {
975        self.core.set_is_running(value);
976    }
977
978    /// Apply the given update to the aggregator.
979    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
980        let mut size_update = size.as_f64();
981        let spec = self.core.bar_type.spec();
982
983        while size_update > 0.0 {
984            let value_update = price.as_f64() * size_update;
985            if value_update == 0.0 {
986                // Prevent division by zero - apply remaining size without triggering bar
987                self.core
988                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
989                break;
990            }
991
992            if self.cum_value + value_update < spec.step.get() as f64 {
993                self.cum_value += value_update;
994                self.core
995                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
996                break;
997            }
998
999            let value_diff = spec.step.get() as f64 - self.cum_value;
1000            let size_diff = size_update * (value_diff / value_update);
1001            self.core
1002                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1003
1004            self.core.build_now_and_send();
1005            self.cum_value = 0.0;
1006            size_update -= size_diff;
1007        }
1008    }
1009
1010    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1011        let mut volume_update = volume;
1012        let average_price = Price::new(
1013            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1014            self.core.builder.price_precision,
1015        );
1016
1017        while volume_update.as_f64() > 0.0 {
1018            let value_update = average_price.as_f64() * volume_update.as_f64();
1019            if value_update == 0.0 {
1020                // Prevent division by zero - apply remaining volume without triggering bar
1021                self.core.builder.update_bar(bar, volume_update, ts_init);
1022                break;
1023            }
1024
1025            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1026                self.cum_value += value_update;
1027                self.core.builder.update_bar(bar, volume_update, ts_init);
1028                break;
1029            }
1030
1031            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1032            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
1033            self.core.builder.update_bar(
1034                bar,
1035                Quantity::new(volume_diff, volume_update.precision),
1036                ts_init,
1037            );
1038
1039            self.core.build_now_and_send();
1040            self.cum_value = 0.0;
1041            volume_update = Quantity::new(
1042                volume_update.as_f64() - volume_diff,
1043                volume_update.precision,
1044            );
1045        }
1046    }
1047}
1048
1049/// Aggregates bars based on buy/sell notional imbalance.
1050pub struct ValueImbalanceBarAggregator {
1051    core: BarAggregatorCore,
1052    imbalance_value: f64,
1053    step_value: f64,
1054}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058        f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059            .field("core", &self.core)
1060            .field("imbalance_value", &self.imbalance_value)
1061            .field("step_value", &self.step_value)
1062            .finish()
1063    }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067    /// Creates a new [`ValueImbalanceBarAggregator`] instance.
1068    ///
1069    /// # Panics
1070    ///
1071    /// This function panics if:
1072    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1073    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1074    pub fn new<H: FnMut(Bar) + 'static>(
1075        bar_type: BarType,
1076        price_precision: u8,
1077        size_precision: u8,
1078        handler: H,
1079    ) -> Self {
1080        Self {
1081            core: BarAggregatorCore::new(
1082                bar_type.standard(),
1083                price_precision,
1084                size_precision,
1085                handler,
1086            ),
1087            imbalance_value: 0.0,
1088            step_value: bar_type.spec().step.get() as f64,
1089        }
1090    }
1091}
1092
1093impl BarAggregator for ValueImbalanceBarAggregator {
1094    fn bar_type(&self) -> BarType {
1095        self.core.bar_type
1096    }
1097
1098    fn is_running(&self) -> bool {
1099        self.core.is_running
1100    }
1101
1102    fn set_is_running(&mut self, value: bool) {
1103        self.core.set_is_running(value);
1104    }
1105
1106    /// Apply the given update to the aggregator.
1107    ///
1108    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1109    /// quote/bar updates where no aggressor side is available.
1110    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1111        self.core.apply_update(price, size, ts_init);
1112    }
1113
1114    fn handle_trade(&mut self, trade: TradeTick) {
1115        let price_f64 = trade.price.as_f64();
1116        if price_f64 == 0.0 {
1117            self.core
1118                .apply_update(trade.price, trade.size, trade.ts_init);
1119            return;
1120        }
1121
1122        let side_sign = match trade.aggressor_side {
1123            AggressorSide::Buyer => 1.0,
1124            AggressorSide::Seller => -1.0,
1125            AggressorSide::NoAggressor => {
1126                self.core
1127                    .apply_update(trade.price, trade.size, trade.ts_init);
1128                return;
1129            }
1130        };
1131
1132        let mut size_remaining = trade.size.as_f64();
1133        while size_remaining > 0.0 {
1134            let value_remaining = price_f64 * size_remaining;
1135            let current_sign = self.imbalance_value.signum();
1136
1137            if current_sign == 0.0 || current_sign == side_sign {
1138                let needed = self.step_value - self.imbalance_value.abs();
1139                if value_remaining <= needed {
1140                    self.imbalance_value += side_sign * value_remaining;
1141                    self.core.apply_update(
1142                        trade.price,
1143                        Quantity::new(size_remaining, trade.size.precision),
1144                        trade.ts_init,
1145                    );
1146                    break;
1147                }
1148
1149                let value_chunk = needed;
1150                let size_chunk = value_chunk / price_f64;
1151                self.core.apply_update(
1152                    trade.price,
1153                    Quantity::new(size_chunk, trade.size.precision),
1154                    trade.ts_init,
1155                );
1156                self.imbalance_value += side_sign * value_chunk;
1157                size_remaining -= size_chunk;
1158
1159                if self.imbalance_value.abs() >= self.step_value {
1160                    self.core.build_now_and_send();
1161                    self.imbalance_value = 0.0;
1162                }
1163            } else {
1164                // Opposing side: first neutralize existing imbalance
1165                let value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1166                let size_chunk = value_to_flatten / price_f64;
1167                self.core.apply_update(
1168                    trade.price,
1169                    Quantity::new(size_chunk, trade.size.precision),
1170                    trade.ts_init,
1171                );
1172                self.imbalance_value += side_sign * value_to_flatten;
1173                size_remaining -= size_chunk;
1174            }
1175        }
1176    }
1177
1178    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1179        self.core.builder.update_bar(bar, volume, ts_init);
1180    }
1181}
1182
1183/// Aggregates bars based on consecutive buy/sell notional runs.
1184pub struct ValueRunsBarAggregator {
1185    core: BarAggregatorCore,
1186    current_run_side: Option<AggressorSide>,
1187    run_value: f64,
1188    step_value: f64,
1189}
1190
1191impl Debug for ValueRunsBarAggregator {
1192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193        f.debug_struct(stringify!(ValueRunsBarAggregator))
1194            .field("core", &self.core)
1195            .field("current_run_side", &self.current_run_side)
1196            .field("run_value", &self.run_value)
1197            .field("step_value", &self.step_value)
1198            .finish()
1199    }
1200}
1201
1202impl ValueRunsBarAggregator {
1203    /// Creates a new [`ValueRunsBarAggregator`] instance.
1204    ///
1205    /// # Panics
1206    ///
1207    /// This function panics if:
1208    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1209    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1210    pub fn new<H: FnMut(Bar) + 'static>(
1211        bar_type: BarType,
1212        price_precision: u8,
1213        size_precision: u8,
1214        handler: H,
1215    ) -> Self {
1216        Self {
1217            core: BarAggregatorCore::new(
1218                bar_type.standard(),
1219                price_precision,
1220                size_precision,
1221                handler,
1222            ),
1223            current_run_side: None,
1224            run_value: 0.0,
1225            step_value: bar_type.spec().step.get() as f64,
1226        }
1227    }
1228}
1229
1230impl BarAggregator for ValueRunsBarAggregator {
1231    fn bar_type(&self) -> BarType {
1232        self.core.bar_type
1233    }
1234
1235    fn is_running(&self) -> bool {
1236        self.core.is_running
1237    }
1238
1239    fn set_is_running(&mut self, value: bool) {
1240        self.core.set_is_running(value);
1241    }
1242
1243    /// Apply the given update to the aggregator.
1244    ///
1245    /// Note: side-aware logic lives in `handle_trade`. This method is used for
1246    /// quote/bar updates where no aggressor side is available.
1247    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1248        self.core.apply_update(price, size, ts_init);
1249    }
1250
1251    fn handle_trade(&mut self, trade: TradeTick) {
1252        let price_f64 = trade.price.as_f64();
1253        if price_f64 == 0.0 {
1254            self.core
1255                .apply_update(trade.price, trade.size, trade.ts_init);
1256            return;
1257        }
1258
1259        let side = match trade.aggressor_side {
1260            AggressorSide::Buyer => Some(AggressorSide::Buyer),
1261            AggressorSide::Seller => Some(AggressorSide::Seller),
1262            AggressorSide::NoAggressor => None,
1263        };
1264
1265        let Some(side) = side else {
1266            self.core
1267                .apply_update(trade.price, trade.size, trade.ts_init);
1268            return;
1269        };
1270
1271        if self.current_run_side != Some(side) {
1272            self.current_run_side = Some(side);
1273            self.run_value = 0.0;
1274            self.core.builder.reset();
1275        }
1276
1277        let mut size_remaining = trade.size.as_f64();
1278        while size_remaining > 0.0 {
1279            let value_update = price_f64 * size_remaining;
1280            if self.run_value + value_update < self.step_value {
1281                self.run_value += value_update;
1282                self.core.apply_update(
1283                    trade.price,
1284                    Quantity::new(size_remaining, trade.size.precision),
1285                    trade.ts_init,
1286                );
1287                break;
1288            }
1289
1290            let value_needed = self.step_value - self.run_value;
1291            let size_chunk = value_needed / price_f64;
1292            self.core.apply_update(
1293                trade.price,
1294                Quantity::new(size_chunk, trade.size.precision),
1295                trade.ts_init,
1296            );
1297
1298            self.core.build_now_and_send();
1299            self.run_value = 0.0;
1300            self.current_run_side = None;
1301            size_remaining -= size_chunk;
1302        }
1303    }
1304
1305    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1306        self.core.builder.update_bar(bar, volume, ts_init);
1307    }
1308}
1309
1310/// Provides a means of building Renko bars aggregated from quote and trades.
1311///
1312/// Renko bars are created when the price moves by a fixed amount (brick size)
1313/// regardless of time or volume. Each bar represents a price movement equal
1314/// to the step size in the bar specification.
1315pub struct RenkoBarAggregator {
1316    core: BarAggregatorCore,
1317    pub brick_size: PriceRaw,
1318    last_close: Option<Price>,
1319}
1320
1321impl Debug for RenkoBarAggregator {
1322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1323        f.debug_struct(stringify!(RenkoBarAggregator))
1324            .field("core", &self.core)
1325            .field("brick_size", &self.brick_size)
1326            .field("last_close", &self.last_close)
1327            .finish()
1328    }
1329}
1330
1331impl RenkoBarAggregator {
1332    /// Creates a new [`RenkoBarAggregator`] instance.
1333    ///
1334    /// # Panics
1335    ///
1336    /// This function panics if:
1337    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1338    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1339    pub fn new<H: FnMut(Bar) + 'static>(
1340        bar_type: BarType,
1341        price_precision: u8,
1342        size_precision: u8,
1343        price_increment: Price,
1344        handler: H,
1345    ) -> Self {
1346        // Calculate brick size in raw price units (step * price_increment.raw)
1347        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1348
1349        Self {
1350            core: BarAggregatorCore::new(
1351                bar_type.standard(),
1352                price_precision,
1353                size_precision,
1354                handler,
1355            ),
1356            brick_size,
1357            last_close: None,
1358        }
1359    }
1360}
1361
1362impl BarAggregator for RenkoBarAggregator {
1363    fn bar_type(&self) -> BarType {
1364        self.core.bar_type
1365    }
1366
1367    fn is_running(&self) -> bool {
1368        self.core.is_running
1369    }
1370
1371    fn set_is_running(&mut self, value: bool) {
1372        self.core.set_is_running(value);
1373    }
1374
1375    /// Apply the given update to the aggregator.
1376    ///
1377    /// For Renko bars, we check if the price movement from the last close
1378    /// is greater than or equal to the brick size. If so, we create new bars.
1379    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1380        // Always update the builder with the current tick
1381        self.core.apply_update(price, size, ts_init);
1382
1383        // Initialize last_close if this is the first update
1384        if self.last_close.is_none() {
1385            self.last_close = Some(price);
1386            return;
1387        }
1388
1389        let last_close = self.last_close.unwrap();
1390
1391        // Convert prices to raw units (integers) to avoid floating point precision issues
1392        let current_raw = price.raw;
1393        let last_close_raw = last_close.raw;
1394        let price_diff_raw = current_raw - last_close_raw;
1395        let abs_price_diff_raw = price_diff_raw.abs();
1396
1397        // Check if we need to create one or more Renko bars
1398        if abs_price_diff_raw >= self.brick_size {
1399            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1400            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1401            let mut current_close = last_close;
1402
1403            // Store the current builder volume to distribute across bricks
1404            let total_volume = self.core.builder.volume;
1405
1406            for _i in 0..num_bricks {
1407                // Calculate the close price for this brick using raw price units
1408                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1409                let brick_close = Price::from_raw(brick_close_raw, price.precision);
1410
1411                // For Renko bars: open = previous close, high/low depend on direction
1412                let (brick_high, brick_low) = if direction > 0.0 {
1413                    (brick_close, current_close)
1414                } else {
1415                    (current_close, brick_close)
1416                };
1417
1418                // Reset builder for this brick
1419                self.core.builder.reset();
1420                self.core.builder.open = Some(current_close);
1421                self.core.builder.high = Some(brick_high);
1422                self.core.builder.low = Some(brick_low);
1423                self.core.builder.close = Some(brick_close);
1424                self.core.builder.volume = total_volume; // Each brick gets the full volume
1425                self.core.builder.count = 1;
1426                self.core.builder.ts_last = ts_init;
1427                self.core.builder.initialized = true;
1428
1429                // Build and send the bar
1430                self.core.build_and_send(ts_init, ts_init);
1431
1432                // Update for the next brick
1433                current_close = brick_close;
1434                self.last_close = Some(brick_close);
1435            }
1436        }
1437    }
1438
1439    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1440        // Always update the builder with the current bar
1441        self.core.builder.update_bar(bar, volume, ts_init);
1442
1443        // Initialize last_close if this is the first update
1444        if self.last_close.is_none() {
1445            self.last_close = Some(bar.close);
1446            return;
1447        }
1448
1449        let last_close = self.last_close.unwrap();
1450
1451        // Convert prices to raw units (integers) to avoid floating point precision issues
1452        let current_raw = bar.close.raw;
1453        let last_close_raw = last_close.raw;
1454        let price_diff_raw = current_raw - last_close_raw;
1455        let abs_price_diff_raw = price_diff_raw.abs();
1456
1457        // Check if we need to create one or more Renko bars
1458        if abs_price_diff_raw >= self.brick_size {
1459            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1460            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1461            let mut current_close = last_close;
1462
1463            // Store the current builder volume to distribute across bricks
1464            let total_volume = self.core.builder.volume;
1465
1466            for _i in 0..num_bricks {
1467                // Calculate the close price for this brick using raw price units
1468                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1469                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1470
1471                // For Renko bars: open = previous close, high/low depend on direction
1472                let (brick_high, brick_low) = if direction > 0.0 {
1473                    (brick_close, current_close)
1474                } else {
1475                    (current_close, brick_close)
1476                };
1477
1478                // Reset builder for this brick
1479                self.core.builder.reset();
1480                self.core.builder.open = Some(current_close);
1481                self.core.builder.high = Some(brick_high);
1482                self.core.builder.low = Some(brick_low);
1483                self.core.builder.close = Some(brick_close);
1484                self.core.builder.volume = total_volume; // Each brick gets the full volume
1485                self.core.builder.count = 1;
1486                self.core.builder.ts_last = ts_init;
1487                self.core.builder.initialized = true;
1488
1489                // Build and send the bar
1490                self.core.build_and_send(ts_init, ts_init);
1491
1492                // Update for the next brick
1493                current_close = brick_close;
1494                self.last_close = Some(brick_close);
1495            }
1496        }
1497    }
1498}
1499
1500/// Provides a means of building time bars aggregated from quote and trades.
1501///
1502/// At each aggregation time interval, a bar is created and sent to the handler.
1503pub struct TimeBarAggregator {
1504    core: BarAggregatorCore,
1505    clock: Rc<RefCell<dyn Clock>>,
1506    build_with_no_updates: bool,
1507    timestamp_on_close: bool,
1508    is_left_open: bool,
1509    stored_open_ns: UnixNanos,
1510    timer_name: String,
1511    interval_ns: UnixNanos,
1512    next_close_ns: UnixNanos,
1513    bar_build_delay: u64,
1514    time_bars_origin_offset: Option<TimeDelta>,
1515    skip_first_non_full_bar: bool,
1516    pub historical_mode: bool,
1517    historical_events: Vec<TimeEvent>,
1518    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
1519}
1520
1521impl Debug for TimeBarAggregator {
1522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1523        f.debug_struct(stringify!(TimeBarAggregator))
1524            .field("core", &self.core)
1525            .field("build_with_no_updates", &self.build_with_no_updates)
1526            .field("timestamp_on_close", &self.timestamp_on_close)
1527            .field("is_left_open", &self.is_left_open)
1528            .field("timer_name", &self.timer_name)
1529            .field("interval_ns", &self.interval_ns)
1530            .field("bar_build_delay", &self.bar_build_delay)
1531            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1532            .finish()
1533    }
1534}
1535
1536impl TimeBarAggregator {
1537    /// Creates a new [`TimeBarAggregator`] instance.
1538    ///
1539    /// # Panics
1540    ///
1541    /// This function panics if:
1542    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1543    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1544    #[allow(clippy::too_many_arguments)]
1545    pub fn new<H: FnMut(Bar) + 'static>(
1546        bar_type: BarType,
1547        price_precision: u8,
1548        size_precision: u8,
1549        clock: Rc<RefCell<dyn Clock>>,
1550        handler: H,
1551        build_with_no_updates: bool,
1552        timestamp_on_close: bool,
1553        interval_type: BarIntervalType,
1554        time_bars_origin_offset: Option<TimeDelta>,
1555        bar_build_delay: u64,
1556        skip_first_non_full_bar: bool,
1557    ) -> Self {
1558        let is_left_open = match interval_type {
1559            BarIntervalType::LeftOpen => true,
1560            BarIntervalType::RightOpen => false,
1561        };
1562
1563        let core = BarAggregatorCore::new(
1564            bar_type.standard(),
1565            price_precision,
1566            size_precision,
1567            handler,
1568        );
1569
1570        Self {
1571            core,
1572            clock,
1573            build_with_no_updates,
1574            timestamp_on_close,
1575            is_left_open,
1576            stored_open_ns: UnixNanos::default(),
1577            timer_name: bar_type.to_string(),
1578            interval_ns: get_bar_interval_ns(&bar_type),
1579            next_close_ns: UnixNanos::default(),
1580            bar_build_delay,
1581            time_bars_origin_offset,
1582            skip_first_non_full_bar,
1583            historical_mode: false,
1584            historical_events: Vec::new(),
1585            aggregator_weak: None,
1586        }
1587    }
1588
1589    /// Sets the clock for the aggregator (internal method).
1590    pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1591        self.clock = clock;
1592    }
1593
1594    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
1595    ///
1596    /// This matches the Cython `start_timer()` method exactly.
1597    /// Creates a callback to `build_bar` using a weak reference to the aggregator.
1598    ///
1599    /// # Panics
1600    ///
1601    /// Panics if aggregator_rc is None and aggregator_weak hasn't been set, or if timer registration fails.
1602    pub fn start_timer_internal(
1603        &mut self,
1604        aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1605    ) {
1606        // Create callback that calls build_bar through the weak reference
1607        let aggregator_weak = if let Some(rc) = aggregator_rc {
1608            // Store weak reference for future use (e.g., in build_bar for month/year)
1609            let weak = Rc::downgrade(&rc);
1610            self.aggregator_weak = Some(weak.clone());
1611            weak
1612        } else {
1613            // Use existing weak reference (for historical mode where it was set earlier)
1614            self.aggregator_weak
1615                .as_ref()
1616                .expect("Aggregator weak reference must be set before calling start_timer()")
1617                .clone()
1618        };
1619
1620        let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1621            if let Some(agg) = aggregator_weak.upgrade() {
1622                agg.borrow_mut().build_bar(event);
1623            }
1624        }));
1625
1626        // Computing start_time
1627        let now = self.clock.borrow().utc_now();
1628        let mut start_time =
1629            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1630        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1631
1632        // Closing a partial bar at the transition from historical to backtest data
1633        let fire_immediately = start_time == now;
1634
1635        self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1636
1637        let spec = &self.bar_type().spec();
1638        let start_time_ns = UnixNanos::from(start_time);
1639
1640        if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1641            self.clock
1642                .borrow_mut()
1643                .set_timer_ns(
1644                    &self.timer_name,
1645                    self.interval_ns.as_u64(),
1646                    Some(start_time_ns),
1647                    None,
1648                    Some(callback),
1649                    Some(true), // allow_past
1650                    Some(fire_immediately),
1651                )
1652                .expect(FAILED);
1653
1654            if fire_immediately {
1655                self.next_close_ns = start_time_ns;
1656            } else {
1657                let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1658                self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1659            }
1660
1661            self.stored_open_ns = self.next_close_ns - self.interval_ns;
1662        } else {
1663            // The monthly/yearly alert time is defined iteratively at each alert time as there is no regular interval
1664            let alert_time = if fire_immediately {
1665                start_time
1666            } else {
1667                let step = spec.step.get() as u32;
1668                if spec.aggregation == BarAggregation::Month {
1669                    add_n_months(start_time, step).expect(FAILED)
1670                } else {
1671                    // Year aggregation
1672                    add_n_years(start_time, step).expect(FAILED)
1673                }
1674            };
1675
1676            self.clock
1677                .borrow_mut()
1678                .set_time_alert_ns(
1679                    &self.timer_name,
1680                    UnixNanos::from(alert_time),
1681                    Some(callback),
1682                    Some(true), // allow_past
1683                )
1684                .expect(FAILED);
1685
1686            self.next_close_ns = UnixNanos::from(alert_time);
1687            self.stored_open_ns = UnixNanos::from(start_time);
1688        }
1689
1690        log::debug!(
1691            "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1692            self.timer_name,
1693            start_time,
1694            self.historical_mode,
1695            fire_immediately,
1696            now,
1697            self.bar_build_delay
1698        );
1699    }
1700
1701    /// Stops the time bar aggregator.
1702    pub fn stop(&mut self) {
1703        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1704    }
1705
1706    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1707        if self.skip_first_non_full_bar {
1708            self.core.builder.reset();
1709            self.skip_first_non_full_bar = false;
1710        } else {
1711            self.core.build_and_send(ts_event, ts_init);
1712        }
1713    }
1714
1715    fn build_bar(&mut self, event: TimeEvent) {
1716        if !self.core.builder.initialized {
1717            return;
1718        }
1719
1720        if !self.build_with_no_updates && self.core.builder.count == 0 {
1721            return; // Do not build bar when no update
1722        }
1723
1724        let ts_init = event.ts_event;
1725        let ts_event = if self.is_left_open {
1726            if self.timestamp_on_close {
1727                event.ts_event
1728            } else {
1729                self.stored_open_ns
1730            }
1731        } else {
1732            self.stored_open_ns
1733        };
1734
1735        self.build_and_send(ts_event, ts_init);
1736
1737        // Close time becomes the next open time
1738        self.stored_open_ns = event.ts_event;
1739
1740        if self.bar_type().spec().aggregation == BarAggregation::Month {
1741            let step = self.bar_type().spec().step.get() as u32;
1742            let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1743
1744            self.clock
1745                .borrow_mut()
1746                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1747                .expect(FAILED);
1748
1749            self.next_close_ns = alert_time_ns;
1750        } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1751            let step = self.bar_type().spec().step.get() as u32;
1752            let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1753
1754            self.clock
1755                .borrow_mut()
1756                .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1757                .expect(FAILED);
1758
1759            self.next_close_ns = alert_time_ns;
1760        } else {
1761            // On receiving this event, timer should now have a new `next_time_ns`
1762            self.next_close_ns = self
1763                .clock
1764                .borrow()
1765                .next_time_ns(&self.timer_name)
1766                .unwrap_or_default();
1767        }
1768    }
1769
1770    fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1771        if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1772            // In historical mode, clock is always a TestClock (set by data engine)
1773            {
1774                let mut clock_borrow = self.clock.borrow_mut();
1775                let test_clock = clock_borrow
1776                    .as_any_mut()
1777                    .downcast_mut::<TestClock>()
1778                    .expect("Expected TestClock in historical mode");
1779                test_clock.set_time(ts_init);
1780            }
1781            // In historical mode, weak reference should already be set
1782            self.start_timer_internal(None);
1783        }
1784
1785        // Advance this aggregator's independent clock and collect timer events
1786        {
1787            let mut clock_borrow = self.clock.borrow_mut();
1788            let test_clock = clock_borrow
1789                .as_any_mut()
1790                .downcast_mut::<TestClock>()
1791                .expect("Expected TestClock in historical mode");
1792            self.historical_events = test_clock.advance_time(ts_init, true);
1793        }
1794    }
1795
1796    fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1797        // Process timer events after data processing
1798        // Collect events first to avoid borrow checker issues
1799        let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1800        for event in events {
1801            self.build_bar(event);
1802        }
1803    }
1804
1805    /// Sets historical events (called by data engine after advancing clock)
1806    pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1807        self.historical_events = events;
1808    }
1809}
1810
1811impl BarAggregator for TimeBarAggregator {
1812    fn bar_type(&self) -> BarType {
1813        self.core.bar_type
1814    }
1815
1816    fn is_running(&self) -> bool {
1817        self.core.is_running
1818    }
1819
1820    fn set_is_running(&mut self, value: bool) {
1821        self.core.set_is_running(value);
1822    }
1823
1824    /// Stop time-based aggregator by canceling its timer.
1825    fn stop(&mut self) {
1826        Self::stop(self);
1827    }
1828
1829    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1830        if self.historical_mode {
1831            self.preprocess_historical_events(ts_init);
1832        }
1833
1834        self.core.apply_update(price, size, ts_init);
1835
1836        if self.historical_mode {
1837            self.postprocess_historical_events(ts_init);
1838        }
1839    }
1840
1841    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1842        if self.historical_mode {
1843            self.preprocess_historical_events(ts_init);
1844        }
1845
1846        self.core.builder.update_bar(bar, volume, ts_init);
1847
1848        if self.historical_mode {
1849            self.postprocess_historical_events(ts_init);
1850        }
1851    }
1852
1853    fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1854        self.historical_mode = historical_mode;
1855        self.core.handler = handler;
1856    }
1857
1858    fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1859        self.set_historical_events_internal(events);
1860    }
1861
1862    fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1863        self.set_clock_internal(clock);
1864    }
1865
1866    fn build_bar(&mut self, event: TimeEvent) {
1867        // Delegate to the implementation method
1868        // We use the struct name here to disambiguate from the trait method
1869        {
1870            #[allow(clippy::use_self)]
1871            TimeBarAggregator::build_bar(self, event);
1872        }
1873    }
1874
1875    fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1876        self.aggregator_weak = Some(weak);
1877    }
1878
1879    fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1880        self.start_timer_internal(aggregator_rc);
1881    }
1882}
1883
1884////////////////////////////////////////////////////////////////////////////////
1885// Tests
1886////////////////////////////////////////////////////////////////////////////////
1887#[cfg(test)]
1888mod tests {
1889    use std::sync::{Arc, Mutex};
1890
1891    use nautilus_common::clock::TestClock;
1892    use nautilus_core::{MUTEX_POISONED, UUID4};
1893    use nautilus_model::{
1894        data::{BarSpecification, BarType},
1895        enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1896        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1897        types::{Price, Quantity},
1898    };
1899    use rstest::rstest;
1900    use ustr::Ustr;
1901
1902    use super::*;
1903
1904    #[rstest]
1905    fn test_bar_builder_initialization(equity_aapl: Equity) {
1906        let instrument = InstrumentAny::Equity(equity_aapl);
1907        let bar_type = BarType::new(
1908            instrument.id(),
1909            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1910            AggregationSource::Internal,
1911        );
1912        let builder = BarBuilder::new(
1913            bar_type,
1914            instrument.price_precision(),
1915            instrument.size_precision(),
1916        );
1917
1918        assert!(!builder.initialized);
1919        assert_eq!(builder.ts_last, 0);
1920        assert_eq!(builder.count, 0);
1921    }
1922
1923    #[rstest]
1924    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1925        let instrument = InstrumentAny::Equity(equity_aapl);
1926        let bar_type = BarType::new(
1927            instrument.id(),
1928            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1929            AggregationSource::Internal,
1930        );
1931        let mut builder = BarBuilder::new(
1932            bar_type,
1933            instrument.price_precision(),
1934            instrument.size_precision(),
1935        );
1936
1937        builder.update(
1938            Price::from("100.00"),
1939            Quantity::from(1),
1940            UnixNanos::from(1000),
1941        );
1942        builder.update(
1943            Price::from("95.00"),
1944            Quantity::from(1),
1945            UnixNanos::from(2000),
1946        );
1947        builder.update(
1948            Price::from("105.00"),
1949            Quantity::from(1),
1950            UnixNanos::from(3000),
1951        );
1952
1953        let bar = builder.build_now();
1954        assert!(bar.high > bar.low);
1955        assert_eq!(bar.open, Price::from("100.00"));
1956        assert_eq!(bar.high, Price::from("105.00"));
1957        assert_eq!(bar.low, Price::from("95.00"));
1958        assert_eq!(bar.close, Price::from("105.00"));
1959    }
1960
1961    #[rstest]
1962    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1963        let instrument = InstrumentAny::Equity(equity_aapl);
1964        let bar_type = BarType::new(
1965            instrument.id(),
1966            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1967            AggregationSource::Internal,
1968        );
1969        let mut builder = BarBuilder::new(
1970            bar_type,
1971            instrument.price_precision(),
1972            instrument.size_precision(),
1973        );
1974
1975        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1976        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1977
1978        assert_eq!(builder.ts_last, 1_000);
1979        assert_eq!(builder.count, 1);
1980    }
1981
1982    #[rstest]
1983    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1984        let instrument = InstrumentAny::Equity(equity_aapl);
1985        let bar_type = BarType::new(
1986            instrument.id(),
1987            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1988            AggregationSource::Internal,
1989        );
1990        let mut builder = BarBuilder::new(
1991            bar_type,
1992            instrument.price_precision(),
1993            instrument.size_precision(),
1994        );
1995
1996        builder.update(
1997            Price::from("1.00000"),
1998            Quantity::from(1),
1999            UnixNanos::default(),
2000        );
2001
2002        assert!(builder.initialized);
2003        assert_eq!(builder.ts_last, 0);
2004        assert_eq!(builder.count, 1);
2005    }
2006
2007    #[rstest]
2008    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2009        equity_aapl: Equity,
2010    ) {
2011        let instrument = InstrumentAny::Equity(equity_aapl);
2012        let bar_type = BarType::new(
2013            instrument.id(),
2014            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2015            AggregationSource::Internal,
2016        );
2017        let mut builder = BarBuilder::new(bar_type, 2, 0);
2018
2019        builder.update(
2020            Price::from("1.00000"),
2021            Quantity::from(1),
2022            UnixNanos::from(1_000),
2023        );
2024        builder.update(
2025            Price::from("1.00001"),
2026            Quantity::from(1),
2027            UnixNanos::from(500),
2028        );
2029
2030        assert!(builder.initialized);
2031        assert_eq!(builder.ts_last, 1_000);
2032        assert_eq!(builder.count, 1);
2033    }
2034
2035    #[rstest]
2036    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2037        let instrument = InstrumentAny::Equity(equity_aapl);
2038        let bar_type = BarType::new(
2039            instrument.id(),
2040            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2041            AggregationSource::Internal,
2042        );
2043        let mut builder = BarBuilder::new(
2044            bar_type,
2045            instrument.price_precision(),
2046            instrument.size_precision(),
2047        );
2048
2049        for _ in 0..5 {
2050            builder.update(
2051                Price::from("1.00000"),
2052                Quantity::from(1),
2053                UnixNanos::from(1_000),
2054            );
2055        }
2056
2057        assert_eq!(builder.count, 5);
2058    }
2059
2060    #[rstest]
2061    #[should_panic]
2062    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2063        let instrument = InstrumentAny::Equity(equity_aapl);
2064        let bar_type = BarType::new(
2065            instrument.id(),
2066            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2067            AggregationSource::Internal,
2068        );
2069        let mut builder = BarBuilder::new(
2070            bar_type,
2071            instrument.price_precision(),
2072            instrument.size_precision(),
2073        );
2074        let _ = builder.build_now();
2075    }
2076
2077    #[rstest]
2078    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2079        let instrument = InstrumentAny::Equity(equity_aapl);
2080        let bar_type = BarType::new(
2081            instrument.id(),
2082            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2083            AggregationSource::Internal,
2084        );
2085        let mut builder = BarBuilder::new(
2086            bar_type,
2087            instrument.price_precision(),
2088            instrument.size_precision(),
2089        );
2090
2091        builder.update(
2092            Price::from("1.00001"),
2093            Quantity::from(2),
2094            UnixNanos::default(),
2095        );
2096        builder.update(
2097            Price::from("1.00002"),
2098            Quantity::from(2),
2099            UnixNanos::default(),
2100        );
2101        builder.update(
2102            Price::from("1.00000"),
2103            Quantity::from(1),
2104            UnixNanos::from(1_000_000_000),
2105        );
2106
2107        let bar = builder.build_now();
2108
2109        assert_eq!(bar.open, Price::from("1.00001"));
2110        assert_eq!(bar.high, Price::from("1.00002"));
2111        assert_eq!(bar.low, Price::from("1.00000"));
2112        assert_eq!(bar.close, Price::from("1.00000"));
2113        assert_eq!(bar.volume, Quantity::from(5));
2114        assert_eq!(bar.ts_init, 1_000_000_000);
2115        assert_eq!(builder.ts_last, 1_000_000_000);
2116        assert_eq!(builder.count, 0);
2117    }
2118
2119    #[rstest]
2120    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2121        let instrument = InstrumentAny::Equity(equity_aapl);
2122        let bar_type = BarType::new(
2123            instrument.id(),
2124            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2125            AggregationSource::Internal,
2126        );
2127        let mut builder = BarBuilder::new(bar_type, 2, 0);
2128
2129        builder.update(
2130            Price::from("1.00001"),
2131            Quantity::from(1),
2132            UnixNanos::default(),
2133        );
2134        builder.build_now();
2135
2136        builder.update(
2137            Price::from("1.00000"),
2138            Quantity::from(1),
2139            UnixNanos::default(),
2140        );
2141        builder.update(
2142            Price::from("1.00003"),
2143            Quantity::from(1),
2144            UnixNanos::default(),
2145        );
2146        builder.update(
2147            Price::from("1.00002"),
2148            Quantity::from(1),
2149            UnixNanos::default(),
2150        );
2151
2152        let bar = builder.build_now();
2153
2154        assert_eq!(bar.open, Price::from("1.00000"));
2155        assert_eq!(bar.high, Price::from("1.00003"));
2156        assert_eq!(bar.low, Price::from("1.00000"));
2157        assert_eq!(bar.close, Price::from("1.00002"));
2158        assert_eq!(bar.volume, Quantity::from(3));
2159    }
2160
2161    #[rstest]
2162    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2163        let instrument = InstrumentAny::Equity(equity_aapl);
2164        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2165        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2166        let handler = Arc::new(Mutex::new(Vec::new()));
2167        let handler_clone = Arc::clone(&handler);
2168
2169        let mut aggregator = TickBarAggregator::new(
2170            bar_type,
2171            instrument.price_precision(),
2172            instrument.size_precision(),
2173            move |bar: Bar| {
2174                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2175                handler_guard.push(bar);
2176            },
2177        );
2178
2179        let trade = TradeTick::default();
2180        aggregator.handle_trade(trade);
2181
2182        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2183        assert_eq!(handler_guard.len(), 0);
2184    }
2185
2186    #[rstest]
2187    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2188        let instrument = InstrumentAny::Equity(equity_aapl);
2189        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2190        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2191        let handler = Arc::new(Mutex::new(Vec::new()));
2192        let handler_clone = Arc::clone(&handler);
2193
2194        let mut aggregator = TickBarAggregator::new(
2195            bar_type,
2196            instrument.price_precision(),
2197            instrument.size_precision(),
2198            move |bar: Bar| {
2199                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2200                handler_guard.push(bar);
2201            },
2202        );
2203
2204        let trade = TradeTick::default();
2205        aggregator.handle_trade(trade);
2206        aggregator.handle_trade(trade);
2207        aggregator.handle_trade(trade);
2208
2209        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2210        let bar = handler_guard.first().unwrap();
2211        assert_eq!(handler_guard.len(), 1);
2212        assert_eq!(bar.open, trade.price);
2213        assert_eq!(bar.high, trade.price);
2214        assert_eq!(bar.low, trade.price);
2215        assert_eq!(bar.close, trade.price);
2216        assert_eq!(bar.volume, Quantity::from(300000));
2217        assert_eq!(bar.ts_event, trade.ts_event);
2218        assert_eq!(bar.ts_init, trade.ts_init);
2219    }
2220
2221    #[rstest]
2222    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2223        let instrument = InstrumentAny::Equity(equity_aapl);
2224        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2225        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2226        let handler = Arc::new(Mutex::new(Vec::new()));
2227        let handler_clone = Arc::clone(&handler);
2228
2229        let mut aggregator = TickBarAggregator::new(
2230            bar_type,
2231            instrument.price_precision(),
2232            instrument.size_precision(),
2233            move |bar: Bar| {
2234                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2235                handler_guard.push(bar);
2236            },
2237        );
2238
2239        aggregator.update(
2240            Price::from("1.00001"),
2241            Quantity::from(1),
2242            UnixNanos::default(),
2243        );
2244        aggregator.update(
2245            Price::from("1.00002"),
2246            Quantity::from(1),
2247            UnixNanos::from(1000),
2248        );
2249        aggregator.update(
2250            Price::from("1.00003"),
2251            Quantity::from(1),
2252            UnixNanos::from(2000),
2253        );
2254
2255        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2256        assert_eq!(handler_guard.len(), 1);
2257
2258        let bar = handler_guard.first().unwrap();
2259        assert_eq!(bar.open, Price::from("1.00001"));
2260        assert_eq!(bar.high, Price::from("1.00003"));
2261        assert_eq!(bar.low, Price::from("1.00001"));
2262        assert_eq!(bar.close, Price::from("1.00003"));
2263        assert_eq!(bar.volume, Quantity::from(3));
2264    }
2265
2266    #[rstest]
2267    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2268        let instrument = InstrumentAny::Equity(equity_aapl);
2269        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2270        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2271        let handler = Arc::new(Mutex::new(Vec::new()));
2272        let handler_clone = Arc::clone(&handler);
2273
2274        let mut aggregator = TickBarAggregator::new(
2275            bar_type,
2276            instrument.price_precision(),
2277            instrument.size_precision(),
2278            move |bar: Bar| {
2279                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2280                handler_guard.push(bar);
2281            },
2282        );
2283
2284        aggregator.update(
2285            Price::from("1.00001"),
2286            Quantity::from(1),
2287            UnixNanos::default(),
2288        );
2289        aggregator.update(
2290            Price::from("1.00002"),
2291            Quantity::from(1),
2292            UnixNanos::from(1000),
2293        );
2294        aggregator.update(
2295            Price::from("1.00003"),
2296            Quantity::from(1),
2297            UnixNanos::from(2000),
2298        );
2299        aggregator.update(
2300            Price::from("1.00004"),
2301            Quantity::from(1),
2302            UnixNanos::from(3000),
2303        );
2304
2305        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2306        assert_eq!(handler_guard.len(), 2);
2307
2308        let bar1 = &handler_guard[0];
2309        assert_eq!(bar1.open, Price::from("1.00001"));
2310        assert_eq!(bar1.close, Price::from("1.00002"));
2311        assert_eq!(bar1.volume, Quantity::from(2));
2312
2313        let bar2 = &handler_guard[1];
2314        assert_eq!(bar2.open, Price::from("1.00003"));
2315        assert_eq!(bar2.close, Price::from("1.00004"));
2316        assert_eq!(bar2.volume, Quantity::from(2));
2317    }
2318
2319    #[rstest]
2320    fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2321        let instrument = InstrumentAny::Equity(equity_aapl);
2322        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2323        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2324        let handler = Arc::new(Mutex::new(Vec::new()));
2325        let handler_clone = Arc::clone(&handler);
2326
2327        let mut aggregator = TickImbalanceBarAggregator::new(
2328            bar_type,
2329            instrument.price_precision(),
2330            instrument.size_precision(),
2331            move |bar: Bar| {
2332                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2333                handler_guard.push(bar);
2334            },
2335        );
2336
2337        let trade = TradeTick::default();
2338        aggregator.handle_trade(trade);
2339        aggregator.handle_trade(trade);
2340
2341        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2342        assert_eq!(handler_guard.len(), 1);
2343        let bar = handler_guard.first().unwrap();
2344        assert_eq!(bar.volume, Quantity::from(200000));
2345    }
2346
2347    #[rstest]
2348    fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2349        let instrument = InstrumentAny::Equity(equity_aapl);
2350        let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2351        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2352        let handler = Arc::new(Mutex::new(Vec::new()));
2353        let handler_clone = Arc::clone(&handler);
2354
2355        let mut aggregator = TickImbalanceBarAggregator::new(
2356            bar_type,
2357            instrument.price_precision(),
2358            instrument.size_precision(),
2359            move |bar: Bar| {
2360                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2361                handler_guard.push(bar);
2362            },
2363        );
2364
2365        let sell = TradeTick {
2366            aggressor_side: AggressorSide::Seller,
2367            ..TradeTick::default()
2368        };
2369
2370        aggregator.handle_trade(sell);
2371
2372        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2373        assert_eq!(handler_guard.len(), 1);
2374    }
2375
2376    #[rstest]
2377    fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2378        let instrument = InstrumentAny::Equity(equity_aapl);
2379        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2380        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2381        let handler = Arc::new(Mutex::new(Vec::new()));
2382        let handler_clone = Arc::clone(&handler);
2383
2384        let mut aggregator = TickRunsBarAggregator::new(
2385            bar_type,
2386            instrument.price_precision(),
2387            instrument.size_precision(),
2388            move |bar: Bar| {
2389                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2390                handler_guard.push(bar);
2391            },
2392        );
2393
2394        let buy = TradeTick::default();
2395        let sell = TradeTick {
2396            aggressor_side: AggressorSide::Seller,
2397            ..buy
2398        };
2399
2400        aggregator.handle_trade(buy);
2401        aggregator.handle_trade(buy);
2402        aggregator.handle_trade(sell);
2403        aggregator.handle_trade(sell);
2404
2405        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2406        assert_eq!(handler_guard.len(), 2);
2407    }
2408
2409    #[rstest]
2410    fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2411        let instrument = InstrumentAny::Equity(equity_aapl);
2412        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2413        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2414        let handler = Arc::new(Mutex::new(Vec::new()));
2415        let handler_clone = Arc::clone(&handler);
2416
2417        let mut aggregator = TickRunsBarAggregator::new(
2418            bar_type,
2419            instrument.price_precision(),
2420            instrument.size_precision(),
2421            move |bar: Bar| {
2422                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2423                handler_guard.push(bar);
2424            },
2425        );
2426
2427        let buy = TradeTick {
2428            size: Quantity::from(1),
2429            ..TradeTick::default()
2430        };
2431        let sell = TradeTick {
2432            aggressor_side: AggressorSide::Seller,
2433            size: Quantity::from(1),
2434            ..buy
2435        };
2436
2437        aggregator.handle_trade(buy);
2438        aggregator.handle_trade(buy);
2439        aggregator.handle_trade(sell);
2440        aggregator.handle_trade(sell);
2441
2442        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2443        assert_eq!(handler_guard.len(), 2);
2444        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2445        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2446    }
2447
2448    #[rstest]
2449    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2450        let instrument = InstrumentAny::Equity(equity_aapl);
2451        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2452        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2453        let handler = Arc::new(Mutex::new(Vec::new()));
2454        let handler_clone = Arc::clone(&handler);
2455
2456        let mut aggregator = VolumeBarAggregator::new(
2457            bar_type,
2458            instrument.price_precision(),
2459            instrument.size_precision(),
2460            move |bar: Bar| {
2461                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2462                handler_guard.push(bar);
2463            },
2464        );
2465
2466        aggregator.update(
2467            Price::from("1.00001"),
2468            Quantity::from(25),
2469            UnixNanos::default(),
2470        );
2471
2472        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2473        assert_eq!(handler_guard.len(), 2);
2474        let bar1 = &handler_guard[0];
2475        assert_eq!(bar1.volume, Quantity::from(10));
2476        let bar2 = &handler_guard[1];
2477        assert_eq!(bar2.volume, Quantity::from(10));
2478    }
2479
2480    #[rstest]
2481    fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2482        let instrument = InstrumentAny::Equity(equity_aapl);
2483        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2484        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2485        let handler = Arc::new(Mutex::new(Vec::new()));
2486        let handler_clone = Arc::clone(&handler);
2487
2488        let mut aggregator = VolumeRunsBarAggregator::new(
2489            bar_type,
2490            instrument.price_precision(),
2491            instrument.size_precision(),
2492            move |bar: Bar| {
2493                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2494                handler_guard.push(bar);
2495            },
2496        );
2497
2498        let buy = TradeTick {
2499            instrument_id: instrument.id(),
2500            price: Price::from("1.0"),
2501            size: Quantity::from(1),
2502            ..TradeTick::default()
2503        };
2504        let sell = TradeTick {
2505            aggressor_side: AggressorSide::Seller,
2506            ..buy
2507        };
2508
2509        aggregator.handle_trade(buy);
2510        aggregator.handle_trade(buy); // emit first bar at 2
2511        aggregator.handle_trade(sell);
2512        aggregator.handle_trade(sell); // emit second bar at 2 sell-side
2513
2514        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2515        assert!(handler_guard.len() >= 2);
2516        assert!(
2517            (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2518                < f64::EPSILON
2519        );
2520    }
2521
2522    #[rstest]
2523    fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2524        let instrument = InstrumentAny::Equity(equity_aapl);
2525        let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2526        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2527        let handler = Arc::new(Mutex::new(Vec::new()));
2528        let handler_clone = Arc::clone(&handler);
2529
2530        let mut aggregator = VolumeRunsBarAggregator::new(
2531            bar_type,
2532            instrument.price_precision(),
2533            instrument.size_precision(),
2534            move |bar: Bar| {
2535                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2536                handler_guard.push(bar);
2537            },
2538        );
2539
2540        let trade = TradeTick {
2541            instrument_id: instrument.id(),
2542            price: Price::from("1.0"),
2543            size: Quantity::from(5),
2544            ..TradeTick::default()
2545        };
2546
2547        aggregator.handle_trade(trade);
2548
2549        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2550        assert!(!handler_guard.is_empty());
2551        assert!(handler_guard[0].volume.as_f64() > 0.0);
2552        assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2553    }
2554
2555    #[rstest]
2556    fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2557        let instrument = InstrumentAny::Equity(equity_aapl);
2558        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2559        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2560        let handler = Arc::new(Mutex::new(Vec::new()));
2561        let handler_clone = Arc::clone(&handler);
2562
2563        let mut aggregator = VolumeImbalanceBarAggregator::new(
2564            bar_type,
2565            instrument.price_precision(),
2566            instrument.size_precision(),
2567            move |bar: Bar| {
2568                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2569                handler_guard.push(bar);
2570            },
2571        );
2572
2573        let trade_small = TradeTick {
2574            instrument_id: instrument.id(),
2575            price: Price::from("1.0"),
2576            size: Quantity::from(1),
2577            ..TradeTick::default()
2578        };
2579        let trade_large = TradeTick {
2580            size: Quantity::from(3),
2581            ..trade_small
2582        };
2583
2584        aggregator.handle_trade(trade_small);
2585        aggregator.handle_trade(trade_large);
2586
2587        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2588        assert_eq!(handler_guard.len(), 2);
2589        let total_output = handler_guard
2590            .iter()
2591            .map(|bar| bar.volume.as_f64())
2592            .sum::<f64>();
2593        let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2594        assert!((total_output - total_input).abs() < f64::EPSILON);
2595    }
2596
2597    #[rstest]
2598    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2599        let instrument = InstrumentAny::Equity(equity_aapl);
2600        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
2601        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2602        let handler = Arc::new(Mutex::new(Vec::new()));
2603        let handler_clone = Arc::clone(&handler);
2604
2605        let mut aggregator = ValueBarAggregator::new(
2606            bar_type,
2607            instrument.price_precision(),
2608            instrument.size_precision(),
2609            move |bar: Bar| {
2610                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2611                handler_guard.push(bar);
2612            },
2613        );
2614
2615        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
2616        aggregator.update(
2617            Price::from("100.00"),
2618            Quantity::from(5),
2619            UnixNanos::default(),
2620        );
2621        aggregator.update(
2622            Price::from("100.00"),
2623            Quantity::from(5),
2624            UnixNanos::from(1000),
2625        );
2626
2627        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2628        assert_eq!(handler_guard.len(), 1);
2629        let bar = handler_guard.first().unwrap();
2630        assert_eq!(bar.volume, Quantity::from(10));
2631    }
2632
2633    #[rstest]
2634    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2635        let instrument = InstrumentAny::Equity(equity_aapl);
2636        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2637        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2638        let handler = Arc::new(Mutex::new(Vec::new()));
2639        let handler_clone = Arc::clone(&handler);
2640
2641        let mut aggregator = ValueBarAggregator::new(
2642            bar_type,
2643            instrument.price_precision(),
2644            instrument.size_precision(),
2645            move |bar: Bar| {
2646                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2647                handler_guard.push(bar);
2648            },
2649        );
2650
2651        // Single large update: $100 * 25 = $2500 (should create 2 bars)
2652        aggregator.update(
2653            Price::from("100.00"),
2654            Quantity::from(25),
2655            UnixNanos::default(),
2656        );
2657
2658        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2659        assert_eq!(handler_guard.len(), 2);
2660        let remaining_value = aggregator.get_cumulative_value();
2661        assert!(remaining_value < 1000.0); // Should be less than threshold
2662    }
2663
2664    #[rstest]
2665    fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2666        let instrument = InstrumentAny::Equity(equity_aapl);
2667        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2668        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2669        let handler = Arc::new(Mutex::new(Vec::new()));
2670        let handler_clone = Arc::clone(&handler);
2671
2672        let mut aggregator = ValueBarAggregator::new(
2673            bar_type,
2674            instrument.price_precision(),
2675            instrument.size_precision(),
2676            move |bar: Bar| {
2677                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2678                handler_guard.push(bar);
2679            },
2680        );
2681
2682        // Update with zero price should not cause division by zero
2683        aggregator.update(
2684            Price::from("0.00"),
2685            Quantity::from(100),
2686            UnixNanos::default(),
2687        );
2688
2689        // No bars should be emitted since value is zero
2690        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2691        assert_eq!(handler_guard.len(), 0);
2692
2693        // Cumulative value should remain zero
2694        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2695    }
2696
2697    #[rstest]
2698    fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2699        let instrument = InstrumentAny::Equity(equity_aapl);
2700        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2701        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2702        let handler = Arc::new(Mutex::new(Vec::new()));
2703        let handler_clone = Arc::clone(&handler);
2704
2705        let mut aggregator = ValueBarAggregator::new(
2706            bar_type,
2707            instrument.price_precision(),
2708            instrument.size_precision(),
2709            move |bar: Bar| {
2710                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2711                handler_guard.push(bar);
2712            },
2713        );
2714
2715        // Update with zero size should not cause issues
2716        aggregator.update(
2717            Price::from("100.00"),
2718            Quantity::from(0),
2719            UnixNanos::default(),
2720        );
2721
2722        // No bars should be emitted
2723        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2724        assert_eq!(handler_guard.len(), 0);
2725
2726        // Cumulative value should remain zero
2727        assert_eq!(aggregator.get_cumulative_value(), 0.0);
2728    }
2729
2730    #[rstest]
2731    fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2732        let instrument = InstrumentAny::Equity(equity_aapl);
2733        let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2734        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2735        let handler = Arc::new(Mutex::new(Vec::new()));
2736        let handler_clone = Arc::clone(&handler);
2737
2738        let mut aggregator = ValueImbalanceBarAggregator::new(
2739            bar_type,
2740            instrument.price_precision(),
2741            instrument.size_precision(),
2742            move |bar: Bar| {
2743                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2744                handler_guard.push(bar);
2745            },
2746        );
2747
2748        let buy = TradeTick {
2749            price: Price::from("5.0"),
2750            size: Quantity::from(2), // value 10, should emit one bar
2751            instrument_id: instrument.id(),
2752            ..TradeTick::default()
2753        };
2754        let sell = TradeTick {
2755            price: Price::from("5.0"),
2756            size: Quantity::from(2), // value 10, should emit another bar
2757            aggressor_side: AggressorSide::Seller,
2758            instrument_id: instrument.id(),
2759            ..buy
2760        };
2761
2762        aggregator.handle_trade(buy);
2763        aggregator.handle_trade(sell);
2764
2765        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2766        assert!(handler_guard.is_empty());
2767    }
2768
2769    #[rstest]
2770    fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2771        let instrument = InstrumentAny::Equity(equity_aapl);
2772        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2773        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2774        let handler = Arc::new(Mutex::new(Vec::new()));
2775        let handler_clone = Arc::clone(&handler);
2776
2777        let mut aggregator = ValueRunsBarAggregator::new(
2778            bar_type,
2779            instrument.price_precision(),
2780            instrument.size_precision(),
2781            move |bar: Bar| {
2782                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2783                handler_guard.push(bar);
2784            },
2785        );
2786
2787        let trade = TradeTick {
2788            price: Price::from("10.0"),
2789            size: Quantity::from(5),
2790            instrument_id: instrument.id(),
2791            ..TradeTick::default()
2792        };
2793
2794        aggregator.handle_trade(trade);
2795        aggregator.handle_trade(trade);
2796
2797        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2798        assert_eq!(handler_guard.len(), 1);
2799        let bar = handler_guard.first().unwrap();
2800        assert_eq!(bar.volume, Quantity::from(10));
2801    }
2802
2803    #[rstest]
2804    fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2805        let instrument = InstrumentAny::Equity(equity_aapl);
2806        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2807        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2808        let handler = Arc::new(Mutex::new(Vec::new()));
2809        let handler_clone = Arc::clone(&handler);
2810
2811        let mut aggregator = ValueRunsBarAggregator::new(
2812            bar_type,
2813            instrument.price_precision(),
2814            instrument.size_precision(),
2815            move |bar: Bar| {
2816                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2817                handler_guard.push(bar);
2818            },
2819        );
2820
2821        let buy = TradeTick {
2822            price: Price::from("10.0"),
2823            size: Quantity::from(5),
2824            instrument_id: instrument.id(),
2825            ..TradeTick::default()
2826        }; // value 50
2827        let sell = TradeTick {
2828            price: Price::from("10.0"),
2829            size: Quantity::from(10),
2830            aggressor_side: AggressorSide::Seller,
2831            ..buy
2832        }; // value 100
2833
2834        aggregator.handle_trade(buy);
2835        aggregator.handle_trade(sell);
2836
2837        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2838        assert_eq!(handler_guard.len(), 1);
2839        assert_eq!(handler_guard[0].volume, Quantity::from(10));
2840    }
2841
2842    #[rstest]
2843    fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2844        let instrument = InstrumentAny::Equity(equity_aapl);
2845        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2846        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2847        let handler = Arc::new(Mutex::new(Vec::new()));
2848        let handler_clone = Arc::clone(&handler);
2849
2850        let mut aggregator = TickRunsBarAggregator::new(
2851            bar_type,
2852            instrument.price_precision(),
2853            instrument.size_precision(),
2854            move |bar: Bar| {
2855                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2856                handler_guard.push(bar);
2857            },
2858        );
2859
2860        let buy = TradeTick::default();
2861
2862        aggregator.handle_trade(buy);
2863        aggregator.handle_trade(buy); // Emit bar 1 (run complete)
2864        aggregator.handle_trade(buy); // Start new run
2865        aggregator.handle_trade(buy); // Emit bar 2 (new run complete)
2866
2867        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2868        assert_eq!(handler_guard.len(), 2);
2869    }
2870
2871    #[rstest]
2872    fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2873        let instrument = InstrumentAny::Equity(equity_aapl);
2874        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2875        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2876        let handler = Arc::new(Mutex::new(Vec::new()));
2877        let handler_clone = Arc::clone(&handler);
2878
2879        let mut aggregator = TickRunsBarAggregator::new(
2880            bar_type,
2881            instrument.price_precision(),
2882            instrument.size_precision(),
2883            move |bar: Bar| {
2884                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2885                handler_guard.push(bar);
2886            },
2887        );
2888
2889        let buy = TradeTick::default();
2890        let no_aggressor = TradeTick {
2891            aggressor_side: AggressorSide::NoAggressor,
2892            ..buy
2893        };
2894
2895        aggregator.handle_trade(buy);
2896        aggregator.handle_trade(no_aggressor); // Should not affect run count
2897        aggregator.handle_trade(no_aggressor); // Should not affect run count
2898        aggregator.handle_trade(buy); // Continue run to threshold
2899
2900        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2901        assert_eq!(handler_guard.len(), 1);
2902    }
2903
2904    #[rstest]
2905    fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2906        let instrument = InstrumentAny::Equity(equity_aapl);
2907        let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2908        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2909        let handler = Arc::new(Mutex::new(Vec::new()));
2910        let handler_clone = Arc::clone(&handler);
2911
2912        let mut aggregator = VolumeRunsBarAggregator::new(
2913            bar_type,
2914            instrument.price_precision(),
2915            instrument.size_precision(),
2916            move |bar: Bar| {
2917                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2918                handler_guard.push(bar);
2919            },
2920        );
2921
2922        let buy = TradeTick {
2923            instrument_id: instrument.id(),
2924            price: Price::from("1.0"),
2925            size: Quantity::from(1),
2926            ..TradeTick::default()
2927        };
2928
2929        aggregator.handle_trade(buy);
2930        aggregator.handle_trade(buy); // Emit bar 1 (2.0 volume reached)
2931        aggregator.handle_trade(buy); // Start new run
2932        aggregator.handle_trade(buy); // Emit bar 2 (new 2.0 volume reached)
2933
2934        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2935        assert_eq!(handler_guard.len(), 2);
2936        assert_eq!(handler_guard[0].volume, Quantity::from(2));
2937        assert_eq!(handler_guard[1].volume, Quantity::from(2));
2938    }
2939
2940    #[rstest]
2941    fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2942        let instrument = InstrumentAny::Equity(equity_aapl);
2943        let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2944        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2945        let handler = Arc::new(Mutex::new(Vec::new()));
2946        let handler_clone = Arc::clone(&handler);
2947
2948        let mut aggregator = ValueRunsBarAggregator::new(
2949            bar_type,
2950            instrument.price_precision(),
2951            instrument.size_precision(),
2952            move |bar: Bar| {
2953                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2954                handler_guard.push(bar);
2955            },
2956        );
2957
2958        let buy = TradeTick {
2959            instrument_id: instrument.id(),
2960            price: Price::from("10.0"),
2961            size: Quantity::from(5),
2962            ..TradeTick::default()
2963        }; // value 50 per trade
2964
2965        aggregator.handle_trade(buy);
2966        aggregator.handle_trade(buy); // Emit bar 1 (100 value reached)
2967        aggregator.handle_trade(buy); // Start new run
2968        aggregator.handle_trade(buy); // Emit bar 2 (new 100 value reached)
2969
2970        let handler_guard = handler.lock().expect(MUTEX_POISONED);
2971        assert_eq!(handler_guard.len(), 2);
2972        assert_eq!(handler_guard[0].volume, Quantity::from(10));
2973        assert_eq!(handler_guard[1].volume, Quantity::from(10));
2974    }
2975
2976    #[rstest]
2977    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
2978        let instrument = InstrumentAny::Equity(equity_aapl);
2979        // One second bars
2980        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2981        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2982        let handler = Arc::new(Mutex::new(Vec::new()));
2983        let handler_clone = Arc::clone(&handler);
2984        let clock = Rc::new(RefCell::new(TestClock::new()));
2985
2986        let mut aggregator = TimeBarAggregator::new(
2987            bar_type,
2988            instrument.price_precision(),
2989            instrument.size_precision(),
2990            clock.clone(),
2991            move |bar: Bar| {
2992                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2993                handler_guard.push(bar);
2994            },
2995            true,  // build_with_no_updates
2996            false, // timestamp_on_close
2997            BarIntervalType::LeftOpen,
2998            None,  // time_bars_origin_offset
2999            15,    // bar_build_delay
3000            false, // skip_first_non_full_bar
3001        );
3002
3003        aggregator.update(
3004            Price::from("100.00"),
3005            Quantity::from(1),
3006            UnixNanos::default(),
3007        );
3008
3009        let next_sec = UnixNanos::from(1_000_000_000);
3010        clock.borrow_mut().set_time(next_sec);
3011
3012        let event = TimeEvent::new(
3013            Ustr::from("1-SECOND-LAST"),
3014            UUID4::new(),
3015            next_sec,
3016            next_sec,
3017        );
3018        aggregator.build_bar(event);
3019
3020        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3021        assert_eq!(handler_guard.len(), 1);
3022        let bar = handler_guard.first().unwrap();
3023        assert_eq!(bar.ts_event, UnixNanos::default());
3024        assert_eq!(bar.ts_init, next_sec);
3025    }
3026
3027    #[rstest]
3028    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3029        let instrument = InstrumentAny::Equity(equity_aapl);
3030        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3031        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3032        let handler = Arc::new(Mutex::new(Vec::new()));
3033        let handler_clone = Arc::clone(&handler);
3034        let clock = Rc::new(RefCell::new(TestClock::new()));
3035
3036        let mut aggregator = TimeBarAggregator::new(
3037            bar_type,
3038            instrument.price_precision(),
3039            instrument.size_precision(),
3040            clock.clone(),
3041            move |bar: Bar| {
3042                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3043                handler_guard.push(bar);
3044            },
3045            true, // build_with_no_updates
3046            true, // timestamp_on_close - changed to true to verify left-open behavior
3047            BarIntervalType::LeftOpen,
3048            None,
3049            15,
3050            false, // skip_first_non_full_bar
3051        );
3052
3053        // Update in first interval
3054        aggregator.update(
3055            Price::from("100.00"),
3056            Quantity::from(1),
3057            UnixNanos::default(),
3058        );
3059
3060        // First interval close
3061        let ts1 = UnixNanos::from(1_000_000_000);
3062        clock.borrow_mut().set_time(ts1);
3063        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3064        aggregator.build_bar(event);
3065
3066        // Update in second interval
3067        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3068
3069        // Second interval close
3070        let ts2 = UnixNanos::from(2_000_000_000);
3071        clock.borrow_mut().set_time(ts2);
3072        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3073        aggregator.build_bar(event);
3074
3075        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3076        assert_eq!(handler_guard.len(), 2);
3077
3078        let bar1 = &handler_guard[0];
3079        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
3080        assert_eq!(bar1.ts_init, ts1);
3081        assert_eq!(bar1.close, Price::from("100.00"));
3082        let bar2 = &handler_guard[1];
3083        assert_eq!(bar2.ts_event, ts2);
3084        assert_eq!(bar2.ts_init, ts2);
3085        assert_eq!(bar2.close, Price::from("101.00"));
3086    }
3087
3088    #[rstest]
3089    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3090        let instrument = InstrumentAny::Equity(equity_aapl);
3091        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3092        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3093        let handler = Arc::new(Mutex::new(Vec::new()));
3094        let handler_clone = Arc::clone(&handler);
3095        let clock = Rc::new(RefCell::new(TestClock::new()));
3096        let mut aggregator = TimeBarAggregator::new(
3097            bar_type,
3098            instrument.price_precision(),
3099            instrument.size_precision(),
3100            clock.clone(),
3101            move |bar: Bar| {
3102                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3103                handler_guard.push(bar);
3104            },
3105            true, // build_with_no_updates
3106            true, // timestamp_on_close
3107            BarIntervalType::RightOpen,
3108            None,
3109            15,
3110            false, // skip_first_non_full_bar
3111        );
3112
3113        // Update in first interval
3114        aggregator.update(
3115            Price::from("100.00"),
3116            Quantity::from(1),
3117            UnixNanos::default(),
3118        );
3119
3120        // First interval close
3121        let ts1 = UnixNanos::from(1_000_000_000);
3122        clock.borrow_mut().set_time(ts1);
3123        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3124        aggregator.build_bar(event);
3125
3126        // Update in second interval
3127        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3128
3129        // Second interval close
3130        let ts2 = UnixNanos::from(2_000_000_000);
3131        clock.borrow_mut().set_time(ts2);
3132        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3133        aggregator.build_bar(event);
3134
3135        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3136        assert_eq!(handler_guard.len(), 2);
3137
3138        let bar1 = &handler_guard[0];
3139        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
3140        assert_eq!(bar1.ts_init, ts1);
3141        assert_eq!(bar1.close, Price::from("100.00"));
3142
3143        let bar2 = &handler_guard[1];
3144        assert_eq!(bar2.ts_event, ts1);
3145        assert_eq!(bar2.ts_init, ts2);
3146        assert_eq!(bar2.close, Price::from("101.00"));
3147    }
3148
3149    #[rstest]
3150    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3151        let instrument = InstrumentAny::Equity(equity_aapl);
3152        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3153        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3154        let handler = Arc::new(Mutex::new(Vec::new()));
3155        let handler_clone = Arc::clone(&handler);
3156        let clock = Rc::new(RefCell::new(TestClock::new()));
3157
3158        // First test with build_with_no_updates = false
3159        let mut aggregator = TimeBarAggregator::new(
3160            bar_type,
3161            instrument.price_precision(),
3162            instrument.size_precision(),
3163            clock.clone(),
3164            move |bar: Bar| {
3165                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3166                handler_guard.push(bar);
3167            },
3168            false, // build_with_no_updates disabled
3169            true,  // timestamp_on_close
3170            BarIntervalType::LeftOpen,
3171            None,
3172            15,
3173            false, // skip_first_non_full_bar
3174        );
3175
3176        // No updates, just interval close
3177        let ts1 = UnixNanos::from(1_000_000_000);
3178        clock.borrow_mut().set_time(ts1);
3179        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3180        aggregator.build_bar(event);
3181
3182        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3183        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
3184        drop(handler_guard);
3185
3186        // Now test with build_with_no_updates = true
3187        let handler = Arc::new(Mutex::new(Vec::new()));
3188        let handler_clone = Arc::clone(&handler);
3189        let mut aggregator = TimeBarAggregator::new(
3190            bar_type,
3191            instrument.price_precision(),
3192            instrument.size_precision(),
3193            clock.clone(),
3194            move |bar: Bar| {
3195                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3196                handler_guard.push(bar);
3197            },
3198            true, // build_with_no_updates enabled
3199            true, // timestamp_on_close
3200            BarIntervalType::LeftOpen,
3201            None,
3202            15,
3203            false, // skip_first_non_full_bar
3204        );
3205
3206        aggregator.update(
3207            Price::from("100.00"),
3208            Quantity::from(1),
3209            UnixNanos::default(),
3210        );
3211
3212        // First interval with update
3213        let ts1 = UnixNanos::from(1_000_000_000);
3214        clock.borrow_mut().set_time(ts1);
3215        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3216        aggregator.build_bar(event);
3217
3218        // Second interval without updates
3219        let ts2 = UnixNanos::from(2_000_000_000);
3220        clock.borrow_mut().set_time(ts2);
3221        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3222        aggregator.build_bar(event);
3223
3224        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3225        assert_eq!(handler_guard.len(), 2); // Both bars should be built
3226        let bar1 = &handler_guard[0];
3227        assert_eq!(bar1.close, Price::from("100.00"));
3228        let bar2 = &handler_guard[1];
3229        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
3230    }
3231
3232    #[rstest]
3233    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3234        let instrument = InstrumentAny::Equity(equity_aapl);
3235        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3236        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3237        let clock = Rc::new(RefCell::new(TestClock::new()));
3238        let handler = Arc::new(Mutex::new(Vec::new()));
3239        let handler_clone = Arc::clone(&handler);
3240
3241        let mut aggregator = TimeBarAggregator::new(
3242            bar_type,
3243            instrument.price_precision(),
3244            instrument.size_precision(),
3245            clock.clone(),
3246            move |bar: Bar| {
3247                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3248                handler_guard.push(bar);
3249            },
3250            true, // build_with_no_updates
3251            true, // timestamp_on_close
3252            BarIntervalType::RightOpen,
3253            None,
3254            15,
3255            false, // skip_first_non_full_bar
3256        );
3257
3258        let ts1 = UnixNanos::from(1_000_000_000);
3259        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3260
3261        let ts2 = UnixNanos::from(2_000_000_000);
3262        clock.borrow_mut().set_time(ts2);
3263
3264        // Simulate timestamp on close
3265        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3266        aggregator.build_bar(event);
3267
3268        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3269        let bar = handler_guard.first().unwrap();
3270        assert_eq!(bar.ts_event, UnixNanos::default());
3271        assert_eq!(bar.ts_init, ts2);
3272    }
3273
3274    // ========================================================================
3275    // RenkoBarAggregator Tests
3276    // ========================================================================
3277
3278    #[rstest]
3279    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3280        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3281        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3282        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3283        let handler = Arc::new(Mutex::new(Vec::new()));
3284        let handler_clone = Arc::clone(&handler);
3285
3286        let aggregator = RenkoBarAggregator::new(
3287            bar_type,
3288            instrument.price_precision(),
3289            instrument.size_precision(),
3290            instrument.price_increment(),
3291            move |bar: Bar| {
3292                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3293                handler_guard.push(bar);
3294            },
3295        );
3296
3297        assert_eq!(aggregator.bar_type(), bar_type);
3298        assert!(!aggregator.is_running());
3299        // 10 pips * price_increment.raw (depends on precision mode)
3300        let expected_brick_size = 10 * instrument.price_increment().raw;
3301        assert_eq!(aggregator.brick_size, expected_brick_size);
3302    }
3303
3304    #[rstest]
3305    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3306        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3307        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3308        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3309        let handler = Arc::new(Mutex::new(Vec::new()));
3310        let handler_clone = Arc::clone(&handler);
3311
3312        let mut aggregator = RenkoBarAggregator::new(
3313            bar_type,
3314            instrument.price_precision(),
3315            instrument.size_precision(),
3316            instrument.price_increment(),
3317            move |bar: Bar| {
3318                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3319                handler_guard.push(bar);
3320            },
3321        );
3322
3323        // Small price movement (5 pips, less than 10 pip brick size)
3324        aggregator.update(
3325            Price::from("1.00000"),
3326            Quantity::from(1),
3327            UnixNanos::default(),
3328        );
3329        aggregator.update(
3330            Price::from("1.00005"),
3331            Quantity::from(1),
3332            UnixNanos::from(1000),
3333        );
3334
3335        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3336        assert_eq!(handler_guard.len(), 0); // No bar created yet
3337    }
3338
3339    #[rstest]
3340    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3341        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3342        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3343        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3344        let handler = Arc::new(Mutex::new(Vec::new()));
3345        let handler_clone = Arc::clone(&handler);
3346
3347        let mut aggregator = RenkoBarAggregator::new(
3348            bar_type,
3349            instrument.price_precision(),
3350            instrument.size_precision(),
3351            instrument.price_increment(),
3352            move |bar: Bar| {
3353                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3354                handler_guard.push(bar);
3355            },
3356        );
3357
3358        // Price movement exceeding brick size (15 pips)
3359        aggregator.update(
3360            Price::from("1.00000"),
3361            Quantity::from(1),
3362            UnixNanos::default(),
3363        );
3364        aggregator.update(
3365            Price::from("1.00015"),
3366            Quantity::from(1),
3367            UnixNanos::from(1000),
3368        );
3369
3370        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3371        assert_eq!(handler_guard.len(), 1);
3372
3373        let bar = handler_guard.first().unwrap();
3374        assert_eq!(bar.open, Price::from("1.00000"));
3375        assert_eq!(bar.high, Price::from("1.00010"));
3376        assert_eq!(bar.low, Price::from("1.00000"));
3377        assert_eq!(bar.close, Price::from("1.00010"));
3378        assert_eq!(bar.volume, Quantity::from(2));
3379        assert_eq!(bar.ts_event, UnixNanos::from(1000));
3380        assert_eq!(bar.ts_init, UnixNanos::from(1000));
3381    }
3382
3383    #[rstest]
3384    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3385        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3386        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3387        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3388        let handler = Arc::new(Mutex::new(Vec::new()));
3389        let handler_clone = Arc::clone(&handler);
3390
3391        let mut aggregator = RenkoBarAggregator::new(
3392            bar_type,
3393            instrument.price_precision(),
3394            instrument.size_precision(),
3395            instrument.price_increment(),
3396            move |bar: Bar| {
3397                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3398                handler_guard.push(bar);
3399            },
3400        );
3401
3402        // Large price movement creating multiple bricks (25 pips = 2 bricks)
3403        aggregator.update(
3404            Price::from("1.00000"),
3405            Quantity::from(1),
3406            UnixNanos::default(),
3407        );
3408        aggregator.update(
3409            Price::from("1.00025"),
3410            Quantity::from(1),
3411            UnixNanos::from(1000),
3412        );
3413
3414        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3415        assert_eq!(handler_guard.len(), 2);
3416
3417        let bar1 = &handler_guard[0];
3418        assert_eq!(bar1.open, Price::from("1.00000"));
3419        assert_eq!(bar1.high, Price::from("1.00010"));
3420        assert_eq!(bar1.low, Price::from("1.00000"));
3421        assert_eq!(bar1.close, Price::from("1.00010"));
3422
3423        let bar2 = &handler_guard[1];
3424        assert_eq!(bar2.open, Price::from("1.00010"));
3425        assert_eq!(bar2.high, Price::from("1.00020"));
3426        assert_eq!(bar2.low, Price::from("1.00010"));
3427        assert_eq!(bar2.close, Price::from("1.00020"));
3428    }
3429
3430    #[rstest]
3431    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3432        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3433        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3434        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3435        let handler = Arc::new(Mutex::new(Vec::new()));
3436        let handler_clone = Arc::clone(&handler);
3437
3438        let mut aggregator = RenkoBarAggregator::new(
3439            bar_type,
3440            instrument.price_precision(),
3441            instrument.size_precision(),
3442            instrument.price_increment(),
3443            move |bar: Bar| {
3444                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3445                handler_guard.push(bar);
3446            },
3447        );
3448
3449        // Start at higher price and move down
3450        aggregator.update(
3451            Price::from("1.00020"),
3452            Quantity::from(1),
3453            UnixNanos::default(),
3454        );
3455        aggregator.update(
3456            Price::from("1.00005"),
3457            Quantity::from(1),
3458            UnixNanos::from(1000),
3459        );
3460
3461        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3462        assert_eq!(handler_guard.len(), 1);
3463
3464        let bar = handler_guard.first().unwrap();
3465        assert_eq!(bar.open, Price::from("1.00020"));
3466        assert_eq!(bar.high, Price::from("1.00020"));
3467        assert_eq!(bar.low, Price::from("1.00010"));
3468        assert_eq!(bar.close, Price::from("1.00010"));
3469        assert_eq!(bar.volume, Quantity::from(2));
3470    }
3471
3472    #[rstest]
3473    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3474        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3475        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3476        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3477        let handler = Arc::new(Mutex::new(Vec::new()));
3478        let handler_clone = Arc::clone(&handler);
3479
3480        let mut aggregator = RenkoBarAggregator::new(
3481            bar_type,
3482            instrument.price_precision(),
3483            instrument.size_precision(),
3484            instrument.price_increment(),
3485            move |bar: Bar| {
3486                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3487                handler_guard.push(bar);
3488            },
3489        );
3490
3491        // Create a bar with small price movement (5 pips)
3492        let input_bar = Bar::new(
3493            BarType::new(
3494                instrument.id(),
3495                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3496                AggregationSource::Internal,
3497            ),
3498            Price::from("1.00000"),
3499            Price::from("1.00005"),
3500            Price::from("0.99995"),
3501            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
3502            Quantity::from(100),
3503            UnixNanos::default(),
3504            UnixNanos::from(1000),
3505        );
3506
3507        aggregator.handle_bar(input_bar);
3508
3509        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3510        assert_eq!(handler_guard.len(), 0); // No bar created yet
3511    }
3512
3513    #[rstest]
3514    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3515        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3516        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3517        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3518        let handler = Arc::new(Mutex::new(Vec::new()));
3519        let handler_clone = Arc::clone(&handler);
3520
3521        let mut aggregator = RenkoBarAggregator::new(
3522            bar_type,
3523            instrument.price_precision(),
3524            instrument.size_precision(),
3525            instrument.price_increment(),
3526            move |bar: Bar| {
3527                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3528                handler_guard.push(bar);
3529            },
3530        );
3531
3532        // First bar to establish baseline
3533        let bar1 = Bar::new(
3534            BarType::new(
3535                instrument.id(),
3536                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3537                AggregationSource::Internal,
3538            ),
3539            Price::from("1.00000"),
3540            Price::from("1.00005"),
3541            Price::from("0.99995"),
3542            Price::from("1.00000"),
3543            Quantity::from(100),
3544            UnixNanos::default(),
3545            UnixNanos::default(),
3546        );
3547
3548        // Second bar with price movement exceeding brick size (10 pips)
3549        let bar2 = Bar::new(
3550            BarType::new(
3551                instrument.id(),
3552                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3553                AggregationSource::Internal,
3554            ),
3555            Price::from("1.00000"),
3556            Price::from("1.00015"),
3557            Price::from("0.99995"),
3558            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
3559            Quantity::from(50),
3560            UnixNanos::from(60_000_000_000),
3561            UnixNanos::from(60_000_000_000),
3562        );
3563
3564        aggregator.handle_bar(bar1);
3565        aggregator.handle_bar(bar2);
3566
3567        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3568        assert_eq!(handler_guard.len(), 1);
3569
3570        let bar = handler_guard.first().unwrap();
3571        assert_eq!(bar.open, Price::from("1.00000"));
3572        assert_eq!(bar.high, Price::from("1.00010"));
3573        assert_eq!(bar.low, Price::from("1.00000"));
3574        assert_eq!(bar.close, Price::from("1.00010"));
3575        assert_eq!(bar.volume, Quantity::from(150));
3576    }
3577
3578    #[rstest]
3579    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3580        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3581        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3582        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3583        let handler = Arc::new(Mutex::new(Vec::new()));
3584        let handler_clone = Arc::clone(&handler);
3585
3586        let mut aggregator = RenkoBarAggregator::new(
3587            bar_type,
3588            instrument.price_precision(),
3589            instrument.size_precision(),
3590            instrument.price_increment(),
3591            move |bar: Bar| {
3592                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3593                handler_guard.push(bar);
3594            },
3595        );
3596
3597        // First bar to establish baseline
3598        let bar1 = Bar::new(
3599            BarType::new(
3600                instrument.id(),
3601                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3602                AggregationSource::Internal,
3603            ),
3604            Price::from("1.00000"),
3605            Price::from("1.00005"),
3606            Price::from("0.99995"),
3607            Price::from("1.00000"),
3608            Quantity::from(100),
3609            UnixNanos::default(),
3610            UnixNanos::default(),
3611        );
3612
3613        // Second bar with large price movement (30 pips = 3 bricks)
3614        let bar2 = Bar::new(
3615            BarType::new(
3616                instrument.id(),
3617                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3618                AggregationSource::Internal,
3619            ),
3620            Price::from("1.00000"),
3621            Price::from("1.00035"),
3622            Price::from("0.99995"),
3623            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
3624            Quantity::from(50),
3625            UnixNanos::from(60_000_000_000),
3626            UnixNanos::from(60_000_000_000),
3627        );
3628
3629        aggregator.handle_bar(bar1);
3630        aggregator.handle_bar(bar2);
3631
3632        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3633        assert_eq!(handler_guard.len(), 3);
3634
3635        let bar1 = &handler_guard[0];
3636        assert_eq!(bar1.open, Price::from("1.00000"));
3637        assert_eq!(bar1.close, Price::from("1.00010"));
3638
3639        let bar2 = &handler_guard[1];
3640        assert_eq!(bar2.open, Price::from("1.00010"));
3641        assert_eq!(bar2.close, Price::from("1.00020"));
3642
3643        let bar3 = &handler_guard[2];
3644        assert_eq!(bar3.open, Price::from("1.00020"));
3645        assert_eq!(bar3.close, Price::from("1.00030"));
3646    }
3647
3648    #[rstest]
3649    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3650        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3651        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3652        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3653        let handler = Arc::new(Mutex::new(Vec::new()));
3654        let handler_clone = Arc::clone(&handler);
3655
3656        let mut aggregator = RenkoBarAggregator::new(
3657            bar_type,
3658            instrument.price_precision(),
3659            instrument.size_precision(),
3660            instrument.price_increment(),
3661            move |bar: Bar| {
3662                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3663                handler_guard.push(bar);
3664            },
3665        );
3666
3667        // First bar to establish baseline
3668        let bar1 = Bar::new(
3669            BarType::new(
3670                instrument.id(),
3671                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3672                AggregationSource::Internal,
3673            ),
3674            Price::from("1.00020"),
3675            Price::from("1.00025"),
3676            Price::from("1.00015"),
3677            Price::from("1.00020"),
3678            Quantity::from(100),
3679            UnixNanos::default(),
3680            UnixNanos::default(),
3681        );
3682
3683        // Second bar with downward price movement (10 pips down)
3684        let bar2 = Bar::new(
3685            BarType::new(
3686                instrument.id(),
3687                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3688                AggregationSource::Internal,
3689            ),
3690            Price::from("1.00020"),
3691            Price::from("1.00025"),
3692            Price::from("1.00005"),
3693            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
3694            Quantity::from(50),
3695            UnixNanos::from(60_000_000_000),
3696            UnixNanos::from(60_000_000_000),
3697        );
3698
3699        aggregator.handle_bar(bar1);
3700        aggregator.handle_bar(bar2);
3701
3702        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3703        assert_eq!(handler_guard.len(), 1);
3704
3705        let bar = handler_guard.first().unwrap();
3706        assert_eq!(bar.open, Price::from("1.00020"));
3707        assert_eq!(bar.high, Price::from("1.00020"));
3708        assert_eq!(bar.low, Price::from("1.00010"));
3709        assert_eq!(bar.close, Price::from("1.00010"));
3710        assert_eq!(bar.volume, Quantity::from(150));
3711    }
3712
3713    #[rstest]
3714    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3715        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3716
3717        // Test different brick sizes
3718        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
3719        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3720        let handler = Arc::new(Mutex::new(Vec::new()));
3721        let handler_clone = Arc::clone(&handler);
3722
3723        let aggregator_5 = RenkoBarAggregator::new(
3724            bar_type_5,
3725            instrument.price_precision(),
3726            instrument.size_precision(),
3727            instrument.price_increment(),
3728            move |_bar: Bar| {
3729                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3730                handler_guard.push(_bar);
3731            },
3732        );
3733
3734        // 5 pips * price_increment.raw (depends on precision mode)
3735        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3736        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3737
3738        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
3739        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3740        let handler2 = Arc::new(Mutex::new(Vec::new()));
3741        let handler2_clone = Arc::clone(&handler2);
3742
3743        let aggregator_20 = RenkoBarAggregator::new(
3744            bar_type_20,
3745            instrument.price_precision(),
3746            instrument.size_precision(),
3747            instrument.price_increment(),
3748            move |_bar: Bar| {
3749                let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3750                handler_guard.push(_bar);
3751            },
3752        );
3753
3754        // 20 pips * price_increment.raw (depends on precision mode)
3755        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3756        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3757    }
3758
3759    #[rstest]
3760    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3761        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3762        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3763        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3764        let handler = Arc::new(Mutex::new(Vec::new()));
3765        let handler_clone = Arc::clone(&handler);
3766
3767        let mut aggregator = RenkoBarAggregator::new(
3768            bar_type,
3769            instrument.price_precision(),
3770            instrument.size_precision(),
3771            instrument.price_increment(),
3772            move |bar: Bar| {
3773                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3774                handler_guard.push(bar);
3775            },
3776        );
3777
3778        // Sequential updates creating multiple bars
3779        aggregator.update(
3780            Price::from("1.00000"),
3781            Quantity::from(1),
3782            UnixNanos::from(1000),
3783        );
3784        aggregator.update(
3785            Price::from("1.00010"),
3786            Quantity::from(1),
3787            UnixNanos::from(2000),
3788        ); // First brick
3789        aggregator.update(
3790            Price::from("1.00020"),
3791            Quantity::from(1),
3792            UnixNanos::from(3000),
3793        ); // Second brick
3794        aggregator.update(
3795            Price::from("1.00025"),
3796            Quantity::from(1),
3797            UnixNanos::from(4000),
3798        ); // Partial third brick
3799        aggregator.update(
3800            Price::from("1.00030"),
3801            Quantity::from(1),
3802            UnixNanos::from(5000),
3803        ); // Complete third brick
3804
3805        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3806        assert_eq!(handler_guard.len(), 3);
3807
3808        let bar1 = &handler_guard[0];
3809        assert_eq!(bar1.open, Price::from("1.00000"));
3810        assert_eq!(bar1.close, Price::from("1.00010"));
3811
3812        let bar2 = &handler_guard[1];
3813        assert_eq!(bar2.open, Price::from("1.00010"));
3814        assert_eq!(bar2.close, Price::from("1.00020"));
3815
3816        let bar3 = &handler_guard[2];
3817        assert_eq!(bar3.open, Price::from("1.00020"));
3818        assert_eq!(bar3.close, Price::from("1.00030"));
3819    }
3820
3821    #[rstest]
3822    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3823        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3824        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
3825        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3826        let handler = Arc::new(Mutex::new(Vec::new()));
3827        let handler_clone = Arc::clone(&handler);
3828
3829        let mut aggregator = RenkoBarAggregator::new(
3830            bar_type,
3831            instrument.price_precision(),
3832            instrument.size_precision(),
3833            instrument.price_increment(),
3834            move |bar: Bar| {
3835                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3836                handler_guard.push(bar);
3837            },
3838        );
3839
3840        // Mixed direction movement: up then down
3841        aggregator.update(
3842            Price::from("1.00000"),
3843            Quantity::from(1),
3844            UnixNanos::from(1000),
3845        );
3846        aggregator.update(
3847            Price::from("1.00010"),
3848            Quantity::from(1),
3849            UnixNanos::from(2000),
3850        ); // Up brick
3851        aggregator.update(
3852            Price::from("0.99990"),
3853            Quantity::from(1),
3854            UnixNanos::from(3000),
3855        ); // Down 2 bricks (20 pips)
3856
3857        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3858        assert_eq!(handler_guard.len(), 3);
3859
3860        let bar1 = &handler_guard[0]; // Up brick
3861        assert_eq!(bar1.open, Price::from("1.00000"));
3862        assert_eq!(bar1.high, Price::from("1.00010"));
3863        assert_eq!(bar1.low, Price::from("1.00000"));
3864        assert_eq!(bar1.close, Price::from("1.00010"));
3865
3866        let bar2 = &handler_guard[1]; // First down brick
3867        assert_eq!(bar2.open, Price::from("1.00010"));
3868        assert_eq!(bar2.high, Price::from("1.00010"));
3869        assert_eq!(bar2.low, Price::from("1.00000"));
3870        assert_eq!(bar2.close, Price::from("1.00000"));
3871
3872        let bar3 = &handler_guard[2]; // Second down brick
3873        assert_eq!(bar3.open, Price::from("1.00000"));
3874        assert_eq!(bar3.high, Price::from("1.00000"));
3875        assert_eq!(bar3.low, Price::from("0.99990"));
3876        assert_eq!(bar3.close, Price::from("0.99990"));
3877    }
3878
3879    #[rstest]
3880    fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3881        let instrument = InstrumentAny::Equity(equity_aapl);
3882        let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3883        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3884        let handler = Arc::new(Mutex::new(Vec::new()));
3885        let handler_clone = Arc::clone(&handler);
3886
3887        let mut aggregator = TickImbalanceBarAggregator::new(
3888            bar_type,
3889            instrument.price_precision(),
3890            instrument.size_precision(),
3891            move |bar: Bar| {
3892                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3893                handler_guard.push(bar);
3894            },
3895        );
3896
3897        let buy = TradeTick {
3898            aggressor_side: AggressorSide::Buyer,
3899            ..TradeTick::default()
3900        };
3901        let sell = TradeTick {
3902            aggressor_side: AggressorSide::Seller,
3903            ..TradeTick::default()
3904        };
3905
3906        aggregator.handle_trade(buy);
3907        aggregator.handle_trade(sell);
3908        aggregator.handle_trade(buy);
3909
3910        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3911        assert_eq!(handler_guard.len(), 0);
3912    }
3913
3914    #[rstest]
3915    fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3916        let instrument = InstrumentAny::Equity(equity_aapl);
3917        let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3918        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3919        let handler = Arc::new(Mutex::new(Vec::new()));
3920        let handler_clone = Arc::clone(&handler);
3921
3922        let mut aggregator = TickImbalanceBarAggregator::new(
3923            bar_type,
3924            instrument.price_precision(),
3925            instrument.size_precision(),
3926            move |bar: Bar| {
3927                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3928                handler_guard.push(bar);
3929            },
3930        );
3931
3932        let buy = TradeTick {
3933            aggressor_side: AggressorSide::Buyer,
3934            ..TradeTick::default()
3935        };
3936        let no_aggressor = TradeTick {
3937            aggressor_side: AggressorSide::NoAggressor,
3938            ..TradeTick::default()
3939        };
3940
3941        aggregator.handle_trade(buy);
3942        aggregator.handle_trade(no_aggressor);
3943        aggregator.handle_trade(buy);
3944
3945        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3946        assert_eq!(handler_guard.len(), 1);
3947    }
3948
3949    #[rstest]
3950    fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3951        let instrument = InstrumentAny::Equity(equity_aapl);
3952        let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3953        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3954        let handler = Arc::new(Mutex::new(Vec::new()));
3955        let handler_clone = Arc::clone(&handler);
3956
3957        let mut aggregator = TickRunsBarAggregator::new(
3958            bar_type,
3959            instrument.price_precision(),
3960            instrument.size_precision(),
3961            move |bar: Bar| {
3962                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3963                handler_guard.push(bar);
3964            },
3965        );
3966
3967        let buy = TradeTick {
3968            aggressor_side: AggressorSide::Buyer,
3969            ..TradeTick::default()
3970        };
3971        let sell = TradeTick {
3972            aggressor_side: AggressorSide::Seller,
3973            ..TradeTick::default()
3974        };
3975
3976        aggregator.handle_trade(buy);
3977        aggregator.handle_trade(buy);
3978        aggregator.handle_trade(sell);
3979        aggregator.handle_trade(sell);
3980
3981        let handler_guard = handler.lock().expect(MUTEX_POISONED);
3982        assert_eq!(handler_guard.len(), 2);
3983    }
3984
3985    #[rstest]
3986    fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
3987        let instrument = InstrumentAny::Equity(equity_aapl);
3988        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
3989        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3990        let handler = Arc::new(Mutex::new(Vec::new()));
3991        let handler_clone = Arc::clone(&handler);
3992
3993        let mut aggregator = VolumeImbalanceBarAggregator::new(
3994            bar_type,
3995            instrument.price_precision(),
3996            instrument.size_precision(),
3997            move |bar: Bar| {
3998                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3999                handler_guard.push(bar);
4000            },
4001        );
4002
4003        let large_trade = TradeTick {
4004            size: Quantity::from(25),
4005            aggressor_side: AggressorSide::Buyer,
4006            ..TradeTick::default()
4007        };
4008
4009        aggregator.handle_trade(large_trade);
4010
4011        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4012        assert_eq!(handler_guard.len(), 2);
4013    }
4014
4015    #[rstest]
4016    fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4017        equity_aapl: Equity,
4018    ) {
4019        let instrument = InstrumentAny::Equity(equity_aapl);
4020        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4021        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4022        let handler = Arc::new(Mutex::new(Vec::new()));
4023        let handler_clone = Arc::clone(&handler);
4024
4025        let mut aggregator = VolumeImbalanceBarAggregator::new(
4026            bar_type,
4027            instrument.price_precision(),
4028            instrument.size_precision(),
4029            move |bar: Bar| {
4030                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4031                handler_guard.push(bar);
4032            },
4033        );
4034
4035        let buy = TradeTick {
4036            size: Quantity::from(5),
4037            aggressor_side: AggressorSide::Buyer,
4038            ..TradeTick::default()
4039        };
4040        let no_aggressor = TradeTick {
4041            size: Quantity::from(3),
4042            aggressor_side: AggressorSide::NoAggressor,
4043            ..TradeTick::default()
4044        };
4045
4046        aggregator.handle_trade(buy);
4047        aggregator.handle_trade(no_aggressor);
4048        aggregator.handle_trade(buy);
4049
4050        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4051        assert_eq!(handler_guard.len(), 1);
4052    }
4053
4054    #[rstest]
4055    fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4056        let instrument = InstrumentAny::Equity(equity_aapl);
4057        let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4058        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4059        let handler = Arc::new(Mutex::new(Vec::new()));
4060        let handler_clone = Arc::clone(&handler);
4061
4062        let mut aggregator = VolumeRunsBarAggregator::new(
4063            bar_type,
4064            instrument.price_precision(),
4065            instrument.size_precision(),
4066            move |bar: Bar| {
4067                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4068                handler_guard.push(bar);
4069            },
4070        );
4071
4072        let large_trade = TradeTick {
4073            size: Quantity::from(25),
4074            aggressor_side: AggressorSide::Buyer,
4075            ..TradeTick::default()
4076        };
4077
4078        aggregator.handle_trade(large_trade);
4079
4080        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4081        assert_eq!(handler_guard.len(), 2);
4082    }
4083
4084    #[rstest]
4085    fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4086        let instrument = InstrumentAny::Equity(equity_aapl);
4087        let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4088        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4089        let handler = Arc::new(Mutex::new(Vec::new()));
4090        let handler_clone = Arc::clone(&handler);
4091
4092        let mut aggregator = ValueRunsBarAggregator::new(
4093            bar_type,
4094            instrument.price_precision(),
4095            instrument.size_precision(),
4096            move |bar: Bar| {
4097                let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4098                handler_guard.push(bar);
4099            },
4100        );
4101
4102        let large_trade = TradeTick {
4103            price: Price::from("5.00"),
4104            size: Quantity::from(25),
4105            aggressor_side: AggressorSide::Buyer,
4106            ..TradeTick::default()
4107        };
4108
4109        aggregator.handle_trade(large_trade);
4110
4111        let handler_guard = handler.lock().expect(MUTEX_POISONED);
4112        assert_eq!(handler_guard.len(), 2);
4113    }
4114}