1use std::sync::Arc;
17
18use chrono::{DateTime, Utc};
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::{
22 Bar, BarType, BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
23 QuoteTick, TradeTick,
24 },
25 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
26 identifiers::{InstrumentId, TradeId},
27 types::{Price, Quantity},
28};
29use uuid::Uuid;
30
31use super::{
32 message::{BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, TradeMsg, WsMessage},
33 types::InstrumentMiniInfo,
34};
35use crate::parse::{normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action};
36
37#[must_use]
38pub fn parse_tardis_ws_message(msg: WsMessage, info: Arc<InstrumentMiniInfo>) -> Option<Data> {
39 match msg {
40 WsMessage::BookChange(msg) => {
41 if msg.bids.is_empty() && msg.asks.is_empty() {
42 tracing::error!(
43 "Invalid book change for {} {} (empty bids and asks)",
44 msg.exchange,
45 msg.symbol
46 );
47 return None;
48 }
49 Some(Data::Deltas(parse_book_change_msg_as_deltas(
50 msg,
51 info.price_precision,
52 info.size_precision,
53 info.instrument_id,
54 )))
55 }
56 WsMessage::BookSnapshot(msg) => match msg.bids.len() {
57 1 => Some(Data::Quote(parse_book_snapshot_msg_as_quote(
58 msg,
59 info.price_precision,
60 info.size_precision,
61 info.instrument_id,
62 ))),
63 _ => Some(Data::Deltas(parse_book_snapshot_msg_as_deltas(
64 msg,
65 info.price_precision,
66 info.size_precision,
67 info.instrument_id,
68 ))),
69 },
70 WsMessage::Trade(msg) => Some(Data::Trade(parse_trade_msg(
71 msg,
72 info.price_precision,
73 info.size_precision,
74 info.instrument_id,
75 ))),
76 WsMessage::TradeBar(msg) => Some(Data::Bar(parse_bar_msg(
77 msg,
78 info.price_precision,
79 info.size_precision,
80 info.instrument_id,
81 ))),
82 WsMessage::DerivativeTicker(_) => None,
83 WsMessage::Disconnect(_) => None,
84 }
85}
86
87#[must_use]
88pub fn parse_book_change_msg_as_deltas(
89 msg: BookChangeMsg,
90 price_precision: u8,
91 size_precision: u8,
92 instrument_id: InstrumentId,
93) -> OrderBookDeltas_API {
94 parse_book_msg_as_deltas(
95 msg.bids,
96 msg.asks,
97 msg.is_snapshot,
98 price_precision,
99 size_precision,
100 instrument_id,
101 msg.timestamp,
102 msg.local_timestamp,
103 )
104}
105
106#[must_use]
107pub fn parse_book_snapshot_msg_as_deltas(
108 msg: BookSnapshotMsg,
109 price_precision: u8,
110 size_precision: u8,
111 instrument_id: InstrumentId,
112) -> OrderBookDeltas_API {
113 parse_book_msg_as_deltas(
114 msg.bids,
115 msg.asks,
116 true,
117 price_precision,
118 size_precision,
119 instrument_id,
120 msg.timestamp,
121 msg.local_timestamp,
122 )
123}
124
125#[allow(clippy::too_many_arguments)]
126#[must_use]
127pub fn parse_book_msg_as_deltas(
128 bids: Vec<BookLevel>,
129 asks: Vec<BookLevel>,
130 is_snapshot: bool,
131 price_precision: u8,
132 size_precision: u8,
133 instrument_id: InstrumentId,
134 timestamp: DateTime<Utc>,
135 local_timestamp: DateTime<Utc>,
136) -> OrderBookDeltas_API {
137 let ts_event = UnixNanos::from(timestamp.timestamp_nanos_opt().unwrap() as u64);
138 let ts_init = UnixNanos::from(local_timestamp.timestamp_nanos_opt().unwrap() as u64);
139
140 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(bids.len() + asks.len());
141
142 for level in bids {
143 deltas.push(parse_book_level(
144 instrument_id,
145 price_precision,
146 size_precision,
147 OrderSide::Buy,
148 level,
149 is_snapshot,
150 ts_event,
151 ts_init,
152 ));
153 }
154
155 for level in asks {
156 deltas.push(parse_book_level(
157 instrument_id,
158 price_precision,
159 size_precision,
160 OrderSide::Sell,
161 level,
162 is_snapshot,
163 ts_event,
164 ts_init,
165 ));
166 }
167
168 if let Some(last_delta) = deltas.last_mut() {
169 last_delta.flags += RecordFlag::F_LAST.value();
170 }
171
172 OrderBookDeltas_API::new(OrderBookDeltas::new(instrument_id, deltas))
174}
175
176#[allow(clippy::too_many_arguments)]
177#[must_use]
178pub fn parse_book_level(
179 instrument_id: InstrumentId,
180 price_precision: u8,
181 size_precision: u8,
182 side: OrderSide,
183 level: BookLevel,
184 is_snapshot: bool,
185 ts_event: UnixNanos,
186 ts_init: UnixNanos,
187) -> OrderBookDelta {
188 let amount = normalize_amount(level.amount, size_precision);
189 let action = parse_book_action(is_snapshot, amount);
190 let price = Price::new(level.price, price_precision);
191 let size = Quantity::new(amount, size_precision);
192 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
194 let flags = if is_snapshot {
195 RecordFlag::F_SNAPSHOT.value()
196 } else {
197 0
198 };
199 let sequence = 0; assert!(
202 !(action != BookAction::Delete && size.is_zero()),
203 "Invalid zero size for {action}"
204 );
205
206 OrderBookDelta::new(
207 instrument_id,
208 action,
209 order,
210 flags,
211 sequence,
212 ts_event,
213 ts_init,
214 )
215}
216
217#[must_use]
218pub fn parse_book_snapshot_msg_as_quote(
219 msg: BookSnapshotMsg,
220 price_precision: u8,
221 size_precision: u8,
222 instrument_id: InstrumentId,
223) -> QuoteTick {
224 let ts_event = UnixNanos::from(msg.timestamp);
225 let ts_init = UnixNanos::from(msg.local_timestamp);
226
227 let best_bid = &msg.bids[0];
228 let bid_price = Price::new(best_bid.price, price_precision);
229 let bid_size = Quantity::new(best_bid.amount, size_precision);
230
231 let best_ask = &msg.asks[0];
232 let ask_price = Price::new(best_ask.price, price_precision);
233 let ask_size = Quantity::new(best_ask.amount, size_precision);
234
235 QuoteTick::new(
236 instrument_id,
237 bid_price,
238 ask_price,
239 bid_size,
240 ask_size,
241 ts_event,
242 ts_init,
243 )
244}
245
246#[must_use]
247pub fn parse_trade_msg(
248 msg: TradeMsg,
249 price_precision: u8,
250 size_precision: u8,
251 instrument_id: InstrumentId,
252) -> TradeTick {
253 let price = Price::new(msg.price, price_precision);
254 let size = Quantity::new(msg.amount, size_precision);
255 let aggressor_side = parse_aggressor_side(&msg.side);
256 let trade_id = TradeId::new(msg.id.unwrap_or_else(|| Uuid::new_v4().to_string()));
257 let ts_event = UnixNanos::from(msg.timestamp);
258 let ts_init = UnixNanos::from(msg.local_timestamp);
259
260 TradeTick::new(
261 instrument_id,
262 price,
263 size,
264 aggressor_side,
265 trade_id,
266 ts_event,
267 ts_init,
268 )
269}
270
271#[must_use]
272pub fn parse_bar_msg(
273 msg: BarMsg,
274 price_precision: u8,
275 size_precision: u8,
276 instrument_id: InstrumentId,
277) -> Bar {
278 let spec = parse_bar_spec(&msg.name);
279 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
280
281 let open = Price::new(msg.open, price_precision);
282 let high = Price::new(msg.high, price_precision);
283 let low = Price::new(msg.low, price_precision);
284 let close = Price::new(msg.close, price_precision);
285 let volume = Quantity::new(msg.volume, size_precision);
286 let ts_event = UnixNanos::from(msg.timestamp);
287 let ts_init = UnixNanos::from(msg.local_timestamp);
288
289 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
290}
291
#[cfg(test)]
mod tests {
    use nautilus_model::enums::{AggressorSide, BookAction};
    use rstest::rstest;

    use super::*;
    use crate::tests::load_test_json;

    /// Instrument ID shared by every fixture in this module.
    fn xbtusd() -> InstrumentId {
        InstrumentId::from("XBTUSD.BITMEX")
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let raw = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&raw).unwrap();

        let instrument_id = xbtusd();
        let deltas = parse_book_change_msg_as_deltas(msg, 0, 0, instrument_id);

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));

        // The single delta of the batch
        let only_delta = deltas.deltas[0];
        assert_eq!(only_delta.instrument_id, xbtusd());
        assert_eq!(only_delta.action, BookAction::Update);
        assert_eq!(only_delta.order.price, Price::from("7985"));
        assert_eq!(only_delta.order.size, Quantity::from(283318));
        assert_eq!(only_delta.order.order_id, 0);
        assert_eq!(only_delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(only_delta.sequence, 0);
        assert_eq!(only_delta.ts_event, UnixNanos::from(1571830193469000000));
        assert_eq!(only_delta.ts_init, UnixNanos::from(1571830193469000000));
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let raw = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&raw).unwrap();

        let instrument_id = xbtusd();
        let deltas = parse_book_snapshot_msg_as_deltas(msg, 1, 0, instrument_id);
        let first_bid = deltas.deltas[0];
        let first_ask = deltas.deltas[2];

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 4);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));

        // Both inspected deltas share everything except side, price and size.
        let assert_snapshot_delta =
            |delta: OrderBookDelta, side: OrderSide, price: Price, size: Quantity| {
                assert_eq!(delta.instrument_id, instrument_id);
                assert_eq!(delta.action, BookAction::Add);
                assert_eq!(delta.order.side, side);
                assert_eq!(delta.order.price, price);
                assert_eq!(delta.order.size, size);
                assert_eq!(delta.order.order_id, 0);
                assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT.value());
                assert_eq!(delta.sequence, 0);
                assert_eq!(delta.ts_event, UnixNanos::from(1572010786950000000));
                assert_eq!(delta.ts_init, UnixNanos::from(1572010786961000000));
            };

        assert_snapshot_delta(
            first_bid,
            OrderSide::Buy,
            Price::from("7633.5"),
            Quantity::from(1906067),
        );
        assert_snapshot_delta(
            first_ask,
            OrderSide::Sell,
            Price::from("7634.0"),
            Quantity::from(1467849),
        );
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let raw = load_test_json("book_snapshot.json");
        let msg: BookSnapshotMsg = serde_json::from_str(&raw).unwrap();

        let instrument_id = xbtusd();
        let quote = parse_book_snapshot_msg_as_quote(msg, 1, 0, instrument_id);

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let raw = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&raw).unwrap();

        let instrument_id = xbtusd();
        let trade = parse_trade_msg(msg, 0, 0, instrument_id);

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_bar_message() {
        let raw = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&raw).unwrap();

        let bar = parse_bar_msg(msg, 1, 0, xbtusd());

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }
}