nautilus_binance/common/sbe/
cursor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Zero-copy SBE byte cursor for sequential decoding.
17
18use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
19
20/// Zero-copy SBE byte cursor for sequential decoding.
21///
22/// Wraps a byte slice and tracks position, providing typed read methods
23/// that automatically advance the cursor.
24#[derive(Debug, Clone)]
25pub struct SbeCursor<'a> {
26    buf: &'a [u8],
27    pos: usize,
28}
29
30impl<'a> SbeCursor<'a> {
31    /// Creates a new cursor at position 0.
32    #[must_use]
33    pub const fn new(buf: &'a [u8]) -> Self {
34        Self { buf, pos: 0 }
35    }
36
37    /// Creates a cursor starting at a specific offset.
38    #[must_use]
39    pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
40        Self { buf, pos }
41    }
42
43    /// Current position in the buffer.
44    #[must_use]
45    pub const fn pos(&self) -> usize {
46        self.pos
47    }
48
49    /// Remaining bytes from current position.
50    #[must_use]
51    pub const fn remaining(&self) -> usize {
52        self.buf.len().saturating_sub(self.pos)
53    }
54
55    /// Returns the underlying buffer.
56    #[must_use]
57    pub const fn buffer(&self) -> &'a [u8] {
58        self.buf
59    }
60
61    /// Returns remaining bytes as a slice.
62    #[must_use]
63    pub fn peek(&self) -> &'a [u8] {
64        &self.buf[self.pos..]
65    }
66
67    /// Ensures at least `n` bytes remain.
68    ///
69    /// # Errors
70    ///
71    /// Returns `BufferTooShort` if fewer than `n` bytes remain.
72    pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
73        if self.remaining() < n {
74            return Err(SbeDecodeError::BufferTooShort {
75                expected: self.pos + n,
76                actual: self.buf.len(),
77            });
78        }
79        Ok(())
80    }
81
82    /// Advances position by `n` bytes.
83    ///
84    /// # Errors
85    ///
86    /// Returns `BufferTooShort` if fewer than `n` bytes remain.
87    pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
88        self.require(n)?;
89        self.pos += n;
90        Ok(())
91    }
92
93    /// Skips `n` bytes without bounds checking.
94    ///
95    /// # Safety
96    ///
97    /// Caller must ensure `n` bytes are available.
98    pub fn skip(&mut self, n: usize) {
99        self.pos += n;
100    }
101
102    /// Resets cursor to start of buffer.
103    pub fn reset(&mut self) {
104        self.pos = 0;
105    }
106
107    /// Sets cursor to a specific position.
108    pub fn set_pos(&mut self, pos: usize) {
109        self.pos = pos;
110    }
111
112    /// Reads a u8 and advances by 1 byte.
113    pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
114        self.require(1)?;
115        let value = self.buf[self.pos];
116        self.pos += 1;
117        Ok(value)
118    }
119
120    /// Reads an i8 and advances by 1 byte.
121    pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
122        self.require(1)?;
123        let value = self.buf[self.pos] as i8;
124        self.pos += 1;
125        Ok(value)
126    }
127
128    /// Reads a u16 little-endian and advances by 2 bytes.
129    pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
130        self.require(2)?;
131        let value = u16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
132        self.pos += 2;
133        Ok(value)
134    }
135
136    /// Reads an i16 little-endian and advances by 2 bytes.
137    pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
138        self.require(2)?;
139        let value = i16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
140        self.pos += 2;
141        Ok(value)
142    }
143
144    /// Reads a u32 little-endian and advances by 4 bytes.
145    pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
146        self.require(4)?;
147        let value = u32::from_le_bytes([
148            self.buf[self.pos],
149            self.buf[self.pos + 1],
150            self.buf[self.pos + 2],
151            self.buf[self.pos + 3],
152        ]);
153        self.pos += 4;
154        Ok(value)
155    }
156
157    /// Reads an i32 little-endian and advances by 4 bytes.
158    pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
159        self.require(4)?;
160        let value = i32::from_le_bytes([
161            self.buf[self.pos],
162            self.buf[self.pos + 1],
163            self.buf[self.pos + 2],
164            self.buf[self.pos + 3],
165        ]);
166        self.pos += 4;
167        Ok(value)
168    }
169
170    /// Reads a u64 little-endian and advances by 8 bytes.
171    pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
172        self.require(8)?;
173        let value = u64::from_le_bytes([
174            self.buf[self.pos],
175            self.buf[self.pos + 1],
176            self.buf[self.pos + 2],
177            self.buf[self.pos + 3],
178            self.buf[self.pos + 4],
179            self.buf[self.pos + 5],
180            self.buf[self.pos + 6],
181            self.buf[self.pos + 7],
182        ]);
183        self.pos += 8;
184        Ok(value)
185    }
186
187    /// Reads an i64 little-endian and advances by 8 bytes.
188    pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
189        self.require(8)?;
190        let value = i64::from_le_bytes([
191            self.buf[self.pos],
192            self.buf[self.pos + 1],
193            self.buf[self.pos + 2],
194            self.buf[self.pos + 3],
195            self.buf[self.pos + 4],
196            self.buf[self.pos + 5],
197            self.buf[self.pos + 6],
198            self.buf[self.pos + 7],
199        ]);
200        self.pos += 8;
201        Ok(value)
202    }
203
204    /// Reads an optional i64 where `i64::MIN` represents None.
205    pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
206        let value = self.read_i64_le()?;
207        Ok(if value == i64::MIN { None } else { Some(value) })
208    }
209
210    /// Reads N bytes and advances.
211    pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
212        self.require(n)?;
213        let slice = &self.buf[self.pos..self.pos + n];
214        self.pos += n;
215        Ok(slice)
216    }
217
218    /// Reads a varString8 (1-byte length prefix + UTF-8 data).
219    ///
220    /// Returns empty string if length is 0.
221    pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
222        let len = self.read_u8()? as usize;
223        if len == 0 {
224            return Ok(String::new());
225        }
226        self.require(len)?;
227        let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
228            .map_err(|_| SbeDecodeError::InvalidUtf8)?
229            .to_string();
230        self.pos += len;
231        Ok(s)
232    }
233
234    /// Reads a varString8 as a &str (zero-copy).
235    pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
236        let len = self.read_u8()? as usize;
237        if len == 0 {
238            return Ok("");
239        }
240        self.require(len)?;
241        let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
242            .map_err(|_| SbeDecodeError::InvalidUtf8)?;
243        self.pos += len;
244        Ok(s)
245    }
246
247    /// Skips a varData8 field (1-byte length prefix + binary data).
248    ///
249    /// Used for skipping binary fields that should not be decoded as UTF-8.
250    pub fn skip_var_data8(&mut self) -> Result<(), SbeDecodeError> {
251        let len = self.read_u8()? as usize;
252        if len > 0 {
253            self.advance(len)?;
254        }
255        Ok(())
256    }
257
258    /// Reads a varData8 field (1-byte length prefix + binary data).
259    ///
260    /// Returns the raw bytes without UTF-8 decoding.
261    pub fn read_var_bytes8(&mut self) -> Result<Vec<u8>, SbeDecodeError> {
262        let len = self.read_u8()? as usize;
263        if len == 0 {
264            return Ok(Vec::new());
265        }
266        self.require(len)?;
267        let bytes = self.buf[self.pos..self.pos + len].to_vec();
268        self.pos += len;
269        Ok(bytes)
270    }
271
272    /// Reads group header (u16 block_length + u32 num_in_group).
273    ///
274    /// Returns (block_length, num_in_group).
275    pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
276        let block_length = self.read_u16_le()?;
277        let num_in_group = self.read_u32_le()?;
278
279        if num_in_group > MAX_GROUP_SIZE {
280            return Err(SbeDecodeError::GroupSizeTooLarge {
281                count: num_in_group,
282                max: MAX_GROUP_SIZE,
283            });
284        }
285
286        Ok((block_length, num_in_group))
287    }
288
289    /// Reads compact group header (u16 block_length + u16 num_in_group).
290    ///
291    /// Returns (block_length, num_in_group).
292    pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
293        let block_length = self.read_u16_le()?;
294        let num_in_group = self.read_u16_le()?;
295
296        if u32::from(num_in_group) > MAX_GROUP_SIZE {
297            return Err(SbeDecodeError::GroupSizeTooLarge {
298                count: u32::from(num_in_group),
299                max: MAX_GROUP_SIZE,
300            });
301        }
302
303        Ok((block_length, num_in_group))
304    }
305
306    /// Iterates over a group, calling `decode_item` for each element.
307    ///
308    /// The decoder function receives a cursor positioned at the start of each item
309    /// and should decode the item without advancing past `block_length` bytes.
310    pub fn read_group<T, F>(
311        &mut self,
312        block_length: u16,
313        num_in_group: u32,
314        mut decode_item: F,
315    ) -> Result<Vec<T>, SbeDecodeError>
316    where
317        F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
318    {
319        let block_len = block_length as usize;
320        let count = num_in_group as usize;
321
322        // Validate we have enough bytes for all items
323        self.require(count * block_len)?;
324
325        let mut items = Vec::with_capacity(count);
326        for _ in 0..count {
327            let item_start = self.pos;
328            let item = decode_item(self)?;
329            items.push(item);
330
331            // Advance to next item boundary (respects block_length even if decoder read less)
332            self.pos = item_start + block_len;
333        }
334
335        Ok(items)
336    }
337}
338
#[cfg(test)]
mod tests {
    //! Unit tests for [`SbeCursor`]: construction, primitive reads, variable-length
    //! strings, group headers, group iteration and position management.

    use rstest::rstest;

    use super::*;

    /// A fresh cursor sits at offset zero with the whole buffer still unread.
    #[rstest]
    fn test_new_starts_at_zero() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);
        assert_eq!((cur.pos(), cur.remaining()), (0, 4));
    }

    /// `new_at` honours the requested offset and shrinks the remaining count.
    #[rstest]
    fn test_new_at_starts_at_offset() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new_at(&data, 2);
        assert_eq!((cur.pos(), cur.remaining()), (2, 2));
    }

    /// Bytes are returned in order, one position at a time, then the read fails.
    #[rstest]
    fn test_read_u8() {
        let data = [0x42, 0xFF];
        let mut cur = SbeCursor::new(&data);

        let first = cur.read_u8().unwrap();
        assert_eq!(first, 0x42);
        assert_eq!(cur.pos(), 1);

        let second = cur.read_u8().unwrap();
        assert_eq!(second, 0xFF);
        assert_eq!(cur.pos(), 2);

        // Buffer exhausted.
        assert!(cur.read_u8().is_err());
    }

    /// Signed bytes cover both extremes of the range.
    #[rstest]
    fn test_read_i8() {
        // 0x7F == 127, 0x80 == -128
        let data = [0x7F, 0x80];
        let mut cur = SbeCursor::new(&data);

        let high = cur.read_i8().unwrap();
        let low = cur.read_i8().unwrap();
        assert_eq!(high, 127);
        assert_eq!(low, -128);
    }

    /// The low byte comes first on the wire.
    #[rstest]
    fn test_read_u16_le() {
        // 0x1234 encoded little-endian
        let data = [0x34, 0x12];
        let mut cur = SbeCursor::new(&data);

        let decoded = cur.read_u16_le().unwrap();
        assert_eq!(decoded, 0x1234);
        assert_eq!(cur.pos(), 2);
    }

    /// A negative 64-bit value survives an encode/decode round trip.
    #[rstest]
    fn test_read_i64_le() {
        let expected: i64 = -1234567890123456789;
        let data = expected.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        let decoded = cur.read_i64_le().unwrap();
        assert_eq!(decoded, expected);
        assert_eq!(cur.pos(), 8);
    }

    /// `i64::MIN` is the null sentinel.
    #[rstest]
    fn test_read_optional_i64_null() {
        let data = i64::MIN.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        let decoded = cur.read_optional_i64_le().unwrap();
        assert_eq!(decoded, None);
    }

    /// Any other value is passed through as `Some`.
    #[rstest]
    fn test_read_optional_i64_present() {
        let expected: i64 = 12345;
        let data = expected.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        let decoded = cur.read_optional_i64_le().unwrap();
        assert_eq!(decoded, Some(12345));
    }

    /// Length prefix plus payload are both consumed.
    #[rstest]
    fn test_read_var_string8() {
        // One length byte (5) followed by the text.
        let mut data = vec![5];
        data.extend_from_slice(b"hello");
        let mut cur = SbeCursor::new(&data);

        let text = cur.read_var_string8().unwrap();
        assert_eq!(text, "hello");
        // 1 prefix byte + 5 payload bytes
        assert_eq!(cur.pos(), 6);
    }

    /// A zero length prefix produces an empty string and consumes only the prefix.
    #[rstest]
    fn test_read_var_string8_empty() {
        let data = [0];
        let mut cur = SbeCursor::new(&data);

        let text = cur.read_var_string8().unwrap();
        assert_eq!(text, "");
        assert_eq!(cur.pos(), 1);
    }

    /// Payload bytes that are not UTF-8 are rejected.
    #[rstest]
    fn test_read_var_string8_invalid_utf8() {
        // length = 2, followed by two bytes that never form valid UTF-8
        let data = [2, 0xFF, 0xFE];
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_var_string8();
        assert!(matches!(outcome, Err(SbeDecodeError::InvalidUtf8)));
    }

    /// Standard header: u16 block length followed by u32 count.
    #[rstest]
    fn test_read_group_header() {
        // block_length = 24, num_in_group = 3
        let data = [24, 0, 3, 0, 0, 0];
        let mut cur = SbeCursor::new(&data);

        let (block_length, num_in_group) = cur.read_group_header().unwrap();
        assert_eq!(block_length, 24);
        assert_eq!(num_in_group, 3);
        assert_eq!(cur.pos(), 6);
    }

    /// A count one above the limit is refused.
    #[rstest]
    fn test_read_group_header_too_large() {
        let oversized = MAX_GROUP_SIZE + 1;
        // block_length = 24, then the oversized count
        let mut data = vec![24, 0];
        data.extend_from_slice(&oversized.to_le_bytes());
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_group_header();
        assert!(matches!(
            outcome,
            Err(SbeDecodeError::GroupSizeTooLarge { .. })
        ));
    }

    /// Two tightly packed 4-byte items decode in order.
    #[rstest]
    fn test_read_group() {
        let data = [100u32.to_le_bytes(), 200u32.to_le_bytes()].concat();

        let mut cur = SbeCursor::new(&data);
        let decoded: Vec<u32> = cur.read_group(4, 2, |item| item.read_u32_le()).unwrap();

        assert_eq!(decoded, vec![100, 200]);
        assert_eq!(cur.pos(), 8);
    }

    /// Items are spaced by `block_length` even when the decoder reads fewer bytes.
    #[rstest]
    fn test_read_group_respects_block_length() {
        // Each 8-byte block holds a u32 followed by 4 bytes of padding.
        let padding = [0u8; 4];
        let data = [
            100u32.to_le_bytes(),
            padding,
            200u32.to_le_bytes(),
            padding,
        ]
        .concat();

        let mut cur = SbeCursor::new(&data);
        let decoded: Vec<u32> = cur.read_group(8, 2, |item| item.read_u32_le()).unwrap();

        assert_eq!(decoded, vec![100, 200]);
        // 2 blocks * 8 bytes
        assert_eq!(cur.pos(), 16);
    }

    /// Requests that fit inside the buffer succeed.
    #[rstest]
    fn test_require_success() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);

        for wanted in [4, 3] {
            assert!(cur.require(wanted).is_ok());
        }
    }

    /// The error carries the needed length and the actual buffer length.
    #[rstest]
    fn test_require_failure() {
        let data = [1, 2];
        let cur = SbeCursor::new(&data);

        let failure = cur.require(5).unwrap_err();
        assert!(matches!(
            failure,
            SbeDecodeError::BufferTooShort {
                expected: 5,
                actual: 2
            }
        ));
    }

    /// Advancing moves the position until the buffer runs out.
    #[rstest]
    fn test_advance() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 2);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 4);

        // Nothing left to advance over.
        assert!(cur.advance(1).is_err());
    }

    /// `peek` exposes exactly the unread tail.
    #[rstest]
    fn test_peek() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.peek(), &[1u8, 2, 3, 4][..]);

        cur.advance(2).unwrap();
        assert_eq!(cur.peek(), &[3u8, 4][..]);
    }

    /// `reset` rewinds to the start of the buffer.
    #[rstest]
    fn test_reset() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(3).unwrap();
        assert_eq!(cur.pos(), 3);

        cur.reset();
        assert_eq!(cur.pos(), 0);
    }
}
566}