nautilus_binance/common/sbe/
cursor.rs1#![allow(clippy::missing_errors_doc)]
18
19use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
22
23#[derive(Debug, Clone)]
28pub struct SbeCursor<'a> {
29 buf: &'a [u8],
30 pos: usize,
31}
32
33impl<'a> SbeCursor<'a> {
34 #[must_use]
36 pub const fn new(buf: &'a [u8]) -> Self {
37 Self { buf, pos: 0 }
38 }
39
40 #[must_use]
42 pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
43 Self { buf, pos }
44 }
45
46 #[must_use]
48 pub const fn pos(&self) -> usize {
49 self.pos
50 }
51
52 #[must_use]
54 pub const fn remaining(&self) -> usize {
55 self.buf.len().saturating_sub(self.pos)
56 }
57
58 #[must_use]
60 pub const fn buffer(&self) -> &'a [u8] {
61 self.buf
62 }
63
64 #[must_use]
66 pub fn peek(&self) -> &'a [u8] {
67 &self.buf[self.pos..]
68 }
69
70 pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
76 if self.remaining() < n {
77 return Err(SbeDecodeError::BufferTooShort {
78 expected: self.pos + n,
79 actual: self.buf.len(),
80 });
81 }
82 Ok(())
83 }
84
85 pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
91 self.require(n)?;
92 self.pos += n;
93 Ok(())
94 }
95
96 pub fn skip(&mut self, n: usize) {
102 self.pos += n;
103 }
104
105 pub fn reset(&mut self) {
107 self.pos = 0;
108 }
109
110 pub fn set_pos(&mut self, pos: usize) {
112 self.pos = pos;
113 }
114
115 pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
117 self.require(1)?;
118 let value = self.buf[self.pos];
119 self.pos += 1;
120 Ok(value)
121 }
122
123 pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
125 self.require(1)?;
126 let value = self.buf[self.pos] as i8;
127 self.pos += 1;
128 Ok(value)
129 }
130
131 pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
133 self.require(2)?;
134 let value = u16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
135 self.pos += 2;
136 Ok(value)
137 }
138
139 pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
141 self.require(2)?;
142 let value = i16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
143 self.pos += 2;
144 Ok(value)
145 }
146
147 pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
149 self.require(4)?;
150 let value = u32::from_le_bytes([
151 self.buf[self.pos],
152 self.buf[self.pos + 1],
153 self.buf[self.pos + 2],
154 self.buf[self.pos + 3],
155 ]);
156 self.pos += 4;
157 Ok(value)
158 }
159
160 pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
162 self.require(4)?;
163 let value = i32::from_le_bytes([
164 self.buf[self.pos],
165 self.buf[self.pos + 1],
166 self.buf[self.pos + 2],
167 self.buf[self.pos + 3],
168 ]);
169 self.pos += 4;
170 Ok(value)
171 }
172
173 pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
175 self.require(8)?;
176 let value = u64::from_le_bytes([
177 self.buf[self.pos],
178 self.buf[self.pos + 1],
179 self.buf[self.pos + 2],
180 self.buf[self.pos + 3],
181 self.buf[self.pos + 4],
182 self.buf[self.pos + 5],
183 self.buf[self.pos + 6],
184 self.buf[self.pos + 7],
185 ]);
186 self.pos += 8;
187 Ok(value)
188 }
189
190 pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
192 self.require(8)?;
193 let value = i64::from_le_bytes([
194 self.buf[self.pos],
195 self.buf[self.pos + 1],
196 self.buf[self.pos + 2],
197 self.buf[self.pos + 3],
198 self.buf[self.pos + 4],
199 self.buf[self.pos + 5],
200 self.buf[self.pos + 6],
201 self.buf[self.pos + 7],
202 ]);
203 self.pos += 8;
204 Ok(value)
205 }
206
207 pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
209 let value = self.read_i64_le()?;
210 Ok(if value == i64::MIN { None } else { Some(value) })
211 }
212
213 pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
215 self.require(n)?;
216 let slice = &self.buf[self.pos..self.pos + n];
217 self.pos += n;
218 Ok(slice)
219 }
220
221 pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
225 let len = self.read_u8()? as usize;
226 if len == 0 {
227 return Ok(String::new());
228 }
229 self.require(len)?;
230 let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
231 .map_err(|_| SbeDecodeError::InvalidUtf8)?
232 .to_string();
233 self.pos += len;
234 Ok(s)
235 }
236
237 pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
239 let len = self.read_u8()? as usize;
240 if len == 0 {
241 return Ok("");
242 }
243 self.require(len)?;
244 let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
245 .map_err(|_| SbeDecodeError::InvalidUtf8)?;
246 self.pos += len;
247 Ok(s)
248 }
249
250 pub fn skip_var_data8(&mut self) -> Result<(), SbeDecodeError> {
254 let len = self.read_u8()? as usize;
255 if len > 0 {
256 self.advance(len)?;
257 }
258 Ok(())
259 }
260
261 pub fn read_var_bytes8(&mut self) -> Result<Vec<u8>, SbeDecodeError> {
265 let len = self.read_u8()? as usize;
266 if len == 0 {
267 return Ok(Vec::new());
268 }
269 self.require(len)?;
270 let bytes = self.buf[self.pos..self.pos + len].to_vec();
271 self.pos += len;
272 Ok(bytes)
273 }
274
275 pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
279 let block_length = self.read_u16_le()?;
280 let num_in_group = self.read_u32_le()?;
281
282 if num_in_group > MAX_GROUP_SIZE {
283 return Err(SbeDecodeError::GroupSizeTooLarge {
284 count: num_in_group,
285 max: MAX_GROUP_SIZE,
286 });
287 }
288
289 Ok((block_length, num_in_group))
290 }
291
292 pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
296 let block_length = self.read_u16_le()?;
297 let num_in_group = self.read_u16_le()?;
298
299 if u32::from(num_in_group) > MAX_GROUP_SIZE {
300 return Err(SbeDecodeError::GroupSizeTooLarge {
301 count: u32::from(num_in_group),
302 max: MAX_GROUP_SIZE,
303 });
304 }
305
306 Ok((block_length, num_in_group))
307 }
308
309 pub fn read_group<T, F>(
314 &mut self,
315 block_length: u16,
316 num_in_group: u32,
317 mut decode_item: F,
318 ) -> Result<Vec<T>, SbeDecodeError>
319 where
320 F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
321 {
322 let block_len = block_length as usize;
323 let count = num_in_group as usize;
324
325 self.require(count * block_len)?;
327
328 let mut items = Vec::with_capacity(count);
329 for _ in 0..count {
330 let item_start = self.pos;
331 let item = decode_item(self)?;
332 items.push(item);
333
334 self.pos = item_start + block_len;
336 }
337
338 Ok(items)
339 }
340}
341
342#[cfg(test)]
343mod tests {
344 use rstest::rstest;
345
346 use super::*;
347
348 #[rstest]
349 fn test_new_starts_at_zero() {
350 let buf = [1, 2, 3, 4];
351 let cursor = SbeCursor::new(&buf);
352 assert_eq!(cursor.pos(), 0);
353 assert_eq!(cursor.remaining(), 4);
354 }
355
356 #[rstest]
357 fn test_new_at_starts_at_offset() {
358 let buf = [1, 2, 3, 4];
359 let cursor = SbeCursor::new_at(&buf, 2);
360 assert_eq!(cursor.pos(), 2);
361 assert_eq!(cursor.remaining(), 2);
362 }
363
364 #[rstest]
365 fn test_read_u8() {
366 let buf = [0x42, 0xFF];
367 let mut cursor = SbeCursor::new(&buf);
368
369 assert_eq!(cursor.read_u8().unwrap(), 0x42);
370 assert_eq!(cursor.pos(), 1);
371
372 assert_eq!(cursor.read_u8().unwrap(), 0xFF);
373 assert_eq!(cursor.pos(), 2);
374
375 assert!(cursor.read_u8().is_err());
376 }
377
378 #[rstest]
379 fn test_read_i8() {
380 let buf = [0x7F, 0x80]; let mut cursor = SbeCursor::new(&buf);
382
383 assert_eq!(cursor.read_i8().unwrap(), 127);
384 assert_eq!(cursor.read_i8().unwrap(), -128);
385 }
386
387 #[rstest]
388 fn test_read_u16_le() {
389 let buf = [0x34, 0x12]; let mut cursor = SbeCursor::new(&buf);
391
392 assert_eq!(cursor.read_u16_le().unwrap(), 0x1234);
393 assert_eq!(cursor.pos(), 2);
394 }
395
396 #[rstest]
397 fn test_read_i64_le() {
398 let value: i64 = -1234567890123456789;
399 let buf = value.to_le_bytes();
400 let mut cursor = SbeCursor::new(&buf);
401
402 assert_eq!(cursor.read_i64_le().unwrap(), value);
403 assert_eq!(cursor.pos(), 8);
404 }
405
406 #[rstest]
407 fn test_read_optional_i64_null() {
408 let buf = i64::MIN.to_le_bytes();
409 let mut cursor = SbeCursor::new(&buf);
410
411 assert_eq!(cursor.read_optional_i64_le().unwrap(), None);
412 }
413
414 #[rstest]
415 fn test_read_optional_i64_present() {
416 let value: i64 = 12345;
417 let buf = value.to_le_bytes();
418 let mut cursor = SbeCursor::new(&buf);
419
420 assert_eq!(cursor.read_optional_i64_le().unwrap(), Some(12345));
421 }
422
423 #[rstest]
424 fn test_read_var_string8() {
425 let mut buf = vec![5]; buf.extend_from_slice(b"hello");
427 let mut cursor = SbeCursor::new(&buf);
428
429 assert_eq!(cursor.read_var_string8().unwrap(), "hello");
430 assert_eq!(cursor.pos(), 6); }
432
433 #[rstest]
434 fn test_read_var_string8_empty() {
435 let buf = [0]; let mut cursor = SbeCursor::new(&buf);
437
438 assert_eq!(cursor.read_var_string8().unwrap(), "");
439 assert_eq!(cursor.pos(), 1);
440 }
441
442 #[rstest]
443 fn test_read_var_string8_invalid_utf8() {
444 let buf = [2, 0xFF, 0xFE]; let mut cursor = SbeCursor::new(&buf);
446
447 assert!(matches!(
448 cursor.read_var_string8(),
449 Err(SbeDecodeError::InvalidUtf8)
450 ));
451 }
452
453 #[rstest]
454 fn test_read_group_header() {
455 let buf = [24, 0, 3, 0, 0, 0];
457 let mut cursor = SbeCursor::new(&buf);
458
459 let (block_len, count) = cursor.read_group_header().unwrap();
460 assert_eq!(block_len, 24);
461 assert_eq!(count, 3);
462 assert_eq!(cursor.pos(), 6);
463 }
464
465 #[rstest]
466 fn test_read_group_header_too_large() {
467 let count = MAX_GROUP_SIZE + 1;
469 let mut buf = vec![24, 0]; buf.extend_from_slice(&count.to_le_bytes());
471 let mut cursor = SbeCursor::new(&buf);
472
473 assert!(matches!(
474 cursor.read_group_header(),
475 Err(SbeDecodeError::GroupSizeTooLarge { .. })
476 ));
477 }
478
479 #[rstest]
480 fn test_read_group() {
481 let mut buf = Vec::new();
483 buf.extend_from_slice(&100u32.to_le_bytes()); buf.extend_from_slice(&200u32.to_le_bytes()); let mut cursor = SbeCursor::new(&buf);
487 let items: Vec<u32> = cursor.read_group(4, 2, |c| c.read_u32_le()).unwrap();
488
489 assert_eq!(items, vec![100, 200]);
490 assert_eq!(cursor.pos(), 8);
491 }
492
493 #[rstest]
494 fn test_read_group_respects_block_length() {
495 let mut buf = Vec::new();
497 buf.extend_from_slice(&100u32.to_le_bytes());
498 buf.extend_from_slice(&[0, 0, 0, 0]); buf.extend_from_slice(&200u32.to_le_bytes());
500 buf.extend_from_slice(&[0, 0, 0, 0]); let mut cursor = SbeCursor::new(&buf);
503 let items: Vec<u32> = cursor.read_group(8, 2, |c| c.read_u32_le()).unwrap();
504
505 assert_eq!(items, vec![100, 200]);
506 assert_eq!(cursor.pos(), 16); }
508
509 #[rstest]
510 fn test_require_success() {
511 let buf = [1, 2, 3, 4];
512 let cursor = SbeCursor::new(&buf);
513
514 assert!(cursor.require(4).is_ok());
515 assert!(cursor.require(3).is_ok());
516 }
517
518 #[rstest]
519 fn test_require_failure() {
520 let buf = [1, 2];
521 let cursor = SbeCursor::new(&buf);
522
523 let err = cursor.require(5).unwrap_err();
524 assert!(matches!(
525 err,
526 SbeDecodeError::BufferTooShort {
527 expected: 5,
528 actual: 2
529 }
530 ));
531 }
532
533 #[rstest]
534 fn test_advance() {
535 let buf = [1, 2, 3, 4];
536 let mut cursor = SbeCursor::new(&buf);
537
538 cursor.advance(2).unwrap();
539 assert_eq!(cursor.pos(), 2);
540
541 cursor.advance(2).unwrap();
542 assert_eq!(cursor.pos(), 4);
543
544 assert!(cursor.advance(1).is_err());
545 }
546
547 #[rstest]
548 fn test_peek() {
549 let buf = [1, 2, 3, 4];
550 let mut cursor = SbeCursor::new(&buf);
551
552 assert_eq!(cursor.peek(), &[1, 2, 3, 4]);
553
554 cursor.advance(2).unwrap();
555 assert_eq!(cursor.peek(), &[3, 4]);
556 }
557
558 #[rstest]
559 fn test_reset() {
560 let buf = [1, 2, 3, 4];
561 let mut cursor = SbeCursor::new(&buf);
562
563 cursor.advance(3).unwrap();
564 assert_eq!(cursor.pos(), 3);
565
566 cursor.reset();
567 assert_eq!(cursor.pos(), 0);
568 }
569}