1use std::{
22 any::Any,
23 cell::RefCell,
24 fmt::Debug,
25 ops::Add,
26 rc::{Rc, Weak},
27};
28
29use chrono::{Duration, TimeDelta};
30use nautilus_common::{
31 clock::{Clock, TestClock},
32 timer::{TimeEvent, TimeEventCallback},
33};
34use nautilus_core::{
35 UnixNanos,
36 correctness::{self, FAILED},
37 datetime::{add_n_months, add_n_months_nanos, add_n_years, add_n_years_nanos},
38};
39use nautilus_model::{
40 data::{
41 QuoteTick, TradeTick,
42 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
43 },
44 enums::{AggregationSource, AggressorSide, BarAggregation, BarIntervalType},
45 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
46};
47
48type BarHandler = Box<dyn FnMut(Bar)>;
50
51pub trait BarAggregator: Any + Debug {
55 fn bar_type(&self) -> BarType;
57 fn is_running(&self) -> bool;
59 fn set_is_running(&mut self, value: bool);
61 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
63 fn handle_quote(&mut self, quote: QuoteTick) {
65 let spec = self.bar_type().spec();
66 self.update(
67 quote.extract_price(spec.price_type),
68 quote.extract_size(spec.price_type),
69 quote.ts_init,
70 );
71 }
72 fn handle_trade(&mut self, trade: TradeTick) {
74 self.update(trade.price, trade.size, trade.ts_init);
75 }
76 fn handle_bar(&mut self, bar: Bar) {
78 self.update_bar(bar, bar.volume, bar.ts_init);
79 }
80 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
81 fn stop(&mut self) {}
83 fn set_historical_mode(&mut self, _historical_mode: bool, _handler: Box<dyn FnMut(Bar)>) {}
85 fn set_historical_events(&mut self, _events: Vec<TimeEvent>) {}
87 fn set_clock(&mut self, _clock: Rc<RefCell<dyn Clock>>) {}
89 fn build_bar(&mut self, _event: TimeEvent) {}
91 fn start_timer(&mut self, _aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {}
95 fn set_aggregator_weak(&mut self, _weak: Weak<RefCell<Box<dyn BarAggregator>>>) {}
98}
99
100impl dyn BarAggregator {
101 pub fn as_any(&self) -> &dyn Any {
103 self
104 }
105 pub fn as_any_mut(&mut self) -> &mut dyn Any {
107 self
108 }
109}
110
111#[derive(Debug)]
113pub struct BarBuilder {
114 bar_type: BarType,
115 price_precision: u8,
116 size_precision: u8,
117 initialized: bool,
118 ts_last: UnixNanos,
119 count: usize,
120 last_close: Option<Price>,
121 open: Option<Price>,
122 high: Option<Price>,
123 low: Option<Price>,
124 close: Option<Price>,
125 volume: Quantity,
126}
127
128impl BarBuilder {
129 #[must_use]
137 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
138 correctness::check_equal(
139 &bar_type.aggregation_source(),
140 &AggregationSource::Internal,
141 "bar_type.aggregation_source",
142 "AggregationSource::Internal",
143 )
144 .expect(FAILED);
145
146 Self {
147 bar_type,
148 price_precision,
149 size_precision,
150 initialized: false,
151 ts_last: UnixNanos::default(),
152 count: 0,
153 last_close: None,
154 open: None,
155 high: None,
156 low: None,
157 close: None,
158 volume: Quantity::zero(size_precision),
159 }
160 }
161
162 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
168 if ts_init < self.ts_last {
169 return; }
171
172 if self.open.is_none() {
173 self.open = Some(price);
174 self.high = Some(price);
175 self.low = Some(price);
176 self.initialized = true;
177 } else {
178 if price > self.high.unwrap() {
179 self.high = Some(price);
180 }
181 if price < self.low.unwrap() {
182 self.low = Some(price);
183 }
184 }
185
186 self.close = Some(price);
187 self.volume = self.volume.add(size);
188 self.count += 1;
189 self.ts_last = ts_init;
190 }
191
192 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
198 if ts_init < self.ts_last {
199 return; }
201
202 if self.open.is_none() {
203 self.open = Some(bar.open);
204 self.high = Some(bar.high);
205 self.low = Some(bar.low);
206 self.initialized = true;
207 } else {
208 if bar.high > self.high.unwrap() {
209 self.high = Some(bar.high);
210 }
211 if bar.low < self.low.unwrap() {
212 self.low = Some(bar.low);
213 }
214 }
215
216 self.close = Some(bar.close);
217 self.volume = self.volume.add(volume);
218 self.count += 1;
219 self.ts_last = ts_init;
220 }
221
222 pub fn reset(&mut self) {
226 self.open = None;
227 self.high = None;
228 self.low = None;
229 self.volume = Quantity::zero(self.size_precision);
230 self.count = 0;
231 }
232
233 pub fn build_now(&mut self) -> Bar {
235 self.build(self.ts_last, self.ts_last)
236 }
237
238 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
244 if self.open.is_none() {
245 self.open = self.last_close;
246 self.high = self.last_close;
247 self.low = self.last_close;
248 self.close = self.last_close;
249 }
250
251 if let (Some(close), Some(low)) = (self.close, self.low)
252 && close < low
253 {
254 self.low = Some(close);
255 }
256
257 if let (Some(close), Some(high)) = (self.close, self.high)
258 && close > high
259 {
260 self.high = Some(close);
261 }
262
263 let bar = Bar::new(
265 self.bar_type,
266 self.open.unwrap(),
267 self.high.unwrap(),
268 self.low.unwrap(),
269 self.close.unwrap(),
270 self.volume,
271 ts_event,
272 ts_init,
273 );
274
275 self.last_close = self.close;
276 self.reset();
277 bar
278 }
279}
280
281pub struct BarAggregatorCore {
283 bar_type: BarType,
284 builder: BarBuilder,
285 handler: BarHandler,
286 is_running: bool,
287}
288
289impl Debug for BarAggregatorCore {
290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
291 f.debug_struct(stringify!(BarAggregatorCore))
292 .field("bar_type", &self.bar_type)
293 .field("builder", &self.builder)
294 .field("is_running", &self.is_running)
295 .finish()
296 }
297}
298
299impl BarAggregatorCore {
300 pub fn new<H: FnMut(Bar) + 'static>(
308 bar_type: BarType,
309 price_precision: u8,
310 size_precision: u8,
311 handler: H,
312 ) -> Self {
313 Self {
314 bar_type,
315 builder: BarBuilder::new(bar_type, price_precision, size_precision),
316 handler: Box::new(handler),
317 is_running: false,
318 }
319 }
320
321 pub const fn set_is_running(&mut self, value: bool) {
323 self.is_running = value;
324 }
325 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
326 self.builder.update(price, size, ts_init);
327 }
328
329 fn build_now_and_send(&mut self) {
330 let bar = self.builder.build_now();
331 (self.handler)(bar);
332 }
333
334 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
335 let bar = self.builder.build(ts_event, ts_init);
336 (self.handler)(bar);
337 }
338}
339
340pub struct TickBarAggregator {
345 core: BarAggregatorCore,
346}
347
348impl Debug for TickBarAggregator {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 f.debug_struct(stringify!(TickBarAggregator))
351 .field("core", &self.core)
352 .finish()
353 }
354}
355
356impl TickBarAggregator {
357 pub fn new<H: FnMut(Bar) + 'static>(
365 bar_type: BarType,
366 price_precision: u8,
367 size_precision: u8,
368 handler: H,
369 ) -> Self {
370 Self {
371 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
372 }
373 }
374}
375
376impl BarAggregator for TickBarAggregator {
377 fn bar_type(&self) -> BarType {
378 self.core.bar_type
379 }
380
381 fn is_running(&self) -> bool {
382 self.core.is_running
383 }
384
385 fn set_is_running(&mut self, value: bool) {
386 self.core.set_is_running(value);
387 }
388
389 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
391 self.core.apply_update(price, size, ts_init);
392 let spec = self.core.bar_type.spec();
393
394 if self.core.builder.count >= spec.step.get() {
395 self.core.build_now_and_send();
396 }
397 }
398
399 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
400 self.core.builder.update_bar(bar, volume, ts_init);
401 let spec = self.core.bar_type.spec();
402
403 if self.core.builder.count >= spec.step.get() {
404 self.core.build_now_and_send();
405 }
406 }
407}
408
409pub struct TickImbalanceBarAggregator {
414 core: BarAggregatorCore,
415 imbalance: isize,
416}
417
418impl Debug for TickImbalanceBarAggregator {
419 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420 f.debug_struct(stringify!(TickImbalanceBarAggregator))
421 .field("core", &self.core)
422 .field("imbalance", &self.imbalance)
423 .finish()
424 }
425}
426
427impl TickImbalanceBarAggregator {
428 pub fn new<H: FnMut(Bar) + 'static>(
436 bar_type: BarType,
437 price_precision: u8,
438 size_precision: u8,
439 handler: H,
440 ) -> Self {
441 Self {
442 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
443 imbalance: 0,
444 }
445 }
446}
447
448impl BarAggregator for TickImbalanceBarAggregator {
449 fn bar_type(&self) -> BarType {
450 self.core.bar_type
451 }
452
453 fn is_running(&self) -> bool {
454 self.core.is_running
455 }
456
457 fn set_is_running(&mut self, value: bool) {
458 self.core.set_is_running(value);
459 }
460
461 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
466 self.core.apply_update(price, size, ts_init);
467 }
468
469 fn handle_trade(&mut self, trade: TradeTick) {
470 self.core
471 .apply_update(trade.price, trade.size, trade.ts_init);
472
473 let delta = match trade.aggressor_side {
474 AggressorSide::Buyer => 1,
475 AggressorSide::Seller => -1,
476 AggressorSide::NoAggressor => 0,
477 };
478
479 if delta == 0 {
480 return;
481 }
482
483 self.imbalance += delta;
484 let threshold = self.core.bar_type.spec().step.get();
485 if self.imbalance.unsigned_abs() >= threshold {
486 self.core.build_now_and_send();
487 self.imbalance = 0;
488 }
489 }
490
491 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
492 self.core.builder.update_bar(bar, volume, ts_init);
493 }
494}
495
496pub struct TickRunsBarAggregator {
498 core: BarAggregatorCore,
499 current_run_side: Option<AggressorSide>,
500 run_count: usize,
501}
502
503impl Debug for TickRunsBarAggregator {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 f.debug_struct(stringify!(TickRunsBarAggregator))
506 .field("core", &self.core)
507 .field("current_run_side", &self.current_run_side)
508 .field("run_count", &self.run_count)
509 .finish()
510 }
511}
512
513impl TickRunsBarAggregator {
514 pub fn new<H: FnMut(Bar) + 'static>(
522 bar_type: BarType,
523 price_precision: u8,
524 size_precision: u8,
525 handler: H,
526 ) -> Self {
527 Self {
528 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
529 current_run_side: None,
530 run_count: 0,
531 }
532 }
533}
534
535impl BarAggregator for TickRunsBarAggregator {
536 fn bar_type(&self) -> BarType {
537 self.core.bar_type
538 }
539
540 fn is_running(&self) -> bool {
541 self.core.is_running
542 }
543
544 fn set_is_running(&mut self, value: bool) {
545 self.core.set_is_running(value);
546 }
547
548 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
553 self.core.apply_update(price, size, ts_init);
554 }
555
556 fn handle_trade(&mut self, trade: TradeTick) {
557 let side = match trade.aggressor_side {
558 AggressorSide::Buyer => Some(AggressorSide::Buyer),
559 AggressorSide::Seller => Some(AggressorSide::Seller),
560 AggressorSide::NoAggressor => None,
561 };
562
563 if let Some(side) = side {
564 if self.current_run_side != Some(side) {
565 self.current_run_side = Some(side);
566 self.run_count = 0;
567 self.core.builder.reset();
568 }
569
570 self.core
571 .apply_update(trade.price, trade.size, trade.ts_init);
572 self.run_count += 1;
573
574 let threshold = self.core.bar_type.spec().step.get();
575 if self.run_count >= threshold {
576 self.core.build_now_and_send();
577 self.run_count = 0;
578 self.current_run_side = None;
579 }
580 } else {
581 self.core
582 .apply_update(trade.price, trade.size, trade.ts_init);
583 }
584 }
585
586 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
587 self.core.builder.update_bar(bar, volume, ts_init);
588 }
589}
590
591pub struct VolumeBarAggregator {
593 core: BarAggregatorCore,
594}
595
596impl Debug for VolumeBarAggregator {
597 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
598 f.debug_struct(stringify!(VolumeBarAggregator))
599 .field("core", &self.core)
600 .finish()
601 }
602}
603
604impl VolumeBarAggregator {
605 pub fn new<H: FnMut(Bar) + 'static>(
613 bar_type: BarType,
614 price_precision: u8,
615 size_precision: u8,
616 handler: H,
617 ) -> Self {
618 Self {
619 core: BarAggregatorCore::new(
620 bar_type.standard(),
621 price_precision,
622 size_precision,
623 handler,
624 ),
625 }
626 }
627}
628
629impl BarAggregator for VolumeBarAggregator {
630 fn bar_type(&self) -> BarType {
631 self.core.bar_type
632 }
633
634 fn is_running(&self) -> bool {
635 self.core.is_running
636 }
637
638 fn set_is_running(&mut self, value: bool) {
639 self.core.set_is_running(value);
640 }
641
642 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
644 let mut raw_size_update = size.raw;
645 let spec = self.core.bar_type.spec();
646 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
647
648 while raw_size_update > 0 {
649 if self.core.builder.volume.raw + raw_size_update < raw_step {
650 self.core.apply_update(
651 price,
652 Quantity::from_raw(raw_size_update, size.precision),
653 ts_init,
654 );
655 break;
656 }
657
658 let raw_size_diff = raw_step - self.core.builder.volume.raw;
659 self.core.apply_update(
660 price,
661 Quantity::from_raw(raw_size_diff, size.precision),
662 ts_init,
663 );
664
665 self.core.build_now_and_send();
666 raw_size_update -= raw_size_diff;
667 }
668 }
669
670 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671 let mut raw_volume_update = volume.raw;
672 let spec = self.core.bar_type.spec();
673 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
674
675 while raw_volume_update > 0 {
676 if self.core.builder.volume.raw + raw_volume_update < raw_step {
677 self.core.builder.update_bar(
678 bar,
679 Quantity::from_raw(raw_volume_update, volume.precision),
680 ts_init,
681 );
682 break;
683 }
684
685 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
686 self.core.builder.update_bar(
687 bar,
688 Quantity::from_raw(raw_volume_diff, volume.precision),
689 ts_init,
690 );
691
692 self.core.build_now_and_send();
693 raw_volume_update -= raw_volume_diff;
694 }
695 }
696}
697
698pub struct VolumeImbalanceBarAggregator {
700 core: BarAggregatorCore,
701 imbalance_raw: i128,
702 raw_step: i128,
703}
704
705impl Debug for VolumeImbalanceBarAggregator {
706 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
707 f.debug_struct(stringify!(VolumeImbalanceBarAggregator))
708 .field("core", &self.core)
709 .field("imbalance_raw", &self.imbalance_raw)
710 .field("raw_step", &self.raw_step)
711 .finish()
712 }
713}
714
715impl VolumeImbalanceBarAggregator {
716 pub fn new<H: FnMut(Bar) + 'static>(
724 bar_type: BarType,
725 price_precision: u8,
726 size_precision: u8,
727 handler: H,
728 ) -> Self {
729 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as i128;
730 Self {
731 core: BarAggregatorCore::new(
732 bar_type.standard(),
733 price_precision,
734 size_precision,
735 handler,
736 ),
737 imbalance_raw: 0,
738 raw_step,
739 }
740 }
741}
742
743impl BarAggregator for VolumeImbalanceBarAggregator {
744 fn bar_type(&self) -> BarType {
745 self.core.bar_type
746 }
747
748 fn is_running(&self) -> bool {
749 self.core.is_running
750 }
751
752 fn set_is_running(&mut self, value: bool) {
753 self.core.set_is_running(value);
754 }
755
756 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
761 self.core.apply_update(price, size, ts_init);
762 }
763
764 fn handle_trade(&mut self, trade: TradeTick) {
765 let side = match trade.aggressor_side {
766 AggressorSide::Buyer => 1,
767 AggressorSide::Seller => -1,
768 AggressorSide::NoAggressor => {
769 self.core
770 .apply_update(trade.price, trade.size, trade.ts_init);
771 return;
772 }
773 };
774
775 let mut raw_remaining = trade.size.raw as i128;
776 while raw_remaining > 0 {
777 let imbalance_abs = self.imbalance_raw.abs();
778 let needed = (self.raw_step - imbalance_abs).max(1);
779 let raw_chunk = raw_remaining.min(needed);
780 let qty_chunk = Quantity::from_raw(raw_chunk as QuantityRaw, trade.size.precision);
781
782 self.core
783 .apply_update(trade.price, qty_chunk, trade.ts_init);
784
785 self.imbalance_raw += side * raw_chunk;
786 raw_remaining -= raw_chunk;
787
788 if self.imbalance_raw.abs() >= self.raw_step {
789 self.core.build_now_and_send();
790 self.imbalance_raw = 0;
791 }
792 }
793 }
794
795 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
796 self.core.builder.update_bar(bar, volume, ts_init);
797 }
798}
799
800pub struct VolumeRunsBarAggregator {
802 core: BarAggregatorCore,
803 current_run_side: Option<AggressorSide>,
804 run_volume_raw: QuantityRaw,
805 raw_step: QuantityRaw,
806}
807
808impl Debug for VolumeRunsBarAggregator {
809 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
810 f.debug_struct(stringify!(VolumeRunsBarAggregator))
811 .field("core", &self.core)
812 .field("current_run_side", &self.current_run_side)
813 .field("run_volume_raw", &self.run_volume_raw)
814 .field("raw_step", &self.raw_step)
815 .finish()
816 }
817}
818
819impl VolumeRunsBarAggregator {
820 pub fn new<H: FnMut(Bar) + 'static>(
828 bar_type: BarType,
829 price_precision: u8,
830 size_precision: u8,
831 handler: H,
832 ) -> Self {
833 let raw_step = (bar_type.spec().step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
834 Self {
835 core: BarAggregatorCore::new(
836 bar_type.standard(),
837 price_precision,
838 size_precision,
839 handler,
840 ),
841 current_run_side: None,
842 run_volume_raw: 0,
843 raw_step,
844 }
845 }
846}
847
848impl BarAggregator for VolumeRunsBarAggregator {
849 fn bar_type(&self) -> BarType {
850 self.core.bar_type
851 }
852
853 fn is_running(&self) -> bool {
854 self.core.is_running
855 }
856
857 fn set_is_running(&mut self, value: bool) {
858 self.core.set_is_running(value);
859 }
860
861 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
866 self.core.apply_update(price, size, ts_init);
867 }
868
869 fn handle_trade(&mut self, trade: TradeTick) {
870 let side = match trade.aggressor_side {
871 AggressorSide::Buyer => Some(AggressorSide::Buyer),
872 AggressorSide::Seller => Some(AggressorSide::Seller),
873 AggressorSide::NoAggressor => None,
874 };
875
876 let Some(side) = side else {
877 self.core
878 .apply_update(trade.price, trade.size, trade.ts_init);
879 return;
880 };
881
882 if self.current_run_side != Some(side) {
883 self.current_run_side = Some(side);
884 self.run_volume_raw = 0;
885 self.core.builder.reset();
886 }
887
888 let mut raw_remaining = trade.size.raw;
889 while raw_remaining > 0 {
890 let needed = self.raw_step.saturating_sub(self.run_volume_raw).max(1);
891 let raw_chunk = raw_remaining.min(needed);
892
893 self.core.apply_update(
894 trade.price,
895 Quantity::from_raw(raw_chunk, trade.size.precision),
896 trade.ts_init,
897 );
898
899 self.run_volume_raw += raw_chunk;
900 raw_remaining -= raw_chunk;
901
902 if self.run_volume_raw >= self.raw_step {
903 self.core.build_now_and_send();
904 self.run_volume_raw = 0;
905 self.current_run_side = None;
906 }
907 }
908 }
909
910 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
911 self.core.builder.update_bar(bar, volume, ts_init);
912 }
913}
914
915pub struct ValueBarAggregator {
920 core: BarAggregatorCore,
921 cum_value: f64,
922}
923
924impl Debug for ValueBarAggregator {
925 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926 f.debug_struct(stringify!(ValueBarAggregator))
927 .field("core", &self.core)
928 .field("cum_value", &self.cum_value)
929 .finish()
930 }
931}
932
933impl ValueBarAggregator {
934 pub fn new<H: FnMut(Bar) + 'static>(
942 bar_type: BarType,
943 price_precision: u8,
944 size_precision: u8,
945 handler: H,
946 ) -> Self {
947 Self {
948 core: BarAggregatorCore::new(
949 bar_type.standard(),
950 price_precision,
951 size_precision,
952 handler,
953 ),
954 cum_value: 0.0,
955 }
956 }
957
958 #[must_use]
959 pub const fn get_cumulative_value(&self) -> f64 {
961 self.cum_value
962 }
963}
964
965impl BarAggregator for ValueBarAggregator {
966 fn bar_type(&self) -> BarType {
967 self.core.bar_type
968 }
969
970 fn is_running(&self) -> bool {
971 self.core.is_running
972 }
973
974 fn set_is_running(&mut self, value: bool) {
975 self.core.set_is_running(value);
976 }
977
978 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
980 let mut size_update = size.as_f64();
981 let spec = self.core.bar_type.spec();
982
983 while size_update > 0.0 {
984 let value_update = price.as_f64() * size_update;
985 if value_update == 0.0 {
986 self.core
988 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
989 break;
990 }
991
992 if self.cum_value + value_update < spec.step.get() as f64 {
993 self.cum_value += value_update;
994 self.core
995 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
996 break;
997 }
998
999 let value_diff = spec.step.get() as f64 - self.cum_value;
1000 let size_diff = size_update * (value_diff / value_update);
1001 self.core
1002 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
1003
1004 self.core.build_now_and_send();
1005 self.cum_value = 0.0;
1006 size_update -= size_diff;
1007 }
1008 }
1009
1010 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1011 let mut volume_update = volume;
1012 let average_price = Price::new(
1013 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
1014 self.core.builder.price_precision,
1015 );
1016
1017 while volume_update.as_f64() > 0.0 {
1018 let value_update = average_price.as_f64() * volume_update.as_f64();
1019 if value_update == 0.0 {
1020 self.core.builder.update_bar(bar, volume_update, ts_init);
1022 break;
1023 }
1024
1025 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
1026 self.cum_value += value_update;
1027 self.core.builder.update_bar(bar, volume_update, ts_init);
1028 break;
1029 }
1030
1031 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
1032 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
1033 self.core.builder.update_bar(
1034 bar,
1035 Quantity::new(volume_diff, volume_update.precision),
1036 ts_init,
1037 );
1038
1039 self.core.build_now_and_send();
1040 self.cum_value = 0.0;
1041 volume_update = Quantity::new(
1042 volume_update.as_f64() - volume_diff,
1043 volume_update.precision,
1044 );
1045 }
1046 }
1047}
1048
1049pub struct ValueImbalanceBarAggregator {
1051 core: BarAggregatorCore,
1052 imbalance_value: f64,
1053 step_value: f64,
1054}
1055
1056impl Debug for ValueImbalanceBarAggregator {
1057 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1058 f.debug_struct(stringify!(ValueImbalanceBarAggregator))
1059 .field("core", &self.core)
1060 .field("imbalance_value", &self.imbalance_value)
1061 .field("step_value", &self.step_value)
1062 .finish()
1063 }
1064}
1065
1066impl ValueImbalanceBarAggregator {
1067 pub fn new<H: FnMut(Bar) + 'static>(
1075 bar_type: BarType,
1076 price_precision: u8,
1077 size_precision: u8,
1078 handler: H,
1079 ) -> Self {
1080 Self {
1081 core: BarAggregatorCore::new(
1082 bar_type.standard(),
1083 price_precision,
1084 size_precision,
1085 handler,
1086 ),
1087 imbalance_value: 0.0,
1088 step_value: bar_type.spec().step.get() as f64,
1089 }
1090 }
1091}
1092
1093impl BarAggregator for ValueImbalanceBarAggregator {
1094 fn bar_type(&self) -> BarType {
1095 self.core.bar_type
1096 }
1097
1098 fn is_running(&self) -> bool {
1099 self.core.is_running
1100 }
1101
1102 fn set_is_running(&mut self, value: bool) {
1103 self.core.set_is_running(value);
1104 }
1105
1106 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1111 self.core.apply_update(price, size, ts_init);
1112 }
1113
1114 fn handle_trade(&mut self, trade: TradeTick) {
1115 let price_f64 = trade.price.as_f64();
1116 if price_f64 == 0.0 {
1117 self.core
1118 .apply_update(trade.price, trade.size, trade.ts_init);
1119 return;
1120 }
1121
1122 let side_sign = match trade.aggressor_side {
1123 AggressorSide::Buyer => 1.0,
1124 AggressorSide::Seller => -1.0,
1125 AggressorSide::NoAggressor => {
1126 self.core
1127 .apply_update(trade.price, trade.size, trade.ts_init);
1128 return;
1129 }
1130 };
1131
1132 let mut size_remaining = trade.size.as_f64();
1133 while size_remaining > 0.0 {
1134 let value_remaining = price_f64 * size_remaining;
1135 if self.imbalance_value == 0.0 || self.imbalance_value.signum() == side_sign {
1136 let needed = self.step_value - self.imbalance_value.abs();
1137 if value_remaining <= needed {
1138 self.imbalance_value += side_sign * value_remaining;
1139 self.core.apply_update(
1140 trade.price,
1141 Quantity::new(size_remaining, trade.size.precision),
1142 trade.ts_init,
1143 );
1144
1145 if self.imbalance_value.abs() >= self.step_value {
1146 self.core.build_now_and_send();
1147 self.imbalance_value = 0.0;
1148 }
1149 break;
1150 }
1151
1152 let value_chunk = needed;
1153 let size_chunk = value_chunk / price_f64;
1154 self.core.apply_update(
1155 trade.price,
1156 Quantity::new(size_chunk, trade.size.precision),
1157 trade.ts_init,
1158 );
1159 self.imbalance_value += side_sign * value_chunk;
1160 size_remaining -= size_chunk;
1161
1162 if self.imbalance_value.abs() >= self.step_value {
1163 self.core.build_now_and_send();
1164 self.imbalance_value = 0.0;
1165 }
1166 } else {
1167 let value_to_flatten = self.imbalance_value.abs().min(value_remaining);
1169 let size_chunk = value_to_flatten / price_f64;
1170 self.core.apply_update(
1171 trade.price,
1172 Quantity::new(size_chunk, trade.size.precision),
1173 trade.ts_init,
1174 );
1175 self.imbalance_value += side_sign * value_to_flatten;
1176 size_remaining -= size_chunk;
1177 }
1178 }
1179 }
1180
1181 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1182 self.core.builder.update_bar(bar, volume, ts_init);
1183 }
1184}
1185
1186pub struct ValueRunsBarAggregator {
1188 core: BarAggregatorCore,
1189 current_run_side: Option<AggressorSide>,
1190 run_value: f64,
1191 step_value: f64,
1192}
1193
1194impl Debug for ValueRunsBarAggregator {
1195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1196 f.debug_struct(stringify!(ValueRunsBarAggregator))
1197 .field("core", &self.core)
1198 .field("current_run_side", &self.current_run_side)
1199 .field("run_value", &self.run_value)
1200 .field("step_value", &self.step_value)
1201 .finish()
1202 }
1203}
1204
1205impl ValueRunsBarAggregator {
1206 pub fn new<H: FnMut(Bar) + 'static>(
1214 bar_type: BarType,
1215 price_precision: u8,
1216 size_precision: u8,
1217 handler: H,
1218 ) -> Self {
1219 Self {
1220 core: BarAggregatorCore::new(
1221 bar_type.standard(),
1222 price_precision,
1223 size_precision,
1224 handler,
1225 ),
1226 current_run_side: None,
1227 run_value: 0.0,
1228 step_value: bar_type.spec().step.get() as f64,
1229 }
1230 }
1231}
1232
1233impl BarAggregator for ValueRunsBarAggregator {
1234 fn bar_type(&self) -> BarType {
1235 self.core.bar_type
1236 }
1237
1238 fn is_running(&self) -> bool {
1239 self.core.is_running
1240 }
1241
1242 fn set_is_running(&mut self, value: bool) {
1243 self.core.set_is_running(value);
1244 }
1245
1246 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1251 self.core.apply_update(price, size, ts_init);
1252 }
1253
1254 fn handle_trade(&mut self, trade: TradeTick) {
1255 let price_f64 = trade.price.as_f64();
1256 if price_f64 == 0.0 {
1257 self.core
1258 .apply_update(trade.price, trade.size, trade.ts_init);
1259 return;
1260 }
1261
1262 let side = match trade.aggressor_side {
1263 AggressorSide::Buyer => Some(AggressorSide::Buyer),
1264 AggressorSide::Seller => Some(AggressorSide::Seller),
1265 AggressorSide::NoAggressor => None,
1266 };
1267
1268 let Some(side) = side else {
1269 self.core
1270 .apply_update(trade.price, trade.size, trade.ts_init);
1271 return;
1272 };
1273
1274 if self.current_run_side != Some(side) {
1275 self.current_run_side = Some(side);
1276 self.run_value = 0.0;
1277 self.core.builder.reset();
1278 }
1279
1280 let mut size_remaining = trade.size.as_f64();
1281 while size_remaining > 0.0 {
1282 let value_update = price_f64 * size_remaining;
1283 if self.run_value + value_update < self.step_value {
1284 self.run_value += value_update;
1285 self.core.apply_update(
1286 trade.price,
1287 Quantity::new(size_remaining, trade.size.precision),
1288 trade.ts_init,
1289 );
1290 break;
1291 }
1292
1293 let value_needed = self.step_value - self.run_value;
1294 let size_chunk = value_needed / price_f64;
1295 self.core.apply_update(
1296 trade.price,
1297 Quantity::new(size_chunk, trade.size.precision),
1298 trade.ts_init,
1299 );
1300
1301 self.core.build_now_and_send();
1302 self.run_value = 0.0;
1303 self.current_run_side = None;
1304 size_remaining -= size_chunk;
1305 }
1306 }
1307
1308 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1309 self.core.builder.update_bar(bar, volume, ts_init);
1310 }
1311}
1312
1313pub struct RenkoBarAggregator {
1319 core: BarAggregatorCore,
1320 pub brick_size: PriceRaw,
1321 last_close: Option<Price>,
1322}
1323
1324impl Debug for RenkoBarAggregator {
1325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1326 f.debug_struct(stringify!(RenkoBarAggregator))
1327 .field("core", &self.core)
1328 .field("brick_size", &self.brick_size)
1329 .field("last_close", &self.last_close)
1330 .finish()
1331 }
1332}
1333
1334impl RenkoBarAggregator {
1335 pub fn new<H: FnMut(Bar) + 'static>(
1343 bar_type: BarType,
1344 price_precision: u8,
1345 size_precision: u8,
1346 price_increment: Price,
1347 handler: H,
1348 ) -> Self {
1349 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
1351
1352 Self {
1353 core: BarAggregatorCore::new(
1354 bar_type.standard(),
1355 price_precision,
1356 size_precision,
1357 handler,
1358 ),
1359 brick_size,
1360 last_close: None,
1361 }
1362 }
1363}
1364
1365impl BarAggregator for RenkoBarAggregator {
1366 fn bar_type(&self) -> BarType {
1367 self.core.bar_type
1368 }
1369
1370 fn is_running(&self) -> bool {
1371 self.core.is_running
1372 }
1373
1374 fn set_is_running(&mut self, value: bool) {
1375 self.core.set_is_running(value);
1376 }
1377
1378 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1383 self.core.apply_update(price, size, ts_init);
1385
1386 if self.last_close.is_none() {
1388 self.last_close = Some(price);
1389 return;
1390 }
1391
1392 let last_close = self.last_close.unwrap();
1393
1394 let current_raw = price.raw;
1396 let last_close_raw = last_close.raw;
1397 let price_diff_raw = current_raw - last_close_raw;
1398 let abs_price_diff_raw = price_diff_raw.abs();
1399
1400 if abs_price_diff_raw >= self.brick_size {
1402 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1403 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1404 let mut current_close = last_close;
1405
1406 let total_volume = self.core.builder.volume;
1408
1409 for _i in 0..num_bricks {
1410 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1412 let brick_close = Price::from_raw(brick_close_raw, price.precision);
1413
1414 let (brick_high, brick_low) = if direction > 0.0 {
1416 (brick_close, current_close)
1417 } else {
1418 (current_close, brick_close)
1419 };
1420
1421 self.core.builder.reset();
1423 self.core.builder.open = Some(current_close);
1424 self.core.builder.high = Some(brick_high);
1425 self.core.builder.low = Some(brick_low);
1426 self.core.builder.close = Some(brick_close);
1427 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1429 self.core.builder.ts_last = ts_init;
1430 self.core.builder.initialized = true;
1431
1432 self.core.build_and_send(ts_init, ts_init);
1434
1435 current_close = brick_close;
1437 self.last_close = Some(brick_close);
1438 }
1439 }
1440 }
1441
1442 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1443 self.core.builder.update_bar(bar, volume, ts_init);
1445
1446 if self.last_close.is_none() {
1448 self.last_close = Some(bar.close);
1449 return;
1450 }
1451
1452 let last_close = self.last_close.unwrap();
1453
1454 let current_raw = bar.close.raw;
1456 let last_close_raw = last_close.raw;
1457 let price_diff_raw = current_raw - last_close_raw;
1458 let abs_price_diff_raw = price_diff_raw.abs();
1459
1460 if abs_price_diff_raw >= self.brick_size {
1462 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
1463 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
1464 let mut current_close = last_close;
1465
1466 let total_volume = self.core.builder.volume;
1468
1469 for _i in 0..num_bricks {
1470 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
1472 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
1473
1474 let (brick_high, brick_low) = if direction > 0.0 {
1476 (brick_close, current_close)
1477 } else {
1478 (current_close, brick_close)
1479 };
1480
1481 self.core.builder.reset();
1483 self.core.builder.open = Some(current_close);
1484 self.core.builder.high = Some(brick_high);
1485 self.core.builder.low = Some(brick_low);
1486 self.core.builder.close = Some(brick_close);
1487 self.core.builder.volume = total_volume; self.core.builder.count = 1;
1489 self.core.builder.ts_last = ts_init;
1490 self.core.builder.initialized = true;
1491
1492 self.core.build_and_send(ts_init, ts_init);
1494
1495 current_close = brick_close;
1497 self.last_close = Some(brick_close);
1498 }
1499 }
1500 }
1501}
1502
/// Bar aggregator whose bars are closed by clock timer events rather than by
/// the incoming data itself.
pub struct TimeBarAggregator {
    /// Shared aggregator state: bar type, running flag, bar builder and handler.
    core: BarAggregatorCore,
    /// Clock on which the timer / time alert for this aggregator is registered.
    clock: Rc<RefCell<dyn Clock>>,
    /// When `false`, a timer event that arrives with no updates since the last
    /// bar (`builder.count == 0`) does not produce a bar.
    build_with_no_updates: bool,
    /// For left-open intervals, selects the timer event time as the bar's
    /// `ts_event` instead of the stored open time.
    timestamp_on_close: bool,
    /// `true` when constructed with `BarIntervalType::LeftOpen`.
    is_left_open: bool,
    /// Open time of the bar currently being built; set to the event time after
    /// each timer event that passes the build checks.
    stored_open_ns: UnixNanos,
    /// Name under which the timer / alert is registered (the bar type as a string).
    timer_name: String,
    /// Bar interval as returned by `get_bar_interval_ns` for the bar type.
    interval_ns: UnixNanos,
    /// Time at which the next bar is scheduled to close.
    next_close_ns: UnixNanos,
    /// Delay added to the computed start time, interpreted as microseconds.
    bar_build_delay: u64,
    /// Optional origin offset passed to `get_time_bar_start`.
    time_bars_origin_offset: Option<TimeDelta>,
    /// When set, the first bar that would be built is discarded instead of sent.
    skip_first_non_full_bar: bool,
    /// When `true`, updates advance a `TestClock` and replay the resulting timer events.
    pub historical_mode: bool,
    /// Timer events collected before an update, replayed after it is applied.
    historical_events: Vec<TimeEvent>,
    /// Weak handle to the shared aggregator, captured by the timer callback.
    aggregator_weak: Option<Weak<RefCell<Box<dyn BarAggregator>>>>,
}
1523
1524impl Debug for TimeBarAggregator {
1525 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1526 f.debug_struct(stringify!(TimeBarAggregator))
1527 .field("core", &self.core)
1528 .field("build_with_no_updates", &self.build_with_no_updates)
1529 .field("timestamp_on_close", &self.timestamp_on_close)
1530 .field("is_left_open", &self.is_left_open)
1531 .field("timer_name", &self.timer_name)
1532 .field("interval_ns", &self.interval_ns)
1533 .field("bar_build_delay", &self.bar_build_delay)
1534 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
1535 .finish()
1536 }
1537}
1538
1539impl TimeBarAggregator {
1540 #[allow(clippy::too_many_arguments)]
1548 pub fn new<H: FnMut(Bar) + 'static>(
1549 bar_type: BarType,
1550 price_precision: u8,
1551 size_precision: u8,
1552 clock: Rc<RefCell<dyn Clock>>,
1553 handler: H,
1554 build_with_no_updates: bool,
1555 timestamp_on_close: bool,
1556 interval_type: BarIntervalType,
1557 time_bars_origin_offset: Option<TimeDelta>,
1558 bar_build_delay: u64,
1559 skip_first_non_full_bar: bool,
1560 ) -> Self {
1561 let is_left_open = match interval_type {
1562 BarIntervalType::LeftOpen => true,
1563 BarIntervalType::RightOpen => false,
1564 };
1565
1566 let core = BarAggregatorCore::new(
1567 bar_type.standard(),
1568 price_precision,
1569 size_precision,
1570 handler,
1571 );
1572
1573 Self {
1574 core,
1575 clock,
1576 build_with_no_updates,
1577 timestamp_on_close,
1578 is_left_open,
1579 stored_open_ns: UnixNanos::default(),
1580 timer_name: bar_type.to_string(),
1581 interval_ns: get_bar_interval_ns(&bar_type),
1582 next_close_ns: UnixNanos::default(),
1583 bar_build_delay,
1584 time_bars_origin_offset,
1585 skip_first_non_full_bar,
1586 historical_mode: false,
1587 historical_events: Vec::new(),
1588 aggregator_weak: None,
1589 }
1590 }
1591
1592 pub fn set_clock_internal(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1594 self.clock = clock;
1595 }
1596
1597 pub fn start_timer_internal(
1606 &mut self,
1607 aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>,
1608 ) {
1609 let aggregator_weak = if let Some(rc) = aggregator_rc {
1611 let weak = Rc::downgrade(&rc);
1613 self.aggregator_weak = Some(weak.clone());
1614 weak
1615 } else {
1616 self.aggregator_weak
1618 .as_ref()
1619 .expect("Aggregator weak reference must be set before calling start_timer()")
1620 .clone()
1621 };
1622
1623 let callback = TimeEventCallback::RustLocal(Rc::new(move |event: TimeEvent| {
1624 if let Some(agg) = aggregator_weak.upgrade() {
1625 agg.borrow_mut().build_bar(event);
1626 }
1627 }));
1628
1629 let now = self.clock.borrow().utc_now();
1631 let mut start_time =
1632 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1633 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1634
1635 let fire_immediately = start_time == now;
1637
1638 self.skip_first_non_full_bar = self.skip_first_non_full_bar && now > start_time;
1639
1640 let spec = &self.bar_type().spec();
1641 let start_time_ns = UnixNanos::from(start_time);
1642
1643 if spec.aggregation != BarAggregation::Month && spec.aggregation != BarAggregation::Year {
1644 self.clock
1645 .borrow_mut()
1646 .set_timer_ns(
1647 &self.timer_name,
1648 self.interval_ns.as_u64(),
1649 Some(start_time_ns),
1650 None,
1651 Some(callback),
1652 Some(true), Some(fire_immediately),
1654 )
1655 .expect(FAILED);
1656
1657 if fire_immediately {
1658 self.next_close_ns = start_time_ns;
1659 } else {
1660 let interval_duration = Duration::nanoseconds(self.interval_ns.as_i64());
1661 self.next_close_ns = UnixNanos::from(start_time + interval_duration);
1662 }
1663
1664 self.stored_open_ns = self.next_close_ns.saturating_sub_ns(self.interval_ns);
1665 } else {
1666 let alert_time = if fire_immediately {
1668 start_time
1669 } else {
1670 let step = spec.step.get() as u32;
1671 if spec.aggregation == BarAggregation::Month {
1672 add_n_months(start_time, step).expect(FAILED)
1673 } else {
1674 add_n_years(start_time, step).expect(FAILED)
1676 }
1677 };
1678
1679 self.clock
1680 .borrow_mut()
1681 .set_time_alert_ns(
1682 &self.timer_name,
1683 UnixNanos::from(alert_time),
1684 Some(callback),
1685 Some(true), )
1687 .expect(FAILED);
1688
1689 self.next_close_ns = UnixNanos::from(alert_time);
1690 self.stored_open_ns = UnixNanos::from(start_time);
1691 }
1692
1693 log::debug!(
1694 "Started timer {}, start_time={:?}, historical_mode={}, fire_immediately={}, now={:?}, bar_build_delay={}",
1695 self.timer_name,
1696 start_time,
1697 self.historical_mode,
1698 fire_immediately,
1699 now,
1700 self.bar_build_delay
1701 );
1702 }
1703
1704 pub fn stop(&mut self) {
1706 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1707 }
1708
1709 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1710 if self.skip_first_non_full_bar {
1711 self.core.builder.reset();
1712 self.skip_first_non_full_bar = false;
1713 } else {
1714 self.core.build_and_send(ts_event, ts_init);
1715 }
1716 }
1717
1718 fn build_bar(&mut self, event: TimeEvent) {
1719 if !self.core.builder.initialized {
1720 return;
1721 }
1722
1723 if !self.build_with_no_updates && self.core.builder.count == 0 {
1724 return; }
1726
1727 let ts_init = event.ts_event;
1728 let ts_event = if self.is_left_open {
1729 if self.timestamp_on_close {
1730 event.ts_event
1731 } else {
1732 self.stored_open_ns
1733 }
1734 } else {
1735 self.stored_open_ns
1736 };
1737
1738 self.build_and_send(ts_event, ts_init);
1739
1740 self.stored_open_ns = event.ts_event;
1742
1743 if self.bar_type().spec().aggregation == BarAggregation::Month {
1744 let step = self.bar_type().spec().step.get() as u32;
1745 let alert_time_ns = add_n_months_nanos(event.ts_event, step).expect(FAILED);
1746
1747 self.clock
1748 .borrow_mut()
1749 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1750 .expect(FAILED);
1751
1752 self.next_close_ns = alert_time_ns;
1753 } else if self.bar_type().spec().aggregation == BarAggregation::Year {
1754 let step = self.bar_type().spec().step.get() as u32;
1755 let alert_time_ns = add_n_years_nanos(event.ts_event, step).expect(FAILED);
1756
1757 self.clock
1758 .borrow_mut()
1759 .set_time_alert_ns(&self.timer_name, alert_time_ns, None, None)
1760 .expect(FAILED);
1761
1762 self.next_close_ns = alert_time_ns;
1763 } else {
1764 self.next_close_ns = self
1766 .clock
1767 .borrow()
1768 .next_time_ns(&self.timer_name)
1769 .unwrap_or_default();
1770 }
1771 }
1772
1773 fn preprocess_historical_events(&mut self, ts_init: UnixNanos) {
1774 if self.clock.borrow().timestamp_ns() == UnixNanos::default() {
1775 {
1777 let mut clock_borrow = self.clock.borrow_mut();
1778 let test_clock = clock_borrow
1779 .as_any_mut()
1780 .downcast_mut::<TestClock>()
1781 .expect("Expected TestClock in historical mode");
1782 test_clock.set_time(ts_init);
1783 }
1784 self.start_timer_internal(None);
1786 }
1787
1788 {
1790 let mut clock_borrow = self.clock.borrow_mut();
1791 let test_clock = clock_borrow
1792 .as_any_mut()
1793 .downcast_mut::<TestClock>()
1794 .expect("Expected TestClock in historical mode");
1795 self.historical_events = test_clock.advance_time(ts_init, true);
1796 }
1797 }
1798
1799 fn postprocess_historical_events(&mut self, _ts_init: UnixNanos) {
1800 let events: Vec<TimeEvent> = self.historical_events.drain(..).collect();
1803 for event in events {
1804 self.build_bar(event);
1805 }
1806 }
1807
1808 pub fn set_historical_events_internal(&mut self, events: Vec<TimeEvent>) {
1810 self.historical_events = events;
1811 }
1812}
1813
1814impl BarAggregator for TimeBarAggregator {
1815 fn bar_type(&self) -> BarType {
1816 self.core.bar_type
1817 }
1818
1819 fn is_running(&self) -> bool {
1820 self.core.is_running
1821 }
1822
1823 fn set_is_running(&mut self, value: bool) {
1824 self.core.set_is_running(value);
1825 }
1826
1827 fn stop(&mut self) {
1829 Self::stop(self);
1830 }
1831
1832 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1833 if self.historical_mode {
1834 self.preprocess_historical_events(ts_init);
1835 }
1836
1837 self.core.apply_update(price, size, ts_init);
1838
1839 if self.historical_mode {
1840 self.postprocess_historical_events(ts_init);
1841 }
1842 }
1843
1844 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1845 if self.historical_mode {
1846 self.preprocess_historical_events(ts_init);
1847 }
1848
1849 self.core.builder.update_bar(bar, volume, ts_init);
1850
1851 if self.historical_mode {
1852 self.postprocess_historical_events(ts_init);
1853 }
1854 }
1855
1856 fn set_historical_mode(&mut self, historical_mode: bool, handler: Box<dyn FnMut(Bar)>) {
1857 self.historical_mode = historical_mode;
1858 self.core.handler = handler;
1859 }
1860
1861 fn set_historical_events(&mut self, events: Vec<TimeEvent>) {
1862 self.set_historical_events_internal(events);
1863 }
1864
1865 fn set_clock(&mut self, clock: Rc<RefCell<dyn Clock>>) {
1866 self.set_clock_internal(clock);
1867 }
1868
1869 fn build_bar(&mut self, event: TimeEvent) {
1870 {
1873 #[allow(clippy::use_self)]
1874 TimeBarAggregator::build_bar(self, event);
1875 }
1876 }
1877
1878 fn set_aggregator_weak(&mut self, weak: Weak<RefCell<Box<dyn BarAggregator>>>) {
1879 self.aggregator_weak = Some(weak);
1880 }
1881
1882 fn start_timer(&mut self, aggregator_rc: Option<Rc<RefCell<Box<dyn BarAggregator>>>>) {
1883 self.start_timer_internal(aggregator_rc);
1884 }
1885}
1886
1887#[cfg(test)]
1888mod tests {
1889 use std::sync::{Arc, Mutex};
1890
1891 use nautilus_common::clock::TestClock;
1892 use nautilus_core::{MUTEX_POISONED, UUID4};
1893 use nautilus_model::{
1894 data::{BarSpecification, BarType},
1895 enums::{AggregationSource, AggressorSide, BarAggregation, PriceType},
1896 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1897 types::{Price, Quantity},
1898 };
1899 use rstest::rstest;
1900 use ustr::Ustr;
1901
1902 use super::*;
1903
1904 #[rstest]
1905 fn test_bar_builder_initialization(equity_aapl: Equity) {
1906 let instrument = InstrumentAny::Equity(equity_aapl);
1907 let bar_type = BarType::new(
1908 instrument.id(),
1909 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1910 AggregationSource::Internal,
1911 );
1912 let builder = BarBuilder::new(
1913 bar_type,
1914 instrument.price_precision(),
1915 instrument.size_precision(),
1916 );
1917
1918 assert!(!builder.initialized);
1919 assert_eq!(builder.ts_last, 0);
1920 assert_eq!(builder.count, 0);
1921 }
1922
1923 #[rstest]
1924 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1925 let instrument = InstrumentAny::Equity(equity_aapl);
1926 let bar_type = BarType::new(
1927 instrument.id(),
1928 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1929 AggregationSource::Internal,
1930 );
1931 let mut builder = BarBuilder::new(
1932 bar_type,
1933 instrument.price_precision(),
1934 instrument.size_precision(),
1935 );
1936
1937 builder.update(
1938 Price::from("100.00"),
1939 Quantity::from(1),
1940 UnixNanos::from(1000),
1941 );
1942 builder.update(
1943 Price::from("95.00"),
1944 Quantity::from(1),
1945 UnixNanos::from(2000),
1946 );
1947 builder.update(
1948 Price::from("105.00"),
1949 Quantity::from(1),
1950 UnixNanos::from(3000),
1951 );
1952
1953 let bar = builder.build_now();
1954 assert!(bar.high > bar.low);
1955 assert_eq!(bar.open, Price::from("100.00"));
1956 assert_eq!(bar.high, Price::from("105.00"));
1957 assert_eq!(bar.low, Price::from("95.00"));
1958 assert_eq!(bar.close, Price::from("105.00"));
1959 }
1960
1961 #[rstest]
1962 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1963 let instrument = InstrumentAny::Equity(equity_aapl);
1964 let bar_type = BarType::new(
1965 instrument.id(),
1966 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1967 AggregationSource::Internal,
1968 );
1969 let mut builder = BarBuilder::new(
1970 bar_type,
1971 instrument.price_precision(),
1972 instrument.size_precision(),
1973 );
1974
1975 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1976 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1977
1978 assert_eq!(builder.ts_last, 1_000);
1979 assert_eq!(builder.count, 1);
1980 }
1981
1982 #[rstest]
1983 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1984 let instrument = InstrumentAny::Equity(equity_aapl);
1985 let bar_type = BarType::new(
1986 instrument.id(),
1987 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1988 AggregationSource::Internal,
1989 );
1990 let mut builder = BarBuilder::new(
1991 bar_type,
1992 instrument.price_precision(),
1993 instrument.size_precision(),
1994 );
1995
1996 builder.update(
1997 Price::from("1.00000"),
1998 Quantity::from(1),
1999 UnixNanos::default(),
2000 );
2001
2002 assert!(builder.initialized);
2003 assert_eq!(builder.ts_last, 0);
2004 assert_eq!(builder.count, 1);
2005 }
2006
2007 #[rstest]
2008 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
2009 equity_aapl: Equity,
2010 ) {
2011 let instrument = InstrumentAny::Equity(equity_aapl);
2012 let bar_type = BarType::new(
2013 instrument.id(),
2014 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2015 AggregationSource::Internal,
2016 );
2017 let mut builder = BarBuilder::new(bar_type, 2, 0);
2018
2019 builder.update(
2020 Price::from("1.00000"),
2021 Quantity::from(1),
2022 UnixNanos::from(1_000),
2023 );
2024 builder.update(
2025 Price::from("1.00001"),
2026 Quantity::from(1),
2027 UnixNanos::from(500),
2028 );
2029
2030 assert!(builder.initialized);
2031 assert_eq!(builder.ts_last, 1_000);
2032 assert_eq!(builder.count, 1);
2033 }
2034
2035 #[rstest]
2036 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
2037 let instrument = InstrumentAny::Equity(equity_aapl);
2038 let bar_type = BarType::new(
2039 instrument.id(),
2040 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2041 AggregationSource::Internal,
2042 );
2043 let mut builder = BarBuilder::new(
2044 bar_type,
2045 instrument.price_precision(),
2046 instrument.size_precision(),
2047 );
2048
2049 for _ in 0..5 {
2050 builder.update(
2051 Price::from("1.00000"),
2052 Quantity::from(1),
2053 UnixNanos::from(1_000),
2054 );
2055 }
2056
2057 assert_eq!(builder.count, 5);
2058 }
2059
2060 #[rstest]
2061 #[should_panic]
2062 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
2063 let instrument = InstrumentAny::Equity(equity_aapl);
2064 let bar_type = BarType::new(
2065 instrument.id(),
2066 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2067 AggregationSource::Internal,
2068 );
2069 let mut builder = BarBuilder::new(
2070 bar_type,
2071 instrument.price_precision(),
2072 instrument.size_precision(),
2073 );
2074 let _ = builder.build_now();
2075 }
2076
2077 #[rstest]
2078 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
2079 let instrument = InstrumentAny::Equity(equity_aapl);
2080 let bar_type = BarType::new(
2081 instrument.id(),
2082 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2083 AggregationSource::Internal,
2084 );
2085 let mut builder = BarBuilder::new(
2086 bar_type,
2087 instrument.price_precision(),
2088 instrument.size_precision(),
2089 );
2090
2091 builder.update(
2092 Price::from("1.00001"),
2093 Quantity::from(2),
2094 UnixNanos::default(),
2095 );
2096 builder.update(
2097 Price::from("1.00002"),
2098 Quantity::from(2),
2099 UnixNanos::default(),
2100 );
2101 builder.update(
2102 Price::from("1.00000"),
2103 Quantity::from(1),
2104 UnixNanos::from(1_000_000_000),
2105 );
2106
2107 let bar = builder.build_now();
2108
2109 assert_eq!(bar.open, Price::from("1.00001"));
2110 assert_eq!(bar.high, Price::from("1.00002"));
2111 assert_eq!(bar.low, Price::from("1.00000"));
2112 assert_eq!(bar.close, Price::from("1.00000"));
2113 assert_eq!(bar.volume, Quantity::from(5));
2114 assert_eq!(bar.ts_init, 1_000_000_000);
2115 assert_eq!(builder.ts_last, 1_000_000_000);
2116 assert_eq!(builder.count, 0);
2117 }
2118
2119 #[rstest]
2120 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
2121 let instrument = InstrumentAny::Equity(equity_aapl);
2122 let bar_type = BarType::new(
2123 instrument.id(),
2124 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
2125 AggregationSource::Internal,
2126 );
2127 let mut builder = BarBuilder::new(bar_type, 2, 0);
2128
2129 builder.update(
2130 Price::from("1.00001"),
2131 Quantity::from(1),
2132 UnixNanos::default(),
2133 );
2134 builder.build_now();
2135
2136 builder.update(
2137 Price::from("1.00000"),
2138 Quantity::from(1),
2139 UnixNanos::default(),
2140 );
2141 builder.update(
2142 Price::from("1.00003"),
2143 Quantity::from(1),
2144 UnixNanos::default(),
2145 );
2146 builder.update(
2147 Price::from("1.00002"),
2148 Quantity::from(1),
2149 UnixNanos::default(),
2150 );
2151
2152 let bar = builder.build_now();
2153
2154 assert_eq!(bar.open, Price::from("1.00000"));
2155 assert_eq!(bar.high, Price::from("1.00003"));
2156 assert_eq!(bar.low, Price::from("1.00000"));
2157 assert_eq!(bar.close, Price::from("1.00002"));
2158 assert_eq!(bar.volume, Quantity::from(3));
2159 }
2160
2161 #[rstest]
2162 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
2163 let instrument = InstrumentAny::Equity(equity_aapl);
2164 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2165 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2166 let handler = Arc::new(Mutex::new(Vec::new()));
2167 let handler_clone = Arc::clone(&handler);
2168
2169 let mut aggregator = TickBarAggregator::new(
2170 bar_type,
2171 instrument.price_precision(),
2172 instrument.size_precision(),
2173 move |bar: Bar| {
2174 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2175 handler_guard.push(bar);
2176 },
2177 );
2178
2179 let trade = TradeTick::default();
2180 aggregator.handle_trade(trade);
2181
2182 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2183 assert_eq!(handler_guard.len(), 0);
2184 }
2185
2186 #[rstest]
2187 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
2188 let instrument = InstrumentAny::Equity(equity_aapl);
2189 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2190 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2191 let handler = Arc::new(Mutex::new(Vec::new()));
2192 let handler_clone = Arc::clone(&handler);
2193
2194 let mut aggregator = TickBarAggregator::new(
2195 bar_type,
2196 instrument.price_precision(),
2197 instrument.size_precision(),
2198 move |bar: Bar| {
2199 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2200 handler_guard.push(bar);
2201 },
2202 );
2203
2204 let trade = TradeTick::default();
2205 aggregator.handle_trade(trade);
2206 aggregator.handle_trade(trade);
2207 aggregator.handle_trade(trade);
2208
2209 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2210 let bar = handler_guard.first().unwrap();
2211 assert_eq!(handler_guard.len(), 1);
2212 assert_eq!(bar.open, trade.price);
2213 assert_eq!(bar.high, trade.price);
2214 assert_eq!(bar.low, trade.price);
2215 assert_eq!(bar.close, trade.price);
2216 assert_eq!(bar.volume, Quantity::from(300000));
2217 assert_eq!(bar.ts_event, trade.ts_event);
2218 assert_eq!(bar.ts_init, trade.ts_init);
2219 }
2220
2221 #[rstest]
2222 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
2223 let instrument = InstrumentAny::Equity(equity_aapl);
2224 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
2225 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2226 let handler = Arc::new(Mutex::new(Vec::new()));
2227 let handler_clone = Arc::clone(&handler);
2228
2229 let mut aggregator = TickBarAggregator::new(
2230 bar_type,
2231 instrument.price_precision(),
2232 instrument.size_precision(),
2233 move |bar: Bar| {
2234 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2235 handler_guard.push(bar);
2236 },
2237 );
2238
2239 aggregator.update(
2240 Price::from("1.00001"),
2241 Quantity::from(1),
2242 UnixNanos::default(),
2243 );
2244 aggregator.update(
2245 Price::from("1.00002"),
2246 Quantity::from(1),
2247 UnixNanos::from(1000),
2248 );
2249 aggregator.update(
2250 Price::from("1.00003"),
2251 Quantity::from(1),
2252 UnixNanos::from(2000),
2253 );
2254
2255 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2256 assert_eq!(handler_guard.len(), 1);
2257
2258 let bar = handler_guard.first().unwrap();
2259 assert_eq!(bar.open, Price::from("1.00001"));
2260 assert_eq!(bar.high, Price::from("1.00003"));
2261 assert_eq!(bar.low, Price::from("1.00001"));
2262 assert_eq!(bar.close, Price::from("1.00003"));
2263 assert_eq!(bar.volume, Quantity::from(3));
2264 }
2265
2266 #[rstest]
2267 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
2268 let instrument = InstrumentAny::Equity(equity_aapl);
2269 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
2270 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2271 let handler = Arc::new(Mutex::new(Vec::new()));
2272 let handler_clone = Arc::clone(&handler);
2273
2274 let mut aggregator = TickBarAggregator::new(
2275 bar_type,
2276 instrument.price_precision(),
2277 instrument.size_precision(),
2278 move |bar: Bar| {
2279 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2280 handler_guard.push(bar);
2281 },
2282 );
2283
2284 aggregator.update(
2285 Price::from("1.00001"),
2286 Quantity::from(1),
2287 UnixNanos::default(),
2288 );
2289 aggregator.update(
2290 Price::from("1.00002"),
2291 Quantity::from(1),
2292 UnixNanos::from(1000),
2293 );
2294 aggregator.update(
2295 Price::from("1.00003"),
2296 Quantity::from(1),
2297 UnixNanos::from(2000),
2298 );
2299 aggregator.update(
2300 Price::from("1.00004"),
2301 Quantity::from(1),
2302 UnixNanos::from(3000),
2303 );
2304
2305 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2306 assert_eq!(handler_guard.len(), 2);
2307
2308 let bar1 = &handler_guard[0];
2309 assert_eq!(bar1.open, Price::from("1.00001"));
2310 assert_eq!(bar1.close, Price::from("1.00002"));
2311 assert_eq!(bar1.volume, Quantity::from(2));
2312
2313 let bar2 = &handler_guard[1];
2314 assert_eq!(bar2.open, Price::from("1.00003"));
2315 assert_eq!(bar2.close, Price::from("1.00004"));
2316 assert_eq!(bar2.volume, Quantity::from(2));
2317 }
2318
2319 #[rstest]
2320 fn test_tick_imbalance_bar_aggregator_emits_at_threshold(equity_aapl: Equity) {
2321 let instrument = InstrumentAny::Equity(equity_aapl);
2322 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
2323 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2324 let handler = Arc::new(Mutex::new(Vec::new()));
2325 let handler_clone = Arc::clone(&handler);
2326
2327 let mut aggregator = TickImbalanceBarAggregator::new(
2328 bar_type,
2329 instrument.price_precision(),
2330 instrument.size_precision(),
2331 move |bar: Bar| {
2332 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2333 handler_guard.push(bar);
2334 },
2335 );
2336
2337 let trade = TradeTick::default();
2338 aggregator.handle_trade(trade);
2339 aggregator.handle_trade(trade);
2340
2341 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2342 assert_eq!(handler_guard.len(), 1);
2343 let bar = handler_guard.first().unwrap();
2344 assert_eq!(bar.volume, Quantity::from(200000));
2345 }
2346
2347 #[rstest]
2348 fn test_tick_imbalance_bar_aggregator_handles_seller_direction(equity_aapl: Equity) {
2349 let instrument = InstrumentAny::Equity(equity_aapl);
2350 let bar_spec = BarSpecification::new(1, BarAggregation::TickImbalance, PriceType::Last);
2351 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2352 let handler = Arc::new(Mutex::new(Vec::new()));
2353 let handler_clone = Arc::clone(&handler);
2354
2355 let mut aggregator = TickImbalanceBarAggregator::new(
2356 bar_type,
2357 instrument.price_precision(),
2358 instrument.size_precision(),
2359 move |bar: Bar| {
2360 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2361 handler_guard.push(bar);
2362 },
2363 );
2364
2365 let sell = TradeTick {
2366 aggressor_side: AggressorSide::Seller,
2367 ..TradeTick::default()
2368 };
2369
2370 aggregator.handle_trade(sell);
2371
2372 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2373 assert_eq!(handler_guard.len(), 1);
2374 }
2375
2376 #[rstest]
2377 fn test_tick_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2378 let instrument = InstrumentAny::Equity(equity_aapl);
2379 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2380 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2381 let handler = Arc::new(Mutex::new(Vec::new()));
2382 let handler_clone = Arc::clone(&handler);
2383
2384 let mut aggregator = TickRunsBarAggregator::new(
2385 bar_type,
2386 instrument.price_precision(),
2387 instrument.size_precision(),
2388 move |bar: Bar| {
2389 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2390 handler_guard.push(bar);
2391 },
2392 );
2393
2394 let buy = TradeTick::default();
2395 let sell = TradeTick {
2396 aggressor_side: AggressorSide::Seller,
2397 ..buy
2398 };
2399
2400 aggregator.handle_trade(buy);
2401 aggregator.handle_trade(buy);
2402 aggregator.handle_trade(sell);
2403 aggregator.handle_trade(sell);
2404
2405 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2406 assert_eq!(handler_guard.len(), 2);
2407 }
2408
2409 #[rstest]
2410 fn test_tick_runs_bar_aggregator_volume_conservation(equity_aapl: Equity) {
2411 let instrument = InstrumentAny::Equity(equity_aapl);
2412 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2413 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2414 let handler = Arc::new(Mutex::new(Vec::new()));
2415 let handler_clone = Arc::clone(&handler);
2416
2417 let mut aggregator = TickRunsBarAggregator::new(
2418 bar_type,
2419 instrument.price_precision(),
2420 instrument.size_precision(),
2421 move |bar: Bar| {
2422 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2423 handler_guard.push(bar);
2424 },
2425 );
2426
2427 let buy = TradeTick {
2428 size: Quantity::from(1),
2429 ..TradeTick::default()
2430 };
2431 let sell = TradeTick {
2432 aggressor_side: AggressorSide::Seller,
2433 size: Quantity::from(1),
2434 ..buy
2435 };
2436
2437 aggregator.handle_trade(buy);
2438 aggregator.handle_trade(buy);
2439 aggregator.handle_trade(sell);
2440 aggregator.handle_trade(sell);
2441
2442 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2443 assert_eq!(handler_guard.len(), 2);
2444 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2445 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2446 }
2447
2448 #[rstest]
2449 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
2450 let instrument = InstrumentAny::Equity(equity_aapl);
2451 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
2452 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2453 let handler = Arc::new(Mutex::new(Vec::new()));
2454 let handler_clone = Arc::clone(&handler);
2455
2456 let mut aggregator = VolumeBarAggregator::new(
2457 bar_type,
2458 instrument.price_precision(),
2459 instrument.size_precision(),
2460 move |bar: Bar| {
2461 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2462 handler_guard.push(bar);
2463 },
2464 );
2465
2466 aggregator.update(
2467 Price::from("1.00001"),
2468 Quantity::from(25),
2469 UnixNanos::default(),
2470 );
2471
2472 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2473 assert_eq!(handler_guard.len(), 2);
2474 let bar1 = &handler_guard[0];
2475 assert_eq!(bar1.volume, Quantity::from(10));
2476 let bar2 = &handler_guard[1];
2477 assert_eq!(bar2.volume, Quantity::from(10));
2478 }
2479
2480 #[rstest]
2481 fn test_volume_runs_bar_aggregator_side_change_resets(equity_aapl: Equity) {
2482 let instrument = InstrumentAny::Equity(equity_aapl);
2483 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2484 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2485 let handler = Arc::new(Mutex::new(Vec::new()));
2486 let handler_clone = Arc::clone(&handler);
2487
2488 let mut aggregator = VolumeRunsBarAggregator::new(
2489 bar_type,
2490 instrument.price_precision(),
2491 instrument.size_precision(),
2492 move |bar: Bar| {
2493 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2494 handler_guard.push(bar);
2495 },
2496 );
2497
2498 let buy = TradeTick {
2499 instrument_id: instrument.id(),
2500 price: Price::from("1.0"),
2501 size: Quantity::from(1),
2502 ..TradeTick::default()
2503 };
2504 let sell = TradeTick {
2505 aggressor_side: AggressorSide::Seller,
2506 ..buy
2507 };
2508
2509 aggregator.handle_trade(buy);
2510 aggregator.handle_trade(buy); aggregator.handle_trade(sell);
2512 aggregator.handle_trade(sell); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2515 assert!(handler_guard.len() >= 2);
2516 assert!(
2517 (handler_guard[0].volume.as_f64() - handler_guard[1].volume.as_f64()).abs()
2518 < f64::EPSILON
2519 );
2520 }
2521
2522 #[rstest]
2523 fn test_volume_runs_bar_aggregator_handles_large_single_trade(equity_aapl: Equity) {
2524 let instrument = InstrumentAny::Equity(equity_aapl);
2525 let bar_spec = BarSpecification::new(3, BarAggregation::VolumeRuns, PriceType::Last);
2526 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2527 let handler = Arc::new(Mutex::new(Vec::new()));
2528 let handler_clone = Arc::clone(&handler);
2529
2530 let mut aggregator = VolumeRunsBarAggregator::new(
2531 bar_type,
2532 instrument.price_precision(),
2533 instrument.size_precision(),
2534 move |bar: Bar| {
2535 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2536 handler_guard.push(bar);
2537 },
2538 );
2539
2540 let trade = TradeTick {
2541 instrument_id: instrument.id(),
2542 price: Price::from("1.0"),
2543 size: Quantity::from(5),
2544 ..TradeTick::default()
2545 };
2546
2547 aggregator.handle_trade(trade);
2548
2549 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2550 assert!(!handler_guard.is_empty());
2551 assert!(handler_guard[0].volume.as_f64() > 0.0);
2552 assert!(handler_guard[0].volume.as_f64() < trade.size.as_f64());
2553 }
2554
2555 #[rstest]
2556 fn test_volume_imbalance_bar_aggregator_splits_large_trade(equity_aapl: Equity) {
2557 let instrument = InstrumentAny::Equity(equity_aapl);
2558 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeImbalance, PriceType::Last);
2559 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2560 let handler = Arc::new(Mutex::new(Vec::new()));
2561 let handler_clone = Arc::clone(&handler);
2562
2563 let mut aggregator = VolumeImbalanceBarAggregator::new(
2564 bar_type,
2565 instrument.price_precision(),
2566 instrument.size_precision(),
2567 move |bar: Bar| {
2568 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2569 handler_guard.push(bar);
2570 },
2571 );
2572
2573 let trade_small = TradeTick {
2574 instrument_id: instrument.id(),
2575 price: Price::from("1.0"),
2576 size: Quantity::from(1),
2577 ..TradeTick::default()
2578 };
2579 let trade_large = TradeTick {
2580 size: Quantity::from(3),
2581 ..trade_small
2582 };
2583
2584 aggregator.handle_trade(trade_small);
2585 aggregator.handle_trade(trade_large);
2586
2587 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2588 assert_eq!(handler_guard.len(), 2);
2589 let total_output = handler_guard
2590 .iter()
2591 .map(|bar| bar.volume.as_f64())
2592 .sum::<f64>();
2593 let total_input = trade_small.size.as_f64() + trade_large.size.as_f64();
2594 assert!((total_output - total_input).abs() < f64::EPSILON);
2595 }
2596
2597 #[rstest]
2598 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
2599 let instrument = InstrumentAny::Equity(equity_aapl);
2600 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2602 let handler = Arc::new(Mutex::new(Vec::new()));
2603 let handler_clone = Arc::clone(&handler);
2604
2605 let mut aggregator = ValueBarAggregator::new(
2606 bar_type,
2607 instrument.price_precision(),
2608 instrument.size_precision(),
2609 move |bar: Bar| {
2610 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2611 handler_guard.push(bar);
2612 },
2613 );
2614
2615 aggregator.update(
2617 Price::from("100.00"),
2618 Quantity::from(5),
2619 UnixNanos::default(),
2620 );
2621 aggregator.update(
2622 Price::from("100.00"),
2623 Quantity::from(5),
2624 UnixNanos::from(1000),
2625 );
2626
2627 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2628 assert_eq!(handler_guard.len(), 1);
2629 let bar = handler_guard.first().unwrap();
2630 assert_eq!(bar.volume, Quantity::from(10));
2631 }
2632
2633 #[rstest]
2634 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
2635 let instrument = InstrumentAny::Equity(equity_aapl);
2636 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2637 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2638 let handler = Arc::new(Mutex::new(Vec::new()));
2639 let handler_clone = Arc::clone(&handler);
2640
2641 let mut aggregator = ValueBarAggregator::new(
2642 bar_type,
2643 instrument.price_precision(),
2644 instrument.size_precision(),
2645 move |bar: Bar| {
2646 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2647 handler_guard.push(bar);
2648 },
2649 );
2650
2651 aggregator.update(
2653 Price::from("100.00"),
2654 Quantity::from(25),
2655 UnixNanos::default(),
2656 );
2657
2658 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2659 assert_eq!(handler_guard.len(), 2);
2660 let remaining_value = aggregator.get_cumulative_value();
2661 assert!(remaining_value < 1000.0); }
2663
2664 #[rstest]
2665 fn test_value_bar_aggregator_handles_zero_price(equity_aapl: Equity) {
2666 let instrument = InstrumentAny::Equity(equity_aapl);
2667 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2668 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2669 let handler = Arc::new(Mutex::new(Vec::new()));
2670 let handler_clone = Arc::clone(&handler);
2671
2672 let mut aggregator = ValueBarAggregator::new(
2673 bar_type,
2674 instrument.price_precision(),
2675 instrument.size_precision(),
2676 move |bar: Bar| {
2677 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2678 handler_guard.push(bar);
2679 },
2680 );
2681
2682 aggregator.update(
2684 Price::from("0.00"),
2685 Quantity::from(100),
2686 UnixNanos::default(),
2687 );
2688
2689 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2691 assert_eq!(handler_guard.len(), 0);
2692
2693 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2695 }
2696
2697 #[rstest]
2698 fn test_value_bar_aggregator_handles_zero_size(equity_aapl: Equity) {
2699 let instrument = InstrumentAny::Equity(equity_aapl);
2700 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
2701 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2702 let handler = Arc::new(Mutex::new(Vec::new()));
2703 let handler_clone = Arc::clone(&handler);
2704
2705 let mut aggregator = ValueBarAggregator::new(
2706 bar_type,
2707 instrument.price_precision(),
2708 instrument.size_precision(),
2709 move |bar: Bar| {
2710 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2711 handler_guard.push(bar);
2712 },
2713 );
2714
2715 aggregator.update(
2717 Price::from("100.00"),
2718 Quantity::from(0),
2719 UnixNanos::default(),
2720 );
2721
2722 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2724 assert_eq!(handler_guard.len(), 0);
2725
2726 assert_eq!(aggregator.get_cumulative_value(), 0.0);
2728 }
2729
2730 #[rstest]
2731 fn test_value_imbalance_bar_aggregator_emits_on_opposing_overflow(equity_aapl: Equity) {
2732 let instrument = InstrumentAny::Equity(equity_aapl);
2733 let bar_spec = BarSpecification::new(10, BarAggregation::ValueImbalance, PriceType::Last);
2734 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2735 let handler = Arc::new(Mutex::new(Vec::new()));
2736 let handler_clone = Arc::clone(&handler);
2737
2738 let mut aggregator = ValueImbalanceBarAggregator::new(
2739 bar_type,
2740 instrument.price_precision(),
2741 instrument.size_precision(),
2742 move |bar: Bar| {
2743 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2744 handler_guard.push(bar);
2745 },
2746 );
2747
2748 let buy = TradeTick {
2749 price: Price::from("5.0"),
2750 size: Quantity::from(2), instrument_id: instrument.id(),
2752 ..TradeTick::default()
2753 };
2754 let sell = TradeTick {
2755 price: Price::from("5.0"),
2756 size: Quantity::from(2), aggressor_side: AggressorSide::Seller,
2758 instrument_id: instrument.id(),
2759 ..buy
2760 };
2761
2762 aggregator.handle_trade(buy);
2763 aggregator.handle_trade(sell);
2764
2765 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2766 assert_eq!(handler_guard.len(), 2);
2767 }
2768
2769 #[rstest]
2770 fn test_value_runs_bar_aggregator_emits_on_consecutive_side(equity_aapl: Equity) {
2771 let instrument = InstrumentAny::Equity(equity_aapl);
2772 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2773 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2774 let handler = Arc::new(Mutex::new(Vec::new()));
2775 let handler_clone = Arc::clone(&handler);
2776
2777 let mut aggregator = ValueRunsBarAggregator::new(
2778 bar_type,
2779 instrument.price_precision(),
2780 instrument.size_precision(),
2781 move |bar: Bar| {
2782 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2783 handler_guard.push(bar);
2784 },
2785 );
2786
2787 let trade = TradeTick {
2788 price: Price::from("10.0"),
2789 size: Quantity::from(5),
2790 instrument_id: instrument.id(),
2791 ..TradeTick::default()
2792 };
2793
2794 aggregator.handle_trade(trade);
2795 aggregator.handle_trade(trade);
2796
2797 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2798 assert_eq!(handler_guard.len(), 1);
2799 let bar = handler_guard.first().unwrap();
2800 assert_eq!(bar.volume, Quantity::from(10));
2801 }
2802
2803 #[rstest]
2804 fn test_value_runs_bar_aggregator_resets_on_side_change(equity_aapl: Equity) {
2805 let instrument = InstrumentAny::Equity(equity_aapl);
2806 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2807 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2808 let handler = Arc::new(Mutex::new(Vec::new()));
2809 let handler_clone = Arc::clone(&handler);
2810
2811 let mut aggregator = ValueRunsBarAggregator::new(
2812 bar_type,
2813 instrument.price_precision(),
2814 instrument.size_precision(),
2815 move |bar: Bar| {
2816 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2817 handler_guard.push(bar);
2818 },
2819 );
2820
2821 let buy = TradeTick {
2822 price: Price::from("10.0"),
2823 size: Quantity::from(5),
2824 instrument_id: instrument.id(),
2825 ..TradeTick::default()
2826 }; let sell = TradeTick {
2828 price: Price::from("10.0"),
2829 size: Quantity::from(10),
2830 aggressor_side: AggressorSide::Seller,
2831 ..buy
2832 }; aggregator.handle_trade(buy);
2835 aggregator.handle_trade(sell);
2836
2837 let handler_guard = handler.lock().expect(MUTEX_POISONED);
2838 assert_eq!(handler_guard.len(), 1);
2839 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2840 }
2841
2842 #[rstest]
2843 fn test_tick_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2844 let instrument = InstrumentAny::Equity(equity_aapl);
2845 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2846 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2847 let handler = Arc::new(Mutex::new(Vec::new()));
2848 let handler_clone = Arc::clone(&handler);
2849
2850 let mut aggregator = TickRunsBarAggregator::new(
2851 bar_type,
2852 instrument.price_precision(),
2853 instrument.size_precision(),
2854 move |bar: Bar| {
2855 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2856 handler_guard.push(bar);
2857 },
2858 );
2859
2860 let buy = TradeTick::default();
2861
2862 aggregator.handle_trade(buy);
2863 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2868 assert_eq!(handler_guard.len(), 2);
2869 }
2870
2871 #[rstest]
2872 fn test_tick_runs_bar_aggregator_handles_no_aggressor_trades(equity_aapl: Equity) {
2873 let instrument = InstrumentAny::Equity(equity_aapl);
2874 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
2875 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2876 let handler = Arc::new(Mutex::new(Vec::new()));
2877 let handler_clone = Arc::clone(&handler);
2878
2879 let mut aggregator = TickRunsBarAggregator::new(
2880 bar_type,
2881 instrument.price_precision(),
2882 instrument.size_precision(),
2883 move |bar: Bar| {
2884 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2885 handler_guard.push(bar);
2886 },
2887 );
2888
2889 let buy = TradeTick::default();
2890 let no_aggressor = TradeTick {
2891 aggressor_side: AggressorSide::NoAggressor,
2892 ..buy
2893 };
2894
2895 aggregator.handle_trade(buy);
2896 aggregator.handle_trade(no_aggressor); aggregator.handle_trade(no_aggressor); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2901 assert_eq!(handler_guard.len(), 1);
2902 }
2903
2904 #[rstest]
2905 fn test_volume_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2906 let instrument = InstrumentAny::Equity(equity_aapl);
2907 let bar_spec = BarSpecification::new(2, BarAggregation::VolumeRuns, PriceType::Last);
2908 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2909 let handler = Arc::new(Mutex::new(Vec::new()));
2910 let handler_clone = Arc::clone(&handler);
2911
2912 let mut aggregator = VolumeRunsBarAggregator::new(
2913 bar_type,
2914 instrument.price_precision(),
2915 instrument.size_precision(),
2916 move |bar: Bar| {
2917 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2918 handler_guard.push(bar);
2919 },
2920 );
2921
2922 let buy = TradeTick {
2923 instrument_id: instrument.id(),
2924 price: Price::from("1.0"),
2925 size: Quantity::from(1),
2926 ..TradeTick::default()
2927 };
2928
2929 aggregator.handle_trade(buy);
2930 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2935 assert_eq!(handler_guard.len(), 2);
2936 assert_eq!(handler_guard[0].volume, Quantity::from(2));
2937 assert_eq!(handler_guard[1].volume, Quantity::from(2));
2938 }
2939
2940 #[rstest]
2941 fn test_value_runs_bar_aggregator_continues_run_after_bar_emission(equity_aapl: Equity) {
2942 let instrument = InstrumentAny::Equity(equity_aapl);
2943 let bar_spec = BarSpecification::new(100, BarAggregation::ValueRuns, PriceType::Last);
2944 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2945 let handler = Arc::new(Mutex::new(Vec::new()));
2946 let handler_clone = Arc::clone(&handler);
2947
2948 let mut aggregator = ValueRunsBarAggregator::new(
2949 bar_type,
2950 instrument.price_precision(),
2951 instrument.size_precision(),
2952 move |bar: Bar| {
2953 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2954 handler_guard.push(bar);
2955 },
2956 );
2957
2958 let buy = TradeTick {
2959 instrument_id: instrument.id(),
2960 price: Price::from("10.0"),
2961 size: Quantity::from(5),
2962 ..TradeTick::default()
2963 }; aggregator.handle_trade(buy);
2966 aggregator.handle_trade(buy); aggregator.handle_trade(buy); aggregator.handle_trade(buy); let handler_guard = handler.lock().expect(MUTEX_POISONED);
2971 assert_eq!(handler_guard.len(), 2);
2972 assert_eq!(handler_guard[0].volume, Quantity::from(10));
2973 assert_eq!(handler_guard[1].volume, Quantity::from(10));
2974 }
2975
2976 #[rstest]
2977 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
2978 let instrument = InstrumentAny::Equity(equity_aapl);
2979 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2981 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2982 let handler = Arc::new(Mutex::new(Vec::new()));
2983 let handler_clone = Arc::clone(&handler);
2984 let clock = Rc::new(RefCell::new(TestClock::new()));
2985
2986 let mut aggregator = TimeBarAggregator::new(
2987 bar_type,
2988 instrument.price_precision(),
2989 instrument.size_precision(),
2990 clock.clone(),
2991 move |bar: Bar| {
2992 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
2993 handler_guard.push(bar);
2994 },
2995 true, false, BarIntervalType::LeftOpen,
2998 None, 15, false, );
3002
3003 aggregator.update(
3004 Price::from("100.00"),
3005 Quantity::from(1),
3006 UnixNanos::default(),
3007 );
3008
3009 let next_sec = UnixNanos::from(1_000_000_000);
3010 clock.borrow_mut().set_time(next_sec);
3011
3012 let event = TimeEvent::new(
3013 Ustr::from("1-SECOND-LAST"),
3014 UUID4::new(),
3015 next_sec,
3016 next_sec,
3017 );
3018 aggregator.build_bar(event);
3019
3020 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3021 assert_eq!(handler_guard.len(), 1);
3022 let bar = handler_guard.first().unwrap();
3023 assert_eq!(bar.ts_event, UnixNanos::default());
3024 assert_eq!(bar.ts_init, next_sec);
3025 }
3026
3027 #[rstest]
3028 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
3029 let instrument = InstrumentAny::Equity(equity_aapl);
3030 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3031 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3032 let handler = Arc::new(Mutex::new(Vec::new()));
3033 let handler_clone = Arc::clone(&handler);
3034 let clock = Rc::new(RefCell::new(TestClock::new()));
3035
3036 let mut aggregator = TimeBarAggregator::new(
3037 bar_type,
3038 instrument.price_precision(),
3039 instrument.size_precision(),
3040 clock.clone(),
3041 move |bar: Bar| {
3042 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3043 handler_guard.push(bar);
3044 },
3045 true, true, BarIntervalType::LeftOpen,
3048 None,
3049 15,
3050 false, );
3052
3053 aggregator.update(
3055 Price::from("100.00"),
3056 Quantity::from(1),
3057 UnixNanos::default(),
3058 );
3059
3060 let ts1 = UnixNanos::from(1_000_000_000);
3062 clock.borrow_mut().set_time(ts1);
3063 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3064 aggregator.build_bar(event);
3065
3066 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3068
3069 let ts2 = UnixNanos::from(2_000_000_000);
3071 clock.borrow_mut().set_time(ts2);
3072 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3073 aggregator.build_bar(event);
3074
3075 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3076 assert_eq!(handler_guard.len(), 2);
3077
3078 let bar1 = &handler_guard[0];
3079 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
3081 assert_eq!(bar1.close, Price::from("100.00"));
3082 let bar2 = &handler_guard[1];
3083 assert_eq!(bar2.ts_event, ts2);
3084 assert_eq!(bar2.ts_init, ts2);
3085 assert_eq!(bar2.close, Price::from("101.00"));
3086 }
3087
3088 #[rstest]
3089 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
3090 let instrument = InstrumentAny::Equity(equity_aapl);
3091 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3092 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3093 let handler = Arc::new(Mutex::new(Vec::new()));
3094 let handler_clone = Arc::clone(&handler);
3095 let clock = Rc::new(RefCell::new(TestClock::new()));
3096 let mut aggregator = TimeBarAggregator::new(
3097 bar_type,
3098 instrument.price_precision(),
3099 instrument.size_precision(),
3100 clock.clone(),
3101 move |bar: Bar| {
3102 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3103 handler_guard.push(bar);
3104 },
3105 true, true, BarIntervalType::RightOpen,
3108 None,
3109 15,
3110 false, );
3112
3113 aggregator.update(
3115 Price::from("100.00"),
3116 Quantity::from(1),
3117 UnixNanos::default(),
3118 );
3119
3120 let ts1 = UnixNanos::from(1_000_000_000);
3122 clock.borrow_mut().set_time(ts1);
3123 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3124 aggregator.build_bar(event);
3125
3126 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
3128
3129 let ts2 = UnixNanos::from(2_000_000_000);
3131 clock.borrow_mut().set_time(ts2);
3132 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3133 aggregator.build_bar(event);
3134
3135 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3136 assert_eq!(handler_guard.len(), 2);
3137
3138 let bar1 = &handler_guard[0];
3139 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
3141 assert_eq!(bar1.close, Price::from("100.00"));
3142
3143 let bar2 = &handler_guard[1];
3144 assert_eq!(bar2.ts_event, ts1);
3145 assert_eq!(bar2.ts_init, ts2);
3146 assert_eq!(bar2.close, Price::from("101.00"));
3147 }
3148
3149 #[rstest]
3150 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
3151 let instrument = InstrumentAny::Equity(equity_aapl);
3152 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3153 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3154 let handler = Arc::new(Mutex::new(Vec::new()));
3155 let handler_clone = Arc::clone(&handler);
3156 let clock = Rc::new(RefCell::new(TestClock::new()));
3157
3158 let mut aggregator = TimeBarAggregator::new(
3160 bar_type,
3161 instrument.price_precision(),
3162 instrument.size_precision(),
3163 clock.clone(),
3164 move |bar: Bar| {
3165 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3166 handler_guard.push(bar);
3167 },
3168 false, true, BarIntervalType::LeftOpen,
3171 None,
3172 15,
3173 false, );
3175
3176 let ts1 = UnixNanos::from(1_000_000_000);
3178 clock.borrow_mut().set_time(ts1);
3179 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3180 aggregator.build_bar(event);
3181
3182 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3183 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
3185
3186 let handler = Arc::new(Mutex::new(Vec::new()));
3188 let handler_clone = Arc::clone(&handler);
3189 let mut aggregator = TimeBarAggregator::new(
3190 bar_type,
3191 instrument.price_precision(),
3192 instrument.size_precision(),
3193 clock.clone(),
3194 move |bar: Bar| {
3195 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3196 handler_guard.push(bar);
3197 },
3198 true, true, BarIntervalType::LeftOpen,
3201 None,
3202 15,
3203 false, );
3205
3206 aggregator.update(
3207 Price::from("100.00"),
3208 Quantity::from(1),
3209 UnixNanos::default(),
3210 );
3211
3212 let ts1 = UnixNanos::from(1_000_000_000);
3214 clock.borrow_mut().set_time(ts1);
3215 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
3216 aggregator.build_bar(event);
3217
3218 let ts2 = UnixNanos::from(2_000_000_000);
3220 clock.borrow_mut().set_time(ts2);
3221 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3222 aggregator.build_bar(event);
3223
3224 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3225 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
3227 assert_eq!(bar1.close, Price::from("100.00"));
3228 let bar2 = &handler_guard[1];
3229 assert_eq!(bar2.close, Price::from("100.00")); }
3231
3232 #[rstest]
3233 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
3234 let instrument = InstrumentAny::Equity(equity_aapl);
3235 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
3236 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3237 let clock = Rc::new(RefCell::new(TestClock::new()));
3238 let handler = Arc::new(Mutex::new(Vec::new()));
3239 let handler_clone = Arc::clone(&handler);
3240
3241 let mut aggregator = TimeBarAggregator::new(
3242 bar_type,
3243 instrument.price_precision(),
3244 instrument.size_precision(),
3245 clock.clone(),
3246 move |bar: Bar| {
3247 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3248 handler_guard.push(bar);
3249 },
3250 true, true, BarIntervalType::RightOpen,
3253 None,
3254 15,
3255 false, );
3257
3258 let ts1 = UnixNanos::from(1_000_000_000);
3259 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
3260
3261 let ts2 = UnixNanos::from(2_000_000_000);
3262 clock.borrow_mut().set_time(ts2);
3263
3264 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
3266 aggregator.build_bar(event);
3267
3268 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3269 let bar = handler_guard.first().unwrap();
3270 assert_eq!(bar.ts_event, UnixNanos::default());
3271 assert_eq!(bar.ts_init, ts2);
3272 }
3273
3274 #[rstest]
3275 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
3276 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3277 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3279 let handler = Arc::new(Mutex::new(Vec::new()));
3280 let handler_clone = Arc::clone(&handler);
3281
3282 let aggregator = RenkoBarAggregator::new(
3283 bar_type,
3284 instrument.price_precision(),
3285 instrument.size_precision(),
3286 instrument.price_increment(),
3287 move |bar: Bar| {
3288 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3289 handler_guard.push(bar);
3290 },
3291 );
3292
3293 assert_eq!(aggregator.bar_type(), bar_type);
3294 assert!(!aggregator.is_running());
3295 let expected_brick_size = 10 * instrument.price_increment().raw;
3297 assert_eq!(aggregator.brick_size, expected_brick_size);
3298 }
3299
3300 #[rstest]
3301 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
3302 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3303 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3305 let handler = Arc::new(Mutex::new(Vec::new()));
3306 let handler_clone = Arc::clone(&handler);
3307
3308 let mut aggregator = RenkoBarAggregator::new(
3309 bar_type,
3310 instrument.price_precision(),
3311 instrument.size_precision(),
3312 instrument.price_increment(),
3313 move |bar: Bar| {
3314 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3315 handler_guard.push(bar);
3316 },
3317 );
3318
3319 aggregator.update(
3321 Price::from("1.00000"),
3322 Quantity::from(1),
3323 UnixNanos::default(),
3324 );
3325 aggregator.update(
3326 Price::from("1.00005"),
3327 Quantity::from(1),
3328 UnixNanos::from(1000),
3329 );
3330
3331 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3332 assert_eq!(handler_guard.len(), 0); }
3334
3335 #[rstest]
3336 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
3337 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3338 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3340 let handler = Arc::new(Mutex::new(Vec::new()));
3341 let handler_clone = Arc::clone(&handler);
3342
3343 let mut aggregator = RenkoBarAggregator::new(
3344 bar_type,
3345 instrument.price_precision(),
3346 instrument.size_precision(),
3347 instrument.price_increment(),
3348 move |bar: Bar| {
3349 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3350 handler_guard.push(bar);
3351 },
3352 );
3353
3354 aggregator.update(
3356 Price::from("1.00000"),
3357 Quantity::from(1),
3358 UnixNanos::default(),
3359 );
3360 aggregator.update(
3361 Price::from("1.00015"),
3362 Quantity::from(1),
3363 UnixNanos::from(1000),
3364 );
3365
3366 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3367 assert_eq!(handler_guard.len(), 1);
3368
3369 let bar = handler_guard.first().unwrap();
3370 assert_eq!(bar.open, Price::from("1.00000"));
3371 assert_eq!(bar.high, Price::from("1.00010"));
3372 assert_eq!(bar.low, Price::from("1.00000"));
3373 assert_eq!(bar.close, Price::from("1.00010"));
3374 assert_eq!(bar.volume, Quantity::from(2));
3375 assert_eq!(bar.ts_event, UnixNanos::from(1000));
3376 assert_eq!(bar.ts_init, UnixNanos::from(1000));
3377 }
3378
3379 #[rstest]
3380 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
3381 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3382 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3384 let handler = Arc::new(Mutex::new(Vec::new()));
3385 let handler_clone = Arc::clone(&handler);
3386
3387 let mut aggregator = RenkoBarAggregator::new(
3388 bar_type,
3389 instrument.price_precision(),
3390 instrument.size_precision(),
3391 instrument.price_increment(),
3392 move |bar: Bar| {
3393 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3394 handler_guard.push(bar);
3395 },
3396 );
3397
3398 aggregator.update(
3400 Price::from("1.00000"),
3401 Quantity::from(1),
3402 UnixNanos::default(),
3403 );
3404 aggregator.update(
3405 Price::from("1.00025"),
3406 Quantity::from(1),
3407 UnixNanos::from(1000),
3408 );
3409
3410 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3411 assert_eq!(handler_guard.len(), 2);
3412
3413 let bar1 = &handler_guard[0];
3414 assert_eq!(bar1.open, Price::from("1.00000"));
3415 assert_eq!(bar1.high, Price::from("1.00010"));
3416 assert_eq!(bar1.low, Price::from("1.00000"));
3417 assert_eq!(bar1.close, Price::from("1.00010"));
3418
3419 let bar2 = &handler_guard[1];
3420 assert_eq!(bar2.open, Price::from("1.00010"));
3421 assert_eq!(bar2.high, Price::from("1.00020"));
3422 assert_eq!(bar2.low, Price::from("1.00010"));
3423 assert_eq!(bar2.close, Price::from("1.00020"));
3424 }
3425
3426 #[rstest]
3427 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
3428 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3429 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3431 let handler = Arc::new(Mutex::new(Vec::new()));
3432 let handler_clone = Arc::clone(&handler);
3433
3434 let mut aggregator = RenkoBarAggregator::new(
3435 bar_type,
3436 instrument.price_precision(),
3437 instrument.size_precision(),
3438 instrument.price_increment(),
3439 move |bar: Bar| {
3440 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3441 handler_guard.push(bar);
3442 },
3443 );
3444
3445 aggregator.update(
3447 Price::from("1.00020"),
3448 Quantity::from(1),
3449 UnixNanos::default(),
3450 );
3451 aggregator.update(
3452 Price::from("1.00005"),
3453 Quantity::from(1),
3454 UnixNanos::from(1000),
3455 );
3456
3457 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3458 assert_eq!(handler_guard.len(), 1);
3459
3460 let bar = handler_guard.first().unwrap();
3461 assert_eq!(bar.open, Price::from("1.00020"));
3462 assert_eq!(bar.high, Price::from("1.00020"));
3463 assert_eq!(bar.low, Price::from("1.00010"));
3464 assert_eq!(bar.close, Price::from("1.00010"));
3465 assert_eq!(bar.volume, Quantity::from(2));
3466 }
3467
3468 #[rstest]
3469 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
3470 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3471 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3473 let handler = Arc::new(Mutex::new(Vec::new()));
3474 let handler_clone = Arc::clone(&handler);
3475
3476 let mut aggregator = RenkoBarAggregator::new(
3477 bar_type,
3478 instrument.price_precision(),
3479 instrument.size_precision(),
3480 instrument.price_increment(),
3481 move |bar: Bar| {
3482 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3483 handler_guard.push(bar);
3484 },
3485 );
3486
3487 let input_bar = Bar::new(
3489 BarType::new(
3490 instrument.id(),
3491 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3492 AggregationSource::Internal,
3493 ),
3494 Price::from("1.00000"),
3495 Price::from("1.00005"),
3496 Price::from("0.99995"),
3497 Price::from("1.00005"), Quantity::from(100),
3499 UnixNanos::default(),
3500 UnixNanos::from(1000),
3501 );
3502
3503 aggregator.handle_bar(input_bar);
3504
3505 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3506 assert_eq!(handler_guard.len(), 0); }
3508
3509 #[rstest]
3510 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
3511 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3512 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3514 let handler = Arc::new(Mutex::new(Vec::new()));
3515 let handler_clone = Arc::clone(&handler);
3516
3517 let mut aggregator = RenkoBarAggregator::new(
3518 bar_type,
3519 instrument.price_precision(),
3520 instrument.size_precision(),
3521 instrument.price_increment(),
3522 move |bar: Bar| {
3523 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3524 handler_guard.push(bar);
3525 },
3526 );
3527
3528 let bar1 = Bar::new(
3530 BarType::new(
3531 instrument.id(),
3532 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3533 AggregationSource::Internal,
3534 ),
3535 Price::from("1.00000"),
3536 Price::from("1.00005"),
3537 Price::from("0.99995"),
3538 Price::from("1.00000"),
3539 Quantity::from(100),
3540 UnixNanos::default(),
3541 UnixNanos::default(),
3542 );
3543
3544 let bar2 = Bar::new(
3546 BarType::new(
3547 instrument.id(),
3548 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3549 AggregationSource::Internal,
3550 ),
3551 Price::from("1.00000"),
3552 Price::from("1.00015"),
3553 Price::from("0.99995"),
3554 Price::from("1.00010"), Quantity::from(50),
3556 UnixNanos::from(60_000_000_000),
3557 UnixNanos::from(60_000_000_000),
3558 );
3559
3560 aggregator.handle_bar(bar1);
3561 aggregator.handle_bar(bar2);
3562
3563 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3564 assert_eq!(handler_guard.len(), 1);
3565
3566 let bar = handler_guard.first().unwrap();
3567 assert_eq!(bar.open, Price::from("1.00000"));
3568 assert_eq!(bar.high, Price::from("1.00010"));
3569 assert_eq!(bar.low, Price::from("1.00000"));
3570 assert_eq!(bar.close, Price::from("1.00010"));
3571 assert_eq!(bar.volume, Quantity::from(150));
3572 }
3573
3574 #[rstest]
3575 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
3576 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3577 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3579 let handler = Arc::new(Mutex::new(Vec::new()));
3580 let handler_clone = Arc::clone(&handler);
3581
3582 let mut aggregator = RenkoBarAggregator::new(
3583 bar_type,
3584 instrument.price_precision(),
3585 instrument.size_precision(),
3586 instrument.price_increment(),
3587 move |bar: Bar| {
3588 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3589 handler_guard.push(bar);
3590 },
3591 );
3592
3593 let bar1 = Bar::new(
3595 BarType::new(
3596 instrument.id(),
3597 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3598 AggregationSource::Internal,
3599 ),
3600 Price::from("1.00000"),
3601 Price::from("1.00005"),
3602 Price::from("0.99995"),
3603 Price::from("1.00000"),
3604 Quantity::from(100),
3605 UnixNanos::default(),
3606 UnixNanos::default(),
3607 );
3608
3609 let bar2 = Bar::new(
3611 BarType::new(
3612 instrument.id(),
3613 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3614 AggregationSource::Internal,
3615 ),
3616 Price::from("1.00000"),
3617 Price::from("1.00035"),
3618 Price::from("0.99995"),
3619 Price::from("1.00030"), Quantity::from(50),
3621 UnixNanos::from(60_000_000_000),
3622 UnixNanos::from(60_000_000_000),
3623 );
3624
3625 aggregator.handle_bar(bar1);
3626 aggregator.handle_bar(bar2);
3627
3628 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3629 assert_eq!(handler_guard.len(), 3);
3630
3631 let bar1 = &handler_guard[0];
3632 assert_eq!(bar1.open, Price::from("1.00000"));
3633 assert_eq!(bar1.close, Price::from("1.00010"));
3634
3635 let bar2 = &handler_guard[1];
3636 assert_eq!(bar2.open, Price::from("1.00010"));
3637 assert_eq!(bar2.close, Price::from("1.00020"));
3638
3639 let bar3 = &handler_guard[2];
3640 assert_eq!(bar3.open, Price::from("1.00020"));
3641 assert_eq!(bar3.close, Price::from("1.00030"));
3642 }
3643
3644 #[rstest]
3645 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
3646 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3647 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3649 let handler = Arc::new(Mutex::new(Vec::new()));
3650 let handler_clone = Arc::clone(&handler);
3651
3652 let mut aggregator = RenkoBarAggregator::new(
3653 bar_type,
3654 instrument.price_precision(),
3655 instrument.size_precision(),
3656 instrument.price_increment(),
3657 move |bar: Bar| {
3658 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3659 handler_guard.push(bar);
3660 },
3661 );
3662
3663 let bar1 = Bar::new(
3665 BarType::new(
3666 instrument.id(),
3667 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3668 AggregationSource::Internal,
3669 ),
3670 Price::from("1.00020"),
3671 Price::from("1.00025"),
3672 Price::from("1.00015"),
3673 Price::from("1.00020"),
3674 Quantity::from(100),
3675 UnixNanos::default(),
3676 UnixNanos::default(),
3677 );
3678
3679 let bar2 = Bar::new(
3681 BarType::new(
3682 instrument.id(),
3683 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
3684 AggregationSource::Internal,
3685 ),
3686 Price::from("1.00020"),
3687 Price::from("1.00025"),
3688 Price::from("1.00005"),
3689 Price::from("1.00010"), Quantity::from(50),
3691 UnixNanos::from(60_000_000_000),
3692 UnixNanos::from(60_000_000_000),
3693 );
3694
3695 aggregator.handle_bar(bar1);
3696 aggregator.handle_bar(bar2);
3697
3698 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3699 assert_eq!(handler_guard.len(), 1);
3700
3701 let bar = handler_guard.first().unwrap();
3702 assert_eq!(bar.open, Price::from("1.00020"));
3703 assert_eq!(bar.high, Price::from("1.00020"));
3704 assert_eq!(bar.low, Price::from("1.00010"));
3705 assert_eq!(bar.close, Price::from("1.00010"));
3706 assert_eq!(bar.volume, Quantity::from(150));
3707 }
3708
3709 #[rstest]
3710 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
3711 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3712
3713 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
3716 let handler = Arc::new(Mutex::new(Vec::new()));
3717 let handler_clone = Arc::clone(&handler);
3718
3719 let aggregator_5 = RenkoBarAggregator::new(
3720 bar_type_5,
3721 instrument.price_precision(),
3722 instrument.size_precision(),
3723 instrument.price_increment(),
3724 move |_bar: Bar| {
3725 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3726 handler_guard.push(_bar);
3727 },
3728 );
3729
3730 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
3732 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
3733
3734 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
3736 let handler2 = Arc::new(Mutex::new(Vec::new()));
3737 let handler2_clone = Arc::clone(&handler2);
3738
3739 let aggregator_20 = RenkoBarAggregator::new(
3740 bar_type_20,
3741 instrument.price_precision(),
3742 instrument.size_precision(),
3743 instrument.price_increment(),
3744 move |_bar: Bar| {
3745 let mut handler_guard = handler2_clone.lock().expect(MUTEX_POISONED);
3746 handler_guard.push(_bar);
3747 },
3748 );
3749
3750 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
3752 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
3753 }
3754
3755 #[rstest]
3756 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
3757 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3758 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3760 let handler = Arc::new(Mutex::new(Vec::new()));
3761 let handler_clone = Arc::clone(&handler);
3762
3763 let mut aggregator = RenkoBarAggregator::new(
3764 bar_type,
3765 instrument.price_precision(),
3766 instrument.size_precision(),
3767 instrument.price_increment(),
3768 move |bar: Bar| {
3769 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3770 handler_guard.push(bar);
3771 },
3772 );
3773
3774 aggregator.update(
3776 Price::from("1.00000"),
3777 Quantity::from(1),
3778 UnixNanos::from(1000),
3779 );
3780 aggregator.update(
3781 Price::from("1.00010"),
3782 Quantity::from(1),
3783 UnixNanos::from(2000),
3784 ); aggregator.update(
3786 Price::from("1.00020"),
3787 Quantity::from(1),
3788 UnixNanos::from(3000),
3789 ); aggregator.update(
3791 Price::from("1.00025"),
3792 Quantity::from(1),
3793 UnixNanos::from(4000),
3794 ); aggregator.update(
3796 Price::from("1.00030"),
3797 Quantity::from(1),
3798 UnixNanos::from(5000),
3799 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3802 assert_eq!(handler_guard.len(), 3);
3803
3804 let bar1 = &handler_guard[0];
3805 assert_eq!(bar1.open, Price::from("1.00000"));
3806 assert_eq!(bar1.close, Price::from("1.00010"));
3807
3808 let bar2 = &handler_guard[1];
3809 assert_eq!(bar2.open, Price::from("1.00010"));
3810 assert_eq!(bar2.close, Price::from("1.00020"));
3811
3812 let bar3 = &handler_guard[2];
3813 assert_eq!(bar3.open, Price::from("1.00020"));
3814 assert_eq!(bar3.close, Price::from("1.00030"));
3815 }
3816
3817 #[rstest]
3818 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
3819 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
3820 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3822 let handler = Arc::new(Mutex::new(Vec::new()));
3823 let handler_clone = Arc::clone(&handler);
3824
3825 let mut aggregator = RenkoBarAggregator::new(
3826 bar_type,
3827 instrument.price_precision(),
3828 instrument.size_precision(),
3829 instrument.price_increment(),
3830 move |bar: Bar| {
3831 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3832 handler_guard.push(bar);
3833 },
3834 );
3835
3836 aggregator.update(
3838 Price::from("1.00000"),
3839 Quantity::from(1),
3840 UnixNanos::from(1000),
3841 );
3842 aggregator.update(
3843 Price::from("1.00010"),
3844 Quantity::from(1),
3845 UnixNanos::from(2000),
3846 ); aggregator.update(
3848 Price::from("0.99990"),
3849 Quantity::from(1),
3850 UnixNanos::from(3000),
3851 ); let handler_guard = handler.lock().expect(MUTEX_POISONED);
3854 assert_eq!(handler_guard.len(), 3);
3855
3856 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
3858 assert_eq!(bar1.high, Price::from("1.00010"));
3859 assert_eq!(bar1.low, Price::from("1.00000"));
3860 assert_eq!(bar1.close, Price::from("1.00010"));
3861
3862 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
3864 assert_eq!(bar2.high, Price::from("1.00010"));
3865 assert_eq!(bar2.low, Price::from("1.00000"));
3866 assert_eq!(bar2.close, Price::from("1.00000"));
3867
3868 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
3870 assert_eq!(bar3.high, Price::from("1.00000"));
3871 assert_eq!(bar3.low, Price::from("0.99990"));
3872 assert_eq!(bar3.close, Price::from("0.99990"));
3873 }
3874
3875 #[rstest]
3876 fn test_tick_imbalance_bar_aggregator_mixed_trades_cancel_out(equity_aapl: Equity) {
3877 let instrument = InstrumentAny::Equity(equity_aapl);
3878 let bar_spec = BarSpecification::new(3, BarAggregation::TickImbalance, PriceType::Last);
3879 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3880 let handler = Arc::new(Mutex::new(Vec::new()));
3881 let handler_clone = Arc::clone(&handler);
3882
3883 let mut aggregator = TickImbalanceBarAggregator::new(
3884 bar_type,
3885 instrument.price_precision(),
3886 instrument.size_precision(),
3887 move |bar: Bar| {
3888 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3889 handler_guard.push(bar);
3890 },
3891 );
3892
3893 let buy = TradeTick {
3894 aggressor_side: AggressorSide::Buyer,
3895 ..TradeTick::default()
3896 };
3897 let sell = TradeTick {
3898 aggressor_side: AggressorSide::Seller,
3899 ..TradeTick::default()
3900 };
3901
3902 aggregator.handle_trade(buy);
3903 aggregator.handle_trade(sell);
3904 aggregator.handle_trade(buy);
3905
3906 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3907 assert_eq!(handler_guard.len(), 0);
3908 }
3909
3910 #[rstest]
3911 fn test_tick_imbalance_bar_aggregator_no_aggressor_ignored(equity_aapl: Equity) {
3912 let instrument = InstrumentAny::Equity(equity_aapl);
3913 let bar_spec = BarSpecification::new(2, BarAggregation::TickImbalance, PriceType::Last);
3914 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3915 let handler = Arc::new(Mutex::new(Vec::new()));
3916 let handler_clone = Arc::clone(&handler);
3917
3918 let mut aggregator = TickImbalanceBarAggregator::new(
3919 bar_type,
3920 instrument.price_precision(),
3921 instrument.size_precision(),
3922 move |bar: Bar| {
3923 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3924 handler_guard.push(bar);
3925 },
3926 );
3927
3928 let buy = TradeTick {
3929 aggressor_side: AggressorSide::Buyer,
3930 ..TradeTick::default()
3931 };
3932 let no_aggressor = TradeTick {
3933 aggressor_side: AggressorSide::NoAggressor,
3934 ..TradeTick::default()
3935 };
3936
3937 aggregator.handle_trade(buy);
3938 aggregator.handle_trade(no_aggressor);
3939 aggregator.handle_trade(buy);
3940
3941 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3942 assert_eq!(handler_guard.len(), 1);
3943 }
3944
3945 #[rstest]
3946 fn test_tick_runs_bar_aggregator_multiple_consecutive_runs(equity_aapl: Equity) {
3947 let instrument = InstrumentAny::Equity(equity_aapl);
3948 let bar_spec = BarSpecification::new(2, BarAggregation::TickRuns, PriceType::Last);
3949 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3950 let handler = Arc::new(Mutex::new(Vec::new()));
3951 let handler_clone = Arc::clone(&handler);
3952
3953 let mut aggregator = TickRunsBarAggregator::new(
3954 bar_type,
3955 instrument.price_precision(),
3956 instrument.size_precision(),
3957 move |bar: Bar| {
3958 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3959 handler_guard.push(bar);
3960 },
3961 );
3962
3963 let buy = TradeTick {
3964 aggressor_side: AggressorSide::Buyer,
3965 ..TradeTick::default()
3966 };
3967 let sell = TradeTick {
3968 aggressor_side: AggressorSide::Seller,
3969 ..TradeTick::default()
3970 };
3971
3972 aggregator.handle_trade(buy);
3973 aggregator.handle_trade(buy);
3974 aggregator.handle_trade(sell);
3975 aggregator.handle_trade(sell);
3976
3977 let handler_guard = handler.lock().expect(MUTEX_POISONED);
3978 assert_eq!(handler_guard.len(), 2);
3979 }
3980
3981 #[rstest]
3982 fn test_volume_imbalance_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
3983 let instrument = InstrumentAny::Equity(equity_aapl);
3984 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
3985 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
3986 let handler = Arc::new(Mutex::new(Vec::new()));
3987 let handler_clone = Arc::clone(&handler);
3988
3989 let mut aggregator = VolumeImbalanceBarAggregator::new(
3990 bar_type,
3991 instrument.price_precision(),
3992 instrument.size_precision(),
3993 move |bar: Bar| {
3994 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
3995 handler_guard.push(bar);
3996 },
3997 );
3998
3999 let large_trade = TradeTick {
4000 size: Quantity::from(25),
4001 aggressor_side: AggressorSide::Buyer,
4002 ..TradeTick::default()
4003 };
4004
4005 aggregator.handle_trade(large_trade);
4006
4007 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4008 assert_eq!(handler_guard.len(), 2);
4009 }
4010
4011 #[rstest]
4012 fn test_volume_imbalance_bar_aggregator_no_aggressor_does_not_affect_imbalance(
4013 equity_aapl: Equity,
4014 ) {
4015 let instrument = InstrumentAny::Equity(equity_aapl);
4016 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeImbalance, PriceType::Last);
4017 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4018 let handler = Arc::new(Mutex::new(Vec::new()));
4019 let handler_clone = Arc::clone(&handler);
4020
4021 let mut aggregator = VolumeImbalanceBarAggregator::new(
4022 bar_type,
4023 instrument.price_precision(),
4024 instrument.size_precision(),
4025 move |bar: Bar| {
4026 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4027 handler_guard.push(bar);
4028 },
4029 );
4030
4031 let buy = TradeTick {
4032 size: Quantity::from(5),
4033 aggressor_side: AggressorSide::Buyer,
4034 ..TradeTick::default()
4035 };
4036 let no_aggressor = TradeTick {
4037 size: Quantity::from(3),
4038 aggressor_side: AggressorSide::NoAggressor,
4039 ..TradeTick::default()
4040 };
4041
4042 aggregator.handle_trade(buy);
4043 aggregator.handle_trade(no_aggressor);
4044 aggregator.handle_trade(buy);
4045
4046 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4047 assert_eq!(handler_guard.len(), 1);
4048 }
4049
4050 #[rstest]
4051 fn test_volume_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4052 let instrument = InstrumentAny::Equity(equity_aapl);
4053 let bar_spec = BarSpecification::new(10, BarAggregation::VolumeRuns, PriceType::Last);
4054 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4055 let handler = Arc::new(Mutex::new(Vec::new()));
4056 let handler_clone = Arc::clone(&handler);
4057
4058 let mut aggregator = VolumeRunsBarAggregator::new(
4059 bar_type,
4060 instrument.price_precision(),
4061 instrument.size_precision(),
4062 move |bar: Bar| {
4063 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4064 handler_guard.push(bar);
4065 },
4066 );
4067
4068 let large_trade = TradeTick {
4069 size: Quantity::from(25),
4070 aggressor_side: AggressorSide::Buyer,
4071 ..TradeTick::default()
4072 };
4073
4074 aggregator.handle_trade(large_trade);
4075
4076 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4077 assert_eq!(handler_guard.len(), 2);
4078 }
4079
4080 #[rstest]
4081 fn test_value_runs_bar_aggregator_large_trade_spans_bars(equity_aapl: Equity) {
4082 let instrument = InstrumentAny::Equity(equity_aapl);
4083 let bar_spec = BarSpecification::new(50, BarAggregation::ValueRuns, PriceType::Last);
4084 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
4085 let handler = Arc::new(Mutex::new(Vec::new()));
4086 let handler_clone = Arc::clone(&handler);
4087
4088 let mut aggregator = ValueRunsBarAggregator::new(
4089 bar_type,
4090 instrument.price_precision(),
4091 instrument.size_precision(),
4092 move |bar: Bar| {
4093 let mut handler_guard = handler_clone.lock().expect(MUTEX_POISONED);
4094 handler_guard.push(bar);
4095 },
4096 );
4097
4098 let large_trade = TradeTick {
4099 price: Price::from("5.00"),
4100 size: Quantity::from(25),
4101 aggressor_side: AggressorSide::Buyer,
4102 ..TradeTick::default()
4103 };
4104
4105 aggregator.handle_trade(large_trade);
4106
4107 let handler_guard = handler.lock().expect(MUTEX_POISONED);
4108 assert_eq!(handler_guard.len(), 2);
4109 }
4110}