1use nautilus_core::nanos::UnixNanos;
17use nautilus_model::{
18 data::{
19 Bar, BarType, BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
20 OrderBookDeltas, QuoteTick, TradeTick,
21 },
22 enums::{AggregationSource, AggressorSide, BookAction, OrderSide, RecordFlag},
23 identifiers::{InstrumentId, Symbol, TradeId},
24 instruments::{CryptoPerpetual, CurrencyPair, any::InstrumentAny},
25 types::{Price, Quantity},
26};
27use rust_decimal::Decimal;
28
29use super::messages::{
30 CoinbaseIntxWsCandleSnapshotMsg, CoinbaseIntxWsInstrumentMsg,
31 CoinbaseIntxWsOrderBookSnapshotMsg, CoinbaseIntxWsOrderBookUpdateMsg, CoinbaseIntxWsQuoteMsg,
32 CoinbaseIntxWsRiskMsg, CoinbaseIntxWsTradeMsg,
33};
34use crate::common::{
35 enums::CoinbaseIntxInstrumentType,
36 parse::{coinbase_channel_as_bar_spec, get_currency, parse_instrument_id},
37};
38
39pub fn parse_spot_instrument(
41 definition: &CoinbaseIntxWsInstrumentMsg,
42 margin_init: Option<Decimal>,
43 margin_maint: Option<Decimal>,
44 maker_fee: Option<Decimal>,
45 taker_fee: Option<Decimal>,
46 ts_init: UnixNanos,
47) -> anyhow::Result<InstrumentAny> {
48 let instrument_id = parse_instrument_id(definition.product_id);
49 let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
50
51 let base_currency = get_currency(&definition.base_asset_name);
52 let quote_currency = get_currency(&definition.quote_asset_name);
53
54 let price_increment = Price::from(&definition.quote_increment);
55 let size_increment = Quantity::from(&definition.base_increment);
56
57 let lot_size = None;
58 let max_quantity = None;
59 let min_quantity = None;
60 let max_notional = None;
61 let min_notional = None;
62 let max_price = None;
63 let min_price = None;
64
65 let instrument = CurrencyPair::new(
66 instrument_id,
67 raw_symbol,
68 base_currency,
69 quote_currency,
70 price_increment.precision,
71 size_increment.precision,
72 price_increment,
73 size_increment,
74 lot_size,
75 max_quantity,
76 min_quantity,
77 max_notional,
78 min_notional,
79 max_price,
80 min_price,
81 margin_init,
82 margin_maint,
83 maker_fee,
84 taker_fee,
85 definition.time.into(),
86 ts_init,
87 );
88
89 Ok(InstrumentAny::CurrencyPair(instrument))
90}
91
92pub fn parse_perp_instrument(
94 definition: &CoinbaseIntxWsInstrumentMsg,
95 margin_init: Option<Decimal>,
96 margin_maint: Option<Decimal>,
97 maker_fee: Option<Decimal>,
98 taker_fee: Option<Decimal>,
99 ts_init: UnixNanos,
100) -> anyhow::Result<InstrumentAny> {
101 let instrument_id = parse_instrument_id(definition.product_id);
102 let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
103
104 let base_currency = get_currency(&definition.base_asset_name);
105 let quote_currency = get_currency(&definition.quote_asset_name);
106 let settlement_currency = quote_currency;
107
108 let price_increment = Price::from(&definition.quote_increment);
109 let size_increment = Quantity::from(&definition.base_increment);
110
111 let multiplier = Some(Quantity::from(&definition.base_asset_multiplier));
112
113 let lot_size = None;
114 let max_quantity = None;
115 let min_quantity = None;
116 let max_notional = None;
117 let min_notional = None;
118 let max_price = None;
119 let min_price = None;
120
121 let is_inverse = false;
122
123 let instrument = CryptoPerpetual::new(
124 instrument_id,
125 raw_symbol,
126 base_currency,
127 quote_currency,
128 settlement_currency,
129 is_inverse,
130 price_increment.precision,
131 size_increment.precision,
132 price_increment,
133 size_increment,
134 multiplier,
135 lot_size,
136 max_quantity,
137 min_quantity,
138 max_notional,
139 min_notional,
140 max_price,
141 min_price,
142 margin_init,
143 margin_maint,
144 maker_fee,
145 taker_fee,
146 definition.time.into(),
147 ts_init,
148 );
149
150 Ok(InstrumentAny::CryptoPerpetual(instrument))
151}
152
153#[must_use]
154pub fn parse_instrument_any(
155 instrument: &CoinbaseIntxWsInstrumentMsg,
156 ts_init: UnixNanos,
157) -> Option<InstrumentAny> {
158 let result = match instrument.instrument_type {
159 CoinbaseIntxInstrumentType::Spot => {
160 parse_spot_instrument(instrument, None, None, None, None, ts_init).map(Some)
161 }
162 CoinbaseIntxInstrumentType::Perp => {
163 parse_perp_instrument(instrument, None, None, None, None, ts_init).map(Some)
164 }
165 CoinbaseIntxInstrumentType::Index => {
166 tracing::warn!(
167 "Index instrument parsing not implemented {}",
168 instrument.product_id,
169 );
170 Ok(None)
171 }
172 };
173
174 match result {
175 Ok(instrument) => instrument,
176 Err(e) => {
177 tracing::warn!("Failed to parse instrument {}: {e}", instrument.product_id,);
178 None
179 }
180 }
181}
182
183pub fn parse_orderbook_snapshot_msg(
184 msg: &CoinbaseIntxWsOrderBookSnapshotMsg,
185 instrument_id: InstrumentId,
186 price_precision: u8,
187 size_precision: u8,
188 ts_init: UnixNanos,
189) -> anyhow::Result<OrderBookDeltas> {
190 let ts_event = UnixNanos::from(msg.time);
191
192 let flags = RecordFlag::F_SNAPSHOT.value();
194
195 let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
197
198 for bid in &msg.bids {
200 let price_str = &bid[0];
201 let size_str = &bid[1];
202
203 let price = Price::new(
204 price_str
205 .parse::<f64>()
206 .map_err(|e| anyhow::anyhow!("Failed to parse bid price: {e}"))?,
207 price_precision,
208 );
209
210 let size = Quantity::new(
211 size_str
212 .parse::<f64>()
213 .map_err(|e| anyhow::anyhow!("Failed to parse bid size: {e}"))?,
214 size_precision,
215 );
216
217 let order_id = 0; let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
220
221 let delta = OrderBookDelta::new(
222 instrument_id,
223 BookAction::Add, order,
225 flags,
226 msg.sequence,
227 ts_event,
228 ts_init,
229 );
230
231 deltas.push(delta);
232 }
233
234 for ask in &msg.asks {
236 let price_str = &ask[0];
237 let size_str = &ask[1];
238
239 let price = Price::new(
240 price_str
241 .parse::<f64>()
242 .map_err(|e| anyhow::anyhow!("Failed to parse ask price: {e}"))?,
243 price_precision,
244 );
245
246 let size = Quantity::new(
247 size_str
248 .parse::<f64>()
249 .map_err(|e| anyhow::anyhow!("Failed to parse ask size: {e}"))?,
250 size_precision,
251 );
252
253 let order_id = 0; let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
256
257 let delta = OrderBookDelta::new(
258 instrument_id,
259 BookAction::Add, order,
261 flags,
262 msg.sequence,
263 ts_event,
264 ts_init,
265 );
266
267 deltas.push(delta);
268 }
269
270 Ok(OrderBookDeltas::new(instrument_id, deltas))
271}
272
273pub fn parse_orderbook_update_msg(
274 msg: &CoinbaseIntxWsOrderBookUpdateMsg,
275 instrument_id: InstrumentId,
276 price_precision: u8,
277 size_precision: u8,
278 ts_init: UnixNanos,
279) -> anyhow::Result<OrderBookDeltas> {
280 let ts_event = UnixNanos::from(msg.time);
281
282 let flags = 0;
284
285 let mut deltas = Vec::with_capacity(msg.changes.len());
287
288 for change in &msg.changes {
290 let side_str = &change[0];
291 let price_str = &change[1];
292 let size_str = &change[2];
293
294 let price = Price::new(
295 price_str
296 .parse::<f64>()
297 .map_err(|e| anyhow::anyhow!("Failed to parse price: {e}"))?,
298 price_precision,
299 );
300
301 let size = Quantity::new(
302 size_str
303 .parse::<f64>()
304 .map_err(|e| anyhow::anyhow!("Failed to parse size: {e}"))?,
305 size_precision,
306 );
307
308 let side = match side_str.as_str() {
310 "BUY" => OrderSide::Buy,
311 "SELL" => OrderSide::Sell,
312 _ => return Err(anyhow::anyhow!("Unknown order side: {side_str}")),
313 };
314
315 let book_action = if size.is_zero() {
317 BookAction::Delete
318 } else {
319 BookAction::Update
320 };
321
322 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
324
325 let delta = OrderBookDelta::new(
326 instrument_id,
327 book_action,
328 order,
329 flags,
330 msg.sequence,
331 ts_event,
332 ts_init,
333 );
334
335 deltas.push(delta);
336 }
337
338 Ok(OrderBookDeltas::new(instrument_id, deltas))
339}
340
341pub fn parse_quote_msg(
342 msg: &CoinbaseIntxWsQuoteMsg,
343 instrument_id: InstrumentId,
344 price_precision: u8,
345 size_precision: u8,
346 ts_init: UnixNanos,
347) -> anyhow::Result<QuoteTick> {
348 let bid_price = Price::new(msg.bid_price.parse::<f64>()?, price_precision);
349 let ask_price = Price::new(msg.ask_price.parse::<f64>()?, price_precision);
350 let bid_size = Quantity::new(msg.bid_qty.parse::<f64>()?, size_precision);
351 let ask_size = Quantity::new(msg.ask_qty.parse::<f64>()?, size_precision);
352 let ts_event = UnixNanos::from(msg.time);
353
354 Ok(QuoteTick::new(
355 instrument_id,
356 bid_price,
357 ask_price,
358 bid_size,
359 ask_size,
360 ts_event,
361 ts_init,
362 ))
363}
364
365pub fn parse_trade_msg(
366 msg: &CoinbaseIntxWsTradeMsg,
367 instrument_id: InstrumentId,
368 price_precision: u8,
369 size_precision: u8,
370 ts_init: UnixNanos,
371) -> anyhow::Result<TradeTick> {
372 let price = Price::new(msg.trade_price.parse::<f64>()?, price_precision);
373 let size = Quantity::new(msg.trade_qty.parse::<f64>()?, size_precision);
374 let aggressor_side: AggressorSide = msg.aggressor_side.clone().into();
375 let trade_id = TradeId::new(&msg.match_id);
376 let ts_event = UnixNanos::from(msg.time);
377
378 Ok(TradeTick::new(
379 instrument_id,
380 price,
381 size,
382 aggressor_side,
383 trade_id,
384 ts_event,
385 ts_init,
386 ))
387}
388
389pub fn parse_mark_price_msg(
390 msg: &CoinbaseIntxWsRiskMsg,
391 instrument_id: InstrumentId,
392 price_precision: u8,
393 ts_init: UnixNanos,
394) -> anyhow::Result<MarkPriceUpdate> {
395 let value = Price::new(msg.mark_price.parse::<f64>()?, price_precision);
396 let ts_event = UnixNanos::from(msg.time);
397
398 Ok(MarkPriceUpdate::new(
399 instrument_id,
400 value,
401 ts_event,
402 ts_init,
403 ))
404}
405
406pub fn parse_index_price_msg(
407 msg: &CoinbaseIntxWsRiskMsg,
408 instrument_id: InstrumentId,
409 price_precision: u8,
410 ts_init: UnixNanos,
411) -> anyhow::Result<IndexPriceUpdate> {
412 let value = Price::new(msg.index_price.parse::<f64>()?, price_precision);
413 let ts_event = UnixNanos::from(msg.time);
414
415 Ok(IndexPriceUpdate::new(
416 instrument_id,
417 value,
418 ts_event,
419 ts_init,
420 ))
421}
422
423pub fn parse_candle_msg(
424 msg: &CoinbaseIntxWsCandleSnapshotMsg,
425 instrument_id: InstrumentId,
426 price_precision: u8,
427 size_precision: u8,
428 ts_init: UnixNanos,
429) -> anyhow::Result<Bar> {
430 let bar_spec = coinbase_channel_as_bar_spec(&msg.channel)?;
431 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
432 let candle = msg.candles.last().unwrap();
433 let ts_event = UnixNanos::from(candle.start); let open_price = Price::new(candle.open.parse::<f64>()?, price_precision);
436 let high_price = Price::new(candle.high.parse::<f64>()?, price_precision);
437 let low_price = Price::new(candle.low.parse::<f64>()?, price_precision);
438 let close_price = Price::new(candle.close.parse::<f64>()?, price_precision);
439 let volume = Quantity::new(candle.volume.parse::<f64>()?, size_precision);
440
441 Ok(Bar::new(
443 bar_type,
444 open_price,
445 high_price,
446 low_price,
447 close_price,
448 volume,
449 ts_event,
450 ts_init,
451 ))
452}