nautilus_data/
aggregation.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregation machinery.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21#![allow(unused_assignments)]
22
23use std::{cell::RefCell, ops::Add, rc::Rc};
24
25use chrono::{DateTime, TimeDelta, TimeZone, Utc};
26use nautilus_common::{
27    clock::Clock,
28    timer::{TimeEvent, TimeEventCallback},
29};
30use nautilus_core::{
31    correctness::{self, FAILED},
32    datetime::{add_n_months, subtract_n_months},
33    UnixNanos,
34};
35use nautilus_model::{
36    data::{
37        bar::{get_bar_interval, get_bar_interval_ns, get_time_bar_start, Bar, BarType},
38        QuoteTick, TradeTick,
39    },
40    enums::{AggregationSource, BarAggregation, BarIntervalType},
41    types::{fixed::FIXED_SCALAR, quantity::QuantityRaw, Price, Quantity},
42};
43
44pub trait BarAggregator {
45    /// The [`BarType`] to be aggregated.
46    fn bar_type(&self) -> BarType;
47    /// If the aggregator is running and will receive data from the message bus.
48    fn is_running(&self) -> bool;
49    fn set_await_partial(&mut self, value: bool);
50    fn set_is_running(&mut self, value: bool);
51    /// Updates theaggregator  with the given price and size.
52    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
53    /// Updates the aggregator with the given quote.
54    fn handle_quote(&mut self, quote: QuoteTick) {
55        let spec = self.bar_type().spec();
56        self.update(
57            quote.extract_price(spec.price_type),
58            quote.extract_size(spec.price_type),
59            quote.ts_event,
60        );
61    }
62    /// Updates the aggregator with the given trade.
63    fn handle_trade(&mut self, trade: TradeTick) {
64        self.update(trade.price, trade.size, trade.ts_event);
65    }
66    fn handle_bar(&mut self, bar: Bar) {
67        self.update_bar(bar, bar.volume, bar.ts_init);
68    }
69    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
70    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>);
71    fn stop_batch_update(&mut self);
72}
73
74/// Provides a generic bar builder for aggregation.
75pub struct BarBuilder {
76    bar_type: BarType,
77    price_precision: u8,
78    size_precision: u8,
79    initialized: bool,
80    ts_last: UnixNanos,
81    count: usize,
82    partial_set: bool,
83    last_close: Option<Price>,
84    open: Option<Price>,
85    high: Option<Price>,
86    low: Option<Price>,
87    close: Option<Price>,
88    volume: Quantity,
89}
90
91impl BarBuilder {
92    /// Creates a new [`BarBuilder`] instance.
93    ///
94    /// # Panics
95    ///
96    /// This function panics:
97    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
98    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
99    #[must_use]
100    pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
101        correctness::check_equal(
102            bar_type.aggregation_source(),
103            AggregationSource::Internal,
104            "bar_type.aggregation_source",
105            "AggregationSource::Internal",
106        )
107        .expect(FAILED);
108
109        Self {
110            bar_type,
111            price_precision,
112            size_precision,
113            initialized: false,
114            ts_last: UnixNanos::default(),
115            count: 0,
116            partial_set: false,
117            last_close: None,
118            open: None,
119            high: None,
120            low: None,
121            close: None,
122            volume: Quantity::zero(size_precision),
123        }
124    }
125
126    /// Set the initial values for a partially completed bar.
127    pub fn set_partial(&mut self, partial_bar: Bar) {
128        if self.partial_set {
129            return; // Already updated
130        }
131
132        self.open = Some(partial_bar.open);
133
134        if self.high.is_none() || partial_bar.high > self.high.unwrap() {
135            self.high = Some(partial_bar.high);
136        }
137
138        if self.low.is_none() || partial_bar.low < self.low.unwrap() {
139            self.low = Some(partial_bar.low);
140        }
141
142        if self.close.is_none() {
143            self.close = Some(partial_bar.close);
144        }
145
146        self.volume = partial_bar.volume;
147
148        if self.ts_last == 0 {
149            self.ts_last = partial_bar.ts_init;
150        }
151
152        self.partial_set = true;
153        self.initialized = true;
154    }
155
156    /// Update the bar builder.
157    pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
158        if ts_event < self.ts_last {
159            return; // Not applicable
160        }
161
162        if self.open.is_none() {
163            self.open = Some(price);
164            self.high = Some(price);
165            self.low = Some(price);
166            self.initialized = true;
167        } else {
168            if price > self.high.unwrap() {
169                self.high = Some(price);
170            }
171            if price < self.low.unwrap() {
172                self.low = Some(price);
173            }
174        }
175
176        self.close = Some(price);
177        self.volume = self.volume.add(size);
178        self.count += 1;
179        self.ts_last = ts_event;
180    }
181
182    pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
183        if ts_init < self.ts_last {
184            return; // Not applicable
185        }
186
187        if self.open.is_none() {
188            self.open = Some(bar.open);
189            self.high = Some(bar.high);
190            self.low = Some(bar.low);
191            self.initialized = true;
192        } else {
193            if bar.high > self.high.unwrap() {
194                self.high = Some(bar.high);
195            }
196            if bar.low < self.low.unwrap() {
197                self.low = Some(bar.low);
198            }
199        }
200
201        self.close = Some(bar.close);
202        self.volume = self.volume.add(volume);
203        self.count += 1;
204        self.ts_last = ts_init;
205    }
206
207    /// Reset the bar builder.
208    ///
209    /// All stateful fields are reset to their initial value.
210    pub fn reset(&mut self) {
211        self.open = None;
212        self.high = None;
213        self.low = None;
214        self.volume = Quantity::zero(self.size_precision);
215        self.count = 0;
216    }
217
218    /// Return the aggregated bar and reset.
219    pub fn build_now(&mut self) -> Bar {
220        self.build(self.ts_last, self.ts_last)
221    }
222
223    /// Return the aggregated bar with the given closing timestamp, and reset.
224    pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
225        if self.open.is_none() {
226            self.open = self.last_close;
227            self.high = self.last_close;
228            self.low = self.last_close;
229            self.close = self.last_close;
230        }
231
232        // SAFETY: The open was checked, so we can assume all prices are Some
233        let bar = Bar::new(
234            self.bar_type,
235            self.open.unwrap(),
236            self.high.unwrap(),
237            self.low.unwrap(),
238            self.close.unwrap(),
239            self.volume,
240            ts_event,
241            ts_init,
242        );
243
244        self.last_close = self.close;
245        self.reset();
246        bar
247    }
248}
249
250/// Provides a means of aggregating specified bar types and sending to a registered handler.
251pub struct BarAggregatorCore<H>
252where
253    H: FnMut(Bar),
254{
255    bar_type: BarType,
256    builder: BarBuilder,
257    handler: H,
258    handler_backup: Option<H>,
259    batch_handler: Option<Box<dyn FnMut(Bar)>>,
260    await_partial: bool,
261    is_running: bool,
262    batch_mode: bool,
263}
264
265impl<H> BarAggregatorCore<H>
266where
267    H: FnMut(Bar),
268{
269    /// Creates a new [`BarAggregatorCore`] instance.
270    ///
271    /// # Panics
272    ///
273    /// This function panics:
274    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
275    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
276    pub fn new(
277        bar_type: BarType,
278        price_precision: u8,
279        size_precision: u8,
280        handler: H,
281        await_partial: bool,
282    ) -> Self {
283        Self {
284            bar_type,
285            builder: BarBuilder::new(bar_type, price_precision, size_precision),
286            handler,
287            handler_backup: None,
288            batch_handler: None,
289            await_partial,
290            is_running: false,
291            batch_mode: false,
292        }
293    }
294
295    pub const fn set_await_partial(&mut self, value: bool) {
296        self.await_partial = value;
297    }
298
299    pub const fn set_is_running(&mut self, value: bool) {
300        self.is_running = value;
301    }
302
303    /// Set the initial values for a partially completed bar.
304    pub fn set_partial(&mut self, partial_bar: Bar) {
305        self.builder.set_partial(partial_bar);
306    }
307
308    fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
309        self.builder.update(price, size, ts_event);
310    }
311
312    fn build_now_and_send(&mut self) {
313        let bar = self.builder.build_now();
314        (self.handler)(bar);
315    }
316
317    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
318        let bar = self.builder.build(ts_event, ts_init);
319        if self.batch_mode {
320            if let Some(handler) = &mut self.batch_handler {
321                handler(bar);
322            }
323        } else {
324            (self.handler)(bar);
325        }
326    }
327
328    pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
329        self.batch_mode = true;
330        self.batch_handler = Some(handler);
331    }
332
333    pub fn stop_batch_update(&mut self) {
334        self.batch_mode = false;
335        if let Some(handler) = self.handler_backup.take() {
336            self.handler = handler;
337        }
338    }
339}
340
341/// Provides a means of building tick bars aggregated from quote and trades.
342///
343/// When received tick count reaches the step threshold of the bar
344/// specification, then a bar is created and sent to the handler.
345pub struct TickBarAggregator<H>
346where
347    H: FnMut(Bar),
348{
349    core: BarAggregatorCore<H>,
350    cum_value: f64,
351}
352
353impl<H> TickBarAggregator<H>
354where
355    H: FnMut(Bar),
356{
357    /// Creates a new [`TickBarAggregator`] instance.
358    ///
359    /// # Panics
360    ///
361    /// This function panics:
362    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
363    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
364    pub fn new(
365        bar_type: BarType,
366        price_precision: u8,
367        size_precision: u8,
368        handler: H,
369        await_partial: bool,
370    ) -> Self {
371        Self {
372            core: BarAggregatorCore::new(
373                bar_type,
374                price_precision,
375                size_precision,
376                handler,
377                await_partial,
378            ),
379            cum_value: 0.0,
380        }
381    }
382}
383
384impl<H> BarAggregator for TickBarAggregator<H>
385where
386    H: FnMut(Bar),
387{
388    fn bar_type(&self) -> BarType {
389        self.core.bar_type
390    }
391
392    fn is_running(&self) -> bool {
393        self.core.is_running
394    }
395
396    fn set_await_partial(&mut self, value: bool) {
397        self.core.await_partial = value;
398    }
399
400    fn set_is_running(&mut self, value: bool) {
401        self.core.is_running = value;
402    }
403
404    /// Apply the given update to the aggregator.
405    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
406        self.core.apply_update(price, size, ts_event);
407        let spec = self.core.bar_type.spec();
408
409        if self.core.builder.count >= spec.step.get() {
410            self.core.build_now_and_send();
411        }
412    }
413
414    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
415        let mut volume_update = volume;
416        let average_price = Price::new(
417            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
418            self.core.builder.price_precision,
419        );
420
421        while volume_update.as_f64() > 0.0 {
422            let value_update = average_price.as_f64() * volume_update.as_f64();
423            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
424                self.cum_value += value_update;
425                self.core.builder.update_bar(bar, volume_update, ts_init);
426                break;
427            }
428
429            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
430            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
431            self.core.builder.update_bar(
432                bar,
433                Quantity::new(volume_diff, volume_update.precision),
434                ts_init,
435            );
436
437            self.core.build_now_and_send();
438            self.cum_value = 0.0;
439            volume_update = Quantity::new(
440                volume_update.as_f64() - volume_diff,
441                volume_update.precision,
442            );
443        }
444    }
445
446    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
447        self.core.start_batch_update(handler);
448    }
449
450    fn stop_batch_update(&mut self) {
451        self.core.stop_batch_update();
452    }
453}
454
455/// Provides a means of building volume bars aggregated from quote and trades.
456pub struct VolumeBarAggregator<H>
457where
458    H: FnMut(Bar),
459{
460    core: BarAggregatorCore<H>,
461}
462
463impl<H> VolumeBarAggregator<H>
464where
465    H: FnMut(Bar),
466{
467    /// Creates a new [`VolumeBarAggregator`] instance.
468    ///
469    /// # Panics
470    ///
471    /// This function panics:
472    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
473    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
474    pub fn new(
475        bar_type: BarType,
476        price_precision: u8,
477        size_precision: u8,
478        handler: H,
479        await_partial: bool,
480    ) -> Self {
481        Self {
482            core: BarAggregatorCore::new(
483                bar_type,
484                price_precision,
485                size_precision,
486                handler,
487                await_partial,
488            ),
489        }
490    }
491}
492
493impl<H> BarAggregator for VolumeBarAggregator<H>
494where
495    H: FnMut(Bar),
496{
497    fn bar_type(&self) -> BarType {
498        self.core.bar_type
499    }
500
501    fn is_running(&self) -> bool {
502        self.core.is_running
503    }
504
505    fn set_await_partial(&mut self, value: bool) {
506        self.core.await_partial = value;
507    }
508
509    fn set_is_running(&mut self, value: bool) {
510        self.core.is_running = value;
511    }
512
513    /// Apply the given update to the aggregator.
514    #[allow(unused_assignments)] // Temp for development
515    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
516        let mut raw_size_update = size.raw;
517        let spec = self.core.bar_type.spec();
518        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
519        let mut raw_size_diff = 0;
520
521        while raw_size_update > 0 {
522            if self.core.builder.volume.raw + raw_size_update < raw_step {
523                self.core.apply_update(
524                    price,
525                    Quantity::from_raw(raw_size_update, size.precision),
526                    ts_event,
527                );
528                break;
529            }
530
531            raw_size_diff = raw_step - self.core.builder.volume.raw;
532            self.core.apply_update(
533                price,
534                Quantity::from_raw(raw_size_diff, size.precision),
535                ts_event,
536            );
537
538            self.core.build_now_and_send();
539            raw_size_update -= raw_size_diff;
540        }
541    }
542
543    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
544        let mut raw_volume_update = volume.raw;
545        let spec = self.core.bar_type.spec();
546        let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
547        let mut _raw_volume_diff = 0;
548
549        while raw_volume_update > 0 {
550            if self.core.builder.volume.raw + raw_volume_update < raw_step {
551                self.core.builder.update_bar(
552                    bar,
553                    Quantity::from_raw(raw_volume_update, volume.precision),
554                    ts_init,
555                );
556                break;
557            }
558
559            _raw_volume_diff = raw_step - self.core.builder.volume.raw;
560            self.core.builder.update_bar(
561                bar,
562                Quantity::from_raw(_raw_volume_diff, volume.precision),
563                ts_init,
564            );
565
566            self.core.build_now_and_send();
567            raw_volume_update -= _raw_volume_diff;
568        }
569    }
570
571    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
572        self.core.start_batch_update(handler);
573    }
574
575    fn stop_batch_update(&mut self) {
576        self.core.stop_batch_update();
577    }
578}
579
580/// Provides a means of building value bars aggregated from quote and trades.
581///
582/// When received value reaches the step threshold of the bar
583/// specification, then a bar is created and sent to the handler.
584pub struct ValueBarAggregator<H>
585where
586    H: FnMut(Bar),
587{
588    core: BarAggregatorCore<H>,
589    cum_value: f64,
590}
591
592impl<H> ValueBarAggregator<H>
593where
594    H: FnMut(Bar),
595{
596    /// Creates a new [`ValueBarAggregator`] instance.
597    ///
598    /// # Panics
599    ///
600    /// This function panics:
601    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
602    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
603    pub fn new(
604        bar_type: BarType,
605        price_precision: u8,
606        size_precision: u8,
607        handler: H,
608        await_partial: bool,
609    ) -> Self {
610        Self {
611            core: BarAggregatorCore::new(
612                bar_type,
613                price_precision,
614                size_precision,
615                handler,
616                await_partial,
617            ),
618            cum_value: 0.0,
619        }
620    }
621
622    #[must_use]
623    /// Returns the cumulative value for the aggregator.
624    pub const fn get_cumulative_value(&self) -> f64 {
625        self.cum_value
626    }
627}
628
629impl<H> BarAggregator for ValueBarAggregator<H>
630where
631    H: FnMut(Bar),
632{
633    fn bar_type(&self) -> BarType {
634        self.core.bar_type
635    }
636
637    fn is_running(&self) -> bool {
638        self.core.is_running
639    }
640
641    fn set_await_partial(&mut self, value: bool) {
642        self.core.await_partial = value;
643    }
644
645    fn set_is_running(&mut self, value: bool) {
646        self.core.is_running = value;
647    }
648
649    /// Apply the given update to the aggregator.
650    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
651        let mut size_update = size.as_f64();
652        let spec = self.core.bar_type.spec();
653
654        while size_update > 0.0 {
655            let value_update = price.as_f64() * size_update;
656            if self.cum_value + value_update < spec.step.get() as f64 {
657                self.cum_value += value_update;
658                self.core
659                    .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
660                break;
661            }
662
663            let value_diff = spec.step.get() as f64 - self.cum_value;
664            let size_diff = size_update * (value_diff / value_update);
665            self.core
666                .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
667
668            self.core.build_now_and_send();
669            self.cum_value = 0.0;
670            size_update -= size_diff;
671        }
672    }
673
674    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
675        let mut volume_update = volume;
676        let average_price = Price::new(
677            (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
678            self.core.builder.price_precision,
679        );
680
681        while volume_update.as_f64() > 0.0 {
682            let value_update = average_price.as_f64() * volume_update.as_f64();
683            if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
684                self.cum_value += value_update;
685                self.core.builder.update_bar(bar, volume_update, ts_init);
686                break;
687            }
688
689            let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
690            let volume_diff = volume_update.as_f64() * (value_diff / value_update);
691            self.core.builder.update_bar(
692                bar,
693                Quantity::new(volume_diff, volume_update.precision),
694                ts_init,
695            );
696
697            self.core.build_now_and_send();
698            self.cum_value = 0.0;
699            volume_update = Quantity::new(
700                volume_update.as_f64() - volume_diff,
701                volume_update.precision,
702            );
703        }
704    }
705
706    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
707        self.core.start_batch_update(handler);
708    }
709
710    fn stop_batch_update(&mut self) {
711        self.core.stop_batch_update();
712    }
713}
714
715/// Provides a means of building time bars aggregated from quote and trades.
716///
717/// At each aggregation time interval, a bar is created and sent to the handler.
718pub struct TimeBarAggregator<H>
719where
720    H: FnMut(Bar),
721{
722    core: BarAggregatorCore<H>,
723    clock: Rc<RefCell<dyn Clock>>,
724    build_with_no_updates: bool,
725    timestamp_on_close: bool,
726    is_left_open: bool,
727    build_on_next_tick: bool,
728    stored_open_ns: UnixNanos,
729    stored_close_ns: UnixNanos,
730    cached_update: Option<(Price, Quantity, u64)>,
731    timer_name: String,
732    interval: TimeDelta,
733    interval_ns: UnixNanos,
734    next_close_ns: UnixNanos,
735    composite_bar_build_delay: i32,
736    add_delay: bool,
737    batch_open_ns: UnixNanos,
738    batch_next_close_ns: UnixNanos,
739    time_bars_origin: Option<DateTime<Utc>>,
740}
741
742#[derive(Clone)]
743pub struct NewBarCallback<H: FnMut(Bar)> {
744    aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
745}
746
747impl<H: FnMut(Bar)> NewBarCallback<H> {
748    pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
749        Self { aggregator }
750    }
751}
752
753impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
754    fn from(value: NewBarCallback<H>) -> Self {
755        Self::Rust(Rc::new(move |event: TimeEvent| {
756            value.aggregator.borrow_mut().build_bar(event);
757        }))
758    }
759}
760
761impl<H> TimeBarAggregator<H>
762where
763    H: FnMut(Bar) + 'static,
764{
765    /// Creates a new [`TimeBarAggregator`] instance.
766    ///
767    /// # Panics
768    ///
769    /// This function panics:
770    /// - If `instrument.id` is not equal to the `bar_type.instrument_id`.
771    /// - If `bar_type.aggregation_source` is not equal to `AggregationSource::Internal`.
772    #[allow(clippy::too_many_arguments)]
773    pub fn new(
774        bar_type: BarType,
775        price_precision: u8,
776        size_precision: u8,
777        clock: Rc<RefCell<dyn Clock>>,
778        handler: H,
779        await_partial: bool,
780        build_with_no_updates: bool,
781        timestamp_on_close: bool,
782        interval_type: BarIntervalType,
783        time_bars_origin: Option<DateTime<Utc>>,
784        composite_bar_build_delay: i32,
785    ) -> Self {
786        let _is_left_open = match interval_type {
787            BarIntervalType::LeftOpen => true,
788            BarIntervalType::RightOpen => false,
789        };
790
791        // Change how we check for composite internally aggregated bars
792        let add_delay = if bar_type.is_composite() {
793            matches!(
794                bar_type.composite().aggregation_source(),
795                AggregationSource::Internal
796            )
797        } else {
798            false
799        };
800
801        let core = BarAggregatorCore::new(
802            bar_type,
803            price_precision,
804            size_precision,
805            handler,
806            await_partial,
807        );
808
809        Self {
810            core,
811            clock,
812            build_with_no_updates,
813            timestamp_on_close,
814            is_left_open: false,
815            build_on_next_tick: false,
816            stored_open_ns: UnixNanos::default(),
817            stored_close_ns: UnixNanos::default(),
818            cached_update: None,
819            timer_name: bar_type.to_string(),
820            interval: get_bar_interval(&bar_type),
821            interval_ns: get_bar_interval_ns(&bar_type),
822            next_close_ns: UnixNanos::default(),
823            composite_bar_build_delay,
824            add_delay,
825            batch_open_ns: UnixNanos::default(),
826            batch_next_close_ns: UnixNanos::default(),
827            time_bars_origin,
828        }
829    }
830
831    /// Starts the time bar aggregator.
832    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
833        let now = self.clock.borrow().utc_now();
834        let start_time = get_time_bar_start(now, &self.bar_type());
835        let start_time_ns = UnixNanos::from(start_time.timestamp_nanos_opt().unwrap() as u64);
836
837        self.clock
838            .borrow_mut()
839            .set_timer_ns(
840                &self.timer_name,
841                self.interval_ns.as_u64(),
842                start_time_ns,
843                None,
844                Some(callback.into()),
845            )
846            .expect(FAILED);
847
848        log::debug!("Started timer {}", self.timer_name);
849        Ok(())
850    }
851
852    /// Stops the time bar aggregator.
853    pub fn stop(&mut self) {
854        self.clock.borrow_mut().cancel_timer(&self.timer_name);
855    }
856
857    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
858        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
859            let ts_init = self.batch_next_close_ns;
860
861            // Adjust timestamp based on interval type
862            let ts_event = if self.is_left_open {
863                if self.timestamp_on_close {
864                    self.batch_next_close_ns
865                } else {
866                    self.batch_open_ns
867                }
868            } else {
869                self.batch_open_ns
870            };
871
872            self.core.build_and_send(ts_event, ts_init);
873        }
874    }
875
876    fn batch_post_update(&mut self, time_ns: UnixNanos) {
877        // If not in batch mode and time matches next close, reset batch close
878        if !self.core.batch_mode
879            && time_ns == self.batch_next_close_ns
880            && time_ns > self.stored_open_ns
881        {
882            self.batch_next_close_ns = UnixNanos::default();
883            return;
884        }
885
886        if time_ns > self.batch_next_close_ns {
887            // Ensure batch times are coherent with last builder update
888            if self.bar_type().spec().aggregation == BarAggregation::Month {
889                // TODO: Handle monthly bars which need special date arithmetic
890                // This will need chrono/datetime handling
891            } else {
892                while self.batch_next_close_ns < time_ns {
893                    self.batch_next_close_ns += self.interval_ns;
894                }
895                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
896            }
897        }
898
899        if time_ns == self.batch_next_close_ns {
900            // Adjust timestamp based on interval type
901            let ts_event = if self.is_left_open {
902                if self.timestamp_on_close {
903                    self.batch_next_close_ns
904                } else {
905                    self.batch_open_ns
906                }
907            } else {
908                self.batch_open_ns
909            };
910
911            self.core.build_and_send(ts_event, time_ns);
912            self.batch_open_ns = self.batch_next_close_ns;
913
914            if self.bar_type().spec().aggregation == BarAggregation::Month {
915                // TODO: Handle monthly bars increment
916            } else {
917                self.batch_next_close_ns += self.interval_ns;
918            }
919        }
920
921        // Delay resetting batch_next_close_ns to allow creating a last historical bar
922        // when transitioning to regular bars
923        if !self.core.batch_mode {
924            self.batch_next_close_ns = UnixNanos::default();
925        }
926    }
927
928    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
929        self.core.batch_mode = true;
930
931        let dt = Utc.timestamp_nanos(time_ns.as_u64() as i64);
932        let mut start_dt = get_time_bar_start(dt, &self.bar_type());
933        self.batch_open_ns = UnixNanos::from(
934            start_dt
935                .timestamp_nanos_opt()
936                .expect("Timestamp out of range") as u64,
937        );
938
939        let spec = self.bar_type().spec();
940        let step = spec.step.get() as isize;
941
942        if spec.aggregation == BarAggregation::Month {
943            if self.batch_open_ns == time_ns {
944                start_dt = subtract_n_months(start_dt, step).expect("Failed subtracting months");
945                self.batch_open_ns = UnixNanos::from(
946                    start_dt
947                        .timestamp_nanos_opt()
948                        .expect("Bad month subtraction") as u64,
949                );
950            }
951
952            let next_dt = add_n_months(start_dt, step).expect("Failed adding months");
953            self.batch_next_close_ns =
954                UnixNanos::from(next_dt.timestamp_nanos_opt().expect("Bad month addition") as u64);
955        } else {
956            if self.batch_open_ns == time_ns {
957                self.batch_open_ns = UnixNanos::from(
958                    self.batch_open_ns
959                        .as_u64()
960                        .saturating_sub(self.interval_ns.as_u64()),
961                );
962            }
963
964            self.batch_next_close_ns = UnixNanos::from(
965                self.batch_open_ns
966                    .as_u64()
967                    .saturating_add(self.interval_ns.as_u64()),
968            );
969        }
970    }
971
972    fn build_bar(&mut self, event: TimeEvent) {
973        if !self.core.builder.initialized {
974            self.build_on_next_tick = true;
975            self.stored_close_ns = self.next_close_ns;
976            return;
977        }
978
979        if !self.build_with_no_updates && self.core.builder.count == 0 {
980            return;
981        }
982
983        let ts_init = event.ts_event;
984        let ts_event = if self.is_left_open {
985            if self.timestamp_on_close {
986                event.ts_event
987            } else {
988                self.stored_open_ns
989            }
990        } else {
991            self.stored_open_ns
992        };
993
994        self.core.build_and_send(ts_event, ts_init);
995        self.stored_open_ns = event.ts_event;
996        self.next_close_ns = self.clock.borrow().next_time_ns(&self.timer_name);
997    }
998}
999
1000impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1001where
1002    H: FnMut(Bar) + 'static,
1003{
1004    fn bar_type(&self) -> BarType {
1005        self.core.bar_type
1006    }
1007
1008    fn is_running(&self) -> bool {
1009        self.core.is_running
1010    }
1011
1012    fn set_await_partial(&mut self, value: bool) {
1013        self.core.await_partial = value;
1014    }
1015
1016    fn set_is_running(&mut self, value: bool) {
1017        self.core.is_running = value;
1018    }
1019
1020    fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1021        self.core.apply_update(price, size, ts_event);
1022        if self.build_on_next_tick {
1023            let ts_init = ts_event;
1024
1025            let ts_event = if self.is_left_open {
1026                if self.timestamp_on_close {
1027                    self.stored_close_ns
1028                } else {
1029                    self.stored_open_ns
1030                }
1031            } else {
1032                self.stored_open_ns
1033            };
1034
1035            self.core.build_and_send(ts_event, ts_init);
1036            self.build_on_next_tick = false;
1037            self.stored_close_ns = UnixNanos::default();
1038        }
1039    }
1040
1041    fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1042        if self.batch_next_close_ns != 0 {
1043            self.batch_pre_update(ts_init);
1044        }
1045
1046        self.core.builder.update_bar(bar, volume, ts_init);
1047
1048        if self.build_on_next_tick {
1049            if ts_init <= self.stored_close_ns {
1050                let ts_event = if self.is_left_open {
1051                    if self.timestamp_on_close {
1052                        self.stored_close_ns
1053                    } else {
1054                        self.stored_open_ns
1055                    }
1056                } else {
1057                    self.stored_open_ns
1058                };
1059
1060                self.core.build_and_send(ts_event, ts_init);
1061            }
1062
1063            // Reset flag and clear stored close
1064            self.build_on_next_tick = false;
1065            self.stored_close_ns = UnixNanos::default();
1066        }
1067
1068        if self.batch_next_close_ns != 0 {
1069            self.batch_post_update(ts_init);
1070        }
1071    }
1072
1073    fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
1074        self.core.start_batch_update(handler);
1075    }
1076
1077    fn stop_batch_update(&mut self) {
1078        self.core.stop_batch_update();
1079    }
1080}
1081
1082////////////////////////////////////////////////////////////////////////////////
1083// Tests
1084////////////////////////////////////////////////////////////////////////////////
1085#[cfg(test)]
1086mod tests {
1087    use std::sync::{Arc, Mutex};
1088
1089    use nautilus_common::clock::TestClock;
1090    use nautilus_core::UUID4;
1091    use nautilus_model::{
1092        data::{BarSpecification, BarType},
1093        enums::{AggregationSource, BarAggregation, PriceType},
1094        instruments::{stubs::*, CurrencyPair, Equity, InstrumentAny},
1095        types::{Price, Quantity},
1096    };
1097    use rstest::rstest;
1098    use ustr::Ustr;
1099
1100    use super::*;
1101
1102    #[rstest]
1103    fn test_bar_builder_initialization(equity_aapl: Equity) {
1104        let instrument = InstrumentAny::Equity(equity_aapl);
1105        let bar_type = BarType::new(
1106            instrument.id(),
1107            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1108            AggregationSource::Internal,
1109        );
1110        let builder = BarBuilder::new(
1111            bar_type,
1112            instrument.price_precision(),
1113            instrument.size_precision(),
1114        );
1115
1116        assert!(!builder.initialized);
1117        assert_eq!(builder.ts_last, 0);
1118        assert_eq!(builder.count, 0);
1119    }
1120
1121    #[rstest]
1122    fn test_set_partial_update(equity_aapl: Equity) {
1123        let instrument = InstrumentAny::Equity(equity_aapl);
1124        let bar_type = BarType::new(
1125            instrument.id(),
1126            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1127            AggregationSource::Internal,
1128        );
1129        let mut builder = BarBuilder::new(
1130            bar_type,
1131            instrument.price_precision(),
1132            instrument.size_precision(),
1133        );
1134
1135        let partial_bar = Bar::new(
1136            bar_type,
1137            Price::from("101.00"),
1138            Price::from("102.00"),
1139            Price::from("100.00"),
1140            Price::from("101.00"),
1141            Quantity::from(100),
1142            UnixNanos::from(1),
1143            UnixNanos::from(2),
1144        );
1145
1146        builder.set_partial(partial_bar);
1147        let bar = builder.build_now();
1148
1149        assert_eq!(bar.open, partial_bar.open);
1150        assert_eq!(bar.high, partial_bar.high);
1151        assert_eq!(bar.low, partial_bar.low);
1152        assert_eq!(bar.close, partial_bar.close);
1153        assert_eq!(bar.volume, partial_bar.volume);
1154        assert_eq!(builder.ts_last, 2);
1155    }
1156
1157    #[rstest]
1158    fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1159        let instrument = InstrumentAny::Equity(equity_aapl);
1160        let bar_type = BarType::new(
1161            instrument.id(),
1162            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1163            AggregationSource::Internal,
1164        );
1165        let mut builder = BarBuilder::new(
1166            bar_type,
1167            instrument.price_precision(),
1168            instrument.size_precision(),
1169        );
1170
1171        builder.update(
1172            Price::from("100.00"),
1173            Quantity::from(1),
1174            UnixNanos::from(1000),
1175        );
1176        builder.update(
1177            Price::from("95.00"),
1178            Quantity::from(1),
1179            UnixNanos::from(2000),
1180        );
1181        builder.update(
1182            Price::from("105.00"),
1183            Quantity::from(1),
1184            UnixNanos::from(3000),
1185        );
1186
1187        let bar = builder.build_now();
1188        assert!(bar.high > bar.low);
1189        assert_eq!(bar.open, Price::from("100.00"));
1190        assert_eq!(bar.high, Price::from("105.00"));
1191        assert_eq!(bar.low, Price::from("95.00"));
1192        assert_eq!(bar.close, Price::from("105.00"));
1193    }
1194
1195    #[rstest]
1196    fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1197        let instrument = InstrumentAny::Equity(equity_aapl);
1198        let bar_type = BarType::new(
1199            instrument.id(),
1200            BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1201            AggregationSource::Internal,
1202        );
1203        let mut builder = BarBuilder::new(
1204            bar_type,
1205            instrument.price_precision(),
1206            instrument.size_precision(),
1207        );
1208
1209        builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1210        builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1211
1212        assert_eq!(builder.ts_last, 1_000);
1213        assert_eq!(builder.count, 1);
1214    }
1215
1216    #[rstest]
1217    fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1218        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1219        let bar_type = BarType::new(
1220            instrument.id(),
1221            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1222            AggregationSource::Internal,
1223        );
1224        let mut builder = BarBuilder::new(
1225            bar_type,
1226            instrument.price_precision(),
1227            instrument.size_precision(),
1228        );
1229
1230        let partial_bar = Bar::new(
1231            bar_type,
1232            Price::from("1.00001"),
1233            Price::from("1.00010"),
1234            Price::from("1.00000"),
1235            Price::from("1.00002"),
1236            Quantity::from(1),
1237            UnixNanos::from(1_000_000_000),
1238            UnixNanos::from(2_000_000_000),
1239        );
1240
1241        builder.set_partial(partial_bar);
1242        let bar = builder.build_now();
1243
1244        assert_eq!(bar.open, Price::from("1.00001"));
1245        assert_eq!(bar.high, Price::from("1.00010"));
1246        assert_eq!(bar.low, Price::from("1.00000"));
1247        assert_eq!(bar.close, Price::from("1.00002"));
1248        assert_eq!(bar.volume, Quantity::from(1));
1249        assert_eq!(bar.ts_init, 2_000_000_000);
1250        assert_eq!(builder.ts_last, 2_000_000_000);
1251    }
1252
1253    #[rstest]
1254    fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1255        let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1256        let bar_type = BarType::new(
1257            instrument.id(),
1258            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1259            AggregationSource::Internal,
1260        );
1261        let mut builder = BarBuilder::new(
1262            bar_type,
1263            instrument.price_precision(),
1264            instrument.size_precision(),
1265        );
1266
1267        let partial_bar1 = Bar::new(
1268            bar_type,
1269            Price::from("1.00001"),
1270            Price::from("1.00010"),
1271            Price::from("1.00000"),
1272            Price::from("1.00002"),
1273            Quantity::from(1),
1274            UnixNanos::from(1_000_000_000),
1275            UnixNanos::from(1_000_000_000),
1276        );
1277
1278        let partial_bar2 = Bar::new(
1279            bar_type,
1280            Price::from("2.00001"),
1281            Price::from("2.00010"),
1282            Price::from("2.00000"),
1283            Price::from("2.00002"),
1284            Quantity::from(2),
1285            UnixNanos::from(3_000_000_000),
1286            UnixNanos::from(3_000_000_000),
1287        );
1288
1289        builder.set_partial(partial_bar1);
1290        builder.set_partial(partial_bar2);
1291        let bar = builder.build(
1292            UnixNanos::from(4_000_000_000),
1293            UnixNanos::from(4_000_000_000),
1294        );
1295
1296        assert_eq!(bar.open, Price::from("1.00001"));
1297        assert_eq!(bar.high, Price::from("1.00010"));
1298        assert_eq!(bar.low, Price::from("1.00000"));
1299        assert_eq!(bar.close, Price::from("1.00002"));
1300        assert_eq!(bar.volume, Quantity::from(1));
1301        assert_eq!(bar.ts_init, 4_000_000_000);
1302        assert_eq!(builder.ts_last, 1_000_000_000);
1303    }
1304
1305    #[rstest]
1306    fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1307        let instrument = InstrumentAny::Equity(equity_aapl);
1308        let bar_type = BarType::new(
1309            instrument.id(),
1310            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1311            AggregationSource::Internal,
1312        );
1313        let mut builder = BarBuilder::new(
1314            bar_type,
1315            instrument.price_precision(),
1316            instrument.size_precision(),
1317        );
1318
1319        builder.update(
1320            Price::from("1.00000"),
1321            Quantity::from(1),
1322            UnixNanos::default(),
1323        );
1324
1325        assert!(builder.initialized);
1326        assert_eq!(builder.ts_last, 0);
1327        assert_eq!(builder.count, 1);
1328    }
1329
1330    #[rstest]
1331    fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1332        equity_aapl: Equity,
1333    ) {
1334        let instrument = InstrumentAny::Equity(equity_aapl);
1335        let bar_type = BarType::new(
1336            instrument.id(),
1337            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1338            AggregationSource::Internal,
1339        );
1340        let mut builder = BarBuilder::new(bar_type, 2, 0);
1341
1342        builder.update(
1343            Price::from("1.00000"),
1344            Quantity::from(1),
1345            UnixNanos::from(1_000),
1346        );
1347        builder.update(
1348            Price::from("1.00001"),
1349            Quantity::from(1),
1350            UnixNanos::from(500),
1351        );
1352
1353        assert!(builder.initialized);
1354        assert_eq!(builder.ts_last, 1_000);
1355        assert_eq!(builder.count, 1);
1356    }
1357
1358    #[rstest]
1359    fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1360        let instrument = InstrumentAny::Equity(equity_aapl);
1361        let bar_type = BarType::new(
1362            instrument.id(),
1363            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1364            AggregationSource::Internal,
1365        );
1366        let mut builder = BarBuilder::new(
1367            bar_type,
1368            instrument.price_precision(),
1369            instrument.size_precision(),
1370        );
1371
1372        for _ in 0..5 {
1373            builder.update(
1374                Price::from("1.00000"),
1375                Quantity::from(1),
1376                UnixNanos::from(1_000),
1377            );
1378        }
1379
1380        assert_eq!(builder.count, 5);
1381    }
1382
1383    #[rstest]
1384    #[should_panic]
1385    fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1386        let instrument = InstrumentAny::Equity(equity_aapl);
1387        let bar_type = BarType::new(
1388            instrument.id(),
1389            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1390            AggregationSource::Internal,
1391        );
1392        let mut builder = BarBuilder::new(
1393            bar_type,
1394            instrument.price_precision(),
1395            instrument.size_precision(),
1396        );
1397        let _ = builder.build_now();
1398    }
1399
1400    #[rstest]
1401    fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1402        let instrument = InstrumentAny::Equity(equity_aapl);
1403        let bar_type = BarType::new(
1404            instrument.id(),
1405            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1406            AggregationSource::Internal,
1407        );
1408        let mut builder = BarBuilder::new(
1409            bar_type,
1410            instrument.price_precision(),
1411            instrument.size_precision(),
1412        );
1413
1414        builder.update(
1415            Price::from("1.00001"),
1416            Quantity::from(2),
1417            UnixNanos::default(),
1418        );
1419        builder.update(
1420            Price::from("1.00002"),
1421            Quantity::from(2),
1422            UnixNanos::default(),
1423        );
1424        builder.update(
1425            Price::from("1.00000"),
1426            Quantity::from(1),
1427            UnixNanos::from(1_000_000_000),
1428        );
1429
1430        let bar = builder.build_now();
1431
1432        assert_eq!(bar.open, Price::from("1.00001"));
1433        assert_eq!(bar.high, Price::from("1.00002"));
1434        assert_eq!(bar.low, Price::from("1.00000"));
1435        assert_eq!(bar.close, Price::from("1.00000"));
1436        assert_eq!(bar.volume, Quantity::from(5));
1437        assert_eq!(bar.ts_init, 1_000_000_000);
1438        assert_eq!(builder.ts_last, 1_000_000_000);
1439        assert_eq!(builder.count, 0);
1440    }
1441
1442    #[rstest]
1443    fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1444        let instrument = InstrumentAny::Equity(equity_aapl);
1445        let bar_type = BarType::new(
1446            instrument.id(),
1447            BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1448            AggregationSource::Internal,
1449        );
1450        let mut builder = BarBuilder::new(bar_type, 2, 0);
1451
1452        builder.update(
1453            Price::from("1.00001"),
1454            Quantity::from(1),
1455            UnixNanos::default(),
1456        );
1457        builder.build_now();
1458
1459        builder.update(
1460            Price::from("1.00000"),
1461            Quantity::from(1),
1462            UnixNanos::default(),
1463        );
1464        builder.update(
1465            Price::from("1.00003"),
1466            Quantity::from(1),
1467            UnixNanos::default(),
1468        );
1469        builder.update(
1470            Price::from("1.00002"),
1471            Quantity::from(1),
1472            UnixNanos::default(),
1473        );
1474
1475        let bar = builder.build_now();
1476
1477        assert_eq!(bar.open, Price::from("1.00000"));
1478        assert_eq!(bar.high, Price::from("1.00003"));
1479        assert_eq!(bar.low, Price::from("1.00000"));
1480        assert_eq!(bar.close, Price::from("1.00002"));
1481        assert_eq!(bar.volume, Quantity::from(3));
1482    }
1483
1484    #[rstest]
1485    fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1486        let instrument = InstrumentAny::Equity(equity_aapl);
1487        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1488        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1489        let handler = Arc::new(Mutex::new(Vec::new()));
1490        let handler_clone = Arc::clone(&handler);
1491
1492        let mut aggregator = TickBarAggregator::new(
1493            bar_type,
1494            instrument.price_precision(),
1495            instrument.size_precision(),
1496            move |bar: Bar| {
1497                let mut handler_guard = handler_clone.lock().unwrap();
1498                handler_guard.push(bar);
1499            },
1500            false,
1501        );
1502
1503        let trade = TradeTick::default();
1504        aggregator.handle_trade(trade);
1505
1506        let handler_guard = handler.lock().unwrap();
1507        assert_eq!(handler_guard.len(), 0);
1508    }
1509
1510    #[rstest]
1511    fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1512        let instrument = InstrumentAny::Equity(equity_aapl);
1513        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1514        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1515        let handler = Arc::new(Mutex::new(Vec::new()));
1516        let handler_clone = Arc::clone(&handler);
1517
1518        let mut aggregator = TickBarAggregator::new(
1519            bar_type,
1520            instrument.price_precision(),
1521            instrument.size_precision(),
1522            move |bar: Bar| {
1523                let mut handler_guard = handler_clone.lock().unwrap();
1524                handler_guard.push(bar);
1525            },
1526            false,
1527        );
1528
1529        let trade = TradeTick::default();
1530        aggregator.handle_trade(trade);
1531        aggregator.handle_trade(trade);
1532        aggregator.handle_trade(trade);
1533
1534        let handler_guard = handler.lock().unwrap();
1535        let bar = handler_guard.first().unwrap();
1536        assert_eq!(handler_guard.len(), 1);
1537        assert_eq!(bar.open, trade.price);
1538        assert_eq!(bar.high, trade.price);
1539        assert_eq!(bar.low, trade.price);
1540        assert_eq!(bar.close, trade.price);
1541        assert_eq!(bar.volume, Quantity::from(300000));
1542        assert_eq!(bar.ts_event, trade.ts_event);
1543        assert_eq!(bar.ts_init, trade.ts_init);
1544    }
1545
1546    #[rstest]
1547    fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1548        let instrument = InstrumentAny::Equity(equity_aapl);
1549        let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1550        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1551        let handler = Arc::new(Mutex::new(Vec::new()));
1552        let handler_clone = Arc::clone(&handler);
1553
1554        let mut aggregator = TickBarAggregator::new(
1555            bar_type,
1556            instrument.price_precision(),
1557            instrument.size_precision(),
1558            move |bar: Bar| {
1559                let mut handler_guard = handler_clone.lock().unwrap();
1560                handler_guard.push(bar);
1561            },
1562            false,
1563        );
1564
1565        aggregator.update(
1566            Price::from("1.00001"),
1567            Quantity::from(1),
1568            UnixNanos::default(),
1569        );
1570        aggregator.update(
1571            Price::from("1.00002"),
1572            Quantity::from(1),
1573            UnixNanos::from(1000),
1574        );
1575        aggregator.update(
1576            Price::from("1.00003"),
1577            Quantity::from(1),
1578            UnixNanos::from(2000),
1579        );
1580
1581        let handler_guard = handler.lock().unwrap();
1582        assert_eq!(handler_guard.len(), 1);
1583
1584        let bar = handler_guard.first().unwrap();
1585        assert_eq!(bar.open, Price::from("1.00001"));
1586        assert_eq!(bar.high, Price::from("1.00003"));
1587        assert_eq!(bar.low, Price::from("1.00001"));
1588        assert_eq!(bar.close, Price::from("1.00003"));
1589        assert_eq!(bar.volume, Quantity::from(3));
1590    }
1591
1592    #[rstest]
1593    fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1594        let instrument = InstrumentAny::Equity(equity_aapl);
1595        let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1596        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1597        let handler = Arc::new(Mutex::new(Vec::new()));
1598        let handler_clone = Arc::clone(&handler);
1599
1600        let mut aggregator = TickBarAggregator::new(
1601            bar_type,
1602            instrument.price_precision(),
1603            instrument.size_precision(),
1604            move |bar: Bar| {
1605                let mut handler_guard = handler_clone.lock().unwrap();
1606                handler_guard.push(bar);
1607            },
1608            false,
1609        );
1610
1611        aggregator.update(
1612            Price::from("1.00001"),
1613            Quantity::from(1),
1614            UnixNanos::default(),
1615        );
1616        aggregator.update(
1617            Price::from("1.00002"),
1618            Quantity::from(1),
1619            UnixNanos::from(1000),
1620        );
1621        aggregator.update(
1622            Price::from("1.00003"),
1623            Quantity::from(1),
1624            UnixNanos::from(2000),
1625        );
1626        aggregator.update(
1627            Price::from("1.00004"),
1628            Quantity::from(1),
1629            UnixNanos::from(3000),
1630        );
1631
1632        let handler_guard = handler.lock().unwrap();
1633        assert_eq!(handler_guard.len(), 2);
1634
1635        let bar1 = &handler_guard[0];
1636        assert_eq!(bar1.open, Price::from("1.00001"));
1637        assert_eq!(bar1.close, Price::from("1.00002"));
1638        assert_eq!(bar1.volume, Quantity::from(2));
1639
1640        let bar2 = &handler_guard[1];
1641        assert_eq!(bar2.open, Price::from("1.00003"));
1642        assert_eq!(bar2.close, Price::from("1.00004"));
1643        assert_eq!(bar2.volume, Quantity::from(2));
1644    }
1645
1646    #[rstest]
1647    fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1648        let instrument = InstrumentAny::Equity(equity_aapl);
1649        let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1650        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1651        let handler = Arc::new(Mutex::new(Vec::new()));
1652        let handler_clone = Arc::clone(&handler);
1653
1654        let mut aggregator = VolumeBarAggregator::new(
1655            bar_type,
1656            instrument.price_precision(),
1657            instrument.size_precision(),
1658            move |bar: Bar| {
1659                let mut handler_guard = handler_clone.lock().unwrap();
1660                handler_guard.push(bar);
1661            },
1662            false,
1663        );
1664
1665        aggregator.update(
1666            Price::from("1.00001"),
1667            Quantity::from(25),
1668            UnixNanos::default(),
1669        );
1670
1671        let handler_guard = handler.lock().unwrap();
1672        assert_eq!(handler_guard.len(), 2);
1673        let bar1 = &handler_guard[0];
1674        assert_eq!(bar1.volume, Quantity::from(10));
1675        let bar2 = &handler_guard[1];
1676        assert_eq!(bar2.volume, Quantity::from(10));
1677    }
1678
1679    #[rstest]
1680    fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1681        let instrument = InstrumentAny::Equity(equity_aapl);
1682        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); // $1000 value step
1683        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1684        let handler = Arc::new(Mutex::new(Vec::new()));
1685        let handler_clone = Arc::clone(&handler);
1686
1687        let mut aggregator = ValueBarAggregator::new(
1688            bar_type,
1689            instrument.price_precision(),
1690            instrument.size_precision(),
1691            move |bar: Bar| {
1692                let mut handler_guard = handler_clone.lock().unwrap();
1693                handler_guard.push(bar);
1694            },
1695            false,
1696        );
1697
1698        // Updates to reach value threshold: 100 * 5 + 100 * 5 = $1000
1699        aggregator.update(
1700            Price::from("100.00"),
1701            Quantity::from(5),
1702            UnixNanos::default(),
1703        );
1704        aggregator.update(
1705            Price::from("100.00"),
1706            Quantity::from(5),
1707            UnixNanos::from(1000),
1708        );
1709
1710        let handler_guard = handler.lock().unwrap();
1711        assert_eq!(handler_guard.len(), 1);
1712        let bar = handler_guard.first().unwrap();
1713        assert_eq!(bar.volume, Quantity::from(10));
1714    }
1715
1716    #[rstest]
1717    fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1718        let instrument = InstrumentAny::Equity(equity_aapl);
1719        let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1720        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1721        let handler = Arc::new(Mutex::new(Vec::new()));
1722        let handler_clone = Arc::clone(&handler);
1723
1724        let mut aggregator = ValueBarAggregator::new(
1725            bar_type,
1726            instrument.price_precision(),
1727            instrument.size_precision(),
1728            move |bar: Bar| {
1729                let mut handler_guard = handler_clone.lock().unwrap();
1730                handler_guard.push(bar);
1731            },
1732            false,
1733        );
1734
1735        // Single large update: $100 * 25 = $2500 (should create 2 bars)
1736        aggregator.update(
1737            Price::from("100.00"),
1738            Quantity::from(25),
1739            UnixNanos::default(),
1740        );
1741
1742        let handler_guard = handler.lock().unwrap();
1743        assert_eq!(handler_guard.len(), 2);
1744        let remaining_value = aggregator.get_cumulative_value();
1745        assert!(remaining_value < 1000.0); // Should be less than threshold
1746    }
1747
1748    #[rstest]
1749    fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1750        let instrument = InstrumentAny::Equity(equity_aapl);
1751        // One second bars
1752        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1753        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1754        let handler = Arc::new(Mutex::new(Vec::new()));
1755        let handler_clone = Arc::clone(&handler);
1756        let clock = Rc::new(RefCell::new(TestClock::new()));
1757
1758        let mut aggregator = TimeBarAggregator::new(
1759            bar_type,
1760            instrument.price_precision(),
1761            instrument.size_precision(),
1762            clock.clone(),
1763            move |bar: Bar| {
1764                let mut handler_guard = handler_clone.lock().unwrap();
1765                handler_guard.push(bar);
1766            },
1767            false,
1768            true,  // build_with_no_updates
1769            false, // timestamp_on_close
1770            BarIntervalType::LeftOpen,
1771            None, // time_bars_origin
1772            15,   // composite_bar_build_delay
1773        );
1774
1775        aggregator.update(
1776            Price::from("100.00"),
1777            Quantity::from(1),
1778            UnixNanos::default(),
1779        );
1780
1781        let next_sec = UnixNanos::from(1_000_000_000);
1782        clock.borrow_mut().set_time(next_sec);
1783
1784        let event = TimeEvent::new(
1785            Ustr::from("1-SECOND-LAST"),
1786            UUID4::new(),
1787            next_sec,
1788            next_sec,
1789        );
1790        aggregator.build_bar(event);
1791
1792        let handler_guard = handler.lock().unwrap();
1793        assert_eq!(handler_guard.len(), 1);
1794        let bar = handler_guard.first().unwrap();
1795        assert_eq!(bar.ts_event, UnixNanos::default());
1796        assert_eq!(bar.ts_init, next_sec);
1797    }
1798
1799    #[rstest]
1800    fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1801        let instrument = InstrumentAny::Equity(equity_aapl);
1802        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1803        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1804        let handler = Arc::new(Mutex::new(Vec::new()));
1805        let handler_clone = Arc::clone(&handler);
1806        let clock = Rc::new(RefCell::new(TestClock::new()));
1807
1808        let mut aggregator = TimeBarAggregator::new(
1809            bar_type,
1810            instrument.price_precision(),
1811            instrument.size_precision(),
1812            clock.clone(),
1813            move |bar: Bar| {
1814                let mut handler_guard = handler_clone.lock().unwrap();
1815                handler_guard.push(bar);
1816            },
1817            false,
1818            true, // build_with_no_updates
1819            true, // timestamp_on_close - changed to true to verify left-open behavior
1820            BarIntervalType::LeftOpen,
1821            None,
1822            15,
1823        );
1824
1825        // Update in first interval
1826        aggregator.update(
1827            Price::from("100.00"),
1828            Quantity::from(1),
1829            UnixNanos::default(),
1830        );
1831
1832        // First interval close
1833        let ts1 = UnixNanos::from(1_000_000_000);
1834        clock.borrow_mut().set_time(ts1);
1835        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1836        aggregator.build_bar(event);
1837
1838        // Update in second interval
1839        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1840
1841        // Second interval close
1842        let ts2 = UnixNanos::from(2_000_000_000);
1843        clock.borrow_mut().set_time(ts2);
1844        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1845        aggregator.build_bar(event);
1846
1847        let handler_guard = handler.lock().unwrap();
1848        assert_eq!(handler_guard.len(), 2);
1849
1850        let bar1 = &handler_guard[0];
1851        assert_eq!(bar1.ts_event, UnixNanos::default()); // For left-open with timestamp_on_close=true
1852        assert_eq!(bar1.ts_init, ts1);
1853        assert_eq!(bar1.close, Price::from("100.00"));
1854        let bar2 = &handler_guard[1];
1855        assert_eq!(bar2.ts_event, ts1);
1856        assert_eq!(bar2.ts_init, ts2);
1857        assert_eq!(bar2.close, Price::from("101.00"));
1858    }
1859
1860    #[rstest]
1861    fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1862        let instrument = InstrumentAny::Equity(equity_aapl);
1863        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1864        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1865        let handler = Arc::new(Mutex::new(Vec::new()));
1866        let handler_clone = Arc::clone(&handler);
1867        let clock = Rc::new(RefCell::new(TestClock::new()));
1868        let mut aggregator = TimeBarAggregator::new(
1869            bar_type,
1870            instrument.price_precision(),
1871            instrument.size_precision(),
1872            clock.clone(),
1873            move |bar: Bar| {
1874                let mut handler_guard = handler_clone.lock().unwrap();
1875                handler_guard.push(bar);
1876            },
1877            false,
1878            true, // build_with_no_updates
1879            true, // timestamp_on_close
1880            BarIntervalType::RightOpen,
1881            None,
1882            15,
1883        );
1884
1885        // Update in first interval
1886        aggregator.update(
1887            Price::from("100.00"),
1888            Quantity::from(1),
1889            UnixNanos::default(),
1890        );
1891
1892        // First interval close
1893        let ts1 = UnixNanos::from(1_000_000_000);
1894        clock.borrow_mut().set_time(ts1);
1895        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1896        aggregator.build_bar(event);
1897
1898        // Update in second interval
1899        aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1900
1901        // Second interval close
1902        let ts2 = UnixNanos::from(2_000_000_000);
1903        clock.borrow_mut().set_time(ts2);
1904        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1905        aggregator.build_bar(event);
1906
1907        let handler_guard = handler.lock().unwrap();
1908        assert_eq!(handler_guard.len(), 2);
1909
1910        let bar1 = &handler_guard[0];
1911        assert_eq!(bar1.ts_event, UnixNanos::default()); // Right-open interval starts inclusive
1912        assert_eq!(bar1.ts_init, ts1);
1913        assert_eq!(bar1.close, Price::from("100.00"));
1914
1915        let bar2 = &handler_guard[1];
1916        assert_eq!(bar2.ts_event, ts1);
1917        assert_eq!(bar2.ts_init, ts2);
1918        assert_eq!(bar2.close, Price::from("101.00"));
1919    }
1920
1921    #[rstest]
1922    fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
1923        let instrument = InstrumentAny::Equity(equity_aapl);
1924        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1925        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1926        let handler = Arc::new(Mutex::new(Vec::new()));
1927        let handler_clone = Arc::clone(&handler);
1928        let clock = Rc::new(RefCell::new(TestClock::new()));
1929
1930        // First test with build_with_no_updates = false
1931        let mut aggregator = TimeBarAggregator::new(
1932            bar_type,
1933            instrument.price_precision(),
1934            instrument.size_precision(),
1935            clock.clone(),
1936            move |bar: Bar| {
1937                let mut handler_guard = handler_clone.lock().unwrap();
1938                handler_guard.push(bar);
1939            },
1940            false,
1941            false, // build_with_no_updates disabled
1942            true,  // timestamp_on_close
1943            BarIntervalType::LeftOpen,
1944            None,
1945            15,
1946        );
1947
1948        // No updates, just interval close
1949        let ts1 = UnixNanos::from(1_000_000_000);
1950        clock.borrow_mut().set_time(ts1);
1951        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1952        aggregator.build_bar(event);
1953
1954        let handler_guard = handler.lock().unwrap();
1955        assert_eq!(handler_guard.len(), 0); // No bar should be built without updates
1956        drop(handler_guard);
1957
1958        // Now test with build_with_no_updates = true
1959        let handler = Arc::new(Mutex::new(Vec::new()));
1960        let handler_clone = Arc::clone(&handler);
1961        let mut aggregator = TimeBarAggregator::new(
1962            bar_type,
1963            instrument.price_precision(),
1964            instrument.size_precision(),
1965            clock.clone(),
1966            move |bar: Bar| {
1967                let mut handler_guard = handler_clone.lock().unwrap();
1968                handler_guard.push(bar);
1969            },
1970            false,
1971            true, // build_with_no_updates enabled
1972            true, // timestamp_on_close
1973            BarIntervalType::LeftOpen,
1974            None,
1975            15,
1976        );
1977
1978        aggregator.update(
1979            Price::from("100.00"),
1980            Quantity::from(1),
1981            UnixNanos::default(),
1982        );
1983
1984        // First interval with update
1985        let ts1 = UnixNanos::from(1_000_000_000);
1986        clock.borrow_mut().set_time(ts1);
1987        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1988        aggregator.build_bar(event);
1989
1990        // Second interval without updates
1991        let ts2 = UnixNanos::from(2_000_000_000);
1992        clock.borrow_mut().set_time(ts2);
1993        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1994        aggregator.build_bar(event);
1995
1996        let handler_guard = handler.lock().unwrap();
1997        assert_eq!(handler_guard.len(), 2); // Both bars should be built
1998        let bar1 = &handler_guard[0];
1999        assert_eq!(bar1.close, Price::from("100.00"));
2000        let bar2 = &handler_guard[1];
2001        assert_eq!(bar2.close, Price::from("100.00")); // Should use last close
2002    }
2003
2004    #[rstest]
2005    fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2006        let instrument = InstrumentAny::Equity(equity_aapl);
2007        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2008        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2009        let clock = Rc::new(RefCell::new(TestClock::new()));
2010        let handler = Arc::new(Mutex::new(Vec::new()));
2011        let handler_clone = Arc::clone(&handler);
2012
2013        let mut aggregator = TimeBarAggregator::new(
2014            bar_type,
2015            instrument.price_precision(),
2016            instrument.size_precision(),
2017            clock.clone(),
2018            move |bar: Bar| {
2019                let mut handler_guard = handler_clone.lock().unwrap();
2020                handler_guard.push(bar);
2021            },
2022            false,
2023            true,
2024            true,
2025            BarIntervalType::RightOpen,
2026            None,
2027            15,
2028        );
2029
2030        let ts1 = UnixNanos::from(1_000_000_000);
2031        aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2032
2033        let ts2 = UnixNanos::from(2_000_000_000);
2034        clock.borrow_mut().set_time(ts2);
2035
2036        // Simulate timestamp on close
2037        let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2038        aggregator.build_bar(event);
2039
2040        let handler_guard = handler.lock().unwrap();
2041        let bar = handler_guard.first().unwrap();
2042        assert_eq!(bar.ts_event, UnixNanos::default());
2043        assert_eq!(bar.ts_init, ts2);
2044    }
2045
2046    #[rstest]
2047    fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2048        let instrument = InstrumentAny::Equity(equity_aapl);
2049        let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2050        let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2051        let clock = Rc::new(RefCell::new(TestClock::new()));
2052        let handler = Arc::new(Mutex::new(Vec::new()));
2053        let handler_clone = Arc::clone(&handler);
2054
2055        let mut aggregator = TimeBarAggregator::new(
2056            bar_type,
2057            instrument.price_precision(),
2058            instrument.size_precision(),
2059            clock.clone(),
2060            move |bar: Bar| {
2061                let mut handler_guard = handler_clone.lock().unwrap();
2062                handler_guard.push(bar);
2063            },
2064            false,
2065            true,
2066            true,
2067            BarIntervalType::LeftOpen,
2068            None,
2069            15,
2070        );
2071
2072        let initial_time = clock.borrow().utc_now();
2073        aggregator.start_batch_time(UnixNanos::from(
2074            initial_time.timestamp_nanos_opt().unwrap() as u64
2075        ));
2076
2077        let handler_guard = handler.lock().unwrap();
2078        assert_eq!(handler_guard.len(), 0);
2079    }
2080}