1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31 clock::{Clock, TestClock},
32 timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35 UnixNanos,
36 correctness::{self, FAILED},
37 datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40 data::{
41 QuoteTick, TradeTick,
42 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43 },
44 enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
/// Boxed callback that receives each [`Bar`] built by an aggregator.
type BarHandler = Box<dyn FnMut(Bar)>;
50
51pub trait BarAggregator: Any + Debug {
55 fn bar_type(&self) -> BarType;
57 fn is_running(&self) -> bool;
59 fn set_is_running(&mut self, value: bool);
61 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63 fn handle_quote(&mut self, quote: QuoteTick) {
65 let spec = self.bar_type().spec();
66 self.update(
67 quote.extract_price(spec.price_type),
68 quote.extract_size(spec.price_type),
69 quote.ts_init,
70 );
71 }
72 fn handle_trade(&mut self, trade: TradeTick) {
74 self.update(trade.price, trade.size, trade.ts_init);
75 }
76 fn handle_bar(&mut self, bar: Bar) {
78 self.update_bar(bar, bar.volume, bar.ts_init);
79 }
80 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81 fn stop(&mut self) {}
83 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89 fn build_bar(&mut self, _event: TimeEvent) {}
91 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101 pub fn as_any(&self) -> &dyn Any {
103 self
104 }
105 pub fn as_any_mut(&mut self) -> &mut dyn Any {
107 self
108 }
109}
110
/// Accumulates price/size updates into the OHLCV state of a single bar.
#[derive(Debug)]
pub struct BarBuilder {
    /// Bar type stamped onto every bar that is built.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision, used for the zero volume the builder starts and resets with.
    size_precision: u8,
    /// Set to `true` the first time an update seeds the open price.
    initialized: bool,
    /// Timestamp of the last accepted update; older updates are ignored.
    ts_last: UnixNanos,
    /// Number of updates accepted since the last reset.
    count: usize,
    /// Close of the most recently built bar, used to seed a bar that received no updates.
    last_close: Option<Price>,
    /// Open price of the bar under construction.
    open: Option<Price>,
    /// Highest price of the bar under construction.
    high: Option<Price>,
    /// Lowest price of the bar under construction.
    low: Option<Price>,
    /// Latest price of the bar under construction (not cleared by `reset`).
    close: Option<Price>,
    /// Volume accumulated since the last reset.
    volume: Quantity,
}
127
128impl BarBuilder {
129 #[must_use]
137 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
138 correctness::check_equal(
139 &bar_type.aggregation_source(),
140 &AggregationSource::Internal,
141 "bar_type.aggregation_source",
142 "AggregationSource::Internal",
143 )
144 .expect(FAILED);
145
146 Self {
147 bar_type,
148 price_precision,
149 size_precision,
150 initialized: false,
151 ts_last: UnixNanos::default(),
152 count: 0,
153 last_close: None,
154 open: None,
155 high: None,
156 low: None,
157 close: None,
158 volume: Quantity::zero(size_precision),
159 }
160 }
161
162 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
168 if ts_init < self.ts_last {
169 return; }
171
172 if self.open.is_none() {
173 self.open = Some(price);
174 self.high = Some(price);
175 self.low = Some(price);
176 self.initialized = true;
177 } else {
178 if price > self.high.unwrap() {
179 self.high = Some(price);
180 }
181 if price < self.low.unwrap() {
182 self.low = Some(price);
183 }
184 }
185
186 self.close = Some(price);
187 self.volume = self.volume.add(size);
188 self.count += 1;
189 self.ts_last = ts_init;
190 }
191
192 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
198 if ts_init < self.ts_last {
199 return; }
201
202 if self.open.is_none() {
203 self.open = Some(bar.open);
204 self.high = Some(bar.high);
205 self.low = Some(bar.low);
206 self.initialized = true;
207 } else {
208 if bar.high > self.high.unwrap() {
209 self.high = Some(bar.high);
210 }
211 if bar.low < self.low.unwrap() {
212 self.low = Some(bar.low);
213 }
214 }
215
216 self.close = Some(bar.close);
217 self.volume = self.volume.add(volume);
218 self.count += 1;
219 self.ts_last = ts_init;
220 }
221
222 pub fn reset(&mut self) {
226 self.open = None;
227 self.high = None;
228 self.low = None;
229 self.volume = Quantity::zero(self.size_precision);
230 self.count = 0;
231 }
232
233 pub fn build_now(&mut self) -> Bar {
235 self.build(self.ts_last, self.ts_last)
236 }
237
238 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244 if self.open.is_none() {
245 self.open = self.last_close;
246 self.high = self.last_close;
247 self.low = self.last_close;
248 self.close = self.last_close;
249 }
250
251 if let (Some(close), Some(low)) = (self.close, self.low)
252 && close < low
253 {
254 self.low = Some(close);
255 }
256
257 if let (Some(close), Some(high)) = (self.close, self.high)
258 && close > high
259 {
260 self.high = Some(close);
261 }
262
263 let bar = Bar::new(
265 self.bar_type,
266 self.open.unwrap(),
267 self.high.unwrap(),
268 self.low.unwrap(),
269 self.close.unwrap(),
270 self.volume,
271 ts_event,
272 ts_init,
273 );
274
275 self.last_close = self.close;
276 self.reset();
277 bar
278 }
279}
280
/// State shared by the concrete aggregators: bar type, builder, output handler and running flag.
pub struct BarAggregatorCore {
    /// Bar type the owning aggregator reports.
    bar_type: BarType,
    /// Builder holding the bar under construction.
    builder: BarBuilder,
    /// Callback invoked with every bar that is built and sent.
    handler: BarHandler,
    /// Running flag exposed through the `BarAggregator` trait.
    is_running: bool,
}
288
289impl Debug for BarAggregatorCore {
290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291 f.debug_struct(stringify!(BarAggregatorCore))
292 .field("bar_type", &self.bar_type)
293 .field("builder", &self.builder)
294 .field("is_running", &self.is_running)
295 .finish()
296 }
297}
298
299impl BarAggregatorCore {
300 pub fn new<H: FnMut(Bar) + 'static>(
308 bar_type: BarType,
309 price_precision: u8,
310 size_precision: u8,
311 handler: H,
312 ) -> Self {
313 Self {
314 bar_type,
315 builder: BarBuilder::new(bar_type, price_precision, size_precision),
316 handler: Box::new(handler),
317 is_running: false,
318 }
319 }
320
321 pub const fn set_is_running(&mut self, value: bool) {
323 self.is_running = value;
324 }
325 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
326 self.builder.update(price, size, ts_init);
327 }
328
329 fn build_now_and_send(&mut self) {
330 let bar = self.builder.build_now();
331 (self.handler)(bar);
332 }
333
334 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
335 let bar = self.builder.build(ts_event, ts_init);
336 (self.handler)(bar);
337 }
338}
339
/// Aggregator that emits a bar once the number of accepted updates reaches the bar step.
pub struct TickBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
}
347
348impl Debug for TickBarAggregator {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 f.debug_struct(stringify!(TickBarAggregator))
351 .field("core", &self.core)
352 .finish()
353 }
354}
355
356impl TickBarAggregator {
357 pub fn new<H: FnMut(Bar) + 'static>(
365 bar_type: BarType,
366 price_precision: u8,
367 size_precision: u8,
368 handler: H,
369 ) -> Self {
370 Self {
371 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
372 }
373 }
374}
375
376impl BarAggregator for TickBarAggregator {
377 fn bar_type(&self) -> BarType {
378 self.core.bar_type
379 }
380
381 fn is_running(&self) -> bool {
382 self.core.is_running
383 }
384
385 fn set_is_running(&mut self, value: bool) {
386 self.core.set_is_running(value);
387 }
388
389 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
391 self.core.apply_update(price, size, ts_init);
392 let spec = self.core.bar_type.spec();
393
394 if self.core.builder.count >= spec.step.get() {
395 self.core.build_now_and_send();
396 }
397 }
398
399 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
400 self.core.builder.update_bar(bar, volume, ts_init);
401 let spec = self.core.bar_type.spec();
402
403 if self.core.builder.count >= spec.step.get() {
404 self.core.build_now_and_send();
405 }
406 }
407}
408
/// Aggregator that emits a bar when the signed count of buyer versus seller trades reaches the bar step.
pub struct TickImbalanceBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Running imbalance: +1 per buyer-initiated trade, -1 per seller-initiated trade.
    imbalance: isize,
}
417
418impl Debug for TickImbalanceBarAggregator {
419 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420 f.debug_struct(stringify!(TickImbalanceBarAggregator))
421 .field("core", &self.core)
422 .field("imbalance", &self.imbalance)
423 .finish()
424 }
425}
426
427impl TickImbalanceBarAggregator {
428 pub fn new<H: FnMut(Bar) + 'static>(
436 bar_type: BarType,
437 price_precision: u8,
438 size_precision: u8,
439 handler: H,
440 ) -> Self {
441 Self {
442 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443 imbalance: 0,
444 }
445 }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449 fn bar_type(&self) -> BarType {
450 self.core.bar_type
451 }
452
453 fn is_running(&self) -> bool {
454 self.core.is_running
455 }
456
457 fn set_is_running(&mut self, value: bool) {
458 self.core.set_is_running(value);
459 }
460
461 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466 self.core.apply_update(price, size, ts_init);
467 }
468
469 fn handle_trade(&mut self, trade: TradeTick) {
470 self.core
471 .apply_update(trade.price, trade.size, trade.ts_init);
472
473 let delta = match trade.aggressor_side {
474 AggressorSide::Buyer => 1,
475 AggressorSide::Seller => -1,
476 AggressorSide::NoAggressor => 0,
477 };
478
479 if delta == 0 {
480 return;
481 }
482
483 self.imbalance += delta;
484 let threshold = self.core.bar_type.spec().step.get();
485 if self.imbalance.unsigned_abs() >= threshold {
486 self.core.build_now_and_send();
487 self.imbalance = 0;
488 }
489 }
490
491 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492 self.core.builder.update_bar(bar, volume, ts_init);
493 }
494}
495
/// Aggregator that emits a bar when a run of same-side trades reaches the bar step in length.
pub struct TickRunsBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, if any.
    current_run_side: Option<AggressorSide>,
    /// Number of trades counted in the current run.
    run_count: usize,
}
502
503impl Debug for TickRunsBarAggregator {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct(stringify!(TickRunsBarAggregator))
506 .field("core", &self.core)
507 .field("current_run_side", &self.current_run_side)
508 .field("run_count", &self.run_count)
509 .finish()
510 }
511}
512
513impl TickRunsBarAggregator {
514 pub fn new<H: FnMut(Bar) + 'static>(
522 bar_type: BarType,
523 price_precision: u8,
524 size_precision: u8,
525 handler: H,
526 ) -> Self {
527 Self {
528 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
529 current_run_side: None,
530 run_count: 0,
531 }
532 }
533}
534
535impl BarAggregator for TickRunsBarAggregator {
536 fn bar_type(&self) -> BarType {
537 self.core.bar_type
538 }
539
540 fn is_running(&self) -> bool {
541 self.core.is_running
542 }
543
544 fn set_is_running(&mut self, value: bool) {
545 self.core.set_is_running(value);
546 }
547
548 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
553 self.core.apply_update(price, size, ts_init);
554 }
555
556 fn handle_trade(&mut self, trade: TradeTick) {
557 let side = match trade.aggressor_side {
558 AggressorSide::Buyer => Some(AggressorSide::Buyer),
559 AggressorSide::Seller => Some(AggressorSide::Seller),
560 AggressorSide::NoAggressor => None,
561 };
562
563 if let Some(side) = side {
564 if self.current_run_side != Some(side) {
565 self.current_run_side = Some(side);
566 self.run_count = 0;
567 self.core.builder.reset();
568 }
569
570 self.core
571 .apply_update(trade.price, trade.size, trade.ts_init);
572 self.run_count += 1;
573
574 let threshold = self.core.bar_type.spec().step.get();
575 if self.run_count >= threshold {
576 self.core.build_now_and_send();
577 self.run_count = 0;
578 self.current_run_side = None;
579 }
580 } else {
581 self.core
582 .apply_update(trade.price, trade.size, trade.ts_init);
583 }
584 }
585
586 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
587 self.core.builder.update_bar(bar, volume, ts_init);
588 }
589}
590
/// Aggregator that emits a bar each time the accumulated volume reaches the bar step.
pub struct VolumeBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
}
595
596impl Debug for VolumeBarAggregator {
597 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
598 f.debug_struct(stringify!(VolumeBarAggregator))
599 .field("core", &self.core)
600 .finish()
601 }
602}
603
604impl VolumeBarAggregator {
605 pub fn new<H: FnMut(Bar) + 'static>(
613 bar_type: BarType,
614 price_precision: u8,
615 size_precision: u8,
616 handler: H,
617 ) -> Self {
618 Self {
619 core: BarAggregatorCore::new(
620 bar_type.standard(),
621 price_precision,
622 size_precision,
623 handler,
624 ),
625 }
626 }
627}
628
629impl BarAggregator for VolumeBarAggregator {
630 fn bar_type(&self) -> BarType {
631 self.core.bar_type
632 }
633
634 fn is_running(&self) -> bool {
635 self.core.is_running
636 }
637
638 fn set_is_running(&mut self, value: bool) {
639 self.core.set_is_running(value);
640 }
641
642 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
644 let mut raw_size_update = size.raw;
645 let spec = self.core.bar_type.spec();
646 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
647
648 while raw_size_update > 0 {
649 if self.core.builder.volume.raw + raw_size_update < raw_step {
650 self.core.apply_update(
651 price,
652 Quantity::from_raw(raw_size_update, size.precision),
653 ts_init,
654 );
655 break;
656 }
657
658 let raw_size_diff = raw_step - self.core.builder.volume.raw;
659 self.core.apply_update(
660 price,
661 Quantity::from_raw(raw_size_diff, size.precision),
662 ts_init,
663 );
664
665 self.core.build_now_and_send();
666 raw_size_update -= raw_size_diff;
667 }
668 }
669
670 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671 let mut raw_volume_update = volume.raw;
672 let spec = self.core.bar_type.spec();
673 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
674
675 while raw_volume_update > 0 {
676 if self.core.builder.volume.raw + raw_volume_update < raw_step {
677 self.core.builder.update_bar(
678 bar,
679 Quantity::from_raw(raw_volume_update, volume.precision),
680 ts_init,
681 );
682 break;
683 }
684
685 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
686 self.core.builder.update_bar(
687 bar,
688 Quantity::from_raw(raw_volume_diff, volume.precision),
689 ts_init,
690 );
691
692 self.core.build_now_and_send();
693 raw_volume_update -= raw_volume_diff;
694 }
695 }
696}
697
/// Aggregator that emits a bar when the signed buyer-versus-seller volume reaches the bar step.
pub struct VolumeImbalanceBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Signed raw volume imbalance (buyer volume positive, seller volume negative).
    imbalance_raw: i128,
    /// Bar step scaled by `FIXED_SCALAR`, i.e. the imbalance threshold in raw units.
    raw_step: i128,
}
704
705impl Debug for VolumeImbalanceBarAggregator {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
708 .field("core", &self.core)
709 .field("imbalance_raw", &self.imbalance_raw)
710 .field("raw_step", &self.raw_step)
711 .finish()
712 }
713}
714
715impl VolumeImbalanceBarAggregator {
716 pub fn new<H: FnMut(Bar) + 'static>(
724 bar_type: BarType,
725 price_precision: u8,
726 size_precision: u8,
727 handler: H,
728 ) -> Self {
729 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
730 Self {
731 core: BarAggregatorCore::new(
732 bar_type.standard(),
733 price_precision,
734 size_precision,
735 handler,
736 ),
737 imbalance_raw: 0,
738 raw_step,
739 }
740 }
741}
742
743impl BarAggregator for VolumeImbalanceBarAggregator {
744 fn bar_type(&self) -> BarType {
745 self.core.bar_type
746 }
747
748 fn is_running(&self) -> bool {
749 self.core.is_running
750 }
751
752 fn set_is_running(&mut self, value: bool) {
753 self.core.set_is_running(value);
754 }
755
756 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
761 self.core.apply_update(price, size, ts_init);
762 }
763
764 fn handle_trade(&mut self, trade: TradeTick) {
765 let side = match trade.aggressor_side {
766 AggressorSide::Buyer => 1,
767 AggressorSide::Seller => -1,
768 AggressorSide::NoAggressor => {
769 self.core
770 .apply_update(trade.price, trade.size, trade.ts_init);
771 return;
772 }
773 };
774
775 let mut raw_remaining = trade.size.raw as i128;
776 while raw_remaining > 0 {
777 let imbalance_abs = self.imbalance_raw.abs();
778 let needed = (self.raw_step - imbalance_abs).max(1);
779 let raw_chunk = raw_remaining.min(needed);
780 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
781
782 self.core
783 .apply_update(trade.price, qty_chunk, trade.ts_init);
784
785 self.imbalance_raw += side * raw_chunk;
786 raw_remaining -= raw_chunk;
787
788 if self.imbalance_raw.abs() >= self.raw_step {
789 self.core.build_now_and_send();
790 self.imbalance_raw = 0;
791 }
792 }
793 }
794
795 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
796 self.core.builder.update_bar(bar, volume, ts_init);
797 }
798}
799
/// Aggregator that emits a bar when the volume of a same-side run reaches the bar step.
pub struct VolumeRunsBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, if any.
    current_run_side: Option<AggressorSide>,
    /// Raw volume accumulated in the current run.
    run_volume_raw: QuantityRaw,
    /// Bar step scaled by `FIXED_SCALAR`, i.e. the run threshold in raw units.
    raw_step: QuantityRaw,
}
807
808impl Debug for VolumeRunsBarAggregator {
809 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810 f.debug_struct(stringify!(VolumeRunsBarAggregator))
811 .field("core", &self.core)
812 .field("current_run_side", &self.current_run_side)
813 .field("run_volume_raw", &self.run_volume_raw)
814 .field("raw_step", &self.raw_step)
815 .finish()
816 }
817}
818
819impl VolumeRunsBarAggregator {
820 pub fn new<H: FnMut(Bar) + 'static>(
828 bar_type: BarType,
829 price_precision: u8,
830 size_precision: u8,
831 handler: H,
832 ) -> Self {
833 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
834 Self {
835 core: BarAggregatorCore::new(
836 bar_type.standard(),
837 price_precision,
838 size_precision,
839 handler,
840 ),
841 current_run_side: None,
842 run_volume_raw: 0,
843 raw_step,
844 }
845 }
846}
847
848impl BarAggregator for VolumeRunsBarAggregator {
849 fn bar_type(&self) -> BarType {
850 self.core.bar_type
851 }
852
853 fn is_running(&self) -> bool {
854 self.core.is_running
855 }
856
857 fn set_is_running(&mut self, value: bool) {
858 self.core.set_is_running(value);
859 }
860
861 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
866 self.core.apply_update(price, size, ts_init);
867 }
868
869 fn handle_trade(&mut self, trade: TradeTick) {
870 let side = match trade.aggressor_side {
871 AggressorSide::Buyer => Some(AggressorSide::Buyer),
872 AggressorSide::Seller => Some(AggressorSide::Seller),
873 AggressorSide::NoAggressor => None,
874 };
875
876 let Some(side) = side else {
877 self.core
878 .apply_update(trade.price, trade.size, trade.ts_init);
879 return;
880 };
881
882 if self.current_run_side != Some(side) {
883 self.current_run_side = Some(side);
884 self.run_volume_raw = 0;
885 self.core.builder.reset();
886 }
887
888 let mut raw_remaining = trade.size.raw;
889 while raw_remaining > 0 {
890 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
891 let raw_chunk = raw_remaining.min(needed);
892
893 self.core.apply_update(
894 trade.price,
895 Quantity::from_raw(raw_chunk, trade.size.precision),
896 trade.ts_init,
897 );
898
899 self.run_volume_raw += raw_chunk;
900 raw_remaining -= raw_chunk;
901
902 if self.run_volume_raw >= self.raw_step {
903 self.core.build_now_and_send();
904 self.run_volume_raw = 0;
905 self.current_run_side = None;
906 }
907 }
908 }
909
910 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
911 self.core.builder.update_bar(bar, volume, ts_init);
912 }
913}
914
/// Aggregator that emits a bar each time the accumulated value (price × size) reaches the bar step.
pub struct ValueBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Value accumulated towards the next bar.
    cum_value: f64,
}
923
924impl Debug for ValueBarAggregator {
925 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926 f.debug_struct(stringify!(ValueBarAggregator))
927 .field("core", &self.core)
928 .field("cum_value", &self.cum_value)
929 .finish()
930 }
931}
932
933impl ValueBarAggregator {
934 pub fn new<H: FnMut(Bar) + 'static>(
942 bar_type: BarType,
943 price_precision: u8,
944 size_precision: u8,
945 handler: H,
946 ) -> Self {
947 Self {
948 core: BarAggregatorCore::new(
949 bar_type.standard(),
950 price_precision,
951 size_precision,
952 handler,
953 ),
954 cum_value: 0.0,
955 }
956 }
957
958 #[must_use]
959 pub const fn get_cumulative_value(&self) -> f64 {
961 self.cum_value
962 }
963}
964
965impl BarAggregator for ValueBarAggregator {
966 fn bar_type(&self) -> BarType {
967 self.core.bar_type
968 }
969
970 fn is_running(&self) -> bool {
971 self.core.is_running
972 }
973
974 fn set_is_running(&mut self, value: bool) {
975 self.core.set_is_running(value);
976 }
977
978 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
980 let mut size_update = size.as_f64();
981 let spec = self.core.bar_type.spec();
982
983 while size_update > 0.0 {
984 let value_update = price.as_f64() * size_update;
985 if value_update == 0.0 {
986 self.core
988 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
989 break;
990 }
991
992 if self.cum_value + value_update < spec.step.get() as f64 {
993 self.cum_value += value_update;
994 self.core
995 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
996 break;
997 }
998
999 let value_diff = spec.step.get() as f64 - self.cum_value;
1000 let size_diff = size_update * (value_diff / value_update);
1001 self.core
1002 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1003
1004 self.core.build_now_and_send();
1005 self.cum_value = 0.0;
1006 size_update -= size_diff;
1007 }
1008 }
1009
1010 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1011 let mut volume_update = volume;
1012 let average_price = Price::new(
1013 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1014 self.core.builder.price_precision,
1015 );
1016
1017 while volume_update.as_f64() > 0.0 {
1018 let value_update = average_price.as_f64() * volume_update.as_f64();
1019 if value_update == 0.0 {
1020 self.core.builder.update_bar(bar, volume_update, ts_init);
1022 break;
1023 }
1024
1025 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1026 self.cum_value += value_update;
1027 self.core.builder.update_bar(bar, volume_update, ts_init);
1028 break;
1029 }
1030
1031 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1032 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
1033 self.core.builder.update_bar(
1034 bar,
1035 Quantity::new(volume_diff, volume_update.precision),
1036 ts_init,
1037 );
1038
1039 self.core.build_now_and_send();
1040 self.cum_value = 0.0;
1041 volume_update = Quantity::new(
1042 volume_update.as_f64() - volume_diff,
1043 volume_update.precision,
1044 );
1045 }
1046 }
1047}
1048
/// Aggregator that emits a bar when the signed buyer-versus-seller value reaches the bar step.
pub struct ValueImbalanceBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Signed value imbalance (buyer value positive, seller value negative).
    imbalance_value: f64,
    /// Bar step as a float, i.e. the imbalance threshold.
    step_value: f64,
}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059 .field("core", &self.core)
1060 .field("imbalance_value", &self.imbalance_value)
1061 .field("step_value", &self.step_value)
1062 .finish()
1063 }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067 pub fn new<H: FnMut(Bar) + 'static>(
1075 bar_type: BarType,
1076 price_precision: u8,
1077 size_precision: u8,
1078 handler: H,
1079 ) -> Self {
1080 Self {
1081 core: BarAggregatorCore::new(
1082 bar_type.standard(),
1083 price_precision,
1084 size_precision,
1085 handler,
1086 ),
1087 imbalance_value: 0.0,
1088 step_value: bar_type.spec().step.get() as f64,
1089 }
1090 }
1091}
1092
1093impl BarAggregator for ValueImbalanceBarAggregator {
1094 fn bar_type(&self) -> BarType {
1095 self.core.bar_type
1096 }
1097
1098 fn is_running(&self) -> bool {
1099 self.core.is_running
1100 }
1101
1102 fn set_is_running(&mut self, value: bool) {
1103 self.core.set_is_running(value);
1104 }
1105
1106 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1111 self.core.apply_update(price, size, ts_init);
1112 }
1113
1114 fn handle_trade(&mut self, trade: TradeTick) {
1115 let price_f64 = trade.price.as_f64();
1116 if price_f64 == 0.0 {
1117 self.core
1118 .apply_update(trade.price, trade.size, trade.ts_init);
1119 return;
1120 }
1121
1122 let side_sign = match trade.aggressor_side {
1123 AggressorSide::Buyer => 1.0,
1124 AggressorSide::Seller => -1.0,
1125 AggressorSide::NoAggressor => {
1126 self.core
1127 .apply_update(trade.price, trade.size, trade.ts_init);
1128 return;
1129 }
1130 };
1131
1132 let mut size_remaining = trade.size.as_f64();
1133 while size_remaining > 0.0 {
1134 let value_remaining = price_f64 * size_remaining;
1135 let current_sign = self.imbalance_value.signum();
1136
1137 if current_sign == 0.0 || current_sign == side_sign {
1138 let needed = self.step_value - self.imbalance_value.abs();
1139 if value_remaining <= needed {
1140 self.imbalance_value += side_sign * value_remaining;
1141 self.core.apply_update(
1142 trade.price,
1143 Quantity::new(size_remaining, trade.size.precision),
1144 trade.ts_init,
1145 );
1146 break;
1147 }
1148
1149 let value_chunk = needed;
1150 let size_chunk = value_chunk / price_f64;
1151 self.core.apply_update(
1152 trade.price,
1153 Quantity::new(size_chunk, trade.size.precision),
1154 trade.ts_init,
1155 );
1156 self.imbalance_value += side_sign * value_chunk;
1157 size_remaining -= size_chunk;
1158
1159 if self.imbalance_value.abs() >= self.step_value {
1160 self.core.build_now_and_send();
1161 self.imbalance_value = 0.0;
1162 }
1163 } else {
1164 let value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1166 let size_chunk = value_to_flatten / price_f64;
1167 self.core.apply_update(
1168 trade.price,
1169 Quantity::new(size_chunk, trade.size.precision),
1170 trade.ts_init,
1171 );
1172 self.imbalance_value += side_sign * value_to_flatten;
1173 size_remaining -= size_chunk;
1174 }
1175 }
1176 }
1177
1178 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1179 self.core.builder.update_bar(bar, volume, ts_init);
1180 }
1181}
1182
/// Aggregator that emits a bar when the value of a same-side run reaches the bar step.
pub struct ValueRunsBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Aggressor side of the run in progress, if any.
    current_run_side: Option<AggressorSide>,
    /// Value accumulated in the current run.
    run_value: f64,
    /// Bar step as a float, i.e. the run threshold.
    step_value: f64,
}
1190
1191impl Debug for ValueRunsBarAggregator {
1192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193 f.debug_struct(stringify!(ValueRunsBarAggregator))
1194 .field("core", &self.core)
1195 .field("current_run_side", &self.current_run_side)
1196 .field("run_value", &self.run_value)
1197 .field("step_value", &self.step_value)
1198 .finish()
1199 }
1200}
1201
1202impl ValueRunsBarAggregator {
1203 pub fn new<H: FnMut(Bar) + 'static>(
1211 bar_type: BarType,
1212 price_precision: u8,
1213 size_precision: u8,
1214 handler: H,
1215 ) -> Self {
1216 Self {
1217 core: BarAggregatorCore::new(
1218 bar_type.standard(),
1219 price_precision,
1220 size_precision,
1221 handler,
1222 ),
1223 current_run_side: None,
1224 run_value: 0.0,
1225 step_value: bar_type.spec().step.get() as f64,
1226 }
1227 }
1228}
1229
1230impl BarAggregator for ValueRunsBarAggregator {
1231 fn bar_type(&self) -> BarType {
1232 self.core.bar_type
1233 }
1234
1235 fn is_running(&self) -> bool {
1236 self.core.is_running
1237 }
1238
1239 fn set_is_running(&mut self, value: bool) {
1240 self.core.set_is_running(value);
1241 }
1242
1243 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1248 self.core.apply_update(price, size, ts_init);
1249 }
1250
1251 fn handle_trade(&mut self, trade: TradeTick) {
1252 let price_f64 = trade.price.as_f64();
1253 if price_f64 == 0.0 {
1254 self.core
1255 .apply_update(trade.price, trade.size, trade.ts_init);
1256 return;
1257 }
1258
1259 let side = match trade.aggressor_side {
1260 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1261 AggressorSide::Seller => Some(AggressorSide::Seller),
1262 AggressorSide::NoAggressor => None,
1263 };
1264
1265 let Some(side) = side else {
1266 self.core
1267 .apply_update(trade.price, trade.size, trade.ts_init);
1268 return;
1269 };
1270
1271 if self.current_run_side != Some(side) {
1272 self.current_run_side = Some(side);
1273 self.run_value = 0.0;
1274 self.core.builder.reset();
1275 }
1276
1277 let mut size_remaining = trade.size.as_f64();
1278 while size_remaining > 0.0 {
1279 let value_update = price_f64 * size_remaining;
1280 if self.run_value + value_update < self.step_value {
1281 self.run_value += value_update;
1282 self.core.apply_update(
1283 trade.price,
1284 Quantity::new(size_remaining, trade.size.precision),
1285 trade.ts_init,
1286 );
1287 break;
1288 }
1289
1290 let value_needed = self.step_value - self.run_value;
1291 let size_chunk = value_needed / price_f64;
1292 self.core.apply_update(
1293 trade.price,
1294 Quantity::new(size_chunk, trade.size.precision),
1295 trade.ts_init,
1296 );
1297
1298 self.core.build_now_and_send();
1299 self.run_value = 0.0;
1300 self.current_run_side = None;
1301 size_remaining -= size_chunk;
1302 }
1303 }
1304
1305 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1306 self.core.builder.update_bar(bar, volume, ts_init);
1307 }
1308}
1309
/// Aggregator that emits fixed-size price "bricks" whenever the price moves at least one brick away from the last brick close.
pub struct RenkoBarAggregator {
    /// Shared aggregator state (bar type, builder, handler, running flag).
    core: BarAggregatorCore,
    /// Brick size in raw price units (bar step multiplied by the raw price increment).
    pub brick_size: PriceRaw,
    /// Close of the last emitted brick, or the first observed price before any brick was emitted.
    last_close: Option<Price>,
}
1320
1321impl Debug for RenkoBarAggregator {
1322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1323 f.debug_struct(stringify!(RenkoBarAggregator))
1324 .field("core", &self.core)
1325 .field("brick_size", &self.brick_size)
1326 .field("last_close", &self.last_close)
1327 .finish()
1328 }
1329}
1330
1331impl RenkoBarAggregator {
1332 pub fn new<H: FnMut(Bar) + 'static>(
1340 bar_type: BarType,
1341 price_precision: u8,
1342 size_precision: u8,
1343 price_increment: Price,
1344 handler: H,
1345 ) -> Self {
1346 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1348
1349 Self {
1350 core: BarAggregatorCore::new(
1351 bar_type.standard(),
1352 price_precision,
1353 size_precision,
1354 handler,
1355 ),
1356 brick_size,
1357 last_close: None,
1358 }
1359 }
1360}
1361
1362impl BarAggregator for RenkoBarAggregator {
1363 fn bar_type(&self) -> BarType {
1364 self.core.bar_type
1365 }
1366
1367 fn is_running(&self) -> bool {
1368 self.core.is_running
1369 }
1370
1371 fn set_is_running(&mut self, value: bool) {
1372 self.core.set_is_running(value);
1373 }
1374
1375 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1380 self.core.apply_update(price, size, ts_init);
1382
1383 if self.last_close.is_none() {
1385 self.last_close = Some(price);
1386 return;
1387 }
1388
1389 let last_close = self.last_close.unwrap();
1390
1391 let current_raw = price.raw;
1393 let last_close_raw = last_close.raw;
1394 let price_diff_raw = current_raw - last_close_raw;
1395 let abs_price_diff_raw = price_diff_raw.abs();
1396
1397 if abs_price_diff_raw >= self.brick_size {
1399 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1400 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1401 let mut current_close = last_close;
1402
1403 let total_volume = self.core.builder.volume;
1405
1406 for _i in 0..num_bricks {
1407 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1409 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1410
1411 let (brick_high, brick_low) = if direction > 0.0 {
1413 (brick_close, current_close)
1414 } else {
1415 (current_close, brick_close)
1416 };
1417
1418 self.core.builder.reset();
1420 self.core.builder.open = Some(current_close);
1421 self.core.builder.high = Some(brick_high);
1422 self.core.builder.low = Some(brick_low);
1423 self.core.builder.close = Some(brick_close);
1424 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1426 self.core.builder.ts_last = ts_init;
1427 self.core.builder.initialized = true;
1428
1429 self.core.build_and_send(ts_init, ts_init);
1431
1432 current_close = brick_close;
1434 self.last_close = Some(brick_close);
1435 }
1436 }
1437 }
1438
1439 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1440 self.core.builder.update_bar(bar, volume, ts_init);
1442
1443 if self.last_close.is_none() {
1445 self.last_close = Some(bar.close);
1446 return;
1447 }
1448
1449 let last_close = self.last_close.unwrap();
1450
1451 let current_raw = bar.close.raw;
1453 let last_close_raw = last_close.raw;
1454 let price_diff_raw = current_raw - last_close_raw;
1455 let abs_price_diff_raw = price_diff_raw.abs();
1456
1457 if abs_price_diff_raw >= self.brick_size {
1459 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1460 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1461 let mut current_close = last_close;
1462
1463 let total_volume = self.core.builder.volume;
1465
1466 for _i in 0..num_bricks {
1467 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1469 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1470
1471 let (brick_high, brick_low) = if direction > 0.0 {
1473 (brick_close, current_close)
1474 } else {
1475 (current_close, brick_close)
1476 };
1477
1478 self.core.builder.reset();
1480 self.core.builder.open = Some(current_close);
1481 self.core.builder.high = Some(brick_high);
1482 self.core.builder.low = Some(brick_low);
1483 self.core.builder.close = Some(brick_close);
1484 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1486 self.core.builder.ts_last = ts_init;
1487 self.core.builder.initialized = true;
1488
1489 self.core.build_and_send(ts_init, ts_init);
1491
1492 current_close = brick_close;
1494 self.last_close = Some(brick_close);
1495 }
1496 }
1497 }
1498}
1499
/// Aggregates updates into time-based bars, building a bar each time its clock
/// timer (or time alert, for month/year bars) fires.
pub struct TimeBarAggregator {
    /// Shared aggregation state: bar type, builder, handler and running flag.
    core: BarAggregatorCore,
    /// Clock used to read the current time and to schedule the bar timer/alert.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event is ignored if the builder saw no updates.
    build_with_no_updates: bool,
    /// For left-open intervals: `true` stamps bars with the close time,
    /// `false` with the stored open time.
    timestamp_on_close: bool,
    /// `true` when constructed with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar currently being built; advanced to the event time
    /// after each bar is built.
    stored_open_ns: UnixNanos,
    /// Name of the clock timer/alert (the string form of the bar type given to `new`).
    timer_name: String,
    /// Bar interval as returned by `get_bar_interval_ns` for the bar type.
    interval_ns: UnixNanos,
    /// Time at which the next bar is expected to close.
    next_close_ns: UnixNanos,
    /// Delay added to the timer start time, interpreted as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset forwarded to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When set, the first bar is discarded instead of sent; cleared after one use.
    skip_first_non_full_bar: bool,
    /// When `true`, updates advance a `TestClock` and bars are built from the
    /// time events it returns.
    pub historical_mode: bool,
    /// Time events collected in historical mode and awaiting `build_bar`.
    historical_events: Vec<TimeEvent>,
    /// Weak handle to the shared aggregator, captured by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1520
1521impl Debug for TimeBarAggregator {
1522 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1523 f.debug_struct(stringify!(TimeBarAggregator))
1524 .field("core", &self.core)
1525 .field("build_with_no_updates", &self.build_with_no_updates)
1526 .field("timestamp_on_close", &self.timestamp_on_close)
1527 .field("is_left_open", &self.is_left_open)
1528 .field("timer_name", &self.timer_name)
1529 .field("interval_ns", &self.interval_ns)
1530 .field("bar_build_delay", &self.bar_build_delay)
1531 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1532 .finish()
1533 }
1534}
1535
1536impl TimeBarAggregator {
1537 #[allow(clippy::too_many_arguments)]
1545 pub fn new<H: FnMut(Bar) + 'static>(
1546 bar_type: BarType,
1547 price_precision: u8,
1548 size_precision: u8,
1549 clock: Rc<RefCell<dyn Clock>>,
1550 handler: H,
1551 build_with_no_updates: bool,
1552 timestamp_on_close: bool,
1553 interval_type: BarIntervalType,
1554 time_bars_origin_offset: Option<TimeDelta>,
1555 bar_build_delay: u64,
1556 skip_first_non_full_bar: bool,
1557 ) -> Self {
1558 let is_left_open = match interval_type {
1559 BarIntervalType::LeftOpen => true,
1560 BarIntervalType::RightOpen => false,
1561 };
1562
1563 let core = BarAggregatorCore::new(
1564 bar_type.standard(),
1565 price_precision,
1566 size_precision,
1567 handler,
1568 );
1569
1570 Self {
1571 core,
1572 clock,
1573 build_with_no_updates,
1574 timestamp_on_close,
1575 is_left_open,
1576 stored_open_ns: UnixNanos::default(),
1577 timer_name: bar_type.to_string(),
1578 interval_ns: get_bar_interval_ns(&bar_type),
1579 next_close_ns: UnixNanos::default(),
1580 bar_build_delay,
1581 time_bars_origin_offset,
1582 skip_first_non_full_bar,
1583 historical_mode: false,
1584 historical_events: Vec::new(),
1585 aggregator_weak: None,
1586 }
1587 }
1588
1589 pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1591 self.clock = clock;
1592 }
1593
1594 pub fn start_timer_internal(
1603 &mut self,
1604 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1605 ) {
1606 let aggregator_weak = if let Some(rc) = aggregator_rc {
1608 let weak = Rc::downgrade(&rc);
1610 self.aggregator_weak = Some(weak.clone());
1611 weak
1612 } else {
1613 self.aggregator_weak
1615 .as_ref()
1616 .expect("Aggregator weak reference must be set before calling start_timer()")
1617 .clone()
1618 };
1619
1620 let callback = TimeEventCallback::Rust(Rc::new(move |event: TimeEvent| {
1621 if let Some(agg) = aggregator_weak.upgrade() {
1622 agg.borrow_mut().build_bar(event);
1623 }
1624 }));
1625
1626 let now = self.clock.borrow().utc_now();
1628 let mut start_time =
1629 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1630 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1631
1632 let fire_immediately = start_time == now;
1634
1635 self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1636
1637 let spec = &self.bar_type().spec();
1638 let start_time_ns = UnixNanos::from(start_time);
1639
1640 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1641 self.clock
1642 .borrow_mut()
1643 .set_timer_ns(
1644 &self.timer_name,
1645 self.interval_ns.as_u64(),
1646 Some(start_time_ns),
1647 None,
1648 Some(callback),
1649 Some(true), Some(fire_immediately),
1651 )
1652 .expect(FAILED);
1653
1654 if fire_immediately {
1655 self.next_close_ns = start_time_ns;
1656 } else {
1657 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1658 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1659 }
1660
1661 self.stored_open_ns = self.next_close_ns - self.interval_ns;
1662 } else {
1663 let alert_time = if fire_immediately {
1665 start_time
1666 } else {
1667 let step = spec.step.get() as u32;
1668 if spec.aggregation == BarAggregation::Month {
1669 add_n_months(start_time, step).expect(FAILED)
1670 } else {
1671 add_n_years(start_time, step).expect(FAILED)
1673 }
1674 };
1675
1676 self.clock
1677 .borrow_mut()
1678 .set_time_alert_ns(
1679 &self.timer_name,
1680 UnixNanos::from(alert_time),
1681 Some(callback),
1682 Some(true), )
1684 .expect(FAILED);
1685
1686 self.next_close_ns = UnixNanos::from(alert_time);
1687 self.stored_open_ns = UnixNanos::from(start_time);
1688 }
1689
1690 log::debug!(
1691 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1692 self.timer_name,
1693 start_time,
1694 self.historical_mode,
1695 fire_immediately,
1696 now,
1697 self.bar_build_delay
1698 );
1699 }
1700
1701 pub fn stop(&mut self) {
1703 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1704 }
1705
1706 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1707 if self.skip_first_non_full_bar {
1708 self.core.builder.reset();
1709 self.skip_first_non_full_bar = false;
1710 } else {
1711 self.core.build_and_send(ts_event, ts_init);
1712 }
1713 }
1714
1715 fn build_bar(&mut self, event: TimeEvent) {
1716 if !self.core.builder.initialized {
1717 return;
1718 }
1719
1720 if !self.build_with_no_updates && self.core.builder.count == 0 {
1721 return; }
1723
1724 let ts_init = event.ts_event;
1725 let ts_event = if self.is_left_open {
1726 if self.timestamp_on_close {
1727 event.ts_event
1728 } else {
1729 self.stored_open_ns
1730 }
1731 } else {
1732 self.stored_open_ns
1733 };
1734
1735 self.build_and_send(ts_event, ts_init);
1736
1737 self.stored_open_ns = event.ts_event;
1739
1740 if self.bar_type().spec().aggregation == BarAggregation::Month {
1741 let step = self.bar_type().spec().step.get() as u32;
1742 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1743
1744 self.clock
1745 .borrow_mut()
1746 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1747 .expect(FAILED);
1748
1749 self.next_close_ns = alert_time_ns;
1750 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1751 let step = self.bar_type().spec().step.get() as u32;
1752 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1753
1754 self.clock
1755 .borrow_mut()
1756 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1757 .expect(FAILED);
1758
1759 self.next_close_ns = alert_time_ns;
1760 } else {
1761 self.next_close_ns = self
1763 .clock
1764 .borrow()
1765 .next_time_ns(&self.timer_name)
1766 .unwrap_or_default();
1767 }
1768 }
1769
1770 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1771 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1772 {
1774 let mut clock_borrow = self.clock.borrow_mut();
1775 let test_clock = clock_borrow
1776 .as_any_mut()
1777 .downcast_mut::<TestClock>()
1778 .expect("Expected TestClock in historical mode");
1779 test_clock.set_time(ts_init);
1780 }
1781 self.start_timer_internal(None);
1783 }
1784
1785 {
1787 let mut clock_borrow = self.clock.borrow_mut();
1788 let test_clock = clock_borrow
1789 .as_any_mut()
1790 .downcast_mut::<TestClock>()
1791 .expect("Expected TestClock in historical mode");
1792 self.historical_events = test_clock.advance_time(ts_init, true);
1793 }
1794 }
1795
1796 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1797 let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1800 for event in events {
1801 self.build_bar(event);
1802 }
1803 }
1804
1805 pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1807 self.historical_events = events;
1808 }
1809}
1810
1811impl BarAggregator for TimeBarAggregator {
1812 fn bar_type(&self) -> BarType {
1813 self.core.bar_type
1814 }
1815
1816 fn is_running(&self) -> bool {
1817 self.core.is_running
1818 }
1819
1820 fn set_is_running(&mut self, value: bool) {
1821 self.core.set_is_running(value);
1822 }
1823
1824 fn stop(&mut self) {
1826 Self::stop(self);
1827 }
1828
1829 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1830 if self.historical_mode {
1831 self.preprocess_historical_events(ts_init);
1832 }
1833
1834 self.core.apply_update(price, size, ts_init);
1835
1836 if self.historical_mode {
1837 self.postprocess_historical_events(ts_init);
1838 }
1839 }
1840
1841 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1842 if self.historical_mode {
1843 self.preprocess_historical_events(ts_init);
1844 }
1845
1846 self.core.builder.update_bar(bar, volume, ts_init);
1847
1848 if self.historical_mode {
1849 self.postprocess_historical_events(ts_init);
1850 }
1851 }
1852
1853 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1854 self.historical_mode = historical_mode;
1855 self.core.handler = handler;
1856 }
1857
1858 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1859 self.set_historical_events_internal(events);
1860 }
1861
1862 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1863 self.set_clock_internal(clock);
1864 }
1865
1866 fn build_bar(&mut self, event: TimeEvent) {
1867 {
1870 #[allow(clippy::use_self)]
1871 TimeBarAggregator::build_bar(self, event);
1872 }
1873 }
1874
1875 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1876 self.aggregator_weak = Some(weak);
1877 }
1878
1879 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1880 self.start_timer_internal(aggregator_rc);
1881 }
1882}
1883
1884#[cfg(test)]
1885mod tests {
1886 use std::sync::{Arc, Mutex};
1887
1888 use nautilus_common::clock::TestClock;
1889 use nautilus_core::{MUTEX_POISONED, UUID4};
1890 use nautilus_model::{
1891 data::{BarSpecification, BarType},
1892 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1893 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1894 types::{Price, Quantity},
1895 };
1896 use rstest::rstest;
1897 use ustr::Ustr;
1898
1899 use super::*;
1900
1901 #[rstest]
1902 fn test_bar_builder_initialization(equity_aapl: Equity) {
1903 let instrument = InstrumentAny::Equity(equity_aapl);
1904 let bar_type = BarType::new(
1905 instrument.id(),
1906 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1907 AggregationSource::Internal,
1908 );
1909 let builder = BarBuilder::new(
1910 bar_type,
1911 instrument.price_precision(),
1912 instrument.size_precision(),
1913 );
1914
1915 assert!(!builder.initialized);
1916 assert_eq!(builder.ts_last, 0);
1917 assert_eq!(builder.count, 0);
1918 }
1919
1920 #[rstest]
1921 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1922 let instrument = InstrumentAny::Equity(equity_aapl);
1923 let bar_type = BarType::new(
1924 instrument.id(),
1925 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1926 AggregationSource::Internal,
1927 );
1928 let mut builder = BarBuilder::new(
1929 bar_type,
1930 instrument.price_precision(),
1931 instrument.size_precision(),
1932 );
1933
1934 builder.update(
1935 Price::from("100.00"),
1936 Quantity::from(1),
1937 UnixNanos::from(1000),
1938 );
1939 builder.update(
1940 Price::from("95.00"),
1941 Quantity::from(1),
1942 UnixNanos::from(2000),
1943 );
1944 builder.update(
1945 Price::from("105.00"),
1946 Quantity::from(1),
1947 UnixNanos::from(3000),
1948 );
1949
1950 let bar = builder.build_now();
1951 assert!(bar.high > bar.low);
1952 assert_eq!(bar.open, Price::from("100.00"));
1953 assert_eq!(bar.high, Price::from("105.00"));
1954 assert_eq!(bar.low, Price::from("95.00"));
1955 assert_eq!(bar.close, Price::from("105.00"));
1956 }
1957
1958 #[rstest]
1959 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1960 let instrument = InstrumentAny::Equity(equity_aapl);
1961 let bar_type = BarType::new(
1962 instrument.id(),
1963 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1964 AggregationSource::Internal,
1965 );
1966 let mut builder = BarBuilder::new(
1967 bar_type,
1968 instrument.price_precision(),
1969 instrument.size_precision(),
1970 );
1971
1972 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1973 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1974
1975 assert_eq!(builder.ts_last, 1_000);
1976 assert_eq!(builder.count, 1);
1977 }
1978
1979 #[rstest]
1980 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1981 let instrument = InstrumentAny::Equity(equity_aapl);
1982 let bar_type = BarType::new(
1983 instrument.id(),
1984 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1985 AggregationSource::Internal,
1986 );
1987 let mut builder = BarBuilder::new(
1988 bar_type,
1989 instrument.price_precision(),
1990 instrument.size_precision(),
1991 );
1992
1993 builder.update(
1994 Price::from("1.00000"),
1995 Quantity::from(1),
1996 UnixNanos::default(),
1997 );
1998
1999 assert!(builder.initialized);
2000 assert_eq!(builder.ts_last, 0);
2001 assert_eq!(builder.count, 1);
2002 }
2003
2004 #[rstest]
2005 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2006 equity_aapl: Equity,
2007 ) {
2008 let instrument = InstrumentAny::Equity(equity_aapl);
2009 let bar_type = BarType::new(
2010 instrument.id(),
2011 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2012 AggregationSource::Internal,
2013 );
2014 let mut builder = BarBuilder::new(bar_type, 2, 0);
2015
2016 builder.update(
2017 Price::from("1.00000"),
2018 Quantity::from(1),
2019 UnixNanos::from(1_000),
2020 );
2021 builder.update(
2022 Price::from("1.00001"),
2023 Quantity::from(1),
2024 UnixNanos::from(500),
2025 );
2026
2027 assert!(builder.initialized);
2028 assert_eq!(builder.ts_last, 1_000);
2029 assert_eq!(builder.count, 1);
2030 }
2031
2032 #[rstest]
2033 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2034 let instrument = InstrumentAny::Equity(equity_aapl);
2035 let bar_type = BarType::new(
2036 instrument.id(),
2037 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2038 AggregationSource::Internal,
2039 );
2040 let mut builder = BarBuilder::new(
2041 bar_type,
2042 instrument.price_precision(),
2043 instrument.size_precision(),
2044 );
2045
2046 for _ in 0..5 {
2047 builder.update(
2048 Price::from("1.00000"),
2049 Quantity::from(1),
2050 UnixNanos::from(1_000),
2051 );
2052 }
2053
2054 assert_eq!(builder.count, 5);
2055 }
2056
2057 #[rstest]
2058 #[should_panic]
2059 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2060 let instrument = InstrumentAny::Equity(equity_aapl);
2061 let bar_type = BarType::new(
2062 instrument.id(),
2063 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2064 AggregationSource::Internal,
2065 );
2066 let mut builder = BarBuilder::new(
2067 bar_type,
2068 instrument.price_precision(),
2069 instrument.size_precision(),
2070 );
2071 let _ = builder.build_now();
2072 }
2073
2074 #[rstest]
2075 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2076 let instrument = InstrumentAny::Equity(equity_aapl);
2077 let bar_type = BarType::new(
2078 instrument.id(),
2079 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2080 AggregationSource::Internal,
2081 );
2082 let mut builder = BarBuilder::new(
2083 bar_type,
2084 instrument.price_precision(),
2085 instrument.size_precision(),
2086 );
2087
2088 builder.update(
2089 Price::from("1.00001"),
2090 Quantity::from(2),
2091 UnixNanos::default(),
2092 );
2093 builder.update(
2094 Price::from("1.00002"),
2095 Quantity::from(2),
2096 UnixNanos::default(),
2097 );
2098 builder.update(
2099 Price::from("1.00000"),
2100 Quantity::from(1),
2101 UnixNanos::from(1_000_000_000),
2102 );
2103
2104 let bar = builder.build_now();
2105
2106 assert_eq!(bar.open, Price::from("1.00001"));
2107 assert_eq!(bar.high, Price::from("1.00002"));
2108 assert_eq!(bar.low, Price::from("1.00000"));
2109 assert_eq!(bar.close, Price::from("1.00000"));
2110 assert_eq!(bar.volume, Quantity::from(5));
2111 assert_eq!(bar.ts_init, 1_000_000_000);
2112 assert_eq!(builder.ts_last, 1_000_000_000);
2113 assert_eq!(builder.count, 0);
2114 }
2115
2116 #[rstest]
2117 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2118 let instrument = InstrumentAny::Equity(equity_aapl);
2119 let bar_type = BarType::new(
2120 instrument.id(),
2121 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2122 AggregationSource::Internal,
2123 );
2124 let mut builder = BarBuilder::new(bar_type, 2, 0);
2125
2126 builder.update(
2127 Price::from("1.00001"),
2128 Quantity::from(1),
2129 UnixNanos::default(),
2130 );
2131 builder.build_now();
2132
2133 builder.update(
2134 Price::from("1.00000"),
2135 Quantity::from(1),
2136 UnixNanos::default(),
2137 );
2138 builder.update(
2139 Price::from("1.00003"),
2140 Quantity::from(1),
2141 UnixNanos::default(),
2142 );
2143 builder.update(
2144 Price::from("1.00002"),
2145 Quantity::from(1),
2146 UnixNanos::default(),
2147 );
2148
2149 let bar = builder.build_now();
2150
2151 assert_eq!(bar.open, Price::from("1.00000"));
2152 assert_eq!(bar.high, Price::from("1.00003"));
2153 assert_eq!(bar.low, Price::from("1.00000"));
2154 assert_eq!(bar.close, Price::from("1.00002"));
2155 assert_eq!(bar.volume, Quantity::from(3));
2156 }
2157
2158 #[rstest]
2159 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2160 let instrument = InstrumentAny::Equity(equity_aapl);
2161 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2162 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2163 let handler = Arc::new(Mutex::new(Vec::new()));
2164 let handler_clone = Arc::clone(&handler);
2165
2166 let mut aggregator = TickBarAggregator::new(
2167 bar_type,
2168 instrument.price_precision(),
2169 instrument.size_precision(),
2170 move |bar: Bar| {
2171 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2172 handler_guard.push(bar);
2173 },
2174 );
2175
2176 let trade = TradeTick::default();
2177 aggregator.handle_trade(trade);
2178
2179 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2180 assert_eq!(handler_guard.len(), 0);
2181 }
2182
2183 #[rstest]
2184 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2185 let instrument = InstrumentAny::Equity(equity_aapl);
2186 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2187 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2188 let handler = Arc::new(Mutex::new(Vec::new()));
2189 let handler_clone = Arc::clone(&handler);
2190
2191 let mut aggregator = TickBarAggregator::new(
2192 bar_type,
2193 instrument.price_precision(),
2194 instrument.size_precision(),
2195 move |bar: Bar| {
2196 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2197 handler_guard.push(bar);
2198 },
2199 );
2200
2201 let trade = TradeTick::default();
2202 aggregator.handle_trade(trade);
2203 aggregator.handle_trade(trade);
2204 aggregator.handle_trade(trade);
2205
2206 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2207 let bar = handler_guard.first().unwrap();
2208 assert_eq!(handler_guard.len(), 1);
2209 assert_eq!(bar.open, trade.price);
2210 assert_eq!(bar.high, trade.price);
2211 assert_eq!(bar.low, trade.price);
2212 assert_eq!(bar.close, trade.price);
2213 assert_eq!(bar.volume, Quantity::from(300000));
2214 assert_eq!(bar.ts_event, trade.ts_event);
2215 assert_eq!(bar.ts_init, trade.ts_init);
2216 }
2217
2218 #[rstest]
2219 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2220 let instrument = InstrumentAny::Equity(equity_aapl);
2221 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2222 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2223 let handler = Arc::new(Mutex::new(Vec::new()));
2224 let handler_clone = Arc::clone(&handler);
2225
2226 let mut aggregator = TickBarAggregator::new(
2227 bar_type,
2228 instrument.price_precision(),
2229 instrument.size_precision(),
2230 move |bar: Bar| {
2231 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2232 handler_guard.push(bar);
2233 },
2234 );
2235
2236 aggregator.update(
2237 Price::from("1.00001"),
2238 Quantity::from(1),
2239 UnixNanos::default(),
2240 );
2241 aggregator.update(
2242 Price::from("1.00002"),
2243 Quantity::from(1),
2244 UnixNanos::from(1000),
2245 );
2246 aggregator.update(
2247 Price::from("1.00003"),
2248 Quantity::from(1),
2249 UnixNanos::from(2000),
2250 );
2251
2252 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2253 assert_eq!(handler_guard.len(), 1);
2254
2255 let bar = handler_guard.first().unwrap();
2256 assert_eq!(bar.open, Price::from("1.00001"));
2257 assert_eq!(bar.high, Price::from("1.00003"));
2258 assert_eq!(bar.low, Price::from("1.00001"));
2259 assert_eq!(bar.close, Price::from("1.00003"));
2260 assert_eq!(bar.volume, Quantity::from(3));
2261 }
2262
2263 #[rstest]
2264 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2265 let instrument = InstrumentAny::Equity(equity_aapl);
2266 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2267 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2268 let handler = Arc::new(Mutex::new(Vec::new()));
2269 let handler_clone = Arc::clone(&handler);
2270
2271 let mut aggregator = TickBarAggregator::new(
2272 bar_type,
2273 instrument.price_precision(),
2274 instrument.size_precision(),
2275 move |bar: Bar| {
2276 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2277 handler_guard.push(bar);
2278 },
2279 );
2280
2281 aggregator.update(
2282 Price::from("1.00001"),
2283 Quantity::from(1),
2284 UnixNanos::default(),
2285 );
2286 aggregator.update(
2287 Price::from("1.00002"),
2288 Quantity::from(1),
2289 UnixNanos::from(1000),
2290 );
2291 aggregator.update(
2292 Price::from("1.00003"),
2293 Quantity::from(1),
2294 UnixNanos::from(2000),
2295 );
2296 aggregator.update(
2297 Price::from("1.00004"),
2298 Quantity::from(1),
2299 UnixNanos::from(3000),
2300 );
2301
2302 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2303 assert_eq!(handler_guard.len(), 2);
2304
2305 let bar1 = &handler_guard[0];
2306 assert_eq!(bar1.open, Price::from("1.00001"));
2307 assert_eq!(bar1.close, Price::from("1.00002"));
2308 assert_eq!(bar1.volume, Quantity::from(2));
2309
2310 let bar2 = &handler_guard[1];
2311 assert_eq!(bar2.open, Price::from("1.00003"));
2312 assert_eq!(bar2.close, Price::from("1.00004"));
2313 assert_eq!(bar2.volume, Quantity::from(2));
2314 }
2315
2316 #[rstest]
2317 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2318 let instrument = InstrumentAny::Equity(equity_aapl);
2319 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2320 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2321 let handler = Arc::new(Mutex::new(Vec::new()));
2322 let handler_clone = Arc::clone(&handler);
2323
2324 let mut aggregator = TickImbalanceBarAggregator::new(
2325 bar_type,
2326 instrument.price_precision(),
2327 instrument.size_precision(),
2328 move |bar: Bar| {
2329 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2330 handler_guard.push(bar);
2331 },
2332 );
2333
2334 let trade = TradeTick::default();
2335 aggregator.handle_trade(trade);
2336 aggregator.handle_trade(trade);
2337
2338 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2339 assert_eq!(handler_guard.len(), 1);
2340 let bar = handler_guard.first().unwrap();
2341 assert_eq!(bar.volume, Quantity::from(200000));
2342 }
2343
2344 #[rstest]
2345 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2346 let instrument = InstrumentAny::Equity(equity_aapl);
2347 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2348 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2349 let handler = Arc::new(Mutex::new(Vec::new()));
2350 let handler_clone = Arc::clone(&handler);
2351
2352 let mut aggregator = TickImbalanceBarAggregator::new(
2353 bar_type,
2354 instrument.price_precision(),
2355 instrument.size_precision(),
2356 move |bar: Bar| {
2357 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2358 handler_guard.push(bar);
2359 },
2360 );
2361
2362 let sell = TradeTick {
2363 aggressor_side: AggressorSide::Seller,
2364 ..TradeTick::default()
2365 };
2366
2367 aggregator.handle_trade(sell);
2368
2369 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2370 assert_eq!(handler_guard.len(), 1);
2371 }
2372
2373 #[rstest]
2374 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2375 let instrument = InstrumentAny::Equity(equity_aapl);
2376 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2377 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2378 let handler = Arc::new(Mutex::new(Vec::new()));
2379 let handler_clone = Arc::clone(&handler);
2380
2381 let mut aggregator = TickRunsBarAggregator::new(
2382 bar_type,
2383 instrument.price_precision(),
2384 instrument.size_precision(),
2385 move |bar: Bar| {
2386 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2387 handler_guard.push(bar);
2388 },
2389 );
2390
2391 let buy = TradeTick::default();
2392 let sell = TradeTick {
2393 aggressor_side: AggressorSide::Seller,
2394 ..buy
2395 };
2396
2397 aggregator.handle_trade(buy);
2398 aggregator.handle_trade(buy);
2399 aggregator.handle_trade(sell);
2400 aggregator.handle_trade(sell);
2401
2402 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2403 assert_eq!(handler_guard.len(), 2);
2404 }
2405
2406 #[rstest]
2407 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2408 let instrument = InstrumentAny::Equity(equity_aapl);
2409 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2410 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2411 let handler = Arc::new(Mutex::new(Vec::new()));
2412 let handler_clone = Arc::clone(&handler);
2413
2414 let mut aggregator = TickRunsBarAggregator::new(
2415 bar_type,
2416 instrument.price_precision(),
2417 instrument.size_precision(),
2418 move |bar: Bar| {
2419 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2420 handler_guard.push(bar);
2421 },
2422 );
2423
2424 let buy = TradeTick {
2425 size: Quantity::from(1),
2426 ..TradeTick::default()
2427 };
2428 let sell = TradeTick {
2429 aggressor_side: AggressorSide::Seller,
2430 size: Quantity::from(1),
2431 ..buy
2432 };
2433
2434 aggregator.handle_trade(buy);
2435 aggregator.handle_trade(buy);
2436 aggregator.handle_trade(sell);
2437 aggregator.handle_trade(sell);
2438
2439 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2440 assert_eq!(handler_guard.len(), 2);
2441 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2442 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2443 }
2444
2445 #[rstest]
2446 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2447 let instrument = InstrumentAny::Equity(equity_aapl);
2448 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2449 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2450 let handler = Arc::new(Mutex::new(Vec::new()));
2451 let handler_clone = Arc::clone(&handler);
2452
2453 let mut aggregator = VolumeBarAggregator::new(
2454 bar_type,
2455 instrument.price_precision(),
2456 instrument.size_precision(),
2457 move |bar: Bar| {
2458 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2459 handler_guard.push(bar);
2460 },
2461 );
2462
2463 aggregator.update(
2464 Price::from("1.00001"),
2465 Quantity::from(25),
2466 UnixNanos::default(),
2467 );
2468
2469 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2470 assert_eq!(handler_guard.len(), 2);
2471 let bar1 = &handler_guard[0];
2472 assert_eq!(bar1.volume, Quantity::from(10));
2473 let bar2 = &handler_guard[1];
2474 assert_eq!(bar2.volume, Quantity::from(10));
2475 }
2476
2477 #[rstest]
2478 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2479 let instrument = InstrumentAny::Equity(equity_aapl);
2480 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2481 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2482 let handler = Arc::new(Mutex::new(Vec::new()));
2483 let handler_clone = Arc::clone(&handler);
2484
2485 let mut aggregator = VolumeRunsBarAggregator::new(
2486 bar_type,
2487 instrument.price_precision(),
2488 instrument.size_precision(),
2489 move |bar: Bar| {
2490 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2491 handler_guard.push(bar);
2492 },
2493 );
2494
2495 let buy = TradeTick {
2496 instrument_id: instrument.id(),
2497 price: Price::from("1.0"),
2498 size: Quantity::from(1),
2499 ..TradeTick::default()
2500 };
2501 let sell = TradeTick {
2502 aggressor_side: AggressorSide::Seller,
2503 ..buy
2504 };
2505
2506 aggregator.handle_trade(buy);
2507 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
2509 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2512 assert!(handler_guard.len() >= 2);
2513 assert!(
2514 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2515 < f64::EPSILON
2516 );
2517 }
2518
2519 #[rstest]
2520 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2521 let instrument = InstrumentAny::Equity(equity_aapl);
2522 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2523 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2524 let handler = Arc::new(Mutex::new(Vec::new()));
2525 let handler_clone = Arc::clone(&handler);
2526
2527 let mut aggregator = VolumeRunsBarAggregator::new(
2528 bar_type,
2529 instrument.price_precision(),
2530 instrument.size_precision(),
2531 move |bar: Bar| {
2532 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2533 handler_guard.push(bar);
2534 },
2535 );
2536
2537 let trade = TradeTick {
2538 instrument_id: instrument.id(),
2539 price: Price::from("1.0"),
2540 size: Quantity::from(5),
2541 ..TradeTick::default()
2542 };
2543
2544 aggregator.handle_trade(trade);
2545
2546 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2547 assert!(!handler_guard.is_empty());
2548 assert!(handler_guard[0].volume.as_f64() > 0.0);
2549 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2550 }
2551
2552 #[rstest]
2553 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2554 let instrument = InstrumentAny::Equity(equity_aapl);
2555 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2556 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2557 let handler = Arc::new(Mutex::new(Vec::new()));
2558 let handler_clone = Arc::clone(&handler);
2559
2560 let mut aggregator = VolumeImbalanceBarAggregator::new(
2561 bar_type,
2562 instrument.price_precision(),
2563 instrument.size_precision(),
2564 move |bar: Bar| {
2565 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2566 handler_guard.push(bar);
2567 },
2568 );
2569
2570 let trade_small = TradeTick {
2571 instrument_id: instrument.id(),
2572 price: Price::from("1.0"),
2573 size: Quantity::from(1),
2574 ..TradeTick::default()
2575 };
2576 let trade_large = TradeTick {
2577 size: Quantity::from(3),
2578 ..trade_small
2579 };
2580
2581 aggregator.handle_trade(trade_small);
2582 aggregator.handle_trade(trade_large);
2583
2584 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2585 assert_eq!(handler_guard.len(), 2);
2586 let total_output = handler_guard
2587 .iter()
2588 .map(|bar| bar.volume.as_f64())
2589 .sum::<f64>();
2590 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2591 assert!((total_output - total_input).abs() < f64::EPSILON);
2592 }
2593
2594 #[rstest]
2595 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2596 let instrument = InstrumentAny::Equity(equity_aapl);
2597 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2599 let handler = Arc::new(Mutex::new(Vec::new()));
2600 let handler_clone = Arc::clone(&handler);
2601
2602 let mut aggregator = ValueBarAggregator::new(
2603 bar_type,
2604 instrument.price_precision(),
2605 instrument.size_precision(),
2606 move |bar: Bar| {
2607 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2608 handler_guard.push(bar);
2609 },
2610 );
2611
2612 aggregator.update(
2614 Price::from("100.00"),
2615 Quantity::from(5),
2616 UnixNanos::default(),
2617 );
2618 aggregator.update(
2619 Price::from("100.00"),
2620 Quantity::from(5),
2621 UnixNanos::from(1000),
2622 );
2623
2624 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2625 assert_eq!(handler_guard.len(), 1);
2626 let bar = handler_guard.first().unwrap();
2627 assert_eq!(bar.volume, Quantity::from(10));
2628 }
2629
2630 #[rstest]
2631 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2632 let instrument = InstrumentAny::Equity(equity_aapl);
2633 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2634 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2635 let handler = Arc::new(Mutex::new(Vec::new()));
2636 let handler_clone = Arc::clone(&handler);
2637
2638 let mut aggregator = ValueBarAggregator::new(
2639 bar_type,
2640 instrument.price_precision(),
2641 instrument.size_precision(),
2642 move |bar: Bar| {
2643 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2644 handler_guard.push(bar);
2645 },
2646 );
2647
2648 aggregator.update(
2650 Price::from("100.00"),
2651 Quantity::from(25),
2652 UnixNanos::default(),
2653 );
2654
2655 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2656 assert_eq!(handler_guard.len(), 2);
2657 let remaining_value = aggregator.get_cumulative_value();
2658 assert!(remaining_value < 1000.0); }
2660
2661 #[rstest]
2662 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2663 let instrument = InstrumentAny::Equity(equity_aapl);
2664 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2665 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2666 let handler = Arc::new(Mutex::new(Vec::new()));
2667 let handler_clone = Arc::clone(&handler);
2668
2669 let mut aggregator = ValueBarAggregator::new(
2670 bar_type,
2671 instrument.price_precision(),
2672 instrument.size_precision(),
2673 move |bar: Bar| {
2674 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2675 handler_guard.push(bar);
2676 },
2677 );
2678
2679 aggregator.update(
2681 Price::from("0.00"),
2682 Quantity::from(100),
2683 UnixNanos::default(),
2684 );
2685
2686 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2688 assert_eq!(handler_guard.len(), 0);
2689
2690 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2692 }
2693
2694 #[rstest]
2695 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2696 let instrument = InstrumentAny::Equity(equity_aapl);
2697 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2698 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2699 let handler = Arc::new(Mutex::new(Vec::new()));
2700 let handler_clone = Arc::clone(&handler);
2701
2702 let mut aggregator = ValueBarAggregator::new(
2703 bar_type,
2704 instrument.price_precision(),
2705 instrument.size_precision(),
2706 move |bar: Bar| {
2707 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2708 handler_guard.push(bar);
2709 },
2710 );
2711
2712 aggregator.update(
2714 Price::from("100.00"),
2715 Quantity::from(0),
2716 UnixNanos::default(),
2717 );
2718
2719 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2721 assert_eq!(handler_guard.len(), 0);
2722
2723 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2725 }
2726
2727 #[rstest]
2728 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2729 let instrument = InstrumentAny::Equity(equity_aapl);
2730 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2731 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2732 let handler = Arc::new(Mutex::new(Vec::new()));
2733 let handler_clone = Arc::clone(&handler);
2734
2735 let mut aggregator = ValueImbalanceBarAggregator::new(
2736 bar_type,
2737 instrument.price_precision(),
2738 instrument.size_precision(),
2739 move |bar: Bar| {
2740 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2741 handler_guard.push(bar);
2742 },
2743 );
2744
2745 let buy = TradeTick {
2746 price: Price::from("5.0"),
2747 size: Quantity::from(2), instrument_id: instrument.id(),
2749 ..TradeTick::default()
2750 };
2751 let sell = TradeTick {
2752 price: Price::from("5.0"),
2753 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
2755 instrument_id: instrument.id(),
2756 ..buy
2757 };
2758
2759 aggregator.handle_trade(buy);
2760 aggregator.handle_trade(sell);
2761
2762 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2763 assert!(handler_guard.is_empty());
2764 }
2765
2766 #[rstest]
2767 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2768 let instrument = InstrumentAny::Equity(equity_aapl);
2769 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2770 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2771 let handler = Arc::new(Mutex::new(Vec::new()));
2772 let handler_clone = Arc::clone(&handler);
2773
2774 let mut aggregator = ValueRunsBarAggregator::new(
2775 bar_type,
2776 instrument.price_precision(),
2777 instrument.size_precision(),
2778 move |bar: Bar| {
2779 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2780 handler_guard.push(bar);
2781 },
2782 );
2783
2784 let trade = TradeTick {
2785 price: Price::from("10.0"),
2786 size: Quantity::from(5),
2787 instrument_id: instrument.id(),
2788 ..TradeTick::default()
2789 };
2790
2791 aggregator.handle_trade(trade);
2792 aggregator.handle_trade(trade);
2793
2794 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2795 assert_eq!(handler_guard.len(), 1);
2796 let bar = handler_guard.first().unwrap();
2797 assert_eq!(bar.volume, Quantity::from(10));
2798 }
2799
2800 #[rstest]
2801 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2802 let instrument = InstrumentAny::Equity(equity_aapl);
2803 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2804 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2805 let handler = Arc::new(Mutex::new(Vec::new()));
2806 let handler_clone = Arc::clone(&handler);
2807
2808 let mut aggregator = ValueRunsBarAggregator::new(
2809 bar_type,
2810 instrument.price_precision(),
2811 instrument.size_precision(),
2812 move |bar: Bar| {
2813 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2814 handler_guard.push(bar);
2815 },
2816 );
2817
2818 let buy = TradeTick {
2819 price: Price::from("10.0"),
2820 size: Quantity::from(5),
2821 instrument_id: instrument.id(),
2822 ..TradeTick::default()
2823 }; let sell = TradeTick {
2825 price: Price::from("10.0"),
2826 size: Quantity::from(10),
2827 aggressor_side: AggressorSide::Seller,
2828 ..buy
2829 }; aggregator.handle_trade(buy);
2832 aggregator.handle_trade(sell);
2833
2834 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2835 assert_eq!(handler_guard.len(), 1);
2836 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2837 }
2838
2839 #[rstest]
2840 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2841 let instrument = InstrumentAny::Equity(equity_aapl);
2842 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2843 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2844 let handler = Arc::new(Mutex::new(Vec::new()));
2845 let handler_clone = Arc::clone(&handler);
2846
2847 let mut aggregator = TickRunsBarAggregator::new(
2848 bar_type,
2849 instrument.price_precision(),
2850 instrument.size_precision(),
2851 move |bar: Bar| {
2852 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2853 handler_guard.push(bar);
2854 },
2855 );
2856
2857 let buy = TradeTick::default();
2858
2859 aggregator.handle_trade(buy);
2860 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2865 assert_eq!(handler_guard.len(), 2);
2866 }
2867
2868 #[rstest]
2869 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2870 let instrument = InstrumentAny::Equity(equity_aapl);
2871 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2872 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2873 let handler = Arc::new(Mutex::new(Vec::new()));
2874 let handler_clone = Arc::clone(&handler);
2875
2876 let mut aggregator = TickRunsBarAggregator::new(
2877 bar_type,
2878 instrument.price_precision(),
2879 instrument.size_precision(),
2880 move |bar: Bar| {
2881 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2882 handler_guard.push(bar);
2883 },
2884 );
2885
2886 let buy = TradeTick::default();
2887 let no_aggressor = TradeTick {
2888 aggressor_side: AggressorSide::NoAggressor,
2889 ..buy
2890 };
2891
2892 aggregator.handle_trade(buy);
2893 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2898 assert_eq!(handler_guard.len(), 1);
2899 }
2900
2901 #[rstest]
2902 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2903 let instrument = InstrumentAny::Equity(equity_aapl);
2904 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2905 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2906 let handler = Arc::new(Mutex::new(Vec::new()));
2907 let handler_clone = Arc::clone(&handler);
2908
2909 let mut aggregator = VolumeRunsBarAggregator::new(
2910 bar_type,
2911 instrument.price_precision(),
2912 instrument.size_precision(),
2913 move |bar: Bar| {
2914 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2915 handler_guard.push(bar);
2916 },
2917 );
2918
2919 let buy = TradeTick {
2920 instrument_id: instrument.id(),
2921 price: Price::from("1.0"),
2922 size: Quantity::from(1),
2923 ..TradeTick::default()
2924 };
2925
2926 aggregator.handle_trade(buy);
2927 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2932 assert_eq!(handler_guard.len(), 2);
2933 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2934 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2935 }
2936
2937 #[rstest]
2938 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2939 let instrument = InstrumentAny::Equity(equity_aapl);
2940 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2941 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2942 let handler = Arc::new(Mutex::new(Vec::new()));
2943 let handler_clone = Arc::clone(&handler);
2944
2945 let mut aggregator = ValueRunsBarAggregator::new(
2946 bar_type,
2947 instrument.price_precision(),
2948 instrument.size_precision(),
2949 move |bar: Bar| {
2950 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2951 handler_guard.push(bar);
2952 },
2953 );
2954
2955 let buy = TradeTick {
2956 instrument_id: instrument.id(),
2957 price: Price::from("10.0"),
2958 size: Quantity::from(5),
2959 ..TradeTick::default()
2960 }; aggregator.handle_trade(buy);
2963 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2968 assert_eq!(handler_guard.len(), 2);
2969 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2970 assert_eq!(handler_guard[1].volume, Quantity::from(10));
2971 }
2972
2973 #[rstest]
2974 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
2975 let instrument = InstrumentAny::Equity(equity_aapl);
2976 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2978 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2979 let handler = Arc::new(Mutex::new(Vec::new()));
2980 let handler_clone = Arc::clone(&handler);
2981 let clock = Rc::new(RefCell::new(TestClock::new()));
2982
2983 let mut aggregator = TimeBarAggregator::new(
2984 bar_type,
2985 instrument.price_precision(),
2986 instrument.size_precision(),
2987 clock.clone(),
2988 move |bar: Bar| {
2989 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2990 handler_guard.push(bar);
2991 },
2992 true, false, BarIntervalType::LeftOpen,
2995 None, 15, false, );
2999
3000 aggregator.update(
3001 Price::from("100.00"),
3002 Quantity::from(1),
3003 UnixNanos::default(),
3004 );
3005
3006 let next_sec = UnixNanos::from(1_000_000_000);
3007 clock.borrow_mut().set_time(next_sec);
3008
3009 let event = TimeEvent::new(
3010 Ustr::from("1-SECOND-LAST"),
3011 UUID4::new(),
3012 next_sec,
3013 next_sec,
3014 );
3015 aggregator.build_bar(event);
3016
3017 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3018 assert_eq!(handler_guard.len(), 1);
3019 let bar = handler_guard.first().unwrap();
3020 assert_eq!(bar.ts_event, UnixNanos::default());
3021 assert_eq!(bar.ts_init, next_sec);
3022 }
3023
3024 #[rstest]
3025 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3026 let instrument = InstrumentAny::Equity(equity_aapl);
3027 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3028 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3029 let handler = Arc::new(Mutex::new(Vec::new()));
3030 let handler_clone = Arc::clone(&handler);
3031 let clock = Rc::new(RefCell::new(TestClock::new()));
3032
3033 let mut aggregator = TimeBarAggregator::new(
3034 bar_type,
3035 instrument.price_precision(),
3036 instrument.size_precision(),
3037 clock.clone(),
3038 move |bar: Bar| {
3039 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3040 handler_guard.push(bar);
3041 },
3042 true, true, BarIntervalType::LeftOpen,
3045 None,
3046 15,
3047 false, );
3049
3050 aggregator.update(
3052 Price::from("100.00"),
3053 Quantity::from(1),
3054 UnixNanos::default(),
3055 );
3056
3057 let ts1 = UnixNanos::from(1_000_000_000);
3059 clock.borrow_mut().set_time(ts1);
3060 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3061 aggregator.build_bar(event);
3062
3063 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3065
3066 let ts2 = UnixNanos::from(2_000_000_000);
3068 clock.borrow_mut().set_time(ts2);
3069 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3070 aggregator.build_bar(event);
3071
3072 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3073 assert_eq!(handler_guard.len(), 2);
3074
3075 let bar1 = &handler_guard[0];
3076 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
3078 assert_eq!(bar1.close, Price::from("100.00"));
3079 let bar2 = &handler_guard[1];
3080 assert_eq!(bar2.ts_event, ts2);
3081 assert_eq!(bar2.ts_init, ts2);
3082 assert_eq!(bar2.close, Price::from("101.00"));
3083 }
3084
3085 #[rstest]
3086 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3087 let instrument = InstrumentAny::Equity(equity_aapl);
3088 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3089 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3090 let handler = Arc::new(Mutex::new(Vec::new()));
3091 let handler_clone = Arc::clone(&handler);
3092 let clock = Rc::new(RefCell::new(TestClock::new()));
3093 let mut aggregator = TimeBarAggregator::new(
3094 bar_type,
3095 instrument.price_precision(),
3096 instrument.size_precision(),
3097 clock.clone(),
3098 move |bar: Bar| {
3099 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3100 handler_guard.push(bar);
3101 },
3102 true, true, BarIntervalType::RightOpen,
3105 None,
3106 15,
3107 false, );
3109
3110 aggregator.update(
3112 Price::from("100.00"),
3113 Quantity::from(1),
3114 UnixNanos::default(),
3115 );
3116
3117 let ts1 = UnixNanos::from(1_000_000_000);
3119 clock.borrow_mut().set_time(ts1);
3120 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3121 aggregator.build_bar(event);
3122
3123 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3125
3126 let ts2 = UnixNanos::from(2_000_000_000);
3128 clock.borrow_mut().set_time(ts2);
3129 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3130 aggregator.build_bar(event);
3131
3132 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3133 assert_eq!(handler_guard.len(), 2);
3134
3135 let bar1 = &handler_guard[0];
3136 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
3138 assert_eq!(bar1.close, Price::from("100.00"));
3139
3140 let bar2 = &handler_guard[1];
3141 assert_eq!(bar2.ts_event, ts1);
3142 assert_eq!(bar2.ts_init, ts2);
3143 assert_eq!(bar2.close, Price::from("101.00"));
3144 }
3145
3146 #[rstest]
3147 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3148 let instrument = InstrumentAny::Equity(equity_aapl);
3149 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3150 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3151 let handler = Arc::new(Mutex::new(Vec::new()));
3152 let handler_clone = Arc::clone(&handler);
3153 let clock = Rc::new(RefCell::new(TestClock::new()));
3154
3155 let mut aggregator = TimeBarAggregator::new(
3157 bar_type,
3158 instrument.price_precision(),
3159 instrument.size_precision(),
3160 clock.clone(),
3161 move |bar: Bar| {
3162 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3163 handler_guard.push(bar);
3164 },
3165 false, true, BarIntervalType::LeftOpen,
3168 None,
3169 15,
3170 false, );
3172
3173 let ts1 = UnixNanos::from(1_000_000_000);
3175 clock.borrow_mut().set_time(ts1);
3176 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3177 aggregator.build_bar(event);
3178
3179 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3180 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
3182
3183 let handler = Arc::new(Mutex::new(Vec::new()));
3185 let handler_clone = Arc::clone(&handler);
3186 let mut aggregator = TimeBarAggregator::new(
3187 bar_type,
3188 instrument.price_precision(),
3189 instrument.size_precision(),
3190 clock.clone(),
3191 move |bar: Bar| {
3192 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3193 handler_guard.push(bar);
3194 },
3195 true, true, BarIntervalType::LeftOpen,
3198 None,
3199 15,
3200 false, );
3202
3203 aggregator.update(
3204 Price::from("100.00"),
3205 Quantity::from(1),
3206 UnixNanos::default(),
3207 );
3208
3209 let ts1 = UnixNanos::from(1_000_000_000);
3211 clock.borrow_mut().set_time(ts1);
3212 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3213 aggregator.build_bar(event);
3214
3215 let ts2 = UnixNanos::from(2_000_000_000);
3217 clock.borrow_mut().set_time(ts2);
3218 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3219 aggregator.build_bar(event);
3220
3221 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3222 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
3224 assert_eq!(bar1.close, Price::from("100.00"));
3225 let bar2 = &handler_guard[1];
3226 assert_eq!(bar2.close, Price::from("100.00")); }
3228
3229 #[rstest]
3230 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3231 let instrument = InstrumentAny::Equity(equity_aapl);
3232 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3233 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3234 let clock = Rc::new(RefCell::new(TestClock::new()));
3235 let handler = Arc::new(Mutex::new(Vec::new()));
3236 let handler_clone = Arc::clone(&handler);
3237
3238 let mut aggregator = TimeBarAggregator::new(
3239 bar_type,
3240 instrument.price_precision(),
3241 instrument.size_precision(),
3242 clock.clone(),
3243 move |bar: Bar| {
3244 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3245 handler_guard.push(bar);
3246 },
3247 true, true, BarIntervalType::RightOpen,
3250 None,
3251 15,
3252 false, );
3254
3255 let ts1 = UnixNanos::from(1_000_000_000);
3256 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3257
3258 let ts2 = UnixNanos::from(2_000_000_000);
3259 clock.borrow_mut().set_time(ts2);
3260
3261 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3263 aggregator.build_bar(event);
3264
3265 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3266 let bar = handler_guard.first().unwrap();
3267 assert_eq!(bar.ts_event, UnixNanos::default());
3268 assert_eq!(bar.ts_init, ts2);
3269 }
3270
3271 #[rstest]
3276 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3277 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3278 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3280 let handler = Arc::new(Mutex::new(Vec::new()));
3281 let handler_clone = Arc::clone(&handler);
3282
3283 let aggregator = RenkoBarAggregator::new(
3284 bar_type,
3285 instrument.price_precision(),
3286 instrument.size_precision(),
3287 instrument.price_increment(),
3288 move |bar: Bar| {
3289 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3290 handler_guard.push(bar);
3291 },
3292 );
3293
3294 assert_eq!(aggregator.bar_type(), bar_type);
3295 assert!(!aggregator.is_running());
3296 let expected_brick_size = 10 * instrument.price_increment().raw;
3298 assert_eq!(aggregator.brick_size, expected_brick_size);
3299 }
3300
3301 #[rstest]
3302 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3303 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3304 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3306 let handler = Arc::new(Mutex::new(Vec::new()));
3307 let handler_clone = Arc::clone(&handler);
3308
3309 let mut aggregator = RenkoBarAggregator::new(
3310 bar_type,
3311 instrument.price_precision(),
3312 instrument.size_precision(),
3313 instrument.price_increment(),
3314 move |bar: Bar| {
3315 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3316 handler_guard.push(bar);
3317 },
3318 );
3319
3320 aggregator.update(
3322 Price::from("1.00000"),
3323 Quantity::from(1),
3324 UnixNanos::default(),
3325 );
3326 aggregator.update(
3327 Price::from("1.00005"),
3328 Quantity::from(1),
3329 UnixNanos::from(1000),
3330 );
3331
3332 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3333 assert_eq!(handler_guard.len(), 0); }
3335
3336 #[rstest]
3337 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3338 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3339 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3341 let handler = Arc::new(Mutex::new(Vec::new()));
3342 let handler_clone = Arc::clone(&handler);
3343
3344 let mut aggregator = RenkoBarAggregator::new(
3345 bar_type,
3346 instrument.price_precision(),
3347 instrument.size_precision(),
3348 instrument.price_increment(),
3349 move |bar: Bar| {
3350 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3351 handler_guard.push(bar);
3352 },
3353 );
3354
3355 aggregator.update(
3357 Price::from("1.00000"),
3358 Quantity::from(1),
3359 UnixNanos::default(),
3360 );
3361 aggregator.update(
3362 Price::from("1.00015"),
3363 Quantity::from(1),
3364 UnixNanos::from(1000),
3365 );
3366
3367 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3368 assert_eq!(handler_guard.len(), 1);
3369
3370 let bar = handler_guard.first().unwrap();
3371 assert_eq!(bar.open, Price::from("1.00000"));
3372 assert_eq!(bar.high, Price::from("1.00010"));
3373 assert_eq!(bar.low, Price::from("1.00000"));
3374 assert_eq!(bar.close, Price::from("1.00010"));
3375 assert_eq!(bar.volume, Quantity::from(2));
3376 assert_eq!(bar.ts_event, UnixNanos::from(1000));
3377 assert_eq!(bar.ts_init, UnixNanos::from(1000));
3378 }
3379
3380 #[rstest]
3381 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3382 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3383 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3385 let handler = Arc::new(Mutex::new(Vec::new()));
3386 let handler_clone = Arc::clone(&handler);
3387
3388 let mut aggregator = RenkoBarAggregator::new(
3389 bar_type,
3390 instrument.price_precision(),
3391 instrument.size_precision(),
3392 instrument.price_increment(),
3393 move |bar: Bar| {
3394 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3395 handler_guard.push(bar);
3396 },
3397 );
3398
3399 aggregator.update(
3401 Price::from("1.00000"),
3402 Quantity::from(1),
3403 UnixNanos::default(),
3404 );
3405 aggregator.update(
3406 Price::from("1.00025"),
3407 Quantity::from(1),
3408 UnixNanos::from(1000),
3409 );
3410
3411 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3412 assert_eq!(handler_guard.len(), 2);
3413
3414 let bar1 = &handler_guard[0];
3415 assert_eq!(bar1.open, Price::from("1.00000"));
3416 assert_eq!(bar1.high, Price::from("1.00010"));
3417 assert_eq!(bar1.low, Price::from("1.00000"));
3418 assert_eq!(bar1.close, Price::from("1.00010"));
3419
3420 let bar2 = &handler_guard[1];
3421 assert_eq!(bar2.open, Price::from("1.00010"));
3422 assert_eq!(bar2.high, Price::from("1.00020"));
3423 assert_eq!(bar2.low, Price::from("1.00010"));
3424 assert_eq!(bar2.close, Price::from("1.00020"));
3425 }
3426
3427 #[rstest]
3428 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3429 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3430 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3432 let handler = Arc::new(Mutex::new(Vec::new()));
3433 let handler_clone = Arc::clone(&handler);
3434
3435 let mut aggregator = RenkoBarAggregator::new(
3436 bar_type,
3437 instrument.price_precision(),
3438 instrument.size_precision(),
3439 instrument.price_increment(),
3440 move |bar: Bar| {
3441 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3442 handler_guard.push(bar);
3443 },
3444 );
3445
3446 aggregator.update(
3448 Price::from("1.00020"),
3449 Quantity::from(1),
3450 UnixNanos::default(),
3451 );
3452 aggregator.update(
3453 Price::from("1.00005"),
3454 Quantity::from(1),
3455 UnixNanos::from(1000),
3456 );
3457
3458 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3459 assert_eq!(handler_guard.len(), 1);
3460
3461 let bar = handler_guard.first().unwrap();
3462 assert_eq!(bar.open, Price::from("1.00020"));
3463 assert_eq!(bar.high, Price::from("1.00020"));
3464 assert_eq!(bar.low, Price::from("1.00010"));
3465 assert_eq!(bar.close, Price::from("1.00010"));
3466 assert_eq!(bar.volume, Quantity::from(2));
3467 }
3468
3469 #[rstest]
3470 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3471 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3472 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3474 let handler = Arc::new(Mutex::new(Vec::new()));
3475 let handler_clone = Arc::clone(&handler);
3476
3477 let mut aggregator = RenkoBarAggregator::new(
3478 bar_type,
3479 instrument.price_precision(),
3480 instrument.size_precision(),
3481 instrument.price_increment(),
3482 move |bar: Bar| {
3483 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3484 handler_guard.push(bar);
3485 },
3486 );
3487
3488 let input_bar = Bar::new(
3490 BarType::new(
3491 instrument.id(),
3492 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3493 AggregationSource::Internal,
3494 ),
3495 Price::from("1.00000"),
3496 Price::from("1.00005"),
3497 Price::from("0.99995"),
3498 Price::from("1.00005"), Quantity::from(100),
3500 UnixNanos::default(),
3501 UnixNanos::from(1000),
3502 );
3503
3504 aggregator.handle_bar(input_bar);
3505
3506 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3507 assert_eq!(handler_guard.len(), 0); }
3509
3510 #[rstest]
3511 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3512 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3513 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3515 let handler = Arc::new(Mutex::new(Vec::new()));
3516 let handler_clone = Arc::clone(&handler);
3517
3518 let mut aggregator = RenkoBarAggregator::new(
3519 bar_type,
3520 instrument.price_precision(),
3521 instrument.size_precision(),
3522 instrument.price_increment(),
3523 move |bar: Bar| {
3524 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3525 handler_guard.push(bar);
3526 },
3527 );
3528
3529 let bar1 = Bar::new(
3531 BarType::new(
3532 instrument.id(),
3533 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3534 AggregationSource::Internal,
3535 ),
3536 Price::from("1.00000"),
3537 Price::from("1.00005"),
3538 Price::from("0.99995"),
3539 Price::from("1.00000"),
3540 Quantity::from(100),
3541 UnixNanos::default(),
3542 UnixNanos::default(),
3543 );
3544
3545 let bar2 = Bar::new(
3547 BarType::new(
3548 instrument.id(),
3549 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3550 AggregationSource::Internal,
3551 ),
3552 Price::from("1.00000"),
3553 Price::from("1.00015"),
3554 Price::from("0.99995"),
3555 Price::from("1.00010"), Quantity::from(50),
3557 UnixNanos::from(60_000_000_000),
3558 UnixNanos::from(60_000_000_000),
3559 );
3560
3561 aggregator.handle_bar(bar1);
3562 aggregator.handle_bar(bar2);
3563
3564 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3565 assert_eq!(handler_guard.len(), 1);
3566
3567 let bar = handler_guard.first().unwrap();
3568 assert_eq!(bar.open, Price::from("1.00000"));
3569 assert_eq!(bar.high, Price::from("1.00010"));
3570 assert_eq!(bar.low, Price::from("1.00000"));
3571 assert_eq!(bar.close, Price::from("1.00010"));
3572 assert_eq!(bar.volume, Quantity::from(150));
3573 }
3574
3575 #[rstest]
3576 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3577 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3578 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3580 let handler = Arc::new(Mutex::new(Vec::new()));
3581 let handler_clone = Arc::clone(&handler);
3582
3583 let mut aggregator = RenkoBarAggregator::new(
3584 bar_type,
3585 instrument.price_precision(),
3586 instrument.size_precision(),
3587 instrument.price_increment(),
3588 move |bar: Bar| {
3589 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3590 handler_guard.push(bar);
3591 },
3592 );
3593
3594 let bar1 = Bar::new(
3596 BarType::new(
3597 instrument.id(),
3598 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3599 AggregationSource::Internal,
3600 ),
3601 Price::from("1.00000"),
3602 Price::from("1.00005"),
3603 Price::from("0.99995"),
3604 Price::from("1.00000"),
3605 Quantity::from(100),
3606 UnixNanos::default(),
3607 UnixNanos::default(),
3608 );
3609
3610 let bar2 = Bar::new(
3612 BarType::new(
3613 instrument.id(),
3614 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3615 AggregationSource::Internal,
3616 ),
3617 Price::from("1.00000"),
3618 Price::from("1.00035"),
3619 Price::from("0.99995"),
3620 Price::from("1.00030"), Quantity::from(50),
3622 UnixNanos::from(60_000_000_000),
3623 UnixNanos::from(60_000_000_000),
3624 );
3625
3626 aggregator.handle_bar(bar1);
3627 aggregator.handle_bar(bar2);
3628
3629 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3630 assert_eq!(handler_guard.len(), 3);
3631
3632 let bar1 = &handler_guard[0];
3633 assert_eq!(bar1.open, Price::from("1.00000"));
3634 assert_eq!(bar1.close, Price::from("1.00010"));
3635
3636 let bar2 = &handler_guard[1];
3637 assert_eq!(bar2.open, Price::from("1.00010"));
3638 assert_eq!(bar2.close, Price::from("1.00020"));
3639
3640 let bar3 = &handler_guard[2];
3641 assert_eq!(bar3.open, Price::from("1.00020"));
3642 assert_eq!(bar3.close, Price::from("1.00030"));
3643 }
3644
3645 #[rstest]
3646 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3647 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3648 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3650 let handler = Arc::new(Mutex::new(Vec::new()));
3651 let handler_clone = Arc::clone(&handler);
3652
3653 let mut aggregator = RenkoBarAggregator::new(
3654 bar_type,
3655 instrument.price_precision(),
3656 instrument.size_precision(),
3657 instrument.price_increment(),
3658 move |bar: Bar| {
3659 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3660 handler_guard.push(bar);
3661 },
3662 );
3663
3664 let bar1 = Bar::new(
3666 BarType::new(
3667 instrument.id(),
3668 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3669 AggregationSource::Internal,
3670 ),
3671 Price::from("1.00020"),
3672 Price::from("1.00025"),
3673 Price::from("1.00015"),
3674 Price::from("1.00020"),
3675 Quantity::from(100),
3676 UnixNanos::default(),
3677 UnixNanos::default(),
3678 );
3679
3680 let bar2 = Bar::new(
3682 BarType::new(
3683 instrument.id(),
3684 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3685 AggregationSource::Internal,
3686 ),
3687 Price::from("1.00020"),
3688 Price::from("1.00025"),
3689 Price::from("1.00005"),
3690 Price::from("1.00010"), Quantity::from(50),
3692 UnixNanos::from(60_000_000_000),
3693 UnixNanos::from(60_000_000_000),
3694 );
3695
3696 aggregator.handle_bar(bar1);
3697 aggregator.handle_bar(bar2);
3698
3699 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3700 assert_eq!(handler_guard.len(), 1);
3701
3702 let bar = handler_guard.first().unwrap();
3703 assert_eq!(bar.open, Price::from("1.00020"));
3704 assert_eq!(bar.high, Price::from("1.00020"));
3705 assert_eq!(bar.low, Price::from("1.00010"));
3706 assert_eq!(bar.close, Price::from("1.00010"));
3707 assert_eq!(bar.volume, Quantity::from(150));
3708 }
3709
3710 #[rstest]
3711 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3712 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3713
3714 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3717 let handler = Arc::new(Mutex::new(Vec::new()));
3718 let handler_clone = Arc::clone(&handler);
3719
3720 let aggregator_5 = RenkoBarAggregator::new(
3721 bar_type_5,
3722 instrument.price_precision(),
3723 instrument.size_precision(),
3724 instrument.price_increment(),
3725 move |_bar: Bar| {
3726 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3727 handler_guard.push(_bar);
3728 },
3729 );
3730
3731 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3733 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3734
3735 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3737 let handler2 = Arc::new(Mutex::new(Vec::new()));
3738 let handler2_clone = Arc::clone(&handler2);
3739
3740 let aggregator_20 = RenkoBarAggregator::new(
3741 bar_type_20,
3742 instrument.price_precision(),
3743 instrument.size_precision(),
3744 instrument.price_increment(),
3745 move |_bar: Bar| {
3746 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3747 handler_guard.push(_bar);
3748 },
3749 );
3750
3751 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3753 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3754 }
3755
3756 #[rstest]
3757 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3758 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3759 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3761 let handler = Arc::new(Mutex::new(Vec::new()));
3762 let handler_clone = Arc::clone(&handler);
3763
3764 let mut aggregator = RenkoBarAggregator::new(
3765 bar_type,
3766 instrument.price_precision(),
3767 instrument.size_precision(),
3768 instrument.price_increment(),
3769 move |bar: Bar| {
3770 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3771 handler_guard.push(bar);
3772 },
3773 );
3774
3775 aggregator.update(
3777 Price::from("1.00000"),
3778 Quantity::from(1),
3779 UnixNanos::from(1000),
3780 );
3781 aggregator.update(
3782 Price::from("1.00010"),
3783 Quantity::from(1),
3784 UnixNanos::from(2000),
3785 ); aggregator.update(
3787 Price::from("1.00020"),
3788 Quantity::from(1),
3789 UnixNanos::from(3000),
3790 ); aggregator.update(
3792 Price::from("1.00025"),
3793 Quantity::from(1),
3794 UnixNanos::from(4000),
3795 ); aggregator.update(
3797 Price::from("1.00030"),
3798 Quantity::from(1),
3799 UnixNanos::from(5000),
3800 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3803 assert_eq!(handler_guard.len(), 3);
3804
3805 let bar1 = &handler_guard[0];
3806 assert_eq!(bar1.open, Price::from("1.00000"));
3807 assert_eq!(bar1.close, Price::from("1.00010"));
3808
3809 let bar2 = &handler_guard[1];
3810 assert_eq!(bar2.open, Price::from("1.00010"));
3811 assert_eq!(bar2.close, Price::from("1.00020"));
3812
3813 let bar3 = &handler_guard[2];
3814 assert_eq!(bar3.open, Price::from("1.00020"));
3815 assert_eq!(bar3.close, Price::from("1.00030"));
3816 }
3817
3818 #[rstest]
3819 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3820 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3821 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3823 let handler = Arc::new(Mutex::new(Vec::new()));
3824 let handler_clone = Arc::clone(&handler);
3825
3826 let mut aggregator = RenkoBarAggregator::new(
3827 bar_type,
3828 instrument.price_precision(),
3829 instrument.size_precision(),
3830 instrument.price_increment(),
3831 move |bar: Bar| {
3832 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3833 handler_guard.push(bar);
3834 },
3835 );
3836
3837 aggregator.update(
3839 Price::from("1.00000"),
3840 Quantity::from(1),
3841 UnixNanos::from(1000),
3842 );
3843 aggregator.update(
3844 Price::from("1.00010"),
3845 Quantity::from(1),
3846 UnixNanos::from(2000),
3847 ); aggregator.update(
3849 Price::from("0.99990"),
3850 Quantity::from(1),
3851 UnixNanos::from(3000),
3852 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3855 assert_eq!(handler_guard.len(), 3);
3856
3857 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
3859 assert_eq!(bar1.high, Price::from("1.00010"));
3860 assert_eq!(bar1.low, Price::from("1.00000"));
3861 assert_eq!(bar1.close, Price::from("1.00010"));
3862
3863 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
3865 assert_eq!(bar2.high, Price::from("1.00010"));
3866 assert_eq!(bar2.low, Price::from("1.00000"));
3867 assert_eq!(bar2.close, Price::from("1.00000"));
3868
3869 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
3871 assert_eq!(bar3.high, Price::from("1.00000"));
3872 assert_eq!(bar3.low, Price::from("0.99990"));
3873 assert_eq!(bar3.close, Price::from("0.99990"));
3874 }
3875
3876 #[rstest]
3877 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3878 let instrument = InstrumentAny::Equity(equity_aapl);
3879 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3880 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3881 let handler = Arc::new(Mutex::new(Vec::new()));
3882 let handler_clone = Arc::clone(&handler);
3883
3884 let mut aggregator = TickImbalanceBarAggregator::new(
3885 bar_type,
3886 instrument.price_precision(),
3887 instrument.size_precision(),
3888 move |bar: Bar| {
3889 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3890 handler_guard.push(bar);
3891 },
3892 );
3893
3894 let buy = TradeTick {
3895 aggressor_side: AggressorSide::Buyer,
3896 ..TradeTick::default()
3897 };
3898 let sell = TradeTick {
3899 aggressor_side: AggressorSide::Seller,
3900 ..TradeTick::default()
3901 };
3902
3903 aggregator.handle_trade(buy);
3904 aggregator.handle_trade(sell);
3905 aggregator.handle_trade(buy);
3906
3907 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3908 assert_eq!(handler_guard.len(), 0);
3909 }
3910
3911 #[rstest]
3912 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3913 let instrument = InstrumentAny::Equity(equity_aapl);
3914 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3915 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3916 let handler = Arc::new(Mutex::new(Vec::new()));
3917 let handler_clone = Arc::clone(&handler);
3918
3919 let mut aggregator = TickImbalanceBarAggregator::new(
3920 bar_type,
3921 instrument.price_precision(),
3922 instrument.size_precision(),
3923 move |bar: Bar| {
3924 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3925 handler_guard.push(bar);
3926 },
3927 );
3928
3929 let buy = TradeTick {
3930 aggressor_side: AggressorSide::Buyer,
3931 ..TradeTick::default()
3932 };
3933 let no_aggressor = TradeTick {
3934 aggressor_side: AggressorSide::NoAggressor,
3935 ..TradeTick::default()
3936 };
3937
3938 aggregator.handle_trade(buy);
3939 aggregator.handle_trade(no_aggressor);
3940 aggregator.handle_trade(buy);
3941
3942 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3943 assert_eq!(handler_guard.len(), 1);
3944 }
3945
3946 #[rstest]
3947 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3948 let instrument = InstrumentAny::Equity(equity_aapl);
3949 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3950 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3951 let handler = Arc::new(Mutex::new(Vec::new()));
3952 let handler_clone = Arc::clone(&handler);
3953
3954 let mut aggregator = TickRunsBarAggregator::new(
3955 bar_type,
3956 instrument.price_precision(),
3957 instrument.size_precision(),
3958 move |bar: Bar| {
3959 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3960 handler_guard.push(bar);
3961 },
3962 );
3963
3964 let buy = TradeTick {
3965 aggressor_side: AggressorSide::Buyer,
3966 ..TradeTick::default()
3967 };
3968 let sell = TradeTick {
3969 aggressor_side: AggressorSide::Seller,
3970 ..TradeTick::default()
3971 };
3972
3973 aggregator.handle_trade(buy);
3974 aggregator.handle_trade(buy);
3975 aggregator.handle_trade(sell);
3976 aggregator.handle_trade(sell);
3977
3978 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3979 assert_eq!(handler_guard.len(), 2);
3980 }
3981
3982 #[rstest]
3983 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
3984 let instrument = InstrumentAny::Equity(equity_aapl);
3985 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
3986 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3987 let handler = Arc::new(Mutex::new(Vec::new()));
3988 let handler_clone = Arc::clone(&handler);
3989
3990 let mut aggregator = VolumeImbalanceBarAggregator::new(
3991 bar_type,
3992 instrument.price_precision(),
3993 instrument.size_precision(),
3994 move |bar: Bar| {
3995 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3996 handler_guard.push(bar);
3997 },
3998 );
3999
4000 let large_trade = TradeTick {
4001 size: Quantity::from(25),
4002 aggressor_side: AggressorSide::Buyer,
4003 ..TradeTick::default()
4004 };
4005
4006 aggregator.handle_trade(large_trade);
4007
4008 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4009 assert_eq!(handler_guard.len(), 2);
4010 }
4011
4012 #[rstest]
4013 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4014 equity_aapl: Equity,
4015 ) {
4016 let instrument = InstrumentAny::Equity(equity_aapl);
4017 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4018 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4019 let handler = Arc::new(Mutex::new(Vec::new()));
4020 let handler_clone = Arc::clone(&handler);
4021
4022 let mut aggregator = VolumeImbalanceBarAggregator::new(
4023 bar_type,
4024 instrument.price_precision(),
4025 instrument.size_precision(),
4026 move |bar: Bar| {
4027 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4028 handler_guard.push(bar);
4029 },
4030 );
4031
4032 let buy = TradeTick {
4033 size: Quantity::from(5),
4034 aggressor_side: AggressorSide::Buyer,
4035 ..TradeTick::default()
4036 };
4037 let no_aggressor = TradeTick {
4038 size: Quantity::from(3),
4039 aggressor_side: AggressorSide::NoAggressor,
4040 ..TradeTick::default()
4041 };
4042
4043 aggregator.handle_trade(buy);
4044 aggregator.handle_trade(no_aggressor);
4045 aggregator.handle_trade(buy);
4046
4047 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4048 assert_eq!(handler_guard.len(), 1);
4049 }
4050
4051 #[rstest]
4052 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4053 let instrument = InstrumentAny::Equity(equity_aapl);
4054 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4055 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4056 let handler = Arc::new(Mutex::new(Vec::new()));
4057 let handler_clone = Arc::clone(&handler);
4058
4059 let mut aggregator = VolumeRunsBarAggregator::new(
4060 bar_type,
4061 instrument.price_precision(),
4062 instrument.size_precision(),
4063 move |bar: Bar| {
4064 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4065 handler_guard.push(bar);
4066 },
4067 );
4068
4069 let large_trade = TradeTick {
4070 size: Quantity::from(25),
4071 aggressor_side: AggressorSide::Buyer,
4072 ..TradeTick::default()
4073 };
4074
4075 aggregator.handle_trade(large_trade);
4076
4077 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4078 assert_eq!(handler_guard.len(), 2);
4079 }
4080
4081 #[rstest]
4082 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4083 let instrument = InstrumentAny::Equity(equity_aapl);
4084 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4085 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4086 let handler = Arc::new(Mutex::new(Vec::new()));
4087 let handler_clone = Arc::clone(&handler);
4088
4089 let mut aggregator = ValueRunsBarAggregator::new(
4090 bar_type,
4091 instrument.price_precision(),
4092 instrument.size_precision(),
4093 move |bar: Bar| {
4094 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4095 handler_guard.push(bar);
4096 },
4097 );
4098
4099 let large_trade = TradeTick {
4100 price: Price::from("5.00"),
4101 size: Quantity::from(25),
4102 aggressor_side: AggressorSide::Buyer,
4103 ..TradeTick::default()
4104 };
4105
4106 aggregator.handle_trade(large_trade);
4107
4108 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4109 assert_eq!(handler_guard.len(), 2);
4110 }
4111}