1use ahash::AHashMap;
17use nautilus_core::nanos::UnixNanos;
18use nautilus_model::{
19 data::{
20 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
21 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10,
22 QuoteTick, TradeTick, depth::DEPTH10_LEN,
23 },
24 enums::{
25 AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
26 OrderType, RecordFlag, TimeInForce,
27 },
28 identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
29 instruments::{Instrument, InstrumentAny},
30 reports::{FillReport, OrderStatusReport},
31 types::{Currency, Money, Price, Quantity},
32};
33use ustr::Ustr;
34
35use super::{
36 enums::OKXWsChannel,
37 messages::{
38 OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg, OKXOrderMsg, OKXTickerMsg,
39 OKXTradeMsg, OrderBookEntry,
40 },
41};
42use crate::{
43 common::{
44 enums::{OKXBookAction, OKXCandleConfirm, OKXOrderStatus, OKXOrderType},
45 models::OKXInstrument,
46 parse::{
47 okx_channel_to_bar_spec, parse_client_order_id, parse_fee, parse_funding_rate_msg,
48 parse_instrument_any, parse_message_vec, parse_millisecond_timestamp, parse_price,
49 parse_quantity,
50 },
51 },
52 websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
53};
54
55pub fn parse_book_msg_vec(
57 data: Vec<OKXBookMsg>,
58 instrument_id: &InstrumentId,
59 price_precision: u8,
60 size_precision: u8,
61 action: OKXBookAction,
62 ts_init: UnixNanos,
63) -> anyhow::Result<Vec<Data>> {
64 let mut deltas = Vec::with_capacity(data.len());
65
66 for msg in data {
67 let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
68 &msg,
69 *instrument_id,
70 price_precision,
71 size_precision,
72 &action,
73 ts_init,
74 )?);
75 deltas.push(Data::Deltas(deltas_api));
76 }
77
78 Ok(deltas)
79}
80
81pub fn parse_ticker_msg_vec(
83 data: serde_json::Value,
84 instrument_id: &InstrumentId,
85 price_precision: u8,
86 size_precision: u8,
87 ts_init: UnixNanos,
88) -> anyhow::Result<Vec<Data>> {
89 parse_message_vec(
90 data,
91 |msg| {
92 parse_ticker_msg(
93 msg,
94 *instrument_id,
95 price_precision,
96 size_precision,
97 ts_init,
98 )
99 },
100 Data::Quote,
101 )
102}
103
104pub fn parse_quote_msg_vec(
106 data: serde_json::Value,
107 instrument_id: &InstrumentId,
108 price_precision: u8,
109 size_precision: u8,
110 ts_init: UnixNanos,
111) -> anyhow::Result<Vec<Data>> {
112 parse_message_vec(
113 data,
114 |msg| {
115 parse_quote_msg(
116 msg,
117 *instrument_id,
118 price_precision,
119 size_precision,
120 ts_init,
121 )
122 },
123 Data::Quote,
124 )
125}
126
127pub fn parse_trade_msg_vec(
129 data: serde_json::Value,
130 instrument_id: &InstrumentId,
131 price_precision: u8,
132 size_precision: u8,
133 ts_init: UnixNanos,
134) -> anyhow::Result<Vec<Data>> {
135 parse_message_vec(
136 data,
137 |msg| {
138 parse_trade_msg(
139 msg,
140 *instrument_id,
141 price_precision,
142 size_precision,
143 ts_init,
144 )
145 },
146 Data::Trade,
147 )
148}
149
150pub fn parse_mark_price_msg_vec(
152 data: serde_json::Value,
153 instrument_id: &InstrumentId,
154 price_precision: u8,
155 ts_init: UnixNanos,
156) -> anyhow::Result<Vec<Data>> {
157 parse_message_vec(
158 data,
159 |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
160 Data::MarkPriceUpdate,
161 )
162}
163
164pub fn parse_index_price_msg_vec(
166 data: serde_json::Value,
167 instrument_id: &InstrumentId,
168 price_precision: u8,
169 ts_init: UnixNanos,
170) -> anyhow::Result<Vec<Data>> {
171 parse_message_vec(
172 data,
173 |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
174 Data::IndexPriceUpdate,
175 )
176}
177
178pub fn parse_funding_rate_msg_vec(
181 data: serde_json::Value,
182 instrument_id: &InstrumentId,
183 ts_init: UnixNanos,
184 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
185) -> anyhow::Result<Vec<FundingRateUpdate>> {
186 let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
187
188 let mut result = Vec::with_capacity(msgs.len());
189 for msg in &msgs {
190 let cache_key = (msg.funding_rate, msg.funding_time);
191
192 if let Some(cached) = funding_cache.get(&msg.inst_id)
193 && *cached == cache_key
194 {
195 continue; }
197
198 funding_cache.insert(msg.inst_id, cache_key);
200 let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
201 result.push(funding_rate);
202 }
203
204 Ok(result)
205}
206
207pub fn parse_candle_msg_vec(
209 data: serde_json::Value,
210 instrument_id: &InstrumentId,
211 price_precision: u8,
212 size_precision: u8,
213 spec: BarSpecification,
214 ts_init: UnixNanos,
215) -> anyhow::Result<Vec<Data>> {
216 let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
217 let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
218 let mut bars = Vec::with_capacity(msgs.len());
219
220 for msg in msgs {
221 if msg.confirm == OKXCandleConfirm::Closed {
223 let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
224 bars.push(Data::Bar(bar));
225 }
226 }
227
228 Ok(bars)
229}
230
231pub fn parse_book10_msg_vec(
233 data: Vec<OKXBookMsg>,
234 instrument_id: &InstrumentId,
235 price_precision: u8,
236 size_precision: u8,
237 ts_init: UnixNanos,
238) -> anyhow::Result<Vec<Data>> {
239 let mut depth10_updates = Vec::with_capacity(data.len());
240
241 for msg in data {
242 let depth10 = parse_book10_msg(
243 &msg,
244 *instrument_id,
245 price_precision,
246 size_precision,
247 ts_init,
248 )?;
249 depth10_updates.push(Data::Depth10(Box::new(depth10)));
250 }
251
252 Ok(depth10_updates)
253}
254
255pub fn parse_book_msg(
257 msg: &OKXBookMsg,
258 instrument_id: InstrumentId,
259 price_precision: u8,
260 size_precision: u8,
261 action: &OKXBookAction,
262 ts_init: UnixNanos,
263) -> anyhow::Result<OrderBookDeltas> {
264 let flags = if action == &OKXBookAction::Snapshot {
265 RecordFlag::F_SNAPSHOT as u8
266 } else {
267 0
268 };
269 let ts_event = parse_millisecond_timestamp(msg.ts);
270
271 let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
272
273 for bid in &msg.bids {
274 let book_action = match action {
275 OKXBookAction::Snapshot => BookAction::Add,
276 _ => match bid.size.as_str() {
277 "0" => BookAction::Delete,
278 _ => BookAction::Update,
279 },
280 };
281 let price = parse_price(&bid.price, price_precision)?;
282 let size = parse_quantity(&bid.size, size_precision)?;
283 let order_id = 0; let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
285 let delta = OrderBookDelta::new(
286 instrument_id,
287 book_action,
288 order,
289 flags,
290 msg.seq_id,
291 ts_event,
292 ts_init,
293 );
294 deltas.push(delta)
295 }
296
297 for ask in &msg.asks {
298 let book_action = match action {
299 OKXBookAction::Snapshot => BookAction::Add,
300 _ => match ask.size.as_str() {
301 "0" => BookAction::Delete,
302 _ => BookAction::Update,
303 },
304 };
305 let price = parse_price(&ask.price, price_precision)?;
306 let size = parse_quantity(&ask.size, size_precision)?;
307 let order_id = 0; let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
309 let delta = OrderBookDelta::new(
310 instrument_id,
311 book_action,
312 order,
313 flags,
314 msg.seq_id,
315 ts_event,
316 ts_init,
317 );
318 deltas.push(delta)
319 }
320
321 OrderBookDeltas::new_checked(instrument_id, deltas)
322}
323
324pub fn parse_quote_msg(
326 msg: &OKXBookMsg,
327 instrument_id: InstrumentId,
328 price_precision: u8,
329 size_precision: u8,
330 ts_init: UnixNanos,
331) -> anyhow::Result<QuoteTick> {
332 let best_bid: &OrderBookEntry = &msg.bids[0];
333 let best_ask: &OrderBookEntry = &msg.asks[0];
334
335 let bid_price = parse_price(&best_bid.price, price_precision)?;
336 let ask_price = parse_price(&best_ask.price, price_precision)?;
337 let bid_size = parse_quantity(&best_bid.size, size_precision)?;
338 let ask_size = parse_quantity(&best_ask.size, size_precision)?;
339 let ts_event = parse_millisecond_timestamp(msg.ts);
340
341 QuoteTick::new_checked(
342 instrument_id,
343 bid_price,
344 ask_price,
345 bid_size,
346 ask_size,
347 ts_event,
348 ts_init,
349 )
350}
351
352pub fn parse_book10_msg(
356 msg: &OKXBookMsg,
357 instrument_id: InstrumentId,
358 price_precision: u8,
359 size_precision: u8,
360 ts_init: UnixNanos,
361) -> anyhow::Result<OrderBookDepth10> {
362 let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
364 let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
365 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
366 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
367
368 let bid_len = msg.bids.len().min(DEPTH10_LEN);
370 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
371 let price = parse_price(&level.price, price_precision)?;
372 let size = parse_quantity(&level.size, size_precision)?;
373 let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
374
375 let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
376 bids[i] = bid_order;
377 bid_counts[i] = orders_count;
378 }
379
380 for i in bid_len..DEPTH10_LEN {
382 bids[i] = BookOrder::new(
383 OrderSide::Buy,
384 Price::zero(price_precision),
385 Quantity::zero(size_precision),
386 0,
387 );
388 bid_counts[i] = 0;
389 }
390
391 let ask_len = msg.asks.len().min(DEPTH10_LEN);
393 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
394 let price = parse_price(&level.price, price_precision)?;
395 let size = parse_quantity(&level.size, size_precision)?;
396 let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
397
398 let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
399 asks[i] = ask_order;
400 ask_counts[i] = orders_count;
401 }
402
403 for i in ask_len..DEPTH10_LEN {
405 asks[i] = BookOrder::new(
406 OrderSide::Sell,
407 Price::zero(price_precision),
408 Quantity::zero(size_precision),
409 0,
410 );
411 ask_counts[i] = 0;
412 }
413
414 let ts_event = parse_millisecond_timestamp(msg.ts);
415
416 Ok(OrderBookDepth10::new(
417 instrument_id,
418 bids,
419 asks,
420 bid_counts,
421 ask_counts,
422 RecordFlag::F_SNAPSHOT as u8,
423 msg.seq_id, ts_event,
425 ts_init,
426 ))
427}
428
429pub fn parse_ticker_msg(
431 msg: &OKXTickerMsg,
432 instrument_id: InstrumentId,
433 price_precision: u8,
434 size_precision: u8,
435 ts_init: UnixNanos,
436) -> anyhow::Result<QuoteTick> {
437 let bid_price = parse_price(&msg.bid_px, price_precision)?;
438 let ask_price = parse_price(&msg.ask_px, price_precision)?;
439 let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
440 let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
441 let ts_event = parse_millisecond_timestamp(msg.ts);
442
443 QuoteTick::new_checked(
444 instrument_id,
445 bid_price,
446 ask_price,
447 bid_size,
448 ask_size,
449 ts_event,
450 ts_init,
451 )
452}
453
454pub fn parse_trade_msg(
456 msg: &OKXTradeMsg,
457 instrument_id: InstrumentId,
458 price_precision: u8,
459 size_precision: u8,
460 ts_init: UnixNanos,
461) -> anyhow::Result<TradeTick> {
462 let price = parse_price(&msg.px, price_precision)?;
463 let size = parse_quantity(&msg.sz, size_precision)?;
464 let aggressor_side: AggressorSide = msg.side.into();
465 let trade_id = TradeId::new(&msg.trade_id);
466 let ts_event = parse_millisecond_timestamp(msg.ts);
467
468 TradeTick::new_checked(
469 instrument_id,
470 price,
471 size,
472 aggressor_side,
473 trade_id,
474 ts_event,
475 ts_init,
476 )
477}
478
479pub fn parse_mark_price_msg(
481 msg: &OKXMarkPriceMsg,
482 instrument_id: InstrumentId,
483 price_precision: u8,
484 ts_init: UnixNanos,
485) -> anyhow::Result<MarkPriceUpdate> {
486 let price = parse_price(&msg.mark_px, price_precision)?;
487 let ts_event = parse_millisecond_timestamp(msg.ts);
488
489 Ok(MarkPriceUpdate::new(
490 instrument_id,
491 price,
492 ts_event,
493 ts_init,
494 ))
495}
496
497pub fn parse_index_price_msg(
499 msg: &OKXIndexPriceMsg,
500 instrument_id: InstrumentId,
501 price_precision: u8,
502 ts_init: UnixNanos,
503) -> anyhow::Result<IndexPriceUpdate> {
504 let price = parse_price(&msg.idx_px, price_precision)?;
505 let ts_event = parse_millisecond_timestamp(msg.ts);
506
507 Ok(IndexPriceUpdate::new(
508 instrument_id,
509 price,
510 ts_event,
511 ts_init,
512 ))
513}
514
515pub fn parse_candle_msg(
517 msg: &OKXCandleMsg,
518 bar_type: BarType,
519 price_precision: u8,
520 size_precision: u8,
521 ts_init: UnixNanos,
522) -> anyhow::Result<Bar> {
523 let open = parse_price(&msg.o, price_precision)?;
524 let high = parse_price(&msg.h, price_precision)?;
525 let low = parse_price(&msg.l, price_precision)?;
526 let close = parse_price(&msg.c, price_precision)?;
527 let volume = parse_quantity(&msg.vol, size_precision)?;
528 let ts_event = parse_millisecond_timestamp(msg.ts);
529
530 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
531}
532
533pub fn parse_order_msg_vec(
535 data: Vec<OKXOrderMsg>,
536 account_id: AccountId,
537 instruments: &AHashMap<Ustr, InstrumentAny>,
538 fee_cache: &AHashMap<Ustr, Money>,
539 ts_init: UnixNanos,
540) -> anyhow::Result<Vec<ExecutionReport>> {
541 let mut order_reports = Vec::with_capacity(data.len());
542
543 for msg in data {
544 let inst = instruments
545 .get(&msg.inst_id)
546 .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
547
548 let previous_fee = fee_cache.get(&msg.ord_id).copied();
549
550 let result = match &msg.state {
551 OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled => {
552 parse_fill_report(&msg, inst, account_id, previous_fee, ts_init)
553 .map(ExecutionReport::Fill)
554 }
555 _ => parse_order_status_report(&msg, inst, account_id, ts_init)
556 .map(ExecutionReport::Order),
557 };
558
559 match result {
560 Ok(report) => order_reports.push(report),
561 Err(e) => tracing::error!("Failed to parse execution report from message: {e}"),
562 }
563 }
564
565 Ok(order_reports)
566}
567
568pub fn parse_order_status_report(
570 msg: &OKXOrderMsg,
571 instrument: &InstrumentAny,
572 account_id: AccountId,
573 ts_init: UnixNanos,
574) -> anyhow::Result<OrderStatusReport> {
575 let client_order_id = parse_client_order_id(&msg.cl_ord_id);
576 let venue_order_id = VenueOrderId::new(msg.ord_id);
577 let order_side: OrderSide = msg.side.into();
578
579 let okx_order_type = msg.ord_type;
580 let order_type: OrderType = msg.ord_type.into();
581 let order_status: OrderStatus = msg.state.into();
582
583 let time_in_force = match okx_order_type {
584 OKXOrderType::Fok => TimeInForce::Fok,
585 OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
586 _ => TimeInForce::Gtc,
587 };
588
589 let size_precision = instrument.size_precision();
590 let quantity = parse_quantity(&msg.sz, size_precision)?;
591 let filled_qty = parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
592
593 let ts_accepted = parse_millisecond_timestamp(msg.c_time);
594 let ts_last = parse_millisecond_timestamp(msg.u_time);
595
596 let mut report = OrderStatusReport::new(
597 account_id,
598 instrument.id(),
599 client_order_id,
600 venue_order_id,
601 order_side,
602 order_type,
603 time_in_force,
604 order_status,
605 quantity,
606 filled_qty,
607 ts_accepted,
608 ts_init,
609 ts_last,
610 None, );
612
613 if !msg.px.is_empty() {
614 let price_precision = instrument.price_precision();
615 if let Ok(price) = parse_price(&msg.px, price_precision) {
616 report = report.with_price(price);
617 }
618 }
619
620 if !msg.avg_px.is_empty() {
621 report = report.with_avg_px(msg.avg_px.parse::<f64>()?);
622 }
623
624 if msg.ord_type == OKXOrderType::PostOnly {
625 report = report.with_post_only(true);
626 }
627
628 if msg.reduce_only == "true" {
629 report = report.with_reduce_only(true);
630 }
631
632 if let Some(reason) = &msg.cancel_source_reason
633 && !reason.is_empty()
634 {
635 report = report.with_cancel_reason(reason.clone());
636 }
637
638 Ok(report)
639}
640
641pub fn parse_fill_report(
643 msg: &OKXOrderMsg,
644 instrument: &InstrumentAny,
645 account_id: AccountId,
646 previous_fee: Option<Money>,
647 ts_init: UnixNanos,
648) -> anyhow::Result<FillReport> {
649 let client_order_id = parse_client_order_id(&msg.cl_ord_id);
650 let venue_order_id = VenueOrderId::new(msg.ord_id);
651 let trade_id = TradeId::from(msg.trade_id.as_str());
652 let order_side: OrderSide = msg.side.into();
653
654 let price_precision = instrument.price_precision();
655 let size_precision = instrument.size_precision();
656 let last_px = parse_price(&msg.fill_px, price_precision)?;
657 let last_qty = parse_quantity(&msg.fill_sz, size_precision)?;
658
659 let fee_currency = Currency::from(&msg.fee_ccy);
660 let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)?;
661 let commission = if let Some(previous_fee) = previous_fee {
662 total_fee - previous_fee
663 } else {
664 total_fee
665 };
666
667 let liquidity_side: LiquiditySide = msg.exec_type.into();
668 let ts_event = parse_millisecond_timestamp(msg.fill_time);
669
670 let report = FillReport::new(
671 account_id,
672 instrument.id(),
673 venue_order_id,
674 trade_id,
675 order_side,
676 last_qty,
677 last_px,
678 commission,
679 liquidity_side,
680 client_order_id,
681 None,
682 ts_event,
683 ts_init,
684 None, );
686
687 Ok(report)
688}
689
690pub fn parse_ws_message_data(
698 channel: &OKXWsChannel,
699 data: serde_json::Value,
700 instrument_id: &InstrumentId,
701 price_precision: u8,
702 size_precision: u8,
703 ts_init: UnixNanos,
704 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
705) -> anyhow::Result<Option<NautilusWsMessage>> {
706 match channel {
707 OKXWsChannel::Instruments => {
708 if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
709 match parse_instrument_any(&msg, ts_init)? {
710 Some(inst_any) => Ok(Some(NautilusWsMessage::Instrument(Box::new(inst_any)))),
711 None => {
712 tracing::warn!("Empty instrument payload: {:?}", msg);
713 Ok(None)
714 }
715 }
716 } else {
717 anyhow::bail!("Failed to deserialize instrument payload")
718 }
719 }
720 OKXWsChannel::BboTbt => {
721 let data_vec = parse_quote_msg_vec(
722 data,
723 instrument_id,
724 price_precision,
725 size_precision,
726 ts_init,
727 )?;
728 Ok(Some(NautilusWsMessage::Data(data_vec)))
729 }
730 OKXWsChannel::Tickers => {
731 let data_vec = parse_ticker_msg_vec(
732 data,
733 instrument_id,
734 price_precision,
735 size_precision,
736 ts_init,
737 )?;
738 Ok(Some(NautilusWsMessage::Data(data_vec)))
739 }
740 OKXWsChannel::Trades => {
741 let data_vec = parse_trade_msg_vec(
742 data,
743 instrument_id,
744 price_precision,
745 size_precision,
746 ts_init,
747 )?;
748 Ok(Some(NautilusWsMessage::Data(data_vec)))
749 }
750 OKXWsChannel::MarkPrice => {
751 let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
752 Ok(Some(NautilusWsMessage::Data(data_vec)))
753 }
754 OKXWsChannel::IndexTickers => {
755 let data_vec =
756 parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
757 Ok(Some(NautilusWsMessage::Data(data_vec)))
758 }
759 OKXWsChannel::FundingRate => {
760 let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
761 Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
762 }
763 channel if okx_channel_to_bar_spec(channel).is_some() => {
764 let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
765 let data_vec = parse_candle_msg_vec(
766 data,
767 instrument_id,
768 price_precision,
769 size_precision,
770 bar_spec,
771 ts_init,
772 )?;
773 Ok(Some(NautilusWsMessage::Data(data_vec)))
774 }
775 OKXWsChannel::Books
776 | OKXWsChannel::BooksTbt
777 | OKXWsChannel::Books5
778 | OKXWsChannel::Books50Tbt => {
779 if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
780 let data_vec = parse_book10_msg_vec(
781 book_msgs,
782 instrument_id,
783 price_precision,
784 size_precision,
785 ts_init,
786 )?;
787 Ok(Some(NautilusWsMessage::Data(data_vec)))
788 } else {
789 anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
790 }
791 }
792 _ => {
793 tracing::warn!("Unsupported channel for message parsing: {channel:?}");
794 Ok(None)
795 }
796 }
797}
798
799#[cfg(test)]
803mod tests {
804 use ahash::AHashMap;
805 use nautilus_core::nanos::UnixNanos;
806 use nautilus_model::{
807 data::bar::BAR_SPEC_1_DAY_LAST,
808 enums::AggressorSide,
809 identifiers::{ClientOrderId, InstrumentId, Symbol},
810 instruments::CryptoPerpetual,
811 types::{Currency, Price, Quantity},
812 };
813 use rstest::rstest;
814 use rust_decimal::Decimal;
815 use ustr::Ustr;
816
817 use super::*;
818 use crate::{
819 common::{enums::OKXTradeMode, parse::parse_account_state, testing::load_test_json},
820 http::models::OKXAccount,
821 websocket::messages::{OKXWebSocketArg, OKXWebSocketEvent},
822 };
823
824 #[rstest]
825 fn test_parse_books_snapshot() {
826 let json_data = load_test_json("ws_books_snapshot.json");
827 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
828 let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
829 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
830 _ => panic!("Expected a `BookData` variant"),
831 };
832
833 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
834 let deltas = parse_book_msg(
835 &okx_books[0],
836 instrument_id,
837 2,
838 1,
839 &action,
840 UnixNanos::default(),
841 )
842 .unwrap();
843
844 assert_eq!(deltas.instrument_id, instrument_id);
845 assert_eq!(deltas.deltas.len(), 16);
846 assert_eq!(deltas.flags, 32);
847 assert_eq!(deltas.sequence, 123456);
848 assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
849 assert_eq!(deltas.ts_init, UnixNanos::default());
850
851 assert!(!deltas.deltas.is_empty());
853 let bid_deltas: Vec<_> = deltas
855 .deltas
856 .iter()
857 .filter(|d| d.order.side == OrderSide::Buy)
858 .collect();
859 let ask_deltas: Vec<_> = deltas
860 .deltas
861 .iter()
862 .filter(|d| d.order.side == OrderSide::Sell)
863 .collect();
864 assert!(!bid_deltas.is_empty());
865 assert!(!ask_deltas.is_empty());
866 }
867
868 #[rstest]
869 fn test_parse_books_update() {
870 let json_data = load_test_json("ws_books_update.json");
871 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
872 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
873 let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
874 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
875 _ => panic!("Expected a `BookData` variant"),
876 };
877
878 let deltas = parse_book_msg(
879 &okx_books[0],
880 instrument_id,
881 2,
882 1,
883 &action,
884 UnixNanos::default(),
885 )
886 .unwrap();
887
888 assert_eq!(deltas.instrument_id, instrument_id);
889 assert_eq!(deltas.deltas.len(), 16);
890 assert_eq!(deltas.flags, 0);
891 assert_eq!(deltas.sequence, 123457);
892 assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
893 assert_eq!(deltas.ts_init, UnixNanos::default());
894
895 assert!(!deltas.deltas.is_empty());
897 let bid_deltas: Vec<_> = deltas
899 .deltas
900 .iter()
901 .filter(|d| d.order.side == OrderSide::Buy)
902 .collect();
903 let ask_deltas: Vec<_> = deltas
904 .deltas
905 .iter()
906 .filter(|d| d.order.side == OrderSide::Sell)
907 .collect();
908 assert!(!bid_deltas.is_empty());
909 assert!(!ask_deltas.is_empty());
910 }
911
912 #[rstest]
913 fn test_parse_tickers() {
914 let json_data = load_test_json("ws_tickers.json");
915 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
916 let okx_tickers: Vec<OKXTickerMsg> = match msg {
917 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
918 _ => panic!("Expected a `Data` variant"),
919 };
920
921 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
922 let trade =
923 parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
924
925 assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
926 assert_eq!(trade.bid_price, Price::from("8888.88"));
927 assert_eq!(trade.ask_price, Price::from("9999.99"));
928 assert_eq!(trade.bid_size, Quantity::from(5));
929 assert_eq!(trade.ask_size, Quantity::from(11));
930 assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
931 assert_eq!(trade.ts_init, UnixNanos::default());
932 }
933
934 #[rstest]
935 fn test_parse_quotes() {
936 let json_data = load_test_json("ws_bbo_tbt.json");
937 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
938 let okx_quotes: Vec<OKXBookMsg> = match msg {
939 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
940 _ => panic!("Expected a `Data` variant"),
941 };
942 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
943
944 let quote =
945 parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
946
947 assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
948 assert_eq!(quote.bid_price, Price::from("8476.97"));
949 assert_eq!(quote.ask_price, Price::from("8476.98"));
950 assert_eq!(quote.bid_size, Quantity::from(256));
951 assert_eq!(quote.ask_size, Quantity::from(415));
952 assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
953 assert_eq!(quote.ts_init, UnixNanos::default());
954 }
955
956 #[rstest]
957 fn test_parse_trades() {
958 let json_data = load_test_json("ws_trades.json");
959 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
960 let okx_trades: Vec<OKXTradeMsg> = match msg {
961 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
962 _ => panic!("Expected a `Data` variant"),
963 };
964
965 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
966 let trade =
967 parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
968
969 assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
970 assert_eq!(trade.price, Price::from("42219.9"));
971 assert_eq!(trade.size, Quantity::from("0.12060306"));
972 assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
973 assert_eq!(trade.trade_id, TradeId::from("130639474"));
974 assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
975 assert_eq!(trade.ts_init, UnixNanos::default());
976 }
977
978 #[rstest]
979 fn test_parse_candle() {
980 let json_data = load_test_json("ws_candle.json");
981 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
982 let okx_candles: Vec<OKXCandleMsg> = match msg {
983 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
984 _ => panic!("Expected a `Data` variant"),
985 };
986
987 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
988 let bar_type = BarType::new(
989 instrument_id,
990 BAR_SPEC_1_DAY_LAST,
991 AggregationSource::External,
992 );
993 let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
994
995 assert_eq!(bar.bar_type, bar_type);
996 assert_eq!(bar.open, Price::from("8533.02"));
997 assert_eq!(bar.high, Price::from("8553.74"));
998 assert_eq!(bar.low, Price::from("8527.17"));
999 assert_eq!(bar.close, Price::from("8548.26"));
1000 assert_eq!(bar.volume, Quantity::from(45247));
1001 assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
1002 assert_eq!(bar.ts_init, UnixNanos::default());
1003 }
1004
1005 #[rstest]
1006 fn test_parse_funding_rate() {
1007 let json_data = load_test_json("ws_funding_rate.json");
1008 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1009
1010 let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
1011 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1012 _ => panic!("Expected a `Data` variant"),
1013 };
1014
1015 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1016 let funding_rate =
1017 parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
1018 .unwrap();
1019
1020 assert_eq!(funding_rate.instrument_id, instrument_id);
1021 assert_eq!(funding_rate.rate, Decimal::new(1, 4));
1022 assert_eq!(
1023 funding_rate.next_funding_ns,
1024 Some(UnixNanos::from(1744590349506000000))
1025 );
1026 assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
1027 assert_eq!(funding_rate.ts_init, UnixNanos::default());
1028 }
1029
1030 #[rstest]
1031 fn test_parse_book_vec() {
1032 let json_data = load_test_json("ws_books_snapshot.json");
1033 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1034 let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
1035 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1036 _ => panic!("Expected BookData"),
1037 };
1038
1039 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1040 let deltas_vec =
1041 parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
1042
1043 assert_eq!(deltas_vec.len(), 1);
1044
1045 if let Data::Deltas(d) = &deltas_vec[0] {
1046 assert_eq!(d.sequence, 123456);
1047 } else {
1048 panic!("Expected Deltas");
1049 }
1050 }
1051
1052 #[rstest]
1053 fn test_parse_ticker_vec() {
1054 let json_data = load_test_json("ws_tickers.json");
1055 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1056 let data_val: serde_json::Value = match event {
1057 OKXWebSocketEvent::Data { data, .. } => data,
1058 _ => panic!("Expected Data"),
1059 };
1060
1061 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1062 let quotes_vec =
1063 parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1064
1065 assert_eq!(quotes_vec.len(), 1);
1066
1067 if let Data::Quote(q) = "es_vec[0] {
1068 assert_eq!(q.bid_price, Price::from("8888.88000000"));
1069 assert_eq!(q.ask_price, Price::from("9999.99"));
1070 } else {
1071 panic!("Expected Quote");
1072 }
1073 }
1074
1075 #[rstest]
1076 fn test_parse_trade_vec() {
1077 let json_data = load_test_json("ws_trades.json");
1078 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1079 let data_val: serde_json::Value = match event {
1080 OKXWebSocketEvent::Data { data, .. } => data,
1081 _ => panic!("Expected Data"),
1082 };
1083
1084 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1085 let trades_vec =
1086 parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1087
1088 assert_eq!(trades_vec.len(), 1);
1089
1090 if let Data::Trade(t) = &trades_vec[0] {
1091 assert_eq!(t.trade_id, TradeId::new("130639474"));
1092 } else {
1093 panic!("Expected Trade");
1094 }
1095 }
1096
1097 #[rstest]
1098 fn test_parse_candle_vec() {
1099 let json_data = load_test_json("ws_candle.json");
1100 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1101 let data_val: serde_json::Value = match event {
1102 OKXWebSocketEvent::Data { data, .. } => data,
1103 _ => panic!("Expected Data"),
1104 };
1105
1106 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1107 let bars_vec = parse_candle_msg_vec(
1108 data_val,
1109 &instrument_id,
1110 2,
1111 1,
1112 BAR_SPEC_1_DAY_LAST,
1113 UnixNanos::default(),
1114 )
1115 .unwrap();
1116
1117 assert_eq!(bars_vec.len(), 1);
1118
1119 if let Data::Bar(b) = &bars_vec[0] {
1120 assert_eq!(b.open, Price::from("8533.02"));
1121 } else {
1122 panic!("Expected Bar");
1123 }
1124 }
1125
1126 #[rstest]
1127 fn test_parse_book_message() {
1128 let json_data = load_test_json("ws_bbo_tbt.json");
1129 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1130 let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
1131 OKXWebSocketEvent::Data { data, arg, .. } => {
1132 (serde_json::from_value(data).unwrap(), arg)
1133 }
1134 _ => panic!("Expected a `Data` variant"),
1135 };
1136
1137 assert_eq!(arg.channel, OKXWsChannel::BboTbt);
1138 assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
1139 assert_eq!(arg.inst_type, None);
1140 assert_eq!(okx_books.len(), 1);
1141
1142 let book_msg = &okx_books[0];
1143
1144 assert_eq!(book_msg.asks.len(), 1);
1146 let ask = &book_msg.asks[0];
1147 assert_eq!(ask.price, "8476.98");
1148 assert_eq!(ask.size, "415");
1149 assert_eq!(ask.liquidated_orders_count, "0");
1150 assert_eq!(ask.orders_count, "13");
1151
1152 assert_eq!(book_msg.bids.len(), 1);
1154 let bid = &book_msg.bids[0];
1155 assert_eq!(bid.price, "8476.97");
1156 assert_eq!(bid.size, "256");
1157 assert_eq!(bid.liquidated_orders_count, "0");
1158 assert_eq!(bid.orders_count, "12");
1159 assert_eq!(book_msg.ts, 1597026383085);
1160 assert_eq!(book_msg.seq_id, 123456);
1161 assert_eq!(book_msg.checksum, None);
1162 assert_eq!(book_msg.prev_seq_id, None);
1163 }
1164
1165 #[rstest]
1166 fn test_parse_ws_account_message() {
1167 let json_data = load_test_json("ws_account.json");
1168 let accounts: Vec<OKXAccount> = serde_json::from_str(&json_data).unwrap();
1169
1170 assert_eq!(accounts.len(), 1);
1171 let account = &accounts[0];
1172
1173 assert_eq!(account.total_eq, "100.56089404807182");
1174 assert_eq!(account.details.len(), 3);
1175
1176 let usdt_detail = &account.details[0];
1177 assert_eq!(usdt_detail.ccy, "USDT");
1178 assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
1179 assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
1180
1181 let btc_detail = &account.details[1];
1182 assert_eq!(btc_detail.ccy, "BTC");
1183 assert_eq!(btc_detail.avail_bal, "0.0000000051");
1184
1185 let eth_detail = &account.details[2];
1186 assert_eq!(eth_detail.ccy, "ETH");
1187 assert_eq!(eth_detail.avail_bal, "0.000000185");
1188
1189 let account_id = AccountId::new("OKX-001");
1190 let ts_init = nautilus_core::nanos::UnixNanos::default();
1191 let account_state = parse_account_state(account, account_id, ts_init);
1192
1193 assert!(account_state.is_ok());
1194 let state = account_state.unwrap();
1195 assert_eq!(state.account_id, account_id);
1196 assert_eq!(state.balances.len(), 3);
1197 }
1198
1199 #[rstest]
1200 fn test_parse_order_msg() {
1201 let json_data = load_test_json("ws_orders.json");
1202 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1203
1204 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1205
1206 let account_id = AccountId::new("OKX-001");
1207 let mut instruments = AHashMap::new();
1208
1209 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1211 let instrument = CryptoPerpetual::new(
1212 instrument_id,
1213 Symbol::from("BTC-USDT-SWAP"),
1214 Currency::BTC(),
1215 Currency::USDT(),
1216 Currency::USDT(),
1217 false, 2, 8, Price::from("0.01"),
1221 Quantity::from("0.00000001"),
1222 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1235 UnixNanos::default(),
1236 );
1237
1238 instruments.insert(
1239 Ustr::from("BTC-USDT-SWAP"),
1240 InstrumentAny::CryptoPerpetual(instrument),
1241 );
1242
1243 let ts_init = UnixNanos::default();
1244 let fee_cache = AHashMap::new();
1245
1246 let result = parse_order_msg_vec(data, account_id, &instruments, &fee_cache, ts_init);
1247
1248 assert!(result.is_ok());
1249 let order_reports = result.unwrap();
1250 assert_eq!(order_reports.len(), 1);
1251
1252 let report = &order_reports[0];
1254
1255 if let ExecutionReport::Fill(fill_report) = report {
1256 assert_eq!(fill_report.account_id, account_id);
1257 assert_eq!(fill_report.instrument_id, instrument_id);
1258 assert_eq!(
1259 fill_report.client_order_id,
1260 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1261 );
1262 assert_eq!(
1263 fill_report.venue_order_id,
1264 VenueOrderId::new("2497956918703120384")
1265 );
1266 assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1267 assert_eq!(fill_report.order_side, OrderSide::Buy);
1268 assert_eq!(fill_report.last_px, Price::from("103698.90"));
1269 assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1270 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1271 } else {
1272 panic!("Expected Fill report for filled order");
1273 }
1274 }
1275
1276 #[rstest]
1277 fn test_parse_order_status_report() {
1278 let json_data = load_test_json("ws_orders.json");
1279 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1280 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1281 let order_msg = &data[0];
1282
1283 let account_id = AccountId::new("OKX-001");
1284 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1285 let instrument = CryptoPerpetual::new(
1286 instrument_id,
1287 Symbol::from("BTC-USDT-SWAP"),
1288 Currency::BTC(),
1289 Currency::USDT(),
1290 Currency::USDT(),
1291 false, 2, 8, Price::from("0.01"),
1295 Quantity::from("0.00000001"),
1296 None,
1297 None,
1298 None,
1299 None,
1300 None,
1301 None,
1302 None,
1303 None,
1304 None,
1305 None,
1306 None,
1307 None,
1308 UnixNanos::default(),
1309 UnixNanos::default(),
1310 );
1311
1312 let ts_init = UnixNanos::default();
1313
1314 let result = parse_order_status_report(
1315 order_msg,
1316 &InstrumentAny::CryptoPerpetual(instrument),
1317 account_id,
1318 ts_init,
1319 );
1320
1321 assert!(result.is_ok());
1322 let order_status_report = result.unwrap();
1323
1324 assert_eq!(order_status_report.account_id, account_id);
1325 assert_eq!(order_status_report.instrument_id, instrument_id);
1326 assert_eq!(
1327 order_status_report.client_order_id,
1328 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1329 );
1330 assert_eq!(
1331 order_status_report.venue_order_id,
1332 VenueOrderId::new("2497956918703120384")
1333 );
1334 assert_eq!(order_status_report.order_side, OrderSide::Buy);
1335 assert_eq!(order_status_report.order_status, OrderStatus::Filled);
1336 assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
1337 assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
1338 }
1339
1340 #[rstest]
1341 fn test_parse_fill_report() {
1342 let json_data = load_test_json("ws_orders.json");
1343 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1344 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1345 let order_msg = &data[0];
1346
1347 let account_id = AccountId::new("OKX-001");
1348 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1349 let instrument = CryptoPerpetual::new(
1350 instrument_id,
1351 Symbol::from("BTC-USDT-SWAP"),
1352 Currency::BTC(),
1353 Currency::USDT(),
1354 Currency::USDT(),
1355 false, 2, 8, Price::from("0.01"),
1359 Quantity::from("0.00000001"),
1360 None,
1361 None,
1362 None,
1363 None,
1364 None,
1365 None,
1366 None,
1367 None,
1368 None,
1369 None,
1370 None,
1371 None,
1372 UnixNanos::default(),
1373 UnixNanos::default(),
1374 );
1375
1376 let ts_init = UnixNanos::default();
1377
1378 let result = parse_fill_report(
1379 order_msg,
1380 &InstrumentAny::CryptoPerpetual(instrument),
1381 account_id,
1382 None,
1383 ts_init,
1384 );
1385
1386 assert!(result.is_ok());
1387 let fill_report = result.unwrap();
1388
1389 assert_eq!(fill_report.account_id, account_id);
1390 assert_eq!(fill_report.instrument_id, instrument_id);
1391 assert_eq!(
1392 fill_report.client_order_id,
1393 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1394 );
1395 assert_eq!(
1396 fill_report.venue_order_id,
1397 VenueOrderId::new("2497956918703120384")
1398 );
1399 assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1400 assert_eq!(fill_report.order_side, OrderSide::Buy);
1401 assert_eq!(fill_report.last_px, Price::from("103698.90"));
1402 assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1403 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1404 }
1405
1406 #[rstest]
1407 fn test_parse_book10_msg() {
1408 let json_data = load_test_json("ws_books_snapshot.json");
1409 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1410 let msgs: Vec<OKXBookMsg> = match event {
1411 OKXWebSocketEvent::BookData { data, .. } => data,
1412 _ => panic!("Expected BookData"),
1413 };
1414
1415 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1416 let depth10 =
1417 parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
1418
1419 assert_eq!(depth10.instrument_id, instrument_id);
1420 assert_eq!(depth10.sequence, 123456);
1421 assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
1422 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1423
1424 assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
1426 assert_eq!(depth10.bids[0].size, Quantity::from("256"));
1427 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1428 assert_eq!(depth10.bid_counts[0], 12);
1429
1430 assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
1431 assert_eq!(depth10.bids[1].size, Quantity::from("101"));
1432 assert_eq!(depth10.bid_counts[1], 1);
1433
1434 assert_eq!(depth10.bids[8].price, Price::from("0"));
1436 assert_eq!(depth10.bids[8].size, Quantity::from("0"));
1437 assert_eq!(depth10.bid_counts[8], 0);
1438
1439 assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
1441 assert_eq!(depth10.asks[0].size, Quantity::from("415"));
1442 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1443 assert_eq!(depth10.ask_counts[0], 13);
1444
1445 assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
1446 assert_eq!(depth10.asks[1].size, Quantity::from("7"));
1447 assert_eq!(depth10.ask_counts[1], 2);
1448
1449 assert_eq!(depth10.asks[8].price, Price::from("0"));
1451 assert_eq!(depth10.asks[8].size, Quantity::from("0"));
1452 assert_eq!(depth10.ask_counts[8], 0);
1453 }
1454
1455 #[rstest]
1456 fn test_parse_book10_msg_vec() {
1457 let json_data = load_test_json("ws_books_snapshot.json");
1458 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1459 let msgs: Vec<OKXBookMsg> = match event {
1460 OKXWebSocketEvent::BookData { data, .. } => data,
1461 _ => panic!("Expected BookData"),
1462 };
1463
1464 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1465 let depth10_vec =
1466 parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
1467
1468 assert_eq!(depth10_vec.len(), 1);
1469
1470 if let Data::Depth10(d) = &depth10_vec[0] {
1471 assert_eq!(d.instrument_id, instrument_id);
1472 assert_eq!(d.sequence, 123456);
1473 assert_eq!(d.bids[0].price, Price::from("8476.97"));
1474 assert_eq!(d.asks[0].price, Price::from("8476.98"));
1475 } else {
1476 panic!("Expected Depth10");
1477 }
1478 }
1479
1480 #[rstest]
1481 fn test_parse_fill_report_with_fee_cache() {
1482 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1483 let instrument = CryptoPerpetual::new(
1484 instrument_id,
1485 Symbol::from("BTC-USDT-SWAP"),
1486 Currency::BTC(),
1487 Currency::USDT(),
1488 Currency::USDT(),
1489 false, 2, 8, Price::from("0.01"),
1493 Quantity::from("0.00000001"),
1494 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1507 UnixNanos::default(),
1508 );
1509
1510 let account_id = AccountId::new("OKX-001");
1511 let ts_init = UnixNanos::default();
1512
1513 let order_msg_1 = OKXOrderMsg {
1515 acc_fill_sz: Some("0.01".to_string()),
1516 algo_cl_ord_id: None,
1517 algo_id: None,
1518 avg_px: "50000.0".to_string(),
1519 c_time: 1746947317401,
1520 cancel_source: None,
1521 cancel_source_reason: None,
1522 category: Ustr::from("normal"),
1523 ccy: Ustr::from("USDT"),
1524 cl_ord_id: "test_order_1".to_string(),
1525 fee: Some("-1.0".to_string()), fee_ccy: Ustr::from("USDT"),
1527 fill_px: "50000.0".to_string(),
1528 fill_sz: "0.01".to_string(),
1529 fill_time: 1746947317402,
1530 inst_id: Ustr::from("BTC-USDT-SWAP"),
1531 inst_type: crate::common::enums::OKXInstrumentType::Swap,
1532 lever: "2.0".to_string(),
1533 ord_id: Ustr::from("1234567890"),
1534 ord_type: OKXOrderType::Market,
1535 pnl: "0".to_string(),
1536 pos_side: Ustr::from("long"),
1537 px: "".to_string(),
1538 reduce_only: "false".to_string(),
1539 side: crate::common::enums::OKXSide::Buy,
1540 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
1541 exec_type: crate::common::enums::OKXExecType::Maker,
1542 sz: "0.03".to_string(), td_mode: OKXTradeMode::Isolated,
1544 trade_id: "trade_1".to_string(),
1545 u_time: 1746947317402,
1546 };
1547
1548 let fill_report_1 = parse_fill_report(
1549 &order_msg_1,
1550 &InstrumentAny::CryptoPerpetual(instrument),
1551 account_id,
1552 None,
1553 ts_init,
1554 )
1555 .unwrap();
1556
1557 assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
1559
1560 let order_msg_2 = OKXOrderMsg {
1562 acc_fill_sz: Some("0.03".to_string()),
1563 algo_cl_ord_id: None,
1564 algo_id: None,
1565 avg_px: "50000.0".to_string(),
1566 c_time: 1746947317401,
1567 cancel_source: None,
1568 cancel_source_reason: None,
1569 category: Ustr::from("normal"),
1570 ccy: Ustr::from("USDT"),
1571 cl_ord_id: "test_order_1".to_string(),
1572 fee: Some("-3.0".to_string()), fee_ccy: Ustr::from("USDT"),
1574 fill_px: "50000.0".to_string(),
1575 fill_sz: "0.02".to_string(),
1576 fill_time: 1746947317403,
1577 inst_id: Ustr::from("BTC-USDT-SWAP"),
1578 inst_type: crate::common::enums::OKXInstrumentType::Swap,
1579 lever: "2.0".to_string(),
1580 ord_id: Ustr::from("1234567890"),
1581 ord_type: OKXOrderType::Market,
1582 pnl: "0".to_string(),
1583 pos_side: Ustr::from("long"),
1584 px: "".to_string(),
1585 reduce_only: "false".to_string(),
1586 side: crate::common::enums::OKXSide::Buy,
1587 state: crate::common::enums::OKXOrderStatus::Filled,
1588 exec_type: crate::common::enums::OKXExecType::Maker,
1589 sz: "0.03".to_string(), td_mode: OKXTradeMode::Isolated,
1591 trade_id: "trade_2".to_string(),
1592 u_time: 1746947317403,
1593 };
1594
1595 let fill_report_2 = parse_fill_report(
1596 &order_msg_2,
1597 &InstrumentAny::CryptoPerpetual(instrument),
1598 account_id,
1599 Some(fill_report_1.commission),
1600 ts_init,
1601 )
1602 .unwrap();
1603
1604 assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
1606
1607 }
1609
1610 #[rstest]
1611 fn test_parse_book10_msg_partial_levels() {
1612 let book_msg = OKXBookMsg {
1614 asks: vec![
1615 OrderBookEntry {
1616 price: "8476.98".to_string(),
1617 size: "415".to_string(),
1618 liquidated_orders_count: "0".to_string(),
1619 orders_count: "13".to_string(),
1620 },
1621 OrderBookEntry {
1622 price: "8477.00".to_string(),
1623 size: "7".to_string(),
1624 liquidated_orders_count: "0".to_string(),
1625 orders_count: "2".to_string(),
1626 },
1627 ],
1628 bids: vec![OrderBookEntry {
1629 price: "8476.97".to_string(),
1630 size: "256".to_string(),
1631 liquidated_orders_count: "0".to_string(),
1632 orders_count: "12".to_string(),
1633 }],
1634 ts: 1597026383085,
1635 checksum: None,
1636 prev_seq_id: None,
1637 seq_id: 123456,
1638 };
1639
1640 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1641 let depth10 =
1642 parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
1643
1644 assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
1646 assert_eq!(depth10.bids[0].size, Quantity::from("256"));
1647 assert_eq!(depth10.bid_counts[0], 12);
1648
1649 assert_eq!(depth10.bids[1].price, Price::from("0"));
1651 assert_eq!(depth10.bids[1].size, Quantity::from("0"));
1652 assert_eq!(depth10.bid_counts[1], 0);
1653
1654 assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
1656 assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
1657 assert_eq!(depth10.asks[2].price, Price::from("0")); }
1659}