nautilus_model/data/
bar.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregate structures, data types and functionality.
17
18use std::{
19    collections::HashMap,
20    fmt::{Debug, Display, Formatter},
21    hash::Hash,
22    num::NonZeroUsize,
23    str::FromStr,
24};
25
26use chrono::{DateTime, Datelike, Duration, SubsecRound, TimeDelta, Timelike, Utc};
27use derive_builder::Builder;
28use indexmap::IndexMap;
29use nautilus_core::{
30    UnixNanos,
31    correctness::{FAILED, check_predicate_true},
32    datetime::{add_n_months, subtract_n_months},
33    serialization::Serializable,
34};
35use serde::{Deserialize, Deserializer, Serialize, Serializer};
36
37use super::GetTsInit;
38use crate::{
39    enums::{AggregationSource, BarAggregation, PriceType},
40    identifiers::InstrumentId,
41    types::{Price, Quantity, fixed::FIXED_SIZE_BINARY},
42};
43
44/// Returns the bar interval as a `TimeDelta`.
45///
46/// # Panics
47///
48/// This function panics:
49/// - If the aggregation method of the given `bar_type` is not time based.
50pub fn get_bar_interval(bar_type: &BarType) -> TimeDelta {
51    let spec = bar_type.spec();
52
53    match spec.aggregation {
54        BarAggregation::Millisecond => TimeDelta::milliseconds(spec.step.get() as i64),
55        BarAggregation::Second => TimeDelta::seconds(spec.step.get() as i64),
56        BarAggregation::Minute => TimeDelta::minutes(spec.step.get() as i64),
57        BarAggregation::Hour => TimeDelta::hours(spec.step.get() as i64),
58        BarAggregation::Day => TimeDelta::days(spec.step.get() as i64),
59        BarAggregation::Week => TimeDelta::days(7 * spec.step.get() as i64),
60        BarAggregation::Month => TimeDelta::days(0),
61        _ => panic!("Aggregation not time based"),
62    }
63}
64
65/// Returns the bar interval as `UnixNanos`.
66///
67/// # Panics
68///
69/// This function panics:
70/// - If the aggregation method of the given `bar_type` is not time based.
71pub fn get_bar_interval_ns(bar_type: &BarType) -> UnixNanos {
72    let interval_ns = get_bar_interval(bar_type)
73        .num_nanoseconds()
74        .expect("Invalid bar interval") as u64;
75    UnixNanos::from(interval_ns)
76}
77
78/// Returns the time bar start as a timezone-aware `DateTime<Utc>`.
79/// Returns the time bar start as a timezone-aware `DateTime<Utc>`.
80pub fn get_time_bar_start(
81    now: DateTime<Utc>,
82    bar_type: &BarType,
83    time_bars_origin: Option<TimeDelta>,
84) -> DateTime<Utc> {
85    let spec = bar_type.spec();
86    let step = spec.step.get() as i64;
87    let origin_offset: TimeDelta = time_bars_origin.unwrap_or_else(TimeDelta::zero);
88
89    match spec.aggregation {
90        BarAggregation::Millisecond => {
91            let mut start_time = now.trunc_subsecs(0);
92            start_time += origin_offset;
93
94            if now < start_time {
95                start_time -= Duration::seconds(1);
96            }
97
98            while start_time <= now {
99                start_time += Duration::milliseconds(step);
100            }
101
102            start_time -= Duration::milliseconds(step);
103            start_time
104        }
105        BarAggregation::Second => {
106            let mut start_time = now.trunc_subsecs(0) - Duration::seconds(now.second() as i64);
107            start_time += origin_offset;
108
109            if now < start_time {
110                start_time -= Duration::minutes(1);
111            }
112
113            while start_time <= now {
114                start_time += Duration::seconds(step);
115            }
116
117            start_time -= Duration::seconds(step);
118            start_time
119        }
120        BarAggregation::Minute => {
121            let mut start_time = now.trunc_subsecs(0)
122                - Duration::seconds(now.second() as i64)
123                - Duration::minutes(now.minute() as i64);
124            start_time += origin_offset;
125
126            if now < start_time {
127                start_time -= Duration::hours(1);
128            }
129
130            while start_time <= now {
131                start_time += Duration::minutes(step);
132            }
133
134            start_time -= Duration::minutes(step);
135            start_time
136        }
137        BarAggregation::Hour => {
138            let mut start_time = now.trunc_subsecs(0)
139                - Duration::seconds(now.second() as i64)
140                - Duration::minutes(now.minute() as i64)
141                - Duration::hours(now.hour() as i64);
142            start_time += origin_offset;
143
144            if now < start_time {
145                start_time -= Duration::days(1);
146            }
147
148            while start_time <= now {
149                start_time += Duration::hours(step);
150            }
151
152            start_time -= Duration::hours(step);
153            start_time
154        }
155        BarAggregation::Day => {
156            let mut start_time = now.trunc_subsecs(0)
157                - Duration::seconds(now.second() as i64)
158                - Duration::minutes(now.minute() as i64)
159                - Duration::hours(now.hour() as i64);
160            start_time += origin_offset;
161
162            if now < start_time {
163                start_time -= Duration::days(1);
164            }
165
166            start_time
167        }
168        BarAggregation::Week => {
169            let mut start_time = now.trunc_subsecs(0)
170                - Duration::seconds(now.second() as i64)
171                - Duration::minutes(now.minute() as i64)
172                - Duration::hours(now.hour() as i64)
173                - TimeDelta::days(now.weekday().num_days_from_monday() as i64);
174            start_time += origin_offset;
175
176            if now < start_time {
177                start_time -= Duration::weeks(1);
178            }
179
180            start_time
181        }
182        BarAggregation::Month => {
183            // Set to the first day of the year
184            let mut start_time = DateTime::from_naive_utc_and_offset(
185                chrono::NaiveDate::from_ymd_opt(now.year(), 1, 1)
186                    .expect("valid date")
187                    .and_hms_opt(0, 0, 0)
188                    .expect("valid time"),
189                Utc,
190            );
191            start_time += origin_offset;
192
193            if now < start_time {
194                start_time = subtract_n_months(start_time, 12);
195            }
196
197            let months_step = step as u32;
198            while start_time <= now {
199                start_time = add_n_months(start_time, months_step);
200            }
201
202            start_time = subtract_n_months(start_time, months_step);
203            start_time
204        }
205        _ => panic!(
206            "Aggregation type {} not supported for time bars",
207            spec.aggregation
208        ),
209    }
210}
211
212/// Represents a bar aggregation specification including a step, aggregation
213/// method/rule and price type.
214#[repr(C)]
215#[derive(
216    Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Builder,
217)]
218#[cfg_attr(
219    feature = "python",
220    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
221)]
222pub struct BarSpecification {
223    /// The step for binning samples for bar aggregation.
224    pub step: NonZeroUsize,
225    /// The type of bar aggregation.
226    pub aggregation: BarAggregation,
227    /// The price type to use for aggregation.
228    pub price_type: PriceType,
229}
230
231impl BarSpecification {
232    /// Creates a new [`BarSpecification`] instance with correctness checking.
233    ///
234    /// # Errors
235    ///
236    /// This function returns an error:
237    /// - If `step` is not positive (> 0).
238    ///
239    /// # Notes
240    ///
241    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
242    pub fn new_checked(
243        step: usize,
244        aggregation: BarAggregation,
245        price_type: PriceType,
246    ) -> anyhow::Result<Self> {
247        let step = NonZeroUsize::new(step)
248            .ok_or(anyhow::anyhow!("Invalid step: {step} (must be non-zero)"))?;
249        Ok(Self {
250            step,
251            aggregation,
252            price_type,
253        })
254    }
255
256    /// Creates a new [`BarSpecification`] instance.
257    ///
258    /// # Panics
259    ///
260    /// This function panics:
261    /// - If `step` is not positive (> 0).
262    #[must_use]
263    pub fn new(step: usize, aggregation: BarAggregation, price_type: PriceType) -> Self {
264        Self::new_checked(step, aggregation, price_type).expect(FAILED)
265    }
266
267    pub fn timedelta(&self) -> TimeDelta {
268        match self.aggregation {
269            BarAggregation::Millisecond => Duration::milliseconds(self.step.get() as i64),
270            BarAggregation::Second => Duration::seconds(self.step.get() as i64),
271            BarAggregation::Minute => Duration::minutes(self.step.get() as i64),
272            BarAggregation::Hour => Duration::hours(self.step.get() as i64),
273            BarAggregation::Day => Duration::days(self.step.get() as i64),
274            _ => panic!(
275                "Timedelta not supported for aggregation type: {:?}",
276                self.aggregation
277            ),
278        }
279    }
280
281    /// Return a value indicating whether the aggregation method is time-driven:
282    ///  - [`BarAggregation::Millisecond`]
283    ///  - [`BarAggregation::Second`]
284    ///  - [`BarAggregation::Minute`]
285    ///  - [`BarAggregation::Hour`]
286    ///  - [`BarAggregation::Day`]
287    ///  - [`BarAggregation::Month`]
288    pub fn is_time_aggregated(&self) -> bool {
289        matches!(
290            self.aggregation,
291            BarAggregation::Millisecond
292                | BarAggregation::Second
293                | BarAggregation::Minute
294                | BarAggregation::Hour
295                | BarAggregation::Day
296                | BarAggregation::Month
297        )
298    }
299
300    /// Return a value indicating whether the aggregation method is threshold-driven:
301    ///  - [`BarAggregation::Tick`]
302    ///  - [`BarAggregation::TickImbalance`]
303    ///  - [`BarAggregation::Volume`]
304    ///  - [`BarAggregation::VolumeImbalance`]
305    ///  - [`BarAggregation::Value`]
306    ///  - [`BarAggregation::ValueImbalance`]
307    pub fn is_threshold_aggregated(&self) -> bool {
308        matches!(
309            self.aggregation,
310            BarAggregation::Tick
311                | BarAggregation::TickImbalance
312                | BarAggregation::Volume
313                | BarAggregation::VolumeImbalance
314                | BarAggregation::Value
315                | BarAggregation::ValueImbalance
316        )
317    }
318
319    /// Return a value indicating whether the aggregation method is information-driven:
320    ///  - [`BarAggregation::TickRuns`]
321    ///  - [`BarAggregation::VolumeRuns`]
322    ///  - [`BarAggregation::ValueRuns`]
323    pub fn is_information_aggregated(&self) -> bool {
324        matches!(
325            self.aggregation,
326            BarAggregation::TickRuns | BarAggregation::VolumeRuns | BarAggregation::ValueRuns
327        )
328    }
329}
330
331impl Display for BarSpecification {
332    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
333        write!(f, "{}-{}-{}", self.step, self.aggregation, self.price_type)
334    }
335}
336
337/// Represents a bar type including the instrument ID, bar specification and
338/// aggregation source.
339#[repr(C)]
340#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
341#[cfg_attr(
342    feature = "python",
343    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
344)]
345pub enum BarType {
346    Standard {
347        /// The bar type's instrument ID.
348        instrument_id: InstrumentId,
349        /// The bar type's specification.
350        spec: BarSpecification,
351        /// The bar type's aggregation source.
352        aggregation_source: AggregationSource,
353    },
354    Composite {
355        /// The bar type's instrument ID.
356        instrument_id: InstrumentId,
357        /// The bar type's specification.
358        spec: BarSpecification,
359        /// The bar type's aggregation source.
360        aggregation_source: AggregationSource,
361
362        /// The composite step for binning samples for bar aggregation.
363        composite_step: usize,
364        /// The composite type of bar aggregation.
365        composite_aggregation: BarAggregation,
366        /// The composite bar type's aggregation source.
367        composite_aggregation_source: AggregationSource,
368    },
369}
370
371impl BarType {
372    /// Creates a new [`BarType`] instance.
373    #[must_use]
374    pub fn new(
375        instrument_id: InstrumentId,
376        spec: BarSpecification,
377        aggregation_source: AggregationSource,
378    ) -> Self {
379        Self::Standard {
380            instrument_id,
381            spec,
382            aggregation_source,
383        }
384    }
385
386    /// Creates a new composite [`BarType`] instance.
387    pub fn new_composite(
388        instrument_id: InstrumentId,
389        spec: BarSpecification,
390        aggregation_source: AggregationSource,
391
392        composite_step: usize,
393        composite_aggregation: BarAggregation,
394        composite_aggregation_source: AggregationSource,
395    ) -> Self {
396        Self::Composite {
397            instrument_id,
398            spec,
399            aggregation_source,
400
401            composite_step,
402            composite_aggregation,
403            composite_aggregation_source,
404        }
405    }
406
407    /// Returns whether this instance is a standard bar type.
408    pub fn is_standard(&self) -> bool {
409        match &self {
410            BarType::Standard { .. } => true,
411            BarType::Composite { .. } => false,
412        }
413    }
414
415    /// Returns whether this instance is a composite bar type.
416    pub fn is_composite(&self) -> bool {
417        match &self {
418            BarType::Standard { .. } => false,
419            BarType::Composite { .. } => true,
420        }
421    }
422
423    /// Returns the standard bar type component.
424    pub fn standard(&self) -> Self {
425        match &self {
426            &&b @ BarType::Standard { .. } => b,
427            BarType::Composite {
428                instrument_id,
429                spec,
430                aggregation_source,
431                ..
432            } => Self::new(*instrument_id, *spec, *aggregation_source),
433        }
434    }
435
436    /// Returns any composite bar type component.
437    pub fn composite(&self) -> Self {
438        match &self {
439            &&b @ BarType::Standard { .. } => b, // case shouldn't be used if is_composite is called before
440            BarType::Composite {
441                instrument_id,
442                spec,
443                aggregation_source: _,
444
445                composite_step,
446                composite_aggregation,
447                composite_aggregation_source,
448            } => Self::new(
449                *instrument_id,
450                BarSpecification::new(*composite_step, *composite_aggregation, spec.price_type),
451                *composite_aggregation_source,
452            ),
453        }
454    }
455
456    /// Returns the [`InstrumentId`] for this bar type.
457    pub fn instrument_id(&self) -> InstrumentId {
458        match &self {
459            BarType::Standard { instrument_id, .. } | BarType::Composite { instrument_id, .. } => {
460                *instrument_id
461            }
462        }
463    }
464
465    /// Returns the [`BarSpecification`] for this bar type.
466    pub fn spec(&self) -> BarSpecification {
467        match &self {
468            BarType::Standard { spec, .. } | BarType::Composite { spec, .. } => *spec,
469        }
470    }
471
472    /// Returns the [`AggregationSource`] for this bar type.
473    pub fn aggregation_source(&self) -> AggregationSource {
474        match &self {
475            BarType::Standard {
476                aggregation_source, ..
477            }
478            | BarType::Composite {
479                aggregation_source, ..
480            } => *aggregation_source,
481        }
482    }
483}
484
485#[derive(thiserror::Error, Debug)]
486#[error("Error parsing `BarType` from '{input}', invalid token: '{token}' at position {position}")]
487pub struct BarTypeParseError {
488    input: String,
489    token: String,
490    position: usize,
491}
492
493impl FromStr for BarType {
494    type Err = BarTypeParseError;
495
496    fn from_str(s: &str) -> Result<Self, Self::Err> {
497        let parts: Vec<&str> = s.split('@').collect();
498        let standard = parts[0];
499        let composite_str = parts.get(1);
500
501        let pieces: Vec<&str> = standard.rsplitn(5, '-').collect();
502        let rev_pieces: Vec<&str> = pieces.into_iter().rev().collect();
503        if rev_pieces.len() != 5 {
504            return Err(BarTypeParseError {
505                input: s.to_string(),
506                token: String::new(),
507                position: 0,
508            });
509        }
510
511        let instrument_id =
512            InstrumentId::from_str(rev_pieces[0]).map_err(|_| BarTypeParseError {
513                input: s.to_string(),
514                token: rev_pieces[0].to_string(),
515                position: 0,
516            })?;
517
518        let step = rev_pieces[1].parse().map_err(|_| BarTypeParseError {
519            input: s.to_string(),
520            token: rev_pieces[1].to_string(),
521            position: 1,
522        })?;
523        let aggregation =
524            BarAggregation::from_str(rev_pieces[2]).map_err(|_| BarTypeParseError {
525                input: s.to_string(),
526                token: rev_pieces[2].to_string(),
527                position: 2,
528            })?;
529        let price_type = PriceType::from_str(rev_pieces[3]).map_err(|_| BarTypeParseError {
530            input: s.to_string(),
531            token: rev_pieces[3].to_string(),
532            position: 3,
533        })?;
534        let aggregation_source =
535            AggregationSource::from_str(rev_pieces[4]).map_err(|_| BarTypeParseError {
536                input: s.to_string(),
537                token: rev_pieces[4].to_string(),
538                position: 4,
539            })?;
540
541        if let Some(composite_str) = composite_str {
542            let composite_pieces: Vec<&str> = composite_str.rsplitn(3, '-').collect();
543            let rev_composite_pieces: Vec<&str> = composite_pieces.into_iter().rev().collect();
544            if rev_composite_pieces.len() != 3 {
545                return Err(BarTypeParseError {
546                    input: s.to_string(),
547                    token: String::new(),
548                    position: 5,
549                });
550            }
551
552            let composite_step =
553                rev_composite_pieces[0]
554                    .parse()
555                    .map_err(|_| BarTypeParseError {
556                        input: s.to_string(),
557                        token: rev_composite_pieces[0].to_string(),
558                        position: 5,
559                    })?;
560            let composite_aggregation =
561                BarAggregation::from_str(rev_composite_pieces[1]).map_err(|_| {
562                    BarTypeParseError {
563                        input: s.to_string(),
564                        token: rev_composite_pieces[1].to_string(),
565                        position: 6,
566                    }
567                })?;
568            let composite_aggregation_source = AggregationSource::from_str(rev_composite_pieces[2])
569                .map_err(|_| BarTypeParseError {
570                    input: s.to_string(),
571                    token: rev_composite_pieces[2].to_string(),
572                    position: 7,
573                })?;
574
575            Ok(Self::new_composite(
576                instrument_id,
577                BarSpecification::new(step, aggregation, price_type),
578                aggregation_source,
579                composite_step,
580                composite_aggregation,
581                composite_aggregation_source,
582            ))
583        } else {
584            Ok(Self::Standard {
585                instrument_id,
586                spec: BarSpecification::new(step, aggregation, price_type),
587                aggregation_source,
588            })
589        }
590    }
591}
592
593impl From<&str> for BarType {
594    fn from(value: &str) -> Self {
595        Self::from_str(value).expect(FAILED)
596    }
597}
598
599impl Display for BarType {
600    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
601        match &self {
602            BarType::Standard {
603                instrument_id,
604                spec,
605                aggregation_source,
606            } => {
607                write!(f, "{}-{}-{}", instrument_id, spec, aggregation_source)
608            }
609            BarType::Composite {
610                instrument_id,
611                spec,
612                aggregation_source,
613
614                composite_step,
615                composite_aggregation,
616                composite_aggregation_source,
617            } => {
618                write!(
619                    f,
620                    "{}-{}-{}@{}-{}-{}",
621                    instrument_id,
622                    spec,
623                    aggregation_source,
624                    *composite_step,
625                    *composite_aggregation,
626                    *composite_aggregation_source
627                )
628            }
629        }
630    }
631}
632
633impl Serialize for BarType {
634    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
635    where
636        S: Serializer,
637    {
638        serializer.serialize_str(&self.to_string())
639    }
640}
641
642impl<'de> Deserialize<'de> for BarType {
643    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
644    where
645        D: Deserializer<'de>,
646    {
647        let s: String = Deserialize::deserialize(deserializer)?;
648        Self::from_str(&s).map_err(serde::de::Error::custom)
649    }
650}
651
652/// Represents an aggregated bar.
653#[repr(C)]
654#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug, Serialize, Deserialize)]
655#[serde(tag = "type")]
656#[cfg_attr(
657    feature = "python",
658    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
659)]
660pub struct Bar {
661    /// The bar type for this bar.
662    pub bar_type: BarType,
663    /// The bars open price.
664    pub open: Price,
665    /// The bars high price.
666    pub high: Price,
667    /// The bars low price.
668    pub low: Price,
669    /// The bars close price.
670    pub close: Price,
671    /// The bars volume.
672    pub volume: Quantity,
673    /// UNIX timestamp (nanoseconds) when the data event occurred.
674    pub ts_event: UnixNanos,
675    /// UNIX timestamp (nanoseconds) when the struct was initialized.
676    pub ts_init: UnixNanos,
677}
678
679impl Bar {
680    /// Creates a new [`Bar`] instance with correctness checking.
681    ///
682    /// # Errors
683    ///
684    /// This function returns an error:
685    /// - If `high` is not >= `low`.
686    /// - If `high` is not >= `close`.
687    /// - If `low` is not <= `close.
688    ///
689    /// # Notes
690    ///
691    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
692    #[allow(clippy::too_many_arguments)]
693    pub fn new_checked(
694        bar_type: BarType,
695        open: Price,
696        high: Price,
697        low: Price,
698        close: Price,
699        volume: Quantity,
700        ts_event: UnixNanos,
701        ts_init: UnixNanos,
702    ) -> anyhow::Result<Self> {
703        check_predicate_true(high >= open, "high >= open")?;
704        check_predicate_true(high >= low, "high >= low")?;
705        check_predicate_true(high >= close, "high >= close")?;
706        check_predicate_true(low <= close, "low <= close")?;
707        check_predicate_true(low <= open, "low <= open")?;
708
709        Ok(Self {
710            bar_type,
711            open,
712            high,
713            low,
714            close,
715            volume,
716            ts_event,
717            ts_init,
718        })
719    }
720
721    /// Creates a new [`Bar`] instance.
722    ///
723    /// # Panics
724    ///
725    /// This function panics:
726    /// - If `high` is not >= `low`.
727    /// - If `high` is not >= `close`.
728    /// - If `low` is not <= `close.
729    #[allow(clippy::too_many_arguments)]
730    pub fn new(
731        bar_type: BarType,
732        open: Price,
733        high: Price,
734        low: Price,
735        close: Price,
736        volume: Quantity,
737        ts_event: UnixNanos,
738        ts_init: UnixNanos,
739    ) -> Self {
740        Self::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
741            .expect(FAILED)
742    }
743
744    pub fn instrument_id(&self) -> InstrumentId {
745        self.bar_type.instrument_id()
746    }
747
748    /// Returns the metadata for the type, for use with serialization formats.
749    #[must_use]
750    pub fn get_metadata(
751        bar_type: &BarType,
752        price_precision: u8,
753        size_precision: u8,
754    ) -> HashMap<String, String> {
755        let mut metadata = HashMap::new();
756        let instrument_id = bar_type.instrument_id();
757        metadata.insert("bar_type".to_string(), bar_type.to_string());
758        metadata.insert("instrument_id".to_string(), instrument_id.to_string());
759        metadata.insert("price_precision".to_string(), price_precision.to_string());
760        metadata.insert("size_precision".to_string(), size_precision.to_string());
761        metadata
762    }
763
764    /// Returns the field map for the type, for use with Arrow schemas.
765    #[must_use]
766    pub fn get_fields() -> IndexMap<String, String> {
767        let mut metadata = IndexMap::new();
768        metadata.insert("open".to_string(), FIXED_SIZE_BINARY.to_string());
769        metadata.insert("high".to_string(), FIXED_SIZE_BINARY.to_string());
770        metadata.insert("low".to_string(), FIXED_SIZE_BINARY.to_string());
771        metadata.insert("close".to_string(), FIXED_SIZE_BINARY.to_string());
772        metadata.insert("volume".to_string(), FIXED_SIZE_BINARY.to_string());
773        metadata.insert("ts_event".to_string(), "UInt64".to_string());
774        metadata.insert("ts_init".to_string(), "UInt64".to_string());
775        metadata
776    }
777}
778
779impl Display for Bar {
780    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
781        write!(
782            f,
783            "{},{},{},{},{},{},{}",
784            self.bar_type, self.open, self.high, self.low, self.close, self.volume, self.ts_event
785        )
786    }
787}
788
789impl Serializable for Bar {}
790
791impl GetTsInit for Bar {
792    fn ts_init(&self) -> UnixNanos {
793        self.ts_init
794    }
795}
796
797////////////////////////////////////////////////////////////////////////////////
798// Tests
799////////////////////////////////////////////////////////////////////////////////
800#[cfg(test)]
801mod tests {
802    use chrono::TimeZone;
803    use rstest::rstest;
804
805    use super::*;
806    use crate::identifiers::{Symbol, Venue};
807
808    #[rstest]
809    fn test_bar_specification_new_invalid() {
810        let result = BarSpecification::new_checked(0, BarAggregation::Tick, PriceType::Last);
811        assert!(result.is_err());
812    }
813
814    #[rstest]
815    #[should_panic(expected = "Invalid step: 0 (must be non-zero)")]
816    fn test_bar_specification_new_checked_with_invalid_step_panics() {
817        let aggregation = BarAggregation::Tick;
818        let price_type = PriceType::Last;
819
820        let _ = BarSpecification::new(0, aggregation, price_type);
821    }
822
823    #[rstest]
824    #[case(BarAggregation::Millisecond, 1, TimeDelta::milliseconds(1))]
825    #[case(BarAggregation::Millisecond, 10, TimeDelta::milliseconds(10))]
826    #[case(BarAggregation::Second, 1, TimeDelta::seconds(1))]
827    #[case(BarAggregation::Second, 15, TimeDelta::seconds(15))]
828    #[case(BarAggregation::Minute, 1, TimeDelta::minutes(1))]
829    #[case(BarAggregation::Minute, 60, TimeDelta::minutes(60))]
830    #[case(BarAggregation::Hour, 1, TimeDelta::hours(1))]
831    #[case(BarAggregation::Hour, 4, TimeDelta::hours(4))]
832    #[case(BarAggregation::Day, 1, TimeDelta::days(1))]
833    #[case(BarAggregation::Day, 2, TimeDelta::days(2))]
834    #[should_panic(expected = "Aggregation not time based")]
835    #[case(BarAggregation::Tick, 1, TimeDelta::zero())]
836    fn test_get_bar_interval(
837        #[case] aggregation: BarAggregation,
838        #[case] step: usize,
839        #[case] expected: TimeDelta,
840    ) {
841        let bar_type = BarType::Standard {
842            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
843            spec: BarSpecification::new(step, aggregation, PriceType::Last),
844            aggregation_source: AggregationSource::Internal,
845        };
846
847        let interval = get_bar_interval(&bar_type);
848        assert_eq!(interval, expected);
849    }
850
851    #[rstest]
852    #[case(BarAggregation::Millisecond, 1, UnixNanos::from(1_000_000))]
853    #[case(BarAggregation::Millisecond, 10, UnixNanos::from(10_000_000))]
854    #[case(BarAggregation::Second, 1, UnixNanos::from(1_000_000_000))]
855    #[case(BarAggregation::Second, 10, UnixNanos::from(10_000_000_000))]
856    #[case(BarAggregation::Minute, 1, UnixNanos::from(60_000_000_000))]
857    #[case(BarAggregation::Minute, 60, UnixNanos::from(3_600_000_000_000))]
858    #[case(BarAggregation::Hour, 1, UnixNanos::from(3_600_000_000_000))]
859    #[case(BarAggregation::Hour, 4, UnixNanos::from(14_400_000_000_000))]
860    #[case(BarAggregation::Day, 1, UnixNanos::from(86_400_000_000_000))]
861    #[case(BarAggregation::Day, 2, UnixNanos::from(172_800_000_000_000))]
862    #[should_panic(expected = "Aggregation not time based")]
863    #[case(BarAggregation::Tick, 1, UnixNanos::from(0))]
864    fn test_get_bar_interval_ns(
865        #[case] aggregation: BarAggregation,
866        #[case] step: usize,
867        #[case] expected: UnixNanos,
868    ) {
869        let bar_type = BarType::Standard {
870            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
871            spec: BarSpecification::new(step, aggregation, PriceType::Last),
872            aggregation_source: AggregationSource::Internal,
873        };
874
875        let interval_ns = get_bar_interval_ns(&bar_type);
876        assert_eq!(interval_ns, expected);
877    }
878
879    #[rstest]
880    #[case::millisecond(
881    Utc.timestamp_opt(1658349296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
882    BarAggregation::Millisecond,
883    1,
884    Utc.timestamp_opt(1658349296, 123_000_000).unwrap(),  // 2024-07-21 12:34:56.123 UTC
885    )]
886    #[rstest]
887    #[case::millisecond(
888    Utc.timestamp_opt(1658349296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
889    BarAggregation::Millisecond,
890    10,
891    Utc.timestamp_opt(1658349296, 120_000_000).unwrap(),  // 2024-07-21 12:34:56.120 UTC
892    )]
893    #[case::second(
894    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
895    BarAggregation::Millisecond,
896    1000,
897    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap()
898    )]
899    #[case::second(
900    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
901    BarAggregation::Second,
902    1,
903    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap()
904    )]
905    #[case::second(
906    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
907    BarAggregation::Second,
908    5,
909    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 55).unwrap()
910    )]
911    #[case::second(
912    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
913    BarAggregation::Second,
914    60,
915    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 0).unwrap()
916    )]
917    #[case::minute(
918    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
919    BarAggregation::Minute,
920    1,
921    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 0).unwrap()
922    )]
923    #[case::minute(
924    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
925    BarAggregation::Minute,
926    5,
927    Utc.with_ymd_and_hms(2024, 7, 21, 12, 30, 0).unwrap()
928    )]
929    #[case::minute(
930    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
931    BarAggregation::Minute,
932    60,
933    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
934    )]
935    #[case::hour(
936    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
937    BarAggregation::Hour,
938    1,
939    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
940    )]
941    #[case::hour(
942    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
943    BarAggregation::Hour,
944    2,
945    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
946    )]
947    #[case::day(
948    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
949    BarAggregation::Day,
950    1,
951    Utc.with_ymd_and_hms(2024, 7, 21, 0, 0, 0).unwrap()
952    )]
953    fn test_get_time_bar_start(
954        #[case] now: DateTime<Utc>,
955        #[case] aggregation: BarAggregation,
956        #[case] step: usize,
957        #[case] expected: DateTime<Utc>,
958    ) {
959        let bar_type = BarType::Standard {
960            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
961            spec: BarSpecification::new(step, aggregation, PriceType::Last),
962            aggregation_source: AggregationSource::Internal,
963        };
964
965        let start_time = get_time_bar_start(now, &bar_type, None);
966        assert_eq!(start_time, expected);
967    }
968
969    #[rstest]
970    fn test_bar_spec_string_reprs() {
971        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
972        assert_eq!(bar_spec.to_string(), "1-MINUTE-BID");
973        assert_eq!(format!("{bar_spec}"), "1-MINUTE-BID");
974    }
975
976    #[rstest]
977    fn test_bar_type_parse_valid() {
978        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
979        let bar_type = BarType::from(input);
980
981        assert_eq!(
982            bar_type.instrument_id(),
983            InstrumentId::from("BTCUSDT-PERP.BINANCE")
984        );
985        assert_eq!(
986            bar_type.spec(),
987            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
988        );
989        assert_eq!(bar_type.aggregation_source(), AggregationSource::External);
990        assert_eq!(bar_type, BarType::from(input));
991    }
992
993    #[rstest]
994    fn test_bar_type_composite_parse_valid() {
995        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL";
996        let bar_type = BarType::from(input);
997        let standard = bar_type.standard();
998
999        assert_eq!(
1000            bar_type.instrument_id(),
1001            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1002        );
1003        assert_eq!(
1004            bar_type.spec(),
1005            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
1006        );
1007        assert_eq!(bar_type.aggregation_source(), AggregationSource::Internal);
1008        assert_eq!(bar_type, BarType::from(input));
1009        assert!(bar_type.is_composite());
1010
1011        assert_eq!(
1012            standard.instrument_id(),
1013            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1014        );
1015        assert_eq!(
1016            standard.spec(),
1017            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
1018        );
1019        assert_eq!(standard.aggregation_source(), AggregationSource::Internal);
1020        assert!(standard.is_standard());
1021
1022        let composite = bar_type.composite();
1023        let composite_input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
1024
1025        assert_eq!(
1026            composite.instrument_id(),
1027            InstrumentId::from("BTCUSDT-PERP.BINANCE")
1028        );
1029        assert_eq!(
1030            composite.spec(),
1031            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last,)
1032        );
1033        assert_eq!(composite.aggregation_source(), AggregationSource::External);
1034        assert_eq!(composite, BarType::from(composite_input));
1035        assert!(composite.is_standard());
1036    }
1037
1038    #[rstest]
1039    fn test_bar_type_parse_invalid_token_pos_0() {
1040        let input = "BTCUSDT-PERP-1-MINUTE-LAST-INTERNAL";
1041        let result = BarType::from_str(input);
1042
1043        assert_eq!(
1044            result.unwrap_err().to_string(),
1045            format!(
1046                "Error parsing `BarType` from '{input}', invalid token: 'BTCUSDT-PERP' at position 0"
1047            )
1048        );
1049    }
1050
1051    #[rstest]
1052    fn test_bar_type_parse_invalid_token_pos_1() {
1053        let input = "BTCUSDT-PERP.BINANCE-INVALID-MINUTE-LAST-INTERNAL";
1054        let result = BarType::from_str(input);
1055
1056        assert_eq!(
1057            result.unwrap_err().to_string(),
1058            format!(
1059                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 1"
1060            )
1061        );
1062    }
1063
1064    #[rstest]
1065    fn test_bar_type_parse_invalid_token_pos_2() {
1066        let input = "BTCUSDT-PERP.BINANCE-1-INVALID-LAST-INTERNAL";
1067        let result = BarType::from_str(input);
1068
1069        assert_eq!(
1070            result.unwrap_err().to_string(),
1071            format!(
1072                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 2"
1073            )
1074        );
1075    }
1076
1077    #[rstest]
1078    fn test_bar_type_parse_invalid_token_pos_3() {
1079        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-INVALID-INTERNAL";
1080        let result = BarType::from_str(input);
1081
1082        assert_eq!(
1083            result.unwrap_err().to_string(),
1084            format!(
1085                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 3"
1086            )
1087        );
1088    }
1089
1090    #[rstest]
1091    fn test_bar_type_parse_invalid_token_pos_4() {
1092        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-BID-INVALID";
1093        let result = BarType::from_str(input);
1094
1095        assert!(result.is_err());
1096        assert_eq!(
1097            result.unwrap_err().to_string(),
1098            format!(
1099                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 4"
1100            )
1101        );
1102    }
1103
1104    #[rstest]
1105    fn test_bar_type_parse_invalid_token_pos_5() {
1106        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@INVALID-MINUTE-EXTERNAL";
1107        let result = BarType::from_str(input);
1108
1109        assert!(result.is_err());
1110        assert_eq!(
1111            result.unwrap_err().to_string(),
1112            format!(
1113                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 5"
1114            )
1115        );
1116    }
1117
1118    #[rstest]
1119    fn test_bar_type_parse_invalid_token_pos_6() {
1120        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-INVALID-EXTERNAL";
1121        let result = BarType::from_str(input);
1122
1123        assert!(result.is_err());
1124        assert_eq!(
1125            result.unwrap_err().to_string(),
1126            format!(
1127                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 6"
1128            )
1129        );
1130    }
1131
1132    #[rstest]
1133    fn test_bar_type_parse_invalid_token_pos_7() {
1134        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-INVALID";
1135        let result = BarType::from_str(input);
1136
1137        assert!(result.is_err());
1138        assert_eq!(
1139            result.unwrap_err().to_string(),
1140            format!(
1141                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 7"
1142            )
1143        );
1144    }
1145
1146    #[rstest]
1147    fn test_bar_type_equality() {
1148        let instrument_id1 = InstrumentId {
1149            symbol: Symbol::new("AUD/USD"),
1150            venue: Venue::new("SIM"),
1151        };
1152        let instrument_id2 = InstrumentId {
1153            symbol: Symbol::new("GBP/USD"),
1154            venue: Venue::new("SIM"),
1155        };
1156        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1157        let bar_type1 = BarType::Standard {
1158            instrument_id: instrument_id1,
1159            spec: bar_spec,
1160            aggregation_source: AggregationSource::External,
1161        };
1162        let bar_type2 = BarType::Standard {
1163            instrument_id: instrument_id1,
1164            spec: bar_spec,
1165            aggregation_source: AggregationSource::External,
1166        };
1167        let bar_type3 = BarType::Standard {
1168            instrument_id: instrument_id2,
1169            spec: bar_spec,
1170            aggregation_source: AggregationSource::External,
1171        };
1172        assert_eq!(bar_type1, bar_type1);
1173        assert_eq!(bar_type1, bar_type2);
1174        assert_ne!(bar_type1, bar_type3);
1175    }
1176
1177    #[rstest]
1178    fn test_bar_type_comparison() {
1179        let instrument_id1 = InstrumentId {
1180            symbol: Symbol::new("AUD/USD"),
1181            venue: Venue::new("SIM"),
1182        };
1183
1184        let instrument_id2 = InstrumentId {
1185            symbol: Symbol::new("GBP/USD"),
1186            venue: Venue::new("SIM"),
1187        };
1188        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1189        let bar_spec2 = BarSpecification::new(2, BarAggregation::Minute, PriceType::Bid);
1190        let bar_type1 = BarType::Standard {
1191            instrument_id: instrument_id1,
1192            spec: bar_spec,
1193            aggregation_source: AggregationSource::External,
1194        };
1195        let bar_type2 = BarType::Standard {
1196            instrument_id: instrument_id1,
1197            spec: bar_spec,
1198            aggregation_source: AggregationSource::External,
1199        };
1200        let bar_type3 = BarType::Standard {
1201            instrument_id: instrument_id2,
1202            spec: bar_spec,
1203            aggregation_source: AggregationSource::External,
1204        };
1205        let bar_type4 = BarType::Composite {
1206            instrument_id: instrument_id2,
1207            spec: bar_spec2,
1208            aggregation_source: AggregationSource::Internal,
1209
1210            composite_step: 1,
1211            composite_aggregation: BarAggregation::Minute,
1212            composite_aggregation_source: AggregationSource::External,
1213        };
1214
1215        assert!(bar_type1 <= bar_type2);
1216        assert!(bar_type1 < bar_type3);
1217        assert!(bar_type3 > bar_type1);
1218        assert!(bar_type3 >= bar_type1);
1219        assert!(bar_type4 >= bar_type1);
1220    }
1221
1222    #[rstest]
1223    fn test_bar_new() {
1224        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1225        let open = Price::from("100.0");
1226        let high = Price::from("105.0");
1227        let low = Price::from("95.0");
1228        let close = Price::from("102.0");
1229        let volume = Quantity::from("1000");
1230        let ts_event = UnixNanos::from(1_000_000);
1231        let ts_init = UnixNanos::from(2_000_000);
1232
1233        let bar = Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init);
1234
1235        assert_eq!(bar.bar_type, bar_type);
1236        assert_eq!(bar.open, open);
1237        assert_eq!(bar.high, high);
1238        assert_eq!(bar.low, low);
1239        assert_eq!(bar.close, close);
1240        assert_eq!(bar.volume, volume);
1241        assert_eq!(bar.ts_event, ts_event);
1242        assert_eq!(bar.ts_init, ts_init);
1243    }
1244
1245    #[rstest]
1246    #[case("100.0", "90.0", "95.0", "92.0")] // high < open
1247    #[case("100.0", "105.0", "110.0", "102.0")] // high < low
1248    #[case("100.0", "105.0", "95.0", "110.0")] // high < close
1249    #[case("100.0", "105.0", "95.0", "90.0")] // low > close
1250    #[case("100.0", "110.0", "105.0", "108.0")] // low > open
1251    #[case("100.0", "90.0", "110.0", "120.0")] // high < open, high < close, low > close
1252    fn test_bar_new_checked_conditions(
1253        #[case] open: &str,
1254        #[case] high: &str,
1255        #[case] low: &str,
1256        #[case] close: &str,
1257    ) {
1258        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1259        let open = Price::from(open);
1260        let high = Price::from(high);
1261        let low = Price::from(low);
1262        let close = Price::from(close);
1263        let volume = Quantity::from("1000");
1264        let ts_event = UnixNanos::from(1_000_000);
1265        let ts_init = UnixNanos::from(2_000_000);
1266
1267        let result = Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init);
1268
1269        assert!(result.is_err());
1270    }
1271
1272    #[rstest]
1273    fn test_bar_equality() {
1274        let instrument_id = InstrumentId {
1275            symbol: Symbol::new("AUDUSD"),
1276            venue: Venue::new("SIM"),
1277        };
1278        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1279        let bar_type = BarType::Standard {
1280            instrument_id,
1281            spec: bar_spec,
1282            aggregation_source: AggregationSource::External,
1283        };
1284        let bar1 = Bar {
1285            bar_type,
1286            open: Price::from("1.00001"),
1287            high: Price::from("1.00004"),
1288            low: Price::from("1.00002"),
1289            close: Price::from("1.00003"),
1290            volume: Quantity::from("100000"),
1291            ts_event: UnixNanos::default(),
1292            ts_init: UnixNanos::from(1),
1293        };
1294
1295        let bar2 = Bar {
1296            bar_type,
1297            open: Price::from("1.00000"),
1298            high: Price::from("1.00004"),
1299            low: Price::from("1.00002"),
1300            close: Price::from("1.00003"),
1301            volume: Quantity::from("100000"),
1302            ts_event: UnixNanos::default(),
1303            ts_init: UnixNanos::from(1),
1304        };
1305        assert_eq!(bar1, bar1);
1306        assert_ne!(bar1, bar2);
1307    }
1308
1309    #[rstest]
1310    fn test_json_serialization() {
1311        let bar = Bar::default();
1312        let serialized = bar.as_json_bytes().unwrap();
1313        let deserialized = Bar::from_json_bytes(serialized.as_ref()).unwrap();
1314        assert_eq!(deserialized, bar);
1315    }
1316
1317    #[rstest]
1318    fn test_msgpack_serialization() {
1319        let bar = Bar::default();
1320        let serialized = bar.as_msgpack_bytes().unwrap();
1321        let deserialized = Bar::from_msgpack_bytes(serialized.as_ref()).unwrap();
1322        assert_eq!(deserialized, bar);
1323    }
1324}