1use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20 data::{
21 Bar, BarSpecification, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate,
22 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
23 },
24 enums::{
25 AggregationSource, AggressorSide, BarAggregation, BookAction, OrderSide, PriceType,
26 RecordFlag,
27 },
28 identifiers::TradeId,
29 instruments::{Instrument, InstrumentAny},
30 types::{Price, Quantity},
31};
32use rust_decimal::{Decimal, prelude::FromPrimitive};
33use ustr::Ustr;
34
35use super::{
36 error::{BinanceWsError, BinanceWsResult},
37 messages::{
38 BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
39 BinanceFuturesKlineMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesTradeMsg,
40 },
41};
42use crate::common::enums::{BinanceKlineInterval, BinanceWsEventType};
43
44pub fn parse_agg_trade(
50 msg: &BinanceFuturesAggTradeMsg,
51 instrument: &InstrumentAny,
52 ts_init: UnixNanos,
53) -> BinanceWsResult<TradeTick> {
54 let instrument_id = instrument.id();
55 let price_precision = instrument.price_precision();
56 let size_precision = instrument.size_precision();
57
58 let price = msg
59 .price
60 .parse::<f64>()
61 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
62 let size = msg
63 .quantity
64 .parse::<f64>()
65 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
66
67 let aggressor_side = if msg.is_buyer_maker {
68 AggressorSide::Seller
69 } else {
70 AggressorSide::Buyer
71 };
72
73 let ts_event = UnixNanos::from(msg.trade_time as u64 * 1_000_000); let trade_id = TradeId::new(msg.agg_trade_id.to_string());
75
76 Ok(TradeTick::new(
77 instrument_id,
78 Price::new(price, price_precision),
79 Quantity::new(size, size_precision),
80 aggressor_side,
81 trade_id,
82 ts_event,
83 ts_init,
84 ))
85}
86
87pub fn parse_trade(
93 msg: &BinanceFuturesTradeMsg,
94 instrument: &InstrumentAny,
95 ts_init: UnixNanos,
96) -> BinanceWsResult<TradeTick> {
97 let instrument_id = instrument.id();
98 let price_precision = instrument.price_precision();
99 let size_precision = instrument.size_precision();
100
101 let price = msg
102 .price
103 .parse::<f64>()
104 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
105 let size = msg
106 .quantity
107 .parse::<f64>()
108 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
109
110 let aggressor_side = if msg.is_buyer_maker {
111 AggressorSide::Seller
112 } else {
113 AggressorSide::Buyer
114 };
115
116 let ts_event = UnixNanos::from(msg.trade_time as u64 * 1_000_000); let trade_id = TradeId::new(msg.trade_id.to_string());
118
119 Ok(TradeTick::new(
120 instrument_id,
121 Price::new(price, price_precision),
122 Quantity::new(size, size_precision),
123 aggressor_side,
124 trade_id,
125 ts_event,
126 ts_init,
127 ))
128}
129
130pub fn parse_book_ticker(
136 msg: &BinanceFuturesBookTickerMsg,
137 instrument: &InstrumentAny,
138 ts_init: UnixNanos,
139) -> BinanceWsResult<QuoteTick> {
140 let instrument_id = instrument.id();
141 let price_precision = instrument.price_precision();
142 let size_precision = instrument.size_precision();
143
144 let bid_price = msg
145 .best_bid_price
146 .parse::<f64>()
147 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
148 let bid_size = msg
149 .best_bid_qty
150 .parse::<f64>()
151 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
152 let ask_price = msg
153 .best_ask_price
154 .parse::<f64>()
155 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
156 let ask_size = msg
157 .best_ask_qty
158 .parse::<f64>()
159 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
160
161 let ts_event = UnixNanos::from(msg.transaction_time as u64 * 1_000_000); Ok(QuoteTick::new(
164 instrument_id,
165 Price::new(bid_price, price_precision),
166 Price::new(ask_price, price_precision),
167 Quantity::new(bid_size, size_precision),
168 Quantity::new(ask_size, size_precision),
169 ts_event,
170 ts_init,
171 ))
172}
173
174pub fn parse_depth_update(
180 msg: &BinanceFuturesDepthUpdateMsg,
181 instrument: &InstrumentAny,
182 ts_init: UnixNanos,
183) -> BinanceWsResult<OrderBookDeltas> {
184 let instrument_id = instrument.id();
185 let price_precision = instrument.price_precision();
186 let size_precision = instrument.size_precision();
187
188 let ts_event = UnixNanos::from(msg.transaction_time as u64 * 1_000_000); let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
191
192 for (i, bid) in msg.bids.iter().enumerate() {
194 let price = bid[0]
195 .parse::<f64>()
196 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
197 let size = bid[1]
198 .parse::<f64>()
199 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
200
201 let action = if size == 0.0 {
202 BookAction::Delete
203 } else {
204 BookAction::Update
205 };
206
207 let is_last = i == msg.bids.len() - 1 && msg.asks.is_empty();
208 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
209
210 let order = BookOrder::new(
211 OrderSide::Buy,
212 Price::new(price, price_precision),
213 Quantity::new(size, size_precision),
214 0,
215 );
216
217 deltas.push(OrderBookDelta::new(
218 instrument_id,
219 action,
220 order,
221 flags,
222 msg.final_update_id,
223 ts_event,
224 ts_init,
225 ));
226 }
227
228 for (i, ask) in msg.asks.iter().enumerate() {
230 let price = ask[0]
231 .parse::<f64>()
232 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
233 let size = ask[1]
234 .parse::<f64>()
235 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
236
237 let action = if size == 0.0 {
238 BookAction::Delete
239 } else {
240 BookAction::Update
241 };
242
243 let is_last = i == msg.asks.len() - 1;
244 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
245
246 let order = BookOrder::new(
247 OrderSide::Sell,
248 Price::new(price, price_precision),
249 Quantity::new(size, size_precision),
250 0,
251 );
252
253 deltas.push(OrderBookDelta::new(
254 instrument_id,
255 action,
256 order,
257 flags,
258 msg.final_update_id,
259 ts_event,
260 ts_init,
261 ));
262 }
263
264 Ok(OrderBookDeltas::new(instrument_id, deltas))
265}
266
267pub fn parse_mark_price(
273 msg: &BinanceFuturesMarkPriceMsg,
274 instrument: &InstrumentAny,
275 ts_init: UnixNanos,
276) -> BinanceWsResult<(MarkPriceUpdate, IndexPriceUpdate, FundingRateUpdate)> {
277 let instrument_id = instrument.id();
278 let price_precision = instrument.price_precision();
279
280 let mark_price = msg
281 .mark_price
282 .parse::<f64>()
283 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
284 let index_price = msg
285 .index_price
286 .parse::<f64>()
287 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
288 let funding_rate = msg
289 .funding_rate
290 .parse::<f64>()
291 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
292
293 let ts_event = UnixNanos::from(msg.event_time as u64 * 1_000_000); let next_funding_ns = if msg.next_funding_time > 0 {
295 Some(UnixNanos::from(msg.next_funding_time as u64 * 1_000_000))
296 } else {
297 None
298 };
299
300 let mark_update = MarkPriceUpdate::new(
301 instrument_id,
302 Price::new(mark_price, price_precision),
303 ts_event,
304 ts_init,
305 );
306
307 let index_update = IndexPriceUpdate::new(
308 instrument_id,
309 Price::new(index_price, price_precision),
310 ts_event,
311 ts_init,
312 );
313
314 let funding_update = FundingRateUpdate::new(
315 instrument_id,
316 Decimal::from_f64(funding_rate).unwrap_or_default(),
317 next_funding_ns,
318 ts_event,
319 ts_init,
320 );
321
322 Ok((mark_update, index_update, funding_update))
323}
324
325fn interval_to_bar_spec(interval: BinanceKlineInterval) -> BarSpecification {
327 match interval {
328 BinanceKlineInterval::Second1 => {
329 BarSpecification::new(1, BarAggregation::Second, PriceType::Last)
330 }
331 BinanceKlineInterval::Minute1 => {
332 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
333 }
334 BinanceKlineInterval::Minute3 => {
335 BarSpecification::new(3, BarAggregation::Minute, PriceType::Last)
336 }
337 BinanceKlineInterval::Minute5 => {
338 BarSpecification::new(5, BarAggregation::Minute, PriceType::Last)
339 }
340 BinanceKlineInterval::Minute15 => {
341 BarSpecification::new(15, BarAggregation::Minute, PriceType::Last)
342 }
343 BinanceKlineInterval::Minute30 => {
344 BarSpecification::new(30, BarAggregation::Minute, PriceType::Last)
345 }
346 BinanceKlineInterval::Hour1 => {
347 BarSpecification::new(1, BarAggregation::Hour, PriceType::Last)
348 }
349 BinanceKlineInterval::Hour2 => {
350 BarSpecification::new(2, BarAggregation::Hour, PriceType::Last)
351 }
352 BinanceKlineInterval::Hour4 => {
353 BarSpecification::new(4, BarAggregation::Hour, PriceType::Last)
354 }
355 BinanceKlineInterval::Hour6 => {
356 BarSpecification::new(6, BarAggregation::Hour, PriceType::Last)
357 }
358 BinanceKlineInterval::Hour8 => {
359 BarSpecification::new(8, BarAggregation::Hour, PriceType::Last)
360 }
361 BinanceKlineInterval::Hour12 => {
362 BarSpecification::new(12, BarAggregation::Hour, PriceType::Last)
363 }
364 BinanceKlineInterval::Day1 => {
365 BarSpecification::new(1, BarAggregation::Day, PriceType::Last)
366 }
367 BinanceKlineInterval::Day3 => {
368 BarSpecification::new(3, BarAggregation::Day, PriceType::Last)
369 }
370 BinanceKlineInterval::Week1 => {
371 BarSpecification::new(1, BarAggregation::Week, PriceType::Last)
372 }
373 BinanceKlineInterval::Month1 => {
374 BarSpecification::new(1, BarAggregation::Month, PriceType::Last)
375 }
376 }
377}
378
379pub fn parse_kline(
387 msg: &BinanceFuturesKlineMsg,
388 instrument: &InstrumentAny,
389 ts_init: UnixNanos,
390) -> BinanceWsResult<Option<Bar>> {
391 if !msg.kline.is_closed {
393 return Ok(None);
394 }
395
396 let instrument_id = instrument.id();
397 let price_precision = instrument.price_precision();
398 let size_precision = instrument.size_precision();
399
400 let spec = interval_to_bar_spec(msg.kline.interval);
401 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
402
403 let open = msg
404 .kline
405 .open
406 .parse::<f64>()
407 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
408 let high = msg
409 .kline
410 .high
411 .parse::<f64>()
412 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
413 let low = msg
414 .kline
415 .low
416 .parse::<f64>()
417 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
418 let close = msg
419 .kline
420 .close
421 .parse::<f64>()
422 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
423 let volume = msg
424 .kline
425 .volume
426 .parse::<f64>()
427 .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
428
429 let ts_event = UnixNanos::from(msg.kline.close_time as u64 * 1_000_000); let bar = Bar::new(
433 bar_type,
434 Price::new(open, price_precision),
435 Price::new(high, price_precision),
436 Price::new(low, price_precision),
437 Price::new(close, price_precision),
438 Quantity::new(volume, size_precision),
439 ts_event,
440 ts_init,
441 );
442
443 Ok(Some(bar))
444}
445
446pub fn extract_symbol(json: &serde_json::Value) -> Option<Ustr> {
448 json.get("s").and_then(|v| v.as_str()).map(Ustr::from)
449}
450
451pub fn extract_event_type(json: &serde_json::Value) -> Option<BinanceWsEventType> {
453 json.get("e")
454 .and_then(|v| serde_json::from_value(v.clone()).ok())
455}