nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17//!
18//! Defines the `BarAggregator` trait and core aggregation types (tick, volume, value, time),
19//! along with the `BarBuilder` and `BarAggregatorCore` helpers for constructing bars.
20
21use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25    clock::Clock,
26    timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29    UnixNanos,
30    correctness::{self, FAILED},
31    datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34    data::{
35        QuoteTick, TradeTick,
36        bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37    },
38    enums::{AggregationSource, BarAggregation, BarIntervalType},
39    types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42/// Trait for aggregating incoming price and trade events into time-, tick-, volume-, or value-based bars.
43///
44/// Implementors receive updates and produce completed bars via handlers, with support for partial and batch updates.
45pub trait BarAggregator: Any + Debug {
46    /// The [`BarType`] to be aggregated.
47    fn bar_type(&self) -> BarType;
48    /// If the aggregator is running and will receive data from the message bus.
49    fn is_running(&self) -> bool;
50    fn set_await_partial(&mut self, value: bool);
51    /// Enables or disables awaiting a partial bar before full aggregation.
52    fn set_is_running(&mut self, value: bool);
53    /// Sets the running state of the aggregator (receiving updates when `true`).
54    /// Updates the aggregator  with the given price and size.
55    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
56    /// Updates the aggregator with the given quote.
57    fn handle_quote(&mut self, quote: QuoteTick) {
58        let spec = self.bar_type().spec();
59        if !self.await_partial() {
60            self.update(
61                quote.extract_price(spec.price_type),
62                quote.extract_size(spec.price_type),
63                quote.ts_event,
64            );
65        }
66    }
67    /// Updates the aggregator with the given trade.
68    fn handle_trade(&mut self, trade: TradeTick) {
69        if !self.await_partial() {
70            self.update(trade.price, trade.size, trade.ts_event);
71        }
72    }
73    /// Updates the aggregator with the given bar.
74    fn handle_bar(&mut self, bar: Bar) {
75        if !self.await_partial() {
76            self.update_bar(bar, bar.volume, bar.ts_init);
77        }
78    }
79    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80    /// Incorporates an existing bar and its volume into aggregation at the given init timestamp.
81    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82    /// Starts batch mode, sending bars to the supplied handler for the given time context.
83    fn stop_batch_update(&mut self);
84    /// Stops batch mode and restores the standard bar handler.
85    fn await_partial(&self) -> bool;
86    /// Returns `true` if awaiting a partial bar before processing updates.
87    /// Sets the initial values for a partially completed bar.
88    fn set_partial(&mut self, partial_bar: Bar);
89    /// Stop the aggregator, e.g., cancel timers. Default is no-op.
90    fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94    /// Returns a reference to this aggregator as `Any` for downcasting.
95    pub fn as_any(&self) -> &dyn Any {
96        self
97    }
98    /// Returns a mutable reference to this aggregator as `Any` for downcasting.
99    pub fn as_any_mut(&mut self) -> &mut dyn Any {
100        self
101    }
102}
103
104/// Provides a generic bar builder for aggregation.
105#[derive(Debug)]
106pub struct BarBuilder {
107    bar_type: BarType,
108    price_precision: u8,
109    size_precision: u8,
110    initialized: bool,
111    ts_last: UnixNanos,
112    count: usize,
113    partial_set: bool,
114    last_close: Option<Price>,
115    open: Option<Price>,
116    high: Option<Price>,
117    low: Option<Price>,
118    close: Option<Price>,
119    volume: Quantity,
120}
121
122impl BarBuilder {
123    /// Creates a new [`BarBuilder`] instance.
124    ///
125    /// # Panics
126    ///
127    /// This function panics if:
128    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
129    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
130    #[must_use]
131    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132        correctness::check_equal(
133            &bar_type.aggregation_source(),
134            &AggregationSource::Internal,
135            "bar_type.aggregation_source",
136            "AggregationSource::Internal",
137        )
138        .expect(FAILED);
139
140        Self {
141            bar_type,
142            price_precision,
143            size_precision,
144            initialized: false,
145            ts_last: UnixNanos::default(),
146            count: 0,
147            partial_set: false,
148            last_close: None,
149            open: None,
150            high: None,
151            low: None,
152            close: None,
153            volume: Quantity::zero(size_precision),
154        }
155    }
156
157    /// Set the initial values for a partially completed bar.
158    ///
159    /// # Panics
160    ///
161    /// Panics if internal values for `high` or `low` are unexpectedly missing.
162    pub fn set_partial(&mut self, partial_bar: Bar) {
163        if self.partial_set {
164            return; // Already updated
165        }
166
167        self.open = Some(partial_bar.open);
168
169        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170            self.high = Some(partial_bar.high);
171        }
172
173        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174            self.low = Some(partial_bar.low);
175        }
176
177        if self.close.is_none() {
178            self.close = Some(partial_bar.close);
179        }
180
181        self.volume = partial_bar.volume;
182
183        if self.ts_last == 0 {
184            self.ts_last = partial_bar.ts_init;
185        }
186
187        self.partial_set = true;
188        self.initialized = true;
189    }
190
191    /// Updates the builder state with the given price, size, and event timestamp.
192    ///
193    /// # Panics
194    ///
195    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
196    pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
197        if ts_event < self.ts_last {
198            return; // Not applicable
199        }
200
201        if self.open.is_none() {
202            self.open = Some(price);
203            self.high = Some(price);
204            self.low = Some(price);
205            self.initialized = true;
206        } else {
207            if price > self.high.unwrap() {
208                self.high = Some(price);
209            }
210            if price < self.low.unwrap() {
211                self.low = Some(price);
212            }
213        }
214
215        self.close = Some(price);
216        self.volume = self.volume.add(size);
217        self.count += 1;
218        self.ts_last = ts_event;
219    }
220
221    /// Updates the builder state with a completed bar, its volume, and the bar init timestamp.
222    ///
223    /// # Panics
224    ///
225    /// Panics if `high` or `low` values are unexpectedly `None` when updating.
226    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227        if ts_init < self.ts_last {
228            return; // Not applicable
229        }
230
231        if self.open.is_none() {
232            self.open = Some(bar.open);
233            self.high = Some(bar.high);
234            self.low = Some(bar.low);
235            self.initialized = true;
236        } else {
237            if bar.high > self.high.unwrap() {
238                self.high = Some(bar.high);
239            }
240            if bar.low < self.low.unwrap() {
241                self.low = Some(bar.low);
242            }
243        }
244
245        self.close = Some(bar.close);
246        self.volume = self.volume.add(volume);
247        self.count += 1;
248        self.ts_last = ts_init;
249    }
250
251    /// Reset the bar builder.
252    ///
253    /// All stateful fields are reset to their initial value.
254    pub fn reset(&mut self) {
255        self.open = None;
256        self.high = None;
257        self.low = None;
258        self.volume = Quantity::zero(self.size_precision);
259        self.count = 0;
260    }
261
262    /// Return the aggregated bar and reset.
263    pub fn build_now(&mut self) -> Bar {
264        self.build(self.ts_last, self.ts_last)
265    }
266
267    /// Returns the aggregated bar for the given timestamps, then resets the builder.
268    ///
269    /// # Panics
270    ///
271    /// Panics if `open`, `high`, `low`, or `close` values are `None` when building the bar.
272    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273        if self.open.is_none() {
274            self.open = self.last_close;
275            self.high = self.last_close;
276            self.low = self.last_close;
277            self.close = self.last_close;
278        }
279
280        if let (Some(close), Some(low)) = (self.close, self.low)
281            && close < low
282        {
283            self.low = Some(close);
284        }
285
286        if let (Some(close), Some(high)) = (self.close, self.high)
287            && close > high
288        {
289            self.high = Some(close);
290        }
291
292        // SAFETY: The open was checked, so we can assume all prices are Some
293        let bar = Bar::new(
294            self.bar_type,
295            self.open.unwrap(),
296            self.high.unwrap(),
297            self.low.unwrap(),
298            self.close.unwrap(),
299            self.volume,
300            ts_event,
301            ts_init,
302        );
303
304        self.last_close = self.close;
305        self.reset();
306        bar
307    }
308}
309
310/// Provides a means of aggregating specified bar types and sending to a registered handler.
311pub struct BarAggregatorCore<H>
312where
313    H: FnMut(Bar),
314{
315    bar_type: BarType,
316    builder: BarBuilder,
317    handler: H,
318    handler_backup: Option<H>,
319    batch_handler: Option<Box<dyn FnMut(Bar)>>,
320    await_partial: bool,
321    is_running: bool,
322    batch_mode: bool,
323}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327        f.debug_struct(stringify!(BarAggregatorCore))
328            .field("bar_type", &self.bar_type)
329            .field("builder", &self.builder)
330            .field("await_partial", &self.await_partial)
331            .field("is_running", &self.is_running)
332            .field("batch_mode", &self.batch_mode)
333            .finish()
334    }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339    H: FnMut(Bar),
340{
341    /// Creates a new [`BarAggregatorCore`] instance.
342    ///
343    /// # Panics
344    ///
345    /// This function panics if:
346    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
347    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
348    pub fn new(
349        bar_type: BarType,
350        price_precision: u8,
351        size_precision: u8,
352        handler: H,
353        await_partial: bool,
354    ) -> Self {
355        Self {
356            bar_type,
357            builder: BarBuilder::new(bar_type, price_precision, size_precision),
358            handler,
359            handler_backup: None,
360            batch_handler: None,
361            await_partial,
362            is_running: false,
363            batch_mode: false,
364        }
365    }
366
367    /// Sets whether to await a partial bar before processing new updates.
368    pub const fn set_await_partial(&mut self, value: bool) {
369        self.await_partial = value;
370    }
371
372    /// Sets the running state of the aggregator (receives updates when `true`).
373    pub const fn set_is_running(&mut self, value: bool) {
374        self.is_running = value;
375    }
376
377    /// Returns `true` if the aggregator is awaiting a partial bar to complete before aggregation.
378    pub const fn await_partial(&self) -> bool {
379        self.await_partial
380    }
381
382    /// Initializes builder state with a partially completed bar.
383    pub fn set_partial(&mut self, partial_bar: Bar) {
384        self.builder.set_partial(partial_bar);
385    }
386
387    fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
388        self.builder.update(price, size, ts_event);
389    }
390
391    fn build_now_and_send(&mut self) {
392        let bar = self.builder.build_now();
393        (self.handler)(bar);
394    }
395
396    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397        let bar = self.builder.build(ts_event, ts_init);
398
399        if self.batch_mode {
400            if let Some(handler) = &mut self.batch_handler {
401                handler(bar);
402            }
403        } else {
404            (self.handler)(bar);
405        }
406    }
407
408    /// Enables batch update mode, sending bars to the provided handler instead of immediate dispatch.
409    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410        self.batch_mode = true;
411        self.batch_handler = Some(handler);
412    }
413
414    /// Disables batch update mode and restores the original bar handler.
415    pub fn stop_batch_update(&mut self) {
416        self.batch_mode = false;
417
418        if let Some(handler) = self.handler_backup.take() {
419            self.handler = handler;
420        }
421    }
422}
423
424/// Provides a means of building tick bars aggregated from quote and trades.
425///
426/// When received tick count reaches the step threshold of the bar
427/// specification, then a bar is created and sent to the handler.
428pub struct TickBarAggregator<H>
429where
430    H: FnMut(Bar),
431{
432    core: BarAggregatorCore<H>,
433    cum_value: f64,
434}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438        f.debug_struct(stringify!(TickBarAggregator))
439            .field("core", &self.core)
440            .field("cum_value", &self.cum_value)
441            .finish()
442    }
443}
444
445impl<H> TickBarAggregator<H>
446where
447    H: FnMut(Bar),
448{
449    /// Creates a new [`TickBarAggregator`] instance.
450    ///
451    /// # Panics
452    ///
453    /// This function panics if:
454    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
455    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
456    pub fn new(
457        bar_type: BarType,
458        price_precision: u8,
459        size_precision: u8,
460        handler: H,
461        await_partial: bool,
462    ) -> Self {
463        Self {
464            core: BarAggregatorCore::new(
465                bar_type,
466                price_precision,
467                size_precision,
468                handler,
469                await_partial,
470            ),
471            cum_value: 0.0,
472        }
473    }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478    H: FnMut(Bar) + 'static,
479{
480    fn bar_type(&self) -> BarType {
481        self.core.bar_type
482    }
483
484    fn is_running(&self) -> bool {
485        self.core.is_running
486    }
487
488    fn set_await_partial(&mut self, value: bool) {
489        self.core.set_await_partial(value);
490    }
491
492    fn set_is_running(&mut self, value: bool) {
493        self.core.set_is_running(value);
494    }
495
496    fn await_partial(&self) -> bool {
497        self.core.await_partial()
498    }
499
500    /// Apply the given update to the aggregator.
501    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
502        self.core.apply_update(price, size, ts_event);
503        let spec = self.core.bar_type.spec();
504
505        if self.core.builder.count >= spec.step.get() {
506            self.core.build_now_and_send();
507        }
508    }
509
510    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511        let mut volume_update = volume;
512        let average_price = Price::new(
513            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514            self.core.builder.price_precision,
515        );
516
517        while volume_update.as_f64() > 0.0 {
518            let value_update = average_price.as_f64() * volume_update.as_f64();
519            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520                self.cum_value += value_update;
521                self.core.builder.update_bar(bar, volume_update, ts_init);
522                break;
523            }
524
525            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527            self.core.builder.update_bar(
528                bar,
529                Quantity::new(volume_diff, volume_update.precision),
530                ts_init,
531            );
532
533            self.core.build_now_and_send();
534            self.cum_value = 0.0;
535            volume_update = Quantity::new(
536                volume_update.as_f64() - volume_diff,
537                volume_update.precision,
538            );
539        }
540    }
541
542    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543        self.core.start_batch_update(handler);
544    }
545
546    fn stop_batch_update(&mut self) {
547        self.core.stop_batch_update();
548    }
549
550    fn set_partial(&mut self, partial_bar: Bar) {
551        self.core.set_partial(partial_bar);
552    }
553}
554
555/// Provides a means of building volume bars aggregated from quote and trades.
556pub struct VolumeBarAggregator<H>
557where
558    H: FnMut(Bar),
559{
560    core: BarAggregatorCore<H>,
561}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565        f.debug_struct(stringify!(VolumeBarAggregator))
566            .field("core", &self.core)
567            .finish()
568    }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573    H: FnMut(Bar),
574{
575    /// Creates a new [`VolumeBarAggregator`] instance.
576    ///
577    /// # Panics
578    ///
579    /// This function panics if:
580    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
581    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
582    pub fn new(
583        bar_type: BarType,
584        price_precision: u8,
585        size_precision: u8,
586        handler: H,
587        await_partial: bool,
588    ) -> Self {
589        Self {
590            core: BarAggregatorCore::new(
591                bar_type.standard(),
592                price_precision,
593                size_precision,
594                handler,
595                await_partial,
596            ),
597        }
598    }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603    H: FnMut(Bar) + 'static,
604{
605    fn bar_type(&self) -> BarType {
606        self.core.bar_type
607    }
608
609    fn is_running(&self) -> bool {
610        self.core.is_running
611    }
612
613    fn set_await_partial(&mut self, value: bool) {
614        self.core.set_await_partial(value);
615    }
616
617    fn set_is_running(&mut self, value: bool) {
618        self.core.set_is_running(value);
619    }
620
621    fn await_partial(&self) -> bool {
622        self.core.await_partial()
623    }
624
625    /// Apply the given update to the aggregator.
626    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
627        let mut raw_size_update = size.raw;
628        let spec = self.core.bar_type.spec();
629        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631        while raw_size_update > 0 {
632            if self.core.builder.volume.raw + raw_size_update < raw_step {
633                self.core.apply_update(
634                    price,
635                    Quantity::from_raw(raw_size_update, size.precision),
636                    ts_event,
637                );
638                break;
639            }
640
641            let raw_size_diff = raw_step - self.core.builder.volume.raw;
642            self.core.apply_update(
643                price,
644                Quantity::from_raw(raw_size_diff, size.precision),
645                ts_event,
646            );
647
648            self.core.build_now_and_send();
649            raw_size_update -= raw_size_diff;
650        }
651    }
652
653    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654        let mut raw_volume_update = volume.raw;
655        let spec = self.core.bar_type.spec();
656        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658        while raw_volume_update > 0 {
659            if self.core.builder.volume.raw + raw_volume_update < raw_step {
660                self.core.builder.update_bar(
661                    bar,
662                    Quantity::from_raw(raw_volume_update, volume.precision),
663                    ts_init,
664                );
665                break;
666            }
667
668            let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669            self.core.builder.update_bar(
670                bar,
671                Quantity::from_raw(raw_volume_diff, volume.precision),
672                ts_init,
673            );
674
675            self.core.build_now_and_send();
676            raw_volume_update -= raw_volume_diff;
677        }
678    }
679
680    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681        self.core.start_batch_update(handler);
682    }
683
684    fn stop_batch_update(&mut self) {
685        self.core.stop_batch_update();
686    }
687
688    fn set_partial(&mut self, partial_bar: Bar) {
689        self.core.set_partial(partial_bar);
690    }
691}
692
693/// Provides a means of building value bars aggregated from quote and trades.
694///
695/// When received value reaches the step threshold of the bar
696/// specification, then a bar is created and sent to the handler.
697pub struct ValueBarAggregator<H>
698where
699    H: FnMut(Bar),
700{
701    core: BarAggregatorCore<H>,
702    cum_value: f64,
703}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707        f.debug_struct(stringify!(ValueBarAggregator))
708            .field("core", &self.core)
709            .field("cum_value", &self.cum_value)
710            .finish()
711    }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716    H: FnMut(Bar),
717{
718    /// Creates a new [`ValueBarAggregator`] instance.
719    ///
720    /// # Panics
721    ///
722    /// This function panics if:
723    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
724    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
725    pub fn new(
726        bar_type: BarType,
727        price_precision: u8,
728        size_precision: u8,
729        handler: H,
730        await_partial: bool,
731    ) -> Self {
732        Self {
733            core: BarAggregatorCore::new(
734                bar_type.standard(),
735                price_precision,
736                size_precision,
737                handler,
738                await_partial,
739            ),
740            cum_value: 0.0,
741        }
742    }
743
744    #[must_use]
745    /// Returns the cumulative value for the aggregator.
746    pub const fn get_cumulative_value(&self) -> f64 {
747        self.cum_value
748    }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753    H: FnMut(Bar) + 'static,
754{
755    fn bar_type(&self) -> BarType {
756        self.core.bar_type
757    }
758
759    fn is_running(&self) -> bool {
760        self.core.is_running
761    }
762
763    fn set_await_partial(&mut self, value: bool) {
764        self.core.set_await_partial(value);
765    }
766
767    fn set_is_running(&mut self, value: bool) {
768        self.core.set_is_running(value);
769    }
770
771    fn await_partial(&self) -> bool {
772        self.core.await_partial()
773    }
774
775    /// Apply the given update to the aggregator.
776    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
777        let mut size_update = size.as_f64();
778        let spec = self.core.bar_type.spec();
779
780        while size_update > 0.0 {
781            let value_update = price.as_f64() * size_update;
782            if self.cum_value + value_update < spec.step.get() as f64 {
783                self.cum_value += value_update;
784                self.core
785                    .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
786                break;
787            }
788
789            let value_diff = spec.step.get() as f64 - self.cum_value;
790            let size_diff = size_update * (value_diff / value_update);
791            self.core
792                .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
793
794            self.core.build_now_and_send();
795            self.cum_value = 0.0;
796            size_update -= size_diff;
797        }
798    }
799
800    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801        let mut volume_update = volume;
802        let average_price = Price::new(
803            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804            self.core.builder.price_precision,
805        );
806
807        while volume_update.as_f64() > 0.0 {
808            let value_update = average_price.as_f64() * volume_update.as_f64();
809            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810                self.cum_value += value_update;
811                self.core.builder.update_bar(bar, volume_update, ts_init);
812                break;
813            }
814
815            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817            self.core.builder.update_bar(
818                bar,
819                Quantity::new(volume_diff, volume_update.precision),
820                ts_init,
821            );
822
823            self.core.build_now_and_send();
824            self.cum_value = 0.0;
825            volume_update = Quantity::new(
826                volume_update.as_f64() - volume_diff,
827                volume_update.precision,
828            );
829        }
830    }
831
832    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833        self.core.start_batch_update(handler);
834    }
835
836    fn stop_batch_update(&mut self) {
837        self.core.stop_batch_update();
838    }
839
840    fn set_partial(&mut self, partial_bar: Bar) {
841        self.core.set_partial(partial_bar);
842    }
843}
844
845/// Provides a means of building time bars aggregated from quote and trades.
846///
847/// At each aggregation time interval, a bar is created and sent to the handler.
848pub struct TimeBarAggregator<H>
849where
850    H: FnMut(Bar),
851{
852    core: BarAggregatorCore<H>,
853    clock: Rc<RefCell<dyn Clock>>,
854    build_with_no_updates: bool,
855    timestamp_on_close: bool,
856    is_left_open: bool,
857    build_on_next_tick: bool,
858    stored_open_ns: UnixNanos,
859    stored_close_ns: UnixNanos,
860    timer_name: String,
861    interval_ns: UnixNanos,
862    next_close_ns: UnixNanos,
863    bar_build_delay: u64,
864    batch_open_ns: UnixNanos,
865    batch_next_close_ns: UnixNanos,
866    time_bars_origin_offset: Option<TimeDelta>,
867    skip_first_non_full_bar: bool,
868}
869
870impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
871    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872        f.debug_struct(stringify!(TimeBarAggregator))
873            .field("core", &self.core)
874            .field("build_with_no_updates", &self.build_with_no_updates)
875            .field("timestamp_on_close", &self.timestamp_on_close)
876            .field("is_left_open", &self.is_left_open)
877            .field("timer_name", &self.timer_name)
878            .field("interval_ns", &self.interval_ns)
879            .field("bar_build_delay", &self.bar_build_delay)
880            .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
881            .finish()
882    }
883}
884
885#[derive(Clone, Debug)]
886/// Callback wrapper for time-based bar aggregation events.
887///
888/// This struct provides a bridge between timer events and time bar aggregation,
889/// allowing bars to be automatically built and emitted at regular time intervals.
890/// It holds a reference to a `TimeBarAggregator` and triggers bar creation when
891/// timer events occur.
892pub struct NewBarCallback<H: FnMut(Bar)> {
893    aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
894}
895
896impl<H: FnMut(Bar)> NewBarCallback<H> {
897    /// Creates a new callback that invokes the time bar aggregator on timer events.
898    #[must_use]
899    pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
900        Self { aggregator }
901    }
902}
903
904impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
905    fn from(value: NewBarCallback<H>) -> Self {
906        Self::Rust(Rc::new(move |event: TimeEvent| {
907            value.aggregator.borrow_mut().build_bar(event);
908        }))
909    }
910}
911
912impl<H> TimeBarAggregator<H>
913where
914    H: FnMut(Bar) + 'static,
915{
916    /// Creates a new [`TimeBarAggregator`] instance.
917    ///
918    /// # Panics
919    ///
920    /// This function panics if:
921    /// - `instrument.id` is not equal to the `bar_type.instrument_id`.
922    /// - `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
923    #[allow(clippy::too_many_arguments)]
924    pub fn new(
925        bar_type: BarType,
926        price_precision: u8,
927        size_precision: u8,
928        clock: Rc<RefCell<dyn Clock>>,
929        handler: H,
930        await_partial: bool,
931        build_with_no_updates: bool,
932        timestamp_on_close: bool,
933        interval_type: BarIntervalType,
934        time_bars_origin_offset: Option<TimeDelta>,
935        bar_build_delay: u64,
936        skip_first_non_full_bar: bool,
937    ) -> Self {
938        let is_left_open = match interval_type {
939            BarIntervalType::LeftOpen => true,
940            BarIntervalType::RightOpen => false,
941        };
942
943        let core = BarAggregatorCore::new(
944            bar_type.standard(),
945            price_precision,
946            size_precision,
947            handler,
948            await_partial,
949        );
950
951        Self {
952            core,
953            clock,
954            build_with_no_updates,
955            timestamp_on_close,
956            is_left_open,
957            build_on_next_tick: false,
958            stored_open_ns: UnixNanos::default(),
959            stored_close_ns: UnixNanos::default(),
960            timer_name: bar_type.to_string(),
961            interval_ns: get_bar_interval_ns(&bar_type),
962            next_close_ns: UnixNanos::default(),
963            bar_build_delay,
964            batch_open_ns: UnixNanos::default(),
965            batch_next_close_ns: UnixNanos::default(),
966            time_bars_origin_offset,
967            skip_first_non_full_bar,
968        }
969    }
970
971    /// Starts the time bar aggregator, scheduling periodic bar builds on the clock.
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if setting up the underlying clock timer fails.
976    ///
977    /// # Panics
978    ///
979    /// Panics if the underlying clock timer registration fails.
980    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
981        let now = self.clock.borrow().utc_now();
982        let mut start_time =
983            get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
984
985        if start_time == now {
986            self.skip_first_non_full_bar = false;
987        }
988
989        start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
990
991        let spec = &self.bar_type().spec();
992        let start_time_ns = UnixNanos::from(start_time);
993
994        if spec.aggregation == BarAggregation::Month {
995            let step = spec.step.get() as u32;
996            let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
997
998            self.clock
999                .borrow_mut()
1000                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1001                .expect(FAILED);
1002        } else {
1003            self.clock
1004                .borrow_mut()
1005                .set_timer_ns(
1006                    &self.timer_name,
1007                    self.interval_ns.as_u64(),
1008                    Some(start_time_ns),
1009                    None,
1010                    Some(callback.into()),
1011                    None,
1012                    None,
1013                )
1014                .expect(FAILED);
1015        }
1016
1017        log::debug!("Started timer {}", self.timer_name);
1018        Ok(())
1019    }
1020
1021    /// Stops the time bar aggregator.
1022    pub fn stop(&mut self) {
1023        self.clock.borrow_mut().cancel_timer(&self.timer_name);
1024    }
1025
1026    /// Starts batch time for bar aggregation.
1027    ///
1028    /// # Panics
1029    ///
1030    /// Panics if month arithmetic operations fail for monthly aggregation intervals.
1031    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1032        let spec = self.bar_type().spec();
1033        self.core.batch_mode = true;
1034
1035        let time = time_ns.to_datetime_utc();
1036        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1037        self.batch_open_ns = UnixNanos::from(start_time);
1038
1039        if spec.aggregation == BarAggregation::Month {
1040            let step = spec.step.get() as u32;
1041
1042            if self.batch_open_ns == time_ns {
1043                self.batch_open_ns =
1044                    subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1045            }
1046
1047            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1048        } else {
1049            if self.batch_open_ns == time_ns {
1050                self.batch_open_ns -= self.interval_ns;
1051            }
1052
1053            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1054        }
1055    }
1056
1057    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1058        if self.is_left_open {
1059            if self.timestamp_on_close {
1060                close_ns
1061            } else {
1062                open_ns
1063            }
1064        } else {
1065            open_ns
1066        }
1067    }
1068
1069    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1070        if self.skip_first_non_full_bar {
1071            self.core.builder.reset();
1072            self.skip_first_non_full_bar = false;
1073        } else {
1074            self.core.build_and_send(ts_event, ts_init);
1075        }
1076    }
1077
1078    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1079        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1080            let ts_init = self.batch_next_close_ns;
1081            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1082            self.build_and_send(ts_event, ts_init);
1083        }
1084    }
1085
1086    fn batch_post_update(&mut self, time_ns: UnixNanos) {
1087        let step = self.bar_type().spec().step.get() as u32;
1088
1089        // If not in batch mode and time matches next close, reset batch close
1090        if !self.core.batch_mode
1091            && time_ns == self.batch_next_close_ns
1092            && time_ns > self.stored_open_ns
1093        {
1094            self.batch_next_close_ns = UnixNanos::default();
1095            return;
1096        }
1097
1098        if time_ns > self.batch_next_close_ns {
1099            // Ensure batch times are coherent with last builder update
1100            if self.bar_type().spec().aggregation == BarAggregation::Month {
1101                while self.batch_next_close_ns < time_ns {
1102                    self.batch_next_close_ns =
1103                        add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1104                }
1105
1106                self.batch_open_ns =
1107                    subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1108            } else {
1109                while self.batch_next_close_ns < time_ns {
1110                    self.batch_next_close_ns += self.interval_ns;
1111                }
1112
1113                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1114            }
1115        }
1116
1117        if time_ns == self.batch_next_close_ns {
1118            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1119            self.build_and_send(ts_event, time_ns);
1120            self.batch_open_ns = self.batch_next_close_ns;
1121
1122            if self.bar_type().spec().aggregation == BarAggregation::Month {
1123                self.batch_next_close_ns =
1124                    add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1125            } else {
1126                self.batch_next_close_ns += self.interval_ns;
1127            }
1128        }
1129
1130        // Delay resetting batch_next_close_ns to allow creating a last historical bar when transitioning to regular bars
1131        if !self.core.batch_mode {
1132            self.batch_next_close_ns = UnixNanos::default();
1133        }
1134    }
1135
1136    fn build_bar(&mut self, event: TimeEvent) {
1137        if !self.core.builder.initialized {
1138            self.build_on_next_tick = true;
1139            self.stored_close_ns = self.next_close_ns;
1140            return;
1141        }
1142
1143        if !self.build_with_no_updates && self.core.builder.count == 0 {
1144            return;
1145        }
1146
1147        let ts_init = event.ts_event;
1148        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1149        self.build_and_send(ts_event, ts_init);
1150
1151        self.stored_open_ns = ts_init;
1152
1153        if self.bar_type().spec().aggregation == BarAggregation::Month {
1154            let step = self.bar_type().spec().step.get() as u32;
1155            let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1156
1157            self.clock
1158                .borrow_mut()
1159                .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1160                .expect(FAILED);
1161
1162            self.next_close_ns = next_alert_ns;
1163        } else {
1164            self.next_close_ns = self
1165                .clock
1166                .borrow()
1167                .next_time_ns(&self.timer_name)
1168                .unwrap_or_default();
1169        }
1170    }
1171}
1172
1173impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1174where
1175    H: FnMut(Bar) + 'static,
1176{
1177    fn bar_type(&self) -> BarType {
1178        self.core.bar_type
1179    }
1180
1181    fn is_running(&self) -> bool {
1182        self.core.is_running
1183    }
1184
1185    fn set_await_partial(&mut self, value: bool) {
1186        self.core.set_await_partial(value);
1187    }
1188
1189    fn set_is_running(&mut self, value: bool) {
1190        self.core.set_is_running(value);
1191    }
1192
1193    fn await_partial(&self) -> bool {
1194        self.core.await_partial()
1195    }
1196    /// Stop time-based aggregator by cancelling its timer.
1197    fn stop(&mut self) {
1198        Self::stop(self);
1199    }
1200
1201    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1202        if self.batch_next_close_ns != UnixNanos::default() {
1203            self.batch_pre_update(ts_event);
1204        }
1205
1206        self.core.apply_update(price, size, ts_event);
1207
1208        if self.build_on_next_tick {
1209            if ts_event <= self.stored_close_ns {
1210                let ts_init = ts_event;
1211                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1212                self.build_and_send(ts_event, ts_init);
1213            }
1214
1215            self.build_on_next_tick = false;
1216            self.stored_close_ns = UnixNanos::default();
1217        }
1218
1219        if self.batch_next_close_ns != UnixNanos::default() {
1220            self.batch_post_update(ts_event);
1221        }
1222    }
1223
1224    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1225        if self.batch_next_close_ns != UnixNanos::default() {
1226            self.batch_pre_update(ts_init);
1227        }
1228
1229        self.core.builder.update_bar(bar, volume, ts_init);
1230
1231        if self.build_on_next_tick {
1232            if ts_init <= self.stored_close_ns {
1233                let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1234                self.build_and_send(ts_event, ts_init);
1235            }
1236
1237            // Reset flag and clear stored close
1238            self.build_on_next_tick = false;
1239            self.stored_close_ns = UnixNanos::default();
1240        }
1241
1242        if self.batch_next_close_ns != UnixNanos::default() {
1243            self.batch_post_update(ts_init);
1244        }
1245    }
1246
1247    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1248        self.core.start_batch_update(handler);
1249        self.start_batch_time(time_ns);
1250    }
1251
1252    fn stop_batch_update(&mut self) {
1253        self.core.stop_batch_update();
1254    }
1255
1256    fn set_partial(&mut self, partial_bar: Bar) {
1257        self.core.set_partial(partial_bar);
1258    }
1259}
1260
1261////////////////////////////////////////////////////////////////////////////////
1262// Tests
1263////////////////////////////////////////////////////////////////////////////////
1264#[cfg(test)]
1265mod tests {
1266    use std::sync::{Arc, Mutex};
1267
1268    use nautilus_common::clock::TestClock;
1269    use nautilus_core::UUID4;
1270    use nautilus_model::{
1271        data::{BarSpecification, BarType},
1272        enums::{AggregationSource, BarAggregation, PriceType},
1273        instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1274        types::{Price, Quantity},
1275    };
1276    use rstest::rstest;
1277    use ustr::Ustr;
1278
1279    use super::*;
1280
    /// A freshly constructed `BarBuilder` is uninitialized with zeroed `ts_last` and `count`.
    #[rstest]
    fn test_bar_builder_initialization(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // No updates have been applied yet
        assert!(!builder.initialized);
        assert_eq!(builder.ts_last, 0);
        assert_eq!(builder.count, 0);
    }
1299
    /// Setting a partial bar seeds the builder so the next built bar carries the
    /// partial bar's OHLCV, and `ts_last` takes the partial bar's `ts_init`.
    #[rstest]
    fn test_set_partial_update(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Positional arguments: open, high, low, close, volume, ts_event, ts_init
        let partial_bar = Bar::new(
            bar_type,
            Price::from("101.00"),
            Price::from("102.00"),
            Price::from("100.00"),
            Price::from("101.00"),
            Quantity::from(100),
            UnixNanos::from(1),
            UnixNanos::from(2),
        );

        builder.set_partial(partial_bar);
        let bar = builder.build_now();

        assert_eq!(bar.open, partial_bar.open);
        assert_eq!(bar.high, partial_bar.high);
        assert_eq!(bar.low, partial_bar.low);
        assert_eq!(bar.close, partial_bar.close);
        assert_eq!(bar.volume, partial_bar.volume);
        assert_eq!(builder.ts_last, 2);
    }
1335
    /// Open is the first price, close the last, and high/low track the extremes
    /// across a sequence of updates.
    #[rstest]
    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Price path: 100 -> 95 (new low) -> 105 (new high, also the close)
        builder.update(
            Price::from("100.00"),
            Quantity::from(1),
            UnixNanos::from(1000),
        );
        builder.update(
            Price::from("95.00"),
            Quantity::from(1),
            UnixNanos::from(2000),
        );
        builder.update(
            Price::from("105.00"),
            Quantity::from(1),
            UnixNanos::from(3000),
        );

        let bar = builder.build_now();
        assert!(bar.high > bar.low);
        assert_eq!(bar.open, Price::from("100.00"));
        assert_eq!(bar.high, Price::from("105.00"));
        assert_eq!(bar.low, Price::from("95.00"));
        assert_eq!(bar.close, Price::from("105.00"));
    }
1373
    /// An update timestamped before the last applied update leaves `ts_last` and
    /// `count` unchanged.
    #[rstest]
    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Second update (ts = 500) is older than the first (ts = 1_000)
        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());

        assert_eq!(builder.ts_last, 1_000);
        assert_eq!(builder.count, 1);
    }
1394
    /// A partial bar set on an empty builder determines the OHLCV and timestamps of
    /// the next bar built with `build_now`.
    #[rstest]
    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Positional arguments: open, high, low, close, volume, ts_event, ts_init
        let partial_bar = Bar::new(
            bar_type,
            Price::from("1.00001"),
            Price::from("1.00010"),
            Price::from("1.00000"),
            Price::from("1.00002"),
            Quantity::from(1),
            UnixNanos::from(1_000_000_000),
            UnixNanos::from(2_000_000_000),
        );

        builder.set_partial(partial_bar);
        let bar = builder.build_now();

        assert_eq!(bar.open, Price::from("1.00001"));
        assert_eq!(bar.high, Price::from("1.00010"));
        assert_eq!(bar.low, Price::from("1.00000"));
        assert_eq!(bar.close, Price::from("1.00002"));
        assert_eq!(bar.volume, Quantity::from(1));
        assert_eq!(bar.ts_init, 2_000_000_000);
        assert_eq!(builder.ts_last, 2_000_000_000);
    }
1431
    /// Only the first partial bar is honored: a second `set_partial` call must not
    /// overwrite the builder state.
    #[rstest]
    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        let partial_bar1 = Bar::new(
            bar_type,
            Price::from("1.00001"),
            Price::from("1.00010"),
            Price::from("1.00000"),
            Price::from("1.00002"),
            Quantity::from(1),
            UnixNanos::from(1_000_000_000),
            UnixNanos::from(1_000_000_000),
        );

        // Deliberately different values so an accidental overwrite is detectable
        let partial_bar2 = Bar::new(
            bar_type,
            Price::from("2.00001"),
            Price::from("2.00010"),
            Price::from("2.00000"),
            Price::from("2.00002"),
            Quantity::from(2),
            UnixNanos::from(3_000_000_000),
            UnixNanos::from(3_000_000_000),
        );

        builder.set_partial(partial_bar1);
        builder.set_partial(partial_bar2);
        let bar = builder.build(
            UnixNanos::from(4_000_000_000),
            UnixNanos::from(4_000_000_000),
        );

        // All values come from the first partial bar; ts_init comes from `build`
        assert_eq!(bar.open, Price::from("1.00001"));
        assert_eq!(bar.high, Price::from("1.00010"));
        assert_eq!(bar.low, Price::from("1.00000"));
        assert_eq!(bar.close, Price::from("1.00002"));
        assert_eq!(bar.volume, Quantity::from(1));
        assert_eq!(bar.ts_init, 4_000_000_000);
        assert_eq!(builder.ts_last, 1_000_000_000);
    }
1483
    /// A single update initializes the builder and increments `count` to one.
    #[rstest]
    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Update at the default (zero) timestamp
        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::default(),
        );

        assert!(builder.initialized);
        assert_eq!(builder.ts_last, 0);
        assert_eq!(builder.count, 1);
    }
1508
    /// An out-of-order update (older timestamp) is ignored: the builder stays
    /// initialized from the first update and `ts_last` / `count` do not change.
    #[rstest]
    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
        equity_aapl: Equity,
    ) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        // Precisions are hard-coded here (price 2, size 0) rather than read from the instrument
        let mut builder = BarBuilder::new(bar_type, 2, 0);

        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::from(1_000),
        );
        builder.update(
            Price::from("1.00001"),
            Quantity::from(1),
            UnixNanos::from(500),
        );

        assert!(builder.initialized);
        assert_eq!(builder.ts_last, 1_000);
        assert_eq!(builder.count, 1);
    }
1536
    /// Each accepted update increments `count`, including updates that share the
    /// same timestamp.
    #[rstest]
    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Five identical updates at the same timestamp
        for _ in 0..5 {
            builder.update(
                Price::from("1.00000"),
                Quantity::from(1),
                UnixNanos::from(1_000),
            );
        }

        assert_eq!(builder.count, 5);
    }
1561
    /// Building a bar from a builder that never received an update is expected to panic.
    #[rstest]
    #[should_panic]
    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );
        // The panic must come from this call; the result is intentionally discarded
        let _ = builder.build_now();
    }
1578
    /// After three updates, `build_now` yields the aggregated OHLCV with `ts_init`
    /// equal to the last update time, and the builder's `count` is reset to zero.
    #[rstest]
    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        let mut builder = BarBuilder::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
        );

        // Sizes 2 + 2 + 1 give the expected volume of 5
        builder.update(
            Price::from("1.00001"),
            Quantity::from(2),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00002"),
            Quantity::from(2),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::from(1_000_000_000),
        );

        let bar = builder.build_now();

        assert_eq!(bar.open, Price::from("1.00001"));
        assert_eq!(bar.high, Price::from("1.00002"));
        assert_eq!(bar.low, Price::from("1.00000"));
        assert_eq!(bar.close, Price::from("1.00000"));
        assert_eq!(bar.volume, Quantity::from(5));
        assert_eq!(bar.ts_init, 1_000_000_000);
        assert_eq!(builder.ts_last, 1_000_000_000);
        assert_eq!(builder.count, 0);
    }
1620
    /// A second bar built after a first one reflects only the updates received since
    /// the first build (open, extremes, close and volume).
    #[rstest]
    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_type = BarType::new(
            instrument.id(),
            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
            AggregationSource::Internal,
        );
        // Precisions are hard-coded here (price 2, size 0) rather than read from the instrument
        let mut builder = BarBuilder::new(bar_type, 2, 0);

        // First bar: a single update, built and discarded
        builder.update(
            Price::from("1.00001"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        builder.build_now();

        // Second bar: three fresh updates
        builder.update(
            Price::from("1.00000"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00003"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        builder.update(
            Price::from("1.00002"),
            Quantity::from(1),
            UnixNanos::default(),
        );

        let bar = builder.build_now();

        assert_eq!(bar.open, Price::from("1.00000"));
        assert_eq!(bar.high, Price::from("1.00003"));
        assert_eq!(bar.low, Price::from("1.00000"));
        assert_eq!(bar.close, Price::from("1.00002"));
        assert_eq!(bar.volume, Quantity::from(3));
    }
1662
    /// With a 3-tick bar specification, a single trade must not emit any bar.
    #[rstest]
    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
        // Shared sink collecting every bar passed to the handler
        let handler = Arc::new(Mutex::new(Vec::new()));
        let handler_clone = Arc::clone(&handler);

        let mut aggregator = TickBarAggregator::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
            move |bar: Bar| {
                let mut handler_guard = handler_clone.lock().unwrap();
                handler_guard.push(bar);
            },
            false,
        );

        let trade = TradeTick::default();
        aggregator.handle_trade(trade);

        let handler_guard = handler.lock().unwrap();
        assert_eq!(handler_guard.len(), 0);
    }
1688
    /// With a 3-tick bar specification, the third trade emits exactly one bar whose
    /// prices and timestamps match the (identical) trades.
    #[rstest]
    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
        // Shared sink collecting every bar passed to the handler
        let handler = Arc::new(Mutex::new(Vec::new()));
        let handler_clone = Arc::clone(&handler);

        let mut aggregator = TickBarAggregator::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
            move |bar: Bar| {
                let mut handler_guard = handler_clone.lock().unwrap();
                handler_guard.push(bar);
            },
            false,
        );

        // The same default trade is fed three times to reach the step count
        let trade = TradeTick::default();
        aggregator.handle_trade(trade);
        aggregator.handle_trade(trade);
        aggregator.handle_trade(trade);

        let handler_guard = handler.lock().unwrap();
        let bar = handler_guard.first().unwrap();
        assert_eq!(handler_guard.len(), 1);
        assert_eq!(bar.open, trade.price);
        assert_eq!(bar.high, trade.price);
        assert_eq!(bar.low, trade.price);
        assert_eq!(bar.close, trade.price);
        // Presumably 3 x the default trade size — confirm against `TradeTick::default`
        assert_eq!(bar.volume, Quantity::from(300000));
        assert_eq!(bar.ts_event, trade.ts_event);
        assert_eq!(bar.ts_init, trade.ts_init);
    }
1724
    /// Three price updates on a 3-tick aggregator produce a single bar aggregating
    /// all three ticks.
    #[rstest]
    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
        // Shared sink collecting every bar passed to the handler
        let handler = Arc::new(Mutex::new(Vec::new()));
        let handler_clone = Arc::clone(&handler);

        let mut aggregator = TickBarAggregator::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
            move |bar: Bar| {
                let mut handler_guard = handler_clone.lock().unwrap();
                handler_guard.push(bar);
            },
            false,
        );

        // Monotonically rising prices with increasing timestamps
        aggregator.update(
            Price::from("1.00001"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        aggregator.update(
            Price::from("1.00002"),
            Quantity::from(1),
            UnixNanos::from(1000),
        );
        aggregator.update(
            Price::from("1.00003"),
            Quantity::from(1),
            UnixNanos::from(2000),
        );

        let handler_guard = handler.lock().unwrap();
        assert_eq!(handler_guard.len(), 1);

        let bar = handler_guard.first().unwrap();
        assert_eq!(bar.open, Price::from("1.00001"));
        assert_eq!(bar.high, Price::from("1.00003"));
        assert_eq!(bar.low, Price::from("1.00001"));
        assert_eq!(bar.close, Price::from("1.00003"));
        assert_eq!(bar.volume, Quantity::from(3));
    }
1770
    /// Four updates on a 2-tick aggregator produce two independent bars: the second
    /// bar starts fresh from the third update.
    #[rstest]
    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
        let instrument = InstrumentAny::Equity(equity_aapl);
        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
        // Shared sink collecting every bar passed to the handler
        let handler = Arc::new(Mutex::new(Vec::new()));
        let handler_clone = Arc::clone(&handler);

        let mut aggregator = TickBarAggregator::new(
            bar_type,
            instrument.price_precision(),
            instrument.size_precision(),
            move |bar: Bar| {
                let mut handler_guard = handler_clone.lock().unwrap();
                handler_guard.push(bar);
            },
            false,
        );

        // Updates 1-2 form the first bar, updates 3-4 the second
        aggregator.update(
            Price::from("1.00001"),
            Quantity::from(1),
            UnixNanos::default(),
        );
        aggregator.update(
            Price::from("1.00002"),
            Quantity::from(1),
            UnixNanos::from(1000),
        );
        aggregator.update(
            Price::from("1.00003"),
            Quantity::from(1),
            UnixNanos::from(2000),
        );
        aggregator.update(
            Price::from("1.00004"),
            Quantity::from(1),
            UnixNanos::from(3000),
        );

        let handler_guard = handler.lock().unwrap();
        assert_eq!(handler_guard.len(), 2);

        let bar1 = &handler_guard[0];
        assert_eq!(bar1.open, Price::from("1.00001"));
        assert_eq!(bar1.close, Price::from("1.00002"));
        assert_eq!(bar1.volume, Quantity::from(2));

        let bar2 = &handler_guard[1];
        assert_eq!(bar2.open, Price::from("1.00003"));
        assert_eq!(bar2.close, Price::from("1.00004"));
        assert_eq!(bar2.volume, Quantity::from(2));
    }
1824
1825    #[rstest]
1826    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1827        let instrument = InstrumentAny::Equity(equity_aapl);
1828        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1829        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1830        let handler = Arc::new(Mutex::new(Vec::new()));
1831        let handler_clone = Arc::clone(&handler);
1832
1833        let mut aggregator = VolumeBarAggregator::new(
1834            bar_type,
1835            instrument.price_precision(),
1836            instrument.size_precision(),
1837            move |bar: Bar| {
1838                let mut handler_guard = handler_clone.lock().unwrap();
1839                handler_guard.push(bar);
1840            },
1841            false,
1842        );
1843
1844        aggregator.update(
1845            Price::from("1.00001"),
1846            Quantity::from(25),
1847            UnixNanos::default(),
1848        );
1849
1850        let handler_guard = handler.lock().unwrap();
1851        assert_eq!(handler_guard.len(), 2);
1852        let bar1 = &handler_guard[0];
1853        assert_eq!(bar1.volume, Quantity::from(10));
1854        let bar2 = &handler_guard[1];
1855        assert_eq!(bar2.volume, Quantity::from(10));
1856    }
1857
1858    #[rstest]
1859    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1860        let instrument = InstrumentAny::Equity(equity_aapl);
1861        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1862        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1863        let handler = Arc::new(Mutex::new(Vec::new()));
1864        let handler_clone = Arc::clone(&handler);
1865
1866        let mut aggregator = ValueBarAggregator::new(
1867            bar_type,
1868            instrument.price_precision(),
1869            instrument.size_precision(),
1870            move |bar: Bar| {
1871                let mut handler_guard = handler_clone.lock().unwrap();
1872                handler_guard.push(bar);
1873            },
1874            false,
1875        );
1876
1877        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1878        aggregator.update(
1879            Price::from("100.00"),
1880            Quantity::from(5),
1881            UnixNanos::default(),
1882        );
1883        aggregator.update(
1884            Price::from("100.00"),
1885            Quantity::from(5),
1886            UnixNanos::from(1000),
1887        );
1888
1889        let handler_guard = handler.lock().unwrap();
1890        assert_eq!(handler_guard.len(), 1);
1891        let bar = handler_guard.first().unwrap();
1892        assert_eq!(bar.volume, Quantity::from(10));
1893    }
1894
1895    #[rstest]
1896    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1897        let instrument = InstrumentAny::Equity(equity_aapl);
1898        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1899        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1900        let handler = Arc::new(Mutex::new(Vec::new()));
1901        let handler_clone = Arc::clone(&handler);
1902
1903        let mut aggregator = ValueBarAggregator::new(
1904            bar_type,
1905            instrument.price_precision(),
1906            instrument.size_precision(),
1907            move |bar: Bar| {
1908                let mut handler_guard = handler_clone.lock().unwrap();
1909                handler_guard.push(bar);
1910            },
1911            false,
1912        );
1913
1914        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1915        aggregator.update(
1916            Price::from("100.00"),
1917            Quantity::from(25),
1918            UnixNanos::default(),
1919        );
1920
1921        let handler_guard = handler.lock().unwrap();
1922        assert_eq!(handler_guard.len(), 2);
1923        let remaining_value = aggregator.get_cumulative_value();
1924        assert!(remaining_value < 1000.0); // Should be less than threshold
1925    }
1926
1927    #[rstest]
1928    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1929        let instrument = InstrumentAny::Equity(equity_aapl);
1930        // One second bars
1931        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1932        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1933        let handler = Arc::new(Mutex::new(Vec::new()));
1934        let handler_clone = Arc::clone(&handler);
1935        let clock = Rc::new(RefCell::new(TestClock::new()));
1936
1937        let mut aggregator = TimeBarAggregator::new(
1938            bar_type,
1939            instrument.price_precision(),
1940            instrument.size_precision(),
1941            clock.clone(),
1942            move |bar: Bar| {
1943                let mut handler_guard = handler_clone.lock().unwrap();
1944                handler_guard.push(bar);
1945            },
1946            false, // await_partial
1947            true,  // build_with_no_updates
1948            false, // timestamp_on_close
1949            BarIntervalType::LeftOpen,
1950            None,  // time_bars_origin_offset
1951            15,    // bar_build_delay
1952            false, // skip_first_non_full_bar
1953        );
1954
1955        aggregator.update(
1956            Price::from("100.00"),
1957            Quantity::from(1),
1958            UnixNanos::default(),
1959        );
1960
1961        let next_sec = UnixNanos::from(1_000_000_000);
1962        clock.borrow_mut().set_time(next_sec);
1963
1964        let event = TimeEvent::new(
1965            Ustr::from("1-SECOND-LAST"),
1966            UUID4::new(),
1967            next_sec,
1968            next_sec,
1969        );
1970        aggregator.build_bar(event);
1971
1972        let handler_guard = handler.lock().unwrap();
1973        assert_eq!(handler_guard.len(), 1);
1974        let bar = handler_guard.first().unwrap();
1975        assert_eq!(bar.ts_event, UnixNanos::default());
1976        assert_eq!(bar.ts_init, next_sec);
1977    }
1978
1979    #[rstest]
1980    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1981        let instrument = InstrumentAny::Equity(equity_aapl);
1982        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1983        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1984        let handler = Arc::new(Mutex::new(Vec::new()));
1985        let handler_clone = Arc::clone(&handler);
1986        let clock = Rc::new(RefCell::new(TestClock::new()));
1987
1988        let mut aggregator = TimeBarAggregator::new(
1989            bar_type,
1990            instrument.price_precision(),
1991            instrument.size_precision(),
1992            clock.clone(),
1993            move |bar: Bar| {
1994                let mut handler_guard = handler_clone.lock().unwrap();
1995                handler_guard.push(bar);
1996            },
1997            false, // await_partial
1998            true,  // build_with_no_updates
1999            true,  // timestamp_on_close - changed to true to verify left-open behavior
2000            BarIntervalType::LeftOpen,
2001            None,
2002            15,
2003            false, // skip_first_non_full_bar
2004        );
2005
2006        // Update in first interval
2007        aggregator.update(
2008            Price::from("100.00"),
2009            Quantity::from(1),
2010            UnixNanos::default(),
2011        );
2012
2013        // First interval close
2014        let ts1 = UnixNanos::from(1_000_000_000);
2015        clock.borrow_mut().set_time(ts1);
2016        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2017        aggregator.build_bar(event);
2018
2019        // Update in second interval
2020        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2021
2022        // Second interval close
2023        let ts2 = UnixNanos::from(2_000_000_000);
2024        clock.borrow_mut().set_time(ts2);
2025        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2026        aggregator.build_bar(event);
2027
2028        let handler_guard = handler.lock().unwrap();
2029        assert_eq!(handler_guard.len(), 2);
2030
2031        let bar1 = &handler_guard[0];
2032        assert_eq!(bar1.ts_event, ts1); // For left-open with timestamp_on_close=true
2033        assert_eq!(bar1.ts_init, ts1);
2034        assert_eq!(bar1.close, Price::from("100.00"));
2035        let bar2 = &handler_guard[1];
2036        assert_eq!(bar2.ts_event, ts2);
2037        assert_eq!(bar2.ts_init, ts2);
2038        assert_eq!(bar2.close, Price::from("101.00"));
2039    }
2040
2041    #[rstest]
2042    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2043        let instrument = InstrumentAny::Equity(equity_aapl);
2044        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2045        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2046        let handler = Arc::new(Mutex::new(Vec::new()));
2047        let handler_clone = Arc::clone(&handler);
2048        let clock = Rc::new(RefCell::new(TestClock::new()));
2049        let mut aggregator = TimeBarAggregator::new(
2050            bar_type,
2051            instrument.price_precision(),
2052            instrument.size_precision(),
2053            clock.clone(),
2054            move |bar: Bar| {
2055                let mut handler_guard = handler_clone.lock().unwrap();
2056                handler_guard.push(bar);
2057            },
2058            false, // await_partial
2059            true,  // build_with_no_updates
2060            true,  // timestamp_on_close
2061            BarIntervalType::RightOpen,
2062            None,
2063            15,
2064            false, // skip_first_non_full_bar
2065        );
2066
2067        // Update in first interval
2068        aggregator.update(
2069            Price::from("100.00"),
2070            Quantity::from(1),
2071            UnixNanos::default(),
2072        );
2073
2074        // First interval close
2075        let ts1 = UnixNanos::from(1_000_000_000);
2076        clock.borrow_mut().set_time(ts1);
2077        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2078        aggregator.build_bar(event);
2079
2080        // Update in second interval
2081        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2082
2083        // Second interval close
2084        let ts2 = UnixNanos::from(2_000_000_000);
2085        clock.borrow_mut().set_time(ts2);
2086        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2087        aggregator.build_bar(event);
2088
2089        let handler_guard = handler.lock().unwrap();
2090        assert_eq!(handler_guard.len(), 2);
2091
2092        let bar1 = &handler_guard[0];
2093        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
2094        assert_eq!(bar1.ts_init, ts1);
2095        assert_eq!(bar1.close, Price::from("100.00"));
2096
2097        let bar2 = &handler_guard[1];
2098        assert_eq!(bar2.ts_event, ts1);
2099        assert_eq!(bar2.ts_init, ts2);
2100        assert_eq!(bar2.close, Price::from("101.00"));
2101    }
2102
2103    #[rstest]
2104    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2105        let instrument = InstrumentAny::Equity(equity_aapl);
2106        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2107        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2108        let handler = Arc::new(Mutex::new(Vec::new()));
2109        let handler_clone = Arc::clone(&handler);
2110        let clock = Rc::new(RefCell::new(TestClock::new()));
2111
2112        // First test with build_with_no_updates = false
2113        let mut aggregator = TimeBarAggregator::new(
2114            bar_type,
2115            instrument.price_precision(),
2116            instrument.size_precision(),
2117            clock.clone(),
2118            move |bar: Bar| {
2119                let mut handler_guard = handler_clone.lock().unwrap();
2120                handler_guard.push(bar);
2121            },
2122            false, // await_partial
2123            false, // build_with_no_updates disabled
2124            true,  // timestamp_on_close
2125            BarIntervalType::LeftOpen,
2126            None,
2127            15,
2128            false, // skip_first_non_full_bar
2129        );
2130
2131        // No updates, just interval close
2132        let ts1 = UnixNanos::from(1_000_000_000);
2133        clock.borrow_mut().set_time(ts1);
2134        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2135        aggregator.build_bar(event);
2136
2137        let handler_guard = handler.lock().unwrap();
2138        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
2139        drop(handler_guard);
2140
2141        // Now test with build_with_no_updates = true
2142        let handler = Arc::new(Mutex::new(Vec::new()));
2143        let handler_clone = Arc::clone(&handler);
2144        let mut aggregator = TimeBarAggregator::new(
2145            bar_type,
2146            instrument.price_precision(),
2147            instrument.size_precision(),
2148            clock.clone(),
2149            move |bar: Bar| {
2150                let mut handler_guard = handler_clone.lock().unwrap();
2151                handler_guard.push(bar);
2152            },
2153            false,
2154            true, // build_with_no_updates enabled
2155            true, // timestamp_on_close
2156            BarIntervalType::LeftOpen,
2157            None,
2158            15,
2159            false, // skip_first_non_full_bar
2160        );
2161
2162        aggregator.update(
2163            Price::from("100.00"),
2164            Quantity::from(1),
2165            UnixNanos::default(),
2166        );
2167
2168        // First interval with update
2169        let ts1 = UnixNanos::from(1_000_000_000);
2170        clock.borrow_mut().set_time(ts1);
2171        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2172        aggregator.build_bar(event);
2173
2174        // Second interval without updates
2175        let ts2 = UnixNanos::from(2_000_000_000);
2176        clock.borrow_mut().set_time(ts2);
2177        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2178        aggregator.build_bar(event);
2179
2180        let handler_guard = handler.lock().unwrap();
2181        assert_eq!(handler_guard.len(), 2); // Both bars should be built
2182        let bar1 = &handler_guard[0];
2183        assert_eq!(bar1.close, Price::from("100.00"));
2184        let bar2 = &handler_guard[1];
2185        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2186    }
2187
2188    #[rstest]
2189    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2190        let instrument = InstrumentAny::Equity(equity_aapl);
2191        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2192        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2193        let clock = Rc::new(RefCell::new(TestClock::new()));
2194        let handler = Arc::new(Mutex::new(Vec::new()));
2195        let handler_clone = Arc::clone(&handler);
2196
2197        let mut aggregator = TimeBarAggregator::new(
2198            bar_type,
2199            instrument.price_precision(),
2200            instrument.size_precision(),
2201            clock.clone(),
2202            move |bar: Bar| {
2203                let mut handler_guard = handler_clone.lock().unwrap();
2204                handler_guard.push(bar);
2205            },
2206            false, // await_partial
2207            true,  // build_with_no_updates
2208            true,  // timestamp_on_close
2209            BarIntervalType::RightOpen,
2210            None,
2211            15,
2212            false, // skip_first_non_full_bar
2213        );
2214
2215        let ts1 = UnixNanos::from(1_000_000_000);
2216        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2217
2218        let ts2 = UnixNanos::from(2_000_000_000);
2219        clock.borrow_mut().set_time(ts2);
2220
2221        // Simulate timestamp on close
2222        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2223        aggregator.build_bar(event);
2224
2225        let handler_guard = handler.lock().unwrap();
2226        let bar = handler_guard.first().unwrap();
2227        assert_eq!(bar.ts_event, UnixNanos::default());
2228        assert_eq!(bar.ts_init, ts2);
2229    }
2230
2231    #[rstest]
2232    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2233        let instrument = InstrumentAny::Equity(equity_aapl);
2234        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2235        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2236        let clock = Rc::new(RefCell::new(TestClock::new()));
2237        let handler = Arc::new(Mutex::new(Vec::new()));
2238        let handler_clone = Arc::clone(&handler);
2239
2240        let mut aggregator = TimeBarAggregator::new(
2241            bar_type,
2242            instrument.price_precision(),
2243            instrument.size_precision(),
2244            clock.clone(),
2245            move |bar: Bar| {
2246                let mut handler_guard = handler_clone.lock().unwrap();
2247                handler_guard.push(bar);
2248            },
2249            false, // await_partial
2250            true,  // build_with_no_updates
2251            true,  // timestamp_on_close
2252            BarIntervalType::LeftOpen,
2253            None,
2254            15,
2255            false, // skip_first_non_full_bar
2256        );
2257
2258        let ts1 = UnixNanos::from(1_000_000_000);
2259        clock.borrow_mut().set_time(ts1);
2260
2261        let initial_time = clock.borrow().utc_now();
2262        aggregator.start_batch_time(UnixNanos::from(
2263            initial_time.timestamp_nanos_opt().unwrap() as u64
2264        ));
2265
2266        let handler_guard = handler.lock().unwrap();
2267        assert_eq!(handler_guard.len(), 0);
2268    }
2269}