1use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25 clock::Clock,
26 timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29 SharedCell, UnixNanos, WeakCell,
30 correctness::{self, FAILED},
31 datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34 data::{
35 QuoteTick, TradeTick,
36 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37 },
38 enums::{AggregationSource, BarAggregation, BarIntervalType},
39 types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42pub trait BarAggregator: Any + Debug {
46 fn bar_type(&self) -> BarType;
48 fn is_running(&self) -> bool;
50 fn set_await_partial(&mut self, value: bool);
51 fn set_is_running(&mut self, value: bool);
53 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
56 fn handle_quote(&mut self, quote: QuoteTick) {
58 let spec = self.bar_type().spec();
59 if !self.await_partial() {
60 self.update(
61 quote.extract_price(spec.price_type),
62 quote.extract_size(spec.price_type),
63 quote.ts_init,
64 );
65 }
66 }
67 fn handle_trade(&mut self, trade: TradeTick) {
69 if !self.await_partial() {
70 self.update(trade.price, trade.size, trade.ts_init);
71 }
72 }
73 fn handle_bar(&mut self, bar: Bar) {
75 if !self.await_partial() {
76 self.update_bar(bar, bar.volume, bar.ts_init);
77 }
78 }
79 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82 fn stop_batch_update(&mut self);
84 fn await_partial(&self) -> bool;
86 fn set_partial(&mut self, partial_bar: Bar);
89 fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94 pub fn as_any(&self) -> &dyn Any {
96 self
97 }
98 pub fn as_any_mut(&mut self) -> &mut dyn Any {
100 self
101 }
102}
103
104#[derive(Debug)]
106pub struct BarBuilder {
107 bar_type: BarType,
108 price_precision: u8,
109 size_precision: u8,
110 initialized: bool,
111 ts_last: UnixNanos,
112 count: usize,
113 partial_set: bool,
114 last_close: Option<Price>,
115 open: Option<Price>,
116 high: Option<Price>,
117 low: Option<Price>,
118 close: Option<Price>,
119 volume: Quantity,
120}
121
122impl BarBuilder {
123 #[must_use]
131 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132 correctness::check_equal(
133 &bar_type.aggregation_source(),
134 &AggregationSource::Internal,
135 "bar_type.aggregation_source",
136 "AggregationSource::Internal",
137 )
138 .expect(FAILED);
139
140 Self {
141 bar_type,
142 price_precision,
143 size_precision,
144 initialized: false,
145 ts_last: UnixNanos::default(),
146 count: 0,
147 partial_set: false,
148 last_close: None,
149 open: None,
150 high: None,
151 low: None,
152 close: None,
153 volume: Quantity::zero(size_precision),
154 }
155 }
156
157 pub fn set_partial(&mut self, partial_bar: Bar) {
163 if self.partial_set {
164 return; }
166
167 self.open = Some(partial_bar.open);
168
169 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170 self.high = Some(partial_bar.high);
171 }
172
173 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174 self.low = Some(partial_bar.low);
175 }
176
177 if self.close.is_none() {
178 self.close = Some(partial_bar.close);
179 }
180
181 self.volume = partial_bar.volume;
182
183 if self.ts_last == 0 {
184 self.ts_last = partial_bar.ts_init;
185 }
186
187 self.partial_set = true;
188 self.initialized = true;
189 }
190
191 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
197 if ts_init < self.ts_last {
198 return; }
200
201 if self.open.is_none() {
202 self.open = Some(price);
203 self.high = Some(price);
204 self.low = Some(price);
205 self.initialized = true;
206 } else {
207 if price > self.high.unwrap() {
208 self.high = Some(price);
209 }
210 if price < self.low.unwrap() {
211 self.low = Some(price);
212 }
213 }
214
215 self.close = Some(price);
216 self.volume = self.volume.add(size);
217 self.count += 1;
218 self.ts_last = ts_init;
219 }
220
221 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227 if ts_init < self.ts_last {
228 return; }
230
231 if self.open.is_none() {
232 self.open = Some(bar.open);
233 self.high = Some(bar.high);
234 self.low = Some(bar.low);
235 self.initialized = true;
236 } else {
237 if bar.high > self.high.unwrap() {
238 self.high = Some(bar.high);
239 }
240 if bar.low < self.low.unwrap() {
241 self.low = Some(bar.low);
242 }
243 }
244
245 self.close = Some(bar.close);
246 self.volume = self.volume.add(volume);
247 self.count += 1;
248 self.ts_last = ts_init;
249 }
250
251 pub fn reset(&mut self) {
255 self.open = None;
256 self.high = None;
257 self.low = None;
258 self.volume = Quantity::zero(self.size_precision);
259 self.count = 0;
260 }
261
262 pub fn build_now(&mut self) -> Bar {
264 self.build(self.ts_last, self.ts_last)
265 }
266
267 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273 if self.open.is_none() {
274 self.open = self.last_close;
275 self.high = self.last_close;
276 self.low = self.last_close;
277 self.close = self.last_close;
278 }
279
280 if let (Some(close), Some(low)) = (self.close, self.low)
281 && close < low
282 {
283 self.low = Some(close);
284 }
285
286 if let (Some(close), Some(high)) = (self.close, self.high)
287 && close > high
288 {
289 self.high = Some(close);
290 }
291
292 let bar = Bar::new(
294 self.bar_type,
295 self.open.unwrap(),
296 self.high.unwrap(),
297 self.low.unwrap(),
298 self.close.unwrap(),
299 self.volume,
300 ts_event,
301 ts_init,
302 );
303
304 self.last_close = self.close;
305 self.reset();
306 bar
307 }
308}
309
310pub struct BarAggregatorCore<H>
312where
313 H: FnMut(Bar),
314{
315 bar_type: BarType,
316 builder: BarBuilder,
317 handler: H,
318 handler_backup: Option<H>,
319 batch_handler: Option<Box<dyn FnMut(Bar)>>,
320 await_partial: bool,
321 is_running: bool,
322 batch_mode: bool,
323}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327 f.debug_struct(stringify!(BarAggregatorCore))
328 .field("bar_type", &self.bar_type)
329 .field("builder", &self.builder)
330 .field("await_partial", &self.await_partial)
331 .field("is_running", &self.is_running)
332 .field("batch_mode", &self.batch_mode)
333 .finish()
334 }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339 H: FnMut(Bar),
340{
341 pub fn new(
349 bar_type: BarType,
350 price_precision: u8,
351 size_precision: u8,
352 handler: H,
353 await_partial: bool,
354 ) -> Self {
355 Self {
356 bar_type,
357 builder: BarBuilder::new(bar_type, price_precision, size_precision),
358 handler,
359 handler_backup: None,
360 batch_handler: None,
361 await_partial,
362 is_running: false,
363 batch_mode: false,
364 }
365 }
366
367 pub const fn set_await_partial(&mut self, value: bool) {
369 self.await_partial = value;
370 }
371
372 pub const fn set_is_running(&mut self, value: bool) {
374 self.is_running = value;
375 }
376
377 pub const fn await_partial(&self) -> bool {
379 self.await_partial
380 }
381
382 pub fn set_partial(&mut self, partial_bar: Bar) {
384 self.builder.set_partial(partial_bar);
385 }
386
387 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
388 self.builder.update(price, size, ts_init);
389 }
390
391 fn build_now_and_send(&mut self) {
392 let bar = self.builder.build_now();
393 (self.handler)(bar);
394 }
395
396 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397 let bar = self.builder.build(ts_event, ts_init);
398
399 if self.batch_mode {
400 if let Some(handler) = &mut self.batch_handler {
401 handler(bar);
402 }
403 } else {
404 (self.handler)(bar);
405 }
406 }
407
408 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410 self.batch_mode = true;
411 self.batch_handler = Some(handler);
412 }
413
414 pub fn stop_batch_update(&mut self) {
416 self.batch_mode = false;
417
418 if let Some(handler) = self.handler_backup.take() {
419 self.handler = handler;
420 }
421 }
422}
423
424pub struct TickBarAggregator<H>
429where
430 H: FnMut(Bar),
431{
432 core: BarAggregatorCore<H>,
433 cum_value: f64,
434}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct(stringify!(TickBarAggregator))
439 .field("core", &self.core)
440 .field("cum_value", &self.cum_value)
441 .finish()
442 }
443}
444
445impl<H> TickBarAggregator<H>
446where
447 H: FnMut(Bar),
448{
449 pub fn new(
457 bar_type: BarType,
458 price_precision: u8,
459 size_precision: u8,
460 handler: H,
461 await_partial: bool,
462 ) -> Self {
463 Self {
464 core: BarAggregatorCore::new(
465 bar_type,
466 price_precision,
467 size_precision,
468 handler,
469 await_partial,
470 ),
471 cum_value: 0.0,
472 }
473 }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478 H: FnMut(Bar) + 'static,
479{
480 fn bar_type(&self) -> BarType {
481 self.core.bar_type
482 }
483
484 fn is_running(&self) -> bool {
485 self.core.is_running
486 }
487
488 fn set_await_partial(&mut self, value: bool) {
489 self.core.set_await_partial(value);
490 }
491
492 fn set_is_running(&mut self, value: bool) {
493 self.core.set_is_running(value);
494 }
495
496 fn await_partial(&self) -> bool {
497 self.core.await_partial()
498 }
499
500 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
502 self.core.apply_update(price, size, ts_init);
503 let spec = self.core.bar_type.spec();
504
505 if self.core.builder.count >= spec.step.get() {
506 self.core.build_now_and_send();
507 }
508 }
509
510 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511 let mut volume_update = volume;
512 let average_price = Price::new(
513 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514 self.core.builder.price_precision,
515 );
516
517 while volume_update.as_f64() > 0.0 {
518 let value_update = average_price.as_f64() * volume_update.as_f64();
519 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520 self.cum_value += value_update;
521 self.core.builder.update_bar(bar, volume_update, ts_init);
522 break;
523 }
524
525 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527 self.core.builder.update_bar(
528 bar,
529 Quantity::new(volume_diff, volume_update.precision),
530 ts_init,
531 );
532
533 self.core.build_now_and_send();
534 self.cum_value = 0.0;
535 volume_update = Quantity::new(
536 volume_update.as_f64() - volume_diff,
537 volume_update.precision,
538 );
539 }
540 }
541
542 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543 self.core.start_batch_update(handler);
544 }
545
546 fn stop_batch_update(&mut self) {
547 self.core.stop_batch_update();
548 }
549
550 fn set_partial(&mut self, partial_bar: Bar) {
551 self.core.set_partial(partial_bar);
552 }
553}
554
555pub struct VolumeBarAggregator<H>
557where
558 H: FnMut(Bar),
559{
560 core: BarAggregatorCore<H>,
561}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565 f.debug_struct(stringify!(VolumeBarAggregator))
566 .field("core", &self.core)
567 .finish()
568 }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573 H: FnMut(Bar),
574{
575 pub fn new(
583 bar_type: BarType,
584 price_precision: u8,
585 size_precision: u8,
586 handler: H,
587 await_partial: bool,
588 ) -> Self {
589 Self {
590 core: BarAggregatorCore::new(
591 bar_type.standard(),
592 price_precision,
593 size_precision,
594 handler,
595 await_partial,
596 ),
597 }
598 }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603 H: FnMut(Bar) + 'static,
604{
605 fn bar_type(&self) -> BarType {
606 self.core.bar_type
607 }
608
609 fn is_running(&self) -> bool {
610 self.core.is_running
611 }
612
613 fn set_await_partial(&mut self, value: bool) {
614 self.core.set_await_partial(value);
615 }
616
617 fn set_is_running(&mut self, value: bool) {
618 self.core.set_is_running(value);
619 }
620
621 fn await_partial(&self) -> bool {
622 self.core.await_partial()
623 }
624
625 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
627 let mut raw_size_update = size.raw;
628 let spec = self.core.bar_type.spec();
629 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631 while raw_size_update > 0 {
632 if self.core.builder.volume.raw + raw_size_update < raw_step {
633 self.core.apply_update(
634 price,
635 Quantity::from_raw(raw_size_update, size.precision),
636 ts_init,
637 );
638 break;
639 }
640
641 let raw_size_diff = raw_step - self.core.builder.volume.raw;
642 self.core.apply_update(
643 price,
644 Quantity::from_raw(raw_size_diff, size.precision),
645 ts_init,
646 );
647
648 self.core.build_now_and_send();
649 raw_size_update -= raw_size_diff;
650 }
651 }
652
653 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654 let mut raw_volume_update = volume.raw;
655 let spec = self.core.bar_type.spec();
656 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658 while raw_volume_update > 0 {
659 if self.core.builder.volume.raw + raw_volume_update < raw_step {
660 self.core.builder.update_bar(
661 bar,
662 Quantity::from_raw(raw_volume_update, volume.precision),
663 ts_init,
664 );
665 break;
666 }
667
668 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669 self.core.builder.update_bar(
670 bar,
671 Quantity::from_raw(raw_volume_diff, volume.precision),
672 ts_init,
673 );
674
675 self.core.build_now_and_send();
676 raw_volume_update -= raw_volume_diff;
677 }
678 }
679
680 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681 self.core.start_batch_update(handler);
682 }
683
684 fn stop_batch_update(&mut self) {
685 self.core.stop_batch_update();
686 }
687
688 fn set_partial(&mut self, partial_bar: Bar) {
689 self.core.set_partial(partial_bar);
690 }
691}
692
693pub struct ValueBarAggregator<H>
698where
699 H: FnMut(Bar),
700{
701 core: BarAggregatorCore<H>,
702 cum_value: f64,
703}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(ValueBarAggregator))
708 .field("core", &self.core)
709 .field("cum_value", &self.cum_value)
710 .finish()
711 }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716 H: FnMut(Bar),
717{
718 pub fn new(
726 bar_type: BarType,
727 price_precision: u8,
728 size_precision: u8,
729 handler: H,
730 await_partial: bool,
731 ) -> Self {
732 Self {
733 core: BarAggregatorCore::new(
734 bar_type.standard(),
735 price_precision,
736 size_precision,
737 handler,
738 await_partial,
739 ),
740 cum_value: 0.0,
741 }
742 }
743
744 #[must_use]
745 pub const fn get_cumulative_value(&self) -> f64 {
747 self.cum_value
748 }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753 H: FnMut(Bar) + 'static,
754{
755 fn bar_type(&self) -> BarType {
756 self.core.bar_type
757 }
758
759 fn is_running(&self) -> bool {
760 self.core.is_running
761 }
762
763 fn set_await_partial(&mut self, value: bool) {
764 self.core.set_await_partial(value);
765 }
766
767 fn set_is_running(&mut self, value: bool) {
768 self.core.set_is_running(value);
769 }
770
771 fn await_partial(&self) -> bool {
772 self.core.await_partial()
773 }
774
775 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
777 let mut size_update = size.as_f64();
778 let spec = self.core.bar_type.spec();
779
780 while size_update > 0.0 {
781 let value_update = price.as_f64() * size_update;
782 if self.cum_value + value_update < spec.step.get() as f64 {
783 self.cum_value += value_update;
784 self.core
785 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
786 break;
787 }
788
789 let value_diff = spec.step.get() as f64 - self.cum_value;
790 let size_diff = size_update * (value_diff / value_update);
791 self.core
792 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
793
794 self.core.build_now_and_send();
795 self.cum_value = 0.0;
796 size_update -= size_diff;
797 }
798 }
799
800 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801 let mut volume_update = volume;
802 let average_price = Price::new(
803 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804 self.core.builder.price_precision,
805 );
806
807 while volume_update.as_f64() > 0.0 {
808 let value_update = average_price.as_f64() * volume_update.as_f64();
809 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810 self.cum_value += value_update;
811 self.core.builder.update_bar(bar, volume_update, ts_init);
812 break;
813 }
814
815 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817 self.core.builder.update_bar(
818 bar,
819 Quantity::new(volume_diff, volume_update.precision),
820 ts_init,
821 );
822
823 self.core.build_now_and_send();
824 self.cum_value = 0.0;
825 volume_update = Quantity::new(
826 volume_update.as_f64() - volume_diff,
827 volume_update.precision,
828 );
829 }
830 }
831
832 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833 self.core.start_batch_update(handler);
834 }
835
836 fn stop_batch_update(&mut self) {
837 self.core.stop_batch_update();
838 }
839
840 fn set_partial(&mut self, partial_bar: Bar) {
841 self.core.set_partial(partial_bar);
842 }
843}
844
845pub struct TimeBarAggregator<H>
849where
850 H: FnMut(Bar),
851{
852 core: BarAggregatorCore<H>,
853 clock: Rc<RefCell<dyn Clock>>,
854 build_with_no_updates: bool,
855 timestamp_on_close: bool,
856 is_left_open: bool,
857 build_on_next_tick: bool,
858 stored_open_ns: UnixNanos,
859 stored_close_ns: UnixNanos,
860 timer_name: String,
861 interval_ns: UnixNanos,
862 next_close_ns: UnixNanos,
863 bar_build_delay: u64,
864 batch_open_ns: UnixNanos,
865 batch_next_close_ns: UnixNanos,
866 time_bars_origin_offset: Option<TimeDelta>,
867 skip_first_non_full_bar: bool,
868}
869
870impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872 f.debug_struct(stringify!(TimeBarAggregator))
873 .field("core", &self.core)
874 .field("build_with_no_updates", &self.build_with_no_updates)
875 .field("timestamp_on_close", &self.timestamp_on_close)
876 .field("is_left_open", &self.is_left_open)
877 .field("timer_name", &self.timer_name)
878 .field("interval_ns", &self.interval_ns)
879 .field("bar_build_delay", &self.bar_build_delay)
880 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
881 .finish()
882 }
883}
884
885#[derive(Clone, Debug)]
886pub struct NewBarCallback<H: FnMut(Bar)> {
893 aggregator: WeakCell<TimeBarAggregator<H>>,
894}
895
896impl<H: FnMut(Bar)> NewBarCallback<H> {
897 #[must_use]
899 pub fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
900 let shared: SharedCell<TimeBarAggregator<H>> = SharedCell::from(aggregator);
901 Self {
902 aggregator: shared.downgrade(),
903 }
904 }
905}
906
907impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
908 fn from(value: NewBarCallback<H>) -> Self {
909 Self::Rust(Rc::new(move |event: TimeEvent| {
910 if let Some(agg) = value.aggregator.upgrade() {
911 agg.borrow_mut().build_bar(event);
912 }
913 }))
914 }
915}
916
917impl<H> TimeBarAggregator<H>
918where
919 H: FnMut(Bar) + 'static,
920{
921 #[allow(clippy::too_many_arguments)]
929 pub fn new(
930 bar_type: BarType,
931 price_precision: u8,
932 size_precision: u8,
933 clock: Rc<RefCell<dyn Clock>>,
934 handler: H,
935 await_partial: bool,
936 build_with_no_updates: bool,
937 timestamp_on_close: bool,
938 interval_type: BarIntervalType,
939 time_bars_origin_offset: Option<TimeDelta>,
940 bar_build_delay: u64,
941 skip_first_non_full_bar: bool,
942 ) -> Self {
943 let is_left_open = match interval_type {
944 BarIntervalType::LeftOpen => true,
945 BarIntervalType::RightOpen => false,
946 };
947
948 let core = BarAggregatorCore::new(
949 bar_type.standard(),
950 price_precision,
951 size_precision,
952 handler,
953 await_partial,
954 );
955
956 Self {
957 core,
958 clock,
959 build_with_no_updates,
960 timestamp_on_close,
961 is_left_open,
962 build_on_next_tick: false,
963 stored_open_ns: UnixNanos::default(),
964 stored_close_ns: UnixNanos::default(),
965 timer_name: bar_type.to_string(),
966 interval_ns: get_bar_interval_ns(&bar_type),
967 next_close_ns: UnixNanos::default(),
968 bar_build_delay,
969 batch_open_ns: UnixNanos::default(),
970 batch_next_close_ns: UnixNanos::default(),
971 time_bars_origin_offset,
972 skip_first_non_full_bar,
973 }
974 }
975
976 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
986 let now = self.clock.borrow().utc_now();
987 let mut start_time =
988 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
989
990 if start_time == now {
991 self.skip_first_non_full_bar = false;
992 }
993
994 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
995
996 let spec = &self.bar_type().spec();
997 let start_time_ns = UnixNanos::from(start_time);
998
999 if spec.aggregation == BarAggregation::Month {
1000 let step = spec.step.get() as u32;
1001 let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
1002
1003 self.clock
1004 .borrow_mut()
1005 .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1006 .expect(FAILED);
1007 } else {
1008 self.clock
1009 .borrow_mut()
1010 .set_timer_ns(
1011 &self.timer_name,
1012 self.interval_ns.as_u64(),
1013 Some(start_time_ns),
1014 None,
1015 Some(callback.into()),
1016 None,
1017 None,
1018 )
1019 .expect(FAILED);
1020 }
1021
1022 log::debug!("Started timer {}", self.timer_name);
1023 Ok(())
1024 }
1025
1026 pub fn stop(&mut self) {
1028 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1029 }
1030
1031 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1037 let spec = self.bar_type().spec();
1038 self.core.batch_mode = true;
1039
1040 let time = time_ns.to_datetime_utc();
1041 let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1042 self.batch_open_ns = UnixNanos::from(start_time);
1043
1044 if spec.aggregation == BarAggregation::Month {
1045 let step = spec.step.get() as u32;
1046
1047 if self.batch_open_ns == time_ns {
1048 self.batch_open_ns =
1049 subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1050 }
1051
1052 self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1053 } else {
1054 if self.batch_open_ns == time_ns {
1055 self.batch_open_ns -= self.interval_ns;
1056 }
1057
1058 self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1059 }
1060 }
1061
1062 const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1063 if self.is_left_open {
1064 if self.timestamp_on_close {
1065 close_ns
1066 } else {
1067 open_ns
1068 }
1069 } else {
1070 open_ns
1071 }
1072 }
1073
1074 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1075 if self.skip_first_non_full_bar {
1076 self.core.builder.reset();
1077 self.skip_first_non_full_bar = false;
1078 } else {
1079 self.core.build_and_send(ts_event, ts_init);
1080 }
1081 }
1082
1083 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1084 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1085 let ts_init = self.batch_next_close_ns;
1086 let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1087 self.build_and_send(ts_event, ts_init);
1088 }
1089 }
1090
1091 fn batch_post_update(&mut self, time_ns: UnixNanos) {
1092 let step = self.bar_type().spec().step.get() as u32;
1093
1094 if !self.core.batch_mode
1096 && time_ns == self.batch_next_close_ns
1097 && time_ns > self.stored_open_ns
1098 {
1099 self.batch_next_close_ns = UnixNanos::default();
1100 return;
1101 }
1102
1103 if time_ns > self.batch_next_close_ns {
1104 if self.bar_type().spec().aggregation == BarAggregation::Month {
1106 while self.batch_next_close_ns < time_ns {
1107 self.batch_next_close_ns =
1108 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1109 }
1110
1111 self.batch_open_ns =
1112 subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1113 } else {
1114 while self.batch_next_close_ns < time_ns {
1115 self.batch_next_close_ns += self.interval_ns;
1116 }
1117
1118 self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1119 }
1120 }
1121
1122 if time_ns == self.batch_next_close_ns {
1123 let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1124 self.build_and_send(ts_event, time_ns);
1125 self.batch_open_ns = self.batch_next_close_ns;
1126
1127 if self.bar_type().spec().aggregation == BarAggregation::Month {
1128 self.batch_next_close_ns =
1129 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1130 } else {
1131 self.batch_next_close_ns += self.interval_ns;
1132 }
1133 }
1134
1135 if !self.core.batch_mode {
1137 self.batch_next_close_ns = UnixNanos::default();
1138 }
1139 }
1140
1141 fn build_bar(&mut self, event: TimeEvent) {
1142 if !self.core.builder.initialized {
1143 self.build_on_next_tick = true;
1144 self.stored_close_ns = self.next_close_ns;
1145 return;
1146 }
1147
1148 if !self.build_with_no_updates && self.core.builder.count == 0 {
1149 return;
1150 }
1151
1152 let ts_init = event.ts_event;
1153 let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1154 self.build_and_send(ts_event, ts_init);
1155
1156 self.stored_open_ns = ts_init;
1157
1158 if self.bar_type().spec().aggregation == BarAggregation::Month {
1159 let step = self.bar_type().spec().step.get() as u32;
1160 let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1161
1162 self.clock
1163 .borrow_mut()
1164 .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1165 .expect(FAILED);
1166
1167 self.next_close_ns = next_alert_ns;
1168 } else {
1169 self.next_close_ns = self
1170 .clock
1171 .borrow()
1172 .next_time_ns(&self.timer_name)
1173 .unwrap_or_default();
1174 }
1175 }
1176}
1177
1178impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1179where
1180 H: FnMut(Bar) + 'static,
1181{
1182 fn bar_type(&self) -> BarType {
1183 self.core.bar_type
1184 }
1185
1186 fn is_running(&self) -> bool {
1187 self.core.is_running
1188 }
1189
1190 fn set_await_partial(&mut self, value: bool) {
1191 self.core.set_await_partial(value);
1192 }
1193
1194 fn set_is_running(&mut self, value: bool) {
1195 self.core.set_is_running(value);
1196 }
1197
1198 fn await_partial(&self) -> bool {
1199 self.core.await_partial()
1200 }
1201 fn stop(&mut self) {
1203 Self::stop(self);
1204 }
1205
1206 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1207 if self.batch_next_close_ns != UnixNanos::default() {
1208 self.batch_pre_update(ts_init);
1209 }
1210
1211 self.core.apply_update(price, size, ts_init);
1212
1213 if self.build_on_next_tick {
1214 if ts_init <= self.stored_close_ns {
1215 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1216 self.build_and_send(ts_event, ts_init);
1217 }
1218
1219 self.build_on_next_tick = false;
1220 self.stored_close_ns = UnixNanos::default();
1221 }
1222
1223 if self.batch_next_close_ns != UnixNanos::default() {
1224 self.batch_post_update(ts_init);
1225 }
1226 }
1227
1228 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1229 if self.batch_next_close_ns != UnixNanos::default() {
1230 self.batch_pre_update(ts_init);
1231 }
1232
1233 self.core.builder.update_bar(bar, volume, ts_init);
1234
1235 if self.build_on_next_tick {
1236 if ts_init <= self.stored_close_ns {
1237 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1238 self.build_and_send(ts_event, ts_init);
1239 }
1240
1241 self.build_on_next_tick = false;
1243 self.stored_close_ns = UnixNanos::default();
1244 }
1245
1246 if self.batch_next_close_ns != UnixNanos::default() {
1247 self.batch_post_update(ts_init);
1248 }
1249 }
1250
1251 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1252 self.core.start_batch_update(handler);
1253 self.start_batch_time(time_ns);
1254 }
1255
1256 fn stop_batch_update(&mut self) {
1257 self.core.stop_batch_update();
1258 }
1259
1260 fn set_partial(&mut self, partial_bar: Bar) {
1261 self.core.set_partial(partial_bar);
1262 }
1263}
1264
1265#[cfg(test)]
1269mod tests {
1270 use std::sync::{Arc, Mutex};
1271
1272 use nautilus_common::clock::TestClock;
1273 use nautilus_core::UUID4;
1274 use nautilus_model::{
1275 data::{BarSpecification, BarType},
1276 enums::{AggregationSource, BarAggregation, PriceType},
1277 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1278 types::{Price, Quantity},
1279 };
1280 use rstest::rstest;
1281 use ustr::Ustr;
1282
1283 use super::*;
1284
1285 #[rstest]
1286 fn test_bar_builder_initialization(equity_aapl: Equity) {
1287 let instrument = InstrumentAny::Equity(equity_aapl);
1288 let bar_type = BarType::new(
1289 instrument.id(),
1290 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1291 AggregationSource::Internal,
1292 );
1293 let builder = BarBuilder::new(
1294 bar_type,
1295 instrument.price_precision(),
1296 instrument.size_precision(),
1297 );
1298
1299 assert!(!builder.initialized);
1300 assert_eq!(builder.ts_last, 0);
1301 assert_eq!(builder.count, 0);
1302 }
1303
1304 #[rstest]
1305 fn test_set_partial_update(equity_aapl: Equity) {
1306 let instrument = InstrumentAny::Equity(equity_aapl);
1307 let bar_type = BarType::new(
1308 instrument.id(),
1309 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1310 AggregationSource::Internal,
1311 );
1312 let mut builder = BarBuilder::new(
1313 bar_type,
1314 instrument.price_precision(),
1315 instrument.size_precision(),
1316 );
1317
1318 let partial_bar = Bar::new(
1319 bar_type,
1320 Price::from("101.00"),
1321 Price::from("102.00"),
1322 Price::from("100.00"),
1323 Price::from("101.00"),
1324 Quantity::from(100),
1325 UnixNanos::from(1),
1326 UnixNanos::from(2),
1327 );
1328
1329 builder.set_partial(partial_bar);
1330 let bar = builder.build_now();
1331
1332 assert_eq!(bar.open, partial_bar.open);
1333 assert_eq!(bar.high, partial_bar.high);
1334 assert_eq!(bar.low, partial_bar.low);
1335 assert_eq!(bar.close, partial_bar.close);
1336 assert_eq!(bar.volume, partial_bar.volume);
1337 assert_eq!(builder.ts_last, 2);
1338 }
1339
1340 #[rstest]
1341 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1342 let instrument = InstrumentAny::Equity(equity_aapl);
1343 let bar_type = BarType::new(
1344 instrument.id(),
1345 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1346 AggregationSource::Internal,
1347 );
1348 let mut builder = BarBuilder::new(
1349 bar_type,
1350 instrument.price_precision(),
1351 instrument.size_precision(),
1352 );
1353
1354 builder.update(
1355 Price::from("100.00"),
1356 Quantity::from(1),
1357 UnixNanos::from(1000),
1358 );
1359 builder.update(
1360 Price::from("95.00"),
1361 Quantity::from(1),
1362 UnixNanos::from(2000),
1363 );
1364 builder.update(
1365 Price::from("105.00"),
1366 Quantity::from(1),
1367 UnixNanos::from(3000),
1368 );
1369
1370 let bar = builder.build_now();
1371 assert!(bar.high > bar.low);
1372 assert_eq!(bar.open, Price::from("100.00"));
1373 assert_eq!(bar.high, Price::from("105.00"));
1374 assert_eq!(bar.low, Price::from("95.00"));
1375 assert_eq!(bar.close, Price::from("105.00"));
1376 }
1377
1378 #[rstest]
1379 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1380 let instrument = InstrumentAny::Equity(equity_aapl);
1381 let bar_type = BarType::new(
1382 instrument.id(),
1383 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1384 AggregationSource::Internal,
1385 );
1386 let mut builder = BarBuilder::new(
1387 bar_type,
1388 instrument.price_precision(),
1389 instrument.size_precision(),
1390 );
1391
1392 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1393 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1394
1395 assert_eq!(builder.ts_last, 1_000);
1396 assert_eq!(builder.count, 1);
1397 }
1398
1399 #[rstest]
1400 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1401 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1402 let bar_type = BarType::new(
1403 instrument.id(),
1404 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1405 AggregationSource::Internal,
1406 );
1407 let mut builder = BarBuilder::new(
1408 bar_type,
1409 instrument.price_precision(),
1410 instrument.size_precision(),
1411 );
1412
1413 let partial_bar = Bar::new(
1414 bar_type,
1415 Price::from("1.00001"),
1416 Price::from("1.00010"),
1417 Price::from("1.00000"),
1418 Price::from("1.00002"),
1419 Quantity::from(1),
1420 UnixNanos::from(1_000_000_000),
1421 UnixNanos::from(2_000_000_000),
1422 );
1423
1424 builder.set_partial(partial_bar);
1425 let bar = builder.build_now();
1426
1427 assert_eq!(bar.open, Price::from("1.00001"));
1428 assert_eq!(bar.high, Price::from("1.00010"));
1429 assert_eq!(bar.low, Price::from("1.00000"));
1430 assert_eq!(bar.close, Price::from("1.00002"));
1431 assert_eq!(bar.volume, Quantity::from(1));
1432 assert_eq!(bar.ts_init, 2_000_000_000);
1433 assert_eq!(builder.ts_last, 2_000_000_000);
1434 }
1435
1436 #[rstest]
1437 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1438 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1439 let bar_type = BarType::new(
1440 instrument.id(),
1441 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1442 AggregationSource::Internal,
1443 );
1444 let mut builder = BarBuilder::new(
1445 bar_type,
1446 instrument.price_precision(),
1447 instrument.size_precision(),
1448 );
1449
1450 let partial_bar1 = Bar::new(
1451 bar_type,
1452 Price::from("1.00001"),
1453 Price::from("1.00010"),
1454 Price::from("1.00000"),
1455 Price::from("1.00002"),
1456 Quantity::from(1),
1457 UnixNanos::from(1_000_000_000),
1458 UnixNanos::from(1_000_000_000),
1459 );
1460
1461 let partial_bar2 = Bar::new(
1462 bar_type,
1463 Price::from("2.00001"),
1464 Price::from("2.00010"),
1465 Price::from("2.00000"),
1466 Price::from("2.00002"),
1467 Quantity::from(2),
1468 UnixNanos::from(3_000_000_000),
1469 UnixNanos::from(3_000_000_000),
1470 );
1471
1472 builder.set_partial(partial_bar1);
1473 builder.set_partial(partial_bar2);
1474 let bar = builder.build(
1475 UnixNanos::from(4_000_000_000),
1476 UnixNanos::from(4_000_000_000),
1477 );
1478
1479 assert_eq!(bar.open, Price::from("1.00001"));
1480 assert_eq!(bar.high, Price::from("1.00010"));
1481 assert_eq!(bar.low, Price::from("1.00000"));
1482 assert_eq!(bar.close, Price::from("1.00002"));
1483 assert_eq!(bar.volume, Quantity::from(1));
1484 assert_eq!(bar.ts_init, 4_000_000_000);
1485 assert_eq!(builder.ts_last, 1_000_000_000);
1486 }
1487
1488 #[rstest]
1489 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1490 let instrument = InstrumentAny::Equity(equity_aapl);
1491 let bar_type = BarType::new(
1492 instrument.id(),
1493 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1494 AggregationSource::Internal,
1495 );
1496 let mut builder = BarBuilder::new(
1497 bar_type,
1498 instrument.price_precision(),
1499 instrument.size_precision(),
1500 );
1501
1502 builder.update(
1503 Price::from("1.00000"),
1504 Quantity::from(1),
1505 UnixNanos::default(),
1506 );
1507
1508 assert!(builder.initialized);
1509 assert_eq!(builder.ts_last, 0);
1510 assert_eq!(builder.count, 1);
1511 }
1512
1513 #[rstest]
1514 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1515 equity_aapl: Equity,
1516 ) {
1517 let instrument = InstrumentAny::Equity(equity_aapl);
1518 let bar_type = BarType::new(
1519 instrument.id(),
1520 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1521 AggregationSource::Internal,
1522 );
1523 let mut builder = BarBuilder::new(bar_type, 2, 0);
1524
1525 builder.update(
1526 Price::from("1.00000"),
1527 Quantity::from(1),
1528 UnixNanos::from(1_000),
1529 );
1530 builder.update(
1531 Price::from("1.00001"),
1532 Quantity::from(1),
1533 UnixNanos::from(500),
1534 );
1535
1536 assert!(builder.initialized);
1537 assert_eq!(builder.ts_last, 1_000);
1538 assert_eq!(builder.count, 1);
1539 }
1540
1541 #[rstest]
1542 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1543 let instrument = InstrumentAny::Equity(equity_aapl);
1544 let bar_type = BarType::new(
1545 instrument.id(),
1546 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1547 AggregationSource::Internal,
1548 );
1549 let mut builder = BarBuilder::new(
1550 bar_type,
1551 instrument.price_precision(),
1552 instrument.size_precision(),
1553 );
1554
1555 for _ in 0..5 {
1556 builder.update(
1557 Price::from("1.00000"),
1558 Quantity::from(1),
1559 UnixNanos::from(1_000),
1560 );
1561 }
1562
1563 assert_eq!(builder.count, 5);
1564 }
1565
1566 #[rstest]
1567 #[should_panic]
1568 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1569 let instrument = InstrumentAny::Equity(equity_aapl);
1570 let bar_type = BarType::new(
1571 instrument.id(),
1572 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1573 AggregationSource::Internal,
1574 );
1575 let mut builder = BarBuilder::new(
1576 bar_type,
1577 instrument.price_precision(),
1578 instrument.size_precision(),
1579 );
1580 let _ = builder.build_now();
1581 }
1582
1583 #[rstest]
1584 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1585 let instrument = InstrumentAny::Equity(equity_aapl);
1586 let bar_type = BarType::new(
1587 instrument.id(),
1588 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1589 AggregationSource::Internal,
1590 );
1591 let mut builder = BarBuilder::new(
1592 bar_type,
1593 instrument.price_precision(),
1594 instrument.size_precision(),
1595 );
1596
1597 builder.update(
1598 Price::from("1.00001"),
1599 Quantity::from(2),
1600 UnixNanos::default(),
1601 );
1602 builder.update(
1603 Price::from("1.00002"),
1604 Quantity::from(2),
1605 UnixNanos::default(),
1606 );
1607 builder.update(
1608 Price::from("1.00000"),
1609 Quantity::from(1),
1610 UnixNanos::from(1_000_000_000),
1611 );
1612
1613 let bar = builder.build_now();
1614
1615 assert_eq!(bar.open, Price::from("1.00001"));
1616 assert_eq!(bar.high, Price::from("1.00002"));
1617 assert_eq!(bar.low, Price::from("1.00000"));
1618 assert_eq!(bar.close, Price::from("1.00000"));
1619 assert_eq!(bar.volume, Quantity::from(5));
1620 assert_eq!(bar.ts_init, 1_000_000_000);
1621 assert_eq!(builder.ts_last, 1_000_000_000);
1622 assert_eq!(builder.count, 0);
1623 }
1624
1625 #[rstest]
1626 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1627 let instrument = InstrumentAny::Equity(equity_aapl);
1628 let bar_type = BarType::new(
1629 instrument.id(),
1630 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1631 AggregationSource::Internal,
1632 );
1633 let mut builder = BarBuilder::new(bar_type, 2, 0);
1634
1635 builder.update(
1636 Price::from("1.00001"),
1637 Quantity::from(1),
1638 UnixNanos::default(),
1639 );
1640 builder.build_now();
1641
1642 builder.update(
1643 Price::from("1.00000"),
1644 Quantity::from(1),
1645 UnixNanos::default(),
1646 );
1647 builder.update(
1648 Price::from("1.00003"),
1649 Quantity::from(1),
1650 UnixNanos::default(),
1651 );
1652 builder.update(
1653 Price::from("1.00002"),
1654 Quantity::from(1),
1655 UnixNanos::default(),
1656 );
1657
1658 let bar = builder.build_now();
1659
1660 assert_eq!(bar.open, Price::from("1.00000"));
1661 assert_eq!(bar.high, Price::from("1.00003"));
1662 assert_eq!(bar.low, Price::from("1.00000"));
1663 assert_eq!(bar.close, Price::from("1.00002"));
1664 assert_eq!(bar.volume, Quantity::from(3));
1665 }
1666
1667 #[rstest]
1668 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1669 let instrument = InstrumentAny::Equity(equity_aapl);
1670 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1671 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1672 let handler = Arc::new(Mutex::new(Vec::new()));
1673 let handler_clone = Arc::clone(&handler);
1674
1675 let mut aggregator = TickBarAggregator::new(
1676 bar_type,
1677 instrument.price_precision(),
1678 instrument.size_precision(),
1679 move |bar: Bar| {
1680 let mut handler_guard = handler_clone.lock().unwrap();
1681 handler_guard.push(bar);
1682 },
1683 false,
1684 );
1685
1686 let trade = TradeTick::default();
1687 aggregator.handle_trade(trade);
1688
1689 let handler_guard = handler.lock().unwrap();
1690 assert_eq!(handler_guard.len(), 0);
1691 }
1692
1693 #[rstest]
1694 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1695 let instrument = InstrumentAny::Equity(equity_aapl);
1696 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1697 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1698 let handler = Arc::new(Mutex::new(Vec::new()));
1699 let handler_clone = Arc::clone(&handler);
1700
1701 let mut aggregator = TickBarAggregator::new(
1702 bar_type,
1703 instrument.price_precision(),
1704 instrument.size_precision(),
1705 move |bar: Bar| {
1706 let mut handler_guard = handler_clone.lock().unwrap();
1707 handler_guard.push(bar);
1708 },
1709 false,
1710 );
1711
1712 let trade = TradeTick::default();
1713 aggregator.handle_trade(trade);
1714 aggregator.handle_trade(trade);
1715 aggregator.handle_trade(trade);
1716
1717 let handler_guard = handler.lock().unwrap();
1718 let bar = handler_guard.first().unwrap();
1719 assert_eq!(handler_guard.len(), 1);
1720 assert_eq!(bar.open, trade.price);
1721 assert_eq!(bar.high, trade.price);
1722 assert_eq!(bar.low, trade.price);
1723 assert_eq!(bar.close, trade.price);
1724 assert_eq!(bar.volume, Quantity::from(300000));
1725 assert_eq!(bar.ts_event, trade.ts_event);
1726 assert_eq!(bar.ts_init, trade.ts_init);
1727 }
1728
1729 #[rstest]
1730 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1731 let instrument = InstrumentAny::Equity(equity_aapl);
1732 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1733 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1734 let handler = Arc::new(Mutex::new(Vec::new()));
1735 let handler_clone = Arc::clone(&handler);
1736
1737 let mut aggregator = TickBarAggregator::new(
1738 bar_type,
1739 instrument.price_precision(),
1740 instrument.size_precision(),
1741 move |bar: Bar| {
1742 let mut handler_guard = handler_clone.lock().unwrap();
1743 handler_guard.push(bar);
1744 },
1745 false,
1746 );
1747
1748 aggregator.update(
1749 Price::from("1.00001"),
1750 Quantity::from(1),
1751 UnixNanos::default(),
1752 );
1753 aggregator.update(
1754 Price::from("1.00002"),
1755 Quantity::from(1),
1756 UnixNanos::from(1000),
1757 );
1758 aggregator.update(
1759 Price::from("1.00003"),
1760 Quantity::from(1),
1761 UnixNanos::from(2000),
1762 );
1763
1764 let handler_guard = handler.lock().unwrap();
1765 assert_eq!(handler_guard.len(), 1);
1766
1767 let bar = handler_guard.first().unwrap();
1768 assert_eq!(bar.open, Price::from("1.00001"));
1769 assert_eq!(bar.high, Price::from("1.00003"));
1770 assert_eq!(bar.low, Price::from("1.00001"));
1771 assert_eq!(bar.close, Price::from("1.00003"));
1772 assert_eq!(bar.volume, Quantity::from(3));
1773 }
1774
1775 #[rstest]
1776 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1777 let instrument = InstrumentAny::Equity(equity_aapl);
1778 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1779 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1780 let handler = Arc::new(Mutex::new(Vec::new()));
1781 let handler_clone = Arc::clone(&handler);
1782
1783 let mut aggregator = TickBarAggregator::new(
1784 bar_type,
1785 instrument.price_precision(),
1786 instrument.size_precision(),
1787 move |bar: Bar| {
1788 let mut handler_guard = handler_clone.lock().unwrap();
1789 handler_guard.push(bar);
1790 },
1791 false,
1792 );
1793
1794 aggregator.update(
1795 Price::from("1.00001"),
1796 Quantity::from(1),
1797 UnixNanos::default(),
1798 );
1799 aggregator.update(
1800 Price::from("1.00002"),
1801 Quantity::from(1),
1802 UnixNanos::from(1000),
1803 );
1804 aggregator.update(
1805 Price::from("1.00003"),
1806 Quantity::from(1),
1807 UnixNanos::from(2000),
1808 );
1809 aggregator.update(
1810 Price::from("1.00004"),
1811 Quantity::from(1),
1812 UnixNanos::from(3000),
1813 );
1814
1815 let handler_guard = handler.lock().unwrap();
1816 assert_eq!(handler_guard.len(), 2);
1817
1818 let bar1 = &handler_guard[0];
1819 assert_eq!(bar1.open, Price::from("1.00001"));
1820 assert_eq!(bar1.close, Price::from("1.00002"));
1821 assert_eq!(bar1.volume, Quantity::from(2));
1822
1823 let bar2 = &handler_guard[1];
1824 assert_eq!(bar2.open, Price::from("1.00003"));
1825 assert_eq!(bar2.close, Price::from("1.00004"));
1826 assert_eq!(bar2.volume, Quantity::from(2));
1827 }
1828
1829 #[rstest]
1830 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1831 let instrument = InstrumentAny::Equity(equity_aapl);
1832 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1833 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1834 let handler = Arc::new(Mutex::new(Vec::new()));
1835 let handler_clone = Arc::clone(&handler);
1836
1837 let mut aggregator = VolumeBarAggregator::new(
1838 bar_type,
1839 instrument.price_precision(),
1840 instrument.size_precision(),
1841 move |bar: Bar| {
1842 let mut handler_guard = handler_clone.lock().unwrap();
1843 handler_guard.push(bar);
1844 },
1845 false,
1846 );
1847
1848 aggregator.update(
1849 Price::from("1.00001"),
1850 Quantity::from(25),
1851 UnixNanos::default(),
1852 );
1853
1854 let handler_guard = handler.lock().unwrap();
1855 assert_eq!(handler_guard.len(), 2);
1856 let bar1 = &handler_guard[0];
1857 assert_eq!(bar1.volume, Quantity::from(10));
1858 let bar2 = &handler_guard[1];
1859 assert_eq!(bar2.volume, Quantity::from(10));
1860 }
1861
1862 #[rstest]
1863 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1864 let instrument = InstrumentAny::Equity(equity_aapl);
1865 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1867 let handler = Arc::new(Mutex::new(Vec::new()));
1868 let handler_clone = Arc::clone(&handler);
1869
1870 let mut aggregator = ValueBarAggregator::new(
1871 bar_type,
1872 instrument.price_precision(),
1873 instrument.size_precision(),
1874 move |bar: Bar| {
1875 let mut handler_guard = handler_clone.lock().unwrap();
1876 handler_guard.push(bar);
1877 },
1878 false,
1879 );
1880
1881 aggregator.update(
1883 Price::from("100.00"),
1884 Quantity::from(5),
1885 UnixNanos::default(),
1886 );
1887 aggregator.update(
1888 Price::from("100.00"),
1889 Quantity::from(5),
1890 UnixNanos::from(1000),
1891 );
1892
1893 let handler_guard = handler.lock().unwrap();
1894 assert_eq!(handler_guard.len(), 1);
1895 let bar = handler_guard.first().unwrap();
1896 assert_eq!(bar.volume, Quantity::from(10));
1897 }
1898
1899 #[rstest]
1900 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1901 let instrument = InstrumentAny::Equity(equity_aapl);
1902 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1903 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1904 let handler = Arc::new(Mutex::new(Vec::new()));
1905 let handler_clone = Arc::clone(&handler);
1906
1907 let mut aggregator = ValueBarAggregator::new(
1908 bar_type,
1909 instrument.price_precision(),
1910 instrument.size_precision(),
1911 move |bar: Bar| {
1912 let mut handler_guard = handler_clone.lock().unwrap();
1913 handler_guard.push(bar);
1914 },
1915 false,
1916 );
1917
1918 aggregator.update(
1920 Price::from("100.00"),
1921 Quantity::from(25),
1922 UnixNanos::default(),
1923 );
1924
1925 let handler_guard = handler.lock().unwrap();
1926 assert_eq!(handler_guard.len(), 2);
1927 let remaining_value = aggregator.get_cumulative_value();
1928 assert!(remaining_value < 1000.0); }
1930
1931 #[rstest]
1932 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1933 let instrument = InstrumentAny::Equity(equity_aapl);
1934 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1936 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1937 let handler = Arc::new(Mutex::new(Vec::new()));
1938 let handler_clone = Arc::clone(&handler);
1939 let clock = Rc::new(RefCell::new(TestClock::new()));
1940
1941 let mut aggregator = TimeBarAggregator::new(
1942 bar_type,
1943 instrument.price_precision(),
1944 instrument.size_precision(),
1945 clock.clone(),
1946 move |bar: Bar| {
1947 let mut handler_guard = handler_clone.lock().unwrap();
1948 handler_guard.push(bar);
1949 },
1950 false, true, false, BarIntervalType::LeftOpen,
1954 None, 15, false, );
1958
1959 aggregator.update(
1960 Price::from("100.00"),
1961 Quantity::from(1),
1962 UnixNanos::default(),
1963 );
1964
1965 let next_sec = UnixNanos::from(1_000_000_000);
1966 clock.borrow_mut().set_time(next_sec);
1967
1968 let event = TimeEvent::new(
1969 Ustr::from("1-SECOND-LAST"),
1970 UUID4::new(),
1971 next_sec,
1972 next_sec,
1973 );
1974 aggregator.build_bar(event);
1975
1976 let handler_guard = handler.lock().unwrap();
1977 assert_eq!(handler_guard.len(), 1);
1978 let bar = handler_guard.first().unwrap();
1979 assert_eq!(bar.ts_event, UnixNanos::default());
1980 assert_eq!(bar.ts_init, next_sec);
1981 }
1982
1983 #[rstest]
1984 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1985 let instrument = InstrumentAny::Equity(equity_aapl);
1986 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1987 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1988 let handler = Arc::new(Mutex::new(Vec::new()));
1989 let handler_clone = Arc::clone(&handler);
1990 let clock = Rc::new(RefCell::new(TestClock::new()));
1991
1992 let mut aggregator = TimeBarAggregator::new(
1993 bar_type,
1994 instrument.price_precision(),
1995 instrument.size_precision(),
1996 clock.clone(),
1997 move |bar: Bar| {
1998 let mut handler_guard = handler_clone.lock().unwrap();
1999 handler_guard.push(bar);
2000 },
2001 false, true, true, BarIntervalType::LeftOpen,
2005 None,
2006 15,
2007 false, );
2009
2010 aggregator.update(
2012 Price::from("100.00"),
2013 Quantity::from(1),
2014 UnixNanos::default(),
2015 );
2016
2017 let ts1 = UnixNanos::from(1_000_000_000);
2019 clock.borrow_mut().set_time(ts1);
2020 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2021 aggregator.build_bar(event);
2022
2023 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2025
2026 let ts2 = UnixNanos::from(2_000_000_000);
2028 clock.borrow_mut().set_time(ts2);
2029 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2030 aggregator.build_bar(event);
2031
2032 let handler_guard = handler.lock().unwrap();
2033 assert_eq!(handler_guard.len(), 2);
2034
2035 let bar1 = &handler_guard[0];
2036 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
2038 assert_eq!(bar1.close, Price::from("100.00"));
2039 let bar2 = &handler_guard[1];
2040 assert_eq!(bar2.ts_event, ts2);
2041 assert_eq!(bar2.ts_init, ts2);
2042 assert_eq!(bar2.close, Price::from("101.00"));
2043 }
2044
2045 #[rstest]
2046 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2047 let instrument = InstrumentAny::Equity(equity_aapl);
2048 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2049 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2050 let handler = Arc::new(Mutex::new(Vec::new()));
2051 let handler_clone = Arc::clone(&handler);
2052 let clock = Rc::new(RefCell::new(TestClock::new()));
2053 let mut aggregator = TimeBarAggregator::new(
2054 bar_type,
2055 instrument.price_precision(),
2056 instrument.size_precision(),
2057 clock.clone(),
2058 move |bar: Bar| {
2059 let mut handler_guard = handler_clone.lock().unwrap();
2060 handler_guard.push(bar);
2061 },
2062 false, true, true, BarIntervalType::RightOpen,
2066 None,
2067 15,
2068 false, );
2070
2071 aggregator.update(
2073 Price::from("100.00"),
2074 Quantity::from(1),
2075 UnixNanos::default(),
2076 );
2077
2078 let ts1 = UnixNanos::from(1_000_000_000);
2080 clock.borrow_mut().set_time(ts1);
2081 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2082 aggregator.build_bar(event);
2083
2084 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2086
2087 let ts2 = UnixNanos::from(2_000_000_000);
2089 clock.borrow_mut().set_time(ts2);
2090 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2091 aggregator.build_bar(event);
2092
2093 let handler_guard = handler.lock().unwrap();
2094 assert_eq!(handler_guard.len(), 2);
2095
2096 let bar1 = &handler_guard[0];
2097 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
2099 assert_eq!(bar1.close, Price::from("100.00"));
2100
2101 let bar2 = &handler_guard[1];
2102 assert_eq!(bar2.ts_event, ts1);
2103 assert_eq!(bar2.ts_init, ts2);
2104 assert_eq!(bar2.close, Price::from("101.00"));
2105 }
2106
2107 #[rstest]
2108 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2109 let instrument = InstrumentAny::Equity(equity_aapl);
2110 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2111 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2112 let handler = Arc::new(Mutex::new(Vec::new()));
2113 let handler_clone = Arc::clone(&handler);
2114 let clock = Rc::new(RefCell::new(TestClock::new()));
2115
2116 let mut aggregator = TimeBarAggregator::new(
2118 bar_type,
2119 instrument.price_precision(),
2120 instrument.size_precision(),
2121 clock.clone(),
2122 move |bar: Bar| {
2123 let mut handler_guard = handler_clone.lock().unwrap();
2124 handler_guard.push(bar);
2125 },
2126 false, false, true, BarIntervalType::LeftOpen,
2130 None,
2131 15,
2132 false, );
2134
2135 let ts1 = UnixNanos::from(1_000_000_000);
2137 clock.borrow_mut().set_time(ts1);
2138 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2139 aggregator.build_bar(event);
2140
2141 let handler_guard = handler.lock().unwrap();
2142 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
2144
2145 let handler = Arc::new(Mutex::new(Vec::new()));
2147 let handler_clone = Arc::clone(&handler);
2148 let mut aggregator = TimeBarAggregator::new(
2149 bar_type,
2150 instrument.price_precision(),
2151 instrument.size_precision(),
2152 clock.clone(),
2153 move |bar: Bar| {
2154 let mut handler_guard = handler_clone.lock().unwrap();
2155 handler_guard.push(bar);
2156 },
2157 false,
2158 true, true, BarIntervalType::LeftOpen,
2161 None,
2162 15,
2163 false, );
2165
2166 aggregator.update(
2167 Price::from("100.00"),
2168 Quantity::from(1),
2169 UnixNanos::default(),
2170 );
2171
2172 let ts1 = UnixNanos::from(1_000_000_000);
2174 clock.borrow_mut().set_time(ts1);
2175 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2176 aggregator.build_bar(event);
2177
2178 let ts2 = UnixNanos::from(2_000_000_000);
2180 clock.borrow_mut().set_time(ts2);
2181 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2182 aggregator.build_bar(event);
2183
2184 let handler_guard = handler.lock().unwrap();
2185 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2187 assert_eq!(bar1.close, Price::from("100.00"));
2188 let bar2 = &handler_guard[1];
2189 assert_eq!(bar2.close, Price::from("100.00")); }
2191
2192 #[rstest]
2193 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2194 let instrument = InstrumentAny::Equity(equity_aapl);
2195 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2196 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2197 let clock = Rc::new(RefCell::new(TestClock::new()));
2198 let handler = Arc::new(Mutex::new(Vec::new()));
2199 let handler_clone = Arc::clone(&handler);
2200
2201 let mut aggregator = TimeBarAggregator::new(
2202 bar_type,
2203 instrument.price_precision(),
2204 instrument.size_precision(),
2205 clock.clone(),
2206 move |bar: Bar| {
2207 let mut handler_guard = handler_clone.lock().unwrap();
2208 handler_guard.push(bar);
2209 },
2210 false, true, true, BarIntervalType::RightOpen,
2214 None,
2215 15,
2216 false, );
2218
2219 let ts1 = UnixNanos::from(1_000_000_000);
2220 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2221
2222 let ts2 = UnixNanos::from(2_000_000_000);
2223 clock.borrow_mut().set_time(ts2);
2224
2225 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2227 aggregator.build_bar(event);
2228
2229 let handler_guard = handler.lock().unwrap();
2230 let bar = handler_guard.first().unwrap();
2231 assert_eq!(bar.ts_event, UnixNanos::default());
2232 assert_eq!(bar.ts_init, ts2);
2233 }
2234
2235 #[rstest]
2236 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2237 let instrument = InstrumentAny::Equity(equity_aapl);
2238 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2239 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2240 let clock = Rc::new(RefCell::new(TestClock::new()));
2241 let handler = Arc::new(Mutex::new(Vec::new()));
2242 let handler_clone = Arc::clone(&handler);
2243
2244 let mut aggregator = TimeBarAggregator::new(
2245 bar_type,
2246 instrument.price_precision(),
2247 instrument.size_precision(),
2248 clock.clone(),
2249 move |bar: Bar| {
2250 let mut handler_guard = handler_clone.lock().unwrap();
2251 handler_guard.push(bar);
2252 },
2253 false, true, true, BarIntervalType::LeftOpen,
2257 None,
2258 15,
2259 false, );
2261
2262 let ts1 = UnixNanos::from(1_000_000_000);
2263 clock.borrow_mut().set_time(ts1);
2264
2265 let initial_time = clock.borrow().utc_now();
2266 aggregator.start_batch_time(UnixNanos::from(
2267 initial_time.timestamp_nanos_opt().unwrap() as u64
2268 ));
2269
2270 let handler_guard = handler.lock().unwrap();
2271 assert_eq!(handler_guard.len(), 0);
2272 }
2273}