nautilus_coinbase_intx/common/
parse.rs
1use std::{num::NonZero, str::FromStr};
17
18use nautilus_core::{datetime::NANOSECONDS_IN_MILLISECOND, nanos::UnixNanos};
19use nautilus_model::{
20 currencies::CURRENCY_MAP,
21 data::BarSpecification,
22 enums::{
23 AggressorSide, BarAggregation, CurrencyType, LiquiditySide, OrderSide, PositionSide,
24 PriceType,
25 },
26 identifiers::{InstrumentId, Symbol},
27 types::{Currency, Money, Price, Quantity},
28};
29use serde::{Deserialize, Deserializer};
30use ustr::Ustr;
31
32use crate::{
33 common::{
34 consts::COINBASE_INTX_VENUE,
35 enums::{CoinbaseIntxExecType, CoinbaseIntxSide},
36 },
37 websocket::enums::CoinbaseIntxWsChannel,
38};
39
40pub const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
41 step: NonZero::new(1).unwrap(),
42 aggregation: BarAggregation::Minute,
43 price_type: PriceType::Last,
44};
45pub const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
46 step: NonZero::new(5).unwrap(),
47 aggregation: BarAggregation::Minute,
48 price_type: PriceType::Last,
49};
50pub const BAR_SPEC_30_MINUTE: BarSpecification = BarSpecification {
51 step: NonZero::new(30).unwrap(),
52 aggregation: BarAggregation::Minute,
53 price_type: PriceType::Last,
54};
55pub const BAR_SPEC_2_HOUR: BarSpecification = BarSpecification {
56 step: NonZero::new(2).unwrap(),
57 aggregation: BarAggregation::Hour,
58 price_type: PriceType::Last,
59};
60pub const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
61 step: NonZero::new(1).unwrap(),
62 aggregation: BarAggregation::Day,
63 price_type: PriceType::Last,
64};
65
66pub fn deserialize_optional_string_to_u64<'de, D>(deserializer: D) -> Result<Option<u64>, D::Error>
68where
69 D: Deserializer<'de>,
70{
71 let s: Option<String> = Option::deserialize(deserializer)?;
72 match s {
73 Some(s) if s.is_empty() => Ok(None),
74 Some(s) => s.parse().map(Some).map_err(serde::de::Error::custom),
75 None => Ok(None),
76 }
77}
78
79pub fn get_currency(code: &str) -> Currency {
81 CURRENCY_MAP
82 .lock()
83 .unwrap()
84 .get(code)
85 .copied()
86 .unwrap_or(Currency::new(code, 8, 0, code, CurrencyType::Crypto))
87}
88
89#[must_use]
91pub fn parse_instrument_id(symbol: Ustr) -> InstrumentId {
92 InstrumentId::new(Symbol::from_ustr_unchecked(symbol), *COINBASE_INTX_VENUE)
93}
94
95pub fn parse_millisecond_timestamp(timestamp: &str) -> anyhow::Result<UnixNanos> {
96 let millis: u64 = timestamp.parse()?;
97 Ok(UnixNanos::from(millis * NANOSECONDS_IN_MILLISECOND))
98}
99
100pub fn parse_rfc3339_timestamp(timestamp: &str) -> anyhow::Result<UnixNanos> {
101 let dt = chrono::DateTime::parse_from_rfc3339(timestamp)?;
102 Ok(UnixNanos::from(dt.timestamp_nanos_opt().unwrap() as u64))
103}
104
105pub fn parse_price(value: &str) -> anyhow::Result<Price> {
106 Price::from_str(value).map_err(|e| anyhow::anyhow!(e))
107}
108
109pub fn parse_quantity(value: &str, precision: u8) -> anyhow::Result<Quantity> {
110 Quantity::new_checked(value.parse::<f64>()?, precision)
111}
112
113pub fn parse_notional(value: &str, currency: Currency) -> anyhow::Result<Option<Money>> {
114 let parsed = value.trim().parse::<f64>()?;
115 Ok(if parsed == 0.0 {
116 None
117 } else {
118 Some(Money::new(parsed, currency))
119 })
120}
121
122pub fn parse_aggressor_side(side: &Option<CoinbaseIntxSide>) -> AggressorSide {
123 match side {
124 Some(CoinbaseIntxSide::Buy) => nautilus_model::enums::AggressorSide::Buyer,
125 Some(CoinbaseIntxSide::Sell) => nautilus_model::enums::AggressorSide::Seller,
126 None => nautilus_model::enums::AggressorSide::NoAggressor,
127 }
128}
129
130pub fn parse_execution_type(liquidity: &Option<CoinbaseIntxExecType>) -> LiquiditySide {
131 match liquidity {
132 Some(CoinbaseIntxExecType::Maker) => nautilus_model::enums::LiquiditySide::Maker,
133 Some(CoinbaseIntxExecType::Taker) => nautilus_model::enums::LiquiditySide::Taker,
134 _ => nautilus_model::enums::LiquiditySide::NoLiquiditySide,
135 }
136}
137
138pub fn parse_position_side(current_qty: Option<f64>) -> PositionSide {
139 match current_qty {
140 Some(qty) if qty.is_sign_positive() => PositionSide::Long,
141 Some(qty) if qty.is_sign_negative() => PositionSide::Short,
142 _ => PositionSide::Flat,
143 }
144}
145
146pub fn parse_order_side(order_side: &Option<CoinbaseIntxSide>) -> OrderSide {
147 match order_side {
148 Some(CoinbaseIntxSide::Buy) => OrderSide::Buy,
149 Some(CoinbaseIntxSide::Sell) => OrderSide::Sell,
150 None => OrderSide::NoOrderSide,
151 }
152}
153
154pub fn bar_spec_as_coinbase_channel(
155 bar_spec: BarSpecification,
156) -> anyhow::Result<CoinbaseIntxWsChannel> {
157 let channel = match bar_spec {
158 BAR_SPEC_1_MINUTE => CoinbaseIntxWsChannel::CandlesOneMinute,
159 BAR_SPEC_5_MINUTE => CoinbaseIntxWsChannel::CandlesFiveMinute,
160 BAR_SPEC_30_MINUTE => CoinbaseIntxWsChannel::CandlesThirtyMinute,
161 BAR_SPEC_2_HOUR => CoinbaseIntxWsChannel::CandlesTwoHour,
162 BAR_SPEC_1_DAY => CoinbaseIntxWsChannel::CandlesOneDay,
163 _ => anyhow::bail!("Invalid `BarSpecification` for channel, was {bar_spec}"),
164 };
165 Ok(channel)
166}
167
168pub fn coinbase_channel_as_bar_spec(
169 channel: &CoinbaseIntxWsChannel,
170) -> anyhow::Result<BarSpecification> {
171 let bar_spec = match channel {
172 CoinbaseIntxWsChannel::CandlesOneMinute => BAR_SPEC_1_MINUTE,
173 CoinbaseIntxWsChannel::CandlesFiveMinute => BAR_SPEC_5_MINUTE,
174 CoinbaseIntxWsChannel::CandlesThirtyMinute => BAR_SPEC_30_MINUTE,
175 CoinbaseIntxWsChannel::CandlesTwoHour => BAR_SPEC_2_HOUR,
176 CoinbaseIntxWsChannel::CandlesOneDay => BAR_SPEC_1_DAY,
177 _ => anyhow::bail!("Invalid channel for `BarSpecification`, was {channel}"),
179 };
180 Ok(bar_spec)
181}