nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17
18use std::{cell::RefCell, ops::Add, rc::Rc};
19
20use chrono::TimeDelta;
21use nautilus_common::{
22    clock::Clock,
23    timer::{TimeEvent, TimeEventCallback},
24};
25use nautilus_core::{
26    UnixNanos,
27    correctness::{self, FAILED},
28    datetime::{add_n_months_nanos, subtract_n_months_nanos},
29};
30use nautilus_model::{
31    data::{
32        QuoteTick, TradeTick,
33        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
34    },
35    enums::{AggregationSource, BarAggregation, BarIntervalType},
36    types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
37};
38
39pub trait BarAggregator {
40    /// The [`BarType`] to be aggregated.
41    fn bar_type(&self) -> BarType;
42    /// If the aggregator is running and will receive data from the message bus.
43    fn is_running(&self) -> bool;
44    fn set_await_partial(&mut self, value: bool);
45    fn set_is_running(&mut self, value: bool);
46    /// Updates the aggregator  with the given price and size.
47    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
48    /// Updates the aggregator with the given quote.
49    fn handle_quote(&mut self, quote: QuoteTick) {
50        let spec = self.bar_type().spec();
51        if !self.await_partial() {
52            self.update(
53                quote.extract_price(spec.price_type),
54                quote.extract_size(spec.price_type),
55                quote.ts_event,
56            );
57        }
58    }
59    /// Updates the aggregator with the given trade.
60    fn handle_trade(&mut self, trade: TradeTick) {
61        if !self.await_partial() {
62            self.update(trade.price, trade.size, trade.ts_event);
63        }
64    }
65    /// Updates the aggregator with the given bar.
66    fn handle_bar(&mut self, bar: Bar) {
67        if !self.await_partial() {
68            self.update_bar(bar, bar.volume, bar.ts_init);
69        }
70    }
71    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
72    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
73    fn stop_batch_update(&mut self);
74    fn await_partial(&self) -> bool;
75    fn set_partial(&mut self, partial_bar: Bar);
76}
77
78/// Provides a generic bar builder for aggregation.
79pub struct BarBuilder {
80    bar_type: BarType,
81    price_precision: u8,
82    size_precision: u8,
83    initialized: bool,
84    ts_last: UnixNanos,
85    count: usize,
86    partial_set: bool,
87    last_close: Option<Price>,
88    open: Option<Price>,
89    high: Option<Price>,
90    low: Option<Price>,
91    close: Option<Price>,
92    volume: Quantity,
93}
94
95impl BarBuilder {
96    /// Creates a new [`BarBuilder`] instance.
97    ///
98    /// # Panics
99    ///
100    /// This function panics:
101    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
102    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
103    #[must_use]
104    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
105        correctness::check_equal(
106            bar_type.aggregation_source(),
107            AggregationSource::Internal,
108            "bar_type.aggregation_source",
109            "AggregationSource::Internal",
110        )
111        .expect(FAILED);
112
113        Self {
114            bar_type,
115            price_precision,
116            size_precision,
117            initialized: false,
118            ts_last: UnixNanos::default(),
119            count: 0,
120            partial_set: false,
121            last_close: None,
122            open: None,
123            high: None,
124            low: None,
125            close: None,
126            volume: Quantity::zero(size_precision),
127        }
128    }
129
130    /// Set the initial values for a partially completed bar.
131    pub fn set_partial(&mut self, partial_bar: Bar) {
132        if self.partial_set {
133            return; // Already updated
134        }
135
136        self.open = Some(partial_bar.open);
137
138        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
139            self.high = Some(partial_bar.high);
140        }
141
142        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
143            self.low = Some(partial_bar.low);
144        }
145
146        if self.close.is_none() {
147            self.close = Some(partial_bar.close);
148        }
149
150        self.volume = partial_bar.volume;
151
152        if self.ts_last == 0 {
153            self.ts_last = partial_bar.ts_init;
154        }
155
156        self.partial_set = true;
157        self.initialized = true;
158    }
159
160    /// Update the bar builder.
161    pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
162        if ts_event < self.ts_last {
163            return; // Not applicable
164        }
165
166        if self.open.is_none() {
167            self.open = Some(price);
168            self.high = Some(price);
169            self.low = Some(price);
170            self.initialized = true;
171        } else {
172            if price > self.high.unwrap() {
173                self.high = Some(price);
174            }
175            if price < self.low.unwrap() {
176                self.low = Some(price);
177            }
178        }
179
180        self.close = Some(price);
181        self.volume = self.volume.add(size);
182        self.count += 1;
183        self.ts_last = ts_event;
184    }
185
186    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
187        if ts_init < self.ts_last {
188            return; // Not applicable
189        }
190
191        if self.open.is_none() {
192            self.open = Some(bar.open);
193            self.high = Some(bar.high);
194            self.low = Some(bar.low);
195            self.initialized = true;
196        } else {
197            if bar.high > self.high.unwrap() {
198                self.high = Some(bar.high);
199            }
200            if bar.low < self.low.unwrap() {
201                self.low = Some(bar.low);
202            }
203        }
204
205        self.close = Some(bar.close);
206        self.volume = self.volume.add(volume);
207        self.count += 1;
208        self.ts_last = ts_init;
209    }
210
211    /// Reset the bar builder.
212    ///
213    /// All stateful fields are reset to their initial value.
214    pub fn reset(&mut self) {
215        self.open = None;
216        self.high = None;
217        self.low = None;
218        self.volume = Quantity::zero(self.size_precision);
219        self.count = 0;
220    }
221
222    /// Return the aggregated bar and reset.
223    pub fn build_now(&mut self) -> Bar {
224        self.build(self.ts_last, self.ts_last)
225    }
226
227    /// Return the aggregated bar with the given closing timestamp, and reset.
228    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
229        if self.open.is_none() {
230            self.open = self.last_close;
231            self.high = self.last_close;
232            self.low = self.last_close;
233            self.close = self.last_close;
234        }
235
236        if let (Some(close), Some(low)) = (self.close, self.low) {
237            if close < low {
238                self.low = Some(close);
239            }
240        }
241
242        if let (Some(close), Some(high)) = (self.close, self.high) {
243            if close > high {
244                self.high = Some(close);
245            }
246        }
247
248        // SAFETY: The open was checked, so we can assume all prices are Some
249        let bar = Bar::new(
250            self.bar_type,
251            self.open.unwrap(),
252            self.high.unwrap(),
253            self.low.unwrap(),
254            self.close.unwrap(),
255            self.volume,
256            ts_event,
257            ts_init,
258        );
259
260        self.last_close = self.close;
261        self.reset();
262        bar
263    }
264}
265
266/// Provides a means of aggregating specified bar types and sending to a registered handler.
267pub struct BarAggregatorCore<H>
268where
269    H: FnMut(Bar),
270{
271    bar_type: BarType,
272    builder: BarBuilder,
273    handler: H,
274    handler_backup: Option<H>,
275    batch_handler: Option<Box<dyn FnMut(Bar)>>,
276    await_partial: bool,
277    is_running: bool,
278    batch_mode: bool,
279}
280
281impl<H> BarAggregatorCore<H>
282where
283    H: FnMut(Bar),
284{
285    /// Creates a new [`BarAggregatorCore`] instance.
286    ///
287    /// # Panics
288    ///
289    /// This function panics:
290    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
291    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
292    pub fn new(
293        bar_type: BarType,
294        price_precision: u8,
295        size_precision: u8,
296        handler: H,
297        await_partial: bool,
298    ) -> Self {
299        Self {
300            bar_type,
301            builder: BarBuilder::new(bar_type, price_precision, size_precision),
302            handler,
303            handler_backup: None,
304            batch_handler: None,
305            await_partial,
306            is_running: false,
307            batch_mode: false,
308        }
309    }
310
311    pub const fn set_await_partial(&mut self, value: bool) {
312        self.await_partial = value;
313    }
314
315    pub const fn set_is_running(&mut self, value: bool) {
316        self.is_running = value;
317    }
318
319    pub const fn await_partial(&self) -> bool {
320        self.await_partial
321    }
322
323    /// Set the initial values for a partially completed bar.
324    pub fn set_partial(&mut self, partial_bar: Bar) {
325        self.builder.set_partial(partial_bar);
326    }
327
328    fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
329        self.builder.update(price, size, ts_event);
330    }
331
332    fn build_now_and_send(&mut self) {
333        let bar = self.builder.build_now();
334        (self.handler)(bar);
335    }
336
337    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
338        let bar = self.builder.build(ts_event, ts_init);
339
340        if self.batch_mode {
341            if let Some(handler) = &mut self.batch_handler {
342                handler(bar);
343            }
344        } else {
345            (self.handler)(bar);
346        }
347    }
348
349    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
350        self.batch_mode = true;
351        self.batch_handler = Some(handler);
352    }
353
354    pub fn stop_batch_update(&mut self) {
355        self.batch_mode = false;
356
357        if let Some(handler) = self.handler_backup.take() {
358            self.handler = handler;
359        }
360    }
361}
362
363/// Provides a means of building tick bars aggregated from quote and trades.
364///
365/// When received tick count reaches the step threshold of the bar
366/// specification, then a bar is created and sent to the handler.
367pub struct TickBarAggregator<H>
368where
369    H: FnMut(Bar),
370{
371    core: BarAggregatorCore<H>,
372    cum_value: f64,
373}
374
375impl<H> TickBarAggregator<H>
376where
377    H: FnMut(Bar),
378{
379    /// Creates a new [`TickBarAggregator`] instance.
380    ///
381    /// # Panics
382    ///
383    /// This function panics:
384    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
385    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
386    pub fn new(
387        bar_type: BarType,
388        price_precision: u8,
389        size_precision: u8,
390        handler: H,
391        await_partial: bool,
392    ) -> Self {
393        Self {
394            core: BarAggregatorCore::new(
395                bar_type,
396                price_precision,
397                size_precision,
398                handler,
399                await_partial,
400            ),
401            cum_value: 0.0,
402        }
403    }
404}
405
406impl<H> BarAggregator for TickBarAggregator<H>
407where
408    H: FnMut(Bar),
409{
410    fn bar_type(&self) -> BarType {
411        self.core.bar_type
412    }
413
414    fn is_running(&self) -> bool {
415        self.core.is_running
416    }
417
418    fn set_await_partial(&mut self, value: bool) {
419        self.core.set_await_partial(value);
420    }
421
422    fn set_is_running(&mut self, value: bool) {
423        self.core.set_is_running(value);
424    }
425
426    fn await_partial(&self) -> bool {
427        self.core.await_partial()
428    }
429
430    /// Apply the given update to the aggregator.
431    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
432        self.core.apply_update(price, size, ts_event);
433        let spec = self.core.bar_type.spec();
434
435        if self.core.builder.count >= spec.step.get() {
436            self.core.build_now_and_send();
437        }
438    }
439
440    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
441        let mut volume_update = volume;
442        let average_price = Price::new(
443            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
444            self.core.builder.price_precision,
445        );
446
447        while volume_update.as_f64() > 0.0 {
448            let value_update = average_price.as_f64() * volume_update.as_f64();
449            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
450                self.cum_value += value_update;
451                self.core.builder.update_bar(bar, volume_update, ts_init);
452                break;
453            }
454
455            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
456            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
457            self.core.builder.update_bar(
458                bar,
459                Quantity::new(volume_diff, volume_update.precision),
460                ts_init,
461            );
462
463            self.core.build_now_and_send();
464            self.cum_value = 0.0;
465            volume_update = Quantity::new(
466                volume_update.as_f64() - volume_diff,
467                volume_update.precision,
468            );
469        }
470    }
471
472    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
473        self.core.start_batch_update(handler);
474    }
475
476    fn stop_batch_update(&mut self) {
477        self.core.stop_batch_update();
478    }
479
480    fn set_partial(&mut self, partial_bar: Bar) {
481        self.core.set_partial(partial_bar);
482    }
483}
484
485/// Provides a means of building volume bars aggregated from quote and trades.
486pub struct VolumeBarAggregator<H>
487where
488    H: FnMut(Bar),
489{
490    core: BarAggregatorCore<H>,
491}
492
493impl<H> VolumeBarAggregator<H>
494where
495    H: FnMut(Bar),
496{
497    /// Creates a new [`VolumeBarAggregator`] instance.
498    ///
499    /// # Panics
500    ///
501    /// This function panics:
502    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
503    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
504    pub fn new(
505        bar_type: BarType,
506        price_precision: u8,
507        size_precision: u8,
508        handler: H,
509        await_partial: bool,
510    ) -> Self {
511        Self {
512            core: BarAggregatorCore::new(
513                bar_type.standard(),
514                price_precision,
515                size_precision,
516                handler,
517                await_partial,
518            ),
519        }
520    }
521}
522
523impl<H> BarAggregator for VolumeBarAggregator<H>
524where
525    H: FnMut(Bar),
526{
527    fn bar_type(&self) -> BarType {
528        self.core.bar_type
529    }
530
531    fn is_running(&self) -> bool {
532        self.core.is_running
533    }
534
535    fn set_await_partial(&mut self, value: bool) {
536        self.core.set_await_partial(value);
537    }
538
539    fn set_is_running(&mut self, value: bool) {
540        self.core.set_is_running(value);
541    }
542
543    fn await_partial(&self) -> bool {
544        self.core.await_partial()
545    }
546
547    /// Apply the given update to the aggregator.
548    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
549        let mut raw_size_update = size.raw;
550        let spec = self.core.bar_type.spec();
551        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
552
553        while raw_size_update > 0 {
554            if self.core.builder.volume.raw + raw_size_update < raw_step {
555                self.core.apply_update(
556                    price,
557                    Quantity::from_raw(raw_size_update, size.precision),
558                    ts_event,
559                );
560                break;
561            }
562
563            let raw_size_diff = raw_step - self.core.builder.volume.raw;
564            self.core.apply_update(
565                price,
566                Quantity::from_raw(raw_size_diff, size.precision),
567                ts_event,
568            );
569
570            self.core.build_now_and_send();
571            raw_size_update -= raw_size_diff;
572        }
573    }
574
575    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
576        let mut raw_volume_update = volume.raw;
577        let spec = self.core.bar_type.spec();
578        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
579
580        while raw_volume_update > 0 {
581            if self.core.builder.volume.raw + raw_volume_update < raw_step {
582                self.core.builder.update_bar(
583                    bar,
584                    Quantity::from_raw(raw_volume_update, volume.precision),
585                    ts_init,
586                );
587                break;
588            }
589
590            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
591            self.core.builder.update_bar(
592                bar,
593                Quantity::from_raw(raw_volume_diff, volume.precision),
594                ts_init,
595            );
596
597            self.core.build_now_and_send();
598            raw_volume_update -= raw_volume_diff;
599        }
600    }
601
602    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
603        self.core.start_batch_update(handler);
604    }
605
606    fn stop_batch_update(&mut self) {
607        self.core.stop_batch_update();
608    }
609
610    fn set_partial(&mut self, partial_bar: Bar) {
611        self.core.set_partial(partial_bar);
612    }
613}
614
615/// Provides a means of building value bars aggregated from quote and trades.
616///
617/// When received value reaches the step threshold of the bar
618/// specification, then a bar is created and sent to the handler.
619pub struct ValueBarAggregator<H>
620where
621    H: FnMut(Bar),
622{
623    core: BarAggregatorCore<H>,
624    cum_value: f64,
625}
626
627impl<H> ValueBarAggregator<H>
628where
629    H: FnMut(Bar),
630{
631    /// Creates a new [`ValueBarAggregator`] instance.
632    ///
633    /// # Panics
634    ///
635    /// This function panics:
636    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
637    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
638    pub fn new(
639        bar_type: BarType,
640        price_precision: u8,
641        size_precision: u8,
642        handler: H,
643        await_partial: bool,
644    ) -> Self {
645        Self {
646            core: BarAggregatorCore::new(
647                bar_type.standard(),
648                price_precision,
649                size_precision,
650                handler,
651                await_partial,
652            ),
653            cum_value: 0.0,
654        }
655    }
656
657    #[must_use]
658    /// Returns the cumulative value for the aggregator.
659    pub const fn get_cumulative_value(&self) -> f64 {
660        self.cum_value
661    }
662}
663
664impl<H> BarAggregator for ValueBarAggregator<H>
665where
666    H: FnMut(Bar),
667{
668    fn bar_type(&self) -> BarType {
669        self.core.bar_type
670    }
671
672    fn is_running(&self) -> bool {
673        self.core.is_running
674    }
675
676    fn set_await_partial(&mut self, value: bool) {
677        self.core.set_await_partial(value);
678    }
679
680    fn set_is_running(&mut self, value: bool) {
681        self.core.set_is_running(value);
682    }
683
684    fn await_partial(&self) -> bool {
685        self.core.await_partial()
686    }
687
688    /// Apply the given update to the aggregator.
689    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
690        let mut size_update = size.as_f64();
691        let spec = self.core.bar_type.spec();
692
693        while size_update > 0.0 {
694            let value_update = price.as_f64() * size_update;
695            if self.cum_value + value_update < spec.step.get() as f64 {
696                self.cum_value += value_update;
697                self.core
698                    .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
699                break;
700            }
701
702            let value_diff = spec.step.get() as f64 - self.cum_value;
703            let size_diff = size_update * (value_diff / value_update);
704            self.core
705                .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
706
707            self.core.build_now_and_send();
708            self.cum_value = 0.0;
709            size_update -= size_diff;
710        }
711    }
712
713    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
714        let mut volume_update = volume;
715        let average_price = Price::new(
716            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
717            self.core.builder.price_precision,
718        );
719
720        while volume_update.as_f64() > 0.0 {
721            let value_update = average_price.as_f64() * volume_update.as_f64();
722            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
723                self.cum_value += value_update;
724                self.core.builder.update_bar(bar, volume_update, ts_init);
725                break;
726            }
727
728            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
729            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
730            self.core.builder.update_bar(
731                bar,
732                Quantity::new(volume_diff, volume_update.precision),
733                ts_init,
734            );
735
736            self.core.build_now_and_send();
737            self.cum_value = 0.0;
738            volume_update = Quantity::new(
739                volume_update.as_f64() - volume_diff,
740                volume_update.precision,
741            );
742        }
743    }
744
745    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
746        self.core.start_batch_update(handler);
747    }
748
749    fn stop_batch_update(&mut self) {
750        self.core.stop_batch_update();
751    }
752
753    fn set_partial(&mut self, partial_bar: Bar) {
754        self.core.set_partial(partial_bar);
755    }
756}
757
758/// Provides a means of building time bars aggregated from quote and trades.
759///
760/// At each aggregation time interval, a bar is created and sent to the handler.
761pub struct TimeBarAggregator<H>
762where
763    H: FnMut(Bar),
764{
765    core: BarAggregatorCore<H>,
766    clock: Rc<RefCell<dyn Clock>>,
767    build_with_no_updates: bool,
768    timestamp_on_close: bool,
769    is_left_open: bool,
770    build_on_next_tick: bool,
771    stored_open_ns: UnixNanos,
772    stored_close_ns: UnixNanos,
773    timer_name: String,
774    interval_ns: UnixNanos,
775    next_close_ns: UnixNanos,
776    composite_bar_build_delay: i64,
777    add_delay: bool,
778    batch_open_ns: UnixNanos,
779    batch_next_close_ns: UnixNanos,
780    time_bars_origin: Option<TimeDelta>,
781    skip_first_non_full_bar: bool,
782}
783
784#[derive(Clone)]
785pub struct NewBarCallback<H: FnMut(Bar)> {
786    aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
787}
788
789impl<H: FnMut(Bar)> NewBarCallback<H> {
790    pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
791        Self { aggregator }
792    }
793}
794
795impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
796    fn from(value: NewBarCallback<H>) -> Self {
797        Self::Rust(Rc::new(move |event: TimeEvent| {
798            value.aggregator.borrow_mut().build_bar(event);
799        }))
800    }
801}
802
803impl<H> TimeBarAggregator<H>
804where
805    H: FnMut(Bar) + 'static,
806{
807    /// Creates a new [`TimeBarAggregator`] instance.
808    ///
809    /// # Panics
810    ///
811    /// This function panics:
812    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
813    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
814    #[allow(clippy::too_many_arguments)]
815    pub fn new(
816        bar_type: BarType,
817        price_precision: u8,
818        size_precision: u8,
819        clock: Rc<RefCell<dyn Clock>>,
820        handler: H,
821        await_partial: bool,
822        build_with_no_updates: bool,
823        timestamp_on_close: bool,
824        interval_type: BarIntervalType,
825        time_bars_origin: Option<TimeDelta>,
826        composite_bar_build_delay: i64,
827        skip_first_non_full_bar: bool,
828    ) -> Self {
829        let is_left_open = match interval_type {
830            BarIntervalType::LeftOpen => true,
831            BarIntervalType::RightOpen => false,
832        };
833
834        let add_delay = bar_type.is_composite()
835            && bar_type.composite().aggregation_source() == AggregationSource::Internal;
836
837        let core = BarAggregatorCore::new(
838            bar_type.standard(),
839            price_precision,
840            size_precision,
841            handler,
842            await_partial,
843        );
844
845        Self {
846            core,
847            clock,
848            build_with_no_updates,
849            timestamp_on_close,
850            is_left_open,
851            build_on_next_tick: false,
852            stored_open_ns: UnixNanos::default(),
853            stored_close_ns: UnixNanos::default(),
854            timer_name: bar_type.to_string(),
855            interval_ns: get_bar_interval_ns(&bar_type),
856            next_close_ns: UnixNanos::default(),
857            composite_bar_build_delay,
858            add_delay,
859            batch_open_ns: UnixNanos::default(),
860            batch_next_close_ns: UnixNanos::default(),
861            time_bars_origin,
862            skip_first_non_full_bar,
863        }
864    }
865
866    /// Starts the time bar aggregator.
867    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
868        let now = self.clock.borrow().utc_now();
869        let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
870
871        if start_time == now {
872            self.skip_first_non_full_bar = false;
873        }
874
875        if self.add_delay {
876            start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
877        }
878
879        let spec = &self.bar_type().spec();
880        let start_time_ns = UnixNanos::from(start_time);
881
882        if spec.aggregation == BarAggregation::Month {
883            let step = spec.step.get() as u32;
884            let alert_time_ns = add_n_months_nanos(start_time_ns, step);
885
886            self.clock
887                .borrow_mut()
888                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
889                .expect(FAILED);
890        } else {
891            self.clock
892                .borrow_mut()
893                .set_timer_ns(
894                    &self.timer_name,
895                    self.interval_ns.as_u64(),
896                    start_time_ns,
897                    None,
898                    Some(callback.into()),
899                    None,
900                )
901                .expect(FAILED);
902        }
903
904        log::debug!("Started timer {}", self.timer_name);
905        Ok(())
906    }
907
908    /// Stops the time bar aggregator.
909    pub fn stop(&mut self) {
910        self.clock.borrow_mut().cancel_timer(&self.timer_name);
911    }
912
913    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
914        let spec = self.bar_type().spec();
915        self.core.batch_mode = true;
916
917        let time = time_ns.to_datetime_utc();
918        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
919        self.batch_open_ns = UnixNanos::from(start_time);
920
921        if spec.aggregation == BarAggregation::Month {
922            let step = spec.step.get() as u32;
923
924            if self.batch_open_ns == time_ns {
925                self.batch_open_ns = subtract_n_months_nanos(self.batch_open_ns, step);
926            }
927
928            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step);
929        } else {
930            if self.batch_open_ns == time_ns {
931                self.batch_open_ns -= self.interval_ns;
932            }
933
934            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
935        }
936    }
937
938    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
939        if self.is_left_open {
940            if self.timestamp_on_close {
941                close_ns
942            } else {
943                open_ns
944            }
945        } else {
946            open_ns
947        }
948    }
949
950    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
951        if self.skip_first_non_full_bar {
952            self.core.builder.reset();
953            self.skip_first_non_full_bar = false;
954        } else {
955            self.core.build_and_send(ts_event, ts_init);
956        }
957    }
958
959    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
960        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
961            let ts_init = self.batch_next_close_ns;
962            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
963            self.build_and_send(ts_event, ts_init);
964        }
965    }
966
967    fn batch_post_update(&mut self, time_ns: UnixNanos) {
968        let step = self.bar_type().spec().step.get() as u32;
969
970        // If not in batch mode and time matches next close, reset batch close
971        if !self.core.batch_mode
972            && time_ns == self.batch_next_close_ns
973            && time_ns > self.stored_open_ns
974        {
975            self.batch_next_close_ns = UnixNanos::default();
976            return;
977        }
978
979        if time_ns > self.batch_next_close_ns {
980            // Ensure batch times are coherent with last builder update
981            if self.bar_type().spec().aggregation == BarAggregation::Month {
982                while self.batch_next_close_ns < time_ns {
983                    self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
984                }
985
986                self.batch_open_ns = subtract_n_months_nanos(self.batch_next_close_ns, step);
987            } else {
988                while self.batch_next_close_ns < time_ns {
989                    self.batch_next_close_ns += self.interval_ns;
990                }
991
992                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
993            }
994        }
995
996        if time_ns == self.batch_next_close_ns {
997            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
998            self.build_and_send(ts_event, time_ns);
999            self.batch_open_ns = self.batch_next_close_ns;
1000
1001            if self.bar_type().spec().aggregation == BarAggregation::Month {
1002                self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
1003            } else {
1004                self.batch_next_close_ns += self.interval_ns;
1005            }
1006        }
1007
1008        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
1009        if !self.core.batch_mode {
1010            self.batch_next_close_ns = UnixNanos::default();
1011        }
1012    }
1013
1014    fn build_bar(&mut self, event: TimeEvent) {
1015        if !self.core.builder.initialized {
1016            self.build_on_next_tick = true;
1017            self.stored_close_ns = self.next_close_ns;
1018            return;
1019        }
1020
1021        if !self.build_with_no_updates && self.core.builder.count == 0 {
1022            return;
1023        }
1024
1025        let ts_init = event.ts_event;
1026        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1027        self.build_and_send(ts_event, ts_init);
1028
1029        self.stored_open_ns = ts_init;
1030
1031        if self.bar_type().spec().aggregation == BarAggregation::Month {
1032            let step = self.bar_type().spec().step.get() as u32;
1033            let next_alert_ns = add_n_months_nanos(ts_init, step);
1034
1035            self.clock
1036                .borrow_mut()
1037                .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1038                .expect(FAILED);
1039
1040            self.next_close_ns = next_alert_ns;
1041        } else {
1042            self.next_close_ns = self.clock.borrow().next_time_ns(&self.timer_name);
1043        }
1044    }
1045}
1046
1047impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1048where
1049    H: FnMut(Bar) + 'static,
1050{
1051    fn bar_type(&self) -> BarType {
1052        self.core.bar_type
1053    }
1054
1055    fn is_running(&self) -> bool {
1056        self.core.is_running
1057    }
1058
1059    fn set_await_partial(&mut self, value: bool) {
1060        self.core.set_await_partial(value);
1061    }
1062
1063    fn set_is_running(&mut self, value: bool) {
1064        self.core.set_is_running(value);
1065    }
1066
1067    fn await_partial(&self) -> bool {
1068        self.core.await_partial()
1069    }
1070
1071    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1072        if self.batch_next_close_ns != UnixNanos::default() {
1073            self.batch_pre_update(ts_event);
1074        }
1075
1076        self.core.apply_update(price, size, ts_event);
1077
1078        if self.build_on_next_tick {
1079            if ts_event <= self.stored_close_ns {
1080                let ts_init = ts_event;
1081                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1082                self.build_and_send(ts_event, ts_init);
1083            }
1084
1085            self.build_on_next_tick = false;
1086            self.stored_close_ns = UnixNanos::default();
1087        }
1088
1089        if self.batch_next_close_ns != UnixNanos::default() {
1090            self.batch_post_update(ts_event);
1091        }
1092    }
1093
1094    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1095        if self.batch_next_close_ns != UnixNanos::default() {
1096            self.batch_pre_update(ts_init);
1097        }
1098
1099        self.core.builder.update_bar(bar, volume, ts_init);
1100
1101        if self.build_on_next_tick {
1102            if ts_init <= self.stored_close_ns {
1103                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1104                self.build_and_send(ts_event, ts_init);
1105            }
1106
1107            // Reset flag and clear stored close
1108            self.build_on_next_tick = false;
1109            self.stored_close_ns = UnixNanos::default();
1110        }
1111
1112        if self.batch_next_close_ns != UnixNanos::default() {
1113            self.batch_post_update(ts_init);
1114        }
1115    }
1116
1117    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1118        self.core.start_batch_update(handler);
1119        self.start_batch_time(time_ns);
1120    }
1121
1122    fn stop_batch_update(&mut self) {
1123        self.core.stop_batch_update();
1124    }
1125
1126    fn set_partial(&mut self, partial_bar: Bar) {
1127        self.core.set_partial(partial_bar);
1128    }
1129}
1130
1131////////////////////////////////////////////////////////////////////////////////
1132// Tests
1133////////////////////////////////////////////////////////////////////////////////
1134#[cfg(test)]
1135mod tests {
1136    use std::sync::{Arc, Mutex};
1137
1138    use nautilus_common::clock::TestClock;
1139    use nautilus_core::UUID4;
1140    use nautilus_model::{
1141        data::{BarSpecification, BarType},
1142        enums::{AggregationSource, BarAggregation, PriceType},
1143        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1144        types::{Price, Quantity},
1145    };
1146    use rstest::rstest;
1147    use ustr::Ustr;
1148
1149    use super::*;
1150
1151    #[rstest]
1152    fn test_bar_builder_initialization(equity_aapl: Equity) {
1153        let instrument = InstrumentAny::Equity(equity_aapl);
1154        let bar_type = BarType::new(
1155            instrument.id(),
1156            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1157            AggregationSource::Internal,
1158        );
1159        let builder = BarBuilder::new(
1160            bar_type,
1161            instrument.price_precision(),
1162            instrument.size_precision(),
1163        );
1164
1165        assert!(!builder.initialized);
1166        assert_eq!(builder.ts_last, 0);
1167        assert_eq!(builder.count, 0);
1168    }
1169
1170    #[rstest]
1171    fn test_set_partial_update(equity_aapl: Equity) {
1172        let instrument = InstrumentAny::Equity(equity_aapl);
1173        let bar_type = BarType::new(
1174            instrument.id(),
1175            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1176            AggregationSource::Internal,
1177        );
1178        let mut builder = BarBuilder::new(
1179            bar_type,
1180            instrument.price_precision(),
1181            instrument.size_precision(),
1182        );
1183
1184        let partial_bar = Bar::new(
1185            bar_type,
1186            Price::from("101.00"),
1187            Price::from("102.00"),
1188            Price::from("100.00"),
1189            Price::from("101.00"),
1190            Quantity::from(100),
1191            UnixNanos::from(1),
1192            UnixNanos::from(2),
1193        );
1194
1195        builder.set_partial(partial_bar);
1196        let bar = builder.build_now();
1197
1198        assert_eq!(bar.open, partial_bar.open);
1199        assert_eq!(bar.high, partial_bar.high);
1200        assert_eq!(bar.low, partial_bar.low);
1201        assert_eq!(bar.close, partial_bar.close);
1202        assert_eq!(bar.volume, partial_bar.volume);
1203        assert_eq!(builder.ts_last, 2);
1204    }
1205
1206    #[rstest]
1207    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1208        let instrument = InstrumentAny::Equity(equity_aapl);
1209        let bar_type = BarType::new(
1210            instrument.id(),
1211            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1212            AggregationSource::Internal,
1213        );
1214        let mut builder = BarBuilder::new(
1215            bar_type,
1216            instrument.price_precision(),
1217            instrument.size_precision(),
1218        );
1219
1220        builder.update(
1221            Price::from("100.00"),
1222            Quantity::from(1),
1223            UnixNanos::from(1000),
1224        );
1225        builder.update(
1226            Price::from("95.00"),
1227            Quantity::from(1),
1228            UnixNanos::from(2000),
1229        );
1230        builder.update(
1231            Price::from("105.00"),
1232            Quantity::from(1),
1233            UnixNanos::from(3000),
1234        );
1235
1236        let bar = builder.build_now();
1237        assert!(bar.high > bar.low);
1238        assert_eq!(bar.open, Price::from("100.00"));
1239        assert_eq!(bar.high, Price::from("105.00"));
1240        assert_eq!(bar.low, Price::from("95.00"));
1241        assert_eq!(bar.close, Price::from("105.00"));
1242    }
1243
1244    #[rstest]
1245    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1246        let instrument = InstrumentAny::Equity(equity_aapl);
1247        let bar_type = BarType::new(
1248            instrument.id(),
1249            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1250            AggregationSource::Internal,
1251        );
1252        let mut builder = BarBuilder::new(
1253            bar_type,
1254            instrument.price_precision(),
1255            instrument.size_precision(),
1256        );
1257
1258        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1259        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1260
1261        assert_eq!(builder.ts_last, 1_000);
1262        assert_eq!(builder.count, 1);
1263    }
1264
1265    #[rstest]
1266    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1267        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1268        let bar_type = BarType::new(
1269            instrument.id(),
1270            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1271            AggregationSource::Internal,
1272        );
1273        let mut builder = BarBuilder::new(
1274            bar_type,
1275            instrument.price_precision(),
1276            instrument.size_precision(),
1277        );
1278
1279        let partial_bar = Bar::new(
1280            bar_type,
1281            Price::from("1.00001"),
1282            Price::from("1.00010"),
1283            Price::from("1.00000"),
1284            Price::from("1.00002"),
1285            Quantity::from(1),
1286            UnixNanos::from(1_000_000_000),
1287            UnixNanos::from(2_000_000_000),
1288        );
1289
1290        builder.set_partial(partial_bar);
1291        let bar = builder.build_now();
1292
1293        assert_eq!(bar.open, Price::from("1.00001"));
1294        assert_eq!(bar.high, Price::from("1.00010"));
1295        assert_eq!(bar.low, Price::from("1.00000"));
1296        assert_eq!(bar.close, Price::from("1.00002"));
1297        assert_eq!(bar.volume, Quantity::from(1));
1298        assert_eq!(bar.ts_init, 2_000_000_000);
1299        assert_eq!(builder.ts_last, 2_000_000_000);
1300    }
1301
1302    #[rstest]
1303    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1304        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1305        let bar_type = BarType::new(
1306            instrument.id(),
1307            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1308            AggregationSource::Internal,
1309        );
1310        let mut builder = BarBuilder::new(
1311            bar_type,
1312            instrument.price_precision(),
1313            instrument.size_precision(),
1314        );
1315
1316        let partial_bar1 = Bar::new(
1317            bar_type,
1318            Price::from("1.00001"),
1319            Price::from("1.00010"),
1320            Price::from("1.00000"),
1321            Price::from("1.00002"),
1322            Quantity::from(1),
1323            UnixNanos::from(1_000_000_000),
1324            UnixNanos::from(1_000_000_000),
1325        );
1326
1327        let partial_bar2 = Bar::new(
1328            bar_type,
1329            Price::from("2.00001"),
1330            Price::from("2.00010"),
1331            Price::from("2.00000"),
1332            Price::from("2.00002"),
1333            Quantity::from(2),
1334            UnixNanos::from(3_000_000_000),
1335            UnixNanos::from(3_000_000_000),
1336        );
1337
1338        builder.set_partial(partial_bar1);
1339        builder.set_partial(partial_bar2);
1340        let bar = builder.build(
1341            UnixNanos::from(4_000_000_000),
1342            UnixNanos::from(4_000_000_000),
1343        );
1344
1345        assert_eq!(bar.open, Price::from("1.00001"));
1346        assert_eq!(bar.high, Price::from("1.00010"));
1347        assert_eq!(bar.low, Price::from("1.00000"));
1348        assert_eq!(bar.close, Price::from("1.00002"));
1349        assert_eq!(bar.volume, Quantity::from(1));
1350        assert_eq!(bar.ts_init, 4_000_000_000);
1351        assert_eq!(builder.ts_last, 1_000_000_000);
1352    }
1353
1354    #[rstest]
1355    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1356        let instrument = InstrumentAny::Equity(equity_aapl);
1357        let bar_type = BarType::new(
1358            instrument.id(),
1359            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1360            AggregationSource::Internal,
1361        );
1362        let mut builder = BarBuilder::new(
1363            bar_type,
1364            instrument.price_precision(),
1365            instrument.size_precision(),
1366        );
1367
1368        builder.update(
1369            Price::from("1.00000"),
1370            Quantity::from(1),
1371            UnixNanos::default(),
1372        );
1373
1374        assert!(builder.initialized);
1375        assert_eq!(builder.ts_last, 0);
1376        assert_eq!(builder.count, 1);
1377    }
1378
1379    #[rstest]
1380    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1381        equity_aapl: Equity,
1382    ) {
1383        let instrument = InstrumentAny::Equity(equity_aapl);
1384        let bar_type = BarType::new(
1385            instrument.id(),
1386            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1387            AggregationSource::Internal,
1388        );
1389        let mut builder = BarBuilder::new(bar_type, 2, 0);
1390
1391        builder.update(
1392            Price::from("1.00000"),
1393            Quantity::from(1),
1394            UnixNanos::from(1_000),
1395        );
1396        builder.update(
1397            Price::from("1.00001"),
1398            Quantity::from(1),
1399            UnixNanos::from(500),
1400        );
1401
1402        assert!(builder.initialized);
1403        assert_eq!(builder.ts_last, 1_000);
1404        assert_eq!(builder.count, 1);
1405    }
1406
1407    #[rstest]
1408    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1409        let instrument = InstrumentAny::Equity(equity_aapl);
1410        let bar_type = BarType::new(
1411            instrument.id(),
1412            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1413            AggregationSource::Internal,
1414        );
1415        let mut builder = BarBuilder::new(
1416            bar_type,
1417            instrument.price_precision(),
1418            instrument.size_precision(),
1419        );
1420
1421        for _ in 0..5 {
1422            builder.update(
1423                Price::from("1.00000"),
1424                Quantity::from(1),
1425                UnixNanos::from(1_000),
1426            );
1427        }
1428
1429        assert_eq!(builder.count, 5);
1430    }
1431
1432    #[rstest]
1433    #[should_panic]
1434    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1435        let instrument = InstrumentAny::Equity(equity_aapl);
1436        let bar_type = BarType::new(
1437            instrument.id(),
1438            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1439            AggregationSource::Internal,
1440        );
1441        let mut builder = BarBuilder::new(
1442            bar_type,
1443            instrument.price_precision(),
1444            instrument.size_precision(),
1445        );
1446        let _ = builder.build_now();
1447    }
1448
1449    #[rstest]
1450    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1451        let instrument = InstrumentAny::Equity(equity_aapl);
1452        let bar_type = BarType::new(
1453            instrument.id(),
1454            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1455            AggregationSource::Internal,
1456        );
1457        let mut builder = BarBuilder::new(
1458            bar_type,
1459            instrument.price_precision(),
1460            instrument.size_precision(),
1461        );
1462
1463        builder.update(
1464            Price::from("1.00001"),
1465            Quantity::from(2),
1466            UnixNanos::default(),
1467        );
1468        builder.update(
1469            Price::from("1.00002"),
1470            Quantity::from(2),
1471            UnixNanos::default(),
1472        );
1473        builder.update(
1474            Price::from("1.00000"),
1475            Quantity::from(1),
1476            UnixNanos::from(1_000_000_000),
1477        );
1478
1479        let bar = builder.build_now();
1480
1481        assert_eq!(bar.open, Price::from("1.00001"));
1482        assert_eq!(bar.high, Price::from("1.00002"));
1483        assert_eq!(bar.low, Price::from("1.00000"));
1484        assert_eq!(bar.close, Price::from("1.00000"));
1485        assert_eq!(bar.volume, Quantity::from(5));
1486        assert_eq!(bar.ts_init, 1_000_000_000);
1487        assert_eq!(builder.ts_last, 1_000_000_000);
1488        assert_eq!(builder.count, 0);
1489    }
1490
1491    #[rstest]
1492    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1493        let instrument = InstrumentAny::Equity(equity_aapl);
1494        let bar_type = BarType::new(
1495            instrument.id(),
1496            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1497            AggregationSource::Internal,
1498        );
1499        let mut builder = BarBuilder::new(bar_type, 2, 0);
1500
1501        builder.update(
1502            Price::from("1.00001"),
1503            Quantity::from(1),
1504            UnixNanos::default(),
1505        );
1506        builder.build_now();
1507
1508        builder.update(
1509            Price::from("1.00000"),
1510            Quantity::from(1),
1511            UnixNanos::default(),
1512        );
1513        builder.update(
1514            Price::from("1.00003"),
1515            Quantity::from(1),
1516            UnixNanos::default(),
1517        );
1518        builder.update(
1519            Price::from("1.00002"),
1520            Quantity::from(1),
1521            UnixNanos::default(),
1522        );
1523
1524        let bar = builder.build_now();
1525
1526        assert_eq!(bar.open, Price::from("1.00000"));
1527        assert_eq!(bar.high, Price::from("1.00003"));
1528        assert_eq!(bar.low, Price::from("1.00000"));
1529        assert_eq!(bar.close, Price::from("1.00002"));
1530        assert_eq!(bar.volume, Quantity::from(3));
1531    }
1532
1533    #[rstest]
1534    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1535        let instrument = InstrumentAny::Equity(equity_aapl);
1536        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1537        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1538        let handler = Arc::new(Mutex::new(Vec::new()));
1539        let handler_clone = Arc::clone(&handler);
1540
1541        let mut aggregator = TickBarAggregator::new(
1542            bar_type,
1543            instrument.price_precision(),
1544            instrument.size_precision(),
1545            move |bar: Bar| {
1546                let mut handler_guard = handler_clone.lock().unwrap();
1547                handler_guard.push(bar);
1548            },
1549            false,
1550        );
1551
1552        let trade = TradeTick::default();
1553        aggregator.handle_trade(trade);
1554
1555        let handler_guard = handler.lock().unwrap();
1556        assert_eq!(handler_guard.len(), 0);
1557    }
1558
1559    #[rstest]
1560    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1561        let instrument = InstrumentAny::Equity(equity_aapl);
1562        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1563        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1564        let handler = Arc::new(Mutex::new(Vec::new()));
1565        let handler_clone = Arc::clone(&handler);
1566
1567        let mut aggregator = TickBarAggregator::new(
1568            bar_type,
1569            instrument.price_precision(),
1570            instrument.size_precision(),
1571            move |bar: Bar| {
1572                let mut handler_guard = handler_clone.lock().unwrap();
1573                handler_guard.push(bar);
1574            },
1575            false,
1576        );
1577
1578        let trade = TradeTick::default();
1579        aggregator.handle_trade(trade);
1580        aggregator.handle_trade(trade);
1581        aggregator.handle_trade(trade);
1582
1583        let handler_guard = handler.lock().unwrap();
1584        let bar = handler_guard.first().unwrap();
1585        assert_eq!(handler_guard.len(), 1);
1586        assert_eq!(bar.open, trade.price);
1587        assert_eq!(bar.high, trade.price);
1588        assert_eq!(bar.low, trade.price);
1589        assert_eq!(bar.close, trade.price);
1590        assert_eq!(bar.volume, Quantity::from(300000));
1591        assert_eq!(bar.ts_event, trade.ts_event);
1592        assert_eq!(bar.ts_init, trade.ts_init);
1593    }
1594
1595    #[rstest]
1596    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1597        let instrument = InstrumentAny::Equity(equity_aapl);
1598        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1599        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1600        let handler = Arc::new(Mutex::new(Vec::new()));
1601        let handler_clone = Arc::clone(&handler);
1602
1603        let mut aggregator = TickBarAggregator::new(
1604            bar_type,
1605            instrument.price_precision(),
1606            instrument.size_precision(),
1607            move |bar: Bar| {
1608                let mut handler_guard = handler_clone.lock().unwrap();
1609                handler_guard.push(bar);
1610            },
1611            false,
1612        );
1613
1614        aggregator.update(
1615            Price::from("1.00001"),
1616            Quantity::from(1),
1617            UnixNanos::default(),
1618        );
1619        aggregator.update(
1620            Price::from("1.00002"),
1621            Quantity::from(1),
1622            UnixNanos::from(1000),
1623        );
1624        aggregator.update(
1625            Price::from("1.00003"),
1626            Quantity::from(1),
1627            UnixNanos::from(2000),
1628        );
1629
1630        let handler_guard = handler.lock().unwrap();
1631        assert_eq!(handler_guard.len(), 1);
1632
1633        let bar = handler_guard.first().unwrap();
1634        assert_eq!(bar.open, Price::from("1.00001"));
1635        assert_eq!(bar.high, Price::from("1.00003"));
1636        assert_eq!(bar.low, Price::from("1.00001"));
1637        assert_eq!(bar.close, Price::from("1.00003"));
1638        assert_eq!(bar.volume, Quantity::from(3));
1639    }
1640
1641    #[rstest]
1642    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1643        let instrument = InstrumentAny::Equity(equity_aapl);
1644        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1645        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1646        let handler = Arc::new(Mutex::new(Vec::new()));
1647        let handler_clone = Arc::clone(&handler);
1648
1649        let mut aggregator = TickBarAggregator::new(
1650            bar_type,
1651            instrument.price_precision(),
1652            instrument.size_precision(),
1653            move |bar: Bar| {
1654                let mut handler_guard = handler_clone.lock().unwrap();
1655                handler_guard.push(bar);
1656            },
1657            false,
1658        );
1659
1660        aggregator.update(
1661            Price::from("1.00001"),
1662            Quantity::from(1),
1663            UnixNanos::default(),
1664        );
1665        aggregator.update(
1666            Price::from("1.00002"),
1667            Quantity::from(1),
1668            UnixNanos::from(1000),
1669        );
1670        aggregator.update(
1671            Price::from("1.00003"),
1672            Quantity::from(1),
1673            UnixNanos::from(2000),
1674        );
1675        aggregator.update(
1676            Price::from("1.00004"),
1677            Quantity::from(1),
1678            UnixNanos::from(3000),
1679        );
1680
1681        let handler_guard = handler.lock().unwrap();
1682        assert_eq!(handler_guard.len(), 2);
1683
1684        let bar1 = &handler_guard[0];
1685        assert_eq!(bar1.open, Price::from("1.00001"));
1686        assert_eq!(bar1.close, Price::from("1.00002"));
1687        assert_eq!(bar1.volume, Quantity::from(2));
1688
1689        let bar2 = &handler_guard[1];
1690        assert_eq!(bar2.open, Price::from("1.00003"));
1691        assert_eq!(bar2.close, Price::from("1.00004"));
1692        assert_eq!(bar2.volume, Quantity::from(2));
1693    }
1694
1695    #[rstest]
1696    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1697        let instrument = InstrumentAny::Equity(equity_aapl);
1698        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1699        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1700        let handler = Arc::new(Mutex::new(Vec::new()));
1701        let handler_clone = Arc::clone(&handler);
1702
1703        let mut aggregator = VolumeBarAggregator::new(
1704            bar_type,
1705            instrument.price_precision(),
1706            instrument.size_precision(),
1707            move |bar: Bar| {
1708                let mut handler_guard = handler_clone.lock().unwrap();
1709                handler_guard.push(bar);
1710            },
1711            false,
1712        );
1713
1714        aggregator.update(
1715            Price::from("1.00001"),
1716            Quantity::from(25),
1717            UnixNanos::default(),
1718        );
1719
1720        let handler_guard = handler.lock().unwrap();
1721        assert_eq!(handler_guard.len(), 2);
1722        let bar1 = &handler_guard[0];
1723        assert_eq!(bar1.volume, Quantity::from(10));
1724        let bar2 = &handler_guard[1];
1725        assert_eq!(bar2.volume, Quantity::from(10));
1726    }
1727
1728    #[rstest]
1729    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1730        let instrument = InstrumentAny::Equity(equity_aapl);
1731        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1732        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1733        let handler = Arc::new(Mutex::new(Vec::new()));
1734        let handler_clone = Arc::clone(&handler);
1735
1736        let mut aggregator = ValueBarAggregator::new(
1737            bar_type,
1738            instrument.price_precision(),
1739            instrument.size_precision(),
1740            move |bar: Bar| {
1741                let mut handler_guard = handler_clone.lock().unwrap();
1742                handler_guard.push(bar);
1743            },
1744            false,
1745        );
1746
1747        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1748        aggregator.update(
1749            Price::from("100.00"),
1750            Quantity::from(5),
1751            UnixNanos::default(),
1752        );
1753        aggregator.update(
1754            Price::from("100.00"),
1755            Quantity::from(5),
1756            UnixNanos::from(1000),
1757        );
1758
1759        let handler_guard = handler.lock().unwrap();
1760        assert_eq!(handler_guard.len(), 1);
1761        let bar = handler_guard.first().unwrap();
1762        assert_eq!(bar.volume, Quantity::from(10));
1763    }
1764
1765    #[rstest]
1766    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1767        let instrument = InstrumentAny::Equity(equity_aapl);
1768        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1769        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1770        let handler = Arc::new(Mutex::new(Vec::new()));
1771        let handler_clone = Arc::clone(&handler);
1772
1773        let mut aggregator = ValueBarAggregator::new(
1774            bar_type,
1775            instrument.price_precision(),
1776            instrument.size_precision(),
1777            move |bar: Bar| {
1778                let mut handler_guard = handler_clone.lock().unwrap();
1779                handler_guard.push(bar);
1780            },
1781            false,
1782        );
1783
1784        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1785        aggregator.update(
1786            Price::from("100.00"),
1787            Quantity::from(25),
1788            UnixNanos::default(),
1789        );
1790
1791        let handler_guard = handler.lock().unwrap();
1792        assert_eq!(handler_guard.len(), 2);
1793        let remaining_value = aggregator.get_cumulative_value();
1794        assert!(remaining_value < 1000.0); // Should be less than threshold
1795    }
1796
1797    #[rstest]
1798    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1799        let instrument = InstrumentAny::Equity(equity_aapl);
1800        // One second bars
1801        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1802        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1803        let handler = Arc::new(Mutex::new(Vec::new()));
1804        let handler_clone = Arc::clone(&handler);
1805        let clock = Rc::new(RefCell::new(TestClock::new()));
1806
1807        let mut aggregator = TimeBarAggregator::new(
1808            bar_type,
1809            instrument.price_precision(),
1810            instrument.size_precision(),
1811            clock.clone(),
1812            move |bar: Bar| {
1813                let mut handler_guard = handler_clone.lock().unwrap();
1814                handler_guard.push(bar);
1815            },
1816            false, // await_partial
1817            true,  // build_with_no_updates
1818            false, // timestamp_on_close
1819            BarIntervalType::LeftOpen,
1820            None,  // time_bars_origin
1821            15,    // composite_bar_build_delay
1822            false, // skip_first_non_full_bar
1823        );
1824
1825        aggregator.update(
1826            Price::from("100.00"),
1827            Quantity::from(1),
1828            UnixNanos::default(),
1829        );
1830
1831        let next_sec = UnixNanos::from(1_000_000_000);
1832        clock.borrow_mut().set_time(next_sec);
1833
1834        let event = TimeEvent::new(
1835            Ustr::from("1-SECOND-LAST"),
1836            UUID4::new(),
1837            next_sec,
1838            next_sec,
1839        );
1840        aggregator.build_bar(event);
1841
1842        let handler_guard = handler.lock().unwrap();
1843        assert_eq!(handler_guard.len(), 1);
1844        let bar = handler_guard.first().unwrap();
1845        assert_eq!(bar.ts_event, UnixNanos::default());
1846        assert_eq!(bar.ts_init, next_sec);
1847    }
1848
1849    #[rstest]
1850    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1851        let instrument = InstrumentAny::Equity(equity_aapl);
1852        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1853        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1854        let handler = Arc::new(Mutex::new(Vec::new()));
1855        let handler_clone = Arc::clone(&handler);
1856        let clock = Rc::new(RefCell::new(TestClock::new()));
1857
1858        let mut aggregator = TimeBarAggregator::new(
1859            bar_type,
1860            instrument.price_precision(),
1861            instrument.size_precision(),
1862            clock.clone(),
1863            move |bar: Bar| {
1864                let mut handler_guard = handler_clone.lock().unwrap();
1865                handler_guard.push(bar);
1866            },
1867            false, // await_partial
1868            true,  // build_with_no_updates
1869            true,  // timestamp_on_close - changed to true to verify left-open behavior
1870            BarIntervalType::LeftOpen,
1871            None,
1872            15,
1873            false, // skip_first_non_full_bar
1874        );
1875
1876        // Update in first interval
1877        aggregator.update(
1878            Price::from("100.00"),
1879            Quantity::from(1),
1880            UnixNanos::default(),
1881        );
1882
1883        // First interval close
1884        let ts1 = UnixNanos::from(1_000_000_000);
1885        clock.borrow_mut().set_time(ts1);
1886        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1887        aggregator.build_bar(event);
1888
1889        // Update in second interval
1890        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1891
1892        // Second interval close
1893        let ts2 = UnixNanos::from(2_000_000_000);
1894        clock.borrow_mut().set_time(ts2);
1895        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1896        aggregator.build_bar(event);
1897
1898        let handler_guard = handler.lock().unwrap();
1899        assert_eq!(handler_guard.len(), 2);
1900
1901        let bar1 = &handler_guard[0];
1902        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
1903        assert_eq!(bar1.ts_init, ts1);
1904        assert_eq!(bar1.close, Price::from("100.00"));
1905        let bar2 = &handler_guard[1];
1906        assert_eq!(bar2.ts_event, ts2);
1907        assert_eq!(bar2.ts_init, ts2);
1908        assert_eq!(bar2.close, Price::from("101.00"));
1909    }
1910
1911    #[rstest]
1912    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1913        let instrument = InstrumentAny::Equity(equity_aapl);
1914        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1915        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1916        let handler = Arc::new(Mutex::new(Vec::new()));
1917        let handler_clone = Arc::clone(&handler);
1918        let clock = Rc::new(RefCell::new(TestClock::new()));
1919        let mut aggregator = TimeBarAggregator::new(
1920            bar_type,
1921            instrument.price_precision(),
1922            instrument.size_precision(),
1923            clock.clone(),
1924            move |bar: Bar| {
1925                let mut handler_guard = handler_clone.lock().unwrap();
1926                handler_guard.push(bar);
1927            },
1928            false, // await_partial
1929            true,  // build_with_no_updates
1930            true,  // timestamp_on_close
1931            BarIntervalType::RightOpen,
1932            None,
1933            15,
1934            false, // skip_first_non_full_bar
1935        );
1936
1937        // Update in first interval
1938        aggregator.update(
1939            Price::from("100.00"),
1940            Quantity::from(1),
1941            UnixNanos::default(),
1942        );
1943
1944        // First interval close
1945        let ts1 = UnixNanos::from(1_000_000_000);
1946        clock.borrow_mut().set_time(ts1);
1947        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1948        aggregator.build_bar(event);
1949
1950        // Update in second interval
1951        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1952
1953        // Second interval close
1954        let ts2 = UnixNanos::from(2_000_000_000);
1955        clock.borrow_mut().set_time(ts2);
1956        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1957        aggregator.build_bar(event);
1958
1959        let handler_guard = handler.lock().unwrap();
1960        assert_eq!(handler_guard.len(), 2);
1961
1962        let bar1 = &handler_guard[0];
1963        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
1964        assert_eq!(bar1.ts_init, ts1);
1965        assert_eq!(bar1.close, Price::from("100.00"));
1966
1967        let bar2 = &handler_guard[1];
1968        assert_eq!(bar2.ts_event, ts1);
1969        assert_eq!(bar2.ts_init, ts2);
1970        assert_eq!(bar2.close, Price::from("101.00"));
1971    }
1972
1973    #[rstest]
1974    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
1975        let instrument = InstrumentAny::Equity(equity_aapl);
1976        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1977        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1978        let handler = Arc::new(Mutex::new(Vec::new()));
1979        let handler_clone = Arc::clone(&handler);
1980        let clock = Rc::new(RefCell::new(TestClock::new()));
1981
1982        // First test with build_with_no_updates = false
1983        let mut aggregator = TimeBarAggregator::new(
1984            bar_type,
1985            instrument.price_precision(),
1986            instrument.size_precision(),
1987            clock.clone(),
1988            move |bar: Bar| {
1989                let mut handler_guard = handler_clone.lock().unwrap();
1990                handler_guard.push(bar);
1991            },
1992            false, // await_partial
1993            false, // build_with_no_updates disabled
1994            true,  // timestamp_on_close
1995            BarIntervalType::LeftOpen,
1996            None,
1997            15,
1998            false, // skip_first_non_full_bar
1999        );
2000
2001        // No updates, just interval close
2002        let ts1 = UnixNanos::from(1_000_000_000);
2003        clock.borrow_mut().set_time(ts1);
2004        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2005        aggregator.build_bar(event);
2006
2007        let handler_guard = handler.lock().unwrap();
2008        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
2009        drop(handler_guard);
2010
2011        // Now test with build_with_no_updates = true
2012        let handler = Arc::new(Mutex::new(Vec::new()));
2013        let handler_clone = Arc::clone(&handler);
2014        let mut aggregator = TimeBarAggregator::new(
2015            bar_type,
2016            instrument.price_precision(),
2017            instrument.size_precision(),
2018            clock.clone(),
2019            move |bar: Bar| {
2020                let mut handler_guard = handler_clone.lock().unwrap();
2021                handler_guard.push(bar);
2022            },
2023            false,
2024            true, // build_with_no_updates enabled
2025            true, // timestamp_on_close
2026            BarIntervalType::LeftOpen,
2027            None,
2028            15,
2029            false, // skip_first_non_full_bar
2030        );
2031
2032        aggregator.update(
2033            Price::from("100.00"),
2034            Quantity::from(1),
2035            UnixNanos::default(),
2036        );
2037
2038        // First interval with update
2039        let ts1 = UnixNanos::from(1_000_000_000);
2040        clock.borrow_mut().set_time(ts1);
2041        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2042        aggregator.build_bar(event);
2043
2044        // Second interval without updates
2045        let ts2 = UnixNanos::from(2_000_000_000);
2046        clock.borrow_mut().set_time(ts2);
2047        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2048        aggregator.build_bar(event);
2049
2050        let handler_guard = handler.lock().unwrap();
2051        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2052        let bar1 = &handler_guard[0];
2053        assert_eq!(bar1.close, Price::from("100.00"));
2054        let bar2 = &handler_guard[1];
2055        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2056    }
2057
2058    #[rstest]
2059    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2060        let instrument = InstrumentAny::Equity(equity_aapl);
2061        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2062        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2063        let clock = Rc::new(RefCell::new(TestClock::new()));
2064        let handler = Arc::new(Mutex::new(Vec::new()));
2065        let handler_clone = Arc::clone(&handler);
2066
2067        let mut aggregator = TimeBarAggregator::new(
2068            bar_type,
2069            instrument.price_precision(),
2070            instrument.size_precision(),
2071            clock.clone(),
2072            move |bar: Bar| {
2073                let mut handler_guard = handler_clone.lock().unwrap();
2074                handler_guard.push(bar);
2075            },
2076            false, // await_partial
2077            true,  // build_with_no_updates
2078            true,  // timestamp_on_close
2079            BarIntervalType::RightOpen,
2080            None,
2081            15,
2082            false, // skip_first_non_full_bar
2083        );
2084
2085        let ts1 = UnixNanos::from(1_000_000_000);
2086        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2087
2088        let ts2 = UnixNanos::from(2_000_000_000);
2089        clock.borrow_mut().set_time(ts2);
2090
2091        // Simulate timestamp on close
2092        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2093        aggregator.build_bar(event);
2094
2095        let handler_guard = handler.lock().unwrap();
2096        let bar = handler_guard.first().unwrap();
2097        assert_eq!(bar.ts_event, UnixNanos::default());
2098        assert_eq!(bar.ts_init, ts2);
2099    }
2100
2101    #[rstest]
2102    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2103        let instrument = InstrumentAny::Equity(equity_aapl);
2104        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2105        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2106        let clock = Rc::new(RefCell::new(TestClock::new()));
2107        let handler = Arc::new(Mutex::new(Vec::new()));
2108        let handler_clone = Arc::clone(&handler);
2109
2110        let mut aggregator = TimeBarAggregator::new(
2111            bar_type,
2112            instrument.price_precision(),
2113            instrument.size_precision(),
2114            clock.clone(),
2115            move |bar: Bar| {
2116                let mut handler_guard = handler_clone.lock().unwrap();
2117                handler_guard.push(bar);
2118            },
2119            false, // await_partial
2120            true,  // build_with_no_updates
2121            true,  // timestamp_on_close
2122            BarIntervalType::LeftOpen,
2123            None,
2124            15,
2125            false, // skip_first_non_full_bar
2126        );
2127
2128        let ts1 = UnixNanos::from(1_000_000_000);
2129        clock.borrow_mut().set_time(ts1);
2130
2131        let initial_time = clock.borrow().utc_now();
2132        aggregator.start_batch_time(UnixNanos::from(
2133            initial_time.timestamp_nanos_opt().unwrap() as u64
2134        ));
2135
2136        let handler_guard = handler.lock().unwrap();
2137        assert_eq!(handler_guard.len(), 0);
2138    }
2139}