1use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
23 QuoteTick, TradeTick,
24 },
25 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
26 identifiers::{InstrumentId, TradeId},
27 types::{Price, Quantity},
28};
29use uuid::Uuid;
30
31use super::{
32 message::{BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, TradeMsg, WsMessage},
33 types::InstrumentMiniInfo,
34};
35use crate::parse::{parse_aggressor_side, parse_bar_spec, parse_book_action};
36
37#[must_use]
38pub fn parse_tardis_ws_message(msg: WsMessage, info: Arc<InstrumentMiniInfo>) -> Option<Data> {
39 match msg {
40 WsMessage::BookChange(msg) => {
41 if msg.bids.is_empty() && msg.asks.is_empty() {
42 tracing::error!(
43 "Invalid book change for {} {} (empty bids and asks)",
44 msg.exchange,
45 msg.symbol
46 );
47 return None;
48 }
49 Some(Data::Deltas(parse_book_change_msg_as_deltas(
50 msg,
51 info.price_precision,
52 info.size_precision,
53 info.instrument_id,
54 )))
55 }
56 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
57 1 => Some(Data::Quote(parse_book_snapshot_msg_as_quote(
58 msg,
59 info.price_precision,
60 info.size_precision,
61 info.instrument_id,
62 ))),
63 _ => Some(Data::Deltas(parse_book_snapshot_msg_as_deltas(
64 msg,
65 info.price_precision,
66 info.size_precision,
67 info.instrument_id,
68 ))),
69 },
70 WsMessage::Trade(msg) => Some(Data::Trade(parse_trade_msg(
71 msg,
72 info.price_precision,
73 info.size_precision,
74 info.instrument_id,
75 ))),
76 WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
77 msg,
78 info.price_precision,
79 info.size_precision,
80 info.instrument_id,
81 ))),
82 WsMessage::DerivativeTicker(_) => None,
83 WsMessage::Disconnect(_) => None,
84 }
85}
86
87#[must_use]
88pub fn parse_book_change_msg_as_deltas(
89 msg: BookChangeMsg,
90 price_precision: u8,
91 size_precision: u8,
92 instrument_id: InstrumentId,
93) -> OrderBookDeltas_API {
94 parse_book_msg_as_deltas(
95 msg.bids,
96 msg.asks,
97 msg.is_snapshot,
98 price_precision,
99 size_precision,
100 instrument_id,
101 msg.timestamp,
102 msg.local_timestamp,
103 )
104}
105
106#[must_use]
107pub fn parse_book_snapshot_msg_as_deltas(
108 msg: BookSnapshotMsg,
109 price_precision: u8,
110 size_precision: u8,
111 instrument_id: InstrumentId,
112) -> OrderBookDeltas_API {
113 parse_book_msg_as_deltas(
114 msg.bids,
115 msg.asks,
116 true,
117 price_precision,
118 size_precision,
119 instrument_id,
120 msg.timestamp,
121 msg.local_timestamp,
122 )
123}
124
125#[allow(clippy::too_many_arguments)]
126#[must_use]
127pub fn parse_book_msg_as_deltas(
128 bids: Vec<BookLevel>,
129 asks: Vec<BookLevel>,
130 is_snapshot: bool,
131 price_precision: u8,
132 size_precision: u8,
133 instrument_id: InstrumentId,
134 timestamp: DateTime<Utc>,
135 local_timestamp: DateTime<Utc>,
136) -> OrderBookDeltas_API {
137 let ts_event = UnixNanos::from(timestamp.timestamp_nanos_opt().unwrap() as u64);
138 let ts_init = UnixNanos::from(local_timestamp.timestamp_nanos_opt().unwrap() as u64);
139
140 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
141
142 for level in bids {
143 deltas.push(parse_book_level(
144 instrument_id,
145 price_precision,
146 size_precision,
147 OrderSide::Buy,
148 level,
149 is_snapshot,
150 ts_event,
151 ts_init,
152 ));
153 }
154
155 for level in asks {
156 deltas.push(parse_book_level(
157 instrument_id,
158 price_precision,
159 size_precision,
160 OrderSide::Sell,
161 level,
162 is_snapshot,
163 ts_event,
164 ts_init,
165 ));
166 }
167
168 if let Some(last_delta) = deltas.last_mut() {
169 last_delta.flags += RecordFlag::F_LAST.value();
170 }
171
172 OrderBookDeltas_API::new(OrderBookDeltas::new(instrument_id, deltas))
174}
175
176#[allow(clippy::too_many_arguments)]
177#[must_use]
178pub fn parse_book_level(
179 instrument_id: InstrumentId,
180 price_precision: u8,
181 size_precision: u8,
182 side: OrderSide,
183 level: BookLevel,
184 is_snapshot: bool,
185 ts_event: UnixNanos,
186 ts_init: UnixNanos,
187) -> OrderBookDelta {
188 let action = parse_book_action(is_snapshot, level.amount);
189 let price = Price::new(level.price, price_precision);
190 let size = Quantity::new(level.amount, size_precision);
191 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
193 let flags = if is_snapshot {
194 RecordFlag::F_SNAPSHOT.value()
195 } else {
196 0
197 };
198 let sequence = 0; assert!(
201 !(action != BookAction::Delete && size.is_zero()),
202 "Invalid zero size for {action}"
203 );
204
205 OrderBookDelta::new(
206 instrument_id,
207 action,
208 order,
209 flags,
210 sequence,
211 ts_event,
212 ts_init,
213 )
214}
215
216#[must_use]
217pub fn parse_book_snapshot_msg_as_quote(
218 msg: BookSnapshotMsg,
219 price_precision: u8,
220 size_precision: u8,
221 instrument_id: InstrumentId,
222) -> QuoteTick {
223 let ts_event = UnixNanos::from(msg.timestamp);
224 let ts_init = UnixNanos::from(msg.local_timestamp);
225
226 let best_bid = &msg.bids[0];
227 let bid_price = Price::new(best_bid.price, price_precision);
228 let bid_size = Quantity::new(best_bid.amount, size_precision);
229
230 let best_ask = &msg.asks[0];
231 let ask_price = Price::new(best_ask.price, price_precision);
232 let ask_size = Quantity::new(best_ask.amount, size_precision);
233
234 QuoteTick::new(
235 instrument_id,
236 bid_price,
237 ask_price,
238 bid_size,
239 ask_size,
240 ts_event,
241 ts_init,
242 )
243}
244
245#[must_use]
246pub fn parse_trade_msg(
247 msg: TradeMsg,
248 price_precision: u8,
249 size_precision: u8,
250 instrument_id: InstrumentId,
251) -> TradeTick {
252 let price = Price::new(msg.price, price_precision);
253 let size = Quantity::new(msg.amount, size_precision);
254 let aggressor_side = parse_aggressor_side(&msg.side);
255 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
256 let ts_event = UnixNanos::from(msg.timestamp);
257 let ts_init = UnixNanos::from(msg.local_timestamp);
258
259 TradeTick::new(
260 instrument_id,
261 price,
262 size,
263 aggressor_side,
264 trade_id,
265 ts_event,
266 ts_init,
267 )
268}
269
270#[must_use]
271pub fn parse_bar_msg(
272 msg: BarMsg,
273 price_precision: u8,
274 size_precision: u8,
275 instrument_id: InstrumentId,
276) -> Bar {
277 let spec = parse_bar_spec(&msg.name);
278 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
279
280 let open = Price::new(msg.open, price_precision);
281 let high = Price::new(msg.high, price_precision);
282 let low = Price::new(msg.low, price_precision);
283 let close = Price::new(msg.close, price_precision);
284 let volume = Quantity::new(msg.volume, size_precision);
285 let ts_event = UnixNanos::from(msg.timestamp);
286 let ts_init = UnixNanos::from(msg.local_timestamp);
287
288 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
289}
290
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    /// Instrument used with every JSON fixture in this module.
    fn xbtusd_bitmex() -> InstrumentId {
        InstrumentId::from("XBTUSD.BITMEX")
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let json = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = xbtusd_bitmex();
        // The assertions below expect the same value for event and init time.
        let expected_ts = UnixNanos::from(1571830193469000000);

        let deltas = parse_book_change_msg_as_deltas(msg, 0, 0, instrument_id);

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        // The single delta
        assert_eq!(deltas.deltas[0].instrument_id, xbtusd_bitmex());
        assert_eq!(deltas.deltas[0].action, BookAction::Update);
        assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
        assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
        assert_eq!(deltas.deltas[0].order.order_id, 0);
        assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.deltas[0].sequence, 0);
        assert_eq!(deltas.deltas[0].ts_event, expected_ts);
        assert_eq!(deltas.deltas[0].ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let json = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = xbtusd_bitmex();
        let expected_ts_event = UnixNanos::from(1572010786950000000);
        let expected_ts_init = UnixNanos::from(1572010786961000000);

        let deltas = parse_book_snapshot_msg_as_deltas(msg, 1, 0, instrument_id);
        let bid_delta = deltas.deltas[0];
        let ask_delta = deltas.deltas[2];

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 4);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts_event);
        assert_eq!(deltas.ts_init, expected_ts_init);

        // Delta at index 0 (buy side)
        assert_eq!(bid_delta.instrument_id, instrument_id);
        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
        assert_eq!(bid_delta.order.order_id, 0);
        assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(bid_delta.sequence, 0);
        assert_eq!(bid_delta.ts_event, expected_ts_event);
        assert_eq!(bid_delta.ts_init, expected_ts_init);

        // Delta at index 2 (sell side)
        assert_eq!(ask_delta.instrument_id, instrument_id);
        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
        assert_eq!(ask_delta.order.order_id, 0);
        assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(ask_delta.sequence, 0);
        assert_eq!(ask_delta.ts_event, expected_ts_event);
        assert_eq!(ask_delta.ts_init, expected_ts_init);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let json = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = xbtusd_bitmex();

        let quote = parse_book_snapshot_msg_as_quote(msg, 1, 0, instrument_id);

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let json = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = xbtusd_bitmex();

        let trade = parse_trade_msg(msg, 0, 0, instrument_id);

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let json = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&json).unwrap();
        let instrument_id = xbtusd_bitmex();

        let bar = parse_bar_msg(msg, 1, 0, instrument_id);

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }
}