nautilus_binance/common/sbe/
cursor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Zero-copy SBE byte cursor for sequential decoding.
17
18use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
19
20/// Zero-copy SBE byte cursor for sequential decoding.
21///
22/// Wraps a byte slice and tracks position, providing typed read methods
23/// that automatically advance the cursor.
24#[derive(Debug, Clone)]
25pub struct SbeCursor<'a> {
26    buf: &'a [u8],
27    pos: usize,
28}
29
30impl<'a> SbeCursor<'a> {
31    /// Creates a new cursor at position 0.
32    #[must_use]
33    pub const fn new(buf: &'a [u8]) -> Self {
34        Self { buf, pos: 0 }
35    }
36
37    /// Creates a cursor starting at a specific offset.
38    #[must_use]
39    pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
40        Self { buf, pos }
41    }
42
43    /// Current position in the buffer.
44    #[must_use]
45    pub const fn pos(&self) -> usize {
46        self.pos
47    }
48
49    /// Remaining bytes from current position.
50    #[must_use]
51    pub const fn remaining(&self) -> usize {
52        self.buf.len().saturating_sub(self.pos)
53    }
54
55    /// Returns the underlying buffer.
56    #[must_use]
57    pub const fn buffer(&self) -> &'a [u8] {
58        self.buf
59    }
60
61    /// Returns remaining bytes as a slice.
62    #[must_use]
63    pub fn peek(&self) -> &'a [u8] {
64        &self.buf[self.pos..]
65    }
66
67    /// Ensures at least `n` bytes remain.
68    ///
69    /// # Errors
70    ///
71    /// Returns `BufferTooShort` if fewer than `n` bytes remain.
72    pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
73        if self.remaining() < n {
74            return Err(SbeDecodeError::BufferTooShort {
75                expected: self.pos + n,
76                actual: self.buf.len(),
77            });
78        }
79        Ok(())
80    }
81
82    /// Advances position by `n` bytes.
83    ///
84    /// # Errors
85    ///
86    /// Returns `BufferTooShort` if fewer than `n` bytes remain.
87    pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
88        self.require(n)?;
89        self.pos += n;
90        Ok(())
91    }
92
93    /// Skips `n` bytes without bounds checking.
94    ///
95    /// # Safety
96    ///
97    /// Caller must ensure `n` bytes are available.
98    pub fn skip(&mut self, n: usize) {
99        self.pos += n;
100    }
101
102    /// Resets cursor to start of buffer.
103    pub fn reset(&mut self) {
104        self.pos = 0;
105    }
106
107    /// Sets cursor to a specific position.
108    pub fn set_pos(&mut self, pos: usize) {
109        self.pos = pos;
110    }
111
112    /// Reads a u8 and advances by 1 byte.
113    pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
114        self.require(1)?;
115        let value = self.buf[self.pos];
116        self.pos += 1;
117        Ok(value)
118    }
119
120    /// Reads an i8 and advances by 1 byte.
121    pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
122        self.require(1)?;
123        let value = self.buf[self.pos] as i8;
124        self.pos += 1;
125        Ok(value)
126    }
127
128    /// Reads a u16 little-endian and advances by 2 bytes.
129    pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
130        self.require(2)?;
131        let value = u16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
132        self.pos += 2;
133        Ok(value)
134    }
135
136    /// Reads an i16 little-endian and advances by 2 bytes.
137    pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
138        self.require(2)?;
139        let value = i16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
140        self.pos += 2;
141        Ok(value)
142    }
143
144    /// Reads a u32 little-endian and advances by 4 bytes.
145    pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
146        self.require(4)?;
147        let value = u32::from_le_bytes([
148            self.buf[self.pos],
149            self.buf[self.pos + 1],
150            self.buf[self.pos + 2],
151            self.buf[self.pos + 3],
152        ]);
153        self.pos += 4;
154        Ok(value)
155    }
156
157    /// Reads an i32 little-endian and advances by 4 bytes.
158    pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
159        self.require(4)?;
160        let value = i32::from_le_bytes([
161            self.buf[self.pos],
162            self.buf[self.pos + 1],
163            self.buf[self.pos + 2],
164            self.buf[self.pos + 3],
165        ]);
166        self.pos += 4;
167        Ok(value)
168    }
169
170    /// Reads a u64 little-endian and advances by 8 bytes.
171    pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
172        self.require(8)?;
173        let value = u64::from_le_bytes([
174            self.buf[self.pos],
175            self.buf[self.pos + 1],
176            self.buf[self.pos + 2],
177            self.buf[self.pos + 3],
178            self.buf[self.pos + 4],
179            self.buf[self.pos + 5],
180            self.buf[self.pos + 6],
181            self.buf[self.pos + 7],
182        ]);
183        self.pos += 8;
184        Ok(value)
185    }
186
187    /// Reads an i64 little-endian and advances by 8 bytes.
188    pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
189        self.require(8)?;
190        let value = i64::from_le_bytes([
191            self.buf[self.pos],
192            self.buf[self.pos + 1],
193            self.buf[self.pos + 2],
194            self.buf[self.pos + 3],
195            self.buf[self.pos + 4],
196            self.buf[self.pos + 5],
197            self.buf[self.pos + 6],
198            self.buf[self.pos + 7],
199        ]);
200        self.pos += 8;
201        Ok(value)
202    }
203
204    /// Reads an optional i64 where `i64::MIN` represents None.
205    pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
206        let value = self.read_i64_le()?;
207        Ok(if value == i64::MIN { None } else { Some(value) })
208    }
209
210    /// Reads N bytes and advances.
211    pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
212        self.require(n)?;
213        let slice = &self.buf[self.pos..self.pos + n];
214        self.pos += n;
215        Ok(slice)
216    }
217
218    /// Reads a varString8 (1-byte length prefix + UTF-8 data).
219    ///
220    /// Returns empty string if length is 0.
221    pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
222        let len = self.read_u8()? as usize;
223        if len == 0 {
224            return Ok(String::new());
225        }
226        self.require(len)?;
227        let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
228            .map_err(|_| SbeDecodeError::InvalidUtf8)?
229            .to_string();
230        self.pos += len;
231        Ok(s)
232    }
233
234    /// Reads a varString8 as a &str (zero-copy).
235    pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
236        let len = self.read_u8()? as usize;
237        if len == 0 {
238            return Ok("");
239        }
240        self.require(len)?;
241        let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
242            .map_err(|_| SbeDecodeError::InvalidUtf8)?;
243        self.pos += len;
244        Ok(s)
245    }
246
247    /// Reads group header (u16 block_length + u32 num_in_group).
248    ///
249    /// Returns (block_length, num_in_group).
250    pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
251        let block_length = self.read_u16_le()?;
252        let num_in_group = self.read_u32_le()?;
253
254        if num_in_group > MAX_GROUP_SIZE {
255            return Err(SbeDecodeError::GroupSizeTooLarge {
256                count: num_in_group,
257                max: MAX_GROUP_SIZE,
258            });
259        }
260
261        Ok((block_length, num_in_group))
262    }
263
264    /// Reads compact group header (u16 block_length + u16 num_in_group).
265    ///
266    /// Returns (block_length, num_in_group).
267    pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
268        let block_length = self.read_u16_le()?;
269        let num_in_group = self.read_u16_le()?;
270
271        if u32::from(num_in_group) > MAX_GROUP_SIZE {
272            return Err(SbeDecodeError::GroupSizeTooLarge {
273                count: u32::from(num_in_group),
274                max: MAX_GROUP_SIZE,
275            });
276        }
277
278        Ok((block_length, num_in_group))
279    }
280
281    /// Iterates over a group, calling `decode_item` for each element.
282    ///
283    /// The decoder function receives a cursor positioned at the start of each item
284    /// and should decode the item without advancing past `block_length` bytes.
285    pub fn read_group<T, F>(
286        &mut self,
287        block_length: u16,
288        num_in_group: u32,
289        mut decode_item: F,
290    ) -> Result<Vec<T>, SbeDecodeError>
291    where
292        F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
293    {
294        let block_len = block_length as usize;
295        let count = num_in_group as usize;
296
297        // Validate we have enough bytes for all items
298        self.require(count * block_len)?;
299
300        let mut items = Vec::with_capacity(count);
301        for _ in 0..count {
302            let item_start = self.pos;
303            let item = decode_item(self)?;
304            items.push(item);
305
306            // Advance to next item boundary (respects block_length even if decoder read less)
307            self.pos = item_start + block_len;
308        }
309
310        Ok(items)
311    }
312}
313
/// Unit tests for [`SbeCursor`]: construction, position handling, every typed
/// reader, variable-length strings, group headers and group iteration.
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_new_starts_at_zero() {
        let buf = [1, 2, 3, 4];
        let cursor = SbeCursor::new(&buf);
        assert_eq!(cursor.pos(), 0);
        assert_eq!(cursor.remaining(), 4);
    }

    #[rstest]
    fn test_new_at_starts_at_offset() {
        let buf = [1, 2, 3, 4];
        let cursor = SbeCursor::new_at(&buf, 2);
        assert_eq!(cursor.pos(), 2);
        assert_eq!(cursor.remaining(), 2);
    }

    #[rstest]
    fn test_buffer_returns_whole_slice() {
        let buf = [1u8, 2, 3, 4];
        let cursor = SbeCursor::new_at(&buf, 2);

        // `buffer` ignores the position and exposes the full backing slice
        assert_eq!(cursor.buffer(), &buf[..]);
    }

    #[rstest]
    fn test_read_u8() {
        let buf = [0x42, 0xFF];
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_u8().unwrap(), 0x42);
        assert_eq!(cursor.pos(), 1);

        assert_eq!(cursor.read_u8().unwrap(), 0xFF);
        assert_eq!(cursor.pos(), 2);

        assert!(cursor.read_u8().is_err());
    }

    #[rstest]
    fn test_read_i8() {
        let buf = [0x7F, 0x80]; // 127, -128
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_i8().unwrap(), 127);
        assert_eq!(cursor.read_i8().unwrap(), -128);
    }

    #[rstest]
    fn test_read_u16_le() {
        let buf = [0x34, 0x12]; // 0x1234 in little-endian
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_u16_le().unwrap(), 0x1234);
        assert_eq!(cursor.pos(), 2);
    }

    #[rstest]
    fn test_read_i16_le() {
        let value: i16 = -2;
        let buf = value.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_i16_le().unwrap(), value);
        assert_eq!(cursor.pos(), 2);
    }

    #[rstest]
    fn test_read_u32_le() {
        let buf = [0x78, 0x56, 0x34, 0x12]; // 0x12345678 in little-endian
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_u32_le().unwrap(), 0x1234_5678);
        assert_eq!(cursor.pos(), 4);
        assert!(cursor.read_u32_le().is_err());
    }

    #[rstest]
    fn test_read_i32_le() {
        let value: i32 = -123_456_789;
        let buf = value.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_i32_le().unwrap(), value);
        assert_eq!(cursor.pos(), 4);
    }

    #[rstest]
    fn test_read_u64_le() {
        let value: u64 = 0x0102_0304_0506_0708;
        let buf = value.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_u64_le().unwrap(), value);
        assert_eq!(cursor.pos(), 8);
    }

    #[rstest]
    fn test_read_i64_le() {
        let value: i64 = -1234567890123456789;
        let buf = value.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_i64_le().unwrap(), value);
        assert_eq!(cursor.pos(), 8);
    }

    #[rstest]
    fn test_read_optional_i64_null() {
        let buf = i64::MIN.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_optional_i64_le().unwrap(), None);
    }

    #[rstest]
    fn test_read_optional_i64_present() {
        let value: i64 = 12345;
        let buf = value.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_optional_i64_le().unwrap(), Some(12345));
    }

    #[rstest]
    fn test_read_bytes() {
        let buf = [1u8, 2, 3, 4];
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_bytes(2).unwrap(), &buf[..2]);
        assert_eq!(cursor.pos(), 2);

        // A failed read must not move the cursor
        assert!(cursor.read_bytes(3).is_err());
        assert_eq!(cursor.pos(), 2);
    }

    #[rstest]
    fn test_read_var_string8() {
        let mut buf = vec![5]; // length = 5
        buf.extend_from_slice(b"hello");
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_var_string8().unwrap(), "hello");
        assert_eq!(cursor.pos(), 6); // 1 + 5
    }

    #[rstest]
    fn test_read_var_string8_empty() {
        let buf = [0]; // length = 0
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_var_string8().unwrap(), "");
        assert_eq!(cursor.pos(), 1);
    }

    #[rstest]
    fn test_read_var_string8_invalid_utf8() {
        let buf = [2, 0xFF, 0xFE]; // length = 2, invalid UTF-8
        let mut cursor = SbeCursor::new(&buf);

        assert!(matches!(
            cursor.read_var_string8(),
            Err(SbeDecodeError::InvalidUtf8)
        ));
    }

    #[rstest]
    fn test_read_var_string8_truncated() {
        let buf = [5, b'h', b'i']; // length = 5, only 2 payload bytes present
        let mut cursor = SbeCursor::new(&buf);

        assert!(matches!(
            cursor.read_var_string8(),
            Err(SbeDecodeError::BufferTooShort {
                expected: 6,
                actual: 3
            })
        ));
    }

    #[rstest]
    fn test_read_var_string8_ref() {
        let mut buf = vec![3]; // length = 3
        buf.extend_from_slice(b"abc");
        buf.push(0); // empty trailing string
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.read_var_string8_ref().unwrap(), "abc");
        assert_eq!(cursor.pos(), 4); // 1 + 3

        assert_eq!(cursor.read_var_string8_ref().unwrap(), "");
        assert_eq!(cursor.pos(), 5);
    }

    #[rstest]
    fn test_read_group_header() {
        // block_length = 24, num_in_group = 3
        let buf = [24, 0, 3, 0, 0, 0];
        let mut cursor = SbeCursor::new(&buf);

        let (block_len, count) = cursor.read_group_header().unwrap();
        assert_eq!(block_len, 24);
        assert_eq!(count, 3);
        assert_eq!(cursor.pos(), 6);
    }

    #[rstest]
    fn test_read_group_header_too_large() {
        // num_in_group = MAX_GROUP_SIZE + 1
        let count = MAX_GROUP_SIZE + 1;
        let mut buf = vec![24, 0]; // block_length = 24
        buf.extend_from_slice(&count.to_le_bytes());
        let mut cursor = SbeCursor::new(&buf);

        assert!(matches!(
            cursor.read_group_header(),
            Err(SbeDecodeError::GroupSizeTooLarge { .. })
        ));
    }

    #[rstest]
    fn test_read_group_header_16() {
        // block_length = 24, num_in_group = 3
        let buf = [24, 0, 3, 0];
        let mut cursor = SbeCursor::new(&buf);

        let (block_len, count) = cursor.read_group_header_16().unwrap();
        assert_eq!(block_len, 24);
        assert_eq!(count, 3);
        assert_eq!(cursor.pos(), 4);
    }

    #[rstest]
    fn test_read_group() {
        // 2 items, each 4 bytes containing a u32
        let mut buf = Vec::new();
        buf.extend_from_slice(&100u32.to_le_bytes()); // item 0
        buf.extend_from_slice(&200u32.to_le_bytes()); // item 1

        let mut cursor = SbeCursor::new(&buf);
        let items: Vec<u32> = cursor.read_group(4, 2, |c| c.read_u32_le()).unwrap();

        assert_eq!(items, vec![100, 200]);
        assert_eq!(cursor.pos(), 8);
    }

    #[rstest]
    fn test_read_group_respects_block_length() {
        // 2 items, block_length = 8, but we only read 4 bytes per item
        let mut buf = Vec::new();
        buf.extend_from_slice(&100u32.to_le_bytes());
        buf.extend_from_slice(&[0, 0, 0, 0]); // padding
        buf.extend_from_slice(&200u32.to_le_bytes());
        buf.extend_from_slice(&[0, 0, 0, 0]); // padding

        let mut cursor = SbeCursor::new(&buf);
        let items: Vec<u32> = cursor.read_group(8, 2, |c| c.read_u32_le()).unwrap();

        assert_eq!(items, vec![100, 200]);
        assert_eq!(cursor.pos(), 16); // 2 * 8
    }

    #[rstest]
    fn test_read_group_empty() {
        let buf = [1u8, 2, 3, 4];
        let mut cursor = SbeCursor::new(&buf);
        let items: Vec<u32> = cursor.read_group(4, 0, |c| c.read_u32_le()).unwrap();

        assert!(items.is_empty());
        assert_eq!(cursor.pos(), 0);
    }

    #[rstest]
    fn test_read_group_insufficient_data() {
        // Only one 4-byte item present, but two are requested
        let buf = 100u32.to_le_bytes();
        let mut cursor = SbeCursor::new(&buf);

        let result: Result<Vec<u32>, _> = cursor.read_group(4, 2, |c| c.read_u32_le());
        assert!(matches!(
            result,
            Err(SbeDecodeError::BufferTooShort {
                expected: 8,
                actual: 4
            })
        ));
        assert_eq!(cursor.pos(), 0);
    }

    #[rstest]
    fn test_require_success() {
        let buf = [1, 2, 3, 4];
        let cursor = SbeCursor::new(&buf);

        assert!(cursor.require(4).is_ok());
        assert!(cursor.require(3).is_ok());
    }

    #[rstest]
    fn test_require_failure() {
        let buf = [1, 2];
        let cursor = SbeCursor::new(&buf);

        let err = cursor.require(5).unwrap_err();
        assert!(matches!(
            err,
            SbeDecodeError::BufferTooShort {
                expected: 5,
                actual: 2
            }
        ));
    }

    #[rstest]
    fn test_advance() {
        let buf = [1, 2, 3, 4];
        let mut cursor = SbeCursor::new(&buf);

        cursor.advance(2).unwrap();
        assert_eq!(cursor.pos(), 2);

        cursor.advance(2).unwrap();
        assert_eq!(cursor.pos(), 4);

        assert!(cursor.advance(1).is_err());
    }

    #[rstest]
    fn test_skip_and_set_pos() {
        let buf = [1, 2, 3, 4];
        let mut cursor = SbeCursor::new(&buf);

        cursor.skip(3);
        assert_eq!(cursor.pos(), 3);
        assert_eq!(cursor.remaining(), 1);

        cursor.set_pos(1);
        assert_eq!(cursor.pos(), 1);
        assert_eq!(cursor.read_u8().unwrap(), 2);
    }

    #[rstest]
    fn test_peek() {
        let buf = [1, 2, 3, 4];
        let mut cursor = SbeCursor::new(&buf);

        assert_eq!(cursor.peek(), &[1, 2, 3, 4]);

        cursor.advance(2).unwrap();
        assert_eq!(cursor.peek(), &[3, 4]);
    }

    #[rstest]
    fn test_reset() {
        let buf = [1, 2, 3, 4];
        let mut cursor = SbeCursor::new(&buf);

        cursor.advance(3).unwrap();
        assert_eq!(cursor.pos(), 3);

        cursor.reset();
        assert_eq!(cursor.pos(), 0);
    }
}