nautilus_coinbase_intx/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use nautilus_core::nanos::UnixNanos;
17use nautilus_model::{
18    data::{
19        Bar, BarType, BookOrder, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
20        OrderBookDeltas, QuoteTick, TradeTick,
21    },
22    enums::{AggregationSource, AggressorSide, BookAction, OrderSide, RecordFlag},
23    identifiers::{InstrumentId, Symbol, TradeId},
24    instruments::{CryptoPerpetual, CurrencyPair, any::InstrumentAny},
25    types::{Price, Quantity},
26};
27use rust_decimal::Decimal;
28
29use super::messages::{
30    CoinbaseIntxWsCandleSnapshotMsg, CoinbaseIntxWsInstrumentMsg,
31    CoinbaseIntxWsOrderBookSnapshotMsg, CoinbaseIntxWsOrderBookUpdateMsg, CoinbaseIntxWsQuoteMsg,
32    CoinbaseIntxWsRiskMsg, CoinbaseIntxWsTradeMsg,
33};
34use crate::common::{
35    enums::CoinbaseIntxInstrumentType,
36    parse::{coinbase_channel_as_bar_spec, get_currency, parse_instrument_id},
37};
38
39/// Parses a Coinbase spot instrument into an InstrumentAny::CurrencyPair.
40pub fn parse_spot_instrument(
41    definition: &CoinbaseIntxWsInstrumentMsg,
42    margin_init: Option<Decimal>,
43    margin_maint: Option<Decimal>,
44    maker_fee: Option<Decimal>,
45    taker_fee: Option<Decimal>,
46    ts_init: UnixNanos,
47) -> anyhow::Result<InstrumentAny> {
48    let instrument_id = parse_instrument_id(definition.product_id);
49    let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
50
51    let base_currency = get_currency(&definition.base_asset_name);
52    let quote_currency = get_currency(&definition.quote_asset_name);
53
54    let price_increment = Price::from(&definition.quote_increment);
55    let size_increment = Quantity::from(&definition.base_increment);
56
57    let lot_size = None;
58    let max_quantity = None;
59    let min_quantity = None;
60    let max_notional = None;
61    let min_notional = None;
62    let max_price = None;
63    let min_price = None;
64
65    let instrument = CurrencyPair::new(
66        instrument_id,
67        raw_symbol,
68        base_currency,
69        quote_currency,
70        price_increment.precision,
71        size_increment.precision,
72        price_increment,
73        size_increment,
74        lot_size,
75        max_quantity,
76        min_quantity,
77        max_notional,
78        min_notional,
79        max_price,
80        min_price,
81        margin_init,
82        margin_maint,
83        maker_fee,
84        taker_fee,
85        definition.time.into(),
86        ts_init,
87    );
88
89    Ok(InstrumentAny::CurrencyPair(instrument))
90}
91
92/// Parses a Coinbase perpetual instrument into an InstrumentAny::CryptoPerpetual.
93pub fn parse_perp_instrument(
94    definition: &CoinbaseIntxWsInstrumentMsg,
95    margin_init: Option<Decimal>,
96    margin_maint: Option<Decimal>,
97    maker_fee: Option<Decimal>,
98    taker_fee: Option<Decimal>,
99    ts_init: UnixNanos,
100) -> anyhow::Result<InstrumentAny> {
101    let instrument_id = parse_instrument_id(definition.product_id);
102    let raw_symbol = Symbol::from_ustr_unchecked(definition.product_id);
103
104    let base_currency = get_currency(&definition.base_asset_name);
105    let quote_currency = get_currency(&definition.quote_asset_name);
106    let settlement_currency = quote_currency;
107
108    let price_increment = Price::from(&definition.quote_increment);
109    let size_increment = Quantity::from(&definition.base_increment);
110
111    let multiplier = Some(Quantity::from(&definition.base_asset_multiplier));
112
113    let lot_size = None;
114    let max_quantity = None;
115    let min_quantity = None;
116    let max_notional = None;
117    let min_notional = None;
118    let max_price = None;
119    let min_price = None;
120
121    let is_inverse = false;
122
123    let instrument = CryptoPerpetual::new(
124        instrument_id,
125        raw_symbol,
126        base_currency,
127        quote_currency,
128        settlement_currency,
129        is_inverse,
130        price_increment.precision,
131        size_increment.precision,
132        price_increment,
133        size_increment,
134        multiplier,
135        lot_size,
136        max_quantity,
137        min_quantity,
138        max_notional,
139        min_notional,
140        max_price,
141        min_price,
142        margin_init,
143        margin_maint,
144        maker_fee,
145        taker_fee,
146        definition.time.into(),
147        ts_init,
148    );
149
150    Ok(InstrumentAny::CryptoPerpetual(instrument))
151}
152
153#[must_use]
154pub fn parse_instrument_any(
155    instrument: &CoinbaseIntxWsInstrumentMsg,
156    ts_init: UnixNanos,
157) -> Option<InstrumentAny> {
158    let result = match instrument.instrument_type {
159        CoinbaseIntxInstrumentType::Spot => {
160            parse_spot_instrument(instrument, None, None, None, None, ts_init).map(Some)
161        }
162        CoinbaseIntxInstrumentType::Perp => {
163            parse_perp_instrument(instrument, None, None, None, None, ts_init).map(Some)
164        }
165        CoinbaseIntxInstrumentType::Index => {
166            tracing::warn!(
167                "Index instrument parsing not implemented {}",
168                instrument.product_id,
169            );
170            Ok(None)
171        }
172    };
173
174    match result {
175        Ok(instrument) => instrument,
176        Err(e) => {
177            tracing::warn!("Failed to parse instrument {}: {e}", instrument.product_id,);
178            None
179        }
180    }
181}
182
183pub fn parse_orderbook_snapshot_msg(
184    msg: &CoinbaseIntxWsOrderBookSnapshotMsg,
185    instrument_id: InstrumentId,
186    price_precision: u8,
187    size_precision: u8,
188    ts_init: UnixNanos,
189) -> anyhow::Result<OrderBookDeltas> {
190    let ts_event = UnixNanos::from(msg.time);
191
192    // Set the snapshot flag
193    let flags = RecordFlag::F_SNAPSHOT.value();
194
195    // Allocate capacity for all bids and asks
196    let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
197
198    // Process bids - in Coinbase, bids are buy orders
199    for bid in &msg.bids {
200        let price_str = &bid[0];
201        let size_str = &bid[1];
202
203        let price = Price::new(
204            price_str
205                .parse::<f64>()
206                .map_err(|e| anyhow::anyhow!("Failed to parse bid price: {e}"))?,
207            price_precision,
208        );
209
210        let size = Quantity::new(
211            size_str
212                .parse::<f64>()
213                .map_err(|e| anyhow::anyhow!("Failed to parse bid size: {e}"))?,
214            size_precision,
215        );
216
217        // For bids (buy orders), we use OrderSide::Buy
218        let order_id = 0; // Not provided by Coinbase
219        let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
220
221        let delta = OrderBookDelta::new(
222            instrument_id,
223            BookAction::Add, // For snapshots, always use Add
224            order,
225            flags,
226            msg.sequence,
227            ts_event,
228            ts_init,
229        );
230
231        deltas.push(delta);
232    }
233
234    // Process asks - in Coinbase, asks are sell orders
235    for ask in &msg.asks {
236        let price_str = &ask[0];
237        let size_str = &ask[1];
238
239        let price = Price::new(
240            price_str
241                .parse::<f64>()
242                .map_err(|e| anyhow::anyhow!("Failed to parse ask price: {e}"))?,
243            price_precision,
244        );
245
246        let size = Quantity::new(
247            size_str
248                .parse::<f64>()
249                .map_err(|e| anyhow::anyhow!("Failed to parse ask size: {e}"))?,
250            size_precision,
251        );
252
253        // For asks (sell orders), we use OrderSide::Sell
254        let order_id = 0; // Not provided by Coinbase
255        let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
256
257        let delta = OrderBookDelta::new(
258            instrument_id,
259            BookAction::Add, // For snapshots, always use Add
260            order,
261            flags,
262            msg.sequence,
263            ts_event,
264            ts_init,
265        );
266
267        deltas.push(delta);
268    }
269
270    Ok(OrderBookDeltas::new(instrument_id, deltas))
271}
272
273pub fn parse_orderbook_update_msg(
274    msg: &CoinbaseIntxWsOrderBookUpdateMsg,
275    instrument_id: InstrumentId,
276    price_precision: u8,
277    size_precision: u8,
278    ts_init: UnixNanos,
279) -> anyhow::Result<OrderBookDeltas> {
280    let ts_event = UnixNanos::from(msg.time);
281
282    // No snapshot flag for updates
283    let flags = 0;
284
285    // Allocate capacity for all changes
286    let mut deltas = Vec::with_capacity(msg.changes.len());
287
288    // Process changes
289    for change in &msg.changes {
290        let side_str = &change[0];
291        let price_str = &change[1];
292        let size_str = &change[2];
293
294        let price = Price::new(
295            price_str
296                .parse::<f64>()
297                .map_err(|e| anyhow::anyhow!("Failed to parse price: {e}"))?,
298            price_precision,
299        );
300
301        let size = Quantity::new(
302            size_str
303                .parse::<f64>()
304                .map_err(|e| anyhow::anyhow!("Failed to parse size: {e}"))?,
305            size_precision,
306        );
307
308        // Determine order side
309        let side = match side_str.as_str() {
310            "BUY" => OrderSide::Buy,
311            "SELL" => OrderSide::Sell,
312            _ => return Err(anyhow::anyhow!("Unknown order side: {side_str}")),
313        };
314
315        // Determine book action based on size
316        let book_action = if size.is_zero() {
317            BookAction::Delete
318        } else {
319            BookAction::Update
320        };
321
322        let order_id = 0; // Not provided by Coinbase
323        let order = BookOrder::new(side, price, size, order_id);
324
325        let delta = OrderBookDelta::new(
326            instrument_id,
327            book_action,
328            order,
329            flags,
330            msg.sequence,
331            ts_event,
332            ts_init,
333        );
334
335        deltas.push(delta);
336    }
337
338    Ok(OrderBookDeltas::new(instrument_id, deltas))
339}
340
341pub fn parse_quote_msg(
342    msg: &CoinbaseIntxWsQuoteMsg,
343    instrument_id: InstrumentId,
344    price_precision: u8,
345    size_precision: u8,
346    ts_init: UnixNanos,
347) -> anyhow::Result<QuoteTick> {
348    let bid_price = Price::new(msg.bid_price.parse::<f64>()?, price_precision);
349    let ask_price = Price::new(msg.ask_price.parse::<f64>()?, price_precision);
350    let bid_size = Quantity::new(msg.bid_qty.parse::<f64>()?, size_precision);
351    let ask_size = Quantity::new(msg.ask_qty.parse::<f64>()?, size_precision);
352    let ts_event = UnixNanos::from(msg.time);
353
354    Ok(QuoteTick::new(
355        instrument_id,
356        bid_price,
357        ask_price,
358        bid_size,
359        ask_size,
360        ts_event,
361        ts_init,
362    ))
363}
364
365pub fn parse_trade_msg(
366    msg: &CoinbaseIntxWsTradeMsg,
367    instrument_id: InstrumentId,
368    price_precision: u8,
369    size_precision: u8,
370    ts_init: UnixNanos,
371) -> anyhow::Result<TradeTick> {
372    let price = Price::new(msg.trade_price.parse::<f64>()?, price_precision);
373    let size = Quantity::new(msg.trade_qty.parse::<f64>()?, size_precision);
374    let aggressor_side: AggressorSide = msg.aggressor_side.clone().into();
375    let trade_id = TradeId::new(&msg.match_id);
376    let ts_event = UnixNanos::from(msg.time);
377
378    Ok(TradeTick::new(
379        instrument_id,
380        price,
381        size,
382        aggressor_side,
383        trade_id,
384        ts_event,
385        ts_init,
386    ))
387}
388
389pub fn parse_mark_price_msg(
390    msg: &CoinbaseIntxWsRiskMsg,
391    instrument_id: InstrumentId,
392    price_precision: u8,
393    ts_init: UnixNanos,
394) -> anyhow::Result<MarkPriceUpdate> {
395    let value = Price::new(msg.mark_price.parse::<f64>()?, price_precision);
396    let ts_event = UnixNanos::from(msg.time);
397
398    Ok(MarkPriceUpdate::new(
399        instrument_id,
400        value,
401        ts_event,
402        ts_init,
403    ))
404}
405
406pub fn parse_index_price_msg(
407    msg: &CoinbaseIntxWsRiskMsg,
408    instrument_id: InstrumentId,
409    price_precision: u8,
410    ts_init: UnixNanos,
411) -> anyhow::Result<IndexPriceUpdate> {
412    let value = Price::new(msg.index_price.parse::<f64>()?, price_precision);
413    let ts_event = UnixNanos::from(msg.time);
414
415    Ok(IndexPriceUpdate::new(
416        instrument_id,
417        value,
418        ts_event,
419        ts_init,
420    ))
421}
422
423pub fn parse_candle_msg(
424    msg: &CoinbaseIntxWsCandleSnapshotMsg,
425    instrument_id: InstrumentId,
426    price_precision: u8,
427    size_precision: u8,
428    ts_init: UnixNanos,
429) -> anyhow::Result<Bar> {
430    let bar_spec = coinbase_channel_as_bar_spec(&msg.channel)?;
431    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
432    let candle = msg.candles.last().unwrap();
433    let ts_event = UnixNanos::from(candle.start); // TODO: Convert to close
434
435    let open_price = Price::new(candle.open.parse::<f64>()?, price_precision);
436    let high_price = Price::new(candle.high.parse::<f64>()?, price_precision);
437    let low_price = Price::new(candle.low.parse::<f64>()?, price_precision);
438    let close_price = Price::new(candle.close.parse::<f64>()?, price_precision);
439    let volume = Quantity::new(candle.volume.parse::<f64>()?, size_precision);
440
441    // Create a new bar
442    Ok(Bar::new(
443        bar_type,
444        open_price,
445        high_price,
446        low_price,
447        close_price,
448        volume,
449        ts_event,
450        ts_init,
451    ))
452}