1use std::{any::Any, cell::RefCell, fmt::Debug, ops::Add, rc::Rc};
22
23use chrono::TimeDelta;
24use nautilus_common::{
25 clock::Clock,
26 timer::{TimeEvent, TimeEventCallback},
27};
28use nautilus_core::{
29 SharedCell, UnixNanos, WeakCell,
30 correctness::{self, FAILED},
31 datetime::{add_n_months_nanos, subtract_n_months_nanos},
32};
33use nautilus_model::{
34 data::{
35 QuoteTick, TradeTick,
36 bar::{Bar, BarType, get_bar_interval_ns, get_time_bar_start},
37 },
38 enums::{AggregationSource, BarAggregation, BarIntervalType},
39 types::{Price, Quantity, fixed::FIXED_SCALAR, price::PriceRaw, quantity::QuantityRaw},
40};
41
42pub trait BarAggregator: Any + Debug {
46 fn bar_type(&self) -> BarType;
48 fn is_running(&self) -> bool;
50 fn set_is_running(&mut self, value: bool);
52 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos);
54 fn handle_quote(&mut self, quote: QuoteTick) {
56 let spec = self.bar_type().spec();
57 self.update(
58 quote.extract_price(spec.price_type),
59 quote.extract_size(spec.price_type),
60 quote.ts_init,
61 );
62 }
63 fn handle_trade(&mut self, trade: TradeTick) {
65 self.update(trade.price, trade.size, trade.ts_init);
66 }
67 fn handle_bar(&mut self, bar: Bar) {
69 self.update_bar(bar, bar.volume, bar.ts_init);
70 }
71 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos);
72 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos);
74 fn stop_batch_update(&mut self);
76 fn stop(&mut self) {}
79}
80
81impl dyn BarAggregator {
82 pub fn as_any(&self) -> &dyn Any {
84 self
85 }
86 pub fn as_any_mut(&mut self) -> &mut dyn Any {
88 self
89 }
90}
91
/// Accumulates price/size (or bar) updates into the OHLCV state of a single bar.
#[derive(Debug)]
pub struct BarBuilder {
    /// Bar type stamped onto every bar that is built.
    bar_type: BarType,
    /// Price precision supplied at construction.
    price_precision: u8,
    /// Size precision supplied at construction; used for the zeroed volume.
    size_precision: u8,
    /// Set to `true` by the first accepted update; `reset` does not clear it.
    initialized: bool,
    /// Timestamp of the most recently accepted update.
    ts_last: UnixNanos,
    /// Number of updates accepted since the last reset.
    count: usize,
    /// Close of the most recently built bar, if any.
    last_close: Option<Price>,
    /// Open of the bar in progress.
    open: Option<Price>,
    /// High of the bar in progress.
    high: Option<Price>,
    /// Low of the bar in progress.
    low: Option<Price>,
    /// Latest close; `reset` leaves this value in place.
    close: Option<Price>,
    /// Volume accumulated since the last reset.
    volume: Quantity,
}
108
109impl BarBuilder {
110 #[must_use]
118 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8) -> Self {
119 correctness::check_equal(
120 &bar_type.aggregation_source(),
121 &AggregationSource::Internal,
122 "bar_type.aggregation_source",
123 "AggregationSource::Internal",
124 )
125 .expect(FAILED);
126
127 Self {
128 bar_type,
129 price_precision,
130 size_precision,
131 initialized: false,
132 ts_last: UnixNanos::default(),
133 count: 0,
134 last_close: None,
135 open: None,
136 high: None,
137 low: None,
138 close: None,
139 volume: Quantity::zero(size_precision),
140 }
141 }
142
143 pub fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
149 if ts_init < self.ts_last {
150 return; }
152
153 if self.open.is_none() {
154 self.open = Some(price);
155 self.high = Some(price);
156 self.low = Some(price);
157 self.initialized = true;
158 } else {
159 if price > self.high.unwrap() {
160 self.high = Some(price);
161 }
162 if price < self.low.unwrap() {
163 self.low = Some(price);
164 }
165 }
166
167 self.close = Some(price);
168 self.volume = self.volume.add(size);
169 self.count += 1;
170 self.ts_last = ts_init;
171 }
172
173 pub fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
179 if ts_init < self.ts_last {
180 return; }
182
183 if self.open.is_none() {
184 self.open = Some(bar.open);
185 self.high = Some(bar.high);
186 self.low = Some(bar.low);
187 self.initialized = true;
188 } else {
189 if bar.high > self.high.unwrap() {
190 self.high = Some(bar.high);
191 }
192 if bar.low < self.low.unwrap() {
193 self.low = Some(bar.low);
194 }
195 }
196
197 self.close = Some(bar.close);
198 self.volume = self.volume.add(volume);
199 self.count += 1;
200 self.ts_last = ts_init;
201 }
202
203 pub fn reset(&mut self) {
207 self.open = None;
208 self.high = None;
209 self.low = None;
210 self.volume = Quantity::zero(self.size_precision);
211 self.count = 0;
212 }
213
214 pub fn build_now(&mut self) -> Bar {
216 self.build(self.ts_last, self.ts_last)
217 }
218
219 pub fn build(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) -> Bar {
225 if self.open.is_none() {
226 self.open = self.last_close;
227 self.high = self.last_close;
228 self.low = self.last_close;
229 self.close = self.last_close;
230 }
231
232 if let (Some(close), Some(low)) = (self.close, self.low)
233 && close < low
234 {
235 self.low = Some(close);
236 }
237
238 if let (Some(close), Some(high)) = (self.close, self.high)
239 && close > high
240 {
241 self.high = Some(close);
242 }
243
244 let bar = Bar::new(
246 self.bar_type,
247 self.open.unwrap(),
248 self.high.unwrap(),
249 self.low.unwrap(),
250 self.close.unwrap(),
251 self.volume,
252 ts_event,
253 ts_init,
254 );
255
256 self.last_close = self.close;
257 self.reset();
258 bar
259 }
260}
261
262pub struct BarAggregatorCore<H>
264where
265 H: FnMut(Bar),
266{
267 bar_type: BarType,
268 builder: BarBuilder,
269 handler: H,
270 handler_backup: Option<H>,
271 batch_handler: Option<Box<dyn FnMut(Bar)>>,
272 is_running: bool,
273 batch_mode: bool,
274}
275
276impl<H: FnMut(Bar)> Debug for BarAggregatorCore<H> {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 f.debug_struct(stringify!(BarAggregatorCore))
279 .field("bar_type", &self.bar_type)
280 .field("builder", &self.builder)
281 .field("is_running", &self.is_running)
282 .field("batch_mode", &self.batch_mode)
283 .finish()
284 }
285}
286
287impl<H> BarAggregatorCore<H>
288where
289 H: FnMut(Bar),
290{
291 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
299 Self {
300 bar_type,
301 builder: BarBuilder::new(bar_type, price_precision, size_precision),
302 handler,
303 handler_backup: None,
304 batch_handler: None,
305 is_running: false,
306 batch_mode: false,
307 }
308 }
309
310 pub const fn set_is_running(&mut self, value: bool) {
312 self.is_running = value;
313 }
314
315 fn apply_update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
316 self.builder.update(price, size, ts_init);
317 }
318
319 fn build_now_and_send(&mut self) {
320 let bar = self.builder.build_now();
321 (self.handler)(bar);
322 }
323
324 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
325 let bar = self.builder.build(ts_event, ts_init);
326
327 if self.batch_mode {
328 if let Some(handler) = &mut self.batch_handler {
329 handler(bar);
330 }
331 } else {
332 (self.handler)(bar);
333 }
334 }
335
336 pub fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>) {
338 self.batch_mode = true;
339 self.batch_handler = Some(handler);
340 }
341
342 pub fn stop_batch_update(&mut self) {
344 self.batch_mode = false;
345
346 if let Some(handler) = self.handler_backup.take() {
347 self.handler = handler;
348 }
349 }
350}
351
352pub struct TickBarAggregator<H>
357where
358 H: FnMut(Bar),
359{
360 core: BarAggregatorCore<H>,
361 cum_value: f64,
362}
363
364impl<H: FnMut(Bar)> Debug for TickBarAggregator<H> {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 f.debug_struct(stringify!(TickBarAggregator))
367 .field("core", &self.core)
368 .field("cum_value", &self.cum_value)
369 .finish()
370 }
371}
372
373impl<H> TickBarAggregator<H>
374where
375 H: FnMut(Bar),
376{
377 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
385 Self {
386 core: BarAggregatorCore::new(bar_type, price_precision, size_precision, handler),
387 cum_value: 0.0,
388 }
389 }
390}
391
392impl<H> BarAggregator for TickBarAggregator<H>
393where
394 H: FnMut(Bar) + 'static,
395{
396 fn bar_type(&self) -> BarType {
397 self.core.bar_type
398 }
399
400 fn is_running(&self) -> bool {
401 self.core.is_running
402 }
403
404 fn set_is_running(&mut self, value: bool) {
405 self.core.set_is_running(value);
406 }
407
408 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
410 self.core.apply_update(price, size, ts_init);
411 let spec = self.core.bar_type.spec();
412
413 if self.core.builder.count >= spec.step.get() {
414 self.core.build_now_and_send();
415 }
416 }
417
418 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
419 let mut volume_update = volume;
420 let average_price = Price::new(
421 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
422 self.core.builder.price_precision,
423 );
424
425 while volume_update.as_f64() > 0.0 {
426 let value_update = average_price.as_f64() * volume_update.as_f64();
427 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
428 self.cum_value += value_update;
429 self.core.builder.update_bar(bar, volume_update, ts_init);
430 break;
431 }
432
433 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
434 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
435 self.core.builder.update_bar(
436 bar,
437 Quantity::new(volume_diff, volume_update.precision),
438 ts_init,
439 );
440
441 self.core.build_now_and_send();
442 self.cum_value = 0.0;
443 volume_update = Quantity::new(
444 volume_update.as_f64() - volume_diff,
445 volume_update.precision,
446 );
447 }
448 }
449
450 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
451 self.core.start_batch_update(handler);
452 }
453
454 fn stop_batch_update(&mut self) {
455 self.core.stop_batch_update();
456 }
457}
458
459pub struct VolumeBarAggregator<H>
461where
462 H: FnMut(Bar),
463{
464 core: BarAggregatorCore<H>,
465}
466
467impl<H: FnMut(Bar)> Debug for VolumeBarAggregator<H> {
468 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
469 f.debug_struct(stringify!(VolumeBarAggregator))
470 .field("core", &self.core)
471 .finish()
472 }
473}
474
475impl<H> VolumeBarAggregator<H>
476where
477 H: FnMut(Bar),
478{
479 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
487 Self {
488 core: BarAggregatorCore::new(
489 bar_type.standard(),
490 price_precision,
491 size_precision,
492 handler,
493 ),
494 }
495 }
496}
497
498impl<H> BarAggregator for VolumeBarAggregator<H>
499where
500 H: FnMut(Bar) + 'static,
501{
502 fn bar_type(&self) -> BarType {
503 self.core.bar_type
504 }
505
506 fn is_running(&self) -> bool {
507 self.core.is_running
508 }
509
510 fn set_is_running(&mut self, value: bool) {
511 self.core.set_is_running(value);
512 }
513
514 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
516 let mut raw_size_update = size.raw;
517 let spec = self.core.bar_type.spec();
518 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
519
520 while raw_size_update > 0 {
521 if self.core.builder.volume.raw + raw_size_update < raw_step {
522 self.core.apply_update(
523 price,
524 Quantity::from_raw(raw_size_update, size.precision),
525 ts_init,
526 );
527 break;
528 }
529
530 let raw_size_diff = raw_step - self.core.builder.volume.raw;
531 self.core.apply_update(
532 price,
533 Quantity::from_raw(raw_size_diff, size.precision),
534 ts_init,
535 );
536
537 self.core.build_now_and_send();
538 raw_size_update -= raw_size_diff;
539 }
540 }
541
542 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
543 let mut raw_volume_update = volume.raw;
544 let spec = self.core.bar_type.spec();
545 let raw_step = (spec.step.get() as f64 * FIXED_SCALAR) as QuantityRaw;
546
547 while raw_volume_update > 0 {
548 if self.core.builder.volume.raw + raw_volume_update < raw_step {
549 self.core.builder.update_bar(
550 bar,
551 Quantity::from_raw(raw_volume_update, volume.precision),
552 ts_init,
553 );
554 break;
555 }
556
557 let raw_volume_diff = raw_step - self.core.builder.volume.raw;
558 self.core.builder.update_bar(
559 bar,
560 Quantity::from_raw(raw_volume_diff, volume.precision),
561 ts_init,
562 );
563
564 self.core.build_now_and_send();
565 raw_volume_update -= raw_volume_diff;
566 }
567 }
568
569 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
570 self.core.start_batch_update(handler);
571 }
572
573 fn stop_batch_update(&mut self) {
574 self.core.stop_batch_update();
575 }
576}
577
578pub struct ValueBarAggregator<H>
583where
584 H: FnMut(Bar),
585{
586 core: BarAggregatorCore<H>,
587 cum_value: f64,
588}
589
590impl<H: FnMut(Bar)> Debug for ValueBarAggregator<H> {
591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592 f.debug_struct(stringify!(ValueBarAggregator))
593 .field("core", &self.core)
594 .field("cum_value", &self.cum_value)
595 .finish()
596 }
597}
598
599impl<H> ValueBarAggregator<H>
600where
601 H: FnMut(Bar),
602{
603 pub fn new(bar_type: BarType, price_precision: u8, size_precision: u8, handler: H) -> Self {
611 Self {
612 core: BarAggregatorCore::new(
613 bar_type.standard(),
614 price_precision,
615 size_precision,
616 handler,
617 ),
618 cum_value: 0.0,
619 }
620 }
621
622 #[must_use]
623 pub const fn get_cumulative_value(&self) -> f64 {
625 self.cum_value
626 }
627}
628
629impl<H> BarAggregator for ValueBarAggregator<H>
630where
631 H: FnMut(Bar) + 'static,
632{
633 fn bar_type(&self) -> BarType {
634 self.core.bar_type
635 }
636
637 fn is_running(&self) -> bool {
638 self.core.is_running
639 }
640
641 fn set_is_running(&mut self, value: bool) {
642 self.core.set_is_running(value);
643 }
644
645 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
647 let mut size_update = size.as_f64();
648 let spec = self.core.bar_type.spec();
649
650 while size_update > 0.0 {
651 let value_update = price.as_f64() * size_update;
652 if self.cum_value + value_update < spec.step.get() as f64 {
653 self.cum_value += value_update;
654 self.core
655 .apply_update(price, Quantity::new(size_update, size.precision), ts_init);
656 break;
657 }
658
659 let value_diff = spec.step.get() as f64 - self.cum_value;
660 let size_diff = size_update * (value_diff / value_update);
661 self.core
662 .apply_update(price, Quantity::new(size_diff, size.precision), ts_init);
663
664 self.core.build_now_and_send();
665 self.cum_value = 0.0;
666 size_update -= size_diff;
667 }
668 }
669
670 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
671 let mut volume_update = volume;
672 let average_price = Price::new(
673 (bar.high.as_f64() + bar.low.as_f64() + bar.close.as_f64()) / 3.0,
674 self.core.builder.price_precision,
675 );
676
677 while volume_update.as_f64() > 0.0 {
678 let value_update = average_price.as_f64() * volume_update.as_f64();
679 if self.cum_value + value_update < self.core.bar_type.spec().step.get() as f64 {
680 self.cum_value += value_update;
681 self.core.builder.update_bar(bar, volume_update, ts_init);
682 break;
683 }
684
685 let value_diff = self.core.bar_type.spec().step.get() as f64 - self.cum_value;
686 let volume_diff = volume_update.as_f64() * (value_diff / value_update);
687 self.core.builder.update_bar(
688 bar,
689 Quantity::new(volume_diff, volume_update.precision),
690 ts_init,
691 );
692
693 self.core.build_now_and_send();
694 self.cum_value = 0.0;
695 volume_update = Quantity::new(
696 volume_update.as_f64() - volume_diff,
697 volume_update.precision,
698 );
699 }
700 }
701
702 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
703 self.core.start_batch_update(handler);
704 }
705
706 fn stop_batch_update(&mut self) {
707 self.core.stop_batch_update();
708 }
709}
710
711pub struct RenkoBarAggregator<H>
717where
718 H: FnMut(Bar),
719{
720 core: BarAggregatorCore<H>,
721 pub brick_size: PriceRaw,
722 last_close: Option<Price>,
723}
724
725impl<H: FnMut(Bar)> Debug for RenkoBarAggregator<H> {
726 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
727 f.debug_struct(stringify!(RenkoBarAggregator))
728 .field("core", &self.core)
729 .field("brick_size", &self.brick_size)
730 .field("last_close", &self.last_close)
731 .finish()
732 }
733}
734
735impl<H> RenkoBarAggregator<H>
736where
737 H: FnMut(Bar),
738{
739 pub fn new(
747 bar_type: BarType,
748 price_precision: u8,
749 size_precision: u8,
750 price_increment: Price,
751 handler: H,
752 ) -> Self {
753 let brick_size = bar_type.spec().step.get() as PriceRaw * price_increment.raw;
755
756 Self {
757 core: BarAggregatorCore::new(
758 bar_type.standard(),
759 price_precision,
760 size_precision,
761 handler,
762 ),
763 brick_size,
764 last_close: None,
765 }
766 }
767}
768
769impl<H> BarAggregator for RenkoBarAggregator<H>
770where
771 H: FnMut(Bar) + 'static,
772{
773 fn bar_type(&self) -> BarType {
774 self.core.bar_type
775 }
776
777 fn is_running(&self) -> bool {
778 self.core.is_running
779 }
780
781 fn set_is_running(&mut self, value: bool) {
782 self.core.set_is_running(value);
783 }
784
785 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
790 self.core.apply_update(price, size, ts_init);
792
793 if self.last_close.is_none() {
795 self.last_close = Some(price);
796 return;
797 }
798
799 let last_close = self.last_close.unwrap();
800
801 let current_raw = price.raw;
803 let last_close_raw = last_close.raw;
804 let price_diff_raw = current_raw - last_close_raw;
805 let abs_price_diff_raw = price_diff_raw.abs();
806
807 if abs_price_diff_raw >= self.brick_size {
809 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
810 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
811 let mut current_close = last_close;
812
813 let total_volume = self.core.builder.volume;
815
816 for _i in 0..num_bricks {
817 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
819 let brick_close = Price::from_raw(brick_close_raw, price.precision);
820
821 let (brick_high, brick_low) = if direction > 0.0 {
823 (brick_close, current_close)
824 } else {
825 (current_close, brick_close)
826 };
827
828 self.core.builder.reset();
830 self.core.builder.open = Some(current_close);
831 self.core.builder.high = Some(brick_high);
832 self.core.builder.low = Some(brick_low);
833 self.core.builder.close = Some(brick_close);
834 self.core.builder.volume = total_volume; self.core.builder.count = 1;
836 self.core.builder.ts_last = ts_init;
837 self.core.builder.initialized = true;
838
839 self.core.build_and_send(ts_init, ts_init);
841
842 current_close = brick_close;
844 self.last_close = Some(brick_close);
845 }
846 }
847 }
848
849 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
850 self.core.builder.update_bar(bar, volume, ts_init);
852
853 if self.last_close.is_none() {
855 self.last_close = Some(bar.close);
856 return;
857 }
858
859 let last_close = self.last_close.unwrap();
860
861 let current_raw = bar.close.raw;
863 let last_close_raw = last_close.raw;
864 let price_diff_raw = current_raw - last_close_raw;
865 let abs_price_diff_raw = price_diff_raw.abs();
866
867 if abs_price_diff_raw >= self.brick_size {
869 let num_bricks = (abs_price_diff_raw / self.brick_size) as usize;
870 let direction = if price_diff_raw > 0 { 1.0 } else { -1.0 };
871 let mut current_close = last_close;
872
873 let total_volume = self.core.builder.volume;
875
876 for _i in 0..num_bricks {
877 let brick_close_raw = current_close.raw + (direction as PriceRaw) * self.brick_size;
879 let brick_close = Price::from_raw(brick_close_raw, bar.close.precision);
880
881 let (brick_high, brick_low) = if direction > 0.0 {
883 (brick_close, current_close)
884 } else {
885 (current_close, brick_close)
886 };
887
888 self.core.builder.reset();
890 self.core.builder.open = Some(current_close);
891 self.core.builder.high = Some(brick_high);
892 self.core.builder.low = Some(brick_low);
893 self.core.builder.close = Some(brick_close);
894 self.core.builder.volume = total_volume; self.core.builder.count = 1;
896 self.core.builder.ts_last = ts_init;
897 self.core.builder.initialized = true;
898
899 self.core.build_and_send(ts_init, ts_init);
901
902 current_close = brick_close;
904 self.last_close = Some(brick_close);
905 }
906 }
907 }
908
909 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, _: UnixNanos) {
910 self.core.start_batch_update(handler);
911 }
912
913 fn stop_batch_update(&mut self) {
914 self.core.stop_batch_update();
915 }
916}
917
918pub struct TimeBarAggregator<H>
922where
923 H: FnMut(Bar),
924{
925 core: BarAggregatorCore<H>,
926 clock: Rc<RefCell<dyn Clock>>,
927 build_with_no_updates: bool,
928 timestamp_on_close: bool,
929 is_left_open: bool,
930 build_on_next_tick: bool,
931 stored_open_ns: UnixNanos,
932 stored_close_ns: UnixNanos,
933 timer_name: String,
934 interval_ns: UnixNanos,
935 next_close_ns: UnixNanos,
936 bar_build_delay: u64,
937 batch_open_ns: UnixNanos,
938 batch_next_close_ns: UnixNanos,
939 time_bars_origin_offset: Option<TimeDelta>,
940 skip_first_non_full_bar: bool,
941}
942
943impl<H: FnMut(Bar)> Debug for TimeBarAggregator<H> {
944 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
945 f.debug_struct(stringify!(TimeBarAggregator))
946 .field("core", &self.core)
947 .field("build_with_no_updates", &self.build_with_no_updates)
948 .field("timestamp_on_close", &self.timestamp_on_close)
949 .field("is_left_open", &self.is_left_open)
950 .field("timer_name", &self.timer_name)
951 .field("interval_ns", &self.interval_ns)
952 .field("bar_build_delay", &self.bar_build_delay)
953 .field("skip_first_non_full_bar", &self.skip_first_non_full_bar)
954 .finish()
955 }
956}
957
/// Timer callback target holding a weak reference to a [`TimeBarAggregator`].
///
/// Converted into a `TimeEventCallback` that asks the aggregator to build a bar
/// for as long as the aggregator is still alive.
#[derive(Clone, Debug)]
pub struct NewBarCallback<H: FnMut(Bar)> {
    /// Weak handle to the aggregator, so the callback does not keep it alive.
    aggregator: WeakCell<TimeBarAggregator<H>>,
}
968
969impl<H: FnMut(Bar)> NewBarCallback<H> {
970 #[must_use]
972 pub fn new(aggregator: Rc<RefCell<TimeBarAggregator<H>>>) -> Self {
973 let shared: SharedCell<TimeBarAggregator<H>> = SharedCell::from(aggregator);
974 Self {
975 aggregator: shared.downgrade(),
976 }
977 }
978}
979
980impl<H: FnMut(Bar) + 'static> From<NewBarCallback<H>> for TimeEventCallback {
981 fn from(value: NewBarCallback<H>) -> Self {
982 Self::Rust(Rc::new(move |event: TimeEvent| {
983 if let Some(agg) = value.aggregator.upgrade() {
984 agg.borrow_mut().build_bar(event);
985 }
986 }))
987 }
988}
989
990impl<H> TimeBarAggregator<H>
991where
992 H: FnMut(Bar) + 'static,
993{
994 #[allow(clippy::too_many_arguments)]
1002 pub fn new(
1003 bar_type: BarType,
1004 price_precision: u8,
1005 size_precision: u8,
1006 clock: Rc<RefCell<dyn Clock>>,
1007 handler: H,
1008 build_with_no_updates: bool,
1009 timestamp_on_close: bool,
1010 interval_type: BarIntervalType,
1011 time_bars_origin_offset: Option<TimeDelta>,
1012 bar_build_delay: u64,
1013 skip_first_non_full_bar: bool,
1014 ) -> Self {
1015 let is_left_open = match interval_type {
1016 BarIntervalType::LeftOpen => true,
1017 BarIntervalType::RightOpen => false,
1018 };
1019
1020 let core = BarAggregatorCore::new(
1021 bar_type.standard(),
1022 price_precision,
1023 size_precision,
1024 handler,
1025 );
1026
1027 Self {
1028 core,
1029 clock,
1030 build_with_no_updates,
1031 timestamp_on_close,
1032 is_left_open,
1033 build_on_next_tick: false,
1034 stored_open_ns: UnixNanos::default(),
1035 stored_close_ns: UnixNanos::default(),
1036 timer_name: bar_type.to_string(),
1037 interval_ns: get_bar_interval_ns(&bar_type),
1038 next_close_ns: UnixNanos::default(),
1039 bar_build_delay,
1040 batch_open_ns: UnixNanos::default(),
1041 batch_next_close_ns: UnixNanos::default(),
1042 time_bars_origin_offset,
1043 skip_first_non_full_bar,
1044 }
1045 }
1046
1047 pub fn start(&mut self, callback: NewBarCallback<H>) -> anyhow::Result<()> {
1057 let now = self.clock.borrow().utc_now();
1058 let mut start_time =
1059 get_time_bar_start(now, &self.bar_type(), self.time_bars_origin_offset);
1060
1061 if start_time == now {
1062 self.skip_first_non_full_bar = false;
1063 }
1064
1065 start_time += TimeDelta::microseconds(self.bar_build_delay as i64);
1066
1067 let spec = &self.bar_type().spec();
1068 let start_time_ns = UnixNanos::from(start_time);
1069
1070 if spec.aggregation == BarAggregation::Month {
1071 let step = spec.step.get() as u32;
1072 let alert_time_ns = add_n_months_nanos(start_time_ns, step).expect(FAILED);
1073
1074 self.clock
1075 .borrow_mut()
1076 .set_time_alert_ns(&self.timer_name, alert_time_ns, Some(callback.into()), None)
1077 .expect(FAILED);
1078 } else {
1079 self.clock
1080 .borrow_mut()
1081 .set_timer_ns(
1082 &self.timer_name,
1083 self.interval_ns.as_u64(),
1084 Some(start_time_ns),
1085 None,
1086 Some(callback.into()),
1087 None,
1088 None,
1089 )
1090 .expect(FAILED);
1091 }
1092
1093 log::debug!("Started timer {}", self.timer_name);
1094 Ok(())
1095 }
1096
1097 pub fn stop(&mut self) {
1099 self.clock.borrow_mut().cancel_timer(&self.timer_name);
1100 }
1101
1102 pub fn start_batch_time(&mut self, time_ns: UnixNanos) {
1108 let spec = self.bar_type().spec();
1109 self.core.batch_mode = true;
1110
1111 let time = time_ns.to_datetime_utc();
1112 let start_time = get_time_bar_start(time, &self.bar_type(), self.time_bars_origin_offset);
1113 self.batch_open_ns = UnixNanos::from(start_time);
1114
1115 if spec.aggregation == BarAggregation::Month {
1116 let step = spec.step.get() as u32;
1117
1118 if self.batch_open_ns == time_ns {
1119 self.batch_open_ns =
1120 subtract_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1121 }
1122
1123 self.batch_next_close_ns = add_n_months_nanos(self.batch_open_ns, step).expect(FAILED);
1124 } else {
1125 if self.batch_open_ns == time_ns {
1126 self.batch_open_ns -= self.interval_ns;
1127 }
1128
1129 self.batch_next_close_ns = self.batch_open_ns + self.interval_ns;
1130 }
1131 }
1132
1133 const fn bar_ts_event(&self, open_ns: UnixNanos, close_ns: UnixNanos) -> UnixNanos {
1134 if self.is_left_open {
1135 if self.timestamp_on_close {
1136 close_ns
1137 } else {
1138 open_ns
1139 }
1140 } else {
1141 open_ns
1142 }
1143 }
1144
1145 fn build_and_send(&mut self, ts_event: UnixNanos, ts_init: UnixNanos) {
1146 if self.skip_first_non_full_bar {
1147 self.core.builder.reset();
1148 self.skip_first_non_full_bar = false;
1149 } else {
1150 self.core.build_and_send(ts_event, ts_init);
1151 }
1152 }
1153
1154 fn batch_pre_update(&mut self, time_ns: UnixNanos) {
1155 if time_ns > self.batch_next_close_ns && self.core.builder.initialized {
1156 let ts_init = self.batch_next_close_ns;
1157 let ts_event = self.bar_ts_event(self.batch_open_ns, ts_init);
1158 self.build_and_send(ts_event, ts_init);
1159 }
1160 }
1161
1162 fn batch_post_update(&mut self, time_ns: UnixNanos) {
1163 let step = self.bar_type().spec().step.get() as u32;
1164
1165 if !self.core.batch_mode
1167 && time_ns == self.batch_next_close_ns
1168 && time_ns > self.stored_open_ns
1169 {
1170 self.batch_next_close_ns = UnixNanos::default();
1171 return;
1172 }
1173
1174 if time_ns > self.batch_next_close_ns {
1175 if self.bar_type().spec().aggregation == BarAggregation::Month {
1177 while self.batch_next_close_ns < time_ns {
1178 self.batch_next_close_ns =
1179 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1180 }
1181
1182 self.batch_open_ns =
1183 subtract_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1184 } else {
1185 while self.batch_next_close_ns < time_ns {
1186 self.batch_next_close_ns += self.interval_ns;
1187 }
1188
1189 self.batch_open_ns = self.batch_next_close_ns - self.interval_ns;
1190 }
1191 }
1192
1193 if time_ns == self.batch_next_close_ns {
1194 let ts_event = self.bar_ts_event(self.batch_open_ns, self.batch_next_close_ns);
1195 self.build_and_send(ts_event, time_ns);
1196 self.batch_open_ns = self.batch_next_close_ns;
1197
1198 if self.bar_type().spec().aggregation == BarAggregation::Month {
1199 self.batch_next_close_ns =
1200 add_n_months_nanos(self.batch_next_close_ns, step).expect(FAILED);
1201 } else {
1202 self.batch_next_close_ns += self.interval_ns;
1203 }
1204 }
1205
1206 if !self.core.batch_mode {
1208 self.batch_next_close_ns = UnixNanos::default();
1209 }
1210 }
1211
1212 fn build_bar(&mut self, event: TimeEvent) {
1213 if !self.core.builder.initialized {
1214 self.build_on_next_tick = true;
1215 self.stored_close_ns = self.next_close_ns;
1216 return;
1217 }
1218
1219 if !self.build_with_no_updates && self.core.builder.count == 0 {
1220 return;
1221 }
1222
1223 let ts_init = event.ts_event;
1224 let ts_event = self.bar_ts_event(self.stored_open_ns, ts_init);
1225 self.build_and_send(ts_event, ts_init);
1226
1227 self.stored_open_ns = ts_init;
1228
1229 if self.bar_type().spec().aggregation == BarAggregation::Month {
1230 let step = self.bar_type().spec().step.get() as u32;
1231 let next_alert_ns = add_n_months_nanos(ts_init, step).expect(FAILED);
1232
1233 self.clock
1234 .borrow_mut()
1235 .set_time_alert_ns(&self.timer_name, next_alert_ns, None, None)
1236 .expect(FAILED);
1237
1238 self.next_close_ns = next_alert_ns;
1239 } else {
1240 self.next_close_ns = self
1241 .clock
1242 .borrow()
1243 .next_time_ns(&self.timer_name)
1244 .unwrap_or_default();
1245 }
1246 }
1247}
1248
1249impl<H: FnMut(Bar)> BarAggregator for TimeBarAggregator<H>
1250where
1251 H: FnMut(Bar) + 'static,
1252{
1253 fn bar_type(&self) -> BarType {
1254 self.core.bar_type
1255 }
1256
1257 fn is_running(&self) -> bool {
1258 self.core.is_running
1259 }
1260
1261 fn set_is_running(&mut self, value: bool) {
1262 self.core.set_is_running(value);
1263 }
1264
1265 fn stop(&mut self) {
1267 Self::stop(self);
1268 }
1269
1270 fn update(&mut self, price: Price, size: Quantity, ts_init: UnixNanos) {
1271 if self.batch_next_close_ns != UnixNanos::default() {
1272 self.batch_pre_update(ts_init);
1273 }
1274
1275 self.core.apply_update(price, size, ts_init);
1276
1277 if self.build_on_next_tick {
1278 if ts_init <= self.stored_close_ns {
1279 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1280 self.build_and_send(ts_event, ts_init);
1281 }
1282
1283 self.build_on_next_tick = false;
1284 self.stored_close_ns = UnixNanos::default();
1285 }
1286
1287 if self.batch_next_close_ns != UnixNanos::default() {
1288 self.batch_post_update(ts_init);
1289 }
1290 }
1291
1292 fn update_bar(&mut self, bar: Bar, volume: Quantity, ts_init: UnixNanos) {
1293 if self.batch_next_close_ns != UnixNanos::default() {
1294 self.batch_pre_update(ts_init);
1295 }
1296
1297 self.core.builder.update_bar(bar, volume, ts_init);
1298
1299 if self.build_on_next_tick {
1300 if ts_init <= self.stored_close_ns {
1301 let ts_event = self.bar_ts_event(self.stored_open_ns, self.stored_close_ns);
1302 self.build_and_send(ts_event, ts_init);
1303 }
1304
1305 self.build_on_next_tick = false;
1307 self.stored_close_ns = UnixNanos::default();
1308 }
1309
1310 if self.batch_next_close_ns != UnixNanos::default() {
1311 self.batch_post_update(ts_init);
1312 }
1313 }
1314
1315 fn start_batch_update(&mut self, handler: Box<dyn FnMut(Bar)>, time_ns: UnixNanos) {
1316 self.core.start_batch_update(handler);
1317 self.start_batch_time(time_ns);
1318 }
1319
1320 fn stop_batch_update(&mut self) {
1321 self.core.stop_batch_update();
1322 }
1323}
1324
1325#[cfg(test)]
1329mod tests {
1330 use std::sync::{Arc, Mutex};
1331
1332 use nautilus_common::clock::TestClock;
1333 use nautilus_core::UUID4;
1334 use nautilus_model::{
1335 data::{BarSpecification, BarType},
1336 enums::{AggregationSource, BarAggregation, PriceType},
1337 instruments::{CurrencyPair, Equity, Instrument, InstrumentAny, stubs::*},
1338 types::{Price, Quantity},
1339 };
1340 use rstest::rstest;
1341 use ustr::Ustr;
1342
1343 use super::*;
1344
1345 #[rstest]
1346 fn test_bar_builder_initialization(equity_aapl: Equity) {
1347 let instrument = InstrumentAny::Equity(equity_aapl);
1348 let bar_type = BarType::new(
1349 instrument.id(),
1350 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1351 AggregationSource::Internal,
1352 );
1353 let builder = BarBuilder::new(
1354 bar_type,
1355 instrument.price_precision(),
1356 instrument.size_precision(),
1357 );
1358
1359 assert!(!builder.initialized);
1360 assert_eq!(builder.ts_last, 0);
1361 assert_eq!(builder.count, 0);
1362 }
1363
1364 #[rstest]
1365 fn test_bar_builder_maintains_ohlc_order(equity_aapl: Equity) {
1366 let instrument = InstrumentAny::Equity(equity_aapl);
1367 let bar_type = BarType::new(
1368 instrument.id(),
1369 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1370 AggregationSource::Internal,
1371 );
1372 let mut builder = BarBuilder::new(
1373 bar_type,
1374 instrument.price_precision(),
1375 instrument.size_precision(),
1376 );
1377
1378 builder.update(
1379 Price::from("100.00"),
1380 Quantity::from(1),
1381 UnixNanos::from(1000),
1382 );
1383 builder.update(
1384 Price::from("95.00"),
1385 Quantity::from(1),
1386 UnixNanos::from(2000),
1387 );
1388 builder.update(
1389 Price::from("105.00"),
1390 Quantity::from(1),
1391 UnixNanos::from(3000),
1392 );
1393
1394 let bar = builder.build_now();
1395 assert!(bar.high > bar.low);
1396 assert_eq!(bar.open, Price::from("100.00"));
1397 assert_eq!(bar.high, Price::from("105.00"));
1398 assert_eq!(bar.low, Price::from("95.00"));
1399 assert_eq!(bar.close, Price::from("105.00"));
1400 }
1401
1402 #[rstest]
1403 fn test_update_ignores_earlier_timestamps(equity_aapl: Equity) {
1404 let instrument = InstrumentAny::Equity(equity_aapl);
1405 let bar_type = BarType::new(
1406 instrument.id(),
1407 BarSpecification::new(100, BarAggregation::Tick, PriceType::Last),
1408 AggregationSource::Internal,
1409 );
1410 let mut builder = BarBuilder::new(
1411 bar_type,
1412 instrument.price_precision(),
1413 instrument.size_precision(),
1414 );
1415
1416 builder.update(Price::from("1.00000"), Quantity::from(1), 1_000.into());
1417 builder.update(Price::from("1.00001"), Quantity::from(1), 500.into());
1418
1419 assert_eq!(builder.ts_last, 1_000);
1420 assert_eq!(builder.count, 1);
1421 }
1422
1423 #[rstest]
1424 fn test_bar_builder_single_update_results_in_expected_properties(equity_aapl: Equity) {
1425 let instrument = InstrumentAny::Equity(equity_aapl);
1426 let bar_type = BarType::new(
1427 instrument.id(),
1428 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1429 AggregationSource::Internal,
1430 );
1431 let mut builder = BarBuilder::new(
1432 bar_type,
1433 instrument.price_precision(),
1434 instrument.size_precision(),
1435 );
1436
1437 builder.update(
1438 Price::from("1.00000"),
1439 Quantity::from(1),
1440 UnixNanos::default(),
1441 );
1442
1443 assert!(builder.initialized);
1444 assert_eq!(builder.ts_last, 0);
1445 assert_eq!(builder.count, 1);
1446 }
1447
1448 #[rstest]
1449 fn test_bar_builder_single_update_when_timestamp_less_than_last_update_ignores(
1450 equity_aapl: Equity,
1451 ) {
1452 let instrument = InstrumentAny::Equity(equity_aapl);
1453 let bar_type = BarType::new(
1454 instrument.id(),
1455 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1456 AggregationSource::Internal,
1457 );
1458 let mut builder = BarBuilder::new(bar_type, 2, 0);
1459
1460 builder.update(
1461 Price::from("1.00000"),
1462 Quantity::from(1),
1463 UnixNanos::from(1_000),
1464 );
1465 builder.update(
1466 Price::from("1.00001"),
1467 Quantity::from(1),
1468 UnixNanos::from(500),
1469 );
1470
1471 assert!(builder.initialized);
1472 assert_eq!(builder.ts_last, 1_000);
1473 assert_eq!(builder.count, 1);
1474 }
1475
1476 #[rstest]
1477 fn test_bar_builder_multiple_updates_correctly_increments_count(equity_aapl: Equity) {
1478 let instrument = InstrumentAny::Equity(equity_aapl);
1479 let bar_type = BarType::new(
1480 instrument.id(),
1481 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1482 AggregationSource::Internal,
1483 );
1484 let mut builder = BarBuilder::new(
1485 bar_type,
1486 instrument.price_precision(),
1487 instrument.size_precision(),
1488 );
1489
1490 for _ in 0..5 {
1491 builder.update(
1492 Price::from("1.00000"),
1493 Quantity::from(1),
1494 UnixNanos::from(1_000),
1495 );
1496 }
1497
1498 assert_eq!(builder.count, 5);
1499 }
1500
1501 #[rstest]
1502 #[should_panic]
1503 fn test_bar_builder_build_when_no_updates_panics(equity_aapl: Equity) {
1504 let instrument = InstrumentAny::Equity(equity_aapl);
1505 let bar_type = BarType::new(
1506 instrument.id(),
1507 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1508 AggregationSource::Internal,
1509 );
1510 let mut builder = BarBuilder::new(
1511 bar_type,
1512 instrument.price_precision(),
1513 instrument.size_precision(),
1514 );
1515 let _ = builder.build_now();
1516 }
1517
1518 #[rstest]
1519 fn test_bar_builder_build_when_received_updates_returns_expected_bar(equity_aapl: Equity) {
1520 let instrument = InstrumentAny::Equity(equity_aapl);
1521 let bar_type = BarType::new(
1522 instrument.id(),
1523 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1524 AggregationSource::Internal,
1525 );
1526 let mut builder = BarBuilder::new(
1527 bar_type,
1528 instrument.price_precision(),
1529 instrument.size_precision(),
1530 );
1531
1532 builder.update(
1533 Price::from("1.00001"),
1534 Quantity::from(2),
1535 UnixNanos::default(),
1536 );
1537 builder.update(
1538 Price::from("1.00002"),
1539 Quantity::from(2),
1540 UnixNanos::default(),
1541 );
1542 builder.update(
1543 Price::from("1.00000"),
1544 Quantity::from(1),
1545 UnixNanos::from(1_000_000_000),
1546 );
1547
1548 let bar = builder.build_now();
1549
1550 assert_eq!(bar.open, Price::from("1.00001"));
1551 assert_eq!(bar.high, Price::from("1.00002"));
1552 assert_eq!(bar.low, Price::from("1.00000"));
1553 assert_eq!(bar.close, Price::from("1.00000"));
1554 assert_eq!(bar.volume, Quantity::from(5));
1555 assert_eq!(bar.ts_init, 1_000_000_000);
1556 assert_eq!(builder.ts_last, 1_000_000_000);
1557 assert_eq!(builder.count, 0);
1558 }
1559
1560 #[rstest]
1561 fn test_bar_builder_build_with_previous_close(equity_aapl: Equity) {
1562 let instrument = InstrumentAny::Equity(equity_aapl);
1563 let bar_type = BarType::new(
1564 instrument.id(),
1565 BarSpecification::new(3, BarAggregation::Tick, PriceType::Last),
1566 AggregationSource::Internal,
1567 );
1568 let mut builder = BarBuilder::new(bar_type, 2, 0);
1569
1570 builder.update(
1571 Price::from("1.00001"),
1572 Quantity::from(1),
1573 UnixNanos::default(),
1574 );
1575 builder.build_now();
1576
1577 builder.update(
1578 Price::from("1.00000"),
1579 Quantity::from(1),
1580 UnixNanos::default(),
1581 );
1582 builder.update(
1583 Price::from("1.00003"),
1584 Quantity::from(1),
1585 UnixNanos::default(),
1586 );
1587 builder.update(
1588 Price::from("1.00002"),
1589 Quantity::from(1),
1590 UnixNanos::default(),
1591 );
1592
1593 let bar = builder.build_now();
1594
1595 assert_eq!(bar.open, Price::from("1.00000"));
1596 assert_eq!(bar.high, Price::from("1.00003"));
1597 assert_eq!(bar.low, Price::from("1.00000"));
1598 assert_eq!(bar.close, Price::from("1.00002"));
1599 assert_eq!(bar.volume, Quantity::from(3));
1600 }
1601
1602 #[rstest]
1603 fn test_tick_bar_aggregator_handle_trade_when_step_count_below_threshold(equity_aapl: Equity) {
1604 let instrument = InstrumentAny::Equity(equity_aapl);
1605 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1606 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1607 let handler = Arc::new(Mutex::new(Vec::new()));
1608 let handler_clone = Arc::clone(&handler);
1609
1610 let mut aggregator = TickBarAggregator::new(
1611 bar_type,
1612 instrument.price_precision(),
1613 instrument.size_precision(),
1614 move |bar: Bar| {
1615 let mut handler_guard = handler_clone.lock().unwrap();
1616 handler_guard.push(bar);
1617 },
1618 );
1619
1620 let trade = TradeTick::default();
1621 aggregator.handle_trade(trade);
1622
1623 let handler_guard = handler.lock().unwrap();
1624 assert_eq!(handler_guard.len(), 0);
1625 }
1626
1627 #[rstest]
1628 fn test_tick_bar_aggregator_handle_trade_when_step_count_reached(equity_aapl: Equity) {
1629 let instrument = InstrumentAny::Equity(equity_aapl);
1630 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1631 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1632 let handler = Arc::new(Mutex::new(Vec::new()));
1633 let handler_clone = Arc::clone(&handler);
1634
1635 let mut aggregator = TickBarAggregator::new(
1636 bar_type,
1637 instrument.price_precision(),
1638 instrument.size_precision(),
1639 move |bar: Bar| {
1640 let mut handler_guard = handler_clone.lock().unwrap();
1641 handler_guard.push(bar);
1642 },
1643 );
1644
1645 let trade = TradeTick::default();
1646 aggregator.handle_trade(trade);
1647 aggregator.handle_trade(trade);
1648 aggregator.handle_trade(trade);
1649
1650 let handler_guard = handler.lock().unwrap();
1651 let bar = handler_guard.first().unwrap();
1652 assert_eq!(handler_guard.len(), 1);
1653 assert_eq!(bar.open, trade.price);
1654 assert_eq!(bar.high, trade.price);
1655 assert_eq!(bar.low, trade.price);
1656 assert_eq!(bar.close, trade.price);
1657 assert_eq!(bar.volume, Quantity::from(300000));
1658 assert_eq!(bar.ts_event, trade.ts_event);
1659 assert_eq!(bar.ts_init, trade.ts_init);
1660 }
1661
1662 #[rstest]
1663 fn test_tick_bar_aggregator_aggregates_to_step_size(equity_aapl: Equity) {
1664 let instrument = InstrumentAny::Equity(equity_aapl);
1665 let bar_spec = BarSpecification::new(3, BarAggregation::Tick, PriceType::Last);
1666 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1667 let handler = Arc::new(Mutex::new(Vec::new()));
1668 let handler_clone = Arc::clone(&handler);
1669
1670 let mut aggregator = TickBarAggregator::new(
1671 bar_type,
1672 instrument.price_precision(),
1673 instrument.size_precision(),
1674 move |bar: Bar| {
1675 let mut handler_guard = handler_clone.lock().unwrap();
1676 handler_guard.push(bar);
1677 },
1678 );
1679
1680 aggregator.update(
1681 Price::from("1.00001"),
1682 Quantity::from(1),
1683 UnixNanos::default(),
1684 );
1685 aggregator.update(
1686 Price::from("1.00002"),
1687 Quantity::from(1),
1688 UnixNanos::from(1000),
1689 );
1690 aggregator.update(
1691 Price::from("1.00003"),
1692 Quantity::from(1),
1693 UnixNanos::from(2000),
1694 );
1695
1696 let handler_guard = handler.lock().unwrap();
1697 assert_eq!(handler_guard.len(), 1);
1698
1699 let bar = handler_guard.first().unwrap();
1700 assert_eq!(bar.open, Price::from("1.00001"));
1701 assert_eq!(bar.high, Price::from("1.00003"));
1702 assert_eq!(bar.low, Price::from("1.00001"));
1703 assert_eq!(bar.close, Price::from("1.00003"));
1704 assert_eq!(bar.volume, Quantity::from(3));
1705 }
1706
1707 #[rstest]
1708 fn test_tick_bar_aggregator_resets_after_bar_created(equity_aapl: Equity) {
1709 let instrument = InstrumentAny::Equity(equity_aapl);
1710 let bar_spec = BarSpecification::new(2, BarAggregation::Tick, PriceType::Last);
1711 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1712 let handler = Arc::new(Mutex::new(Vec::new()));
1713 let handler_clone = Arc::clone(&handler);
1714
1715 let mut aggregator = TickBarAggregator::new(
1716 bar_type,
1717 instrument.price_precision(),
1718 instrument.size_precision(),
1719 move |bar: Bar| {
1720 let mut handler_guard = handler_clone.lock().unwrap();
1721 handler_guard.push(bar);
1722 },
1723 );
1724
1725 aggregator.update(
1726 Price::from("1.00001"),
1727 Quantity::from(1),
1728 UnixNanos::default(),
1729 );
1730 aggregator.update(
1731 Price::from("1.00002"),
1732 Quantity::from(1),
1733 UnixNanos::from(1000),
1734 );
1735 aggregator.update(
1736 Price::from("1.00003"),
1737 Quantity::from(1),
1738 UnixNanos::from(2000),
1739 );
1740 aggregator.update(
1741 Price::from("1.00004"),
1742 Quantity::from(1),
1743 UnixNanos::from(3000),
1744 );
1745
1746 let handler_guard = handler.lock().unwrap();
1747 assert_eq!(handler_guard.len(), 2);
1748
1749 let bar1 = &handler_guard[0];
1750 assert_eq!(bar1.open, Price::from("1.00001"));
1751 assert_eq!(bar1.close, Price::from("1.00002"));
1752 assert_eq!(bar1.volume, Quantity::from(2));
1753
1754 let bar2 = &handler_guard[1];
1755 assert_eq!(bar2.open, Price::from("1.00003"));
1756 assert_eq!(bar2.close, Price::from("1.00004"));
1757 assert_eq!(bar2.volume, Quantity::from(2));
1758 }
1759
1760 #[rstest]
1761 fn test_volume_bar_aggregator_builds_multiple_bars_from_large_update(equity_aapl: Equity) {
1762 let instrument = InstrumentAny::Equity(equity_aapl);
1763 let bar_spec = BarSpecification::new(10, BarAggregation::Volume, PriceType::Last);
1764 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1765 let handler = Arc::new(Mutex::new(Vec::new()));
1766 let handler_clone = Arc::clone(&handler);
1767
1768 let mut aggregator = VolumeBarAggregator::new(
1769 bar_type,
1770 instrument.price_precision(),
1771 instrument.size_precision(),
1772 move |bar: Bar| {
1773 let mut handler_guard = handler_clone.lock().unwrap();
1774 handler_guard.push(bar);
1775 },
1776 );
1777
1778 aggregator.update(
1779 Price::from("1.00001"),
1780 Quantity::from(25),
1781 UnixNanos::default(),
1782 );
1783
1784 let handler_guard = handler.lock().unwrap();
1785 assert_eq!(handler_guard.len(), 2);
1786 let bar1 = &handler_guard[0];
1787 assert_eq!(bar1.volume, Quantity::from(10));
1788 let bar2 = &handler_guard[1];
1789 assert_eq!(bar2.volume, Quantity::from(10));
1790 }
1791
1792 #[rstest]
1793 fn test_value_bar_aggregator_builds_at_value_threshold(equity_aapl: Equity) {
1794 let instrument = InstrumentAny::Equity(equity_aapl);
1795 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1797 let handler = Arc::new(Mutex::new(Vec::new()));
1798 let handler_clone = Arc::clone(&handler);
1799
1800 let mut aggregator = ValueBarAggregator::new(
1801 bar_type,
1802 instrument.price_precision(),
1803 instrument.size_precision(),
1804 move |bar: Bar| {
1805 let mut handler_guard = handler_clone.lock().unwrap();
1806 handler_guard.push(bar);
1807 },
1808 );
1809
1810 aggregator.update(
1812 Price::from("100.00"),
1813 Quantity::from(5),
1814 UnixNanos::default(),
1815 );
1816 aggregator.update(
1817 Price::from("100.00"),
1818 Quantity::from(5),
1819 UnixNanos::from(1000),
1820 );
1821
1822 let handler_guard = handler.lock().unwrap();
1823 assert_eq!(handler_guard.len(), 1);
1824 let bar = handler_guard.first().unwrap();
1825 assert_eq!(bar.volume, Quantity::from(10));
1826 }
1827
1828 #[rstest]
1829 fn test_value_bar_aggregator_handles_large_update(equity_aapl: Equity) {
1830 let instrument = InstrumentAny::Equity(equity_aapl);
1831 let bar_spec = BarSpecification::new(1000, BarAggregation::Value, PriceType::Last);
1832 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1833 let handler = Arc::new(Mutex::new(Vec::new()));
1834 let handler_clone = Arc::clone(&handler);
1835
1836 let mut aggregator = ValueBarAggregator::new(
1837 bar_type,
1838 instrument.price_precision(),
1839 instrument.size_precision(),
1840 move |bar: Bar| {
1841 let mut handler_guard = handler_clone.lock().unwrap();
1842 handler_guard.push(bar);
1843 },
1844 );
1845
1846 aggregator.update(
1848 Price::from("100.00"),
1849 Quantity::from(25),
1850 UnixNanos::default(),
1851 );
1852
1853 let handler_guard = handler.lock().unwrap();
1854 assert_eq!(handler_guard.len(), 2);
1855 let remaining_value = aggregator.get_cumulative_value();
1856 assert!(remaining_value < 1000.0); }
1858
1859 #[rstest]
1860 fn test_time_bar_aggregator_builds_at_interval(equity_aapl: Equity) {
1861 let instrument = InstrumentAny::Equity(equity_aapl);
1862 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1864 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1865 let handler = Arc::new(Mutex::new(Vec::new()));
1866 let handler_clone = Arc::clone(&handler);
1867 let clock = Rc::new(RefCell::new(TestClock::new()));
1868
1869 let mut aggregator = TimeBarAggregator::new(
1870 bar_type,
1871 instrument.price_precision(),
1872 instrument.size_precision(),
1873 clock.clone(),
1874 move |bar: Bar| {
1875 let mut handler_guard = handler_clone.lock().unwrap();
1876 handler_guard.push(bar);
1877 },
1878 true, false, BarIntervalType::LeftOpen,
1881 None, 15, false, );
1885
1886 aggregator.update(
1887 Price::from("100.00"),
1888 Quantity::from(1),
1889 UnixNanos::default(),
1890 );
1891
1892 let next_sec = UnixNanos::from(1_000_000_000);
1893 clock.borrow_mut().set_time(next_sec);
1894
1895 let event = TimeEvent::new(
1896 Ustr::from("1-SECOND-LAST"),
1897 UUID4::new(),
1898 next_sec,
1899 next_sec,
1900 );
1901 aggregator.build_bar(event);
1902
1903 let handler_guard = handler.lock().unwrap();
1904 assert_eq!(handler_guard.len(), 1);
1905 let bar = handler_guard.first().unwrap();
1906 assert_eq!(bar.ts_event, UnixNanos::default());
1907 assert_eq!(bar.ts_init, next_sec);
1908 }
1909
1910 #[rstest]
1911 fn test_time_bar_aggregator_left_open_interval(equity_aapl: Equity) {
1912 let instrument = InstrumentAny::Equity(equity_aapl);
1913 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1914 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1915 let handler = Arc::new(Mutex::new(Vec::new()));
1916 let handler_clone = Arc::clone(&handler);
1917 let clock = Rc::new(RefCell::new(TestClock::new()));
1918
1919 let mut aggregator = TimeBarAggregator::new(
1920 bar_type,
1921 instrument.price_precision(),
1922 instrument.size_precision(),
1923 clock.clone(),
1924 move |bar: Bar| {
1925 let mut handler_guard = handler_clone.lock().unwrap();
1926 handler_guard.push(bar);
1927 },
1928 true, true, BarIntervalType::LeftOpen,
1931 None,
1932 15,
1933 false, );
1935
1936 aggregator.update(
1938 Price::from("100.00"),
1939 Quantity::from(1),
1940 UnixNanos::default(),
1941 );
1942
1943 let ts1 = UnixNanos::from(1_000_000_000);
1945 clock.borrow_mut().set_time(ts1);
1946 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
1947 aggregator.build_bar(event);
1948
1949 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
1951
1952 let ts2 = UnixNanos::from(2_000_000_000);
1954 clock.borrow_mut().set_time(ts2);
1955 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
1956 aggregator.build_bar(event);
1957
1958 let handler_guard = handler.lock().unwrap();
1959 assert_eq!(handler_guard.len(), 2);
1960
1961 let bar1 = &handler_guard[0];
1962 assert_eq!(bar1.ts_event, ts1); assert_eq!(bar1.ts_init, ts1);
1964 assert_eq!(bar1.close, Price::from("100.00"));
1965 let bar2 = &handler_guard[1];
1966 assert_eq!(bar2.ts_event, ts2);
1967 assert_eq!(bar2.ts_init, ts2);
1968 assert_eq!(bar2.close, Price::from("101.00"));
1969 }
1970
1971 #[rstest]
1972 fn test_time_bar_aggregator_right_open_interval(equity_aapl: Equity) {
1973 let instrument = InstrumentAny::Equity(equity_aapl);
1974 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
1975 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
1976 let handler = Arc::new(Mutex::new(Vec::new()));
1977 let handler_clone = Arc::clone(&handler);
1978 let clock = Rc::new(RefCell::new(TestClock::new()));
1979 let mut aggregator = TimeBarAggregator::new(
1980 bar_type,
1981 instrument.price_precision(),
1982 instrument.size_precision(),
1983 clock.clone(),
1984 move |bar: Bar| {
1985 let mut handler_guard = handler_clone.lock().unwrap();
1986 handler_guard.push(bar);
1987 },
1988 true, true, BarIntervalType::RightOpen,
1991 None,
1992 15,
1993 false, );
1995
1996 aggregator.update(
1998 Price::from("100.00"),
1999 Quantity::from(1),
2000 UnixNanos::default(),
2001 );
2002
2003 let ts1 = UnixNanos::from(1_000_000_000);
2005 clock.borrow_mut().set_time(ts1);
2006 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2007 aggregator.build_bar(event);
2008
2009 aggregator.update(Price::from("101.00"), Quantity::from(1), ts1);
2011
2012 let ts2 = UnixNanos::from(2_000_000_000);
2014 clock.borrow_mut().set_time(ts2);
2015 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2016 aggregator.build_bar(event);
2017
2018 let handler_guard = handler.lock().unwrap();
2019 assert_eq!(handler_guard.len(), 2);
2020
2021 let bar1 = &handler_guard[0];
2022 assert_eq!(bar1.ts_event, UnixNanos::default()); assert_eq!(bar1.ts_init, ts1);
2024 assert_eq!(bar1.close, Price::from("100.00"));
2025
2026 let bar2 = &handler_guard[1];
2027 assert_eq!(bar2.ts_event, ts1);
2028 assert_eq!(bar2.ts_init, ts2);
2029 assert_eq!(bar2.close, Price::from("101.00"));
2030 }
2031
2032 #[rstest]
2033 fn test_time_bar_aggregator_no_updates_behavior(equity_aapl: Equity) {
2034 let instrument = InstrumentAny::Equity(equity_aapl);
2035 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2036 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2037 let handler = Arc::new(Mutex::new(Vec::new()));
2038 let handler_clone = Arc::clone(&handler);
2039 let clock = Rc::new(RefCell::new(TestClock::new()));
2040
2041 let mut aggregator = TimeBarAggregator::new(
2043 bar_type,
2044 instrument.price_precision(),
2045 instrument.size_precision(),
2046 clock.clone(),
2047 move |bar: Bar| {
2048 let mut handler_guard = handler_clone.lock().unwrap();
2049 handler_guard.push(bar);
2050 },
2051 false, true, BarIntervalType::LeftOpen,
2054 None,
2055 15,
2056 false, );
2058
2059 let ts1 = UnixNanos::from(1_000_000_000);
2061 clock.borrow_mut().set_time(ts1);
2062 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2063 aggregator.build_bar(event);
2064
2065 let handler_guard = handler.lock().unwrap();
2066 assert_eq!(handler_guard.len(), 0); drop(handler_guard);
2068
2069 let handler = Arc::new(Mutex::new(Vec::new()));
2071 let handler_clone = Arc::clone(&handler);
2072 let mut aggregator = TimeBarAggregator::new(
2073 bar_type,
2074 instrument.price_precision(),
2075 instrument.size_precision(),
2076 clock.clone(),
2077 move |bar: Bar| {
2078 let mut handler_guard = handler_clone.lock().unwrap();
2079 handler_guard.push(bar);
2080 },
2081 true, true, BarIntervalType::LeftOpen,
2084 None,
2085 15,
2086 false, );
2088
2089 aggregator.update(
2090 Price::from("100.00"),
2091 Quantity::from(1),
2092 UnixNanos::default(),
2093 );
2094
2095 let ts1 = UnixNanos::from(1_000_000_000);
2097 clock.borrow_mut().set_time(ts1);
2098 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts1, ts1);
2099 aggregator.build_bar(event);
2100
2101 let ts2 = UnixNanos::from(2_000_000_000);
2103 clock.borrow_mut().set_time(ts2);
2104 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2105 aggregator.build_bar(event);
2106
2107 let handler_guard = handler.lock().unwrap();
2108 assert_eq!(handler_guard.len(), 2); let bar1 = &handler_guard[0];
2110 assert_eq!(bar1.close, Price::from("100.00"));
2111 let bar2 = &handler_guard[1];
2112 assert_eq!(bar2.close, Price::from("100.00")); }
2114
2115 #[rstest]
2116 fn test_time_bar_aggregator_respects_timestamp_on_close(equity_aapl: Equity) {
2117 let instrument = InstrumentAny::Equity(equity_aapl);
2118 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2119 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2120 let clock = Rc::new(RefCell::new(TestClock::new()));
2121 let handler = Arc::new(Mutex::new(Vec::new()));
2122 let handler_clone = Arc::clone(&handler);
2123
2124 let mut aggregator = TimeBarAggregator::new(
2125 bar_type,
2126 instrument.price_precision(),
2127 instrument.size_precision(),
2128 clock.clone(),
2129 move |bar: Bar| {
2130 let mut handler_guard = handler_clone.lock().unwrap();
2131 handler_guard.push(bar);
2132 },
2133 true, true, BarIntervalType::RightOpen,
2136 None,
2137 15,
2138 false, );
2140
2141 let ts1 = UnixNanos::from(1_000_000_000);
2142 aggregator.update(Price::from("100.00"), Quantity::from(1), ts1);
2143
2144 let ts2 = UnixNanos::from(2_000_000_000);
2145 clock.borrow_mut().set_time(ts2);
2146
2147 let event = TimeEvent::new(Ustr::from("1-SECOND-LAST"), UUID4::new(), ts2, ts2);
2149 aggregator.build_bar(event);
2150
2151 let handler_guard = handler.lock().unwrap();
2152 let bar = handler_guard.first().unwrap();
2153 assert_eq!(bar.ts_event, UnixNanos::default());
2154 assert_eq!(bar.ts_init, ts2);
2155 }
2156
2157 #[rstest]
2158 fn test_time_bar_aggregator_batches_updates(equity_aapl: Equity) {
2159 let instrument = InstrumentAny::Equity(equity_aapl);
2160 let bar_spec = BarSpecification::new(1, BarAggregation::Second, PriceType::Last);
2161 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2162 let clock = Rc::new(RefCell::new(TestClock::new()));
2163 let handler = Arc::new(Mutex::new(Vec::new()));
2164 let handler_clone = Arc::clone(&handler);
2165
2166 let mut aggregator = TimeBarAggregator::new(
2167 bar_type,
2168 instrument.price_precision(),
2169 instrument.size_precision(),
2170 clock.clone(),
2171 move |bar: Bar| {
2172 let mut handler_guard = handler_clone.lock().unwrap();
2173 handler_guard.push(bar);
2174 },
2175 true, true, BarIntervalType::LeftOpen,
2178 None,
2179 15,
2180 false, );
2182
2183 let ts1 = UnixNanos::from(1_000_000_000);
2184 clock.borrow_mut().set_time(ts1);
2185
2186 let initial_time = clock.borrow().utc_now();
2187 aggregator.start_batch_time(UnixNanos::from(
2188 initial_time.timestamp_nanos_opt().unwrap() as u64
2189 ));
2190
2191 let handler_guard = handler.lock().unwrap();
2192 assert_eq!(handler_guard.len(), 0);
2193 }
2194
2195 #[rstest]
2200 fn test_renko_bar_aggregator_initialization(audusd_sim: CurrencyPair) {
2201 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2202 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2204 let handler = Arc::new(Mutex::new(Vec::new()));
2205 let handler_clone = Arc::clone(&handler);
2206
2207 let aggregator = RenkoBarAggregator::new(
2208 bar_type,
2209 instrument.price_precision(),
2210 instrument.size_precision(),
2211 instrument.price_increment(),
2212 move |bar: Bar| {
2213 let mut handler_guard = handler_clone.lock().unwrap();
2214 handler_guard.push(bar);
2215 },
2216 );
2217
2218 assert_eq!(aggregator.bar_type(), bar_type);
2219 assert!(!aggregator.is_running());
2220 let expected_brick_size = 10 * instrument.price_increment().raw;
2222 assert_eq!(aggregator.brick_size, expected_brick_size);
2223 }
2224
2225 #[rstest]
2226 fn test_renko_bar_aggregator_update_below_brick_size_no_bar(audusd_sim: CurrencyPair) {
2227 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2228 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2230 let handler = Arc::new(Mutex::new(Vec::new()));
2231 let handler_clone = Arc::clone(&handler);
2232
2233 let mut aggregator = RenkoBarAggregator::new(
2234 bar_type,
2235 instrument.price_precision(),
2236 instrument.size_precision(),
2237 instrument.price_increment(),
2238 move |bar: Bar| {
2239 let mut handler_guard = handler_clone.lock().unwrap();
2240 handler_guard.push(bar);
2241 },
2242 );
2243
2244 aggregator.update(
2246 Price::from("1.00000"),
2247 Quantity::from(1),
2248 UnixNanos::default(),
2249 );
2250 aggregator.update(
2251 Price::from("1.00005"),
2252 Quantity::from(1),
2253 UnixNanos::from(1000),
2254 );
2255
2256 let handler_guard = handler.lock().unwrap();
2257 assert_eq!(handler_guard.len(), 0); }
2259
2260 #[rstest]
2261 fn test_renko_bar_aggregator_update_exceeds_brick_size_creates_bar(audusd_sim: CurrencyPair) {
2262 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2263 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2265 let handler = Arc::new(Mutex::new(Vec::new()));
2266 let handler_clone = Arc::clone(&handler);
2267
2268 let mut aggregator = RenkoBarAggregator::new(
2269 bar_type,
2270 instrument.price_precision(),
2271 instrument.size_precision(),
2272 instrument.price_increment(),
2273 move |bar: Bar| {
2274 let mut handler_guard = handler_clone.lock().unwrap();
2275 handler_guard.push(bar);
2276 },
2277 );
2278
2279 aggregator.update(
2281 Price::from("1.00000"),
2282 Quantity::from(1),
2283 UnixNanos::default(),
2284 );
2285 aggregator.update(
2286 Price::from("1.00015"),
2287 Quantity::from(1),
2288 UnixNanos::from(1000),
2289 );
2290
2291 let handler_guard = handler.lock().unwrap();
2292 assert_eq!(handler_guard.len(), 1);
2293
2294 let bar = handler_guard.first().unwrap();
2295 assert_eq!(bar.open, Price::from("1.00000"));
2296 assert_eq!(bar.high, Price::from("1.00010"));
2297 assert_eq!(bar.low, Price::from("1.00000"));
2298 assert_eq!(bar.close, Price::from("1.00010"));
2299 assert_eq!(bar.volume, Quantity::from(2));
2300 assert_eq!(bar.ts_event, UnixNanos::from(1000));
2301 assert_eq!(bar.ts_init, UnixNanos::from(1000));
2302 }
2303
2304 #[rstest]
2305 fn test_renko_bar_aggregator_multiple_bricks_in_one_update(audusd_sim: CurrencyPair) {
2306 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2307 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2309 let handler = Arc::new(Mutex::new(Vec::new()));
2310 let handler_clone = Arc::clone(&handler);
2311
2312 let mut aggregator = RenkoBarAggregator::new(
2313 bar_type,
2314 instrument.price_precision(),
2315 instrument.size_precision(),
2316 instrument.price_increment(),
2317 move |bar: Bar| {
2318 let mut handler_guard = handler_clone.lock().unwrap();
2319 handler_guard.push(bar);
2320 },
2321 );
2322
2323 aggregator.update(
2325 Price::from("1.00000"),
2326 Quantity::from(1),
2327 UnixNanos::default(),
2328 );
2329 aggregator.update(
2330 Price::from("1.00025"),
2331 Quantity::from(1),
2332 UnixNanos::from(1000),
2333 );
2334
2335 let handler_guard = handler.lock().unwrap();
2336 assert_eq!(handler_guard.len(), 2);
2337
2338 let bar1 = &handler_guard[0];
2339 assert_eq!(bar1.open, Price::from("1.00000"));
2340 assert_eq!(bar1.high, Price::from("1.00010"));
2341 assert_eq!(bar1.low, Price::from("1.00000"));
2342 assert_eq!(bar1.close, Price::from("1.00010"));
2343
2344 let bar2 = &handler_guard[1];
2345 assert_eq!(bar2.open, Price::from("1.00010"));
2346 assert_eq!(bar2.high, Price::from("1.00020"));
2347 assert_eq!(bar2.low, Price::from("1.00010"));
2348 assert_eq!(bar2.close, Price::from("1.00020"));
2349 }
2350
2351 #[rstest]
2352 fn test_renko_bar_aggregator_downward_movement(audusd_sim: CurrencyPair) {
2353 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2354 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2356 let handler = Arc::new(Mutex::new(Vec::new()));
2357 let handler_clone = Arc::clone(&handler);
2358
2359 let mut aggregator = RenkoBarAggregator::new(
2360 bar_type,
2361 instrument.price_precision(),
2362 instrument.size_precision(),
2363 instrument.price_increment(),
2364 move |bar: Bar| {
2365 let mut handler_guard = handler_clone.lock().unwrap();
2366 handler_guard.push(bar);
2367 },
2368 );
2369
2370 aggregator.update(
2372 Price::from("1.00020"),
2373 Quantity::from(1),
2374 UnixNanos::default(),
2375 );
2376 aggregator.update(
2377 Price::from("1.00005"),
2378 Quantity::from(1),
2379 UnixNanos::from(1000),
2380 );
2381
2382 let handler_guard = handler.lock().unwrap();
2383 assert_eq!(handler_guard.len(), 1);
2384
2385 let bar = handler_guard.first().unwrap();
2386 assert_eq!(bar.open, Price::from("1.00020"));
2387 assert_eq!(bar.high, Price::from("1.00020"));
2388 assert_eq!(bar.low, Price::from("1.00010"));
2389 assert_eq!(bar.close, Price::from("1.00010"));
2390 assert_eq!(bar.volume, Quantity::from(2));
2391 }
2392
2393 #[rstest]
2394 fn test_renko_bar_aggregator_handle_bar_below_brick_size(audusd_sim: CurrencyPair) {
2395 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2396 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2398 let handler = Arc::new(Mutex::new(Vec::new()));
2399 let handler_clone = Arc::clone(&handler);
2400
2401 let mut aggregator = RenkoBarAggregator::new(
2402 bar_type,
2403 instrument.price_precision(),
2404 instrument.size_precision(),
2405 instrument.price_increment(),
2406 move |bar: Bar| {
2407 let mut handler_guard = handler_clone.lock().unwrap();
2408 handler_guard.push(bar);
2409 },
2410 );
2411
2412 let input_bar = Bar::new(
2414 BarType::new(
2415 instrument.id(),
2416 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2417 AggregationSource::Internal,
2418 ),
2419 Price::from("1.00000"),
2420 Price::from("1.00005"),
2421 Price::from("0.99995"),
2422 Price::from("1.00005"), Quantity::from(100),
2424 UnixNanos::default(),
2425 UnixNanos::from(1000),
2426 );
2427
2428 aggregator.handle_bar(input_bar);
2429
2430 let handler_guard = handler.lock().unwrap();
2431 assert_eq!(handler_guard.len(), 0); }
2433
2434 #[rstest]
2435 fn test_renko_bar_aggregator_handle_bar_exceeds_brick_size(audusd_sim: CurrencyPair) {
2436 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2437 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2439 let handler = Arc::new(Mutex::new(Vec::new()));
2440 let handler_clone = Arc::clone(&handler);
2441
2442 let mut aggregator = RenkoBarAggregator::new(
2443 bar_type,
2444 instrument.price_precision(),
2445 instrument.size_precision(),
2446 instrument.price_increment(),
2447 move |bar: Bar| {
2448 let mut handler_guard = handler_clone.lock().unwrap();
2449 handler_guard.push(bar);
2450 },
2451 );
2452
2453 let bar1 = Bar::new(
2455 BarType::new(
2456 instrument.id(),
2457 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2458 AggregationSource::Internal,
2459 ),
2460 Price::from("1.00000"),
2461 Price::from("1.00005"),
2462 Price::from("0.99995"),
2463 Price::from("1.00000"),
2464 Quantity::from(100),
2465 UnixNanos::default(),
2466 UnixNanos::default(),
2467 );
2468
2469 let bar2 = Bar::new(
2471 BarType::new(
2472 instrument.id(),
2473 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2474 AggregationSource::Internal,
2475 ),
2476 Price::from("1.00000"),
2477 Price::from("1.00015"),
2478 Price::from("0.99995"),
2479 Price::from("1.00010"), Quantity::from(50),
2481 UnixNanos::from(60_000_000_000),
2482 UnixNanos::from(60_000_000_000),
2483 );
2484
2485 aggregator.handle_bar(bar1);
2486 aggregator.handle_bar(bar2);
2487
2488 let handler_guard = handler.lock().unwrap();
2489 assert_eq!(handler_guard.len(), 1);
2490
2491 let bar = handler_guard.first().unwrap();
2492 assert_eq!(bar.open, Price::from("1.00000"));
2493 assert_eq!(bar.high, Price::from("1.00010"));
2494 assert_eq!(bar.low, Price::from("1.00000"));
2495 assert_eq!(bar.close, Price::from("1.00010"));
2496 assert_eq!(bar.volume, Quantity::from(150));
2497 }
2498
2499 #[rstest]
2500 fn test_renko_bar_aggregator_handle_bar_multiple_bricks(audusd_sim: CurrencyPair) {
2501 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2502 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2504 let handler = Arc::new(Mutex::new(Vec::new()));
2505 let handler_clone = Arc::clone(&handler);
2506
2507 let mut aggregator = RenkoBarAggregator::new(
2508 bar_type,
2509 instrument.price_precision(),
2510 instrument.size_precision(),
2511 instrument.price_increment(),
2512 move |bar: Bar| {
2513 let mut handler_guard = handler_clone.lock().unwrap();
2514 handler_guard.push(bar);
2515 },
2516 );
2517
2518 let bar1 = Bar::new(
2520 BarType::new(
2521 instrument.id(),
2522 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2523 AggregationSource::Internal,
2524 ),
2525 Price::from("1.00000"),
2526 Price::from("1.00005"),
2527 Price::from("0.99995"),
2528 Price::from("1.00000"),
2529 Quantity::from(100),
2530 UnixNanos::default(),
2531 UnixNanos::default(),
2532 );
2533
2534 let bar2 = Bar::new(
2536 BarType::new(
2537 instrument.id(),
2538 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2539 AggregationSource::Internal,
2540 ),
2541 Price::from("1.00000"),
2542 Price::from("1.00035"),
2543 Price::from("0.99995"),
2544 Price::from("1.00030"), Quantity::from(50),
2546 UnixNanos::from(60_000_000_000),
2547 UnixNanos::from(60_000_000_000),
2548 );
2549
2550 aggregator.handle_bar(bar1);
2551 aggregator.handle_bar(bar2);
2552
2553 let handler_guard = handler.lock().unwrap();
2554 assert_eq!(handler_guard.len(), 3);
2555
2556 let bar1 = &handler_guard[0];
2557 assert_eq!(bar1.open, Price::from("1.00000"));
2558 assert_eq!(bar1.close, Price::from("1.00010"));
2559
2560 let bar2 = &handler_guard[1];
2561 assert_eq!(bar2.open, Price::from("1.00010"));
2562 assert_eq!(bar2.close, Price::from("1.00020"));
2563
2564 let bar3 = &handler_guard[2];
2565 assert_eq!(bar3.open, Price::from("1.00020"));
2566 assert_eq!(bar3.close, Price::from("1.00030"));
2567 }
2568
2569 #[rstest]
2570 fn test_renko_bar_aggregator_handle_bar_downward_movement(audusd_sim: CurrencyPair) {
2571 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2572 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2574 let handler = Arc::new(Mutex::new(Vec::new()));
2575 let handler_clone = Arc::clone(&handler);
2576
2577 let mut aggregator = RenkoBarAggregator::new(
2578 bar_type,
2579 instrument.price_precision(),
2580 instrument.size_precision(),
2581 instrument.price_increment(),
2582 move |bar: Bar| {
2583 let mut handler_guard = handler_clone.lock().unwrap();
2584 handler_guard.push(bar);
2585 },
2586 );
2587
2588 let bar1 = Bar::new(
2590 BarType::new(
2591 instrument.id(),
2592 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2593 AggregationSource::Internal,
2594 ),
2595 Price::from("1.00020"),
2596 Price::from("1.00025"),
2597 Price::from("1.00015"),
2598 Price::from("1.00020"),
2599 Quantity::from(100),
2600 UnixNanos::default(),
2601 UnixNanos::default(),
2602 );
2603
2604 let bar2 = Bar::new(
2606 BarType::new(
2607 instrument.id(),
2608 BarSpecification::new(1, BarAggregation::Minute, PriceType::Mid),
2609 AggregationSource::Internal,
2610 ),
2611 Price::from("1.00020"),
2612 Price::from("1.00025"),
2613 Price::from("1.00005"),
2614 Price::from("1.00010"), Quantity::from(50),
2616 UnixNanos::from(60_000_000_000),
2617 UnixNanos::from(60_000_000_000),
2618 );
2619
2620 aggregator.handle_bar(bar1);
2621 aggregator.handle_bar(bar2);
2622
2623 let handler_guard = handler.lock().unwrap();
2624 assert_eq!(handler_guard.len(), 1);
2625
2626 let bar = handler_guard.first().unwrap();
2627 assert_eq!(bar.open, Price::from("1.00020"));
2628 assert_eq!(bar.high, Price::from("1.00020"));
2629 assert_eq!(bar.low, Price::from("1.00010"));
2630 assert_eq!(bar.close, Price::from("1.00010"));
2631 assert_eq!(bar.volume, Quantity::from(150));
2632 }
2633
2634 #[rstest]
2635 fn test_renko_bar_aggregator_brick_size_calculation(audusd_sim: CurrencyPair) {
2636 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2637
2638 let bar_spec_5 = BarSpecification::new(5, BarAggregation::Renko, PriceType::Mid); let bar_type_5 = BarType::new(instrument.id(), bar_spec_5, AggregationSource::Internal);
2641 let handler = Arc::new(Mutex::new(Vec::new()));
2642 let handler_clone = Arc::clone(&handler);
2643
2644 let aggregator_5 = RenkoBarAggregator::new(
2645 bar_type_5,
2646 instrument.price_precision(),
2647 instrument.size_precision(),
2648 instrument.price_increment(),
2649 move |_bar: Bar| {
2650 let mut handler_guard = handler_clone.lock().unwrap();
2651 handler_guard.push(_bar);
2652 },
2653 );
2654
2655 let expected_brick_size_5 = 5 * instrument.price_increment().raw;
2657 assert_eq!(aggregator_5.brick_size, expected_brick_size_5);
2658
2659 let bar_spec_20 = BarSpecification::new(20, BarAggregation::Renko, PriceType::Mid); let bar_type_20 = BarType::new(instrument.id(), bar_spec_20, AggregationSource::Internal);
2661 let handler2 = Arc::new(Mutex::new(Vec::new()));
2662 let handler2_clone = Arc::clone(&handler2);
2663
2664 let aggregator_20 = RenkoBarAggregator::new(
2665 bar_type_20,
2666 instrument.price_precision(),
2667 instrument.size_precision(),
2668 instrument.price_increment(),
2669 move |_bar: Bar| {
2670 let mut handler_guard = handler2_clone.lock().unwrap();
2671 handler_guard.push(_bar);
2672 },
2673 );
2674
2675 let expected_brick_size_20 = 20 * instrument.price_increment().raw;
2677 assert_eq!(aggregator_20.brick_size, expected_brick_size_20);
2678 }
2679
2680 #[rstest]
2681 fn test_renko_bar_aggregator_sequential_updates(audusd_sim: CurrencyPair) {
2682 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2683 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2685 let handler = Arc::new(Mutex::new(Vec::new()));
2686 let handler_clone = Arc::clone(&handler);
2687
2688 let mut aggregator = RenkoBarAggregator::new(
2689 bar_type,
2690 instrument.price_precision(),
2691 instrument.size_precision(),
2692 instrument.price_increment(),
2693 move |bar: Bar| {
2694 let mut handler_guard = handler_clone.lock().unwrap();
2695 handler_guard.push(bar);
2696 },
2697 );
2698
2699 aggregator.update(
2701 Price::from("1.00000"),
2702 Quantity::from(1),
2703 UnixNanos::from(1000),
2704 );
2705 aggregator.update(
2706 Price::from("1.00010"),
2707 Quantity::from(1),
2708 UnixNanos::from(2000),
2709 ); aggregator.update(
2711 Price::from("1.00020"),
2712 Quantity::from(1),
2713 UnixNanos::from(3000),
2714 ); aggregator.update(
2716 Price::from("1.00025"),
2717 Quantity::from(1),
2718 UnixNanos::from(4000),
2719 ); aggregator.update(
2721 Price::from("1.00030"),
2722 Quantity::from(1),
2723 UnixNanos::from(5000),
2724 ); let handler_guard = handler.lock().unwrap();
2727 assert_eq!(handler_guard.len(), 3);
2728
2729 let bar1 = &handler_guard[0];
2730 assert_eq!(bar1.open, Price::from("1.00000"));
2731 assert_eq!(bar1.close, Price::from("1.00010"));
2732
2733 let bar2 = &handler_guard[1];
2734 assert_eq!(bar2.open, Price::from("1.00010"));
2735 assert_eq!(bar2.close, Price::from("1.00020"));
2736
2737 let bar3 = &handler_guard[2];
2738 assert_eq!(bar3.open, Price::from("1.00020"));
2739 assert_eq!(bar3.close, Price::from("1.00030"));
2740 }
2741
2742 #[rstest]
2743 fn test_renko_bar_aggregator_mixed_direction_movement(audusd_sim: CurrencyPair) {
2744 let instrument = InstrumentAny::CurrencyPair(audusd_sim);
2745 let bar_spec = BarSpecification::new(10, BarAggregation::Renko, PriceType::Mid); let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::Internal);
2747 let handler = Arc::new(Mutex::new(Vec::new()));
2748 let handler_clone = Arc::clone(&handler);
2749
2750 let mut aggregator = RenkoBarAggregator::new(
2751 bar_type,
2752 instrument.price_precision(),
2753 instrument.size_precision(),
2754 instrument.price_increment(),
2755 move |bar: Bar| {
2756 let mut handler_guard = handler_clone.lock().unwrap();
2757 handler_guard.push(bar);
2758 },
2759 );
2760
2761 aggregator.update(
2763 Price::from("1.00000"),
2764 Quantity::from(1),
2765 UnixNanos::from(1000),
2766 );
2767 aggregator.update(
2768 Price::from("1.00010"),
2769 Quantity::from(1),
2770 UnixNanos::from(2000),
2771 ); aggregator.update(
2773 Price::from("0.99990"),
2774 Quantity::from(1),
2775 UnixNanos::from(3000),
2776 ); let handler_guard = handler.lock().unwrap();
2779 assert_eq!(handler_guard.len(), 3);
2780
2781 let bar1 = &handler_guard[0]; assert_eq!(bar1.open, Price::from("1.00000"));
2783 assert_eq!(bar1.high, Price::from("1.00010"));
2784 assert_eq!(bar1.low, Price::from("1.00000"));
2785 assert_eq!(bar1.close, Price::from("1.00010"));
2786
2787 let bar2 = &handler_guard[1]; assert_eq!(bar2.open, Price::from("1.00010"));
2789 assert_eq!(bar2.high, Price::from("1.00010"));
2790 assert_eq!(bar2.low, Price::from("1.00000"));
2791 assert_eq!(bar2.close, Price::from("1.00000"));
2792
2793 let bar3 = &handler_guard[2]; assert_eq!(bar3.open, Price::from("1.00000"));
2795 assert_eq!(bar3.high, Price::from("1.00000"));
2796 assert_eq!(bar3.low, Price::from("0.99990"));
2797 assert_eq!(bar3.close, Price::from("0.99990"));
2798 }
2799}