1use ahash::AHashMap;
19use nautilus_core::{UUID4, nanos::UnixNanos};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
23 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10,
24 QuoteTick, TradeTick, depth::DEPTH10_LEN,
25 },
26 enums::{
27 AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
28 OrderType, RecordFlag, TimeInForce, TriggerType,
29 },
30 identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
31 instruments::{Instrument, InstrumentAny},
32 reports::{FillReport, OrderStatusReport},
33 types::{Money, Price, Quantity},
34};
35use rust_decimal::Decimal;
36use ustr::Ustr;
37
38use super::{
39 enums::OKXWsChannel,
40 messages::{
41 OKXAlgoOrderMsg, OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg, OKXOrderMsg,
42 OKXTickerMsg, OKXTradeMsg, OrderBookEntry,
43 },
44};
45use crate::{
46 common::{
47 consts::{OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE},
48 enums::{
49 OKXBookAction, OKXCandleConfirm, OKXInstrumentType, OKXOrderCategory, OKXOrderStatus,
50 OKXOrderType, OKXSide, OKXTargetCurrency, OKXTriggerType,
51 },
52 models::OKXInstrument,
53 parse::{
54 okx_channel_to_bar_spec, parse_client_order_id, parse_fee, parse_fee_currency,
55 parse_funding_rate_msg, parse_instrument_any, parse_message_vec,
56 parse_millisecond_timestamp, parse_price, parse_quantity,
57 },
58 },
59 websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
60};
61
62fn is_market_price(px: &str) -> bool {
70 px.is_empty() || px == "0" || px == "-1" || px == "-2"
71}
72
73fn extract_fees_from_cached_instrument(
78 instrument: &InstrumentAny,
79) -> (
80 Option<Decimal>,
81 Option<Decimal>,
82 Option<Decimal>,
83 Option<Decimal>,
84) {
85 match instrument {
86 InstrumentAny::CurrencyPair(pair) => (
87 Some(pair.margin_init),
88 Some(pair.margin_maint),
89 Some(pair.maker_fee),
90 Some(pair.taker_fee),
91 ),
92 InstrumentAny::CryptoPerpetual(perp) => (
93 Some(perp.margin_init),
94 Some(perp.margin_maint),
95 Some(perp.maker_fee),
96 Some(perp.taker_fee),
97 ),
98 InstrumentAny::CryptoFuture(future) => (
99 Some(future.margin_init),
100 Some(future.margin_maint),
101 Some(future.maker_fee),
102 Some(future.taker_fee),
103 ),
104 InstrumentAny::CryptoOption(option) => (
105 Some(option.margin_init),
106 Some(option.margin_maint),
107 Some(option.maker_fee),
108 Some(option.taker_fee),
109 ),
110 _ => (None, None, None, None),
111 }
112}
113
114pub fn parse_book_msg_vec(
120 data: Vec<OKXBookMsg>,
121 instrument_id: &InstrumentId,
122 price_precision: u8,
123 size_precision: u8,
124 action: OKXBookAction,
125 ts_init: UnixNanos,
126) -> anyhow::Result<Vec<Data>> {
127 let mut deltas = Vec::with_capacity(data.len());
128
129 for msg in data {
130 let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
131 &msg,
132 *instrument_id,
133 price_precision,
134 size_precision,
135 &action,
136 ts_init,
137 )?);
138 deltas.push(Data::Deltas(deltas_api));
139 }
140
141 Ok(deltas)
142}
143
144pub fn parse_ticker_msg_vec(
150 data: serde_json::Value,
151 instrument_id: &InstrumentId,
152 price_precision: u8,
153 size_precision: u8,
154 ts_init: UnixNanos,
155) -> anyhow::Result<Vec<Data>> {
156 parse_message_vec(
157 data,
158 |msg| {
159 parse_ticker_msg(
160 msg,
161 *instrument_id,
162 price_precision,
163 size_precision,
164 ts_init,
165 )
166 },
167 Data::Quote,
168 )
169}
170
171pub fn parse_quote_msg_vec(
177 data: serde_json::Value,
178 instrument_id: &InstrumentId,
179 price_precision: u8,
180 size_precision: u8,
181 ts_init: UnixNanos,
182) -> anyhow::Result<Vec<Data>> {
183 parse_message_vec(
184 data,
185 |msg| {
186 parse_quote_msg(
187 msg,
188 *instrument_id,
189 price_precision,
190 size_precision,
191 ts_init,
192 )
193 },
194 Data::Quote,
195 )
196}
197
198pub fn parse_trade_msg_vec(
204 data: serde_json::Value,
205 instrument_id: &InstrumentId,
206 price_precision: u8,
207 size_precision: u8,
208 ts_init: UnixNanos,
209) -> anyhow::Result<Vec<Data>> {
210 parse_message_vec(
211 data,
212 |msg| {
213 parse_trade_msg(
214 msg,
215 *instrument_id,
216 price_precision,
217 size_precision,
218 ts_init,
219 )
220 },
221 Data::Trade,
222 )
223}
224
225pub fn parse_mark_price_msg_vec(
231 data: serde_json::Value,
232 instrument_id: &InstrumentId,
233 price_precision: u8,
234 ts_init: UnixNanos,
235) -> anyhow::Result<Vec<Data>> {
236 parse_message_vec(
237 data,
238 |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
239 Data::MarkPriceUpdate,
240 )
241}
242
243pub fn parse_index_price_msg_vec(
249 data: serde_json::Value,
250 instrument_id: &InstrumentId,
251 price_precision: u8,
252 ts_init: UnixNanos,
253) -> anyhow::Result<Vec<Data>> {
254 parse_message_vec(
255 data,
256 |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
257 Data::IndexPriceUpdate,
258 )
259}
260
261pub fn parse_funding_rate_msg_vec(
268 data: serde_json::Value,
269 instrument_id: &InstrumentId,
270 ts_init: UnixNanos,
271 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
272) -> anyhow::Result<Vec<FundingRateUpdate>> {
273 let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
274
275 let mut result = Vec::with_capacity(msgs.len());
276 for msg in &msgs {
277 let cache_key = (msg.funding_rate, msg.funding_time);
278
279 if let Some(cached) = funding_cache.get(&msg.inst_id)
280 && *cached == cache_key
281 {
282 continue; }
284
285 funding_cache.insert(msg.inst_id, cache_key);
287 let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
288 result.push(funding_rate);
289 }
290
291 Ok(result)
292}
293
294pub fn parse_candle_msg_vec(
300 data: serde_json::Value,
301 instrument_id: &InstrumentId,
302 price_precision: u8,
303 size_precision: u8,
304 spec: BarSpecification,
305 ts_init: UnixNanos,
306) -> anyhow::Result<Vec<Data>> {
307 let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
308 let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
309 let mut bars = Vec::with_capacity(msgs.len());
310
311 for msg in msgs {
312 if msg.confirm == OKXCandleConfirm::Closed {
314 let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
315 bars.push(Data::Bar(bar));
316 }
317 }
318
319 Ok(bars)
320}
321
322pub fn parse_book10_msg_vec(
328 data: Vec<OKXBookMsg>,
329 instrument_id: &InstrumentId,
330 price_precision: u8,
331 size_precision: u8,
332 ts_init: UnixNanos,
333) -> anyhow::Result<Vec<Data>> {
334 let mut depth10_updates = Vec::with_capacity(data.len());
335
336 for msg in data {
337 let depth10 = parse_book10_msg(
338 &msg,
339 *instrument_id,
340 price_precision,
341 size_precision,
342 ts_init,
343 )?;
344 depth10_updates.push(Data::Depth10(Box::new(depth10)));
345 }
346
347 Ok(depth10_updates)
348}
349
350pub fn parse_book_msg(
356 msg: &OKXBookMsg,
357 instrument_id: InstrumentId,
358 price_precision: u8,
359 size_precision: u8,
360 action: &OKXBookAction,
361 ts_init: UnixNanos,
362) -> anyhow::Result<OrderBookDeltas> {
363 let flags = if action == &OKXBookAction::Snapshot {
364 RecordFlag::F_SNAPSHOT as u8
365 } else {
366 0
367 };
368 let ts_event = parse_millisecond_timestamp(msg.ts);
369
370 let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
371
372 for bid in &msg.bids {
373 let book_action = match action {
374 OKXBookAction::Snapshot => BookAction::Add,
375 _ => match bid.size.as_str() {
376 "0" => BookAction::Delete,
377 _ => BookAction::Update,
378 },
379 };
380 let price = parse_price(&bid.price, price_precision)?;
381 let size = parse_quantity(&bid.size, size_precision)?;
382 let order_id = 0; let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
384 let delta = OrderBookDelta::new(
385 instrument_id,
386 book_action,
387 order,
388 flags,
389 msg.seq_id,
390 ts_event,
391 ts_init,
392 );
393 deltas.push(delta);
394 }
395
396 for ask in &msg.asks {
397 let book_action = match action {
398 OKXBookAction::Snapshot => BookAction::Add,
399 _ => match ask.size.as_str() {
400 "0" => BookAction::Delete,
401 _ => BookAction::Update,
402 },
403 };
404 let price = parse_price(&ask.price, price_precision)?;
405 let size = parse_quantity(&ask.size, size_precision)?;
406 let order_id = 0; let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
408 let delta = OrderBookDelta::new(
409 instrument_id,
410 book_action,
411 order,
412 flags,
413 msg.seq_id,
414 ts_event,
415 ts_init,
416 );
417 deltas.push(delta);
418 }
419
420 OrderBookDeltas::new_checked(instrument_id, deltas)
421}
422
423pub fn parse_quote_msg(
429 msg: &OKXBookMsg,
430 instrument_id: InstrumentId,
431 price_precision: u8,
432 size_precision: u8,
433 ts_init: UnixNanos,
434) -> anyhow::Result<QuoteTick> {
435 let best_bid: &OrderBookEntry = &msg.bids[0];
436 let best_ask: &OrderBookEntry = &msg.asks[0];
437
438 let bid_price = parse_price(&best_bid.price, price_precision)?;
439 let ask_price = parse_price(&best_ask.price, price_precision)?;
440 let bid_size = parse_quantity(&best_bid.size, size_precision)?;
441 let ask_size = parse_quantity(&best_ask.size, size_precision)?;
442 let ts_event = parse_millisecond_timestamp(msg.ts);
443
444 QuoteTick::new_checked(
445 instrument_id,
446 bid_price,
447 ask_price,
448 bid_size,
449 ask_size,
450 ts_event,
451 ts_init,
452 )
453}
454
455pub fn parse_book10_msg(
463 msg: &OKXBookMsg,
464 instrument_id: InstrumentId,
465 price_precision: u8,
466 size_precision: u8,
467 ts_init: UnixNanos,
468) -> anyhow::Result<OrderBookDepth10> {
469 let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
471 let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
472 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
473 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
474
475 let bid_len = msg.bids.len().min(DEPTH10_LEN);
477 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
478 let price = parse_price(&level.price, price_precision)?;
479 let size = parse_quantity(&level.size, size_precision)?;
480 let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
481
482 let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
483 bids[i] = bid_order;
484 bid_counts[i] = orders_count;
485 }
486
487 for i in bid_len..DEPTH10_LEN {
489 bids[i] = BookOrder::new(
490 OrderSide::Buy,
491 Price::zero(price_precision),
492 Quantity::zero(size_precision),
493 0,
494 );
495 bid_counts[i] = 0;
496 }
497
498 let ask_len = msg.asks.len().min(DEPTH10_LEN);
500 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
501 let price = parse_price(&level.price, price_precision)?;
502 let size = parse_quantity(&level.size, size_precision)?;
503 let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
504
505 let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
506 asks[i] = ask_order;
507 ask_counts[i] = orders_count;
508 }
509
510 for i in ask_len..DEPTH10_LEN {
512 asks[i] = BookOrder::new(
513 OrderSide::Sell,
514 Price::zero(price_precision),
515 Quantity::zero(size_precision),
516 0,
517 );
518 ask_counts[i] = 0;
519 }
520
521 let ts_event = parse_millisecond_timestamp(msg.ts);
522
523 Ok(OrderBookDepth10::new(
524 instrument_id,
525 bids,
526 asks,
527 bid_counts,
528 ask_counts,
529 RecordFlag::F_SNAPSHOT as u8,
530 msg.seq_id, ts_event,
532 ts_init,
533 ))
534}
535
536pub fn parse_ticker_msg(
542 msg: &OKXTickerMsg,
543 instrument_id: InstrumentId,
544 price_precision: u8,
545 size_precision: u8,
546 ts_init: UnixNanos,
547) -> anyhow::Result<QuoteTick> {
548 let bid_price = parse_price(&msg.bid_px, price_precision)?;
549 let ask_price = parse_price(&msg.ask_px, price_precision)?;
550 let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
551 let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
552 let ts_event = parse_millisecond_timestamp(msg.ts);
553
554 QuoteTick::new_checked(
555 instrument_id,
556 bid_price,
557 ask_price,
558 bid_size,
559 ask_size,
560 ts_event,
561 ts_init,
562 )
563}
564
565pub fn parse_trade_msg(
571 msg: &OKXTradeMsg,
572 instrument_id: InstrumentId,
573 price_precision: u8,
574 size_precision: u8,
575 ts_init: UnixNanos,
576) -> anyhow::Result<TradeTick> {
577 let price = parse_price(&msg.px, price_precision)?;
578 let size = parse_quantity(&msg.sz, size_precision)?;
579 let aggressor_side: AggressorSide = msg.side.into();
580 let trade_id = TradeId::new(&msg.trade_id);
581 let ts_event = parse_millisecond_timestamp(msg.ts);
582
583 TradeTick::new_checked(
584 instrument_id,
585 price,
586 size,
587 aggressor_side,
588 trade_id,
589 ts_event,
590 ts_init,
591 )
592}
593
594pub fn parse_mark_price_msg(
600 msg: &OKXMarkPriceMsg,
601 instrument_id: InstrumentId,
602 price_precision: u8,
603 ts_init: UnixNanos,
604) -> anyhow::Result<MarkPriceUpdate> {
605 let price = parse_price(&msg.mark_px, price_precision)?;
606 let ts_event = parse_millisecond_timestamp(msg.ts);
607
608 Ok(MarkPriceUpdate::new(
609 instrument_id,
610 price,
611 ts_event,
612 ts_init,
613 ))
614}
615
616pub fn parse_index_price_msg(
622 msg: &OKXIndexPriceMsg,
623 instrument_id: InstrumentId,
624 price_precision: u8,
625 ts_init: UnixNanos,
626) -> anyhow::Result<IndexPriceUpdate> {
627 let price = parse_price(&msg.idx_px, price_precision)?;
628 let ts_event = parse_millisecond_timestamp(msg.ts);
629
630 Ok(IndexPriceUpdate::new(
631 instrument_id,
632 price,
633 ts_event,
634 ts_init,
635 ))
636}
637
638pub fn parse_candle_msg(
644 msg: &OKXCandleMsg,
645 bar_type: BarType,
646 price_precision: u8,
647 size_precision: u8,
648 ts_init: UnixNanos,
649) -> anyhow::Result<Bar> {
650 let open = parse_price(&msg.o, price_precision)?;
651 let high = parse_price(&msg.h, price_precision)?;
652 let low = parse_price(&msg.l, price_precision)?;
653 let close = parse_price(&msg.c, price_precision)?;
654 let volume = parse_quantity(&msg.vol, size_precision)?;
655 let ts_event = parse_millisecond_timestamp(msg.ts);
656
657 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
658}
659
660pub fn parse_order_msg_vec(
666 data: Vec<OKXOrderMsg>,
667 account_id: AccountId,
668 instruments: &AHashMap<Ustr, InstrumentAny>,
669 fee_cache: &AHashMap<Ustr, Money>,
670 filled_qty_cache: &AHashMap<Ustr, Quantity>,
671 ts_init: UnixNanos,
672) -> anyhow::Result<Vec<ExecutionReport>> {
673 let mut order_reports = Vec::with_capacity(data.len());
674
675 for msg in data {
676 match parse_order_msg(
677 &msg,
678 account_id,
679 instruments,
680 fee_cache,
681 filled_qty_cache,
682 ts_init,
683 ) {
684 Ok(report) => order_reports.push(report),
685 Err(e) => tracing::error!("Failed to parse execution report from message: {e}"),
686 }
687 }
688
689 Ok(order_reports)
690}
691
692fn has_acc_fill_sz_increased(
694 acc_fill_sz: &Option<String>,
695 previous_filled_qty: Option<Quantity>,
696 size_precision: u8,
697) -> bool {
698 if let Some(acc_str) = acc_fill_sz {
699 if acc_str.is_empty() || acc_str == "0" {
700 return false;
701 }
702 if let Ok(current_filled) = parse_quantity(acc_str, size_precision) {
703 if let Some(prev_qty) = previous_filled_qty {
704 return current_filled > prev_qty;
705 }
706 return !current_filled.is_zero();
707 }
708 }
709 false
710}
711
712pub fn parse_order_msg(
719 msg: &OKXOrderMsg,
720 account_id: AccountId,
721 instruments: &AHashMap<Ustr, InstrumentAny>,
722 fee_cache: &AHashMap<Ustr, Money>,
723 filled_qty_cache: &AHashMap<Ustr, Quantity>,
724 ts_init: UnixNanos,
725) -> anyhow::Result<ExecutionReport> {
726 let instrument = instruments
727 .get(&msg.inst_id)
728 .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
729
730 let previous_fee = fee_cache.get(&msg.ord_id).copied();
731 let previous_filled_qty = filled_qty_cache.get(&msg.ord_id).copied();
732
733 let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0")
734 || !msg.trade_id.is_empty()
735 || has_acc_fill_sz_increased(
736 &msg.acc_fill_sz,
737 previous_filled_qty,
738 instrument.size_precision(),
739 );
740
741 match msg.state {
742 OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => {
743 parse_fill_report(
744 msg,
745 instrument,
746 account_id,
747 previous_fee,
748 previous_filled_qty,
749 ts_init,
750 )
751 .map(ExecutionReport::Fill)
752 }
753 _ => parse_order_status_report(msg, instrument, account_id, ts_init)
754 .map(ExecutionReport::Order),
755 }
756}
757
758pub fn parse_algo_order_msg(
765 msg: OKXAlgoOrderMsg,
766 account_id: AccountId,
767 instruments: &AHashMap<Ustr, InstrumentAny>,
768 ts_init: UnixNanos,
769) -> anyhow::Result<ExecutionReport> {
770 let inst = instruments
771 .get(&msg.inst_id)
772 .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
773
774 parse_algo_order_status_report(&msg, inst, account_id, ts_init).map(ExecutionReport::Order)
776}
777
778pub fn parse_algo_order_status_report(
785 msg: &OKXAlgoOrderMsg,
786 instrument: &InstrumentAny,
787 account_id: AccountId,
788 ts_init: UnixNanos,
789) -> anyhow::Result<OrderStatusReport> {
790 let client_order_id = if msg.cl_ord_id.is_empty() {
792 parse_client_order_id(&msg.algo_cl_ord_id)
793 } else {
794 parse_client_order_id(&msg.cl_ord_id)
795 };
796
797 let venue_order_id = if msg.ord_id.is_empty() {
799 VenueOrderId::new(msg.algo_id.as_str())
800 } else {
801 VenueOrderId::new(msg.ord_id.as_str())
802 };
803
804 let order_side: OrderSide = msg.side.into();
805
806 let order_type = if is_market_price(&msg.ord_px) {
808 OrderType::StopMarket
809 } else {
810 OrderType::StopLimit
811 };
812
813 let status: OrderStatus = msg.state.into();
814
815 let quantity = parse_quantity(msg.sz.as_str(), instrument.size_precision())?;
816
817 let filled_qty = if msg.actual_sz.is_empty() || msg.actual_sz == "0" {
819 Quantity::zero(instrument.size_precision())
820 } else {
821 parse_quantity(msg.actual_sz.as_str(), instrument.size_precision())?
822 };
823
824 let trigger_px = parse_price(msg.trigger_px.as_str(), instrument.price_precision())?;
825
826 let price = if msg.ord_px != "-1" {
828 Some(parse_price(
829 msg.ord_px.as_str(),
830 instrument.price_precision(),
831 )?)
832 } else {
833 None
834 };
835
836 let trigger_type = match msg.trigger_px_type {
837 OKXTriggerType::Last => TriggerType::LastPrice,
838 OKXTriggerType::Mark => TriggerType::MarkPrice,
839 OKXTriggerType::Index => TriggerType::IndexPrice,
840 OKXTriggerType::None => TriggerType::Default,
841 };
842
843 let mut report = OrderStatusReport::new(
844 account_id,
845 instrument.id(),
846 client_order_id,
847 venue_order_id,
848 order_side,
849 order_type,
850 TimeInForce::Gtc, status,
852 quantity,
853 filled_qty,
854 msg.c_time.into(), msg.u_time.into(), ts_init,
857 None, );
859
860 report.trigger_price = Some(trigger_px);
861 report.trigger_type = Some(trigger_type);
862
863 if let Some(limit_price) = price {
864 report.price = Some(limit_price);
865 }
866
867 Ok(report)
868}
869
870pub fn parse_order_status_report(
876 msg: &OKXOrderMsg,
877 instrument: &InstrumentAny,
878 account_id: AccountId,
879 ts_init: UnixNanos,
880) -> anyhow::Result<OrderStatusReport> {
881 let client_order_id = parse_client_order_id(&msg.cl_ord_id);
882 let venue_order_id = VenueOrderId::new(msg.ord_id);
883 let order_side: OrderSide = msg.side.into();
884
885 let okx_order_type = msg.ord_type;
886
887 let order_type = match okx_order_type {
889 OKXOrderType::Trigger => {
891 if is_market_price(&msg.px) {
892 OrderType::StopMarket
893 } else {
894 OrderType::StopLimit
895 }
896 }
897 OKXOrderType::Fok | OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => {
901 if is_market_price(&msg.px) {
902 OrderType::Market
903 } else {
904 OrderType::Limit
905 }
906 }
907 _ => msg.ord_type.into(),
909 };
910 let order_status: OrderStatus = msg.state.into();
911
912 let time_in_force = match okx_order_type {
913 OKXOrderType::Fok => TimeInForce::Fok,
914 OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
915 _ => TimeInForce::Gtc,
916 };
917
918 let size_precision = instrument.size_precision();
919
920 let is_quote_qty_explicit = msg.tgt_ccy == Some(OKXTargetCurrency::QuoteCcy);
926
927 let is_quote_qty_heuristic = msg.tgt_ccy.is_none()
932 && (msg.inst_type == OKXInstrumentType::Spot || msg.inst_type == OKXInstrumentType::Margin)
933 && msg.side == OKXSide::Buy
934 && msg.ord_type == OKXOrderType::Market;
935
936 let (quantity, filled_qty) = if is_quote_qty_explicit || is_quote_qty_heuristic {
937 let sz_quote = msg.sz.parse::<f64>().map_err(|e| {
939 anyhow::anyhow!("Failed to parse sz='{}' as quote quantity: {}", msg.sz, e)
940 })?;
941
942 let conversion_price = if !is_market_price(&msg.px) {
945 msg.px
947 .parse::<f64>()
948 .map_err(|e| anyhow::anyhow!("Failed to parse px='{}': {}", msg.px, e))?
949 } else if !msg.avg_px.is_empty() && msg.avg_px != "0" {
950 msg.avg_px
952 .parse::<f64>()
953 .map_err(|e| anyhow::anyhow!("Failed to parse avg_px='{}': {}", msg.avg_px, e))?
954 } else {
955 0.0
956 };
957
958 let quantity_base = if conversion_price > 0.0 {
960 Quantity::new(sz_quote / conversion_price, size_precision)
961 } else {
962 parse_quantity(&msg.sz, size_precision)?
965 };
966
967 let filled_qty =
968 parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
969
970 (quantity_base, filled_qty)
971 } else {
972 let quantity = parse_quantity(&msg.sz, size_precision)?;
974 let filled_qty =
975 parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
976
977 (quantity, filled_qty)
978 };
979
980 let (quantity, filled_qty) = if (is_quote_qty_explicit || is_quote_qty_heuristic)
983 && msg.state == OKXOrderStatus::Filled
984 && filled_qty.is_positive()
985 {
986 (filled_qty, filled_qty)
987 } else {
988 (quantity, filled_qty)
989 };
990
991 let ts_accepted = parse_millisecond_timestamp(msg.c_time);
992 let ts_last = parse_millisecond_timestamp(msg.u_time);
993
994 let is_liquidation = matches!(
995 msg.category,
996 OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
997 );
998
999 let is_adl = msg.category == OKXOrderCategory::Adl;
1000
1001 if is_liquidation {
1002 tracing::warn!(
1003 order_id = msg.ord_id.as_str(),
1004 category = ?msg.category,
1005 inst_id = msg.inst_id.as_str(),
1006 state = ?msg.state,
1007 "Liquidation order status update"
1008 );
1009 }
1010
1011 if is_adl {
1012 tracing::warn!(
1013 order_id = msg.ord_id.as_str(),
1014 inst_id = msg.inst_id.as_str(),
1015 state = ?msg.state,
1016 "ADL (Auto-Deleveraging) order status update"
1017 );
1018 }
1019
1020 let mut report = OrderStatusReport::new(
1021 account_id,
1022 instrument.id(),
1023 client_order_id,
1024 venue_order_id,
1025 order_side,
1026 order_type,
1027 time_in_force,
1028 order_status,
1029 quantity,
1030 filled_qty,
1031 ts_accepted,
1032 ts_init,
1033 ts_last,
1034 None, );
1036
1037 let price_precision = instrument.price_precision();
1038
1039 if okx_order_type == OKXOrderType::Trigger {
1040 if !is_market_price(&msg.px)
1043 && let Ok(price) = parse_price(&msg.px, price_precision)
1044 {
1045 report = report.with_price(price);
1046 }
1047 } else {
1048 if !is_market_price(&msg.px)
1050 && let Ok(price) = parse_price(&msg.px, price_precision)
1051 {
1052 report = report.with_price(price);
1053 }
1054 }
1055
1056 if !msg.avg_px.is_empty()
1057 && let Ok(avg_px) = msg.avg_px.parse::<f64>()
1058 {
1059 report = report.with_avg_px(avg_px);
1060 }
1061
1062 if matches!(
1063 msg.ord_type,
1064 OKXOrderType::PostOnly | OKXOrderType::MmpAndPostOnly
1065 ) || matches!(
1066 msg.cancel_source.as_deref(),
1067 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
1068 ) || matches!(
1069 msg.cancel_source_reason.as_deref(),
1070 Some(reason) if reason.contains("POST_ONLY")
1071 ) {
1072 report = report.with_post_only(true);
1073 }
1074
1075 if msg.reduce_only == "true" {
1076 report = report.with_reduce_only(true);
1077 }
1078
1079 if let Some(reason) = msg
1080 .cancel_source_reason
1081 .as_ref()
1082 .filter(|reason| !reason.is_empty())
1083 {
1084 report = report.with_cancel_reason(reason.clone());
1085 } else if let Some(source) = msg
1086 .cancel_source
1087 .as_ref()
1088 .filter(|source| !source.is_empty())
1089 {
1090 let reason = if source == OKX_POST_ONLY_CANCEL_SOURCE {
1091 OKX_POST_ONLY_CANCEL_REASON.to_string()
1092 } else {
1093 format!("cancel_source={source}")
1094 };
1095 report = report.with_cancel_reason(reason);
1096 }
1097
1098 Ok(report)
1099}
1100
1101pub fn parse_fill_report(
1107 msg: &OKXOrderMsg,
1108 instrument: &InstrumentAny,
1109 account_id: AccountId,
1110 previous_fee: Option<Money>,
1111 previous_filled_qty: Option<Quantity>,
1112 ts_init: UnixNanos,
1113) -> anyhow::Result<FillReport> {
1114 let client_order_id = parse_client_order_id(&msg.cl_ord_id);
1115 let venue_order_id = VenueOrderId::new(msg.ord_id);
1116
1117 let trade_id = if msg.trade_id.is_empty() {
1120 TradeId::from(UUID4::new().to_string().as_str())
1121 } else {
1122 TradeId::from(msg.trade_id.as_str())
1123 };
1124
1125 let order_side: OrderSide = msg.side.into();
1126
1127 let price_precision = instrument.price_precision();
1128 let size_precision = instrument.size_precision();
1129
1130 let price_str = if !msg.fill_px.is_empty() {
1131 &msg.fill_px
1132 } else if !msg.avg_px.is_empty() {
1133 &msg.avg_px
1134 } else {
1135 &msg.px
1136 };
1137 let last_px = parse_price(price_str, price_precision).map_err(|e| {
1138 anyhow::anyhow!(
1139 "Failed to parse price (fill_px='{}', avg_px='{}', px='{}'): {}",
1140 msg.fill_px,
1141 msg.avg_px,
1142 msg.px,
1143 e
1144 )
1145 })?;
1146
1147 let last_qty = if !msg.fill_sz.is_empty() && msg.fill_sz != "0" {
1150 parse_quantity(&msg.fill_sz, size_precision)
1151 .map_err(|e| anyhow::anyhow!("Failed to parse fill_sz='{}': {e}", msg.fill_sz,))?
1152 } else if let Some(ref acc_fill_sz) = msg.acc_fill_sz {
1153 if !acc_fill_sz.is_empty() && acc_fill_sz != "0" {
1155 let current_filled = parse_quantity(acc_fill_sz, size_precision).map_err(|e| {
1156 anyhow::anyhow!("Failed to parse acc_fill_sz='{}': {e}", acc_fill_sz,)
1157 })?;
1158
1159 if let Some(prev_qty) = previous_filled_qty {
1161 let incremental = current_filled - prev_qty;
1162 if incremental.is_zero() {
1163 anyhow::bail!(
1164 "Incremental fill quantity is zero (acc_fill_sz='{}', previous_filled_qty={})",
1165 acc_fill_sz,
1166 prev_qty
1167 );
1168 }
1169 incremental
1170 } else {
1171 current_filled
1173 }
1174 } else {
1175 anyhow::bail!(
1176 "Cannot determine fill quantity: fill_sz is empty/zero and acc_fill_sz is empty/zero"
1177 );
1178 }
1179 } else {
1180 anyhow::bail!(
1181 "Cannot determine fill quantity: fill_sz='{}' and acc_fill_sz is None",
1182 msg.fill_sz
1183 );
1184 };
1185
1186 let fee_str = msg.fee.as_deref().unwrap_or("0");
1187 let fee_value = fee_str
1188 .parse::<f64>()
1189 .map_err(|e| anyhow::anyhow!("Failed to parse fee '{}': {}", fee_str, e))?;
1190
1191 let fee_currency = parse_fee_currency(msg.fee_ccy.as_str(), fee_value, || {
1192 format!("fill report for inst_id={}", msg.inst_id)
1193 });
1194
1195 let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)
1197 .map_err(|e| anyhow::anyhow!("Failed to parse fee={:?}: {}", msg.fee, e))?;
1198
1199 let commission = if let Some(previous_fee) = previous_fee {
1201 let incremental = total_fee - previous_fee;
1202
1203 if incremental < Money::zero(fee_currency) {
1204 tracing::debug!(
1205 order_id = msg.ord_id.as_str(),
1206 total_fee = %total_fee,
1207 previous_fee = %previous_fee,
1208 incremental = %incremental,
1209 "Negative incremental fee detected - likely a maker rebate or fee refund"
1210 );
1211 }
1212
1213 if previous_fee >= Money::zero(fee_currency)
1216 && total_fee > Money::zero(fee_currency)
1217 && incremental > total_fee
1218 {
1219 tracing::error!(
1220 order_id = msg.ord_id.as_str(),
1221 total_fee = %total_fee,
1222 previous_fee = %previous_fee,
1223 incremental = %incremental,
1224 "Incremental fee exceeds total fee - likely fee cache corruption, using total fee as fallback"
1225 );
1226 total_fee
1227 } else {
1228 incremental
1229 }
1230 } else {
1231 total_fee
1232 };
1233
1234 let liquidity_side: LiquiditySide = msg.exec_type.into();
1235 let ts_event = parse_millisecond_timestamp(msg.fill_time);
1236
1237 let is_liquidation = matches!(
1238 msg.category,
1239 OKXOrderCategory::FullLiquidation | OKXOrderCategory::PartialLiquidation
1240 );
1241
1242 let is_adl = msg.category == OKXOrderCategory::Adl;
1243
1244 if is_liquidation {
1245 tracing::warn!(
1246 order_id = msg.ord_id.as_str(),
1247 category = ?msg.category,
1248 inst_id = msg.inst_id.as_str(),
1249 side = ?msg.side,
1250 fill_sz = %msg.fill_sz,
1251 fill_px = %msg.fill_px,
1252 "Liquidation order detected"
1253 );
1254 }
1255
1256 if is_adl {
1257 tracing::warn!(
1258 order_id = msg.ord_id.as_str(),
1259 inst_id = msg.inst_id.as_str(),
1260 side = ?msg.side,
1261 fill_sz = %msg.fill_sz,
1262 fill_px = %msg.fill_px,
1263 "ADL (Auto-Deleveraging) order detected"
1264 );
1265 }
1266
1267 let report = FillReport::new(
1268 account_id,
1269 instrument.id(),
1270 venue_order_id,
1271 trade_id,
1272 order_side,
1273 last_qty,
1274 last_px,
1275 commission,
1276 liquidity_side,
1277 client_order_id,
1278 None,
1279 ts_event,
1280 ts_init,
1281 None, );
1283
1284 Ok(report)
1285}
1286
1287#[allow(clippy::too_many_arguments)]
1300pub fn parse_ws_message_data(
1301 channel: &OKXWsChannel,
1302 data: serde_json::Value,
1303 instrument_id: &InstrumentId,
1304 price_precision: u8,
1305 size_precision: u8,
1306 ts_init: UnixNanos,
1307 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1308 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
1309) -> anyhow::Result<Option<NautilusWsMessage>> {
1310 match channel {
1311 OKXWsChannel::Instruments => {
1312 if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
1313 let (margin_init, margin_maint, maker_fee, taker_fee) =
1315 instruments_cache.get(&Ustr::from(&msg.inst_id)).map_or(
1316 (None, None, None, None),
1317 extract_fees_from_cached_instrument,
1318 );
1319
1320 match parse_instrument_any(
1321 &msg,
1322 margin_init,
1323 margin_maint,
1324 maker_fee,
1325 taker_fee,
1326 ts_init,
1327 )? {
1328 Some(inst_any) => Ok(Some(NautilusWsMessage::Instrument(Box::new(inst_any)))),
1329 None => {
1330 tracing::warn!("Empty instrument payload: {:?}", msg);
1331 Ok(None)
1332 }
1333 }
1334 } else {
1335 anyhow::bail!("Failed to deserialize instrument payload")
1336 }
1337 }
1338 OKXWsChannel::BboTbt => {
1339 let data_vec = parse_quote_msg_vec(
1340 data,
1341 instrument_id,
1342 price_precision,
1343 size_precision,
1344 ts_init,
1345 )?;
1346 Ok(Some(NautilusWsMessage::Data(data_vec)))
1347 }
1348 OKXWsChannel::Tickers => {
1349 let data_vec = parse_ticker_msg_vec(
1350 data,
1351 instrument_id,
1352 price_precision,
1353 size_precision,
1354 ts_init,
1355 )?;
1356 Ok(Some(NautilusWsMessage::Data(data_vec)))
1357 }
1358 OKXWsChannel::Trades => {
1359 let data_vec = parse_trade_msg_vec(
1360 data,
1361 instrument_id,
1362 price_precision,
1363 size_precision,
1364 ts_init,
1365 )?;
1366 Ok(Some(NautilusWsMessage::Data(data_vec)))
1367 }
1368 OKXWsChannel::MarkPrice => {
1369 let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1370 Ok(Some(NautilusWsMessage::Data(data_vec)))
1371 }
1372 OKXWsChannel::IndexTickers => {
1373 let data_vec =
1374 parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1375 Ok(Some(NautilusWsMessage::Data(data_vec)))
1376 }
1377 OKXWsChannel::FundingRate => {
1378 let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
1379 Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
1380 }
1381 channel if okx_channel_to_bar_spec(channel).is_some() => {
1382 let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
1383 let data_vec = parse_candle_msg_vec(
1384 data,
1385 instrument_id,
1386 price_precision,
1387 size_precision,
1388 bar_spec,
1389 ts_init,
1390 )?;
1391 Ok(Some(NautilusWsMessage::Data(data_vec)))
1392 }
1393 OKXWsChannel::Books
1394 | OKXWsChannel::BooksTbt
1395 | OKXWsChannel::Books5
1396 | OKXWsChannel::Books50Tbt => {
1397 if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
1398 let data_vec = parse_book10_msg_vec(
1399 book_msgs,
1400 instrument_id,
1401 price_precision,
1402 size_precision,
1403 ts_init,
1404 )?;
1405 Ok(Some(NautilusWsMessage::Data(data_vec)))
1406 } else {
1407 anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
1408 }
1409 }
1410 _ => {
1411 tracing::warn!("Unsupported channel for message parsing: {channel:?}");
1412 Ok(None)
1413 }
1414 }
1415}
1416
1417#[cfg(test)]
1421mod tests {
1422 use ahash::AHashMap;
1423 use nautilus_core::nanos::UnixNanos;
1424 use nautilus_model::{
1425 data::bar::BAR_SPEC_1_DAY_LAST,
1426 identifiers::{ClientOrderId, Symbol},
1427 instruments::CryptoPerpetual,
1428 types::Currency,
1429 };
1430 use rstest::rstest;
1431 use rust_decimal::Decimal;
1432 use ustr::Ustr;
1433
1434 use super::*;
1435 use crate::{
1436 OKXPositionSide,
1437 common::{
1438 enums::{OKXExecType, OKXInstrumentType, OKXOrderType, OKXSide, OKXTradeMode},
1439 parse::parse_account_state,
1440 testing::load_test_json,
1441 },
1442 http::models::OKXAccount,
1443 websocket::messages::{OKXWebSocketArg, OKXWebSocketEvent},
1444 };
1445
1446 fn create_stub_instrument() -> CryptoPerpetual {
1447 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1448 CryptoPerpetual::new(
1449 instrument_id,
1450 Symbol::from("BTC-USDT-SWAP"),
1451 Currency::BTC(),
1452 Currency::USDT(),
1453 Currency::USDT(),
1454 false,
1455 2,
1456 8,
1457 Price::from("0.01"),
1458 Quantity::from("0.00000001"),
1459 None,
1460 None,
1461 None,
1462 None,
1463 None,
1464 None,
1465 None,
1466 None,
1467 None,
1468 None,
1469 None,
1470 None,
1471 UnixNanos::default(),
1472 UnixNanos::default(),
1473 )
1474 }
1475
1476 fn create_stub_order_msg(
1477 fill_sz: &str,
1478 acc_fill_sz: Option<String>,
1479 order_id: &str,
1480 trade_id: &str,
1481 ) -> OKXOrderMsg {
1482 OKXOrderMsg {
1483 acc_fill_sz,
1484 avg_px: "50000.0".to_string(),
1485 c_time: 1746947317401,
1486 cancel_source: None,
1487 cancel_source_reason: None,
1488 category: OKXOrderCategory::Normal,
1489 ccy: Ustr::from("USDT"),
1490 cl_ord_id: "test_order_1".to_string(),
1491 algo_cl_ord_id: None,
1492 fee: Some("-1.0".to_string()),
1493 fee_ccy: Ustr::from("USDT"),
1494 fill_px: "50000.0".to_string(),
1495 fill_sz: fill_sz.to_string(),
1496 fill_time: 1746947317402,
1497 inst_id: Ustr::from("BTC-USDT-SWAP"),
1498 inst_type: OKXInstrumentType::Swap,
1499 lever: "2.0".to_string(),
1500 ord_id: Ustr::from(order_id),
1501 ord_type: OKXOrderType::Market,
1502 pnl: "0".to_string(),
1503 pos_side: OKXPositionSide::Long,
1504 px: "".to_string(),
1505 reduce_only: "false".to_string(),
1506 side: OKXSide::Buy,
1507 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
1508 exec_type: OKXExecType::Taker,
1509 sz: "0.03".to_string(),
1510 td_mode: OKXTradeMode::Isolated,
1511 tgt_ccy: None,
1512 trade_id: trade_id.to_string(),
1513 u_time: 1746947317402,
1514 }
1515 }
1516
1517 #[rstest]
1518 fn test_parse_books_snapshot() {
1519 let json_data = load_test_json("ws_books_snapshot.json");
1520 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1521 let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1522 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1523 _ => panic!("Expected a `BookData` variant"),
1524 };
1525
1526 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1527 let deltas = parse_book_msg(
1528 &okx_books[0],
1529 instrument_id,
1530 2,
1531 1,
1532 &action,
1533 UnixNanos::default(),
1534 )
1535 .unwrap();
1536
1537 assert_eq!(deltas.instrument_id, instrument_id);
1538 assert_eq!(deltas.deltas.len(), 16);
1539 assert_eq!(deltas.flags, 32);
1540 assert_eq!(deltas.sequence, 123456);
1541 assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1542 assert_eq!(deltas.ts_init, UnixNanos::default());
1543
1544 assert!(!deltas.deltas.is_empty());
1546 let bid_deltas: Vec<_> = deltas
1548 .deltas
1549 .iter()
1550 .filter(|d| d.order.side == OrderSide::Buy)
1551 .collect();
1552 let ask_deltas: Vec<_> = deltas
1553 .deltas
1554 .iter()
1555 .filter(|d| d.order.side == OrderSide::Sell)
1556 .collect();
1557 assert!(!bid_deltas.is_empty());
1558 assert!(!ask_deltas.is_empty());
1559 }
1560
1561 #[rstest]
1562 fn test_parse_books_update() {
1563 let json_data = load_test_json("ws_books_update.json");
1564 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1565 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1566 let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1567 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1568 _ => panic!("Expected a `BookData` variant"),
1569 };
1570
1571 let deltas = parse_book_msg(
1572 &okx_books[0],
1573 instrument_id,
1574 2,
1575 1,
1576 &action,
1577 UnixNanos::default(),
1578 )
1579 .unwrap();
1580
1581 assert_eq!(deltas.instrument_id, instrument_id);
1582 assert_eq!(deltas.deltas.len(), 16);
1583 assert_eq!(deltas.flags, 0);
1584 assert_eq!(deltas.sequence, 123457);
1585 assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1586 assert_eq!(deltas.ts_init, UnixNanos::default());
1587
1588 assert!(!deltas.deltas.is_empty());
1590 let bid_deltas: Vec<_> = deltas
1592 .deltas
1593 .iter()
1594 .filter(|d| d.order.side == OrderSide::Buy)
1595 .collect();
1596 let ask_deltas: Vec<_> = deltas
1597 .deltas
1598 .iter()
1599 .filter(|d| d.order.side == OrderSide::Sell)
1600 .collect();
1601 assert!(!bid_deltas.is_empty());
1602 assert!(!ask_deltas.is_empty());
1603 }
1604
1605 #[rstest]
1606 fn test_parse_tickers() {
1607 let json_data = load_test_json("ws_tickers.json");
1608 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1609 let okx_tickers: Vec<OKXTickerMsg> = match msg {
1610 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1611 _ => panic!("Expected a `Data` variant"),
1612 };
1613
1614 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1615 let trade =
1616 parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1617
1618 assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1619 assert_eq!(trade.bid_price, Price::from("8888.88"));
1620 assert_eq!(trade.ask_price, Price::from("9999.99"));
1621 assert_eq!(trade.bid_size, Quantity::from(5));
1622 assert_eq!(trade.ask_size, Quantity::from(11));
1623 assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
1624 assert_eq!(trade.ts_init, UnixNanos::default());
1625 }
1626
1627 #[rstest]
1628 fn test_parse_quotes() {
1629 let json_data = load_test_json("ws_bbo_tbt.json");
1630 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1631 let okx_quotes: Vec<OKXBookMsg> = match msg {
1632 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1633 _ => panic!("Expected a `Data` variant"),
1634 };
1635 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1636
1637 let quote =
1638 parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1639
1640 assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1641 assert_eq!(quote.bid_price, Price::from("8476.97"));
1642 assert_eq!(quote.ask_price, Price::from("8476.98"));
1643 assert_eq!(quote.bid_size, Quantity::from(256));
1644 assert_eq!(quote.ask_size, Quantity::from(415));
1645 assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
1646 assert_eq!(quote.ts_init, UnixNanos::default());
1647 }
1648
1649 #[rstest]
1650 fn test_parse_trades() {
1651 let json_data = load_test_json("ws_trades.json");
1652 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1653 let okx_trades: Vec<OKXTradeMsg> = match msg {
1654 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1655 _ => panic!("Expected a `Data` variant"),
1656 };
1657
1658 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1659 let trade =
1660 parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
1661
1662 assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1663 assert_eq!(trade.price, Price::from("42219.9"));
1664 assert_eq!(trade.size, Quantity::from("0.12060306"));
1665 assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
1666 assert_eq!(trade.trade_id, TradeId::from("130639474"));
1667 assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
1668 assert_eq!(trade.ts_init, UnixNanos::default());
1669 }
1670
1671 #[rstest]
1672 fn test_parse_candle() {
1673 let json_data = load_test_json("ws_candle.json");
1674 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1675 let okx_candles: Vec<OKXCandleMsg> = match msg {
1676 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1677 _ => panic!("Expected a `Data` variant"),
1678 };
1679
1680 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1681 let bar_type = BarType::new(
1682 instrument_id,
1683 BAR_SPEC_1_DAY_LAST,
1684 AggregationSource::External,
1685 );
1686 let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
1687
1688 assert_eq!(bar.bar_type, bar_type);
1689 assert_eq!(bar.open, Price::from("8533.02"));
1690 assert_eq!(bar.high, Price::from("8553.74"));
1691 assert_eq!(bar.low, Price::from("8527.17"));
1692 assert_eq!(bar.close, Price::from("8548.26"));
1693 assert_eq!(bar.volume, Quantity::from(45247));
1694 assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
1695 assert_eq!(bar.ts_init, UnixNanos::default());
1696 }
1697
1698 #[rstest]
1699 fn test_parse_funding_rate() {
1700 let json_data = load_test_json("ws_funding_rate.json");
1701 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1702
1703 let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
1704 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1705 _ => panic!("Expected a `Data` variant"),
1706 };
1707
1708 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1709 let funding_rate =
1710 parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
1711 .unwrap();
1712
1713 assert_eq!(funding_rate.instrument_id, instrument_id);
1714 assert_eq!(funding_rate.rate, Decimal::new(1, 4));
1715 assert_eq!(
1716 funding_rate.next_funding_ns,
1717 Some(UnixNanos::from(1744590349506000000))
1718 );
1719 assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
1720 assert_eq!(funding_rate.ts_init, UnixNanos::default());
1721 }
1722
1723 #[rstest]
1724 fn test_parse_book_vec() {
1725 let json_data = load_test_json("ws_books_snapshot.json");
1726 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1727 let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
1728 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1729 _ => panic!("Expected BookData"),
1730 };
1731
1732 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1733 let deltas_vec =
1734 parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
1735
1736 assert_eq!(deltas_vec.len(), 1);
1737
1738 if let Data::Deltas(d) = &deltas_vec[0] {
1739 assert_eq!(d.sequence, 123456);
1740 } else {
1741 panic!("Expected Deltas");
1742 }
1743 }
1744
1745 #[rstest]
1746 fn test_parse_ticker_vec() {
1747 let json_data = load_test_json("ws_tickers.json");
1748 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1749 let data_val: serde_json::Value = match event {
1750 OKXWebSocketEvent::Data { data, .. } => data,
1751 _ => panic!("Expected Data"),
1752 };
1753
1754 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1755 let quotes_vec =
1756 parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1757
1758 assert_eq!(quotes_vec.len(), 1);
1759
1760 if let Data::Quote(q) = "es_vec[0] {
1761 assert_eq!(q.bid_price, Price::from("8888.88000000"));
1762 assert_eq!(q.ask_price, Price::from("9999.99"));
1763 } else {
1764 panic!("Expected Quote");
1765 }
1766 }
1767
1768 #[rstest]
1769 fn test_parse_trade_vec() {
1770 let json_data = load_test_json("ws_trades.json");
1771 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1772 let data_val: serde_json::Value = match event {
1773 OKXWebSocketEvent::Data { data, .. } => data,
1774 _ => panic!("Expected Data"),
1775 };
1776
1777 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1778 let trades_vec =
1779 parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1780
1781 assert_eq!(trades_vec.len(), 1);
1782
1783 if let Data::Trade(t) = &trades_vec[0] {
1784 assert_eq!(t.trade_id, TradeId::new("130639474"));
1785 } else {
1786 panic!("Expected Trade");
1787 }
1788 }
1789
1790 #[rstest]
1791 fn test_parse_candle_vec() {
1792 let json_data = load_test_json("ws_candle.json");
1793 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1794 let data_val: serde_json::Value = match event {
1795 OKXWebSocketEvent::Data { data, .. } => data,
1796 _ => panic!("Expected Data"),
1797 };
1798
1799 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1800 let bars_vec = parse_candle_msg_vec(
1801 data_val,
1802 &instrument_id,
1803 2,
1804 1,
1805 BAR_SPEC_1_DAY_LAST,
1806 UnixNanos::default(),
1807 )
1808 .unwrap();
1809
1810 assert_eq!(bars_vec.len(), 1);
1811
1812 if let Data::Bar(b) = &bars_vec[0] {
1813 assert_eq!(b.open, Price::from("8533.02"));
1814 } else {
1815 panic!("Expected Bar");
1816 }
1817 }
1818
1819 #[rstest]
1820 fn test_parse_book_message() {
1821 let json_data = load_test_json("ws_bbo_tbt.json");
1822 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1823 let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
1824 OKXWebSocketEvent::Data { data, arg, .. } => {
1825 (serde_json::from_value(data).unwrap(), arg)
1826 }
1827 _ => panic!("Expected a `Data` variant"),
1828 };
1829
1830 assert_eq!(arg.channel, OKXWsChannel::BboTbt);
1831 assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
1832 assert_eq!(arg.inst_type, None);
1833 assert_eq!(okx_books.len(), 1);
1834
1835 let book_msg = &okx_books[0];
1836
1837 assert_eq!(book_msg.asks.len(), 1);
1839 let ask = &book_msg.asks[0];
1840 assert_eq!(ask.price, "8476.98");
1841 assert_eq!(ask.size, "415");
1842 assert_eq!(ask.liquidated_orders_count, "0");
1843 assert_eq!(ask.orders_count, "13");
1844
1845 assert_eq!(book_msg.bids.len(), 1);
1847 let bid = &book_msg.bids[0];
1848 assert_eq!(bid.price, "8476.97");
1849 assert_eq!(bid.size, "256");
1850 assert_eq!(bid.liquidated_orders_count, "0");
1851 assert_eq!(bid.orders_count, "12");
1852 assert_eq!(book_msg.ts, 1597026383085);
1853 assert_eq!(book_msg.seq_id, 123456);
1854 assert_eq!(book_msg.checksum, None);
1855 assert_eq!(book_msg.prev_seq_id, None);
1856 }
1857
1858 #[rstest]
1859 fn test_parse_ws_account_message() {
1860 let json_data = load_test_json("ws_account.json");
1861 let accounts: Vec<OKXAccount> = serde_json::from_str(&json_data).unwrap();
1862
1863 assert_eq!(accounts.len(), 1);
1864 let account = &accounts[0];
1865
1866 assert_eq!(account.total_eq, "100.56089404807182");
1867 assert_eq!(account.details.len(), 3);
1868
1869 let usdt_detail = &account.details[0];
1870 assert_eq!(usdt_detail.ccy, "USDT");
1871 assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
1872 assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
1873
1874 let btc_detail = &account.details[1];
1875 assert_eq!(btc_detail.ccy, "BTC");
1876 assert_eq!(btc_detail.avail_bal, "0.0000000051");
1877
1878 let eth_detail = &account.details[2];
1879 assert_eq!(eth_detail.ccy, "ETH");
1880 assert_eq!(eth_detail.avail_bal, "0.000000185");
1881
1882 let account_id = AccountId::new("OKX-001");
1883 let ts_init = nautilus_core::nanos::UnixNanos::default();
1884 let account_state = parse_account_state(account, account_id, ts_init);
1885
1886 assert!(account_state.is_ok());
1887 let state = account_state.unwrap();
1888 assert_eq!(state.account_id, account_id);
1889 assert_eq!(state.balances.len(), 3);
1890 }
1891
1892 #[rstest]
1893 fn test_parse_order_msg() {
1894 let json_data = load_test_json("ws_orders.json");
1895 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1896
1897 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1898
1899 let account_id = AccountId::new("OKX-001");
1900 let mut instruments = AHashMap::new();
1901
1902 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1904 let instrument = CryptoPerpetual::new(
1905 instrument_id,
1906 Symbol::from("BTC-USDT-SWAP"),
1907 Currency::BTC(),
1908 Currency::USDT(),
1909 Currency::USDT(),
1910 false, 2, 8, Price::from("0.01"),
1914 Quantity::from("0.00000001"),
1915 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1928 UnixNanos::default(),
1929 );
1930
1931 instruments.insert(
1932 Ustr::from("BTC-USDT-SWAP"),
1933 InstrumentAny::CryptoPerpetual(instrument),
1934 );
1935
1936 let ts_init = UnixNanos::default();
1937 let fee_cache = AHashMap::new();
1938 let filled_qty_cache = AHashMap::new();
1939
1940 let result = parse_order_msg_vec(
1941 data,
1942 account_id,
1943 &instruments,
1944 &fee_cache,
1945 &filled_qty_cache,
1946 ts_init,
1947 );
1948
1949 assert!(result.is_ok());
1950 let order_reports = result.unwrap();
1951 assert_eq!(order_reports.len(), 1);
1952
1953 let report = &order_reports[0];
1955
1956 if let ExecutionReport::Fill(fill_report) = report {
1957 assert_eq!(fill_report.account_id, account_id);
1958 assert_eq!(fill_report.instrument_id, instrument_id);
1959 assert_eq!(
1960 fill_report.client_order_id,
1961 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1962 );
1963 assert_eq!(
1964 fill_report.venue_order_id,
1965 VenueOrderId::new("2497956918703120384")
1966 );
1967 assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1968 assert_eq!(fill_report.order_side, OrderSide::Buy);
1969 assert_eq!(fill_report.last_px, Price::from("103698.90"));
1970 assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1971 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1972 } else {
1973 panic!("Expected Fill report for filled order");
1974 }
1975 }
1976
1977 #[rstest]
1978 fn test_parse_order_status_report() {
1979 let json_data = load_test_json("ws_orders.json");
1980 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1981 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1982 let order_msg = &data[0];
1983
1984 let account_id = AccountId::new("OKX-001");
1985 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1986 let instrument = CryptoPerpetual::new(
1987 instrument_id,
1988 Symbol::from("BTC-USDT-SWAP"),
1989 Currency::BTC(),
1990 Currency::USDT(),
1991 Currency::USDT(),
1992 false, 2, 8, Price::from("0.01"),
1996 Quantity::from("0.00000001"),
1997 None,
1998 None,
1999 None,
2000 None,
2001 None,
2002 None,
2003 None,
2004 None,
2005 None,
2006 None,
2007 None,
2008 None,
2009 UnixNanos::default(),
2010 UnixNanos::default(),
2011 );
2012
2013 let ts_init = UnixNanos::default();
2014
2015 let result = parse_order_status_report(
2016 order_msg,
2017 &InstrumentAny::CryptoPerpetual(instrument),
2018 account_id,
2019 ts_init,
2020 );
2021
2022 assert!(result.is_ok());
2023 let order_status_report = result.unwrap();
2024
2025 assert_eq!(order_status_report.account_id, account_id);
2026 assert_eq!(order_status_report.instrument_id, instrument_id);
2027 assert_eq!(
2028 order_status_report.client_order_id,
2029 Some(ClientOrderId::new("001BTCUSDT20250106001"))
2030 );
2031 assert_eq!(
2032 order_status_report.venue_order_id,
2033 VenueOrderId::new("2497956918703120384")
2034 );
2035 assert_eq!(order_status_report.order_side, OrderSide::Buy);
2036 assert_eq!(order_status_report.order_status, OrderStatus::Filled);
2037 assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
2038 assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
2039 }
2040
2041 #[rstest]
2042 fn test_parse_fill_report() {
2043 let json_data = load_test_json("ws_orders.json");
2044 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2045 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2046 let order_msg = &data[0];
2047
2048 let account_id = AccountId::new("OKX-001");
2049 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2050 let instrument = CryptoPerpetual::new(
2051 instrument_id,
2052 Symbol::from("BTC-USDT-SWAP"),
2053 Currency::BTC(),
2054 Currency::USDT(),
2055 Currency::USDT(),
2056 false, 2, 8, Price::from("0.01"),
2060 Quantity::from("0.00000001"),
2061 None,
2062 None,
2063 None,
2064 None,
2065 None,
2066 None,
2067 None,
2068 None,
2069 None,
2070 None,
2071 None,
2072 None,
2073 UnixNanos::default(),
2074 UnixNanos::default(),
2075 );
2076
2077 let ts_init = UnixNanos::default();
2078
2079 let result = parse_fill_report(
2080 order_msg,
2081 &InstrumentAny::CryptoPerpetual(instrument),
2082 account_id,
2083 None,
2084 None,
2085 ts_init,
2086 );
2087
2088 assert!(result.is_ok());
2089 let fill_report = result.unwrap();
2090
2091 assert_eq!(fill_report.account_id, account_id);
2092 assert_eq!(fill_report.instrument_id, instrument_id);
2093 assert_eq!(
2094 fill_report.client_order_id,
2095 Some(ClientOrderId::new("001BTCUSDT20250106001"))
2096 );
2097 assert_eq!(
2098 fill_report.venue_order_id,
2099 VenueOrderId::new("2497956918703120384")
2100 );
2101 assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
2102 assert_eq!(fill_report.order_side, OrderSide::Buy);
2103 assert_eq!(fill_report.last_px, Price::from("103698.90"));
2104 assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
2105 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
2106 }
2107
2108 #[rstest]
2109 fn test_parse_book10_msg() {
2110 let json_data = load_test_json("ws_books_snapshot.json");
2111 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
2112 let msgs: Vec<OKXBookMsg> = match event {
2113 OKXWebSocketEvent::BookData { data, .. } => data,
2114 _ => panic!("Expected BookData"),
2115 };
2116
2117 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2118 let depth10 =
2119 parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
2120
2121 assert_eq!(depth10.instrument_id, instrument_id);
2122 assert_eq!(depth10.sequence, 123456);
2123 assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
2124 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
2125
2126 assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2128 assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2129 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
2130 assert_eq!(depth10.bid_counts[0], 12);
2131
2132 assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
2133 assert_eq!(depth10.bids[1].size, Quantity::from("101"));
2134 assert_eq!(depth10.bid_counts[1], 1);
2135
2136 assert_eq!(depth10.bids[8].price, Price::from("0"));
2138 assert_eq!(depth10.bids[8].size, Quantity::from("0"));
2139 assert_eq!(depth10.bid_counts[8], 0);
2140
2141 assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2143 assert_eq!(depth10.asks[0].size, Quantity::from("415"));
2144 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
2145 assert_eq!(depth10.ask_counts[0], 13);
2146
2147 assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2148 assert_eq!(depth10.asks[1].size, Quantity::from("7"));
2149 assert_eq!(depth10.ask_counts[1], 2);
2150
2151 assert_eq!(depth10.asks[8].price, Price::from("0"));
2153 assert_eq!(depth10.asks[8].size, Quantity::from("0"));
2154 assert_eq!(depth10.ask_counts[8], 0);
2155 }
2156
2157 #[rstest]
2158 fn test_parse_book10_msg_vec() {
2159 let json_data = load_test_json("ws_books_snapshot.json");
2160 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
2161 let msgs: Vec<OKXBookMsg> = match event {
2162 OKXWebSocketEvent::BookData { data, .. } => data,
2163 _ => panic!("Expected BookData"),
2164 };
2165
2166 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2167 let depth10_vec =
2168 parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
2169
2170 assert_eq!(depth10_vec.len(), 1);
2171
2172 if let Data::Depth10(d) = &depth10_vec[0] {
2173 assert_eq!(d.instrument_id, instrument_id);
2174 assert_eq!(d.sequence, 123456);
2175 assert_eq!(d.bids[0].price, Price::from("8476.97"));
2176 assert_eq!(d.asks[0].price, Price::from("8476.98"));
2177 } else {
2178 panic!("Expected Depth10");
2179 }
2180 }
2181
2182 #[rstest]
2183 fn test_parse_fill_report_with_fee_cache() {
2184 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2185 let instrument = CryptoPerpetual::new(
2186 instrument_id,
2187 Symbol::from("BTC-USDT-SWAP"),
2188 Currency::BTC(),
2189 Currency::USDT(),
2190 Currency::USDT(),
2191 false, 2, 8, Price::from("0.01"),
2195 Quantity::from("0.00000001"),
2196 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
2209 UnixNanos::default(),
2210 );
2211
2212 let account_id = AccountId::new("OKX-001");
2213 let ts_init = UnixNanos::default();
2214
2215 let order_msg_1 = OKXOrderMsg {
2217 acc_fill_sz: Some("0.01".to_string()),
2218 avg_px: "50000.0".to_string(),
2219 c_time: 1746947317401,
2220 cancel_source: None,
2221 cancel_source_reason: None,
2222 category: OKXOrderCategory::Normal,
2223 ccy: Ustr::from("USDT"),
2224 cl_ord_id: "test_order_1".to_string(),
2225 algo_cl_ord_id: None,
2226 fee: Some("-1.0".to_string()), fee_ccy: Ustr::from("USDT"),
2228 fill_px: "50000.0".to_string(),
2229 fill_sz: "0.01".to_string(),
2230 fill_time: 1746947317402,
2231 inst_id: Ustr::from("BTC-USDT-SWAP"),
2232 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2233 lever: "2.0".to_string(),
2234 ord_id: Ustr::from("1234567890"),
2235 ord_type: OKXOrderType::Market,
2236 pnl: "0".to_string(),
2237 pos_side: OKXPositionSide::Long,
2238 px: "".to_string(),
2239 reduce_only: "false".to_string(),
2240 side: crate::common::enums::OKXSide::Buy,
2241 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2242 exec_type: crate::common::enums::OKXExecType::Maker,
2243 sz: "0.03".to_string(), td_mode: OKXTradeMode::Isolated,
2245 tgt_ccy: None,
2246 trade_id: "trade_1".to_string(),
2247 u_time: 1746947317402,
2248 };
2249
2250 let fill_report_1 = parse_fill_report(
2251 &order_msg_1,
2252 &InstrumentAny::CryptoPerpetual(instrument),
2253 account_id,
2254 None,
2255 None,
2256 ts_init,
2257 )
2258 .unwrap();
2259
2260 assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
2262
2263 let order_msg_2 = OKXOrderMsg {
2265 acc_fill_sz: Some("0.03".to_string()),
2266 avg_px: "50000.0".to_string(),
2267 c_time: 1746947317401,
2268 cancel_source: None,
2269 cancel_source_reason: None,
2270 category: OKXOrderCategory::Normal,
2271 ccy: Ustr::from("USDT"),
2272 cl_ord_id: "test_order_1".to_string(),
2273 algo_cl_ord_id: None,
2274 fee: Some("-3.0".to_string()), fee_ccy: Ustr::from("USDT"),
2276 fill_px: "50000.0".to_string(),
2277 fill_sz: "0.02".to_string(),
2278 fill_time: 1746947317403,
2279 inst_id: Ustr::from("BTC-USDT-SWAP"),
2280 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2281 lever: "2.0".to_string(),
2282 ord_id: Ustr::from("1234567890"),
2283 ord_type: OKXOrderType::Market,
2284 pnl: "0".to_string(),
2285 pos_side: OKXPositionSide::Long,
2286 px: "".to_string(),
2287 reduce_only: "false".to_string(),
2288 side: crate::common::enums::OKXSide::Buy,
2289 state: crate::common::enums::OKXOrderStatus::Filled,
2290 exec_type: crate::common::enums::OKXExecType::Maker,
2291 sz: "0.03".to_string(), td_mode: OKXTradeMode::Isolated,
2293 tgt_ccy: None,
2294 trade_id: "trade_2".to_string(),
2295 u_time: 1746947317403,
2296 };
2297
2298 let fill_report_2 = parse_fill_report(
2299 &order_msg_2,
2300 &InstrumentAny::CryptoPerpetual(instrument),
2301 account_id,
2302 Some(fill_report_1.commission),
2303 Some(fill_report_1.last_qty),
2304 ts_init,
2305 )
2306 .unwrap();
2307
2308 assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
2310
2311 }
2313
2314 #[rstest]
2315 fn test_parse_fill_report_with_maker_rebates() {
2316 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2317 let instrument = CryptoPerpetual::new(
2318 instrument_id,
2319 Symbol::from("BTC-USDT-SWAP"),
2320 Currency::BTC(),
2321 Currency::USDT(),
2322 Currency::USDT(),
2323 false,
2324 2,
2325 8,
2326 Price::from("0.01"),
2327 Quantity::from("0.00000001"),
2328 None,
2329 None,
2330 None,
2331 None,
2332 None,
2333 None,
2334 None,
2335 None,
2336 None,
2337 None,
2338 None,
2339 None,
2340 UnixNanos::default(),
2341 UnixNanos::default(),
2342 );
2343
2344 let account_id = AccountId::new("OKX-001");
2345 let ts_init = UnixNanos::default();
2346
2347 let order_msg_1 = OKXOrderMsg {
2349 acc_fill_sz: Some("0.01".to_string()),
2350 avg_px: "50000.0".to_string(),
2351 c_time: 1746947317401,
2352 cancel_source: None,
2353 cancel_source_reason: None,
2354 category: OKXOrderCategory::Normal,
2355 ccy: Ustr::from("USDT"),
2356 cl_ord_id: "test_order_rebate".to_string(),
2357 algo_cl_ord_id: None,
2358 fee: Some("0.5".to_string()), fee_ccy: Ustr::from("USDT"),
2360 fill_px: "50000.0".to_string(),
2361 fill_sz: "0.01".to_string(),
2362 fill_time: 1746947317402,
2363 inst_id: Ustr::from("BTC-USDT-SWAP"),
2364 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2365 lever: "2.0".to_string(),
2366 ord_id: Ustr::from("rebate_order_123"),
2367 ord_type: OKXOrderType::Market,
2368 pnl: "0".to_string(),
2369 pos_side: OKXPositionSide::Long,
2370 px: "".to_string(),
2371 reduce_only: "false".to_string(),
2372 side: crate::common::enums::OKXSide::Buy,
2373 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2374 exec_type: crate::common::enums::OKXExecType::Maker,
2375 sz: "0.02".to_string(),
2376 td_mode: OKXTradeMode::Isolated,
2377 tgt_ccy: None,
2378 trade_id: "trade_rebate_1".to_string(),
2379 u_time: 1746947317402,
2380 };
2381
2382 let fill_report_1 = parse_fill_report(
2383 &order_msg_1,
2384 &InstrumentAny::CryptoPerpetual(instrument),
2385 account_id,
2386 None,
2387 None,
2388 ts_init,
2389 )
2390 .unwrap();
2391
2392 assert_eq!(fill_report_1.commission, Money::new(-0.5, Currency::USDT()));
2394
2395 let order_msg_2 = OKXOrderMsg {
2397 acc_fill_sz: Some("0.02".to_string()),
2398 avg_px: "50000.0".to_string(),
2399 c_time: 1746947317401,
2400 cancel_source: None,
2401 cancel_source_reason: None,
2402 category: OKXOrderCategory::Normal,
2403 ccy: Ustr::from("USDT"),
2404 cl_ord_id: "test_order_rebate".to_string(),
2405 algo_cl_ord_id: None,
2406 fee: Some("0.8".to_string()), fee_ccy: Ustr::from("USDT"),
2408 fill_px: "50000.0".to_string(),
2409 fill_sz: "0.01".to_string(),
2410 fill_time: 1746947317403,
2411 inst_id: Ustr::from("BTC-USDT-SWAP"),
2412 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2413 lever: "2.0".to_string(),
2414 ord_id: Ustr::from("rebate_order_123"),
2415 ord_type: OKXOrderType::Market,
2416 pnl: "0".to_string(),
2417 pos_side: OKXPositionSide::Long,
2418 px: "".to_string(),
2419 reduce_only: "false".to_string(),
2420 side: crate::common::enums::OKXSide::Buy,
2421 state: crate::common::enums::OKXOrderStatus::Filled,
2422 exec_type: crate::common::enums::OKXExecType::Maker,
2423 sz: "0.02".to_string(),
2424 td_mode: OKXTradeMode::Isolated,
2425 tgt_ccy: None,
2426 trade_id: "trade_rebate_2".to_string(),
2427 u_time: 1746947317403,
2428 };
2429
2430 let fill_report_2 = parse_fill_report(
2431 &order_msg_2,
2432 &InstrumentAny::CryptoPerpetual(instrument),
2433 account_id,
2434 Some(fill_report_1.commission),
2435 Some(fill_report_1.last_qty),
2436 ts_init,
2437 )
2438 .unwrap();
2439
2440 assert_eq!(fill_report_2.commission, Money::new(-0.3, Currency::USDT()));
2442 }
2443
2444 #[rstest]
2445 fn test_parse_fill_report_rebate_to_charge_transition() {
2446 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2447 let instrument = CryptoPerpetual::new(
2448 instrument_id,
2449 Symbol::from("BTC-USDT-SWAP"),
2450 Currency::BTC(),
2451 Currency::USDT(),
2452 Currency::USDT(),
2453 false,
2454 2,
2455 8,
2456 Price::from("0.01"),
2457 Quantity::from("0.00000001"),
2458 None,
2459 None,
2460 None,
2461 None,
2462 None,
2463 None,
2464 None,
2465 None,
2466 None,
2467 None,
2468 None,
2469 None,
2470 UnixNanos::default(),
2471 UnixNanos::default(),
2472 );
2473
2474 let account_id = AccountId::new("OKX-001");
2475 let ts_init = UnixNanos::default();
2476
2477 let order_msg_1 = OKXOrderMsg {
2479 acc_fill_sz: Some("0.01".to_string()),
2480 avg_px: "50000.0".to_string(),
2481 c_time: 1746947317401,
2482 cancel_source: None,
2483 cancel_source_reason: None,
2484 category: OKXOrderCategory::Normal,
2485 ccy: Ustr::from("USDT"),
2486 cl_ord_id: "test_order_transition".to_string(),
2487 algo_cl_ord_id: None,
2488 fee: Some("1.0".to_string()), fee_ccy: Ustr::from("USDT"),
2490 fill_px: "50000.0".to_string(),
2491 fill_sz: "0.01".to_string(),
2492 fill_time: 1746947317402,
2493 inst_id: Ustr::from("BTC-USDT-SWAP"),
2494 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2495 lever: "2.0".to_string(),
2496 ord_id: Ustr::from("transition_order_456"),
2497 ord_type: OKXOrderType::Market,
2498 pnl: "0".to_string(),
2499 pos_side: OKXPositionSide::Long,
2500 px: "".to_string(),
2501 reduce_only: "false".to_string(),
2502 side: crate::common::enums::OKXSide::Buy,
2503 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2504 exec_type: crate::common::enums::OKXExecType::Maker,
2505 sz: "0.02".to_string(),
2506 td_mode: OKXTradeMode::Isolated,
2507 tgt_ccy: None,
2508 trade_id: "trade_transition_1".to_string(),
2509 u_time: 1746947317402,
2510 };
2511
2512 let fill_report_1 = parse_fill_report(
2513 &order_msg_1,
2514 &InstrumentAny::CryptoPerpetual(instrument),
2515 account_id,
2516 None,
2517 None,
2518 ts_init,
2519 )
2520 .unwrap();
2521
2522 assert_eq!(fill_report_1.commission, Money::new(-1.0, Currency::USDT()));
2524
2525 let order_msg_2 = OKXOrderMsg {
2529 acc_fill_sz: Some("0.02".to_string()),
2530 avg_px: "50000.0".to_string(),
2531 c_time: 1746947317401,
2532 cancel_source: None,
2533 cancel_source_reason: None,
2534 category: OKXOrderCategory::Normal,
2535 ccy: Ustr::from("USDT"),
2536 cl_ord_id: "test_order_transition".to_string(),
2537 algo_cl_ord_id: None,
2538 fee: Some("-2.0".to_string()), fee_ccy: Ustr::from("USDT"),
2540 fill_px: "50000.0".to_string(),
2541 fill_sz: "0.01".to_string(),
2542 fill_time: 1746947317403,
2543 inst_id: Ustr::from("BTC-USDT-SWAP"),
2544 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2545 lever: "2.0".to_string(),
2546 ord_id: Ustr::from("transition_order_456"),
2547 ord_type: OKXOrderType::Market,
2548 pnl: "0".to_string(),
2549 pos_side: OKXPositionSide::Long,
2550 px: "".to_string(),
2551 reduce_only: "false".to_string(),
2552 side: crate::common::enums::OKXSide::Buy,
2553 state: crate::common::enums::OKXOrderStatus::Filled,
2554 exec_type: crate::common::enums::OKXExecType::Taker,
2555 sz: "0.02".to_string(),
2556 td_mode: OKXTradeMode::Isolated,
2557 tgt_ccy: None,
2558 trade_id: "trade_transition_2".to_string(),
2559 u_time: 1746947317403,
2560 };
2561
2562 let fill_report_2 = parse_fill_report(
2563 &order_msg_2,
2564 &InstrumentAny::CryptoPerpetual(instrument),
2565 account_id,
2566 Some(fill_report_1.commission),
2567 Some(fill_report_1.last_qty),
2568 ts_init,
2569 )
2570 .unwrap();
2571
2572 assert_eq!(fill_report_2.commission, Money::new(3.0, Currency::USDT()));
2575 }
2576
2577 #[rstest]
2578 fn test_parse_fill_report_negative_incremental() {
2579 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2580 let instrument = CryptoPerpetual::new(
2581 instrument_id,
2582 Symbol::from("BTC-USDT-SWAP"),
2583 Currency::BTC(),
2584 Currency::USDT(),
2585 Currency::USDT(),
2586 false,
2587 2,
2588 8,
2589 Price::from("0.01"),
2590 Quantity::from("0.00000001"),
2591 None,
2592 None,
2593 None,
2594 None,
2595 None,
2596 None,
2597 None,
2598 None,
2599 None,
2600 None,
2601 None,
2602 None,
2603 UnixNanos::default(),
2604 UnixNanos::default(),
2605 );
2606
2607 let account_id = AccountId::new("OKX-001");
2608 let ts_init = UnixNanos::default();
2609
2610 let order_msg_1 = OKXOrderMsg {
2612 acc_fill_sz: Some("0.01".to_string()),
2613 avg_px: "50000.0".to_string(),
2614 c_time: 1746947317401,
2615 cancel_source: None,
2616 cancel_source_reason: None,
2617 category: OKXOrderCategory::Normal,
2618 ccy: Ustr::from("USDT"),
2619 cl_ord_id: "test_order_neg_inc".to_string(),
2620 algo_cl_ord_id: None,
2621 fee: Some("-2.0".to_string()),
2622 fee_ccy: Ustr::from("USDT"),
2623 fill_px: "50000.0".to_string(),
2624 fill_sz: "0.01".to_string(),
2625 fill_time: 1746947317402,
2626 inst_id: Ustr::from("BTC-USDT-SWAP"),
2627 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2628 lever: "2.0".to_string(),
2629 ord_id: Ustr::from("neg_inc_order_789"),
2630 ord_type: OKXOrderType::Market,
2631 pnl: "0".to_string(),
2632 pos_side: OKXPositionSide::Long,
2633 px: "".to_string(),
2634 reduce_only: "false".to_string(),
2635 side: crate::common::enums::OKXSide::Buy,
2636 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2637 exec_type: crate::common::enums::OKXExecType::Taker,
2638 sz: "0.02".to_string(),
2639 td_mode: OKXTradeMode::Isolated,
2640 tgt_ccy: None,
2641 trade_id: "trade_neg_inc_1".to_string(),
2642 u_time: 1746947317402,
2643 };
2644
2645 let fill_report_1 = parse_fill_report(
2646 &order_msg_1,
2647 &InstrumentAny::CryptoPerpetual(instrument),
2648 account_id,
2649 None,
2650 None,
2651 ts_init,
2652 )
2653 .unwrap();
2654
2655 assert_eq!(fill_report_1.commission, Money::new(2.0, Currency::USDT()));
2656
2657 let order_msg_2 = OKXOrderMsg {
2660 acc_fill_sz: Some("0.02".to_string()),
2661 avg_px: "50000.0".to_string(),
2662 c_time: 1746947317401,
2663 cancel_source: None,
2664 cancel_source_reason: None,
2665 category: OKXOrderCategory::Normal,
2666 ccy: Ustr::from("USDT"),
2667 cl_ord_id: "test_order_neg_inc".to_string(),
2668 algo_cl_ord_id: None,
2669 fee: Some("-1.5".to_string()), fee_ccy: Ustr::from("USDT"),
2671 fill_px: "50000.0".to_string(),
2672 fill_sz: "0.01".to_string(),
2673 fill_time: 1746947317403,
2674 inst_id: Ustr::from("BTC-USDT-SWAP"),
2675 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2676 lever: "2.0".to_string(),
2677 ord_id: Ustr::from("neg_inc_order_789"),
2678 ord_type: OKXOrderType::Market,
2679 pnl: "0".to_string(),
2680 pos_side: OKXPositionSide::Long,
2681 px: "".to_string(),
2682 reduce_only: "false".to_string(),
2683 side: crate::common::enums::OKXSide::Buy,
2684 state: crate::common::enums::OKXOrderStatus::Filled,
2685 exec_type: crate::common::enums::OKXExecType::Maker,
2686 sz: "0.02".to_string(),
2687 td_mode: OKXTradeMode::Isolated,
2688 tgt_ccy: None,
2689 trade_id: "trade_neg_inc_2".to_string(),
2690 u_time: 1746947317403,
2691 };
2692
2693 let fill_report_2 = parse_fill_report(
2694 &order_msg_2,
2695 &InstrumentAny::CryptoPerpetual(instrument),
2696 account_id,
2697 Some(fill_report_1.commission),
2698 Some(fill_report_1.last_qty),
2699 ts_init,
2700 )
2701 .unwrap();
2702
2703 assert_eq!(fill_report_2.commission, Money::new(-0.5, Currency::USDT()));
2705 }
2706
2707 #[rstest]
2708 fn test_parse_fill_report_empty_fill_sz_first_fill() {
2709 let instrument = create_stub_instrument();
2710 let account_id = AccountId::new("OKX-001");
2711 let ts_init = UnixNanos::default();
2712
2713 let order_msg =
2714 create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
2715
2716 let fill_report = parse_fill_report(
2717 &order_msg,
2718 &InstrumentAny::CryptoPerpetual(instrument),
2719 account_id,
2720 None,
2721 None,
2722 ts_init,
2723 )
2724 .unwrap();
2725
2726 assert_eq!(fill_report.last_qty, Quantity::from("0.01"));
2727 }
2728
2729 #[rstest]
2730 fn test_parse_fill_report_empty_fill_sz_subsequent_fills() {
2731 let instrument = create_stub_instrument();
2732 let account_id = AccountId::new("OKX-001");
2733 let ts_init = UnixNanos::default();
2734
2735 let order_msg_1 =
2736 create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "trade_1");
2737
2738 let fill_report_1 = parse_fill_report(
2739 &order_msg_1,
2740 &InstrumentAny::CryptoPerpetual(instrument),
2741 account_id,
2742 None,
2743 None,
2744 ts_init,
2745 )
2746 .unwrap();
2747
2748 assert_eq!(fill_report_1.last_qty, Quantity::from("0.01"));
2749
2750 let order_msg_2 =
2751 create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "trade_2");
2752
2753 let fill_report_2 = parse_fill_report(
2754 &order_msg_2,
2755 &InstrumentAny::CryptoPerpetual(instrument),
2756 account_id,
2757 Some(fill_report_1.commission),
2758 Some(fill_report_1.last_qty),
2759 ts_init,
2760 )
2761 .unwrap();
2762
2763 assert_eq!(fill_report_2.last_qty, Quantity::from("0.02"));
2764 }
2765
2766 #[rstest]
2767 fn test_parse_fill_report_error_both_empty() {
2768 let instrument = create_stub_instrument();
2769 let account_id = AccountId::new("OKX-001");
2770 let ts_init = UnixNanos::default();
2771
2772 let order_msg = create_stub_order_msg("", Some("".to_string()), "1234567890", "trade_1");
2773
2774 let result = parse_fill_report(
2775 &order_msg,
2776 &InstrumentAny::CryptoPerpetual(instrument),
2777 account_id,
2778 None,
2779 None,
2780 ts_init,
2781 );
2782
2783 assert!(result.is_err());
2784 let err_msg = result.unwrap_err().to_string();
2785 assert!(err_msg.contains("Cannot determine fill quantity"));
2786 assert!(err_msg.contains("empty/zero"));
2787 }
2788
2789 #[rstest]
2790 fn test_parse_fill_report_error_acc_fill_sz_none() {
2791 let instrument = create_stub_instrument();
2792 let account_id = AccountId::new("OKX-001");
2793 let ts_init = UnixNanos::default();
2794
2795 let order_msg = create_stub_order_msg("", None, "1234567890", "trade_1");
2796
2797 let result = parse_fill_report(
2798 &order_msg,
2799 &InstrumentAny::CryptoPerpetual(instrument),
2800 account_id,
2801 None,
2802 None,
2803 ts_init,
2804 );
2805
2806 assert!(result.is_err());
2807 let err_msg = result.unwrap_err().to_string();
2808 assert!(err_msg.contains("Cannot determine fill quantity"));
2809 assert!(err_msg.contains("acc_fill_sz is None"));
2810 }
2811
2812 #[rstest]
2813 fn test_parse_order_msg_acc_fill_sz_only_update() {
2814 let instrument = create_stub_instrument();
2816 let account_id = AccountId::new("OKX-001");
2817 let ts_init = UnixNanos::default();
2818
2819 let mut instruments = AHashMap::new();
2820 instruments.insert(
2821 Ustr::from("BTC-USDT-SWAP"),
2822 InstrumentAny::CryptoPerpetual(instrument),
2823 );
2824
2825 let fee_cache = AHashMap::new();
2826 let mut filled_qty_cache = AHashMap::new();
2827
2828 let msg_1 = create_stub_order_msg("", Some("0.01".to_string()), "1234567890", "");
2830
2831 let report_1 = parse_order_msg(
2832 &msg_1,
2833 account_id,
2834 &instruments,
2835 &fee_cache,
2836 &filled_qty_cache,
2837 ts_init,
2838 )
2839 .unwrap();
2840
2841 assert!(matches!(report_1, ExecutionReport::Fill(_)));
2843 if let ExecutionReport::Fill(fill) = &report_1 {
2844 assert_eq!(fill.last_qty, Quantity::from("0.01"));
2845 }
2846
2847 filled_qty_cache.insert(Ustr::from("1234567890"), Quantity::from("0.01"));
2849
2850 let msg_2 = create_stub_order_msg("", Some("0.03".to_string()), "1234567890", "");
2852
2853 let report_2 = parse_order_msg(
2854 &msg_2,
2855 account_id,
2856 &instruments,
2857 &fee_cache,
2858 &filled_qty_cache,
2859 ts_init,
2860 )
2861 .unwrap();
2862
2863 assert!(matches!(report_2, ExecutionReport::Fill(_)));
2865 if let ExecutionReport::Fill(fill) = &report_2 {
2866 assert_eq!(fill.last_qty, Quantity::from("0.02"));
2867 }
2868 }
2869
2870 #[rstest]
2871 fn test_parse_book10_msg_partial_levels() {
2872 let book_msg = OKXBookMsg {
2874 asks: vec![
2875 OrderBookEntry {
2876 price: "8476.98".to_string(),
2877 size: "415".to_string(),
2878 liquidated_orders_count: "0".to_string(),
2879 orders_count: "13".to_string(),
2880 },
2881 OrderBookEntry {
2882 price: "8477.00".to_string(),
2883 size: "7".to_string(),
2884 liquidated_orders_count: "0".to_string(),
2885 orders_count: "2".to_string(),
2886 },
2887 ],
2888 bids: vec![OrderBookEntry {
2889 price: "8476.97".to_string(),
2890 size: "256".to_string(),
2891 liquidated_orders_count: "0".to_string(),
2892 orders_count: "12".to_string(),
2893 }],
2894 ts: 1597026383085,
2895 checksum: None,
2896 prev_seq_id: None,
2897 seq_id: 123456,
2898 };
2899
2900 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2901 let depth10 =
2902 parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
2903
2904 assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2906 assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2907 assert_eq!(depth10.bid_counts[0], 12);
2908
2909 assert_eq!(depth10.bids[1].price, Price::from("0"));
2911 assert_eq!(depth10.bids[1].size, Quantity::from("0"));
2912 assert_eq!(depth10.bid_counts[1], 0);
2913
2914 assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2916 assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2917 assert_eq!(depth10.asks[2].price, Price::from("0")); }
2919
2920 #[rstest]
2921 fn test_parse_algo_order_msg_stop_market() {
2922 let json_data = load_test_json("ws_orders_algo.json");
2923 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2924 let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2925
2926 let msg = &data[0];
2928 assert_eq!(msg.algo_id, "706620792746729472");
2929 assert_eq!(msg.algo_cl_ord_id, "STOP001BTCUSDT20250120");
2930 assert_eq!(msg.state, OKXOrderStatus::Live);
2931 assert_eq!(msg.ord_px, "-1"); let account_id = AccountId::new("OKX-001");
2934 let mut instruments = AHashMap::new();
2935
2936 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2938 let instrument = CryptoPerpetual::new(
2939 instrument_id,
2940 Symbol::from("BTC-USDT-SWAP"),
2941 Currency::BTC(),
2942 Currency::USDT(),
2943 Currency::USDT(),
2944 false, 2, 8, Price::from("0.01"),
2948 Quantity::from("0.00000001"),
2949 None,
2950 None,
2951 None,
2952 None,
2953 None,
2954 None,
2955 None,
2956 None,
2957 None,
2958 None,
2959 None,
2960 None,
2961 0.into(), 0.into(), );
2964 instruments.insert(
2965 Ustr::from("BTC-USDT-SWAP"),
2966 InstrumentAny::CryptoPerpetual(instrument),
2967 );
2968
2969 let result =
2970 parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
2971
2972 assert!(result.is_ok());
2973 let report = result.unwrap();
2974
2975 if let ExecutionReport::Order(status_report) = report {
2976 assert_eq!(status_report.order_type, OrderType::StopMarket);
2977 assert_eq!(status_report.order_side, OrderSide::Sell);
2978 assert_eq!(status_report.quantity, Quantity::from("0.01000000"));
2979 assert_eq!(status_report.trigger_price, Some(Price::from("95000.00")));
2980 assert_eq!(status_report.trigger_type, Some(TriggerType::LastPrice));
2981 assert_eq!(status_report.price, None); } else {
2983 panic!("Expected Order report");
2984 }
2985 }
2986
2987 #[rstest]
2988 fn test_parse_algo_order_msg_stop_limit() {
2989 let json_data = load_test_json("ws_orders_algo.json");
2990 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2991 let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2992
2993 let msg = &data[1];
2995 assert_eq!(msg.algo_id, "706620792746729473");
2996 assert_eq!(msg.state, OKXOrderStatus::Live);
2997 assert_eq!(msg.ord_px, "106000"); let account_id = AccountId::new("OKX-001");
3000 let mut instruments = AHashMap::new();
3001
3002 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3004 let instrument = CryptoPerpetual::new(
3005 instrument_id,
3006 Symbol::from("BTC-USDT-SWAP"),
3007 Currency::BTC(),
3008 Currency::USDT(),
3009 Currency::USDT(),
3010 false, 2, 8, Price::from("0.01"),
3014 Quantity::from("0.00000001"),
3015 None,
3016 None,
3017 None,
3018 None,
3019 None,
3020 None,
3021 None,
3022 None,
3023 None,
3024 None,
3025 None,
3026 None,
3027 0.into(), 0.into(), );
3030 instruments.insert(
3031 Ustr::from("BTC-USDT-SWAP"),
3032 InstrumentAny::CryptoPerpetual(instrument),
3033 );
3034
3035 let result =
3036 parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
3037
3038 assert!(result.is_ok());
3039 let report = result.unwrap();
3040
3041 if let ExecutionReport::Order(status_report) = report {
3042 assert_eq!(status_report.order_type, OrderType::StopLimit);
3043 assert_eq!(status_report.order_side, OrderSide::Buy);
3044 assert_eq!(status_report.quantity, Quantity::from("0.02000000"));
3045 assert_eq!(status_report.trigger_price, Some(Price::from("105000.00")));
3046 assert_eq!(status_report.trigger_type, Some(TriggerType::MarkPrice));
3047 assert_eq!(status_report.price, Some(Price::from("106000.00"))); } else {
3049 panic!("Expected Order report");
3050 }
3051 }
3052
3053 #[rstest]
3054 fn test_parse_trigger_order_from_regular_channel() {
3055 let json_data = load_test_json("ws_orders_trigger.json");
3056 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3057 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3058
3059 let msg = &data[0];
3061 assert_eq!(msg.ord_type, OKXOrderType::Trigger);
3062 assert_eq!(msg.state, OKXOrderStatus::Filled);
3063
3064 let account_id = AccountId::new("OKX-001");
3065 let mut instruments = AHashMap::new();
3066
3067 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3069 let instrument = CryptoPerpetual::new(
3070 instrument_id,
3071 Symbol::from("BTC-USDT-SWAP"),
3072 Currency::BTC(),
3073 Currency::USDT(),
3074 Currency::USDT(),
3075 false, 2, 8, Price::from("0.01"),
3079 Quantity::from("0.00000001"),
3080 None,
3081 None,
3082 None,
3083 None,
3084 None,
3085 None,
3086 None,
3087 None,
3088 None,
3089 None,
3090 None,
3091 None,
3092 0.into(), 0.into(), );
3095 instruments.insert(
3096 Ustr::from("BTC-USDT-SWAP"),
3097 InstrumentAny::CryptoPerpetual(instrument),
3098 );
3099 let fee_cache = AHashMap::new();
3100 let filled_qty_cache = AHashMap::new();
3101
3102 let result = parse_order_msg_vec(
3103 vec![msg.clone()],
3104 account_id,
3105 &instruments,
3106 &fee_cache,
3107 &filled_qty_cache,
3108 UnixNanos::default(),
3109 );
3110
3111 assert!(result.is_ok());
3112 let reports = result.unwrap();
3113 assert_eq!(reports.len(), 1);
3114
3115 if let ExecutionReport::Fill(fill_report) = &reports[0] {
3116 assert_eq!(fill_report.order_side, OrderSide::Sell);
3117 assert_eq!(fill_report.last_qty, Quantity::from("0.01000000"));
3118 assert_eq!(fill_report.last_px, Price::from("101950.00"));
3119 } else {
3120 panic!("Expected Fill report for filled trigger order");
3121 }
3122 }
3123
3124 #[rstest]
3125 fn test_parse_liquidation_order() {
3126 let json_data = load_test_json("ws_orders_liquidation.json");
3127 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3128 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3129
3130 let msg = &data[0];
3132 assert_eq!(msg.category, OKXOrderCategory::FullLiquidation);
3133 assert_eq!(msg.state, OKXOrderStatus::Filled);
3134 assert_eq!(msg.inst_id.as_str(), "BTC-USDT-SWAP");
3135
3136 let account_id = AccountId::new("OKX-001");
3137 let mut instruments = AHashMap::new();
3138
3139 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3141 let instrument = CryptoPerpetual::new(
3142 instrument_id,
3143 Symbol::from("BTC-USDT-SWAP"),
3144 Currency::BTC(),
3145 Currency::USDT(),
3146 Currency::USDT(),
3147 false, 2, 8, Price::from("0.01"),
3151 Quantity::from("0.00000001"),
3152 None,
3153 None,
3154 None,
3155 None,
3156 None,
3157 None,
3158 None,
3159 None,
3160 None,
3161 None,
3162 None,
3163 None,
3164 0.into(), 0.into(), );
3167 instruments.insert(
3168 Ustr::from("BTC-USDT-SWAP"),
3169 InstrumentAny::CryptoPerpetual(instrument),
3170 );
3171 let fee_cache = AHashMap::new();
3172 let filled_qty_cache = AHashMap::new();
3173
3174 let result = parse_order_msg_vec(
3175 vec![msg.clone()],
3176 account_id,
3177 &instruments,
3178 &fee_cache,
3179 &filled_qty_cache,
3180 UnixNanos::default(),
3181 );
3182
3183 assert!(result.is_ok());
3184 let reports = result.unwrap();
3185 assert_eq!(reports.len(), 1);
3186
3187 if let ExecutionReport::Fill(fill_report) = &reports[0] {
3189 assert_eq!(fill_report.order_side, OrderSide::Sell);
3190 assert_eq!(fill_report.last_qty, Quantity::from("0.50000000"));
3191 assert_eq!(fill_report.last_px, Price::from("40000.00"));
3192 assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
3193 } else {
3194 panic!("Expected Fill report for liquidation order");
3195 }
3196 }
3197
3198 #[rstest]
3199 fn test_parse_adl_order() {
3200 let json_data = load_test_json("ws_orders_adl.json");
3201 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
3202 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
3203
3204 let msg = &data[0];
3206 assert_eq!(msg.category, OKXOrderCategory::Adl);
3207 assert_eq!(msg.state, OKXOrderStatus::Filled);
3208 assert_eq!(msg.inst_id.as_str(), "ETH-USDT-SWAP");
3209
3210 let account_id = AccountId::new("OKX-001");
3211 let mut instruments = AHashMap::new();
3212
3213 let instrument_id = InstrumentId::from("ETH-USDT-SWAP.OKX");
3215 let instrument = CryptoPerpetual::new(
3216 instrument_id,
3217 Symbol::from("ETH-USDT-SWAP"),
3218 Currency::ETH(),
3219 Currency::USDT(),
3220 Currency::USDT(),
3221 false, 2, 8, Price::from("0.01"),
3225 Quantity::from("0.00000001"),
3226 None,
3227 None,
3228 None,
3229 None,
3230 None,
3231 None,
3232 None,
3233 None,
3234 None,
3235 None,
3236 None,
3237 None,
3238 0.into(), 0.into(), );
3241 instruments.insert(
3242 Ustr::from("ETH-USDT-SWAP"),
3243 InstrumentAny::CryptoPerpetual(instrument),
3244 );
3245 let fee_cache = AHashMap::new();
3246 let filled_qty_cache = AHashMap::new();
3247
3248 let result = parse_order_msg_vec(
3249 vec![msg.clone()],
3250 account_id,
3251 &instruments,
3252 &fee_cache,
3253 &filled_qty_cache,
3254 UnixNanos::default(),
3255 );
3256
3257 assert!(result.is_ok());
3258 let reports = result.unwrap();
3259 assert_eq!(reports.len(), 1);
3260
3261 if let ExecutionReport::Fill(fill_report) = &reports[0] {
3263 assert_eq!(fill_report.order_side, OrderSide::Buy);
3264 assert_eq!(fill_report.last_qty, Quantity::from("0.30000000"));
3265 assert_eq!(fill_report.last_px, Price::from("41000.00"));
3266 assert_eq!(fill_report.liquidity_side, LiquiditySide::Taker);
3267 } else {
3268 panic!("Expected Fill report for ADL order");
3269 }
3270 }
3271
3272 #[rstest]
3273 fn test_parse_unknown_category_graceful_fallback() {
3274 let json_with_unknown_category = r#"{
3276 "category": "some_future_category_we_dont_know"
3277 }"#;
3278
3279 let result: Result<serde_json::Value, _> = serde_json::from_str(json_with_unknown_category);
3280 assert!(result.is_ok());
3281
3282 let category_result: Result<OKXOrderCategory, _> =
3284 serde_json::from_str(r#""some_future_category""#);
3285 assert!(category_result.is_ok());
3286 assert_eq!(category_result.unwrap(), OKXOrderCategory::Other);
3287
3288 let normal: OKXOrderCategory = serde_json::from_str(r#""normal""#).unwrap();
3290 assert_eq!(normal, OKXOrderCategory::Normal);
3291
3292 let twap: OKXOrderCategory = serde_json::from_str(r#""twap""#).unwrap();
3293 assert_eq!(twap, OKXOrderCategory::Twap);
3294 }
3295
3296 #[rstest]
3297 fn test_parse_partial_liquidation_order() {
3298 let account_id = AccountId::new("OKX-001");
3300 let mut instruments = AHashMap::new();
3301
3302 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
3303 let instrument = CryptoPerpetual::new(
3304 instrument_id,
3305 Symbol::from("BTC-USDT-SWAP"),
3306 Currency::BTC(),
3307 Currency::USDT(),
3308 Currency::USDT(),
3309 false,
3310 2,
3311 8,
3312 Price::from("0.01"),
3313 Quantity::from("0.00000001"),
3314 None,
3315 None,
3316 None,
3317 None,
3318 None,
3319 None,
3320 None,
3321 None,
3322 None,
3323 None,
3324 None,
3325 None,
3326 0.into(),
3327 0.into(),
3328 );
3329 instruments.insert(
3330 Ustr::from("BTC-USDT-SWAP"),
3331 InstrumentAny::CryptoPerpetual(instrument),
3332 );
3333
3334 let partial_liq_msg = OKXOrderMsg {
3335 acc_fill_sz: Some("0.25".to_string()),
3336 avg_px: "39000.0".to_string(),
3337 c_time: 1746947317401,
3338 cancel_source: None,
3339 cancel_source_reason: None,
3340 category: OKXOrderCategory::PartialLiquidation,
3341 ccy: Ustr::from("USDT"),
3342 cl_ord_id: "".to_string(),
3343 algo_cl_ord_id: None,
3344 fee: Some("-9.75".to_string()),
3345 fee_ccy: Ustr::from("USDT"),
3346 fill_px: "39000.0".to_string(),
3347 fill_sz: "0.25".to_string(),
3348 fill_time: 1746947317402,
3349 inst_id: Ustr::from("BTC-USDT-SWAP"),
3350 inst_type: OKXInstrumentType::Swap,
3351 lever: "10.0".to_string(),
3352 ord_id: Ustr::from("2497956918703120888"),
3353 ord_type: OKXOrderType::Market,
3354 pnl: "-2500".to_string(),
3355 pos_side: OKXPositionSide::Long,
3356 px: "".to_string(),
3357 reduce_only: "false".to_string(),
3358 side: OKXSide::Sell,
3359 state: OKXOrderStatus::Filled,
3360 exec_type: OKXExecType::Taker,
3361 sz: "0.25".to_string(),
3362 td_mode: OKXTradeMode::Isolated,
3363 tgt_ccy: None,
3364 trade_id: "1518905888".to_string(),
3365 u_time: 1746947317402,
3366 };
3367
3368 let fee_cache = AHashMap::new();
3369 let filled_qty_cache = AHashMap::new();
3370 let result = parse_order_msg(
3371 &partial_liq_msg,
3372 account_id,
3373 &instruments,
3374 &fee_cache,
3375 &filled_qty_cache,
3376 UnixNanos::default(),
3377 );
3378
3379 assert!(result.is_ok());
3380 let report = result.unwrap();
3381
3382 if let ExecutionReport::Fill(fill_report) = report {
3384 assert_eq!(fill_report.order_side, OrderSide::Sell);
3385 assert_eq!(fill_report.last_qty, Quantity::from("0.25000000"));
3386 assert_eq!(fill_report.last_px, Price::from("39000.00"));
3387 } else {
3388 panic!("Expected Fill report for partial liquidation order");
3389 }
3390 }
3391
3392 #[rstest]
3393 fn test_websocket_instrument_update_preserves_cached_fees() {
3394 use nautilus_model::{identifiers::InstrumentId, instruments::InstrumentAny};
3395
3396 use crate::common::{models::OKXInstrument, parse::parse_instrument_any};
3397
3398 let ts_init = UnixNanos::default();
3399
3400 let initial_fees = (
3402 Some(Decimal::new(8, 4)), Some(Decimal::new(10, 4)), );
3405
3406 let initial_inst_json = serde_json::json!({
3408 "instType": "SPOT",
3409 "instId": "BTC-USD",
3410 "baseCcy": "BTC",
3411 "quoteCcy": "USD",
3412 "settleCcy": "",
3413 "ctVal": "",
3414 "ctMult": "",
3415 "ctValCcy": "",
3416 "optType": "",
3417 "stk": "",
3418 "listTime": "1733454000000",
3419 "expTime": "",
3420 "lever": "",
3421 "tickSz": "0.1",
3422 "lotSz": "0.00000001",
3423 "minSz": "0.00001",
3424 "ctType": "linear",
3425 "alias": "",
3426 "state": "live",
3427 "maxLmtSz": "9999999999",
3428 "maxMktSz": "1000000",
3429 "maxTwapSz": "9999999999.0000000000000000",
3430 "maxIcebergSz": "9999999999.0000000000000000",
3431 "maxTriggerSz": "9999999999.0000000000000000",
3432 "maxStopSz": "1000000",
3433 "uly": "",
3434 "instFamily": "",
3435 "ruleType": "normal",
3436 "maxLmtAmt": "20000000",
3437 "maxMktAmt": "1000000"
3438 });
3439
3440 let initial_inst: OKXInstrument = serde_json::from_value(initial_inst_json)
3441 .expect("Failed to deserialize initial instrument");
3442
3443 let parsed_initial = parse_instrument_any(
3445 &initial_inst,
3446 None,
3447 None,
3448 initial_fees.0,
3449 initial_fees.1,
3450 ts_init,
3451 )
3452 .expect("Failed to parse initial instrument")
3453 .expect("Initial instrument should not be None");
3454
3455 if let InstrumentAny::CurrencyPair(ref pair) = parsed_initial {
3457 assert_eq!(pair.maker_fee, Decimal::new(8, 4));
3458 assert_eq!(pair.taker_fee, Decimal::new(10, 4));
3459 } else {
3460 panic!("Expected CurrencyPair instrument");
3461 }
3462
3463 let mut instruments_cache = AHashMap::new();
3465 instruments_cache.insert(Ustr::from("BTC-USD"), parsed_initial);
3466
3467 let ws_update = serde_json::json!({
3469 "instType": "SPOT",
3470 "instId": "BTC-USD",
3471 "baseCcy": "BTC",
3472 "quoteCcy": "USD",
3473 "settleCcy": "",
3474 "ctVal": "",
3475 "ctMult": "",
3476 "ctValCcy": "",
3477 "optType": "",
3478 "stk": "",
3479 "listTime": "1733454000000",
3480 "expTime": "",
3481 "lever": "",
3482 "tickSz": "0.1",
3483 "lotSz": "0.00000001",
3484 "minSz": "0.00001",
3485 "ctType": "linear",
3486 "alias": "",
3487 "state": "live",
3488 "maxLmtSz": "9999999999",
3489 "maxMktSz": "1000000",
3490 "maxTwapSz": "9999999999.0000000000000000",
3491 "maxIcebergSz": "9999999999.0000000000000000",
3492 "maxTriggerSz": "9999999999.0000000000000000",
3493 "maxStopSz": "1000000",
3494 "uly": "",
3495 "instFamily": "",
3496 "ruleType": "normal",
3497 "maxLmtAmt": "20000000",
3498 "maxMktAmt": "1000000"
3499 });
3500
3501 let instrument_id = InstrumentId::from("BTC-USD.OKX");
3502 let mut funding_cache = AHashMap::new();
3503
3504 let result = parse_ws_message_data(
3506 &OKXWsChannel::Instruments,
3507 ws_update,
3508 &instrument_id,
3509 2,
3510 8,
3511 ts_init,
3512 &mut funding_cache,
3513 &instruments_cache,
3514 )
3515 .expect("Failed to parse WebSocket instrument update");
3516
3517 if let Some(NautilusWsMessage::Instrument(boxed_inst)) = result {
3519 if let InstrumentAny::CurrencyPair(pair) = *boxed_inst {
3520 assert_eq!(
3521 pair.maker_fee,
3522 Decimal::new(8, 4),
3523 "Maker fee should be preserved from cache"
3524 );
3525 assert_eq!(
3526 pair.taker_fee,
3527 Decimal::new(10, 4),
3528 "Taker fee should be preserved from cache"
3529 );
3530 } else {
3531 panic!("Expected CurrencyPair instrument from WebSocket update");
3532 }
3533 } else {
3534 panic!("Expected Instrument message from WebSocket update");
3535 }
3536 }
3537}