1use std::{cell::RefCell, ops::Add, rc::Rc};
19
20use chrono::TimeDelta;
21use nautilus_common::{
22 clock::Clock,
23 timer::{TimeEvent, TimeEventCallback},
24};
25use nautilus_core::{
26 UnixNanos,
27 correctness::{self, FAILED},
28 datetime::{add_n_months_nanos, subtract_n_months_nanos},
29};
30use nautilus_model::{
31 data::{
32 QuoteTick, TradeTick,
33 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
34 },
35 enums::{AggregationSource, BarAggregation, BarIntervalType},
36 types::{Price, Quantity, fixed::FIXED_SCALAR, quantity::QuantityRaw},
37};
38
39pub trait BarAggregator {
40 fn bar_type(&self) -> BarType;
42 fn is_running(&self) -> bool;
44 fn set_await_partial(&mut self, value: bool);
45 fn set_is_running(&mut self, value: bool);
46 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos);
48 fn handle_quote(&mut self, quote: QuoteTick) {
50 let spec = self.bar_type().spec();
51 if !self.await_partial() {
52 self.update(
53 quote.extract_price(spec.price_type),
54 quote.extract_size(spec.price_type),
55 quote.ts_event,
56 );
57 }
58 }
59 fn handle_trade(&mut self, trade: TradeTick) {
61 if !self.await_partial() {
62 self.update(trade.price, trade.size, trade.ts_event);
63 }
64 }
65 fn handle_bar(&mut self, bar: Bar) {
67 if !self.await_partial() {
68 self.update_bar(bar, bar.volume, bar.ts_init);
69 }
70 }
71 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
72 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
73 fn stop_batch_update(&mut self);
74 fn await_partial(&self) -> bool;
75 fn set_partial(&mut self, partial_bar: Bar);
76}
77
/// Accumulates price/size updates into open/high/low/close/volume state and
/// turns that state into [`Bar`] values on demand.
pub struct BarBuilder {
    /// Bar type stamped onto every bar this builder produces.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision supplied at construction; used for the zero volume
    /// the builder starts from and resets to.
    size_precision: u8,
    /// Becomes `true` on the first accepted update or when a partial bar is set.
    initialized: bool,
    /// Timestamp of the most recently accepted update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates accepted since the last reset.
    count: usize,
    /// Set once a partial bar has been applied; later partial bars are ignored.
    partial_set: bool,
    /// Close of the previously built bar; used as OHLC when a bar is built
    /// without any update.
    last_close: Option<Price>,
    /// Open of the bar under construction (`None` until the first update).
    open: Option<Price>,
    /// Highest price seen for the bar under construction.
    high: Option<Price>,
    /// Lowest price seen for the bar under construction.
    low: Option<Price>,
    /// Most recent price seen; not cleared by `reset`.
    close: Option<Price>,
    /// Volume accumulated for the bar under construction.
    volume: Quantity,
}
94
95impl BarBuilder {
96 #[must_use]
104 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
105 correctness::check_equal(
106 bar_type.aggregation_source(),
107 AggregationSource::Internal,
108 "bar_type.aggregation_source",
109 "AggregationSource::Internal",
110 )
111 .expect(FAILED);
112
113 Self {
114 bar_type,
115 price_precision,
116 size_precision,
117 initialized: false,
118 ts_last: UnixNanos::default(),
119 count: 0,
120 partial_set: false,
121 last_close: None,
122 open: None,
123 high: None,
124 low: None,
125 close: None,
126 volume: Quantity::zero(size_precision),
127 }
128 }
129
130 pub fn set_partial(&mut self, partial_bar: Bar) {
132 if self.partial_set {
133 return; }
135
136 self.open = Some(partial_bar.open);
137
138 if self.high.is_none() || partial_bar.high > self.high.unwrap() {
139 self.high = Some(partial_bar.high);
140 }
141
142 if self.low.is_none() || partial_bar.low < self.low.unwrap() {
143 self.low = Some(partial_bar.low);
144 }
145
146 if self.close.is_none() {
147 self.close = Some(partial_bar.close);
148 }
149
150 self.volume = partial_bar.volume;
151
152 if self.ts_last == 0 {
153 self.ts_last = partial_bar.ts_init;
154 }
155
156 self.partial_set = true;
157 self.initialized = true;
158 }
159
160 pub fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
162 if ts_event < self.ts_last {
163 return; }
165
166 if self.open.is_none() {
167 self.open = Some(price);
168 self.high = Some(price);
169 self.low = Some(price);
170 self.initialized = true;
171 } else {
172 if price > self.high.unwrap() {
173 self.high = Some(price);
174 }
175 if price < self.low.unwrap() {
176 self.low = Some(price);
177 }
178 }
179
180 self.close = Some(price);
181 self.volume = self.volume.add(size);
182 self.count += 1;
183 self.ts_last = ts_event;
184 }
185
186 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
187 if ts_init < self.ts_last {
188 return; }
190
191 if self.open.is_none() {
192 self.open = Some(bar.open);
193 self.high = Some(bar.high);
194 self.low = Some(bar.low);
195 self.initialized = true;
196 } else {
197 if bar.high > self.high.unwrap() {
198 self.high = Some(bar.high);
199 }
200 if bar.low < self.low.unwrap() {
201 self.low = Some(bar.low);
202 }
203 }
204
205 self.close = Some(bar.close);
206 self.volume = self.volume.add(volume);
207 self.count += 1;
208 self.ts_last = ts_init;
209 }
210
211 pub fn reset(&mut self) {
215 self.open = None;
216 self.high = None;
217 self.low = None;
218 self.volume = Quantity::zero(self.size_precision);
219 self.count = 0;
220 }
221
222 pub fn build_now(&mut self) -> Bar {
224 self.build(self.ts_last, self.ts_last)
225 }
226
227 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
229 if self.open.is_none() {
230 self.open = self.last_close;
231 self.high = self.last_close;
232 self.low = self.last_close;
233 self.close = self.last_close;
234 }
235
236 let bar = Bar::new(
238 self.bar_type,
239 self.open.unwrap(),
240 self.high.unwrap(),
241 self.low.unwrap(),
242 self.close.unwrap(),
243 self.volume,
244 ts_event,
245 ts_init,
246 );
247
248 self.last_close = self.close;
249 self.reset();
250 bar
251 }
252}
253
/// State shared by the concrete aggregators: the bar builder, the handlers
/// that receive finished bars, and the mode flags.
pub struct BarAggregatorCore<H>
where
    H: FnMut(Bar),
{
    /// Bar type produced by the owning aggregator.
    bar_type: BarType,
    /// Builder accumulating the bar currently under construction.
    builder: BarBuilder,
    /// Primary handler receiving finished bars.
    handler: H,
    /// Optional saved handler; restored into `handler` by `stop_batch_update`
    /// when present.
    handler_backup: Option<H>,
    /// Handler supplied to `start_batch_update`.
    batch_handler: Option<Box<dyn FnMut(Bar)>>,
    /// While `true`, the `handle_*` entry points of the aggregator drop data.
    await_partial: bool,
    /// Running flag exposed through the `BarAggregator` trait.
    is_running: bool,
    /// `true` between `start_batch_update` and `stop_batch_update`.
    batch_mode: bool,
}
268
269impl<H> BarAggregatorCore<H>
270where
271 H: FnMut(Bar),
272{
273 pub fn new(
281 bar_type: BarType,
282 price_precision: u8,
283 size_precision: u8,
284 handler: H,
285 await_partial: bool,
286 ) -> Self {
287 Self {
288 bar_type,
289 builder: BarBuilder::new(bar_type, price_precision, size_precision),
290 handler,
291 handler_backup: None,
292 batch_handler: None,
293 await_partial,
294 is_running: false,
295 batch_mode: false,
296 }
297 }
298
299 pub const fn set_await_partial(&mut self, value: bool) {
300 self.await_partial = value;
301 }
302
303 pub const fn set_is_running(&mut self, value: bool) {
304 self.is_running = value;
305 }
306
307 pub const fn await_partial(&self) -> bool {
308 self.await_partial
309 }
310
311 pub fn set_partial(&mut self, partial_bar: Bar) {
313 self.builder.set_partial(partial_bar);
314 }
315
316 fn apply_update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
317 self.builder.update(price, size, ts_event);
318 }
319
320 fn build_now_and_send(&mut self) {
321 let bar = self.builder.build_now();
322 (self.handler)(bar);
323 }
324
325 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
326 let bar = self.builder.build(ts_event, ts_init);
327
328 if self.batch_mode {
329 if let Some(handler) = &mut self.batch_handler {
330 handler(bar);
331 }
332 } else {
333 (self.handler)(bar);
334 }
335 }
336
337 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
338 self.batch_mode = true;
339 self.batch_handler = Some(handler);
340 }
341
342 pub fn stop_batch_update(&mut self) {
343 self.batch_mode = false;
344
345 if let Some(handler) = self.handler_backup.take() {
346 self.handler = handler;
347 }
348 }
349}
350
351pub struct TickBarAggregator<H>
356where
357 H: FnMut(Bar),
358{
359 core: BarAggregatorCore<H>,
360 cum_value: f64,
361}
362
363impl<H> TickBarAggregator<H>
364where
365 H: FnMut(Bar),
366{
367 pub fn new(
375 bar_type: BarType,
376 price_precision: u8,
377 size_precision: u8,
378 handler: H,
379 await_partial: bool,
380 ) -> Self {
381 Self {
382 core: BarAggregatorCore::new(
383 bar_type,
384 price_precision,
385 size_precision,
386 handler,
387 await_partial,
388 ),
389 cum_value: 0.0,
390 }
391 }
392}
393
394impl<H> BarAggregator for TickBarAggregator<H>
395where
396 H: FnMut(Bar),
397{
398 fn bar_type(&self) -> BarType {
399 self.core.bar_type
400 }
401
402 fn is_running(&self) -> bool {
403 self.core.is_running
404 }
405
406 fn set_await_partial(&mut self, value: bool) {
407 self.core.set_await_partial(value);
408 }
409
410 fn set_is_running(&mut self, value: bool) {
411 self.core.set_is_running(value);
412 }
413
414 fn await_partial(&self) -> bool {
415 self.core.await_partial()
416 }
417
418 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
420 self.core.apply_update(price, size, ts_event);
421 let spec = self.core.bar_type.spec();
422
423 if self.core.builder.count >= spec.step.get() {
424 self.core.build_now_and_send();
425 }
426 }
427
428 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
429 let mut volume_update = volume;
430 let average_price = Price::new(
431 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
432 self.core.builder.price_precision,
433 );
434
435 while volume_update.as_f64() > 0.0 {
436 let value_update = average_price.as_f64() * volume_update.as_f64();
437 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
438 self.cum_value += value_update;
439 self.core.builder.update_bar(bar, volume_update, ts_init);
440 break;
441 }
442
443 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
444 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
445 self.core.builder.update_bar(
446 bar,
447 Quantity::new(volume_diff, volume_update.precision),
448 ts_init,
449 );
450
451 self.core.build_now_and_send();
452 self.cum_value = 0.0;
453 volume_update = Quantity::new(
454 volume_update.as_f64() - volume_diff,
455 volume_update.precision,
456 );
457 }
458 }
459
460 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
461 self.core.start_batch_update(handler);
462 }
463
464 fn stop_batch_update(&mut self) {
465 self.core.stop_batch_update();
466 }
467
468 fn set_partial(&mut self, partial_bar: Bar) {
469 self.core.set_partial(partial_bar);
470 }
471}
472
/// Aggregator that emits a bar each time the accumulated volume reaches the
/// step of the bar type's specification.
pub struct VolumeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
}
480
481impl<H> VolumeBarAggregator<H>
482where
483 H: FnMut(Bar),
484{
485 pub fn new(
493 bar_type: BarType,
494 price_precision: u8,
495 size_precision: u8,
496 handler: H,
497 await_partial: bool,
498 ) -> Self {
499 Self {
500 core: BarAggregatorCore::new(
501 bar_type.standard(),
502 price_precision,
503 size_precision,
504 handler,
505 await_partial,
506 ),
507 }
508 }
509}
510
511impl<H> BarAggregator for VolumeBarAggregator<H>
512where
513 H: FnMut(Bar),
514{
515 fn bar_type(&self) -> BarType {
516 self.core.bar_type
517 }
518
519 fn is_running(&self) -> bool {
520 self.core.is_running
521 }
522
523 fn set_await_partial(&mut self, value: bool) {
524 self.core.set_await_partial(value);
525 }
526
527 fn set_is_running(&mut self, value: bool) {
528 self.core.set_is_running(value);
529 }
530
531 fn await_partial(&self) -> bool {
532 self.core.await_partial()
533 }
534
535 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
537 let mut raw_size_update = size.raw;
538 let spec = self.core.bar_type.spec();
539 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
540
541 while raw_size_update > 0 {
542 if self.core.builder.volume.raw + raw_size_update < raw_step {
543 self.core.apply_update(
544 price,
545 Quantity::from_raw(raw_size_update, size.precision),
546 ts_event,
547 );
548 break;
549 }
550
551 let raw_size_diff = raw_step - self.core.builder.volume.raw;
552 self.core.apply_update(
553 price,
554 Quantity::from_raw(raw_size_diff, size.precision),
555 ts_event,
556 );
557
558 self.core.build_now_and_send();
559 raw_size_update -= raw_size_diff;
560 }
561 }
562
563 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
564 let mut raw_volume_update = volume.raw;
565 let spec = self.core.bar_type.spec();
566 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
567
568 while raw_volume_update > 0 {
569 if self.core.builder.volume.raw + raw_volume_update < raw_step {
570 self.core.builder.update_bar(
571 bar,
572 Quantity::from_raw(raw_volume_update, volume.precision),
573 ts_init,
574 );
575 break;
576 }
577
578 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
579 self.core.builder.update_bar(
580 bar,
581 Quantity::from_raw(raw_volume_diff, volume.precision),
582 ts_init,
583 );
584
585 self.core.build_now_and_send();
586 raw_volume_update -= raw_volume_diff;
587 }
588 }
589
590 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
591 self.core.start_batch_update(handler);
592 }
593
594 fn stop_batch_update(&mut self) {
595 self.core.stop_batch_update();
596 }
597
598 fn set_partial(&mut self, partial_bar: Bar) {
599 self.core.set_partial(partial_bar);
600 }
601}
602
/// Aggregator that emits a bar each time the accumulated notional value
/// (price multiplied by size) reaches the step of the bar type's specification.
pub struct ValueBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
    /// Notional value accumulated towards the bar under construction;
    /// reset to zero whenever a bar is emitted.
    cum_value: f64,
}
614
615impl<H> ValueBarAggregator<H>
616where
617 H: FnMut(Bar),
618{
619 pub fn new(
627 bar_type: BarType,
628 price_precision: u8,
629 size_precision: u8,
630 handler: H,
631 await_partial: bool,
632 ) -> Self {
633 Self {
634 core: BarAggregatorCore::new(
635 bar_type.standard(),
636 price_precision,
637 size_precision,
638 handler,
639 await_partial,
640 ),
641 cum_value: 0.0,
642 }
643 }
644
645 #[must_use]
646 pub const fn get_cumulative_value(&self) -> f64 {
648 self.cum_value
649 }
650}
651
652impl<H> BarAggregator for ValueBarAggregator<H>
653where
654 H: FnMut(Bar),
655{
656 fn bar_type(&self) -> BarType {
657 self.core.bar_type
658 }
659
660 fn is_running(&self) -> bool {
661 self.core.is_running
662 }
663
664 fn set_await_partial(&mut self, value: bool) {
665 self.core.set_await_partial(value);
666 }
667
668 fn set_is_running(&mut self, value: bool) {
669 self.core.set_is_running(value);
670 }
671
672 fn await_partial(&self) -> bool {
673 self.core.await_partial()
674 }
675
676 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
678 let mut size_update = size.as_f64();
679 let spec = self.core.bar_type.spec();
680
681 while size_update > 0.0 {
682 let value_update = price.as_f64() * size_update;
683 if self.cum_value + value_update < spec.step.get() as f64 {
684 self.cum_value += value_update;
685 self.core
686 .apply_update(price, Quantity::new(size_update, size.precision), ts_event);
687 break;
688 }
689
690 let value_diff = spec.step.get() as f64 - self.cum_value;
691 let size_diff = size_update * (value_diff / value_update);
692 self.core
693 .apply_update(price, Quantity::new(size_diff, size.precision), ts_event);
694
695 self.core.build_now_and_send();
696 self.cum_value = 0.0;
697 size_update -= size_diff;
698 }
699 }
700
701 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
702 let mut volume_update = volume;
703 let average_price = Price::new(
704 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
705 self.core.builder.price_precision,
706 );
707
708 while volume_update.as_f64() > 0.0 {
709 let value_update = average_price.as_f64() * volume_update.as_f64();
710 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
711 self.cum_value += value_update;
712 self.core.builder.update_bar(bar, volume_update, ts_init);
713 break;
714 }
715
716 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
717 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
718 self.core.builder.update_bar(
719 bar,
720 Quantity::new(volume_diff, volume_update.precision),
721 ts_init,
722 );
723
724 self.core.build_now_and_send();
725 self.cum_value = 0.0;
726 volume_update = Quantity::new(
727 volume_update.as_f64() - volume_diff,
728 volume_update.precision,
729 );
730 }
731 }
732
733 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
734 self.core.start_batch_update(handler);
735 }
736
737 fn stop_batch_update(&mut self) {
738 self.core.stop_batch_update();
739 }
740
741 fn set_partial(&mut self, partial_bar: Bar) {
742 self.core.set_partial(partial_bar);
743 }
744}
745
/// Aggregator that emits bars on a time schedule driven by a [`Clock`]
/// timer (or a re-armed time alert for monthly bars), with a separate
/// timestamp-driven path used while batch-loading data.
pub struct TimeBarAggregator<H>
where
    H: FnMut(Bar),
{
    /// Shared aggregator state (builder, handlers, flags).
    core: BarAggregatorCore<H>,
    /// Clock used to register and cancel the bar timer / time alert.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event with zero accepted updates builds no bar.
    build_with_no_updates: bool,
    /// For left-open intervals, stamp bars with the close time instead of the open time.
    timestamp_on_close: bool,
    /// `true` when constructed with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Set when a timer event fires before the builder was initialized; the
    /// next update then decides whether to build the pending bar.
    build_on_next_tick: bool,
    /// Timestamp of the last timer event that built a bar; used as the open time of the next bar.
    stored_open_ns: UnixNanos,
    /// Close time remembered for a bar deferred by `build_on_next_tick`.
    stored_close_ns: UnixNanos,
    /// Name of the clock timer; the string form of the bar type.
    timer_name: String,
    /// Bar interval in nanoseconds as returned by `get_bar_interval_ns`.
    interval_ns: UnixNanos,
    /// Next scheduled close time, refreshed after each timer-built bar.
    next_close_ns: UnixNanos,
    /// Delay in microseconds added to the timer start when `add_delay` is set.
    composite_bar_build_delay: i64,
    /// `true` for composite bar types whose composite source is internally aggregated.
    add_delay: bool,
    /// Open time of the bar being assembled in batch mode.
    batch_open_ns: UnixNanos,
    /// Close time of the bar being assembled in batch mode; the default
    /// (zero) value disables the batch pre/post-update hooks.
    batch_next_close_ns: UnixNanos,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin: Option<TimeDelta>,
    /// When `true`, the first bar that would be built is discarded instead.
    skip_first_non_full_bar: bool,
}
771
772#[derive(Clone)]
773pub struct NewBarCallback<H: FnMut(Bar)> {
774 aggregator: Rc<RefCell<TimeBarAggregator<H>>>,
775}
776
777impl<H: FnMut(Bar)> NewBarCallback<H> {
778 pub const fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
779 Self { aggregator }
780 }
781}
782
783impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
784 fn from(value: NewBarCallback<H>) -> Self {
785 Self::Rust(Rc::new(move |event: TimeEvent| {
786 value.aggregator.borrow_mut().build_bar(event);
787 }))
788 }
789}
790
impl<H> TimeBarAggregator<H>
where
    H: FnMut(Bar) + 'static,
{
    /// Creates a new [`TimeBarAggregator`] for the standard form of `bar_type`.
    ///
    /// The timer name is the string form of `bar_type` and the interval is
    /// taken from `get_bar_interval_ns`. No timer is registered until
    /// [`TimeBarAggregator::start`] is called.
    ///
    /// # Panics
    ///
    /// Panics if the bar type's aggregation source is not
    /// `AggregationSource::Internal` (checked by [`BarBuilder::new`]).
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        bar_type: BarType,
        price_precision: u8,
        size_precision: u8,
        clock: Rc<RefCell<dyn Clock>>,
        handler: H,
        await_partial: bool,
        build_with_no_updates: bool,
        timestamp_on_close: bool,
        interval_type: BarIntervalType,
        time_bars_origin: Option<TimeDelta>,
        composite_bar_build_delay: i64,
        skip_first_non_full_bar: bool,
    ) -> Self {
        let is_left_open = match interval_type {
            BarIntervalType::LeftOpen => true,
            BarIntervalType::RightOpen => false,
        };

        // The build delay only applies to composite bars that are themselves
        // built from internally aggregated bars.
        let add_delay = bar_type.is_composite()
            && bar_type.composite().aggregation_source() == AggregationSource::Internal;

        let core = BarAggregatorCore::new(
            bar_type.standard(),
            price_precision,
            size_precision,
            handler,
            await_partial,
        );

        Self {
            core,
            clock,
            build_with_no_updates,
            timestamp_on_close,
            is_left_open,
            build_on_next_tick: false,
            stored_open_ns: UnixNanos::default(),
            stored_close_ns: UnixNanos::default(),
            timer_name: bar_type.to_string(),
            interval_ns: get_bar_interval_ns(&bar_type),
            next_close_ns: UnixNanos::default(),
            composite_bar_build_delay,
            add_delay,
            batch_open_ns: UnixNanos::default(),
            batch_next_close_ns: UnixNanos::default(),
            time_bars_origin,
            skip_first_non_full_bar,
        }
    }

    /// Registers the bar timer on the clock, with `callback` as its handler.
    ///
    /// Monthly bars use a one-shot time alert `step` months after the computed
    /// start time; every other aggregation uses a repeating timer with the
    /// bar interval, starting at the computed start time.
    ///
    /// # Errors
    ///
    /// The visible code paths always return `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the clock rejects the timer or time alert.
    pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
        let now = self.clock.borrow().utc_now();
        let mut start_time = get_time_bar_start(now, &self.bar_type(), self.time_bars_origin);

        // Starting exactly on a bar boundary means the first bar is a full one.
        if start_time == now {
            self.skip_first_non_full_bar = false;
        }

        if self.add_delay {
            start_time += TimeDelta::microseconds(self.composite_bar_build_delay);
        }

        let spec = &self.bar_type().spec();
        let start_time_ns = UnixNanos::from(start_time);

        if spec.aggregation == BarAggregation::Month {
            // Months have no fixed length, so schedule a single alert; it is
            // re-armed from `build_bar` after each bar.
            let step = spec.step.get() as u32;
            let alert_time_ns = add_n_months_nanos(start_time_ns, step);

            self.clock
                .borrow_mut()
                .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()))
                .expect(FAILED);
        } else {
            self.clock
                .borrow_mut()
                .set_timer_ns(
                    &self.timer_name,
                    self.interval_ns.as_u64(),
                    start_time_ns,
                    None,
                    Some(callback.into()),
                )
                .expect(FAILED);
        }

        log::debug!("Started timer {}", self.timer_name);
        Ok(())
    }

    /// Cancels the bar timer on the clock.
    pub fn stop(&mut self) {
        self.clock.borrow_mut().cancel_timer(&self.timer_name);
    }

    /// Enters batch mode and positions the batch open/close window around
    /// `time_ns`.
    ///
    /// When `time_ns` falls exactly on a bar start, the window is moved back
    /// by one interval so that `time_ns` becomes the close of the window.
    pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
        let spec = self.bar_type().spec();
        self.core.batch_mode = true;

        let time = time_ns.to_datetime_utc();
        let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin);
        self.batch_open_ns = UnixNanos::from(start_time);

        if spec.aggregation == BarAggregation::Month {
            let step = spec.step.get() as u32;

            if self.batch_open_ns == time_ns {
                self.batch_open_ns = subtract_n_months_nanos(self.batch_open_ns, step);
            }

            self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step);
        } else {
            if self.batch_open_ns == time_ns {
                self.batch_open_ns -= self.interval_ns;
            }

            self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
        }
    }

    /// Chooses the event timestamp for a bar spanning `open_ns..close_ns`.
    ///
    /// The close time is used only for left-open intervals with
    /// `timestamp_on_close` enabled; otherwise the open time is used.
    const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
        if self.is_left_open {
            if self.timestamp_on_close {
                close_ns
            } else {
                open_ns
            }
        } else {
            open_ns
        }
    }

    /// Builds and dispatches a bar, unless the first (non-full) bar is to be
    /// skipped, in which case the builder is reset and the flag cleared.
    fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
        if self.skip_first_non_full_bar {
            self.core.builder.reset();
            self.skip_first_non_full_bar = false;
        } else {
            self.core.build_and_send(ts_event, ts_init);
        }
    }

    /// Batch hook run before an update is applied: closes the current batch
    /// bar when the incoming timestamp is already past its close time.
    fn batch_pre_update(&mut self, time_ns: UnixNanos) {
        if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
            let ts_init = self.batch_next_close_ns;
            let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
            self.build_and_send(ts_event, ts_init);
        }
    }

    /// Batch hook run after an update is applied: realigns the batch window
    /// with `time_ns`, closes the bar when `time_ns` lands exactly on the
    /// close time, and disables the batch hooks once batch mode has ended.
    fn batch_post_update(&mut self, time_ns: UnixNanos) {
        let step = self.bar_type().spec().step.get() as u32;

        // Batch mode already ended and this update lands exactly on the
        // pending close: just disable the batch hooks.
        // NOTE(review): presumably the regular timer builds this bar — confirm.
        if !self.core.batch_mode
            && time_ns == self.batch_next_close_ns
            && time_ns > self.stored_open_ns
        {
            self.batch_next_close_ns = UnixNanos::default();
            return;
        }

        if time_ns > self.batch_next_close_ns {
            // Advance the window until its close is at or after `time_ns`.
            if self.bar_type().spec().aggregation == BarAggregation::Month {
                while self.batch_next_close_ns < time_ns {
                    self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
                }

                self.batch_open_ns = subtract_n_months_nanos(self.batch_next_close_ns, step);
            } else {
                while self.batch_next_close_ns < time_ns {
                    self.batch_next_close_ns += self.interval_ns;
                }

                self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
            }
        }

        if time_ns == self.batch_next_close_ns {
            // The update sits exactly on the close: emit the bar and roll the
            // window forward by one interval.
            let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
            self.build_and_send(ts_event, time_ns);
            self.batch_open_ns = self.batch_next_close_ns;

            if self.bar_type().spec().aggregation == BarAggregation::Month {
                self.batch_next_close_ns = add_n_months_nanos(self.batch_next_close_ns, step);
            } else {
                self.batch_next_close_ns += self.interval_ns;
            }
        }

        // The reset is deferred to here so that one more update after batch
        // mode ends still passes through the batch hooks.
        if !self.core.batch_mode {
            self.batch_next_close_ns = UnixNanos::default();
        }
    }

    /// Timer callback: builds and dispatches the bar closing at the event time.
    ///
    /// If the builder has never been initialized, the build is deferred to
    /// the next update. If no update arrived and `build_with_no_updates` is
    /// disabled, nothing is built.
    ///
    /// # Panics
    ///
    /// Panics if re-arming the monthly time alert fails.
    fn build_bar(&mut self, event: TimeEvent) {
        if !self.core.builder.initialized {
            self.build_on_next_tick = true;
            self.stored_close_ns = self.next_close_ns;
            return;
        }

        if !self.build_with_no_updates && self.core.builder.count == 0 {
            return;
        }

        let ts_init = event.ts_event;
        let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
        self.build_and_send(ts_event, ts_init);

        // The close of this bar is the open of the next one.
        self.stored_open_ns = ts_init;

        if self.bar_type().spec().aggregation == BarAggregation::Month {
            let step = self.bar_type().spec().step.get() as u32;
            let next_alert_ns = add_n_months_nanos(ts_init, step);

            // NOTE(review): the alert is re-armed without a callback (`None`);
            // confirm the clock still routes the event to this aggregator.
            self.clock
                .borrow_mut()
                .set_time_alert_ns(&self.timer_name, next_alert_ns, None)
                .expect(FAILED);

            self.next_close_ns = next_alert_ns;
        } else {
            self.next_close_ns = self.clock.borrow().next_time_ns(&self.timer_name);
        }
    }
}
1033
1034impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1035where
1036 H: FnMut(Bar) + 'static,
1037{
1038 fn bar_type(&self) -> BarType {
1039 self.core.bar_type
1040 }
1041
1042 fn is_running(&self) -> bool {
1043 self.core.is_running
1044 }
1045
1046 fn set_await_partial(&mut self, value: bool) {
1047 self.core.set_await_partial(value);
1048 }
1049
1050 fn set_is_running(&mut self, value: bool) {
1051 self.core.set_is_running(value);
1052 }
1053
1054 fn await_partial(&self) -> bool {
1055 self.core.await_partial()
1056 }
1057
1058 fn update(&mut self, price: Price, size: Quantity, ts_event: UnixNanos) {
1059 if self.batch_next_close_ns != UnixNanos::default() {
1060 self.batch_pre_update(ts_event);
1061 }
1062
1063 self.core.apply_update(price, size, ts_event);
1064
1065 if self.build_on_next_tick {
1066 if ts_event <= self.stored_close_ns {
1067 let ts_init = ts_event;
1068 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1069 self.build_and_send(ts_event, ts_init);
1070 }
1071
1072 self.build_on_next_tick = false;
1073 self.stored_close_ns = UnixNanos::default();
1074 }
1075
1076 if self.batch_next_close_ns != UnixNanos::default() {
1077 self.batch_post_update(ts_event);
1078 }
1079 }
1080
1081 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1082 if self.batch_next_close_ns != UnixNanos::default() {
1083 self.batch_pre_update(ts_init);
1084 }
1085
1086 self.core.builder.update_bar(bar, volume, ts_init);
1087
1088 if self.build_on_next_tick {
1089 if ts_init <= self.stored_close_ns {
1090 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1091 self.build_and_send(ts_event, ts_init);
1092 }
1093
1094 self.build_on_next_tick = false;
1096 self.stored_close_ns = UnixNanos::default();
1097 }
1098
1099 if self.batch_next_close_ns != UnixNanos::default() {
1100 self.batch_post_update(ts_init);
1101 }
1102 }
1103
1104 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1105 self.core.start_batch_update(handler);
1106 self.start_batch_time(time_ns);
1107 }
1108
1109 fn stop_batch_update(&mut self) {
1110 self.core.stop_batch_update();
1111 }
1112
1113 fn set_partial(&mut self, partial_bar: Bar) {
1114 self.core.set_partial(partial_bar);
1115 }
1116}
1117
1118#[cfg(test)]
1122mod tests {
1123 use std::sync::{Arc, Mutex};
1124
1125 use nautilus_common::clock::TestClock;
1126 use nautilus_core::UUID4;
1127 use nautilus_model::{
1128 data::{BarSpecification, BarType},
1129 enums::{AggregationSource, BarAggregation, PriceType},
1130 instruments::{CurrencyPair, Equity, InstrumentAny, stubs::*},
1131 types::{Price, Quantity},
1132 };
1133 use rstest::rstest;
1134 use ustr::Ustr;
1135
1136 use super::*;
1137
1138 #[rstest]
1139 fn test_bar_builder_initialization(equity_aapl: Equity) {
1140 let instrument = InstrumentAny::Equity(equity_aapl);
1141 let bar_type = BarType::new(
1142 instrument.id(),
1143 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1144 AggregationSource::Internal,
1145 );
1146 let builder = BarBuilder::new(
1147 bar_type,
1148 instrument.price_precision(),
1149 instrument.size_precision(),
1150 );
1151
1152 assert!(!builder.initialized);
1153 assert_eq!(builder.ts_last, 0);
1154 assert_eq!(builder.count, 0);
1155 }
1156
1157 #[rstest]
1158 fn test_set_partial_update(equity_aapl: Equity) {
1159 let instrument = InstrumentAny::Equity(equity_aapl);
1160 let bar_type = BarType::new(
1161 instrument.id(),
1162 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1163 AggregationSource::Internal,
1164 );
1165 let mut builder = BarBuilder::new(
1166 bar_type,
1167 instrument.price_precision(),
1168 instrument.size_precision(),
1169 );
1170
1171 let partial_bar = Bar::new(
1172 bar_type,
1173 Price::from("101.00"),
1174 Price::from("102.00"),
1175 Price::from("100.00"),
1176 Price::from("101.00"),
1177 Quantity::from(100),
1178 UnixNanos::from(1),
1179 UnixNanos::from(2),
1180 );
1181
1182 builder.set_partial(partial_bar);
1183 let bar = builder.build_now();
1184
1185 assert_eq!(bar.open, partial_bar.open);
1186 assert_eq!(bar.high, partial_bar.high);
1187 assert_eq!(bar.low, partial_bar.low);
1188 assert_eq!(bar.close, partial_bar.close);
1189 assert_eq!(bar.volume, partial_bar.volume);
1190 assert_eq!(builder.ts_last, 2);
1191 }
1192
1193 #[rstest]
1194 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1195 let instrument = InstrumentAny::Equity(equity_aapl);
1196 let bar_type = BarType::new(
1197 instrument.id(),
1198 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1199 AggregationSource::Internal,
1200 );
1201 let mut builder = BarBuilder::new(
1202 bar_type,
1203 instrument.price_precision(),
1204 instrument.size_precision(),
1205 );
1206
1207 builder.update(
1208 Price::from("100.00"),
1209 Quantity::from(1),
1210 UnixNanos::from(1000),
1211 );
1212 builder.update(
1213 Price::from("95.00"),
1214 Quantity::from(1),
1215 UnixNanos::from(2000),
1216 );
1217 builder.update(
1218 Price::from("105.00"),
1219 Quantity::from(1),
1220 UnixNanos::from(3000),
1221 );
1222
1223 let bar = builder.build_now();
1224 assert!(bar.high > bar.low);
1225 assert_eq!(bar.open, Price::from("100.00"));
1226 assert_eq!(bar.high, Price::from("105.00"));
1227 assert_eq!(bar.low, Price::from("95.00"));
1228 assert_eq!(bar.close, Price::from("105.00"));
1229 }
1230
1231 #[rstest]
1232 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1233 let instrument = InstrumentAny::Equity(equity_aapl);
1234 let bar_type = BarType::new(
1235 instrument.id(),
1236 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1237 AggregationSource::Internal,
1238 );
1239 let mut builder = BarBuilder::new(
1240 bar_type,
1241 instrument.price_precision(),
1242 instrument.size_precision(),
1243 );
1244
1245 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1246 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1247
1248 assert_eq!(builder.ts_last, 1_000);
1249 assert_eq!(builder.count, 1);
1250 }
1251
1252 #[rstest]
1253 fn test_bar_builder_set_partial_updates_bar_to_expected_properties(audusd_sim: CurrencyPair) {
1254 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1255 let bar_type = BarType::new(
1256 instrument.id(),
1257 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1258 AggregationSource::Internal,
1259 );
1260 let mut builder = BarBuilder::new(
1261 bar_type,
1262 instrument.price_precision(),
1263 instrument.size_precision(),
1264 );
1265
1266 let partial_bar = Bar::new(
1267 bar_type,
1268 Price::from("1.00001"),
1269 Price::from("1.00010"),
1270 Price::from("1.00000"),
1271 Price::from("1.00002"),
1272 Quantity::from(1),
1273 UnixNanos::from(1_000_000_000),
1274 UnixNanos::from(2_000_000_000),
1275 );
1276
1277 builder.set_partial(partial_bar);
1278 let bar = builder.build_now();
1279
1280 assert_eq!(bar.open, Price::from("1.00001"));
1281 assert_eq!(bar.high, Price::from("1.00010"));
1282 assert_eq!(bar.low, Price::from("1.00000"));
1283 assert_eq!(bar.close, Price::from("1.00002"));
1284 assert_eq!(bar.volume, Quantity::from(1));
1285 assert_eq!(bar.ts_init, 2_000_000_000);
1286 assert_eq!(builder.ts_last, 2_000_000_000);
1287 }
1288
1289 #[rstest]
1290 fn test_bar_builder_set_partial_when_already_set_does_not_update(audusd_sim: CurrencyPair) {
1291 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
1292 let bar_type = BarType::new(
1293 instrument.id(),
1294 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1295 AggregationSource::Internal,
1296 );
1297 let mut builder = BarBuilder::new(
1298 bar_type,
1299 instrument.price_precision(),
1300 instrument.size_precision(),
1301 );
1302
1303 let partial_bar1 = Bar::new(
1304 bar_type,
1305 Price::from("1.00001"),
1306 Price::from("1.00010"),
1307 Price::from("1.00000"),
1308 Price::from("1.00002"),
1309 Quantity::from(1),
1310 UnixNanos::from(1_000_000_000),
1311 UnixNanos::from(1_000_000_000),
1312 );
1313
1314 let partial_bar2 = Bar::new(
1315 bar_type,
1316 Price::from("2.00001"),
1317 Price::from("2.00010"),
1318 Price::from("2.00000"),
1319 Price::from("2.00002"),
1320 Quantity::from(2),
1321 UnixNanos::from(3_000_000_000),
1322 UnixNanos::from(3_000_000_000),
1323 );
1324
1325 builder.set_partial(partial_bar1);
1326 builder.set_partial(partial_bar2);
1327 let bar = builder.build(
1328 UnixNanos::from(4_000_000_000),
1329 UnixNanos::from(4_000_000_000),
1330 );
1331
1332 assert_eq!(bar.open, Price::from("1.00001"));
1333 assert_eq!(bar.high, Price::from("1.00010"));
1334 assert_eq!(bar.low, Price::from("1.00000"));
1335 assert_eq!(bar.close, Price::from("1.00002"));
1336 assert_eq!(bar.volume, Quantity::from(1));
1337 assert_eq!(bar.ts_init, 4_000_000_000);
1338 assert_eq!(builder.ts_last, 1_000_000_000);
1339 }
1340
1341 #[rstest]
1342 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1343 let instrument = InstrumentAny::Equity(equity_aapl);
1344 let bar_type = BarType::new(
1345 instrument.id(),
1346 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1347 AggregationSource::Internal,
1348 );
1349 let mut builder = BarBuilder::new(
1350 bar_type,
1351 instrument.price_precision(),
1352 instrument.size_precision(),
1353 );
1354
1355 builder.update(
1356 Price::from("1.00000"),
1357 Quantity::from(1),
1358 UnixNanos::default(),
1359 );
1360
1361 assert!(builder.initialized);
1362 assert_eq!(builder.ts_last, 0);
1363 assert_eq!(builder.count, 1);
1364 }
1365
1366 #[rstest]
1367 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1368 equity_aapl: Equity,
1369 ) {
1370 let instrument = InstrumentAny::Equity(equity_aapl);
1371 let bar_type = BarType::new(
1372 instrument.id(),
1373 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1374 AggregationSource::Internal,
1375 );
1376 let mut builder = BarBuilder::new(bar_type, 2, 0);
1377
1378 builder.update(
1379 Price::from("1.00000"),
1380 Quantity::from(1),
1381 UnixNanos::from(1_000),
1382 );
1383 builder.update(
1384 Price::from("1.00001"),
1385 Quantity::from(1),
1386 UnixNanos::from(500),
1387 );
1388
1389 assert!(builder.initialized);
1390 assert_eq!(builder.ts_last, 1_000);
1391 assert_eq!(builder.count, 1);
1392 }
1393
1394 #[rstest]
1395 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1396 let instrument = InstrumentAny::Equity(equity_aapl);
1397 let bar_type = BarType::new(
1398 instrument.id(),
1399 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1400 AggregationSource::Internal,
1401 );
1402 let mut builder = BarBuilder::new(
1403 bar_type,
1404 instrument.price_precision(),
1405 instrument.size_precision(),
1406 );
1407
1408 for _ in 0..5 {
1409 builder.update(
1410 Price::from("1.00000"),
1411 Quantity::from(1),
1412 UnixNanos::from(1_000),
1413 );
1414 }
1415
1416 assert_eq!(builder.count, 5);
1417 }
1418
1419 #[rstest]
1420 #[should_panic]
1421 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1422 let instrument = InstrumentAny::Equity(equity_aapl);
1423 let bar_type = BarType::new(
1424 instrument.id(),
1425 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1426 AggregationSource::Internal,
1427 );
1428 let mut builder = BarBuilder::new(
1429 bar_type,
1430 instrument.price_precision(),
1431 instrument.size_precision(),
1432 );
1433 let _ = builder.build_now();
1434 }
1435
1436 #[rstest]
1437 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1438 let instrument = InstrumentAny::Equity(equity_aapl);
1439 let bar_type = BarType::new(
1440 instrument.id(),
1441 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1442 AggregationSource::Internal,
1443 );
1444 let mut builder = BarBuilder::new(
1445 bar_type,
1446 instrument.price_precision(),
1447 instrument.size_precision(),
1448 );
1449
1450 builder.update(
1451 Price::from("1.00001"),
1452 Quantity::from(2),
1453 UnixNanos::default(),
1454 );
1455 builder.update(
1456 Price::from("1.00002"),
1457 Quantity::from(2),
1458 UnixNanos::default(),
1459 );
1460 builder.update(
1461 Price::from("1.00000"),
1462 Quantity::from(1),
1463 UnixNanos::from(1_000_000_000),
1464 );
1465
1466 let bar = builder.build_now();
1467
1468 assert_eq!(bar.open, Price::from("1.00001"));
1469 assert_eq!(bar.high, Price::from("1.00002"));
1470 assert_eq!(bar.low, Price::from("1.00000"));
1471 assert_eq!(bar.close, Price::from("1.00000"));
1472 assert_eq!(bar.volume, Quantity::from(5));
1473 assert_eq!(bar.ts_init, 1_000_000_000);
1474 assert_eq!(builder.ts_last, 1_000_000_000);
1475 assert_eq!(builder.count, 0);
1476 }
1477
1478 #[rstest]
1479 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1480 let instrument = InstrumentAny::Equity(equity_aapl);
1481 let bar_type = BarType::new(
1482 instrument.id(),
1483 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1484 AggregationSource::Internal,
1485 );
1486 let mut builder = BarBuilder::new(bar_type, 2, 0);
1487
1488 builder.update(
1489 Price::from("1.00001"),
1490 Quantity::from(1),
1491 UnixNanos::default(),
1492 );
1493 builder.build_now();
1494
1495 builder.update(
1496 Price::from("1.00000"),
1497 Quantity::from(1),
1498 UnixNanos::default(),
1499 );
1500 builder.update(
1501 Price::from("1.00003"),
1502 Quantity::from(1),
1503 UnixNanos::default(),
1504 );
1505 builder.update(
1506 Price::from("1.00002"),
1507 Quantity::from(1),
1508 UnixNanos::default(),
1509 );
1510
1511 let bar = builder.build_now();
1512
1513 assert_eq!(bar.open, Price::from("1.00000"));
1514 assert_eq!(bar.high, Price::from("1.00003"));
1515 assert_eq!(bar.low, Price::from("1.00000"));
1516 assert_eq!(bar.close, Price::from("1.00002"));
1517 assert_eq!(bar.volume, Quantity::from(3));
1518 }
1519
1520 #[rstest]
1521 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1522 let instrument = InstrumentAny::Equity(equity_aapl);
1523 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1524 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1525 let handler = Arc::new(Mutex::new(Vec::new()));
1526 let handler_clone = Arc::clone(&handler);
1527
1528 let mut aggregator = TickBarAggregator::new(
1529 bar_type,
1530 instrument.price_precision(),
1531 instrument.size_precision(),
1532 move |bar: Bar| {
1533 let mut handler_guard = handler_clone.lock().unwrap();
1534 handler_guard.push(bar);
1535 },
1536 false,
1537 );
1538
1539 let trade = TradeTick::default();
1540 aggregator.handle_trade(trade);
1541
1542 let handler_guard = handler.lock().unwrap();
1543 assert_eq!(handler_guard.len(), 0);
1544 }
1545
1546 #[rstest]
1547 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1548 let instrument = InstrumentAny::Equity(equity_aapl);
1549 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1550 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1551 let handler = Arc::new(Mutex::new(Vec::new()));
1552 let handler_clone = Arc::clone(&handler);
1553
1554 let mut aggregator = TickBarAggregator::new(
1555 bar_type,
1556 instrument.price_precision(),
1557 instrument.size_precision(),
1558 move |bar: Bar| {
1559 let mut handler_guard = handler_clone.lock().unwrap();
1560 handler_guard.push(bar);
1561 },
1562 false,
1563 );
1564
1565 let trade = TradeTick::default();
1566 aggregator.handle_trade(trade);
1567 aggregator.handle_trade(trade);
1568 aggregator.handle_trade(trade);
1569
1570 let handler_guard = handler.lock().unwrap();
1571 let bar = handler_guard.first().unwrap();
1572 assert_eq!(handler_guard.len(), 1);
1573 assert_eq!(bar.open, trade.price);
1574 assert_eq!(bar.high, trade.price);
1575 assert_eq!(bar.low, trade.price);
1576 assert_eq!(bar.close, trade.price);
1577 assert_eq!(bar.volume, Quantity::from(300000));
1578 assert_eq!(bar.ts_event, trade.ts_event);
1579 assert_eq!(bar.ts_init, trade.ts_init);
1580 }
1581
1582 #[rstest]
1583 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1584 let instrument = InstrumentAny::Equity(equity_aapl);
1585 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1586 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1587 let handler = Arc::new(Mutex::new(Vec::new()));
1588 let handler_clone = Arc::clone(&handler);
1589
1590 let mut aggregator = TickBarAggregator::new(
1591 bar_type,
1592 instrument.price_precision(),
1593 instrument.size_precision(),
1594 move |bar: Bar| {
1595 let mut handler_guard = handler_clone.lock().unwrap();
1596 handler_guard.push(bar);
1597 },
1598 false,
1599 );
1600
1601 aggregator.update(
1602 Price::from("1.00001"),
1603 Quantity::from(1),
1604 UnixNanos::default(),
1605 );
1606 aggregator.update(
1607 Price::from("1.00002"),
1608 Quantity::from(1),
1609 UnixNanos::from(1000),
1610 );
1611 aggregator.update(
1612 Price::from("1.00003"),
1613 Quantity::from(1),
1614 UnixNanos::from(2000),
1615 );
1616
1617 let handler_guard = handler.lock().unwrap();
1618 assert_eq!(handler_guard.len(), 1);
1619
1620 let bar = handler_guard.first().unwrap();
1621 assert_eq!(bar.open, Price::from("1.00001"));
1622 assert_eq!(bar.high, Price::from("1.00003"));
1623 assert_eq!(bar.low, Price::from("1.00001"));
1624 assert_eq!(bar.close, Price::from("1.00003"));
1625 assert_eq!(bar.volume, Quantity::from(3));
1626 }
1627
1628 #[rstest]
1629 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1630 let instrument = InstrumentAny::Equity(equity_aapl);
1631 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1632 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1633 let handler = Arc::new(Mutex::new(Vec::new()));
1634 let handler_clone = Arc::clone(&handler);
1635
1636 let mut aggregator = TickBarAggregator::new(
1637 bar_type,
1638 instrument.price_precision(),
1639 instrument.size_precision(),
1640 move |bar: Bar| {
1641 let mut handler_guard = handler_clone.lock().unwrap();
1642 handler_guard.push(bar);
1643 },
1644 false,
1645 );
1646
1647 aggregator.update(
1648 Price::from("1.00001"),
1649 Quantity::from(1),
1650 UnixNanos::default(),
1651 );
1652 aggregator.update(
1653 Price::from("1.00002"),
1654 Quantity::from(1),
1655 UnixNanos::from(1000),
1656 );
1657 aggregator.update(
1658 Price::from("1.00003"),
1659 Quantity::from(1),
1660 UnixNanos::from(2000),
1661 );
1662 aggregator.update(
1663 Price::from("1.00004"),
1664 Quantity::from(1),
1665 UnixNanos::from(3000),
1666 );
1667
1668 let handler_guard = handler.lock().unwrap();
1669 assert_eq!(handler_guard.len(), 2);
1670
1671 let bar1 = &handler_guard[0];
1672 assert_eq!(bar1.open, Price::from("1.00001"));
1673 assert_eq!(bar1.close, Price::from("1.00002"));
1674 assert_eq!(bar1.volume, Quantity::from(2));
1675
1676 let bar2 = &handler_guard[1];
1677 assert_eq!(bar2.open, Price::from("1.00003"));
1678 assert_eq!(bar2.close, Price::from("1.00004"));
1679 assert_eq!(bar2.volume, Quantity::from(2));
1680 }
1681
1682 #[rstest]
1683 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1684 let instrument = InstrumentAny::Equity(equity_aapl);
1685 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1686 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1687 let handler = Arc::new(Mutex::new(Vec::new()));
1688 let handler_clone = Arc::clone(&handler);
1689
1690 let mut aggregator = VolumeBarAggregator::new(
1691 bar_type,
1692 instrument.price_precision(),
1693 instrument.size_precision(),
1694 move |bar: Bar| {
1695 let mut handler_guard = handler_clone.lock().unwrap();
1696 handler_guard.push(bar);
1697 },
1698 false,
1699 );
1700
1701 aggregator.update(
1702 Price::from("1.00001"),
1703 Quantity::from(25),
1704 UnixNanos::default(),
1705 );
1706
1707 let handler_guard = handler.lock().unwrap();
1708 assert_eq!(handler_guard.len(), 2);
1709 let bar1 = &handler_guard[0];
1710 assert_eq!(bar1.volume, Quantity::from(10));
1711 let bar2 = &handler_guard[1];
1712 assert_eq!(bar2.volume, Quantity::from(10));
1713 }
1714
1715 #[rstest]
1716 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1717 let instrument = InstrumentAny::Equity(equity_aapl);
1718 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1720 let handler = Arc::new(Mutex::new(Vec::new()));
1721 let handler_clone = Arc::clone(&handler);
1722
1723 let mut aggregator = ValueBarAggregator::new(
1724 bar_type,
1725 instrument.price_precision(),
1726 instrument.size_precision(),
1727 move |bar: Bar| {
1728 let mut handler_guard = handler_clone.lock().unwrap();
1729 handler_guard.push(bar);
1730 },
1731 false,
1732 );
1733
1734 aggregator.update(
1736 Price::from("100.00"),
1737 Quantity::from(5),
1738 UnixNanos::default(),
1739 );
1740 aggregator.update(
1741 Price::from("100.00"),
1742 Quantity::from(5),
1743 UnixNanos::from(1000),
1744 );
1745
1746 let handler_guard = handler.lock().unwrap();
1747 assert_eq!(handler_guard.len(), 1);
1748 let bar = handler_guard.first().unwrap();
1749 assert_eq!(bar.volume, Quantity::from(10));
1750 }
1751
1752 #[rstest]
1753 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1754 let instrument = InstrumentAny::Equity(equity_aapl);
1755 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1756 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1757 let handler = Arc::new(Mutex::new(Vec::new()));
1758 let handler_clone = Arc::clone(&handler);
1759
1760 let mut aggregator = ValueBarAggregator::new(
1761 bar_type,
1762 instrument.price_precision(),
1763 instrument.size_precision(),
1764 move |bar: Bar| {
1765 let mut handler_guard = handler_clone.lock().unwrap();
1766 handler_guard.push(bar);
1767 },
1768 false,
1769 );
1770
1771 aggregator.update(
1773 Price::from("100.00"),
1774 Quantity::from(25),
1775 UnixNanos::default(),
1776 );
1777
1778 let handler_guard = handler.lock().unwrap();
1779 assert_eq!(handler_guard.len(), 2);
1780 let remaining_value = aggregator.get_cumulative_value();
1781 assert!(remaining_value < 1000.0); }
1783
1784 #[rstest]
1785 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1786 let instrument = InstrumentAny::Equity(equity_aapl);
1787 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1789 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1790 let handler = Arc::new(Mutex::new(Vec::new()));
1791 let handler_clone = Arc::clone(&handler);
1792 let clock = Rc::new(RefCell::new(TestClock::new()));
1793
1794 let mut aggregator = TimeBarAggregator::new(
1795 bar_type,
1796 instrument.price_precision(),
1797 instrument.size_precision(),
1798 clock.clone(),
1799 move |bar: Bar| {
1800 let mut handler_guard = handler_clone.lock().unwrap();
1801 handler_guard.push(bar);
1802 },
1803 false, true, false, BarIntervalType::LeftOpen,
1807 None, 15, false, );
1811
1812 aggregator.update(
1813 Price::from("100.00"),
1814 Quantity::from(1),
1815 UnixNanos::default(),
1816 );
1817
1818 let next_sec = UnixNanos::from(1_000_000_000);
1819 clock.borrow_mut().set_time(next_sec);
1820
1821 let event = TimeEvent::new(
1822 Ustr::from("1-SECOND-LAST"),
1823 UUID4::new(),
1824 next_sec,
1825 next_sec,
1826 );
1827 aggregator.build_bar(event);
1828
1829 let handler_guard = handler.lock().unwrap();
1830 assert_eq!(handler_guard.len(), 1);
1831 let bar = handler_guard.first().unwrap();
1832 assert_eq!(bar.ts_event, UnixNanos::default());
1833 assert_eq!(bar.ts_init, next_sec);
1834 }
1835
1836 #[rstest]
1837 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1838 let instrument = InstrumentAny::Equity(equity_aapl);
1839 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1840 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1841 let handler = Arc::new(Mutex::new(Vec::new()));
1842 let handler_clone = Arc::clone(&handler);
1843 let clock = Rc::new(RefCell::new(TestClock::new()));
1844
1845 let mut aggregator = TimeBarAggregator::new(
1846 bar_type,
1847 instrument.price_precision(),
1848 instrument.size_precision(),
1849 clock.clone(),
1850 move |bar: Bar| {
1851 let mut handler_guard = handler_clone.lock().unwrap();
1852 handler_guard.push(bar);
1853 },
1854 false, true, true, BarIntervalType::LeftOpen,
1858 None,
1859 15,
1860 false, );
1862
1863 aggregator.update(
1865 Price::from("100.00"),
1866 Quantity::from(1),
1867 UnixNanos::default(),
1868 );
1869
1870 let ts1 = UnixNanos::from(1_000_000_000);
1872 clock.borrow_mut().set_time(ts1);
1873 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1874 aggregator.build_bar(event);
1875
1876 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1878
1879 let ts2 = UnixNanos::from(2_000_000_000);
1881 clock.borrow_mut().set_time(ts2);
1882 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1883 aggregator.build_bar(event);
1884
1885 let handler_guard = handler.lock().unwrap();
1886 assert_eq!(handler_guard.len(), 2);
1887
1888 let bar1 = &handler_guard[0];
1889 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
1891 assert_eq!(bar1.close, Price::from("100.00"));
1892 let bar2 = &handler_guard[1];
1893 assert_eq!(bar2.ts_event, ts2);
1894 assert_eq!(bar2.ts_init, ts2);
1895 assert_eq!(bar2.close, Price::from("101.00"));
1896 }
1897
1898 #[rstest]
1899 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1900 let instrument = InstrumentAny::Equity(equity_aapl);
1901 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1902 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1903 let handler = Arc::new(Mutex::new(Vec::new()));
1904 let handler_clone = Arc::clone(&handler);
1905 let clock = Rc::new(RefCell::new(TestClock::new()));
1906 let mut aggregator = TimeBarAggregator::new(
1907 bar_type,
1908 instrument.price_precision(),
1909 instrument.size_precision(),
1910 clock.clone(),
1911 move |bar: Bar| {
1912 let mut handler_guard = handler_clone.lock().unwrap();
1913 handler_guard.push(bar);
1914 },
1915 false, true, true, BarIntervalType::RightOpen,
1919 None,
1920 15,
1921 false, );
1923
1924 aggregator.update(
1926 Price::from("100.00"),
1927 Quantity::from(1),
1928 UnixNanos::default(),
1929 );
1930
1931 let ts1 = UnixNanos::from(1_000_000_000);
1933 clock.borrow_mut().set_time(ts1);
1934 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1935 aggregator.build_bar(event);
1936
1937 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1939
1940 let ts2 = UnixNanos::from(2_000_000_000);
1942 clock.borrow_mut().set_time(ts2);
1943 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1944 aggregator.build_bar(event);
1945
1946 let handler_guard = handler.lock().unwrap();
1947 assert_eq!(handler_guard.len(), 2);
1948
1949 let bar1 = &handler_guard[0];
1950 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
1952 assert_eq!(bar1.close, Price::from("100.00"));
1953
1954 let bar2 = &handler_guard[1];
1955 assert_eq!(bar2.ts_event, ts1);
1956 assert_eq!(bar2.ts_init, ts2);
1957 assert_eq!(bar2.close, Price::from("101.00"));
1958 }
1959
1960 #[rstest]
1961 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
1962 let instrument = InstrumentAny::Equity(equity_aapl);
1963 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1964 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1965 let handler = Arc::new(Mutex::new(Vec::new()));
1966 let handler_clone = Arc::clone(&handler);
1967 let clock = Rc::new(RefCell::new(TestClock::new()));
1968
1969 let mut aggregator = TimeBarAggregator::new(
1971 bar_type,
1972 instrument.price_precision(),
1973 instrument.size_precision(),
1974 clock.clone(),
1975 move |bar: Bar| {
1976 let mut handler_guard = handler_clone.lock().unwrap();
1977 handler_guard.push(bar);
1978 },
1979 false, false, true, BarIntervalType::LeftOpen,
1983 None,
1984 15,
1985 false, );
1987
1988 let ts1 = UnixNanos::from(1_000_000_000);
1990 clock.borrow_mut().set_time(ts1);
1991 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1992 aggregator.build_bar(event);
1993
1994 let handler_guard = handler.lock().unwrap();
1995 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
1997
1998 let handler = Arc::new(Mutex::new(Vec::new()));
2000 let handler_clone = Arc::clone(&handler);
2001 let mut aggregator = TimeBarAggregator::new(
2002 bar_type,
2003 instrument.price_precision(),
2004 instrument.size_precision(),
2005 clock.clone(),
2006 move |bar: Bar| {
2007 let mut handler_guard = handler_clone.lock().unwrap();
2008 handler_guard.push(bar);
2009 },
2010 false,
2011 true, true, BarIntervalType::LeftOpen,
2014 None,
2015 15,
2016 false, );
2018
2019 aggregator.update(
2020 Price::from("100.00"),
2021 Quantity::from(1),
2022 UnixNanos::default(),
2023 );
2024
2025 let ts1 = UnixNanos::from(1_000_000_000);
2027 clock.borrow_mut().set_time(ts1);
2028 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2029 aggregator.build_bar(event);
2030
2031 let ts2 = UnixNanos::from(2_000_000_000);
2033 clock.borrow_mut().set_time(ts2);
2034 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2035 aggregator.build_bar(event);
2036
2037 let handler_guard = handler.lock().unwrap();
2038 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2040 assert_eq!(bar1.close, Price::from("100.00"));
2041 let bar2 = &handler_guard[1];
2042 assert_eq!(bar2.close, Price::from("100.00")); }
2044
2045 #[rstest]
2046 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2047 let instrument = InstrumentAny::Equity(equity_aapl);
2048 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2049 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2050 let clock = Rc::new(RefCell::new(TestClock::new()));
2051 let handler = Arc::new(Mutex::new(Vec::new()));
2052 let handler_clone = Arc::clone(&handler);
2053
2054 let mut aggregator = TimeBarAggregator::new(
2055 bar_type,
2056 instrument.price_precision(),
2057 instrument.size_precision(),
2058 clock.clone(),
2059 move |bar: Bar| {
2060 let mut handler_guard = handler_clone.lock().unwrap();
2061 handler_guard.push(bar);
2062 },
2063 false, true, true, BarIntervalType::RightOpen,
2067 None,
2068 15,
2069 false, );
2071
2072 let ts1 = UnixNanos::from(1_000_000_000);
2073 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2074
2075 let ts2 = UnixNanos::from(2_000_000_000);
2076 clock.borrow_mut().set_time(ts2);
2077
2078 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2080 aggregator.build_bar(event);
2081
2082 let handler_guard = handler.lock().unwrap();
2083 let bar = handler_guard.first().unwrap();
2084 assert_eq!(bar.ts_event, UnixNanos::default());
2085 assert_eq!(bar.ts_init, ts2);
2086 }
2087
2088 #[rstest]
2089 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2090 let instrument = InstrumentAny::Equity(equity_aapl);
2091 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2092 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2093 let clock = Rc::new(RefCell::new(TestClock::new()));
2094 let handler = Arc::new(Mutex::new(Vec::new()));
2095 let handler_clone = Arc::clone(&handler);
2096
2097 let mut aggregator = TimeBarAggregator::new(
2098 bar_type,
2099 instrument.price_precision(),
2100 instrument.size_precision(),
2101 clock.clone(),
2102 move |bar: Bar| {
2103 let mut handler_guard = handler_clone.lock().unwrap();
2104 handler_guard.push(bar);
2105 },
2106 false, true, true, BarIntervalType::LeftOpen,
2110 None,
2111 15,
2112 false, );
2114
2115 let ts1 = UnixNanos::from(1_000_000_000);
2116 clock.borrow_mut().set_time(ts1);
2117
2118 let initial_time = clock.borrow().utc_now();
2119 aggregator.start_batch_time(UnixNanos::from(
2120 initial_time.timestamp_nanos_opt().unwrap() as u64
2121 ));
2122
2123 let handler_guard = handler.lock().unwrap();
2124 assert_eq!(handler_guard.len(), 0);
2125 }
2126}