Skip to main content

nautilus_binance/common/sbe/
cursor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
// SBE decode cursor - read methods return `SbeDecodeError::BufferTooShort` when the buffer is too short
17#![allow(clippy::missing_errors_doc)]
18
19//! Zero-copy SBE byte cursor for sequential decoding.
20
21use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
22
23/// Zero-copy SBE byte cursor for sequential decoding.
24///
25/// Wraps a byte slice and tracks position, providing typed read methods
26/// that automatically advance the cursor.
27#[derive(Debug, Clone)]
28pub struct SbeCursor<'a> {
29    buf: &'a [u8],
30    pos: usize,
31}
32
33impl<'a> SbeCursor<'a> {
34    /// Creates a new cursor at position 0.
35    #[must_use]
36    pub const fn new(buf: &'a [u8]) -> Self {
37        Self { buf, pos: 0 }
38    }
39
40    /// Creates a cursor starting at a specific offset.
41    #[must_use]
42    pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
43        Self { buf, pos }
44    }
45
46    /// Current position in the buffer.
47    #[must_use]
48    pub const fn pos(&self) -> usize {
49        self.pos
50    }
51
52    /// Remaining bytes from current position.
53    #[must_use]
54    pub const fn remaining(&self) -> usize {
55        self.buf.len().saturating_sub(self.pos)
56    }
57
58    /// Returns the underlying buffer.
59    #[must_use]
60    pub const fn buffer(&self) -> &'a [u8] {
61        self.buf
62    }
63
64    /// Returns remaining bytes as a slice.
65    #[must_use]
66    pub fn peek(&self) -> &'a [u8] {
67        &self.buf[self.pos..]
68    }
69
70    /// Ensures at least `n` bytes remain.
71    ///
72    /// # Errors
73    ///
74    /// Returns `BufferTooShort` if fewer than `n` bytes remain.
75    pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
76        if self.remaining() < n {
77            return Err(SbeDecodeError::BufferTooShort {
78                expected: self.pos + n,
79                actual: self.buf.len(),
80            });
81        }
82        Ok(())
83    }
84
85    /// Advances position by `n` bytes.
86    ///
87    /// # Errors
88    ///
89    /// Returns `BufferTooShort` if fewer than `n` bytes remain.
90    pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
91        self.require(n)?;
92        self.pos += n;
93        Ok(())
94    }
95
96    /// Skips `n` bytes without bounds checking.
97    ///
98    /// # Safety
99    ///
100    /// Caller must ensure `n` bytes are available.
101    pub fn skip(&mut self, n: usize) {
102        self.pos += n;
103    }
104
105    /// Resets cursor to start of buffer.
106    pub fn reset(&mut self) {
107        self.pos = 0;
108    }
109
110    /// Sets cursor to a specific position.
111    pub fn set_pos(&mut self, pos: usize) {
112        self.pos = pos;
113    }
114
115    /// Reads a u8 and advances by 1 byte.
116    pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
117        self.require(1)?;
118        let value = self.buf[self.pos];
119        self.pos += 1;
120        Ok(value)
121    }
122
123    /// Reads an i8 and advances by 1 byte.
124    pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
125        self.require(1)?;
126        let value = self.buf[self.pos] as i8;
127        self.pos += 1;
128        Ok(value)
129    }
130
131    /// Reads a u16 little-endian and advances by 2 bytes.
132    pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
133        self.require(2)?;
134        let value = u16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
135        self.pos += 2;
136        Ok(value)
137    }
138
139    /// Reads an i16 little-endian and advances by 2 bytes.
140    pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
141        self.require(2)?;
142        let value = i16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
143        self.pos += 2;
144        Ok(value)
145    }
146
147    /// Reads a u32 little-endian and advances by 4 bytes.
148    pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
149        self.require(4)?;
150        let value = u32::from_le_bytes([
151            self.buf[self.pos],
152            self.buf[self.pos + 1],
153            self.buf[self.pos + 2],
154            self.buf[self.pos + 3],
155        ]);
156        self.pos += 4;
157        Ok(value)
158    }
159
160    /// Reads an i32 little-endian and advances by 4 bytes.
161    pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
162        self.require(4)?;
163        let value = i32::from_le_bytes([
164            self.buf[self.pos],
165            self.buf[self.pos + 1],
166            self.buf[self.pos + 2],
167            self.buf[self.pos + 3],
168        ]);
169        self.pos += 4;
170        Ok(value)
171    }
172
173    /// Reads a u64 little-endian and advances by 8 bytes.
174    pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
175        self.require(8)?;
176        let value = u64::from_le_bytes([
177            self.buf[self.pos],
178            self.buf[self.pos + 1],
179            self.buf[self.pos + 2],
180            self.buf[self.pos + 3],
181            self.buf[self.pos + 4],
182            self.buf[self.pos + 5],
183            self.buf[self.pos + 6],
184            self.buf[self.pos + 7],
185        ]);
186        self.pos += 8;
187        Ok(value)
188    }
189
190    /// Reads an i64 little-endian and advances by 8 bytes.
191    pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
192        self.require(8)?;
193        let value = i64::from_le_bytes([
194            self.buf[self.pos],
195            self.buf[self.pos + 1],
196            self.buf[self.pos + 2],
197            self.buf[self.pos + 3],
198            self.buf[self.pos + 4],
199            self.buf[self.pos + 5],
200            self.buf[self.pos + 6],
201            self.buf[self.pos + 7],
202        ]);
203        self.pos += 8;
204        Ok(value)
205    }
206
207    /// Reads an optional i64 where `i64::MIN` represents None.
208    pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
209        let value = self.read_i64_le()?;
210        Ok(if value == i64::MIN { None } else { Some(value) })
211    }
212
213    /// Reads N bytes and advances.
214    pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
215        self.require(n)?;
216        let slice = &self.buf[self.pos..self.pos + n];
217        self.pos += n;
218        Ok(slice)
219    }
220
221    /// Reads a varString8 (1-byte length prefix + UTF-8 data).
222    ///
223    /// Returns empty string if length is 0.
224    pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
225        let len = self.read_u8()? as usize;
226        if len == 0 {
227            return Ok(String::new());
228        }
229        self.require(len)?;
230        let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
231            .map_err(|_| SbeDecodeError::InvalidUtf8)?
232            .to_string();
233        self.pos += len;
234        Ok(s)
235    }
236
237    /// Reads a varString8 as a &str (zero-copy).
238    pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
239        let len = self.read_u8()? as usize;
240        if len == 0 {
241            return Ok("");
242        }
243        self.require(len)?;
244        let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
245            .map_err(|_| SbeDecodeError::InvalidUtf8)?;
246        self.pos += len;
247        Ok(s)
248    }
249
250    /// Skips a varData8 field (1-byte length prefix + binary data).
251    ///
252    /// Used for skipping binary fields that should not be decoded as UTF-8.
253    pub fn skip_var_data8(&mut self) -> Result<(), SbeDecodeError> {
254        let len = self.read_u8()? as usize;
255        if len > 0 {
256            self.advance(len)?;
257        }
258        Ok(())
259    }
260
261    /// Reads a varData8 field (1-byte length prefix + binary data).
262    ///
263    /// Returns the raw bytes without UTF-8 decoding.
264    pub fn read_var_bytes8(&mut self) -> Result<Vec<u8>, SbeDecodeError> {
265        let len = self.read_u8()? as usize;
266        if len == 0 {
267            return Ok(Vec::new());
268        }
269        self.require(len)?;
270        let bytes = self.buf[self.pos..self.pos + len].to_vec();
271        self.pos += len;
272        Ok(bytes)
273    }
274
275    /// Reads group header (u16 block_length + u32 num_in_group).
276    ///
277    /// Returns (block_length, num_in_group).
278    pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
279        let block_length = self.read_u16_le()?;
280        let num_in_group = self.read_u32_le()?;
281
282        if num_in_group > MAX_GROUP_SIZE {
283            return Err(SbeDecodeError::GroupSizeTooLarge {
284                count: num_in_group,
285                max: MAX_GROUP_SIZE,
286            });
287        }
288
289        Ok((block_length, num_in_group))
290    }
291
292    /// Reads compact group header (u16 block_length + u16 num_in_group).
293    ///
294    /// Returns (block_length, num_in_group).
295    pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
296        let block_length = self.read_u16_le()?;
297        let num_in_group = self.read_u16_le()?;
298
299        if u32::from(num_in_group) > MAX_GROUP_SIZE {
300            return Err(SbeDecodeError::GroupSizeTooLarge {
301                count: u32::from(num_in_group),
302                max: MAX_GROUP_SIZE,
303            });
304        }
305
306        Ok((block_length, num_in_group))
307    }
308
309    /// Iterates over a group, calling `decode_item` for each element.
310    ///
311    /// The decoder function receives a cursor positioned at the start of each item
312    /// and should decode the item without advancing past `block_length` bytes.
313    pub fn read_group<T, F>(
314        &mut self,
315        block_length: u16,
316        num_in_group: u32,
317        mut decode_item: F,
318    ) -> Result<Vec<T>, SbeDecodeError>
319    where
320        F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
321    {
322        let block_len = block_length as usize;
323        let count = num_in_group as usize;
324
325        // Validate we have enough bytes for all items
326        self.require(count * block_len)?;
327
328        let mut items = Vec::with_capacity(count);
329        for _ in 0..count {
330            let item_start = self.pos;
331            let item = decode_item(self)?;
332            items.push(item);
333
334            // Advance to next item boundary (respects block_length even if decoder read less)
335            self.pos = item_start + block_len;
336        }
337
338        Ok(items)
339    }
340}
341
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A fresh cursor sits at offset 0 with the whole buffer remaining.
    #[rstest]
    fn test_new_starts_at_zero() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);
        assert_eq!((cur.pos(), cur.remaining()), (0, 4));
    }

    /// `new_at` honours the requested starting offset.
    #[rstest]
    fn test_new_at_starts_at_offset() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new_at(&data, 2);
        assert_eq!((cur.pos(), cur.remaining()), (2, 2));
    }

    /// Single-byte reads advance one byte at a time and fail at the end.
    #[rstest]
    fn test_read_u8() {
        let data = [0x42, 0xFF];
        let mut cur = SbeCursor::new(&data);

        let first = cur.read_u8().unwrap();
        assert_eq!(first, 0x42);
        assert_eq!(cur.pos(), 1);

        let second = cur.read_u8().unwrap();
        assert_eq!(second, 0xFF);
        assert_eq!(cur.pos(), 2);

        // Buffer exhausted.
        assert!(cur.read_u8().is_err());
    }

    /// Signed bytes are reinterpreted, not range-checked.
    #[rstest]
    fn test_read_i8() {
        // 0x7F == 127, 0x80 == -128
        let data = [0x7F, 0x80];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_i8().unwrap(), 127);
        assert_eq!(cur.read_i8().unwrap(), -128);
    }

    /// Two bytes decode as a little-endian u16.
    #[rstest]
    fn test_read_u16_le() {
        // Little-endian encoding of 0x1234.
        let data = [0x34, 0x12];
        let mut cur = SbeCursor::new(&data);

        let decoded = cur.read_u16_le().unwrap();
        assert_eq!(decoded, 0x1234);
        assert_eq!(cur.pos(), 2);
    }

    /// Eight bytes decode as a little-endian i64.
    #[rstest]
    fn test_read_i64_le() {
        let expected: i64 = -1234567890123456789;
        let data = expected.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        let decoded = cur.read_i64_le().unwrap();
        assert_eq!(decoded, expected);
        assert_eq!(cur.pos(), 8);
    }

    /// `i64::MIN` is the null sentinel for optional i64 fields.
    #[rstest]
    fn test_read_optional_i64_null() {
        let data = i64::MIN.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_optional_i64_le().unwrap(), None);
    }

    /// Any other value is returned as `Some`.
    #[rstest]
    fn test_read_optional_i64_present() {
        let expected: i64 = 12345;
        let data = expected.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_optional_i64_le().unwrap(), Some(12345));
    }

    /// A length-prefixed string consumes the prefix plus the payload.
    #[rstest]
    fn test_read_var_string8() {
        // Length prefix (5) followed by the payload.
        let data = [&[5u8][..], &b"hello"[..]].concat();
        let mut cur = SbeCursor::new(&data);

        let text = cur.read_var_string8().unwrap();
        assert_eq!(text, "hello");
        assert_eq!(cur.pos(), 6); // 1 prefix byte + 5 payload bytes
    }

    /// A zero length prefix yields an empty string and consumes one byte.
    #[rstest]
    fn test_read_var_string8_empty() {
        let data = [0]; // zero-length string
        let mut cur = SbeCursor::new(&data);

        let text = cur.read_var_string8().unwrap();
        assert_eq!(text, "");
        assert_eq!(cur.pos(), 1);
    }

    /// Non-UTF-8 payloads are rejected with `InvalidUtf8`.
    #[rstest]
    fn test_read_var_string8_invalid_utf8() {
        // Length 2, followed by bytes that are not valid UTF-8.
        let data = [2, 0xFF, 0xFE];
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_var_string8();
        assert!(matches!(outcome, Err(SbeDecodeError::InvalidUtf8)));
    }

    /// The standard group header is a u16 block length plus a u32 count.
    #[rstest]
    fn test_read_group_header() {
        // block_length = 24 (u16 LE), num_in_group = 3 (u32 LE)
        let data = [24, 0, 3, 0, 0, 0];
        let mut cur = SbeCursor::new(&data);

        let (block_length, num_in_group) = cur.read_group_header().unwrap();
        assert_eq!(block_length, 24);
        assert_eq!(num_in_group, 3);
        assert_eq!(cur.pos(), 6);
    }

    /// Counts above `MAX_GROUP_SIZE` are rejected.
    #[rstest]
    fn test_read_group_header_too_large() {
        // One past the permitted maximum.
        let oversized = MAX_GROUP_SIZE + 1;
        let mut data = vec![24, 0]; // block_length = 24
        data.extend_from_slice(&oversized.to_le_bytes());
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_group_header();
        assert!(matches!(
            outcome,
            Err(SbeDecodeError::GroupSizeTooLarge { .. })
        ));
    }

    /// Tightly packed items are decoded in order.
    #[rstest]
    fn test_read_group() {
        // Two 4-byte items, each holding one u32.
        let mut data = Vec::new();
        for value in [100u32, 200u32] {
            data.extend_from_slice(&value.to_le_bytes());
        }

        let mut cur = SbeCursor::new(&data);
        let decoded: Vec<u32> = cur.read_group(4, 2, |c| c.read_u32_le()).unwrap();

        assert_eq!(decoded, vec![100, 200]);
        assert_eq!(cur.pos(), 8);
    }

    /// Items are spaced by `block_length` even when the decoder reads less.
    #[rstest]
    fn test_read_group_respects_block_length() {
        // Two 8-byte blocks; the decoder consumes only the first 4 bytes of each.
        let mut data = Vec::new();
        for value in [100u32, 200u32] {
            data.extend_from_slice(&value.to_le_bytes());
            data.extend_from_slice(&[0, 0, 0, 0]); // padding
        }

        let mut cur = SbeCursor::new(&data);
        let decoded: Vec<u32> = cur.read_group(8, 2, |c| c.read_u32_le()).unwrap();

        assert_eq!(decoded, vec![100, 200]);
        assert_eq!(cur.pos(), 16); // 2 blocks * 8 bytes
    }

    /// `require` succeeds when enough bytes remain.
    #[rstest]
    fn test_require_success() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);

        assert!(cur.require(4).is_ok());
        assert!(cur.require(3).is_ok());
    }

    /// `require` reports the needed and actual sizes on failure.
    #[rstest]
    fn test_require_failure() {
        let data = [1, 2];
        let cur = SbeCursor::new(&data);

        let failure = cur.require(5).unwrap_err();
        assert!(matches!(
            failure,
            SbeDecodeError::BufferTooShort {
                expected: 5,
                actual: 2
            }
        ));
    }

    /// `advance` moves forward and refuses to step past the end.
    #[rstest]
    fn test_advance() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 2);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 4);

        // Nothing left to advance over.
        assert!(cur.advance(1).is_err());
    }

    /// `peek` exposes the unread tail without consuming it.
    #[rstest]
    fn test_peek() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.peek(), &[1, 2, 3, 4]);

        cur.advance(2).unwrap();
        assert_eq!(cur.peek(), &[3, 4]);
    }

    /// `reset` rewinds to the start of the buffer.
    #[rstest]
    fn test_reset() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(3).unwrap();
        assert_eq!(cur.pos(), 3);

        cur.reset();
        assert_eq!(cur.pos(), 0);
    }
}