1#![allow(dead_code)]
20#![allow(unused_variables)]
21#![allow(unused_assignments)]
22
23use std::{cell::RefCell, ops::Add, rc::Rc};
24
25use chrono::{DateTime, TimeDelta, TimeZone, Utc};
26use nautilus_common::{
27 clock::Clock,
28 timer::{TimeEvent, TimeEventCallback},
29};
30use nautilus_core::{
31 correctness::{self, FAILED},
32 datetime::{add_n_months, subtract_n_months},
33 UnixNanos,
34};
35use nautilus_model::{
36 data::{
37 bar::{get_bar_interval, get_bar_interval_ns, get_time_bar_start, Bar, BarType},
38 QuoteTick, TradeTick,
39 },
40 enums::{AggregationSource, BarAggregation, BarIntervalType},
41 types::{fixed::FIXED_SCALAR, quantity::QuantityRaw, Price, Quantity},
42};
43
44pub trait BarAggregator {
45 fn bar_type(&self) -> BarType;
47 fn is_running(&self) -> bool;
49 fn set_await_partial(&mut self, value: bool);
50 fn set_is_running(&mut self, value: bool);
51 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
53 fn handle_quote(&mut self, quote: QuoteTick) {
55 let spec = self.bar_type().spec();
56 self.update(
57 quote.extract_price(spec.price_type),
58 quote.extract_size(spec.price_type),
59 quote.ts_event,
60 );
61 }
62 fn handle_trade(&mut self, trade: TradeTick) {
64 self.update(trade.price, trade.size, trade.ts_event);
65 }
66 fn handle_bar(&mut self, bar: Bar) {
67 self.update_bar(bar, bar.volume, bar.ts_init);
68 }
69 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
70 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>);
71 fn stop_batch_update(&mut self);
72}
73
/// Accumulates open/high/low/close/volume state for a single bar type and builds [`Bar`]s
/// from it.
pub struct BarBuilder {
    /// Bar type passed to every bar that is built.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision supplied at construction; used for the zero volume after a reset.
    size_precision: u8,
    /// Set to `true` once the first update or a partial bar has been applied.
    initialized: bool,
    /// Timestamp of the last accepted update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates accepted since the last reset.
    count: usize,
    /// Whether a partial bar has already been applied (only the first one is used).
    partial_set: bool,
    /// Close price of the most recently built bar, used when building with no new updates.
    last_close: Option<Price>,
    /// Open price of the bar in progress.
    open: Option<Price>,
    /// High price of the bar in progress.
    high: Option<Price>,
    /// Low price of the bar in progress.
    low: Option<Price>,
    /// Latest close price; not cleared by `reset`.
    close: Option<Price>,
    /// Volume accumulated for the bar in progress.
    volume: Quantity,
}
90
91impl BarBuilder {
92 #[must_use]
100 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
101 correctness::check_equal(
102 bar_type.aggregation_source(),
103 AggregationSource::Internal,
104 "bar_type.aggregation_source",
105 "AggregationSource::Internal",
106 )
107 .expect(FAILED);
108
109 Self {
110 bar_type,
111 price_precision,
112 size_precision,
113 initialized: false,
114 ts_last: UnixNanos::default(),
115 count: 0,
116 partial_set: false,
117 last_close: None,
118 open: None,
119 high: None,
120 low: None,
121 close: None,
122 volume: Quantity::zero(size_precision),
123 }
124 }
125
126 pub fn set_partial(&mut self, partial_bar: Bar) {
128 if self.partial_set {
129 return; }
131
132 self.open = Some(partial_bar.open);
133
134 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
135 self.high = Some(partial_bar.high);
136 }
137
138 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
139 self.low = Some(partial_bar.low);
140 }
141
142 if self.close.is_none() {
143 self.close = Some(partial_bar.close);
144 }
145
146 self.volume = partial_bar.volume;
147
148 if self.ts_last == 0 {
149 self.ts_last = partial_bar.ts_init;
150 }
151
152 self.partial_set = true;
153 self.initialized = true;
154 }
155
156 pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
158 if ts_event < self.ts_last {
159 return; }
161
162 if self.open.is_none() {
163 self.open = Some(price);
164 self.high = Some(price);
165 self.low = Some(price);
166 self.initialized = true;
167 } else {
168 if price > self.high.unwrap() {
169 self.high = Some(price);
170 }
171 if price < self.low.unwrap() {
172 self.low = Some(price);
173 }
174 }
175
176 self.close = Some(price);
177 self.volume = self.volume.add(size);
178 self.count += 1;
179 self.ts_last = ts_event;
180 }
181
182 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
183 if ts_init < self.ts_last {
184 return; }
186
187 if self.open.is_none() {
188 self.open = Some(bar.open);
189 self.high = Some(bar.high);
190 self.low = Some(bar.low);
191 self.initialized = true;
192 } else {
193 if bar.high > self.high.unwrap() {
194 self.high = Some(bar.high);
195 }
196 if bar.low < self.low.unwrap() {
197 self.low = Some(bar.low);
198 }
199 }
200
201 self.close = Some(bar.close);
202 self.volume = self.volume.add(volume);
203 self.count += 1;
204 self.ts_last = ts_init;
205 }
206
207 pub fn reset(&mut self) {
211 self.open = None;
212 self.high = None;
213 self.low = None;
214 self.volume = Quantity::zero(self.size_precision);
215 self.count = 0;
216 }
217
218 pub fn build_now(&mut self) -> Bar {
220 self.build(self.ts_last, self.ts_last)
221 }
222
223 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
225 if self.open.is_none() {
226 self.open = self.last_close;
227 self.high = self.last_close;
228 self.low = self.last_close;
229 self.close = self.last_close;
230 }
231
232 let bar = Bar::new(
234 self.bar_type,
235 self.open.unwrap(),
236 self.high.unwrap(),
237 self.low.unwrap(),
238 self.close.unwrap(),
239 self.volume,
240 ts_event,
241 ts_init,
242 );
243
244 self.last_close = self.close;
245 self.reset();
246 bar
247 }
248}
249
/// State shared by all bar aggregators: the builder, the bar handlers and mode flags.
pub struct BarAggregatorCore<H>
where
    H: FnMut(Bar),
{
    /// Bar type being aggregated.
    bar_type: BarType,
    /// Builder accumulating the bar in progress.
    builder: BarBuilder,
    /// Primary handler invoked with each built bar.
    handler: H,
    /// Optional saved handler; `stop_batch_update` restores it into `handler` when present.
    /// NOTE(review): nothing in the visible code ever sets this to `Some`.
    handler_backup: Option<H>,
    /// Handler registered by `start_batch_update`, used while `batch_mode` is set.
    batch_handler: Option<Box<dyn FnMut(Bar)>>,
    /// Await-partial flag; stored and settable, not read in the visible code.
    await_partial: bool,
    /// Running flag exposed through the aggregator trait.
    is_running: bool,
    /// Whether bars are currently routed to the batch handler.
    batch_mode: bool,
}
264
265impl<H> BarAggregatorCore<H>
266where
267 H: FnMut(Bar),
268{
269 pub fn new(
277 bar_type: BarType,
278 price_precision: u8,
279 size_precision: u8,
280 handler: H,
281 await_partial: bool,
282 ) -> Self {
283 Self {
284 bar_type,
285 builder: BarBuilder::new(bar_type, price_precision, size_precision),
286 handler,
287 handler_backup: None,
288 batch_handler: None,
289 await_partial,
290 is_running: false,
291 batch_mode: false,
292 }
293 }
294
295 pub const fn set_await_partial(&mut self, value: bool) {
296 self.await_partial = value;
297 }
298
299 pub const fn set_is_running(&mut self, value: bool) {
300 self.is_running = value;
301 }
302
303 pub fn set_partial(&mut self, partial_bar: Bar) {
305 self.builder.set_partial(partial_bar);
306 }
307
308 fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
309 self.builder.update(price, size, ts_event);
310 }
311
312 fn build_now_and_send(&mut self) {
313 let bar = self.builder.build_now();
314 (self.handler)(bar);
315 }
316
317 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
318 let bar = self.builder.build(ts_event, ts_init);
319 if self.batch_mode {
320 if let Some(handler) = &mut self.batch_handler {
321 handler(bar);
322 }
323 } else {
324 (self.handler)(bar);
325 }
326 }
327
328 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
329 self.batch_mode = true;
330 self.batch_handler = Some(handler);
331 }
332
333 pub fn stop_batch_update(&mut self) {
334 self.batch_mode = false;
335 if let Some(handler) = self.handler_backup.take() {
336 self.handler = handler;
337 }
338 }
339}
340
341pub struct TickBarAggregator<H>
346where
347 H: FnMut(Bar),
348{
349 core: BarAggregatorCore<H>,
350 cum_value: f64,
351}
352
353impl<H> TickBarAggregator<H>
354where
355 H: FnMut(Bar),
356{
357 pub fn new(
365 bar_type: BarType,
366 price_precision: u8,
367 size_precision: u8,
368 handler: H,
369 await_partial: bool,
370 ) -> Self {
371 Self {
372 core: BarAggregatorCore::new(
373 bar_type,
374 price_precision,
375 size_precision,
376 handler,
377 await_partial,
378 ),
379 cum_value: 0.0,
380 }
381 }
382}
383
384impl<H> BarAggregator for TickBarAggregator<H>
385where
386 H: FnMut(Bar),
387{
388 fn bar_type(&self) -> BarType {
389 self.core.bar_type
390 }
391
392 fn is_running(&self) -> bool {
393 self.core.is_running
394 }
395
396 fn set_await_partial(&mut self, value: bool) {
397 self.core.await_partial = value;
398 }
399
400 fn set_is_running(&mut self, value: bool) {
401 self.core.is_running = value;
402 }
403
404 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
406 self.core.apply_update(price, size, ts_event);
407 let spec = self.core.bar_type.spec();
408
409 if self.core.builder.count >= spec.step.get() {
410 self.core.build_now_and_send();
411 }
412 }
413
414 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
415 let mut volume_update = volume;
416 let average_price = Price::new(
417 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
418 self.core.builder.price_precision,
419 );
420
421 while volume_update.as_f64() > 0.0 {
422 let value_update = average_price.as_f64() * volume_update.as_f64();
423 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
424 self.cum_value += value_update;
425 self.core.builder.update_bar(bar, volume_update, ts_init);
426 break;
427 }
428
429 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
430 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
431 self.core.builder.update_bar(
432 bar,
433 Quantity::new(volume_diff, volume_update.precision),
434 ts_init,
435 );
436
437 self.core.build_now_and_send();
438 self.cum_value = 0.0;
439 volume_update = Quantity::new(
440 volume_update.as_f64() - volume_diff,
441 volume_update.precision,
442 );
443 }
444 }
445
446 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
447 self.core.start_batch_update(handler);
448 }
449
450 fn stop_batch_update(&mut self) {
451 self.core.stop_batch_update();
452 }
453}
454
455pub struct VolumeBarAggregator<H>
457where
458 H: FnMut(Bar),
459{
460 core: BarAggregatorCore<H>,
461}
462
463impl<H> VolumeBarAggregator<H>
464where
465 H: FnMut(Bar),
466{
467 pub fn new(
475 bar_type: BarType,
476 price_precision: u8,
477 size_precision: u8,
478 handler: H,
479 await_partial: bool,
480 ) -> Self {
481 Self {
482 core: BarAggregatorCore::new(
483 bar_type,
484 price_precision,
485 size_precision,
486 handler,
487 await_partial,
488 ),
489 }
490 }
491}
492
493impl<H> BarAggregator for VolumeBarAggregator<H>
494where
495 H: FnMut(Bar),
496{
497 fn bar_type(&self) -> BarType {
498 self.core.bar_type
499 }
500
501 fn is_running(&self) -> bool {
502 self.core.is_running
503 }
504
505 fn set_await_partial(&mut self, value: bool) {
506 self.core.await_partial = value;
507 }
508
509 fn set_is_running(&mut self, value: bool) {
510 self.core.is_running = value;
511 }
512
513 #[allow(unused_assignments)] fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
516 let mut raw_size_update = size.raw;
517 let spec = self.core.bar_type.spec();
518 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
519 let mut raw_size_diff = 0;
520
521 while raw_size_update > 0 {
522 if self.core.builder.volume.raw + raw_size_update < raw_step {
523 self.core.apply_update(
524 price,
525 Quantity::from_raw(raw_size_update, size.precision),
526 ts_event,
527 );
528 break;
529 }
530
531 raw_size_diff = raw_step - self.core.builder.volume.raw;
532 self.core.apply_update(
533 price,
534 Quantity::from_raw(raw_size_diff, size.precision),
535 ts_event,
536 );
537
538 self.core.build_now_and_send();
539 raw_size_update -= raw_size_diff;
540 }
541 }
542
543 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
544 let mut raw_volume_update = volume.raw;
545 let spec = self.core.bar_type.spec();
546 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
547 let mut _raw_volume_diff = 0;
548
549 while raw_volume_update > 0 {
550 if self.core.builder.volume.raw + raw_volume_update < raw_step {
551 self.core.builder.update_bar(
552 bar,
553 Quantity::from_raw(raw_volume_update, volume.precision),
554 ts_init,
555 );
556 break;
557 }
558
559 _raw_volume_diff = raw_step - self.core.builder.volume.raw;
560 self.core.builder.update_bar(
561 bar,
562 Quantity::from_raw(_raw_volume_diff, volume.precision),
563 ts_init,
564 );
565
566 self.core.build_now_and_send();
567 raw_volume_update -= _raw_volume_diff;
568 }
569 }
570
571 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
572 self.core.start_batch_update(handler);
573 }
574
575 fn stop_batch_update(&mut self) {
576 self.core.stop_batch_update();
577 }
578}
579
580pub struct ValueBarAggregator<H>
585where
586 H: FnMut(Bar),
587{
588 core: BarAggregatorCore<H>,
589 cum_value: f64,
590}
591
592impl<H> ValueBarAggregator<H>
593where
594 H: FnMut(Bar),
595{
596 pub fn new(
604 bar_type: BarType,
605 price_precision: u8,
606 size_precision: u8,
607 handler: H,
608 await_partial: bool,
609 ) -> Self {
610 Self {
611 core: BarAggregatorCore::new(
612 bar_type,
613 price_precision,
614 size_precision,
615 handler,
616 await_partial,
617 ),
618 cum_value: 0.0,
619 }
620 }
621
622 #[must_use]
623 pub const fn get_cumulative_value(&self) -> f64 {
625 self.cum_value
626 }
627}
628
629impl<H> BarAggregator for ValueBarAggregator<H>
630where
631 H: FnMut(Bar),
632{
633 fn bar_type(&self) -> BarType {
634 self.core.bar_type
635 }
636
637 fn is_running(&self) -> bool {
638 self.core.is_running
639 }
640
641 fn set_await_partial(&mut self, value: bool) {
642 self.core.await_partial = value;
643 }
644
645 fn set_is_running(&mut self, value: bool) {
646 self.core.is_running = value;
647 }
648
649 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
651 let mut size_update = size.as_f64();
652 let spec = self.core.bar_type.spec();
653
654 while size_update > 0.0 {
655 let value_update = price.as_f64() * size_update;
656 if self.cum_value + value_update < spec.step.get() as f64 {
657 self.cum_value += value_update;
658 self.core
659 .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
660 break;
661 }
662
663 let value_diff = spec.step.get() as f64 - self.cum_value;
664 let size_diff = size_update * (value_diff / value_update);
665 self.core
666 .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
667
668 self.core.build_now_and_send();
669 self.cum_value = 0.0;
670 size_update -= size_diff;
671 }
672 }
673
674 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
675 let mut volume_update = volume;
676 let average_price = Price::new(
677 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
678 self.core.builder.price_precision,
679 );
680
681 while volume_update.as_f64() > 0.0 {
682 let value_update = average_price.as_f64() * volume_update.as_f64();
683 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
684 self.cum_value += value_update;
685 self.core.builder.update_bar(bar, volume_update, ts_init);
686 break;
687 }
688
689 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
690 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
691 self.core.builder.update_bar(
692 bar,
693 Quantity::new(volume_diff, volume_update.precision),
694 ts_init,
695 );
696
697 self.core.build_now_and_send();
698 self.cum_value = 0.0;
699 volume_update = Quantity::new(
700 volume_update.as_f64() - volume_diff,
701 volume_update.precision,
702 );
703 }
704 }
705
706 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
707 self.core.start_batch_update(handler);
708 }
709
710 fn stop_batch_update(&mut self) {
711 self.core.stop_batch_update();
712 }
713}
714
/// Aggregator for bars defined by a time interval, driven by a clock timer and,
/// in batch mode, by the timestamps of incoming bars.
pub struct TimeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
    /// Clock used to read the current time and to set/cancel the bar timer.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, timer events with no builder updates produce no bar.
    build_with_no_updates: bool,
    /// Whether left-open bars take the close time as their `ts_event`.
    timestamp_on_close: bool,
    /// Whether the interval is treated as left-open when choosing `ts_event`.
    is_left_open: bool,
    /// Set when a timer event fired before the builder was initialized; the next update
    /// then triggers the build.
    build_on_next_tick: bool,
    /// Open time of the bar in progress (the previous timer event's timestamp).
    stored_open_ns: UnixNanos,
    /// Close time remembered for a deferred build.
    stored_close_ns: UnixNanos,
    /// Cached update slot; initialized to `None` and not read in the visible code.
    cached_update: Option<(Price, Quantity, u64)>,
    /// Timer name, derived from the bar type's string form.
    timer_name: String,
    /// Bar interval as returned by `get_bar_interval`.
    interval: TimeDelta,
    /// Bar interval as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Next close time reported by the clock for the timer.
    next_close_ns: UnixNanos,
    /// Composite bar build delay; stored only in the visible code (units not shown here).
    composite_bar_build_delay: i32,
    /// `true` for composite bar types whose composite source is internally aggregated.
    add_delay: bool,
    /// Open time of the current batch-mode bar.
    batch_open_ns: UnixNanos,
    /// Close time of the current batch-mode bar; zero means batch timing is inactive.
    batch_next_close_ns: UnixNanos,
    /// Optional origin for time bars; stored only in the visible code.
    time_bars_origin: Option<DateTime<Utc>>,
}
741
742#[derive(Clone)]
743pub struct NewBarCallback<H: FnMut(Bar)> {
744 aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
745}
746
747impl<H: FnMut(Bar)> NewBarCallback<H> {
748 pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
749 Self { aggregator }
750 }
751}
752
753impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
754 fn from(value: NewBarCallback<H>) -> Self {
755 Self::Rust(Rc::new(move |event: TimeEvent| {
756 value.aggregator.borrow_mut().build_bar(event);
757 }))
758 }
759}
760
761impl<H> TimeBarAggregator<H>
762where
763 H: FnMut(Bar) + 'static,
764{
765 #[allow(clippy::too_many_arguments)]
773 pub fn new(
774 bar_type: BarType,
775 price_precision: u8,
776 size_precision: u8,
777 clock: Rc<RefCell<dyn Clock>>,
778 handler: H,
779 await_partial: bool,
780 build_with_no_updates: bool,
781 timestamp_on_close: bool,
782 interval_type: BarIntervalType,
783 time_bars_origin: Option<DateTime<Utc>>,
784 composite_bar_build_delay: i32,
785 ) -> Self {
786 let _is_left_open = match interval_type {
787 BarIntervalType::LeftOpen => true,
788 BarIntervalType::RightOpen => false,
789 };
790
791 let add_delay = if bar_type.is_composite() {
793 matches!(
794 bar_type.composite().aggregation_source(),
795 AggregationSource::Internal
796 )
797 } else {
798 false
799 };
800
801 let core = BarAggregatorCore::new(
802 bar_type,
803 price_precision,
804 size_precision,
805 handler,
806 await_partial,
807 );
808
809 Self {
810 core,
811 clock,
812 build_with_no_updates,
813 timestamp_on_close,
814 is_left_open: false,
815 build_on_next_tick: false,
816 stored_open_ns: UnixNanos::default(),
817 stored_close_ns: UnixNanos::default(),
818 cached_update: None,
819 timer_name: bar_type.to_string(),
820 interval: get_bar_interval(&bar_type),
821 interval_ns: get_bar_interval_ns(&bar_type),
822 next_close_ns: UnixNanos::default(),
823 composite_bar_build_delay,
824 add_delay,
825 batch_open_ns: UnixNanos::default(),
826 batch_next_close_ns: UnixNanos::default(),
827 time_bars_origin,
828 }
829 }
830
831 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
833 let now = self.clock.borrow().utc_now();
834 let start_time = get_time_bar_start(now, &self.bar_type());
835 let start_time_ns = UnixNanos::from(start_time.timestamp_nanos_opt().unwrap() as u64);
836
837 self.clock
838 .borrow_mut()
839 .set_timer_ns(
840 &self.timer_name,
841 self.interval_ns.as_u64(),
842 start_time_ns,
843 None,
844 Some(callback.into()),
845 )
846 .expect(FAILED);
847
848 log::debug!("Started timer {}", self.timer_name);
849 Ok(())
850 }
851
852 pub fn stop(&mut self) {
854 self.clock.borrow_mut().cancel_timer(&self.timer_name);
855 }
856
857 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
858 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
859 let ts_init = self.batch_next_close_ns;
860
861 let ts_event = if self.is_left_open {
863 if self.timestamp_on_close {
864 self.batch_next_close_ns
865 } else {
866 self.batch_open_ns
867 }
868 } else {
869 self.batch_open_ns
870 };
871
872 self.core.build_and_send(ts_event, ts_init);
873 }
874 }
875
876 fn batch_post_update(&mut self, time_ns: UnixNanos) {
877 if !self.core.batch_mode
879 && time_ns == self.batch_next_close_ns
880 && time_ns > self.stored_open_ns
881 {
882 self.batch_next_close_ns = UnixNanos::default();
883 return;
884 }
885
886 if time_ns > self.batch_next_close_ns {
887 if self.bar_type().spec().aggregation == BarAggregation::Month {
889 } else {
892 while self.batch_next_close_ns < time_ns {
893 self.batch_next_close_ns += self.interval_ns;
894 }
895 self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
896 }
897 }
898
899 if time_ns == self.batch_next_close_ns {
900 let ts_event = if self.is_left_open {
902 if self.timestamp_on_close {
903 self.batch_next_close_ns
904 } else {
905 self.batch_open_ns
906 }
907 } else {
908 self.batch_open_ns
909 };
910
911 self.core.build_and_send(ts_event, time_ns);
912 self.batch_open_ns = self.batch_next_close_ns;
913
914 if self.bar_type().spec().aggregation == BarAggregation::Month {
915 } else {
917 self.batch_next_close_ns += self.interval_ns;
918 }
919 }
920
921 if !self.core.batch_mode {
924 self.batch_next_close_ns = UnixNanos::default();
925 }
926 }
927
928 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
929 self.core.batch_mode = true;
930
931 let dt = Utc.timestamp_nanos(time_ns.as_u64() as i64);
932 let mut start_dt = get_time_bar_start(dt, &self.bar_type());
933 self.batch_open_ns = UnixNanos::from(
934 start_dt
935 .timestamp_nanos_opt()
936 .expect("Timestamp out of range") as u64,
937 );
938
939 let spec = self.bar_type().spec();
940 let step = spec.step.get() as isize;
941
942 if spec.aggregation == BarAggregation::Month {
943 if self.batch_open_ns == time_ns {
944 start_dt = subtract_n_months(start_dt, step).expect("Failed subtracting months");
945 self.batch_open_ns = UnixNanos::from(
946 start_dt
947 .timestamp_nanos_opt()
948 .expect("Bad month subtraction") as u64,
949 );
950 }
951
952 let next_dt = add_n_months(start_dt, step).expect("Failed adding months");
953 self.batch_next_close_ns =
954 UnixNanos::from(next_dt.timestamp_nanos_opt().expect("Bad month addition") as u64);
955 } else {
956 if self.batch_open_ns == time_ns {
957 self.batch_open_ns = UnixNanos::from(
958 self.batch_open_ns
959 .as_u64()
960 .saturating_sub(self.interval_ns.as_u64()),
961 );
962 }
963
964 self.batch_next_close_ns = UnixNanos::from(
965 self.batch_open_ns
966 .as_u64()
967 .saturating_add(self.interval_ns.as_u64()),
968 );
969 }
970 }
971
972 fn build_bar(&mut self, event: TimeEvent) {
973 if !self.core.builder.initialized {
974 self.build_on_next_tick = true;
975 self.stored_close_ns = self.next_close_ns;
976 return;
977 }
978
979 if !self.build_with_no_updates && self.core.builder.count == 0 {
980 return;
981 }
982
983 let ts_init = event.ts_event;
984 let ts_event = if self.is_left_open {
985 if self.timestamp_on_close {
986 event.ts_event
987 } else {
988 self.stored_open_ns
989 }
990 } else {
991 self.stored_open_ns
992 };
993
994 self.core.build_and_send(ts_event, ts_init);
995 self.stored_open_ns = event.ts_event;
996 self.next_close_ns = self.clock.borrow().next_time_ns(&self.timer_name);
997 }
998}
999
1000impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1001where
1002 H: FnMut(Bar) + 'static,
1003{
1004 fn bar_type(&self) -> BarType {
1005 self.core.bar_type
1006 }
1007
1008 fn is_running(&self) -> bool {
1009 self.core.is_running
1010 }
1011
1012 fn set_await_partial(&mut self, value: bool) {
1013 self.core.await_partial = value;
1014 }
1015
1016 fn set_is_running(&mut self, value: bool) {
1017 self.core.is_running = value;
1018 }
1019
1020 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1021 self.core.apply_update(price, size, ts_event);
1022 if self.build_on_next_tick {
1023 let ts_init = ts_event;
1024
1025 let ts_event = if self.is_left_open {
1026 if self.timestamp_on_close {
1027 self.stored_close_ns
1028 } else {
1029 self.stored_open_ns
1030 }
1031 } else {
1032 self.stored_open_ns
1033 };
1034
1035 self.core.build_and_send(ts_event, ts_init);
1036 self.build_on_next_tick = false;
1037 self.stored_close_ns = UnixNanos::default();
1038 }
1039 }
1040
1041 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1042 if self.batch_next_close_ns != 0 {
1043 self.batch_pre_update(ts_init);
1044 }
1045
1046 self.core.builder.update_bar(bar, volume, ts_init);
1047
1048 if self.build_on_next_tick {
1049 if ts_init <= self.stored_close_ns {
1050 let ts_event = if self.is_left_open {
1051 if self.timestamp_on_close {
1052 self.stored_close_ns
1053 } else {
1054 self.stored_open_ns
1055 }
1056 } else {
1057 self.stored_open_ns
1058 };
1059
1060 self.core.build_and_send(ts_event, ts_init);
1061 }
1062
1063 self.build_on_next_tick = false;
1065 self.stored_close_ns = UnixNanos::default();
1066 }
1067
1068 if self.batch_next_close_ns != 0 {
1069 self.batch_post_update(ts_init);
1070 }
1071 }
1072
1073 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
1074 self.core.start_batch_update(handler);
1075 }
1076
1077 fn stop_batch_update(&mut self) {
1078 self.core.stop_batch_update();
1079 }
1080}
1081
1082#[cfg(test)]
1086mod tests {
1087 use std::sync::{Arc, Mutex};
1088
1089 use nautilus_common::clock::TestClock;
1090 use nautilus_core::UUID4;
1091 use nautilus_model::{
1092 data::{BarSpecification, BarType},
1093 enums::{AggregationSource, BarAggregation, PriceType},
1094 instruments::{stubs::*, CurrencyPair, Equity, InstrumentAny},
1095 types::{Price, Quantity},
1096 };
1097 use rstest::rstest;
1098 use ustr::Ustr;
1099
1100 use super::*;
1101
1102 #[rstest]
1103 fn test_bar_builder_initialization(equity_aapl: Equity) {
1104 let instrument = InstrumentAny::Equity(equity_aapl);
1105 let bar_type = BarType::new(
1106 instrument.id(),
1107 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1108 AggregationSource::Internal,
1109 );
1110 let builder = BarBuilder::new(
1111 bar_type,
1112 instrument.price_precision(),
1113 instrument.size_precision(),
1114 );
1115
1116 assert!(!builder.initialized);
1117 assert_eq!(builder.ts_last, 0);
1118 assert_eq!(builder.count, 0);
1119 }
1120
1121 #[rstest]
1122 fn test_set_partial_update(equity_aapl: Equity) {
1123 let instrument = InstrumentAny::Equity(equity_aapl);
1124 let bar_type = BarType::new(
1125 instrument.id(),
1126 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1127 AggregationSource::Internal,
1128 );
1129 let mut builder = BarBuilder::new(
1130 bar_type,
1131 instrument.price_precision(),
1132 instrument.size_precision(),
1133 );
1134
1135 let partial_bar = Bar::new(
1136 bar_type,
1137 Price::from("101.00"),
1138 Price::from("102.00"),
1139 Price::from("100.00"),
1140 Price::from("101.00"),
1141 Quantity::from(100),
1142 UnixNanos::from(1),
1143 UnixNanos::from(2),
1144 );
1145
1146 builder.set_partial(partial_bar);
1147 let bar = builder.build_now();
1148
1149 assert_eq!(bar.open, partial_bar.open);
1150 assert_eq!(bar.high, partial_bar.high);
1151 assert_eq!(bar.low, partial_bar.low);
1152 assert_eq!(bar.close, partial_bar.close);
1153 assert_eq!(bar.volume, partial_bar.volume);
1154 assert_eq!(builder.ts_last, 2);
1155 }
1156
1157 #[rstest]
1158 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1159 let instrument = InstrumentAny::Equity(equity_aapl);
1160 let bar_type = BarType::new(
1161 instrument.id(),
1162 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1163 AggregationSource::Internal,
1164 );
1165 let mut builder = BarBuilder::new(
1166 bar_type,
1167 instrument.price_precision(),
1168 instrument.size_precision(),
1169 );
1170
1171 builder.update(
1172 Price::from("100.00"),
1173 Quantity::from(1),
1174 UnixNanos::from(1000),
1175 );
1176 builder.update(
1177 Price::from("95.00"),
1178 Quantity::from(1),
1179 UnixNanos::from(2000),
1180 );
1181 builder.update(
1182 Price::from("105.00"),
1183 Quantity::from(1),
1184 UnixNanos::from(3000),
1185 );
1186
1187 let bar = builder.build_now();
1188 assert!(bar.high > bar.low);
1189 assert_eq!(bar.open, Price::from("100.00"));
1190 assert_eq!(bar.high, Price::from("105.00"));
1191 assert_eq!(bar.low, Price::from("95.00"));
1192 assert_eq!(bar.close, Price::from("105.00"));
1193 }
1194
1195 #[rstest]
1196 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1197 let instrument = InstrumentAny::Equity(equity_aapl);
1198 let bar_type = BarType::new(
1199 instrument.id(),
1200 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1201 AggregationSource::Internal,
1202 );
1203 let mut builder = BarBuilder::new(
1204 bar_type,
1205 instrument.price_precision(),
1206 instrument.size_precision(),
1207 );
1208
1209 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1210 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1211
1212 assert_eq!(builder.ts_last, 1_000);
1213 assert_eq!(builder.count, 1);
1214 }
1215
1216 #[rstest]
1217 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1218 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1219 let bar_type = BarType::new(
1220 instrument.id(),
1221 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1222 AggregationSource::Internal,
1223 );
1224 let mut builder = BarBuilder::new(
1225 bar_type,
1226 instrument.price_precision(),
1227 instrument.size_precision(),
1228 );
1229
1230 let partial_bar = Bar::new(
1231 bar_type,
1232 Price::from("1.00001"),
1233 Price::from("1.00010"),
1234 Price::from("1.00000"),
1235 Price::from("1.00002"),
1236 Quantity::from(1),
1237 UnixNanos::from(1_000_000_000),
1238 UnixNanos::from(2_000_000_000),
1239 );
1240
1241 builder.set_partial(partial_bar);
1242 let bar = builder.build_now();
1243
1244 assert_eq!(bar.open, Price::from("1.00001"));
1245 assert_eq!(bar.high, Price::from("1.00010"));
1246 assert_eq!(bar.low, Price::from("1.00000"));
1247 assert_eq!(bar.close, Price::from("1.00002"));
1248 assert_eq!(bar.volume, Quantity::from(1));
1249 assert_eq!(bar.ts_init, 2_000_000_000);
1250 assert_eq!(builder.ts_last, 2_000_000_000);
1251 }
1252
1253 #[rstest]
1254 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1255 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1256 let bar_type = BarType::new(
1257 instrument.id(),
1258 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1259 AggregationSource::Internal,
1260 );
1261 let mut builder = BarBuilder::new(
1262 bar_type,
1263 instrument.price_precision(),
1264 instrument.size_precision(),
1265 );
1266
1267 let partial_bar1 = Bar::new(
1268 bar_type,
1269 Price::from("1.00001"),
1270 Price::from("1.00010"),
1271 Price::from("1.00000"),
1272 Price::from("1.00002"),
1273 Quantity::from(1),
1274 UnixNanos::from(1_000_000_000),
1275 UnixNanos::from(1_000_000_000),
1276 );
1277
1278 let partial_bar2 = Bar::new(
1279 bar_type,
1280 Price::from("2.00001"),
1281 Price::from("2.00010"),
1282 Price::from("2.00000"),
1283 Price::from("2.00002"),
1284 Quantity::from(2),
1285 UnixNanos::from(3_000_000_000),
1286 UnixNanos::from(3_000_000_000),
1287 );
1288
1289 builder.set_partial(partial_bar1);
1290 builder.set_partial(partial_bar2);
1291 let bar = builder.build(
1292 UnixNanos::from(4_000_000_000),
1293 UnixNanos::from(4_000_000_000),
1294 );
1295
1296 assert_eq!(bar.open, Price::from("1.00001"));
1297 assert_eq!(bar.high, Price::from("1.00010"));
1298 assert_eq!(bar.low, Price::from("1.00000"));
1299 assert_eq!(bar.close, Price::from("1.00002"));
1300 assert_eq!(bar.volume, Quantity::from(1));
1301 assert_eq!(bar.ts_init, 4_000_000_000);
1302 assert_eq!(builder.ts_last, 1_000_000_000);
1303 }
1304
1305 #[rstest]
1306 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1307 let instrument = InstrumentAny::Equity(equity_aapl);
1308 let bar_type = BarType::new(
1309 instrument.id(),
1310 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1311 AggregationSource::Internal,
1312 );
1313 let mut builder = BarBuilder::new(
1314 bar_type,
1315 instrument.price_precision(),
1316 instrument.size_precision(),
1317 );
1318
1319 builder.update(
1320 Price::from("1.00000"),
1321 Quantity::from(1),
1322 UnixNanos::default(),
1323 );
1324
1325 assert!(builder.initialized);
1326 assert_eq!(builder.ts_last, 0);
1327 assert_eq!(builder.count, 1);
1328 }
1329
1330 #[rstest]
1331 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1332 equity_aapl: Equity,
1333 ) {
1334 let instrument = InstrumentAny::Equity(equity_aapl);
1335 let bar_type = BarType::new(
1336 instrument.id(),
1337 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1338 AggregationSource::Internal,
1339 );
1340 let mut builder = BarBuilder::new(bar_type, 2, 0);
1341
1342 builder.update(
1343 Price::from("1.00000"),
1344 Quantity::from(1),
1345 UnixNanos::from(1_000),
1346 );
1347 builder.update(
1348 Price::from("1.00001"),
1349 Quantity::from(1),
1350 UnixNanos::from(500),
1351 );
1352
1353 assert!(builder.initialized);
1354 assert_eq!(builder.ts_last, 1_000);
1355 assert_eq!(builder.count, 1);
1356 }
1357
1358 #[rstest]
1359 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1360 let instrument = InstrumentAny::Equity(equity_aapl);
1361 let bar_type = BarType::new(
1362 instrument.id(),
1363 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1364 AggregationSource::Internal,
1365 );
1366 let mut builder = BarBuilder::new(
1367 bar_type,
1368 instrument.price_precision(),
1369 instrument.size_precision(),
1370 );
1371
1372 for _ in 0..5 {
1373 builder.update(
1374 Price::from("1.00000"),
1375 Quantity::from(1),
1376 UnixNanos::from(1_000),
1377 );
1378 }
1379
1380 assert_eq!(builder.count, 5);
1381 }
1382
1383 #[rstest]
1384 #[should_panic]
1385 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1386 let instrument = InstrumentAny::Equity(equity_aapl);
1387 let bar_type = BarType::new(
1388 instrument.id(),
1389 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1390 AggregationSource::Internal,
1391 );
1392 let mut builder = BarBuilder::new(
1393 bar_type,
1394 instrument.price_precision(),
1395 instrument.size_precision(),
1396 );
1397 let _ = builder.build_now();
1398 }
1399
1400 #[rstest]
1401 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1402 let instrument = InstrumentAny::Equity(equity_aapl);
1403 let bar_type = BarType::new(
1404 instrument.id(),
1405 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1406 AggregationSource::Internal,
1407 );
1408 let mut builder = BarBuilder::new(
1409 bar_type,
1410 instrument.price_precision(),
1411 instrument.size_precision(),
1412 );
1413
1414 builder.update(
1415 Price::from("1.00001"),
1416 Quantity::from(2),
1417 UnixNanos::default(),
1418 );
1419 builder.update(
1420 Price::from("1.00002"),
1421 Quantity::from(2),
1422 UnixNanos::default(),
1423 );
1424 builder.update(
1425 Price::from("1.00000"),
1426 Quantity::from(1),
1427 UnixNanos::from(1_000_000_000),
1428 );
1429
1430 let bar = builder.build_now();
1431
1432 assert_eq!(bar.open, Price::from("1.00001"));
1433 assert_eq!(bar.high, Price::from("1.00002"));
1434 assert_eq!(bar.low, Price::from("1.00000"));
1435 assert_eq!(bar.close, Price::from("1.00000"));
1436 assert_eq!(bar.volume, Quantity::from(5));
1437 assert_eq!(bar.ts_init, 1_000_000_000);
1438 assert_eq!(builder.ts_last, 1_000_000_000);
1439 assert_eq!(builder.count, 0);
1440 }
1441
1442 #[rstest]
1443 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1444 let instrument = InstrumentAny::Equity(equity_aapl);
1445 let bar_type = BarType::new(
1446 instrument.id(),
1447 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1448 AggregationSource::Internal,
1449 );
1450 let mut builder = BarBuilder::new(bar_type, 2, 0);
1451
1452 builder.update(
1453 Price::from("1.00001"),
1454 Quantity::from(1),
1455 UnixNanos::default(),
1456 );
1457 builder.build_now();
1458
1459 builder.update(
1460 Price::from("1.00000"),
1461 Quantity::from(1),
1462 UnixNanos::default(),
1463 );
1464 builder.update(
1465 Price::from("1.00003"),
1466 Quantity::from(1),
1467 UnixNanos::default(),
1468 );
1469 builder.update(
1470 Price::from("1.00002"),
1471 Quantity::from(1),
1472 UnixNanos::default(),
1473 );
1474
1475 let bar = builder.build_now();
1476
1477 assert_eq!(bar.open, Price::from("1.00000"));
1478 assert_eq!(bar.high, Price::from("1.00003"));
1479 assert_eq!(bar.low, Price::from("1.00000"));
1480 assert_eq!(bar.close, Price::from("1.00002"));
1481 assert_eq!(bar.volume, Quantity::from(3));
1482 }
1483
1484 #[rstest]
1485 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1486 let instrument = InstrumentAny::Equity(equity_aapl);
1487 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1488 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1489 let handler = Arc::new(Mutex::new(Vec::new()));
1490 let handler_clone = Arc::clone(&handler);
1491
1492 let mut aggregator = TickBarAggregator::new(
1493 bar_type,
1494 instrument.price_precision(),
1495 instrument.size_precision(),
1496 move |bar: Bar| {
1497 let mut handler_guard = handler_clone.lock().unwrap();
1498 handler_guard.push(bar);
1499 },
1500 false,
1501 );
1502
1503 let trade = TradeTick::default();
1504 aggregator.handle_trade(trade);
1505
1506 let handler_guard = handler.lock().unwrap();
1507 assert_eq!(handler_guard.len(), 0);
1508 }
1509
1510 #[rstest]
1511 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1512 let instrument = InstrumentAny::Equity(equity_aapl);
1513 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1514 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1515 let handler = Arc::new(Mutex::new(Vec::new()));
1516 let handler_clone = Arc::clone(&handler);
1517
1518 let mut aggregator = TickBarAggregator::new(
1519 bar_type,
1520 instrument.price_precision(),
1521 instrument.size_precision(),
1522 move |bar: Bar| {
1523 let mut handler_guard = handler_clone.lock().unwrap();
1524 handler_guard.push(bar);
1525 },
1526 false,
1527 );
1528
1529 let trade = TradeTick::default();
1530 aggregator.handle_trade(trade);
1531 aggregator.handle_trade(trade);
1532 aggregator.handle_trade(trade);
1533
1534 let handler_guard = handler.lock().unwrap();
1535 let bar = handler_guard.first().unwrap();
1536 assert_eq!(handler_guard.len(), 1);
1537 assert_eq!(bar.open, trade.price);
1538 assert_eq!(bar.high, trade.price);
1539 assert_eq!(bar.low, trade.price);
1540 assert_eq!(bar.close, trade.price);
1541 assert_eq!(bar.volume, Quantity::from(300000));
1542 assert_eq!(bar.ts_event, trade.ts_event);
1543 assert_eq!(bar.ts_init, trade.ts_init);
1544 }
1545
1546 #[rstest]
1547 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1548 let instrument = InstrumentAny::Equity(equity_aapl);
1549 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1550 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1551 let handler = Arc::new(Mutex::new(Vec::new()));
1552 let handler_clone = Arc::clone(&handler);
1553
1554 let mut aggregator = TickBarAggregator::new(
1555 bar_type,
1556 instrument.price_precision(),
1557 instrument.size_precision(),
1558 move |bar: Bar| {
1559 let mut handler_guard = handler_clone.lock().unwrap();
1560 handler_guard.push(bar);
1561 },
1562 false,
1563 );
1564
1565 aggregator.update(
1566 Price::from("1.00001"),
1567 Quantity::from(1),
1568 UnixNanos::default(),
1569 );
1570 aggregator.update(
1571 Price::from("1.00002"),
1572 Quantity::from(1),
1573 UnixNanos::from(1000),
1574 );
1575 aggregator.update(
1576 Price::from("1.00003"),
1577 Quantity::from(1),
1578 UnixNanos::from(2000),
1579 );
1580
1581 let handler_guard = handler.lock().unwrap();
1582 assert_eq!(handler_guard.len(), 1);
1583
1584 let bar = handler_guard.first().unwrap();
1585 assert_eq!(bar.open, Price::from("1.00001"));
1586 assert_eq!(bar.high, Price::from("1.00003"));
1587 assert_eq!(bar.low, Price::from("1.00001"));
1588 assert_eq!(bar.close, Price::from("1.00003"));
1589 assert_eq!(bar.volume, Quantity::from(3));
1590 }
1591
1592 #[rstest]
1593 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1594 let instrument = InstrumentAny::Equity(equity_aapl);
1595 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1596 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1597 let handler = Arc::new(Mutex::new(Vec::new()));
1598 let handler_clone = Arc::clone(&handler);
1599
1600 let mut aggregator = TickBarAggregator::new(
1601 bar_type,
1602 instrument.price_precision(),
1603 instrument.size_precision(),
1604 move |bar: Bar| {
1605 let mut handler_guard = handler_clone.lock().unwrap();
1606 handler_guard.push(bar);
1607 },
1608 false,
1609 );
1610
1611 aggregator.update(
1612 Price::from("1.00001"),
1613 Quantity::from(1),
1614 UnixNanos::default(),
1615 );
1616 aggregator.update(
1617 Price::from("1.00002"),
1618 Quantity::from(1),
1619 UnixNanos::from(1000),
1620 );
1621 aggregator.update(
1622 Price::from("1.00003"),
1623 Quantity::from(1),
1624 UnixNanos::from(2000),
1625 );
1626 aggregator.update(
1627 Price::from("1.00004"),
1628 Quantity::from(1),
1629 UnixNanos::from(3000),
1630 );
1631
1632 let handler_guard = handler.lock().unwrap();
1633 assert_eq!(handler_guard.len(), 2);
1634
1635 let bar1 = &handler_guard[0];
1636 assert_eq!(bar1.open, Price::from("1.00001"));
1637 assert_eq!(bar1.close, Price::from("1.00002"));
1638 assert_eq!(bar1.volume, Quantity::from(2));
1639
1640 let bar2 = &handler_guard[1];
1641 assert_eq!(bar2.open, Price::from("1.00003"));
1642 assert_eq!(bar2.close, Price::from("1.00004"));
1643 assert_eq!(bar2.volume, Quantity::from(2));
1644 }
1645
1646 #[rstest]
1647 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1648 let instrument = InstrumentAny::Equity(equity_aapl);
1649 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1650 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1651 let handler = Arc::new(Mutex::new(Vec::new()));
1652 let handler_clone = Arc::clone(&handler);
1653
1654 let mut aggregator = VolumeBarAggregator::new(
1655 bar_type,
1656 instrument.price_precision(),
1657 instrument.size_precision(),
1658 move |bar: Bar| {
1659 let mut handler_guard = handler_clone.lock().unwrap();
1660 handler_guard.push(bar);
1661 },
1662 false,
1663 );
1664
1665 aggregator.update(
1666 Price::from("1.00001"),
1667 Quantity::from(25),
1668 UnixNanos::default(),
1669 );
1670
1671 let handler_guard = handler.lock().unwrap();
1672 assert_eq!(handler_guard.len(), 2);
1673 let bar1 = &handler_guard[0];
1674 assert_eq!(bar1.volume, Quantity::from(10));
1675 let bar2 = &handler_guard[1];
1676 assert_eq!(bar2.volume, Quantity::from(10));
1677 }
1678
1679 #[rstest]
1680 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1681 let instrument = InstrumentAny::Equity(equity_aapl);
1682 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1684 let handler = Arc::new(Mutex::new(Vec::new()));
1685 let handler_clone = Arc::clone(&handler);
1686
1687 let mut aggregator = ValueBarAggregator::new(
1688 bar_type,
1689 instrument.price_precision(),
1690 instrument.size_precision(),
1691 move |bar: Bar| {
1692 let mut handler_guard = handler_clone.lock().unwrap();
1693 handler_guard.push(bar);
1694 },
1695 false,
1696 );
1697
1698 aggregator.update(
1700 Price::from("100.00"),
1701 Quantity::from(5),
1702 UnixNanos::default(),
1703 );
1704 aggregator.update(
1705 Price::from("100.00"),
1706 Quantity::from(5),
1707 UnixNanos::from(1000),
1708 );
1709
1710 let handler_guard = handler.lock().unwrap();
1711 assert_eq!(handler_guard.len(), 1);
1712 let bar = handler_guard.first().unwrap();
1713 assert_eq!(bar.volume, Quantity::from(10));
1714 }
1715
1716 #[rstest]
1717 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1718 let instrument = InstrumentAny::Equity(equity_aapl);
1719 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1720 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1721 let handler = Arc::new(Mutex::new(Vec::new()));
1722 let handler_clone = Arc::clone(&handler);
1723
1724 let mut aggregator = ValueBarAggregator::new(
1725 bar_type,
1726 instrument.price_precision(),
1727 instrument.size_precision(),
1728 move |bar: Bar| {
1729 let mut handler_guard = handler_clone.lock().unwrap();
1730 handler_guard.push(bar);
1731 },
1732 false,
1733 );
1734
1735 aggregator.update(
1737 Price::from("100.00"),
1738 Quantity::from(25),
1739 UnixNanos::default(),
1740 );
1741
1742 let handler_guard = handler.lock().unwrap();
1743 assert_eq!(handler_guard.len(), 2);
1744 let remaining_value = aggregator.get_cumulative_value();
1745 assert!(remaining_value < 1000.0); }
1747
1748 #[rstest]
1749 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1750 let instrument = InstrumentAny::Equity(equity_aapl);
1751 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1753 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1754 let handler = Arc::new(Mutex::new(Vec::new()));
1755 let handler_clone = Arc::clone(&handler);
1756 let clock = Rc::new(RefCell::new(TestClock::new()));
1757
1758 let mut aggregator = TimeBarAggregator::new(
1759 bar_type,
1760 instrument.price_precision(),
1761 instrument.size_precision(),
1762 clock.clone(),
1763 move |bar: Bar| {
1764 let mut handler_guard = handler_clone.lock().unwrap();
1765 handler_guard.push(bar);
1766 },
1767 false,
1768 true, false, BarIntervalType::LeftOpen,
1771 None, 15, );
1774
1775 aggregator.update(
1776 Price::from("100.00"),
1777 Quantity::from(1),
1778 UnixNanos::default(),
1779 );
1780
1781 let next_sec = UnixNanos::from(1_000_000_000);
1782 clock.borrow_mut().set_time(next_sec);
1783
1784 let event = TimeEvent::new(
1785 Ustr::from("1-SECOND-LAST"),
1786 UUID4::new(),
1787 next_sec,
1788 next_sec,
1789 );
1790 aggregator.build_bar(event);
1791
1792 let handler_guard = handler.lock().unwrap();
1793 assert_eq!(handler_guard.len(), 1);
1794 let bar = handler_guard.first().unwrap();
1795 assert_eq!(bar.ts_event, UnixNanos::default());
1796 assert_eq!(bar.ts_init, next_sec);
1797 }
1798
1799 #[rstest]
1800 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1801 let instrument = InstrumentAny::Equity(equity_aapl);
1802 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1803 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1804 let handler = Arc::new(Mutex::new(Vec::new()));
1805 let handler_clone = Arc::clone(&handler);
1806 let clock = Rc::new(RefCell::new(TestClock::new()));
1807
1808 let mut aggregator = TimeBarAggregator::new(
1809 bar_type,
1810 instrument.price_precision(),
1811 instrument.size_precision(),
1812 clock.clone(),
1813 move |bar: Bar| {
1814 let mut handler_guard = handler_clone.lock().unwrap();
1815 handler_guard.push(bar);
1816 },
1817 false,
1818 true, true, BarIntervalType::LeftOpen,
1821 None,
1822 15,
1823 );
1824
1825 aggregator.update(
1827 Price::from("100.00"),
1828 Quantity::from(1),
1829 UnixNanos::default(),
1830 );
1831
1832 let ts1 = UnixNanos::from(1_000_000_000);
1834 clock.borrow_mut().set_time(ts1);
1835 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1836 aggregator.build_bar(event);
1837
1838 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1840
1841 let ts2 = UnixNanos::from(2_000_000_000);
1843 clock.borrow_mut().set_time(ts2);
1844 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1845 aggregator.build_bar(event);
1846
1847 let handler_guard = handler.lock().unwrap();
1848 assert_eq!(handler_guard.len(), 2);
1849
1850 let bar1 = &handler_guard[0];
1851 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
1853 assert_eq!(bar1.close, Price::from("100.00"));
1854 let bar2 = &handler_guard[1];
1855 assert_eq!(bar2.ts_event, ts1);
1856 assert_eq!(bar2.ts_init, ts2);
1857 assert_eq!(bar2.close, Price::from("101.00"));
1858 }
1859
1860 #[rstest]
1861 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1862 let instrument = InstrumentAny::Equity(equity_aapl);
1863 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1864 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1865 let handler = Arc::new(Mutex::new(Vec::new()));
1866 let handler_clone = Arc::clone(&handler);
1867 let clock = Rc::new(RefCell::new(TestClock::new()));
1868 let mut aggregator = TimeBarAggregator::new(
1869 bar_type,
1870 instrument.price_precision(),
1871 instrument.size_precision(),
1872 clock.clone(),
1873 move |bar: Bar| {
1874 let mut handler_guard = handler_clone.lock().unwrap();
1875 handler_guard.push(bar);
1876 },
1877 false,
1878 true, true, BarIntervalType::RightOpen,
1881 None,
1882 15,
1883 );
1884
1885 aggregator.update(
1887 Price::from("100.00"),
1888 Quantity::from(1),
1889 UnixNanos::default(),
1890 );
1891
1892 let ts1 = UnixNanos::from(1_000_000_000);
1894 clock.borrow_mut().set_time(ts1);
1895 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1896 aggregator.build_bar(event);
1897
1898 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1900
1901 let ts2 = UnixNanos::from(2_000_000_000);
1903 clock.borrow_mut().set_time(ts2);
1904 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1905 aggregator.build_bar(event);
1906
1907 let handler_guard = handler.lock().unwrap();
1908 assert_eq!(handler_guard.len(), 2);
1909
1910 let bar1 = &handler_guard[0];
1911 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
1913 assert_eq!(bar1.close, Price::from("100.00"));
1914
1915 let bar2 = &handler_guard[1];
1916 assert_eq!(bar2.ts_event, ts1);
1917 assert_eq!(bar2.ts_init, ts2);
1918 assert_eq!(bar2.close, Price::from("101.00"));
1919 }
1920
1921 #[rstest]
1922 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
1923 let instrument = InstrumentAny::Equity(equity_aapl);
1924 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1925 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1926 let handler = Arc::new(Mutex::new(Vec::new()));
1927 let handler_clone = Arc::clone(&handler);
1928 let clock = Rc::new(RefCell::new(TestClock::new()));
1929
1930 let mut aggregator = TimeBarAggregator::new(
1932 bar_type,
1933 instrument.price_precision(),
1934 instrument.size_precision(),
1935 clock.clone(),
1936 move |bar: Bar| {
1937 let mut handler_guard = handler_clone.lock().unwrap();
1938 handler_guard.push(bar);
1939 },
1940 false,
1941 false, true, BarIntervalType::LeftOpen,
1944 None,
1945 15,
1946 );
1947
1948 let ts1 = UnixNanos::from(1_000_000_000);
1950 clock.borrow_mut().set_time(ts1);
1951 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1952 aggregator.build_bar(event);
1953
1954 let handler_guard = handler.lock().unwrap();
1955 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
1957
1958 let handler = Arc::new(Mutex::new(Vec::new()));
1960 let handler_clone = Arc::clone(&handler);
1961 let mut aggregator = TimeBarAggregator::new(
1962 bar_type,
1963 instrument.price_precision(),
1964 instrument.size_precision(),
1965 clock.clone(),
1966 move |bar: Bar| {
1967 let mut handler_guard = handler_clone.lock().unwrap();
1968 handler_guard.push(bar);
1969 },
1970 false,
1971 true, true, BarIntervalType::LeftOpen,
1974 None,
1975 15,
1976 );
1977
1978 aggregator.update(
1979 Price::from("100.00"),
1980 Quantity::from(1),
1981 UnixNanos::default(),
1982 );
1983
1984 let ts1 = UnixNanos::from(1_000_000_000);
1986 clock.borrow_mut().set_time(ts1);
1987 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1988 aggregator.build_bar(event);
1989
1990 let ts2 = UnixNanos::from(2_000_000_000);
1992 clock.borrow_mut().set_time(ts2);
1993 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1994 aggregator.build_bar(event);
1995
1996 let handler_guard = handler.lock().unwrap();
1997 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
1999 assert_eq!(bar1.close, Price::from("100.00"));
2000 let bar2 = &handler_guard[1];
2001 assert_eq!(bar2.close, Price::from("100.00")); }
2003
2004 #[rstest]
2005 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2006 let instrument = InstrumentAny::Equity(equity_aapl);
2007 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2008 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2009 let clock = Rc::new(RefCell::new(TestClock::new()));
2010 let handler = Arc::new(Mutex::new(Vec::new()));
2011 let handler_clone = Arc::clone(&handler);
2012
2013 let mut aggregator = TimeBarAggregator::new(
2014 bar_type,
2015 instrument.price_precision(),
2016 instrument.size_precision(),
2017 clock.clone(),
2018 move |bar: Bar| {
2019 let mut handler_guard = handler_clone.lock().unwrap();
2020 handler_guard.push(bar);
2021 },
2022 false,
2023 true,
2024 true,
2025 BarIntervalType::RightOpen,
2026 None,
2027 15,
2028 );
2029
2030 let ts1 = UnixNanos::from(1_000_000_000);
2031 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2032
2033 let ts2 = UnixNanos::from(2_000_000_000);
2034 clock.borrow_mut().set_time(ts2);
2035
2036 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2038 aggregator.build_bar(event);
2039
2040 let handler_guard = handler.lock().unwrap();
2041 let bar = handler_guard.first().unwrap();
2042 assert_eq!(bar.ts_event, UnixNanos::default());
2043 assert_eq!(bar.ts_init, ts2);
2044 }
2045
2046 #[rstest]
2047 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2048 let instrument = InstrumentAny::Equity(equity_aapl);
2049 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2050 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2051 let clock = Rc::new(RefCell::new(TestClock::new()));
2052 let handler = Arc::new(Mutex::new(Vec::new()));
2053 let handler_clone = Arc::clone(&handler);
2054
2055 let mut aggregator = TimeBarAggregator::new(
2056 bar_type,
2057 instrument.price_precision(),
2058 instrument.size_precision(),
2059 clock.clone(),
2060 move |bar: Bar| {
2061 let mut handler_guard = handler_clone.lock().unwrap();
2062 handler_guard.push(bar);
2063 },
2064 false,
2065 true,
2066 true,
2067 BarIntervalType::LeftOpen,
2068 None,
2069 15,
2070 );
2071
2072 let initial_time = clock.borrow().utc_now();
2073 aggregator.start_batch_time(UnixNanos::from(
2074 initial_time.timestamp_nanos_opt().unwrap() as u64
2075 ));
2076
2077 let handler_guard = handler.lock().unwrap();
2078 assert_eq!(handler_guard.len(), 0);
2079 }
2080}