1use ahash::AHashMap;
19use nautilus_core::{UUID4, nanos::UnixNanos};
20use nautilus_model::{
21 data::{
22 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
23 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API, OrderBookDepth10,
24 QuoteTick, TradeTick, depth::DEPTH10_LEN,
25 },
26 enums::{
27 AggregationSource, AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus,
28 OrderType, RecordFlag, TimeInForce, TriggerType,
29 },
30 identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
31 instruments::{Instrument, InstrumentAny},
32 reports::{FillReport, OrderStatusReport},
33 types::{Currency, Money, Price, Quantity},
34};
35use ustr::Ustr;
36
37use super::{
38 enums::OKXWsChannel,
39 messages::{
40 OKXAlgoOrderMsg, OKXBookMsg, OKXCandleMsg, OKXIndexPriceMsg, OKXMarkPriceMsg, OKXOrderMsg,
41 OKXTickerMsg, OKXTradeMsg, OrderBookEntry,
42 },
43};
44use crate::{
45 common::{
46 consts::{OKX_POST_ONLY_CANCEL_REASON, OKX_POST_ONLY_CANCEL_SOURCE},
47 enums::{OKXBookAction, OKXCandleConfirm, OKXOrderStatus, OKXOrderType, OKXTriggerType},
48 models::OKXInstrument,
49 parse::{
50 okx_channel_to_bar_spec, parse_client_order_id, parse_fee, parse_funding_rate_msg,
51 parse_instrument_any, parse_message_vec, parse_millisecond_timestamp, parse_price,
52 parse_quantity,
53 },
54 },
55 websocket::messages::{ExecutionReport, NautilusWsMessage, OKXFundingRateMsg},
56};
57
58pub fn parse_book_msg_vec(
64 data: Vec<OKXBookMsg>,
65 instrument_id: &InstrumentId,
66 price_precision: u8,
67 size_precision: u8,
68 action: OKXBookAction,
69 ts_init: UnixNanos,
70) -> anyhow::Result<Vec<Data>> {
71 let mut deltas = Vec::with_capacity(data.len());
72
73 for msg in data {
74 let deltas_api = OrderBookDeltas_API::new(parse_book_msg(
75 &msg,
76 *instrument_id,
77 price_precision,
78 size_precision,
79 &action,
80 ts_init,
81 )?);
82 deltas.push(Data::Deltas(deltas_api));
83 }
84
85 Ok(deltas)
86}
87
88pub fn parse_ticker_msg_vec(
94 data: serde_json::Value,
95 instrument_id: &InstrumentId,
96 price_precision: u8,
97 size_precision: u8,
98 ts_init: UnixNanos,
99) -> anyhow::Result<Vec<Data>> {
100 parse_message_vec(
101 data,
102 |msg| {
103 parse_ticker_msg(
104 msg,
105 *instrument_id,
106 price_precision,
107 size_precision,
108 ts_init,
109 )
110 },
111 Data::Quote,
112 )
113}
114
115pub fn parse_quote_msg_vec(
121 data: serde_json::Value,
122 instrument_id: &InstrumentId,
123 price_precision: u8,
124 size_precision: u8,
125 ts_init: UnixNanos,
126) -> anyhow::Result<Vec<Data>> {
127 parse_message_vec(
128 data,
129 |msg| {
130 parse_quote_msg(
131 msg,
132 *instrument_id,
133 price_precision,
134 size_precision,
135 ts_init,
136 )
137 },
138 Data::Quote,
139 )
140}
141
142pub fn parse_trade_msg_vec(
148 data: serde_json::Value,
149 instrument_id: &InstrumentId,
150 price_precision: u8,
151 size_precision: u8,
152 ts_init: UnixNanos,
153) -> anyhow::Result<Vec<Data>> {
154 parse_message_vec(
155 data,
156 |msg| {
157 parse_trade_msg(
158 msg,
159 *instrument_id,
160 price_precision,
161 size_precision,
162 ts_init,
163 )
164 },
165 Data::Trade,
166 )
167}
168
169pub fn parse_mark_price_msg_vec(
175 data: serde_json::Value,
176 instrument_id: &InstrumentId,
177 price_precision: u8,
178 ts_init: UnixNanos,
179) -> anyhow::Result<Vec<Data>> {
180 parse_message_vec(
181 data,
182 |msg| parse_mark_price_msg(msg, *instrument_id, price_precision, ts_init),
183 Data::MarkPriceUpdate,
184 )
185}
186
187pub fn parse_index_price_msg_vec(
193 data: serde_json::Value,
194 instrument_id: &InstrumentId,
195 price_precision: u8,
196 ts_init: UnixNanos,
197) -> anyhow::Result<Vec<Data>> {
198 parse_message_vec(
199 data,
200 |msg| parse_index_price_msg(msg, *instrument_id, price_precision, ts_init),
201 Data::IndexPriceUpdate,
202 )
203}
204
205pub fn parse_funding_rate_msg_vec(
212 data: serde_json::Value,
213 instrument_id: &InstrumentId,
214 ts_init: UnixNanos,
215 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
216) -> anyhow::Result<Vec<FundingRateUpdate>> {
217 let msgs: Vec<OKXFundingRateMsg> = serde_json::from_value(data)?;
218
219 let mut result = Vec::with_capacity(msgs.len());
220 for msg in &msgs {
221 let cache_key = (msg.funding_rate, msg.funding_time);
222
223 if let Some(cached) = funding_cache.get(&msg.inst_id)
224 && *cached == cache_key
225 {
226 continue; }
228
229 funding_cache.insert(msg.inst_id, cache_key);
231 let funding_rate = parse_funding_rate_msg(msg, *instrument_id, ts_init)?;
232 result.push(funding_rate);
233 }
234
235 Ok(result)
236}
237
238pub fn parse_candle_msg_vec(
244 data: serde_json::Value,
245 instrument_id: &InstrumentId,
246 price_precision: u8,
247 size_precision: u8,
248 spec: BarSpecification,
249 ts_init: UnixNanos,
250) -> anyhow::Result<Vec<Data>> {
251 let msgs: Vec<OKXCandleMsg> = serde_json::from_value(data)?;
252 let bar_type = BarType::new(*instrument_id, spec, AggregationSource::External);
253 let mut bars = Vec::with_capacity(msgs.len());
254
255 for msg in msgs {
256 if msg.confirm == OKXCandleConfirm::Closed {
258 let bar = parse_candle_msg(&msg, bar_type, price_precision, size_precision, ts_init)?;
259 bars.push(Data::Bar(bar));
260 }
261 }
262
263 Ok(bars)
264}
265
266pub fn parse_book10_msg_vec(
272 data: Vec<OKXBookMsg>,
273 instrument_id: &InstrumentId,
274 price_precision: u8,
275 size_precision: u8,
276 ts_init: UnixNanos,
277) -> anyhow::Result<Vec<Data>> {
278 let mut depth10_updates = Vec::with_capacity(data.len());
279
280 for msg in data {
281 let depth10 = parse_book10_msg(
282 &msg,
283 *instrument_id,
284 price_precision,
285 size_precision,
286 ts_init,
287 )?;
288 depth10_updates.push(Data::Depth10(Box::new(depth10)));
289 }
290
291 Ok(depth10_updates)
292}
293
294pub fn parse_book_msg(
300 msg: &OKXBookMsg,
301 instrument_id: InstrumentId,
302 price_precision: u8,
303 size_precision: u8,
304 action: &OKXBookAction,
305 ts_init: UnixNanos,
306) -> anyhow::Result<OrderBookDeltas> {
307 let flags = if action == &OKXBookAction::Snapshot {
308 RecordFlag::F_SNAPSHOT as u8
309 } else {
310 0
311 };
312 let ts_event = parse_millisecond_timestamp(msg.ts);
313
314 let mut deltas = Vec::with_capacity(msg.asks.len() + msg.bids.len());
315
316 for bid in &msg.bids {
317 let book_action = match action {
318 OKXBookAction::Snapshot => BookAction::Add,
319 _ => match bid.size.as_str() {
320 "0" => BookAction::Delete,
321 _ => BookAction::Update,
322 },
323 };
324 let price = parse_price(&bid.price, price_precision)?;
325 let size = parse_quantity(&bid.size, size_precision)?;
326 let order_id = 0; let order = BookOrder::new(OrderSide::Buy, price, size, order_id);
328 let delta = OrderBookDelta::new(
329 instrument_id,
330 book_action,
331 order,
332 flags,
333 msg.seq_id,
334 ts_event,
335 ts_init,
336 );
337 deltas.push(delta)
338 }
339
340 for ask in &msg.asks {
341 let book_action = match action {
342 OKXBookAction::Snapshot => BookAction::Add,
343 _ => match ask.size.as_str() {
344 "0" => BookAction::Delete,
345 _ => BookAction::Update,
346 },
347 };
348 let price = parse_price(&ask.price, price_precision)?;
349 let size = parse_quantity(&ask.size, size_precision)?;
350 let order_id = 0; let order = BookOrder::new(OrderSide::Sell, price, size, order_id);
352 let delta = OrderBookDelta::new(
353 instrument_id,
354 book_action,
355 order,
356 flags,
357 msg.seq_id,
358 ts_event,
359 ts_init,
360 );
361 deltas.push(delta)
362 }
363
364 OrderBookDeltas::new_checked(instrument_id, deltas)
365}
366
367pub fn parse_quote_msg(
373 msg: &OKXBookMsg,
374 instrument_id: InstrumentId,
375 price_precision: u8,
376 size_precision: u8,
377 ts_init: UnixNanos,
378) -> anyhow::Result<QuoteTick> {
379 let best_bid: &OrderBookEntry = &msg.bids[0];
380 let best_ask: &OrderBookEntry = &msg.asks[0];
381
382 let bid_price = parse_price(&best_bid.price, price_precision)?;
383 let ask_price = parse_price(&best_ask.price, price_precision)?;
384 let bid_size = parse_quantity(&best_bid.size, size_precision)?;
385 let ask_size = parse_quantity(&best_ask.size, size_precision)?;
386 let ts_event = parse_millisecond_timestamp(msg.ts);
387
388 QuoteTick::new_checked(
389 instrument_id,
390 bid_price,
391 ask_price,
392 bid_size,
393 ask_size,
394 ts_event,
395 ts_init,
396 )
397}
398
399pub fn parse_book10_msg(
407 msg: &OKXBookMsg,
408 instrument_id: InstrumentId,
409 price_precision: u8,
410 size_precision: u8,
411 ts_init: UnixNanos,
412) -> anyhow::Result<OrderBookDepth10> {
413 let mut bids: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
415 let mut asks: [BookOrder; DEPTH10_LEN] = [BookOrder::default(); DEPTH10_LEN];
416 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
417 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
418
419 let bid_len = msg.bids.len().min(DEPTH10_LEN);
421 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
422 let price = parse_price(&level.price, price_precision)?;
423 let size = parse_quantity(&level.size, size_precision)?;
424 let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
425
426 let bid_order = BookOrder::new(OrderSide::Buy, price, size, 0);
427 bids[i] = bid_order;
428 bid_counts[i] = orders_count;
429 }
430
431 for i in bid_len..DEPTH10_LEN {
433 bids[i] = BookOrder::new(
434 OrderSide::Buy,
435 Price::zero(price_precision),
436 Quantity::zero(size_precision),
437 0,
438 );
439 bid_counts[i] = 0;
440 }
441
442 let ask_len = msg.asks.len().min(DEPTH10_LEN);
444 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
445 let price = parse_price(&level.price, price_precision)?;
446 let size = parse_quantity(&level.size, size_precision)?;
447 let orders_count = level.orders_count.parse::<u32>().unwrap_or(1);
448
449 let ask_order = BookOrder::new(OrderSide::Sell, price, size, 0);
450 asks[i] = ask_order;
451 ask_counts[i] = orders_count;
452 }
453
454 for i in ask_len..DEPTH10_LEN {
456 asks[i] = BookOrder::new(
457 OrderSide::Sell,
458 Price::zero(price_precision),
459 Quantity::zero(size_precision),
460 0,
461 );
462 ask_counts[i] = 0;
463 }
464
465 let ts_event = parse_millisecond_timestamp(msg.ts);
466
467 Ok(OrderBookDepth10::new(
468 instrument_id,
469 bids,
470 asks,
471 bid_counts,
472 ask_counts,
473 RecordFlag::F_SNAPSHOT as u8,
474 msg.seq_id, ts_event,
476 ts_init,
477 ))
478}
479
480pub fn parse_ticker_msg(
486 msg: &OKXTickerMsg,
487 instrument_id: InstrumentId,
488 price_precision: u8,
489 size_precision: u8,
490 ts_init: UnixNanos,
491) -> anyhow::Result<QuoteTick> {
492 let bid_price = parse_price(&msg.bid_px, price_precision)?;
493 let ask_price = parse_price(&msg.ask_px, price_precision)?;
494 let bid_size = parse_quantity(&msg.bid_sz, size_precision)?;
495 let ask_size = parse_quantity(&msg.ask_sz, size_precision)?;
496 let ts_event = parse_millisecond_timestamp(msg.ts);
497
498 QuoteTick::new_checked(
499 instrument_id,
500 bid_price,
501 ask_price,
502 bid_size,
503 ask_size,
504 ts_event,
505 ts_init,
506 )
507}
508
509pub fn parse_trade_msg(
515 msg: &OKXTradeMsg,
516 instrument_id: InstrumentId,
517 price_precision: u8,
518 size_precision: u8,
519 ts_init: UnixNanos,
520) -> anyhow::Result<TradeTick> {
521 let price = parse_price(&msg.px, price_precision)?;
522 let size = parse_quantity(&msg.sz, size_precision)?;
523 let aggressor_side: AggressorSide = msg.side.into();
524 let trade_id = TradeId::new(&msg.trade_id);
525 let ts_event = parse_millisecond_timestamp(msg.ts);
526
527 TradeTick::new_checked(
528 instrument_id,
529 price,
530 size,
531 aggressor_side,
532 trade_id,
533 ts_event,
534 ts_init,
535 )
536}
537
538pub fn parse_mark_price_msg(
544 msg: &OKXMarkPriceMsg,
545 instrument_id: InstrumentId,
546 price_precision: u8,
547 ts_init: UnixNanos,
548) -> anyhow::Result<MarkPriceUpdate> {
549 let price = parse_price(&msg.mark_px, price_precision)?;
550 let ts_event = parse_millisecond_timestamp(msg.ts);
551
552 Ok(MarkPriceUpdate::new(
553 instrument_id,
554 price,
555 ts_event,
556 ts_init,
557 ))
558}
559
560pub fn parse_index_price_msg(
566 msg: &OKXIndexPriceMsg,
567 instrument_id: InstrumentId,
568 price_precision: u8,
569 ts_init: UnixNanos,
570) -> anyhow::Result<IndexPriceUpdate> {
571 let price = parse_price(&msg.idx_px, price_precision)?;
572 let ts_event = parse_millisecond_timestamp(msg.ts);
573
574 Ok(IndexPriceUpdate::new(
575 instrument_id,
576 price,
577 ts_event,
578 ts_init,
579 ))
580}
581
582pub fn parse_candle_msg(
588 msg: &OKXCandleMsg,
589 bar_type: BarType,
590 price_precision: u8,
591 size_precision: u8,
592 ts_init: UnixNanos,
593) -> anyhow::Result<Bar> {
594 let open = parse_price(&msg.o, price_precision)?;
595 let high = parse_price(&msg.h, price_precision)?;
596 let low = parse_price(&msg.l, price_precision)?;
597 let close = parse_price(&msg.c, price_precision)?;
598 let volume = parse_quantity(&msg.vol, size_precision)?;
599 let ts_event = parse_millisecond_timestamp(msg.ts);
600
601 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
602}
603
604pub fn parse_order_msg_vec(
610 data: Vec<OKXOrderMsg>,
611 account_id: AccountId,
612 instruments: &AHashMap<Ustr, InstrumentAny>,
613 fee_cache: &AHashMap<Ustr, Money>,
614 ts_init: UnixNanos,
615) -> anyhow::Result<Vec<ExecutionReport>> {
616 let mut order_reports = Vec::with_capacity(data.len());
617
618 for msg in data {
619 match parse_order_msg(&msg, account_id, instruments, fee_cache, ts_init) {
620 Ok(report) => order_reports.push(report),
621 Err(e) => tracing::error!("Failed to parse execution report from message: {e}"),
622 }
623 }
624
625 Ok(order_reports)
626}
627
628pub fn parse_order_msg(
635 msg: &OKXOrderMsg,
636 account_id: AccountId,
637 instruments: &AHashMap<Ustr, InstrumentAny>,
638 fee_cache: &AHashMap<Ustr, Money>,
639 ts_init: UnixNanos,
640) -> anyhow::Result<ExecutionReport> {
641 let instrument = instruments
642 .get(&msg.inst_id)
643 .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
644
645 let previous_fee = fee_cache.get(&msg.ord_id).copied();
646
647 let has_new_fill = (!msg.fill_sz.is_empty() && msg.fill_sz != "0") || !msg.trade_id.is_empty();
650
651 match msg.state {
652 OKXOrderStatus::Filled | OKXOrderStatus::PartiallyFilled if has_new_fill => {
653 parse_fill_report(msg, instrument, account_id, previous_fee, ts_init)
654 .map(ExecutionReport::Fill)
655 }
656 _ => parse_order_status_report(msg, instrument, account_id, ts_init)
657 .map(ExecutionReport::Order),
658 }
659}
660
661pub fn parse_algo_order_msg(
668 msg: OKXAlgoOrderMsg,
669 account_id: AccountId,
670 instruments: &AHashMap<Ustr, InstrumentAny>,
671 ts_init: UnixNanos,
672) -> anyhow::Result<ExecutionReport> {
673 let inst = instruments
674 .get(&msg.inst_id)
675 .ok_or_else(|| anyhow::anyhow!("No instrument found for inst_id: {}", msg.inst_id))?;
676
677 parse_algo_order_status_report(&msg, inst, account_id, ts_init).map(ExecutionReport::Order)
679}
680
681pub fn parse_algo_order_status_report(
688 msg: &OKXAlgoOrderMsg,
689 instrument: &InstrumentAny,
690 account_id: AccountId,
691 ts_init: UnixNanos,
692) -> anyhow::Result<OrderStatusReport> {
693 let client_order_id = if msg.cl_ord_id.is_empty() {
695 parse_client_order_id(&msg.algo_cl_ord_id)
696 } else {
697 parse_client_order_id(&msg.cl_ord_id)
698 };
699
700 let venue_order_id = if msg.ord_id.is_empty() {
702 VenueOrderId::new(msg.algo_id.as_str())
703 } else {
704 VenueOrderId::new(msg.ord_id.as_str())
705 };
706
707 let order_side: OrderSide = msg.side.into();
708
709 let order_type = if msg.ord_px == "-1" {
711 OrderType::StopMarket
712 } else {
713 OrderType::StopLimit
714 };
715
716 let status: OrderStatus = msg.state.into();
717
718 let quantity = parse_quantity(msg.sz.as_str(), instrument.size_precision())?;
719
720 let filled_qty = if msg.actual_sz.is_empty() || msg.actual_sz == "0" {
722 Quantity::zero(instrument.size_precision())
723 } else {
724 parse_quantity(msg.actual_sz.as_str(), instrument.size_precision())?
725 };
726
727 let trigger_px = parse_price(msg.trigger_px.as_str(), instrument.price_precision())?;
728
729 let price = if msg.ord_px != "-1" {
731 Some(parse_price(
732 msg.ord_px.as_str(),
733 instrument.price_precision(),
734 )?)
735 } else {
736 None
737 };
738
739 let trigger_type = match msg.trigger_px_type {
740 OKXTriggerType::Last => TriggerType::LastPrice,
741 OKXTriggerType::Mark => TriggerType::MarkPrice,
742 OKXTriggerType::Index => TriggerType::IndexPrice,
743 OKXTriggerType::None => TriggerType::Default,
744 };
745
746 let mut report = OrderStatusReport::new(
747 account_id,
748 instrument.id(),
749 client_order_id,
750 venue_order_id,
751 order_side,
752 order_type,
753 TimeInForce::Gtc, status,
755 quantity,
756 filled_qty,
757 msg.c_time.into(), msg.u_time.into(), ts_init,
760 None, );
762
763 report.trigger_price = Some(trigger_px);
764 report.trigger_type = Some(trigger_type);
765
766 if let Some(limit_price) = price {
767 report.price = Some(limit_price);
768 }
769
770 Ok(report)
771}
772
773pub fn parse_order_status_report(
779 msg: &OKXOrderMsg,
780 instrument: &InstrumentAny,
781 account_id: AccountId,
782 ts_init: UnixNanos,
783) -> anyhow::Result<OrderStatusReport> {
784 let client_order_id = parse_client_order_id(&msg.cl_ord_id);
785 let venue_order_id = VenueOrderId::new(msg.ord_id);
786 let order_side: OrderSide = msg.side.into();
787
788 let okx_order_type = msg.ord_type;
789 let order_type = if okx_order_type == OKXOrderType::Trigger {
792 if msg.px.is_empty() || msg.px == "0" {
793 OrderType::StopMarket
794 } else {
795 OrderType::StopLimit
796 }
797 } else {
798 msg.ord_type.into()
799 };
800 let order_status: OrderStatus = msg.state.into();
801
802 let time_in_force = match okx_order_type {
803 OKXOrderType::Fok => TimeInForce::Fok,
804 OKXOrderType::Ioc | OKXOrderType::OptimalLimitIoc => TimeInForce::Ioc,
805 _ => TimeInForce::Gtc,
806 };
807
808 let size_precision = instrument.size_precision();
809 let quantity = parse_quantity(&msg.sz, size_precision)?;
810 let filled_qty = parse_quantity(&msg.acc_fill_sz.clone().unwrap_or_default(), size_precision)?;
811 let ts_accepted = parse_millisecond_timestamp(msg.c_time);
812 let ts_last = parse_millisecond_timestamp(msg.u_time);
813
814 let mut report = OrderStatusReport::new(
815 account_id,
816 instrument.id(),
817 client_order_id,
818 venue_order_id,
819 order_side,
820 order_type,
821 time_in_force,
822 order_status,
823 quantity,
824 filled_qty,
825 ts_accepted,
826 ts_init,
827 ts_last,
828 None, );
830
831 let price_precision = instrument.price_precision();
832
833 if okx_order_type == OKXOrderType::Trigger {
834 if !msg.px.is_empty()
837 && msg.px != "0"
838 && let Ok(price) = parse_price(&msg.px, price_precision)
839 {
840 report = report.with_price(price);
841 }
842 } else {
843 if !msg.px.is_empty()
845 && let Ok(price) = parse_price(&msg.px, price_precision)
846 {
847 report = report.with_price(price);
848 }
849 }
850
851 if !msg.avg_px.is_empty()
852 && let Ok(avg_px) = msg.avg_px.parse::<f64>()
853 {
854 report = report.with_avg_px(avg_px);
855 }
856
857 if matches!(
858 msg.ord_type,
859 OKXOrderType::PostOnly | OKXOrderType::MmpAndPostOnly
860 ) || matches!(
861 msg.cancel_source.as_deref(),
862 Some(source) if source == OKX_POST_ONLY_CANCEL_SOURCE
863 ) || matches!(
864 msg.cancel_source_reason.as_deref(),
865 Some(reason) if reason.contains("POST_ONLY")
866 ) {
867 report = report.with_post_only(true);
868 }
869
870 if msg.reduce_only == "true" {
871 report = report.with_reduce_only(true);
872 }
873
874 if let Some(reason) = msg
875 .cancel_source_reason
876 .as_ref()
877 .filter(|reason| !reason.is_empty())
878 {
879 report = report.with_cancel_reason(reason.clone());
880 } else if let Some(source) = msg
881 .cancel_source
882 .as_ref()
883 .filter(|source| !source.is_empty())
884 {
885 let reason = if source == OKX_POST_ONLY_CANCEL_SOURCE {
886 OKX_POST_ONLY_CANCEL_REASON.to_string()
887 } else {
888 format!("cancel_source={source}")
889 };
890 report = report.with_cancel_reason(reason);
891 }
892
893 Ok(report)
894}
895
896pub fn parse_fill_report(
902 msg: &OKXOrderMsg,
903 instrument: &InstrumentAny,
904 account_id: AccountId,
905 previous_fee: Option<Money>,
906 ts_init: UnixNanos,
907) -> anyhow::Result<FillReport> {
908 let client_order_id = parse_client_order_id(&msg.cl_ord_id);
909 let venue_order_id = VenueOrderId::new(msg.ord_id);
910
911 let trade_id = if msg.trade_id.is_empty() {
914 TradeId::from(UUID4::new().to_string().as_str())
915 } else {
916 TradeId::from(msg.trade_id.as_str())
917 };
918
919 let order_side: OrderSide = msg.side.into();
920
921 let price_precision = instrument.price_precision();
922 let size_precision = instrument.size_precision();
923
924 let price_str = if !msg.fill_px.is_empty() {
927 &msg.fill_px
928 } else if !msg.avg_px.is_empty() {
929 &msg.avg_px
930 } else {
931 &msg.px };
933 let last_px = parse_price(price_str, price_precision).map_err(|e| {
934 anyhow::anyhow!(
935 "Failed to parse price (fill_px='{}', avg_px='{}', px='{}'): {}",
936 msg.fill_px,
937 msg.avg_px,
938 msg.px,
939 e
940 )
941 })?;
942
943 let qty_str = if !msg.fill_sz.is_empty() && msg.fill_sz != "0" {
946 &msg.fill_sz
947 } else if let Some(ref acc_fill_sz) = msg.acc_fill_sz {
948 if !acc_fill_sz.is_empty() && acc_fill_sz != "0" {
949 acc_fill_sz
950 } else {
951 &msg.sz }
953 } else {
954 &msg.sz };
956 let last_qty = parse_quantity(qty_str, size_precision).map_err(|e| {
957 anyhow::anyhow!(
958 "Failed to parse quantity (fill_sz='{}', acc_fill_sz={:?}, sz='{}'): {}",
959 msg.fill_sz,
960 msg.acc_fill_sz,
961 msg.sz,
962 e
963 )
964 })?;
965
966 let fee_currency = Currency::from(&msg.fee_ccy);
967 let total_fee = parse_fee(msg.fee.as_deref(), fee_currency)
969 .map_err(|e| anyhow::anyhow!("Failed to parse fee={:?}: {}", msg.fee, e))?;
970
971 let commission = if let Some(previous_fee) = previous_fee {
973 let incremental = total_fee - previous_fee;
974
975 if incremental < Money::zero(fee_currency) {
976 tracing::debug!(
977 order_id = msg.ord_id.as_str(),
978 total_fee = %total_fee,
979 previous_fee = %previous_fee,
980 incremental = %incremental,
981 "Negative incremental fee detected - likely a maker rebate or fee refund"
982 );
983 }
984
985 if previous_fee >= Money::zero(fee_currency)
988 && total_fee > Money::zero(fee_currency)
989 && incremental > total_fee
990 {
991 tracing::error!(
992 order_id = msg.ord_id.as_str(),
993 total_fee = %total_fee,
994 previous_fee = %previous_fee,
995 incremental = %incremental,
996 "Incremental fee exceeds total fee - likely fee cache corruption, using total fee as fallback"
997 );
998 total_fee
999 } else {
1000 incremental
1001 }
1002 } else {
1003 total_fee
1004 };
1005
1006 let liquidity_side: LiquiditySide = msg.exec_type.into();
1007 let ts_event = parse_millisecond_timestamp(msg.fill_time);
1008
1009 let report = FillReport::new(
1010 account_id,
1011 instrument.id(),
1012 venue_order_id,
1013 trade_id,
1014 order_side,
1015 last_qty,
1016 last_px,
1017 commission,
1018 liquidity_side,
1019 client_order_id,
1020 None,
1021 ts_event,
1022 ts_init,
1023 None, );
1025
1026 Ok(report)
1027}
1028
1029pub fn parse_ws_message_data(
1042 channel: &OKXWsChannel,
1043 data: serde_json::Value,
1044 instrument_id: &InstrumentId,
1045 price_precision: u8,
1046 size_precision: u8,
1047 ts_init: UnixNanos,
1048 funding_cache: &mut AHashMap<Ustr, (Ustr, u64)>,
1049) -> anyhow::Result<Option<NautilusWsMessage>> {
1050 match channel {
1051 OKXWsChannel::Instruments => {
1052 if let Ok(msg) = serde_json::from_value::<OKXInstrument>(data) {
1053 match parse_instrument_any(&msg, ts_init)? {
1054 Some(inst_any) => Ok(Some(NautilusWsMessage::Instrument(Box::new(inst_any)))),
1055 None => {
1056 tracing::warn!("Empty instrument payload: {:?}", msg);
1057 Ok(None)
1058 }
1059 }
1060 } else {
1061 anyhow::bail!("Failed to deserialize instrument payload")
1062 }
1063 }
1064 OKXWsChannel::BboTbt => {
1065 let data_vec = parse_quote_msg_vec(
1066 data,
1067 instrument_id,
1068 price_precision,
1069 size_precision,
1070 ts_init,
1071 )?;
1072 Ok(Some(NautilusWsMessage::Data(data_vec)))
1073 }
1074 OKXWsChannel::Tickers => {
1075 let data_vec = parse_ticker_msg_vec(
1076 data,
1077 instrument_id,
1078 price_precision,
1079 size_precision,
1080 ts_init,
1081 )?;
1082 Ok(Some(NautilusWsMessage::Data(data_vec)))
1083 }
1084 OKXWsChannel::Trades => {
1085 let data_vec = parse_trade_msg_vec(
1086 data,
1087 instrument_id,
1088 price_precision,
1089 size_precision,
1090 ts_init,
1091 )?;
1092 Ok(Some(NautilusWsMessage::Data(data_vec)))
1093 }
1094 OKXWsChannel::MarkPrice => {
1095 let data_vec = parse_mark_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1096 Ok(Some(NautilusWsMessage::Data(data_vec)))
1097 }
1098 OKXWsChannel::IndexTickers => {
1099 let data_vec =
1100 parse_index_price_msg_vec(data, instrument_id, price_precision, ts_init)?;
1101 Ok(Some(NautilusWsMessage::Data(data_vec)))
1102 }
1103 OKXWsChannel::FundingRate => {
1104 let data_vec = parse_funding_rate_msg_vec(data, instrument_id, ts_init, funding_cache)?;
1105 Ok(Some(NautilusWsMessage::FundingRates(data_vec)))
1106 }
1107 channel if okx_channel_to_bar_spec(channel).is_some() => {
1108 let bar_spec = okx_channel_to_bar_spec(channel).expect("bar_spec checked above");
1109 let data_vec = parse_candle_msg_vec(
1110 data,
1111 instrument_id,
1112 price_precision,
1113 size_precision,
1114 bar_spec,
1115 ts_init,
1116 )?;
1117 Ok(Some(NautilusWsMessage::Data(data_vec)))
1118 }
1119 OKXWsChannel::Books
1120 | OKXWsChannel::BooksTbt
1121 | OKXWsChannel::Books5
1122 | OKXWsChannel::Books50Tbt => {
1123 if let Ok(book_msgs) = serde_json::from_value::<Vec<OKXBookMsg>>(data) {
1124 let data_vec = parse_book10_msg_vec(
1125 book_msgs,
1126 instrument_id,
1127 price_precision,
1128 size_precision,
1129 ts_init,
1130 )?;
1131 Ok(Some(NautilusWsMessage::Data(data_vec)))
1132 } else {
1133 anyhow::bail!("Failed to deserialize Books channel data as Vec<OKXBookMsg>")
1134 }
1135 }
1136 _ => {
1137 tracing::warn!("Unsupported channel for message parsing: {channel:?}");
1138 Ok(None)
1139 }
1140 }
1141}
1142
1143#[cfg(test)]
1147mod tests {
1148 use ahash::AHashMap;
1149 use nautilus_core::nanos::UnixNanos;
1150 use nautilus_model::{
1151 data::bar::BAR_SPEC_1_DAY_LAST,
1152 identifiers::{ClientOrderId, Symbol},
1153 instruments::CryptoPerpetual,
1154 };
1155 use rstest::rstest;
1156 use rust_decimal::Decimal;
1157 use ustr::Ustr;
1158
1159 use super::*;
1160 use crate::{
1161 OKXPositionSide,
1162 common::{enums::OKXTradeMode, parse::parse_account_state, testing::load_test_json},
1163 http::models::OKXAccount,
1164 websocket::messages::{OKXWebSocketArg, OKXWebSocketEvent},
1165 };
1166
1167 #[rstest]
1168 fn test_parse_books_snapshot() {
1169 let json_data = load_test_json("ws_books_snapshot.json");
1170 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1171 let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1172 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1173 _ => panic!("Expected a `BookData` variant"),
1174 };
1175
1176 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1177 let deltas = parse_book_msg(
1178 &okx_books[0],
1179 instrument_id,
1180 2,
1181 1,
1182 &action,
1183 UnixNanos::default(),
1184 )
1185 .unwrap();
1186
1187 assert_eq!(deltas.instrument_id, instrument_id);
1188 assert_eq!(deltas.deltas.len(), 16);
1189 assert_eq!(deltas.flags, 32);
1190 assert_eq!(deltas.sequence, 123456);
1191 assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1192 assert_eq!(deltas.ts_init, UnixNanos::default());
1193
1194 assert!(!deltas.deltas.is_empty());
1196 let bid_deltas: Vec<_> = deltas
1198 .deltas
1199 .iter()
1200 .filter(|d| d.order.side == OrderSide::Buy)
1201 .collect();
1202 let ask_deltas: Vec<_> = deltas
1203 .deltas
1204 .iter()
1205 .filter(|d| d.order.side == OrderSide::Sell)
1206 .collect();
1207 assert!(!bid_deltas.is_empty());
1208 assert!(!ask_deltas.is_empty());
1209 }
1210
1211 #[rstest]
1212 fn test_parse_books_update() {
1213 let json_data = load_test_json("ws_books_update.json");
1214 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1215 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1216 let (okx_books, action): (Vec<OKXBookMsg>, OKXBookAction) = match msg {
1217 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1218 _ => panic!("Expected a `BookData` variant"),
1219 };
1220
1221 let deltas = parse_book_msg(
1222 &okx_books[0],
1223 instrument_id,
1224 2,
1225 1,
1226 &action,
1227 UnixNanos::default(),
1228 )
1229 .unwrap();
1230
1231 assert_eq!(deltas.instrument_id, instrument_id);
1232 assert_eq!(deltas.deltas.len(), 16);
1233 assert_eq!(deltas.flags, 0);
1234 assert_eq!(deltas.sequence, 123457);
1235 assert_eq!(deltas.ts_event, UnixNanos::from(1597026383085000000));
1236 assert_eq!(deltas.ts_init, UnixNanos::default());
1237
1238 assert!(!deltas.deltas.is_empty());
1240 let bid_deltas: Vec<_> = deltas
1242 .deltas
1243 .iter()
1244 .filter(|d| d.order.side == OrderSide::Buy)
1245 .collect();
1246 let ask_deltas: Vec<_> = deltas
1247 .deltas
1248 .iter()
1249 .filter(|d| d.order.side == OrderSide::Sell)
1250 .collect();
1251 assert!(!bid_deltas.is_empty());
1252 assert!(!ask_deltas.is_empty());
1253 }
1254
1255 #[rstest]
1256 fn test_parse_tickers() {
1257 let json_data = load_test_json("ws_tickers.json");
1258 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1259 let okx_tickers: Vec<OKXTickerMsg> = match msg {
1260 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1261 _ => panic!("Expected a `Data` variant"),
1262 };
1263
1264 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1265 let trade =
1266 parse_ticker_msg(&okx_tickers[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1267
1268 assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1269 assert_eq!(trade.bid_price, Price::from("8888.88"));
1270 assert_eq!(trade.ask_price, Price::from("9999.99"));
1271 assert_eq!(trade.bid_size, Quantity::from(5));
1272 assert_eq!(trade.ask_size, Quantity::from(11));
1273 assert_eq!(trade.ts_event, UnixNanos::from(1597026383085000000));
1274 assert_eq!(trade.ts_init, UnixNanos::default());
1275 }
1276
1277 #[rstest]
1278 fn test_parse_quotes() {
1279 let json_data = load_test_json("ws_bbo_tbt.json");
1280 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1281 let okx_quotes: Vec<OKXBookMsg> = match msg {
1282 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1283 _ => panic!("Expected a `Data` variant"),
1284 };
1285 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1286
1287 let quote =
1288 parse_quote_msg(&okx_quotes[0], instrument_id, 2, 1, UnixNanos::default()).unwrap();
1289
1290 assert_eq!(quote.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1291 assert_eq!(quote.bid_price, Price::from("8476.97"));
1292 assert_eq!(quote.ask_price, Price::from("8476.98"));
1293 assert_eq!(quote.bid_size, Quantity::from(256));
1294 assert_eq!(quote.ask_size, Quantity::from(415));
1295 assert_eq!(quote.ts_event, UnixNanos::from(1597026383085000000));
1296 assert_eq!(quote.ts_init, UnixNanos::default());
1297 }
1298
1299 #[rstest]
1300 fn test_parse_trades() {
1301 let json_data = load_test_json("ws_trades.json");
1302 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1303 let okx_trades: Vec<OKXTradeMsg> = match msg {
1304 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1305 _ => panic!("Expected a `Data` variant"),
1306 };
1307
1308 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1309 let trade =
1310 parse_trade_msg(&okx_trades[0], instrument_id, 1, 8, UnixNanos::default()).unwrap();
1311
1312 assert_eq!(trade.instrument_id, InstrumentId::from("BTC-USDT.OKX"));
1313 assert_eq!(trade.price, Price::from("42219.9"));
1314 assert_eq!(trade.size, Quantity::from("0.12060306"));
1315 assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
1316 assert_eq!(trade.trade_id, TradeId::from("130639474"));
1317 assert_eq!(trade.ts_event, UnixNanos::from(1630048897897000000));
1318 assert_eq!(trade.ts_init, UnixNanos::default());
1319 }
1320
1321 #[rstest]
1322 fn test_parse_candle() {
1323 let json_data = load_test_json("ws_candle.json");
1324 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1325 let okx_candles: Vec<OKXCandleMsg> = match msg {
1326 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1327 _ => panic!("Expected a `Data` variant"),
1328 };
1329
1330 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1331 let bar_type = BarType::new(
1332 instrument_id,
1333 BAR_SPEC_1_DAY_LAST,
1334 AggregationSource::External,
1335 );
1336 let bar = parse_candle_msg(&okx_candles[0], bar_type, 2, 0, UnixNanos::default()).unwrap();
1337
1338 assert_eq!(bar.bar_type, bar_type);
1339 assert_eq!(bar.open, Price::from("8533.02"));
1340 assert_eq!(bar.high, Price::from("8553.74"));
1341 assert_eq!(bar.low, Price::from("8527.17"));
1342 assert_eq!(bar.close, Price::from("8548.26"));
1343 assert_eq!(bar.volume, Quantity::from(45247));
1344 assert_eq!(bar.ts_event, UnixNanos::from(1597026383085000000));
1345 assert_eq!(bar.ts_init, UnixNanos::default());
1346 }
1347
1348 #[rstest]
1349 fn test_parse_funding_rate() {
1350 let json_data = load_test_json("ws_funding_rate.json");
1351 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1352
1353 let okx_funding_rates: Vec<crate::websocket::messages::OKXFundingRateMsg> = match msg {
1354 OKXWebSocketEvent::Data { data, .. } => serde_json::from_value(data).unwrap(),
1355 _ => panic!("Expected a `Data` variant"),
1356 };
1357
1358 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1359 let funding_rate =
1360 parse_funding_rate_msg(&okx_funding_rates[0], instrument_id, UnixNanos::default())
1361 .unwrap();
1362
1363 assert_eq!(funding_rate.instrument_id, instrument_id);
1364 assert_eq!(funding_rate.rate, Decimal::new(1, 4));
1365 assert_eq!(
1366 funding_rate.next_funding_ns,
1367 Some(UnixNanos::from(1744590349506000000))
1368 );
1369 assert_eq!(funding_rate.ts_event, UnixNanos::from(1744590349506000000));
1370 assert_eq!(funding_rate.ts_init, UnixNanos::default());
1371 }
1372
1373 #[rstest]
1374 fn test_parse_book_vec() {
1375 let json_data = load_test_json("ws_books_snapshot.json");
1376 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1377 let (msgs, action): (Vec<OKXBookMsg>, OKXBookAction) = match event {
1378 OKXWebSocketEvent::BookData { data, action, .. } => (data, action),
1379 _ => panic!("Expected BookData"),
1380 };
1381
1382 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1383 let deltas_vec =
1384 parse_book_msg_vec(msgs, &instrument_id, 8, 1, action, UnixNanos::default()).unwrap();
1385
1386 assert_eq!(deltas_vec.len(), 1);
1387
1388 if let Data::Deltas(d) = &deltas_vec[0] {
1389 assert_eq!(d.sequence, 123456);
1390 } else {
1391 panic!("Expected Deltas");
1392 }
1393 }
1394
1395 #[rstest]
1396 fn test_parse_ticker_vec() {
1397 let json_data = load_test_json("ws_tickers.json");
1398 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1399 let data_val: serde_json::Value = match event {
1400 OKXWebSocketEvent::Data { data, .. } => data,
1401 _ => panic!("Expected Data"),
1402 };
1403
1404 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1405 let quotes_vec =
1406 parse_ticker_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1407
1408 assert_eq!(quotes_vec.len(), 1);
1409
1410 if let Data::Quote(q) = "es_vec[0] {
1411 assert_eq!(q.bid_price, Price::from("8888.88000000"));
1412 assert_eq!(q.ask_price, Price::from("9999.99"));
1413 } else {
1414 panic!("Expected Quote");
1415 }
1416 }
1417
1418 #[rstest]
1419 fn test_parse_trade_vec() {
1420 let json_data = load_test_json("ws_trades.json");
1421 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1422 let data_val: serde_json::Value = match event {
1423 OKXWebSocketEvent::Data { data, .. } => data,
1424 _ => panic!("Expected Data"),
1425 };
1426
1427 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1428 let trades_vec =
1429 parse_trade_msg_vec(data_val, &instrument_id, 8, 1, UnixNanos::default()).unwrap();
1430
1431 assert_eq!(trades_vec.len(), 1);
1432
1433 if let Data::Trade(t) = &trades_vec[0] {
1434 assert_eq!(t.trade_id, TradeId::new("130639474"));
1435 } else {
1436 panic!("Expected Trade");
1437 }
1438 }
1439
1440 #[rstest]
1441 fn test_parse_candle_vec() {
1442 let json_data = load_test_json("ws_candle.json");
1443 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1444 let data_val: serde_json::Value = match event {
1445 OKXWebSocketEvent::Data { data, .. } => data,
1446 _ => panic!("Expected Data"),
1447 };
1448
1449 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1450 let bars_vec = parse_candle_msg_vec(
1451 data_val,
1452 &instrument_id,
1453 2,
1454 1,
1455 BAR_SPEC_1_DAY_LAST,
1456 UnixNanos::default(),
1457 )
1458 .unwrap();
1459
1460 assert_eq!(bars_vec.len(), 1);
1461
1462 if let Data::Bar(b) = &bars_vec[0] {
1463 assert_eq!(b.open, Price::from("8533.02"));
1464 } else {
1465 panic!("Expected Bar");
1466 }
1467 }
1468
1469 #[rstest]
1470 fn test_parse_book_message() {
1471 let json_data = load_test_json("ws_bbo_tbt.json");
1472 let msg: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1473 let (okx_books, arg): (Vec<OKXBookMsg>, OKXWebSocketArg) = match msg {
1474 OKXWebSocketEvent::Data { data, arg, .. } => {
1475 (serde_json::from_value(data).unwrap(), arg)
1476 }
1477 _ => panic!("Expected a `Data` variant"),
1478 };
1479
1480 assert_eq!(arg.channel, OKXWsChannel::BboTbt);
1481 assert_eq!(arg.inst_id.as_ref().unwrap(), &Ustr::from("BTC-USDT"));
1482 assert_eq!(arg.inst_type, None);
1483 assert_eq!(okx_books.len(), 1);
1484
1485 let book_msg = &okx_books[0];
1486
1487 assert_eq!(book_msg.asks.len(), 1);
1489 let ask = &book_msg.asks[0];
1490 assert_eq!(ask.price, "8476.98");
1491 assert_eq!(ask.size, "415");
1492 assert_eq!(ask.liquidated_orders_count, "0");
1493 assert_eq!(ask.orders_count, "13");
1494
1495 assert_eq!(book_msg.bids.len(), 1);
1497 let bid = &book_msg.bids[0];
1498 assert_eq!(bid.price, "8476.97");
1499 assert_eq!(bid.size, "256");
1500 assert_eq!(bid.liquidated_orders_count, "0");
1501 assert_eq!(bid.orders_count, "12");
1502 assert_eq!(book_msg.ts, 1597026383085);
1503 assert_eq!(book_msg.seq_id, 123456);
1504 assert_eq!(book_msg.checksum, None);
1505 assert_eq!(book_msg.prev_seq_id, None);
1506 }
1507
1508 #[rstest]
1509 fn test_parse_ws_account_message() {
1510 let json_data = load_test_json("ws_account.json");
1511 let accounts: Vec<OKXAccount> = serde_json::from_str(&json_data).unwrap();
1512
1513 assert_eq!(accounts.len(), 1);
1514 let account = &accounts[0];
1515
1516 assert_eq!(account.total_eq, "100.56089404807182");
1517 assert_eq!(account.details.len(), 3);
1518
1519 let usdt_detail = &account.details[0];
1520 assert_eq!(usdt_detail.ccy, "USDT");
1521 assert_eq!(usdt_detail.avail_bal, "100.52768569797846");
1522 assert_eq!(usdt_detail.cash_bal, "100.52768569797846");
1523
1524 let btc_detail = &account.details[1];
1525 assert_eq!(btc_detail.ccy, "BTC");
1526 assert_eq!(btc_detail.avail_bal, "0.0000000051");
1527
1528 let eth_detail = &account.details[2];
1529 assert_eq!(eth_detail.ccy, "ETH");
1530 assert_eq!(eth_detail.avail_bal, "0.000000185");
1531
1532 let account_id = AccountId::new("OKX-001");
1533 let ts_init = nautilus_core::nanos::UnixNanos::default();
1534 let account_state = parse_account_state(account, account_id, ts_init);
1535
1536 assert!(account_state.is_ok());
1537 let state = account_state.unwrap();
1538 assert_eq!(state.account_id, account_id);
1539 assert_eq!(state.balances.len(), 3);
1540 }
1541
1542 #[rstest]
1543 fn test_parse_order_msg() {
1544 let json_data = load_test_json("ws_orders.json");
1545 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1546
1547 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1548
1549 let account_id = AccountId::new("OKX-001");
1550 let mut instruments = AHashMap::new();
1551
1552 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1554 let instrument = CryptoPerpetual::new(
1555 instrument_id,
1556 Symbol::from("BTC-USDT-SWAP"),
1557 Currency::BTC(),
1558 Currency::USDT(),
1559 Currency::USDT(),
1560 false, 2, 8, Price::from("0.01"),
1564 Quantity::from("0.00000001"),
1565 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1578 UnixNanos::default(),
1579 );
1580
1581 instruments.insert(
1582 Ustr::from("BTC-USDT-SWAP"),
1583 InstrumentAny::CryptoPerpetual(instrument),
1584 );
1585
1586 let ts_init = UnixNanos::default();
1587 let fee_cache = AHashMap::new();
1588
1589 let result = parse_order_msg_vec(data, account_id, &instruments, &fee_cache, ts_init);
1590
1591 assert!(result.is_ok());
1592 let order_reports = result.unwrap();
1593 assert_eq!(order_reports.len(), 1);
1594
1595 let report = &order_reports[0];
1597
1598 if let ExecutionReport::Fill(fill_report) = report {
1599 assert_eq!(fill_report.account_id, account_id);
1600 assert_eq!(fill_report.instrument_id, instrument_id);
1601 assert_eq!(
1602 fill_report.client_order_id,
1603 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1604 );
1605 assert_eq!(
1606 fill_report.venue_order_id,
1607 VenueOrderId::new("2497956918703120384")
1608 );
1609 assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1610 assert_eq!(fill_report.order_side, OrderSide::Buy);
1611 assert_eq!(fill_report.last_px, Price::from("103698.90"));
1612 assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1613 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1614 } else {
1615 panic!("Expected Fill report for filled order");
1616 }
1617 }
1618
1619 #[rstest]
1620 fn test_parse_order_status_report() {
1621 let json_data = load_test_json("ws_orders.json");
1622 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1623 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1624 let order_msg = &data[0];
1625
1626 let account_id = AccountId::new("OKX-001");
1627 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1628 let instrument = CryptoPerpetual::new(
1629 instrument_id,
1630 Symbol::from("BTC-USDT-SWAP"),
1631 Currency::BTC(),
1632 Currency::USDT(),
1633 Currency::USDT(),
1634 false, 2, 8, Price::from("0.01"),
1638 Quantity::from("0.00000001"),
1639 None,
1640 None,
1641 None,
1642 None,
1643 None,
1644 None,
1645 None,
1646 None,
1647 None,
1648 None,
1649 None,
1650 None,
1651 UnixNanos::default(),
1652 UnixNanos::default(),
1653 );
1654
1655 let ts_init = UnixNanos::default();
1656
1657 let result = parse_order_status_report(
1658 order_msg,
1659 &InstrumentAny::CryptoPerpetual(instrument),
1660 account_id,
1661 ts_init,
1662 );
1663
1664 assert!(result.is_ok());
1665 let order_status_report = result.unwrap();
1666
1667 assert_eq!(order_status_report.account_id, account_id);
1668 assert_eq!(order_status_report.instrument_id, instrument_id);
1669 assert_eq!(
1670 order_status_report.client_order_id,
1671 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1672 );
1673 assert_eq!(
1674 order_status_report.venue_order_id,
1675 VenueOrderId::new("2497956918703120384")
1676 );
1677 assert_eq!(order_status_report.order_side, OrderSide::Buy);
1678 assert_eq!(order_status_report.order_status, OrderStatus::Filled);
1679 assert_eq!(order_status_report.quantity, Quantity::from("0.03000000"));
1680 assert_eq!(order_status_report.filled_qty, Quantity::from("0.03000000"));
1681 }
1682
1683 #[rstest]
1684 fn test_parse_fill_report() {
1685 let json_data = load_test_json("ws_orders.json");
1686 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
1687 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
1688 let order_msg = &data[0];
1689
1690 let account_id = AccountId::new("OKX-001");
1691 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1692 let instrument = CryptoPerpetual::new(
1693 instrument_id,
1694 Symbol::from("BTC-USDT-SWAP"),
1695 Currency::BTC(),
1696 Currency::USDT(),
1697 Currency::USDT(),
1698 false, 2, 8, Price::from("0.01"),
1702 Quantity::from("0.00000001"),
1703 None,
1704 None,
1705 None,
1706 None,
1707 None,
1708 None,
1709 None,
1710 None,
1711 None,
1712 None,
1713 None,
1714 None,
1715 UnixNanos::default(),
1716 UnixNanos::default(),
1717 );
1718
1719 let ts_init = UnixNanos::default();
1720
1721 let result = parse_fill_report(
1722 order_msg,
1723 &InstrumentAny::CryptoPerpetual(instrument),
1724 account_id,
1725 None,
1726 ts_init,
1727 );
1728
1729 assert!(result.is_ok());
1730 let fill_report = result.unwrap();
1731
1732 assert_eq!(fill_report.account_id, account_id);
1733 assert_eq!(fill_report.instrument_id, instrument_id);
1734 assert_eq!(
1735 fill_report.client_order_id,
1736 Some(ClientOrderId::new("001BTCUSDT20250106001"))
1737 );
1738 assert_eq!(
1739 fill_report.venue_order_id,
1740 VenueOrderId::new("2497956918703120384")
1741 );
1742 assert_eq!(fill_report.trade_id, TradeId::from("1518905529"));
1743 assert_eq!(fill_report.order_side, OrderSide::Buy);
1744 assert_eq!(fill_report.last_px, Price::from("103698.90"));
1745 assert_eq!(fill_report.last_qty, Quantity::from("0.03000000"));
1746 assert_eq!(fill_report.liquidity_side, LiquiditySide::Maker);
1747 }
1748
1749 #[rstest]
1750 fn test_parse_book10_msg() {
1751 let json_data = load_test_json("ws_books_snapshot.json");
1752 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1753 let msgs: Vec<OKXBookMsg> = match event {
1754 OKXWebSocketEvent::BookData { data, .. } => data,
1755 _ => panic!("Expected BookData"),
1756 };
1757
1758 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1759 let depth10 =
1760 parse_book10_msg(&msgs[0], instrument_id, 2, 0, UnixNanos::default()).unwrap();
1761
1762 assert_eq!(depth10.instrument_id, instrument_id);
1763 assert_eq!(depth10.sequence, 123456);
1764 assert_eq!(depth10.ts_event, UnixNanos::from(1597026383085000000));
1765 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1766
1767 assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
1769 assert_eq!(depth10.bids[0].size, Quantity::from("256"));
1770 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1771 assert_eq!(depth10.bid_counts[0], 12);
1772
1773 assert_eq!(depth10.bids[1].price, Price::from("8475.55"));
1774 assert_eq!(depth10.bids[1].size, Quantity::from("101"));
1775 assert_eq!(depth10.bid_counts[1], 1);
1776
1777 assert_eq!(depth10.bids[8].price, Price::from("0"));
1779 assert_eq!(depth10.bids[8].size, Quantity::from("0"));
1780 assert_eq!(depth10.bid_counts[8], 0);
1781
1782 assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
1784 assert_eq!(depth10.asks[0].size, Quantity::from("415"));
1785 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1786 assert_eq!(depth10.ask_counts[0], 13);
1787
1788 assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
1789 assert_eq!(depth10.asks[1].size, Quantity::from("7"));
1790 assert_eq!(depth10.ask_counts[1], 2);
1791
1792 assert_eq!(depth10.asks[8].price, Price::from("0"));
1794 assert_eq!(depth10.asks[8].size, Quantity::from("0"));
1795 assert_eq!(depth10.ask_counts[8], 0);
1796 }
1797
1798 #[rstest]
1799 fn test_parse_book10_msg_vec() {
1800 let json_data = load_test_json("ws_books_snapshot.json");
1801 let event: OKXWebSocketEvent = serde_json::from_str(&json_data).unwrap();
1802 let msgs: Vec<OKXBookMsg> = match event {
1803 OKXWebSocketEvent::BookData { data, .. } => data,
1804 _ => panic!("Expected BookData"),
1805 };
1806
1807 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
1808 let depth10_vec =
1809 parse_book10_msg_vec(msgs, &instrument_id, 2, 0, UnixNanos::default()).unwrap();
1810
1811 assert_eq!(depth10_vec.len(), 1);
1812
1813 if let Data::Depth10(d) = &depth10_vec[0] {
1814 assert_eq!(d.instrument_id, instrument_id);
1815 assert_eq!(d.sequence, 123456);
1816 assert_eq!(d.bids[0].price, Price::from("8476.97"));
1817 assert_eq!(d.asks[0].price, Price::from("8476.98"));
1818 } else {
1819 panic!("Expected Depth10");
1820 }
1821 }
1822
1823 #[rstest]
1824 fn test_parse_fill_report_with_fee_cache() {
1825 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1826 let instrument = CryptoPerpetual::new(
1827 instrument_id,
1828 Symbol::from("BTC-USDT-SWAP"),
1829 Currency::BTC(),
1830 Currency::USDT(),
1831 Currency::USDT(),
1832 false, 2, 8, Price::from("0.01"),
1836 Quantity::from("0.00000001"),
1837 None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1850 UnixNanos::default(),
1851 );
1852
1853 let account_id = AccountId::new("OKX-001");
1854 let ts_init = UnixNanos::default();
1855
1856 let order_msg_1 = OKXOrderMsg {
1858 acc_fill_sz: Some("0.01".to_string()),
1859 avg_px: "50000.0".to_string(),
1860 c_time: 1746947317401,
1861 cancel_source: None,
1862 cancel_source_reason: None,
1863 category: Ustr::from("normal"),
1864 ccy: Ustr::from("USDT"),
1865 cl_ord_id: "test_order_1".to_string(),
1866 algo_cl_ord_id: None,
1867 fee: Some("-1.0".to_string()), fee_ccy: Ustr::from("USDT"),
1869 fill_px: "50000.0".to_string(),
1870 fill_sz: "0.01".to_string(),
1871 fill_time: 1746947317402,
1872 inst_id: Ustr::from("BTC-USDT-SWAP"),
1873 inst_type: crate::common::enums::OKXInstrumentType::Swap,
1874 lever: "2.0".to_string(),
1875 ord_id: Ustr::from("1234567890"),
1876 ord_type: OKXOrderType::Market,
1877 pnl: "0".to_string(),
1878 pos_side: OKXPositionSide::Long,
1879 px: "".to_string(),
1880 reduce_only: "false".to_string(),
1881 side: crate::common::enums::OKXSide::Buy,
1882 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
1883 exec_type: crate::common::enums::OKXExecType::Maker,
1884 sz: "0.03".to_string(), td_mode: OKXTradeMode::Isolated,
1886 trade_id: "trade_1".to_string(),
1887 u_time: 1746947317402,
1888 };
1889
1890 let fill_report_1 = parse_fill_report(
1891 &order_msg_1,
1892 &InstrumentAny::CryptoPerpetual(instrument),
1893 account_id,
1894 None,
1895 ts_init,
1896 )
1897 .unwrap();
1898
1899 assert_eq!(fill_report_1.commission, Money::new(1.0, Currency::USDT()));
1901
1902 let order_msg_2 = OKXOrderMsg {
1904 acc_fill_sz: Some("0.03".to_string()),
1905 avg_px: "50000.0".to_string(),
1906 c_time: 1746947317401,
1907 cancel_source: None,
1908 cancel_source_reason: None,
1909 category: Ustr::from("normal"),
1910 ccy: Ustr::from("USDT"),
1911 cl_ord_id: "test_order_1".to_string(),
1912 algo_cl_ord_id: None,
1913 fee: Some("-3.0".to_string()), fee_ccy: Ustr::from("USDT"),
1915 fill_px: "50000.0".to_string(),
1916 fill_sz: "0.02".to_string(),
1917 fill_time: 1746947317403,
1918 inst_id: Ustr::from("BTC-USDT-SWAP"),
1919 inst_type: crate::common::enums::OKXInstrumentType::Swap,
1920 lever: "2.0".to_string(),
1921 ord_id: Ustr::from("1234567890"),
1922 ord_type: OKXOrderType::Market,
1923 pnl: "0".to_string(),
1924 pos_side: OKXPositionSide::Long,
1925 px: "".to_string(),
1926 reduce_only: "false".to_string(),
1927 side: crate::common::enums::OKXSide::Buy,
1928 state: crate::common::enums::OKXOrderStatus::Filled,
1929 exec_type: crate::common::enums::OKXExecType::Maker,
1930 sz: "0.03".to_string(), td_mode: OKXTradeMode::Isolated,
1932 trade_id: "trade_2".to_string(),
1933 u_time: 1746947317403,
1934 };
1935
1936 let fill_report_2 = parse_fill_report(
1937 &order_msg_2,
1938 &InstrumentAny::CryptoPerpetual(instrument),
1939 account_id,
1940 Some(fill_report_1.commission),
1941 ts_init,
1942 )
1943 .unwrap();
1944
1945 assert_eq!(fill_report_2.commission, Money::new(2.0, Currency::USDT()));
1947
1948 }
1950
1951 #[rstest]
1952 fn test_parse_fill_report_with_maker_rebates() {
1953 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
1954 let instrument = CryptoPerpetual::new(
1955 instrument_id,
1956 Symbol::from("BTC-USDT-SWAP"),
1957 Currency::BTC(),
1958 Currency::USDT(),
1959 Currency::USDT(),
1960 false,
1961 2,
1962 8,
1963 Price::from("0.01"),
1964 Quantity::from("0.00000001"),
1965 None,
1966 None,
1967 None,
1968 None,
1969 None,
1970 None,
1971 None,
1972 None,
1973 None,
1974 None,
1975 None,
1976 None,
1977 UnixNanos::default(),
1978 UnixNanos::default(),
1979 );
1980
1981 let account_id = AccountId::new("OKX-001");
1982 let ts_init = UnixNanos::default();
1983
1984 let order_msg_1 = OKXOrderMsg {
1986 acc_fill_sz: Some("0.01".to_string()),
1987 avg_px: "50000.0".to_string(),
1988 c_time: 1746947317401,
1989 cancel_source: None,
1990 cancel_source_reason: None,
1991 category: Ustr::from("normal"),
1992 ccy: Ustr::from("USDT"),
1993 cl_ord_id: "test_order_rebate".to_string(),
1994 algo_cl_ord_id: None,
1995 fee: Some("0.5".to_string()), fee_ccy: Ustr::from("USDT"),
1997 fill_px: "50000.0".to_string(),
1998 fill_sz: "0.01".to_string(),
1999 fill_time: 1746947317402,
2000 inst_id: Ustr::from("BTC-USDT-SWAP"),
2001 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2002 lever: "2.0".to_string(),
2003 ord_id: Ustr::from("rebate_order_123"),
2004 ord_type: OKXOrderType::Market,
2005 pnl: "0".to_string(),
2006 pos_side: OKXPositionSide::Long,
2007 px: "".to_string(),
2008 reduce_only: "false".to_string(),
2009 side: crate::common::enums::OKXSide::Buy,
2010 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2011 exec_type: crate::common::enums::OKXExecType::Maker,
2012 sz: "0.02".to_string(),
2013 td_mode: OKXTradeMode::Isolated,
2014 trade_id: "trade_rebate_1".to_string(),
2015 u_time: 1746947317402,
2016 };
2017
2018 let fill_report_1 = parse_fill_report(
2019 &order_msg_1,
2020 &InstrumentAny::CryptoPerpetual(instrument),
2021 account_id,
2022 None,
2023 ts_init,
2024 )
2025 .unwrap();
2026
2027 assert_eq!(fill_report_1.commission, Money::new(-0.5, Currency::USDT()));
2029
2030 let order_msg_2 = OKXOrderMsg {
2032 acc_fill_sz: Some("0.02".to_string()),
2033 avg_px: "50000.0".to_string(),
2034 c_time: 1746947317401,
2035 cancel_source: None,
2036 cancel_source_reason: None,
2037 category: Ustr::from("normal"),
2038 ccy: Ustr::from("USDT"),
2039 cl_ord_id: "test_order_rebate".to_string(),
2040 algo_cl_ord_id: None,
2041 fee: Some("0.8".to_string()), fee_ccy: Ustr::from("USDT"),
2043 fill_px: "50000.0".to_string(),
2044 fill_sz: "0.01".to_string(),
2045 fill_time: 1746947317403,
2046 inst_id: Ustr::from("BTC-USDT-SWAP"),
2047 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2048 lever: "2.0".to_string(),
2049 ord_id: Ustr::from("rebate_order_123"),
2050 ord_type: OKXOrderType::Market,
2051 pnl: "0".to_string(),
2052 pos_side: OKXPositionSide::Long,
2053 px: "".to_string(),
2054 reduce_only: "false".to_string(),
2055 side: crate::common::enums::OKXSide::Buy,
2056 state: crate::common::enums::OKXOrderStatus::Filled,
2057 exec_type: crate::common::enums::OKXExecType::Maker,
2058 sz: "0.02".to_string(),
2059 td_mode: OKXTradeMode::Isolated,
2060 trade_id: "trade_rebate_2".to_string(),
2061 u_time: 1746947317403,
2062 };
2063
2064 let fill_report_2 = parse_fill_report(
2065 &order_msg_2,
2066 &InstrumentAny::CryptoPerpetual(instrument),
2067 account_id,
2068 Some(fill_report_1.commission),
2069 ts_init,
2070 )
2071 .unwrap();
2072
2073 assert_eq!(fill_report_2.commission, Money::new(-0.3, Currency::USDT()));
2075 }
2076
2077 #[rstest]
2078 fn test_parse_fill_report_rebate_to_charge_transition() {
2079 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2080 let instrument = CryptoPerpetual::new(
2081 instrument_id,
2082 Symbol::from("BTC-USDT-SWAP"),
2083 Currency::BTC(),
2084 Currency::USDT(),
2085 Currency::USDT(),
2086 false,
2087 2,
2088 8,
2089 Price::from("0.01"),
2090 Quantity::from("0.00000001"),
2091 None,
2092 None,
2093 None,
2094 None,
2095 None,
2096 None,
2097 None,
2098 None,
2099 None,
2100 None,
2101 None,
2102 None,
2103 UnixNanos::default(),
2104 UnixNanos::default(),
2105 );
2106
2107 let account_id = AccountId::new("OKX-001");
2108 let ts_init = UnixNanos::default();
2109
2110 let order_msg_1 = OKXOrderMsg {
2112 acc_fill_sz: Some("0.01".to_string()),
2113 avg_px: "50000.0".to_string(),
2114 c_time: 1746947317401,
2115 cancel_source: None,
2116 cancel_source_reason: None,
2117 category: Ustr::from("normal"),
2118 ccy: Ustr::from("USDT"),
2119 cl_ord_id: "test_order_transition".to_string(),
2120 algo_cl_ord_id: None,
2121 fee: Some("1.0".to_string()), fee_ccy: Ustr::from("USDT"),
2123 fill_px: "50000.0".to_string(),
2124 fill_sz: "0.01".to_string(),
2125 fill_time: 1746947317402,
2126 inst_id: Ustr::from("BTC-USDT-SWAP"),
2127 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2128 lever: "2.0".to_string(),
2129 ord_id: Ustr::from("transition_order_456"),
2130 ord_type: OKXOrderType::Market,
2131 pnl: "0".to_string(),
2132 pos_side: OKXPositionSide::Long,
2133 px: "".to_string(),
2134 reduce_only: "false".to_string(),
2135 side: crate::common::enums::OKXSide::Buy,
2136 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2137 exec_type: crate::common::enums::OKXExecType::Maker,
2138 sz: "0.02".to_string(),
2139 td_mode: OKXTradeMode::Isolated,
2140 trade_id: "trade_transition_1".to_string(),
2141 u_time: 1746947317402,
2142 };
2143
2144 let fill_report_1 = parse_fill_report(
2145 &order_msg_1,
2146 &InstrumentAny::CryptoPerpetual(instrument),
2147 account_id,
2148 None,
2149 ts_init,
2150 )
2151 .unwrap();
2152
2153 assert_eq!(fill_report_1.commission, Money::new(-1.0, Currency::USDT()));
2155
2156 let order_msg_2 = OKXOrderMsg {
2160 acc_fill_sz: Some("0.02".to_string()),
2161 avg_px: "50000.0".to_string(),
2162 c_time: 1746947317401,
2163 cancel_source: None,
2164 cancel_source_reason: None,
2165 category: Ustr::from("normal"),
2166 ccy: Ustr::from("USDT"),
2167 cl_ord_id: "test_order_transition".to_string(),
2168 algo_cl_ord_id: None,
2169 fee: Some("-2.0".to_string()), fee_ccy: Ustr::from("USDT"),
2171 fill_px: "50000.0".to_string(),
2172 fill_sz: "0.01".to_string(),
2173 fill_time: 1746947317403,
2174 inst_id: Ustr::from("BTC-USDT-SWAP"),
2175 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2176 lever: "2.0".to_string(),
2177 ord_id: Ustr::from("transition_order_456"),
2178 ord_type: OKXOrderType::Market,
2179 pnl: "0".to_string(),
2180 pos_side: OKXPositionSide::Long,
2181 px: "".to_string(),
2182 reduce_only: "false".to_string(),
2183 side: crate::common::enums::OKXSide::Buy,
2184 state: crate::common::enums::OKXOrderStatus::Filled,
2185 exec_type: crate::common::enums::OKXExecType::Taker,
2186 sz: "0.02".to_string(),
2187 td_mode: OKXTradeMode::Isolated,
2188 trade_id: "trade_transition_2".to_string(),
2189 u_time: 1746947317403,
2190 };
2191
2192 let fill_report_2 = parse_fill_report(
2193 &order_msg_2,
2194 &InstrumentAny::CryptoPerpetual(instrument),
2195 account_id,
2196 Some(fill_report_1.commission),
2197 ts_init,
2198 )
2199 .unwrap();
2200
2201 assert_eq!(fill_report_2.commission, Money::new(3.0, Currency::USDT()));
2204 }
2205
2206 #[rstest]
2207 fn test_parse_fill_report_negative_incremental() {
2208 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2209 let instrument = CryptoPerpetual::new(
2210 instrument_id,
2211 Symbol::from("BTC-USDT-SWAP"),
2212 Currency::BTC(),
2213 Currency::USDT(),
2214 Currency::USDT(),
2215 false,
2216 2,
2217 8,
2218 Price::from("0.01"),
2219 Quantity::from("0.00000001"),
2220 None,
2221 None,
2222 None,
2223 None,
2224 None,
2225 None,
2226 None,
2227 None,
2228 None,
2229 None,
2230 None,
2231 None,
2232 UnixNanos::default(),
2233 UnixNanos::default(),
2234 );
2235
2236 let account_id = AccountId::new("OKX-001");
2237 let ts_init = UnixNanos::default();
2238
2239 let order_msg_1 = OKXOrderMsg {
2241 acc_fill_sz: Some("0.01".to_string()),
2242 avg_px: "50000.0".to_string(),
2243 c_time: 1746947317401,
2244 cancel_source: None,
2245 cancel_source_reason: None,
2246 category: Ustr::from("normal"),
2247 ccy: Ustr::from("USDT"),
2248 cl_ord_id: "test_order_neg_inc".to_string(),
2249 algo_cl_ord_id: None,
2250 fee: Some("-2.0".to_string()),
2251 fee_ccy: Ustr::from("USDT"),
2252 fill_px: "50000.0".to_string(),
2253 fill_sz: "0.01".to_string(),
2254 fill_time: 1746947317402,
2255 inst_id: Ustr::from("BTC-USDT-SWAP"),
2256 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2257 lever: "2.0".to_string(),
2258 ord_id: Ustr::from("neg_inc_order_789"),
2259 ord_type: OKXOrderType::Market,
2260 pnl: "0".to_string(),
2261 pos_side: OKXPositionSide::Long,
2262 px: "".to_string(),
2263 reduce_only: "false".to_string(),
2264 side: crate::common::enums::OKXSide::Buy,
2265 state: crate::common::enums::OKXOrderStatus::PartiallyFilled,
2266 exec_type: crate::common::enums::OKXExecType::Taker,
2267 sz: "0.02".to_string(),
2268 td_mode: OKXTradeMode::Isolated,
2269 trade_id: "trade_neg_inc_1".to_string(),
2270 u_time: 1746947317402,
2271 };
2272
2273 let fill_report_1 = parse_fill_report(
2274 &order_msg_1,
2275 &InstrumentAny::CryptoPerpetual(instrument),
2276 account_id,
2277 None,
2278 ts_init,
2279 )
2280 .unwrap();
2281
2282 assert_eq!(fill_report_1.commission, Money::new(2.0, Currency::USDT()));
2283
2284 let order_msg_2 = OKXOrderMsg {
2287 acc_fill_sz: Some("0.02".to_string()),
2288 avg_px: "50000.0".to_string(),
2289 c_time: 1746947317401,
2290 cancel_source: None,
2291 cancel_source_reason: None,
2292 category: Ustr::from("normal"),
2293 ccy: Ustr::from("USDT"),
2294 cl_ord_id: "test_order_neg_inc".to_string(),
2295 algo_cl_ord_id: None,
2296 fee: Some("-1.5".to_string()), fee_ccy: Ustr::from("USDT"),
2298 fill_px: "50000.0".to_string(),
2299 fill_sz: "0.01".to_string(),
2300 fill_time: 1746947317403,
2301 inst_id: Ustr::from("BTC-USDT-SWAP"),
2302 inst_type: crate::common::enums::OKXInstrumentType::Swap,
2303 lever: "2.0".to_string(),
2304 ord_id: Ustr::from("neg_inc_order_789"),
2305 ord_type: OKXOrderType::Market,
2306 pnl: "0".to_string(),
2307 pos_side: OKXPositionSide::Long,
2308 px: "".to_string(),
2309 reduce_only: "false".to_string(),
2310 side: crate::common::enums::OKXSide::Buy,
2311 state: crate::common::enums::OKXOrderStatus::Filled,
2312 exec_type: crate::common::enums::OKXExecType::Maker,
2313 sz: "0.02".to_string(),
2314 td_mode: OKXTradeMode::Isolated,
2315 trade_id: "trade_neg_inc_2".to_string(),
2316 u_time: 1746947317403,
2317 };
2318
2319 let fill_report_2 = parse_fill_report(
2320 &order_msg_2,
2321 &InstrumentAny::CryptoPerpetual(instrument),
2322 account_id,
2323 Some(fill_report_1.commission),
2324 ts_init,
2325 )
2326 .unwrap();
2327
2328 assert_eq!(fill_report_2.commission, Money::new(-0.5, Currency::USDT()));
2330 }
2331
2332 #[rstest]
2333 fn test_parse_book10_msg_partial_levels() {
2334 let book_msg = OKXBookMsg {
2336 asks: vec![
2337 OrderBookEntry {
2338 price: "8476.98".to_string(),
2339 size: "415".to_string(),
2340 liquidated_orders_count: "0".to_string(),
2341 orders_count: "13".to_string(),
2342 },
2343 OrderBookEntry {
2344 price: "8477.00".to_string(),
2345 size: "7".to_string(),
2346 liquidated_orders_count: "0".to_string(),
2347 orders_count: "2".to_string(),
2348 },
2349 ],
2350 bids: vec![OrderBookEntry {
2351 price: "8476.97".to_string(),
2352 size: "256".to_string(),
2353 liquidated_orders_count: "0".to_string(),
2354 orders_count: "12".to_string(),
2355 }],
2356 ts: 1597026383085,
2357 checksum: None,
2358 prev_seq_id: None,
2359 seq_id: 123456,
2360 };
2361
2362 let instrument_id = InstrumentId::from("BTC-USDT.OKX");
2363 let depth10 =
2364 parse_book10_msg(&book_msg, instrument_id, 2, 0, UnixNanos::default()).unwrap();
2365
2366 assert_eq!(depth10.bids[0].price, Price::from("8476.97"));
2368 assert_eq!(depth10.bids[0].size, Quantity::from("256"));
2369 assert_eq!(depth10.bid_counts[0], 12);
2370
2371 assert_eq!(depth10.bids[1].price, Price::from("0"));
2373 assert_eq!(depth10.bids[1].size, Quantity::from("0"));
2374 assert_eq!(depth10.bid_counts[1], 0);
2375
2376 assert_eq!(depth10.asks[0].price, Price::from("8476.98"));
2378 assert_eq!(depth10.asks[1].price, Price::from("8477.00"));
2379 assert_eq!(depth10.asks[2].price, Price::from("0")); }
2381
2382 #[rstest]
2383 fn test_parse_algo_order_msg_stop_market() {
2384 let json_data = load_test_json("ws_orders_algo.json");
2385 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2386 let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2387
2388 let msg = &data[0];
2390 assert_eq!(msg.algo_id, "706620792746729472");
2391 assert_eq!(msg.algo_cl_ord_id, "STOP001BTCUSDT20250120");
2392 assert_eq!(msg.state, OKXOrderStatus::Live);
2393 assert_eq!(msg.ord_px, "-1"); let account_id = AccountId::new("OKX-001");
2396 let mut instruments = AHashMap::new();
2397
2398 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2400 let instrument = CryptoPerpetual::new(
2401 instrument_id,
2402 Symbol::from("BTC-USDT-SWAP"),
2403 Currency::BTC(),
2404 Currency::USDT(),
2405 Currency::USDT(),
2406 false, 2, 8, Price::from("0.01"),
2410 Quantity::from("0.00000001"),
2411 None,
2412 None,
2413 None,
2414 None,
2415 None,
2416 None,
2417 None,
2418 None,
2419 None,
2420 None,
2421 None,
2422 None,
2423 0.into(), 0.into(), );
2426 instruments.insert(
2427 Ustr::from("BTC-USDT-SWAP"),
2428 InstrumentAny::CryptoPerpetual(instrument),
2429 );
2430
2431 let result =
2432 parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
2433
2434 assert!(result.is_ok());
2435 let report = result.unwrap();
2436
2437 if let ExecutionReport::Order(status_report) = report {
2438 assert_eq!(status_report.order_type, OrderType::StopMarket);
2439 assert_eq!(status_report.order_side, OrderSide::Sell);
2440 assert_eq!(status_report.quantity, Quantity::from("0.01000000"));
2441 assert_eq!(status_report.trigger_price, Some(Price::from("95000.00")));
2442 assert_eq!(status_report.trigger_type, Some(TriggerType::LastPrice));
2443 assert_eq!(status_report.price, None); } else {
2445 panic!("Expected Order report");
2446 }
2447 }
2448
2449 #[rstest]
2450 fn test_parse_algo_order_msg_stop_limit() {
2451 let json_data = load_test_json("ws_orders_algo.json");
2452 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2453 let data: Vec<OKXAlgoOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2454
2455 let msg = &data[1];
2457 assert_eq!(msg.algo_id, "706620792746729473");
2458 assert_eq!(msg.state, OKXOrderStatus::Live);
2459 assert_eq!(msg.ord_px, "106000"); let account_id = AccountId::new("OKX-001");
2462 let mut instruments = AHashMap::new();
2463
2464 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2466 let instrument = CryptoPerpetual::new(
2467 instrument_id,
2468 Symbol::from("BTC-USDT-SWAP"),
2469 Currency::BTC(),
2470 Currency::USDT(),
2471 Currency::USDT(),
2472 false, 2, 8, Price::from("0.01"),
2476 Quantity::from("0.00000001"),
2477 None,
2478 None,
2479 None,
2480 None,
2481 None,
2482 None,
2483 None,
2484 None,
2485 None,
2486 None,
2487 None,
2488 None,
2489 0.into(), 0.into(), );
2492 instruments.insert(
2493 Ustr::from("BTC-USDT-SWAP"),
2494 InstrumentAny::CryptoPerpetual(instrument),
2495 );
2496
2497 let result =
2498 parse_algo_order_msg(msg.clone(), account_id, &instruments, UnixNanos::default());
2499
2500 assert!(result.is_ok());
2501 let report = result.unwrap();
2502
2503 if let ExecutionReport::Order(status_report) = report {
2504 assert_eq!(status_report.order_type, OrderType::StopLimit);
2505 assert_eq!(status_report.order_side, OrderSide::Buy);
2506 assert_eq!(status_report.quantity, Quantity::from("0.02000000"));
2507 assert_eq!(status_report.trigger_price, Some(Price::from("105000.00")));
2508 assert_eq!(status_report.trigger_type, Some(TriggerType::MarkPrice));
2509 assert_eq!(status_report.price, Some(Price::from("106000.00"))); } else {
2511 panic!("Expected Order report");
2512 }
2513 }
2514
2515 #[rstest]
2516 fn test_parse_trigger_order_from_regular_channel() {
2517 let json_data = load_test_json("ws_orders_trigger.json");
2518 let ws_msg: serde_json::Value = serde_json::from_str(&json_data).unwrap();
2519 let data: Vec<OKXOrderMsg> = serde_json::from_value(ws_msg["data"].clone()).unwrap();
2520
2521 let msg = &data[0];
2523 assert_eq!(msg.ord_type, OKXOrderType::Trigger);
2524 assert_eq!(msg.state, OKXOrderStatus::Filled);
2525
2526 let account_id = AccountId::new("OKX-001");
2527 let mut instruments = AHashMap::new();
2528
2529 let instrument_id = InstrumentId::from("BTC-USDT-SWAP.OKX");
2531 let instrument = CryptoPerpetual::new(
2532 instrument_id,
2533 Symbol::from("BTC-USDT-SWAP"),
2534 Currency::BTC(),
2535 Currency::USDT(),
2536 Currency::USDT(),
2537 false, 2, 8, Price::from("0.01"),
2541 Quantity::from("0.00000001"),
2542 None,
2543 None,
2544 None,
2545 None,
2546 None,
2547 None,
2548 None,
2549 None,
2550 None,
2551 None,
2552 None,
2553 None,
2554 0.into(), 0.into(), );
2557 instruments.insert(
2558 Ustr::from("BTC-USDT-SWAP"),
2559 InstrumentAny::CryptoPerpetual(instrument),
2560 );
2561 let fee_cache = AHashMap::new();
2562
2563 let result = parse_order_msg_vec(
2564 vec![msg.clone()],
2565 account_id,
2566 &instruments,
2567 &fee_cache,
2568 UnixNanos::default(),
2569 );
2570
2571 assert!(result.is_ok());
2572 let reports = result.unwrap();
2573 assert_eq!(reports.len(), 1);
2574
2575 if let ExecutionReport::Fill(fill_report) = &reports[0] {
2576 assert_eq!(fill_report.order_side, OrderSide::Sell);
2577 assert_eq!(fill_report.last_qty, Quantity::from("0.01000000"));
2578 assert_eq!(fill_report.last_px, Price::from("101950.00"));
2579 } else {
2580 panic!("Expected Fill report for filled trigger order");
2581 }
2582 }
2583}