1use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25 clock::Clock,
26 timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29 UnixNanos,
30 correctness::{self, FAILED},
31 datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34 data::{
35 QuoteTick, TradeTick,
36 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37 },
38 enums::{AggregationSource, BarAggregation, BarIntervalType},
39 types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
40};
41
42pub trait BarAggregator: Any + Debug {
46 fn bar_type(&self) -> BarType;
48 fn is_running(&self) -> bool;
50 fn set_await_partial(&mut self, value: bool);
51 fn set_is_running(&mut self, value: bool);
53 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
56 fn handle_quote(&mut self, quote: QuoteTick) {
58 let spec = self.bar_type().spec();
59 if !self.await_partial() {
60 self.update(
61 quote.extract_price(spec.price_type),
62 quote.extract_size(spec.price_type),
63 quote.ts_event,
64 );
65 }
66 }
67 fn handle_trade(&mut self, trade: TradeTick) {
69 if !self.await_partial() {
70 self.update(trade.price, trade.size, trade.ts_event);
71 }
72 }
73 fn handle_bar(&mut self, bar: Bar) {
75 if !self.await_partial() {
76 self.update_bar(bar, bar.volume, bar.ts_init);
77 }
78 }
79 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
80 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
82 fn stop_batch_update(&mut self);
84 fn await_partial(&self) -> bool;
86 fn set_partial(&mut self, partial_bar: Bar);
89 fn stop(&mut self) {}
91}
92
93impl dyn BarAggregator {
94 pub fn as_any(&self) -> &dyn Any {
96 self
97 }
98 pub fn as_any_mut(&mut self) -> &mut dyn Any {
100 self
101 }
102}
103
104#[derive(Debug)]
106pub struct BarBuilder {
107 bar_type: BarType,
108 price_precision: u8,
109 size_precision: u8,
110 initialized: bool,
111 ts_last: UnixNanos,
112 count: usize,
113 partial_set: bool,
114 last_close: Option<Price>,
115 open: Option<Price>,
116 high: Option<Price>,
117 low: Option<Price>,
118 close: Option<Price>,
119 volume: Quantity,
120}
121
122impl BarBuilder {
123 #[must_use]
131 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
132 correctness::check_equal(
133 &bar_type.aggregation_source(),
134 &AggregationSource::Internal,
135 "bar_type.aggregation_source",
136 "AggregationSource::Internal",
137 )
138 .expect(FAILED);
139
140 Self {
141 bar_type,
142 price_precision,
143 size_precision,
144 initialized: false,
145 ts_last: UnixNanos::default(),
146 count: 0,
147 partial_set: false,
148 last_close: None,
149 open: None,
150 high: None,
151 low: None,
152 close: None,
153 volume: Quantity::zero(size_precision),
154 }
155 }
156
157 pub fn set_partial(&mut self, partial_bar: Bar) {
163 if self.partial_set {
164 return; }
166
167 self.open = Some(partial_bar.open);
168
169 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
170 self.high = Some(partial_bar.high);
171 }
172
173 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
174 self.low = Some(partial_bar.low);
175 }
176
177 if self.close.is_none() {
178 self.close = Some(partial_bar.close);
179 }
180
181 self.volume = partial_bar.volume;
182
183 if self.ts_last == 0 {
184 self.ts_last = partial_bar.ts_init;
185 }
186
187 self.partial_set = true;
188 self.initialized = true;
189 }
190
191 pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
197 if ts_event < self.ts_last {
198 return; }
200
201 if self.open.is_none() {
202 self.open = Some(price);
203 self.high = Some(price);
204 self.low = Some(price);
205 self.initialized = true;
206 } else {
207 if price > self.high.unwrap() {
208 self.high = Some(price);
209 }
210 if price < self.low.unwrap() {
211 self.low = Some(price);
212 }
213 }
214
215 self.close = Some(price);
216 self.volume = self.volume.add(size);
217 self.count += 1;
218 self.ts_last = ts_event;
219 }
220
221 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
227 if ts_init < self.ts_last {
228 return; }
230
231 if self.open.is_none() {
232 self.open = Some(bar.open);
233 self.high = Some(bar.high);
234 self.low = Some(bar.low);
235 self.initialized = true;
236 } else {
237 if bar.high > self.high.unwrap() {
238 self.high = Some(bar.high);
239 }
240 if bar.low < self.low.unwrap() {
241 self.low = Some(bar.low);
242 }
243 }
244
245 self.close = Some(bar.close);
246 self.volume = self.volume.add(volume);
247 self.count += 1;
248 self.ts_last = ts_init;
249 }
250
251 pub fn reset(&mut self) {
255 self.open = None;
256 self.high = None;
257 self.low = None;
258 self.volume = Quantity::zero(self.size_precision);
259 self.count = 0;
260 }
261
262 pub fn build_now(&mut self) -> Bar {
264 self.build(self.ts_last, self.ts_last)
265 }
266
267 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
273 if self.open.is_none() {
274 self.open = self.last_close;
275 self.high = self.last_close;
276 self.low = self.last_close;
277 self.close = self.last_close;
278 }
279
280 if let (Some(close), Some(low)) = (self.close, self.low)
281 && close < low
282 {
283 self.low = Some(close);
284 }
285
286 if let (Some(close), Some(high)) = (self.close, self.high)
287 && close > high
288 {
289 self.high = Some(close);
290 }
291
292 let bar = Bar::new(
294 self.bar_type,
295 self.open.unwrap(),
296 self.high.unwrap(),
297 self.low.unwrap(),
298 self.close.unwrap(),
299 self.volume,
300 ts_event,
301 ts_init,
302 );
303
304 self.last_close = self.close;
305 self.reset();
306 bar
307 }
308}
309
310pub struct BarAggregatorCore<H>
312where
313 H: FnMut(Bar),
314{
315 bar_type: BarType,
316 builder: BarBuilder,
317 handler: H,
318 handler_backup: Option<H>,
319 batch_handler: Option<Box<dyn FnMut(Bar)>>,
320 await_partial: bool,
321 is_running: bool,
322 batch_mode: bool,
323}
324
325impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
326 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
327 f.debug_struct(stringify!(BarAggregatorCore))
328 .field("bar_type", &self.bar_type)
329 .field("builder", &self.builder)
330 .field("await_partial", &self.await_partial)
331 .field("is_running", &self.is_running)
332 .field("batch_mode", &self.batch_mode)
333 .finish()
334 }
335}
336
337impl<H> BarAggregatorCore<H>
338where
339 H: FnMut(Bar),
340{
341 pub fn new(
349 bar_type: BarType,
350 price_precision: u8,
351 size_precision: u8,
352 handler: H,
353 await_partial: bool,
354 ) -> Self {
355 Self {
356 bar_type,
357 builder: BarBuilder::new(bar_type, price_precision, size_precision),
358 handler,
359 handler_backup: None,
360 batch_handler: None,
361 await_partial,
362 is_running: false,
363 batch_mode: false,
364 }
365 }
366
367 pub const fn set_await_partial(&mut self, value: bool) {
369 self.await_partial = value;
370 }
371
372 pub const fn set_is_running(&mut self, value: bool) {
374 self.is_running = value;
375 }
376
377 pub const fn await_partial(&self) -> bool {
379 self.await_partial
380 }
381
382 pub fn set_partial(&mut self, partial_bar: Bar) {
384 self.builder.set_partial(partial_bar);
385 }
386
387 fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
388 self.builder.update(price, size, ts_event);
389 }
390
391 fn build_now_and_send(&mut self) {
392 let bar = self.builder.build_now();
393 (self.handler)(bar);
394 }
395
396 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
397 let bar = self.builder.build(ts_event, ts_init);
398
399 if self.batch_mode {
400 if let Some(handler) = &mut self.batch_handler {
401 handler(bar);
402 }
403 } else {
404 (self.handler)(bar);
405 }
406 }
407
408 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
410 self.batch_mode = true;
411 self.batch_handler = Some(handler);
412 }
413
414 pub fn stop_batch_update(&mut self) {
416 self.batch_mode = false;
417
418 if let Some(handler) = self.handler_backup.take() {
419 self.handler = handler;
420 }
421 }
422}
423
424pub struct TickBarAggregator<H>
429where
430 H: FnMut(Bar),
431{
432 core: BarAggregatorCore<H>,
433 cum_value: f64,
434}
435
436impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
437 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
438 f.debug_struct(stringify!(TickBarAggregator))
439 .field("core", &self.core)
440 .field("cum_value", &self.cum_value)
441 .finish()
442 }
443}
444
445impl<H> TickBarAggregator<H>
446where
447 H: FnMut(Bar),
448{
449 pub fn new(
457 bar_type: BarType,
458 price_precision: u8,
459 size_precision: u8,
460 handler: H,
461 await_partial: bool,
462 ) -> Self {
463 Self {
464 core: BarAggregatorCore::new(
465 bar_type,
466 price_precision,
467 size_precision,
468 handler,
469 await_partial,
470 ),
471 cum_value: 0.0,
472 }
473 }
474}
475
476impl<H> BarAggregator for TickBarAggregator<H>
477where
478 H: FnMut(Bar) + 'static,
479{
480 fn bar_type(&self) -> BarType {
481 self.core.bar_type
482 }
483
484 fn is_running(&self) -> bool {
485 self.core.is_running
486 }
487
488 fn set_await_partial(&mut self, value: bool) {
489 self.core.set_await_partial(value);
490 }
491
492 fn set_is_running(&mut self, value: bool) {
493 self.core.set_is_running(value);
494 }
495
496 fn await_partial(&self) -> bool {
497 self.core.await_partial()
498 }
499
500 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
502 self.core.apply_update(price, size, ts_event);
503 let spec = self.core.bar_type.spec();
504
505 if self.core.builder.count >= spec.step.get() {
506 self.core.build_now_and_send();
507 }
508 }
509
510 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
511 let mut volume_update = volume;
512 let average_price = Price::new(
513 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
514 self.core.builder.price_precision,
515 );
516
517 while volume_update.as_f64() > 0.0 {
518 let value_update = average_price.as_f64() * volume_update.as_f64();
519 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
520 self.cum_value += value_update;
521 self.core.builder.update_bar(bar, volume_update, ts_init);
522 break;
523 }
524
525 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
526 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
527 self.core.builder.update_bar(
528 bar,
529 Quantity::new(volume_diff, volume_update.precision),
530 ts_init,
531 );
532
533 self.core.build_now_and_send();
534 self.cum_value = 0.0;
535 volume_update = Quantity::new(
536 volume_update.as_f64() - volume_diff,
537 volume_update.precision,
538 );
539 }
540 }
541
542 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
543 self.core.start_batch_update(handler);
544 }
545
546 fn stop_batch_update(&mut self) {
547 self.core.stop_batch_update();
548 }
549
550 fn set_partial(&mut self, partial_bar: Bar) {
551 self.core.set_partial(partial_bar);
552 }
553}
554
555pub struct VolumeBarAggregator<H>
557where
558 H: FnMut(Bar),
559{
560 core: BarAggregatorCore<H>,
561}
562
563impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
564 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
565 f.debug_struct(stringify!(VolumeBarAggregator))
566 .field("core", &self.core)
567 .finish()
568 }
569}
570
571impl<H> VolumeBarAggregator<H>
572where
573 H: FnMut(Bar),
574{
575 pub fn new(
583 bar_type: BarType,
584 price_precision: u8,
585 size_precision: u8,
586 handler: H,
587 await_partial: bool,
588 ) -> Self {
589 Self {
590 core: BarAggregatorCore::new(
591 bar_type.standard(),
592 price_precision,
593 size_precision,
594 handler,
595 await_partial,
596 ),
597 }
598 }
599}
600
601impl<H> BarAggregator for VolumeBarAggregator<H>
602where
603 H: FnMut(Bar) + 'static,
604{
605 fn bar_type(&self) -> BarType {
606 self.core.bar_type
607 }
608
609 fn is_running(&self) -> bool {
610 self.core.is_running
611 }
612
613 fn set_await_partial(&mut self, value: bool) {
614 self.core.set_await_partial(value);
615 }
616
617 fn set_is_running(&mut self, value: bool) {
618 self.core.set_is_running(value);
619 }
620
621 fn await_partial(&self) -> bool {
622 self.core.await_partial()
623 }
624
625 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
627 let mut raw_size_update = size.raw;
628 let spec = self.core.bar_type.spec();
629 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
630
631 while raw_size_update > 0 {
632 if self.core.builder.volume.raw + raw_size_update < raw_step {
633 self.core.apply_update(
634 price,
635 Quantity::from_raw(raw_size_update, size.precision),
636 ts_event,
637 );
638 break;
639 }
640
641 let raw_size_diff = raw_step - self.core.builder.volume.raw;
642 self.core.apply_update(
643 price,
644 Quantity::from_raw(raw_size_diff, size.precision),
645 ts_event,
646 );
647
648 self.core.build_now_and_send();
649 raw_size_update -= raw_size_diff;
650 }
651 }
652
653 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
654 let mut raw_volume_update = volume.raw;
655 let spec = self.core.bar_type.spec();
656 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
657
658 while raw_volume_update > 0 {
659 if self.core.builder.volume.raw + raw_volume_update < raw_step {
660 self.core.builder.update_bar(
661 bar,
662 Quantity::from_raw(raw_volume_update, volume.precision),
663 ts_init,
664 );
665 break;
666 }
667
668 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
669 self.core.builder.update_bar(
670 bar,
671 Quantity::from_raw(raw_volume_diff, volume.precision),
672 ts_init,
673 );
674
675 self.core.build_now_and_send();
676 raw_volume_update -= raw_volume_diff;
677 }
678 }
679
680 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
681 self.core.start_batch_update(handler);
682 }
683
684 fn stop_batch_update(&mut self) {
685 self.core.stop_batch_update();
686 }
687
688 fn set_partial(&mut self, partial_bar: Bar) {
689 self.core.set_partial(partial_bar);
690 }
691}
692
693pub struct ValueBarAggregator<H>
698where
699 H: FnMut(Bar),
700{
701 core: BarAggregatorCore<H>,
702 cum_value: f64,
703}
704
705impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(ValueBarAggregator))
708 .field("core", &self.core)
709 .field("cum_value", &self.cum_value)
710 .finish()
711 }
712}
713
714impl<H> ValueBarAggregator<H>
715where
716 H: FnMut(Bar),
717{
718 pub fn new(
726 bar_type: BarType,
727 price_precision: u8,
728 size_precision: u8,
729 handler: H,
730 await_partial: bool,
731 ) -> Self {
732 Self {
733 core: BarAggregatorCore::new(
734 bar_type.standard(),
735 price_precision,
736 size_precision,
737 handler,
738 await_partial,
739 ),
740 cum_value: 0.0,
741 }
742 }
743
744 #[must_use]
745 pub const fn get_cumulative_value(&self) -> f64 {
747 self.cum_value
748 }
749}
750
751impl<H> BarAggregator for ValueBarAggregator<H>
752where
753 H: FnMut(Bar) + 'static,
754{
755 fn bar_type(&self) -> BarType {
756 self.core.bar_type
757 }
758
759 fn is_running(&self) -> bool {
760 self.core.is_running
761 }
762
763 fn set_await_partial(&mut self, value: bool) {
764 self.core.set_await_partial(value);
765 }
766
767 fn set_is_running(&mut self, value: bool) {
768 self.core.set_is_running(value);
769 }
770
771 fn await_partial(&self) -> bool {
772 self.core.await_partial()
773 }
774
775 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
777 let mut size_update = size.as_f64();
778 let spec = self.core.bar_type.spec();
779
780 while size_update > 0.0 {
781 let value_update = price.as_f64() * size_update;
782 if self.cum_value + value_update < spec.step.get() as f64 {
783 self.cum_value += value_update;
784 self.core
785 .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
786 break;
787 }
788
789 let value_diff = spec.step.get() as f64 - self.cum_value;
790 let size_diff = size_update * (value_diff / value_update);
791 self.core
792 .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
793
794 self.core.build_now_and_send();
795 self.cum_value = 0.0;
796 size_update -= size_diff;
797 }
798 }
799
800 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
801 let mut volume_update = volume;
802 let average_price = Price::new(
803 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
804 self.core.builder.price_precision,
805 );
806
807 while volume_update.as_f64() > 0.0 {
808 let value_update = average_price.as_f64() * volume_update.as_f64();
809 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
810 self.cum_value += value_update;
811 self.core.builder.update_bar(bar, volume_update, ts_init);
812 break;
813 }
814
815 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
816 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
817 self.core.builder.update_bar(
818 bar,
819 Quantity::new(volume_diff, volume_update.precision),
820 ts_init,
821 );
822
823 self.core.build_now_and_send();
824 self.cum_value = 0.0;
825 volume_update = Quantity::new(
826 volume_update.as_f64() - volume_diff,
827 volume_update.precision,
828 );
829 }
830 }
831
832 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
833 self.core.start_batch_update(handler);
834 }
835
836 fn stop_batch_update(&mut self) {
837 self.core.stop_batch_update();
838 }
839
840 fn set_partial(&mut self, partial_bar: Bar) {
841 self.core.set_partial(partial_bar);
842 }
843}
844
845pub struct TimeBarAggregator<H>
849where
850 H: FnMut(Bar),
851{
852 core: BarAggregatorCore<H>,
853 clock: Rc<RefCell<dyn Clock>>,
854 build_with_no_updates: bool,
855 timestamp_on_close: bool,
856 is_left_open: bool,
857 build_on_next_tick: bool,
858 stored_open_ns: UnixNanos,
859 stored_close_ns: UnixNanos,
860 timer_name: String,
861 interval_ns: UnixNanos,
862 next_close_ns: UnixNanos,
863 bar_build_delay: u64,
864 batch_open_ns: UnixNanos,
865 batch_next_close_ns: UnixNanos,
866 time_bars_origin_offset: Option<TimeDelta>,
867 skip_first_non_full_bar: bool,
868}
869
870impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
871 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
872 f.debug_struct(stringify!(TimeBarAggregator))
873 .field("core", &self.core)
874 .field("build_with_no_updates", &self.build_with_no_updates)
875 .field("timestamp_on_close", &self.timestamp_on_close)
876 .field("is_left_open", &self.is_left_open)
877 .field("timer_name", &self.timer_name)
878 .field("interval_ns", &self.interval_ns)
879 .field("bar_build_delay", &self.bar_build_delay)
880 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
881 .finish()
882 }
883}
884
885#[derive(Clone, Debug)]
886pub struct NewBarCallback<H: FnMut(Bar)> {
893 aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
894}
895
896impl<H: FnMut(Bar)> NewBarCallback<H> {
897 #[must_use]
899 pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
900 Self { aggregator }
901 }
902}
903
904impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
905 fn from(value: NewBarCallback<H>) -> Self {
906 Self::Rust(Rc::new(move |event: TimeEvent| {
907 value.aggregator.borrow_mut().build_bar(event);
908 }))
909 }
910}
911
912impl<H> TimeBarAggregator<H>
913where
914 H: FnMut(Bar) + 'static,
915{
916 #[allow(clippy::too_many_arguments)]
924 pub fn new(
925 bar_type: BarType,
926 price_precision: u8,
927 size_precision: u8,
928 clock: Rc<RefCell<dyn Clock>>,
929 handler: H,
930 await_partial: bool,
931 build_with_no_updates: bool,
932 timestamp_on_close: bool,
933 interval_type: BarIntervalType,
934 time_bars_origin_offset: Option<TimeDelta>,
935 bar_build_delay: u64,
936 skip_first_non_full_bar: bool,
937 ) -> Self {
938 let is_left_open = match interval_type {
939 BarIntervalType::LeftOpen => true,
940 BarIntervalType::RightOpen => false,
941 };
942
943 let core = BarAggregatorCore::new(
944 bar_type.standard(),
945 price_precision,
946 size_precision,
947 handler,
948 await_partial,
949 );
950
951 Self {
952 core,
953 clock,
954 build_with_no_updates,
955 timestamp_on_close,
956 is_left_open,
957 build_on_next_tick: false,
958 stored_open_ns: UnixNanos::default(),
959 stored_close_ns: UnixNanos::default(),
960 timer_name: bar_type.to_string(),
961 interval_ns: get_bar_interval_ns(&bar_type),
962 next_close_ns: UnixNanos::default(),
963 bar_build_delay,
964 batch_open_ns: UnixNanos::default(),
965 batch_next_close_ns: UnixNanos::default(),
966 time_bars_origin_offset,
967 skip_first_non_full_bar,
968 }
969 }
970
971 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
981 let now = self.clock.borrow().utc_now();
982 let mut start_time =
983 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
984
985 if start_time == now {
986 self.skip_first_non_full_bar = false;
987 }
988
989 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
990
991 let spec = &self.bar_type().spec();
992 let start_time_ns = UnixNanos::from(start_time);
993
994 if spec.aggregation == BarAggregation::Month {
995 let step = spec.step.get() as u32;
996 let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
997
998 self.clock
999 .borrow_mut()
1000 .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1001 .expect(FAILED);
1002 } else {
1003 self.clock
1004 .borrow_mut()
1005 .set_timer_ns(
1006 &self.timer_name,
1007 self.interval_ns.as_u64(),
1008 Some(start_time_ns),
1009 None,
1010 Some(callback.into()),
1011 None,
1012 None,
1013 )
1014 .expect(FAILED);
1015 }
1016
1017 log::debug!("Started timer {}", self.timer_name);
1018 Ok(())
1019 }
1020
1021 pub fn stop(&mut self) {
1023 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1024 }
1025
1026 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1032 let spec = self.bar_type().spec();
1033 self.core.batch_mode = true;
1034
1035 let time = time_ns.to_datetime_utc();
1036 let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1037 self.batch_open_ns = UnixNanos::from(start_time);
1038
1039 if spec.aggregation == BarAggregation::Month {
1040 let step = spec.step.get() as u32;
1041
1042 if self.batch_open_ns == time_ns {
1043 self.batch_open_ns =
1044 subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1045 }
1046
1047 self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1048 } else {
1049 if self.batch_open_ns == time_ns {
1050 self.batch_open_ns -= self.interval_ns;
1051 }
1052
1053 self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1054 }
1055 }
1056
1057 const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1058 if self.is_left_open {
1059 if self.timestamp_on_close {
1060 close_ns
1061 } else {
1062 open_ns
1063 }
1064 } else {
1065 open_ns
1066 }
1067 }
1068
1069 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1070 if self.skip_first_non_full_bar {
1071 self.core.builder.reset();
1072 self.skip_first_non_full_bar = false;
1073 } else {
1074 self.core.build_and_send(ts_event, ts_init);
1075 }
1076 }
1077
1078 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1079 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1080 let ts_init = self.batch_next_close_ns;
1081 let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1082 self.build_and_send(ts_event, ts_init);
1083 }
1084 }
1085
1086 fn batch_post_update(&mut self, time_ns: UnixNanos) {
1087 let step = self.bar_type().spec().step.get() as u32;
1088
1089 if !self.core.batch_mode
1091 && time_ns == self.batch_next_close_ns
1092 && time_ns > self.stored_open_ns
1093 {
1094 self.batch_next_close_ns = UnixNanos::default();
1095 return;
1096 }
1097
1098 if time_ns > self.batch_next_close_ns {
1099 if self.bar_type().spec().aggregation == BarAggregation::Month {
1101 while self.batch_next_close_ns < time_ns {
1102 self.batch_next_close_ns =
1103 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1104 }
1105
1106 self.batch_open_ns =
1107 subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1108 } else {
1109 while self.batch_next_close_ns < time_ns {
1110 self.batch_next_close_ns += self.interval_ns;
1111 }
1112
1113 self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1114 }
1115 }
1116
1117 if time_ns == self.batch_next_close_ns {
1118 let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1119 self.build_and_send(ts_event, time_ns);
1120 self.batch_open_ns = self.batch_next_close_ns;
1121
1122 if self.bar_type().spec().aggregation == BarAggregation::Month {
1123 self.batch_next_close_ns =
1124 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1125 } else {
1126 self.batch_next_close_ns += self.interval_ns;
1127 }
1128 }
1129
1130 if !self.core.batch_mode {
1132 self.batch_next_close_ns = UnixNanos::default();
1133 }
1134 }
1135
1136 fn build_bar(&mut self, event: TimeEvent) {
1137 if !self.core.builder.initialized {
1138 self.build_on_next_tick = true;
1139 self.stored_close_ns = self.next_close_ns;
1140 return;
1141 }
1142
1143 if !self.build_with_no_updates && self.core.builder.count == 0 {
1144 return;
1145 }
1146
1147 let ts_init = event.ts_event;
1148 let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1149 self.build_and_send(ts_event, ts_init);
1150
1151 self.stored_open_ns = ts_init;
1152
1153 if self.bar_type().spec().aggregation == BarAggregation::Month {
1154 let step = self.bar_type().spec().step.get() as u32;
1155 let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1156
1157 self.clock
1158 .borrow_mut()
1159 .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1160 .expect(FAILED);
1161
1162 self.next_close_ns = next_alert_ns;
1163 } else {
1164 self.next_close_ns = self
1165 .clock
1166 .borrow()
1167 .next_time_ns(&self.timer_name)
1168 .unwrap_or_default();
1169 }
1170 }
1171}
1172
1173impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1174where
1175 H: FnMut(Bar) + 'static,
1176{
1177 fn bar_type(&self) -> BarType {
1178 self.core.bar_type
1179 }
1180
1181 fn is_running(&self) -> bool {
1182 self.core.is_running
1183 }
1184
1185 fn set_await_partial(&mut self, value: bool) {
1186 self.core.set_await_partial(value);
1187 }
1188
1189 fn set_is_running(&mut self, value: bool) {
1190 self.core.set_is_running(value);
1191 }
1192
1193 fn await_partial(&self) -> bool {
1194 self.core.await_partial()
1195 }
1196 fn stop(&mut self) {
1198 Self::stop(self);
1199 }
1200
1201 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1202 if self.batch_next_close_ns != UnixNanos::default() {
1203 self.batch_pre_update(ts_event);
1204 }
1205
1206 self.core.apply_update(price, size, ts_event);
1207
1208 if self.build_on_next_tick {
1209 if ts_event <= self.stored_close_ns {
1210 let ts_init = ts_event;
1211 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1212 self.build_and_send(ts_event, ts_init);
1213 }
1214
1215 self.build_on_next_tick = false;
1216 self.stored_close_ns = UnixNanos::default();
1217 }
1218
1219 if self.batch_next_close_ns != UnixNanos::default() {
1220 self.batch_post_update(ts_event);
1221 }
1222 }
1223
1224 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1225 if self.batch_next_close_ns != UnixNanos::default() {
1226 self.batch_pre_update(ts_init);
1227 }
1228
1229 self.core.builder.update_bar(bar, volume, ts_init);
1230
1231 if self.build_on_next_tick {
1232 if ts_init <= self.stored_close_ns {
1233 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1234 self.build_and_send(ts_event, ts_init);
1235 }
1236
1237 self.build_on_next_tick = false;
1239 self.stored_close_ns = UnixNanos::default();
1240 }
1241
1242 if self.batch_next_close_ns != UnixNanos::default() {
1243 self.batch_post_update(ts_init);
1244 }
1245 }
1246
1247 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1248 self.core.start_batch_update(handler);
1249 self.start_batch_time(time_ns);
1250 }
1251
1252 fn stop_batch_update(&mut self) {
1253 self.core.stop_batch_update();
1254 }
1255
1256 fn set_partial(&mut self, partial_bar: Bar) {
1257 self.core.set_partial(partial_bar);
1258 }
1259}
1260
1261#[cfg(test)]
1265mod tests {
1266 use std::sync::{Arc, Mutex};
1267
1268 use nautilus_common::clock::TestClock;
1269 use nautilus_core::UUID4;
1270 use nautilus_model::{
1271 data::{BarSpecification, BarType},
1272 enums::{AggregationSource, BarAggregation, PriceType},
1273 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1274 types::{Price, Quantity},
1275 };
1276 use rstest::rstest;
1277 use ustr::Ustr;
1278
1279 use super::*;
1280
1281 #[rstest]
1282 fn test_bar_builder_initialization(equity_aapl: Equity) {
1283 let instrument = InstrumentAny::Equity(equity_aapl);
1284 let bar_type = BarType::new(
1285 instrument.id(),
1286 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1287 AggregationSource::Internal,
1288 );
1289 let builder = BarBuilder::new(
1290 bar_type,
1291 instrument.price_precision(),
1292 instrument.size_precision(),
1293 );
1294
1295 assert!(!builder.initialized);
1296 assert_eq!(builder.ts_last, 0);
1297 assert_eq!(builder.count, 0);
1298 }
1299
1300 #[rstest]
1301 fn test_set_partial_update(equity_aapl: Equity) {
1302 let instrument = InstrumentAny::Equity(equity_aapl);
1303 let bar_type = BarType::new(
1304 instrument.id(),
1305 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1306 AggregationSource::Internal,
1307 );
1308 let mut builder = BarBuilder::new(
1309 bar_type,
1310 instrument.price_precision(),
1311 instrument.size_precision(),
1312 );
1313
1314 let partial_bar = Bar::new(
1315 bar_type,
1316 Price::from("101.00"),
1317 Price::from("102.00"),
1318 Price::from("100.00"),
1319 Price::from("101.00"),
1320 Quantity::from(100),
1321 UnixNanos::from(1),
1322 UnixNanos::from(2),
1323 );
1324
1325 builder.set_partial(partial_bar);
1326 let bar = builder.build_now();
1327
1328 assert_eq!(bar.open, partial_bar.open);
1329 assert_eq!(bar.high, partial_bar.high);
1330 assert_eq!(bar.low, partial_bar.low);
1331 assert_eq!(bar.close, partial_bar.close);
1332 assert_eq!(bar.volume, partial_bar.volume);
1333 assert_eq!(builder.ts_last, 2);
1334 }
1335
1336 #[rstest]
1337 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1338 let instrument = InstrumentAny::Equity(equity_aapl);
1339 let bar_type = BarType::new(
1340 instrument.id(),
1341 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1342 AggregationSource::Internal,
1343 );
1344 let mut builder = BarBuilder::new(
1345 bar_type,
1346 instrument.price_precision(),
1347 instrument.size_precision(),
1348 );
1349
1350 builder.update(
1351 Price::from("100.00"),
1352 Quantity::from(1),
1353 UnixNanos::from(1000),
1354 );
1355 builder.update(
1356 Price::from("95.00"),
1357 Quantity::from(1),
1358 UnixNanos::from(2000),
1359 );
1360 builder.update(
1361 Price::from("105.00"),
1362 Quantity::from(1),
1363 UnixNanos::from(3000),
1364 );
1365
1366 let bar = builder.build_now();
1367 assert!(bar.high > bar.low);
1368 assert_eq!(bar.open, Price::from("100.00"));
1369 assert_eq!(bar.high, Price::from("105.00"));
1370 assert_eq!(bar.low, Price::from("95.00"));
1371 assert_eq!(bar.close, Price::from("105.00"));
1372 }
1373
1374 #[rstest]
1375 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1376 let instrument = InstrumentAny::Equity(equity_aapl);
1377 let bar_type = BarType::new(
1378 instrument.id(),
1379 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1380 AggregationSource::Internal,
1381 );
1382 let mut builder = BarBuilder::new(
1383 bar_type,
1384 instrument.price_precision(),
1385 instrument.size_precision(),
1386 );
1387
1388 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1389 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1390
1391 assert_eq!(builder.ts_last, 1_000);
1392 assert_eq!(builder.count, 1);
1393 }
1394
1395 #[rstest]
1396 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1397 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1398 let bar_type = BarType::new(
1399 instrument.id(),
1400 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1401 AggregationSource::Internal,
1402 );
1403 let mut builder = BarBuilder::new(
1404 bar_type,
1405 instrument.price_precision(),
1406 instrument.size_precision(),
1407 );
1408
1409 let partial_bar = Bar::new(
1410 bar_type,
1411 Price::from("1.00001"),
1412 Price::from("1.00010"),
1413 Price::from("1.00000"),
1414 Price::from("1.00002"),
1415 Quantity::from(1),
1416 UnixNanos::from(1_000_000_000),
1417 UnixNanos::from(2_000_000_000),
1418 );
1419
1420 builder.set_partial(partial_bar);
1421 let bar = builder.build_now();
1422
1423 assert_eq!(bar.open, Price::from("1.00001"));
1424 assert_eq!(bar.high, Price::from("1.00010"));
1425 assert_eq!(bar.low, Price::from("1.00000"));
1426 assert_eq!(bar.close, Price::from("1.00002"));
1427 assert_eq!(bar.volume, Quantity::from(1));
1428 assert_eq!(bar.ts_init, 2_000_000_000);
1429 assert_eq!(builder.ts_last, 2_000_000_000);
1430 }
1431
1432 #[rstest]
1433 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1434 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1435 let bar_type = BarType::new(
1436 instrument.id(),
1437 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1438 AggregationSource::Internal,
1439 );
1440 let mut builder = BarBuilder::new(
1441 bar_type,
1442 instrument.price_precision(),
1443 instrument.size_precision(),
1444 );
1445
1446 let partial_bar1 = Bar::new(
1447 bar_type,
1448 Price::from("1.00001"),
1449 Price::from("1.00010"),
1450 Price::from("1.00000"),
1451 Price::from("1.00002"),
1452 Quantity::from(1),
1453 UnixNanos::from(1_000_000_000),
1454 UnixNanos::from(1_000_000_000),
1455 );
1456
1457 let partial_bar2 = Bar::new(
1458 bar_type,
1459 Price::from("2.00001"),
1460 Price::from("2.00010"),
1461 Price::from("2.00000"),
1462 Price::from("2.00002"),
1463 Quantity::from(2),
1464 UnixNanos::from(3_000_000_000),
1465 UnixNanos::from(3_000_000_000),
1466 );
1467
1468 builder.set_partial(partial_bar1);
1469 builder.set_partial(partial_bar2);
1470 let bar = builder.build(
1471 UnixNanos::from(4_000_000_000),
1472 UnixNanos::from(4_000_000_000),
1473 );
1474
1475 assert_eq!(bar.open, Price::from("1.00001"));
1476 assert_eq!(bar.high, Price::from("1.00010"));
1477 assert_eq!(bar.low, Price::from("1.00000"));
1478 assert_eq!(bar.close, Price::from("1.00002"));
1479 assert_eq!(bar.volume, Quantity::from(1));
1480 assert_eq!(bar.ts_init, 4_000_000_000);
1481 assert_eq!(builder.ts_last, 1_000_000_000);
1482 }
1483
1484 #[rstest]
1485 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1486 let instrument = InstrumentAny::Equity(equity_aapl);
1487 let bar_type = BarType::new(
1488 instrument.id(),
1489 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1490 AggregationSource::Internal,
1491 );
1492 let mut builder = BarBuilder::new(
1493 bar_type,
1494 instrument.price_precision(),
1495 instrument.size_precision(),
1496 );
1497
1498 builder.update(
1499 Price::from("1.00000"),
1500 Quantity::from(1),
1501 UnixNanos::default(),
1502 );
1503
1504 assert!(builder.initialized);
1505 assert_eq!(builder.ts_last, 0);
1506 assert_eq!(builder.count, 1);
1507 }
1508
1509 #[rstest]
1510 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1511 equity_aapl: Equity,
1512 ) {
1513 let instrument = InstrumentAny::Equity(equity_aapl);
1514 let bar_type = BarType::new(
1515 instrument.id(),
1516 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1517 AggregationSource::Internal,
1518 );
1519 let mut builder = BarBuilder::new(bar_type, 2, 0);
1520
1521 builder.update(
1522 Price::from("1.00000"),
1523 Quantity::from(1),
1524 UnixNanos::from(1_000),
1525 );
1526 builder.update(
1527 Price::from("1.00001"),
1528 Quantity::from(1),
1529 UnixNanos::from(500),
1530 );
1531
1532 assert!(builder.initialized);
1533 assert_eq!(builder.ts_last, 1_000);
1534 assert_eq!(builder.count, 1);
1535 }
1536
1537 #[rstest]
1538 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1539 let instrument = InstrumentAny::Equity(equity_aapl);
1540 let bar_type = BarType::new(
1541 instrument.id(),
1542 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1543 AggregationSource::Internal,
1544 );
1545 let mut builder = BarBuilder::new(
1546 bar_type,
1547 instrument.price_precision(),
1548 instrument.size_precision(),
1549 );
1550
1551 for _ in 0..5 {
1552 builder.update(
1553 Price::from("1.00000"),
1554 Quantity::from(1),
1555 UnixNanos::from(1_000),
1556 );
1557 }
1558
1559 assert_eq!(builder.count, 5);
1560 }
1561
1562 #[rstest]
1563 #[should_panic]
1564 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1565 let instrument = InstrumentAny::Equity(equity_aapl);
1566 let bar_type = BarType::new(
1567 instrument.id(),
1568 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1569 AggregationSource::Internal,
1570 );
1571 let mut builder = BarBuilder::new(
1572 bar_type,
1573 instrument.price_precision(),
1574 instrument.size_precision(),
1575 );
1576 let _ = builder.build_now();
1577 }
1578
1579 #[rstest]
1580 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1581 let instrument = InstrumentAny::Equity(equity_aapl);
1582 let bar_type = BarType::new(
1583 instrument.id(),
1584 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1585 AggregationSource::Internal,
1586 );
1587 let mut builder = BarBuilder::new(
1588 bar_type,
1589 instrument.price_precision(),
1590 instrument.size_precision(),
1591 );
1592
1593 builder.update(
1594 Price::from("1.00001"),
1595 Quantity::from(2),
1596 UnixNanos::default(),
1597 );
1598 builder.update(
1599 Price::from("1.00002"),
1600 Quantity::from(2),
1601 UnixNanos::default(),
1602 );
1603 builder.update(
1604 Price::from("1.00000"),
1605 Quantity::from(1),
1606 UnixNanos::from(1_000_000_000),
1607 );
1608
1609 let bar = builder.build_now();
1610
1611 assert_eq!(bar.open, Price::from("1.00001"));
1612 assert_eq!(bar.high, Price::from("1.00002"));
1613 assert_eq!(bar.low, Price::from("1.00000"));
1614 assert_eq!(bar.close, Price::from("1.00000"));
1615 assert_eq!(bar.volume, Quantity::from(5));
1616 assert_eq!(bar.ts_init, 1_000_000_000);
1617 assert_eq!(builder.ts_last, 1_000_000_000);
1618 assert_eq!(builder.count, 0);
1619 }
1620
1621 #[rstest]
1622 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1623 let instrument = InstrumentAny::Equity(equity_aapl);
1624 let bar_type = BarType::new(
1625 instrument.id(),
1626 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1627 AggregationSource::Internal,
1628 );
1629 let mut builder = BarBuilder::new(bar_type, 2, 0);
1630
1631 builder.update(
1632 Price::from("1.00001"),
1633 Quantity::from(1),
1634 UnixNanos::default(),
1635 );
1636 builder.build_now();
1637
1638 builder.update(
1639 Price::from("1.00000"),
1640 Quantity::from(1),
1641 UnixNanos::default(),
1642 );
1643 builder.update(
1644 Price::from("1.00003"),
1645 Quantity::from(1),
1646 UnixNanos::default(),
1647 );
1648 builder.update(
1649 Price::from("1.00002"),
1650 Quantity::from(1),
1651 UnixNanos::default(),
1652 );
1653
1654 let bar = builder.build_now();
1655
1656 assert_eq!(bar.open, Price::from("1.00000"));
1657 assert_eq!(bar.high, Price::from("1.00003"));
1658 assert_eq!(bar.low, Price::from("1.00000"));
1659 assert_eq!(bar.close, Price::from("1.00002"));
1660 assert_eq!(bar.volume, Quantity::from(3));
1661 }
1662
1663 #[rstest]
1664 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1665 let instrument = InstrumentAny::Equity(equity_aapl);
1666 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1667 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1668 let handler = Arc::new(Mutex::new(Vec::new()));
1669 let handler_clone = Arc::clone(&handler);
1670
1671 let mut aggregator = TickBarAggregator::new(
1672 bar_type,
1673 instrument.price_precision(),
1674 instrument.size_precision(),
1675 move |bar: Bar| {
1676 let mut handler_guard = handler_clone.lock().unwrap();
1677 handler_guard.push(bar);
1678 },
1679 false,
1680 );
1681
1682 let trade = TradeTick::default();
1683 aggregator.handle_trade(trade);
1684
1685 let handler_guard = handler.lock().unwrap();
1686 assert_eq!(handler_guard.len(), 0);
1687 }
1688
1689 #[rstest]
1690 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1691 let instrument = InstrumentAny::Equity(equity_aapl);
1692 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1693 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1694 let handler = Arc::new(Mutex::new(Vec::new()));
1695 let handler_clone = Arc::clone(&handler);
1696
1697 let mut aggregator = TickBarAggregator::new(
1698 bar_type,
1699 instrument.price_precision(),
1700 instrument.size_precision(),
1701 move |bar: Bar| {
1702 let mut handler_guard = handler_clone.lock().unwrap();
1703 handler_guard.push(bar);
1704 },
1705 false,
1706 );
1707
1708 let trade = TradeTick::default();
1709 aggregator.handle_trade(trade);
1710 aggregator.handle_trade(trade);
1711 aggregator.handle_trade(trade);
1712
1713 let handler_guard = handler.lock().unwrap();
1714 let bar = handler_guard.first().unwrap();
1715 assert_eq!(handler_guard.len(), 1);
1716 assert_eq!(bar.open, trade.price);
1717 assert_eq!(bar.high, trade.price);
1718 assert_eq!(bar.low, trade.price);
1719 assert_eq!(bar.close, trade.price);
1720 assert_eq!(bar.volume, Quantity::from(300000));
1721 assert_eq!(bar.ts_event, trade.ts_event);
1722 assert_eq!(bar.ts_init, trade.ts_init);
1723 }
1724
1725 #[rstest]
1726 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1727 let instrument = InstrumentAny::Equity(equity_aapl);
1728 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1729 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1730 let handler = Arc::new(Mutex::new(Vec::new()));
1731 let handler_clone = Arc::clone(&handler);
1732
1733 let mut aggregator = TickBarAggregator::new(
1734 bar_type,
1735 instrument.price_precision(),
1736 instrument.size_precision(),
1737 move |bar: Bar| {
1738 let mut handler_guard = handler_clone.lock().unwrap();
1739 handler_guard.push(bar);
1740 },
1741 false,
1742 );
1743
1744 aggregator.update(
1745 Price::from("1.00001"),
1746 Quantity::from(1),
1747 UnixNanos::default(),
1748 );
1749 aggregator.update(
1750 Price::from("1.00002"),
1751 Quantity::from(1),
1752 UnixNanos::from(1000),
1753 );
1754 aggregator.update(
1755 Price::from("1.00003"),
1756 Quantity::from(1),
1757 UnixNanos::from(2000),
1758 );
1759
1760 let handler_guard = handler.lock().unwrap();
1761 assert_eq!(handler_guard.len(), 1);
1762
1763 let bar = handler_guard.first().unwrap();
1764 assert_eq!(bar.open, Price::from("1.00001"));
1765 assert_eq!(bar.high, Price::from("1.00003"));
1766 assert_eq!(bar.low, Price::from("1.00001"));
1767 assert_eq!(bar.close, Price::from("1.00003"));
1768 assert_eq!(bar.volume, Quantity::from(3));
1769 }
1770
1771 #[rstest]
1772 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1773 let instrument = InstrumentAny::Equity(equity_aapl);
1774 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1775 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1776 let handler = Arc::new(Mutex::new(Vec::new()));
1777 let handler_clone = Arc::clone(&handler);
1778
1779 let mut aggregator = TickBarAggregator::new(
1780 bar_type,
1781 instrument.price_precision(),
1782 instrument.size_precision(),
1783 move |bar: Bar| {
1784 let mut handler_guard = handler_clone.lock().unwrap();
1785 handler_guard.push(bar);
1786 },
1787 false,
1788 );
1789
1790 aggregator.update(
1791 Price::from("1.00001"),
1792 Quantity::from(1),
1793 UnixNanos::default(),
1794 );
1795 aggregator.update(
1796 Price::from("1.00002"),
1797 Quantity::from(1),
1798 UnixNanos::from(1000),
1799 );
1800 aggregator.update(
1801 Price::from("1.00003"),
1802 Quantity::from(1),
1803 UnixNanos::from(2000),
1804 );
1805 aggregator.update(
1806 Price::from("1.00004"),
1807 Quantity::from(1),
1808 UnixNanos::from(3000),
1809 );
1810
1811 let handler_guard = handler.lock().unwrap();
1812 assert_eq!(handler_guard.len(), 2);
1813
1814 let bar1 = &handler_guard[0];
1815 assert_eq!(bar1.open, Price::from("1.00001"));
1816 assert_eq!(bar1.close, Price::from("1.00002"));
1817 assert_eq!(bar1.volume, Quantity::from(2));
1818
1819 let bar2 = &handler_guard[1];
1820 assert_eq!(bar2.open, Price::from("1.00003"));
1821 assert_eq!(bar2.close, Price::from("1.00004"));
1822 assert_eq!(bar2.volume, Quantity::from(2));
1823 }
1824
1825 #[rstest]
1826 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1827 let instrument = InstrumentAny::Equity(equity_aapl);
1828 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1829 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1830 let handler = Arc::new(Mutex::new(Vec::new()));
1831 let handler_clone = Arc::clone(&handler);
1832
1833 let mut aggregator = VolumeBarAggregator::new(
1834 bar_type,
1835 instrument.price_precision(),
1836 instrument.size_precision(),
1837 move |bar: Bar| {
1838 let mut handler_guard = handler_clone.lock().unwrap();
1839 handler_guard.push(bar);
1840 },
1841 false,
1842 );
1843
1844 aggregator.update(
1845 Price::from("1.00001"),
1846 Quantity::from(25),
1847 UnixNanos::default(),
1848 );
1849
1850 let handler_guard = handler.lock().unwrap();
1851 assert_eq!(handler_guard.len(), 2);
1852 let bar1 = &handler_guard[0];
1853 assert_eq!(bar1.volume, Quantity::from(10));
1854 let bar2 = &handler_guard[1];
1855 assert_eq!(bar2.volume, Quantity::from(10));
1856 }
1857
1858 #[rstest]
1859 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1860 let instrument = InstrumentAny::Equity(equity_aapl);
1861 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1863 let handler = Arc::new(Mutex::new(Vec::new()));
1864 let handler_clone = Arc::clone(&handler);
1865
1866 let mut aggregator = ValueBarAggregator::new(
1867 bar_type,
1868 instrument.price_precision(),
1869 instrument.size_precision(),
1870 move |bar: Bar| {
1871 let mut handler_guard = handler_clone.lock().unwrap();
1872 handler_guard.push(bar);
1873 },
1874 false,
1875 );
1876
1877 aggregator.update(
1879 Price::from("100.00"),
1880 Quantity::from(5),
1881 UnixNanos::default(),
1882 );
1883 aggregator.update(
1884 Price::from("100.00"),
1885 Quantity::from(5),
1886 UnixNanos::from(1000),
1887 );
1888
1889 let handler_guard = handler.lock().unwrap();
1890 assert_eq!(handler_guard.len(), 1);
1891 let bar = handler_guard.first().unwrap();
1892 assert_eq!(bar.volume, Quantity::from(10));
1893 }
1894
1895 #[rstest]
1896 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1897 let instrument = InstrumentAny::Equity(equity_aapl);
1898 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1899 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1900 let handler = Arc::new(Mutex::new(Vec::new()));
1901 let handler_clone = Arc::clone(&handler);
1902
1903 let mut aggregator = ValueBarAggregator::new(
1904 bar_type,
1905 instrument.price_precision(),
1906 instrument.size_precision(),
1907 move |bar: Bar| {
1908 let mut handler_guard = handler_clone.lock().unwrap();
1909 handler_guard.push(bar);
1910 },
1911 false,
1912 );
1913
1914 aggregator.update(
1916 Price::from("100.00"),
1917 Quantity::from(25),
1918 UnixNanos::default(),
1919 );
1920
1921 let handler_guard = handler.lock().unwrap();
1922 assert_eq!(handler_guard.len(), 2);
1923 let remaining_value = aggregator.get_cumulative_value();
1924 assert!(remaining_value < 1000.0); }
1926
1927 #[rstest]
1928 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1929 let instrument = InstrumentAny::Equity(equity_aapl);
1930 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1932 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1933 let handler = Arc::new(Mutex::new(Vec::new()));
1934 let handler_clone = Arc::clone(&handler);
1935 let clock = Rc::new(RefCell::new(TestClock::new()));
1936
1937 let mut aggregator = TimeBarAggregator::new(
1938 bar_type,
1939 instrument.price_precision(),
1940 instrument.size_precision(),
1941 clock.clone(),
1942 move |bar: Bar| {
1943 let mut handler_guard = handler_clone.lock().unwrap();
1944 handler_guard.push(bar);
1945 },
1946 false, true, false, BarIntervalType::LeftOpen,
1950 None, 15, false, );
1954
1955 aggregator.update(
1956 Price::from("100.00"),
1957 Quantity::from(1),
1958 UnixNanos::default(),
1959 );
1960
1961 let next_sec = UnixNanos::from(1_000_000_000);
1962 clock.borrow_mut().set_time(next_sec);
1963
1964 let event = TimeEvent::new(
1965 Ustr::from("1-SECOND-LAST"),
1966 UUID4::new(),
1967 next_sec,
1968 next_sec,
1969 );
1970 aggregator.build_bar(event);
1971
1972 let handler_guard = handler.lock().unwrap();
1973 assert_eq!(handler_guard.len(), 1);
1974 let bar = handler_guard.first().unwrap();
1975 assert_eq!(bar.ts_event, UnixNanos::default());
1976 assert_eq!(bar.ts_init, next_sec);
1977 }
1978
1979 #[rstest]
1980 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1981 let instrument = InstrumentAny::Equity(equity_aapl);
1982 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1983 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1984 let handler = Arc::new(Mutex::new(Vec::new()));
1985 let handler_clone = Arc::clone(&handler);
1986 let clock = Rc::new(RefCell::new(TestClock::new()));
1987
1988 let mut aggregator = TimeBarAggregator::new(
1989 bar_type,
1990 instrument.price_precision(),
1991 instrument.size_precision(),
1992 clock.clone(),
1993 move |bar: Bar| {
1994 let mut handler_guard = handler_clone.lock().unwrap();
1995 handler_guard.push(bar);
1996 },
1997 false, true, true, BarIntervalType::LeftOpen,
2001 None,
2002 15,
2003 false, );
2005
2006 aggregator.update(
2008 Price::from("100.00"),
2009 Quantity::from(1),
2010 UnixNanos::default(),
2011 );
2012
2013 let ts1 = UnixNanos::from(1_000_000_000);
2015 clock.borrow_mut().set_time(ts1);
2016 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2017 aggregator.build_bar(event);
2018
2019 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2021
2022 let ts2 = UnixNanos::from(2_000_000_000);
2024 clock.borrow_mut().set_time(ts2);
2025 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2026 aggregator.build_bar(event);
2027
2028 let handler_guard = handler.lock().unwrap();
2029 assert_eq!(handler_guard.len(), 2);
2030
2031 let bar1 = &handler_guard[0];
2032 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
2034 assert_eq!(bar1.close, Price::from("100.00"));
2035 let bar2 = &handler_guard[1];
2036 assert_eq!(bar2.ts_event, ts2);
2037 assert_eq!(bar2.ts_init, ts2);
2038 assert_eq!(bar2.close, Price::from("101.00"));
2039 }
2040
2041 #[rstest]
2042 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
2043 let instrument = InstrumentAny::Equity(equity_aapl);
2044 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2045 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2046 let handler = Arc::new(Mutex::new(Vec::new()));
2047 let handler_clone = Arc::clone(&handler);
2048 let clock = Rc::new(RefCell::new(TestClock::new()));
2049 let mut aggregator = TimeBarAggregator::new(
2050 bar_type,
2051 instrument.price_precision(),
2052 instrument.size_precision(),
2053 clock.clone(),
2054 move |bar: Bar| {
2055 let mut handler_guard = handler_clone.lock().unwrap();
2056 handler_guard.push(bar);
2057 },
2058 false, true, true, BarIntervalType::RightOpen,
2062 None,
2063 15,
2064 false, );
2066
2067 aggregator.update(
2069 Price::from("100.00"),
2070 Quantity::from(1),
2071 UnixNanos::default(),
2072 );
2073
2074 let ts1 = UnixNanos::from(1_000_000_000);
2076 clock.borrow_mut().set_time(ts1);
2077 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2078 aggregator.build_bar(event);
2079
2080 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2082
2083 let ts2 = UnixNanos::from(2_000_000_000);
2085 clock.borrow_mut().set_time(ts2);
2086 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2087 aggregator.build_bar(event);
2088
2089 let handler_guard = handler.lock().unwrap();
2090 assert_eq!(handler_guard.len(), 2);
2091
2092 let bar1 = &handler_guard[0];
2093 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
2095 assert_eq!(bar1.close, Price::from("100.00"));
2096
2097 let bar2 = &handler_guard[1];
2098 assert_eq!(bar2.ts_event, ts1);
2099 assert_eq!(bar2.ts_init, ts2);
2100 assert_eq!(bar2.close, Price::from("101.00"));
2101 }
2102
2103 #[rstest]
2104 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2105 let instrument = InstrumentAny::Equity(equity_aapl);
2106 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2107 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2108 let handler = Arc::new(Mutex::new(Vec::new()));
2109 let handler_clone = Arc::clone(&handler);
2110 let clock = Rc::new(RefCell::new(TestClock::new()));
2111
2112 let mut aggregator = TimeBarAggregator::new(
2114 bar_type,
2115 instrument.price_precision(),
2116 instrument.size_precision(),
2117 clock.clone(),
2118 move |bar: Bar| {
2119 let mut handler_guard = handler_clone.lock().unwrap();
2120 handler_guard.push(bar);
2121 },
2122 false, false, true, BarIntervalType::LeftOpen,
2126 None,
2127 15,
2128 false, );
2130
2131 let ts1 = UnixNanos::from(1_000_000_000);
2133 clock.borrow_mut().set_time(ts1);
2134 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2135 aggregator.build_bar(event);
2136
2137 let handler_guard = handler.lock().unwrap();
2138 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
2140
2141 let handler = Arc::new(Mutex::new(Vec::new()));
2143 let handler_clone = Arc::clone(&handler);
2144 let mut aggregator = TimeBarAggregator::new(
2145 bar_type,
2146 instrument.price_precision(),
2147 instrument.size_precision(),
2148 clock.clone(),
2149 move |bar: Bar| {
2150 let mut handler_guard = handler_clone.lock().unwrap();
2151 handler_guard.push(bar);
2152 },
2153 false,
2154 true, true, BarIntervalType::LeftOpen,
2157 None,
2158 15,
2159 false, );
2161
2162 aggregator.update(
2163 Price::from("100.00"),
2164 Quantity::from(1),
2165 UnixNanos::default(),
2166 );
2167
2168 let ts1 = UnixNanos::from(1_000_000_000);
2170 clock.borrow_mut().set_time(ts1);
2171 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2172 aggregator.build_bar(event);
2173
2174 let ts2 = UnixNanos::from(2_000_000_000);
2176 clock.borrow_mut().set_time(ts2);
2177 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2178 aggregator.build_bar(event);
2179
2180 let handler_guard = handler.lock().unwrap();
2181 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2183 assert_eq!(bar1.close, Price::from("100.00"));
2184 let bar2 = &handler_guard[1];
2185 assert_eq!(bar2.close, Price::from("100.00")); }
2187
2188 #[rstest]
2189 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2190 let instrument = InstrumentAny::Equity(equity_aapl);
2191 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2192 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2193 let clock = Rc::new(RefCell::new(TestClock::new()));
2194 let handler = Arc::new(Mutex::new(Vec::new()));
2195 let handler_clone = Arc::clone(&handler);
2196
2197 let mut aggregator = TimeBarAggregator::new(
2198 bar_type,
2199 instrument.price_precision(),
2200 instrument.size_precision(),
2201 clock.clone(),
2202 move |bar: Bar| {
2203 let mut handler_guard = handler_clone.lock().unwrap();
2204 handler_guard.push(bar);
2205 },
2206 false, true, true, BarIntervalType::RightOpen,
2210 None,
2211 15,
2212 false, );
2214
2215 let ts1 = UnixNanos::from(1_000_000_000);
2216 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2217
2218 let ts2 = UnixNanos::from(2_000_000_000);
2219 clock.borrow_mut().set_time(ts2);
2220
2221 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2223 aggregator.build_bar(event);
2224
2225 let handler_guard = handler.lock().unwrap();
2226 let bar = handler_guard.first().unwrap();
2227 assert_eq!(bar.ts_event, UnixNanos::default());
2228 assert_eq!(bar.ts_init, ts2);
2229 }
2230
2231 #[rstest]
2232 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2233 let instrument = InstrumentAny::Equity(equity_aapl);
2234 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2235 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2236 let clock = Rc::new(RefCell::new(TestClock::new()));
2237 let handler = Arc::new(Mutex::new(Vec::new()));
2238 let handler_clone = Arc::clone(&handler);
2239
2240 let mut aggregator = TimeBarAggregator::new(
2241 bar_type,
2242 instrument.price_precision(),
2243 instrument.size_precision(),
2244 clock.clone(),
2245 move |bar: Bar| {
2246 let mut handler_guard = handler_clone.lock().unwrap();
2247 handler_guard.push(bar);
2248 },
2249 false, true, true, BarIntervalType::LeftOpen,
2253 None,
2254 15,
2255 false, );
2257
2258 let ts1 = UnixNanos::from(1_000_000_000);
2259 clock.borrow_mut().set_time(ts1);
2260
2261 let initial_time = clock.borrow().utc_now();
2262 aggregator.start_batch_time(UnixNanos::from(
2263 initial_time.timestamp_nanos_opt().unwrap() as u64
2264 ));
2265
2266 let handler_guard = handler.lock().unwrap();
2267 assert_eq!(handler_guard.len(), 0);
2268 }
2269}