nautilus_coinbase_intx/websocket/
messages.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use chrono::{DateTime, Utc};
17use nautilus_model::{
18    data::{Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas},
19    events::OrderEventAny,
20    instruments::InstrumentAny,
21};
22use serde::{Deserialize, Serialize};
23use ustr::Ustr;
24
25use super::enums::{CoinbaseIntxWsChannel, WsMessageType, WsOperation};
26use crate::common::enums::{CoinbaseIntxInstrumentType, CoinbaseIntxSide};
27
28#[derive(Debug, Clone)]
29pub enum NautilusWsMessage {
30    Data(Data),
31    DataVec(Vec<Data>),
32    Deltas(OrderBookDeltas),
33    Instrument(InstrumentAny),
34    OrderEvent(OrderEventAny),
35    MarkPrice(MarkPriceUpdate),
36    IndexPrice(IndexPriceUpdate),
37    MarkAndIndex((MarkPriceUpdate, IndexPriceUpdate)),
38}
39
40#[derive(Debug, Serialize)]
41pub struct CoinbaseIntxSubscription {
42    #[serde(rename = "type")]
43    pub op: WsOperation,
44    #[serde(skip_serializing_if = "Option::is_none")]
45    pub product_ids: Option<Vec<Ustr>>,
46    pub channels: Vec<CoinbaseIntxWsChannel>,
47    pub time: String,
48    pub key: Ustr,
49    pub passphrase: Ustr,
50    pub signature: String,
51}
52
53#[derive(Debug, Deserialize)]
54#[serde(untagged)]
55#[allow(clippy::large_enum_variant)]
56pub enum CoinbaseIntxWsMessage {
57    Reject(CoinbaseIntxWsRejectMsg),
58    Confirmation(CoinbaseIntxWsConfirmationMsg),
59    Instrument(CoinbaseIntxWsInstrumentMsg),
60    Funding(CoinbaseIntxWsFundingMsg),
61    Risk(CoinbaseIntxWsRiskMsg),
62    BookSnapshot(CoinbaseIntxWsOrderBookSnapshotMsg),
63    BookUpdate(CoinbaseIntxWsOrderBookUpdateMsg),
64    Quote(CoinbaseIntxWsQuoteMsg),
65    Trade(CoinbaseIntxWsTradeMsg),
66    CandleSnapshot(CoinbaseIntxWsCandleSnapshotMsg),
67    CandleUpdate(CoinbaseIntxWsCandleUpdateMsg),
68}
69
70#[derive(Debug, Deserialize)]
71pub struct CoinbaseIntxWsRejectMsg {
72    pub message: String,
73    pub reason: String,
74    pub channel: CoinbaseIntxWsChannel,
75}
76
77#[derive(Debug, Deserialize)]
78pub struct CoinbaseIntxWsConfirmationMsg {
79    pub channels: Vec<CoinbaseIntxWsChannelDetails>,
80    pub authenticated: bool,
81    pub channel: CoinbaseIntxWsChannel,
82    pub time: DateTime<Utc>,
83}
84
85#[derive(Debug, Deserialize)]
86pub struct CoinbaseIntxWsChannelDetails {
87    pub name: CoinbaseIntxWsChannel,
88    pub product_ids: Option<Vec<Ustr>>,
89}
90
91#[derive(Debug, Deserialize)]
92pub struct CoinbaseIntxWsInstrumentMsg {
93    #[serde(rename = "type")]
94    pub message_type: WsMessageType,
95    pub channel: CoinbaseIntxWsChannel,
96    pub product_id: Ustr,
97    pub instrument_type: CoinbaseIntxInstrumentType,
98    pub instrument_mode: String,
99    pub base_asset_name: String,
100    pub quote_asset_name: String,
101    pub base_increment: String,
102    pub quote_increment: String,
103    pub avg_daily_quantity: String,
104    pub avg_daily_volume: String,
105    pub total30_day_quantity: String,
106    pub total30_day_volume: String,
107    pub total24_hour_quantity: String,
108    pub total24_hour_volume: String,
109    pub base_imf: String,
110    pub min_quantity: String,
111    pub position_size_limit: Option<String>,
112    pub position_notional_limit: Option<String>,
113    pub funding_interval: Option<String>,
114    pub trading_state: String,
115    pub last_updated_time: DateTime<Utc>,
116    pub default_initial_margin: Option<String>,
117    pub base_asset_multiplier: String,
118    pub underlying_type: CoinbaseIntxInstrumentType,
119    pub sequence: u64,
120    pub time: DateTime<Utc>,
121}
122
123#[derive(Debug, Deserialize)]
124pub struct CoinbaseIntxWsFundingMsg {
125    #[serde(rename = "type")]
126    pub message_type: WsMessageType,
127    pub channel: CoinbaseIntxWsChannel,
128    pub product_id: Ustr,
129    pub funding_rate: String,
130    pub is_final: bool,
131    pub sequence: u64,
132    pub time: DateTime<Utc>,
133}
134
135#[derive(Debug, Deserialize)]
136pub struct CoinbaseIntxWsRiskMsg {
137    #[serde(rename = "type")]
138    pub message_type: WsMessageType,
139    pub channel: CoinbaseIntxWsChannel,
140    pub product_id: Ustr,
141    pub limit_up: String,
142    pub limit_down: String,
143    pub index_price: String,
144    pub mark_price: String,
145    pub settlement_price: String,
146    pub open_interest: String,
147    pub sequence: u64,
148    pub time: DateTime<Utc>,
149}
150
151#[derive(Debug, Deserialize)]
152pub struct CoinbaseIntxWsOrderBookSnapshotMsg {
153    #[serde(rename = "type")]
154    pub message_type: WsMessageType,
155    pub channel: CoinbaseIntxWsChannel,
156    pub product_id: Ustr,
157    pub bids: Vec<[String; 2]>, // [price, size]
158    pub asks: Vec<[String; 2]>, // [price, size]
159    pub sequence: u64,
160    pub time: DateTime<Utc>,
161}
162
163#[derive(Debug, Deserialize)]
164pub struct CoinbaseIntxWsOrderBookUpdateMsg {
165    #[serde(rename = "type")]
166    pub message_type: WsMessageType,
167    pub channel: CoinbaseIntxWsChannel,
168    pub product_id: Ustr,
169    pub changes: Vec<[String; 3]>, // [side, price, size]
170    pub sequence: u64,
171    pub time: DateTime<Utc>,
172}
173
174#[derive(Debug, Deserialize)]
175pub struct CoinbaseIntxWsQuoteMsg {
176    #[serde(rename = "type")]
177    pub message_type: WsMessageType,
178    pub channel: CoinbaseIntxWsChannel,
179    pub product_id: Ustr,
180    pub bid_price: String,
181    pub bid_qty: String,
182    pub ask_price: String,
183    pub ask_qty: String,
184    pub sequence: u64,
185    pub time: DateTime<Utc>,
186}
187
188#[derive(Debug, Deserialize)]
189pub struct CoinbaseIntxWsTradeMsg {
190    #[serde(rename = "type")]
191    pub message_type: WsMessageType,
192    pub channel: CoinbaseIntxWsChannel,
193    pub product_id: Ustr,
194    pub match_id: String,
195    pub trade_price: String,
196    pub trade_qty: String,
197    pub aggressor_side: CoinbaseIntxSide,
198    pub sequence: u64,
199    pub time: DateTime<Utc>,
200}
201
202#[derive(Debug, Deserialize)]
203pub struct CoinbaseIntxWsCandle {
204    pub start: DateTime<Utc>,
205    pub open: String,
206    pub high: String,
207    pub low: String,
208    pub close: String,
209    pub volume: String,
210}
211
212#[derive(Debug, Deserialize)]
213pub struct CoinbaseIntxWsCandleSnapshotMsg {
214    #[serde(rename = "type")]
215    pub message_type: WsMessageType,
216    pub channel: CoinbaseIntxWsChannel,
217    pub product_id: Ustr,
218    pub granularity: Ustr,
219    pub candles: Vec<CoinbaseIntxWsCandle>,
220    pub sequence: u64,
221}
222
223#[derive(Debug, Deserialize)]
224pub struct CoinbaseIntxWsCandleUpdateMsg {
225    #[serde(rename = "type")]
226    pub message_type: WsMessageType,
227    pub channel: CoinbaseIntxWsChannel,
228    pub product_id: Ustr,
229    pub start: DateTime<Utc>,
230    #[serde(default)]
231    pub open: Option<String>,
232    #[serde(default)]
233    pub high: Option<String>,
234    #[serde(default)]
235    pub low: Option<String>,
236    #[serde(default)]
237    pub close: Option<String>,
238    #[serde(default)]
239    pub volume: Option<String>,
240    pub sequence: u64,
241}
242
243////////////////////////////////////////////////////////////////////////////////
244// Tests
245////////////////////////////////////////////////////////////////////////////////
246
247#[cfg(test)]
248mod tests {
249    use rstest::rstest;
250
251    use super::*;
252    use crate::common::testing::load_test_json;
253
254    #[rstest]
255    fn test_parse_asset_model() {
256        let json_data = load_test_json("ws_instruments.json");
257        let parsed: CoinbaseIntxWsInstrumentMsg = serde_json::from_str(&json_data).unwrap();
258
259        assert_eq!(parsed.product_id, "ETH-PERP");
260        assert_eq!(parsed.message_type, WsMessageType::Snapshot);
261        assert_eq!(parsed.channel, CoinbaseIntxWsChannel::Instruments);
262        assert_eq!(parsed.instrument_type, CoinbaseIntxInstrumentType::Perp);
263        assert_eq!(parsed.instrument_mode, "standard");
264        assert_eq!(parsed.base_asset_name, "ETH");
265        assert_eq!(parsed.quote_asset_name, "USDC");
266        assert_eq!(parsed.base_increment, "0.0001");
267        assert_eq!(parsed.quote_increment, "0.01");
268        assert_eq!(parsed.avg_daily_quantity, "229061.15400333333");
269        assert_eq!(parsed.avg_daily_volume, "5.33931093731498E8");
270        assert_eq!(parsed.total30_day_quantity, "6871834.6201");
271        assert_eq!(parsed.total30_day_volume, "1.601793281194494E10");
272        assert_eq!(parsed.total24_hour_quantity, "116705.0261");
273        assert_eq!(parsed.total24_hour_volume, "2.22252453944151E8");
274        assert_eq!(parsed.base_imf, "0.05");
275        assert_eq!(parsed.min_quantity, "0.0001");
276        assert_eq!(parsed.position_size_limit, Some("5841.0594".to_string()));
277        assert_eq!(parsed.position_notional_limit, Some("70000000".to_string()));
278        assert_eq!(parsed.funding_interval, Some("3600000000000".to_string()));
279        assert_eq!(parsed.trading_state, "trading");
280        assert_eq!(
281            parsed.last_updated_time.to_rfc3339(),
282            "2025-03-14T22:00:00+00:00"
283        );
284        assert_eq!(parsed.default_initial_margin, Some("0.2".to_string()));
285        assert_eq!(parsed.base_asset_multiplier, "1.0");
286        assert_eq!(parsed.underlying_type, CoinbaseIntxInstrumentType::Spot);
287        assert_eq!(parsed.sequence, 0);
288        assert_eq!(parsed.time.to_rfc3339(), "2025-03-14T22:59:53.373+00:00");
289    }
290
291    #[rstest]
292    fn test_parse_ws_trade_msg() {
293        let json_data = load_test_json("ws_match.json");
294        let parsed: CoinbaseIntxWsTradeMsg = serde_json::from_str(&json_data).unwrap();
295
296        assert_eq!(parsed.product_id, "BTC-PERP");
297        assert_eq!(parsed.message_type, WsMessageType::Update);
298        assert_eq!(parsed.channel, CoinbaseIntxWsChannel::Match);
299        assert_eq!(parsed.match_id, "423596942694547460");
300        assert_eq!(parsed.trade_price, "84374");
301        assert_eq!(parsed.trade_qty, "0.0213");
302        assert_eq!(parsed.aggressor_side, CoinbaseIntxSide::Buy);
303        assert_eq!(parsed.sequence, 0);
304        assert_eq!(parsed.time.to_rfc3339(), "2025-03-14T23:03:01.189+00:00");
305    }
306
307    #[rstest]
308    fn test_parse_ws_quote_msg() {
309        let json_data = load_test_json("ws_quote.json");
310        let parsed: CoinbaseIntxWsQuoteMsg = serde_json::from_str(&json_data).unwrap();
311
312        assert_eq!(parsed.product_id, "BTC-PERP");
313        assert_eq!(parsed.message_type, WsMessageType::Snapshot);
314        assert_eq!(parsed.channel, CoinbaseIntxWsChannel::Level1);
315        assert_eq!(parsed.bid_price, "84368.5");
316        assert_eq!(parsed.bid_qty, "2.608");
317        assert_eq!(parsed.ask_price, "84368.6");
318        assert_eq!(parsed.ask_qty, "2.9453");
319        assert_eq!(parsed.sequence, 0);
320        assert_eq!(parsed.time.to_rfc3339(), "2025-03-14T23:05:39.533+00:00");
321    }
322
323    #[rstest]
324    fn test_parse_ws_order_book_snapshot_msg() {
325        let json_data = load_test_json("ws_book_snapshot.json");
326        let parsed: CoinbaseIntxWsOrderBookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
327
328        assert_eq!(parsed.product_id, "BTC-PERP");
329        assert_eq!(parsed.message_type, WsMessageType::Snapshot);
330        assert_eq!(parsed.channel, CoinbaseIntxWsChannel::Level2);
331        assert_eq!(parsed.sequence, 0);
332        assert_eq!(parsed.time.to_rfc3339(), "2025-03-14T23:09:43.993+00:00");
333
334        assert_eq!(parsed.bids.len(), 50);
335        assert_eq!(parsed.asks.len(), 50);
336
337        assert_eq!(parsed.bids[0][0], "84323.6");
338        assert_eq!(parsed.bids[0][1], "4.9466");
339
340        assert_eq!(parsed.bids[49][0], "84296.2");
341        assert_eq!(parsed.bids[49][1], "0.0237");
342
343        assert_eq!(parsed.asks[0][0], "84323.7");
344        assert_eq!(parsed.asks[0][1], "2.6944");
345
346        assert_eq!(parsed.asks[49][0], "84346.9");
347        assert_eq!(parsed.asks[49][1], "0.3257");
348    }
349
350    #[rstest]
351    fn test_parse_ws_order_book_update_msg() {
352        let json_data = load_test_json("ws_book_update.json");
353        let parsed: CoinbaseIntxWsOrderBookUpdateMsg = serde_json::from_str(&json_data).unwrap();
354
355        assert_eq!(parsed.product_id, "BTC-PERP");
356        assert_eq!(parsed.message_type, WsMessageType::Update);
357        assert_eq!(parsed.channel, CoinbaseIntxWsChannel::Level2);
358        assert_eq!(parsed.sequence, 1);
359        assert_eq!(parsed.time.to_rfc3339(), "2025-03-14T23:09:44.095+00:00");
360
361        assert_eq!(parsed.changes.len(), 2);
362
363        assert_eq!(parsed.changes[0][0], "BUY"); // side
364        assert_eq!(parsed.changes[0][1], "84296.2"); // price
365        assert_eq!(parsed.changes[0][2], "0"); // size (0 means delete)
366
367        assert_eq!(parsed.changes[1][0], "BUY"); // side
368        assert_eq!(parsed.changes[1][1], "84296.3"); // price
369        assert_eq!(parsed.changes[1][2], "0.1779"); // size
370    }
371
372    #[rstest]
373    fn test_parse_ws_candle_snapshot_msg() {
374        let json_data = load_test_json("ws_candles.json");
375        let parsed: CoinbaseIntxWsCandleSnapshotMsg = serde_json::from_str(&json_data).unwrap();
376
377        assert_eq!(parsed.granularity, "ONE_MINUTE");
378        assert_eq!(parsed.sequence, 0);
379        assert_eq!(parsed.candles.len(), 1);
380
381        let candle = &parsed.candles[0];
382        assert_eq!(candle.start.to_rfc3339(), "2025-03-14T23:14:00+00:00");
383        assert_eq!(candle.open, "1921.71");
384        assert_eq!(candle.high, "1921.71");
385        assert_eq!(candle.low, "1919.87");
386        assert_eq!(candle.close, "1919.87");
387        assert_eq!(candle.volume, "11.2803");
388    }
389}