nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25    clock::Clock,
26    timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29    SharedCell, UnixNanos, WeakCell,
30    correctness::{self, FAILED},
31    datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34    data::{
35        QuoteTick, TradeTick,
36        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37    },
38    enums::{AggregationSource, BarAggregation, BarIntervalType},
39    types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
///
/// Implementors receive updates and produce completed bars via handlers, with support for partial and batch updates.
pub trait BarAggregator: Any + Debug {
    /// The [`BarType`] to be aggregated.
    fn bar_type(&self) -> BarType;
    /// If the aggregator is running and will receive data from the message bus.
    fn is_running(&self) -> bool;
    /// Enables or disables awaiting a partial bar before full aggregation.
    fn set_await_partial(&mut self, value: bool);
    /// Sets the running state of the aggregator (receiving updates when `true`).
    fn set_is_running(&mut self, value: bool);
    /// Updates the aggregator with the given price and size.
    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
    /// Updates the aggregator with the given quote.
    ///
    /// The price and size are extracted using the price type of the bar specification.
    /// The quote is ignored while [`Self::await_partial`] returns `true`.
    fn handle_quote(&mut self, quote: QuoteTick) {
        let spec = self.bar_type().spec();
        if !self.await_partial() {
            self.update(
                quote.extract_price(spec.price_type),
                quote.extract_size(spec.price_type),
                quote.ts_init,
            );
        }
    }
    /// Updates the aggregator with the given trade.
    ///
    /// The trade is ignored while [`Self::await_partial`] returns `true`.
    fn handle_trade(&mut self, trade: TradeTick) {
        if !self.await_partial() {
            self.update(trade.price, trade.size, trade.ts_init);
        }
    }
    /// Updates the aggregator with the given bar, using the bar's own volume and init timestamp.
    ///
    /// The bar is ignored while [`Self::await_partial`] returns `true`.
    fn handle_bar(&mut self, bar: Bar) {
        if !self.await_partial() {
            self.update_bar(bar, bar.volume, bar.ts_init);
        }
    }
    /// Incorporates an existing bar and its volume into aggregation at the given init timestamp.
    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
    /// Starts batch mode, sending bars to the supplied handler for the given time context.
    ///
    /// The tick, volume and value aggregators in this file ignore `time_ns`.
    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
    /// Stops batch mode and restores the standard bar handler.
    fn stop_batch_update(&mut self);
    /// Returns `true` if awaiting a partial bar before processing updates.
    fn await_partial(&self) -> bool;
    /// Sets the initial values for a partially completed bar.
    fn set_partial(&mut self, partial_bar: Bar);
    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
    fn stop(&mut self) {}
}
92
93impl dyn BarAggregator {
94    /// Returns a reference to this aggregator as `Any` for downcasting.
95    pub fn as_any(&self) -> &dyn Any {
96        self
97    }
98    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
99    pub fn as_any_mut(&mut self) -> &mut dyn Any {
100        self
101    }
102}
103
/// Provides a generic bar builder for aggregation.
///
/// The builder accumulates open/high/low/close prices and volume from successive
/// updates and turns them into a [`Bar`] on request.
#[derive(Debug)]
pub struct BarBuilder {
    /// Bar type stamped on every bar produced by the builder.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision supplied at construction; used to create the zero volume.
    size_precision: u8,
    /// Set to `true` once the first update or partial bar has been applied.
    initialized: bool,
    /// Init timestamp of the last applied update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates applied since the last reset.
    count: usize,
    /// Whether a partial bar has already been applied (later partial bars are ignored).
    partial_set: bool,
    /// Close of the most recently built bar; fills the prices of a bar built without updates.
    last_close: Option<Price>,
    /// Open price of the bar under construction.
    open: Option<Price>,
    /// Highest price of the bar under construction.
    high: Option<Price>,
    /// Lowest price of the bar under construction.
    low: Option<Price>,
    /// Latest price applied to the builder (not cleared on reset).
    close: Option<Price>,
    /// Volume accumulated since the last reset.
    volume: Quantity,
}
121
122impl BarBuilder {
123    /// Creates a new [`BarBuilder`] instance.
124    ///
125    /// # Panics
126    ///
127    /// This function panics if:
128    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
129    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
130    #[must_use]
131    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132        correctness::check_equal(
133            &bar_type.aggregation_source(),
134            &AggregationSource::Internal,
135            "bar_type.aggregation_source",
136            "AggregationSource::Internal",
137        )
138        .expect(FAILED);
139
140        Self {
141            bar_type,
142            price_precision,
143            size_precision,
144            initialized: false,
145            ts_last: UnixNanos::default(),
146            count: 0,
147            partial_set: false,
148            last_close: None,
149            open: None,
150            high: None,
151            low: None,
152            close: None,
153            volume: Quantity::zero(size_precision),
154        }
155    }
156
157    /// Set the initial values for a partially completed bar.
158    ///
159    /// # Panics
160    ///
161    /// Panics if internal values for `high` or `low` are unexpectedly missing.
162    pub fn set_partial(&mut self, partial_bar: Bar) {
163        if self.partial_set {
164            return; // Already updated
165        }
166
167        self.open = Some(partial_bar.open);
168
169        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170            self.high = Some(partial_bar.high);
171        }
172
173        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174            self.low = Some(partial_bar.low);
175        }
176
177        if self.close.is_none() {
178            self.close = Some(partial_bar.close);
179        }
180
181        self.volume = partial_bar.volume;
182
183        if self.ts_last == 0 {
184            self.ts_last = partial_bar.ts_init;
185        }
186
187        self.partial_set = true;
188        self.initialized = true;
189    }
190
191    /// Updates the builder state with the given price, size, and init timestamp.
192    ///
193    /// # Panics
194    ///
195    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
196    pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
197        if ts_init < self.ts_last {
198            return; // Not applicable
199        }
200
201        if self.open.is_none() {
202            self.open = Some(price);
203            self.high = Some(price);
204            self.low = Some(price);
205            self.initialized = true;
206        } else {
207            if price > self.high.unwrap() {
208                self.high = Some(price);
209            }
210            if price < self.low.unwrap() {
211                self.low = Some(price);
212            }
213        }
214
215        self.close = Some(price);
216        self.volume = self.volume.add(size);
217        self.count += 1;
218        self.ts_last = ts_init;
219    }
220
221    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
222    ///
223    /// # Panics
224    ///
225    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
226    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227        if ts_init < self.ts_last {
228            return; // Not applicable
229        }
230
231        if self.open.is_none() {
232            self.open = Some(bar.open);
233            self.high = Some(bar.high);
234            self.low = Some(bar.low);
235            self.initialized = true;
236        } else {
237            if bar.high > self.high.unwrap() {
238                self.high = Some(bar.high);
239            }
240            if bar.low < self.low.unwrap() {
241                self.low = Some(bar.low);
242            }
243        }
244
245        self.close = Some(bar.close);
246        self.volume = self.volume.add(volume);
247        self.count += 1;
248        self.ts_last = ts_init;
249    }
250
251    /// Reset the bar builder.
252    ///
253    /// All stateful fields are reset to their initial value.
254    pub fn reset(&mut self) {
255        self.open = None;
256        self.high = None;
257        self.low = None;
258        self.volume = Quantity::zero(self.size_precision);
259        self.count = 0;
260    }
261
262    /// Return the aggregated bar and reset.
263    pub fn build_now(&mut self) -> Bar {
264        self.build(self.ts_last, self.ts_last)
265    }
266
267    /// Returns the aggregated bar for the given timestamps, then resets the builder.
268    ///
269    /// # Panics
270    ///
271    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
272    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273        if self.open.is_none() {
274            self.open = self.last_close;
275            self.high = self.last_close;
276            self.low = self.last_close;
277            self.close = self.last_close;
278        }
279
280        if let (Some(close), Some(low)) = (self.close, self.low)
281            && close < low
282        {
283            self.low = Some(close);
284        }
285
286        if let (Some(close), Some(high)) = (self.close, self.high)
287            && close > high
288        {
289            self.high = Some(close);
290        }
291
292        // SAFETY: The open was checked, so we can assume all prices are Some
293        let bar = Bar::new(
294            self.bar_type,
295            self.open.unwrap(),
296            self.high.unwrap(),
297            self.low.unwrap(),
298            self.close.unwrap(),
299            self.volume,
300            ts_event,
301            ts_init,
302        );
303
304        self.last_close = self.close;
305        self.reset();
306        bar
307    }
308}
309
/// Provides a means of aggregating specified bar types and sending to a registered handler.
///
/// `H` is the type of the standard bar handler.
pub struct BarAggregatorCore<H>
where
    H: FnMut(Bar),
{
    /// Bar type passed at construction (also handed to the builder).
    bar_type: BarType,
    /// Builder accumulating the bar under construction.
    builder: BarBuilder,
    /// Standard handler that receives completed bars.
    handler: H,
    /// Handler restored by `stop_batch_update` when present.
    /// NOTE(review): nothing in the visible code assigns this field — confirm it is still needed.
    handler_backup: Option<H>,
    /// Handler installed by `start_batch_update`.
    batch_handler: Option<Box<dyn FnMut(Bar)>>,
    /// Whether updates are held back until a partial bar has been provided.
    await_partial: bool,
    /// Running flag; initialised to `false`.
    is_running: bool,
    /// Whether batch mode is active.
    batch_mode: bool,
}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327        f.debug_struct(stringify!(BarAggregatorCore))
328            .field("bar_type", &self.bar_type)
329            .field("builder", &self.builder)
330            .field("await_partial", &self.await_partial)
331            .field("is_running", &self.is_running)
332            .field("batch_mode", &self.batch_mode)
333            .finish()
334    }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339    H: FnMut(Bar),
340{
341    /// Creates a new [`BarAggregatorCore`] instance.
342    ///
343    /// # Panics
344    ///
345    /// This function panics if:
346    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
347    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
348    pub fn new(
349        bar_type: BarType,
350        price_precision: u8,
351        size_precision: u8,
352        handler: H,
353        await_partial: bool,
354    ) -> Self {
355        Self {
356            bar_type,
357            builder: BarBuilder::new(bar_type, price_precision, size_precision),
358            handler,
359            handler_backup: None,
360            batch_handler: None,
361            await_partial,
362            is_running: false,
363            batch_mode: false,
364        }
365    }
366
367    /// Sets whether to await a partial bar before processing new updates.
368    pub const fn set_await_partial(&mut self, value: bool) {
369        self.await_partial = value;
370    }
371
372    /// Sets the running state of the aggregator (receives updates when `true`).
373    pub const fn set_is_running(&mut self, value: bool) {
374        self.is_running = value;
375    }
376
377    /// Returns `true` if the aggregator is awaiting a partial bar to complete before aggregation.
378    pub const fn await_partial(&self) -> bool {
379        self.await_partial
380    }
381
382    /// Initializes builder state with a partially completed bar.
383    pub fn set_partial(&mut self, partial_bar: Bar) {
384        self.builder.set_partial(partial_bar);
385    }
386
387    fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
388        self.builder.update(price, size, ts_init);
389    }
390
391    fn build_now_and_send(&mut self) {
392        let bar = self.builder.build_now();
393        (self.handler)(bar);
394    }
395
396    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397        let bar = self.builder.build(ts_event, ts_init);
398
399        if self.batch_mode {
400            if let Some(handler) = &mut self.batch_handler {
401                handler(bar);
402            }
403        } else {
404            (self.handler)(bar);
405        }
406    }
407
408    /// Enables batch update mode, sending bars to the provided handler instead of immediate dispatch.
409    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410        self.batch_mode = true;
411        self.batch_handler = Some(handler);
412    }
413
414    /// Disables batch update mode and restores the original bar handler.
415    pub fn stop_batch_update(&mut self) {
416        self.batch_mode = false;
417
418        if let Some(handler) = self.handler_backup.take() {
419            self.handler = handler;
420        }
421    }
422}
423
/// Provides a means of building tick bars aggregated from quote and trades.
///
/// When received tick count reaches the step threshold of the bar
/// specification, then a bar is created and sent to the handler.
pub struct TickBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state (bar type, builder, handlers and flags).
    core: BarAggregatorCore<H>,
    /// Floating-point accumulator initialised to `0.0`; included in the `Debug` output.
    cum_value: f64,
}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct(stringify!(TickBarAggregator))
439            .field("core", &self.core)
440            .field("cum_value", &self.cum_value)
441            .finish()
442    }
443}
444
445impl<H> TickBarAggregator<H>
446where
447    H: FnMut(Bar),
448{
449    /// Creates a new [`TickBarAggregator`] instance.
450    ///
451    /// # Panics
452    ///
453    /// This function panics if:
454    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
455    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
456    pub fn new(
457        bar_type: BarType,
458        price_precision: u8,
459        size_precision: u8,
460        handler: H,
461        await_partial: bool,
462    ) -> Self {
463        Self {
464            core: BarAggregatorCore::new(
465                bar_type,
466                price_precision,
467                size_precision,
468                handler,
469                await_partial,
470            ),
471            cum_value: 0.0,
472        }
473    }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478    H: FnMut(Bar) + 'static,
479{
480    fn bar_type(&self) -> BarType {
481        self.core.bar_type
482    }
483
484    fn is_running(&self) -> bool {
485        self.core.is_running
486    }
487
488    fn set_await_partial(&mut self, value: bool) {
489        self.core.set_await_partial(value);
490    }
491
492    fn set_is_running(&mut self, value: bool) {
493        self.core.set_is_running(value);
494    }
495
496    fn await_partial(&self) -> bool {
497        self.core.await_partial()
498    }
499
500    /// Apply the given update to the aggregator.
501    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
502        self.core.apply_update(price, size, ts_init);
503        let spec = self.core.bar_type.spec();
504
505        if self.core.builder.count >= spec.step.get() {
506            self.core.build_now_and_send();
507        }
508    }
509
510    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511        let mut volume_update = volume;
512        let average_price = Price::new(
513            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514            self.core.builder.price_precision,
515        );
516
517        while volume_update.as_f64() > 0.0 {
518            let value_update = average_price.as_f64() * volume_update.as_f64();
519            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520                self.cum_value += value_update;
521                self.core.builder.update_bar(bar, volume_update, ts_init);
522                break;
523            }
524
525            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527            self.core.builder.update_bar(
528                bar,
529                Quantity::new(volume_diff, volume_update.precision),
530                ts_init,
531            );
532
533            self.core.build_now_and_send();
534            self.cum_value = 0.0;
535            volume_update = Quantity::new(
536                volume_update.as_f64() - volume_diff,
537                volume_update.precision,
538            );
539        }
540    }
541
542    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543        self.core.start_batch_update(handler);
544    }
545
546    fn stop_batch_update(&mut self) {
547        self.core.stop_batch_update();
548    }
549
550    fn set_partial(&mut self, partial_bar: Bar) {
551        self.core.set_partial(partial_bar);
552    }
553}
554
/// Provides a means of building volume bars aggregated from quote and trades.
///
/// A bar is created and sent to the handler each time the accumulated raw volume
/// reaches the step of the bar specification (scaled by `FIXED_SCALAR`).
pub struct VolumeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state (bar type, builder, handlers and flags).
    core: BarAggregatorCore<H>,
}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565        f.debug_struct(stringify!(VolumeBarAggregator))
566            .field("core", &self.core)
567            .finish()
568    }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573    H: FnMut(Bar),
574{
575    /// Creates a new [`VolumeBarAggregator`] instance.
576    ///
577    /// # Panics
578    ///
579    /// This function panics if:
580    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
581    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
582    pub fn new(
583        bar_type: BarType,
584        price_precision: u8,
585        size_precision: u8,
586        handler: H,
587        await_partial: bool,
588    ) -> Self {
589        Self {
590            core: BarAggregatorCore::new(
591                bar_type.standard(),
592                price_precision,
593                size_precision,
594                handler,
595                await_partial,
596            ),
597        }
598    }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603    H: FnMut(Bar) + 'static,
604{
605    fn bar_type(&self) -> BarType {
606        self.core.bar_type
607    }
608
609    fn is_running(&self) -> bool {
610        self.core.is_running
611    }
612
613    fn set_await_partial(&mut self, value: bool) {
614        self.core.set_await_partial(value);
615    }
616
617    fn set_is_running(&mut self, value: bool) {
618        self.core.set_is_running(value);
619    }
620
621    fn await_partial(&self) -> bool {
622        self.core.await_partial()
623    }
624
625    /// Apply the given update to the aggregator.
626    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
627        let mut raw_size_update = size.raw;
628        let spec = self.core.bar_type.spec();
629        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631        while raw_size_update > 0 {
632            if self.core.builder.volume.raw + raw_size_update < raw_step {
633                self.core.apply_update(
634                    price,
635                    Quantity::from_raw(raw_size_update, size.precision),
636                    ts_init,
637                );
638                break;
639            }
640
641            let raw_size_diff = raw_step - self.core.builder.volume.raw;
642            self.core.apply_update(
643                price,
644                Quantity::from_raw(raw_size_diff, size.precision),
645                ts_init,
646            );
647
648            self.core.build_now_and_send();
649            raw_size_update -= raw_size_diff;
650        }
651    }
652
653    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654        let mut raw_volume_update = volume.raw;
655        let spec = self.core.bar_type.spec();
656        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658        while raw_volume_update > 0 {
659            if self.core.builder.volume.raw + raw_volume_update < raw_step {
660                self.core.builder.update_bar(
661                    bar,
662                    Quantity::from_raw(raw_volume_update, volume.precision),
663                    ts_init,
664                );
665                break;
666            }
667
668            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669            self.core.builder.update_bar(
670                bar,
671                Quantity::from_raw(raw_volume_diff, volume.precision),
672                ts_init,
673            );
674
675            self.core.build_now_and_send();
676            raw_volume_update -= raw_volume_diff;
677        }
678    }
679
680    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681        self.core.start_batch_update(handler);
682    }
683
684    fn stop_batch_update(&mut self) {
685        self.core.stop_batch_update();
686    }
687
688    fn set_partial(&mut self, partial_bar: Bar) {
689        self.core.set_partial(partial_bar);
690    }
691}
692
/// Provides a means of building value bars aggregated from quote and trades.
///
/// When received value reaches the step threshold of the bar
/// specification, then a bar is created and sent to the handler.
pub struct ValueBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state (bar type, builder, handlers and flags).
    core: BarAggregatorCore<H>,
    /// Value (price × size) accumulated towards the current bar; reset to `0.0`
    /// each time a bar is emitted.
    cum_value: f64,
}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        f.debug_struct(stringify!(ValueBarAggregator))
708            .field("core", &self.core)
709            .field("cum_value", &self.cum_value)
710            .finish()
711    }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716    H: FnMut(Bar),
717{
718    /// Creates a new [`ValueBarAggregator`] instance.
719    ///
720    /// # Panics
721    ///
722    /// This function panics if:
723    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
724    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
725    pub fn new(
726        bar_type: BarType,
727        price_precision: u8,
728        size_precision: u8,
729        handler: H,
730        await_partial: bool,
731    ) -> Self {
732        Self {
733            core: BarAggregatorCore::new(
734                bar_type.standard(),
735                price_precision,
736                size_precision,
737                handler,
738                await_partial,
739            ),
740            cum_value: 0.0,
741        }
742    }
743
744    #[must_use]
745    /// Returns the cumulative value for the aggregator.
746    pub const fn get_cumulative_value(&self) -> f64 {
747        self.cum_value
748    }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753    H: FnMut(Bar) + 'static,
754{
755    fn bar_type(&self) -> BarType {
756        self.core.bar_type
757    }
758
759    fn is_running(&self) -> bool {
760        self.core.is_running
761    }
762
763    fn set_await_partial(&mut self, value: bool) {
764        self.core.set_await_partial(value);
765    }
766
767    fn set_is_running(&mut self, value: bool) {
768        self.core.set_is_running(value);
769    }
770
771    fn await_partial(&self) -> bool {
772        self.core.await_partial()
773    }
774
775    /// Apply the given update to the aggregator.
776    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
777        let mut size_update = size.as_f64();
778        let spec = self.core.bar_type.spec();
779
780        while size_update > 0.0 {
781            let value_update = price.as_f64() * size_update;
782            if self.cum_value + value_update < spec.step.get() as f64 {
783                self.cum_value += value_update;
784                self.core
785                    .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
786                break;
787            }
788
789            let value_diff = spec.step.get() as f64 - self.cum_value;
790            let size_diff = size_update * (value_diff / value_update);
791            self.core
792                .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
793
794            self.core.build_now_and_send();
795            self.cum_value = 0.0;
796            size_update -= size_diff;
797        }
798    }
799
800    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801        let mut volume_update = volume;
802        let average_price = Price::new(
803            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804            self.core.builder.price_precision,
805        );
806
807        while volume_update.as_f64() > 0.0 {
808            let value_update = average_price.as_f64() * volume_update.as_f64();
809            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810                self.cum_value += value_update;
811                self.core.builder.update_bar(bar, volume_update, ts_init);
812                break;
813            }
814
815            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817            self.core.builder.update_bar(
818                bar,
819                Quantity::new(volume_diff, volume_update.precision),
820                ts_init,
821            );
822
823            self.core.build_now_and_send();
824            self.cum_value = 0.0;
825            volume_update = Quantity::new(
826                volume_update.as_f64() - volume_diff,
827                volume_update.precision,
828            );
829        }
830    }
831
832    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833        self.core.start_batch_update(handler);
834    }
835
836    fn stop_batch_update(&mut self) {
837        self.core.stop_batch_update();
838    }
839
840    fn set_partial(&mut self, partial_bar: Bar) {
841        self.core.set_partial(partial_bar);
842    }
843}
844
/// Provides a means of building time bars aggregated from quote and trades.
///
/// At each aggregation time interval, a bar is created and sent to the handler.
pub struct TimeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregation state (bar type, builder, handlers and flags).
    core: BarAggregatorCore<H>,
    /// Clock supplied at construction; read for the current time when starting.
    clock: Rc<RefCell<dyn Clock>>,
    /// Constructor flag. Presumably controls whether bars are emitted for intervals
    /// without updates — TODO confirm against the build logic.
    build_with_no_updates: bool,
    /// Constructor flag. Presumably selects the interval close as the bar event
    /// timestamp — TODO confirm against the build logic.
    timestamp_on_close: bool,
    /// `true` when constructed with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Initialised to `false`.
    build_on_next_tick: bool,
    /// Initialised to the default timestamp.
    stored_open_ns: UnixNanos,
    /// Initialised to the default timestamp.
    stored_close_ns: UnixNanos,
    /// String form of the bar type passed to the constructor.
    timer_name: String,
    /// Bar interval in nanoseconds, from `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Initialised to the default timestamp.
    next_close_ns: UnixNanos,
    /// Constructor argument. NOTE(review): units are not visible here — confirm.
    bar_build_delay: u64,
    /// Initialised to the default timestamp.
    batch_open_ns: UnixNanos,
    /// Initialised to the default timestamp.
    batch_next_close_ns: UnixNanos,
    /// Optional origin offset passed to `get_time_bar_start` when starting.
    time_bars_origin_offset: Option<TimeDelta>,
    /// Constructor flag. Presumably suppresses the first, incomplete bar — TODO confirm.
    skip_first_non_full_bar: bool,
}
869
870impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872        f.debug_struct(stringify!(TimeBarAggregator))
873            .field("core", &self.core)
874            .field("build_with_no_updates", &self.build_with_no_updates)
875            .field("timestamp_on_close", &self.timestamp_on_close)
876            .field("is_left_open", &self.is_left_open)
877            .field("timer_name", &self.timer_name)
878            .field("interval_ns", &self.interval_ns)
879            .field("bar_build_delay", &self.bar_build_delay)
880            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
881            .finish()
882    }
883}
884
/// Callback wrapper for time-based bar aggregation events.
///
/// This struct provides a bridge between timer events and time bar aggregation,
/// allowing bars to be automatically built and emitted at regular time intervals.
/// It holds a weak reference to a `TimeBarAggregator` and triggers bar creation when
/// timer events occur, for as long as the aggregator is still alive.
#[derive(Clone, Debug)]
pub struct NewBarCallback<H: FnMut(Bar)> {
    /// Weak handle to the aggregator; does not keep the aggregator alive.
    aggregator: WeakCell<TimeBarAggregator<H>>,
}
895
896impl<H: FnMut(Bar)> NewBarCallback<H> {
897    /// Creates a new callback that invokes the time bar aggregator on timer events.
898    #[must_use]
899    pub fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
900        let shared: SharedCell<TimeBarAggregator<H>> = SharedCell::from(aggregator);
901        Self {
902            aggregator: shared.downgrade(),
903        }
904    }
905}
906
907impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
908    fn from(value: NewBarCallback<H>) -> Self {
909        Self::Rust(Rc::new(move |event: TimeEvent| {
910            if let Some(agg) = value.aggregator.upgrade() {
911                agg.borrow_mut().build_bar(event);
912            }
913        }))
914    }
915}
916
917impl<H> TimeBarAggregator<H>
918where
919    H: FnMut(Bar) + 'static,
920{
921    /// Creates a new [`TimeBarAggregator`] instance.
922    ///
923    /// # Panics
924    ///
925    /// This function panics if:
926    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
927    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
928    #[allow(clippy::too_many_arguments)]
929    pub fn new(
930        bar_type: BarType,
931        price_precision: u8,
932        size_precision: u8,
933        clock: Rc<RefCell<dyn Clock>>,
934        handler: H,
935        await_partial: bool,
936        build_with_no_updates: bool,
937        timestamp_on_close: bool,
938        interval_type: BarIntervalType,
939        time_bars_origin_offset: Option<TimeDelta>,
940        bar_build_delay: u64,
941        skip_first_non_full_bar: bool,
942    ) -> Self {
943        let is_left_open = match interval_type {
944            BarIntervalType::LeftOpen => true,
945            BarIntervalType::RightOpen => false,
946        };
947
948        let core = BarAggregatorCore::new(
949            bar_type.standard(),
950            price_precision,
951            size_precision,
952            handler,
953            await_partial,
954        );
955
956        Self {
957            core,
958            clock,
959            build_with_no_updates,
960            timestamp_on_close,
961            is_left_open,
962            build_on_next_tick: false,
963            stored_open_ns: UnixNanos::default(),
964            stored_close_ns: UnixNanos::default(),
965            timer_name: bar_type.to_string(),
966            interval_ns: get_bar_interval_ns(&bar_type),
967            next_close_ns: UnixNanos::default(),
968            bar_build_delay,
969            batch_open_ns: UnixNanos::default(),
970            batch_next_close_ns: UnixNanos::default(),
971            time_bars_origin_offset,
972            skip_first_non_full_bar,
973        }
974    }
975
976    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
977    ///
978    /// # Errors
979    ///
980    /// Returns an error if setting up the underlying clock timer fails.
981    ///
982    /// # Panics
983    ///
984    /// Panics if the underlying clock timer registration fails.
985    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
986        let now = self.clock.borrow().utc_now();
987        let mut start_time =
988            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
989
990        if start_time == now {
991            self.skip_first_non_full_bar = false;
992        }
993
994        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
995
996        let spec = &self.bar_type().spec();
997        let start_time_ns = UnixNanos::from(start_time);
998
999        if spec.aggregation == BarAggregation::Month {
1000            let step = spec.step.get() as u32;
1001            let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
1002
1003            self.clock
1004                .borrow_mut()
1005                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1006                .expect(FAILED);
1007        } else {
1008            self.clock
1009                .borrow_mut()
1010                .set_timer_ns(
1011                    &self.timer_name,
1012                    self.interval_ns.as_u64(),
1013                    Some(start_time_ns),
1014                    None,
1015                    Some(callback.into()),
1016                    None,
1017                    None,
1018                )
1019                .expect(FAILED);
1020        }
1021
1022        log::debug!("Started timer {}", self.timer_name);
1023        Ok(())
1024    }
1025
1026    /// Stops the time bar aggregator.
1027    pub fn stop(&mut self) {
1028        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1029    }
1030
1031    /// Starts batch time for bar aggregation.
1032    ///
1033    /// # Panics
1034    ///
1035    /// Panics if month arithmetic operations fail for monthly aggregation intervals.
1036    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1037        let spec = self.bar_type().spec();
1038        self.core.batch_mode = true;
1039
1040        let time = time_ns.to_datetime_utc();
1041        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1042        self.batch_open_ns = UnixNanos::from(start_time);
1043
1044        if spec.aggregation == BarAggregation::Month {
1045            let step = spec.step.get() as u32;
1046
1047            if self.batch_open_ns == time_ns {
1048                self.batch_open_ns =
1049                    subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1050            }
1051
1052            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1053        } else {
1054            if self.batch_open_ns == time_ns {
1055                self.batch_open_ns -= self.interval_ns;
1056            }
1057
1058            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1059        }
1060    }
1061
1062    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1063        if self.is_left_open {
1064            if self.timestamp_on_close {
1065                close_ns
1066            } else {
1067                open_ns
1068            }
1069        } else {
1070            open_ns
1071        }
1072    }
1073
1074    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1075        if self.skip_first_non_full_bar {
1076            self.core.builder.reset();
1077            self.skip_first_non_full_bar = false;
1078        } else {
1079            self.core.build_and_send(ts_event, ts_init);
1080        }
1081    }
1082
1083    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1084        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1085            let ts_init = self.batch_next_close_ns;
1086            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1087            self.build_and_send(ts_event, ts_init);
1088        }
1089    }
1090
1091    fn batch_post_update(&mut self, time_ns: UnixNanos) {
1092        let step = self.bar_type().spec().step.get() as u32;
1093
1094        // If not in batch mode and time matches next close, reset batch close
1095        if !self.core.batch_mode
1096            && time_ns == self.batch_next_close_ns
1097            && time_ns > self.stored_open_ns
1098        {
1099            self.batch_next_close_ns = UnixNanos::default();
1100            return;
1101        }
1102
1103        if time_ns > self.batch_next_close_ns {
1104            // Ensure batch times are coherent with last builder update
1105            if self.bar_type().spec().aggregation == BarAggregation::Month {
1106                while self.batch_next_close_ns < time_ns {
1107                    self.batch_next_close_ns =
1108                        add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1109                }
1110
1111                self.batch_open_ns =
1112                    subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1113            } else {
1114                while self.batch_next_close_ns < time_ns {
1115                    self.batch_next_close_ns += self.interval_ns;
1116                }
1117
1118                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1119            }
1120        }
1121
1122        if time_ns == self.batch_next_close_ns {
1123            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1124            self.build_and_send(ts_event, time_ns);
1125            self.batch_open_ns = self.batch_next_close_ns;
1126
1127            if self.bar_type().spec().aggregation == BarAggregation::Month {
1128                self.batch_next_close_ns =
1129                    add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1130            } else {
1131                self.batch_next_close_ns += self.interval_ns;
1132            }
1133        }
1134
1135        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
1136        if !self.core.batch_mode {
1137            self.batch_next_close_ns = UnixNanos::default();
1138        }
1139    }
1140
1141    fn build_bar(&mut self, event: TimeEvent) {
1142        if !self.core.builder.initialized {
1143            self.build_on_next_tick = true;
1144            self.stored_close_ns = self.next_close_ns;
1145            return;
1146        }
1147
1148        if !self.build_with_no_updates && self.core.builder.count == 0 {
1149            return;
1150        }
1151
1152        let ts_init = event.ts_event;
1153        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1154        self.build_and_send(ts_event, ts_init);
1155
1156        self.stored_open_ns = ts_init;
1157
1158        if self.bar_type().spec().aggregation == BarAggregation::Month {
1159            let step = self.bar_type().spec().step.get() as u32;
1160            let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1161
1162            self.clock
1163                .borrow_mut()
1164                .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1165                .expect(FAILED);
1166
1167            self.next_close_ns = next_alert_ns;
1168        } else {
1169            self.next_close_ns = self
1170                .clock
1171                .borrow()
1172                .next_time_ns(&self.timer_name)
1173                .unwrap_or_default();
1174        }
1175    }
1176}
1177
1178impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1179where
1180    H: FnMut(Bar) + 'static,
1181{
1182    fn bar_type(&self) -> BarType {
1183        self.core.bar_type
1184    }
1185
1186    fn is_running(&self) -> bool {
1187        self.core.is_running
1188    }
1189
1190    fn set_await_partial(&mut self, value: bool) {
1191        self.core.set_await_partial(value);
1192    }
1193
1194    fn set_is_running(&mut self, value: bool) {
1195        self.core.set_is_running(value);
1196    }
1197
1198    fn await_partial(&self) -> bool {
1199        self.core.await_partial()
1200    }
1201    /// Stop time-based aggregator by canceling its timer.
1202    fn stop(&mut self) {
1203        Self::stop(self);
1204    }
1205
1206    fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1207        if self.batch_next_close_ns != UnixNanos::default() {
1208            self.batch_pre_update(ts_init);
1209        }
1210
1211        self.core.apply_update(price, size, ts_init);
1212
1213        if self.build_on_next_tick {
1214            if ts_init <= self.stored_close_ns {
1215                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1216                self.build_and_send(ts_event, ts_init);
1217            }
1218
1219            self.build_on_next_tick = false;
1220            self.stored_close_ns = UnixNanos::default();
1221        }
1222
1223        if self.batch_next_close_ns != UnixNanos::default() {
1224            self.batch_post_update(ts_init);
1225        }
1226    }
1227
1228    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1229        if self.batch_next_close_ns != UnixNanos::default() {
1230            self.batch_pre_update(ts_init);
1231        }
1232
1233        self.core.builder.update_bar(bar, volume, ts_init);
1234
1235        if self.build_on_next_tick {
1236            if ts_init <= self.stored_close_ns {
1237                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1238                self.build_and_send(ts_event, ts_init);
1239            }
1240
1241            // Reset flag and clear stored close
1242            self.build_on_next_tick = false;
1243            self.stored_close_ns = UnixNanos::default();
1244        }
1245
1246        if self.batch_next_close_ns != UnixNanos::default() {
1247            self.batch_post_update(ts_init);
1248        }
1249    }
1250
1251    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1252        self.core.start_batch_update(handler);
1253        self.start_batch_time(time_ns);
1254    }
1255
1256    fn stop_batch_update(&mut self) {
1257        self.core.stop_batch_update();
1258    }
1259
1260    fn set_partial(&mut self, partial_bar: Bar) {
1261        self.core.set_partial(partial_bar);
1262    }
1263}
1264
1265////////////////////////////////////////////////////////////////////////////////
1266// Tests
1267////////////////////////////////////////////////////////////////////////////////
1268#[cfg(test)]
1269mod tests {
1270    use std::sync::{Arc, Mutex};
1271
1272    use nautilus_common::clock::TestClock;
1273    use nautilus_core::UUID4;
1274    use nautilus_model::{
1275        data::{BarSpecification, BarType},
1276        enums::{AggregationSource, BarAggregation, PriceType},
1277        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1278        types::{Price, Quantity},
1279    };
1280    use rstest::rstest;
1281    use ustr::Ustr;
1282
1283    use super::*;
1284
1285    #[rstest]
1286    fn test_bar_builder_initialization(equity_aapl: Equity) {
1287        let instrument = InstrumentAny::Equity(equity_aapl);
1288        let bar_type = BarType::new(
1289            instrument.id(),
1290            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1291            AggregationSource::Internal,
1292        );
1293        let builder = BarBuilder::new(
1294            bar_type,
1295            instrument.price_precision(),
1296            instrument.size_precision(),
1297        );
1298
1299        assert!(!builder.initialized);
1300        assert_eq!(builder.ts_last, 0);
1301        assert_eq!(builder.count, 0);
1302    }
1303
1304    #[rstest]
1305    fn test_set_partial_update(equity_aapl: Equity) {
1306        let instrument = InstrumentAny::Equity(equity_aapl);
1307        let bar_type = BarType::new(
1308            instrument.id(),
1309            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1310            AggregationSource::Internal,
1311        );
1312        let mut builder = BarBuilder::new(
1313            bar_type,
1314            instrument.price_precision(),
1315            instrument.size_precision(),
1316        );
1317
1318        let partial_bar = Bar::new(
1319            bar_type,
1320            Price::from("101.00"),
1321            Price::from("102.00"),
1322            Price::from("100.00"),
1323            Price::from("101.00"),
1324            Quantity::from(100),
1325            UnixNanos::from(1),
1326            UnixNanos::from(2),
1327        );
1328
1329        builder.set_partial(partial_bar);
1330        let bar = builder.build_now();
1331
1332        assert_eq!(bar.open, partial_bar.open);
1333        assert_eq!(bar.high, partial_bar.high);
1334        assert_eq!(bar.low, partial_bar.low);
1335        assert_eq!(bar.close, partial_bar.close);
1336        assert_eq!(bar.volume, partial_bar.volume);
1337        assert_eq!(builder.ts_last, 2);
1338    }
1339
1340    #[rstest]
1341    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1342        let instrument = InstrumentAny::Equity(equity_aapl);
1343        let bar_type = BarType::new(
1344            instrument.id(),
1345            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1346            AggregationSource::Internal,
1347        );
1348        let mut builder = BarBuilder::new(
1349            bar_type,
1350            instrument.price_precision(),
1351            instrument.size_precision(),
1352        );
1353
1354        builder.update(
1355            Price::from("100.00"),
1356            Quantity::from(1),
1357            UnixNanos::from(1000),
1358        );
1359        builder.update(
1360            Price::from("95.00"),
1361            Quantity::from(1),
1362            UnixNanos::from(2000),
1363        );
1364        builder.update(
1365            Price::from("105.00"),
1366            Quantity::from(1),
1367            UnixNanos::from(3000),
1368        );
1369
1370        let bar = builder.build_now();
1371        assert!(bar.high > bar.low);
1372        assert_eq!(bar.open, Price::from("100.00"));
1373        assert_eq!(bar.high, Price::from("105.00"));
1374        assert_eq!(bar.low, Price::from("95.00"));
1375        assert_eq!(bar.close, Price::from("105.00"));
1376    }
1377
1378    #[rstest]
1379    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1380        let instrument = InstrumentAny::Equity(equity_aapl);
1381        let bar_type = BarType::new(
1382            instrument.id(),
1383            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1384            AggregationSource::Internal,
1385        );
1386        let mut builder = BarBuilder::new(
1387            bar_type,
1388            instrument.price_precision(),
1389            instrument.size_precision(),
1390        );
1391
1392        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1393        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1394
1395        assert_eq!(builder.ts_last, 1_000);
1396        assert_eq!(builder.count, 1);
1397    }
1398
1399    #[rstest]
1400    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1401        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1402        let bar_type = BarType::new(
1403            instrument.id(),
1404            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1405            AggregationSource::Internal,
1406        );
1407        let mut builder = BarBuilder::new(
1408            bar_type,
1409            instrument.price_precision(),
1410            instrument.size_precision(),
1411        );
1412
1413        let partial_bar = Bar::new(
1414            bar_type,
1415            Price::from("1.00001"),
1416            Price::from("1.00010"),
1417            Price::from("1.00000"),
1418            Price::from("1.00002"),
1419            Quantity::from(1),
1420            UnixNanos::from(1_000_000_000),
1421            UnixNanos::from(2_000_000_000),
1422        );
1423
1424        builder.set_partial(partial_bar);
1425        let bar = builder.build_now();
1426
1427        assert_eq!(bar.open, Price::from("1.00001"));
1428        assert_eq!(bar.high, Price::from("1.00010"));
1429        assert_eq!(bar.low, Price::from("1.00000"));
1430        assert_eq!(bar.close, Price::from("1.00002"));
1431        assert_eq!(bar.volume, Quantity::from(1));
1432        assert_eq!(bar.ts_init, 2_000_000_000);
1433        assert_eq!(builder.ts_last, 2_000_000_000);
1434    }
1435
1436    #[rstest]
1437    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1438        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1439        let bar_type = BarType::new(
1440            instrument.id(),
1441            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1442            AggregationSource::Internal,
1443        );
1444        let mut builder = BarBuilder::new(
1445            bar_type,
1446            instrument.price_precision(),
1447            instrument.size_precision(),
1448        );
1449
1450        let partial_bar1 = Bar::new(
1451            bar_type,
1452            Price::from("1.00001"),
1453            Price::from("1.00010"),
1454            Price::from("1.00000"),
1455            Price::from("1.00002"),
1456            Quantity::from(1),
1457            UnixNanos::from(1_000_000_000),
1458            UnixNanos::from(1_000_000_000),
1459        );
1460
1461        let partial_bar2 = Bar::new(
1462            bar_type,
1463            Price::from("2.00001"),
1464            Price::from("2.00010"),
1465            Price::from("2.00000"),
1466            Price::from("2.00002"),
1467            Quantity::from(2),
1468            UnixNanos::from(3_000_000_000),
1469            UnixNanos::from(3_000_000_000),
1470        );
1471
1472        builder.set_partial(partial_bar1);
1473        builder.set_partial(partial_bar2);
1474        let bar = builder.build(
1475            UnixNanos::from(4_000_000_000),
1476            UnixNanos::from(4_000_000_000),
1477        );
1478
1479        assert_eq!(bar.open, Price::from("1.00001"));
1480        assert_eq!(bar.high, Price::from("1.00010"));
1481        assert_eq!(bar.low, Price::from("1.00000"));
1482        assert_eq!(bar.close, Price::from("1.00002"));
1483        assert_eq!(bar.volume, Quantity::from(1));
1484        assert_eq!(bar.ts_init, 4_000_000_000);
1485        assert_eq!(builder.ts_last, 1_000_000_000);
1486    }
1487
1488    #[rstest]
1489    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1490        let instrument = InstrumentAny::Equity(equity_aapl);
1491        let bar_type = BarType::new(
1492            instrument.id(),
1493            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1494            AggregationSource::Internal,
1495        );
1496        let mut builder = BarBuilder::new(
1497            bar_type,
1498            instrument.price_precision(),
1499            instrument.size_precision(),
1500        );
1501
1502        builder.update(
1503            Price::from("1.00000"),
1504            Quantity::from(1),
1505            UnixNanos::default(),
1506        );
1507
1508        assert!(builder.initialized);
1509        assert_eq!(builder.ts_last, 0);
1510        assert_eq!(builder.count, 1);
1511    }
1512
1513    #[rstest]
1514    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1515        equity_aapl: Equity,
1516    ) {
1517        let instrument = InstrumentAny::Equity(equity_aapl);
1518        let bar_type = BarType::new(
1519            instrument.id(),
1520            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1521            AggregationSource::Internal,
1522        );
1523        let mut builder = BarBuilder::new(bar_type, 2, 0);
1524
1525        builder.update(
1526            Price::from("1.00000"),
1527            Quantity::from(1),
1528            UnixNanos::from(1_000),
1529        );
1530        builder.update(
1531            Price::from("1.00001"),
1532            Quantity::from(1),
1533            UnixNanos::from(500),
1534        );
1535
1536        assert!(builder.initialized);
1537        assert_eq!(builder.ts_last, 1_000);
1538        assert_eq!(builder.count, 1);
1539    }
1540
1541    #[rstest]
1542    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1543        let instrument = InstrumentAny::Equity(equity_aapl);
1544        let bar_type = BarType::new(
1545            instrument.id(),
1546            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1547            AggregationSource::Internal,
1548        );
1549        let mut builder = BarBuilder::new(
1550            bar_type,
1551            instrument.price_precision(),
1552            instrument.size_precision(),
1553        );
1554
1555        for _ in 0..5 {
1556            builder.update(
1557                Price::from("1.00000"),
1558                Quantity::from(1),
1559                UnixNanos::from(1_000),
1560            );
1561        }
1562
1563        assert_eq!(builder.count, 5);
1564    }
1565
1566    #[rstest]
1567    #[should_panic]
1568    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1569        let instrument = InstrumentAny::Equity(equity_aapl);
1570        let bar_type = BarType::new(
1571            instrument.id(),
1572            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1573            AggregationSource::Internal,
1574        );
1575        let mut builder = BarBuilder::new(
1576            bar_type,
1577            instrument.price_precision(),
1578            instrument.size_precision(),
1579        );
1580        let _ = builder.build_now();
1581    }
1582
1583    #[rstest]
1584    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1585        let instrument = InstrumentAny::Equity(equity_aapl);
1586        let bar_type = BarType::new(
1587            instrument.id(),
1588            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1589            AggregationSource::Internal,
1590        );
1591        let mut builder = BarBuilder::new(
1592            bar_type,
1593            instrument.price_precision(),
1594            instrument.size_precision(),
1595        );
1596
1597        builder.update(
1598            Price::from("1.00001"),
1599            Quantity::from(2),
1600            UnixNanos::default(),
1601        );
1602        builder.update(
1603            Price::from("1.00002"),
1604            Quantity::from(2),
1605            UnixNanos::default(),
1606        );
1607        builder.update(
1608            Price::from("1.00000"),
1609            Quantity::from(1),
1610            UnixNanos::from(1_000_000_000),
1611        );
1612
1613        let bar = builder.build_now();
1614
1615        assert_eq!(bar.open, Price::from("1.00001"));
1616        assert_eq!(bar.high, Price::from("1.00002"));
1617        assert_eq!(bar.low, Price::from("1.00000"));
1618        assert_eq!(bar.close, Price::from("1.00000"));
1619        assert_eq!(bar.volume, Quantity::from(5));
1620        assert_eq!(bar.ts_init, 1_000_000_000);
1621        assert_eq!(builder.ts_last, 1_000_000_000);
1622        assert_eq!(builder.count, 0);
1623    }
1624
1625    #[rstest]
1626    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1627        let instrument = InstrumentAny::Equity(equity_aapl);
1628        let bar_type = BarType::new(
1629            instrument.id(),
1630            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1631            AggregationSource::Internal,
1632        );
1633        let mut builder = BarBuilder::new(bar_type, 2, 0);
1634
1635        builder.update(
1636            Price::from("1.00001"),
1637            Quantity::from(1),
1638            UnixNanos::default(),
1639        );
1640        builder.build_now();
1641
1642        builder.update(
1643            Price::from("1.00000"),
1644            Quantity::from(1),
1645            UnixNanos::default(),
1646        );
1647        builder.update(
1648            Price::from("1.00003"),
1649            Quantity::from(1),
1650            UnixNanos::default(),
1651        );
1652        builder.update(
1653            Price::from("1.00002"),
1654            Quantity::from(1),
1655            UnixNanos::default(),
1656        );
1657
1658        let bar = builder.build_now();
1659
1660        assert_eq!(bar.open, Price::from("1.00000"));
1661        assert_eq!(bar.high, Price::from("1.00003"));
1662        assert_eq!(bar.low, Price::from("1.00000"));
1663        assert_eq!(bar.close, Price::from("1.00002"));
1664        assert_eq!(bar.volume, Quantity::from(3));
1665    }
1666
1667    #[rstest]
1668    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1669        let instrument = InstrumentAny::Equity(equity_aapl);
1670        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1671        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1672        let handler = Arc::new(Mutex::new(Vec::new()));
1673        let handler_clone = Arc::clone(&handler);
1674
1675        let mut aggregator = TickBarAggregator::new(
1676            bar_type,
1677            instrument.price_precision(),
1678            instrument.size_precision(),
1679            move |bar: Bar| {
1680                let mut handler_guard = handler_clone.lock().unwrap();
1681                handler_guard.push(bar);
1682            },
1683            false,
1684        );
1685
1686        let trade = TradeTick::default();
1687        aggregator.handle_trade(trade);
1688
1689        let handler_guard = handler.lock().unwrap();
1690        assert_eq!(handler_guard.len(), 0);
1691    }
1692
1693    #[rstest]
1694    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1695        let instrument = InstrumentAny::Equity(equity_aapl);
1696        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1697        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1698        let handler = Arc::new(Mutex::new(Vec::new()));
1699        let handler_clone = Arc::clone(&handler);
1700
1701        let mut aggregator = TickBarAggregator::new(
1702            bar_type,
1703            instrument.price_precision(),
1704            instrument.size_precision(),
1705            move |bar: Bar| {
1706                let mut handler_guard = handler_clone.lock().unwrap();
1707                handler_guard.push(bar);
1708            },
1709            false,
1710        );
1711
1712        let trade = TradeTick::default();
1713        aggregator.handle_trade(trade);
1714        aggregator.handle_trade(trade);
1715        aggregator.handle_trade(trade);
1716
1717        let handler_guard = handler.lock().unwrap();
1718        let bar = handler_guard.first().unwrap();
1719        assert_eq!(handler_guard.len(), 1);
1720        assert_eq!(bar.open, trade.price);
1721        assert_eq!(bar.high, trade.price);
1722        assert_eq!(bar.low, trade.price);
1723        assert_eq!(bar.close, trade.price);
1724        assert_eq!(bar.volume, Quantity::from(300000));
1725        assert_eq!(bar.ts_event, trade.ts_event);
1726        assert_eq!(bar.ts_init, trade.ts_init);
1727    }
1728
1729    #[rstest]
1730    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1731        let instrument = InstrumentAny::Equity(equity_aapl);
1732        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1733        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1734        let handler = Arc::new(Mutex::new(Vec::new()));
1735        let handler_clone = Arc::clone(&handler);
1736
1737        let mut aggregator = TickBarAggregator::new(
1738            bar_type,
1739            instrument.price_precision(),
1740            instrument.size_precision(),
1741            move |bar: Bar| {
1742                let mut handler_guard = handler_clone.lock().unwrap();
1743                handler_guard.push(bar);
1744            },
1745            false,
1746        );
1747
1748        aggregator.update(
1749            Price::from("1.00001"),
1750            Quantity::from(1),
1751            UnixNanos::default(),
1752        );
1753        aggregator.update(
1754            Price::from("1.00002"),
1755            Quantity::from(1),
1756            UnixNanos::from(1000),
1757        );
1758        aggregator.update(
1759            Price::from("1.00003"),
1760            Quantity::from(1),
1761            UnixNanos::from(2000),
1762        );
1763
1764        let handler_guard = handler.lock().unwrap();
1765        assert_eq!(handler_guard.len(), 1);
1766
1767        let bar = handler_guard.first().unwrap();
1768        assert_eq!(bar.open, Price::from("1.00001"));
1769        assert_eq!(bar.high, Price::from("1.00003"));
1770        assert_eq!(bar.low, Price::from("1.00001"));
1771        assert_eq!(bar.close, Price::from("1.00003"));
1772        assert_eq!(bar.volume, Quantity::from(3));
1773    }
1774
1775    #[rstest]
1776    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1777        let instrument = InstrumentAny::Equity(equity_aapl);
1778        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1779        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1780        let handler = Arc::new(Mutex::new(Vec::new()));
1781        let handler_clone = Arc::clone(&handler);
1782
1783        let mut aggregator = TickBarAggregator::new(
1784            bar_type,
1785            instrument.price_precision(),
1786            instrument.size_precision(),
1787            move |bar: Bar| {
1788                let mut handler_guard = handler_clone.lock().unwrap();
1789                handler_guard.push(bar);
1790            },
1791            false,
1792        );
1793
1794        aggregator.update(
1795            Price::from("1.00001"),
1796            Quantity::from(1),
1797            UnixNanos::default(),
1798        );
1799        aggregator.update(
1800            Price::from("1.00002"),
1801            Quantity::from(1),
1802            UnixNanos::from(1000),
1803        );
1804        aggregator.update(
1805            Price::from("1.00003"),
1806            Quantity::from(1),
1807            UnixNanos::from(2000),
1808        );
1809        aggregator.update(
1810            Price::from("1.00004"),
1811            Quantity::from(1),
1812            UnixNanos::from(3000),
1813        );
1814
1815        let handler_guard = handler.lock().unwrap();
1816        assert_eq!(handler_guard.len(), 2);
1817
1818        let bar1 = &handler_guard[0];
1819        assert_eq!(bar1.open, Price::from("1.00001"));
1820        assert_eq!(bar1.close, Price::from("1.00002"));
1821        assert_eq!(bar1.volume, Quantity::from(2));
1822
1823        let bar2 = &handler_guard[1];
1824        assert_eq!(bar2.open, Price::from("1.00003"));
1825        assert_eq!(bar2.close, Price::from("1.00004"));
1826        assert_eq!(bar2.volume, Quantity::from(2));
1827    }
1828
1829    #[rstest]
1830    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1831        let instrument = InstrumentAny::Equity(equity_aapl);
1832        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1833        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1834        let handler = Arc::new(Mutex::new(Vec::new()));
1835        let handler_clone = Arc::clone(&handler);
1836
1837        let mut aggregator = VolumeBarAggregator::new(
1838            bar_type,
1839            instrument.price_precision(),
1840            instrument.size_precision(),
1841            move |bar: Bar| {
1842                let mut handler_guard = handler_clone.lock().unwrap();
1843                handler_guard.push(bar);
1844            },
1845            false,
1846        );
1847
1848        aggregator.update(
1849            Price::from("1.00001"),
1850            Quantity::from(25),
1851            UnixNanos::default(),
1852        );
1853
1854        let handler_guard = handler.lock().unwrap();
1855        assert_eq!(handler_guard.len(), 2);
1856        let bar1 = &handler_guard[0];
1857        assert_eq!(bar1.volume, Quantity::from(10));
1858        let bar2 = &handler_guard[1];
1859        assert_eq!(bar2.volume, Quantity::from(10));
1860    }
1861
1862    #[rstest]
1863    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1864        let instrument = InstrumentAny::Equity(equity_aapl);
1865        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1866        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1867        let handler = Arc::new(Mutex::new(Vec::new()));
1868        let handler_clone = Arc::clone(&handler);
1869
1870        let mut aggregator = ValueBarAggregator::new(
1871            bar_type,
1872            instrument.price_precision(),
1873            instrument.size_precision(),
1874            move |bar: Bar| {
1875                let mut handler_guard = handler_clone.lock().unwrap();
1876                handler_guard.push(bar);
1877            },
1878            false,
1879        );
1880
1881        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1882        aggregator.update(
1883            Price::from("100.00"),
1884            Quantity::from(5),
1885            UnixNanos::default(),
1886        );
1887        aggregator.update(
1888            Price::from("100.00"),
1889            Quantity::from(5),
1890            UnixNanos::from(1000),
1891        );
1892
1893        let handler_guard = handler.lock().unwrap();
1894        assert_eq!(handler_guard.len(), 1);
1895        let bar = handler_guard.first().unwrap();
1896        assert_eq!(bar.volume, Quantity::from(10));
1897    }
1898
1899    #[rstest]
1900    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1901        let instrument = InstrumentAny::Equity(equity_aapl);
1902        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1903        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1904        let handler = Arc::new(Mutex::new(Vec::new()));
1905        let handler_clone = Arc::clone(&handler);
1906
1907        let mut aggregator = ValueBarAggregator::new(
1908            bar_type,
1909            instrument.price_precision(),
1910            instrument.size_precision(),
1911            move |bar: Bar| {
1912                let mut handler_guard = handler_clone.lock().unwrap();
1913                handler_guard.push(bar);
1914            },
1915            false,
1916        );
1917
1918        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1919        aggregator.update(
1920            Price::from("100.00"),
1921            Quantity::from(25),
1922            UnixNanos::default(),
1923        );
1924
1925        let handler_guard = handler.lock().unwrap();
1926        assert_eq!(handler_guard.len(), 2);
1927        let remaining_value = aggregator.get_cumulative_value();
1928        assert!(remaining_value < 1000.0); // Should be less than threshold
1929    }
1930
1931    #[rstest]
1932    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1933        let instrument = InstrumentAny::Equity(equity_aapl);
1934        // One second bars
1935        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1936        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1937        let handler = Arc::new(Mutex::new(Vec::new()));
1938        let handler_clone = Arc::clone(&handler);
1939        let clock = Rc::new(RefCell::new(TestClock::new()));
1940
1941        let mut aggregator = TimeBarAggregator::new(
1942            bar_type,
1943            instrument.price_precision(),
1944            instrument.size_precision(),
1945            clock.clone(),
1946            move |bar: Bar| {
1947                let mut handler_guard = handler_clone.lock().unwrap();
1948                handler_guard.push(bar);
1949            },
1950            false, // await_partial
1951            true,  // build_with_no_updates
1952            false, // timestamp_on_close
1953            BarIntervalType::LeftOpen,
1954            None,  // time_bars_origin_offset
1955            15,    // bar_build_delay
1956            false, // skip_first_non_full_bar
1957        );
1958
1959        aggregator.update(
1960            Price::from("100.00"),
1961            Quantity::from(1),
1962            UnixNanos::default(),
1963        );
1964
1965        let next_sec = UnixNanos::from(1_000_000_000);
1966        clock.borrow_mut().set_time(next_sec);
1967
1968        let event = TimeEvent::new(
1969            Ustr::from("1-SECOND-LAST"),
1970            UUID4::new(),
1971            next_sec,
1972            next_sec,
1973        );
1974        aggregator.build_bar(event);
1975
1976        let handler_guard = handler.lock().unwrap();
1977        assert_eq!(handler_guard.len(), 1);
1978        let bar = handler_guard.first().unwrap();
1979        assert_eq!(bar.ts_event, UnixNanos::default());
1980        assert_eq!(bar.ts_init, next_sec);
1981    }
1982
1983    #[rstest]
1984    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1985        let instrument = InstrumentAny::Equity(equity_aapl);
1986        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1987        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1988        let handler = Arc::new(Mutex::new(Vec::new()));
1989        let handler_clone = Arc::clone(&handler);
1990        let clock = Rc::new(RefCell::new(TestClock::new()));
1991
1992        let mut aggregator = TimeBarAggregator::new(
1993            bar_type,
1994            instrument.price_precision(),
1995            instrument.size_precision(),
1996            clock.clone(),
1997            move |bar: Bar| {
1998                let mut handler_guard = handler_clone.lock().unwrap();
1999                handler_guard.push(bar);
2000            },
2001            false, // await_partial
2002            true,  // build_with_no_updates
2003            true,  // timestamp_on_close - changed to true to verify left-open behavior
2004            BarIntervalType::LeftOpen,
2005            None,
2006            15,
2007            false, // skip_first_non_full_bar
2008        );
2009
2010        // Update in first interval
2011        aggregator.update(
2012            Price::from("100.00"),
2013            Quantity::from(1),
2014            UnixNanos::default(),
2015        );
2016
2017        // First interval close
2018        let ts1 = UnixNanos::from(1_000_000_000);
2019        clock.borrow_mut().set_time(ts1);
2020        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2021        aggregator.build_bar(event);
2022
2023        // Update in second interval
2024        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2025
2026        // Second interval close
2027        let ts2 = UnixNanos::from(2_000_000_000);
2028        clock.borrow_mut().set_time(ts2);
2029        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2030        aggregator.build_bar(event);
2031
2032        let handler_guard = handler.lock().unwrap();
2033        assert_eq!(handler_guard.len(), 2);
2034
2035        let bar1 = &handler_guard[0];
2036        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
2037        assert_eq!(bar1.ts_init, ts1);
2038        assert_eq!(bar1.close, Price::from("100.00"));
2039        let bar2 = &handler_guard[1];
2040        assert_eq!(bar2.ts_event, ts2);
2041        assert_eq!(bar2.ts_init, ts2);
2042        assert_eq!(bar2.close, Price::from("101.00"));
2043    }
2044
2045    #[rstest]
2046    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2047        let instrument = InstrumentAny::Equity(equity_aapl);
2048        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2049        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2050        let handler = Arc::new(Mutex::new(Vec::new()));
2051        let handler_clone = Arc::clone(&handler);
2052        let clock = Rc::new(RefCell::new(TestClock::new()));
2053        let mut aggregator = TimeBarAggregator::new(
2054            bar_type,
2055            instrument.price_precision(),
2056            instrument.size_precision(),
2057            clock.clone(),
2058            move |bar: Bar| {
2059                let mut handler_guard = handler_clone.lock().unwrap();
2060                handler_guard.push(bar);
2061            },
2062            false, // await_partial
2063            true,  // build_with_no_updates
2064            true,  // timestamp_on_close
2065            BarIntervalType::RightOpen,
2066            None,
2067            15,
2068            false, // skip_first_non_full_bar
2069        );
2070
2071        // Update in first interval
2072        aggregator.update(
2073            Price::from("100.00"),
2074            Quantity::from(1),
2075            UnixNanos::default(),
2076        );
2077
2078        // First interval close
2079        let ts1 = UnixNanos::from(1_000_000_000);
2080        clock.borrow_mut().set_time(ts1);
2081        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2082        aggregator.build_bar(event);
2083
2084        // Update in second interval
2085        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2086
2087        // Second interval close
2088        let ts2 = UnixNanos::from(2_000_000_000);
2089        clock.borrow_mut().set_time(ts2);
2090        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2091        aggregator.build_bar(event);
2092
2093        let handler_guard = handler.lock().unwrap();
2094        assert_eq!(handler_guard.len(), 2);
2095
2096        let bar1 = &handler_guard[0];
2097        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
2098        assert_eq!(bar1.ts_init, ts1);
2099        assert_eq!(bar1.close, Price::from("100.00"));
2100
2101        let bar2 = &handler_guard[1];
2102        assert_eq!(bar2.ts_event, ts1);
2103        assert_eq!(bar2.ts_init, ts2);
2104        assert_eq!(bar2.close, Price::from("101.00"));
2105    }
2106
2107    #[rstest]
2108    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2109        let instrument = InstrumentAny::Equity(equity_aapl);
2110        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2111        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2112        let handler = Arc::new(Mutex::new(Vec::new()));
2113        let handler_clone = Arc::clone(&handler);
2114        let clock = Rc::new(RefCell::new(TestClock::new()));
2115
2116        // First test with build_with_no_updates = false
2117        let mut aggregator = TimeBarAggregator::new(
2118            bar_type,
2119            instrument.price_precision(),
2120            instrument.size_precision(),
2121            clock.clone(),
2122            move |bar: Bar| {
2123                let mut handler_guard = handler_clone.lock().unwrap();
2124                handler_guard.push(bar);
2125            },
2126            false, // await_partial
2127            false, // build_with_no_updates disabled
2128            true,  // timestamp_on_close
2129            BarIntervalType::LeftOpen,
2130            None,
2131            15,
2132            false, // skip_first_non_full_bar
2133        );
2134
2135        // No updates, just interval close
2136        let ts1 = UnixNanos::from(1_000_000_000);
2137        clock.borrow_mut().set_time(ts1);
2138        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2139        aggregator.build_bar(event);
2140
2141        let handler_guard = handler.lock().unwrap();
2142        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
2143        drop(handler_guard);
2144
2145        // Now test with build_with_no_updates = true
2146        let handler = Arc::new(Mutex::new(Vec::new()));
2147        let handler_clone = Arc::clone(&handler);
2148        let mut aggregator = TimeBarAggregator::new(
2149            bar_type,
2150            instrument.price_precision(),
2151            instrument.size_precision(),
2152            clock.clone(),
2153            move |bar: Bar| {
2154                let mut handler_guard = handler_clone.lock().unwrap();
2155                handler_guard.push(bar);
2156            },
2157            false,
2158            true, // build_with_no_updates enabled
2159            true, // timestamp_on_close
2160            BarIntervalType::LeftOpen,
2161            None,
2162            15,
2163            false, // skip_first_non_full_bar
2164        );
2165
2166        aggregator.update(
2167            Price::from("100.00"),
2168            Quantity::from(1),
2169            UnixNanos::default(),
2170        );
2171
2172        // First interval with update
2173        let ts1 = UnixNanos::from(1_000_000_000);
2174        clock.borrow_mut().set_time(ts1);
2175        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2176        aggregator.build_bar(event);
2177
2178        // Second interval without updates
2179        let ts2 = UnixNanos::from(2_000_000_000);
2180        clock.borrow_mut().set_time(ts2);
2181        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2182        aggregator.build_bar(event);
2183
2184        let handler_guard = handler.lock().unwrap();
2185        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2186        let bar1 = &handler_guard[0];
2187        assert_eq!(bar1.close, Price::from("100.00"));
2188        let bar2 = &handler_guard[1];
2189        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2190    }
2191
2192    #[rstest]
2193    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2194        let instrument = InstrumentAny::Equity(equity_aapl);
2195        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2196        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2197        let clock = Rc::new(RefCell::new(TestClock::new()));
2198        let handler = Arc::new(Mutex::new(Vec::new()));
2199        let handler_clone = Arc::clone(&handler);
2200
2201        let mut aggregator = TimeBarAggregator::new(
2202            bar_type,
2203            instrument.price_precision(),
2204            instrument.size_precision(),
2205            clock.clone(),
2206            move |bar: Bar| {
2207                let mut handler_guard = handler_clone.lock().unwrap();
2208                handler_guard.push(bar);
2209            },
2210            false, // await_partial
2211            true,  // build_with_no_updates
2212            true,  // timestamp_on_close
2213            BarIntervalType::RightOpen,
2214            None,
2215            15,
2216            false, // skip_first_non_full_bar
2217        );
2218
2219        let ts1 = UnixNanos::from(1_000_000_000);
2220        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2221
2222        let ts2 = UnixNanos::from(2_000_000_000);
2223        clock.borrow_mut().set_time(ts2);
2224
2225        // Simulate timestamp on close
2226        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2227        aggregator.build_bar(event);
2228
2229        let handler_guard = handler.lock().unwrap();
2230        let bar = handler_guard.first().unwrap();
2231        assert_eq!(bar.ts_event, UnixNanos::default());
2232        assert_eq!(bar.ts_init, ts2);
2233    }
2234
2235    #[rstest]
2236    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2237        let instrument = InstrumentAny::Equity(equity_aapl);
2238        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2239        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2240        let clock = Rc::new(RefCell::new(TestClock::new()));
2241        let handler = Arc::new(Mutex::new(Vec::new()));
2242        let handler_clone = Arc::clone(&handler);
2243
2244        let mut aggregator = TimeBarAggregator::new(
2245            bar_type,
2246            instrument.price_precision(),
2247            instrument.size_precision(),
2248            clock.clone(),
2249            move |bar: Bar| {
2250                let mut handler_guard = handler_clone.lock().unwrap();
2251                handler_guard.push(bar);
2252            },
2253            false, // await_partial
2254            true,  // build_with_no_updates
2255            true,  // timestamp_on_close
2256            BarIntervalType::LeftOpen,
2257            None,
2258            15,
2259            false, // skip_first_non_full_bar
2260        );
2261
2262        let ts1 = UnixNanos::from(1_000_000_000);
2263        clock.borrow_mut().set_time(ts1);
2264
2265        let initial_time = clock.borrow().utc_now();
2266        aggregator.start_batch_time(UnixNanos::from(
2267            initial_time.timestamp_nanos_opt().unwrap() as u64
2268        ));
2269
2270        let handler_guard = handler.lock().unwrap();
2271        assert_eq!(handler_guard.len(), 0);
2272    }
2273}