nautilus_model/data/
bar.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bar aggregate structures, data types and functionality.
17
18use std::{
19    collections::HashMap,
20    fmt::{Debug, Display, Formatter},
21    hash::Hash,
22    num::NonZeroUsize,
23    str::FromStr,
24};
25
26use chrono::{DateTime, Datelike, Duration, TimeDelta, Timelike, Utc};
27use derive_builder::Builder;
28use indexmap::IndexMap;
29use nautilus_core::{
30    correctness::{check_predicate_true, FAILED},
31    serialization::Serializable,
32    UnixNanos,
33};
34use serde::{Deserialize, Deserializer, Serialize, Serializer};
35
36use super::GetTsInit;
37use crate::{
38    enums::{AggregationSource, BarAggregation, PriceType},
39    identifiers::InstrumentId,
40    types::{Price, Quantity},
41};
42
43/// Returns the bar interval as a `TimeDelta`.
44///
45/// # Panics
46///
47/// This function panics:
48/// - If the aggregation method of the given `bar_type` is not time based.
49pub fn get_bar_interval(bar_type: &BarType) -> TimeDelta {
50    let spec = bar_type.spec();
51
52    match spec.aggregation {
53        BarAggregation::Millisecond => TimeDelta::milliseconds(spec.step.get() as i64),
54        BarAggregation::Second => TimeDelta::seconds(spec.step.get() as i64),
55        BarAggregation::Minute => TimeDelta::minutes(spec.step.get() as i64),
56        BarAggregation::Hour => TimeDelta::hours(spec.step.get() as i64),
57        BarAggregation::Day => TimeDelta::days(spec.step.get() as i64),
58        _ => panic!("Aggregation not time based"),
59    }
60}
61
62/// Returns the bar interval as `UnixNanos`.
63///
64/// # Panics
65///
66/// This function panics:
67/// - If the aggregation method of the given `bar_type` is not time based.
68pub fn get_bar_interval_ns(bar_type: &BarType) -> UnixNanos {
69    let interval_ns = get_bar_interval(bar_type)
70        .num_nanoseconds()
71        .expect("Invalid bar interval") as u64;
72    UnixNanos::from(interval_ns)
73}
74
75/// Returns the time bar start as a timezone-aware `DateTime<Utc>`.
76pub fn get_time_bar_start(now: DateTime<Utc>, bar_type: &BarType) -> DateTime<Utc> {
77    let spec = bar_type.spec();
78    let step = spec.step.get();
79
80    match spec.aggregation {
81        BarAggregation::Millisecond => {
82            let diff_milliseconds = now.timestamp_subsec_millis() as usize % step;
83            now - TimeDelta::milliseconds(diff_milliseconds as i64)
84        }
85        BarAggregation::Second => {
86            let diff_seconds = now.timestamp() % step as i64;
87            now - TimeDelta::seconds(diff_seconds)
88        }
89        BarAggregation::Minute => {
90            let diff_minutes = now.time().minute() as usize % step;
91            now - TimeDelta::minutes(diff_minutes as i64)
92                - TimeDelta::seconds(now.time().second() as i64)
93        }
94        BarAggregation::Hour => {
95            let diff_hours = now.time().hour() as usize % step;
96            let diff_days = if diff_hours == 0 { 0 } else { (step / 24) - 1 };
97            now - TimeDelta::days(diff_days as i64)
98                - TimeDelta::hours(diff_hours as i64)
99                - TimeDelta::minutes(now.minute() as i64)
100                - TimeDelta::seconds(now.second() as i64)
101        }
102        BarAggregation::Day => {
103            now - TimeDelta::days(now.day() as i64 % step as i64)
104                - TimeDelta::hours(now.hour() as i64)
105                - TimeDelta::minutes(now.minute() as i64)
106                - TimeDelta::seconds(now.second() as i64)
107        }
108        _ => panic!(
109            "Aggregation type {} not supported for time bars",
110            spec.aggregation
111        ),
112    }
113}
114
115/// Represents a bar aggregation specification including a step, aggregation
116/// method/rule and price type.
117#[repr(C)]
118#[derive(
119    Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize, Builder,
120)]
121#[cfg_attr(
122    feature = "python",
123    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
124)]
125pub struct BarSpecification {
126    /// The step for binning samples for bar aggregation.
127    pub step: NonZeroUsize,
128    /// The type of bar aggregation.
129    pub aggregation: BarAggregation,
130    /// The price type to use for aggregation.
131    pub price_type: PriceType,
132}
133
134impl BarSpecification {
135    /// Creates a new [`BarSpecification`] instance with correctness checking.
136    ///
137    /// # Errors
138    ///
139    /// This function returns an error:
140    /// - If `step` is not positive (> 0).
141    ///
142    /// # Notes
143    ///
144    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
145    pub fn new_checked(
146        step: usize,
147        aggregation: BarAggregation,
148        price_type: PriceType,
149    ) -> anyhow::Result<Self> {
150        let step = NonZeroUsize::new(step)
151            .ok_or(anyhow::anyhow!("Invalid step: {step} (must be non-zero)"))?;
152        Ok(Self {
153            step,
154            aggregation,
155            price_type,
156        })
157    }
158
159    /// Creates a new [`BarSpecification`] instance.
160    ///
161    /// # Panics
162    ///
163    /// This function panics:
164    /// - If `step` is not positive (> 0).
165    #[must_use]
166    pub fn new(step: usize, aggregation: BarAggregation, price_type: PriceType) -> Self {
167        Self::new_checked(step, aggregation, price_type).expect(FAILED)
168    }
169
170    pub fn timedelta(&self) -> TimeDelta {
171        match self.aggregation {
172            BarAggregation::Millisecond => Duration::milliseconds(self.step.get() as i64),
173            BarAggregation::Second => Duration::seconds(self.step.get() as i64),
174            BarAggregation::Minute => Duration::minutes(self.step.get() as i64),
175            BarAggregation::Hour => Duration::hours(self.step.get() as i64),
176            BarAggregation::Day => Duration::days(self.step.get() as i64),
177            _ => panic!(
178                "Timedelta not supported for aggregation type: {:?}",
179                self.aggregation
180            ),
181        }
182    }
183
184    /// Return a value indicating whether the aggregation method is time-driven:
185    ///  - [`BarAggregation::Millisecond`]
186    ///  - [`BarAggregation::Second`]
187    ///  - [`BarAggregation::Minute`]
188    ///  - [`BarAggregation::Hour`]
189    ///  - [`BarAggregation::Day`]
190    ///  - [`BarAggregation::Month`]
191    pub fn is_time_aggregated(&self) -> bool {
192        matches!(
193            self.aggregation,
194            BarAggregation::Millisecond
195                | BarAggregation::Second
196                | BarAggregation::Minute
197                | BarAggregation::Hour
198                | BarAggregation::Day
199                | BarAggregation::Month
200        )
201    }
202
203    /// Return a value indicating whether the aggregation method is threshold-driven:
204    ///  - [`BarAggregation::Tick`]
205    ///  - [`BarAggregation::TickImbalance`]
206    ///  - [`BarAggregation::Volume`]
207    ///  - [`BarAggregation::VolumeImbalance`]
208    ///  - [`BarAggregation::Value`]
209    ///  - [`BarAggregation::ValueImbalance`]
210    pub fn is_threshold_aggregated(&self) -> bool {
211        matches!(
212            self.aggregation,
213            BarAggregation::Tick
214                | BarAggregation::TickImbalance
215                | BarAggregation::Volume
216                | BarAggregation::VolumeImbalance
217                | BarAggregation::Value
218                | BarAggregation::ValueImbalance
219        )
220    }
221
222    /// Return a value indicating whether the aggregation method is information-driven:
223    ///  - [`BarAggregation::TickRuns`]
224    ///  - [`BarAggregation::VolumeRuns`]
225    ///  - [`BarAggregation::ValueRuns`]
226    pub fn is_information_aggregated(&self) -> bool {
227        matches!(
228            self.aggregation,
229            BarAggregation::TickRuns | BarAggregation::VolumeRuns | BarAggregation::ValueRuns
230        )
231    }
232}
233
234impl Display for BarSpecification {
235    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
236        write!(f, "{}-{}-{}", self.step, self.aggregation, self.price_type)
237    }
238}
239
240/// Represents a bar type including the instrument ID, bar specification and
241/// aggregation source.
242#[repr(C)]
243#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
244#[cfg_attr(
245    feature = "python",
246    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
247)]
248pub enum BarType {
249    Standard {
250        /// The bar type's instrument ID.
251        instrument_id: InstrumentId,
252        /// The bar type's specification.
253        spec: BarSpecification,
254        /// The bar type's aggregation source.
255        aggregation_source: AggregationSource,
256    },
257    Composite {
258        /// The bar type's instrument ID.
259        instrument_id: InstrumentId,
260        /// The bar type's specification.
261        spec: BarSpecification,
262        /// The bar type's aggregation source.
263        aggregation_source: AggregationSource,
264
265        /// The composite step for binning samples for bar aggregation.
266        composite_step: usize,
267        /// The composite type of bar aggregation.
268        composite_aggregation: BarAggregation,
269        /// The composite bar type's aggregation source.
270        composite_aggregation_source: AggregationSource,
271    },
272}
273
274impl BarType {
275    /// Creates a new [`BarType`] instance.
276    #[must_use]
277    pub fn new(
278        instrument_id: InstrumentId,
279        spec: BarSpecification,
280        aggregation_source: AggregationSource,
281    ) -> Self {
282        Self::Standard {
283            instrument_id,
284            spec,
285            aggregation_source,
286        }
287    }
288
289    /// Creates a new composite [`BarType`] instance.
290    pub fn new_composite(
291        instrument_id: InstrumentId,
292        spec: BarSpecification,
293        aggregation_source: AggregationSource,
294
295        composite_step: usize,
296        composite_aggregation: BarAggregation,
297        composite_aggregation_source: AggregationSource,
298    ) -> Self {
299        Self::Composite {
300            instrument_id,
301            spec,
302            aggregation_source,
303
304            composite_step,
305            composite_aggregation,
306            composite_aggregation_source,
307        }
308    }
309
310    /// Returns whether this instance is a standard bar type.
311    pub fn is_standard(&self) -> bool {
312        match &self {
313            BarType::Standard { .. } => true,
314            BarType::Composite { .. } => false,
315        }
316    }
317
318    /// Returns whether this instance is a composite bar type.
319    pub fn is_composite(&self) -> bool {
320        match &self {
321            BarType::Standard { .. } => false,
322            BarType::Composite { .. } => true,
323        }
324    }
325
326    /// Returns the standard bar type component.
327    pub fn standard(&self) -> Self {
328        match &self {
329            &&b @ BarType::Standard { .. } => b,
330            BarType::Composite {
331                instrument_id,
332                spec,
333                aggregation_source,
334                ..
335            } => Self::new(*instrument_id, *spec, *aggregation_source),
336        }
337    }
338
339    /// Returns any composite bar type component.
340    pub fn composite(&self) -> Self {
341        match &self {
342            &&b @ BarType::Standard { .. } => b, // case shouldn't be used if is_composite is called before
343            BarType::Composite {
344                instrument_id,
345                spec,
346                aggregation_source: _,
347
348                composite_step,
349                composite_aggregation,
350                composite_aggregation_source,
351            } => Self::new(
352                *instrument_id,
353                BarSpecification::new(*composite_step, *composite_aggregation, spec.price_type),
354                *composite_aggregation_source,
355            ),
356        }
357    }
358
359    /// Returns the [`InstrumentId`] for this bar type.
360    pub fn instrument_id(&self) -> InstrumentId {
361        match &self {
362            BarType::Standard { instrument_id, .. } | BarType::Composite { instrument_id, .. } => {
363                *instrument_id
364            }
365        }
366    }
367
368    /// Returns the [`BarSpecification`] for this bar type.
369    pub fn spec(&self) -> BarSpecification {
370        match &self {
371            BarType::Standard { spec, .. } | BarType::Composite { spec, .. } => *spec,
372        }
373    }
374
375    /// Returns the [`AggregationSource`] for this bar type.
376    pub fn aggregation_source(&self) -> AggregationSource {
377        match &self {
378            BarType::Standard {
379                aggregation_source, ..
380            }
381            | BarType::Composite {
382                aggregation_source, ..
383            } => *aggregation_source,
384        }
385    }
386}
387
388#[derive(thiserror::Error, Debug)]
389#[error("Error parsing `BarType` from '{input}', invalid token: '{token}' at position {position}")]
390pub struct BarTypeParseError {
391    input: String,
392    token: String,
393    position: usize,
394}
395
396impl FromStr for BarType {
397    type Err = BarTypeParseError;
398
399    fn from_str(s: &str) -> Result<Self, Self::Err> {
400        let parts: Vec<&str> = s.split('@').collect();
401        let standard = parts[0];
402        let composite_str = parts.get(1);
403
404        let pieces: Vec<&str> = standard.rsplitn(5, '-').collect();
405        let rev_pieces: Vec<&str> = pieces.into_iter().rev().collect();
406        if rev_pieces.len() != 5 {
407            return Err(BarTypeParseError {
408                input: s.to_string(),
409                token: String::new(),
410                position: 0,
411            });
412        }
413
414        let instrument_id =
415            InstrumentId::from_str(rev_pieces[0]).map_err(|_| BarTypeParseError {
416                input: s.to_string(),
417                token: rev_pieces[0].to_string(),
418                position: 0,
419            })?;
420
421        let step = rev_pieces[1].parse().map_err(|_| BarTypeParseError {
422            input: s.to_string(),
423            token: rev_pieces[1].to_string(),
424            position: 1,
425        })?;
426        let aggregation =
427            BarAggregation::from_str(rev_pieces[2]).map_err(|_| BarTypeParseError {
428                input: s.to_string(),
429                token: rev_pieces[2].to_string(),
430                position: 2,
431            })?;
432        let price_type = PriceType::from_str(rev_pieces[3]).map_err(|_| BarTypeParseError {
433            input: s.to_string(),
434            token: rev_pieces[3].to_string(),
435            position: 3,
436        })?;
437        let aggregation_source =
438            AggregationSource::from_str(rev_pieces[4]).map_err(|_| BarTypeParseError {
439                input: s.to_string(),
440                token: rev_pieces[4].to_string(),
441                position: 4,
442            })?;
443
444        if let Some(composite_str) = composite_str {
445            let composite_pieces: Vec<&str> = composite_str.rsplitn(3, '-').collect();
446            let rev_composite_pieces: Vec<&str> = composite_pieces.into_iter().rev().collect();
447            if rev_composite_pieces.len() != 3 {
448                return Err(BarTypeParseError {
449                    input: s.to_string(),
450                    token: String::new(),
451                    position: 5,
452                });
453            }
454
455            let composite_step =
456                rev_composite_pieces[0]
457                    .parse()
458                    .map_err(|_| BarTypeParseError {
459                        input: s.to_string(),
460                        token: rev_composite_pieces[0].to_string(),
461                        position: 5,
462                    })?;
463            let composite_aggregation =
464                BarAggregation::from_str(rev_composite_pieces[1]).map_err(|_| {
465                    BarTypeParseError {
466                        input: s.to_string(),
467                        token: rev_composite_pieces[1].to_string(),
468                        position: 6,
469                    }
470                })?;
471            let composite_aggregation_source = AggregationSource::from_str(rev_composite_pieces[2])
472                .map_err(|_| BarTypeParseError {
473                    input: s.to_string(),
474                    token: rev_composite_pieces[2].to_string(),
475                    position: 7,
476                })?;
477
478            Ok(Self::new_composite(
479                instrument_id,
480                BarSpecification::new(step, aggregation, price_type),
481                aggregation_source,
482                composite_step,
483                composite_aggregation,
484                composite_aggregation_source,
485            ))
486        } else {
487            Ok(Self::Standard {
488                instrument_id,
489                spec: BarSpecification::new(step, aggregation, price_type),
490                aggregation_source,
491            })
492        }
493    }
494}
495
496impl From<&str> for BarType {
497    fn from(value: &str) -> Self {
498        Self::from_str(value).expect(FAILED)
499    }
500}
501
502impl Display for BarType {
503    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
504        match &self {
505            BarType::Standard {
506                instrument_id,
507                spec,
508                aggregation_source,
509            } => {
510                write!(f, "{}-{}-{}", instrument_id, spec, aggregation_source)
511            }
512            BarType::Composite {
513                instrument_id,
514                spec,
515                aggregation_source,
516
517                composite_step,
518                composite_aggregation,
519                composite_aggregation_source,
520            } => {
521                write!(
522                    f,
523                    "{}-{}-{}@{}-{}-{}",
524                    instrument_id,
525                    spec,
526                    aggregation_source,
527                    *composite_step,
528                    *composite_aggregation,
529                    *composite_aggregation_source
530                )
531            }
532        }
533    }
534}
535
536impl Serialize for BarType {
537    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
538    where
539        S: Serializer,
540    {
541        serializer.serialize_str(&self.to_string())
542    }
543}
544
545impl<'de> Deserialize<'de> for BarType {
546    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
547    where
548        D: Deserializer<'de>,
549    {
550        let s: String = Deserialize::deserialize(deserializer)?;
551        Self::from_str(&s).map_err(serde::de::Error::custom)
552    }
553}
554
555/// Represents an aggregated bar.
556#[repr(C)]
557#[derive(Clone, Copy, Hash, PartialEq, Eq, Debug, Serialize, Deserialize)]
558#[serde(tag = "type")]
559#[cfg_attr(
560    feature = "python",
561    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.model")
562)]
563pub struct Bar {
564    /// The bar type for this bar.
565    pub bar_type: BarType,
566    /// The bars open price.
567    pub open: Price,
568    /// The bars high price.
569    pub high: Price,
570    /// The bars low price.
571    pub low: Price,
572    /// The bars close price.
573    pub close: Price,
574    /// The bars volume.
575    pub volume: Quantity,
576    /// UNIX timestamp (nanoseconds) when the data event occurred.
577    pub ts_event: UnixNanos,
578    /// UNIX timestamp (nanoseconds) when the struct was initialized.
579    pub ts_init: UnixNanos,
580}
581
582impl Bar {
583    /// Creates a new [`Bar`] instance with correctness checking.
584    ///
585    /// # Errors
586    ///
587    /// This function returns an error:
588    /// - If `high` is not >= `low`.
589    /// - If `high` is not >= `close`.
590    /// - If `low` is not <= `close.
591    ///
592    /// # Notes
593    ///
594    /// PyO3 requires a `Result` type for proper error handling and stacktrace printing in Python.
595    #[allow(clippy::too_many_arguments)]
596    pub fn new_checked(
597        bar_type: BarType,
598        open: Price,
599        high: Price,
600        low: Price,
601        close: Price,
602        volume: Quantity,
603        ts_event: UnixNanos,
604        ts_init: UnixNanos,
605    ) -> anyhow::Result<Self> {
606        check_predicate_true(high >= open, "high >= open")?;
607        check_predicate_true(high >= low, "high >= low")?;
608        check_predicate_true(high >= close, "high >= close")?;
609        check_predicate_true(low <= close, "low <= close")?;
610        check_predicate_true(low <= open, "low <= open")?;
611
612        Ok(Self {
613            bar_type,
614            open,
615            high,
616            low,
617            close,
618            volume,
619            ts_event,
620            ts_init,
621        })
622    }
623
624    /// Creates a new [`Bar`] instance.
625    ///
626    /// # Panics
627    ///
628    /// This function panics:
629    /// - If `high` is not >= `low`.
630    /// - If `high` is not >= `close`.
631    /// - If `low` is not <= `close.
632    #[allow(clippy::too_many_arguments)]
633    pub fn new(
634        bar_type: BarType,
635        open: Price,
636        high: Price,
637        low: Price,
638        close: Price,
639        volume: Quantity,
640        ts_event: UnixNanos,
641        ts_init: UnixNanos,
642    ) -> Self {
643        Self::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
644            .expect(FAILED)
645    }
646
647    pub fn instrument_id(&self) -> InstrumentId {
648        self.bar_type.instrument_id()
649    }
650
651    /// Returns the metadata for the type, for use with serialization formats.
652    #[must_use]
653    pub fn get_metadata(
654        bar_type: &BarType,
655        price_precision: u8,
656        size_precision: u8,
657    ) -> HashMap<String, String> {
658        let mut metadata = HashMap::new();
659        let instrument_id = bar_type.instrument_id();
660        metadata.insert("bar_type".to_string(), bar_type.to_string());
661        metadata.insert("instrument_id".to_string(), instrument_id.to_string());
662        metadata.insert("price_precision".to_string(), price_precision.to_string());
663        metadata.insert("size_precision".to_string(), size_precision.to_string());
664        metadata
665    }
666
667    /// Returns the field map for the type, for use with Arrow schemas.
668    #[must_use]
669    pub fn get_fields() -> IndexMap<String, String> {
670        let mut metadata = IndexMap::new();
671        metadata.insert("open".to_string(), "Int64".to_string());
672        metadata.insert("high".to_string(), "Int64".to_string());
673        metadata.insert("low".to_string(), "Int64".to_string());
674        metadata.insert("close".to_string(), "Int64".to_string());
675        metadata.insert("volume".to_string(), "UInt64".to_string());
676        metadata.insert("ts_event".to_string(), "UInt64".to_string());
677        metadata.insert("ts_init".to_string(), "UInt64".to_string());
678        metadata
679    }
680}
681
682impl Display for Bar {
683    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
684        write!(
685            f,
686            "{},{},{},{},{},{},{}",
687            self.bar_type, self.open, self.high, self.low, self.close, self.volume, self.ts_event
688        )
689    }
690}
691
692impl Serializable for Bar {}
693
694impl GetTsInit for Bar {
695    fn ts_init(&self) -> UnixNanos {
696        self.ts_init
697    }
698}
699
700////////////////////////////////////////////////////////////////////////////////
701// Tests
702////////////////////////////////////////////////////////////////////////////////
703#[cfg(test)]
704mod tests {
705    use chrono::TimeZone;
706    use rstest::rstest;
707
708    use super::*;
709    use crate::identifiers::{Symbol, Venue};
710
711    #[rstest]
712    fn test_bar_specification_new_invalid() {
713        let result = BarSpecification::new_checked(0, BarAggregation::Tick, PriceType::Last);
714        assert!(result.is_err());
715    }
716
717    #[rstest]
718    #[should_panic(expected = "Invalid step: 0 (must be non-zero)")]
719    fn test_bar_specification_new_checked_with_invalid_step_panics() {
720        let aggregation = BarAggregation::Tick;
721        let price_type = PriceType::Last;
722
723        let _ = BarSpecification::new(0, aggregation, price_type);
724    }
725
726    #[rstest]
727    #[case(BarAggregation::Millisecond, 1, TimeDelta::milliseconds(1))]
728    #[case(BarAggregation::Millisecond, 10, TimeDelta::milliseconds(10))]
729    #[case(BarAggregation::Second, 1, TimeDelta::seconds(1))]
730    #[case(BarAggregation::Second, 15, TimeDelta::seconds(15))]
731    #[case(BarAggregation::Minute, 1, TimeDelta::minutes(1))]
732    #[case(BarAggregation::Minute, 60, TimeDelta::minutes(60))]
733    #[case(BarAggregation::Hour, 1, TimeDelta::hours(1))]
734    #[case(BarAggregation::Hour, 4, TimeDelta::hours(4))]
735    #[case(BarAggregation::Day, 1, TimeDelta::days(1))]
736    #[case(BarAggregation::Day, 2, TimeDelta::days(2))]
737    #[should_panic(expected = "Aggregation not time based")]
738    #[case(BarAggregation::Tick, 1, TimeDelta::zero())]
739    fn test_get_bar_interval(
740        #[case] aggregation: BarAggregation,
741        #[case] step: usize,
742        #[case] expected: TimeDelta,
743    ) {
744        let bar_type = BarType::Standard {
745            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
746            spec: BarSpecification::new(step, aggregation, PriceType::Last),
747            aggregation_source: AggregationSource::Internal,
748        };
749
750        let interval = get_bar_interval(&bar_type);
751        assert_eq!(interval, expected);
752    }
753
754    #[rstest]
755    #[case(BarAggregation::Millisecond, 1, UnixNanos::from(1_000_000))]
756    #[case(BarAggregation::Millisecond, 10, UnixNanos::from(10_000_000))]
757    #[case(BarAggregation::Second, 1, UnixNanos::from(1_000_000_000))]
758    #[case(BarAggregation::Second, 10, UnixNanos::from(10_000_000_000))]
759    #[case(BarAggregation::Minute, 1, UnixNanos::from(60_000_000_000))]
760    #[case(BarAggregation::Minute, 60, UnixNanos::from(3_600_000_000_000))]
761    #[case(BarAggregation::Hour, 1, UnixNanos::from(3_600_000_000_000))]
762    #[case(BarAggregation::Hour, 4, UnixNanos::from(14_400_000_000_000))]
763    #[case(BarAggregation::Day, 1, UnixNanos::from(86_400_000_000_000))]
764    #[case(BarAggregation::Day, 2, UnixNanos::from(172_800_000_000_000))]
765    #[should_panic(expected = "Aggregation not time based")]
766    #[case(BarAggregation::Tick, 1, UnixNanos::from(0))]
767    fn test_get_bar_interval_ns(
768        #[case] aggregation: BarAggregation,
769        #[case] step: usize,
770        #[case] expected: UnixNanos,
771    ) {
772        let bar_type = BarType::Standard {
773            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
774            spec: BarSpecification::new(step, aggregation, PriceType::Last),
775            aggregation_source: AggregationSource::Internal,
776        };
777
778        let interval_ns = get_bar_interval_ns(&bar_type);
779        assert_eq!(interval_ns, expected);
780    }
781
782    #[rstest]
783    #[case::millisecond(
784    Utc.timestamp_opt(1658349296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
785    BarAggregation::Millisecond,
786    1,
787    Utc.timestamp_opt(1658349296, 123_000_000).unwrap(),  // 2024-07-21 12:34:56.123 UTC
788    )]
789    #[rstest]
790    #[case::millisecond(
791    Utc.timestamp_opt(1658349296, 123_000_000).unwrap(), // 2024-07-21 12:34:56.123 UTC
792    BarAggregation::Millisecond,
793    10,
794    Utc.timestamp_opt(1658349296, 120_000_000).unwrap(),  // 2024-07-21 12:34:56.120 UTC
795    )]
796    #[case::second(
797    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
798    BarAggregation::Millisecond,
799    1000,
800    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap()
801    )]
802    #[case::second(
803    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
804    BarAggregation::Second,
805    1,
806    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap()
807    )]
808    #[case::second(
809    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
810    BarAggregation::Second,
811    5,
812    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 55).unwrap()
813    )]
814    #[case::second(
815    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
816    BarAggregation::Second,
817    60,
818    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 0).unwrap()
819    )]
820    #[case::minute(
821    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
822    BarAggregation::Minute,
823    1,
824    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 0).unwrap()
825    )]
826    #[case::minute(
827    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
828    BarAggregation::Minute,
829    5,
830    Utc.with_ymd_and_hms(2024, 7, 21, 12, 30, 0).unwrap()
831    )]
832    #[case::minute(
833    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
834    BarAggregation::Minute,
835    60,
836    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
837    )]
838    #[case::hour(
839    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
840    BarAggregation::Hour,
841    1,
842    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
843    )]
844    #[case::hour(
845    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
846    BarAggregation::Hour,
847    2,
848    Utc.with_ymd_and_hms(2024, 7, 21, 12, 0, 0).unwrap()
849    )]
850    #[case::day(
851    Utc.with_ymd_and_hms(2024, 7, 21, 12, 34, 56).unwrap(),
852    BarAggregation::Day,
853    1,
854    Utc.with_ymd_and_hms(2024, 7, 21, 0, 0, 0).unwrap()
855    )]
856    fn test_get_time_bar_start(
857        #[case] now: DateTime<Utc>,
858        #[case] aggregation: BarAggregation,
859        #[case] step: usize,
860        #[case] expected: DateTime<Utc>,
861    ) {
862        let bar_type = BarType::Standard {
863            instrument_id: InstrumentId::from("BTCUSDT-PERP.BINANCE"),
864            spec: BarSpecification::new(step, aggregation, PriceType::Last),
865            aggregation_source: AggregationSource::Internal,
866        };
867
868        let start_time = get_time_bar_start(now, &bar_type);
869        assert_eq!(start_time, expected);
870    }
871
872    #[rstest]
873    fn test_bar_spec_string_reprs() {
874        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
875        assert_eq!(bar_spec.to_string(), "1-MINUTE-BID");
876        assert_eq!(format!("{bar_spec}"), "1-MINUTE-BID");
877    }
878
879    #[rstest]
880    fn test_bar_type_parse_valid() {
881        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
882        let bar_type = BarType::from(input);
883
884        assert_eq!(
885            bar_type.instrument_id(),
886            InstrumentId::from("BTCUSDT-PERP.BINANCE")
887        );
888        assert_eq!(
889            bar_type.spec(),
890            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
891        );
892        assert_eq!(bar_type.aggregation_source(), AggregationSource::External);
893        assert_eq!(bar_type, BarType::from(input));
894    }
895
896    #[rstest]
897    fn test_bar_type_composite_parse_valid() {
898        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL";
899        let bar_type = BarType::from(input);
900        let standard = bar_type.standard();
901
902        assert_eq!(
903            bar_type.instrument_id(),
904            InstrumentId::from("BTCUSDT-PERP.BINANCE")
905        );
906        assert_eq!(
907            bar_type.spec(),
908            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
909        );
910        assert_eq!(bar_type.aggregation_source(), AggregationSource::Internal);
911        assert_eq!(bar_type, BarType::from(input));
912        assert!(bar_type.is_composite());
913
914        assert_eq!(
915            standard.instrument_id(),
916            InstrumentId::from("BTCUSDT-PERP.BINANCE")
917        );
918        assert_eq!(
919            standard.spec(),
920            BarSpecification::new(2, BarAggregation::Minute, PriceType::Last,)
921        );
922        assert_eq!(standard.aggregation_source(), AggregationSource::Internal);
923        assert!(standard.is_standard());
924
925        let composite = bar_type.composite();
926        let composite_input = "BTCUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL";
927
928        assert_eq!(
929            composite.instrument_id(),
930            InstrumentId::from("BTCUSDT-PERP.BINANCE")
931        );
932        assert_eq!(
933            composite.spec(),
934            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last,)
935        );
936        assert_eq!(composite.aggregation_source(), AggregationSource::External);
937        assert_eq!(composite, BarType::from(composite_input));
938        assert!(composite.is_standard());
939    }
940
941    #[rstest]
942    fn test_bar_type_parse_invalid_token_pos_0() {
943        let input = "BTCUSDT-PERP-1-MINUTE-LAST-INTERNAL";
944        let result = BarType::from_str(input);
945
946        assert_eq!(
947            result.unwrap_err().to_string(),
948            format!("Error parsing `BarType` from '{input}', invalid token: 'BTCUSDT-PERP' at position 0")
949        );
950    }
951
952    #[rstest]
953    fn test_bar_type_parse_invalid_token_pos_1() {
954        let input = "BTCUSDT-PERP.BINANCE-INVALID-MINUTE-LAST-INTERNAL";
955        let result = BarType::from_str(input);
956
957        assert_eq!(
958            result.unwrap_err().to_string(),
959            format!(
960                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 1"
961            )
962        );
963    }
964
965    #[rstest]
966    fn test_bar_type_parse_invalid_token_pos_2() {
967        let input = "BTCUSDT-PERP.BINANCE-1-INVALID-LAST-INTERNAL";
968        let result = BarType::from_str(input);
969
970        assert_eq!(
971            result.unwrap_err().to_string(),
972            format!(
973                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 2"
974            )
975        );
976    }
977
978    #[rstest]
979    fn test_bar_type_parse_invalid_token_pos_3() {
980        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-INVALID-INTERNAL";
981        let result = BarType::from_str(input);
982
983        assert_eq!(
984            result.unwrap_err().to_string(),
985            format!(
986                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 3"
987            )
988        );
989    }
990
991    #[rstest]
992    fn test_bar_type_parse_invalid_token_pos_4() {
993        let input = "BTCUSDT-PERP.BINANCE-1-MINUTE-BID-INVALID";
994        let result = BarType::from_str(input);
995
996        assert!(result.is_err());
997        assert_eq!(
998            result.unwrap_err().to_string(),
999            format!(
1000                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 4"
1001            )
1002        );
1003    }
1004
1005    #[rstest]
1006    fn test_bar_type_parse_invalid_token_pos_5() {
1007        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@INVALID-MINUTE-EXTERNAL";
1008        let result = BarType::from_str(input);
1009
1010        assert!(result.is_err());
1011        assert_eq!(
1012            result.unwrap_err().to_string(),
1013            format!(
1014                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 5"
1015            )
1016        );
1017    }
1018
1019    #[rstest]
1020    fn test_bar_type_parse_invalid_token_pos_6() {
1021        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-INVALID-EXTERNAL";
1022        let result = BarType::from_str(input);
1023
1024        assert!(result.is_err());
1025        assert_eq!(
1026            result.unwrap_err().to_string(),
1027            format!(
1028                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 6"
1029            )
1030        );
1031    }
1032
1033    #[rstest]
1034    fn test_bar_type_parse_invalid_token_pos_7() {
1035        let input = "BTCUSDT-PERP.BINANCE-2-MINUTE-LAST-INTERNAL@1-MINUTE-INVALID";
1036        let result = BarType::from_str(input);
1037
1038        assert!(result.is_err());
1039        assert_eq!(
1040            result.unwrap_err().to_string(),
1041            format!(
1042                "Error parsing `BarType` from '{input}', invalid token: 'INVALID' at position 7"
1043            )
1044        );
1045    }
1046
1047    #[rstest]
1048    fn test_bar_type_equality() {
1049        let instrument_id1 = InstrumentId {
1050            symbol: Symbol::new("AUD/USD"),
1051            venue: Venue::new("SIM"),
1052        };
1053        let instrument_id2 = InstrumentId {
1054            symbol: Symbol::new("GBP/USD"),
1055            venue: Venue::new("SIM"),
1056        };
1057        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1058        let bar_type1 = BarType::Standard {
1059            instrument_id: instrument_id1,
1060            spec: bar_spec,
1061            aggregation_source: AggregationSource::External,
1062        };
1063        let bar_type2 = BarType::Standard {
1064            instrument_id: instrument_id1,
1065            spec: bar_spec,
1066            aggregation_source: AggregationSource::External,
1067        };
1068        let bar_type3 = BarType::Standard {
1069            instrument_id: instrument_id2,
1070            spec: bar_spec,
1071            aggregation_source: AggregationSource::External,
1072        };
1073        assert_eq!(bar_type1, bar_type1);
1074        assert_eq!(bar_type1, bar_type2);
1075        assert_ne!(bar_type1, bar_type3);
1076    }
1077
1078    #[rstest]
1079    fn test_bar_type_comparison() {
1080        let instrument_id1 = InstrumentId {
1081            symbol: Symbol::new("AUD/USD"),
1082            venue: Venue::new("SIM"),
1083        };
1084
1085        let instrument_id2 = InstrumentId {
1086            symbol: Symbol::new("GBP/USD"),
1087            venue: Venue::new("SIM"),
1088        };
1089        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1090        let bar_spec2 = BarSpecification::new(2, BarAggregation::Minute, PriceType::Bid);
1091        let bar_type1 = BarType::Standard {
1092            instrument_id: instrument_id1,
1093            spec: bar_spec,
1094            aggregation_source: AggregationSource::External,
1095        };
1096        let bar_type2 = BarType::Standard {
1097            instrument_id: instrument_id1,
1098            spec: bar_spec,
1099            aggregation_source: AggregationSource::External,
1100        };
1101        let bar_type3 = BarType::Standard {
1102            instrument_id: instrument_id2,
1103            spec: bar_spec,
1104            aggregation_source: AggregationSource::External,
1105        };
1106        let bar_type4 = BarType::Composite {
1107            instrument_id: instrument_id2,
1108            spec: bar_spec2,
1109            aggregation_source: AggregationSource::Internal,
1110
1111            composite_step: 1,
1112            composite_aggregation: BarAggregation::Minute,
1113            composite_aggregation_source: AggregationSource::External,
1114        };
1115
1116        assert!(bar_type1 <= bar_type2);
1117        assert!(bar_type1 < bar_type3);
1118        assert!(bar_type3 > bar_type1);
1119        assert!(bar_type3 >= bar_type1);
1120        assert!(bar_type4 >= bar_type1);
1121    }
1122
1123    #[rstest]
1124    fn test_bar_new() {
1125        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1126        let open = Price::from("100.0");
1127        let high = Price::from("105.0");
1128        let low = Price::from("95.0");
1129        let close = Price::from("102.0");
1130        let volume = Quantity::from("1000");
1131        let ts_event = UnixNanos::from(1_000_000);
1132        let ts_init = UnixNanos::from(2_000_000);
1133
1134        let bar = Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init);
1135
1136        assert_eq!(bar.bar_type, bar_type);
1137        assert_eq!(bar.open, open);
1138        assert_eq!(bar.high, high);
1139        assert_eq!(bar.low, low);
1140        assert_eq!(bar.close, close);
1141        assert_eq!(bar.volume, volume);
1142        assert_eq!(bar.ts_event, ts_event);
1143        assert_eq!(bar.ts_init, ts_init);
1144    }
1145
1146    #[rstest]
1147    #[case("100.0", "90.0", "95.0", "92.0")] // high < open
1148    #[case("100.0", "105.0", "110.0", "102.0")] // high < low
1149    #[case("100.0", "105.0", "95.0", "110.0")] // high < close
1150    #[case("100.0", "105.0", "95.0", "90.0")] // low > close
1151    #[case("100.0", "110.0", "105.0", "108.0")] // low > open
1152    #[case("100.0", "90.0", "110.0", "120.0")] // high < open, high < close, low > close
1153    fn test_bar_new_checked_conditions(
1154        #[case] open: &str,
1155        #[case] high: &str,
1156        #[case] low: &str,
1157        #[case] close: &str,
1158    ) {
1159        let bar_type = BarType::from("AAPL.XNAS-1-MINUTE-LAST-INTERNAL");
1160        let open = Price::from(open);
1161        let high = Price::from(high);
1162        let low = Price::from(low);
1163        let close = Price::from(close);
1164        let volume = Quantity::from("1000");
1165        let ts_event = UnixNanos::from(1_000_000);
1166        let ts_init = UnixNanos::from(2_000_000);
1167
1168        let result = Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init);
1169
1170        assert!(result.is_err());
1171    }
1172
1173    #[rstest]
1174    fn test_bar_equality() {
1175        let instrument_id = InstrumentId {
1176            symbol: Symbol::new("AUDUSD"),
1177            venue: Venue::new("SIM"),
1178        };
1179        let bar_spec = BarSpecification::new(1, BarAggregation::Minute, PriceType::Bid);
1180        let bar_type = BarType::Standard {
1181            instrument_id,
1182            spec: bar_spec,
1183            aggregation_source: AggregationSource::External,
1184        };
1185        let bar1 = Bar {
1186            bar_type,
1187            open: Price::from("1.00001"),
1188            high: Price::from("1.00004"),
1189            low: Price::from("1.00002"),
1190            close: Price::from("1.00003"),
1191            volume: Quantity::from("100000"),
1192            ts_event: UnixNanos::default(),
1193            ts_init: UnixNanos::from(1),
1194        };
1195
1196        let bar2 = Bar {
1197            bar_type,
1198            open: Price::from("1.00000"),
1199            high: Price::from("1.00004"),
1200            low: Price::from("1.00002"),
1201            close: Price::from("1.00003"),
1202            volume: Quantity::from("100000"),
1203            ts_event: UnixNanos::default(),
1204            ts_init: UnixNanos::from(1),
1205        };
1206        assert_eq!(bar1, bar1);
1207        assert_ne!(bar1, bar2);
1208    }
1209
1210    #[rstest]
1211    fn test_json_serialization() {
1212        let bar = Bar::default();
1213        let serialized = bar.as_json_bytes().unwrap();
1214        let deserialized = Bar::from_json_bytes(serialized.as_ref()).unwrap();
1215        assert_eq!(deserialized, bar);
1216    }
1217
1218    #[rstest]
1219    fn test_msgpack_serialization() {
1220        let bar = Bar::default();
1221        let serialized = bar.as_msgpack_bytes().unwrap();
1222        let deserialized = Bar::from_msgpack_bytes(serialized.as_ref()).unwrap();
1223        assert_eq!(deserialized, bar);
1224    }
1225}