1use ahash::AHashMap;
19use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_MILLISECOND};
20use nautilus_model::{
21 data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
22 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
23 identifiers::TradeId,
24 instruments::{Instrument, InstrumentAny},
25 types::{Price, Quantity},
26};
27use ustr::Ustr;
28
29use super::{
30 enums::{DeribitBookAction, DeribitBookMsgType},
31 messages::{DeribitBookMsg, DeribitQuoteMsg, DeribitTickerMsg, DeribitTradeMsg},
32};
33
34pub fn parse_trade_msg(
40 msg: &DeribitTradeMsg,
41 instrument: &InstrumentAny,
42 ts_init: UnixNanos,
43) -> anyhow::Result<TradeTick> {
44 let instrument_id = instrument.id();
45 let price_precision = instrument.price_precision();
46 let size_precision = instrument.size_precision();
47
48 let price = Price::new(msg.price, price_precision);
49 let size = Quantity::new(msg.amount.abs(), size_precision);
50
51 let aggressor_side = match msg.direction.as_str() {
52 "buy" => AggressorSide::Buyer,
53 "sell" => AggressorSide::Seller,
54 _ => AggressorSide::NoAggressor,
55 };
56
57 let trade_id = TradeId::new(&msg.trade_id);
58 let ts_event = UnixNanos::new(msg.timestamp * NANOSECONDS_IN_MILLISECOND);
59
60 TradeTick::new_checked(
61 instrument_id,
62 price,
63 size,
64 aggressor_side,
65 trade_id,
66 ts_event,
67 ts_init,
68 )
69}
70
71pub fn parse_trades_data(
73 trades: Vec<DeribitTradeMsg>,
74 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
75 ts_init: UnixNanos,
76) -> Vec<Data> {
77 trades
78 .iter()
79 .filter_map(|msg| {
80 instruments_cache
81 .get(&msg.instrument_name)
82 .and_then(|inst| parse_trade_msg(msg, inst, ts_init).ok())
83 .map(Data::Trade)
84 })
85 .collect()
86}
87
88#[allow(dead_code)] fn convert_book_action(action: &DeribitBookAction) -> BookAction {
91 match action {
92 DeribitBookAction::New => BookAction::Add,
93 DeribitBookAction::Change => BookAction::Update,
94 DeribitBookAction::Delete => BookAction::Delete,
95 }
96}
97
98pub fn parse_book_snapshot(
104 msg: &DeribitBookMsg,
105 instrument: &InstrumentAny,
106 ts_init: UnixNanos,
107) -> anyhow::Result<OrderBookDeltas> {
108 let instrument_id = instrument.id();
109 let price_precision = instrument.price_precision();
110 let size_precision = instrument.size_precision();
111 let ts_event = UnixNanos::new(msg.timestamp * NANOSECONDS_IN_MILLISECOND);
112
113 let mut deltas = Vec::new();
114
115 deltas.push(OrderBookDelta::clear(
117 instrument_id,
118 msg.change_id,
119 ts_event,
120 ts_init,
121 ));
122
123 for bid in &msg.bids {
125 if bid.len() >= 3 {
126 let price_val = bid[1].as_f64().unwrap_or(0.0);
128 let amount_val = bid[2].as_f64().unwrap_or(0.0);
129
130 if amount_val > 0.0 {
131 let price = Price::new(price_val, price_precision);
132 let size = Quantity::new(amount_val, size_precision);
133
134 deltas.push(OrderBookDelta::new(
135 instrument_id,
136 BookAction::Add,
137 BookOrder::new(OrderSide::Buy, price, size, 0),
138 0, msg.change_id,
140 ts_event,
141 ts_init,
142 ));
143 }
144 }
145 }
146
147 for ask in &msg.asks {
149 if ask.len() >= 3 {
150 let price_val = ask[1].as_f64().unwrap_or(0.0);
152 let amount_val = ask[2].as_f64().unwrap_or(0.0);
153
154 if amount_val > 0.0 {
155 let price = Price::new(price_val, price_precision);
156 let size = Quantity::new(amount_val, size_precision);
157
158 deltas.push(OrderBookDelta::new(
159 instrument_id,
160 BookAction::Add,
161 BookOrder::new(OrderSide::Sell, price, size, 0),
162 0, msg.change_id,
164 ts_event,
165 ts_init,
166 ));
167 }
168 }
169 }
170
171 if let Some(last) = deltas.last_mut() {
173 *last = OrderBookDelta::new(
174 last.instrument_id,
175 last.action,
176 last.order,
177 RecordFlag::F_LAST as u8,
178 last.sequence,
179 last.ts_event,
180 last.ts_init,
181 );
182 }
183
184 Ok(OrderBookDeltas::new(instrument_id, deltas))
185}
186
187pub fn parse_book_delta(
193 msg: &DeribitBookMsg,
194 instrument: &InstrumentAny,
195 ts_init: UnixNanos,
196) -> anyhow::Result<OrderBookDeltas> {
197 let instrument_id = instrument.id();
198 let price_precision = instrument.price_precision();
199 let size_precision = instrument.size_precision();
200 let ts_event = UnixNanos::new(msg.timestamp * NANOSECONDS_IN_MILLISECOND);
201
202 let mut deltas = Vec::new();
203
204 for bid in &msg.bids {
206 if bid.len() >= 3 {
207 let action_str = bid[0].as_str().unwrap_or("new");
208 let price_val = bid[1].as_f64().unwrap_or(0.0);
209 let amount_val = bid[2].as_f64().unwrap_or(0.0);
210
211 let action = match action_str {
212 "new" => BookAction::Add,
213 "change" => BookAction::Update,
214 "delete" => BookAction::Delete,
215 _ => continue,
216 };
217
218 let price = Price::new(price_val, price_precision);
219 let size = Quantity::new(amount_val.abs(), size_precision);
220
221 deltas.push(OrderBookDelta::new(
222 instrument_id,
223 action,
224 BookOrder::new(OrderSide::Buy, price, size, 0),
225 0, msg.change_id,
227 ts_event,
228 ts_init,
229 ));
230 }
231 }
232
233 for ask in &msg.asks {
235 if ask.len() >= 3 {
236 let action_str = ask[0].as_str().unwrap_or("new");
237 let price_val = ask[1].as_f64().unwrap_or(0.0);
238 let amount_val = ask[2].as_f64().unwrap_or(0.0);
239
240 let action = match action_str {
241 "new" => BookAction::Add,
242 "change" => BookAction::Update,
243 "delete" => BookAction::Delete,
244 _ => continue,
245 };
246
247 let price = Price::new(price_val, price_precision);
248 let size = Quantity::new(amount_val.abs(), size_precision);
249
250 deltas.push(OrderBookDelta::new(
251 instrument_id,
252 action,
253 BookOrder::new(OrderSide::Sell, price, size, 0),
254 0, msg.change_id,
256 ts_event,
257 ts_init,
258 ));
259 }
260 }
261
262 if let Some(last) = deltas.last_mut() {
264 *last = OrderBookDelta::new(
265 last.instrument_id,
266 last.action,
267 last.order,
268 RecordFlag::F_LAST as u8,
269 last.sequence,
270 last.ts_event,
271 last.ts_init,
272 );
273 }
274
275 Ok(OrderBookDeltas::new(instrument_id, deltas))
276}
277
278pub fn parse_book_msg(
284 msg: &DeribitBookMsg,
285 instrument: &InstrumentAny,
286 ts_init: UnixNanos,
287) -> anyhow::Result<OrderBookDeltas> {
288 match msg.msg_type {
289 DeribitBookMsgType::Snapshot => parse_book_snapshot(msg, instrument, ts_init),
290 DeribitBookMsgType::Change => parse_book_delta(msg, instrument, ts_init),
291 }
292}
293
294pub fn parse_ticker_to_quote(
300 msg: &DeribitTickerMsg,
301 instrument: &InstrumentAny,
302 ts_init: UnixNanos,
303) -> anyhow::Result<QuoteTick> {
304 let instrument_id = instrument.id();
305 let price_precision = instrument.price_precision();
306 let size_precision = instrument.size_precision();
307
308 let bid_price = Price::new(msg.best_bid_price.unwrap_or(0.0), price_precision);
309 let ask_price = Price::new(msg.best_ask_price.unwrap_or(0.0), price_precision);
310 let bid_size = Quantity::new(msg.best_bid_amount.unwrap_or(0.0), size_precision);
311 let ask_size = Quantity::new(msg.best_ask_amount.unwrap_or(0.0), size_precision);
312 let ts_event = UnixNanos::new(msg.timestamp * NANOSECONDS_IN_MILLISECOND);
313
314 QuoteTick::new_checked(
315 instrument_id,
316 bid_price,
317 ask_price,
318 bid_size,
319 ask_size,
320 ts_event,
321 ts_init,
322 )
323}
324
325pub fn parse_quote_msg(
331 msg: &DeribitQuoteMsg,
332 instrument: &InstrumentAny,
333 ts_init: UnixNanos,
334) -> anyhow::Result<QuoteTick> {
335 let instrument_id = instrument.id();
336 let price_precision = instrument.price_precision();
337 let size_precision = instrument.size_precision();
338
339 let bid_price = Price::new(msg.best_bid_price, price_precision);
340 let ask_price = Price::new(msg.best_ask_price, price_precision);
341 let bid_size = Quantity::new(msg.best_bid_amount, size_precision);
342 let ask_size = Quantity::new(msg.best_ask_amount, size_precision);
343 let ts_event = UnixNanos::new(msg.timestamp * NANOSECONDS_IN_MILLISECOND);
344
345 QuoteTick::new_checked(
346 instrument_id,
347 bid_price,
348 ask_price,
349 bid_size,
350 ask_size,
351 ts_event,
352 ts_init,
353 )
354}
355
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        common::{parse::parse_deribit_instrument_any, testing::load_test_json},
        http::models::{DeribitInstrument, DeribitJsonRpcResponse},
    };

    /// Parses the first instrument of the `http_get_instruments.json` fixture.
    fn test_perpetual_instrument() -> InstrumentAny {
        let response: DeribitJsonRpcResponse<Vec<DeribitInstrument>> =
            serde_json::from_str(&load_test_json("http_get_instruments.json")).unwrap();
        let instruments = response.result.unwrap();
        let ts = UnixNanos::default();
        parse_deribit_instrument_any(&instruments[0], ts, ts)
            .unwrap()
            .unwrap()
    }

    /// Reads a WebSocket fixture and returns the JSON value found at `params.data`.
    fn ws_fixture_data(file_name: &str) -> serde_json::Value {
        let raw = load_test_json(file_name);
        let envelope: serde_json::Value = serde_json::from_str(&raw).unwrap();
        envelope["params"]["data"].clone()
    }

    /// Asserts the action, side and price of a single book delta.
    fn assert_level(delta: &OrderBookDelta, action: BookAction, side: OrderSide, price: Price) {
        assert_eq!(delta.action, action);
        assert_eq!(delta.order.side, side);
        assert_eq!(delta.order.price, price);
    }

    /// Asserts that the `F_LAST` bit is set on the given delta.
    fn assert_flagged_last(delta: &OrderBookDelta) {
        let f_last = RecordFlag::F_LAST as u8;
        assert_eq!(delta.flags & f_last, f_last);
    }

    #[rstest]
    fn test_parse_trade_msg_sell() {
        let instrument = test_perpetual_instrument();
        let trades: Vec<DeribitTradeMsg> =
            serde_json::from_value(ws_fixture_data("ws_trades.json")).unwrap();

        let tick = parse_trade_msg(&trades[0], &instrument, UnixNanos::default()).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert_eq!(tick.price, instrument.make_price(92294.5));
        assert_eq!(tick.size, instrument.make_qty(10.0, None));
        assert_eq!(tick.aggressor_side, AggressorSide::Seller);
        assert_eq!(tick.trade_id.to_string(), "403691824");
        assert_eq!(tick.ts_event, UnixNanos::new(1_765_531_356_452_000_000));
    }

    // NOTE(review): the test name says "buy" but the assertion expects
    // `AggressorSide::Seller` — confirm against the second trade in `ws_trades.json`.
    #[rstest]
    fn test_parse_trade_msg_buy() {
        let instrument = test_perpetual_instrument();
        let trades: Vec<DeribitTradeMsg> =
            serde_json::from_value(ws_fixture_data("ws_trades.json")).unwrap();

        let tick = parse_trade_msg(&trades[1], &instrument, UnixNanos::default()).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert_eq!(tick.price, instrument.make_price(92288.5));
        assert_eq!(tick.size, instrument.make_qty(750.0, None));
        assert_eq!(tick.aggressor_side, AggressorSide::Seller);
        assert_eq!(tick.trade_id.to_string(), "403691825");
    }

    #[rstest]
    fn test_parse_book_snapshot() {
        let instrument = test_perpetual_instrument();
        let msg: DeribitBookMsg =
            serde_json::from_value(ws_fixture_data("ws_book_snapshot.json")).unwrap();

        let batch = parse_book_snapshot(&msg, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(batch.instrument_id, instrument.id());
        assert_eq!(batch.deltas.len(), 11);

        // The snapshot opens with a clear delta.
        assert_eq!(batch.deltas[0].action, BookAction::Clear);

        // Index 1 is expected to hold the first bid.
        let first_bid = &batch.deltas[1];
        assert_level(
            first_bid,
            BookAction::Add,
            OrderSide::Buy,
            instrument.make_price(42500.0),
        );
        assert_eq!(first_bid.order.size, instrument.make_qty(1000.0, None));

        // Index 6 is expected to hold the first ask.
        let first_ask = &batch.deltas[6];
        assert_level(
            first_ask,
            BookAction::Add,
            OrderSide::Sell,
            instrument.make_price(42501.0),
        );
        assert_eq!(first_ask.order.size, instrument.make_qty(800.0, None));

        assert_flagged_last(batch.deltas.last().unwrap());
    }

    #[rstest]
    fn test_parse_book_delta() {
        let instrument = test_perpetual_instrument();
        let msg: DeribitBookMsg =
            serde_json::from_value(ws_fixture_data("ws_book_delta.json")).unwrap();

        let batch = parse_book_delta(&msg, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(batch.instrument_id, instrument.id());
        assert_eq!(batch.deltas.len(), 4);

        let bid_change = &batch.deltas[0];
        assert_level(
            bid_change,
            BookAction::Update,
            OrderSide::Buy,
            instrument.make_price(42500.0),
        );
        assert_eq!(bid_change.order.size, instrument.make_qty(950.0, None));

        let bid_new = &batch.deltas[1];
        assert_level(
            bid_new,
            BookAction::Add,
            OrderSide::Buy,
            instrument.make_price(42498.5),
        );
        assert_eq!(bid_new.order.size, instrument.make_qty(300.0, None));

        let ask_delete = &batch.deltas[2];
        assert_level(
            ask_delete,
            BookAction::Delete,
            OrderSide::Sell,
            instrument.make_price(42501.0),
        );
        assert_eq!(ask_delete.order.size, instrument.make_qty(0.0, None));

        let ask_change = &batch.deltas[3];
        assert_level(
            ask_change,
            BookAction::Update,
            OrderSide::Sell,
            instrument.make_price(42501.5),
        );
        assert_eq!(ask_change.order.size, instrument.make_qty(700.0, None));

        assert_flagged_last(batch.deltas.last().unwrap());
    }

    #[rstest]
    fn test_parse_ticker_to_quote() {
        let instrument = test_perpetual_instrument();
        let msg: DeribitTickerMsg =
            serde_json::from_value(ws_fixture_data("ws_ticker.json")).unwrap();

        // Check the deserialized message first.
        assert_eq!(msg.instrument_name.as_str(), "BTC-PERPETUAL");
        assert_eq!(msg.timestamp, 1_765_541_474_086);
        assert_eq!(msg.best_bid_price, Some(92283.5));
        assert_eq!(msg.best_ask_price, Some(92284.0));
        assert_eq!(msg.best_bid_amount, Some(117660.0));
        assert_eq!(msg.best_ask_amount, Some(186520.0));
        assert_eq!(msg.mark_price, 92281.78);
        assert_eq!(msg.index_price, 92263.55);
        assert_eq!(msg.open_interest, 1132329370.0);

        let quote = parse_ticker_to_quote(&msg, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(92283.5));
        assert_eq!(quote.ask_price, instrument.make_price(92284.0));
        assert_eq!(quote.bid_size, instrument.make_qty(117660.0, None));
        assert_eq!(quote.ask_size, instrument.make_qty(186520.0, None));
        assert_eq!(quote.ts_event, UnixNanos::new(1_765_541_474_086_000_000));
    }

    #[rstest]
    fn test_parse_quote_msg() {
        let instrument = test_perpetual_instrument();
        let msg: DeribitQuoteMsg =
            serde_json::from_value(ws_fixture_data("ws_quote.json")).unwrap();

        // Check the deserialized message first.
        assert_eq!(msg.instrument_name.as_str(), "BTC-PERPETUAL");
        assert_eq!(msg.timestamp, 1_765_541_767_174);
        assert_eq!(msg.best_bid_price, 92288.0);
        assert_eq!(msg.best_ask_price, 92288.5);
        assert_eq!(msg.best_bid_amount, 133440.0);
        assert_eq!(msg.best_ask_amount, 99470.0);

        let quote = parse_quote_msg(&msg, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, instrument.make_price(92288.0));
        assert_eq!(quote.ask_price, instrument.make_price(92288.5));
        assert_eq!(quote.bid_size, instrument.make_qty(133440.0, None));
        assert_eq!(quote.ask_size, instrument.make_qty(99470.0, None));
        assert_eq!(quote.ts_event, UnixNanos::new(1_765_541_767_174_000_000));
    }

    #[rstest]
    fn test_parse_book_msg_snapshot() {
        let instrument = test_perpetual_instrument();
        let msg: DeribitBookMsg =
            serde_json::from_value(ws_fixture_data("ws_book_snapshot.json")).unwrap();

        // Check the raw level layout before parsing.
        assert_eq!(
            msg.bids[0].len(),
            3,
            "Snapshot bids should have 3 elements: [action, price, amount]"
        );
        assert_eq!(
            msg.bids[0][0].as_str(),
            Some("new"),
            "First element should be 'new' action for snapshot"
        );
        assert_eq!(
            msg.asks[0].len(),
            3,
            "Snapshot asks should have 3 elements: [action, price, amount]"
        );
        assert_eq!(
            msg.asks[0][0].as_str(),
            Some("new"),
            "First element should be 'new' action for snapshot"
        );

        let batch = parse_book_msg(&msg, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(batch.instrument_id, instrument.id());
        assert_eq!(batch.deltas.len(), 11);

        // The snapshot path must have been taken: it opens with a clear delta.
        assert_eq!(batch.deltas[0].action, BookAction::Clear);

        let first_bid = &batch.deltas[1];
        assert_level(
            first_bid,
            BookAction::Add,
            OrderSide::Buy,
            instrument.make_price(42500.0),
        );
        assert_eq!(first_bid.order.size, instrument.make_qty(1000.0, None));

        let first_ask = &batch.deltas[6];
        assert_level(
            first_ask,
            BookAction::Add,
            OrderSide::Sell,
            instrument.make_price(42501.0),
        );
        assert_eq!(first_ask.order.size, instrument.make_qty(800.0, None));
    }

    #[rstest]
    fn test_parse_book_msg_delta() {
        let instrument = test_perpetual_instrument();
        let msg: DeribitBookMsg =
            serde_json::from_value(ws_fixture_data("ws_book_delta.json")).unwrap();

        // Check the raw level layout before parsing.
        assert_eq!(
            msg.bids[0].len(),
            3,
            "Delta bids should have 3 elements: [action, price, amount]"
        );
        assert_eq!(
            msg.bids[0][0].as_str(),
            Some("change"),
            "First bid should be 'change' action"
        );
        assert_eq!(
            msg.bids[1][0].as_str(),
            Some("new"),
            "Second bid should be 'new' action"
        );
        assert_eq!(
            msg.asks[0].len(),
            3,
            "Delta asks should have 3 elements: [action, price, amount]"
        );
        assert_eq!(
            msg.asks[0][0].as_str(),
            Some("delete"),
            "First ask should be 'delete' action"
        );

        let batch = parse_book_msg(&msg, &instrument, UnixNanos::default()).unwrap();

        assert_eq!(batch.instrument_id, instrument.id());
        assert_eq!(batch.deltas.len(), 4);

        // The delta path must have been taken: no leading clear delta.
        assert_ne!(batch.deltas[0].action, BookAction::Clear);

        let bid_change = &batch.deltas[0];
        assert_level(
            bid_change,
            BookAction::Update,
            OrderSide::Buy,
            instrument.make_price(42500.0),
        );
        assert_eq!(bid_change.order.size, instrument.make_qty(950.0, None));

        let bid_new = &batch.deltas[1];
        assert_level(
            bid_new,
            BookAction::Add,
            OrderSide::Buy,
            instrument.make_price(42498.5),
        );
        assert_eq!(bid_new.order.size, instrument.make_qty(300.0, None));

        // The size of the deleted level is intentionally not asserted here.
        assert_level(
            &batch.deltas[2],
            BookAction::Delete,
            OrderSide::Sell,
            instrument.make_price(42501.0),
        );

        let ask_change = &batch.deltas[3];
        assert_level(
            ask_change,
            BookAction::Update,
            OrderSide::Sell,
            instrument.make_price(42501.5),
        );
        assert_eq!(ask_change.order.size, instrument.make_qty(700.0, None));
    }
}