nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25    clock::Clock,
26    timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29    SharedCell, UnixNanos, WeakCell,
30    correctness::{self, FAILED},
31    datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34    data::{
35        QuoteTick, TradeTick,
36        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37    },
38    enums::{AggregationSource, BarAggregation, BarIntervalType},
39    types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
40};
41
42/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
43///
44/// Implementors receive updates and produce completed bars via handlers, with support for batch updates.
45pub trait BarAggregator: Any + Debug {
46    /// The [`BarType`] to be aggregated.
47    fn bar_type(&self) -> BarType;
48    /// If the aggregator is running and will receive data from the message bus.
49    fn is_running(&self) -> bool;
50    /// Sets the running state of the aggregator (receiving updates when `true`).
51    fn set_is_running(&mut self, value: bool);
52    /// Updates the aggregator  with the given price and size.
53    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
54    /// Updates the aggregator with the given quote.
55    fn handle_quote(&mut self, quote: QuoteTick) {
56        let spec = self.bar_type().spec();
57        self.update(
58            quote.extract_price(spec.price_type),
59            quote.extract_size(spec.price_type),
60            quote.ts_init,
61        );
62    }
63    /// Updates the aggregator with the given trade.
64    fn handle_trade(&mut self, trade: TradeTick) {
65        self.update(trade.price, trade.size, trade.ts_init);
66    }
67    /// Updates the aggregator with the given bar.
68    fn handle_bar(&mut self, bar: Bar) {
69        self.update_bar(bar, bar.volume, bar.ts_init);
70    }
71    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
72    /// Incorporates an existing bar and its volume into aggregation at the given init timestamp.
73    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
74    /// Starts batch mode, sending bars to the supplied handler for the given time context.
75    fn stop_batch_update(&mut self);
76    /// Stops batch mode and restores the standard bar handler.
77    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
78    fn stop(&mut self) {}
79}
80
81impl dyn BarAggregator {
82    /// Returns a reference to this aggregator as `Any` for downcasting.
83    pub fn as_any(&self) -> &dyn Any {
84        self
85    }
86    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
87    pub fn as_any_mut(&mut self) -> &mut dyn Any {
88        self
89    }
90}
91
92/// Provides a generic bar builder for aggregation.
93#[derive(Debug)]
94pub struct BarBuilder {
95    bar_type: BarType,
96    price_precision: u8,
97    size_precision: u8,
98    initialized: bool,
99    ts_last: UnixNanos,
100    count: usize,
101    last_close: Option<Price>,
102    open: Option<Price>,
103    high: Option<Price>,
104    low: Option<Price>,
105    close: Option<Price>,
106    volume: Quantity,
107}
108
109impl BarBuilder {
110    /// Creates a new [`BarBuilder`] instance.
111    ///
112    /// # Panics
113    ///
114    /// This function panics if:
115    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
116    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
117    #[must_use]
118    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
119        correctness::check_equal(
120            &bar_type.aggregation_source(),
121            &AggregationSource::Internal,
122            "bar_type.aggregation_source",
123            "AggregationSource::Internal",
124        )
125        .expect(FAILED);
126
127        Self {
128            bar_type,
129            price_precision,
130            size_precision,
131            initialized: false,
132            ts_last: UnixNanos::default(),
133            count: 0,
134            last_close: None,
135            open: None,
136            high: None,
137            low: None,
138            close: None,
139            volume: Quantity::zero(size_precision),
140        }
141    }
142
143    /// Updates the builder state with the given price, size, and init timestamp.
144    ///
145    /// # Panics
146    ///
147    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
148    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
149        if ts_init < self.ts_last {
150            return; // Not applicable
151        }
152
153        if self.open.is_none() {
154            self.open = Some(price);
155            self.high = Some(price);
156            self.low = Some(price);
157            self.initialized = true;
158        } else {
159            if price > self.high.unwrap() {
160                self.high = Some(price);
161            }
162            if price < self.low.unwrap() {
163                self.low = Some(price);
164            }
165        }
166
167        self.close = Some(price);
168        self.volume = self.volume.add(size);
169        self.count += 1;
170        self.ts_last = ts_init;
171    }
172
173    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
174    ///
175    /// # Panics
176    ///
177    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
178    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
179        if ts_init < self.ts_last {
180            return; // Not applicable
181        }
182
183        if self.open.is_none() {
184            self.open = Some(bar.open);
185            self.high = Some(bar.high);
186            self.low = Some(bar.low);
187            self.initialized = true;
188        } else {
189            if bar.high > self.high.unwrap() {
190                self.high = Some(bar.high);
191            }
192            if bar.low < self.low.unwrap() {
193                self.low = Some(bar.low);
194            }
195        }
196
197        self.close = Some(bar.close);
198        self.volume = self.volume.add(volume);
199        self.count += 1;
200        self.ts_last = ts_init;
201    }
202
203    /// Reset the bar builder.
204    ///
205    /// All stateful fields are reset to their initial value.
206    pub fn reset(&mut self) {
207        self.open = None;
208        self.high = None;
209        self.low = None;
210        self.volume = Quantity::zero(self.size_precision);
211        self.count = 0;
212    }
213
214    /// Return the aggregated bar and reset.
215    pub fn build_now(&mut self) -> Bar {
216        self.build(self.ts_last, self.ts_last)
217    }
218
219    /// Returns the aggregated bar for the given timestamps, then resets the builder.
220    ///
221    /// # Panics
222    ///
223    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
224    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
225        if self.open.is_none() {
226            self.open = self.last_close;
227            self.high = self.last_close;
228            self.low = self.last_close;
229            self.close = self.last_close;
230        }
231
232        if let (Some(close), Some(low)) = (self.close, self.low)
233            && close < low
234        {
235            self.low = Some(close);
236        }
237
238        if let (Some(close), Some(high)) = (self.close, self.high)
239            && close > high
240        {
241            self.high = Some(close);
242        }
243
244        // SAFETY: The open was checked, so we can assume all prices are Some
245        let bar = Bar::new(
246            self.bar_type,
247            self.open.unwrap(),
248            self.high.unwrap(),
249            self.low.unwrap(),
250            self.close.unwrap(),
251            self.volume,
252            ts_event,
253            ts_init,
254        );
255
256        self.last_close = self.close;
257        self.reset();
258        bar
259    }
260}
261
262/// Provides a means of aggregating specified bar types and sending to a registered handler.
263pub struct BarAggregatorCore<H>
264where
265    H: FnMut(Bar),
266{
267    bar_type: BarType,
268    builder: BarBuilder,
269    handler: H,
270    handler_backup: Option<H>,
271    batch_handler: Option<Box<dyn FnMut(Bar)>>,
272    is_running: bool,
273    batch_mode: bool,
274}
275
276impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        f.debug_struct(stringify!(BarAggregatorCore))
279            .field("bar_type", &self.bar_type)
280            .field("builder", &self.builder)
281            .field("is_running", &self.is_running)
282            .field("batch_mode", &self.batch_mode)
283            .finish()
284    }
285}
286
287impl<H> BarAggregatorCore<H>
288where
289    H: FnMut(Bar),
290{
291    /// Creates a new [`BarAggregatorCore`] instance.
292    ///
293    /// # Panics
294    ///
295    /// This function panics if:
296    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
297    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
298    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
299        Self {
300            bar_type,
301            builder: BarBuilder::new(bar_type, price_precision, size_precision),
302            handler,
303            handler_backup: None,
304            batch_handler: None,
305            is_running: false,
306            batch_mode: false,
307        }
308    }
309
310    /// Sets the running state of the aggregator (receives updates when `true`).
311    pub const fn set_is_running(&mut self, value: bool) {
312        self.is_running = value;
313    }
314
315    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
316        self.builder.update(price, size, ts_init);
317    }
318
319    fn build_now_and_send(&mut self) {
320        let bar = self.builder.build_now();
321        (self.handler)(bar);
322    }
323
324    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
325        let bar = self.builder.build(ts_event, ts_init);
326
327        if self.batch_mode {
328            if let Some(handler) = &mut self.batch_handler {
329                handler(bar);
330            }
331        } else {
332            (self.handler)(bar);
333        }
334    }
335
336    /// Enables batch update mode, sending bars to the provided handler instead of immediate dispatch.
337    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
338        self.batch_mode = true;
339        self.batch_handler = Some(handler);
340    }
341
342    /// Disables batch update mode and restores the original bar handler.
343    pub fn stop_batch_update(&mut self) {
344        self.batch_mode = false;
345
346        if let Some(handler) = self.handler_backup.take() {
347            self.handler = handler;
348        }
349    }
350}
351
352/// Provides a means of building tick bars aggregated from quote and trades.
353///
354/// When received tick count reaches the step threshold of the bar
355/// specification, then a bar is created and sent to the handler.
356pub struct TickBarAggregator<H>
357where
358    H: FnMut(Bar),
359{
360    core: BarAggregatorCore<H>,
361    cum_value: f64,
362}
363
364impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
365    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366        f.debug_struct(stringify!(TickBarAggregator))
367            .field("core", &self.core)
368            .field("cum_value", &self.cum_value)
369            .finish()
370    }
371}
372
373impl<H> TickBarAggregator<H>
374where
375    H: FnMut(Bar),
376{
377    /// Creates a new [`TickBarAggregator`] instance.
378    ///
379    /// # Panics
380    ///
381    /// This function panics if:
382    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
383    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
384    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
385        Self {
386            core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
387            cum_value: 0.0,
388        }
389    }
390}
391
392impl<H> BarAggregator for TickBarAggregator<H>
393where
394    H: FnMut(Bar) + 'static,
395{
396    fn bar_type(&self) -> BarType {
397        self.core.bar_type
398    }
399
400    fn is_running(&self) -> bool {
401        self.core.is_running
402    }
403
404    fn set_is_running(&mut self, value: bool) {
405        self.core.set_is_running(value);
406    }
407
408    /// Apply the given update to the aggregator.
409    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
410        self.core.apply_update(price, size, ts_init);
411        let spec = self.core.bar_type.spec();
412
413        if self.core.builder.count >= spec.step.get() {
414            self.core.build_now_and_send();
415        }
416    }
417
418    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
419        let mut volume_update = volume;
420        let average_price = Price::new(
421            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
422            self.core.builder.price_precision,
423        );
424
425        while volume_update.as_f64() > 0.0 {
426            let value_update = average_price.as_f64() * volume_update.as_f64();
427            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
428                self.cum_value += value_update;
429                self.core.builder.update_bar(bar, volume_update, ts_init);
430                break;
431            }
432
433            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
434            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
435            self.core.builder.update_bar(
436                bar,
437                Quantity::new(volume_diff, volume_update.precision),
438                ts_init,
439            );
440
441            self.core.build_now_and_send();
442            self.cum_value = 0.0;
443            volume_update = Quantity::new(
444                volume_update.as_f64() - volume_diff,
445                volume_update.precision,
446            );
447        }
448    }
449
450    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
451        self.core.start_batch_update(handler);
452    }
453
454    fn stop_batch_update(&mut self) {
455        self.core.stop_batch_update();
456    }
457}
458
459/// Provides a means of building volume bars aggregated from quote and trades.
460pub struct VolumeBarAggregator<H>
461where
462    H: FnMut(Bar),
463{
464    core: BarAggregatorCore<H>,
465}
466
467impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
468    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469        f.debug_struct(stringify!(VolumeBarAggregator))
470            .field("core", &self.core)
471            .finish()
472    }
473}
474
475impl<H> VolumeBarAggregator<H>
476where
477    H: FnMut(Bar),
478{
479    /// Creates a new [`VolumeBarAggregator`] instance.
480    ///
481    /// # Panics
482    ///
483    /// This function panics if:
484    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
485    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
486    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
487        Self {
488            core: BarAggregatorCore::new(
489                bar_type.standard(),
490                price_precision,
491                size_precision,
492                handler,
493            ),
494        }
495    }
496}
497
498impl<H> BarAggregator for VolumeBarAggregator<H>
499where
500    H: FnMut(Bar) + 'static,
501{
502    fn bar_type(&self) -> BarType {
503        self.core.bar_type
504    }
505
506    fn is_running(&self) -> bool {
507        self.core.is_running
508    }
509
510    fn set_is_running(&mut self, value: bool) {
511        self.core.set_is_running(value);
512    }
513
514    /// Apply the given update to the aggregator.
515    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
516        let mut raw_size_update = size.raw;
517        let spec = self.core.bar_type.spec();
518        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
519
520        while raw_size_update > 0 {
521            if self.core.builder.volume.raw + raw_size_update < raw_step {
522                self.core.apply_update(
523                    price,
524                    Quantity::from_raw(raw_size_update, size.precision),
525                    ts_init,
526                );
527                break;
528            }
529
530            let raw_size_diff = raw_step - self.core.builder.volume.raw;
531            self.core.apply_update(
532                price,
533                Quantity::from_raw(raw_size_diff, size.precision),
534                ts_init,
535            );
536
537            self.core.build_now_and_send();
538            raw_size_update -= raw_size_diff;
539        }
540    }
541
542    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
543        let mut raw_volume_update = volume.raw;
544        let spec = self.core.bar_type.spec();
545        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
546
547        while raw_volume_update > 0 {
548            if self.core.builder.volume.raw + raw_volume_update < raw_step {
549                self.core.builder.update_bar(
550                    bar,
551                    Quantity::from_raw(raw_volume_update, volume.precision),
552                    ts_init,
553                );
554                break;
555            }
556
557            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
558            self.core.builder.update_bar(
559                bar,
560                Quantity::from_raw(raw_volume_diff, volume.precision),
561                ts_init,
562            );
563
564            self.core.build_now_and_send();
565            raw_volume_update -= raw_volume_diff;
566        }
567    }
568
569    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
570        self.core.start_batch_update(handler);
571    }
572
573    fn stop_batch_update(&mut self) {
574        self.core.stop_batch_update();
575    }
576}
577
578/// Provides a means of building value bars aggregated from quote and trades.
579///
580/// When received value reaches the step threshold of the bar
581/// specification, then a bar is created and sent to the handler.
582pub struct ValueBarAggregator<H>
583where
584    H: FnMut(Bar),
585{
586    core: BarAggregatorCore<H>,
587    cum_value: f64,
588}
589
590impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
591    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592        f.debug_struct(stringify!(ValueBarAggregator))
593            .field("core", &self.core)
594            .field("cum_value", &self.cum_value)
595            .finish()
596    }
597}
598
599impl<H> ValueBarAggregator<H>
600where
601    H: FnMut(Bar),
602{
603    /// Creates a new [`ValueBarAggregator`] instance.
604    ///
605    /// # Panics
606    ///
607    /// This function panics if:
608    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
609    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
610    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
611        Self {
612            core: BarAggregatorCore::new(
613                bar_type.standard(),
614                price_precision,
615                size_precision,
616                handler,
617            ),
618            cum_value: 0.0,
619        }
620    }
621
622    #[must_use]
623    /// Returns the cumulative value for the aggregator.
624    pub const fn get_cumulative_value(&self) -> f64 {
625        self.cum_value
626    }
627}
628
629impl<H> BarAggregator for ValueBarAggregator<H>
630where
631    H: FnMut(Bar) + 'static,
632{
633    fn bar_type(&self) -> BarType {
634        self.core.bar_type
635    }
636
637    fn is_running(&self) -> bool {
638        self.core.is_running
639    }
640
641    fn set_is_running(&mut self, value: bool) {
642        self.core.set_is_running(value);
643    }
644
645    /// Apply the given update to the aggregator.
646    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
647        let mut size_update = size.as_f64();
648        let spec = self.core.bar_type.spec();
649
650        while size_update > 0.0 {
651            let value_update = price.as_f64() * size_update;
652            if self.cum_value + value_update < spec.step.get() as f64 {
653                self.cum_value += value_update;
654                self.core
655                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
656                break;
657            }
658
659            let value_diff = spec.step.get() as f64 - self.cum_value;
660            let size_diff = size_update * (value_diff / value_update);
661            self.core
662                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
663
664            self.core.build_now_and_send();
665            self.cum_value = 0.0;
666            size_update -= size_diff;
667        }
668    }
669
670    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671        let mut volume_update = volume;
672        let average_price = Price::new(
673            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
674            self.core.builder.price_precision,
675        );
676
677        while volume_update.as_f64() > 0.0 {
678            let value_update = average_price.as_f64() * volume_update.as_f64();
679            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
680                self.cum_value += value_update;
681                self.core.builder.update_bar(bar, volume_update, ts_init);
682                break;
683            }
684
685            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
686            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
687            self.core.builder.update_bar(
688                bar,
689                Quantity::new(volume_diff, volume_update.precision),
690                ts_init,
691            );
692
693            self.core.build_now_and_send();
694            self.cum_value = 0.0;
695            volume_update = Quantity::new(
696                volume_update.as_f64() - volume_diff,
697                volume_update.precision,
698            );
699        }
700    }
701
702    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
703        self.core.start_batch_update(handler);
704    }
705
706    fn stop_batch_update(&mut self) {
707        self.core.stop_batch_update();
708    }
709}
710
711/// Provides a means of building Renko bars aggregated from quote and trades.
712///
713/// Renko bars are created when the price moves by a fixed amount (brick size)
714/// regardless of time or volume. Each bar represents a price movement equal
715/// to the step size in the bar specification.
716pub struct RenkoBarAggregator<H>
717where
718    H: FnMut(Bar),
719{
720    core: BarAggregatorCore<H>,
721    pub brick_size: PriceRaw,
722    last_close: Option<Price>,
723}
724
725impl<H: FnMut(Bar)> Debug for RenkoBarAggregator<H> {
726    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727        f.debug_struct(stringify!(RenkoBarAggregator))
728            .field("core", &self.core)
729            .field("brick_size", &self.brick_size)
730            .field("last_close", &self.last_close)
731            .finish()
732    }
733}
734
735impl<H> RenkoBarAggregator<H>
736where
737    H: FnMut(Bar),
738{
739    /// Creates a new [`RenkoBarAggregator`] instance.
740    ///
741    /// # Panics
742    ///
743    /// This function panics if:
744    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
745    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
746    pub fn new(
747        bar_type: BarType,
748        price_precision: u8,
749        size_precision: u8,
750        price_increment: Price,
751        handler: H,
752    ) -> Self {
753        // Calculate brick size in raw price units (step * price_increment.raw)
754        let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
755
756        Self {
757            core: BarAggregatorCore::new(
758                bar_type.standard(),
759                price_precision,
760                size_precision,
761                handler,
762            ),
763            brick_size,
764            last_close: None,
765        }
766    }
767}
768
769impl<H> BarAggregator for RenkoBarAggregator<H>
770where
771    H: FnMut(Bar) + 'static,
772{
773    fn bar_type(&self) -> BarType {
774        self.core.bar_type
775    }
776
777    fn is_running(&self) -> bool {
778        self.core.is_running
779    }
780
781    fn set_is_running(&mut self, value: bool) {
782        self.core.set_is_running(value);
783    }
784
785    /// Apply the given update to the aggregator.
786    ///
787    /// For Renko bars, we check if the price movement from the last close
788    /// is greater than or equal to the brick size. If so, we create new bars.
789    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
790        // Always update the builder with the current tick
791        self.core.apply_update(price, size, ts_init);
792
793        // Initialize last_close if this is the first update
794        if self.last_close.is_none() {
795            self.last_close = Some(price);
796            return;
797        }
798
799        let last_close = self.last_close.unwrap();
800
801        // Convert prices to raw units (integers) to avoid floating point precision issues
802        let current_raw = price.raw;
803        let last_close_raw = last_close.raw;
804        let price_diff_raw = current_raw - last_close_raw;
805        let abs_price_diff_raw = price_diff_raw.abs();
806
807        // Check if we need to create one or more Renko bars
808        if abs_price_diff_raw >= self.brick_size {
809            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
810            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
811            let mut current_close = last_close;
812
813            // Store the current builder volume to distribute across bricks
814            let total_volume = self.core.builder.volume;
815
816            for _i in 0..num_bricks {
817                // Calculate the close price for this brick using raw price units
818                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
819                let brick_close = Price::from_raw(brick_close_raw, price.precision);
820
821                // For Renko bars: open = previous close, high/low depend on direction
822                let (brick_high, brick_low) = if direction > 0.0 {
823                    (brick_close, current_close)
824                } else {
825                    (current_close, brick_close)
826                };
827
828                // Reset builder for this brick
829                self.core.builder.reset();
830                self.core.builder.open = Some(current_close);
831                self.core.builder.high = Some(brick_high);
832                self.core.builder.low = Some(brick_low);
833                self.core.builder.close = Some(brick_close);
834                self.core.builder.volume = total_volume; // Each brick gets the full volume
835                self.core.builder.count = 1;
836                self.core.builder.ts_last = ts_init;
837                self.core.builder.initialized = true;
838
839                // Build and send the bar
840                self.core.build_and_send(ts_init, ts_init);
841
842                // Update for the next brick
843                current_close = brick_close;
844                self.last_close = Some(brick_close);
845            }
846        }
847    }
848
849    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
850        // Always update the builder with the current bar
851        self.core.builder.update_bar(bar, volume, ts_init);
852
853        // Initialize last_close if this is the first update
854        if self.last_close.is_none() {
855            self.last_close = Some(bar.close);
856            return;
857        }
858
859        let last_close = self.last_close.unwrap();
860
861        // Convert prices to raw units (integers) to avoid floating point precision issues
862        let current_raw = bar.close.raw;
863        let last_close_raw = last_close.raw;
864        let price_diff_raw = current_raw - last_close_raw;
865        let abs_price_diff_raw = price_diff_raw.abs();
866
867        // Check if we need to create one or more Renko bars
868        if abs_price_diff_raw >= self.brick_size {
869            let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
870            let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
871            let mut current_close = last_close;
872
873            // Store the current builder volume to distribute across bricks
874            let total_volume = self.core.builder.volume;
875
876            for _i in 0..num_bricks {
877                // Calculate the close price for this brick using raw price units
878                let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
879                let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
880
881                // For Renko bars: open = previous close, high/low depend on direction
882                let (brick_high, brick_low) = if direction > 0.0 {
883                    (brick_close, current_close)
884                } else {
885                    (current_close, brick_close)
886                };
887
888                // Reset builder for this brick
889                self.core.builder.reset();
890                self.core.builder.open = Some(current_close);
891                self.core.builder.high = Some(brick_high);
892                self.core.builder.low = Some(brick_low);
893                self.core.builder.close = Some(brick_close);
894                self.core.builder.volume = total_volume; // Each brick gets the full volume
895                self.core.builder.count = 1;
896                self.core.builder.ts_last = ts_init;
897                self.core.builder.initialized = true;
898
899                // Build and send the bar
900                self.core.build_and_send(ts_init, ts_init);
901
902                // Update for the next brick
903                current_close = brick_close;
904                self.last_close = Some(brick_close);
905            }
906        }
907    }
908
    /// Starts a batch update by delegating to the core's `start_batch_update` with `handler`.
    ///
    /// The timestamp argument is not used by this aggregator.
    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
        self.core.start_batch_update(handler);
    }
912
    /// Ends a batch update by delegating to the core's `stop_batch_update`.
    fn stop_batch_update(&mut self) {
        self.core.stop_batch_update();
    }
916}
917
/// Provides a means of building time bars aggregated from quotes and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state: bar type, builder, running and batch-mode flags.
    core: BarAggregatorCore<H>,
    /// Clock used to read the current time and to register/cancel the build timer.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is ignored if the builder's update count is zero.
    build_with_no_updates: bool,
    /// For left-open intervals, stamp `ts_event` with the close time instead of the open time.
    timestamp_on_close: bool,
    /// `true` when constructed with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Set when a timer event arrives before the builder is initialized; the next
    /// update then decides whether to build the deferred bar.
    build_on_next_tick: bool,
    /// Open timestamp used for the next timer-built bar; set to the timer event time
    /// after each build.
    stored_open_ns: UnixNanos,
    /// Close timestamp remembered together with `build_on_next_tick`.
    stored_close_ns: UnixNanos,
    /// Name of the clock timer (the string form of the bar type).
    timer_name: String,
    /// Bar interval in nanoseconds, as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Next expected close time, refreshed after each timer-built bar.
    next_close_ns: UnixNanos,
    /// Delay, in microseconds, added to the timer start time.
    bar_build_delay: u64,
    /// Open time of the bar currently tracked by the batch bookkeeping.
    batch_open_ns: UnixNanos,
    /// Next close time for the batch bookkeeping; the default value means it is inactive.
    batch_next_close_ns: UnixNanos,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When `true`, the first bar that would be built is discarded instead of sent.
    skip_first_non_full_bar: bool,
}
942
943impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
944    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
945        f.debug_struct(stringify!(TimeBarAggregator))
946            .field("core", &self.core)
947            .field("build_with_no_updates", &self.build_with_no_updates)
948            .field("timestamp_on_close", &self.timestamp_on_close)
949            .field("is_left_open", &self.is_left_open)
950            .field("timer_name", &self.timer_name)
951            .field("interval_ns", &self.interval_ns)
952            .field("bar_build_delay", &self.bar_build_delay)
953            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
954            .finish()
955    }
956}
957
#[derive(Clone, Debug)]
/// Callback wrapper for time-based bar aggregation events.
///
/// This struct provides a bridge between timer events and time bar aggregation,
/// allowing bars to be built and emitted when the timer fires.
/// It holds only a weak reference to a `TimeBarAggregator`, so the callback does not
/// keep the aggregator alive; once the aggregator has been dropped, timer events
/// delivered through this callback are ignored.
pub struct NewBarCallback<H: FnMut(Bar)> {
    /// Weak handle to the aggregator whose `build_bar` is invoked on timer events.
    aggregator: WeakCell<TimeBarAggregator<H>>,
}
968
969impl<H: FnMut(Bar)> NewBarCallback<H> {
970    /// Creates a new callback that invokes the time bar aggregator on timer events.
971    #[must_use]
972    pub fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
973        let shared: SharedCell<TimeBarAggregator<H>> = SharedCell::from(aggregator);
974        Self {
975            aggregator: shared.downgrade(),
976        }
977    }
978}
979
980impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
981    fn from(value: NewBarCallback<H>) -> Self {
982        Self::Rust(Rc::new(move |event: TimeEvent| {
983            if let Some(agg) = value.aggregator.upgrade() {
984                agg.borrow_mut().build_bar(event);
985            }
986        }))
987    }
988}
989
990impl<H> TimeBarAggregator<H>
991where
992    H: FnMut(Bar) + 'static,
993{
994    /// Creates a new [`TimeBarAggregator`] instance.
995    ///
996    /// # Panics
997    ///
998    /// This function panics if:
999    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
1000    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
1001    #[allow(clippy::too_many_arguments)]
1002    pub fn new(
1003        bar_type: BarType,
1004        price_precision: u8,
1005        size_precision: u8,
1006        clock: Rc<RefCell<dyn Clock>>,
1007        handler: H,
1008        build_with_no_updates: bool,
1009        timestamp_on_close: bool,
1010        interval_type: BarIntervalType,
1011        time_bars_origin_offset: Option<TimeDelta>,
1012        bar_build_delay: u64,
1013        skip_first_non_full_bar: bool,
1014    ) -> Self {
1015        let is_left_open = match interval_type {
1016            BarIntervalType::LeftOpen => true,
1017            BarIntervalType::RightOpen => false,
1018        };
1019
1020        let core = BarAggregatorCore::new(
1021            bar_type.standard(),
1022            price_precision,
1023            size_precision,
1024            handler,
1025        );
1026
1027        Self {
1028            core,
1029            clock,
1030            build_with_no_updates,
1031            timestamp_on_close,
1032            is_left_open,
1033            build_on_next_tick: false,
1034            stored_open_ns: UnixNanos::default(),
1035            stored_close_ns: UnixNanos::default(),
1036            timer_name: bar_type.to_string(),
1037            interval_ns: get_bar_interval_ns(&bar_type),
1038            next_close_ns: UnixNanos::default(),
1039            bar_build_delay,
1040            batch_open_ns: UnixNanos::default(),
1041            batch_next_close_ns: UnixNanos::default(),
1042            time_bars_origin_offset,
1043            skip_first_non_full_bar,
1044        }
1045    }
1046
1047    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if setting up the underlying clock timer fails.
1052    ///
1053    /// # Panics
1054    ///
1055    /// Panics if the underlying clock timer registration fails.
1056    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
1057        let now = self.clock.borrow().utc_now();
1058        let mut start_time =
1059            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1060
1061        if start_time == now {
1062            self.skip_first_non_full_bar = false;
1063        }
1064
1065        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1066
1067        let spec = &self.bar_type().spec();
1068        let start_time_ns = UnixNanos::from(start_time);
1069
1070        if spec.aggregation == BarAggregation::Month {
1071            let step = spec.step.get() as u32;
1072            let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
1073
1074            self.clock
1075                .borrow_mut()
1076                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1077                .expect(FAILED);
1078        } else {
1079            self.clock
1080                .borrow_mut()
1081                .set_timer_ns(
1082                    &self.timer_name,
1083                    self.interval_ns.as_u64(),
1084                    Some(start_time_ns),
1085                    None,
1086                    Some(callback.into()),
1087                    None,
1088                    None,
1089                )
1090                .expect(FAILED);
1091        }
1092
1093        log::debug!("Started timer {}", self.timer_name);
1094        Ok(())
1095    }
1096
    /// Stops the time bar aggregator by cancelling its timer (`timer_name`) on the clock.
    ///
    /// # Panics
    ///
    /// Panics if the clock is already borrowed when this is called.
    pub fn stop(&mut self) {
        self.clock.borrow_mut().cancel_timer(&self.timer_name);
    }
1101
1102    /// Starts batch time for bar aggregation.
1103    ///
1104    /// # Panics
1105    ///
1106    /// Panics if month arithmetic operations fail for monthly aggregation intervals.
1107    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1108        let spec = self.bar_type().spec();
1109        self.core.batch_mode = true;
1110
1111        let time = time_ns.to_datetime_utc();
1112        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1113        self.batch_open_ns = UnixNanos::from(start_time);
1114
1115        if spec.aggregation == BarAggregation::Month {
1116            let step = spec.step.get() as u32;
1117
1118            if self.batch_open_ns == time_ns {
1119                self.batch_open_ns =
1120                    subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1121            }
1122
1123            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1124        } else {
1125            if self.batch_open_ns == time_ns {
1126                self.batch_open_ns -= self.interval_ns;
1127            }
1128
1129            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1130        }
1131    }
1132
1133    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1134        if self.is_left_open {
1135            if self.timestamp_on_close {
1136                close_ns
1137            } else {
1138                open_ns
1139            }
1140        } else {
1141            open_ns
1142        }
1143    }
1144
1145    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1146        if self.skip_first_non_full_bar {
1147            self.core.builder.reset();
1148            self.skip_first_non_full_bar = false;
1149        } else {
1150            self.core.build_and_send(ts_event, ts_init);
1151        }
1152    }
1153
1154    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1155        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1156            let ts_init = self.batch_next_close_ns;
1157            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1158            self.build_and_send(ts_event, ts_init);
1159        }
1160    }
1161
1162    fn batch_post_update(&mut self, time_ns: UnixNanos) {
1163        let step = self.bar_type().spec().step.get() as u32;
1164
1165        // If not in batch mode and time matches next close, reset batch close
1166        if !self.core.batch_mode
1167            && time_ns == self.batch_next_close_ns
1168            && time_ns > self.stored_open_ns
1169        {
1170            self.batch_next_close_ns = UnixNanos::default();
1171            return;
1172        }
1173
1174        if time_ns > self.batch_next_close_ns {
1175            // Ensure batch times are coherent with last builder update
1176            if self.bar_type().spec().aggregation == BarAggregation::Month {
1177                while self.batch_next_close_ns < time_ns {
1178                    self.batch_next_close_ns =
1179                        add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1180                }
1181
1182                self.batch_open_ns =
1183                    subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1184            } else {
1185                while self.batch_next_close_ns < time_ns {
1186                    self.batch_next_close_ns += self.interval_ns;
1187                }
1188
1189                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1190            }
1191        }
1192
1193        if time_ns == self.batch_next_close_ns {
1194            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1195            self.build_and_send(ts_event, time_ns);
1196            self.batch_open_ns = self.batch_next_close_ns;
1197
1198            if self.bar_type().spec().aggregation == BarAggregation::Month {
1199                self.batch_next_close_ns =
1200                    add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1201            } else {
1202                self.batch_next_close_ns += self.interval_ns;
1203            }
1204        }
1205
1206        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
1207        if !self.core.batch_mode {
1208            self.batch_next_close_ns = UnixNanos::default();
1209        }
1210    }
1211
1212    fn build_bar(&mut self, event: TimeEvent) {
1213        if !self.core.builder.initialized {
1214            self.build_on_next_tick = true;
1215            self.stored_close_ns = self.next_close_ns;
1216            return;
1217        }
1218
1219        if !self.build_with_no_updates && self.core.builder.count == 0 {
1220            return;
1221        }
1222
1223        let ts_init = event.ts_event;
1224        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1225        self.build_and_send(ts_event, ts_init);
1226
1227        self.stored_open_ns = ts_init;
1228
1229        if self.bar_type().spec().aggregation == BarAggregation::Month {
1230            let step = self.bar_type().spec().step.get() as u32;
1231            let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1232
1233            self.clock
1234                .borrow_mut()
1235                .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1236                .expect(FAILED);
1237
1238            self.next_close_ns = next_alert_ns;
1239        } else {
1240            self.next_close_ns = self
1241                .clock
1242                .borrow()
1243                .next_time_ns(&self.timer_name)
1244                .unwrap_or_default();
1245        }
1246    }
1247}
1248
1249impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1250where
1251    H: FnMut(Bar) + 'static,
1252{
1253    fn bar_type(&self) -> BarType {
1254        self.core.bar_type
1255    }
1256
1257    fn is_running(&self) -> bool {
1258        self.core.is_running
1259    }
1260
1261    fn set_is_running(&mut self, value: bool) {
1262        self.core.set_is_running(value);
1263    }
1264
1265    /// Stop time-based aggregator by canceling its timer.
1266    fn stop(&mut self) {
1267        Self::stop(self);
1268    }
1269
1270    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1271        if self.batch_next_close_ns != UnixNanos::default() {
1272            self.batch_pre_update(ts_init);
1273        }
1274
1275        self.core.apply_update(price, size, ts_init);
1276
1277        if self.build_on_next_tick {
1278            if ts_init <= self.stored_close_ns {
1279                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1280                self.build_and_send(ts_event, ts_init);
1281            }
1282
1283            self.build_on_next_tick = false;
1284            self.stored_close_ns = UnixNanos::default();
1285        }
1286
1287        if self.batch_next_close_ns != UnixNanos::default() {
1288            self.batch_post_update(ts_init);
1289        }
1290    }
1291
1292    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1293        if self.batch_next_close_ns != UnixNanos::default() {
1294            self.batch_pre_update(ts_init);
1295        }
1296
1297        self.core.builder.update_bar(bar, volume, ts_init);
1298
1299        if self.build_on_next_tick {
1300            if ts_init <= self.stored_close_ns {
1301                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1302                self.build_and_send(ts_event, ts_init);
1303            }
1304
1305            // Reset flag and clear stored close
1306            self.build_on_next_tick = false;
1307            self.stored_close_ns = UnixNanos::default();
1308        }
1309
1310        if self.batch_next_close_ns != UnixNanos::default() {
1311            self.batch_post_update(ts_init);
1312        }
1313    }
1314
1315    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1316        self.core.start_batch_update(handler);
1317        self.start_batch_time(time_ns);
1318    }
1319
1320    fn stop_batch_update(&mut self) {
1321        self.core.stop_batch_update();
1322    }
1323}
1324
1325////////////////////////////////////////////////////////////////////////////////
1326// Tests
1327////////////////////////////////////////////////////////////////////////////////
1328#[cfg(test)]
1329mod tests {
1330    use std::sync::{Arc, Mutex};
1331
1332    use nautilus_common::clock::TestClock;
1333    use nautilus_core::UUID4;
1334    use nautilus_model::{
1335        data::{BarSpecification, BarType},
1336        enums::{AggregationSource, BarAggregation, PriceType},
1337        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1338        types::{Price, Quantity},
1339    };
1340    use rstest::rstest;
1341    use ustr::Ustr;
1342
1343    use super::*;
1344
    // A freshly constructed builder is uninitialized, with a zero last timestamp
    // and a zero update count.
    #[rstest]
    fn test_bar_builder_initialization(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        assert!(!builder.initialized);
        assert_eq!(builder.ts_last, 0);
        assert_eq!(builder.count, 0);
    }
1363
1364    #[rstest]
1365    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1366        let instrument = InstrumentAny::Equity(equity_aapl);
1367        let bar_type = BarType::new(
1368            instrument.id(),
1369            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1370            AggregationSource::Internal,
1371        );
1372        let mut builder = BarBuilder::new(
1373            bar_type,
1374            instrument.price_precision(),
1375            instrument.size_precision(),
1376        );
1377
1378        builder.update(
1379            Price::from("100.00"),
1380            Quantity::from(1),
1381            UnixNanos::from(1000),
1382        );
1383        builder.update(
1384            Price::from("95.00"),
1385            Quantity::from(1),
1386            UnixNanos::from(2000),
1387        );
1388        builder.update(
1389            Price::from("105.00"),
1390            Quantity::from(1),
1391            UnixNanos::from(3000),
1392        );
1393
1394        let bar = builder.build_now();
1395        assert!(bar.high > bar.low);
1396        assert_eq!(bar.open, Price::from("100.00"));
1397        assert_eq!(bar.high, Price::from("105.00"));
1398        assert_eq!(bar.low, Price::from("95.00"));
1399        assert_eq!(bar.close, Price::from("105.00"));
1400    }
1401
    // An update stamped earlier (500) than the last accepted one (1_000) must be
    // ignored: neither the last timestamp nor the count changes.
    #[rstest]
    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());

        assert_eq!(builder.ts_last, 1_000);
        assert_eq!(builder.count, 1);
    }
1422
    // A single update at the default (zero) timestamp initializes the builder and
    // counts as one update.
    #[rstest]
    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::default(),
        );

        assert!(builder.initialized);
        assert_eq!(builder.ts_last, 0);
        assert_eq!(builder.count, 1);
    }
1447
    // Same out-of-order scenario as `test_update_ignores_earlier_timestamps`, but with
    // a builder constructed from explicit precisions (2, 0) instead of the instrument's.
    #[rstest]
    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
        equity_aapl: Equity,
    ) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(bar_type, 2, 0);

        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::from(1_000),
        );
        // Earlier timestamp than the previous update: expected to be dropped
        builder.update(
            Price::from("1.00001"),
            Quantity::from(1),
            UnixNanos::from(500),
        );

        assert!(builder.initialized);
        assert_eq!(builder.ts_last, 1_000);
        assert_eq!(builder.count, 1);
    }
1475
    // Five updates sharing the same timestamp are all accepted and counted.
    #[rstest]
    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        for _ in 0..5 {
            builder.update(
                Price::from("1.00000"),
                Quantity::from(1),
                UnixNanos::from(1_000),
            );
        }

        assert_eq!(builder.count, 5);
    }
1500
    // Building a bar from a builder that never received an update is expected to panic.
    #[rstest]
    #[should_panic]
    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );
        let _ = builder.build_now();
    }
1517
    // Three updates produce a bar with the expected OHLC, summed volume and the last
    // update's timestamp; after the build the update count is back to zero while
    // `ts_last` is retained.
    #[rstest]
    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        builder.update(
            Price::from("1.00001"),
            Quantity::from(2),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00002"),
            Quantity::from(2),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::from(1_000_000_000),
        );

        let bar = builder.build_now();

        assert_eq!(bar.open, Price::from("1.00001"));
        assert_eq!(bar.high, Price::from("1.00002"));
        assert_eq!(bar.low, Price::from("1.00000"));
        assert_eq!(bar.close, Price::from("1.00000"));
        assert_eq!(bar.volume, Quantity::from(5));
        assert_eq!(bar.ts_init, 1_000_000_000);
        assert_eq!(builder.ts_last, 1_000_000_000);
        assert_eq!(builder.count, 0);
    }
1559
    // After a first bar has been built, a second bar is built only from the updates
    // received since: its open is the first new price and its volume counts only the
    // three new updates.
    #[rstest]
    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(bar_type, 2, 0);

        // First bar: a single update, built and discarded
        builder.update(
            Price::from("1.00001"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        builder.build_now();

        // Second bar: three further updates
        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00003"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00002"),
            Quantity::from(1),
            UnixNanos::default(),
        );

        let bar = builder.build_now();

        assert_eq!(bar.open, Price::from("1.00000"));
        assert_eq!(bar.high, Price::from("1.00003"));
        assert_eq!(bar.low, Price::from("1.00000"));
        assert_eq!(bar.close, Price::from("1.00002"));
        assert_eq!(bar.volume, Quantity::from(3));
    }
1601
    // With a step of 3 ticks, a single trade must not produce a bar.
    #[rstest]
    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
        // Bars passed to the handler are collected here for inspection
        let handler = Arc::new(Mutex::new(Vec::new()));
        let handler_clone = Arc::clone(&handler);

        let mut aggregator = TickBarAggregator::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
            move |bar: Bar| {
                let mut handler_guard = handler_clone.lock().unwrap();
                handler_guard.push(bar);
            },
        );

        let trade = TradeTick::default();
        aggregator.handle_trade(trade);

        let handler_guard = handler.lock().unwrap();
        assert_eq!(handler_guard.len(), 0);
    }
1626
1627    #[rstest]
1628    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1629        let instrument = InstrumentAny::Equity(equity_aapl);
1630        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1631        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1632        let handler = Arc::new(Mutex::new(Vec::new()));
1633        let handler_clone = Arc::clone(&handler);
1634
1635        let mut aggregator = TickBarAggregator::new(
1636            bar_type,
1637            instrument.price_precision(),
1638            instrument.size_precision(),
1639            move |bar: Bar| {
1640                let mut handler_guard = handler_clone.lock().unwrap();
1641                handler_guard.push(bar);
1642            },
1643        );
1644
1645        let trade = TradeTick::default();
1646        aggregator.handle_trade(trade);
1647        aggregator.handle_trade(trade);
1648        aggregator.handle_trade(trade);
1649
1650        let handler_guard = handler.lock().unwrap();
1651        let bar = handler_guard.first().unwrap();
1652        assert_eq!(handler_guard.len(), 1);
1653        assert_eq!(bar.open, trade.price);
1654        assert_eq!(bar.high, trade.price);
1655        assert_eq!(bar.low, trade.price);
1656        assert_eq!(bar.close, trade.price);
1657        assert_eq!(bar.volume, Quantity::from(300000));
1658        assert_eq!(bar.ts_event, trade.ts_event);
1659        assert_eq!(bar.ts_init, trade.ts_init);
1660    }
1661
1662    #[rstest]
1663    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1664        let instrument = InstrumentAny::Equity(equity_aapl);
1665        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1666        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1667        let handler = Arc::new(Mutex::new(Vec::new()));
1668        let handler_clone = Arc::clone(&handler);
1669
1670        let mut aggregator = TickBarAggregator::new(
1671            bar_type,
1672            instrument.price_precision(),
1673            instrument.size_precision(),
1674            move |bar: Bar| {
1675                let mut handler_guard = handler_clone.lock().unwrap();
1676                handler_guard.push(bar);
1677            },
1678        );
1679
1680        aggregator.update(
1681            Price::from("1.00001"),
1682            Quantity::from(1),
1683            UnixNanos::default(),
1684        );
1685        aggregator.update(
1686            Price::from("1.00002"),
1687            Quantity::from(1),
1688            UnixNanos::from(1000),
1689        );
1690        aggregator.update(
1691            Price::from("1.00003"),
1692            Quantity::from(1),
1693            UnixNanos::from(2000),
1694        );
1695
1696        let handler_guard = handler.lock().unwrap();
1697        assert_eq!(handler_guard.len(), 1);
1698
1699        let bar = handler_guard.first().unwrap();
1700        assert_eq!(bar.open, Price::from("1.00001"));
1701        assert_eq!(bar.high, Price::from("1.00003"));
1702        assert_eq!(bar.low, Price::from("1.00001"));
1703        assert_eq!(bar.close, Price::from("1.00003"));
1704        assert_eq!(bar.volume, Quantity::from(3));
1705    }
1706
1707    #[rstest]
1708    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1709        let instrument = InstrumentAny::Equity(equity_aapl);
1710        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1711        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1712        let handler = Arc::new(Mutex::new(Vec::new()));
1713        let handler_clone = Arc::clone(&handler);
1714
1715        let mut aggregator = TickBarAggregator::new(
1716            bar_type,
1717            instrument.price_precision(),
1718            instrument.size_precision(),
1719            move |bar: Bar| {
1720                let mut handler_guard = handler_clone.lock().unwrap();
1721                handler_guard.push(bar);
1722            },
1723        );
1724
1725        aggregator.update(
1726            Price::from("1.00001"),
1727            Quantity::from(1),
1728            UnixNanos::default(),
1729        );
1730        aggregator.update(
1731            Price::from("1.00002"),
1732            Quantity::from(1),
1733            UnixNanos::from(1000),
1734        );
1735        aggregator.update(
1736            Price::from("1.00003"),
1737            Quantity::from(1),
1738            UnixNanos::from(2000),
1739        );
1740        aggregator.update(
1741            Price::from("1.00004"),
1742            Quantity::from(1),
1743            UnixNanos::from(3000),
1744        );
1745
1746        let handler_guard = handler.lock().unwrap();
1747        assert_eq!(handler_guard.len(), 2);
1748
1749        let bar1 = &handler_guard[0];
1750        assert_eq!(bar1.open, Price::from("1.00001"));
1751        assert_eq!(bar1.close, Price::from("1.00002"));
1752        assert_eq!(bar1.volume, Quantity::from(2));
1753
1754        let bar2 = &handler_guard[1];
1755        assert_eq!(bar2.open, Price::from("1.00003"));
1756        assert_eq!(bar2.close, Price::from("1.00004"));
1757        assert_eq!(bar2.volume, Quantity::from(2));
1758    }
1759
1760    #[rstest]
1761    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1762        let instrument = InstrumentAny::Equity(equity_aapl);
1763        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1764        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1765        let handler = Arc::new(Mutex::new(Vec::new()));
1766        let handler_clone = Arc::clone(&handler);
1767
1768        let mut aggregator = VolumeBarAggregator::new(
1769            bar_type,
1770            instrument.price_precision(),
1771            instrument.size_precision(),
1772            move |bar: Bar| {
1773                let mut handler_guard = handler_clone.lock().unwrap();
1774                handler_guard.push(bar);
1775            },
1776        );
1777
1778        aggregator.update(
1779            Price::from("1.00001"),
1780            Quantity::from(25),
1781            UnixNanos::default(),
1782        );
1783
1784        let handler_guard = handler.lock().unwrap();
1785        assert_eq!(handler_guard.len(), 2);
1786        let bar1 = &handler_guard[0];
1787        assert_eq!(bar1.volume, Quantity::from(10));
1788        let bar2 = &handler_guard[1];
1789        assert_eq!(bar2.volume, Quantity::from(10));
1790    }
1791
1792    #[rstest]
1793    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1794        let instrument = InstrumentAny::Equity(equity_aapl);
1795        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1796        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1797        let handler = Arc::new(Mutex::new(Vec::new()));
1798        let handler_clone = Arc::clone(&handler);
1799
1800        let mut aggregator = ValueBarAggregator::new(
1801            bar_type,
1802            instrument.price_precision(),
1803            instrument.size_precision(),
1804            move |bar: Bar| {
1805                let mut handler_guard = handler_clone.lock().unwrap();
1806                handler_guard.push(bar);
1807            },
1808        );
1809
1810        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1811        aggregator.update(
1812            Price::from("100.00"),
1813            Quantity::from(5),
1814            UnixNanos::default(),
1815        );
1816        aggregator.update(
1817            Price::from("100.00"),
1818            Quantity::from(5),
1819            UnixNanos::from(1000),
1820        );
1821
1822        let handler_guard = handler.lock().unwrap();
1823        assert_eq!(handler_guard.len(), 1);
1824        let bar = handler_guard.first().unwrap();
1825        assert_eq!(bar.volume, Quantity::from(10));
1826    }
1827
1828    #[rstest]
1829    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1830        let instrument = InstrumentAny::Equity(equity_aapl);
1831        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1832        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1833        let handler = Arc::new(Mutex::new(Vec::new()));
1834        let handler_clone = Arc::clone(&handler);
1835
1836        let mut aggregator = ValueBarAggregator::new(
1837            bar_type,
1838            instrument.price_precision(),
1839            instrument.size_precision(),
1840            move |bar: Bar| {
1841                let mut handler_guard = handler_clone.lock().unwrap();
1842                handler_guard.push(bar);
1843            },
1844        );
1845
1846        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1847        aggregator.update(
1848            Price::from("100.00"),
1849            Quantity::from(25),
1850            UnixNanos::default(),
1851        );
1852
1853        let handler_guard = handler.lock().unwrap();
1854        assert_eq!(handler_guard.len(), 2);
1855        let remaining_value = aggregator.get_cumulative_value();
1856        assert!(remaining_value < 1000.0); // Should be less than threshold
1857    }
1858
1859    #[rstest]
1860    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1861        let instrument = InstrumentAny::Equity(equity_aapl);
1862        // One second bars
1863        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1864        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1865        let handler = Arc::new(Mutex::new(Vec::new()));
1866        let handler_clone = Arc::clone(&handler);
1867        let clock = Rc::new(RefCell::new(TestClock::new()));
1868
1869        let mut aggregator = TimeBarAggregator::new(
1870            bar_type,
1871            instrument.price_precision(),
1872            instrument.size_precision(),
1873            clock.clone(),
1874            move |bar: Bar| {
1875                let mut handler_guard = handler_clone.lock().unwrap();
1876                handler_guard.push(bar);
1877            },
1878            true,  // build_with_no_updates
1879            false, // timestamp_on_close
1880            BarIntervalType::LeftOpen,
1881            None,  // time_bars_origin_offset
1882            15,    // bar_build_delay
1883            false, // skip_first_non_full_bar
1884        );
1885
1886        aggregator.update(
1887            Price::from("100.00"),
1888            Quantity::from(1),
1889            UnixNanos::default(),
1890        );
1891
1892        let next_sec = UnixNanos::from(1_000_000_000);
1893        clock.borrow_mut().set_time(next_sec);
1894
1895        let event = TimeEvent::new(
1896            Ustr::from("1-SECOND-LAST"),
1897            UUID4::new(),
1898            next_sec,
1899            next_sec,
1900        );
1901        aggregator.build_bar(event);
1902
1903        let handler_guard = handler.lock().unwrap();
1904        assert_eq!(handler_guard.len(), 1);
1905        let bar = handler_guard.first().unwrap();
1906        assert_eq!(bar.ts_event, UnixNanos::default());
1907        assert_eq!(bar.ts_init, next_sec);
1908    }
1909
1910    #[rstest]
1911    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1912        let instrument = InstrumentAny::Equity(equity_aapl);
1913        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1914        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1915        let handler = Arc::new(Mutex::new(Vec::new()));
1916        let handler_clone = Arc::clone(&handler);
1917        let clock = Rc::new(RefCell::new(TestClock::new()));
1918
1919        let mut aggregator = TimeBarAggregator::new(
1920            bar_type,
1921            instrument.price_precision(),
1922            instrument.size_precision(),
1923            clock.clone(),
1924            move |bar: Bar| {
1925                let mut handler_guard = handler_clone.lock().unwrap();
1926                handler_guard.push(bar);
1927            },
1928            true, // build_with_no_updates
1929            true, // timestamp_on_close - changed to true to verify left-open behavior
1930            BarIntervalType::LeftOpen,
1931            None,
1932            15,
1933            false, // skip_first_non_full_bar
1934        );
1935
1936        // Update in first interval
1937        aggregator.update(
1938            Price::from("100.00"),
1939            Quantity::from(1),
1940            UnixNanos::default(),
1941        );
1942
1943        // First interval close
1944        let ts1 = UnixNanos::from(1_000_000_000);
1945        clock.borrow_mut().set_time(ts1);
1946        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1947        aggregator.build_bar(event);
1948
1949        // Update in second interval
1950        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1951
1952        // Second interval close
1953        let ts2 = UnixNanos::from(2_000_000_000);
1954        clock.borrow_mut().set_time(ts2);
1955        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1956        aggregator.build_bar(event);
1957
1958        let handler_guard = handler.lock().unwrap();
1959        assert_eq!(handler_guard.len(), 2);
1960
1961        let bar1 = &handler_guard[0];
1962        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
1963        assert_eq!(bar1.ts_init, ts1);
1964        assert_eq!(bar1.close, Price::from("100.00"));
1965        let bar2 = &handler_guard[1];
1966        assert_eq!(bar2.ts_event, ts2);
1967        assert_eq!(bar2.ts_init, ts2);
1968        assert_eq!(bar2.close, Price::from("101.00"));
1969    }
1970
1971    #[rstest]
1972    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1973        let instrument = InstrumentAny::Equity(equity_aapl);
1974        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1975        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1976        let handler = Arc::new(Mutex::new(Vec::new()));
1977        let handler_clone = Arc::clone(&handler);
1978        let clock = Rc::new(RefCell::new(TestClock::new()));
1979        let mut aggregator = TimeBarAggregator::new(
1980            bar_type,
1981            instrument.price_precision(),
1982            instrument.size_precision(),
1983            clock.clone(),
1984            move |bar: Bar| {
1985                let mut handler_guard = handler_clone.lock().unwrap();
1986                handler_guard.push(bar);
1987            },
1988            true, // build_with_no_updates
1989            true, // timestamp_on_close
1990            BarIntervalType::RightOpen,
1991            None,
1992            15,
1993            false, // skip_first_non_full_bar
1994        );
1995
1996        // Update in first interval
1997        aggregator.update(
1998            Price::from("100.00"),
1999            Quantity::from(1),
2000            UnixNanos::default(),
2001        );
2002
2003        // First interval close
2004        let ts1 = UnixNanos::from(1_000_000_000);
2005        clock.borrow_mut().set_time(ts1);
2006        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2007        aggregator.build_bar(event);
2008
2009        // Update in second interval
2010        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2011
2012        // Second interval close
2013        let ts2 = UnixNanos::from(2_000_000_000);
2014        clock.borrow_mut().set_time(ts2);
2015        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2016        aggregator.build_bar(event);
2017
2018        let handler_guard = handler.lock().unwrap();
2019        assert_eq!(handler_guard.len(), 2);
2020
2021        let bar1 = &handler_guard[0];
2022        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
2023        assert_eq!(bar1.ts_init, ts1);
2024        assert_eq!(bar1.close, Price::from("100.00"));
2025
2026        let bar2 = &handler_guard[1];
2027        assert_eq!(bar2.ts_event, ts1);
2028        assert_eq!(bar2.ts_init, ts2);
2029        assert_eq!(bar2.close, Price::from("101.00"));
2030    }
2031
2032    #[rstest]
2033    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2034        let instrument = InstrumentAny::Equity(equity_aapl);
2035        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2036        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2037        let handler = Arc::new(Mutex::new(Vec::new()));
2038        let handler_clone = Arc::clone(&handler);
2039        let clock = Rc::new(RefCell::new(TestClock::new()));
2040
2041        // First test with build_with_no_updates = false
2042        let mut aggregator = TimeBarAggregator::new(
2043            bar_type,
2044            instrument.price_precision(),
2045            instrument.size_precision(),
2046            clock.clone(),
2047            move |bar: Bar| {
2048                let mut handler_guard = handler_clone.lock().unwrap();
2049                handler_guard.push(bar);
2050            },
2051            false, // build_with_no_updates disabled
2052            true,  // timestamp_on_close
2053            BarIntervalType::LeftOpen,
2054            None,
2055            15,
2056            false, // skip_first_non_full_bar
2057        );
2058
2059        // No updates, just interval close
2060        let ts1 = UnixNanos::from(1_000_000_000);
2061        clock.borrow_mut().set_time(ts1);
2062        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2063        aggregator.build_bar(event);
2064
2065        let handler_guard = handler.lock().unwrap();
2066        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
2067        drop(handler_guard);
2068
2069        // Now test with build_with_no_updates = true
2070        let handler = Arc::new(Mutex::new(Vec::new()));
2071        let handler_clone = Arc::clone(&handler);
2072        let mut aggregator = TimeBarAggregator::new(
2073            bar_type,
2074            instrument.price_precision(),
2075            instrument.size_precision(),
2076            clock.clone(),
2077            move |bar: Bar| {
2078                let mut handler_guard = handler_clone.lock().unwrap();
2079                handler_guard.push(bar);
2080            },
2081            true, // build_with_no_updates enabled
2082            true, // timestamp_on_close
2083            BarIntervalType::LeftOpen,
2084            None,
2085            15,
2086            false, // skip_first_non_full_bar
2087        );
2088
2089        aggregator.update(
2090            Price::from("100.00"),
2091            Quantity::from(1),
2092            UnixNanos::default(),
2093        );
2094
2095        // First interval with update
2096        let ts1 = UnixNanos::from(1_000_000_000);
2097        clock.borrow_mut().set_time(ts1);
2098        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2099        aggregator.build_bar(event);
2100
2101        // Second interval without updates
2102        let ts2 = UnixNanos::from(2_000_000_000);
2103        clock.borrow_mut().set_time(ts2);
2104        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2105        aggregator.build_bar(event);
2106
2107        let handler_guard = handler.lock().unwrap();
2108        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2109        let bar1 = &handler_guard[0];
2110        assert_eq!(bar1.close, Price::from("100.00"));
2111        let bar2 = &handler_guard[1];
2112        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2113    }
2114
2115    #[rstest]
2116    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2117        let instrument = InstrumentAny::Equity(equity_aapl);
2118        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2119        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2120        let clock = Rc::new(RefCell::new(TestClock::new()));
2121        let handler = Arc::new(Mutex::new(Vec::new()));
2122        let handler_clone = Arc::clone(&handler);
2123
2124        let mut aggregator = TimeBarAggregator::new(
2125            bar_type,
2126            instrument.price_precision(),
2127            instrument.size_precision(),
2128            clock.clone(),
2129            move |bar: Bar| {
2130                let mut handler_guard = handler_clone.lock().unwrap();
2131                handler_guard.push(bar);
2132            },
2133            true, // build_with_no_updates
2134            true, // timestamp_on_close
2135            BarIntervalType::RightOpen,
2136            None,
2137            15,
2138            false, // skip_first_non_full_bar
2139        );
2140
2141        let ts1 = UnixNanos::from(1_000_000_000);
2142        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2143
2144        let ts2 = UnixNanos::from(2_000_000_000);
2145        clock.borrow_mut().set_time(ts2);
2146
2147        // Simulate timestamp on close
2148        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2149        aggregator.build_bar(event);
2150
2151        let handler_guard = handler.lock().unwrap();
2152        let bar = handler_guard.first().unwrap();
2153        assert_eq!(bar.ts_event, UnixNanos::default());
2154        assert_eq!(bar.ts_init, ts2);
2155    }
2156
2157    #[rstest]
2158    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2159        let instrument = InstrumentAny::Equity(equity_aapl);
2160        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2161        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2162        let clock = Rc::new(RefCell::new(TestClock::new()));
2163        let handler = Arc::new(Mutex::new(Vec::new()));
2164        let handler_clone = Arc::clone(&handler);
2165
2166        let mut aggregator = TimeBarAggregator::new(
2167            bar_type,
2168            instrument.price_precision(),
2169            instrument.size_precision(),
2170            clock.clone(),
2171            move |bar: Bar| {
2172                let mut handler_guard = handler_clone.lock().unwrap();
2173                handler_guard.push(bar);
2174            },
2175            true, // build_with_no_updates
2176            true, // timestamp_on_close
2177            BarIntervalType::LeftOpen,
2178            None,
2179            15,
2180            false, // skip_first_non_full_bar
2181        );
2182
2183        let ts1 = UnixNanos::from(1_000_000_000);
2184        clock.borrow_mut().set_time(ts1);
2185
2186        let initial_time = clock.borrow().utc_now();
2187        aggregator.start_batch_time(UnixNanos::from(
2188            initial_time.timestamp_nanos_opt().unwrap() as u64
2189        ));
2190
2191        let handler_guard = handler.lock().unwrap();
2192        assert_eq!(handler_guard.len(), 0);
2193    }
2194
2195    // ========================================================================
2196    // RenkoBarAggregator Tests
2197    // ========================================================================
2198
2199    #[rstest]
2200    fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
2201        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2202        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2203        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2204        let handler = Arc::new(Mutex::new(Vec::new()));
2205        let handler_clone = Arc::clone(&handler);
2206
2207        let aggregator = RenkoBarAggregator::new(
2208            bar_type,
2209            instrument.price_precision(),
2210            instrument.size_precision(),
2211            instrument.price_increment(),
2212            move |bar: Bar| {
2213                let mut handler_guard = handler_clone.lock().unwrap();
2214                handler_guard.push(bar);
2215            },
2216        );
2217
2218        assert_eq!(aggregator.bar_type(), bar_type);
2219        assert!(!aggregator.is_running());
2220        // 10 pips * price_increment.raw (depends on precision mode)
2221        let expected_brick_size = 10 * instrument.price_increment().raw;
2222        assert_eq!(aggregator.brick_size, expected_brick_size);
2223    }
2224
2225    #[rstest]
2226    fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
2227        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2228        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2229        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2230        let handler = Arc::new(Mutex::new(Vec::new()));
2231        let handler_clone = Arc::clone(&handler);
2232
2233        let mut aggregator = RenkoBarAggregator::new(
2234            bar_type,
2235            instrument.price_precision(),
2236            instrument.size_precision(),
2237            instrument.price_increment(),
2238            move |bar: Bar| {
2239                let mut handler_guard = handler_clone.lock().unwrap();
2240                handler_guard.push(bar);
2241            },
2242        );
2243
2244        // Small price movement (5 pips, less than 10 pip brick size)
2245        aggregator.update(
2246            Price::from("1.00000"),
2247            Quantity::from(1),
2248            UnixNanos::default(),
2249        );
2250        aggregator.update(
2251            Price::from("1.00005"),
2252            Quantity::from(1),
2253            UnixNanos::from(1000),
2254        );
2255
2256        let handler_guard = handler.lock().unwrap();
2257        assert_eq!(handler_guard.len(), 0); // No bar created yet
2258    }
2259
2260    #[rstest]
2261    fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
2262        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2263        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2264        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2265        let handler = Arc::new(Mutex::new(Vec::new()));
2266        let handler_clone = Arc::clone(&handler);
2267
2268        let mut aggregator = RenkoBarAggregator::new(
2269            bar_type,
2270            instrument.price_precision(),
2271            instrument.size_precision(),
2272            instrument.price_increment(),
2273            move |bar: Bar| {
2274                let mut handler_guard = handler_clone.lock().unwrap();
2275                handler_guard.push(bar);
2276            },
2277        );
2278
2279        // Price movement exceeding brick size (15 pips)
2280        aggregator.update(
2281            Price::from("1.00000"),
2282            Quantity::from(1),
2283            UnixNanos::default(),
2284        );
2285        aggregator.update(
2286            Price::from("1.00015"),
2287            Quantity::from(1),
2288            UnixNanos::from(1000),
2289        );
2290
2291        let handler_guard = handler.lock().unwrap();
2292        assert_eq!(handler_guard.len(), 1);
2293
2294        let bar = handler_guard.first().unwrap();
2295        assert_eq!(bar.open, Price::from("1.00000"));
2296        assert_eq!(bar.high, Price::from("1.00010"));
2297        assert_eq!(bar.low, Price::from("1.00000"));
2298        assert_eq!(bar.close, Price::from("1.00010"));
2299        assert_eq!(bar.volume, Quantity::from(2));
2300        assert_eq!(bar.ts_event, UnixNanos::from(1000));
2301        assert_eq!(bar.ts_init, UnixNanos::from(1000));
2302    }
2303
2304    #[rstest]
2305    fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
2306        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2307        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2308        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2309        let handler = Arc::new(Mutex::new(Vec::new()));
2310        let handler_clone = Arc::clone(&handler);
2311
2312        let mut aggregator = RenkoBarAggregator::new(
2313            bar_type,
2314            instrument.price_precision(),
2315            instrument.size_precision(),
2316            instrument.price_increment(),
2317            move |bar: Bar| {
2318                let mut handler_guard = handler_clone.lock().unwrap();
2319                handler_guard.push(bar);
2320            },
2321        );
2322
2323        // Large price movement creating multiple bricks (25 pips = 2 bricks)
2324        aggregator.update(
2325            Price::from("1.00000"),
2326            Quantity::from(1),
2327            UnixNanos::default(),
2328        );
2329        aggregator.update(
2330            Price::from("1.00025"),
2331            Quantity::from(1),
2332            UnixNanos::from(1000),
2333        );
2334
2335        let handler_guard = handler.lock().unwrap();
2336        assert_eq!(handler_guard.len(), 2);
2337
2338        let bar1 = &handler_guard[0];
2339        assert_eq!(bar1.open, Price::from("1.00000"));
2340        assert_eq!(bar1.high, Price::from("1.00010"));
2341        assert_eq!(bar1.low, Price::from("1.00000"));
2342        assert_eq!(bar1.close, Price::from("1.00010"));
2343
2344        let bar2 = &handler_guard[1];
2345        assert_eq!(bar2.open, Price::from("1.00010"));
2346        assert_eq!(bar2.high, Price::from("1.00020"));
2347        assert_eq!(bar2.low, Price::from("1.00010"));
2348        assert_eq!(bar2.close, Price::from("1.00020"));
2349    }
2350
2351    #[rstest]
2352    fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
2353        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2354        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2355        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2356        let handler = Arc::new(Mutex::new(Vec::new()));
2357        let handler_clone = Arc::clone(&handler);
2358
2359        let mut aggregator = RenkoBarAggregator::new(
2360            bar_type,
2361            instrument.price_precision(),
2362            instrument.size_precision(),
2363            instrument.price_increment(),
2364            move |bar: Bar| {
2365                let mut handler_guard = handler_clone.lock().unwrap();
2366                handler_guard.push(bar);
2367            },
2368        );
2369
2370        // Start at higher price and move down
2371        aggregator.update(
2372            Price::from("1.00020"),
2373            Quantity::from(1),
2374            UnixNanos::default(),
2375        );
2376        aggregator.update(
2377            Price::from("1.00005"),
2378            Quantity::from(1),
2379            UnixNanos::from(1000),
2380        );
2381
2382        let handler_guard = handler.lock().unwrap();
2383        assert_eq!(handler_guard.len(), 1);
2384
2385        let bar = handler_guard.first().unwrap();
2386        assert_eq!(bar.open, Price::from("1.00020"));
2387        assert_eq!(bar.high, Price::from("1.00020"));
2388        assert_eq!(bar.low, Price::from("1.00010"));
2389        assert_eq!(bar.close, Price::from("1.00010"));
2390        assert_eq!(bar.volume, Quantity::from(2));
2391    }
2392
2393    #[rstest]
2394    fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
2395        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2396        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2397        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2398        let handler = Arc::new(Mutex::new(Vec::new()));
2399        let handler_clone = Arc::clone(&handler);
2400
2401        let mut aggregator = RenkoBarAggregator::new(
2402            bar_type,
2403            instrument.price_precision(),
2404            instrument.size_precision(),
2405            instrument.price_increment(),
2406            move |bar: Bar| {
2407                let mut handler_guard = handler_clone.lock().unwrap();
2408                handler_guard.push(bar);
2409            },
2410        );
2411
2412        // Create a bar with small price movement (5 pips)
2413        let input_bar = Bar::new(
2414            BarType::new(
2415                instrument.id(),
2416                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2417                AggregationSource::Internal,
2418            ),
2419            Price::from("1.00000"),
2420            Price::from("1.00005"),
2421            Price::from("0.99995"),
2422            Price::from("1.00005"), // 5 pip move up (less than 10 pip brick)
2423            Quantity::from(100),
2424            UnixNanos::default(),
2425            UnixNanos::from(1000),
2426        );
2427
2428        aggregator.handle_bar(input_bar);
2429
2430        let handler_guard = handler.lock().unwrap();
2431        assert_eq!(handler_guard.len(), 0); // No bar created yet
2432    }
2433
2434    #[rstest]
2435    fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
2436        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2437        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2438        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2439        let handler = Arc::new(Mutex::new(Vec::new()));
2440        let handler_clone = Arc::clone(&handler);
2441
2442        let mut aggregator = RenkoBarAggregator::new(
2443            bar_type,
2444            instrument.price_precision(),
2445            instrument.size_precision(),
2446            instrument.price_increment(),
2447            move |bar: Bar| {
2448                let mut handler_guard = handler_clone.lock().unwrap();
2449                handler_guard.push(bar);
2450            },
2451        );
2452
2453        // First bar to establish baseline
2454        let bar1 = Bar::new(
2455            BarType::new(
2456                instrument.id(),
2457                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2458                AggregationSource::Internal,
2459            ),
2460            Price::from("1.00000"),
2461            Price::from("1.00005"),
2462            Price::from("0.99995"),
2463            Price::from("1.00000"),
2464            Quantity::from(100),
2465            UnixNanos::default(),
2466            UnixNanos::default(),
2467        );
2468
2469        // Second bar with price movement exceeding brick size (10 pips)
2470        let bar2 = Bar::new(
2471            BarType::new(
2472                instrument.id(),
2473                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2474                AggregationSource::Internal,
2475            ),
2476            Price::from("1.00000"),
2477            Price::from("1.00015"),
2478            Price::from("0.99995"),
2479            Price::from("1.00010"), // 10 pip move up (exactly 1 brick)
2480            Quantity::from(50),
2481            UnixNanos::from(60_000_000_000),
2482            UnixNanos::from(60_000_000_000),
2483        );
2484
2485        aggregator.handle_bar(bar1);
2486        aggregator.handle_bar(bar2);
2487
2488        let handler_guard = handler.lock().unwrap();
2489        assert_eq!(handler_guard.len(), 1);
2490
2491        let bar = handler_guard.first().unwrap();
2492        assert_eq!(bar.open, Price::from("1.00000"));
2493        assert_eq!(bar.high, Price::from("1.00010"));
2494        assert_eq!(bar.low, Price::from("1.00000"));
2495        assert_eq!(bar.close, Price::from("1.00010"));
2496        assert_eq!(bar.volume, Quantity::from(150));
2497    }
2498
2499    #[rstest]
2500    fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
2501        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2502        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2503        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2504        let handler = Arc::new(Mutex::new(Vec::new()));
2505        let handler_clone = Arc::clone(&handler);
2506
2507        let mut aggregator = RenkoBarAggregator::new(
2508            bar_type,
2509            instrument.price_precision(),
2510            instrument.size_precision(),
2511            instrument.price_increment(),
2512            move |bar: Bar| {
2513                let mut handler_guard = handler_clone.lock().unwrap();
2514                handler_guard.push(bar);
2515            },
2516        );
2517
2518        // First bar to establish baseline
2519        let bar1 = Bar::new(
2520            BarType::new(
2521                instrument.id(),
2522                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2523                AggregationSource::Internal,
2524            ),
2525            Price::from("1.00000"),
2526            Price::from("1.00005"),
2527            Price::from("0.99995"),
2528            Price::from("1.00000"),
2529            Quantity::from(100),
2530            UnixNanos::default(),
2531            UnixNanos::default(),
2532        );
2533
2534        // Second bar with large price movement (30 pips = 3 bricks)
2535        let bar2 = Bar::new(
2536            BarType::new(
2537                instrument.id(),
2538                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2539                AggregationSource::Internal,
2540            ),
2541            Price::from("1.00000"),
2542            Price::from("1.00035"),
2543            Price::from("0.99995"),
2544            Price::from("1.00030"), // 30 pip move up (exactly 3 bricks)
2545            Quantity::from(50),
2546            UnixNanos::from(60_000_000_000),
2547            UnixNanos::from(60_000_000_000),
2548        );
2549
2550        aggregator.handle_bar(bar1);
2551        aggregator.handle_bar(bar2);
2552
2553        let handler_guard = handler.lock().unwrap();
2554        assert_eq!(handler_guard.len(), 3);
2555
2556        let bar1 = &handler_guard[0];
2557        assert_eq!(bar1.open, Price::from("1.00000"));
2558        assert_eq!(bar1.close, Price::from("1.00010"));
2559
2560        let bar2 = &handler_guard[1];
2561        assert_eq!(bar2.open, Price::from("1.00010"));
2562        assert_eq!(bar2.close, Price::from("1.00020"));
2563
2564        let bar3 = &handler_guard[2];
2565        assert_eq!(bar3.open, Price::from("1.00020"));
2566        assert_eq!(bar3.close, Price::from("1.00030"));
2567    }
2568
2569    #[rstest]
2570    fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
2571        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2572        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2573        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2574        let handler = Arc::new(Mutex::new(Vec::new()));
2575        let handler_clone = Arc::clone(&handler);
2576
2577        let mut aggregator = RenkoBarAggregator::new(
2578            bar_type,
2579            instrument.price_precision(),
2580            instrument.size_precision(),
2581            instrument.price_increment(),
2582            move |bar: Bar| {
2583                let mut handler_guard = handler_clone.lock().unwrap();
2584                handler_guard.push(bar);
2585            },
2586        );
2587
2588        // First bar to establish baseline
2589        let bar1 = Bar::new(
2590            BarType::new(
2591                instrument.id(),
2592                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2593                AggregationSource::Internal,
2594            ),
2595            Price::from("1.00020"),
2596            Price::from("1.00025"),
2597            Price::from("1.00015"),
2598            Price::from("1.00020"),
2599            Quantity::from(100),
2600            UnixNanos::default(),
2601            UnixNanos::default(),
2602        );
2603
2604        // Second bar with downward price movement (10 pips down)
2605        let bar2 = Bar::new(
2606            BarType::new(
2607                instrument.id(),
2608                BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2609                AggregationSource::Internal,
2610            ),
2611            Price::from("1.00020"),
2612            Price::from("1.00025"),
2613            Price::from("1.00005"),
2614            Price::from("1.00010"), // 10 pip move down (exactly 1 brick)
2615            Quantity::from(50),
2616            UnixNanos::from(60_000_000_000),
2617            UnixNanos::from(60_000_000_000),
2618        );
2619
2620        aggregator.handle_bar(bar1);
2621        aggregator.handle_bar(bar2);
2622
2623        let handler_guard = handler.lock().unwrap();
2624        assert_eq!(handler_guard.len(), 1);
2625
2626        let bar = handler_guard.first().unwrap();
2627        assert_eq!(bar.open, Price::from("1.00020"));
2628        assert_eq!(bar.high, Price::from("1.00020"));
2629        assert_eq!(bar.low, Price::from("1.00010"));
2630        assert_eq!(bar.close, Price::from("1.00010"));
2631        assert_eq!(bar.volume, Quantity::from(150));
2632    }
2633
2634    #[rstest]
2635    fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
2636        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2637
2638        // Test different brick sizes
2639        let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); // 5 pip brick size
2640        let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
2641        let handler = Arc::new(Mutex::new(Vec::new()));
2642        let handler_clone = Arc::clone(&handler);
2643
2644        let aggregator_5 = RenkoBarAggregator::new(
2645            bar_type_5,
2646            instrument.price_precision(),
2647            instrument.size_precision(),
2648            instrument.price_increment(),
2649            move |_bar: Bar| {
2650                let mut handler_guard = handler_clone.lock().unwrap();
2651                handler_guard.push(_bar);
2652            },
2653        );
2654
2655        // 5 pips * price_increment.raw (depends on precision mode)
2656        let expected_brick_size_5 = 5 * instrument.price_increment().raw;
2657        assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
2658
2659        let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); // 20 pip brick size
2660        let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
2661        let handler2 = Arc::new(Mutex::new(Vec::new()));
2662        let handler2_clone = Arc::clone(&handler2);
2663
2664        let aggregator_20 = RenkoBarAggregator::new(
2665            bar_type_20,
2666            instrument.price_precision(),
2667            instrument.size_precision(),
2668            instrument.price_increment(),
2669            move |_bar: Bar| {
2670                let mut handler_guard = handler2_clone.lock().unwrap();
2671                handler_guard.push(_bar);
2672            },
2673        );
2674
2675        // 20 pips * price_increment.raw (depends on precision mode)
2676        let expected_brick_size_20 = 20 * instrument.price_increment().raw;
2677        assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
2678    }
2679
2680    #[rstest]
2681    fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
2682        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2683        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2684        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2685        let handler = Arc::new(Mutex::new(Vec::new()));
2686        let handler_clone = Arc::clone(&handler);
2687
2688        let mut aggregator = RenkoBarAggregator::new(
2689            bar_type,
2690            instrument.price_precision(),
2691            instrument.size_precision(),
2692            instrument.price_increment(),
2693            move |bar: Bar| {
2694                let mut handler_guard = handler_clone.lock().unwrap();
2695                handler_guard.push(bar);
2696            },
2697        );
2698
2699        // Sequential updates creating multiple bars
2700        aggregator.update(
2701            Price::from("1.00000"),
2702            Quantity::from(1),
2703            UnixNanos::from(1000),
2704        );
2705        aggregator.update(
2706            Price::from("1.00010"),
2707            Quantity::from(1),
2708            UnixNanos::from(2000),
2709        ); // First brick
2710        aggregator.update(
2711            Price::from("1.00020"),
2712            Quantity::from(1),
2713            UnixNanos::from(3000),
2714        ); // Second brick
2715        aggregator.update(
2716            Price::from("1.00025"),
2717            Quantity::from(1),
2718            UnixNanos::from(4000),
2719        ); // Partial third brick
2720        aggregator.update(
2721            Price::from("1.00030"),
2722            Quantity::from(1),
2723            UnixNanos::from(5000),
2724        ); // Complete third brick
2725
2726        let handler_guard = handler.lock().unwrap();
2727        assert_eq!(handler_guard.len(), 3);
2728
2729        let bar1 = &handler_guard[0];
2730        assert_eq!(bar1.open, Price::from("1.00000"));
2731        assert_eq!(bar1.close, Price::from("1.00010"));
2732
2733        let bar2 = &handler_guard[1];
2734        assert_eq!(bar2.open, Price::from("1.00010"));
2735        assert_eq!(bar2.close, Price::from("1.00020"));
2736
2737        let bar3 = &handler_guard[2];
2738        assert_eq!(bar3.open, Price::from("1.00020"));
2739        assert_eq!(bar3.close, Price::from("1.00030"));
2740    }
2741
2742    #[rstest]
2743    fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
2744        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2745        let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); // 10 pip brick size
2746        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2747        let handler = Arc::new(Mutex::new(Vec::new()));
2748        let handler_clone = Arc::clone(&handler);
2749
2750        let mut aggregator = RenkoBarAggregator::new(
2751            bar_type,
2752            instrument.price_precision(),
2753            instrument.size_precision(),
2754            instrument.price_increment(),
2755            move |bar: Bar| {
2756                let mut handler_guard = handler_clone.lock().unwrap();
2757                handler_guard.push(bar);
2758            },
2759        );
2760
2761        // Mixed direction movement: up then down
2762        aggregator.update(
2763            Price::from("1.00000"),
2764            Quantity::from(1),
2765            UnixNanos::from(1000),
2766        );
2767        aggregator.update(
2768            Price::from("1.00010"),
2769            Quantity::from(1),
2770            UnixNanos::from(2000),
2771        ); // Up brick
2772        aggregator.update(
2773            Price::from("0.99990"),
2774            Quantity::from(1),
2775            UnixNanos::from(3000),
2776        ); // Down 2 bricks (20 pips)
2777
2778        let handler_guard = handler.lock().unwrap();
2779        assert_eq!(handler_guard.len(), 3);
2780
2781        let bar1 = &handler_guard[0]; // Up brick
2782        assert_eq!(bar1.open, Price::from("1.00000"));
2783        assert_eq!(bar1.high, Price::from("1.00010"));
2784        assert_eq!(bar1.low, Price::from("1.00000"));
2785        assert_eq!(bar1.close, Price::from("1.00010"));
2786
2787        let bar2 = &handler_guard[1]; // First down brick
2788        assert_eq!(bar2.open, Price::from("1.00010"));
2789        assert_eq!(bar2.high, Price::from("1.00010"));
2790        assert_eq!(bar2.low, Price::from("1.00000"));
2791        assert_eq!(bar2.close, Price::from("1.00000"));
2792
2793        let bar3 = &handler_guard[2]; // Second down brick
2794        assert_eq!(bar3.open, Price::from("1.00000"));
2795        assert_eq!(bar3.high, Price::from("1.00000"));
2796        assert_eq!(bar3.low, Price::from("0.99990"));
2797        assert_eq!(bar3.close, Price::from("0.99990"));
2798    }
2799}