1use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25 clock::Clock,
26 timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29 SharedCell, UnixNanos, WeakCell,
30 correctness::{self, FAILED},
31 datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34 data::{
35 QuoteTick, TradeTick,
36 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37 },
38 enums::{AggregationSource, BarAggregation, BarIntervalType},
39 types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42pub trait BarAggregator: Any + Debug {
46 fn bar_type(&self) -> BarType;
48 fn is_running(&self) -> bool;
50 fn set_await_partial(&mut self, value: bool);
51 fn set_is_running(&mut self, value: bool);
53 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
56 fn handle_quote(&mut self, quote: QuoteTick) {
58 let spec = self.bar_type().spec();
59 if !self.await_partial() {
60 self.update(
61 quote.extract_price(spec.price_type),
62 quote.extract_size(spec.price_type),
63 quote.ts_event,
64 );
65 }
66 }
67 fn handle_trade(&mut self, trade: TradeTick) {
69 if !self.await_partial() {
70 self.update(trade.price, trade.size, trade.ts_event);
71 }
72 }
73 fn handle_bar(&mut self, bar: Bar) {
75 if !self.await_partial() {
76 self.update_bar(bar, bar.volume, bar.ts_init);
77 }
78 }
79 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82 fn stop_batch_update(&mut self);
84 fn await_partial(&self) -> bool;
86 fn set_partial(&mut self, partial_bar: Bar);
89 fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94 pub fn as_any(&self) -> &dyn Any {
96 self
97 }
98 pub fn as_any_mut(&mut self) -> &mut dyn Any {
100 self
101 }
102}
103
104#[derive(Debug)]
106pub struct BarBuilder {
107 bar_type: BarType,
108 price_precision: u8,
109 size_precision: u8,
110 initialized: bool,
111 ts_last: UnixNanos,
112 count: usize,
113 partial_set: bool,
114 last_close: Option<Price>,
115 open: Option<Price>,
116 high: Option<Price>,
117 low: Option<Price>,
118 close: Option<Price>,
119 volume: Quantity,
120}
121
122impl BarBuilder {
123 #[must_use]
131 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132 correctness::check_equal(
133 &bar_type.aggregation_source(),
134 &AggregationSource::Internal,
135 "bar_type.aggregation_source",
136 "AggregationSource::Internal",
137 )
138 .expect(FAILED);
139
140 Self {
141 bar_type,
142 price_precision,
143 size_precision,
144 initialized: false,
145 ts_last: UnixNanos::default(),
146 count: 0,
147 partial_set: false,
148 last_close: None,
149 open: None,
150 high: None,
151 low: None,
152 close: None,
153 volume: Quantity::zero(size_precision),
154 }
155 }
156
157 pub fn set_partial(&mut self, partial_bar: Bar) {
163 if self.partial_set {
164 return; }
166
167 self.open = Some(partial_bar.open);
168
169 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170 self.high = Some(partial_bar.high);
171 }
172
173 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174 self.low = Some(partial_bar.low);
175 }
176
177 if self.close.is_none() {
178 self.close = Some(partial_bar.close);
179 }
180
181 self.volume = partial_bar.volume;
182
183 if self.ts_last == 0 {
184 self.ts_last = partial_bar.ts_init;
185 }
186
187 self.partial_set = true;
188 self.initialized = true;
189 }
190
191 pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
197 if ts_event < self.ts_last {
198 return; }
200
201 if self.open.is_none() {
202 self.open = Some(price);
203 self.high = Some(price);
204 self.low = Some(price);
205 self.initialized = true;
206 } else {
207 if price > self.high.unwrap() {
208 self.high = Some(price);
209 }
210 if price < self.low.unwrap() {
211 self.low = Some(price);
212 }
213 }
214
215 self.close = Some(price);
216 self.volume = self.volume.add(size);
217 self.count += 1;
218 self.ts_last = ts_event;
219 }
220
221 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227 if ts_init < self.ts_last {
228 return; }
230
231 if self.open.is_none() {
232 self.open = Some(bar.open);
233 self.high = Some(bar.high);
234 self.low = Some(bar.low);
235 self.initialized = true;
236 } else {
237 if bar.high > self.high.unwrap() {
238 self.high = Some(bar.high);
239 }
240 if bar.low < self.low.unwrap() {
241 self.low = Some(bar.low);
242 }
243 }
244
245 self.close = Some(bar.close);
246 self.volume = self.volume.add(volume);
247 self.count += 1;
248 self.ts_last = ts_init;
249 }
250
251 pub fn reset(&mut self) {
255 self.open = None;
256 self.high = None;
257 self.low = None;
258 self.volume = Quantity::zero(self.size_precision);
259 self.count = 0;
260 }
261
262 pub fn build_now(&mut self) -> Bar {
264 self.build(self.ts_last, self.ts_last)
265 }
266
267 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273 if self.open.is_none() {
274 self.open = self.last_close;
275 self.high = self.last_close;
276 self.low = self.last_close;
277 self.close = self.last_close;
278 }
279
280 if let (Some(close), Some(low)) = (self.close, self.low)
281 && close < low
282 {
283 self.low = Some(close);
284 }
285
286 if let (Some(close), Some(high)) = (self.close, self.high)
287 && close > high
288 {
289 self.high = Some(close);
290 }
291
292 let bar = Bar::new(
294 self.bar_type,
295 self.open.unwrap(),
296 self.high.unwrap(),
297 self.low.unwrap(),
298 self.close.unwrap(),
299 self.volume,
300 ts_event,
301 ts_init,
302 );
303
304 self.last_close = self.close;
305 self.reset();
306 bar
307 }
308}
309
310pub struct BarAggregatorCore<H>
312where
313 H: FnMut(Bar),
314{
315 bar_type: BarType,
316 builder: BarBuilder,
317 handler: H,
318 handler_backup: Option<H>,
319 batch_handler: Option<Box<dyn FnMut(Bar)>>,
320 await_partial: bool,
321 is_running: bool,
322 batch_mode: bool,
323}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327 f.debug_struct(stringify!(BarAggregatorCore))
328 .field("bar_type", &self.bar_type)
329 .field("builder", &self.builder)
330 .field("await_partial", &self.await_partial)
331 .field("is_running", &self.is_running)
332 .field("batch_mode", &self.batch_mode)
333 .finish()
334 }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339 H: FnMut(Bar),
340{
341 pub fn new(
349 bar_type: BarType,
350 price_precision: u8,
351 size_precision: u8,
352 handler: H,
353 await_partial: bool,
354 ) -> Self {
355 Self {
356 bar_type,
357 builder: BarBuilder::new(bar_type, price_precision, size_precision),
358 handler,
359 handler_backup: None,
360 batch_handler: None,
361 await_partial,
362 is_running: false,
363 batch_mode: false,
364 }
365 }
366
367 pub const fn set_await_partial(&mut self, value: bool) {
369 self.await_partial = value;
370 }
371
372 pub const fn set_is_running(&mut self, value: bool) {
374 self.is_running = value;
375 }
376
377 pub const fn await_partial(&self) -> bool {
379 self.await_partial
380 }
381
382 pub fn set_partial(&mut self, partial_bar: Bar) {
384 self.builder.set_partial(partial_bar);
385 }
386
387 fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
388 self.builder.update(price, size, ts_event);
389 }
390
391 fn build_now_and_send(&mut self) {
392 let bar = self.builder.build_now();
393 (self.handler)(bar);
394 }
395
396 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397 let bar = self.builder.build(ts_event, ts_init);
398
399 if self.batch_mode {
400 if let Some(handler) = &mut self.batch_handler {
401 handler(bar);
402 }
403 } else {
404 (self.handler)(bar);
405 }
406 }
407
408 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410 self.batch_mode = true;
411 self.batch_handler = Some(handler);
412 }
413
414 pub fn stop_batch_update(&mut self) {
416 self.batch_mode = false;
417
418 if let Some(handler) = self.handler_backup.take() {
419 self.handler = handler;
420 }
421 }
422}
423
424pub struct TickBarAggregator<H>
429where
430 H: FnMut(Bar),
431{
432 core: BarAggregatorCore<H>,
433 cum_value: f64,
434}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct(stringify!(TickBarAggregator))
439 .field("core", &self.core)
440 .field("cum_value", &self.cum_value)
441 .finish()
442 }
443}
444
445impl<H> TickBarAggregator<H>
446where
447 H: FnMut(Bar),
448{
449 pub fn new(
457 bar_type: BarType,
458 price_precision: u8,
459 size_precision: u8,
460 handler: H,
461 await_partial: bool,
462 ) -> Self {
463 Self {
464 core: BarAggregatorCore::new(
465 bar_type,
466 price_precision,
467 size_precision,
468 handler,
469 await_partial,
470 ),
471 cum_value: 0.0,
472 }
473 }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478 H: FnMut(Bar) + 'static,
479{
480 fn bar_type(&self) -> BarType {
481 self.core.bar_type
482 }
483
484 fn is_running(&self) -> bool {
485 self.core.is_running
486 }
487
488 fn set_await_partial(&mut self, value: bool) {
489 self.core.set_await_partial(value);
490 }
491
492 fn set_is_running(&mut self, value: bool) {
493 self.core.set_is_running(value);
494 }
495
496 fn await_partial(&self) -> bool {
497 self.core.await_partial()
498 }
499
500 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
502 self.core.apply_update(price, size, ts_event);
503 let spec = self.core.bar_type.spec();
504
505 if self.core.builder.count >= spec.step.get() {
506 self.core.build_now_and_send();
507 }
508 }
509
510 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511 let mut volume_update = volume;
512 let average_price = Price::new(
513 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514 self.core.builder.price_precision,
515 );
516
517 while volume_update.as_f64() > 0.0 {
518 let value_update = average_price.as_f64() * volume_update.as_f64();
519 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520 self.cum_value += value_update;
521 self.core.builder.update_bar(bar, volume_update, ts_init);
522 break;
523 }
524
525 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527 self.core.builder.update_bar(
528 bar,
529 Quantity::new(volume_diff, volume_update.precision),
530 ts_init,
531 );
532
533 self.core.build_now_and_send();
534 self.cum_value = 0.0;
535 volume_update = Quantity::new(
536 volume_update.as_f64() - volume_diff,
537 volume_update.precision,
538 );
539 }
540 }
541
542 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543 self.core.start_batch_update(handler);
544 }
545
546 fn stop_batch_update(&mut self) {
547 self.core.stop_batch_update();
548 }
549
550 fn set_partial(&mut self, partial_bar: Bar) {
551 self.core.set_partial(partial_bar);
552 }
553}
554
555pub struct VolumeBarAggregator<H>
557where
558 H: FnMut(Bar),
559{
560 core: BarAggregatorCore<H>,
561}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565 f.debug_struct(stringify!(VolumeBarAggregator))
566 .field("core", &self.core)
567 .finish()
568 }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573 H: FnMut(Bar),
574{
575 pub fn new(
583 bar_type: BarType,
584 price_precision: u8,
585 size_precision: u8,
586 handler: H,
587 await_partial: bool,
588 ) -> Self {
589 Self {
590 core: BarAggregatorCore::new(
591 bar_type.standard(),
592 price_precision,
593 size_precision,
594 handler,
595 await_partial,
596 ),
597 }
598 }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603 H: FnMut(Bar) + 'static,
604{
605 fn bar_type(&self) -> BarType {
606 self.core.bar_type
607 }
608
609 fn is_running(&self) -> bool {
610 self.core.is_running
611 }
612
613 fn set_await_partial(&mut self, value: bool) {
614 self.core.set_await_partial(value);
615 }
616
617 fn set_is_running(&mut self, value: bool) {
618 self.core.set_is_running(value);
619 }
620
621 fn await_partial(&self) -> bool {
622 self.core.await_partial()
623 }
624
625 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
627 let mut raw_size_update = size.raw;
628 let spec = self.core.bar_type.spec();
629 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631 while raw_size_update > 0 {
632 if self.core.builder.volume.raw + raw_size_update < raw_step {
633 self.core.apply_update(
634 price,
635 Quantity::from_raw(raw_size_update, size.precision),
636 ts_event,
637 );
638 break;
639 }
640
641 let raw_size_diff = raw_step - self.core.builder.volume.raw;
642 self.core.apply_update(
643 price,
644 Quantity::from_raw(raw_size_diff, size.precision),
645 ts_event,
646 );
647
648 self.core.build_now_and_send();
649 raw_size_update -= raw_size_diff;
650 }
651 }
652
653 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654 let mut raw_volume_update = volume.raw;
655 let spec = self.core.bar_type.spec();
656 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658 while raw_volume_update > 0 {
659 if self.core.builder.volume.raw + raw_volume_update < raw_step {
660 self.core.builder.update_bar(
661 bar,
662 Quantity::from_raw(raw_volume_update, volume.precision),
663 ts_init,
664 );
665 break;
666 }
667
668 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669 self.core.builder.update_bar(
670 bar,
671 Quantity::from_raw(raw_volume_diff, volume.precision),
672 ts_init,
673 );
674
675 self.core.build_now_and_send();
676 raw_volume_update -= raw_volume_diff;
677 }
678 }
679
680 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681 self.core.start_batch_update(handler);
682 }
683
684 fn stop_batch_update(&mut self) {
685 self.core.stop_batch_update();
686 }
687
688 fn set_partial(&mut self, partial_bar: Bar) {
689 self.core.set_partial(partial_bar);
690 }
691}
692
693pub struct ValueBarAggregator<H>
698where
699 H: FnMut(Bar),
700{
701 core: BarAggregatorCore<H>,
702 cum_value: f64,
703}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(ValueBarAggregator))
708 .field("core", &self.core)
709 .field("cum_value", &self.cum_value)
710 .finish()
711 }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716 H: FnMut(Bar),
717{
718 pub fn new(
726 bar_type: BarType,
727 price_precision: u8,
728 size_precision: u8,
729 handler: H,
730 await_partial: bool,
731 ) -> Self {
732 Self {
733 core: BarAggregatorCore::new(
734 bar_type.standard(),
735 price_precision,
736 size_precision,
737 handler,
738 await_partial,
739 ),
740 cum_value: 0.0,
741 }
742 }
743
744 #[must_use]
745 pub const fn get_cumulative_value(&self) -> f64 {
747 self.cum_value
748 }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753 H: FnMut(Bar) + 'static,
754{
755 fn bar_type(&self) -> BarType {
756 self.core.bar_type
757 }
758
759 fn is_running(&self) -> bool {
760 self.core.is_running
761 }
762
763 fn set_await_partial(&mut self, value: bool) {
764 self.core.set_await_partial(value);
765 }
766
767 fn set_is_running(&mut self, value: bool) {
768 self.core.set_is_running(value);
769 }
770
771 fn await_partial(&self) -> bool {
772 self.core.await_partial()
773 }
774
775 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
777 let mut size_update = size.as_f64();
778 let spec = self.core.bar_type.spec();
779
780 while size_update > 0.0 {
781 let value_update = price.as_f64() * size_update;
782 if self.cum_value + value_update < spec.step.get() as f64 {
783 self.cum_value += value_update;
784 self.core
785 .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
786 break;
787 }
788
789 let value_diff = spec.step.get() as f64 - self.cum_value;
790 let size_diff = size_update * (value_diff / value_update);
791 self.core
792 .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
793
794 self.core.build_now_and_send();
795 self.cum_value = 0.0;
796 size_update -= size_diff;
797 }
798 }
799
800 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801 let mut volume_update = volume;
802 let average_price = Price::new(
803 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804 self.core.builder.price_precision,
805 );
806
807 while volume_update.as_f64() > 0.0 {
808 let value_update = average_price.as_f64() * volume_update.as_f64();
809 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810 self.cum_value += value_update;
811 self.core.builder.update_bar(bar, volume_update, ts_init);
812 break;
813 }
814
815 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817 self.core.builder.update_bar(
818 bar,
819 Quantity::new(volume_diff, volume_update.precision),
820 ts_init,
821 );
822
823 self.core.build_now_and_send();
824 self.cum_value = 0.0;
825 volume_update = Quantity::new(
826 volume_update.as_f64() - volume_diff,
827 volume_update.precision,
828 );
829 }
830 }
831
832 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833 self.core.start_batch_update(handler);
834 }
835
836 fn stop_batch_update(&mut self) {
837 self.core.stop_batch_update();
838 }
839
840 fn set_partial(&mut self, partial_bar: Bar) {
841 self.core.set_partial(partial_bar);
842 }
843}
844
845pub struct TimeBarAggregator<H>
849where
850 H: FnMut(Bar),
851{
852 core: BarAggregatorCore<H>,
853 clock: Rc<RefCell<dyn Clock>>,
854 build_with_no_updates: bool,
855 timestamp_on_close: bool,
856 is_left_open: bool,
857 build_on_next_tick: bool,
858 stored_open_ns: UnixNanos,
859 stored_close_ns: UnixNanos,
860 timer_name: String,
861 interval_ns: UnixNanos,
862 next_close_ns: UnixNanos,
863 bar_build_delay: u64,
864 batch_open_ns: UnixNanos,
865 batch_next_close_ns: UnixNanos,
866 time_bars_origin_offset: Option<TimeDelta>,
867 skip_first_non_full_bar: bool,
868}
869
870impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872 f.debug_struct(stringify!(TimeBarAggregator))
873 .field("core", &self.core)
874 .field("build_with_no_updates", &self.build_with_no_updates)
875 .field("timestamp_on_close", &self.timestamp_on_close)
876 .field("is_left_open", &self.is_left_open)
877 .field("timer_name", &self.timer_name)
878 .field("interval_ns", &self.interval_ns)
879 .field("bar_build_delay", &self.bar_build_delay)
880 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
881 .finish()
882 }
883}
884
885#[derive(Clone, Debug)]
886pub struct NewBarCallback<H: FnMut(Bar)> {
893 aggregator: WeakCell<TimeBarAggregator<H>>,
894}
895
896impl<H: FnMut(Bar)> NewBarCallback<H> {
897 #[must_use]
899 pub fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
900 let shared: SharedCell<TimeBarAggregator<H>> = SharedCell::from(aggregator);
901 Self {
902 aggregator: shared.downgrade(),
903 }
904 }
905}
906
907impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
908 fn from(value: NewBarCallback<H>) -> Self {
909 Self::Rust(Rc::new(move |event: TimeEvent| {
910 if let Some(agg) = value.aggregator.upgrade() {
911 agg.borrow_mut().build_bar(event);
912 }
913 }))
914 }
915}
916
917impl<H> TimeBarAggregator<H>
918where
919 H: FnMut(Bar) + 'static,
920{
921 #[allow(clippy::too_many_arguments)]
929 pub fn new(
930 bar_type: BarType,
931 price_precision: u8,
932 size_precision: u8,
933 clock: Rc<RefCell<dyn Clock>>,
934 handler: H,
935 await_partial: bool,
936 build_with_no_updates: bool,
937 timestamp_on_close: bool,
938 interval_type: BarIntervalType,
939 time_bars_origin_offset: Option<TimeDelta>,
940 bar_build_delay: u64,
941 skip_first_non_full_bar: bool,
942 ) -> Self {
943 let is_left_open = match interval_type {
944 BarIntervalType::LeftOpen => true,
945 BarIntervalType::RightOpen => false,
946 };
947
948 let core = BarAggregatorCore::new(
949 bar_type.standard(),
950 price_precision,
951 size_precision,
952 handler,
953 await_partial,
954 );
955
956 Self {
957 core,
958 clock,
959 build_with_no_updates,
960 timestamp_on_close,
961 is_left_open,
962 build_on_next_tick: false,
963 stored_open_ns: UnixNanos::default(),
964 stored_close_ns: UnixNanos::default(),
965 timer_name: bar_type.to_string(),
966 interval_ns: get_bar_interval_ns(&bar_type),
967 next_close_ns: UnixNanos::default(),
968 bar_build_delay,
969 batch_open_ns: UnixNanos::default(),
970 batch_next_close_ns: UnixNanos::default(),
971 time_bars_origin_offset,
972 skip_first_non_full_bar,
973 }
974 }
975
976 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
986 let now = self.clock.borrow().utc_now();
987 let mut start_time =
988 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
989
990 if start_time == now {
991 self.skip_first_non_full_bar = false;
992 }
993
994 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
995
996 let spec = &self.bar_type().spec();
997 let start_time_ns = UnixNanos::from(start_time);
998
999 if spec.aggregation == BarAggregation::Month {
1000 let step = spec.step.get() as u32;
1001 let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
1002
1003 self.clock
1004 .borrow_mut()
1005 .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1006 .expect(FAILED);
1007 } else {
1008 self.clock
1009 .borrow_mut()
1010 .set_timer_ns(
1011 &self.timer_name,
1012 self.interval_ns.as_u64(),
1013 Some(start_time_ns),
1014 None,
1015 Some(callback.into()),
1016 None,
1017 None,
1018 )
1019 .expect(FAILED);
1020 }
1021
1022 log::debug!("Started timer {}", self.timer_name);
1023 Ok(())
1024 }
1025
1026 pub fn stop(&mut self) {
1028 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1029 }
1030
1031 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1037 let spec = self.bar_type().spec();
1038 self.core.batch_mode = true;
1039
1040 let time = time_ns.to_datetime_utc();
1041 let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1042 self.batch_open_ns = UnixNanos::from(start_time);
1043
1044 if spec.aggregation == BarAggregation::Month {
1045 let step = spec.step.get() as u32;
1046
1047 if self.batch_open_ns == time_ns {
1048 self.batch_open_ns =
1049 subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1050 }
1051
1052 self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1053 } else {
1054 if self.batch_open_ns == time_ns {
1055 self.batch_open_ns -= self.interval_ns;
1056 }
1057
1058 self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1059 }
1060 }
1061
1062 const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1063 if self.is_left_open {
1064 if self.timestamp_on_close {
1065 close_ns
1066 } else {
1067 open_ns
1068 }
1069 } else {
1070 open_ns
1071 }
1072 }
1073
1074 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1075 if self.skip_first_non_full_bar {
1076 self.core.builder.reset();
1077 self.skip_first_non_full_bar = false;
1078 } else {
1079 self.core.build_and_send(ts_event, ts_init);
1080 }
1081 }
1082
1083 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1084 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1085 let ts_init = self.batch_next_close_ns;
1086 let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1087 self.build_and_send(ts_event, ts_init);
1088 }
1089 }
1090
1091 fn batch_post_update(&mut self, time_ns: UnixNanos) {
1092 let step = self.bar_type().spec().step.get() as u32;
1093
1094 if !self.core.batch_mode
1096 && time_ns == self.batch_next_close_ns
1097 && time_ns > self.stored_open_ns
1098 {
1099 self.batch_next_close_ns = UnixNanos::default();
1100 return;
1101 }
1102
1103 if time_ns > self.batch_next_close_ns {
1104 if self.bar_type().spec().aggregation == BarAggregation::Month {
1106 while self.batch_next_close_ns < time_ns {
1107 self.batch_next_close_ns =
1108 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1109 }
1110
1111 self.batch_open_ns =
1112 subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1113 } else {
1114 while self.batch_next_close_ns < time_ns {
1115 self.batch_next_close_ns += self.interval_ns;
1116 }
1117
1118 self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1119 }
1120 }
1121
1122 if time_ns == self.batch_next_close_ns {
1123 let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1124 self.build_and_send(ts_event, time_ns);
1125 self.batch_open_ns = self.batch_next_close_ns;
1126
1127 if self.bar_type().spec().aggregation == BarAggregation::Month {
1128 self.batch_next_close_ns =
1129 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1130 } else {
1131 self.batch_next_close_ns += self.interval_ns;
1132 }
1133 }
1134
1135 if !self.core.batch_mode {
1137 self.batch_next_close_ns = UnixNanos::default();
1138 }
1139 }
1140
1141 fn build_bar(&mut self, event: TimeEvent) {
1142 if !self.core.builder.initialized {
1143 self.build_on_next_tick = true;
1144 self.stored_close_ns = self.next_close_ns;
1145 return;
1146 }
1147
1148 if !self.build_with_no_updates && self.core.builder.count == 0 {
1149 return;
1150 }
1151
1152 let ts_init = event.ts_event;
1153 let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1154 self.build_and_send(ts_event, ts_init);
1155
1156 self.stored_open_ns = ts_init;
1157
1158 if self.bar_type().spec().aggregation == BarAggregation::Month {
1159 let step = self.bar_type().spec().step.get() as u32;
1160 let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1161
1162 self.clock
1163 .borrow_mut()
1164 .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1165 .expect(FAILED);
1166
1167 self.next_close_ns = next_alert_ns;
1168 } else {
1169 self.next_close_ns = self
1170 .clock
1171 .borrow()
1172 .next_time_ns(&self.timer_name)
1173 .unwrap_or_default();
1174 }
1175 }
1176}
1177
1178impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1179where
1180 H: FnMut(Bar) + 'static,
1181{
1182 fn bar_type(&self) -> BarType {
1183 self.core.bar_type
1184 }
1185
1186 fn is_running(&self) -> bool {
1187 self.core.is_running
1188 }
1189
1190 fn set_await_partial(&mut self, value: bool) {
1191 self.core.set_await_partial(value);
1192 }
1193
1194 fn set_is_running(&mut self, value: bool) {
1195 self.core.set_is_running(value);
1196 }
1197
1198 fn await_partial(&self) -> bool {
1199 self.core.await_partial()
1200 }
1201 fn stop(&mut self) {
1203 Self::stop(self);
1204 }
1205
1206 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1207 if self.batch_next_close_ns != UnixNanos::default() {
1208 self.batch_pre_update(ts_event);
1209 }
1210
1211 self.core.apply_update(price, size, ts_event);
1212
1213 if self.build_on_next_tick {
1214 if ts_event <= self.stored_close_ns {
1215 let ts_init = ts_event;
1216 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1217 self.build_and_send(ts_event, ts_init);
1218 }
1219
1220 self.build_on_next_tick = false;
1221 self.stored_close_ns = UnixNanos::default();
1222 }
1223
1224 if self.batch_next_close_ns != UnixNanos::default() {
1225 self.batch_post_update(ts_event);
1226 }
1227 }
1228
1229 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1230 if self.batch_next_close_ns != UnixNanos::default() {
1231 self.batch_pre_update(ts_init);
1232 }
1233
1234 self.core.builder.update_bar(bar, volume, ts_init);
1235
1236 if self.build_on_next_tick {
1237 if ts_init <= self.stored_close_ns {
1238 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1239 self.build_and_send(ts_event, ts_init);
1240 }
1241
1242 self.build_on_next_tick = false;
1244 self.stored_close_ns = UnixNanos::default();
1245 }
1246
1247 if self.batch_next_close_ns != UnixNanos::default() {
1248 self.batch_post_update(ts_init);
1249 }
1250 }
1251
1252 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1253 self.core.start_batch_update(handler);
1254 self.start_batch_time(time_ns);
1255 }
1256
1257 fn stop_batch_update(&mut self) {
1258 self.core.stop_batch_update();
1259 }
1260
1261 fn set_partial(&mut self, partial_bar: Bar) {
1262 self.core.set_partial(partial_bar);
1263 }
1264}
1265
1266#[cfg(test)]
1270mod tests {
1271 use std::sync::{Arc, Mutex};
1272
1273 use nautilus_common::clock::TestClock;
1274 use nautilus_core::UUID4;
1275 use nautilus_model::{
1276 data::{BarSpecification, BarType},
1277 enums::{AggregationSource, BarAggregation, PriceType},
1278 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1279 types::{Price, Quantity},
1280 };
1281 use rstest::rstest;
1282 use ustr::Ustr;
1283
1284 use super::*;
1285
1286 #[rstest]
1287 fn test_bar_builder_initialization(equity_aapl: Equity) {
1288 let instrument = InstrumentAny::Equity(equity_aapl);
1289 let bar_type = BarType::new(
1290 instrument.id(),
1291 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1292 AggregationSource::Internal,
1293 );
1294 let builder = BarBuilder::new(
1295 bar_type,
1296 instrument.price_precision(),
1297 instrument.size_precision(),
1298 );
1299
1300 assert!(!builder.initialized);
1301 assert_eq!(builder.ts_last, 0);
1302 assert_eq!(builder.count, 0);
1303 }
1304
1305 #[rstest]
1306 fn test_set_partial_update(equity_aapl: Equity) {
1307 let instrument = InstrumentAny::Equity(equity_aapl);
1308 let bar_type = BarType::new(
1309 instrument.id(),
1310 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1311 AggregationSource::Internal,
1312 );
1313 let mut builder = BarBuilder::new(
1314 bar_type,
1315 instrument.price_precision(),
1316 instrument.size_precision(),
1317 );
1318
1319 let partial_bar = Bar::new(
1320 bar_type,
1321 Price::from("101.00"),
1322 Price::from("102.00"),
1323 Price::from("100.00"),
1324 Price::from("101.00"),
1325 Quantity::from(100),
1326 UnixNanos::from(1),
1327 UnixNanos::from(2),
1328 );
1329
1330 builder.set_partial(partial_bar);
1331 let bar = builder.build_now();
1332
1333 assert_eq!(bar.open, partial_bar.open);
1334 assert_eq!(bar.high, partial_bar.high);
1335 assert_eq!(bar.low, partial_bar.low);
1336 assert_eq!(bar.close, partial_bar.close);
1337 assert_eq!(bar.volume, partial_bar.volume);
1338 assert_eq!(builder.ts_last, 2);
1339 }
1340
1341 #[rstest]
1342 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1343 let instrument = InstrumentAny::Equity(equity_aapl);
1344 let bar_type = BarType::new(
1345 instrument.id(),
1346 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1347 AggregationSource::Internal,
1348 );
1349 let mut builder = BarBuilder::new(
1350 bar_type,
1351 instrument.price_precision(),
1352 instrument.size_precision(),
1353 );
1354
1355 builder.update(
1356 Price::from("100.00"),
1357 Quantity::from(1),
1358 UnixNanos::from(1000),
1359 );
1360 builder.update(
1361 Price::from("95.00"),
1362 Quantity::from(1),
1363 UnixNanos::from(2000),
1364 );
1365 builder.update(
1366 Price::from("105.00"),
1367 Quantity::from(1),
1368 UnixNanos::from(3000),
1369 );
1370
1371 let bar = builder.build_now();
1372 assert!(bar.high > bar.low);
1373 assert_eq!(bar.open, Price::from("100.00"));
1374 assert_eq!(bar.high, Price::from("105.00"));
1375 assert_eq!(bar.low, Price::from("95.00"));
1376 assert_eq!(bar.close, Price::from("105.00"));
1377 }
1378
1379 #[rstest]
1380 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1381 let instrument = InstrumentAny::Equity(equity_aapl);
1382 let bar_type = BarType::new(
1383 instrument.id(),
1384 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1385 AggregationSource::Internal,
1386 );
1387 let mut builder = BarBuilder::new(
1388 bar_type,
1389 instrument.price_precision(),
1390 instrument.size_precision(),
1391 );
1392
1393 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1394 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1395
1396 assert_eq!(builder.ts_last, 1_000);
1397 assert_eq!(builder.count, 1);
1398 }
1399
1400 #[rstest]
1401 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1402 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1403 let bar_type = BarType::new(
1404 instrument.id(),
1405 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1406 AggregationSource::Internal,
1407 );
1408 let mut builder = BarBuilder::new(
1409 bar_type,
1410 instrument.price_precision(),
1411 instrument.size_precision(),
1412 );
1413
1414 let partial_bar = Bar::new(
1415 bar_type,
1416 Price::from("1.00001"),
1417 Price::from("1.00010"),
1418 Price::from("1.00000"),
1419 Price::from("1.00002"),
1420 Quantity::from(1),
1421 UnixNanos::from(1_000_000_000),
1422 UnixNanos::from(2_000_000_000),
1423 );
1424
1425 builder.set_partial(partial_bar);
1426 let bar = builder.build_now();
1427
1428 assert_eq!(bar.open, Price::from("1.00001"));
1429 assert_eq!(bar.high, Price::from("1.00010"));
1430 assert_eq!(bar.low, Price::from("1.00000"));
1431 assert_eq!(bar.close, Price::from("1.00002"));
1432 assert_eq!(bar.volume, Quantity::from(1));
1433 assert_eq!(bar.ts_init, 2_000_000_000);
1434 assert_eq!(builder.ts_last, 2_000_000_000);
1435 }
1436
1437 #[rstest]
1438 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1439 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1440 let bar_type = BarType::new(
1441 instrument.id(),
1442 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1443 AggregationSource::Internal,
1444 );
1445 let mut builder = BarBuilder::new(
1446 bar_type,
1447 instrument.price_precision(),
1448 instrument.size_precision(),
1449 );
1450
1451 let partial_bar1 = Bar::new(
1452 bar_type,
1453 Price::from("1.00001"),
1454 Price::from("1.00010"),
1455 Price::from("1.00000"),
1456 Price::from("1.00002"),
1457 Quantity::from(1),
1458 UnixNanos::from(1_000_000_000),
1459 UnixNanos::from(1_000_000_000),
1460 );
1461
1462 let partial_bar2 = Bar::new(
1463 bar_type,
1464 Price::from("2.00001"),
1465 Price::from("2.00010"),
1466 Price::from("2.00000"),
1467 Price::from("2.00002"),
1468 Quantity::from(2),
1469 UnixNanos::from(3_000_000_000),
1470 UnixNanos::from(3_000_000_000),
1471 );
1472
1473 builder.set_partial(partial_bar1);
1474 builder.set_partial(partial_bar2);
1475 let bar = builder.build(
1476 UnixNanos::from(4_000_000_000),
1477 UnixNanos::from(4_000_000_000),
1478 );
1479
1480 assert_eq!(bar.open, Price::from("1.00001"));
1481 assert_eq!(bar.high, Price::from("1.00010"));
1482 assert_eq!(bar.low, Price::from("1.00000"));
1483 assert_eq!(bar.close, Price::from("1.00002"));
1484 assert_eq!(bar.volume, Quantity::from(1));
1485 assert_eq!(bar.ts_init, 4_000_000_000);
1486 assert_eq!(builder.ts_last, 1_000_000_000);
1487 }
1488
1489 #[rstest]
1490 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1491 let instrument = InstrumentAny::Equity(equity_aapl);
1492 let bar_type = BarType::new(
1493 instrument.id(),
1494 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1495 AggregationSource::Internal,
1496 );
1497 let mut builder = BarBuilder::new(
1498 bar_type,
1499 instrument.price_precision(),
1500 instrument.size_precision(),
1501 );
1502
1503 builder.update(
1504 Price::from("1.00000"),
1505 Quantity::from(1),
1506 UnixNanos::default(),
1507 );
1508
1509 assert!(builder.initialized);
1510 assert_eq!(builder.ts_last, 0);
1511 assert_eq!(builder.count, 1);
1512 }
1513
1514 #[rstest]
1515 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1516 equity_aapl: Equity,
1517 ) {
1518 let instrument = InstrumentAny::Equity(equity_aapl);
1519 let bar_type = BarType::new(
1520 instrument.id(),
1521 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1522 AggregationSource::Internal,
1523 );
1524 let mut builder = BarBuilder::new(bar_type, 2, 0);
1525
1526 builder.update(
1527 Price::from("1.00000"),
1528 Quantity::from(1),
1529 UnixNanos::from(1_000),
1530 );
1531 builder.update(
1532 Price::from("1.00001"),
1533 Quantity::from(1),
1534 UnixNanos::from(500),
1535 );
1536
1537 assert!(builder.initialized);
1538 assert_eq!(builder.ts_last, 1_000);
1539 assert_eq!(builder.count, 1);
1540 }
1541
1542 #[rstest]
1543 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1544 let instrument = InstrumentAny::Equity(equity_aapl);
1545 let bar_type = BarType::new(
1546 instrument.id(),
1547 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1548 AggregationSource::Internal,
1549 );
1550 let mut builder = BarBuilder::new(
1551 bar_type,
1552 instrument.price_precision(),
1553 instrument.size_precision(),
1554 );
1555
1556 for _ in 0..5 {
1557 builder.update(
1558 Price::from("1.00000"),
1559 Quantity::from(1),
1560 UnixNanos::from(1_000),
1561 );
1562 }
1563
1564 assert_eq!(builder.count, 5);
1565 }
1566
1567 #[rstest]
1568 #[should_panic]
1569 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1570 let instrument = InstrumentAny::Equity(equity_aapl);
1571 let bar_type = BarType::new(
1572 instrument.id(),
1573 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1574 AggregationSource::Internal,
1575 );
1576 let mut builder = BarBuilder::new(
1577 bar_type,
1578 instrument.price_precision(),
1579 instrument.size_precision(),
1580 );
1581 let _ = builder.build_now();
1582 }
1583
1584 #[rstest]
1585 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1586 let instrument = InstrumentAny::Equity(equity_aapl);
1587 let bar_type = BarType::new(
1588 instrument.id(),
1589 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1590 AggregationSource::Internal,
1591 );
1592 let mut builder = BarBuilder::new(
1593 bar_type,
1594 instrument.price_precision(),
1595 instrument.size_precision(),
1596 );
1597
1598 builder.update(
1599 Price::from("1.00001"),
1600 Quantity::from(2),
1601 UnixNanos::default(),
1602 );
1603 builder.update(
1604 Price::from("1.00002"),
1605 Quantity::from(2),
1606 UnixNanos::default(),
1607 );
1608 builder.update(
1609 Price::from("1.00000"),
1610 Quantity::from(1),
1611 UnixNanos::from(1_000_000_000),
1612 );
1613
1614 let bar = builder.build_now();
1615
1616 assert_eq!(bar.open, Price::from("1.00001"));
1617 assert_eq!(bar.high, Price::from("1.00002"));
1618 assert_eq!(bar.low, Price::from("1.00000"));
1619 assert_eq!(bar.close, Price::from("1.00000"));
1620 assert_eq!(bar.volume, Quantity::from(5));
1621 assert_eq!(bar.ts_init, 1_000_000_000);
1622 assert_eq!(builder.ts_last, 1_000_000_000);
1623 assert_eq!(builder.count, 0);
1624 }
1625
1626 #[rstest]
1627 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1628 let instrument = InstrumentAny::Equity(equity_aapl);
1629 let bar_type = BarType::new(
1630 instrument.id(),
1631 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1632 AggregationSource::Internal,
1633 );
1634 let mut builder = BarBuilder::new(bar_type, 2, 0);
1635
1636 builder.update(
1637 Price::from("1.00001"),
1638 Quantity::from(1),
1639 UnixNanos::default(),
1640 );
1641 builder.build_now();
1642
1643 builder.update(
1644 Price::from("1.00000"),
1645 Quantity::from(1),
1646 UnixNanos::default(),
1647 );
1648 builder.update(
1649 Price::from("1.00003"),
1650 Quantity::from(1),
1651 UnixNanos::default(),
1652 );
1653 builder.update(
1654 Price::from("1.00002"),
1655 Quantity::from(1),
1656 UnixNanos::default(),
1657 );
1658
1659 let bar = builder.build_now();
1660
1661 assert_eq!(bar.open, Price::from("1.00000"));
1662 assert_eq!(bar.high, Price::from("1.00003"));
1663 assert_eq!(bar.low, Price::from("1.00000"));
1664 assert_eq!(bar.close, Price::from("1.00002"));
1665 assert_eq!(bar.volume, Quantity::from(3));
1666 }
1667
1668 #[rstest]
1669 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1670 let instrument = InstrumentAny::Equity(equity_aapl);
1671 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1672 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1673 let handler = Arc::new(Mutex::new(Vec::new()));
1674 let handler_clone = Arc::clone(&handler);
1675
1676 let mut aggregator = TickBarAggregator::new(
1677 bar_type,
1678 instrument.price_precision(),
1679 instrument.size_precision(),
1680 move |bar: Bar| {
1681 let mut handler_guard = handler_clone.lock().unwrap();
1682 handler_guard.push(bar);
1683 },
1684 false,
1685 );
1686
1687 let trade = TradeTick::default();
1688 aggregator.handle_trade(trade);
1689
1690 let handler_guard = handler.lock().unwrap();
1691 assert_eq!(handler_guard.len(), 0);
1692 }
1693
1694 #[rstest]
1695 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1696 let instrument = InstrumentAny::Equity(equity_aapl);
1697 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1698 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1699 let handler = Arc::new(Mutex::new(Vec::new()));
1700 let handler_clone = Arc::clone(&handler);
1701
1702 let mut aggregator = TickBarAggregator::new(
1703 bar_type,
1704 instrument.price_precision(),
1705 instrument.size_precision(),
1706 move |bar: Bar| {
1707 let mut handler_guard = handler_clone.lock().unwrap();
1708 handler_guard.push(bar);
1709 },
1710 false,
1711 );
1712
1713 let trade = TradeTick::default();
1714 aggregator.handle_trade(trade);
1715 aggregator.handle_trade(trade);
1716 aggregator.handle_trade(trade);
1717
1718 let handler_guard = handler.lock().unwrap();
1719 let bar = handler_guard.first().unwrap();
1720 assert_eq!(handler_guard.len(), 1);
1721 assert_eq!(bar.open, trade.price);
1722 assert_eq!(bar.high, trade.price);
1723 assert_eq!(bar.low, trade.price);
1724 assert_eq!(bar.close, trade.price);
1725 assert_eq!(bar.volume, Quantity::from(300000));
1726 assert_eq!(bar.ts_event, trade.ts_event);
1727 assert_eq!(bar.ts_init, trade.ts_init);
1728 }
1729
1730 #[rstest]
1731 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1732 let instrument = InstrumentAny::Equity(equity_aapl);
1733 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1734 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1735 let handler = Arc::new(Mutex::new(Vec::new()));
1736 let handler_clone = Arc::clone(&handler);
1737
1738 let mut aggregator = TickBarAggregator::new(
1739 bar_type,
1740 instrument.price_precision(),
1741 instrument.size_precision(),
1742 move |bar: Bar| {
1743 let mut handler_guard = handler_clone.lock().unwrap();
1744 handler_guard.push(bar);
1745 },
1746 false,
1747 );
1748
1749 aggregator.update(
1750 Price::from("1.00001"),
1751 Quantity::from(1),
1752 UnixNanos::default(),
1753 );
1754 aggregator.update(
1755 Price::from("1.00002"),
1756 Quantity::from(1),
1757 UnixNanos::from(1000),
1758 );
1759 aggregator.update(
1760 Price::from("1.00003"),
1761 Quantity::from(1),
1762 UnixNanos::from(2000),
1763 );
1764
1765 let handler_guard = handler.lock().unwrap();
1766 assert_eq!(handler_guard.len(), 1);
1767
1768 let bar = handler_guard.first().unwrap();
1769 assert_eq!(bar.open, Price::from("1.00001"));
1770 assert_eq!(bar.high, Price::from("1.00003"));
1771 assert_eq!(bar.low, Price::from("1.00001"));
1772 assert_eq!(bar.close, Price::from("1.00003"));
1773 assert_eq!(bar.volume, Quantity::from(3));
1774 }
1775
1776 #[rstest]
1777 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1778 let instrument = InstrumentAny::Equity(equity_aapl);
1779 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1780 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1781 let handler = Arc::new(Mutex::new(Vec::new()));
1782 let handler_clone = Arc::clone(&handler);
1783
1784 let mut aggregator = TickBarAggregator::new(
1785 bar_type,
1786 instrument.price_precision(),
1787 instrument.size_precision(),
1788 move |bar: Bar| {
1789 let mut handler_guard = handler_clone.lock().unwrap();
1790 handler_guard.push(bar);
1791 },
1792 false,
1793 );
1794
1795 aggregator.update(
1796 Price::from("1.00001"),
1797 Quantity::from(1),
1798 UnixNanos::default(),
1799 );
1800 aggregator.update(
1801 Price::from("1.00002"),
1802 Quantity::from(1),
1803 UnixNanos::from(1000),
1804 );
1805 aggregator.update(
1806 Price::from("1.00003"),
1807 Quantity::from(1),
1808 UnixNanos::from(2000),
1809 );
1810 aggregator.update(
1811 Price::from("1.00004"),
1812 Quantity::from(1),
1813 UnixNanos::from(3000),
1814 );
1815
1816 let handler_guard = handler.lock().unwrap();
1817 assert_eq!(handler_guard.len(), 2);
1818
1819 let bar1 = &handler_guard[0];
1820 assert_eq!(bar1.open, Price::from("1.00001"));
1821 assert_eq!(bar1.close, Price::from("1.00002"));
1822 assert_eq!(bar1.volume, Quantity::from(2));
1823
1824 let bar2 = &handler_guard[1];
1825 assert_eq!(bar2.open, Price::from("1.00003"));
1826 assert_eq!(bar2.close, Price::from("1.00004"));
1827 assert_eq!(bar2.volume, Quantity::from(2));
1828 }
1829
1830 #[rstest]
1831 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1832 let instrument = InstrumentAny::Equity(equity_aapl);
1833 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1834 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1835 let handler = Arc::new(Mutex::new(Vec::new()));
1836 let handler_clone = Arc::clone(&handler);
1837
1838 let mut aggregator = VolumeBarAggregator::new(
1839 bar_type,
1840 instrument.price_precision(),
1841 instrument.size_precision(),
1842 move |bar: Bar| {
1843 let mut handler_guard = handler_clone.lock().unwrap();
1844 handler_guard.push(bar);
1845 },
1846 false,
1847 );
1848
1849 aggregator.update(
1850 Price::from("1.00001"),
1851 Quantity::from(25),
1852 UnixNanos::default(),
1853 );
1854
1855 let handler_guard = handler.lock().unwrap();
1856 assert_eq!(handler_guard.len(), 2);
1857 let bar1 = &handler_guard[0];
1858 assert_eq!(bar1.volume, Quantity::from(10));
1859 let bar2 = &handler_guard[1];
1860 assert_eq!(bar2.volume, Quantity::from(10));
1861 }
1862
1863 #[rstest]
1864 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1865 let instrument = InstrumentAny::Equity(equity_aapl);
1866 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1868 let handler = Arc::new(Mutex::new(Vec::new()));
1869 let handler_clone = Arc::clone(&handler);
1870
1871 let mut aggregator = ValueBarAggregator::new(
1872 bar_type,
1873 instrument.price_precision(),
1874 instrument.size_precision(),
1875 move |bar: Bar| {
1876 let mut handler_guard = handler_clone.lock().unwrap();
1877 handler_guard.push(bar);
1878 },
1879 false,
1880 );
1881
1882 aggregator.update(
1884 Price::from("100.00"),
1885 Quantity::from(5),
1886 UnixNanos::default(),
1887 );
1888 aggregator.update(
1889 Price::from("100.00"),
1890 Quantity::from(5),
1891 UnixNanos::from(1000),
1892 );
1893
1894 let handler_guard = handler.lock().unwrap();
1895 assert_eq!(handler_guard.len(), 1);
1896 let bar = handler_guard.first().unwrap();
1897 assert_eq!(bar.volume, Quantity::from(10));
1898 }
1899
1900 #[rstest]
1901 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1902 let instrument = InstrumentAny::Equity(equity_aapl);
1903 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1904 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1905 let handler = Arc::new(Mutex::new(Vec::new()));
1906 let handler_clone = Arc::clone(&handler);
1907
1908 let mut aggregator = ValueBarAggregator::new(
1909 bar_type,
1910 instrument.price_precision(),
1911 instrument.size_precision(),
1912 move |bar: Bar| {
1913 let mut handler_guard = handler_clone.lock().unwrap();
1914 handler_guard.push(bar);
1915 },
1916 false,
1917 );
1918
1919 aggregator.update(
1921 Price::from("100.00"),
1922 Quantity::from(25),
1923 UnixNanos::default(),
1924 );
1925
1926 let handler_guard = handler.lock().unwrap();
1927 assert_eq!(handler_guard.len(), 2);
1928 let remaining_value = aggregator.get_cumulative_value();
1929 assert!(remaining_value < 1000.0); }
1931
1932 #[rstest]
1933 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1934 let instrument = InstrumentAny::Equity(equity_aapl);
1935 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1937 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1938 let handler = Arc::new(Mutex::new(Vec::new()));
1939 let handler_clone = Arc::clone(&handler);
1940 let clock = Rc::new(RefCell::new(TestClock::new()));
1941
1942 let mut aggregator = TimeBarAggregator::new(
1943 bar_type,
1944 instrument.price_precision(),
1945 instrument.size_precision(),
1946 clock.clone(),
1947 move |bar: Bar| {
1948 let mut handler_guard = handler_clone.lock().unwrap();
1949 handler_guard.push(bar);
1950 },
1951 false, true, false, BarIntervalType::LeftOpen,
1955 None, 15, false, );
1959
1960 aggregator.update(
1961 Price::from("100.00"),
1962 Quantity::from(1),
1963 UnixNanos::default(),
1964 );
1965
1966 let next_sec = UnixNanos::from(1_000_000_000);
1967 clock.borrow_mut().set_time(next_sec);
1968
1969 let event = TimeEvent::new(
1970 Ustr::from("1-SECOND-LAST"),
1971 UUID4::new(),
1972 next_sec,
1973 next_sec,
1974 );
1975 aggregator.build_bar(event);
1976
1977 let handler_guard = handler.lock().unwrap();
1978 assert_eq!(handler_guard.len(), 1);
1979 let bar = handler_guard.first().unwrap();
1980 assert_eq!(bar.ts_event, UnixNanos::default());
1981 assert_eq!(bar.ts_init, next_sec);
1982 }
1983
1984 #[rstest]
1985 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1986 let instrument = InstrumentAny::Equity(equity_aapl);
1987 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1988 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1989 let handler = Arc::new(Mutex::new(Vec::new()));
1990 let handler_clone = Arc::clone(&handler);
1991 let clock = Rc::new(RefCell::new(TestClock::new()));
1992
1993 let mut aggregator = TimeBarAggregator::new(
1994 bar_type,
1995 instrument.price_precision(),
1996 instrument.size_precision(),
1997 clock.clone(),
1998 move |bar: Bar| {
1999 let mut handler_guard = handler_clone.lock().unwrap();
2000 handler_guard.push(bar);
2001 },
2002 false, true, true, BarIntervalType::LeftOpen,
2006 None,
2007 15,
2008 false, );
2010
2011 aggregator.update(
2013 Price::from("100.00"),
2014 Quantity::from(1),
2015 UnixNanos::default(),
2016 );
2017
2018 let ts1 = UnixNanos::from(1_000_000_000);
2020 clock.borrow_mut().set_time(ts1);
2021 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2022 aggregator.build_bar(event);
2023
2024 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2026
2027 let ts2 = UnixNanos::from(2_000_000_000);
2029 clock.borrow_mut().set_time(ts2);
2030 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2031 aggregator.build_bar(event);
2032
2033 let handler_guard = handler.lock().unwrap();
2034 assert_eq!(handler_guard.len(), 2);
2035
2036 let bar1 = &handler_guard[0];
2037 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
2039 assert_eq!(bar1.close, Price::from("100.00"));
2040 let bar2 = &handler_guard[1];
2041 assert_eq!(bar2.ts_event, ts2);
2042 assert_eq!(bar2.ts_init, ts2);
2043 assert_eq!(bar2.close, Price::from("101.00"));
2044 }
2045
2046 #[rstest]
2047 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2048 let instrument = InstrumentAny::Equity(equity_aapl);
2049 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2050 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2051 let handler = Arc::new(Mutex::new(Vec::new()));
2052 let handler_clone = Arc::clone(&handler);
2053 let clock = Rc::new(RefCell::new(TestClock::new()));
2054 let mut aggregator = TimeBarAggregator::new(
2055 bar_type,
2056 instrument.price_precision(),
2057 instrument.size_precision(),
2058 clock.clone(),
2059 move |bar: Bar| {
2060 let mut handler_guard = handler_clone.lock().unwrap();
2061 handler_guard.push(bar);
2062 },
2063 false, true, true, BarIntervalType::RightOpen,
2067 None,
2068 15,
2069 false, );
2071
2072 aggregator.update(
2074 Price::from("100.00"),
2075 Quantity::from(1),
2076 UnixNanos::default(),
2077 );
2078
2079 let ts1 = UnixNanos::from(1_000_000_000);
2081 clock.borrow_mut().set_time(ts1);
2082 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2083 aggregator.build_bar(event);
2084
2085 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2087
2088 let ts2 = UnixNanos::from(2_000_000_000);
2090 clock.borrow_mut().set_time(ts2);
2091 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2092 aggregator.build_bar(event);
2093
2094 let handler_guard = handler.lock().unwrap();
2095 assert_eq!(handler_guard.len(), 2);
2096
2097 let bar1 = &handler_guard[0];
2098 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
2100 assert_eq!(bar1.close, Price::from("100.00"));
2101
2102 let bar2 = &handler_guard[1];
2103 assert_eq!(bar2.ts_event, ts1);
2104 assert_eq!(bar2.ts_init, ts2);
2105 assert_eq!(bar2.close, Price::from("101.00"));
2106 }
2107
2108 #[rstest]
2109 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2110 let instrument = InstrumentAny::Equity(equity_aapl);
2111 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2112 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2113 let handler = Arc::new(Mutex::new(Vec::new()));
2114 let handler_clone = Arc::clone(&handler);
2115 let clock = Rc::new(RefCell::new(TestClock::new()));
2116
2117 let mut aggregator = TimeBarAggregator::new(
2119 bar_type,
2120 instrument.price_precision(),
2121 instrument.size_precision(),
2122 clock.clone(),
2123 move |bar: Bar| {
2124 let mut handler_guard = handler_clone.lock().unwrap();
2125 handler_guard.push(bar);
2126 },
2127 false, false, true, BarIntervalType::LeftOpen,
2131 None,
2132 15,
2133 false, );
2135
2136 let ts1 = UnixNanos::from(1_000_000_000);
2138 clock.borrow_mut().set_time(ts1);
2139 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2140 aggregator.build_bar(event);
2141
2142 let handler_guard = handler.lock().unwrap();
2143 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
2145
2146 let handler = Arc::new(Mutex::new(Vec::new()));
2148 let handler_clone = Arc::clone(&handler);
2149 let mut aggregator = TimeBarAggregator::new(
2150 bar_type,
2151 instrument.price_precision(),
2152 instrument.size_precision(),
2153 clock.clone(),
2154 move |bar: Bar| {
2155 let mut handler_guard = handler_clone.lock().unwrap();
2156 handler_guard.push(bar);
2157 },
2158 false,
2159 true, true, BarIntervalType::LeftOpen,
2162 None,
2163 15,
2164 false, );
2166
2167 aggregator.update(
2168 Price::from("100.00"),
2169 Quantity::from(1),
2170 UnixNanos::default(),
2171 );
2172
2173 let ts1 = UnixNanos::from(1_000_000_000);
2175 clock.borrow_mut().set_time(ts1);
2176 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2177 aggregator.build_bar(event);
2178
2179 let ts2 = UnixNanos::from(2_000_000_000);
2181 clock.borrow_mut().set_time(ts2);
2182 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2183 aggregator.build_bar(event);
2184
2185 let handler_guard = handler.lock().unwrap();
2186 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2188 assert_eq!(bar1.close, Price::from("100.00"));
2189 let bar2 = &handler_guard[1];
2190 assert_eq!(bar2.close, Price::from("100.00")); }
2192
2193 #[rstest]
2194 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2195 let instrument = InstrumentAny::Equity(equity_aapl);
2196 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2197 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2198 let clock = Rc::new(RefCell::new(TestClock::new()));
2199 let handler = Arc::new(Mutex::new(Vec::new()));
2200 let handler_clone = Arc::clone(&handler);
2201
2202 let mut aggregator = TimeBarAggregator::new(
2203 bar_type,
2204 instrument.price_precision(),
2205 instrument.size_precision(),
2206 clock.clone(),
2207 move |bar: Bar| {
2208 let mut handler_guard = handler_clone.lock().unwrap();
2209 handler_guard.push(bar);
2210 },
2211 false, true, true, BarIntervalType::RightOpen,
2215 None,
2216 15,
2217 false, );
2219
2220 let ts1 = UnixNanos::from(1_000_000_000);
2221 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2222
2223 let ts2 = UnixNanos::from(2_000_000_000);
2224 clock.borrow_mut().set_time(ts2);
2225
2226 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2228 aggregator.build_bar(event);
2229
2230 let handler_guard = handler.lock().unwrap();
2231 let bar = handler_guard.first().unwrap();
2232 assert_eq!(bar.ts_event, UnixNanos::default());
2233 assert_eq!(bar.ts_init, ts2);
2234 }
2235
2236 #[rstest]
2237 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2238 let instrument = InstrumentAny::Equity(equity_aapl);
2239 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2240 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2241 let clock = Rc::new(RefCell::new(TestClock::new()));
2242 let handler = Arc::new(Mutex::new(Vec::new()));
2243 let handler_clone = Arc::clone(&handler);
2244
2245 let mut aggregator = TimeBarAggregator::new(
2246 bar_type,
2247 instrument.price_precision(),
2248 instrument.size_precision(),
2249 clock.clone(),
2250 move |bar: Bar| {
2251 let mut handler_guard = handler_clone.lock().unwrap();
2252 handler_guard.push(bar);
2253 },
2254 false, true, true, BarIntervalType::LeftOpen,
2258 None,
2259 15,
2260 false, );
2262
2263 let ts1 = UnixNanos::from(1_000_000_000);
2264 clock.borrow_mut().set_time(ts1);
2265
2266 let initial_time = clock.borrow().utc_now();
2267 aggregator.start_batch_time(UnixNanos::from(
2268 initial_time.timestamp_nanos_opt().unwrap() as u64
2269 ));
2270
2271 let handler_guard = handler.lock().unwrap();
2272 assert_eq!(handler_guard.len(), 0);
2273 }
2274}