1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31 clock::{Clock, TestClock},
32 timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35 UnixNanos,
36 correctness::{self, FAILED},
37 datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40 data::{
41 QuoteTick, TradeTick,
42 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43 },
44 enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
48type BarHandler = Box<dyn FnMut(Bar)>;
50
51pub trait BarAggregator: Any + Debug {
55 fn bar_type(&self) -> BarType;
57 fn is_running(&self) -> bool;
59 fn set_is_running(&mut self, value: bool);
61 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63 fn handle_quote(&mut self, quote: QuoteTick) {
65 let spec = self.bar_type().spec();
66 self.update(
67 quote.extract_price(spec.price_type),
68 quote.extract_size(spec.price_type),
69 quote.ts_init,
70 );
71 }
72 fn handle_trade(&mut self, trade: TradeTick) {
74 self.update(trade.price, trade.size, trade.ts_init);
75 }
76 fn handle_bar(&mut self, bar: Bar) {
78 self.update_bar(bar, bar.volume, bar.ts_init);
79 }
80 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81 fn stop(&mut self) {}
83 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89 fn build_bar(&mut self, _event: TimeEvent) {}
91 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101 pub fn as_any(&self) -> &dyn Any {
103 self
104 }
105 pub fn as_any_mut(&mut self) -> &mut dyn Any {
107 self
108 }
109}
110
111#[derive(Debug)]
113pub struct BarBuilder {
114 bar_type: BarType,
115 price_precision: u8,
116 size_precision: u8,
117 initialized: bool,
118 ts_last: UnixNanos,
119 count: usize,
120 last_close: Option<Price>,
121 open: Option<Price>,
122 high: Option<Price>,
123 low: Option<Price>,
124 close: Option<Price>,
125 volume: Quantity,
126}
127
128impl BarBuilder {
129 #[must_use]
137 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
138 correctness::check_equal(
139 &bar_type.aggregation_source(),
140 &AggregationSource::Internal,
141 "bar_type.aggregation_source",
142 "AggregationSource::Internal",
143 )
144 .expect(FAILED);
145
146 Self {
147 bar_type,
148 price_precision,
149 size_precision,
150 initialized: false,
151 ts_last: UnixNanos::default(),
152 count: 0,
153 last_close: None,
154 open: None,
155 high: None,
156 low: None,
157 close: None,
158 volume: Quantity::zero(size_precision),
159 }
160 }
161
162 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
168 if ts_init < self.ts_last {
169 return; }
171
172 if self.open.is_none() {
173 self.open = Some(price);
174 self.high = Some(price);
175 self.low = Some(price);
176 self.initialized = true;
177 } else {
178 if price > self.high.unwrap() {
179 self.high = Some(price);
180 }
181 if price < self.low.unwrap() {
182 self.low = Some(price);
183 }
184 }
185
186 self.close = Some(price);
187 self.volume = self.volume.add(size);
188 self.count += 1;
189 self.ts_last = ts_init;
190 }
191
192 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
198 if ts_init < self.ts_last {
199 return; }
201
202 if self.open.is_none() {
203 self.open = Some(bar.open);
204 self.high = Some(bar.high);
205 self.low = Some(bar.low);
206 self.initialized = true;
207 } else {
208 if bar.high > self.high.unwrap() {
209 self.high = Some(bar.high);
210 }
211 if bar.low < self.low.unwrap() {
212 self.low = Some(bar.low);
213 }
214 }
215
216 self.close = Some(bar.close);
217 self.volume = self.volume.add(volume);
218 self.count += 1;
219 self.ts_last = ts_init;
220 }
221
222 pub fn reset(&mut self) {
226 self.open = None;
227 self.high = None;
228 self.low = None;
229 self.volume = Quantity::zero(self.size_precision);
230 self.count = 0;
231 }
232
233 pub fn build_now(&mut self) -> Bar {
235 self.build(self.ts_last, self.ts_last)
236 }
237
238 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244 if self.open.is_none() {
245 self.open = self.last_close;
246 self.high = self.last_close;
247 self.low = self.last_close;
248 self.close = self.last_close;
249 }
250
251 if let (Some(close), Some(low)) = (self.close, self.low)
252 && close < low
253 {
254 self.low = Some(close);
255 }
256
257 if let (Some(close), Some(high)) = (self.close, self.high)
258 && close > high
259 {
260 self.high = Some(close);
261 }
262
263 let bar = Bar::new(
265 self.bar_type,
266 self.open.unwrap(),
267 self.high.unwrap(),
268 self.low.unwrap(),
269 self.close.unwrap(),
270 self.volume,
271 ts_event,
272 ts_init,
273 );
274
275 self.last_close = self.close;
276 self.reset();
277 bar
278 }
279}
280
281pub struct BarAggregatorCore {
283 bar_type: BarType,
284 builder: BarBuilder,
285 handler: BarHandler,
286 is_running: bool,
287}
288
289impl Debug for BarAggregatorCore {
290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291 f.debug_struct(stringify!(BarAggregatorCore))
292 .field("bar_type", &self.bar_type)
293 .field("builder", &self.builder)
294 .field("is_running", &self.is_running)
295 .finish()
296 }
297}
298
299impl BarAggregatorCore {
300 pub fn new<H: FnMut(Bar) + 'static>(
308 bar_type: BarType,
309 price_precision: u8,
310 size_precision: u8,
311 handler: H,
312 ) -> Self {
313 Self {
314 bar_type,
315 builder: BarBuilder::new(bar_type, price_precision, size_precision),
316 handler: Box::new(handler),
317 is_running: false,
318 }
319 }
320
321 pub const fn set_is_running(&mut self, value: bool) {
323 self.is_running = value;
324 }
325 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
326 self.builder.update(price, size, ts_init);
327 }
328
329 fn build_now_and_send(&mut self) {
330 let bar = self.builder.build_now();
331 (self.handler)(bar);
332 }
333
334 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
335 let bar = self.builder.build(ts_event, ts_init);
336 (self.handler)(bar);
337 }
338}
339
340pub struct TickBarAggregator {
345 core: BarAggregatorCore,
346}
347
348impl Debug for TickBarAggregator {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 f.debug_struct(stringify!(TickBarAggregator))
351 .field("core", &self.core)
352 .finish()
353 }
354}
355
356impl TickBarAggregator {
357 pub fn new<H: FnMut(Bar) + 'static>(
365 bar_type: BarType,
366 price_precision: u8,
367 size_precision: u8,
368 handler: H,
369 ) -> Self {
370 Self {
371 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
372 }
373 }
374}
375
376impl BarAggregator for TickBarAggregator {
377 fn bar_type(&self) -> BarType {
378 self.core.bar_type
379 }
380
381 fn is_running(&self) -> bool {
382 self.core.is_running
383 }
384
385 fn set_is_running(&mut self, value: bool) {
386 self.core.set_is_running(value);
387 }
388
389 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
391 self.core.apply_update(price, size, ts_init);
392 let spec = self.core.bar_type.spec();
393
394 if self.core.builder.count >= spec.step.get() {
395 self.core.build_now_and_send();
396 }
397 }
398
399 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
400 self.core.builder.update_bar(bar, volume, ts_init);
401 let spec = self.core.bar_type.spec();
402
403 if self.core.builder.count >= spec.step.get() {
404 self.core.build_now_and_send();
405 }
406 }
407}
408
409pub struct TickImbalanceBarAggregator {
414 core: BarAggregatorCore,
415 imbalance: isize,
416}
417
418impl Debug for TickImbalanceBarAggregator {
419 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420 f.debug_struct(stringify!(TickImbalanceBarAggregator))
421 .field("core", &self.core)
422 .field("imbalance", &self.imbalance)
423 .finish()
424 }
425}
426
427impl TickImbalanceBarAggregator {
428 pub fn new<H: FnMut(Bar) + 'static>(
436 bar_type: BarType,
437 price_precision: u8,
438 size_precision: u8,
439 handler: H,
440 ) -> Self {
441 Self {
442 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443 imbalance: 0,
444 }
445 }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449 fn bar_type(&self) -> BarType {
450 self.core.bar_type
451 }
452
453 fn is_running(&self) -> bool {
454 self.core.is_running
455 }
456
457 fn set_is_running(&mut self, value: bool) {
458 self.core.set_is_running(value);
459 }
460
461 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466 self.core.apply_update(price, size, ts_init);
467 }
468
469 fn handle_trade(&mut self, trade: TradeTick) {
470 self.core
471 .apply_update(trade.price, trade.size, trade.ts_init);
472
473 let delta = match trade.aggressor_side {
474 AggressorSide::Buyer => 1,
475 AggressorSide::Seller => -1,
476 AggressorSide::NoAggressor => 0,
477 };
478
479 if delta == 0 {
480 return;
481 }
482
483 self.imbalance += delta;
484 let threshold = self.core.bar_type.spec().step.get();
485 if self.imbalance.unsigned_abs() >= threshold {
486 self.core.build_now_and_send();
487 self.imbalance = 0;
488 }
489 }
490
491 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492 self.core.builder.update_bar(bar, volume, ts_init);
493 }
494}
495
496pub struct TickRunsBarAggregator {
498 core: BarAggregatorCore,
499 current_run_side: Option<AggressorSide>,
500 run_count: usize,
501}
502
503impl Debug for TickRunsBarAggregator {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct(stringify!(TickRunsBarAggregator))
506 .field("core", &self.core)
507 .field("current_run_side", &self.current_run_side)
508 .field("run_count", &self.run_count)
509 .finish()
510 }
511}
512
513impl TickRunsBarAggregator {
514 pub fn new<H: FnMut(Bar) + 'static>(
522 bar_type: BarType,
523 price_precision: u8,
524 size_precision: u8,
525 handler: H,
526 ) -> Self {
527 Self {
528 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
529 current_run_side: None,
530 run_count: 0,
531 }
532 }
533}
534
535impl BarAggregator for TickRunsBarAggregator {
536 fn bar_type(&self) -> BarType {
537 self.core.bar_type
538 }
539
540 fn is_running(&self) -> bool {
541 self.core.is_running
542 }
543
544 fn set_is_running(&mut self, value: bool) {
545 self.core.set_is_running(value);
546 }
547
548 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
553 self.core.apply_update(price, size, ts_init);
554 }
555
556 fn handle_trade(&mut self, trade: TradeTick) {
557 let side = match trade.aggressor_side {
558 AggressorSide::Buyer => Some(AggressorSide::Buyer),
559 AggressorSide::Seller => Some(AggressorSide::Seller),
560 AggressorSide::NoAggressor => None,
561 };
562
563 if let Some(side) = side {
564 if self.current_run_side != Some(side) {
565 self.current_run_side = Some(side);
566 self.run_count = 0;
567 self.core.builder.reset();
568 }
569
570 self.core
571 .apply_update(trade.price, trade.size, trade.ts_init);
572 self.run_count += 1;
573
574 let threshold = self.core.bar_type.spec().step.get();
575 if self.run_count >= threshold {
576 self.core.build_now_and_send();
577 self.run_count = 0;
578 self.current_run_side = None;
579 }
580 } else {
581 self.core
582 .apply_update(trade.price, trade.size, trade.ts_init);
583 }
584 }
585
586 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
587 self.core.builder.update_bar(bar, volume, ts_init);
588 }
589}
590
591pub struct VolumeBarAggregator {
593 core: BarAggregatorCore,
594}
595
596impl Debug for VolumeBarAggregator {
597 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
598 f.debug_struct(stringify!(VolumeBarAggregator))
599 .field("core", &self.core)
600 .finish()
601 }
602}
603
604impl VolumeBarAggregator {
605 pub fn new<H: FnMut(Bar) + 'static>(
613 bar_type: BarType,
614 price_precision: u8,
615 size_precision: u8,
616 handler: H,
617 ) -> Self {
618 Self {
619 core: BarAggregatorCore::new(
620 bar_type.standard(),
621 price_precision,
622 size_precision,
623 handler,
624 ),
625 }
626 }
627}
628
629impl BarAggregator for VolumeBarAggregator {
630 fn bar_type(&self) -> BarType {
631 self.core.bar_type
632 }
633
634 fn is_running(&self) -> bool {
635 self.core.is_running
636 }
637
638 fn set_is_running(&mut self, value: bool) {
639 self.core.set_is_running(value);
640 }
641
642 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
644 let mut raw_size_update = size.raw;
645 let spec = self.core.bar_type.spec();
646 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
647
648 while raw_size_update > 0 {
649 if self.core.builder.volume.raw + raw_size_update < raw_step {
650 self.core.apply_update(
651 price,
652 Quantity::from_raw(raw_size_update, size.precision),
653 ts_init,
654 );
655 break;
656 }
657
658 let raw_size_diff = raw_step - self.core.builder.volume.raw;
659 self.core.apply_update(
660 price,
661 Quantity::from_raw(raw_size_diff, size.precision),
662 ts_init,
663 );
664
665 self.core.build_now_and_send();
666 raw_size_update -= raw_size_diff;
667 }
668 }
669
670 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671 let mut raw_volume_update = volume.raw;
672 let spec = self.core.bar_type.spec();
673 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
674
675 while raw_volume_update > 0 {
676 if self.core.builder.volume.raw + raw_volume_update < raw_step {
677 self.core.builder.update_bar(
678 bar,
679 Quantity::from_raw(raw_volume_update, volume.precision),
680 ts_init,
681 );
682 break;
683 }
684
685 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
686 self.core.builder.update_bar(
687 bar,
688 Quantity::from_raw(raw_volume_diff, volume.precision),
689 ts_init,
690 );
691
692 self.core.build_now_and_send();
693 raw_volume_update -= raw_volume_diff;
694 }
695 }
696}
697
698pub struct VolumeImbalanceBarAggregator {
700 core: BarAggregatorCore,
701 imbalance_raw: i128,
702 raw_step: i128,
703}
704
705impl Debug for VolumeImbalanceBarAggregator {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
708 .field("core", &self.core)
709 .field("imbalance_raw", &self.imbalance_raw)
710 .field("raw_step", &self.raw_step)
711 .finish()
712 }
713}
714
715impl VolumeImbalanceBarAggregator {
716 pub fn new<H: FnMut(Bar) + 'static>(
724 bar_type: BarType,
725 price_precision: u8,
726 size_precision: u8,
727 handler: H,
728 ) -> Self {
729 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
730 Self {
731 core: BarAggregatorCore::new(
732 bar_type.standard(),
733 price_precision,
734 size_precision,
735 handler,
736 ),
737 imbalance_raw: 0,
738 raw_step,
739 }
740 }
741}
742
743impl BarAggregator for VolumeImbalanceBarAggregator {
744 fn bar_type(&self) -> BarType {
745 self.core.bar_type
746 }
747
748 fn is_running(&self) -> bool {
749 self.core.is_running
750 }
751
752 fn set_is_running(&mut self, value: bool) {
753 self.core.set_is_running(value);
754 }
755
756 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
761 self.core.apply_update(price, size, ts_init);
762 }
763
764 fn handle_trade(&mut self, trade: TradeTick) {
765 let side = match trade.aggressor_side {
766 AggressorSide::Buyer => 1,
767 AggressorSide::Seller => -1,
768 AggressorSide::NoAggressor => {
769 self.core
770 .apply_update(trade.price, trade.size, trade.ts_init);
771 return;
772 }
773 };
774
775 let mut raw_remaining = trade.size.raw as i128;
776 while raw_remaining > 0 {
777 let imbalance_abs = self.imbalance_raw.abs();
778 let needed = (self.raw_step - imbalance_abs).max(1);
779 let raw_chunk = raw_remaining.min(needed);
780 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
781
782 self.core
783 .apply_update(trade.price, qty_chunk, trade.ts_init);
784
785 self.imbalance_raw += side * raw_chunk;
786 raw_remaining -= raw_chunk;
787
788 if self.imbalance_raw.abs() >= self.raw_step {
789 self.core.build_now_and_send();
790 self.imbalance_raw = 0;
791 }
792 }
793 }
794
795 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
796 self.core.builder.update_bar(bar, volume, ts_init);
797 }
798}
799
800pub struct VolumeRunsBarAggregator {
802 core: BarAggregatorCore,
803 current_run_side: Option<AggressorSide>,
804 run_volume_raw: QuantityRaw,
805 raw_step: QuantityRaw,
806}
807
808impl Debug for VolumeRunsBarAggregator {
809 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810 f.debug_struct(stringify!(VolumeRunsBarAggregator))
811 .field("core", &self.core)
812 .field("current_run_side", &self.current_run_side)
813 .field("run_volume_raw", &self.run_volume_raw)
814 .field("raw_step", &self.raw_step)
815 .finish()
816 }
817}
818
819impl VolumeRunsBarAggregator {
820 pub fn new<H: FnMut(Bar) + 'static>(
828 bar_type: BarType,
829 price_precision: u8,
830 size_precision: u8,
831 handler: H,
832 ) -> Self {
833 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
834 Self {
835 core: BarAggregatorCore::new(
836 bar_type.standard(),
837 price_precision,
838 size_precision,
839 handler,
840 ),
841 current_run_side: None,
842 run_volume_raw: 0,
843 raw_step,
844 }
845 }
846}
847
848impl BarAggregator for VolumeRunsBarAggregator {
849 fn bar_type(&self) -> BarType {
850 self.core.bar_type
851 }
852
853 fn is_running(&self) -> bool {
854 self.core.is_running
855 }
856
857 fn set_is_running(&mut self, value: bool) {
858 self.core.set_is_running(value);
859 }
860
861 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
866 self.core.apply_update(price, size, ts_init);
867 }
868
869 fn handle_trade(&mut self, trade: TradeTick) {
870 let side = match trade.aggressor_side {
871 AggressorSide::Buyer => Some(AggressorSide::Buyer),
872 AggressorSide::Seller => Some(AggressorSide::Seller),
873 AggressorSide::NoAggressor => None,
874 };
875
876 let Some(side) = side else {
877 self.core
878 .apply_update(trade.price, trade.size, trade.ts_init);
879 return;
880 };
881
882 if self.current_run_side != Some(side) {
883 self.current_run_side = Some(side);
884 self.run_volume_raw = 0;
885 self.core.builder.reset();
886 }
887
888 let mut raw_remaining = trade.size.raw;
889 while raw_remaining > 0 {
890 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
891 let raw_chunk = raw_remaining.min(needed);
892
893 self.core.apply_update(
894 trade.price,
895 Quantity::from_raw(raw_chunk, trade.size.precision),
896 trade.ts_init,
897 );
898
899 self.run_volume_raw += raw_chunk;
900 raw_remaining -= raw_chunk;
901
902 if self.run_volume_raw >= self.raw_step {
903 self.core.build_now_and_send();
904 self.run_volume_raw = 0;
905 self.current_run_side = None;
906 }
907 }
908 }
909
910 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
911 self.core.builder.update_bar(bar, volume, ts_init);
912 }
913}
914
915pub struct ValueBarAggregator {
920 core: BarAggregatorCore,
921 cum_value: f64,
922}
923
924impl Debug for ValueBarAggregator {
925 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926 f.debug_struct(stringify!(ValueBarAggregator))
927 .field("core", &self.core)
928 .field("cum_value", &self.cum_value)
929 .finish()
930 }
931}
932
933impl ValueBarAggregator {
934 pub fn new<H: FnMut(Bar) + 'static>(
942 bar_type: BarType,
943 price_precision: u8,
944 size_precision: u8,
945 handler: H,
946 ) -> Self {
947 Self {
948 core: BarAggregatorCore::new(
949 bar_type.standard(),
950 price_precision,
951 size_precision,
952 handler,
953 ),
954 cum_value: 0.0,
955 }
956 }
957
958 #[must_use]
959 pub const fn get_cumulative_value(&self) -> f64 {
961 self.cum_value
962 }
963}
964
965impl BarAggregator for ValueBarAggregator {
966 fn bar_type(&self) -> BarType {
967 self.core.bar_type
968 }
969
970 fn is_running(&self) -> bool {
971 self.core.is_running
972 }
973
974 fn set_is_running(&mut self, value: bool) {
975 self.core.set_is_running(value);
976 }
977
978 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
980 let mut size_update = size.as_f64();
981 let spec = self.core.bar_type.spec();
982
983 while size_update > 0.0 {
984 let value_update = price.as_f64() * size_update;
985 if value_update == 0.0 {
986 self.core
988 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
989 break;
990 }
991
992 if self.cum_value + value_update < spec.step.get() as f64 {
993 self.cum_value += value_update;
994 self.core
995 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
996 break;
997 }
998
999 let value_diff = spec.step.get() as f64 - self.cum_value;
1000 let size_diff = size_update * (value_diff / value_update);
1001 self.core
1002 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1003
1004 self.core.build_now_and_send();
1005 self.cum_value = 0.0;
1006 size_update -= size_diff;
1007 }
1008 }
1009
1010 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1011 let mut volume_update = volume;
1012 let average_price = Price::new(
1013 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1014 self.core.builder.price_precision,
1015 );
1016
1017 while volume_update.as_f64() > 0.0 {
1018 let value_update = average_price.as_f64() * volume_update.as_f64();
1019 if value_update == 0.0 {
1020 self.core.builder.update_bar(bar, volume_update, ts_init);
1022 break;
1023 }
1024
1025 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1026 self.cum_value += value_update;
1027 self.core.builder.update_bar(bar, volume_update, ts_init);
1028 break;
1029 }
1030
1031 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1032 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
1033 self.core.builder.update_bar(
1034 bar,
1035 Quantity::new(volume_diff, volume_update.precision),
1036 ts_init,
1037 );
1038
1039 self.core.build_now_and_send();
1040 self.cum_value = 0.0;
1041 volume_update = Quantity::new(
1042 volume_update.as_f64() - volume_diff,
1043 volume_update.precision,
1044 );
1045 }
1046 }
1047}
1048
1049pub struct ValueImbalanceBarAggregator {
1051 core: BarAggregatorCore,
1052 imbalance_value: f64,
1053 step_value: f64,
1054}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059 .field("core", &self.core)
1060 .field("imbalance_value", &self.imbalance_value)
1061 .field("step_value", &self.step_value)
1062 .finish()
1063 }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067 pub fn new<H: FnMut(Bar) + 'static>(
1075 bar_type: BarType,
1076 price_precision: u8,
1077 size_precision: u8,
1078 handler: H,
1079 ) -> Self {
1080 Self {
1081 core: BarAggregatorCore::new(
1082 bar_type.standard(),
1083 price_precision,
1084 size_precision,
1085 handler,
1086 ),
1087 imbalance_value: 0.0,
1088 step_value: bar_type.spec().step.get() as f64,
1089 }
1090 }
1091}
1092
1093impl BarAggregator for ValueImbalanceBarAggregator {
1094 fn bar_type(&self) -> BarType {
1095 self.core.bar_type
1096 }
1097
1098 fn is_running(&self) -> bool {
1099 self.core.is_running
1100 }
1101
1102 fn set_is_running(&mut self, value: bool) {
1103 self.core.set_is_running(value);
1104 }
1105
1106 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1111 self.core.apply_update(price, size, ts_init);
1112 }
1113
1114 fn handle_trade(&mut self, trade: TradeTick) {
1115 let price_f64 = trade.price.as_f64();
1116 if price_f64 == 0.0 {
1117 self.core
1118 .apply_update(trade.price, trade.size, trade.ts_init);
1119 return;
1120 }
1121
1122 let side_sign = match trade.aggressor_side {
1123 AggressorSide::Buyer => 1.0,
1124 AggressorSide::Seller => -1.0,
1125 AggressorSide::NoAggressor => {
1126 self.core
1127 .apply_update(trade.price, trade.size, trade.ts_init);
1128 return;
1129 }
1130 };
1131
1132 let mut size_remaining = trade.size.as_f64();
1133 while size_remaining > 0.0 {
1134 let value_remaining = price_f64 * size_remaining;
1135 let current_sign = self.imbalance_value.signum();
1136
1137 if current_sign == 0.0 || current_sign == side_sign {
1138 let needed = self.step_value - self.imbalance_value.abs();
1139 if value_remaining <= needed {
1140 self.imbalance_value += side_sign * value_remaining;
1141 self.core.apply_update(
1142 trade.price,
1143 Quantity::new(size_remaining, trade.size.precision),
1144 trade.ts_init,
1145 );
1146 break;
1147 }
1148
1149 let value_chunk = needed;
1150 let size_chunk = value_chunk / price_f64;
1151 self.core.apply_update(
1152 trade.price,
1153 Quantity::new(size_chunk, trade.size.precision),
1154 trade.ts_init,
1155 );
1156 self.imbalance_value += side_sign * value_chunk;
1157 size_remaining -= size_chunk;
1158
1159 if self.imbalance_value.abs() >= self.step_value {
1160 self.core.build_now_and_send();
1161 self.imbalance_value = 0.0;
1162 }
1163 } else {
1164 let value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1166 let size_chunk = value_to_flatten / price_f64;
1167 self.core.apply_update(
1168 trade.price,
1169 Quantity::new(size_chunk, trade.size.precision),
1170 trade.ts_init,
1171 );
1172 self.imbalance_value += side_sign * value_to_flatten;
1173 size_remaining -= size_chunk;
1174 }
1175 }
1176 }
1177
1178 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1179 self.core.builder.update_bar(bar, volume, ts_init);
1180 }
1181}
1182
1183pub struct ValueRunsBarAggregator {
1185 core: BarAggregatorCore,
1186 current_run_side: Option<AggressorSide>,
1187 run_value: f64,
1188 step_value: f64,
1189}
1190
1191impl Debug for ValueRunsBarAggregator {
1192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193 f.debug_struct(stringify!(ValueRunsBarAggregator))
1194 .field("core", &self.core)
1195 .field("current_run_side", &self.current_run_side)
1196 .field("run_value", &self.run_value)
1197 .field("step_value", &self.step_value)
1198 .finish()
1199 }
1200}
1201
1202impl ValueRunsBarAggregator {
1203 pub fn new<H: FnMut(Bar) + 'static>(
1211 bar_type: BarType,
1212 price_precision: u8,
1213 size_precision: u8,
1214 handler: H,
1215 ) -> Self {
1216 Self {
1217 core: BarAggregatorCore::new(
1218 bar_type.standard(),
1219 price_precision,
1220 size_precision,
1221 handler,
1222 ),
1223 current_run_side: None,
1224 run_value: 0.0,
1225 step_value: bar_type.spec().step.get() as f64,
1226 }
1227 }
1228}
1229
1230impl BarAggregator for ValueRunsBarAggregator {
1231 fn bar_type(&self) -> BarType {
1232 self.core.bar_type
1233 }
1234
1235 fn is_running(&self) -> bool {
1236 self.core.is_running
1237 }
1238
1239 fn set_is_running(&mut self, value: bool) {
1240 self.core.set_is_running(value);
1241 }
1242
1243 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1248 self.core.apply_update(price, size, ts_init);
1249 }
1250
1251 fn handle_trade(&mut self, trade: TradeTick) {
1252 let price_f64 = trade.price.as_f64();
1253 if price_f64 == 0.0 {
1254 self.core
1255 .apply_update(trade.price, trade.size, trade.ts_init);
1256 return;
1257 }
1258
1259 let side = match trade.aggressor_side {
1260 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1261 AggressorSide::Seller => Some(AggressorSide::Seller),
1262 AggressorSide::NoAggressor => None,
1263 };
1264
1265 let Some(side) = side else {
1266 self.core
1267 .apply_update(trade.price, trade.size, trade.ts_init);
1268 return;
1269 };
1270
1271 if self.current_run_side != Some(side) {
1272 self.current_run_side = Some(side);
1273 self.run_value = 0.0;
1274 self.core.builder.reset();
1275 }
1276
1277 let mut size_remaining = trade.size.as_f64();
1278 while size_remaining > 0.0 {
1279 let value_update = price_f64 * size_remaining;
1280 if self.run_value + value_update < self.step_value {
1281 self.run_value += value_update;
1282 self.core.apply_update(
1283 trade.price,
1284 Quantity::new(size_remaining, trade.size.precision),
1285 trade.ts_init,
1286 );
1287 break;
1288 }
1289
1290 let value_needed = self.step_value - self.run_value;
1291 let size_chunk = value_needed / price_f64;
1292 self.core.apply_update(
1293 trade.price,
1294 Quantity::new(size_chunk, trade.size.precision),
1295 trade.ts_init,
1296 );
1297
1298 self.core.build_now_and_send();
1299 self.run_value = 0.0;
1300 self.current_run_side = None;
1301 size_remaining -= size_chunk;
1302 }
1303 }
1304
1305 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1306 self.core.builder.update_bar(bar, volume, ts_init);
1307 }
1308}
1309
1310pub struct RenkoBarAggregator {
1316 core: BarAggregatorCore,
1317 pub brick_size: PriceRaw,
1318 last_close: Option<Price>,
1319}
1320
1321impl Debug for RenkoBarAggregator {
1322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1323 f.debug_struct(stringify!(RenkoBarAggregator))
1324 .field("core", &self.core)
1325 .field("brick_size", &self.brick_size)
1326 .field("last_close", &self.last_close)
1327 .finish()
1328 }
1329}
1330
1331impl RenkoBarAggregator {
1332 pub fn new<H: FnMut(Bar) + 'static>(
1340 bar_type: BarType,
1341 price_precision: u8,
1342 size_precision: u8,
1343 price_increment: Price,
1344 handler: H,
1345 ) -> Self {
1346 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1348
1349 Self {
1350 core: BarAggregatorCore::new(
1351 bar_type.standard(),
1352 price_precision,
1353 size_precision,
1354 handler,
1355 ),
1356 brick_size,
1357 last_close: None,
1358 }
1359 }
1360}
1361
1362impl BarAggregator for RenkoBarAggregator {
1363 fn bar_type(&self) -> BarType {
1364 self.core.bar_type
1365 }
1366
1367 fn is_running(&self) -> bool {
1368 self.core.is_running
1369 }
1370
1371 fn set_is_running(&mut self, value: bool) {
1372 self.core.set_is_running(value);
1373 }
1374
1375 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1380 self.core.apply_update(price, size, ts_init);
1382
1383 if self.last_close.is_none() {
1385 self.last_close = Some(price);
1386 return;
1387 }
1388
1389 let last_close = self.last_close.unwrap();
1390
1391 let current_raw = price.raw;
1393 let last_close_raw = last_close.raw;
1394 let price_diff_raw = current_raw - last_close_raw;
1395 let abs_price_diff_raw = price_diff_raw.abs();
1396
1397 if abs_price_diff_raw >= self.brick_size {
1399 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1400 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1401 let mut current_close = last_close;
1402
1403 let total_volume = self.core.builder.volume;
1405
1406 for _i in 0..num_bricks {
1407 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1409 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1410
1411 let (brick_high, brick_low) = if direction > 0.0 {
1413 (brick_close, current_close)
1414 } else {
1415 (current_close, brick_close)
1416 };
1417
1418 self.core.builder.reset();
1420 self.core.builder.open = Some(current_close);
1421 self.core.builder.high = Some(brick_high);
1422 self.core.builder.low = Some(brick_low);
1423 self.core.builder.close = Some(brick_close);
1424 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1426 self.core.builder.ts_last = ts_init;
1427 self.core.builder.initialized = true;
1428
1429 self.core.build_and_send(ts_init, ts_init);
1431
1432 current_close = brick_close;
1434 self.last_close = Some(brick_close);
1435 }
1436 }
1437 }
1438
1439 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1440 self.core.builder.update_bar(bar, volume, ts_init);
1442
1443 if self.last_close.is_none() {
1445 self.last_close = Some(bar.close);
1446 return;
1447 }
1448
1449 let last_close = self.last_close.unwrap();
1450
1451 let current_raw = bar.close.raw;
1453 let last_close_raw = last_close.raw;
1454 let price_diff_raw = current_raw - last_close_raw;
1455 let abs_price_diff_raw = price_diff_raw.abs();
1456
1457 if abs_price_diff_raw >= self.brick_size {
1459 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1460 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1461 let mut current_close = last_close;
1462
1463 let total_volume = self.core.builder.volume;
1465
1466 for _i in 0..num_bricks {
1467 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1469 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1470
1471 let (brick_high, brick_low) = if direction > 0.0 {
1473 (brick_close, current_close)
1474 } else {
1475 (current_close, brick_close)
1476 };
1477
1478 self.core.builder.reset();
1480 self.core.builder.open = Some(current_close);
1481 self.core.builder.high = Some(brick_high);
1482 self.core.builder.low = Some(brick_low);
1483 self.core.builder.close = Some(brick_close);
1484 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1486 self.core.builder.ts_last = ts_init;
1487 self.core.builder.initialized = true;
1488
1489 self.core.build_and_send(ts_init, ts_init);
1491
1492 current_close = brick_close;
1494 self.last_close = Some(brick_close);
1495 }
1496 }
1497 }
1498}
1499
1500pub struct TimeBarAggregator {
1504 core: BarAggregatorCore,
1505 clock: Rc<RefCell<dyn Clock>>,
1506 build_with_no_updates: bool,
1507 timestamp_on_close: bool,
1508 is_left_open: bool,
1509 stored_open_ns: UnixNanos,
1510 timer_name: String,
1511 interval_ns: UnixNanos,
1512 next_close_ns: UnixNanos,
1513 bar_build_delay: u64,
1514 time_bars_origin_offset: Option<TimeDelta>,
1515 skip_first_non_full_bar: bool,
1516 pub historical_mode: bool,
1517 historical_events: Vec<TimeEvent>,
1518 aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
1519}
1520
1521impl Debug for TimeBarAggregator {
1522 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1523 f.debug_struct(stringify!(TimeBarAggregator))
1524 .field("core", &self.core)
1525 .field("build_with_no_updates", &self.build_with_no_updates)
1526 .field("timestamp_on_close", &self.timestamp_on_close)
1527 .field("is_left_open", &self.is_left_open)
1528 .field("timer_name", &self.timer_name)
1529 .field("interval_ns", &self.interval_ns)
1530 .field("bar_build_delay", &self.bar_build_delay)
1531 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1532 .finish()
1533 }
1534}
1535
1536impl TimeBarAggregator {
1537 #[allow(clippy::too_many_arguments)]
1545 pub fn new<H: FnMut(Bar) + 'static>(
1546 bar_type: BarType,
1547 price_precision: u8,
1548 size_precision: u8,
1549 clock: Rc<RefCell<dyn Clock>>,
1550 handler: H,
1551 build_with_no_updates: bool,
1552 timestamp_on_close: bool,
1553 interval_type: BarIntervalType,
1554 time_bars_origin_offset: Option<TimeDelta>,
1555 bar_build_delay: u64,
1556 skip_first_non_full_bar: bool,
1557 ) -> Self {
1558 let is_left_open = match interval_type {
1559 BarIntervalType::LeftOpen => true,
1560 BarIntervalType::RightOpen => false,
1561 };
1562
1563 let core = BarAggregatorCore::new(
1564 bar_type.standard(),
1565 price_precision,
1566 size_precision,
1567 handler,
1568 );
1569
1570 Self {
1571 core,
1572 clock,
1573 build_with_no_updates,
1574 timestamp_on_close,
1575 is_left_open,
1576 stored_open_ns: UnixNanos::default(),
1577 timer_name: bar_type.to_string(),
1578 interval_ns: get_bar_interval_ns(&bar_type),
1579 next_close_ns: UnixNanos::default(),
1580 bar_build_delay,
1581 time_bars_origin_offset,
1582 skip_first_non_full_bar,
1583 historical_mode: false,
1584 historical_events: Vec::new(),
1585 aggregator_weak: None,
1586 }
1587 }
1588
1589 pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1591 self.clock = clock;
1592 }
1593
1594 pub fn start_timer_internal(
1603 &mut self,
1604 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1605 ) {
1606 let aggregator_weak = if let Some(rc) = aggregator_rc {
1608 let weak = Rc::downgrade(&rc);
1610 self.aggregator_weak = Some(weak.clone());
1611 weak
1612 } else {
1613 self.aggregator_weak
1615 .as_ref()
1616 .expect("Aggregator weak reference must be set before calling start_timer()")
1617 .clone()
1618 };
1619
1620 let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1621 if let Some(agg) = aggregator_weak.upgrade() {
1622 agg.borrow_mut().build_bar(event);
1623 }
1624 }));
1625
1626 let now = self.clock.borrow().utc_now();
1628 let mut start_time =
1629 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1630 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1631
1632 let fire_immediately = start_time == now;
1634
1635 self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1636
1637 let spec = &self.bar_type().spec();
1638 let start_time_ns = UnixNanos::from(start_time);
1639
1640 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1641 self.clock
1642 .borrow_mut()
1643 .set_timer_ns(
1644 &self.timer_name,
1645 self.interval_ns.as_u64(),
1646 Some(start_time_ns),
1647 None,
1648 Some(callback),
1649 Some(true), Some(fire_immediately),
1651 )
1652 .expect(FAILED);
1653
1654 if fire_immediately {
1655 self.next_close_ns = start_time_ns;
1656 } else {
1657 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1658 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1659 }
1660
1661 self.stored_open_ns = self.next_close_ns - self.interval_ns;
1662 } else {
1663 let alert_time = if fire_immediately {
1665 start_time
1666 } else {
1667 let step = spec.step.get() as u32;
1668 if spec.aggregation == BarAggregation::Month {
1669 add_n_months(start_time, step).expect(FAILED)
1670 } else {
1671 add_n_years(start_time, step).expect(FAILED)
1673 }
1674 };
1675
1676 self.clock
1677 .borrow_mut()
1678 .set_time_alert_ns(
1679 &self.timer_name,
1680 UnixNanos::from(alert_time),
1681 Some(callback),
1682 Some(true), )
1684 .expect(FAILED);
1685
1686 self.next_close_ns = UnixNanos::from(alert_time);
1687 self.stored_open_ns = UnixNanos::from(start_time);
1688 }
1689
1690 log::debug!(
1691 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1692 self.timer_name,
1693 start_time,
1694 self.historical_mode,
1695 fire_immediately,
1696 now,
1697 self.bar_build_delay
1698 );
1699 }
1700
1701 pub fn stop(&mut self) {
1703 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1704 }
1705
1706 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1707 if self.skip_first_non_full_bar {
1708 self.core.builder.reset();
1709 self.skip_first_non_full_bar = false;
1710 } else {
1711 self.core.build_and_send(ts_event, ts_init);
1712 }
1713 }
1714
1715 fn build_bar(&mut self, event: TimeEvent) {
1716 if !self.core.builder.initialized {
1717 return;
1718 }
1719
1720 if !self.build_with_no_updates && self.core.builder.count == 0 {
1721 return; }
1723
1724 let ts_init = event.ts_event;
1725 let ts_event = if self.is_left_open {
1726 if self.timestamp_on_close {
1727 event.ts_event
1728 } else {
1729 self.stored_open_ns
1730 }
1731 } else {
1732 self.stored_open_ns
1733 };
1734
1735 self.build_and_send(ts_event, ts_init);
1736
1737 self.stored_open_ns = event.ts_event;
1739
1740 if self.bar_type().spec().aggregation == BarAggregation::Month {
1741 let step = self.bar_type().spec().step.get() as u32;
1742 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1743
1744 self.clock
1745 .borrow_mut()
1746 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1747 .expect(FAILED);
1748
1749 self.next_close_ns = alert_time_ns;
1750 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1751 let step = self.bar_type().spec().step.get() as u32;
1752 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1753
1754 self.clock
1755 .borrow_mut()
1756 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1757 .expect(FAILED);
1758
1759 self.next_close_ns = alert_time_ns;
1760 } else {
1761 self.next_close_ns = self
1763 .clock
1764 .borrow()
1765 .next_time_ns(&self.timer_name)
1766 .unwrap_or_default();
1767 }
1768 }
1769
1770 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1771 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1772 {
1774 let mut clock_borrow = self.clock.borrow_mut();
1775 let test_clock = clock_borrow
1776 .as_any_mut()
1777 .downcast_mut::<TestClock>()
1778 .expect("Expected TestClock in historical mode");
1779 test_clock.set_time(ts_init);
1780 }
1781 self.start_timer_internal(None);
1783 }
1784
1785 {
1787 let mut clock_borrow = self.clock.borrow_mut();
1788 let test_clock = clock_borrow
1789 .as_any_mut()
1790 .downcast_mut::<TestClock>()
1791 .expect("Expected TestClock in historical mode");
1792 self.historical_events = test_clock.advance_time(ts_init, true);
1793 }
1794 }
1795
1796 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1797 let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1800 for event in events {
1801 self.build_bar(event);
1802 }
1803 }
1804
1805 pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1807 self.historical_events = events;
1808 }
1809}
1810
1811impl BarAggregator for TimeBarAggregator {
1812 fn bar_type(&self) -> BarType {
1813 self.core.bar_type
1814 }
1815
1816 fn is_running(&self) -> bool {
1817 self.core.is_running
1818 }
1819
1820 fn set_is_running(&mut self, value: bool) {
1821 self.core.set_is_running(value);
1822 }
1823
1824 fn stop(&mut self) {
1826 Self::stop(self);
1827 }
1828
1829 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1830 if self.historical_mode {
1831 self.preprocess_historical_events(ts_init);
1832 }
1833
1834 self.core.apply_update(price, size, ts_init);
1835
1836 if self.historical_mode {
1837 self.postprocess_historical_events(ts_init);
1838 }
1839 }
1840
1841 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1842 if self.historical_mode {
1843 self.preprocess_historical_events(ts_init);
1844 }
1845
1846 self.core.builder.update_bar(bar, volume, ts_init);
1847
1848 if self.historical_mode {
1849 self.postprocess_historical_events(ts_init);
1850 }
1851 }
1852
1853 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1854 self.historical_mode = historical_mode;
1855 self.core.handler = handler;
1856 }
1857
1858 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1859 self.set_historical_events_internal(events);
1860 }
1861
1862 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1863 self.set_clock_internal(clock);
1864 }
1865
1866 fn build_bar(&mut self, event: TimeEvent) {
1867 {
1870 #[allow(clippy::use_self)]
1871 TimeBarAggregator::build_bar(self, event);
1872 }
1873 }
1874
1875 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1876 self.aggregator_weak = Some(weak);
1877 }
1878
1879 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1880 self.start_timer_internal(aggregator_rc);
1881 }
1882}
1883
1884#[cfg(test)]
1888mod tests {
1889 use std::sync::{Arc, Mutex};
1890
1891 use nautilus_common::clock::TestClock;
1892 use nautilus_core::{MUTEX_POISONED, UUID4};
1893 use nautilus_model::{
1894 data::{BarSpecification, BarType},
1895 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1896 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1897 types::{Price, Quantity},
1898 };
1899 use rstest::rstest;
1900 use ustr::Ustr;
1901
1902 use super::*;
1903
1904 #[rstest]
1905 fn test_bar_builder_initialization(equity_aapl: Equity) {
1906 let instrument = InstrumentAny::Equity(equity_aapl);
1907 let bar_type = BarType::new(
1908 instrument.id(),
1909 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1910 AggregationSource::Internal,
1911 );
1912 let builder = BarBuilder::new(
1913 bar_type,
1914 instrument.price_precision(),
1915 instrument.size_precision(),
1916 );
1917
1918 assert!(!builder.initialized);
1919 assert_eq!(builder.ts_last, 0);
1920 assert_eq!(builder.count, 0);
1921 }
1922
1923 #[rstest]
1924 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1925 let instrument = InstrumentAny::Equity(equity_aapl);
1926 let bar_type = BarType::new(
1927 instrument.id(),
1928 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1929 AggregationSource::Internal,
1930 );
1931 let mut builder = BarBuilder::new(
1932 bar_type,
1933 instrument.price_precision(),
1934 instrument.size_precision(),
1935 );
1936
1937 builder.update(
1938 Price::from("100.00"),
1939 Quantity::from(1),
1940 UnixNanos::from(1000),
1941 );
1942 builder.update(
1943 Price::from("95.00"),
1944 Quantity::from(1),
1945 UnixNanos::from(2000),
1946 );
1947 builder.update(
1948 Price::from("105.00"),
1949 Quantity::from(1),
1950 UnixNanos::from(3000),
1951 );
1952
1953 let bar = builder.build_now();
1954 assert!(bar.high > bar.low);
1955 assert_eq!(bar.open, Price::from("100.00"));
1956 assert_eq!(bar.high, Price::from("105.00"));
1957 assert_eq!(bar.low, Price::from("95.00"));
1958 assert_eq!(bar.close, Price::from("105.00"));
1959 }
1960
1961 #[rstest]
1962 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1963 let instrument = InstrumentAny::Equity(equity_aapl);
1964 let bar_type = BarType::new(
1965 instrument.id(),
1966 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1967 AggregationSource::Internal,
1968 );
1969 let mut builder = BarBuilder::new(
1970 bar_type,
1971 instrument.price_precision(),
1972 instrument.size_precision(),
1973 );
1974
1975 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1976 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1977
1978 assert_eq!(builder.ts_last, 1_000);
1979 assert_eq!(builder.count, 1);
1980 }
1981
1982 #[rstest]
1983 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1984 let instrument = InstrumentAny::Equity(equity_aapl);
1985 let bar_type = BarType::new(
1986 instrument.id(),
1987 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1988 AggregationSource::Internal,
1989 );
1990 let mut builder = BarBuilder::new(
1991 bar_type,
1992 instrument.price_precision(),
1993 instrument.size_precision(),
1994 );
1995
1996 builder.update(
1997 Price::from("1.00000"),
1998 Quantity::from(1),
1999 UnixNanos::default(),
2000 );
2001
2002 assert!(builder.initialized);
2003 assert_eq!(builder.ts_last, 0);
2004 assert_eq!(builder.count, 1);
2005 }
2006
2007 #[rstest]
2008 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2009 equity_aapl: Equity,
2010 ) {
2011 let instrument = InstrumentAny::Equity(equity_aapl);
2012 let bar_type = BarType::new(
2013 instrument.id(),
2014 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2015 AggregationSource::Internal,
2016 );
2017 let mut builder = BarBuilder::new(bar_type, 2, 0);
2018
2019 builder.update(
2020 Price::from("1.00000"),
2021 Quantity::from(1),
2022 UnixNanos::from(1_000),
2023 );
2024 builder.update(
2025 Price::from("1.00001"),
2026 Quantity::from(1),
2027 UnixNanos::from(500),
2028 );
2029
2030 assert!(builder.initialized);
2031 assert_eq!(builder.ts_last, 1_000);
2032 assert_eq!(builder.count, 1);
2033 }
2034
2035 #[rstest]
2036 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2037 let instrument = InstrumentAny::Equity(equity_aapl);
2038 let bar_type = BarType::new(
2039 instrument.id(),
2040 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2041 AggregationSource::Internal,
2042 );
2043 let mut builder = BarBuilder::new(
2044 bar_type,
2045 instrument.price_precision(),
2046 instrument.size_precision(),
2047 );
2048
2049 for _ in 0..5 {
2050 builder.update(
2051 Price::from("1.00000"),
2052 Quantity::from(1),
2053 UnixNanos::from(1_000),
2054 );
2055 }
2056
2057 assert_eq!(builder.count, 5);
2058 }
2059
2060 #[rstest]
2061 #[should_panic]
2062 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2063 let instrument = InstrumentAny::Equity(equity_aapl);
2064 let bar_type = BarType::new(
2065 instrument.id(),
2066 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2067 AggregationSource::Internal,
2068 );
2069 let mut builder = BarBuilder::new(
2070 bar_type,
2071 instrument.price_precision(),
2072 instrument.size_precision(),
2073 );
2074 let _ = builder.build_now();
2075 }
2076
2077 #[rstest]
2078 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2079 let instrument = InstrumentAny::Equity(equity_aapl);
2080 let bar_type = BarType::new(
2081 instrument.id(),
2082 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2083 AggregationSource::Internal,
2084 );
2085 let mut builder = BarBuilder::new(
2086 bar_type,
2087 instrument.price_precision(),
2088 instrument.size_precision(),
2089 );
2090
2091 builder.update(
2092 Price::from("1.00001"),
2093 Quantity::from(2),
2094 UnixNanos::default(),
2095 );
2096 builder.update(
2097 Price::from("1.00002"),
2098 Quantity::from(2),
2099 UnixNanos::default(),
2100 );
2101 builder.update(
2102 Price::from("1.00000"),
2103 Quantity::from(1),
2104 UnixNanos::from(1_000_000_000),
2105 );
2106
2107 let bar = builder.build_now();
2108
2109 assert_eq!(bar.open, Price::from("1.00001"));
2110 assert_eq!(bar.high, Price::from("1.00002"));
2111 assert_eq!(bar.low, Price::from("1.00000"));
2112 assert_eq!(bar.close, Price::from("1.00000"));
2113 assert_eq!(bar.volume, Quantity::from(5));
2114 assert_eq!(bar.ts_init, 1_000_000_000);
2115 assert_eq!(builder.ts_last, 1_000_000_000);
2116 assert_eq!(builder.count, 0);
2117 }
2118
2119 #[rstest]
2120 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2121 let instrument = InstrumentAny::Equity(equity_aapl);
2122 let bar_type = BarType::new(
2123 instrument.id(),
2124 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2125 AggregationSource::Internal,
2126 );
2127 let mut builder = BarBuilder::new(bar_type, 2, 0);
2128
2129 builder.update(
2130 Price::from("1.00001"),
2131 Quantity::from(1),
2132 UnixNanos::default(),
2133 );
2134 builder.build_now();
2135
2136 builder.update(
2137 Price::from("1.00000"),
2138 Quantity::from(1),
2139 UnixNanos::default(),
2140 );
2141 builder.update(
2142 Price::from("1.00003"),
2143 Quantity::from(1),
2144 UnixNanos::default(),
2145 );
2146 builder.update(
2147 Price::from("1.00002"),
2148 Quantity::from(1),
2149 UnixNanos::default(),
2150 );
2151
2152 let bar = builder.build_now();
2153
2154 assert_eq!(bar.open, Price::from("1.00000"));
2155 assert_eq!(bar.high, Price::from("1.00003"));
2156 assert_eq!(bar.low, Price::from("1.00000"));
2157 assert_eq!(bar.close, Price::from("1.00002"));
2158 assert_eq!(bar.volume, Quantity::from(3));
2159 }
2160
2161 #[rstest]
2162 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2163 let instrument = InstrumentAny::Equity(equity_aapl);
2164 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2165 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2166 let handler = Arc::new(Mutex::new(Vec::new()));
2167 let handler_clone = Arc::clone(&handler);
2168
2169 let mut aggregator = TickBarAggregator::new(
2170 bar_type,
2171 instrument.price_precision(),
2172 instrument.size_precision(),
2173 move |bar: Bar| {
2174 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2175 handler_guard.push(bar);
2176 },
2177 );
2178
2179 let trade = TradeTick::default();
2180 aggregator.handle_trade(trade);
2181
2182 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2183 assert_eq!(handler_guard.len(), 0);
2184 }
2185
2186 #[rstest]
2187 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2188 let instrument = InstrumentAny::Equity(equity_aapl);
2189 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2190 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2191 let handler = Arc::new(Mutex::new(Vec::new()));
2192 let handler_clone = Arc::clone(&handler);
2193
2194 let mut aggregator = TickBarAggregator::new(
2195 bar_type,
2196 instrument.price_precision(),
2197 instrument.size_precision(),
2198 move |bar: Bar| {
2199 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2200 handler_guard.push(bar);
2201 },
2202 );
2203
2204 let trade = TradeTick::default();
2205 aggregator.handle_trade(trade);
2206 aggregator.handle_trade(trade);
2207 aggregator.handle_trade(trade);
2208
2209 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2210 let bar = handler_guard.first().unwrap();
2211 assert_eq!(handler_guard.len(), 1);
2212 assert_eq!(bar.open, trade.price);
2213 assert_eq!(bar.high, trade.price);
2214 assert_eq!(bar.low, trade.price);
2215 assert_eq!(bar.close, trade.price);
2216 assert_eq!(bar.volume, Quantity::from(300000));
2217 assert_eq!(bar.ts_event, trade.ts_event);
2218 assert_eq!(bar.ts_init, trade.ts_init);
2219 }
2220
2221 #[rstest]
2222 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2223 let instrument = InstrumentAny::Equity(equity_aapl);
2224 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2225 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2226 let handler = Arc::new(Mutex::new(Vec::new()));
2227 let handler_clone = Arc::clone(&handler);
2228
2229 let mut aggregator = TickBarAggregator::new(
2230 bar_type,
2231 instrument.price_precision(),
2232 instrument.size_precision(),
2233 move |bar: Bar| {
2234 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2235 handler_guard.push(bar);
2236 },
2237 );
2238
2239 aggregator.update(
2240 Price::from("1.00001"),
2241 Quantity::from(1),
2242 UnixNanos::default(),
2243 );
2244 aggregator.update(
2245 Price::from("1.00002"),
2246 Quantity::from(1),
2247 UnixNanos::from(1000),
2248 );
2249 aggregator.update(
2250 Price::from("1.00003"),
2251 Quantity::from(1),
2252 UnixNanos::from(2000),
2253 );
2254
2255 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2256 assert_eq!(handler_guard.len(), 1);
2257
2258 let bar = handler_guard.first().unwrap();
2259 assert_eq!(bar.open, Price::from("1.00001"));
2260 assert_eq!(bar.high, Price::from("1.00003"));
2261 assert_eq!(bar.low, Price::from("1.00001"));
2262 assert_eq!(bar.close, Price::from("1.00003"));
2263 assert_eq!(bar.volume, Quantity::from(3));
2264 }
2265
2266 #[rstest]
2267 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2268 let instrument = InstrumentAny::Equity(equity_aapl);
2269 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2270 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2271 let handler = Arc::new(Mutex::new(Vec::new()));
2272 let handler_clone = Arc::clone(&handler);
2273
2274 let mut aggregator = TickBarAggregator::new(
2275 bar_type,
2276 instrument.price_precision(),
2277 instrument.size_precision(),
2278 move |bar: Bar| {
2279 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2280 handler_guard.push(bar);
2281 },
2282 );
2283
2284 aggregator.update(
2285 Price::from("1.00001"),
2286 Quantity::from(1),
2287 UnixNanos::default(),
2288 );
2289 aggregator.update(
2290 Price::from("1.00002"),
2291 Quantity::from(1),
2292 UnixNanos::from(1000),
2293 );
2294 aggregator.update(
2295 Price::from("1.00003"),
2296 Quantity::from(1),
2297 UnixNanos::from(2000),
2298 );
2299 aggregator.update(
2300 Price::from("1.00004"),
2301 Quantity::from(1),
2302 UnixNanos::from(3000),
2303 );
2304
2305 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2306 assert_eq!(handler_guard.len(), 2);
2307
2308 let bar1 = &handler_guard[0];
2309 assert_eq!(bar1.open, Price::from("1.00001"));
2310 assert_eq!(bar1.close, Price::from("1.00002"));
2311 assert_eq!(bar1.volume, Quantity::from(2));
2312
2313 let bar2 = &handler_guard[1];
2314 assert_eq!(bar2.open, Price::from("1.00003"));
2315 assert_eq!(bar2.close, Price::from("1.00004"));
2316 assert_eq!(bar2.volume, Quantity::from(2));
2317 }
2318
2319 #[rstest]
2320 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2321 let instrument = InstrumentAny::Equity(equity_aapl);
2322 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2323 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2324 let handler = Arc::new(Mutex::new(Vec::new()));
2325 let handler_clone = Arc::clone(&handler);
2326
2327 let mut aggregator = TickImbalanceBarAggregator::new(
2328 bar_type,
2329 instrument.price_precision(),
2330 instrument.size_precision(),
2331 move |bar: Bar| {
2332 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2333 handler_guard.push(bar);
2334 },
2335 );
2336
2337 let trade = TradeTick::default();
2338 aggregator.handle_trade(trade);
2339 aggregator.handle_trade(trade);
2340
2341 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2342 assert_eq!(handler_guard.len(), 1);
2343 let bar = handler_guard.first().unwrap();
2344 assert_eq!(bar.volume, Quantity::from(200000));
2345 }
2346
2347 #[rstest]
2348 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2349 let instrument = InstrumentAny::Equity(equity_aapl);
2350 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2351 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2352 let handler = Arc::new(Mutex::new(Vec::new()));
2353 let handler_clone = Arc::clone(&handler);
2354
2355 let mut aggregator = TickImbalanceBarAggregator::new(
2356 bar_type,
2357 instrument.price_precision(),
2358 instrument.size_precision(),
2359 move |bar: Bar| {
2360 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2361 handler_guard.push(bar);
2362 },
2363 );
2364
2365 let sell = TradeTick {
2366 aggressor_side: AggressorSide::Seller,
2367 ..TradeTick::default()
2368 };
2369
2370 aggregator.handle_trade(sell);
2371
2372 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2373 assert_eq!(handler_guard.len(), 1);
2374 }
2375
2376 #[rstest]
2377 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2378 let instrument = InstrumentAny::Equity(equity_aapl);
2379 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2380 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2381 let handler = Arc::new(Mutex::new(Vec::new()));
2382 let handler_clone = Arc::clone(&handler);
2383
2384 let mut aggregator = TickRunsBarAggregator::new(
2385 bar_type,
2386 instrument.price_precision(),
2387 instrument.size_precision(),
2388 move |bar: Bar| {
2389 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2390 handler_guard.push(bar);
2391 },
2392 );
2393
2394 let buy = TradeTick::default();
2395 let sell = TradeTick {
2396 aggressor_side: AggressorSide::Seller,
2397 ..buy
2398 };
2399
2400 aggregator.handle_trade(buy);
2401 aggregator.handle_trade(buy);
2402 aggregator.handle_trade(sell);
2403 aggregator.handle_trade(sell);
2404
2405 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2406 assert_eq!(handler_guard.len(), 2);
2407 }
2408
2409 #[rstest]
2410 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2411 let instrument = InstrumentAny::Equity(equity_aapl);
2412 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2413 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2414 let handler = Arc::new(Mutex::new(Vec::new()));
2415 let handler_clone = Arc::clone(&handler);
2416
2417 let mut aggregator = TickRunsBarAggregator::new(
2418 bar_type,
2419 instrument.price_precision(),
2420 instrument.size_precision(),
2421 move |bar: Bar| {
2422 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2423 handler_guard.push(bar);
2424 },
2425 );
2426
2427 let buy = TradeTick {
2428 size: Quantity::from(1),
2429 ..TradeTick::default()
2430 };
2431 let sell = TradeTick {
2432 aggressor_side: AggressorSide::Seller,
2433 size: Quantity::from(1),
2434 ..buy
2435 };
2436
2437 aggregator.handle_trade(buy);
2438 aggregator.handle_trade(buy);
2439 aggregator.handle_trade(sell);
2440 aggregator.handle_trade(sell);
2441
2442 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2443 assert_eq!(handler_guard.len(), 2);
2444 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2445 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2446 }
2447
2448 #[rstest]
2449 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2450 let instrument = InstrumentAny::Equity(equity_aapl);
2451 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2452 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2453 let handler = Arc::new(Mutex::new(Vec::new()));
2454 let handler_clone = Arc::clone(&handler);
2455
2456 let mut aggregator = VolumeBarAggregator::new(
2457 bar_type,
2458 instrument.price_precision(),
2459 instrument.size_precision(),
2460 move |bar: Bar| {
2461 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2462 handler_guard.push(bar);
2463 },
2464 );
2465
2466 aggregator.update(
2467 Price::from("1.00001"),
2468 Quantity::from(25),
2469 UnixNanos::default(),
2470 );
2471
2472 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2473 assert_eq!(handler_guard.len(), 2);
2474 let bar1 = &handler_guard[0];
2475 assert_eq!(bar1.volume, Quantity::from(10));
2476 let bar2 = &handler_guard[1];
2477 assert_eq!(bar2.volume, Quantity::from(10));
2478 }
2479
2480 #[rstest]
2481 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2482 let instrument = InstrumentAny::Equity(equity_aapl);
2483 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2484 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2485 let handler = Arc::new(Mutex::new(Vec::new()));
2486 let handler_clone = Arc::clone(&handler);
2487
2488 let mut aggregator = VolumeRunsBarAggregator::new(
2489 bar_type,
2490 instrument.price_precision(),
2491 instrument.size_precision(),
2492 move |bar: Bar| {
2493 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2494 handler_guard.push(bar);
2495 },
2496 );
2497
2498 let buy = TradeTick {
2499 instrument_id: instrument.id(),
2500 price: Price::from("1.0"),
2501 size: Quantity::from(1),
2502 ..TradeTick::default()
2503 };
2504 let sell = TradeTick {
2505 aggressor_side: AggressorSide::Seller,
2506 ..buy
2507 };
2508
2509 aggregator.handle_trade(buy);
2510 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
2512 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2515 assert!(handler_guard.len() >= 2);
2516 assert!(
2517 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2518 < f64::EPSILON
2519 );
2520 }
2521
2522 #[rstest]
2523 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2524 let instrument = InstrumentAny::Equity(equity_aapl);
2525 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2526 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2527 let handler = Arc::new(Mutex::new(Vec::new()));
2528 let handler_clone = Arc::clone(&handler);
2529
2530 let mut aggregator = VolumeRunsBarAggregator::new(
2531 bar_type,
2532 instrument.price_precision(),
2533 instrument.size_precision(),
2534 move |bar: Bar| {
2535 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2536 handler_guard.push(bar);
2537 },
2538 );
2539
2540 let trade = TradeTick {
2541 instrument_id: instrument.id(),
2542 price: Price::from("1.0"),
2543 size: Quantity::from(5),
2544 ..TradeTick::default()
2545 };
2546
2547 aggregator.handle_trade(trade);
2548
2549 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2550 assert!(!handler_guard.is_empty());
2551 assert!(handler_guard[0].volume.as_f64() > 0.0);
2552 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2553 }
2554
2555 #[rstest]
2556 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2557 let instrument = InstrumentAny::Equity(equity_aapl);
2558 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2559 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2560 let handler = Arc::new(Mutex::new(Vec::new()));
2561 let handler_clone = Arc::clone(&handler);
2562
2563 let mut aggregator = VolumeImbalanceBarAggregator::new(
2564 bar_type,
2565 instrument.price_precision(),
2566 instrument.size_precision(),
2567 move |bar: Bar| {
2568 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2569 handler_guard.push(bar);
2570 },
2571 );
2572
2573 let trade_small = TradeTick {
2574 instrument_id: instrument.id(),
2575 price: Price::from("1.0"),
2576 size: Quantity::from(1),
2577 ..TradeTick::default()
2578 };
2579 let trade_large = TradeTick {
2580 size: Quantity::from(3),
2581 ..trade_small
2582 };
2583
2584 aggregator.handle_trade(trade_small);
2585 aggregator.handle_trade(trade_large);
2586
2587 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2588 assert_eq!(handler_guard.len(), 2);
2589 let total_output = handler_guard
2590 .iter()
2591 .map(|bar| bar.volume.as_f64())
2592 .sum::<f64>();
2593 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2594 assert!((total_output - total_input).abs() < f64::EPSILON);
2595 }
2596
2597 #[rstest]
2598 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2599 let instrument = InstrumentAny::Equity(equity_aapl);
2600 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2602 let handler = Arc::new(Mutex::new(Vec::new()));
2603 let handler_clone = Arc::clone(&handler);
2604
2605 let mut aggregator = ValueBarAggregator::new(
2606 bar_type,
2607 instrument.price_precision(),
2608 instrument.size_precision(),
2609 move |bar: Bar| {
2610 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2611 handler_guard.push(bar);
2612 },
2613 );
2614
2615 aggregator.update(
2617 Price::from("100.00"),
2618 Quantity::from(5),
2619 UnixNanos::default(),
2620 );
2621 aggregator.update(
2622 Price::from("100.00"),
2623 Quantity::from(5),
2624 UnixNanos::from(1000),
2625 );
2626
2627 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2628 assert_eq!(handler_guard.len(), 1);
2629 let bar = handler_guard.first().unwrap();
2630 assert_eq!(bar.volume, Quantity::from(10));
2631 }
2632
2633 #[rstest]
2634 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2635 let instrument = InstrumentAny::Equity(equity_aapl);
2636 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2637 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2638 let handler = Arc::new(Mutex::new(Vec::new()));
2639 let handler_clone = Arc::clone(&handler);
2640
2641 let mut aggregator = ValueBarAggregator::new(
2642 bar_type,
2643 instrument.price_precision(),
2644 instrument.size_precision(),
2645 move |bar: Bar| {
2646 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2647 handler_guard.push(bar);
2648 },
2649 );
2650
2651 aggregator.update(
2653 Price::from("100.00"),
2654 Quantity::from(25),
2655 UnixNanos::default(),
2656 );
2657
2658 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2659 assert_eq!(handler_guard.len(), 2);
2660 let remaining_value = aggregator.get_cumulative_value();
2661 assert!(remaining_value < 1000.0); }
2663
2664 #[rstest]
2665 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2666 let instrument = InstrumentAny::Equity(equity_aapl);
2667 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2668 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2669 let handler = Arc::new(Mutex::new(Vec::new()));
2670 let handler_clone = Arc::clone(&handler);
2671
2672 let mut aggregator = ValueBarAggregator::new(
2673 bar_type,
2674 instrument.price_precision(),
2675 instrument.size_precision(),
2676 move |bar: Bar| {
2677 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2678 handler_guard.push(bar);
2679 },
2680 );
2681
2682 aggregator.update(
2684 Price::from("0.00"),
2685 Quantity::from(100),
2686 UnixNanos::default(),
2687 );
2688
2689 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2691 assert_eq!(handler_guard.len(), 0);
2692
2693 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2695 }
2696
2697 #[rstest]
2698 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2699 let instrument = InstrumentAny::Equity(equity_aapl);
2700 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2701 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2702 let handler = Arc::new(Mutex::new(Vec::new()));
2703 let handler_clone = Arc::clone(&handler);
2704
2705 let mut aggregator = ValueBarAggregator::new(
2706 bar_type,
2707 instrument.price_precision(),
2708 instrument.size_precision(),
2709 move |bar: Bar| {
2710 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2711 handler_guard.push(bar);
2712 },
2713 );
2714
2715 aggregator.update(
2717 Price::from("100.00"),
2718 Quantity::from(0),
2719 UnixNanos::default(),
2720 );
2721
2722 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2724 assert_eq!(handler_guard.len(), 0);
2725
2726 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2728 }
2729
2730 #[rstest]
2731 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2732 let instrument = InstrumentAny::Equity(equity_aapl);
2733 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2734 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2735 let handler = Arc::new(Mutex::new(Vec::new()));
2736 let handler_clone = Arc::clone(&handler);
2737
2738 let mut aggregator = ValueImbalanceBarAggregator::new(
2739 bar_type,
2740 instrument.price_precision(),
2741 instrument.size_precision(),
2742 move |bar: Bar| {
2743 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2744 handler_guard.push(bar);
2745 },
2746 );
2747
2748 let buy = TradeTick {
2749 price: Price::from("5.0"),
2750 size: Quantity::from(2), instrument_id: instrument.id(),
2752 ..TradeTick::default()
2753 };
2754 let sell = TradeTick {
2755 price: Price::from("5.0"),
2756 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
2758 instrument_id: instrument.id(),
2759 ..buy
2760 };
2761
2762 aggregator.handle_trade(buy);
2763 aggregator.handle_trade(sell);
2764
2765 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2766 assert!(handler_guard.is_empty());
2767 }
2768
2769 #[rstest]
2770 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2771 let instrument = InstrumentAny::Equity(equity_aapl);
2772 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2773 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2774 let handler = Arc::new(Mutex::new(Vec::new()));
2775 let handler_clone = Arc::clone(&handler);
2776
2777 let mut aggregator = ValueRunsBarAggregator::new(
2778 bar_type,
2779 instrument.price_precision(),
2780 instrument.size_precision(),
2781 move |bar: Bar| {
2782 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2783 handler_guard.push(bar);
2784 },
2785 );
2786
2787 let trade = TradeTick {
2788 price: Price::from("10.0"),
2789 size: Quantity::from(5),
2790 instrument_id: instrument.id(),
2791 ..TradeTick::default()
2792 };
2793
2794 aggregator.handle_trade(trade);
2795 aggregator.handle_trade(trade);
2796
2797 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2798 assert_eq!(handler_guard.len(), 1);
2799 let bar = handler_guard.first().unwrap();
2800 assert_eq!(bar.volume, Quantity::from(10));
2801 }
2802
2803 #[rstest]
2804 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2805 let instrument = InstrumentAny::Equity(equity_aapl);
2806 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2807 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2808 let handler = Arc::new(Mutex::new(Vec::new()));
2809 let handler_clone = Arc::clone(&handler);
2810
2811 let mut aggregator = ValueRunsBarAggregator::new(
2812 bar_type,
2813 instrument.price_precision(),
2814 instrument.size_precision(),
2815 move |bar: Bar| {
2816 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2817 handler_guard.push(bar);
2818 },
2819 );
2820
2821 let buy = TradeTick {
2822 price: Price::from("10.0"),
2823 size: Quantity::from(5),
2824 instrument_id: instrument.id(),
2825 ..TradeTick::default()
2826 }; let sell = TradeTick {
2828 price: Price::from("10.0"),
2829 size: Quantity::from(10),
2830 aggressor_side: AggressorSide::Seller,
2831 ..buy
2832 }; aggregator.handle_trade(buy);
2835 aggregator.handle_trade(sell);
2836
2837 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2838 assert_eq!(handler_guard.len(), 1);
2839 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2840 }
2841
2842 #[rstest]
2843 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2844 let instrument = InstrumentAny::Equity(equity_aapl);
2845 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2846 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2847 let handler = Arc::new(Mutex::new(Vec::new()));
2848 let handler_clone = Arc::clone(&handler);
2849
2850 let mut aggregator = TickRunsBarAggregator::new(
2851 bar_type,
2852 instrument.price_precision(),
2853 instrument.size_precision(),
2854 move |bar: Bar| {
2855 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2856 handler_guard.push(bar);
2857 },
2858 );
2859
2860 let buy = TradeTick::default();
2861
2862 aggregator.handle_trade(buy);
2863 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2868 assert_eq!(handler_guard.len(), 2);
2869 }
2870
2871 #[rstest]
2872 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2873 let instrument = InstrumentAny::Equity(equity_aapl);
2874 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2875 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2876 let handler = Arc::new(Mutex::new(Vec::new()));
2877 let handler_clone = Arc::clone(&handler);
2878
2879 let mut aggregator = TickRunsBarAggregator::new(
2880 bar_type,
2881 instrument.price_precision(),
2882 instrument.size_precision(),
2883 move |bar: Bar| {
2884 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2885 handler_guard.push(bar);
2886 },
2887 );
2888
2889 let buy = TradeTick::default();
2890 let no_aggressor = TradeTick {
2891 aggressor_side: AggressorSide::NoAggressor,
2892 ..buy
2893 };
2894
2895 aggregator.handle_trade(buy);
2896 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2901 assert_eq!(handler_guard.len(), 1);
2902 }
2903
2904 #[rstest]
2905 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2906 let instrument = InstrumentAny::Equity(equity_aapl);
2907 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2908 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2909 let handler = Arc::new(Mutex::new(Vec::new()));
2910 let handler_clone = Arc::clone(&handler);
2911
2912 let mut aggregator = VolumeRunsBarAggregator::new(
2913 bar_type,
2914 instrument.price_precision(),
2915 instrument.size_precision(),
2916 move |bar: Bar| {
2917 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2918 handler_guard.push(bar);
2919 },
2920 );
2921
2922 let buy = TradeTick {
2923 instrument_id: instrument.id(),
2924 price: Price::from("1.0"),
2925 size: Quantity::from(1),
2926 ..TradeTick::default()
2927 };
2928
2929 aggregator.handle_trade(buy);
2930 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2935 assert_eq!(handler_guard.len(), 2);
2936 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2937 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2938 }
2939
2940 #[rstest]
2941 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2942 let instrument = InstrumentAny::Equity(equity_aapl);
2943 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2944 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2945 let handler = Arc::new(Mutex::new(Vec::new()));
2946 let handler_clone = Arc::clone(&handler);
2947
2948 let mut aggregator = ValueRunsBarAggregator::new(
2949 bar_type,
2950 instrument.price_precision(),
2951 instrument.size_precision(),
2952 move |bar: Bar| {
2953 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2954 handler_guard.push(bar);
2955 },
2956 );
2957
2958 let buy = TradeTick {
2959 instrument_id: instrument.id(),
2960 price: Price::from("10.0"),
2961 size: Quantity::from(5),
2962 ..TradeTick::default()
2963 }; aggregator.handle_trade(buy);
2966 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2971 assert_eq!(handler_guard.len(), 2);
2972 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2973 assert_eq!(handler_guard[1].volume, Quantity::from(10));
2974 }
2975
2976 #[rstest]
2977 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
2978 let instrument = InstrumentAny::Equity(equity_aapl);
2979 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2981 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2982 let handler = Arc::new(Mutex::new(Vec::new()));
2983 let handler_clone = Arc::clone(&handler);
2984 let clock = Rc::new(RefCell::new(TestClock::new()));
2985
2986 let mut aggregator = TimeBarAggregator::new(
2987 bar_type,
2988 instrument.price_precision(),
2989 instrument.size_precision(),
2990 clock.clone(),
2991 move |bar: Bar| {
2992 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2993 handler_guard.push(bar);
2994 },
2995 true, false, BarIntervalType::LeftOpen,
2998 None, 15, false, );
3002
3003 aggregator.update(
3004 Price::from("100.00"),
3005 Quantity::from(1),
3006 UnixNanos::default(),
3007 );
3008
3009 let next_sec = UnixNanos::from(1_000_000_000);
3010 clock.borrow_mut().set_time(next_sec);
3011
3012 let event = TimeEvent::new(
3013 Ustr::from("1-SECOND-LAST"),
3014 UUID4::new(),
3015 next_sec,
3016 next_sec,
3017 );
3018 aggregator.build_bar(event);
3019
3020 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3021 assert_eq!(handler_guard.len(), 1);
3022 let bar = handler_guard.first().unwrap();
3023 assert_eq!(bar.ts_event, UnixNanos::default());
3024 assert_eq!(bar.ts_init, next_sec);
3025 }
3026
3027 #[rstest]
3028 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3029 let instrument = InstrumentAny::Equity(equity_aapl);
3030 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3031 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3032 let handler = Arc::new(Mutex::new(Vec::new()));
3033 let handler_clone = Arc::clone(&handler);
3034 let clock = Rc::new(RefCell::new(TestClock::new()));
3035
3036 let mut aggregator = TimeBarAggregator::new(
3037 bar_type,
3038 instrument.price_precision(),
3039 instrument.size_precision(),
3040 clock.clone(),
3041 move |bar: Bar| {
3042 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3043 handler_guard.push(bar);
3044 },
3045 true, true, BarIntervalType::LeftOpen,
3048 None,
3049 15,
3050 false, );
3052
3053 aggregator.update(
3055 Price::from("100.00"),
3056 Quantity::from(1),
3057 UnixNanos::default(),
3058 );
3059
3060 let ts1 = UnixNanos::from(1_000_000_000);
3062 clock.borrow_mut().set_time(ts1);
3063 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3064 aggregator.build_bar(event);
3065
3066 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3068
3069 let ts2 = UnixNanos::from(2_000_000_000);
3071 clock.borrow_mut().set_time(ts2);
3072 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3073 aggregator.build_bar(event);
3074
3075 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3076 assert_eq!(handler_guard.len(), 2);
3077
3078 let bar1 = &handler_guard[0];
3079 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
3081 assert_eq!(bar1.close, Price::from("100.00"));
3082 let bar2 = &handler_guard[1];
3083 assert_eq!(bar2.ts_event, ts2);
3084 assert_eq!(bar2.ts_init, ts2);
3085 assert_eq!(bar2.close, Price::from("101.00"));
3086 }
3087
3088 #[rstest]
3089 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3090 let instrument = InstrumentAny::Equity(equity_aapl);
3091 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3092 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3093 let handler = Arc::new(Mutex::new(Vec::new()));
3094 let handler_clone = Arc::clone(&handler);
3095 let clock = Rc::new(RefCell::new(TestClock::new()));
3096 let mut aggregator = TimeBarAggregator::new(
3097 bar_type,
3098 instrument.price_precision(),
3099 instrument.size_precision(),
3100 clock.clone(),
3101 move |bar: Bar| {
3102 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3103 handler_guard.push(bar);
3104 },
3105 true, true, BarIntervalType::RightOpen,
3108 None,
3109 15,
3110 false, );
3112
3113 aggregator.update(
3115 Price::from("100.00"),
3116 Quantity::from(1),
3117 UnixNanos::default(),
3118 );
3119
3120 let ts1 = UnixNanos::from(1_000_000_000);
3122 clock.borrow_mut().set_time(ts1);
3123 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3124 aggregator.build_bar(event);
3125
3126 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3128
3129 let ts2 = UnixNanos::from(2_000_000_000);
3131 clock.borrow_mut().set_time(ts2);
3132 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3133 aggregator.build_bar(event);
3134
3135 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3136 assert_eq!(handler_guard.len(), 2);
3137
3138 let bar1 = &handler_guard[0];
3139 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
3141 assert_eq!(bar1.close, Price::from("100.00"));
3142
3143 let bar2 = &handler_guard[1];
3144 assert_eq!(bar2.ts_event, ts1);
3145 assert_eq!(bar2.ts_init, ts2);
3146 assert_eq!(bar2.close, Price::from("101.00"));
3147 }
3148
3149 #[rstest]
3150 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3151 let instrument = InstrumentAny::Equity(equity_aapl);
3152 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3153 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3154 let handler = Arc::new(Mutex::new(Vec::new()));
3155 let handler_clone = Arc::clone(&handler);
3156 let clock = Rc::new(RefCell::new(TestClock::new()));
3157
3158 let mut aggregator = TimeBarAggregator::new(
3160 bar_type,
3161 instrument.price_precision(),
3162 instrument.size_precision(),
3163 clock.clone(),
3164 move |bar: Bar| {
3165 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3166 handler_guard.push(bar);
3167 },
3168 false, true, BarIntervalType::LeftOpen,
3171 None,
3172 15,
3173 false, );
3175
3176 let ts1 = UnixNanos::from(1_000_000_000);
3178 clock.borrow_mut().set_time(ts1);
3179 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3180 aggregator.build_bar(event);
3181
3182 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3183 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
3185
3186 let handler = Arc::new(Mutex::new(Vec::new()));
3188 let handler_clone = Arc::clone(&handler);
3189 let mut aggregator = TimeBarAggregator::new(
3190 bar_type,
3191 instrument.price_precision(),
3192 instrument.size_precision(),
3193 clock.clone(),
3194 move |bar: Bar| {
3195 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3196 handler_guard.push(bar);
3197 },
3198 true, true, BarIntervalType::LeftOpen,
3201 None,
3202 15,
3203 false, );
3205
3206 aggregator.update(
3207 Price::from("100.00"),
3208 Quantity::from(1),
3209 UnixNanos::default(),
3210 );
3211
3212 let ts1 = UnixNanos::from(1_000_000_000);
3214 clock.borrow_mut().set_time(ts1);
3215 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3216 aggregator.build_bar(event);
3217
3218 let ts2 = UnixNanos::from(2_000_000_000);
3220 clock.borrow_mut().set_time(ts2);
3221 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3222 aggregator.build_bar(event);
3223
3224 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3225 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
3227 assert_eq!(bar1.close, Price::from("100.00"));
3228 let bar2 = &handler_guard[1];
3229 assert_eq!(bar2.close, Price::from("100.00")); }
3231
3232 #[rstest]
3233 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3234 let instrument = InstrumentAny::Equity(equity_aapl);
3235 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3236 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3237 let clock = Rc::new(RefCell::new(TestClock::new()));
3238 let handler = Arc::new(Mutex::new(Vec::new()));
3239 let handler_clone = Arc::clone(&handler);
3240
3241 let mut aggregator = TimeBarAggregator::new(
3242 bar_type,
3243 instrument.price_precision(),
3244 instrument.size_precision(),
3245 clock.clone(),
3246 move |bar: Bar| {
3247 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3248 handler_guard.push(bar);
3249 },
3250 true, true, BarIntervalType::RightOpen,
3253 None,
3254 15,
3255 false, );
3257
3258 let ts1 = UnixNanos::from(1_000_000_000);
3259 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3260
3261 let ts2 = UnixNanos::from(2_000_000_000);
3262 clock.borrow_mut().set_time(ts2);
3263
3264 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3266 aggregator.build_bar(event);
3267
3268 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3269 let bar = handler_guard.first().unwrap();
3270 assert_eq!(bar.ts_event, UnixNanos::default());
3271 assert_eq!(bar.ts_init, ts2);
3272 }
3273
3274 #[rstest]
3279 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3280 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3281 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3283 let handler = Arc::new(Mutex::new(Vec::new()));
3284 let handler_clone = Arc::clone(&handler);
3285
3286 let aggregator = RenkoBarAggregator::new(
3287 bar_type,
3288 instrument.price_precision(),
3289 instrument.size_precision(),
3290 instrument.price_increment(),
3291 move |bar: Bar| {
3292 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3293 handler_guard.push(bar);
3294 },
3295 );
3296
3297 assert_eq!(aggregator.bar_type(), bar_type);
3298 assert!(!aggregator.is_running());
3299 let expected_brick_size = 10 * instrument.price_increment().raw;
3301 assert_eq!(aggregator.brick_size, expected_brick_size);
3302 }
3303
3304 #[rstest]
3305 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3306 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3307 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3309 let handler = Arc::new(Mutex::new(Vec::new()));
3310 let handler_clone = Arc::clone(&handler);
3311
3312 let mut aggregator = RenkoBarAggregator::new(
3313 bar_type,
3314 instrument.price_precision(),
3315 instrument.size_precision(),
3316 instrument.price_increment(),
3317 move |bar: Bar| {
3318 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3319 handler_guard.push(bar);
3320 },
3321 );
3322
3323 aggregator.update(
3325 Price::from("1.00000"),
3326 Quantity::from(1),
3327 UnixNanos::default(),
3328 );
3329 aggregator.update(
3330 Price::from("1.00005"),
3331 Quantity::from(1),
3332 UnixNanos::from(1000),
3333 );
3334
3335 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3336 assert_eq!(handler_guard.len(), 0); }
3338
3339 #[rstest]
3340 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3341 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3342 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3344 let handler = Arc::new(Mutex::new(Vec::new()));
3345 let handler_clone = Arc::clone(&handler);
3346
3347 let mut aggregator = RenkoBarAggregator::new(
3348 bar_type,
3349 instrument.price_precision(),
3350 instrument.size_precision(),
3351 instrument.price_increment(),
3352 move |bar: Bar| {
3353 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3354 handler_guard.push(bar);
3355 },
3356 );
3357
3358 aggregator.update(
3360 Price::from("1.00000"),
3361 Quantity::from(1),
3362 UnixNanos::default(),
3363 );
3364 aggregator.update(
3365 Price::from("1.00015"),
3366 Quantity::from(1),
3367 UnixNanos::from(1000),
3368 );
3369
3370 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3371 assert_eq!(handler_guard.len(), 1);
3372
3373 let bar = handler_guard.first().unwrap();
3374 assert_eq!(bar.open, Price::from("1.00000"));
3375 assert_eq!(bar.high, Price::from("1.00010"));
3376 assert_eq!(bar.low, Price::from("1.00000"));
3377 assert_eq!(bar.close, Price::from("1.00010"));
3378 assert_eq!(bar.volume, Quantity::from(2));
3379 assert_eq!(bar.ts_event, UnixNanos::from(1000));
3380 assert_eq!(bar.ts_init, UnixNanos::from(1000));
3381 }
3382
3383 #[rstest]
3384 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3385 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3386 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3388 let handler = Arc::new(Mutex::new(Vec::new()));
3389 let handler_clone = Arc::clone(&handler);
3390
3391 let mut aggregator = RenkoBarAggregator::new(
3392 bar_type,
3393 instrument.price_precision(),
3394 instrument.size_precision(),
3395 instrument.price_increment(),
3396 move |bar: Bar| {
3397 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3398 handler_guard.push(bar);
3399 },
3400 );
3401
3402 aggregator.update(
3404 Price::from("1.00000"),
3405 Quantity::from(1),
3406 UnixNanos::default(),
3407 );
3408 aggregator.update(
3409 Price::from("1.00025"),
3410 Quantity::from(1),
3411 UnixNanos::from(1000),
3412 );
3413
3414 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3415 assert_eq!(handler_guard.len(), 2);
3416
3417 let bar1 = &handler_guard[0];
3418 assert_eq!(bar1.open, Price::from("1.00000"));
3419 assert_eq!(bar1.high, Price::from("1.00010"));
3420 assert_eq!(bar1.low, Price::from("1.00000"));
3421 assert_eq!(bar1.close, Price::from("1.00010"));
3422
3423 let bar2 = &handler_guard[1];
3424 assert_eq!(bar2.open, Price::from("1.00010"));
3425 assert_eq!(bar2.high, Price::from("1.00020"));
3426 assert_eq!(bar2.low, Price::from("1.00010"));
3427 assert_eq!(bar2.close, Price::from("1.00020"));
3428 }
3429
3430 #[rstest]
3431 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3432 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3433 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3435 let handler = Arc::new(Mutex::new(Vec::new()));
3436 let handler_clone = Arc::clone(&handler);
3437
3438 let mut aggregator = RenkoBarAggregator::new(
3439 bar_type,
3440 instrument.price_precision(),
3441 instrument.size_precision(),
3442 instrument.price_increment(),
3443 move |bar: Bar| {
3444 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3445 handler_guard.push(bar);
3446 },
3447 );
3448
3449 aggregator.update(
3451 Price::from("1.00020"),
3452 Quantity::from(1),
3453 UnixNanos::default(),
3454 );
3455 aggregator.update(
3456 Price::from("1.00005"),
3457 Quantity::from(1),
3458 UnixNanos::from(1000),
3459 );
3460
3461 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3462 assert_eq!(handler_guard.len(), 1);
3463
3464 let bar = handler_guard.first().unwrap();
3465 assert_eq!(bar.open, Price::from("1.00020"));
3466 assert_eq!(bar.high, Price::from("1.00020"));
3467 assert_eq!(bar.low, Price::from("1.00010"));
3468 assert_eq!(bar.close, Price::from("1.00010"));
3469 assert_eq!(bar.volume, Quantity::from(2));
3470 }
3471
3472 #[rstest]
3473 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3474 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3475 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3477 let handler = Arc::new(Mutex::new(Vec::new()));
3478 let handler_clone = Arc::clone(&handler);
3479
3480 let mut aggregator = RenkoBarAggregator::new(
3481 bar_type,
3482 instrument.price_precision(),
3483 instrument.size_precision(),
3484 instrument.price_increment(),
3485 move |bar: Bar| {
3486 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3487 handler_guard.push(bar);
3488 },
3489 );
3490
3491 let input_bar = Bar::new(
3493 BarType::new(
3494 instrument.id(),
3495 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3496 AggregationSource::Internal,
3497 ),
3498 Price::from("1.00000"),
3499 Price::from("1.00005"),
3500 Price::from("0.99995"),
3501 Price::from("1.00005"), Quantity::from(100),
3503 UnixNanos::default(),
3504 UnixNanos::from(1000),
3505 );
3506
3507 aggregator.handle_bar(input_bar);
3508
3509 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3510 assert_eq!(handler_guard.len(), 0); }
3512
3513 #[rstest]
3514 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3515 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3516 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3518 let handler = Arc::new(Mutex::new(Vec::new()));
3519 let handler_clone = Arc::clone(&handler);
3520
3521 let mut aggregator = RenkoBarAggregator::new(
3522 bar_type,
3523 instrument.price_precision(),
3524 instrument.size_precision(),
3525 instrument.price_increment(),
3526 move |bar: Bar| {
3527 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3528 handler_guard.push(bar);
3529 },
3530 );
3531
3532 let bar1 = Bar::new(
3534 BarType::new(
3535 instrument.id(),
3536 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3537 AggregationSource::Internal,
3538 ),
3539 Price::from("1.00000"),
3540 Price::from("1.00005"),
3541 Price::from("0.99995"),
3542 Price::from("1.00000"),
3543 Quantity::from(100),
3544 UnixNanos::default(),
3545 UnixNanos::default(),
3546 );
3547
3548 let bar2 = Bar::new(
3550 BarType::new(
3551 instrument.id(),
3552 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3553 AggregationSource::Internal,
3554 ),
3555 Price::from("1.00000"),
3556 Price::from("1.00015"),
3557 Price::from("0.99995"),
3558 Price::from("1.00010"), Quantity::from(50),
3560 UnixNanos::from(60_000_000_000),
3561 UnixNanos::from(60_000_000_000),
3562 );
3563
3564 aggregator.handle_bar(bar1);
3565 aggregator.handle_bar(bar2);
3566
3567 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3568 assert_eq!(handler_guard.len(), 1);
3569
3570 let bar = handler_guard.first().unwrap();
3571 assert_eq!(bar.open, Price::from("1.00000"));
3572 assert_eq!(bar.high, Price::from("1.00010"));
3573 assert_eq!(bar.low, Price::from("1.00000"));
3574 assert_eq!(bar.close, Price::from("1.00010"));
3575 assert_eq!(bar.volume, Quantity::from(150));
3576 }
3577
3578 #[rstest]
3579 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3580 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3581 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3583 let handler = Arc::new(Mutex::new(Vec::new()));
3584 let handler_clone = Arc::clone(&handler);
3585
3586 let mut aggregator = RenkoBarAggregator::new(
3587 bar_type,
3588 instrument.price_precision(),
3589 instrument.size_precision(),
3590 instrument.price_increment(),
3591 move |bar: Bar| {
3592 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3593 handler_guard.push(bar);
3594 },
3595 );
3596
3597 let bar1 = Bar::new(
3599 BarType::new(
3600 instrument.id(),
3601 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3602 AggregationSource::Internal,
3603 ),
3604 Price::from("1.00000"),
3605 Price::from("1.00005"),
3606 Price::from("0.99995"),
3607 Price::from("1.00000"),
3608 Quantity::from(100),
3609 UnixNanos::default(),
3610 UnixNanos::default(),
3611 );
3612
3613 let bar2 = Bar::new(
3615 BarType::new(
3616 instrument.id(),
3617 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3618 AggregationSource::Internal,
3619 ),
3620 Price::from("1.00000"),
3621 Price::from("1.00035"),
3622 Price::from("0.99995"),
3623 Price::from("1.00030"), Quantity::from(50),
3625 UnixNanos::from(60_000_000_000),
3626 UnixNanos::from(60_000_000_000),
3627 );
3628
3629 aggregator.handle_bar(bar1);
3630 aggregator.handle_bar(bar2);
3631
3632 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3633 assert_eq!(handler_guard.len(), 3);
3634
3635 let bar1 = &handler_guard[0];
3636 assert_eq!(bar1.open, Price::from("1.00000"));
3637 assert_eq!(bar1.close, Price::from("1.00010"));
3638
3639 let bar2 = &handler_guard[1];
3640 assert_eq!(bar2.open, Price::from("1.00010"));
3641 assert_eq!(bar2.close, Price::from("1.00020"));
3642
3643 let bar3 = &handler_guard[2];
3644 assert_eq!(bar3.open, Price::from("1.00020"));
3645 assert_eq!(bar3.close, Price::from("1.00030"));
3646 }
3647
3648 #[rstest]
3649 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3650 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3651 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3653 let handler = Arc::new(Mutex::new(Vec::new()));
3654 let handler_clone = Arc::clone(&handler);
3655
3656 let mut aggregator = RenkoBarAggregator::new(
3657 bar_type,
3658 instrument.price_precision(),
3659 instrument.size_precision(),
3660 instrument.price_increment(),
3661 move |bar: Bar| {
3662 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3663 handler_guard.push(bar);
3664 },
3665 );
3666
3667 let bar1 = Bar::new(
3669 BarType::new(
3670 instrument.id(),
3671 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3672 AggregationSource::Internal,
3673 ),
3674 Price::from("1.00020"),
3675 Price::from("1.00025"),
3676 Price::from("1.00015"),
3677 Price::from("1.00020"),
3678 Quantity::from(100),
3679 UnixNanos::default(),
3680 UnixNanos::default(),
3681 );
3682
3683 let bar2 = Bar::new(
3685 BarType::new(
3686 instrument.id(),
3687 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3688 AggregationSource::Internal,
3689 ),
3690 Price::from("1.00020"),
3691 Price::from("1.00025"),
3692 Price::from("1.00005"),
3693 Price::from("1.00010"), Quantity::from(50),
3695 UnixNanos::from(60_000_000_000),
3696 UnixNanos::from(60_000_000_000),
3697 );
3698
3699 aggregator.handle_bar(bar1);
3700 aggregator.handle_bar(bar2);
3701
3702 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3703 assert_eq!(handler_guard.len(), 1);
3704
3705 let bar = handler_guard.first().unwrap();
3706 assert_eq!(bar.open, Price::from("1.00020"));
3707 assert_eq!(bar.high, Price::from("1.00020"));
3708 assert_eq!(bar.low, Price::from("1.00010"));
3709 assert_eq!(bar.close, Price::from("1.00010"));
3710 assert_eq!(bar.volume, Quantity::from(150));
3711 }
3712
3713 #[rstest]
3714 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3715 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3716
3717 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3720 let handler = Arc::new(Mutex::new(Vec::new()));
3721 let handler_clone = Arc::clone(&handler);
3722
3723 let aggregator_5 = RenkoBarAggregator::new(
3724 bar_type_5,
3725 instrument.price_precision(),
3726 instrument.size_precision(),
3727 instrument.price_increment(),
3728 move |_bar: Bar| {
3729 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3730 handler_guard.push(_bar);
3731 },
3732 );
3733
3734 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3736 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3737
3738 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3740 let handler2 = Arc::new(Mutex::new(Vec::new()));
3741 let handler2_clone = Arc::clone(&handler2);
3742
3743 let aggregator_20 = RenkoBarAggregator::new(
3744 bar_type_20,
3745 instrument.price_precision(),
3746 instrument.size_precision(),
3747 instrument.price_increment(),
3748 move |_bar: Bar| {
3749 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3750 handler_guard.push(_bar);
3751 },
3752 );
3753
3754 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3756 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3757 }
3758
3759 #[rstest]
3760 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3761 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3762 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3764 let handler = Arc::new(Mutex::new(Vec::new()));
3765 let handler_clone = Arc::clone(&handler);
3766
3767 let mut aggregator = RenkoBarAggregator::new(
3768 bar_type,
3769 instrument.price_precision(),
3770 instrument.size_precision(),
3771 instrument.price_increment(),
3772 move |bar: Bar| {
3773 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3774 handler_guard.push(bar);
3775 },
3776 );
3777
3778 aggregator.update(
3780 Price::from("1.00000"),
3781 Quantity::from(1),
3782 UnixNanos::from(1000),
3783 );
3784 aggregator.update(
3785 Price::from("1.00010"),
3786 Quantity::from(1),
3787 UnixNanos::from(2000),
3788 ); aggregator.update(
3790 Price::from("1.00020"),
3791 Quantity::from(1),
3792 UnixNanos::from(3000),
3793 ); aggregator.update(
3795 Price::from("1.00025"),
3796 Quantity::from(1),
3797 UnixNanos::from(4000),
3798 ); aggregator.update(
3800 Price::from("1.00030"),
3801 Quantity::from(1),
3802 UnixNanos::from(5000),
3803 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3806 assert_eq!(handler_guard.len(), 3);
3807
3808 let bar1 = &handler_guard[0];
3809 assert_eq!(bar1.open, Price::from("1.00000"));
3810 assert_eq!(bar1.close, Price::from("1.00010"));
3811
3812 let bar2 = &handler_guard[1];
3813 assert_eq!(bar2.open, Price::from("1.00010"));
3814 assert_eq!(bar2.close, Price::from("1.00020"));
3815
3816 let bar3 = &handler_guard[2];
3817 assert_eq!(bar3.open, Price::from("1.00020"));
3818 assert_eq!(bar3.close, Price::from("1.00030"));
3819 }
3820
3821 #[rstest]
3822 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3823 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3824 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3826 let handler = Arc::new(Mutex::new(Vec::new()));
3827 let handler_clone = Arc::clone(&handler);
3828
3829 let mut aggregator = RenkoBarAggregator::new(
3830 bar_type,
3831 instrument.price_precision(),
3832 instrument.size_precision(),
3833 instrument.price_increment(),
3834 move |bar: Bar| {
3835 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3836 handler_guard.push(bar);
3837 },
3838 );
3839
3840 aggregator.update(
3842 Price::from("1.00000"),
3843 Quantity::from(1),
3844 UnixNanos::from(1000),
3845 );
3846 aggregator.update(
3847 Price::from("1.00010"),
3848 Quantity::from(1),
3849 UnixNanos::from(2000),
3850 ); aggregator.update(
3852 Price::from("0.99990"),
3853 Quantity::from(1),
3854 UnixNanos::from(3000),
3855 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3858 assert_eq!(handler_guard.len(), 3);
3859
3860 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
3862 assert_eq!(bar1.high, Price::from("1.00010"));
3863 assert_eq!(bar1.low, Price::from("1.00000"));
3864 assert_eq!(bar1.close, Price::from("1.00010"));
3865
3866 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
3868 assert_eq!(bar2.high, Price::from("1.00010"));
3869 assert_eq!(bar2.low, Price::from("1.00000"));
3870 assert_eq!(bar2.close, Price::from("1.00000"));
3871
3872 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
3874 assert_eq!(bar3.high, Price::from("1.00000"));
3875 assert_eq!(bar3.low, Price::from("0.99990"));
3876 assert_eq!(bar3.close, Price::from("0.99990"));
3877 }
3878
3879 #[rstest]
3880 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3881 let instrument = InstrumentAny::Equity(equity_aapl);
3882 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3883 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3884 let handler = Arc::new(Mutex::new(Vec::new()));
3885 let handler_clone = Arc::clone(&handler);
3886
3887 let mut aggregator = TickImbalanceBarAggregator::new(
3888 bar_type,
3889 instrument.price_precision(),
3890 instrument.size_precision(),
3891 move |bar: Bar| {
3892 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3893 handler_guard.push(bar);
3894 },
3895 );
3896
3897 let buy = TradeTick {
3898 aggressor_side: AggressorSide::Buyer,
3899 ..TradeTick::default()
3900 };
3901 let sell = TradeTick {
3902 aggressor_side: AggressorSide::Seller,
3903 ..TradeTick::default()
3904 };
3905
3906 aggregator.handle_trade(buy);
3907 aggregator.handle_trade(sell);
3908 aggregator.handle_trade(buy);
3909
3910 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3911 assert_eq!(handler_guard.len(), 0);
3912 }
3913
3914 #[rstest]
3915 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3916 let instrument = InstrumentAny::Equity(equity_aapl);
3917 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3918 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3919 let handler = Arc::new(Mutex::new(Vec::new()));
3920 let handler_clone = Arc::clone(&handler);
3921
3922 let mut aggregator = TickImbalanceBarAggregator::new(
3923 bar_type,
3924 instrument.price_precision(),
3925 instrument.size_precision(),
3926 move |bar: Bar| {
3927 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3928 handler_guard.push(bar);
3929 },
3930 );
3931
3932 let buy = TradeTick {
3933 aggressor_side: AggressorSide::Buyer,
3934 ..TradeTick::default()
3935 };
3936 let no_aggressor = TradeTick {
3937 aggressor_side: AggressorSide::NoAggressor,
3938 ..TradeTick::default()
3939 };
3940
3941 aggregator.handle_trade(buy);
3942 aggregator.handle_trade(no_aggressor);
3943 aggregator.handle_trade(buy);
3944
3945 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3946 assert_eq!(handler_guard.len(), 1);
3947 }
3948
3949 #[rstest]
3950 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3951 let instrument = InstrumentAny::Equity(equity_aapl);
3952 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3953 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3954 let handler = Arc::new(Mutex::new(Vec::new()));
3955 let handler_clone = Arc::clone(&handler);
3956
3957 let mut aggregator = TickRunsBarAggregator::new(
3958 bar_type,
3959 instrument.price_precision(),
3960 instrument.size_precision(),
3961 move |bar: Bar| {
3962 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3963 handler_guard.push(bar);
3964 },
3965 );
3966
3967 let buy = TradeTick {
3968 aggressor_side: AggressorSide::Buyer,
3969 ..TradeTick::default()
3970 };
3971 let sell = TradeTick {
3972 aggressor_side: AggressorSide::Seller,
3973 ..TradeTick::default()
3974 };
3975
3976 aggregator.handle_trade(buy);
3977 aggregator.handle_trade(buy);
3978 aggregator.handle_trade(sell);
3979 aggregator.handle_trade(sell);
3980
3981 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3982 assert_eq!(handler_guard.len(), 2);
3983 }
3984
3985 #[rstest]
3986 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
3987 let instrument = InstrumentAny::Equity(equity_aapl);
3988 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
3989 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3990 let handler = Arc::new(Mutex::new(Vec::new()));
3991 let handler_clone = Arc::clone(&handler);
3992
3993 let mut aggregator = VolumeImbalanceBarAggregator::new(
3994 bar_type,
3995 instrument.price_precision(),
3996 instrument.size_precision(),
3997 move |bar: Bar| {
3998 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3999 handler_guard.push(bar);
4000 },
4001 );
4002
4003 let large_trade = TradeTick {
4004 size: Quantity::from(25),
4005 aggressor_side: AggressorSide::Buyer,
4006 ..TradeTick::default()
4007 };
4008
4009 aggregator.handle_trade(large_trade);
4010
4011 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4012 assert_eq!(handler_guard.len(), 2);
4013 }
4014
4015 #[rstest]
4016 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4017 equity_aapl: Equity,
4018 ) {
4019 let instrument = InstrumentAny::Equity(equity_aapl);
4020 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4021 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4022 let handler = Arc::new(Mutex::new(Vec::new()));
4023 let handler_clone = Arc::clone(&handler);
4024
4025 let mut aggregator = VolumeImbalanceBarAggregator::new(
4026 bar_type,
4027 instrument.price_precision(),
4028 instrument.size_precision(),
4029 move |bar: Bar| {
4030 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4031 handler_guard.push(bar);
4032 },
4033 );
4034
4035 let buy = TradeTick {
4036 size: Quantity::from(5),
4037 aggressor_side: AggressorSide::Buyer,
4038 ..TradeTick::default()
4039 };
4040 let no_aggressor = TradeTick {
4041 size: Quantity::from(3),
4042 aggressor_side: AggressorSide::NoAggressor,
4043 ..TradeTick::default()
4044 };
4045
4046 aggregator.handle_trade(buy);
4047 aggregator.handle_trade(no_aggressor);
4048 aggregator.handle_trade(buy);
4049
4050 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4051 assert_eq!(handler_guard.len(), 1);
4052 }
4053
4054 #[rstest]
4055 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4056 let instrument = InstrumentAny::Equity(equity_aapl);
4057 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4058 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4059 let handler = Arc::new(Mutex::new(Vec::new()));
4060 let handler_clone = Arc::clone(&handler);
4061
4062 let mut aggregator = VolumeRunsBarAggregator::new(
4063 bar_type,
4064 instrument.price_precision(),
4065 instrument.size_precision(),
4066 move |bar: Bar| {
4067 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4068 handler_guard.push(bar);
4069 },
4070 );
4071
4072 let large_trade = TradeTick {
4073 size: Quantity::from(25),
4074 aggressor_side: AggressorSide::Buyer,
4075 ..TradeTick::default()
4076 };
4077
4078 aggregator.handle_trade(large_trade);
4079
4080 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4081 assert_eq!(handler_guard.len(), 2);
4082 }
4083
4084 #[rstest]
4085 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4086 let instrument = InstrumentAny::Equity(equity_aapl);
4087 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4088 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4089 let handler = Arc::new(Mutex::new(Vec::new()));
4090 let handler_clone = Arc::clone(&handler);
4091
4092 let mut aggregator = ValueRunsBarAggregator::new(
4093 bar_type,
4094 instrument.price_precision(),
4095 instrument.size_precision(),
4096 move |bar: Bar| {
4097 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4098 handler_guard.push(bar);
4099 },
4100 );
4101
4102 let large_trade = TradeTick {
4103 price: Price::from("5.00"),
4104 size: Quantity::from(25),
4105 aggressor_side: AggressorSide::Buyer,
4106 ..TradeTick::default()
4107 };
4108
4109 aggregator.handle_trade(large_trade);
4110
4111 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4112 assert_eq!(handler_guard.len(), 2);
4113 }
4114}