1use std::{cell::RefCell, ops::Add, rc::Rc};
19
20use chrono::TimeDelta;
21use nautilus_common::{
22 clock::Clock,
23 timer::{TimeEvent, TimeEventCallback},
24};
25use nautilus_core::{
26 UnixNanos,
27 correctness::{self, FAILED},
28 datetime::{add_n_months_nanos, subtract_n_months_nanos},
29};
30use nautilus_model::{
31 data::{
32 QuoteTick, TradeTick,
33 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
34 },
35 enums::{AggregationSource, BarAggregation, BarIntervalType},
36 types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
37};
38
39pub trait BarAggregator {
40 fn bar_type(&self) -> BarType;
42 fn is_running(&self) -> bool;
44 fn set_await_partial(&mut self, value: bool);
45 fn set_is_running(&mut self, value: bool);
46 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
48 fn handle_quote(&mut self, quote: QuoteTick) {
50 let spec = self.bar_type().spec();
51 if !self.await_partial() {
52 self.update(
53 quote.extract_price(spec.price_type),
54 quote.extract_size(spec.price_type),
55 quote.ts_event,
56 );
57 }
58 }
59 fn handle_trade(&mut self, trade: TradeTick) {
61 if !self.await_partial() {
62 self.update(trade.price, trade.size, trade.ts_event);
63 }
64 }
65 fn handle_bar(&mut self, bar: Bar) {
67 if !self.await_partial() {
68 self.update_bar(bar, bar.volume, bar.ts_init);
69 }
70 }
71 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
72 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
73 fn stop_batch_update(&mut self);
74 fn await_partial(&self) -> bool;
75 fn set_partial(&mut self, partial_bar: Bar);
76}
77
78pub struct BarBuilder {
80 bar_type: BarType,
81 price_precision: u8,
82 size_precision: u8,
83 initialized: bool,
84 ts_last: UnixNanos,
85 count: usize,
86 partial_set: bool,
87 last_close: Option<Price>,
88 open: Option<Price>,
89 high: Option<Price>,
90 low: Option<Price>,
91 close: Option<Price>,
92 volume: Quantity,
93}
94
95impl BarBuilder {
96 #[must_use]
104 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
105 correctness::check_equal(
106 bar_type.aggregation_source(),
107 AggregationSource::Internal,
108 "bar_type.aggregation_source",
109 "AggregationSource::Internal",
110 )
111 .expect(FAILED);
112
113 Self {
114 bar_type,
115 price_precision,
116 size_precision,
117 initialized: false,
118 ts_last: UnixNanos::default(),
119 count: 0,
120 partial_set: false,
121 last_close: None,
122 open: None,
123 high: None,
124 low: None,
125 close: None,
126 volume: Quantity::zero(size_precision),
127 }
128 }
129
130 pub fn set_partial(&mut self, partial_bar: Bar) {
132 if self.partial_set {
133 return; }
135
136 self.open = Some(partial_bar.open);
137
138 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
139 self.high = Some(partial_bar.high);
140 }
141
142 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
143 self.low = Some(partial_bar.low);
144 }
145
146 if self.close.is_none() {
147 self.close = Some(partial_bar.close);
148 }
149
150 self.volume = partial_bar.volume;
151
152 if self.ts_last == 0 {
153 self.ts_last = partial_bar.ts_init;
154 }
155
156 self.partial_set = true;
157 self.initialized = true;
158 }
159
160 pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
162 if ts_event < self.ts_last {
163 return; }
165
166 if self.open.is_none() {
167 self.open = Some(price);
168 self.high = Some(price);
169 self.low = Some(price);
170 self.initialized = true;
171 } else {
172 if price > self.high.unwrap() {
173 self.high = Some(price);
174 }
175 if price < self.low.unwrap() {
176 self.low = Some(price);
177 }
178 }
179
180 self.close = Some(price);
181 self.volume = self.volume.add(size);
182 self.count += 1;
183 self.ts_last = ts_event;
184 }
185
186 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
187 if ts_init < self.ts_last {
188 return; }
190
191 if self.open.is_none() {
192 self.open = Some(bar.open);
193 self.high = Some(bar.high);
194 self.low = Some(bar.low);
195 self.initialized = true;
196 } else {
197 if bar.high > self.high.unwrap() {
198 self.high = Some(bar.high);
199 }
200 if bar.low < self.low.unwrap() {
201 self.low = Some(bar.low);
202 }
203 }
204
205 self.close = Some(bar.close);
206 self.volume = self.volume.add(volume);
207 self.count += 1;
208 self.ts_last = ts_init;
209 }
210
211 pub fn reset(&mut self) {
215 self.open = None;
216 self.high = None;
217 self.low = None;
218 self.volume = Quantity::zero(self.size_precision);
219 self.count = 0;
220 }
221
222 pub fn build_now(&mut self) -> Bar {
224 self.build(self.ts_last, self.ts_last)
225 }
226
227 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
229 if self.open.is_none() {
230 self.open = self.last_close;
231 self.high = self.last_close;
232 self.low = self.last_close;
233 self.close = self.last_close;
234 }
235
236 if let (Some(close), Some(low)) = (self.close, self.low) {
237 if close < low {
238 self.low = Some(close);
239 }
240 }
241
242 if let (Some(close), Some(high)) = (self.close, self.high) {
243 if close > high {
244 self.high = Some(close);
245 }
246 }
247
248 let bar = Bar::new(
250 self.bar_type,
251 self.open.unwrap(),
252 self.high.unwrap(),
253 self.low.unwrap(),
254 self.close.unwrap(),
255 self.volume,
256 ts_event,
257 ts_init,
258 );
259
260 self.last_close = self.close;
261 self.reset();
262 bar
263 }
264}
265
266pub struct BarAggregatorCore<H>
268where
269 H: FnMut(Bar),
270{
271 bar_type: BarType,
272 builder: BarBuilder,
273 handler: H,
274 handler_backup: Option<H>,
275 batch_handler: Option<Box<dyn FnMut(Bar)>>,
276 await_partial: bool,
277 is_running: bool,
278 batch_mode: bool,
279}
280
281impl<H> BarAggregatorCore<H>
282where
283 H: FnMut(Bar),
284{
285 pub fn new(
293 bar_type: BarType,
294 price_precision: u8,
295 size_precision: u8,
296 handler: H,
297 await_partial: bool,
298 ) -> Self {
299 Self {
300 bar_type,
301 builder: BarBuilder::new(bar_type, price_precision, size_precision),
302 handler,
303 handler_backup: None,
304 batch_handler: None,
305 await_partial,
306 is_running: false,
307 batch_mode: false,
308 }
309 }
310
311 pub const fn set_await_partial(&mut self, value: bool) {
312 self.await_partial = value;
313 }
314
315 pub const fn set_is_running(&mut self, value: bool) {
316 self.is_running = value;
317 }
318
319 pub const fn await_partial(&self) -> bool {
320 self.await_partial
321 }
322
323 pub fn set_partial(&mut self, partial_bar: Bar) {
325 self.builder.set_partial(partial_bar);
326 }
327
328 fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
329 self.builder.update(price, size, ts_event);
330 }
331
332 fn build_now_and_send(&mut self) {
333 let bar = self.builder.build_now();
334 (self.handler)(bar);
335 }
336
337 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
338 let bar = self.builder.build(ts_event, ts_init);
339
340 if self.batch_mode {
341 if let Some(handler) = &mut self.batch_handler {
342 handler(bar);
343 }
344 } else {
345 (self.handler)(bar);
346 }
347 }
348
349 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
350 self.batch_mode = true;
351 self.batch_handler = Some(handler);
352 }
353
354 pub fn stop_batch_update(&mut self) {
355 self.batch_mode = false;
356
357 if let Some(handler) = self.handler_backup.take() {
358 self.handler = handler;
359 }
360 }
361}
362
363pub struct TickBarAggregator<H>
368where
369 H: FnMut(Bar),
370{
371 core: BarAggregatorCore<H>,
372 cum_value: f64,
373}
374
375impl<H> TickBarAggregator<H>
376where
377 H: FnMut(Bar),
378{
379 pub fn new(
387 bar_type: BarType,
388 price_precision: u8,
389 size_precision: u8,
390 handler: H,
391 await_partial: bool,
392 ) -> Self {
393 Self {
394 core: BarAggregatorCore::new(
395 bar_type,
396 price_precision,
397 size_precision,
398 handler,
399 await_partial,
400 ),
401 cum_value: 0.0,
402 }
403 }
404}
405
406impl<H> BarAggregator for TickBarAggregator<H>
407where
408 H: FnMut(Bar),
409{
410 fn bar_type(&self) -> BarType {
411 self.core.bar_type
412 }
413
414 fn is_running(&self) -> bool {
415 self.core.is_running
416 }
417
418 fn set_await_partial(&mut self, value: bool) {
419 self.core.set_await_partial(value);
420 }
421
422 fn set_is_running(&mut self, value: bool) {
423 self.core.set_is_running(value);
424 }
425
426 fn await_partial(&self) -> bool {
427 self.core.await_partial()
428 }
429
430 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
432 self.core.apply_update(price, size, ts_event);
433 let spec = self.core.bar_type.spec();
434
435 if self.core.builder.count >= spec.step.get() {
436 self.core.build_now_and_send();
437 }
438 }
439
440 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
441 let mut volume_update = volume;
442 let average_price = Price::new(
443 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
444 self.core.builder.price_precision,
445 );
446
447 while volume_update.as_f64() > 0.0 {
448 let value_update = average_price.as_f64() * volume_update.as_f64();
449 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
450 self.cum_value += value_update;
451 self.core.builder.update_bar(bar, volume_update, ts_init);
452 break;
453 }
454
455 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
456 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
457 self.core.builder.update_bar(
458 bar,
459 Quantity::new(volume_diff, volume_update.precision),
460 ts_init,
461 );
462
463 self.core.build_now_and_send();
464 self.cum_value = 0.0;
465 volume_update = Quantity::new(
466 volume_update.as_f64() - volume_diff,
467 volume_update.precision,
468 );
469 }
470 }
471
472 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
473 self.core.start_batch_update(handler);
474 }
475
476 fn stop_batch_update(&mut self) {
477 self.core.stop_batch_update();
478 }
479
480 fn set_partial(&mut self, partial_bar: Bar) {
481 self.core.set_partial(partial_bar);
482 }
483}
484
485pub struct VolumeBarAggregator<H>
487where
488 H: FnMut(Bar),
489{
490 core: BarAggregatorCore<H>,
491}
492
493impl<H> VolumeBarAggregator<H>
494where
495 H: FnMut(Bar),
496{
497 pub fn new(
505 bar_type: BarType,
506 price_precision: u8,
507 size_precision: u8,
508 handler: H,
509 await_partial: bool,
510 ) -> Self {
511 Self {
512 core: BarAggregatorCore::new(
513 bar_type.standard(),
514 price_precision,
515 size_precision,
516 handler,
517 await_partial,
518 ),
519 }
520 }
521}
522
523impl<H> BarAggregator for VolumeBarAggregator<H>
524where
525 H: FnMut(Bar),
526{
527 fn bar_type(&self) -> BarType {
528 self.core.bar_type
529 }
530
531 fn is_running(&self) -> bool {
532 self.core.is_running
533 }
534
535 fn set_await_partial(&mut self, value: bool) {
536 self.core.set_await_partial(value);
537 }
538
539 fn set_is_running(&mut self, value: bool) {
540 self.core.set_is_running(value);
541 }
542
543 fn await_partial(&self) -> bool {
544 self.core.await_partial()
545 }
546
547 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
549 let mut raw_size_update = size.raw;
550 let spec = self.core.bar_type.spec();
551 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
552
553 while raw_size_update > 0 {
554 if self.core.builder.volume.raw + raw_size_update < raw_step {
555 self.core.apply_update(
556 price,
557 Quantity::from_raw(raw_size_update, size.precision),
558 ts_event,
559 );
560 break;
561 }
562
563 let raw_size_diff = raw_step - self.core.builder.volume.raw;
564 self.core.apply_update(
565 price,
566 Quantity::from_raw(raw_size_diff, size.precision),
567 ts_event,
568 );
569
570 self.core.build_now_and_send();
571 raw_size_update -= raw_size_diff;
572 }
573 }
574
575 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
576 let mut raw_volume_update = volume.raw;
577 let spec = self.core.bar_type.spec();
578 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
579
580 while raw_volume_update > 0 {
581 if self.core.builder.volume.raw + raw_volume_update < raw_step {
582 self.core.builder.update_bar(
583 bar,
584 Quantity::from_raw(raw_volume_update, volume.precision),
585 ts_init,
586 );
587 break;
588 }
589
590 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
591 self.core.builder.update_bar(
592 bar,
593 Quantity::from_raw(raw_volume_diff, volume.precision),
594 ts_init,
595 );
596
597 self.core.build_now_and_send();
598 raw_volume_update -= raw_volume_diff;
599 }
600 }
601
602 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
603 self.core.start_batch_update(handler);
604 }
605
606 fn stop_batch_update(&mut self) {
607 self.core.stop_batch_update();
608 }
609
610 fn set_partial(&mut self, partial_bar: Bar) {
611 self.core.set_partial(partial_bar);
612 }
613}
614
615pub struct ValueBarAggregator<H>
620where
621 H: FnMut(Bar),
622{
623 core: BarAggregatorCore<H>,
624 cum_value: f64,
625}
626
627impl<H> ValueBarAggregator<H>
628where
629 H: FnMut(Bar),
630{
631 pub fn new(
639 bar_type: BarType,
640 price_precision: u8,
641 size_precision: u8,
642 handler: H,
643 await_partial: bool,
644 ) -> Self {
645 Self {
646 core: BarAggregatorCore::new(
647 bar_type.standard(),
648 price_precision,
649 size_precision,
650 handler,
651 await_partial,
652 ),
653 cum_value: 0.0,
654 }
655 }
656
657 #[must_use]
658 pub const fn get_cumulative_value(&self) -> f64 {
660 self.cum_value
661 }
662}
663
664impl<H> BarAggregator for ValueBarAggregator<H>
665where
666 H: FnMut(Bar),
667{
668 fn bar_type(&self) -> BarType {
669 self.core.bar_type
670 }
671
672 fn is_running(&self) -> bool {
673 self.core.is_running
674 }
675
676 fn set_await_partial(&mut self, value: bool) {
677 self.core.set_await_partial(value);
678 }
679
680 fn set_is_running(&mut self, value: bool) {
681 self.core.set_is_running(value);
682 }
683
684 fn await_partial(&self) -> bool {
685 self.core.await_partial()
686 }
687
688 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
690 let mut size_update = size.as_f64();
691 let spec = self.core.bar_type.spec();
692
693 while size_update > 0.0 {
694 let value_update = price.as_f64() * size_update;
695 if self.cum_value + value_update < spec.step.get() as f64 {
696 self.cum_value += value_update;
697 self.core
698 .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
699 break;
700 }
701
702 let value_diff = spec.step.get() as f64 - self.cum_value;
703 let size_diff = size_update * (value_diff / value_update);
704 self.core
705 .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
706
707 self.core.build_now_and_send();
708 self.cum_value = 0.0;
709 size_update -= size_diff;
710 }
711 }
712
713 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
714 let mut volume_update = volume;
715 let average_price = Price::new(
716 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
717 self.core.builder.price_precision,
718 );
719
720 while volume_update.as_f64() > 0.0 {
721 let value_update = average_price.as_f64() * volume_update.as_f64();
722 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
723 self.cum_value += value_update;
724 self.core.builder.update_bar(bar, volume_update, ts_init);
725 break;
726 }
727
728 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
729 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
730 self.core.builder.update_bar(
731 bar,
732 Quantity::new(volume_diff, volume_update.precision),
733 ts_init,
734 );
735
736 self.core.build_now_and_send();
737 self.cum_value = 0.0;
738 volume_update = Quantity::new(
739 volume_update.as_f64() - volume_diff,
740 volume_update.precision,
741 );
742 }
743 }
744
745 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
746 self.core.start_batch_update(handler);
747 }
748
749 fn stop_batch_update(&mut self) {
750 self.core.stop_batch_update();
751 }
752
753 fn set_partial(&mut self, partial_bar: Bar) {
754 self.core.set_partial(partial_bar);
755 }
756}
757
758pub struct TimeBarAggregator<H>
762where
763 H: FnMut(Bar),
764{
765 core: BarAggregatorCore<H>,
766 clock: Rc<RefCell<dyn Clock>>,
767 build_with_no_updates: bool,
768 timestamp_on_close: bool,
769 is_left_open: bool,
770 build_on_next_tick: bool,
771 stored_open_ns: UnixNanos,
772 stored_close_ns: UnixNanos,
773 timer_name: String,
774 interval_ns: UnixNanos,
775 next_close_ns: UnixNanos,
776 composite_bar_build_delay: i64,
777 add_delay: bool,
778 batch_open_ns: UnixNanos,
779 batch_next_close_ns: UnixNanos,
780 time_bars_origin: Option<TimeDelta>,
781 skip_first_non_full_bar: bool,
782}
783
784#[derive(Clone)]
785pub struct NewBarCallback<H: FnMut(Bar)> {
786 aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
787}
788
789impl<H: FnMut(Bar)> NewBarCallback<H> {
790 pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
791 Self { aggregator }
792 }
793}
794
795impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
796 fn from(value: NewBarCallback<H>) -> Self {
797 Self::Rust(Rc::new(move |event: TimeEvent| {
798 value.aggregator.borrow_mut().build_bar(event);
799 }))
800 }
801}
802
803impl<H> TimeBarAggregator<H>
804where
805 H: FnMut(Bar) + 'static,
806{
807 #[allow(clippy::too_many_arguments)]
815 pub fn new(
816 bar_type: BarType,
817 price_precision: u8,
818 size_precision: u8,
819 clock: Rc<RefCell<dyn Clock>>,
820 handler: H,
821 await_partial: bool,
822 build_with_no_updates: bool,
823 timestamp_on_close: bool,
824 interval_type: BarIntervalType,
825 time_bars_origin: Option<TimeDelta>,
826 composite_bar_build_delay: i64,
827 skip_first_non_full_bar: bool,
828 ) -> Self {
829 let is_left_open = match interval_type {
830 BarIntervalType::LeftOpen => true,
831 BarIntervalType::RightOpen => false,
832 };
833
834 let add_delay = bar_type.is_composite()
835 && bar_type.composite().aggregation_source() == AggregationSource::Internal;
836
837 let core = BarAggregatorCore::new(
838 bar_type.standard(),
839 price_precision,
840 size_precision,
841 handler,
842 await_partial,
843 );
844
845 Self {
846 core,
847 clock,
848 build_with_no_updates,
849 timestamp_on_close,
850 is_left_open,
851 build_on_next_tick: false,
852 stored_open_ns: UnixNanos::default(),
853 stored_close_ns: UnixNanos::default(),
854 timer_name: bar_type.to_string(),
855 interval_ns: get_bar_interval_ns(&bar_type),
856 next_close_ns: UnixNanos::default(),
857 composite_bar_build_delay,
858 add_delay,
859 batch_open_ns: UnixNanos::default(),
860 batch_next_close_ns: UnixNanos::default(),
861 time_bars_origin,
862 skip_first_non_full_bar,
863 }
864 }
865
866 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
868 let now = self.clock.borrow().utc_now();
869 let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);
870
871 if start_time == now {
872 self.skip_first_non_full_bar = false;
873 }
874
875 if self.add_delay {
876 start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
877 }
878
879 let spec = &self.bar_type().spec();
880 let start_time_ns = UnixNanos::from(start_time);
881
882 if spec.aggregation == BarAggregation::Month {
883 let step = spec.step.get() as u32;
884 let alert_time_ns = add_n_months_nanos(start_time_ns, step);
885
886 self.clock
887 .borrow_mut()
888 .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
889 .expect(FAILED);
890 } else {
891 self.clock
892 .borrow_mut()
893 .set_timer_ns(
894 &self.timer_name,
895 self.interval_ns.as_u64(),
896 start_time_ns,
897 None,
898 Some(callback.into()),
899 None,
900 )
901 .expect(FAILED);
902 }
903
904 log::debug!("Started timer {}", self.timer_name);
905 Ok(())
906 }
907
908 pub fn stop(&mut self) {
910 self.clock.borrow_mut().cancel_timer(&self.timer_name);
911 }
912
913 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
914 let spec = self.bar_type().spec();
915 self.core.batch_mode = true;
916
917 let time = time_ns.to_datetime_utc();
918 let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
919 self.batch_open_ns = UnixNanos::from(start_time);
920
921 if spec.aggregation == BarAggregation::Month {
922 let step = spec.step.get() as u32;
923
924 if self.batch_open_ns == time_ns {
925 self.batch_open_ns = subtract_n_months_nanos(self.batch_open_ns, step);
926 }
927
928 self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step);
929 } else {
930 if self.batch_open_ns == time_ns {
931 self.batch_open_ns -= self.interval_ns;
932 }
933
934 self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
935 }
936 }
937
938 const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
939 if self.is_left_open {
940 if self.timestamp_on_close {
941 close_ns
942 } else {
943 open_ns
944 }
945 } else {
946 open_ns
947 }
948 }
949
950 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
951 if self.skip_first_non_full_bar {
952 self.core.builder.reset();
953 self.skip_first_non_full_bar = false;
954 } else {
955 self.core.build_and_send(ts_event, ts_init);
956 }
957 }
958
959 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
960 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
961 let ts_init = self.batch_next_close_ns;
962 let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
963 self.build_and_send(ts_event, ts_init);
964 }
965 }
966
967 fn batch_post_update(&mut self, time_ns: UnixNanos) {
968 let step = self.bar_type().spec().step.get() as u32;
969
970 if !self.core.batch_mode
972 && time_ns == self.batch_next_close_ns
973 && time_ns > self.stored_open_ns
974 {
975 self.batch_next_close_ns = UnixNanos::default();
976 return;
977 }
978
979 if time_ns > self.batch_next_close_ns {
980 if self.bar_type().spec().aggregation == BarAggregation::Month {
982 while self.batch_next_close_ns < time_ns {
983 self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
984 }
985
986 self.batch_open_ns = subtract_n_months_nanos(self.batch_next_close_ns, step);
987 } else {
988 while self.batch_next_close_ns < time_ns {
989 self.batch_next_close_ns += self.interval_ns;
990 }
991
992 self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
993 }
994 }
995
996 if time_ns == self.batch_next_close_ns {
997 let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
998 self.build_and_send(ts_event, time_ns);
999 self.batch_open_ns = self.batch_next_close_ns;
1000
1001 if self.bar_type().spec().aggregation == BarAggregation::Month {
1002 self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
1003 } else {
1004 self.batch_next_close_ns += self.interval_ns;
1005 }
1006 }
1007
1008 if !self.core.batch_mode {
1010 self.batch_next_close_ns = UnixNanos::default();
1011 }
1012 }
1013
1014 fn build_bar(&mut self, event: TimeEvent) {
1015 if !self.core.builder.initialized {
1016 self.build_on_next_tick = true;
1017 self.stored_close_ns = self.next_close_ns;
1018 return;
1019 }
1020
1021 if !self.build_with_no_updates && self.core.builder.count == 0 {
1022 return;
1023 }
1024
1025 let ts_init = event.ts_event;
1026 let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1027 self.build_and_send(ts_event, ts_init);
1028
1029 self.stored_open_ns = ts_init;
1030
1031 if self.bar_type().spec().aggregation == BarAggregation::Month {
1032 let step = self.bar_type().spec().step.get() as u32;
1033 let next_alert_ns = add_n_months_nanos(ts_init, step);
1034
1035 self.clock
1036 .borrow_mut()
1037 .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1038 .expect(FAILED);
1039
1040 self.next_close_ns = next_alert_ns;
1041 } else {
1042 self.next_close_ns = self.clock.borrow().next_time_ns(&self.timer_name);
1043 }
1044 }
1045}
1046
1047impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1048where
1049 H: FnMut(Bar) + 'static,
1050{
1051 fn bar_type(&self) -> BarType {
1052 self.core.bar_type
1053 }
1054
1055 fn is_running(&self) -> bool {
1056 self.core.is_running
1057 }
1058
1059 fn set_await_partial(&mut self, value: bool) {
1060 self.core.set_await_partial(value);
1061 }
1062
1063 fn set_is_running(&mut self, value: bool) {
1064 self.core.set_is_running(value);
1065 }
1066
1067 fn await_partial(&self) -> bool {
1068 self.core.await_partial()
1069 }
1070
1071 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1072 if self.batch_next_close_ns != UnixNanos::default() {
1073 self.batch_pre_update(ts_event);
1074 }
1075
1076 self.core.apply_update(price, size, ts_event);
1077
1078 if self.build_on_next_tick {
1079 if ts_event <= self.stored_close_ns {
1080 let ts_init = ts_event;
1081 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1082 self.build_and_send(ts_event, ts_init);
1083 }
1084
1085 self.build_on_next_tick = false;
1086 self.stored_close_ns = UnixNanos::default();
1087 }
1088
1089 if self.batch_next_close_ns != UnixNanos::default() {
1090 self.batch_post_update(ts_event);
1091 }
1092 }
1093
1094 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1095 if self.batch_next_close_ns != UnixNanos::default() {
1096 self.batch_pre_update(ts_init);
1097 }
1098
1099 self.core.builder.update_bar(bar, volume, ts_init);
1100
1101 if self.build_on_next_tick {
1102 if ts_init <= self.stored_close_ns {
1103 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1104 self.build_and_send(ts_event, ts_init);
1105 }
1106
1107 self.build_on_next_tick = false;
1109 self.stored_close_ns = UnixNanos::default();
1110 }
1111
1112 if self.batch_next_close_ns != UnixNanos::default() {
1113 self.batch_post_update(ts_init);
1114 }
1115 }
1116
1117 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1118 self.core.start_batch_update(handler);
1119 self.start_batch_time(time_ns);
1120 }
1121
1122 fn stop_batch_update(&mut self) {
1123 self.core.stop_batch_update();
1124 }
1125
1126 fn set_partial(&mut self, partial_bar: Bar) {
1127 self.core.set_partial(partial_bar);
1128 }
1129}
1130
1131#[cfg(test)]
1135mod tests {
1136 use std::sync::{Arc, Mutex};
1137
1138 use nautilus_common::clock::TestClock;
1139 use nautilus_core::UUID4;
1140 use nautilus_model::{
1141 data::{BarSpecification, BarType},
1142 enums::{AggregationSource, BarAggregation, PriceType},
1143 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1144 types::{Price, Quantity},
1145 };
1146 use rstest::rstest;
1147 use ustr::Ustr;
1148
1149 use super::*;
1150
1151 #[rstest]
1152 fn test_bar_builder_initialization(equity_aapl: Equity) {
1153 let instrument = InstrumentAny::Equity(equity_aapl);
1154 let bar_type = BarType::new(
1155 instrument.id(),
1156 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1157 AggregationSource::Internal,
1158 );
1159 let builder = BarBuilder::new(
1160 bar_type,
1161 instrument.price_precision(),
1162 instrument.size_precision(),
1163 );
1164
1165 assert!(!builder.initialized);
1166 assert_eq!(builder.ts_last, 0);
1167 assert_eq!(builder.count, 0);
1168 }
1169
1170 #[rstest]
1171 fn test_set_partial_update(equity_aapl: Equity) {
1172 let instrument = InstrumentAny::Equity(equity_aapl);
1173 let bar_type = BarType::new(
1174 instrument.id(),
1175 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1176 AggregationSource::Internal,
1177 );
1178 let mut builder = BarBuilder::new(
1179 bar_type,
1180 instrument.price_precision(),
1181 instrument.size_precision(),
1182 );
1183
1184 let partial_bar = Bar::new(
1185 bar_type,
1186 Price::from("101.00"),
1187 Price::from("102.00"),
1188 Price::from("100.00"),
1189 Price::from("101.00"),
1190 Quantity::from(100),
1191 UnixNanos::from(1),
1192 UnixNanos::from(2),
1193 );
1194
1195 builder.set_partial(partial_bar);
1196 let bar = builder.build_now();
1197
1198 assert_eq!(bar.open, partial_bar.open);
1199 assert_eq!(bar.high, partial_bar.high);
1200 assert_eq!(bar.low, partial_bar.low);
1201 assert_eq!(bar.close, partial_bar.close);
1202 assert_eq!(bar.volume, partial_bar.volume);
1203 assert_eq!(builder.ts_last, 2);
1204 }
1205
1206 #[rstest]
1207 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1208 let instrument = InstrumentAny::Equity(equity_aapl);
1209 let bar_type = BarType::new(
1210 instrument.id(),
1211 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1212 AggregationSource::Internal,
1213 );
1214 let mut builder = BarBuilder::new(
1215 bar_type,
1216 instrument.price_precision(),
1217 instrument.size_precision(),
1218 );
1219
1220 builder.update(
1221 Price::from("100.00"),
1222 Quantity::from(1),
1223 UnixNanos::from(1000),
1224 );
1225 builder.update(
1226 Price::from("95.00"),
1227 Quantity::from(1),
1228 UnixNanos::from(2000),
1229 );
1230 builder.update(
1231 Price::from("105.00"),
1232 Quantity::from(1),
1233 UnixNanos::from(3000),
1234 );
1235
1236 let bar = builder.build_now();
1237 assert!(bar.high > bar.low);
1238 assert_eq!(bar.open, Price::from("100.00"));
1239 assert_eq!(bar.high, Price::from("105.00"));
1240 assert_eq!(bar.low, Price::from("95.00"));
1241 assert_eq!(bar.close, Price::from("105.00"));
1242 }
1243
1244 #[rstest]
1245 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1246 let instrument = InstrumentAny::Equity(equity_aapl);
1247 let bar_type = BarType::new(
1248 instrument.id(),
1249 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1250 AggregationSource::Internal,
1251 );
1252 let mut builder = BarBuilder::new(
1253 bar_type,
1254 instrument.price_precision(),
1255 instrument.size_precision(),
1256 );
1257
1258 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1259 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1260
1261 assert_eq!(builder.ts_last, 1_000);
1262 assert_eq!(builder.count, 1);
1263 }
1264
1265 #[rstest]
1266 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1267 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1268 let bar_type = BarType::new(
1269 instrument.id(),
1270 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1271 AggregationSource::Internal,
1272 );
1273 let mut builder = BarBuilder::new(
1274 bar_type,
1275 instrument.price_precision(),
1276 instrument.size_precision(),
1277 );
1278
1279 let partial_bar = Bar::new(
1280 bar_type,
1281 Price::from("1.00001"),
1282 Price::from("1.00010"),
1283 Price::from("1.00000"),
1284 Price::from("1.00002"),
1285 Quantity::from(1),
1286 UnixNanos::from(1_000_000_000),
1287 UnixNanos::from(2_000_000_000),
1288 );
1289
1290 builder.set_partial(partial_bar);
1291 let bar = builder.build_now();
1292
1293 assert_eq!(bar.open, Price::from("1.00001"));
1294 assert_eq!(bar.high, Price::from("1.00010"));
1295 assert_eq!(bar.low, Price::from("1.00000"));
1296 assert_eq!(bar.close, Price::from("1.00002"));
1297 assert_eq!(bar.volume, Quantity::from(1));
1298 assert_eq!(bar.ts_init, 2_000_000_000);
1299 assert_eq!(builder.ts_last, 2_000_000_000);
1300 }
1301
1302 #[rstest]
1303 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1304 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1305 let bar_type = BarType::new(
1306 instrument.id(),
1307 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1308 AggregationSource::Internal,
1309 );
1310 let mut builder = BarBuilder::new(
1311 bar_type,
1312 instrument.price_precision(),
1313 instrument.size_precision(),
1314 );
1315
1316 let partial_bar1 = Bar::new(
1317 bar_type,
1318 Price::from("1.00001"),
1319 Price::from("1.00010"),
1320 Price::from("1.00000"),
1321 Price::from("1.00002"),
1322 Quantity::from(1),
1323 UnixNanos::from(1_000_000_000),
1324 UnixNanos::from(1_000_000_000),
1325 );
1326
1327 let partial_bar2 = Bar::new(
1328 bar_type,
1329 Price::from("2.00001"),
1330 Price::from("2.00010"),
1331 Price::from("2.00000"),
1332 Price::from("2.00002"),
1333 Quantity::from(2),
1334 UnixNanos::from(3_000_000_000),
1335 UnixNanos::from(3_000_000_000),
1336 );
1337
1338 builder.set_partial(partial_bar1);
1339 builder.set_partial(partial_bar2);
1340 let bar = builder.build(
1341 UnixNanos::from(4_000_000_000),
1342 UnixNanos::from(4_000_000_000),
1343 );
1344
1345 assert_eq!(bar.open, Price::from("1.00001"));
1346 assert_eq!(bar.high, Price::from("1.00010"));
1347 assert_eq!(bar.low, Price::from("1.00000"));
1348 assert_eq!(bar.close, Price::from("1.00002"));
1349 assert_eq!(bar.volume, Quantity::from(1));
1350 assert_eq!(bar.ts_init, 4_000_000_000);
1351 assert_eq!(builder.ts_last, 1_000_000_000);
1352 }
1353
1354 #[rstest]
1355 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1356 let instrument = InstrumentAny::Equity(equity_aapl);
1357 let bar_type = BarType::new(
1358 instrument.id(),
1359 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1360 AggregationSource::Internal,
1361 );
1362 let mut builder = BarBuilder::new(
1363 bar_type,
1364 instrument.price_precision(),
1365 instrument.size_precision(),
1366 );
1367
1368 builder.update(
1369 Price::from("1.00000"),
1370 Quantity::from(1),
1371 UnixNanos::default(),
1372 );
1373
1374 assert!(builder.initialized);
1375 assert_eq!(builder.ts_last, 0);
1376 assert_eq!(builder.count, 1);
1377 }
1378
1379 #[rstest]
1380 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1381 equity_aapl: Equity,
1382 ) {
1383 let instrument = InstrumentAny::Equity(equity_aapl);
1384 let bar_type = BarType::new(
1385 instrument.id(),
1386 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1387 AggregationSource::Internal,
1388 );
1389 let mut builder = BarBuilder::new(bar_type, 2, 0);
1390
1391 builder.update(
1392 Price::from("1.00000"),
1393 Quantity::from(1),
1394 UnixNanos::from(1_000),
1395 );
1396 builder.update(
1397 Price::from("1.00001"),
1398 Quantity::from(1),
1399 UnixNanos::from(500),
1400 );
1401
1402 assert!(builder.initialized);
1403 assert_eq!(builder.ts_last, 1_000);
1404 assert_eq!(builder.count, 1);
1405 }
1406
1407 #[rstest]
1408 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1409 let instrument = InstrumentAny::Equity(equity_aapl);
1410 let bar_type = BarType::new(
1411 instrument.id(),
1412 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1413 AggregationSource::Internal,
1414 );
1415 let mut builder = BarBuilder::new(
1416 bar_type,
1417 instrument.price_precision(),
1418 instrument.size_precision(),
1419 );
1420
1421 for _ in 0..5 {
1422 builder.update(
1423 Price::from("1.00000"),
1424 Quantity::from(1),
1425 UnixNanos::from(1_000),
1426 );
1427 }
1428
1429 assert_eq!(builder.count, 5);
1430 }
1431
1432 #[rstest]
1433 #[should_panic]
1434 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1435 let instrument = InstrumentAny::Equity(equity_aapl);
1436 let bar_type = BarType::new(
1437 instrument.id(),
1438 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1439 AggregationSource::Internal,
1440 );
1441 let mut builder = BarBuilder::new(
1442 bar_type,
1443 instrument.price_precision(),
1444 instrument.size_precision(),
1445 );
1446 let _ = builder.build_now();
1447 }
1448
1449 #[rstest]
1450 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1451 let instrument = InstrumentAny::Equity(equity_aapl);
1452 let bar_type = BarType::new(
1453 instrument.id(),
1454 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1455 AggregationSource::Internal,
1456 );
1457 let mut builder = BarBuilder::new(
1458 bar_type,
1459 instrument.price_precision(),
1460 instrument.size_precision(),
1461 );
1462
1463 builder.update(
1464 Price::from("1.00001"),
1465 Quantity::from(2),
1466 UnixNanos::default(),
1467 );
1468 builder.update(
1469 Price::from("1.00002"),
1470 Quantity::from(2),
1471 UnixNanos::default(),
1472 );
1473 builder.update(
1474 Price::from("1.00000"),
1475 Quantity::from(1),
1476 UnixNanos::from(1_000_000_000),
1477 );
1478
1479 let bar = builder.build_now();
1480
1481 assert_eq!(bar.open, Price::from("1.00001"));
1482 assert_eq!(bar.high, Price::from("1.00002"));
1483 assert_eq!(bar.low, Price::from("1.00000"));
1484 assert_eq!(bar.close, Price::from("1.00000"));
1485 assert_eq!(bar.volume, Quantity::from(5));
1486 assert_eq!(bar.ts_init, 1_000_000_000);
1487 assert_eq!(builder.ts_last, 1_000_000_000);
1488 assert_eq!(builder.count, 0);
1489 }
1490
1491 #[rstest]
1492 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1493 let instrument = InstrumentAny::Equity(equity_aapl);
1494 let bar_type = BarType::new(
1495 instrument.id(),
1496 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1497 AggregationSource::Internal,
1498 );
1499 let mut builder = BarBuilder::new(bar_type, 2, 0);
1500
1501 builder.update(
1502 Price::from("1.00001"),
1503 Quantity::from(1),
1504 UnixNanos::default(),
1505 );
1506 builder.build_now();
1507
1508 builder.update(
1509 Price::from("1.00000"),
1510 Quantity::from(1),
1511 UnixNanos::default(),
1512 );
1513 builder.update(
1514 Price::from("1.00003"),
1515 Quantity::from(1),
1516 UnixNanos::default(),
1517 );
1518 builder.update(
1519 Price::from("1.00002"),
1520 Quantity::from(1),
1521 UnixNanos::default(),
1522 );
1523
1524 let bar = builder.build_now();
1525
1526 assert_eq!(bar.open, Price::from("1.00000"));
1527 assert_eq!(bar.high, Price::from("1.00003"));
1528 assert_eq!(bar.low, Price::from("1.00000"));
1529 assert_eq!(bar.close, Price::from("1.00002"));
1530 assert_eq!(bar.volume, Quantity::from(3));
1531 }
1532
1533 #[rstest]
1534 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1535 let instrument = InstrumentAny::Equity(equity_aapl);
1536 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1537 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1538 let handler = Arc::new(Mutex::new(Vec::new()));
1539 let handler_clone = Arc::clone(&handler);
1540
1541 let mut aggregator = TickBarAggregator::new(
1542 bar_type,
1543 instrument.price_precision(),
1544 instrument.size_precision(),
1545 move |bar: Bar| {
1546 let mut handler_guard = handler_clone.lock().unwrap();
1547 handler_guard.push(bar);
1548 },
1549 false,
1550 );
1551
1552 let trade = TradeTick::default();
1553 aggregator.handle_trade(trade);
1554
1555 let handler_guard = handler.lock().unwrap();
1556 assert_eq!(handler_guard.len(), 0);
1557 }
1558
1559 #[rstest]
1560 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1561 let instrument = InstrumentAny::Equity(equity_aapl);
1562 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1563 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1564 let handler = Arc::new(Mutex::new(Vec::new()));
1565 let handler_clone = Arc::clone(&handler);
1566
1567 let mut aggregator = TickBarAggregator::new(
1568 bar_type,
1569 instrument.price_precision(),
1570 instrument.size_precision(),
1571 move |bar: Bar| {
1572 let mut handler_guard = handler_clone.lock().unwrap();
1573 handler_guard.push(bar);
1574 },
1575 false,
1576 );
1577
1578 let trade = TradeTick::default();
1579 aggregator.handle_trade(trade);
1580 aggregator.handle_trade(trade);
1581 aggregator.handle_trade(trade);
1582
1583 let handler_guard = handler.lock().unwrap();
1584 let bar = handler_guard.first().unwrap();
1585 assert_eq!(handler_guard.len(), 1);
1586 assert_eq!(bar.open, trade.price);
1587 assert_eq!(bar.high, trade.price);
1588 assert_eq!(bar.low, trade.price);
1589 assert_eq!(bar.close, trade.price);
1590 assert_eq!(bar.volume, Quantity::from(300000));
1591 assert_eq!(bar.ts_event, trade.ts_event);
1592 assert_eq!(bar.ts_init, trade.ts_init);
1593 }
1594
1595 #[rstest]
1596 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1597 let instrument = InstrumentAny::Equity(equity_aapl);
1598 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1599 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1600 let handler = Arc::new(Mutex::new(Vec::new()));
1601 let handler_clone = Arc::clone(&handler);
1602
1603 let mut aggregator = TickBarAggregator::new(
1604 bar_type,
1605 instrument.price_precision(),
1606 instrument.size_precision(),
1607 move |bar: Bar| {
1608 let mut handler_guard = handler_clone.lock().unwrap();
1609 handler_guard.push(bar);
1610 },
1611 false,
1612 );
1613
1614 aggregator.update(
1615 Price::from("1.00001"),
1616 Quantity::from(1),
1617 UnixNanos::default(),
1618 );
1619 aggregator.update(
1620 Price::from("1.00002"),
1621 Quantity::from(1),
1622 UnixNanos::from(1000),
1623 );
1624 aggregator.update(
1625 Price::from("1.00003"),
1626 Quantity::from(1),
1627 UnixNanos::from(2000),
1628 );
1629
1630 let handler_guard = handler.lock().unwrap();
1631 assert_eq!(handler_guard.len(), 1);
1632
1633 let bar = handler_guard.first().unwrap();
1634 assert_eq!(bar.open, Price::from("1.00001"));
1635 assert_eq!(bar.high, Price::from("1.00003"));
1636 assert_eq!(bar.low, Price::from("1.00001"));
1637 assert_eq!(bar.close, Price::from("1.00003"));
1638 assert_eq!(bar.volume, Quantity::from(3));
1639 }
1640
1641 #[rstest]
1642 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1643 let instrument = InstrumentAny::Equity(equity_aapl);
1644 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1645 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1646 let handler = Arc::new(Mutex::new(Vec::new()));
1647 let handler_clone = Arc::clone(&handler);
1648
1649 let mut aggregator = TickBarAggregator::new(
1650 bar_type,
1651 instrument.price_precision(),
1652 instrument.size_precision(),
1653 move |bar: Bar| {
1654 let mut handler_guard = handler_clone.lock().unwrap();
1655 handler_guard.push(bar);
1656 },
1657 false,
1658 );
1659
1660 aggregator.update(
1661 Price::from("1.00001"),
1662 Quantity::from(1),
1663 UnixNanos::default(),
1664 );
1665 aggregator.update(
1666 Price::from("1.00002"),
1667 Quantity::from(1),
1668 UnixNanos::from(1000),
1669 );
1670 aggregator.update(
1671 Price::from("1.00003"),
1672 Quantity::from(1),
1673 UnixNanos::from(2000),
1674 );
1675 aggregator.update(
1676 Price::from("1.00004"),
1677 Quantity::from(1),
1678 UnixNanos::from(3000),
1679 );
1680
1681 let handler_guard = handler.lock().unwrap();
1682 assert_eq!(handler_guard.len(), 2);
1683
1684 let bar1 = &handler_guard[0];
1685 assert_eq!(bar1.open, Price::from("1.00001"));
1686 assert_eq!(bar1.close, Price::from("1.00002"));
1687 assert_eq!(bar1.volume, Quantity::from(2));
1688
1689 let bar2 = &handler_guard[1];
1690 assert_eq!(bar2.open, Price::from("1.00003"));
1691 assert_eq!(bar2.close, Price::from("1.00004"));
1692 assert_eq!(bar2.volume, Quantity::from(2));
1693 }
1694
1695 #[rstest]
1696 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1697 let instrument = InstrumentAny::Equity(equity_aapl);
1698 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1699 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1700 let handler = Arc::new(Mutex::new(Vec::new()));
1701 let handler_clone = Arc::clone(&handler);
1702
1703 let mut aggregator = VolumeBarAggregator::new(
1704 bar_type,
1705 instrument.price_precision(),
1706 instrument.size_precision(),
1707 move |bar: Bar| {
1708 let mut handler_guard = handler_clone.lock().unwrap();
1709 handler_guard.push(bar);
1710 },
1711 false,
1712 );
1713
1714 aggregator.update(
1715 Price::from("1.00001"),
1716 Quantity::from(25),
1717 UnixNanos::default(),
1718 );
1719
1720 let handler_guard = handler.lock().unwrap();
1721 assert_eq!(handler_guard.len(), 2);
1722 let bar1 = &handler_guard[0];
1723 assert_eq!(bar1.volume, Quantity::from(10));
1724 let bar2 = &handler_guard[1];
1725 assert_eq!(bar2.volume, Quantity::from(10));
1726 }
1727
1728 #[rstest]
1729 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1730 let instrument = InstrumentAny::Equity(equity_aapl);
1731 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1733 let handler = Arc::new(Mutex::new(Vec::new()));
1734 let handler_clone = Arc::clone(&handler);
1735
1736 let mut aggregator = ValueBarAggregator::new(
1737 bar_type,
1738 instrument.price_precision(),
1739 instrument.size_precision(),
1740 move |bar: Bar| {
1741 let mut handler_guard = handler_clone.lock().unwrap();
1742 handler_guard.push(bar);
1743 },
1744 false,
1745 );
1746
1747 aggregator.update(
1749 Price::from("100.00"),
1750 Quantity::from(5),
1751 UnixNanos::default(),
1752 );
1753 aggregator.update(
1754 Price::from("100.00"),
1755 Quantity::from(5),
1756 UnixNanos::from(1000),
1757 );
1758
1759 let handler_guard = handler.lock().unwrap();
1760 assert_eq!(handler_guard.len(), 1);
1761 let bar = handler_guard.first().unwrap();
1762 assert_eq!(bar.volume, Quantity::from(10));
1763 }
1764
1765 #[rstest]
1766 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1767 let instrument = InstrumentAny::Equity(equity_aapl);
1768 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1769 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1770 let handler = Arc::new(Mutex::new(Vec::new()));
1771 let handler_clone = Arc::clone(&handler);
1772
1773 let mut aggregator = ValueBarAggregator::new(
1774 bar_type,
1775 instrument.price_precision(),
1776 instrument.size_precision(),
1777 move |bar: Bar| {
1778 let mut handler_guard = handler_clone.lock().unwrap();
1779 handler_guard.push(bar);
1780 },
1781 false,
1782 );
1783
1784 aggregator.update(
1786 Price::from("100.00"),
1787 Quantity::from(25),
1788 UnixNanos::default(),
1789 );
1790
1791 let handler_guard = handler.lock().unwrap();
1792 assert_eq!(handler_guard.len(), 2);
1793 let remaining_value = aggregator.get_cumulative_value();
1794 assert!(remaining_value < 1000.0); }
1796
1797 #[rstest]
1798 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1799 let instrument = InstrumentAny::Equity(equity_aapl);
1800 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1802 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1803 let handler = Arc::new(Mutex::new(Vec::new()));
1804 let handler_clone = Arc::clone(&handler);
1805 let clock = Rc::new(RefCell::new(TestClock::new()));
1806
1807 let mut aggregator = TimeBarAggregator::new(
1808 bar_type,
1809 instrument.price_precision(),
1810 instrument.size_precision(),
1811 clock.clone(),
1812 move |bar: Bar| {
1813 let mut handler_guard = handler_clone.lock().unwrap();
1814 handler_guard.push(bar);
1815 },
1816 false, true, false, BarIntervalType::LeftOpen,
1820 None, 15, false, );
1824
1825 aggregator.update(
1826 Price::from("100.00"),
1827 Quantity::from(1),
1828 UnixNanos::default(),
1829 );
1830
1831 let next_sec = UnixNanos::from(1_000_000_000);
1832 clock.borrow_mut().set_time(next_sec);
1833
1834 let event = TimeEvent::new(
1835 Ustr::from("1-SECOND-LAST"),
1836 UUID4::new(),
1837 next_sec,
1838 next_sec,
1839 );
1840 aggregator.build_bar(event);
1841
1842 let handler_guard = handler.lock().unwrap();
1843 assert_eq!(handler_guard.len(), 1);
1844 let bar = handler_guard.first().unwrap();
1845 assert_eq!(bar.ts_event, UnixNanos::default());
1846 assert_eq!(bar.ts_init, next_sec);
1847 }
1848
1849 #[rstest]
1850 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1851 let instrument = InstrumentAny::Equity(equity_aapl);
1852 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1853 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1854 let handler = Arc::new(Mutex::new(Vec::new()));
1855 let handler_clone = Arc::clone(&handler);
1856 let clock = Rc::new(RefCell::new(TestClock::new()));
1857
1858 let mut aggregator = TimeBarAggregator::new(
1859 bar_type,
1860 instrument.price_precision(),
1861 instrument.size_precision(),
1862 clock.clone(),
1863 move |bar: Bar| {
1864 let mut handler_guard = handler_clone.lock().unwrap();
1865 handler_guard.push(bar);
1866 },
1867 false, true, true, BarIntervalType::LeftOpen,
1871 None,
1872 15,
1873 false, );
1875
1876 aggregator.update(
1878 Price::from("100.00"),
1879 Quantity::from(1),
1880 UnixNanos::default(),
1881 );
1882
1883 let ts1 = UnixNanos::from(1_000_000_000);
1885 clock.borrow_mut().set_time(ts1);
1886 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1887 aggregator.build_bar(event);
1888
1889 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1891
1892 let ts2 = UnixNanos::from(2_000_000_000);
1894 clock.borrow_mut().set_time(ts2);
1895 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1896 aggregator.build_bar(event);
1897
1898 let handler_guard = handler.lock().unwrap();
1899 assert_eq!(handler_guard.len(), 2);
1900
1901 let bar1 = &handler_guard[0];
1902 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
1904 assert_eq!(bar1.close, Price::from("100.00"));
1905 let bar2 = &handler_guard[1];
1906 assert_eq!(bar2.ts_event, ts2);
1907 assert_eq!(bar2.ts_init, ts2);
1908 assert_eq!(bar2.close, Price::from("101.00"));
1909 }
1910
1911 #[rstest]
1912 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1913 let instrument = InstrumentAny::Equity(equity_aapl);
1914 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1915 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1916 let handler = Arc::new(Mutex::new(Vec::new()));
1917 let handler_clone = Arc::clone(&handler);
1918 let clock = Rc::new(RefCell::new(TestClock::new()));
1919 let mut aggregator = TimeBarAggregator::new(
1920 bar_type,
1921 instrument.price_precision(),
1922 instrument.size_precision(),
1923 clock.clone(),
1924 move |bar: Bar| {
1925 let mut handler_guard = handler_clone.lock().unwrap();
1926 handler_guard.push(bar);
1927 },
1928 false, true, true, BarIntervalType::RightOpen,
1932 None,
1933 15,
1934 false, );
1936
1937 aggregator.update(
1939 Price::from("100.00"),
1940 Quantity::from(1),
1941 UnixNanos::default(),
1942 );
1943
1944 let ts1 = UnixNanos::from(1_000_000_000);
1946 clock.borrow_mut().set_time(ts1);
1947 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1948 aggregator.build_bar(event);
1949
1950 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1952
1953 let ts2 = UnixNanos::from(2_000_000_000);
1955 clock.borrow_mut().set_time(ts2);
1956 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1957 aggregator.build_bar(event);
1958
1959 let handler_guard = handler.lock().unwrap();
1960 assert_eq!(handler_guard.len(), 2);
1961
1962 let bar1 = &handler_guard[0];
1963 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
1965 assert_eq!(bar1.close, Price::from("100.00"));
1966
1967 let bar2 = &handler_guard[1];
1968 assert_eq!(bar2.ts_event, ts1);
1969 assert_eq!(bar2.ts_init, ts2);
1970 assert_eq!(bar2.close, Price::from("101.00"));
1971 }
1972
1973 #[rstest]
1974 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
1975 let instrument = InstrumentAny::Equity(equity_aapl);
1976 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1977 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1978 let handler = Arc::new(Mutex::new(Vec::new()));
1979 let handler_clone = Arc::clone(&handler);
1980 let clock = Rc::new(RefCell::new(TestClock::new()));
1981
1982 let mut aggregator = TimeBarAggregator::new(
1984 bar_type,
1985 instrument.price_precision(),
1986 instrument.size_precision(),
1987 clock.clone(),
1988 move |bar: Bar| {
1989 let mut handler_guard = handler_clone.lock().unwrap();
1990 handler_guard.push(bar);
1991 },
1992 false, false, true, BarIntervalType::LeftOpen,
1996 None,
1997 15,
1998 false, );
2000
2001 let ts1 = UnixNanos::from(1_000_000_000);
2003 clock.borrow_mut().set_time(ts1);
2004 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2005 aggregator.build_bar(event);
2006
2007 let handler_guard = handler.lock().unwrap();
2008 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
2010
2011 let handler = Arc::new(Mutex::new(Vec::new()));
2013 let handler_clone = Arc::clone(&handler);
2014 let mut aggregator = TimeBarAggregator::new(
2015 bar_type,
2016 instrument.price_precision(),
2017 instrument.size_precision(),
2018 clock.clone(),
2019 move |bar: Bar| {
2020 let mut handler_guard = handler_clone.lock().unwrap();
2021 handler_guard.push(bar);
2022 },
2023 false,
2024 true, true, BarIntervalType::LeftOpen,
2027 None,
2028 15,
2029 false, );
2031
2032 aggregator.update(
2033 Price::from("100.00"),
2034 Quantity::from(1),
2035 UnixNanos::default(),
2036 );
2037
2038 let ts1 = UnixNanos::from(1_000_000_000);
2040 clock.borrow_mut().set_time(ts1);
2041 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2042 aggregator.build_bar(event);
2043
2044 let ts2 = UnixNanos::from(2_000_000_000);
2046 clock.borrow_mut().set_time(ts2);
2047 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2048 aggregator.build_bar(event);
2049
2050 let handler_guard = handler.lock().unwrap();
2051 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2053 assert_eq!(bar1.close, Price::from("100.00"));
2054 let bar2 = &handler_guard[1];
2055 assert_eq!(bar2.close, Price::from("100.00")); }
2057
2058 #[rstest]
2059 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2060 let instrument = InstrumentAny::Equity(equity_aapl);
2061 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2062 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2063 let clock = Rc::new(RefCell::new(TestClock::new()));
2064 let handler = Arc::new(Mutex::new(Vec::new()));
2065 let handler_clone = Arc::clone(&handler);
2066
2067 let mut aggregator = TimeBarAggregator::new(
2068 bar_type,
2069 instrument.price_precision(),
2070 instrument.size_precision(),
2071 clock.clone(),
2072 move |bar: Bar| {
2073 let mut handler_guard = handler_clone.lock().unwrap();
2074 handler_guard.push(bar);
2075 },
2076 false, true, true, BarIntervalType::RightOpen,
2080 None,
2081 15,
2082 false, );
2084
2085 let ts1 = UnixNanos::from(1_000_000_000);
2086 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2087
2088 let ts2 = UnixNanos::from(2_000_000_000);
2089 clock.borrow_mut().set_time(ts2);
2090
2091 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2093 aggregator.build_bar(event);
2094
2095 let handler_guard = handler.lock().unwrap();
2096 let bar = handler_guard.first().unwrap();
2097 assert_eq!(bar.ts_event, UnixNanos::default());
2098 assert_eq!(bar.ts_init, ts2);
2099 }
2100
2101 #[rstest]
2102 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2103 let instrument = InstrumentAny::Equity(equity_aapl);
2104 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2105 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2106 let clock = Rc::new(RefCell::new(TestClock::new()));
2107 let handler = Arc::new(Mutex::new(Vec::new()));
2108 let handler_clone = Arc::clone(&handler);
2109
2110 let mut aggregator = TimeBarAggregator::new(
2111 bar_type,
2112 instrument.price_precision(),
2113 instrument.size_precision(),
2114 clock.clone(),
2115 move |bar: Bar| {
2116 let mut handler_guard = handler_clone.lock().unwrap();
2117 handler_guard.push(bar);
2118 },
2119 false, true, true, BarIntervalType::LeftOpen,
2123 None,
2124 15,
2125 false, );
2127
2128 let ts1 = UnixNanos::from(1_000_000_000);
2129 clock.borrow_mut().set_time(ts1);
2130
2131 let initial_time = clock.borrow().utc_now();
2132 aggregator.start_batch_time(UnixNanos::from(
2133 initial_time.timestamp_nanos_opt().unwrap() as u64
2134 ));
2135
2136 let handler_guard = handler.lock().unwrap();
2137 assert_eq!(handler_guard.len(), 0);
2138 }
2139}