nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25    clock::Clock,
26    timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29    SharedCell, UnixNanos, WeakCell,
30    correctness::{self, FAILED},
31    datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34    data::{
35        QuoteTick, TradeTick,
36        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37    },
38    enums::{AggregationSource, BarAggregation, BarIntervalType},
39    types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
43///
44/// Implementors receive updates and produce completed bars via handlers, with support for partial and batch updates.
45pub trait BarAggregator: Any + Debug {
46    /// The [`BarType`] to be aggregated.
47    fn bar_type(&self) -> BarType;
48    /// If the aggregator is running and will receive data from the message bus.
49    fn is_running(&self) -> bool;
50    fn set_await_partial(&mut self, value: bool);
51    /// Enables or disables awaiting a partial bar before full aggregation.
52    fn set_is_running(&mut self, value: bool);
53    /// Sets the running state of the aggregator (receiving updates when `true`).
54    /// Updates the aggregator  with the given price and size.
55    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
56    /// Updates the aggregator with the given quote.
57    fn handle_quote(&mut self, quote: QuoteTick) {
58        let spec = self.bar_type().spec();
59        if !self.await_partial() {
60            self.update(
61                quote.extract_price(spec.price_type),
62                quote.extract_size(spec.price_type),
63                quote.ts_event,
64            );
65        }
66    }
67    /// Updates the aggregator with the given trade.
68    fn handle_trade(&mut self, trade: TradeTick) {
69        if !self.await_partial() {
70            self.update(trade.price, trade.size, trade.ts_event);
71        }
72    }
73    /// Updates the aggregator with the given bar.
74    fn handle_bar(&mut self, bar: Bar) {
75        if !self.await_partial() {
76            self.update_bar(bar, bar.volume, bar.ts_init);
77        }
78    }
79    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80    /// Incorporates an existing bar and its volume into aggregation at the given init timestamp.
81    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82    /// Starts batch mode, sending bars to the supplied handler for the given time context.
83    fn stop_batch_update(&mut self);
84    /// Stops batch mode and restores the standard bar handler.
85    fn await_partial(&self) -> bool;
86    /// Returns `true` if awaiting a partial bar before processing updates.
87    /// Sets the initial values for a partially completed bar.
88    fn set_partial(&mut self, partial_bar: Bar);
89    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
90    fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94    /// Returns a reference to this aggregator as `Any` for downcasting.
95    pub fn as_any(&self) -> &dyn Any {
96        self
97    }
98    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
99    pub fn as_any_mut(&mut self) -> &mut dyn Any {
100        self
101    }
102}
103
104/// Provides a generic bar builder for aggregation.
105#[derive(Debug)]
106pub struct BarBuilder {
107    bar_type: BarType,
108    price_precision: u8,
109    size_precision: u8,
110    initialized: bool,
111    ts_last: UnixNanos,
112    count: usize,
113    partial_set: bool,
114    last_close: Option<Price>,
115    open: Option<Price>,
116    high: Option<Price>,
117    low: Option<Price>,
118    close: Option<Price>,
119    volume: Quantity,
120}
121
122impl BarBuilder {
123    /// Creates a new [`BarBuilder`] instance.
124    ///
125    /// # Panics
126    ///
127    /// This function panics if:
128    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
129    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
130    #[must_use]
131    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132        correctness::check_equal(
133            &bar_type.aggregation_source(),
134            &AggregationSource::Internal,
135            "bar_type.aggregation_source",
136            "AggregationSource::Internal",
137        )
138        .expect(FAILED);
139
140        Self {
141            bar_type,
142            price_precision,
143            size_precision,
144            initialized: false,
145            ts_last: UnixNanos::default(),
146            count: 0,
147            partial_set: false,
148            last_close: None,
149            open: None,
150            high: None,
151            low: None,
152            close: None,
153            volume: Quantity::zero(size_precision),
154        }
155    }
156
157    /// Set the initial values for a partially completed bar.
158    ///
159    /// # Panics
160    ///
161    /// Panics if internal values for `high` or `low` are unexpectedly missing.
162    pub fn set_partial(&mut self, partial_bar: Bar) {
163        if self.partial_set {
164            return; // Already updated
165        }
166
167        self.open = Some(partial_bar.open);
168
169        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170            self.high = Some(partial_bar.high);
171        }
172
173        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174            self.low = Some(partial_bar.low);
175        }
176
177        if self.close.is_none() {
178            self.close = Some(partial_bar.close);
179        }
180
181        self.volume = partial_bar.volume;
182
183        if self.ts_last == 0 {
184            self.ts_last = partial_bar.ts_init;
185        }
186
187        self.partial_set = true;
188        self.initialized = true;
189    }
190
191    /// Updates the builder state with the given price, size, and event timestamp.
192    ///
193    /// # Panics
194    ///
195    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
196    pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
197        if ts_event < self.ts_last {
198            return; // Not applicable
199        }
200
201        if self.open.is_none() {
202            self.open = Some(price);
203            self.high = Some(price);
204            self.low = Some(price);
205            self.initialized = true;
206        } else {
207            if price > self.high.unwrap() {
208                self.high = Some(price);
209            }
210            if price < self.low.unwrap() {
211                self.low = Some(price);
212            }
213        }
214
215        self.close = Some(price);
216        self.volume = self.volume.add(size);
217        self.count += 1;
218        self.ts_last = ts_event;
219    }
220
221    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
222    ///
223    /// # Panics
224    ///
225    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
226    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227        if ts_init < self.ts_last {
228            return; // Not applicable
229        }
230
231        if self.open.is_none() {
232            self.open = Some(bar.open);
233            self.high = Some(bar.high);
234            self.low = Some(bar.low);
235            self.initialized = true;
236        } else {
237            if bar.high > self.high.unwrap() {
238                self.high = Some(bar.high);
239            }
240            if bar.low < self.low.unwrap() {
241                self.low = Some(bar.low);
242            }
243        }
244
245        self.close = Some(bar.close);
246        self.volume = self.volume.add(volume);
247        self.count += 1;
248        self.ts_last = ts_init;
249    }
250
251    /// Reset the bar builder.
252    ///
253    /// All stateful fields are reset to their initial value.
254    pub fn reset(&mut self) {
255        self.open = None;
256        self.high = None;
257        self.low = None;
258        self.volume = Quantity::zero(self.size_precision);
259        self.count = 0;
260    }
261
262    /// Return the aggregated bar and reset.
263    pub fn build_now(&mut self) -> Bar {
264        self.build(self.ts_last, self.ts_last)
265    }
266
267    /// Returns the aggregated bar for the given timestamps, then resets the builder.
268    ///
269    /// # Panics
270    ///
271    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
272    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273        if self.open.is_none() {
274            self.open = self.last_close;
275            self.high = self.last_close;
276            self.low = self.last_close;
277            self.close = self.last_close;
278        }
279
280        if let (Some(close), Some(low)) = (self.close, self.low)
281            && close < low
282        {
283            self.low = Some(close);
284        }
285
286        if let (Some(close), Some(high)) = (self.close, self.high)
287            && close > high
288        {
289            self.high = Some(close);
290        }
291
292        // SAFETY: The open was checked, so we can assume all prices are Some
293        let bar = Bar::new(
294            self.bar_type,
295            self.open.unwrap(),
296            self.high.unwrap(),
297            self.low.unwrap(),
298            self.close.unwrap(),
299            self.volume,
300            ts_event,
301            ts_init,
302        );
303
304        self.last_close = self.close;
305        self.reset();
306        bar
307    }
308}
309
310/// Provides a means of aggregating specified bar types and sending to a registered handler.
311pub struct BarAggregatorCore<H>
312where
313    H: FnMut(Bar),
314{
315    bar_type: BarType,
316    builder: BarBuilder,
317    handler: H,
318    handler_backup: Option<H>,
319    batch_handler: Option<Box<dyn FnMut(Bar)>>,
320    await_partial: bool,
321    is_running: bool,
322    batch_mode: bool,
323}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327        f.debug_struct(stringify!(BarAggregatorCore))
328            .field("bar_type", &self.bar_type)
329            .field("builder", &self.builder)
330            .field("await_partial", &self.await_partial)
331            .field("is_running", &self.is_running)
332            .field("batch_mode", &self.batch_mode)
333            .finish()
334    }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339    H: FnMut(Bar),
340{
341    /// Creates a new [`BarAggregatorCore`] instance.
342    ///
343    /// # Panics
344    ///
345    /// This function panics if:
346    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
347    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
348    pub fn new(
349        bar_type: BarType,
350        price_precision: u8,
351        size_precision: u8,
352        handler: H,
353        await_partial: bool,
354    ) -> Self {
355        Self {
356            bar_type,
357            builder: BarBuilder::new(bar_type, price_precision, size_precision),
358            handler,
359            handler_backup: None,
360            batch_handler: None,
361            await_partial,
362            is_running: false,
363            batch_mode: false,
364        }
365    }
366
367    /// Sets whether to await a partial bar before processing new updates.
368    pub const fn set_await_partial(&mut self, value: bool) {
369        self.await_partial = value;
370    }
371
372    /// Sets the running state of the aggregator (receives updates when `true`).
373    pub const fn set_is_running(&mut self, value: bool) {
374        self.is_running = value;
375    }
376
377    /// Returns `true` if the aggregator is awaiting a partial bar to complete before aggregation.
378    pub const fn await_partial(&self) -> bool {
379        self.await_partial
380    }
381
382    /// Initializes builder state with a partially completed bar.
383    pub fn set_partial(&mut self, partial_bar: Bar) {
384        self.builder.set_partial(partial_bar);
385    }
386
387    fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
388        self.builder.update(price, size, ts_event);
389    }
390
391    fn build_now_and_send(&mut self) {
392        let bar = self.builder.build_now();
393        (self.handler)(bar);
394    }
395
396    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397        let bar = self.builder.build(ts_event, ts_init);
398
399        if self.batch_mode {
400            if let Some(handler) = &mut self.batch_handler {
401                handler(bar);
402            }
403        } else {
404            (self.handler)(bar);
405        }
406    }
407
408    /// Enables batch update mode, sending bars to the provided handler instead of immediate dispatch.
409    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410        self.batch_mode = true;
411        self.batch_handler = Some(handler);
412    }
413
414    /// Disables batch update mode and restores the original bar handler.
415    pub fn stop_batch_update(&mut self) {
416        self.batch_mode = false;
417
418        if let Some(handler) = self.handler_backup.take() {
419            self.handler = handler;
420        }
421    }
422}
423
424/// Provides a means of building tick bars aggregated from quote and trades.
425///
426/// When received tick count reaches the step threshold of the bar
427/// specification, then a bar is created and sent to the handler.
428pub struct TickBarAggregator<H>
429where
430    H: FnMut(Bar),
431{
432    core: BarAggregatorCore<H>,
433    cum_value: f64,
434}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct(stringify!(TickBarAggregator))
439            .field("core", &self.core)
440            .field("cum_value", &self.cum_value)
441            .finish()
442    }
443}
444
445impl<H> TickBarAggregator<H>
446where
447    H: FnMut(Bar),
448{
449    /// Creates a new [`TickBarAggregator`] instance.
450    ///
451    /// # Panics
452    ///
453    /// This function panics if:
454    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
455    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
456    pub fn new(
457        bar_type: BarType,
458        price_precision: u8,
459        size_precision: u8,
460        handler: H,
461        await_partial: bool,
462    ) -> Self {
463        Self {
464            core: BarAggregatorCore::new(
465                bar_type,
466                price_precision,
467                size_precision,
468                handler,
469                await_partial,
470            ),
471            cum_value: 0.0,
472        }
473    }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478    H: FnMut(Bar) + 'static,
479{
480    fn bar_type(&self) -> BarType {
481        self.core.bar_type
482    }
483
484    fn is_running(&self) -> bool {
485        self.core.is_running
486    }
487
488    fn set_await_partial(&mut self, value: bool) {
489        self.core.set_await_partial(value);
490    }
491
492    fn set_is_running(&mut self, value: bool) {
493        self.core.set_is_running(value);
494    }
495
496    fn await_partial(&self) -> bool {
497        self.core.await_partial()
498    }
499
500    /// Apply the given update to the aggregator.
501    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
502        self.core.apply_update(price, size, ts_event);
503        let spec = self.core.bar_type.spec();
504
505        if self.core.builder.count >= spec.step.get() {
506            self.core.build_now_and_send();
507        }
508    }
509
510    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511        let mut volume_update = volume;
512        let average_price = Price::new(
513            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514            self.core.builder.price_precision,
515        );
516
517        while volume_update.as_f64() > 0.0 {
518            let value_update = average_price.as_f64() * volume_update.as_f64();
519            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520                self.cum_value += value_update;
521                self.core.builder.update_bar(bar, volume_update, ts_init);
522                break;
523            }
524
525            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527            self.core.builder.update_bar(
528                bar,
529                Quantity::new(volume_diff, volume_update.precision),
530                ts_init,
531            );
532
533            self.core.build_now_and_send();
534            self.cum_value = 0.0;
535            volume_update = Quantity::new(
536                volume_update.as_f64() - volume_diff,
537                volume_update.precision,
538            );
539        }
540    }
541
542    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543        self.core.start_batch_update(handler);
544    }
545
546    fn stop_batch_update(&mut self) {
547        self.core.stop_batch_update();
548    }
549
550    fn set_partial(&mut self, partial_bar: Bar) {
551        self.core.set_partial(partial_bar);
552    }
553}
554
555/// Provides a means of building volume bars aggregated from quote and trades.
556pub struct VolumeBarAggregator<H>
557where
558    H: FnMut(Bar),
559{
560    core: BarAggregatorCore<H>,
561}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565        f.debug_struct(stringify!(VolumeBarAggregator))
566            .field("core", &self.core)
567            .finish()
568    }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573    H: FnMut(Bar),
574{
575    /// Creates a new [`VolumeBarAggregator`] instance.
576    ///
577    /// # Panics
578    ///
579    /// This function panics if:
580    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
581    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
582    pub fn new(
583        bar_type: BarType,
584        price_precision: u8,
585        size_precision: u8,
586        handler: H,
587        await_partial: bool,
588    ) -> Self {
589        Self {
590            core: BarAggregatorCore::new(
591                bar_type.standard(),
592                price_precision,
593                size_precision,
594                handler,
595                await_partial,
596            ),
597        }
598    }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603    H: FnMut(Bar) + 'static,
604{
605    fn bar_type(&self) -> BarType {
606        self.core.bar_type
607    }
608
609    fn is_running(&self) -> bool {
610        self.core.is_running
611    }
612
613    fn set_await_partial(&mut self, value: bool) {
614        self.core.set_await_partial(value);
615    }
616
617    fn set_is_running(&mut self, value: bool) {
618        self.core.set_is_running(value);
619    }
620
621    fn await_partial(&self) -> bool {
622        self.core.await_partial()
623    }
624
625    /// Apply the given update to the aggregator.
626    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
627        let mut raw_size_update = size.raw;
628        let spec = self.core.bar_type.spec();
629        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631        while raw_size_update > 0 {
632            if self.core.builder.volume.raw + raw_size_update < raw_step {
633                self.core.apply_update(
634                    price,
635                    Quantity::from_raw(raw_size_update, size.precision),
636                    ts_event,
637                );
638                break;
639            }
640
641            let raw_size_diff = raw_step - self.core.builder.volume.raw;
642            self.core.apply_update(
643                price,
644                Quantity::from_raw(raw_size_diff, size.precision),
645                ts_event,
646            );
647
648            self.core.build_now_and_send();
649            raw_size_update -= raw_size_diff;
650        }
651    }
652
653    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654        let mut raw_volume_update = volume.raw;
655        let spec = self.core.bar_type.spec();
656        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658        while raw_volume_update > 0 {
659            if self.core.builder.volume.raw + raw_volume_update < raw_step {
660                self.core.builder.update_bar(
661                    bar,
662                    Quantity::from_raw(raw_volume_update, volume.precision),
663                    ts_init,
664                );
665                break;
666            }
667
668            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669            self.core.builder.update_bar(
670                bar,
671                Quantity::from_raw(raw_volume_diff, volume.precision),
672                ts_init,
673            );
674
675            self.core.build_now_and_send();
676            raw_volume_update -= raw_volume_diff;
677        }
678    }
679
680    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681        self.core.start_batch_update(handler);
682    }
683
684    fn stop_batch_update(&mut self) {
685        self.core.stop_batch_update();
686    }
687
688    fn set_partial(&mut self, partial_bar: Bar) {
689        self.core.set_partial(partial_bar);
690    }
691}
692
693/// Provides a means of building value bars aggregated from quote and trades.
694///
695/// When received value reaches the step threshold of the bar
696/// specification, then a bar is created and sent to the handler.
697pub struct ValueBarAggregator<H>
698where
699    H: FnMut(Bar),
700{
701    core: BarAggregatorCore<H>,
702    cum_value: f64,
703}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        f.debug_struct(stringify!(ValueBarAggregator))
708            .field("core", &self.core)
709            .field("cum_value", &self.cum_value)
710            .finish()
711    }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716    H: FnMut(Bar),
717{
718    /// Creates a new [`ValueBarAggregator`] instance.
719    ///
720    /// # Panics
721    ///
722    /// This function panics if:
723    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
724    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
725    pub fn new(
726        bar_type: BarType,
727        price_precision: u8,
728        size_precision: u8,
729        handler: H,
730        await_partial: bool,
731    ) -> Self {
732        Self {
733            core: BarAggregatorCore::new(
734                bar_type.standard(),
735                price_precision,
736                size_precision,
737                handler,
738                await_partial,
739            ),
740            cum_value: 0.0,
741        }
742    }
743
744    #[must_use]
745    /// Returns the cumulative value for the aggregator.
746    pub const fn get_cumulative_value(&self) -> f64 {
747        self.cum_value
748    }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753    H: FnMut(Bar) + 'static,
754{
755    fn bar_type(&self) -> BarType {
756        self.core.bar_type
757    }
758
759    fn is_running(&self) -> bool {
760        self.core.is_running
761    }
762
763    fn set_await_partial(&mut self, value: bool) {
764        self.core.set_await_partial(value);
765    }
766
767    fn set_is_running(&mut self, value: bool) {
768        self.core.set_is_running(value);
769    }
770
771    fn await_partial(&self) -> bool {
772        self.core.await_partial()
773    }
774
775    /// Apply the given update to the aggregator.
776    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
777        let mut size_update = size.as_f64();
778        let spec = self.core.bar_type.spec();
779
780        while size_update > 0.0 {
781            let value_update = price.as_f64() * size_update;
782            if self.cum_value + value_update < spec.step.get() as f64 {
783                self.cum_value += value_update;
784                self.core
785                    .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
786                break;
787            }
788
789            let value_diff = spec.step.get() as f64 - self.cum_value;
790            let size_diff = size_update * (value_diff / value_update);
791            self.core
792                .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
793
794            self.core.build_now_and_send();
795            self.cum_value = 0.0;
796            size_update -= size_diff;
797        }
798    }
799
800    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801        let mut volume_update = volume;
802        let average_price = Price::new(
803            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804            self.core.builder.price_precision,
805        );
806
807        while volume_update.as_f64() > 0.0 {
808            let value_update = average_price.as_f64() * volume_update.as_f64();
809            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810                self.cum_value += value_update;
811                self.core.builder.update_bar(bar, volume_update, ts_init);
812                break;
813            }
814
815            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817            self.core.builder.update_bar(
818                bar,
819                Quantity::new(volume_diff, volume_update.precision),
820                ts_init,
821            );
822
823            self.core.build_now_and_send();
824            self.cum_value = 0.0;
825            volume_update = Quantity::new(
826                volume_update.as_f64() - volume_diff,
827                volume_update.precision,
828            );
829        }
830    }
831
832    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833        self.core.start_batch_update(handler);
834    }
835
836    fn stop_batch_update(&mut self) {
837        self.core.stop_batch_update();
838    }
839
840    fn set_partial(&mut self, partial_bar: Bar) {
841        self.core.set_partial(partial_bar);
842    }
843}
844
845/// Provides a means of building time bars aggregated from quote and trades.
846///
847/// At each aggregation time interval, a bar is created and sent to the handler.
848pub struct TimeBarAggregator<H>
849where
850    H: FnMut(Bar),
851{
852    core: BarAggregatorCore<H>,
853    clock: Rc<RefCell<dyn Clock>>,
854    build_with_no_updates: bool,
855    timestamp_on_close: bool,
856    is_left_open: bool,
857    build_on_next_tick: bool,
858    stored_open_ns: UnixNanos,
859    stored_close_ns: UnixNanos,
860    timer_name: String,
861    interval_ns: UnixNanos,
862    next_close_ns: UnixNanos,
863    bar_build_delay: u64,
864    batch_open_ns: UnixNanos,
865    batch_next_close_ns: UnixNanos,
866    time_bars_origin_offset: Option<TimeDelta>,
867    skip_first_non_full_bar: bool,
868}
869
870impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872        f.debug_struct(stringify!(TimeBarAggregator))
873            .field("core", &self.core)
874            .field("build_with_no_updates", &self.build_with_no_updates)
875            .field("timestamp_on_close", &self.timestamp_on_close)
876            .field("is_left_open", &self.is_left_open)
877            .field("timer_name", &self.timer_name)
878            .field("interval_ns", &self.interval_ns)
879            .field("bar_build_delay", &self.bar_build_delay)
880            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
881            .finish()
882    }
883}
884
885#[derive(Clone, Debug)]
886/// Callback wrapper for time-based bar aggregation events.
887///
888/// This struct provides a bridge between timer events and time bar aggregation,
889/// allowing bars to be automatically built and emitted at regular time intervals.
890/// It holds a reference to a `TimeBarAggregator` and triggers bar creation when
891/// timer events occur.
892pub struct NewBarCallback<H: FnMut(Bar)> {
893    aggregator: WeakCell<TimeBarAggregator<H>>,
894}
895
896impl<H: FnMut(Bar)> NewBarCallback<H> {
897    /// Creates a new callback that invokes the time bar aggregator on timer events.
898    #[must_use]
899    pub fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
900        let shared: SharedCell<TimeBarAggregator<H>> = SharedCell::from(aggregator);
901        Self {
902            aggregator: shared.downgrade(),
903        }
904    }
905}
906
907impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
908    fn from(value: NewBarCallback<H>) -> Self {
909        Self::Rust(Rc::new(move |event: TimeEvent| {
910            if let Some(agg) = value.aggregator.upgrade() {
911                agg.borrow_mut().build_bar(event);
912            }
913        }))
914    }
915}
916
917impl<H> TimeBarAggregator<H>
918where
919    H: FnMut(Bar) + 'static,
920{
921    /// Creates a new [`TimeBarAggregator`] instance.
922    ///
923    /// # Panics
924    ///
925    /// This function panics if:
926    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
927    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
928    #[allow(clippy::too_many_arguments)]
929    pub fn new(
930        bar_type: BarType,
931        price_precision: u8,
932        size_precision: u8,
933        clock: Rc<RefCell<dyn Clock>>,
934        handler: H,
935        await_partial: bool,
936        build_with_no_updates: bool,
937        timestamp_on_close: bool,
938        interval_type: BarIntervalType,
939        time_bars_origin_offset: Option<TimeDelta>,
940        bar_build_delay: u64,
941        skip_first_non_full_bar: bool,
942    ) -> Self {
943        let is_left_open = match interval_type {
944            BarIntervalType::LeftOpen => true,
945            BarIntervalType::RightOpen => false,
946        };
947
948        let core = BarAggregatorCore::new(
949            bar_type.standard(),
950            price_precision,
951            size_precision,
952            handler,
953            await_partial,
954        );
955
956        Self {
957            core,
958            clock,
959            build_with_no_updates,
960            timestamp_on_close,
961            is_left_open,
962            build_on_next_tick: false,
963            stored_open_ns: UnixNanos::default(),
964            stored_close_ns: UnixNanos::default(),
965            timer_name: bar_type.to_string(),
966            interval_ns: get_bar_interval_ns(&bar_type),
967            next_close_ns: UnixNanos::default(),
968            bar_build_delay,
969            batch_open_ns: UnixNanos::default(),
970            batch_next_close_ns: UnixNanos::default(),
971            time_bars_origin_offset,
972            skip_first_non_full_bar,
973        }
974    }
975
976    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
977    ///
978    /// # Errors
979    ///
980    /// Returns an error if setting up the underlying clock timer fails.
981    ///
982    /// # Panics
983    ///
984    /// Panics if the underlying clock timer registration fails.
985    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
986        let now = self.clock.borrow().utc_now();
987        let mut start_time =
988            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
989
990        if start_time == now {
991            self.skip_first_non_full_bar = false;
992        }
993
994        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
995
996        let spec = &self.bar_type().spec();
997        let start_time_ns = UnixNanos::from(start_time);
998
999        if spec.aggregation == BarAggregation::Month {
1000            let step = spec.step.get() as u32;
1001            let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
1002
1003            self.clock
1004                .borrow_mut()
1005                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1006                .expect(FAILED);
1007        } else {
1008            self.clock
1009                .borrow_mut()
1010                .set_timer_ns(
1011                    &self.timer_name,
1012                    self.interval_ns.as_u64(),
1013                    Some(start_time_ns),
1014                    None,
1015                    Some(callback.into()),
1016                    None,
1017                    None,
1018                )
1019                .expect(FAILED);
1020        }
1021
1022        log::debug!("Started timer {}", self.timer_name);
1023        Ok(())
1024    }
1025
1026    /// Stops the time bar aggregator.
1027    pub fn stop(&mut self) {
1028        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1029    }
1030
1031    /// Starts batch time for bar aggregation.
1032    ///
1033    /// # Panics
1034    ///
1035    /// Panics if month arithmetic operations fail for monthly aggregation intervals.
1036    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1037        let spec = self.bar_type().spec();
1038        self.core.batch_mode = true;
1039
1040        let time = time_ns.to_datetime_utc();
1041        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1042        self.batch_open_ns = UnixNanos::from(start_time);
1043
1044        if spec.aggregation == BarAggregation::Month {
1045            let step = spec.step.get() as u32;
1046
1047            if self.batch_open_ns == time_ns {
1048                self.batch_open_ns =
1049                    subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1050            }
1051
1052            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1053        } else {
1054            if self.batch_open_ns == time_ns {
1055                self.batch_open_ns -= self.interval_ns;
1056            }
1057
1058            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1059        }
1060    }
1061
1062    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1063        if self.is_left_open {
1064            if self.timestamp_on_close {
1065                close_ns
1066            } else {
1067                open_ns
1068            }
1069        } else {
1070            open_ns
1071        }
1072    }
1073
1074    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1075        if self.skip_first_non_full_bar {
1076            self.core.builder.reset();
1077            self.skip_first_non_full_bar = false;
1078        } else {
1079            self.core.build_and_send(ts_event, ts_init);
1080        }
1081    }
1082
1083    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1084        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1085            let ts_init = self.batch_next_close_ns;
1086            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1087            self.build_and_send(ts_event, ts_init);
1088        }
1089    }
1090
1091    fn batch_post_update(&mut self, time_ns: UnixNanos) {
1092        let step = self.bar_type().spec().step.get() as u32;
1093
1094        // If not in batch mode and time matches next close, reset batch close
1095        if !self.core.batch_mode
1096            && time_ns == self.batch_next_close_ns
1097            && time_ns > self.stored_open_ns
1098        {
1099            self.batch_next_close_ns = UnixNanos::default();
1100            return;
1101        }
1102
1103        if time_ns > self.batch_next_close_ns {
1104            // Ensure batch times are coherent with last builder update
1105            if self.bar_type().spec().aggregation == BarAggregation::Month {
1106                while self.batch_next_close_ns < time_ns {
1107                    self.batch_next_close_ns =
1108                        add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1109                }
1110
1111                self.batch_open_ns =
1112                    subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1113            } else {
1114                while self.batch_next_close_ns < time_ns {
1115                    self.batch_next_close_ns += self.interval_ns;
1116                }
1117
1118                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1119            }
1120        }
1121
1122        if time_ns == self.batch_next_close_ns {
1123            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1124            self.build_and_send(ts_event, time_ns);
1125            self.batch_open_ns = self.batch_next_close_ns;
1126
1127            if self.bar_type().spec().aggregation == BarAggregation::Month {
1128                self.batch_next_close_ns =
1129                    add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1130            } else {
1131                self.batch_next_close_ns += self.interval_ns;
1132            }
1133        }
1134
1135        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
1136        if !self.core.batch_mode {
1137            self.batch_next_close_ns = UnixNanos::default();
1138        }
1139    }
1140
1141    fn build_bar(&mut self, event: TimeEvent) {
1142        if !self.core.builder.initialized {
1143            self.build_on_next_tick = true;
1144            self.stored_close_ns = self.next_close_ns;
1145            return;
1146        }
1147
1148        if !self.build_with_no_updates && self.core.builder.count == 0 {
1149            return;
1150        }
1151
1152        let ts_init = event.ts_event;
1153        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1154        self.build_and_send(ts_event, ts_init);
1155
1156        self.stored_open_ns = ts_init;
1157
1158        if self.bar_type().spec().aggregation == BarAggregation::Month {
1159            let step = self.bar_type().spec().step.get() as u32;
1160            let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1161
1162            self.clock
1163                .borrow_mut()
1164                .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1165                .expect(FAILED);
1166
1167            self.next_close_ns = next_alert_ns;
1168        } else {
1169            self.next_close_ns = self
1170                .clock
1171                .borrow()
1172                .next_time_ns(&self.timer_name)
1173                .unwrap_or_default();
1174        }
1175    }
1176}
1177
1178impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1179where
1180    H: FnMut(Bar) + 'static,
1181{
1182    fn bar_type(&self) -> BarType {
1183        self.core.bar_type
1184    }
1185
1186    fn is_running(&self) -> bool {
1187        self.core.is_running
1188    }
1189
1190    fn set_await_partial(&mut self, value: bool) {
1191        self.core.set_await_partial(value);
1192    }
1193
1194    fn set_is_running(&mut self, value: bool) {
1195        self.core.set_is_running(value);
1196    }
1197
1198    fn await_partial(&self) -> bool {
1199        self.core.await_partial()
1200    }
1201    /// Stop time-based aggregator by canceling its timer.
1202    fn stop(&mut self) {
1203        Self::stop(self);
1204    }
1205
1206    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1207        if self.batch_next_close_ns != UnixNanos::default() {
1208            self.batch_pre_update(ts_event);
1209        }
1210
1211        self.core.apply_update(price, size, ts_event);
1212
1213        if self.build_on_next_tick {
1214            if ts_event <= self.stored_close_ns {
1215                let ts_init = ts_event;
1216                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1217                self.build_and_send(ts_event, ts_init);
1218            }
1219
1220            self.build_on_next_tick = false;
1221            self.stored_close_ns = UnixNanos::default();
1222        }
1223
1224        if self.batch_next_close_ns != UnixNanos::default() {
1225            self.batch_post_update(ts_event);
1226        }
1227    }
1228
1229    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1230        if self.batch_next_close_ns != UnixNanos::default() {
1231            self.batch_pre_update(ts_init);
1232        }
1233
1234        self.core.builder.update_bar(bar, volume, ts_init);
1235
1236        if self.build_on_next_tick {
1237            if ts_init <= self.stored_close_ns {
1238                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1239                self.build_and_send(ts_event, ts_init);
1240            }
1241
1242            // Reset flag and clear stored close
1243            self.build_on_next_tick = false;
1244            self.stored_close_ns = UnixNanos::default();
1245        }
1246
1247        if self.batch_next_close_ns != UnixNanos::default() {
1248            self.batch_post_update(ts_init);
1249        }
1250    }
1251
1252    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1253        self.core.start_batch_update(handler);
1254        self.start_batch_time(time_ns);
1255    }
1256
1257    fn stop_batch_update(&mut self) {
1258        self.core.stop_batch_update();
1259    }
1260
1261    fn set_partial(&mut self, partial_bar: Bar) {
1262        self.core.set_partial(partial_bar);
1263    }
1264}
1265
1266////////////////////////////////////////////////////////////////////////////////
1267// Tests
1268////////////////////////////////////////////////////////////////////////////////
1269#[cfg(test)]
1270mod tests {
1271    use std::sync::{Arc, Mutex};
1272
1273    use nautilus_common::clock::TestClock;
1274    use nautilus_core::UUID4;
1275    use nautilus_model::{
1276        data::{BarSpecification, BarType},
1277        enums::{AggregationSource, BarAggregation, PriceType},
1278        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1279        types::{Price, Quantity},
1280    };
1281    use rstest::rstest;
1282    use ustr::Ustr;
1283
1284    use super::*;
1285
1286    #[rstest]
1287    fn test_bar_builder_initialization(equity_aapl: Equity) {
1288        let instrument = InstrumentAny::Equity(equity_aapl);
1289        let bar_type = BarType::new(
1290            instrument.id(),
1291            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1292            AggregationSource::Internal,
1293        );
1294        let builder = BarBuilder::new(
1295            bar_type,
1296            instrument.price_precision(),
1297            instrument.size_precision(),
1298        );
1299
1300        assert!(!builder.initialized);
1301        assert_eq!(builder.ts_last, 0);
1302        assert_eq!(builder.count, 0);
1303    }
1304
1305    #[rstest]
1306    fn test_set_partial_update(equity_aapl: Equity) {
1307        let instrument = InstrumentAny::Equity(equity_aapl);
1308        let bar_type = BarType::new(
1309            instrument.id(),
1310            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1311            AggregationSource::Internal,
1312        );
1313        let mut builder = BarBuilder::new(
1314            bar_type,
1315            instrument.price_precision(),
1316            instrument.size_precision(),
1317        );
1318
1319        let partial_bar = Bar::new(
1320            bar_type,
1321            Price::from("101.00"),
1322            Price::from("102.00"),
1323            Price::from("100.00"),
1324            Price::from("101.00"),
1325            Quantity::from(100),
1326            UnixNanos::from(1),
1327            UnixNanos::from(2),
1328        );
1329
1330        builder.set_partial(partial_bar);
1331        let bar = builder.build_now();
1332
1333        assert_eq!(bar.open, partial_bar.open);
1334        assert_eq!(bar.high, partial_bar.high);
1335        assert_eq!(bar.low, partial_bar.low);
1336        assert_eq!(bar.close, partial_bar.close);
1337        assert_eq!(bar.volume, partial_bar.volume);
1338        assert_eq!(builder.ts_last, 2);
1339    }
1340
1341    #[rstest]
1342    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1343        let instrument = InstrumentAny::Equity(equity_aapl);
1344        let bar_type = BarType::new(
1345            instrument.id(),
1346            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1347            AggregationSource::Internal,
1348        );
1349        let mut builder = BarBuilder::new(
1350            bar_type,
1351            instrument.price_precision(),
1352            instrument.size_precision(),
1353        );
1354
1355        builder.update(
1356            Price::from("100.00"),
1357            Quantity::from(1),
1358            UnixNanos::from(1000),
1359        );
1360        builder.update(
1361            Price::from("95.00"),
1362            Quantity::from(1),
1363            UnixNanos::from(2000),
1364        );
1365        builder.update(
1366            Price::from("105.00"),
1367            Quantity::from(1),
1368            UnixNanos::from(3000),
1369        );
1370
1371        let bar = builder.build_now();
1372        assert!(bar.high > bar.low);
1373        assert_eq!(bar.open, Price::from("100.00"));
1374        assert_eq!(bar.high, Price::from("105.00"));
1375        assert_eq!(bar.low, Price::from("95.00"));
1376        assert_eq!(bar.close, Price::from("105.00"));
1377    }
1378
1379    #[rstest]
1380    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1381        let instrument = InstrumentAny::Equity(equity_aapl);
1382        let bar_type = BarType::new(
1383            instrument.id(),
1384            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1385            AggregationSource::Internal,
1386        );
1387        let mut builder = BarBuilder::new(
1388            bar_type,
1389            instrument.price_precision(),
1390            instrument.size_precision(),
1391        );
1392
1393        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1394        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1395
1396        assert_eq!(builder.ts_last, 1_000);
1397        assert_eq!(builder.count, 1);
1398    }
1399
1400    #[rstest]
1401    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1402        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1403        let bar_type = BarType::new(
1404            instrument.id(),
1405            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1406            AggregationSource::Internal,
1407        );
1408        let mut builder = BarBuilder::new(
1409            bar_type,
1410            instrument.price_precision(),
1411            instrument.size_precision(),
1412        );
1413
1414        let partial_bar = Bar::new(
1415            bar_type,
1416            Price::from("1.00001"),
1417            Price::from("1.00010"),
1418            Price::from("1.00000"),
1419            Price::from("1.00002"),
1420            Quantity::from(1),
1421            UnixNanos::from(1_000_000_000),
1422            UnixNanos::from(2_000_000_000),
1423        );
1424
1425        builder.set_partial(partial_bar);
1426        let bar = builder.build_now();
1427
1428        assert_eq!(bar.open, Price::from("1.00001"));
1429        assert_eq!(bar.high, Price::from("1.00010"));
1430        assert_eq!(bar.low, Price::from("1.00000"));
1431        assert_eq!(bar.close, Price::from("1.00002"));
1432        assert_eq!(bar.volume, Quantity::from(1));
1433        assert_eq!(bar.ts_init, 2_000_000_000);
1434        assert_eq!(builder.ts_last, 2_000_000_000);
1435    }
1436
1437    #[rstest]
1438    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1439        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1440        let bar_type = BarType::new(
1441            instrument.id(),
1442            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1443            AggregationSource::Internal,
1444        );
1445        let mut builder = BarBuilder::new(
1446            bar_type,
1447            instrument.price_precision(),
1448            instrument.size_precision(),
1449        );
1450
1451        let partial_bar1 = Bar::new(
1452            bar_type,
1453            Price::from("1.00001"),
1454            Price::from("1.00010"),
1455            Price::from("1.00000"),
1456            Price::from("1.00002"),
1457            Quantity::from(1),
1458            UnixNanos::from(1_000_000_000),
1459            UnixNanos::from(1_000_000_000),
1460        );
1461
1462        let partial_bar2 = Bar::new(
1463            bar_type,
1464            Price::from("2.00001"),
1465            Price::from("2.00010"),
1466            Price::from("2.00000"),
1467            Price::from("2.00002"),
1468            Quantity::from(2),
1469            UnixNanos::from(3_000_000_000),
1470            UnixNanos::from(3_000_000_000),
1471        );
1472
1473        builder.set_partial(partial_bar1);
1474        builder.set_partial(partial_bar2);
1475        let bar = builder.build(
1476            UnixNanos::from(4_000_000_000),
1477            UnixNanos::from(4_000_000_000),
1478        );
1479
1480        assert_eq!(bar.open, Price::from("1.00001"));
1481        assert_eq!(bar.high, Price::from("1.00010"));
1482        assert_eq!(bar.low, Price::from("1.00000"));
1483        assert_eq!(bar.close, Price::from("1.00002"));
1484        assert_eq!(bar.volume, Quantity::from(1));
1485        assert_eq!(bar.ts_init, 4_000_000_000);
1486        assert_eq!(builder.ts_last, 1_000_000_000);
1487    }
1488
1489    #[rstest]
1490    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1491        let instrument = InstrumentAny::Equity(equity_aapl);
1492        let bar_type = BarType::new(
1493            instrument.id(),
1494            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1495            AggregationSource::Internal,
1496        );
1497        let mut builder = BarBuilder::new(
1498            bar_type,
1499            instrument.price_precision(),
1500            instrument.size_precision(),
1501        );
1502
1503        builder.update(
1504            Price::from("1.00000"),
1505            Quantity::from(1),
1506            UnixNanos::default(),
1507        );
1508
1509        assert!(builder.initialized);
1510        assert_eq!(builder.ts_last, 0);
1511        assert_eq!(builder.count, 1);
1512    }
1513
1514    #[rstest]
1515    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1516        equity_aapl: Equity,
1517    ) {
1518        let instrument = InstrumentAny::Equity(equity_aapl);
1519        let bar_type = BarType::new(
1520            instrument.id(),
1521            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1522            AggregationSource::Internal,
1523        );
1524        let mut builder = BarBuilder::new(bar_type, 2, 0);
1525
1526        builder.update(
1527            Price::from("1.00000"),
1528            Quantity::from(1),
1529            UnixNanos::from(1_000),
1530        );
1531        builder.update(
1532            Price::from("1.00001"),
1533            Quantity::from(1),
1534            UnixNanos::from(500),
1535        );
1536
1537        assert!(builder.initialized);
1538        assert_eq!(builder.ts_last, 1_000);
1539        assert_eq!(builder.count, 1);
1540    }
1541
1542    #[rstest]
1543    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1544        let instrument = InstrumentAny::Equity(equity_aapl);
1545        let bar_type = BarType::new(
1546            instrument.id(),
1547            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1548            AggregationSource::Internal,
1549        );
1550        let mut builder = BarBuilder::new(
1551            bar_type,
1552            instrument.price_precision(),
1553            instrument.size_precision(),
1554        );
1555
1556        for _ in 0..5 {
1557            builder.update(
1558                Price::from("1.00000"),
1559                Quantity::from(1),
1560                UnixNanos::from(1_000),
1561            );
1562        }
1563
1564        assert_eq!(builder.count, 5);
1565    }
1566
1567    #[rstest]
1568    #[should_panic]
1569    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1570        let instrument = InstrumentAny::Equity(equity_aapl);
1571        let bar_type = BarType::new(
1572            instrument.id(),
1573            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1574            AggregationSource::Internal,
1575        );
1576        let mut builder = BarBuilder::new(
1577            bar_type,
1578            instrument.price_precision(),
1579            instrument.size_precision(),
1580        );
1581        let _ = builder.build_now();
1582    }
1583
1584    #[rstest]
1585    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1586        let instrument = InstrumentAny::Equity(equity_aapl);
1587        let bar_type = BarType::new(
1588            instrument.id(),
1589            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1590            AggregationSource::Internal,
1591        );
1592        let mut builder = BarBuilder::new(
1593            bar_type,
1594            instrument.price_precision(),
1595            instrument.size_precision(),
1596        );
1597
1598        builder.update(
1599            Price::from("1.00001"),
1600            Quantity::from(2),
1601            UnixNanos::default(),
1602        );
1603        builder.update(
1604            Price::from("1.00002"),
1605            Quantity::from(2),
1606            UnixNanos::default(),
1607        );
1608        builder.update(
1609            Price::from("1.00000"),
1610            Quantity::from(1),
1611            UnixNanos::from(1_000_000_000),
1612        );
1613
1614        let bar = builder.build_now();
1615
1616        assert_eq!(bar.open, Price::from("1.00001"));
1617        assert_eq!(bar.high, Price::from("1.00002"));
1618        assert_eq!(bar.low, Price::from("1.00000"));
1619        assert_eq!(bar.close, Price::from("1.00000"));
1620        assert_eq!(bar.volume, Quantity::from(5));
1621        assert_eq!(bar.ts_init, 1_000_000_000);
1622        assert_eq!(builder.ts_last, 1_000_000_000);
1623        assert_eq!(builder.count, 0);
1624    }
1625
1626    #[rstest]
1627    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1628        let instrument = InstrumentAny::Equity(equity_aapl);
1629        let bar_type = BarType::new(
1630            instrument.id(),
1631            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1632            AggregationSource::Internal,
1633        );
1634        let mut builder = BarBuilder::new(bar_type, 2, 0);
1635
1636        builder.update(
1637            Price::from("1.00001"),
1638            Quantity::from(1),
1639            UnixNanos::default(),
1640        );
1641        builder.build_now();
1642
1643        builder.update(
1644            Price::from("1.00000"),
1645            Quantity::from(1),
1646            UnixNanos::default(),
1647        );
1648        builder.update(
1649            Price::from("1.00003"),
1650            Quantity::from(1),
1651            UnixNanos::default(),
1652        );
1653        builder.update(
1654            Price::from("1.00002"),
1655            Quantity::from(1),
1656            UnixNanos::default(),
1657        );
1658
1659        let bar = builder.build_now();
1660
1661        assert_eq!(bar.open, Price::from("1.00000"));
1662        assert_eq!(bar.high, Price::from("1.00003"));
1663        assert_eq!(bar.low, Price::from("1.00000"));
1664        assert_eq!(bar.close, Price::from("1.00002"));
1665        assert_eq!(bar.volume, Quantity::from(3));
1666    }
1667
1668    #[rstest]
1669    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1670        let instrument = InstrumentAny::Equity(equity_aapl);
1671        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1672        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1673        let handler = Arc::new(Mutex::new(Vec::new()));
1674        let handler_clone = Arc::clone(&handler);
1675
1676        let mut aggregator = TickBarAggregator::new(
1677            bar_type,
1678            instrument.price_precision(),
1679            instrument.size_precision(),
1680            move |bar: Bar| {
1681                let mut handler_guard = handler_clone.lock().unwrap();
1682                handler_guard.push(bar);
1683            },
1684            false,
1685        );
1686
1687        let trade = TradeTick::default();
1688        aggregator.handle_trade(trade);
1689
1690        let handler_guard = handler.lock().unwrap();
1691        assert_eq!(handler_guard.len(), 0);
1692    }
1693
1694    #[rstest]
1695    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1696        let instrument = InstrumentAny::Equity(equity_aapl);
1697        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1698        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1699        let handler = Arc::new(Mutex::new(Vec::new()));
1700        let handler_clone = Arc::clone(&handler);
1701
1702        let mut aggregator = TickBarAggregator::new(
1703            bar_type,
1704            instrument.price_precision(),
1705            instrument.size_precision(),
1706            move |bar: Bar| {
1707                let mut handler_guard = handler_clone.lock().unwrap();
1708                handler_guard.push(bar);
1709            },
1710            false,
1711        );
1712
1713        let trade = TradeTick::default();
1714        aggregator.handle_trade(trade);
1715        aggregator.handle_trade(trade);
1716        aggregator.handle_trade(trade);
1717
1718        let handler_guard = handler.lock().unwrap();
1719        let bar = handler_guard.first().unwrap();
1720        assert_eq!(handler_guard.len(), 1);
1721        assert_eq!(bar.open, trade.price);
1722        assert_eq!(bar.high, trade.price);
1723        assert_eq!(bar.low, trade.price);
1724        assert_eq!(bar.close, trade.price);
1725        assert_eq!(bar.volume, Quantity::from(300000));
1726        assert_eq!(bar.ts_event, trade.ts_event);
1727        assert_eq!(bar.ts_init, trade.ts_init);
1728    }
1729
1730    #[rstest]
1731    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1732        let instrument = InstrumentAny::Equity(equity_aapl);
1733        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1734        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1735        let handler = Arc::new(Mutex::new(Vec::new()));
1736        let handler_clone = Arc::clone(&handler);
1737
1738        let mut aggregator = TickBarAggregator::new(
1739            bar_type,
1740            instrument.price_precision(),
1741            instrument.size_precision(),
1742            move |bar: Bar| {
1743                let mut handler_guard = handler_clone.lock().unwrap();
1744                handler_guard.push(bar);
1745            },
1746            false,
1747        );
1748
1749        aggregator.update(
1750            Price::from("1.00001"),
1751            Quantity::from(1),
1752            UnixNanos::default(),
1753        );
1754        aggregator.update(
1755            Price::from("1.00002"),
1756            Quantity::from(1),
1757            UnixNanos::from(1000),
1758        );
1759        aggregator.update(
1760            Price::from("1.00003"),
1761            Quantity::from(1),
1762            UnixNanos::from(2000),
1763        );
1764
1765        let handler_guard = handler.lock().unwrap();
1766        assert_eq!(handler_guard.len(), 1);
1767
1768        let bar = handler_guard.first().unwrap();
1769        assert_eq!(bar.open, Price::from("1.00001"));
1770        assert_eq!(bar.high, Price::from("1.00003"));
1771        assert_eq!(bar.low, Price::from("1.00001"));
1772        assert_eq!(bar.close, Price::from("1.00003"));
1773        assert_eq!(bar.volume, Quantity::from(3));
1774    }
1775
1776    #[rstest]
1777    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1778        let instrument = InstrumentAny::Equity(equity_aapl);
1779        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1780        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1781        let handler = Arc::new(Mutex::new(Vec::new()));
1782        let handler_clone = Arc::clone(&handler);
1783
1784        let mut aggregator = TickBarAggregator::new(
1785            bar_type,
1786            instrument.price_precision(),
1787            instrument.size_precision(),
1788            move |bar: Bar| {
1789                let mut handler_guard = handler_clone.lock().unwrap();
1790                handler_guard.push(bar);
1791            },
1792            false,
1793        );
1794
1795        aggregator.update(
1796            Price::from("1.00001"),
1797            Quantity::from(1),
1798            UnixNanos::default(),
1799        );
1800        aggregator.update(
1801            Price::from("1.00002"),
1802            Quantity::from(1),
1803            UnixNanos::from(1000),
1804        );
1805        aggregator.update(
1806            Price::from("1.00003"),
1807            Quantity::from(1),
1808            UnixNanos::from(2000),
1809        );
1810        aggregator.update(
1811            Price::from("1.00004"),
1812            Quantity::from(1),
1813            UnixNanos::from(3000),
1814        );
1815
1816        let handler_guard = handler.lock().unwrap();
1817        assert_eq!(handler_guard.len(), 2);
1818
1819        let bar1 = &handler_guard[0];
1820        assert_eq!(bar1.open, Price::from("1.00001"));
1821        assert_eq!(bar1.close, Price::from("1.00002"));
1822        assert_eq!(bar1.volume, Quantity::from(2));
1823
1824        let bar2 = &handler_guard[1];
1825        assert_eq!(bar2.open, Price::from("1.00003"));
1826        assert_eq!(bar2.close, Price::from("1.00004"));
1827        assert_eq!(bar2.volume, Quantity::from(2));
1828    }
1829
1830    #[rstest]
1831    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1832        let instrument = InstrumentAny::Equity(equity_aapl);
1833        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1834        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1835        let handler = Arc::new(Mutex::new(Vec::new()));
1836        let handler_clone = Arc::clone(&handler);
1837
1838        let mut aggregator = VolumeBarAggregator::new(
1839            bar_type,
1840            instrument.price_precision(),
1841            instrument.size_precision(),
1842            move |bar: Bar| {
1843                let mut handler_guard = handler_clone.lock().unwrap();
1844                handler_guard.push(bar);
1845            },
1846            false,
1847        );
1848
1849        aggregator.update(
1850            Price::from("1.00001"),
1851            Quantity::from(25),
1852            UnixNanos::default(),
1853        );
1854
1855        let handler_guard = handler.lock().unwrap();
1856        assert_eq!(handler_guard.len(), 2);
1857        let bar1 = &handler_guard[0];
1858        assert_eq!(bar1.volume, Quantity::from(10));
1859        let bar2 = &handler_guard[1];
1860        assert_eq!(bar2.volume, Quantity::from(10));
1861    }
1862
1863    #[rstest]
1864    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1865        let instrument = InstrumentAny::Equity(equity_aapl);
1866        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1867        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1868        let handler = Arc::new(Mutex::new(Vec::new()));
1869        let handler_clone = Arc::clone(&handler);
1870
1871        let mut aggregator = ValueBarAggregator::new(
1872            bar_type,
1873            instrument.price_precision(),
1874            instrument.size_precision(),
1875            move |bar: Bar| {
1876                let mut handler_guard = handler_clone.lock().unwrap();
1877                handler_guard.push(bar);
1878            },
1879            false,
1880        );
1881
1882        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1883        aggregator.update(
1884            Price::from("100.00"),
1885            Quantity::from(5),
1886            UnixNanos::default(),
1887        );
1888        aggregator.update(
1889            Price::from("100.00"),
1890            Quantity::from(5),
1891            UnixNanos::from(1000),
1892        );
1893
1894        let handler_guard = handler.lock().unwrap();
1895        assert_eq!(handler_guard.len(), 1);
1896        let bar = handler_guard.first().unwrap();
1897        assert_eq!(bar.volume, Quantity::from(10));
1898    }
1899
1900    #[rstest]
1901    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1902        let instrument = InstrumentAny::Equity(equity_aapl);
1903        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1904        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1905        let handler = Arc::new(Mutex::new(Vec::new()));
1906        let handler_clone = Arc::clone(&handler);
1907
1908        let mut aggregator = ValueBarAggregator::new(
1909            bar_type,
1910            instrument.price_precision(),
1911            instrument.size_precision(),
1912            move |bar: Bar| {
1913                let mut handler_guard = handler_clone.lock().unwrap();
1914                handler_guard.push(bar);
1915            },
1916            false,
1917        );
1918
1919        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1920        aggregator.update(
1921            Price::from("100.00"),
1922            Quantity::from(25),
1923            UnixNanos::default(),
1924        );
1925
1926        let handler_guard = handler.lock().unwrap();
1927        assert_eq!(handler_guard.len(), 2);
1928        let remaining_value = aggregator.get_cumulative_value();
1929        assert!(remaining_value < 1000.0); // Should be less than threshold
1930    }
1931
1932    #[rstest]
1933    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1934        let instrument = InstrumentAny::Equity(equity_aapl);
1935        // One second bars
1936        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1937        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1938        let handler = Arc::new(Mutex::new(Vec::new()));
1939        let handler_clone = Arc::clone(&handler);
1940        let clock = Rc::new(RefCell::new(TestClock::new()));
1941
1942        let mut aggregator = TimeBarAggregator::new(
1943            bar_type,
1944            instrument.price_precision(),
1945            instrument.size_precision(),
1946            clock.clone(),
1947            move |bar: Bar| {
1948                let mut handler_guard = handler_clone.lock().unwrap();
1949                handler_guard.push(bar);
1950            },
1951            false, // await_partial
1952            true,  // build_with_no_updates
1953            false, // timestamp_on_close
1954            BarIntervalType::LeftOpen,
1955            None,  // time_bars_origin_offset
1956            15,    // bar_build_delay
1957            false, // skip_first_non_full_bar
1958        );
1959
1960        aggregator.update(
1961            Price::from("100.00"),
1962            Quantity::from(1),
1963            UnixNanos::default(),
1964        );
1965
1966        let next_sec = UnixNanos::from(1_000_000_000);
1967        clock.borrow_mut().set_time(next_sec);
1968
1969        let event = TimeEvent::new(
1970            Ustr::from("1-SECOND-LAST"),
1971            UUID4::new(),
1972            next_sec,
1973            next_sec,
1974        );
1975        aggregator.build_bar(event);
1976
1977        let handler_guard = handler.lock().unwrap();
1978        assert_eq!(handler_guard.len(), 1);
1979        let bar = handler_guard.first().unwrap();
1980        assert_eq!(bar.ts_event, UnixNanos::default());
1981        assert_eq!(bar.ts_init, next_sec);
1982    }
1983
1984    #[rstest]
1985    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1986        let instrument = InstrumentAny::Equity(equity_aapl);
1987        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1988        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1989        let handler = Arc::new(Mutex::new(Vec::new()));
1990        let handler_clone = Arc::clone(&handler);
1991        let clock = Rc::new(RefCell::new(TestClock::new()));
1992
1993        let mut aggregator = TimeBarAggregator::new(
1994            bar_type,
1995            instrument.price_precision(),
1996            instrument.size_precision(),
1997            clock.clone(),
1998            move |bar: Bar| {
1999                let mut handler_guard = handler_clone.lock().unwrap();
2000                handler_guard.push(bar);
2001            },
2002            false, // await_partial
2003            true,  // build_with_no_updates
2004            true,  // timestamp_on_close - changed to true to verify left-open behavior
2005            BarIntervalType::LeftOpen,
2006            None,
2007            15,
2008            false, // skip_first_non_full_bar
2009        );
2010
2011        // Update in first interval
2012        aggregator.update(
2013            Price::from("100.00"),
2014            Quantity::from(1),
2015            UnixNanos::default(),
2016        );
2017
2018        // First interval close
2019        let ts1 = UnixNanos::from(1_000_000_000);
2020        clock.borrow_mut().set_time(ts1);
2021        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2022        aggregator.build_bar(event);
2023
2024        // Update in second interval
2025        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2026
2027        // Second interval close
2028        let ts2 = UnixNanos::from(2_000_000_000);
2029        clock.borrow_mut().set_time(ts2);
2030        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2031        aggregator.build_bar(event);
2032
2033        let handler_guard = handler.lock().unwrap();
2034        assert_eq!(handler_guard.len(), 2);
2035
2036        let bar1 = &handler_guard[0];
2037        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
2038        assert_eq!(bar1.ts_init, ts1);
2039        assert_eq!(bar1.close, Price::from("100.00"));
2040        let bar2 = &handler_guard[1];
2041        assert_eq!(bar2.ts_event, ts2);
2042        assert_eq!(bar2.ts_init, ts2);
2043        assert_eq!(bar2.close, Price::from("101.00"));
2044    }
2045
2046    #[rstest]
2047    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2048        let instrument = InstrumentAny::Equity(equity_aapl);
2049        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2050        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2051        let handler = Arc::new(Mutex::new(Vec::new()));
2052        let handler_clone = Arc::clone(&handler);
2053        let clock = Rc::new(RefCell::new(TestClock::new()));
2054        let mut aggregator = TimeBarAggregator::new(
2055            bar_type,
2056            instrument.price_precision(),
2057            instrument.size_precision(),
2058            clock.clone(),
2059            move |bar: Bar| {
2060                let mut handler_guard = handler_clone.lock().unwrap();
2061                handler_guard.push(bar);
2062            },
2063            false, // await_partial
2064            true,  // build_with_no_updates
2065            true,  // timestamp_on_close
2066            BarIntervalType::RightOpen,
2067            None,
2068            15,
2069            false, // skip_first_non_full_bar
2070        );
2071
2072        // Update in first interval
2073        aggregator.update(
2074            Price::from("100.00"),
2075            Quantity::from(1),
2076            UnixNanos::default(),
2077        );
2078
2079        // First interval close
2080        let ts1 = UnixNanos::from(1_000_000_000);
2081        clock.borrow_mut().set_time(ts1);
2082        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2083        aggregator.build_bar(event);
2084
2085        // Update in second interval
2086        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2087
2088        // Second interval close
2089        let ts2 = UnixNanos::from(2_000_000_000);
2090        clock.borrow_mut().set_time(ts2);
2091        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2092        aggregator.build_bar(event);
2093
2094        let handler_guard = handler.lock().unwrap();
2095        assert_eq!(handler_guard.len(), 2);
2096
2097        let bar1 = &handler_guard[0];
2098        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
2099        assert_eq!(bar1.ts_init, ts1);
2100        assert_eq!(bar1.close, Price::from("100.00"));
2101
2102        let bar2 = &handler_guard[1];
2103        assert_eq!(bar2.ts_event, ts1);
2104        assert_eq!(bar2.ts_init, ts2);
2105        assert_eq!(bar2.close, Price::from("101.00"));
2106    }
2107
2108    #[rstest]
2109    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2110        let instrument = InstrumentAny::Equity(equity_aapl);
2111        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2112        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2113        let handler = Arc::new(Mutex::new(Vec::new()));
2114        let handler_clone = Arc::clone(&handler);
2115        let clock = Rc::new(RefCell::new(TestClock::new()));
2116
2117        // First test with build_with_no_updates = false
2118        let mut aggregator = TimeBarAggregator::new(
2119            bar_type,
2120            instrument.price_precision(),
2121            instrument.size_precision(),
2122            clock.clone(),
2123            move |bar: Bar| {
2124                let mut handler_guard = handler_clone.lock().unwrap();
2125                handler_guard.push(bar);
2126            },
2127            false, // await_partial
2128            false, // build_with_no_updates disabled
2129            true,  // timestamp_on_close
2130            BarIntervalType::LeftOpen,
2131            None,
2132            15,
2133            false, // skip_first_non_full_bar
2134        );
2135
2136        // No updates, just interval close
2137        let ts1 = UnixNanos::from(1_000_000_000);
2138        clock.borrow_mut().set_time(ts1);
2139        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2140        aggregator.build_bar(event);
2141
2142        let handler_guard = handler.lock().unwrap();
2143        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
2144        drop(handler_guard);
2145
2146        // Now test with build_with_no_updates = true
2147        let handler = Arc::new(Mutex::new(Vec::new()));
2148        let handler_clone = Arc::clone(&handler);
2149        let mut aggregator = TimeBarAggregator::new(
2150            bar_type,
2151            instrument.price_precision(),
2152            instrument.size_precision(),
2153            clock.clone(),
2154            move |bar: Bar| {
2155                let mut handler_guard = handler_clone.lock().unwrap();
2156                handler_guard.push(bar);
2157            },
2158            false,
2159            true, // build_with_no_updates enabled
2160            true, // timestamp_on_close
2161            BarIntervalType::LeftOpen,
2162            None,
2163            15,
2164            false, // skip_first_non_full_bar
2165        );
2166
2167        aggregator.update(
2168            Price::from("100.00"),
2169            Quantity::from(1),
2170            UnixNanos::default(),
2171        );
2172
2173        // First interval with update
2174        let ts1 = UnixNanos::from(1_000_000_000);
2175        clock.borrow_mut().set_time(ts1);
2176        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2177        aggregator.build_bar(event);
2178
2179        // Second interval without updates
2180        let ts2 = UnixNanos::from(2_000_000_000);
2181        clock.borrow_mut().set_time(ts2);
2182        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2183        aggregator.build_bar(event);
2184
2185        let handler_guard = handler.lock().unwrap();
2186        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2187        let bar1 = &handler_guard[0];
2188        assert_eq!(bar1.close, Price::from("100.00"));
2189        let bar2 = &handler_guard[1];
2190        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2191    }
2192
2193    #[rstest]
2194    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2195        let instrument = InstrumentAny::Equity(equity_aapl);
2196        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2197        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2198        let clock = Rc::new(RefCell::new(TestClock::new()));
2199        let handler = Arc::new(Mutex::new(Vec::new()));
2200        let handler_clone = Arc::clone(&handler);
2201
2202        let mut aggregator = TimeBarAggregator::new(
2203            bar_type,
2204            instrument.price_precision(),
2205            instrument.size_precision(),
2206            clock.clone(),
2207            move |bar: Bar| {
2208                let mut handler_guard = handler_clone.lock().unwrap();
2209                handler_guard.push(bar);
2210            },
2211            false, // await_partial
2212            true,  // build_with_no_updates
2213            true,  // timestamp_on_close
2214            BarIntervalType::RightOpen,
2215            None,
2216            15,
2217            false, // skip_first_non_full_bar
2218        );
2219
2220        let ts1 = UnixNanos::from(1_000_000_000);
2221        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2222
2223        let ts2 = UnixNanos::from(2_000_000_000);
2224        clock.borrow_mut().set_time(ts2);
2225
2226        // Simulate timestamp on close
2227        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2228        aggregator.build_bar(event);
2229
2230        let handler_guard = handler.lock().unwrap();
2231        let bar = handler_guard.first().unwrap();
2232        assert_eq!(bar.ts_event, UnixNanos::default());
2233        assert_eq!(bar.ts_init, ts2);
2234    }
2235
2236    #[rstest]
2237    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2238        let instrument = InstrumentAny::Equity(equity_aapl);
2239        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2240        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2241        let clock = Rc::new(RefCell::new(TestClock::new()));
2242        let handler = Arc::new(Mutex::new(Vec::new()));
2243        let handler_clone = Arc::clone(&handler);
2244
2245        let mut aggregator = TimeBarAggregator::new(
2246            bar_type,
2247            instrument.price_precision(),
2248            instrument.size_precision(),
2249            clock.clone(),
2250            move |bar: Bar| {
2251                let mut handler_guard = handler_clone.lock().unwrap();
2252                handler_guard.push(bar);
2253            },
2254            false, // await_partial
2255            true,  // build_with_no_updates
2256            true,  // timestamp_on_close
2257            BarIntervalType::LeftOpen,
2258            None,
2259            15,
2260            false, // skip_first_non_full_bar
2261        );
2262
2263        let ts1 = UnixNanos::from(1_000_000_000);
2264        clock.borrow_mut().set_time(ts1);
2265
2266        let initial_time = clock.borrow().utc_now();
2267        aggregator.start_batch_time(UnixNanos::from(
2268            initial_time.timestamp_nanos_opt().unwrap() as u64
2269        ));
2270
2271        let handler_guard = handler.lock().unwrap();
2272        assert_eq!(handler_guard.len(), 0);
2273    }
2274}