nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17
18use std::{cell::RefCell, ops::Add, rc::Rc};
19
20use chrono::TimeDelta;
21use nautilus_common::{
22    clock::Clock,
23    timer::{TimeEvent, TimeEventCallback},
24};
25use nautilus_core::{
26    UnixNanos,
27    correctness::{self, FAILED},
28    datetime::{add_n_months_nanos, subtract_n_months_nanos},
29};
30use nautilus_model::{
31    data::{
32        QuoteTick, TradeTick,
33        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
34    },
35    enums::{AggregationSource, BarAggregation, BarIntervalType},
36    types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
37};
38
39pub trait BarAggregator {
40    /// The [`BarType`] to be aggregated.
41    fn bar_type(&self) -> BarType;
42    /// If the aggregator is running and will receive data from the message bus.
43    fn is_running(&self) -> bool;
44    fn set_await_partial(&mut self, value: bool);
45    fn set_is_running(&mut self, value: bool);
46    /// Updates the aggregator  with the given price and size.
47    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
48    /// Updates the aggregator with the given quote.
49    fn handle_quote(&mut self, quote: QuoteTick) {
50        let spec = self.bar_type().spec();
51        if !self.await_partial() {
52            self.update(
53                quote.extract_price(spec.price_type),
54                quote.extract_size(spec.price_type),
55                quote.ts_event,
56            );
57        }
58    }
59    /// Updates the aggregator with the given trade.
60    fn handle_trade(&mut self, trade: TradeTick) {
61        if !self.await_partial() {
62            self.update(trade.price, trade.size, trade.ts_event);
63        }
64    }
65    /// Updates the aggregator with the given bar.
66    fn handle_bar(&mut self, bar: Bar) {
67        if !self.await_partial() {
68            self.update_bar(bar, bar.volume, bar.ts_init);
69        }
70    }
71    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
72    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
73    fn stop_batch_update(&mut self);
74    fn await_partial(&self) -> bool;
75    fn set_partial(&mut self, partial_bar: Bar);
76}
77
/// Provides a generic bar builder for aggregation.
///
/// Accumulates open/high/low/close prices and volume from successive updates
/// and produces a [`Bar`] on demand.
pub struct BarBuilder {
    /// The bar type stamped on every bar this builder produces.
    bar_type: BarType,
    /// The price precision supplied at construction.
    price_precision: u8,
    /// The size precision used when zeroing the accumulated volume.
    size_precision: u8,
    /// Set once the builder has received its first update or a partial bar.
    initialized: bool,
    /// Timestamp of the last applied update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates applied since the last reset.
    count: usize,
    /// Set once a partial bar has been applied (only the first one is used).
    partial_set: bool,
    /// Close price of the most recently built bar.
    last_close: Option<Price>,
    /// Open price of the bar in progress.
    open: Option<Price>,
    /// Highest price of the bar in progress.
    high: Option<Price>,
    /// Lowest price of the bar in progress.
    low: Option<Price>,
    /// Most recent price seen (not cleared by `reset`).
    close: Option<Price>,
    /// Volume accumulated for the bar in progress.
    volume: Quantity,
}
94
95impl BarBuilder {
96    /// Creates a new [`BarBuilder`] instance.
97    ///
98    /// # Panics
99    ///
100    /// This function panics:
101    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
102    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
103    #[must_use]
104    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
105        correctness::check_equal(
106            bar_type.aggregation_source(),
107            AggregationSource::Internal,
108            "bar_type.aggregation_source",
109            "AggregationSource::Internal",
110        )
111        .expect(FAILED);
112
113        Self {
114            bar_type,
115            price_precision,
116            size_precision,
117            initialized: false,
118            ts_last: UnixNanos::default(),
119            count: 0,
120            partial_set: false,
121            last_close: None,
122            open: None,
123            high: None,
124            low: None,
125            close: None,
126            volume: Quantity::zero(size_precision),
127        }
128    }
129
130    /// Set the initial values for a partially completed bar.
131    pub fn set_partial(&mut self, partial_bar: Bar) {
132        if self.partial_set {
133            return; // Already updated
134        }
135
136        self.open = Some(partial_bar.open);
137
138        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
139            self.high = Some(partial_bar.high);
140        }
141
142        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
143            self.low = Some(partial_bar.low);
144        }
145
146        if self.close.is_none() {
147            self.close = Some(partial_bar.close);
148        }
149
150        self.volume = partial_bar.volume;
151
152        if self.ts_last == 0 {
153            self.ts_last = partial_bar.ts_init;
154        }
155
156        self.partial_set = true;
157        self.initialized = true;
158    }
159
160    /// Update the bar builder.
161    pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
162        if ts_event < self.ts_last {
163            return; // Not applicable
164        }
165
166        if self.open.is_none() {
167            self.open = Some(price);
168            self.high = Some(price);
169            self.low = Some(price);
170            self.initialized = true;
171        } else {
172            if price > self.high.unwrap() {
173                self.high = Some(price);
174            }
175            if price < self.low.unwrap() {
176                self.low = Some(price);
177            }
178        }
179
180        self.close = Some(price);
181        self.volume = self.volume.add(size);
182        self.count += 1;
183        self.ts_last = ts_event;
184    }
185
186    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
187        if ts_init < self.ts_last {
188            return; // Not applicable
189        }
190
191        if self.open.is_none() {
192            self.open = Some(bar.open);
193            self.high = Some(bar.high);
194            self.low = Some(bar.low);
195            self.initialized = true;
196        } else {
197            if bar.high > self.high.unwrap() {
198                self.high = Some(bar.high);
199            }
200            if bar.low < self.low.unwrap() {
201                self.low = Some(bar.low);
202            }
203        }
204
205        self.close = Some(bar.close);
206        self.volume = self.volume.add(volume);
207        self.count += 1;
208        self.ts_last = ts_init;
209    }
210
211    /// Reset the bar builder.
212    ///
213    /// All stateful fields are reset to their initial value.
214    pub fn reset(&mut self) {
215        self.open = None;
216        self.high = None;
217        self.low = None;
218        self.volume = Quantity::zero(self.size_precision);
219        self.count = 0;
220    }
221
222    /// Return the aggregated bar and reset.
223    pub fn build_now(&mut self) -> Bar {
224        self.build(self.ts_last, self.ts_last)
225    }
226
227    /// Return the aggregated bar with the given closing timestamp, and reset.
228    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
229        if self.open.is_none() {
230            self.open = self.last_close;
231            self.high = self.last_close;
232            self.low = self.last_close;
233            self.close = self.last_close;
234        }
235
236        // SAFETY: The open was checked, so we can assume all prices are Some
237        let bar = Bar::new(
238            self.bar_type,
239            self.open.unwrap(),
240            self.high.unwrap(),
241            self.low.unwrap(),
242            self.close.unwrap(),
243            self.volume,
244            ts_event,
245            ts_init,
246        );
247
248        self.last_close = self.close;
249        self.reset();
250        bar
251    }
252}
253
/// Provides a means of aggregating specified bar types and sending to a registered handler.
pub struct BarAggregatorCore<H>
where
    H: FnMut(Bar),
{
    /// The bar type being aggregated.
    bar_type: BarType,
    /// The builder accumulating the bar in progress.
    builder: BarBuilder,
    /// The regular handler which receives built bars.
    handler: H,
    /// A handler restored by `stop_batch_update` when present.
    // NOTE(review): nothing in the visible part of this file assigns this field - confirm it is still needed
    handler_backup: Option<H>,
    /// Optional handler installed by `start_batch_update`.
    batch_handler: Option<Box<dyn FnMut(Bar)>>,
    /// If incoming data should be ignored until a partial bar has been set.
    await_partial: bool,
    /// If the aggregator is running.
    is_running: bool,
    /// If the aggregator is currently in batch mode.
    batch_mode: bool,
}
268
269impl<H> BarAggregatorCore<H>
270where
271    H: FnMut(Bar),
272{
273    /// Creates a new [`BarAggregatorCore`] instance.
274    ///
275    /// # Panics
276    ///
277    /// This function panics:
278    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
279    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
280    pub fn new(
281        bar_type: BarType,
282        price_precision: u8,
283        size_precision: u8,
284        handler: H,
285        await_partial: bool,
286    ) -> Self {
287        Self {
288            bar_type,
289            builder: BarBuilder::new(bar_type, price_precision, size_precision),
290            handler,
291            handler_backup: None,
292            batch_handler: None,
293            await_partial,
294            is_running: false,
295            batch_mode: false,
296        }
297    }
298
299    pub const fn set_await_partial(&mut self, value: bool) {
300        self.await_partial = value;
301    }
302
303    pub const fn set_is_running(&mut self, value: bool) {
304        self.is_running = value;
305    }
306
307    pub const fn await_partial(&self) -> bool {
308        self.await_partial
309    }
310
311    /// Set the initial values for a partially completed bar.
312    pub fn set_partial(&mut self, partial_bar: Bar) {
313        self.builder.set_partial(partial_bar);
314    }
315
316    fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
317        self.builder.update(price, size, ts_event);
318    }
319
320    fn build_now_and_send(&mut self) {
321        let bar = self.builder.build_now();
322        (self.handler)(bar);
323    }
324
325    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
326        let bar = self.builder.build(ts_event, ts_init);
327
328        if self.batch_mode {
329            if let Some(handler) = &mut self.batch_handler {
330                handler(bar);
331            }
332        } else {
333            (self.handler)(bar);
334        }
335    }
336
337    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
338        self.batch_mode = true;
339        self.batch_handler = Some(handler);
340    }
341
342    pub fn stop_batch_update(&mut self) {
343        self.batch_mode = false;
344
345        if let Some(handler) = self.handler_backup.take() {
346            self.handler = handler;
347        }
348    }
349}
350
351/// Provides a means of building tick bars aggregated from quote and trades.
352///
353/// When received tick count reaches the step threshold of the bar
354/// specification, then a bar is created and sent to the handler.
355pub struct TickBarAggregator<H>
356where
357    H: FnMut(Bar),
358{
359    core: BarAggregatorCore<H>,
360    cum_value: f64,
361}
362
363impl<H> TickBarAggregator<H>
364where
365    H: FnMut(Bar),
366{
367    /// Creates a new [`TickBarAggregator`] instance.
368    ///
369    /// # Panics
370    ///
371    /// This function panics:
372    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
373    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
374    pub fn new(
375        bar_type: BarType,
376        price_precision: u8,
377        size_precision: u8,
378        handler: H,
379        await_partial: bool,
380    ) -> Self {
381        Self {
382            core: BarAggregatorCore::new(
383                bar_type,
384                price_precision,
385                size_precision,
386                handler,
387                await_partial,
388            ),
389            cum_value: 0.0,
390        }
391    }
392}
393
394impl<H> BarAggregator for TickBarAggregator<H>
395where
396    H: FnMut(Bar),
397{
398    fn bar_type(&self) -> BarType {
399        self.core.bar_type
400    }
401
402    fn is_running(&self) -> bool {
403        self.core.is_running
404    }
405
406    fn set_await_partial(&mut self, value: bool) {
407        self.core.set_await_partial(value);
408    }
409
410    fn set_is_running(&mut self, value: bool) {
411        self.core.set_is_running(value);
412    }
413
414    fn await_partial(&self) -> bool {
415        self.core.await_partial()
416    }
417
418    /// Apply the given update to the aggregator.
419    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
420        self.core.apply_update(price, size, ts_event);
421        let spec = self.core.bar_type.spec();
422
423        if self.core.builder.count >= spec.step.get() {
424            self.core.build_now_and_send();
425        }
426    }
427
428    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
429        let mut volume_update = volume;
430        let average_price = Price::new(
431            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
432            self.core.builder.price_precision,
433        );
434
435        while volume_update.as_f64() > 0.0 {
436            let value_update = average_price.as_f64() * volume_update.as_f64();
437            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
438                self.cum_value += value_update;
439                self.core.builder.update_bar(bar, volume_update, ts_init);
440                break;
441            }
442
443            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
444            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
445            self.core.builder.update_bar(
446                bar,
447                Quantity::new(volume_diff, volume_update.precision),
448                ts_init,
449            );
450
451            self.core.build_now_and_send();
452            self.cum_value = 0.0;
453            volume_update = Quantity::new(
454                volume_update.as_f64() - volume_diff,
455                volume_update.precision,
456            );
457        }
458    }
459
460    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
461        self.core.start_batch_update(handler);
462    }
463
464    fn stop_batch_update(&mut self) {
465        self.core.stop_batch_update();
466    }
467
468    fn set_partial(&mut self, partial_bar: Bar) {
469        self.core.set_partial(partial_bar);
470    }
471}
472
/// Provides a means of building volume bars aggregated from quote and trades.
///
/// A bar is built and sent each time the accumulated volume reaches the step
/// of the bar specification; a single update may be split across several bars.
pub struct VolumeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state and handler dispatch.
    core: BarAggregatorCore<H>,
}
480
481impl<H> VolumeBarAggregator<H>
482where
483    H: FnMut(Bar),
484{
485    /// Creates a new [`VolumeBarAggregator`] instance.
486    ///
487    /// # Panics
488    ///
489    /// This function panics:
490    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
491    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
492    pub fn new(
493        bar_type: BarType,
494        price_precision: u8,
495        size_precision: u8,
496        handler: H,
497        await_partial: bool,
498    ) -> Self {
499        Self {
500            core: BarAggregatorCore::new(
501                bar_type.standard(),
502                price_precision,
503                size_precision,
504                handler,
505                await_partial,
506            ),
507        }
508    }
509}
510
511impl<H> BarAggregator for VolumeBarAggregator<H>
512where
513    H: FnMut(Bar),
514{
515    fn bar_type(&self) -> BarType {
516        self.core.bar_type
517    }
518
519    fn is_running(&self) -> bool {
520        self.core.is_running
521    }
522
523    fn set_await_partial(&mut self, value: bool) {
524        self.core.set_await_partial(value);
525    }
526
527    fn set_is_running(&mut self, value: bool) {
528        self.core.set_is_running(value);
529    }
530
531    fn await_partial(&self) -> bool {
532        self.core.await_partial()
533    }
534
535    /// Apply the given update to the aggregator.
536    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
537        let mut raw_size_update = size.raw;
538        let spec = self.core.bar_type.spec();
539        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
540
541        while raw_size_update > 0 {
542            if self.core.builder.volume.raw + raw_size_update < raw_step {
543                self.core.apply_update(
544                    price,
545                    Quantity::from_raw(raw_size_update, size.precision),
546                    ts_event,
547                );
548                break;
549            }
550
551            let raw_size_diff = raw_step - self.core.builder.volume.raw;
552            self.core.apply_update(
553                price,
554                Quantity::from_raw(raw_size_diff, size.precision),
555                ts_event,
556            );
557
558            self.core.build_now_and_send();
559            raw_size_update -= raw_size_diff;
560        }
561    }
562
563    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
564        let mut raw_volume_update = volume.raw;
565        let spec = self.core.bar_type.spec();
566        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
567
568        while raw_volume_update > 0 {
569            if self.core.builder.volume.raw + raw_volume_update < raw_step {
570                self.core.builder.update_bar(
571                    bar,
572                    Quantity::from_raw(raw_volume_update, volume.precision),
573                    ts_init,
574                );
575                break;
576            }
577
578            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
579            self.core.builder.update_bar(
580                bar,
581                Quantity::from_raw(raw_volume_diff, volume.precision),
582                ts_init,
583            );
584
585            self.core.build_now_and_send();
586            raw_volume_update -= raw_volume_diff;
587        }
588    }
589
590    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
591        self.core.start_batch_update(handler);
592    }
593
594    fn stop_batch_update(&mut self) {
595        self.core.stop_batch_update();
596    }
597
598    fn set_partial(&mut self, partial_bar: Bar) {
599        self.core.set_partial(partial_bar);
600    }
601}
602
/// Provides a means of building value bars aggregated from quote and trades.
///
/// When received value reaches the step threshold of the bar
/// specification, then a bar is created and sent to the handler.
pub struct ValueBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state and handler dispatch.
    core: BarAggregatorCore<H>,
    /// Value (price multiplied by size) accumulated towards the next bar; zeroed after each build.
    cum_value: f64,
}
614
615impl<H> ValueBarAggregator<H>
616where
617    H: FnMut(Bar),
618{
619    /// Creates a new [`ValueBarAggregator`] instance.
620    ///
621    /// # Panics
622    ///
623    /// This function panics:
624    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
625    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
626    pub fn new(
627        bar_type: BarType,
628        price_precision: u8,
629        size_precision: u8,
630        handler: H,
631        await_partial: bool,
632    ) -> Self {
633        Self {
634            core: BarAggregatorCore::new(
635                bar_type.standard(),
636                price_precision,
637                size_precision,
638                handler,
639                await_partial,
640            ),
641            cum_value: 0.0,
642        }
643    }
644
645    #[must_use]
646    /// Returns the cumulative value for the aggregator.
647    pub const fn get_cumulative_value(&self) -> f64 {
648        self.cum_value
649    }
650}
651
652impl<H> BarAggregator for ValueBarAggregator<H>
653where
654    H: FnMut(Bar),
655{
656    fn bar_type(&self) -> BarType {
657        self.core.bar_type
658    }
659
660    fn is_running(&self) -> bool {
661        self.core.is_running
662    }
663
664    fn set_await_partial(&mut self, value: bool) {
665        self.core.set_await_partial(value);
666    }
667
668    fn set_is_running(&mut self, value: bool) {
669        self.core.set_is_running(value);
670    }
671
672    fn await_partial(&self) -> bool {
673        self.core.await_partial()
674    }
675
676    /// Apply the given update to the aggregator.
677    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
678        let mut size_update = size.as_f64();
679        let spec = self.core.bar_type.spec();
680
681        while size_update > 0.0 {
682            let value_update = price.as_f64() * size_update;
683            if self.cum_value + value_update < spec.step.get() as f64 {
684                self.cum_value += value_update;
685                self.core
686                    .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
687                break;
688            }
689
690            let value_diff = spec.step.get() as f64 - self.cum_value;
691            let size_diff = size_update * (value_diff / value_update);
692            self.core
693                .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
694
695            self.core.build_now_and_send();
696            self.cum_value = 0.0;
697            size_update -= size_diff;
698        }
699    }
700
701    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
702        let mut volume_update = volume;
703        let average_price = Price::new(
704            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
705            self.core.builder.price_precision,
706        );
707
708        while volume_update.as_f64() > 0.0 {
709            let value_update = average_price.as_f64() * volume_update.as_f64();
710            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
711                self.cum_value += value_update;
712                self.core.builder.update_bar(bar, volume_update, ts_init);
713                break;
714            }
715
716            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
717            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
718            self.core.builder.update_bar(
719                bar,
720                Quantity::new(volume_diff, volume_update.precision),
721                ts_init,
722            );
723
724            self.core.build_now_and_send();
725            self.cum_value = 0.0;
726            volume_update = Quantity::new(
727                volume_update.as_f64() - volume_diff,
728                volume_update.precision,
729            );
730        }
731    }
732
733    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
734        self.core.start_batch_update(handler);
735    }
736
737    fn stop_batch_update(&mut self) {
738        self.core.stop_batch_update();
739    }
740
741    fn set_partial(&mut self, partial_bar: Bar) {
742        self.core.set_partial(partial_bar);
743    }
744}
745
/// Provides a means of building time bars aggregated from quote and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state and handler dispatch.
    core: BarAggregatorCore<H>,
    /// Clock used to read the current time and to set/cancel the build timer.
    clock: Rc<RefCell<dyn Clock>>,
    /// If a bar should still be built on a timer event when no update arrived since the last build.
    build_with_no_updates: bool,
    /// For left-open intervals, if the bar event timestamp is the close time rather than the open time.
    timestamp_on_close: bool,
    /// If the bar interval type is `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Set when a timer event fires before the builder has been initialized.
    build_on_next_tick: bool,
    // NOTE(review): presumably the open time of the bar in progress - it is only read in the visible code
    stored_open_ns: UnixNanos,
    /// Close time recorded when a build is deferred to the next tick.
    stored_close_ns: UnixNanos,
    /// Name of the clock timer (the string form of the bar type).
    timer_name: String,
    /// Bar interval in nanoseconds, derived from the bar type.
    interval_ns: UnixNanos,
    // NOTE(review): presumably the close time of the bar in progress - it is only read in the visible code
    next_close_ns: UnixNanos,
    /// Delay in microseconds added to the timer start when `add_delay` is set.
    composite_bar_build_delay: i64,
    /// If the bar type is composite with an internally aggregated composite source.
    add_delay: bool,
    /// Open time of the current bar while processing batch updates.
    batch_open_ns: UnixNanos,
    /// Close time of the current bar while processing batch updates.
    batch_next_close_ns: UnixNanos,
    /// Optional origin offset passed on when computing the bar start time.
    time_bars_origin: Option<TimeDelta>,
    /// If the first bar should be discarded rather than sent (cleared once applied).
    skip_first_non_full_bar: bool,
}
771
/// Timer callback which triggers a bar build on a shared [`TimeBarAggregator`].
#[derive(Clone)]
pub struct NewBarCallback<H: FnMut(Bar)> {
    /// The aggregator whose bar is built when a time event fires.
    aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
}
776
impl<H: FnMut(Bar)> NewBarCallback<H> {
    /// Creates a new [`NewBarCallback`] instance for the given shared aggregator.
    pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
        Self { aggregator }
    }
}
782
783impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
784    fn from(value: NewBarCallback<H>) -> Self {
785        Self::Rust(Rc::new(move |event: TimeEvent| {
786            value.aggregator.borrow_mut().build_bar(event);
787        }))
788    }
789}
790
791impl<H> TimeBarAggregator<H>
792where
793    H: FnMut(Bar) + 'static,
794{
795    /// Creates a new [`TimeBarAggregator`] instance.
796    ///
797    /// # Panics
798    ///
799    /// This function panics:
800    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
801    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
802    #[allow(clippy::too_many_arguments)]
803    pub fn new(
804        bar_type: BarType,
805        price_precision: u8,
806        size_precision: u8,
807        clock: Rc<RefCell<dyn Clock>>,
808        handler: H,
809        await_partial: bool,
810        build_with_no_updates: bool,
811        timestamp_on_close: bool,
812        interval_type: BarIntervalType,
813        time_bars_origin: Option<TimeDelta>,
814        composite_bar_build_delay: i64,
815        skip_first_non_full_bar: bool,
816    ) -> Self {
817        let is_left_open = match interval_type {
818            BarIntervalType::LeftOpen => true,
819            BarIntervalType::RightOpen => false,
820        };
821
822        let add_delay = bar_type.is_composite()
823            && bar_type.composite().aggregation_source() == AggregationSource::Internal;
824
825        let core = BarAggregatorCore::new(
826            bar_type.standard(),
827            price_precision,
828            size_precision,
829            handler,
830            await_partial,
831        );
832
833        Self {
834            core,
835            clock,
836            build_with_no_updates,
837            timestamp_on_close,
838            is_left_open,
839            build_on_next_tick: false,
840            stored_open_ns: UnixNanos::default(),
841            stored_close_ns: UnixNanos::default(),
842            timer_name: bar_type.to_string(),
843            interval_ns: get_bar_interval_ns(&bar_type),
844            next_close_ns: UnixNanos::default(),
845            composite_bar_build_delay,
846            add_delay,
847            batch_open_ns: UnixNanos::default(),
848            batch_next_close_ns: UnixNanos::default(),
849            time_bars_origin,
850            skip_first_non_full_bar,
851        }
852    }
853
854    /// Starts the time bar aggregator.
855    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
856        let now = self.clock.borrow().utc_now();
857        let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
858
859        if start_time == now {
860            self.skip_first_non_full_bar = false;
861        }
862
863        if self.add_delay {
864            start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
865        }
866
867        let spec = &self.bar_type().spec();
868        let start_time_ns = UnixNanos::from(start_time);
869
870        if spec.aggregation == BarAggregation::Month {
871            let step = spec.step.get() as u32;
872            let alert_time_ns = add_n_months_nanos(start_time_ns, step);
873
874            self.clock
875                .borrow_mut()
876                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()))
877                .expect(FAILED);
878        } else {
879            self.clock
880                .borrow_mut()
881                .set_timer_ns(
882                    &self.timer_name,
883                    self.interval_ns.as_u64(),
884                    start_time_ns,
885                    None,
886                    Some(callback.into()),
887                )
888                .expect(FAILED);
889        }
890
891        log::debug!("Started timer {}", self.timer_name);
892        Ok(())
893    }
894
895    /// Stops the time bar aggregator.
896    pub fn stop(&mut self) {
897        self.clock.borrow_mut().cancel_timer(&self.timer_name);
898    }
899
900    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
901        let spec = self.bar_type().spec();
902        self.core.batch_mode = true;
903
904        let time = time_ns.to_datetime_utc();
905        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
906        self.batch_open_ns = UnixNanos::from(start_time);
907
908        if spec.aggregation == BarAggregation::Month {
909            let step = spec.step.get() as u32;
910
911            if self.batch_open_ns == time_ns {
912                self.batch_open_ns = subtract_n_months_nanos(self.batch_open_ns, step);
913            }
914
915            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step);
916        } else {
917            if self.batch_open_ns == time_ns {
918                self.batch_open_ns -= self.interval_ns;
919            }
920
921            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
922        }
923    }
924
925    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
926        if self.is_left_open {
927            if self.timestamp_on_close {
928                close_ns
929            } else {
930                open_ns
931            }
932        } else {
933            open_ns
934        }
935    }
936
937    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
938        if self.skip_first_non_full_bar {
939            self.core.builder.reset();
940            self.skip_first_non_full_bar = false;
941        } else {
942            self.core.build_and_send(ts_event, ts_init);
943        }
944    }
945
946    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
947        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
948            let ts_init = self.batch_next_close_ns;
949            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
950            self.build_and_send(ts_event, ts_init);
951        }
952    }
953
954    fn batch_post_update(&mut self, time_ns: UnixNanos) {
955        let step = self.bar_type().spec().step.get() as u32;
956
957        // If not in batch mode and time matches next close, reset batch close
958        if !self.core.batch_mode
959            && time_ns == self.batch_next_close_ns
960            && time_ns > self.stored_open_ns
961        {
962            self.batch_next_close_ns = UnixNanos::default();
963            return;
964        }
965
966        if time_ns > self.batch_next_close_ns {
967            // Ensure batch times are coherent with last builder update
968            if self.bar_type().spec().aggregation == BarAggregation::Month {
969                while self.batch_next_close_ns < time_ns {
970                    self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
971                }
972
973                self.batch_open_ns = subtract_n_months_nanos(self.batch_next_close_ns, step);
974            } else {
975                while self.batch_next_close_ns < time_ns {
976                    self.batch_next_close_ns += self.interval_ns;
977                }
978
979                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
980            }
981        }
982
983        if time_ns == self.batch_next_close_ns {
984            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
985            self.build_and_send(ts_event, time_ns);
986            self.batch_open_ns = self.batch_next_close_ns;
987
988            if self.bar_type().spec().aggregation == BarAggregation::Month {
989                self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
990            } else {
991                self.batch_next_close_ns += self.interval_ns;
992            }
993        }
994
995        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
996        if !self.core.batch_mode {
997            self.batch_next_close_ns = UnixNanos::default();
998        }
999    }
1000
1001    fn build_bar(&mut self, event: TimeEvent) {
1002        if !self.core.builder.initialized {
1003            self.build_on_next_tick = true;
1004            self.stored_close_ns = self.next_close_ns;
1005            return;
1006        }
1007
1008        if !self.build_with_no_updates && self.core.builder.count == 0 {
1009            return;
1010        }
1011
1012        let ts_init = event.ts_event;
1013        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1014        self.build_and_send(ts_event, ts_init);
1015
1016        self.stored_open_ns = ts_init;
1017
1018        if self.bar_type().spec().aggregation == BarAggregation::Month {
1019            let step = self.bar_type().spec().step.get() as u32;
1020            let next_alert_ns = add_n_months_nanos(ts_init, step);
1021
1022            self.clock
1023                .borrow_mut()
1024                .set_time_alert_ns(&self.timer_name, next_alert_ns, None)
1025                .expect(FAILED);
1026
1027            self.next_close_ns = next_alert_ns;
1028        } else {
1029            self.next_close_ns = self.clock.borrow().next_time_ns(&self.timer_name);
1030        }
1031    }
1032}
1033
1034impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1035where
1036    H: FnMut(Bar) + 'static,
1037{
1038    fn bar_type(&self) -> BarType {
1039        self.core.bar_type
1040    }
1041
1042    fn is_running(&self) -> bool {
1043        self.core.is_running
1044    }
1045
1046    fn set_await_partial(&mut self, value: bool) {
1047        self.core.set_await_partial(value);
1048    }
1049
1050    fn set_is_running(&mut self, value: bool) {
1051        self.core.set_is_running(value);
1052    }
1053
1054    fn await_partial(&self) -> bool {
1055        self.core.await_partial()
1056    }
1057
1058    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1059        if self.batch_next_close_ns != UnixNanos::default() {
1060            self.batch_pre_update(ts_event);
1061        }
1062
1063        self.core.apply_update(price, size, ts_event);
1064
1065        if self.build_on_next_tick {
1066            if ts_event <= self.stored_close_ns {
1067                let ts_init = ts_event;
1068                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1069                self.build_and_send(ts_event, ts_init);
1070            }
1071
1072            self.build_on_next_tick = false;
1073            self.stored_close_ns = UnixNanos::default();
1074        }
1075
1076        if self.batch_next_close_ns != UnixNanos::default() {
1077            self.batch_post_update(ts_event);
1078        }
1079    }
1080
1081    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1082        if self.batch_next_close_ns != UnixNanos::default() {
1083            self.batch_pre_update(ts_init);
1084        }
1085
1086        self.core.builder.update_bar(bar, volume, ts_init);
1087
1088        if self.build_on_next_tick {
1089            if ts_init <= self.stored_close_ns {
1090                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1091                self.build_and_send(ts_event, ts_init);
1092            }
1093
1094            // Reset flag and clear stored close
1095            self.build_on_next_tick = false;
1096            self.stored_close_ns = UnixNanos::default();
1097        }
1098
1099        if self.batch_next_close_ns != UnixNanos::default() {
1100            self.batch_post_update(ts_init);
1101        }
1102    }
1103
1104    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1105        self.core.start_batch_update(handler);
1106        self.start_batch_time(time_ns);
1107    }
1108
1109    fn stop_batch_update(&mut self) {
1110        self.core.stop_batch_update();
1111    }
1112
1113    fn set_partial(&mut self, partial_bar: Bar) {
1114        self.core.set_partial(partial_bar);
1115    }
1116}
1117
1118////////////////////////////////////////////////////////////////////////////////
1119// Tests
1120////////////////////////////////////////////////////////////////////////////////
1121#[cfg(test)]
1122mod tests {
1123    use std::sync::{Arc, Mutex};
1124
1125    use nautilus_common::clock::TestClock;
1126    use nautilus_core::UUID4;
1127    use nautilus_model::{
1128        data::{BarSpecification, BarType},
1129        enums::{AggregationSource, BarAggregation, PriceType},
1130        instruments::{CurrencyPair, Equity, InstrumentAny, stubs::*},
1131        types::{Price, Quantity},
1132    };
1133    use rstest::rstest;
1134    use ustr::Ustr;
1135
1136    use super::*;
1137
1138    #[rstest]
1139    fn test_bar_builder_initialization(equity_aapl: Equity) {
1140        let instrument = InstrumentAny::Equity(equity_aapl);
1141        let bar_type = BarType::new(
1142            instrument.id(),
1143            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1144            AggregationSource::Internal,
1145        );
1146        let builder = BarBuilder::new(
1147            bar_type,
1148            instrument.price_precision(),
1149            instrument.size_precision(),
1150        );
1151
1152        assert!(!builder.initialized);
1153        assert_eq!(builder.ts_last, 0);
1154        assert_eq!(builder.count, 0);
1155    }
1156
1157    #[rstest]
1158    fn test_set_partial_update(equity_aapl: Equity) {
1159        let instrument = InstrumentAny::Equity(equity_aapl);
1160        let bar_type = BarType::new(
1161            instrument.id(),
1162            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1163            AggregationSource::Internal,
1164        );
1165        let mut builder = BarBuilder::new(
1166            bar_type,
1167            instrument.price_precision(),
1168            instrument.size_precision(),
1169        );
1170
1171        let partial_bar = Bar::new(
1172            bar_type,
1173            Price::from("101.00"),
1174            Price::from("102.00"),
1175            Price::from("100.00"),
1176            Price::from("101.00"),
1177            Quantity::from(100),
1178            UnixNanos::from(1),
1179            UnixNanos::from(2),
1180        );
1181
1182        builder.set_partial(partial_bar);
1183        let bar = builder.build_now();
1184
1185        assert_eq!(bar.open, partial_bar.open);
1186        assert_eq!(bar.high, partial_bar.high);
1187        assert_eq!(bar.low, partial_bar.low);
1188        assert_eq!(bar.close, partial_bar.close);
1189        assert_eq!(bar.volume, partial_bar.volume);
1190        assert_eq!(builder.ts_last, 2);
1191    }
1192
1193    #[rstest]
1194    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1195        let instrument = InstrumentAny::Equity(equity_aapl);
1196        let bar_type = BarType::new(
1197            instrument.id(),
1198            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1199            AggregationSource::Internal,
1200        );
1201        let mut builder = BarBuilder::new(
1202            bar_type,
1203            instrument.price_precision(),
1204            instrument.size_precision(),
1205        );
1206
1207        builder.update(
1208            Price::from("100.00"),
1209            Quantity::from(1),
1210            UnixNanos::from(1000),
1211        );
1212        builder.update(
1213            Price::from("95.00"),
1214            Quantity::from(1),
1215            UnixNanos::from(2000),
1216        );
1217        builder.update(
1218            Price::from("105.00"),
1219            Quantity::from(1),
1220            UnixNanos::from(3000),
1221        );
1222
1223        let bar = builder.build_now();
1224        assert!(bar.high > bar.low);
1225        assert_eq!(bar.open, Price::from("100.00"));
1226        assert_eq!(bar.high, Price::from("105.00"));
1227        assert_eq!(bar.low, Price::from("95.00"));
1228        assert_eq!(bar.close, Price::from("105.00"));
1229    }
1230
1231    #[rstest]
1232    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1233        let instrument = InstrumentAny::Equity(equity_aapl);
1234        let bar_type = BarType::new(
1235            instrument.id(),
1236            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1237            AggregationSource::Internal,
1238        );
1239        let mut builder = BarBuilder::new(
1240            bar_type,
1241            instrument.price_precision(),
1242            instrument.size_precision(),
1243        );
1244
1245        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1246        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1247
1248        assert_eq!(builder.ts_last, 1_000);
1249        assert_eq!(builder.count, 1);
1250    }
1251
1252    #[rstest]
1253    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1254        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1255        let bar_type = BarType::new(
1256            instrument.id(),
1257            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1258            AggregationSource::Internal,
1259        );
1260        let mut builder = BarBuilder::new(
1261            bar_type,
1262            instrument.price_precision(),
1263            instrument.size_precision(),
1264        );
1265
1266        let partial_bar = Bar::new(
1267            bar_type,
1268            Price::from("1.00001"),
1269            Price::from("1.00010"),
1270            Price::from("1.00000"),
1271            Price::from("1.00002"),
1272            Quantity::from(1),
1273            UnixNanos::from(1_000_000_000),
1274            UnixNanos::from(2_000_000_000),
1275        );
1276
1277        builder.set_partial(partial_bar);
1278        let bar = builder.build_now();
1279
1280        assert_eq!(bar.open, Price::from("1.00001"));
1281        assert_eq!(bar.high, Price::from("1.00010"));
1282        assert_eq!(bar.low, Price::from("1.00000"));
1283        assert_eq!(bar.close, Price::from("1.00002"));
1284        assert_eq!(bar.volume, Quantity::from(1));
1285        assert_eq!(bar.ts_init, 2_000_000_000);
1286        assert_eq!(builder.ts_last, 2_000_000_000);
1287    }
1288
1289    #[rstest]
1290    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1291        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1292        let bar_type = BarType::new(
1293            instrument.id(),
1294            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1295            AggregationSource::Internal,
1296        );
1297        let mut builder = BarBuilder::new(
1298            bar_type,
1299            instrument.price_precision(),
1300            instrument.size_precision(),
1301        );
1302
1303        let partial_bar1 = Bar::new(
1304            bar_type,
1305            Price::from("1.00001"),
1306            Price::from("1.00010"),
1307            Price::from("1.00000"),
1308            Price::from("1.00002"),
1309            Quantity::from(1),
1310            UnixNanos::from(1_000_000_000),
1311            UnixNanos::from(1_000_000_000),
1312        );
1313
1314        let partial_bar2 = Bar::new(
1315            bar_type,
1316            Price::from("2.00001"),
1317            Price::from("2.00010"),
1318            Price::from("2.00000"),
1319            Price::from("2.00002"),
1320            Quantity::from(2),
1321            UnixNanos::from(3_000_000_000),
1322            UnixNanos::from(3_000_000_000),
1323        );
1324
1325        builder.set_partial(partial_bar1);
1326        builder.set_partial(partial_bar2);
1327        let bar = builder.build(
1328            UnixNanos::from(4_000_000_000),
1329            UnixNanos::from(4_000_000_000),
1330        );
1331
1332        assert_eq!(bar.open, Price::from("1.00001"));
1333        assert_eq!(bar.high, Price::from("1.00010"));
1334        assert_eq!(bar.low, Price::from("1.00000"));
1335        assert_eq!(bar.close, Price::from("1.00002"));
1336        assert_eq!(bar.volume, Quantity::from(1));
1337        assert_eq!(bar.ts_init, 4_000_000_000);
1338        assert_eq!(builder.ts_last, 1_000_000_000);
1339    }
1340
1341    #[rstest]
1342    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1343        let instrument = InstrumentAny::Equity(equity_aapl);
1344        let bar_type = BarType::new(
1345            instrument.id(),
1346            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1347            AggregationSource::Internal,
1348        );
1349        let mut builder = BarBuilder::new(
1350            bar_type,
1351            instrument.price_precision(),
1352            instrument.size_precision(),
1353        );
1354
1355        builder.update(
1356            Price::from("1.00000"),
1357            Quantity::from(1),
1358            UnixNanos::default(),
1359        );
1360
1361        assert!(builder.initialized);
1362        assert_eq!(builder.ts_last, 0);
1363        assert_eq!(builder.count, 1);
1364    }
1365
1366    #[rstest]
1367    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1368        equity_aapl: Equity,
1369    ) {
1370        let instrument = InstrumentAny::Equity(equity_aapl);
1371        let bar_type = BarType::new(
1372            instrument.id(),
1373            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1374            AggregationSource::Internal,
1375        );
1376        let mut builder = BarBuilder::new(bar_type, 2, 0);
1377
1378        builder.update(
1379            Price::from("1.00000"),
1380            Quantity::from(1),
1381            UnixNanos::from(1_000),
1382        );
1383        builder.update(
1384            Price::from("1.00001"),
1385            Quantity::from(1),
1386            UnixNanos::from(500),
1387        );
1388
1389        assert!(builder.initialized);
1390        assert_eq!(builder.ts_last, 1_000);
1391        assert_eq!(builder.count, 1);
1392    }
1393
1394    #[rstest]
1395    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1396        let instrument = InstrumentAny::Equity(equity_aapl);
1397        let bar_type = BarType::new(
1398            instrument.id(),
1399            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1400            AggregationSource::Internal,
1401        );
1402        let mut builder = BarBuilder::new(
1403            bar_type,
1404            instrument.price_precision(),
1405            instrument.size_precision(),
1406        );
1407
1408        for _ in 0..5 {
1409            builder.update(
1410                Price::from("1.00000"),
1411                Quantity::from(1),
1412                UnixNanos::from(1_000),
1413            );
1414        }
1415
1416        assert_eq!(builder.count, 5);
1417    }
1418
1419    #[rstest]
1420    #[should_panic]
1421    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1422        let instrument = InstrumentAny::Equity(equity_aapl);
1423        let bar_type = BarType::new(
1424            instrument.id(),
1425            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1426            AggregationSource::Internal,
1427        );
1428        let mut builder = BarBuilder::new(
1429            bar_type,
1430            instrument.price_precision(),
1431            instrument.size_precision(),
1432        );
1433        let _ = builder.build_now();
1434    }
1435
1436    #[rstest]
1437    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1438        let instrument = InstrumentAny::Equity(equity_aapl);
1439        let bar_type = BarType::new(
1440            instrument.id(),
1441            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1442            AggregationSource::Internal,
1443        );
1444        let mut builder = BarBuilder::new(
1445            bar_type,
1446            instrument.price_precision(),
1447            instrument.size_precision(),
1448        );
1449
1450        builder.update(
1451            Price::from("1.00001"),
1452            Quantity::from(2),
1453            UnixNanos::default(),
1454        );
1455        builder.update(
1456            Price::from("1.00002"),
1457            Quantity::from(2),
1458            UnixNanos::default(),
1459        );
1460        builder.update(
1461            Price::from("1.00000"),
1462            Quantity::from(1),
1463            UnixNanos::from(1_000_000_000),
1464        );
1465
1466        let bar = builder.build_now();
1467
1468        assert_eq!(bar.open, Price::from("1.00001"));
1469        assert_eq!(bar.high, Price::from("1.00002"));
1470        assert_eq!(bar.low, Price::from("1.00000"));
1471        assert_eq!(bar.close, Price::from("1.00000"));
1472        assert_eq!(bar.volume, Quantity::from(5));
1473        assert_eq!(bar.ts_init, 1_000_000_000);
1474        assert_eq!(builder.ts_last, 1_000_000_000);
1475        assert_eq!(builder.count, 0);
1476    }
1477
1478    #[rstest]
1479    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1480        let instrument = InstrumentAny::Equity(equity_aapl);
1481        let bar_type = BarType::new(
1482            instrument.id(),
1483            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1484            AggregationSource::Internal,
1485        );
1486        let mut builder = BarBuilder::new(bar_type, 2, 0);
1487
1488        builder.update(
1489            Price::from("1.00001"),
1490            Quantity::from(1),
1491            UnixNanos::default(),
1492        );
1493        builder.build_now();
1494
1495        builder.update(
1496            Price::from("1.00000"),
1497            Quantity::from(1),
1498            UnixNanos::default(),
1499        );
1500        builder.update(
1501            Price::from("1.00003"),
1502            Quantity::from(1),
1503            UnixNanos::default(),
1504        );
1505        builder.update(
1506            Price::from("1.00002"),
1507            Quantity::from(1),
1508            UnixNanos::default(),
1509        );
1510
1511        let bar = builder.build_now();
1512
1513        assert_eq!(bar.open, Price::from("1.00000"));
1514        assert_eq!(bar.high, Price::from("1.00003"));
1515        assert_eq!(bar.low, Price::from("1.00000"));
1516        assert_eq!(bar.close, Price::from("1.00002"));
1517        assert_eq!(bar.volume, Quantity::from(3));
1518    }
1519
1520    #[rstest]
1521    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1522        let instrument = InstrumentAny::Equity(equity_aapl);
1523        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1524        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1525        let handler = Arc::new(Mutex::new(Vec::new()));
1526        let handler_clone = Arc::clone(&handler);
1527
1528        let mut aggregator = TickBarAggregator::new(
1529            bar_type,
1530            instrument.price_precision(),
1531            instrument.size_precision(),
1532            move |bar: Bar| {
1533                let mut handler_guard = handler_clone.lock().unwrap();
1534                handler_guard.push(bar);
1535            },
1536            false,
1537        );
1538
1539        let trade = TradeTick::default();
1540        aggregator.handle_trade(trade);
1541
1542        let handler_guard = handler.lock().unwrap();
1543        assert_eq!(handler_guard.len(), 0);
1544    }
1545
1546    #[rstest]
1547    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1548        let instrument = InstrumentAny::Equity(equity_aapl);
1549        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1550        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1551        let handler = Arc::new(Mutex::new(Vec::new()));
1552        let handler_clone = Arc::clone(&handler);
1553
1554        let mut aggregator = TickBarAggregator::new(
1555            bar_type,
1556            instrument.price_precision(),
1557            instrument.size_precision(),
1558            move |bar: Bar| {
1559                let mut handler_guard = handler_clone.lock().unwrap();
1560                handler_guard.push(bar);
1561            },
1562            false,
1563        );
1564
1565        let trade = TradeTick::default();
1566        aggregator.handle_trade(trade);
1567        aggregator.handle_trade(trade);
1568        aggregator.handle_trade(trade);
1569
1570        let handler_guard = handler.lock().unwrap();
1571        let bar = handler_guard.first().unwrap();
1572        assert_eq!(handler_guard.len(), 1);
1573        assert_eq!(bar.open, trade.price);
1574        assert_eq!(bar.high, trade.price);
1575        assert_eq!(bar.low, trade.price);
1576        assert_eq!(bar.close, trade.price);
1577        assert_eq!(bar.volume, Quantity::from(300000));
1578        assert_eq!(bar.ts_event, trade.ts_event);
1579        assert_eq!(bar.ts_init, trade.ts_init);
1580    }
1581
1582    #[rstest]
1583    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1584        let instrument = InstrumentAny::Equity(equity_aapl);
1585        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1586        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1587        let handler = Arc::new(Mutex::new(Vec::new()));
1588        let handler_clone = Arc::clone(&handler);
1589
1590        let mut aggregator = TickBarAggregator::new(
1591            bar_type,
1592            instrument.price_precision(),
1593            instrument.size_precision(),
1594            move |bar: Bar| {
1595                let mut handler_guard = handler_clone.lock().unwrap();
1596                handler_guard.push(bar);
1597            },
1598            false,
1599        );
1600
1601        aggregator.update(
1602            Price::from("1.00001"),
1603            Quantity::from(1),
1604            UnixNanos::default(),
1605        );
1606        aggregator.update(
1607            Price::from("1.00002"),
1608            Quantity::from(1),
1609            UnixNanos::from(1000),
1610        );
1611        aggregator.update(
1612            Price::from("1.00003"),
1613            Quantity::from(1),
1614            UnixNanos::from(2000),
1615        );
1616
1617        let handler_guard = handler.lock().unwrap();
1618        assert_eq!(handler_guard.len(), 1);
1619
1620        let bar = handler_guard.first().unwrap();
1621        assert_eq!(bar.open, Price::from("1.00001"));
1622        assert_eq!(bar.high, Price::from("1.00003"));
1623        assert_eq!(bar.low, Price::from("1.00001"));
1624        assert_eq!(bar.close, Price::from("1.00003"));
1625        assert_eq!(bar.volume, Quantity::from(3));
1626    }
1627
1628    #[rstest]
1629    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1630        let instrument = InstrumentAny::Equity(equity_aapl);
1631        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1632        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1633        let handler = Arc::new(Mutex::new(Vec::new()));
1634        let handler_clone = Arc::clone(&handler);
1635
1636        let mut aggregator = TickBarAggregator::new(
1637            bar_type,
1638            instrument.price_precision(),
1639            instrument.size_precision(),
1640            move |bar: Bar| {
1641                let mut handler_guard = handler_clone.lock().unwrap();
1642                handler_guard.push(bar);
1643            },
1644            false,
1645        );
1646
1647        aggregator.update(
1648            Price::from("1.00001"),
1649            Quantity::from(1),
1650            UnixNanos::default(),
1651        );
1652        aggregator.update(
1653            Price::from("1.00002"),
1654            Quantity::from(1),
1655            UnixNanos::from(1000),
1656        );
1657        aggregator.update(
1658            Price::from("1.00003"),
1659            Quantity::from(1),
1660            UnixNanos::from(2000),
1661        );
1662        aggregator.update(
1663            Price::from("1.00004"),
1664            Quantity::from(1),
1665            UnixNanos::from(3000),
1666        );
1667
1668        let handler_guard = handler.lock().unwrap();
1669        assert_eq!(handler_guard.len(), 2);
1670
1671        let bar1 = &handler_guard[0];
1672        assert_eq!(bar1.open, Price::from("1.00001"));
1673        assert_eq!(bar1.close, Price::from("1.00002"));
1674        assert_eq!(bar1.volume, Quantity::from(2));
1675
1676        let bar2 = &handler_guard[1];
1677        assert_eq!(bar2.open, Price::from("1.00003"));
1678        assert_eq!(bar2.close, Price::from("1.00004"));
1679        assert_eq!(bar2.volume, Quantity::from(2));
1680    }
1681
1682    #[rstest]
1683    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1684        let instrument = InstrumentAny::Equity(equity_aapl);
1685        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1686        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1687        let handler = Arc::new(Mutex::new(Vec::new()));
1688        let handler_clone = Arc::clone(&handler);
1689
1690        let mut aggregator = VolumeBarAggregator::new(
1691            bar_type,
1692            instrument.price_precision(),
1693            instrument.size_precision(),
1694            move |bar: Bar| {
1695                let mut handler_guard = handler_clone.lock().unwrap();
1696                handler_guard.push(bar);
1697            },
1698            false,
1699        );
1700
1701        aggregator.update(
1702            Price::from("1.00001"),
1703            Quantity::from(25),
1704            UnixNanos::default(),
1705        );
1706
1707        let handler_guard = handler.lock().unwrap();
1708        assert_eq!(handler_guard.len(), 2);
1709        let bar1 = &handler_guard[0];
1710        assert_eq!(bar1.volume, Quantity::from(10));
1711        let bar2 = &handler_guard[1];
1712        assert_eq!(bar2.volume, Quantity::from(10));
1713    }
1714
1715    #[rstest]
1716    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1717        let instrument = InstrumentAny::Equity(equity_aapl);
1718        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1719        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1720        let handler = Arc::new(Mutex::new(Vec::new()));
1721        let handler_clone = Arc::clone(&handler);
1722
1723        let mut aggregator = ValueBarAggregator::new(
1724            bar_type,
1725            instrument.price_precision(),
1726            instrument.size_precision(),
1727            move |bar: Bar| {
1728                let mut handler_guard = handler_clone.lock().unwrap();
1729                handler_guard.push(bar);
1730            },
1731            false,
1732        );
1733
1734        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1735        aggregator.update(
1736            Price::from("100.00"),
1737            Quantity::from(5),
1738            UnixNanos::default(),
1739        );
1740        aggregator.update(
1741            Price::from("100.00"),
1742            Quantity::from(5),
1743            UnixNanos::from(1000),
1744        );
1745
1746        let handler_guard = handler.lock().unwrap();
1747        assert_eq!(handler_guard.len(), 1);
1748        let bar = handler_guard.first().unwrap();
1749        assert_eq!(bar.volume, Quantity::from(10));
1750    }
1751
1752    #[rstest]
1753    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1754        let instrument = InstrumentAny::Equity(equity_aapl);
1755        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1756        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1757        let handler = Arc::new(Mutex::new(Vec::new()));
1758        let handler_clone = Arc::clone(&handler);
1759
1760        let mut aggregator = ValueBarAggregator::new(
1761            bar_type,
1762            instrument.price_precision(),
1763            instrument.size_precision(),
1764            move |bar: Bar| {
1765                let mut handler_guard = handler_clone.lock().unwrap();
1766                handler_guard.push(bar);
1767            },
1768            false,
1769        );
1770
1771        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1772        aggregator.update(
1773            Price::from("100.00"),
1774            Quantity::from(25),
1775            UnixNanos::default(),
1776        );
1777
1778        let handler_guard = handler.lock().unwrap();
1779        assert_eq!(handler_guard.len(), 2);
1780        let remaining_value = aggregator.get_cumulative_value();
1781        assert!(remaining_value < 1000.0); // Should be less than threshold
1782    }
1783
1784    #[rstest]
1785    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1786        let instrument = InstrumentAny::Equity(equity_aapl);
1787        // One second bars
1788        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1789        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1790        let handler = Arc::new(Mutex::new(Vec::new()));
1791        let handler_clone = Arc::clone(&handler);
1792        let clock = Rc::new(RefCell::new(TestClock::new()));
1793
1794        let mut aggregator = TimeBarAggregator::new(
1795            bar_type,
1796            instrument.price_precision(),
1797            instrument.size_precision(),
1798            clock.clone(),
1799            move |bar: Bar| {
1800                let mut handler_guard = handler_clone.lock().unwrap();
1801                handler_guard.push(bar);
1802            },
1803            false, // await_partial
1804            true,  // build_with_no_updates
1805            false, // timestamp_on_close
1806            BarIntervalType::LeftOpen,
1807            None,  // time_bars_origin
1808            15,    // composite_bar_build_delay
1809            false, // skip_first_non_full_bar
1810        );
1811
1812        aggregator.update(
1813            Price::from("100.00"),
1814            Quantity::from(1),
1815            UnixNanos::default(),
1816        );
1817
1818        let next_sec = UnixNanos::from(1_000_000_000);
1819        clock.borrow_mut().set_time(next_sec);
1820
1821        let event = TimeEvent::new(
1822            Ustr::from("1-SECOND-LAST"),
1823            UUID4::new(),
1824            next_sec,
1825            next_sec,
1826        );
1827        aggregator.build_bar(event);
1828
1829        let handler_guard = handler.lock().unwrap();
1830        assert_eq!(handler_guard.len(), 1);
1831        let bar = handler_guard.first().unwrap();
1832        assert_eq!(bar.ts_event, UnixNanos::default());
1833        assert_eq!(bar.ts_init, next_sec);
1834    }
1835
1836    #[rstest]
1837    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1838        let instrument = InstrumentAny::Equity(equity_aapl);
1839        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1840        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1841        let handler = Arc::new(Mutex::new(Vec::new()));
1842        let handler_clone = Arc::clone(&handler);
1843        let clock = Rc::new(RefCell::new(TestClock::new()));
1844
1845        let mut aggregator = TimeBarAggregator::new(
1846            bar_type,
1847            instrument.price_precision(),
1848            instrument.size_precision(),
1849            clock.clone(),
1850            move |bar: Bar| {
1851                let mut handler_guard = handler_clone.lock().unwrap();
1852                handler_guard.push(bar);
1853            },
1854            false, // await_partial
1855            true,  // build_with_no_updates
1856            true,  // timestamp_on_close - changed to true to verify left-open behavior
1857            BarIntervalType::LeftOpen,
1858            None,
1859            15,
1860            false, // skip_first_non_full_bar
1861        );
1862
1863        // Update in first interval
1864        aggregator.update(
1865            Price::from("100.00"),
1866            Quantity::from(1),
1867            UnixNanos::default(),
1868        );
1869
1870        // First interval close
1871        let ts1 = UnixNanos::from(1_000_000_000);
1872        clock.borrow_mut().set_time(ts1);
1873        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1874        aggregator.build_bar(event);
1875
1876        // Update in second interval
1877        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1878
1879        // Second interval close
1880        let ts2 = UnixNanos::from(2_000_000_000);
1881        clock.borrow_mut().set_time(ts2);
1882        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1883        aggregator.build_bar(event);
1884
1885        let handler_guard = handler.lock().unwrap();
1886        assert_eq!(handler_guard.len(), 2);
1887
1888        let bar1 = &handler_guard[0];
1889        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
1890        assert_eq!(bar1.ts_init, ts1);
1891        assert_eq!(bar1.close, Price::from("100.00"));
1892        let bar2 = &handler_guard[1];
1893        assert_eq!(bar2.ts_event, ts2);
1894        assert_eq!(bar2.ts_init, ts2);
1895        assert_eq!(bar2.close, Price::from("101.00"));
1896    }
1897
1898    #[rstest]
1899    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1900        let instrument = InstrumentAny::Equity(equity_aapl);
1901        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1902        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1903        let handler = Arc::new(Mutex::new(Vec::new()));
1904        let handler_clone = Arc::clone(&handler);
1905        let clock = Rc::new(RefCell::new(TestClock::new()));
1906        let mut aggregator = TimeBarAggregator::new(
1907            bar_type,
1908            instrument.price_precision(),
1909            instrument.size_precision(),
1910            clock.clone(),
1911            move |bar: Bar| {
1912                let mut handler_guard = handler_clone.lock().unwrap();
1913                handler_guard.push(bar);
1914            },
1915            false, // await_partial
1916            true,  // build_with_no_updates
1917            true,  // timestamp_on_close
1918            BarIntervalType::RightOpen,
1919            None,
1920            15,
1921            false, // skip_first_non_full_bar
1922        );
1923
1924        // Update in first interval
1925        aggregator.update(
1926            Price::from("100.00"),
1927            Quantity::from(1),
1928            UnixNanos::default(),
1929        );
1930
1931        // First interval close
1932        let ts1 = UnixNanos::from(1_000_000_000);
1933        clock.borrow_mut().set_time(ts1);
1934        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1935        aggregator.build_bar(event);
1936
1937        // Update in second interval
1938        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1939
1940        // Second interval close
1941        let ts2 = UnixNanos::from(2_000_000_000);
1942        clock.borrow_mut().set_time(ts2);
1943        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1944        aggregator.build_bar(event);
1945
1946        let handler_guard = handler.lock().unwrap();
1947        assert_eq!(handler_guard.len(), 2);
1948
1949        let bar1 = &handler_guard[0];
1950        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
1951        assert_eq!(bar1.ts_init, ts1);
1952        assert_eq!(bar1.close, Price::from("100.00"));
1953
1954        let bar2 = &handler_guard[1];
1955        assert_eq!(bar2.ts_event, ts1);
1956        assert_eq!(bar2.ts_init, ts2);
1957        assert_eq!(bar2.close, Price::from("101.00"));
1958    }
1959
1960    #[rstest]
1961    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
1962        let instrument = InstrumentAny::Equity(equity_aapl);
1963        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1964        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1965        let handler = Arc::new(Mutex::new(Vec::new()));
1966        let handler_clone = Arc::clone(&handler);
1967        let clock = Rc::new(RefCell::new(TestClock::new()));
1968
1969        // First test with build_with_no_updates = false
1970        let mut aggregator = TimeBarAggregator::new(
1971            bar_type,
1972            instrument.price_precision(),
1973            instrument.size_precision(),
1974            clock.clone(),
1975            move |bar: Bar| {
1976                let mut handler_guard = handler_clone.lock().unwrap();
1977                handler_guard.push(bar);
1978            },
1979            false, // await_partial
1980            false, // build_with_no_updates disabled
1981            true,  // timestamp_on_close
1982            BarIntervalType::LeftOpen,
1983            None,
1984            15,
1985            false, // skip_first_non_full_bar
1986        );
1987
1988        // No updates, just interval close
1989        let ts1 = UnixNanos::from(1_000_000_000);
1990        clock.borrow_mut().set_time(ts1);
1991        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1992        aggregator.build_bar(event);
1993
1994        let handler_guard = handler.lock().unwrap();
1995        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
1996        drop(handler_guard);
1997
1998        // Now test with build_with_no_updates = true
1999        let handler = Arc::new(Mutex::new(Vec::new()));
2000        let handler_clone = Arc::clone(&handler);
2001        let mut aggregator = TimeBarAggregator::new(
2002            bar_type,
2003            instrument.price_precision(),
2004            instrument.size_precision(),
2005            clock.clone(),
2006            move |bar: Bar| {
2007                let mut handler_guard = handler_clone.lock().unwrap();
2008                handler_guard.push(bar);
2009            },
2010            false,
2011            true, // build_with_no_updates enabled
2012            true, // timestamp_on_close
2013            BarIntervalType::LeftOpen,
2014            None,
2015            15,
2016            false, // skip_first_non_full_bar
2017        );
2018
2019        aggregator.update(
2020            Price::from("100.00"),
2021            Quantity::from(1),
2022            UnixNanos::default(),
2023        );
2024
2025        // First interval with update
2026        let ts1 = UnixNanos::from(1_000_000_000);
2027        clock.borrow_mut().set_time(ts1);
2028        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2029        aggregator.build_bar(event);
2030
2031        // Second interval without updates
2032        let ts2 = UnixNanos::from(2_000_000_000);
2033        clock.borrow_mut().set_time(ts2);
2034        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2035        aggregator.build_bar(event);
2036
2037        let handler_guard = handler.lock().unwrap();
2038        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2039        let bar1 = &handler_guard[0];
2040        assert_eq!(bar1.close, Price::from("100.00"));
2041        let bar2 = &handler_guard[1];
2042        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2043    }
2044
2045    #[rstest]
2046    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2047        let instrument = InstrumentAny::Equity(equity_aapl);
2048        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2049        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2050        let clock = Rc::new(RefCell::new(TestClock::new()));
2051        let handler = Arc::new(Mutex::new(Vec::new()));
2052        let handler_clone = Arc::clone(&handler);
2053
2054        let mut aggregator = TimeBarAggregator::new(
2055            bar_type,
2056            instrument.price_precision(),
2057            instrument.size_precision(),
2058            clock.clone(),
2059            move |bar: Bar| {
2060                let mut handler_guard = handler_clone.lock().unwrap();
2061                handler_guard.push(bar);
2062            },
2063            false, // await_partial
2064            true,  // build_with_no_updates
2065            true,  // timestamp_on_close
2066            BarIntervalType::RightOpen,
2067            None,
2068            15,
2069            false, // skip_first_non_full_bar
2070        );
2071
2072        let ts1 = UnixNanos::from(1_000_000_000);
2073        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2074
2075        let ts2 = UnixNanos::from(2_000_000_000);
2076        clock.borrow_mut().set_time(ts2);
2077
2078        // Simulate timestamp on close
2079        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2080        aggregator.build_bar(event);
2081
2082        let handler_guard = handler.lock().unwrap();
2083        let bar = handler_guard.first().unwrap();
2084        assert_eq!(bar.ts_event, UnixNanos::default());
2085        assert_eq!(bar.ts_init, ts2);
2086    }
2087
2088    #[rstest]
2089    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2090        let instrument = InstrumentAny::Equity(equity_aapl);
2091        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2092        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2093        let clock = Rc::new(RefCell::new(TestClock::new()));
2094        let handler = Arc::new(Mutex::new(Vec::new()));
2095        let handler_clone = Arc::clone(&handler);
2096
2097        let mut aggregator = TimeBarAggregator::new(
2098            bar_type,
2099            instrument.price_precision(),
2100            instrument.size_precision(),
2101            clock.clone(),
2102            move |bar: Bar| {
2103                let mut handler_guard = handler_clone.lock().unwrap();
2104                handler_guard.push(bar);
2105            },
2106            false, // await_partial
2107            true,  // build_with_no_updates
2108            true,  // timestamp_on_close
2109            BarIntervalType::LeftOpen,
2110            None,
2111            15,
2112            false, // skip_first_non_full_bar
2113        );
2114
2115        let ts1 = UnixNanos::from(1_000_000_000);
2116        clock.borrow_mut().set_time(ts1);
2117
2118        let initial_time = clock.borrow().utc_now();
2119        aggregator.start_batch_time(UnixNanos::from(
2120            initial_time.timestamp_nanos_opt().unwrap() as u64
2121        ));
2122
2123        let handler_guard = handler.lock().unwrap();
2124        assert_eq!(handler_guard.len(), 0);
2125    }
2126}