nautilus_binance/common/sbe/
cursor.rs1use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
19
20#[derive(Debug, Clone)]
25pub struct SbeCursor<'a> {
26 buf: &'a [u8],
27 pos: usize,
28}
29
30impl<'a> SbeCursor<'a> {
31 #[must_use]
33 pub const fn new(buf: &'a [u8]) -> Self {
34 Self { buf, pos: 0 }
35 }
36
37 #[must_use]
39 pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
40 Self { buf, pos }
41 }
42
43 #[must_use]
45 pub const fn pos(&self) -> usize {
46 self.pos
47 }
48
49 #[must_use]
51 pub const fn remaining(&self) -> usize {
52 self.buf.len().saturating_sub(self.pos)
53 }
54
55 #[must_use]
57 pub const fn buffer(&self) -> &'a [u8] {
58 self.buf
59 }
60
61 #[must_use]
63 pub fn peek(&self) -> &'a [u8] {
64 &self.buf[self.pos..]
65 }
66
67 pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
73 if self.remaining() < n {
74 return Err(SbeDecodeError::BufferTooShort {
75 expected: self.pos + n,
76 actual: self.buf.len(),
77 });
78 }
79 Ok(())
80 }
81
82 pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
88 self.require(n)?;
89 self.pos += n;
90 Ok(())
91 }
92
93 pub fn skip(&mut self, n: usize) {
99 self.pos += n;
100 }
101
102 pub fn reset(&mut self) {
104 self.pos = 0;
105 }
106
107 pub fn set_pos(&mut self, pos: usize) {
109 self.pos = pos;
110 }
111
112 pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
114 self.require(1)?;
115 let value = self.buf[self.pos];
116 self.pos += 1;
117 Ok(value)
118 }
119
120 pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
122 self.require(1)?;
123 let value = self.buf[self.pos] as i8;
124 self.pos += 1;
125 Ok(value)
126 }
127
128 pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
130 self.require(2)?;
131 let value = u16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
132 self.pos += 2;
133 Ok(value)
134 }
135
136 pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
138 self.require(2)?;
139 let value = i16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
140 self.pos += 2;
141 Ok(value)
142 }
143
144 pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
146 self.require(4)?;
147 let value = u32::from_le_bytes([
148 self.buf[self.pos],
149 self.buf[self.pos + 1],
150 self.buf[self.pos + 2],
151 self.buf[self.pos + 3],
152 ]);
153 self.pos += 4;
154 Ok(value)
155 }
156
157 pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
159 self.require(4)?;
160 let value = i32::from_le_bytes([
161 self.buf[self.pos],
162 self.buf[self.pos + 1],
163 self.buf[self.pos + 2],
164 self.buf[self.pos + 3],
165 ]);
166 self.pos += 4;
167 Ok(value)
168 }
169
170 pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
172 self.require(8)?;
173 let value = u64::from_le_bytes([
174 self.buf[self.pos],
175 self.buf[self.pos + 1],
176 self.buf[self.pos + 2],
177 self.buf[self.pos + 3],
178 self.buf[self.pos + 4],
179 self.buf[self.pos + 5],
180 self.buf[self.pos + 6],
181 self.buf[self.pos + 7],
182 ]);
183 self.pos += 8;
184 Ok(value)
185 }
186
187 pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
189 self.require(8)?;
190 let value = i64::from_le_bytes([
191 self.buf[self.pos],
192 self.buf[self.pos + 1],
193 self.buf[self.pos + 2],
194 self.buf[self.pos + 3],
195 self.buf[self.pos + 4],
196 self.buf[self.pos + 5],
197 self.buf[self.pos + 6],
198 self.buf[self.pos + 7],
199 ]);
200 self.pos += 8;
201 Ok(value)
202 }
203
204 pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
206 let value = self.read_i64_le()?;
207 Ok(if value == i64::MIN { None } else { Some(value) })
208 }
209
210 pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
212 self.require(n)?;
213 let slice = &self.buf[self.pos..self.pos + n];
214 self.pos += n;
215 Ok(slice)
216 }
217
218 pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
222 let len = self.read_u8()? as usize;
223 if len == 0 {
224 return Ok(String::new());
225 }
226 self.require(len)?;
227 let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
228 .map_err(|_| SbeDecodeError::InvalidUtf8)?
229 .to_string();
230 self.pos += len;
231 Ok(s)
232 }
233
234 pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
236 let len = self.read_u8()? as usize;
237 if len == 0 {
238 return Ok("");
239 }
240 self.require(len)?;
241 let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
242 .map_err(|_| SbeDecodeError::InvalidUtf8)?;
243 self.pos += len;
244 Ok(s)
245 }
246
247 pub fn skip_var_data8(&mut self) -> Result<(), SbeDecodeError> {
251 let len = self.read_u8()? as usize;
252 if len > 0 {
253 self.advance(len)?;
254 }
255 Ok(())
256 }
257
258 pub fn read_var_bytes8(&mut self) -> Result<Vec<u8>, SbeDecodeError> {
262 let len = self.read_u8()? as usize;
263 if len == 0 {
264 return Ok(Vec::new());
265 }
266 self.require(len)?;
267 let bytes = self.buf[self.pos..self.pos + len].to_vec();
268 self.pos += len;
269 Ok(bytes)
270 }
271
272 pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
276 let block_length = self.read_u16_le()?;
277 let num_in_group = self.read_u32_le()?;
278
279 if num_in_group > MAX_GROUP_SIZE {
280 return Err(SbeDecodeError::GroupSizeTooLarge {
281 count: num_in_group,
282 max: MAX_GROUP_SIZE,
283 });
284 }
285
286 Ok((block_length, num_in_group))
287 }
288
289 pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
293 let block_length = self.read_u16_le()?;
294 let num_in_group = self.read_u16_le()?;
295
296 if u32::from(num_in_group) > MAX_GROUP_SIZE {
297 return Err(SbeDecodeError::GroupSizeTooLarge {
298 count: u32::from(num_in_group),
299 max: MAX_GROUP_SIZE,
300 });
301 }
302
303 Ok((block_length, num_in_group))
304 }
305
306 pub fn read_group<T, F>(
311 &mut self,
312 block_length: u16,
313 num_in_group: u32,
314 mut decode_item: F,
315 ) -> Result<Vec<T>, SbeDecodeError>
316 where
317 F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
318 {
319 let block_len = block_length as usize;
320 let count = num_in_group as usize;
321
322 self.require(count * block_len)?;
324
325 let mut items = Vec::with_capacity(count);
326 for _ in 0..count {
327 let item_start = self.pos;
328 let item = decode_item(self)?;
329 items.push(item);
330
331 self.pos = item_start + block_len;
333 }
334
335 Ok(items)
336 }
337}
338
339#[cfg(test)]
340mod tests {
341 use rstest::rstest;
342
343 use super::*;
344
345 #[rstest]
346 fn test_new_starts_at_zero() {
347 let buf = [1, 2, 3, 4];
348 let cursor = SbeCursor::new(&buf);
349 assert_eq!(cursor.pos(), 0);
350 assert_eq!(cursor.remaining(), 4);
351 }
352
353 #[rstest]
354 fn test_new_at_starts_at_offset() {
355 let buf = [1, 2, 3, 4];
356 let cursor = SbeCursor::new_at(&buf, 2);
357 assert_eq!(cursor.pos(), 2);
358 assert_eq!(cursor.remaining(), 2);
359 }
360
361 #[rstest]
362 fn test_read_u8() {
363 let buf = [0x42, 0xFF];
364 let mut cursor = SbeCursor::new(&buf);
365
366 assert_eq!(cursor.read_u8().unwrap(), 0x42);
367 assert_eq!(cursor.pos(), 1);
368
369 assert_eq!(cursor.read_u8().unwrap(), 0xFF);
370 assert_eq!(cursor.pos(), 2);
371
372 assert!(cursor.read_u8().is_err());
373 }
374
375 #[rstest]
376 fn test_read_i8() {
377 let buf = [0x7F, 0x80]; let mut cursor = SbeCursor::new(&buf);
379
380 assert_eq!(cursor.read_i8().unwrap(), 127);
381 assert_eq!(cursor.read_i8().unwrap(), -128);
382 }
383
384 #[rstest]
385 fn test_read_u16_le() {
386 let buf = [0x34, 0x12]; let mut cursor = SbeCursor::new(&buf);
388
389 assert_eq!(cursor.read_u16_le().unwrap(), 0x1234);
390 assert_eq!(cursor.pos(), 2);
391 }
392
393 #[rstest]
394 fn test_read_i64_le() {
395 let value: i64 = -1234567890123456789;
396 let buf = value.to_le_bytes();
397 let mut cursor = SbeCursor::new(&buf);
398
399 assert_eq!(cursor.read_i64_le().unwrap(), value);
400 assert_eq!(cursor.pos(), 8);
401 }
402
403 #[rstest]
404 fn test_read_optional_i64_null() {
405 let buf = i64::MIN.to_le_bytes();
406 let mut cursor = SbeCursor::new(&buf);
407
408 assert_eq!(cursor.read_optional_i64_le().unwrap(), None);
409 }
410
411 #[rstest]
412 fn test_read_optional_i64_present() {
413 let value: i64 = 12345;
414 let buf = value.to_le_bytes();
415 let mut cursor = SbeCursor::new(&buf);
416
417 assert_eq!(cursor.read_optional_i64_le().unwrap(), Some(12345));
418 }
419
420 #[rstest]
421 fn test_read_var_string8() {
422 let mut buf = vec![5]; buf.extend_from_slice(b"hello");
424 let mut cursor = SbeCursor::new(&buf);
425
426 assert_eq!(cursor.read_var_string8().unwrap(), "hello");
427 assert_eq!(cursor.pos(), 6); }
429
430 #[rstest]
431 fn test_read_var_string8_empty() {
432 let buf = [0]; let mut cursor = SbeCursor::new(&buf);
434
435 assert_eq!(cursor.read_var_string8().unwrap(), "");
436 assert_eq!(cursor.pos(), 1);
437 }
438
439 #[rstest]
440 fn test_read_var_string8_invalid_utf8() {
441 let buf = [2, 0xFF, 0xFE]; let mut cursor = SbeCursor::new(&buf);
443
444 assert!(matches!(
445 cursor.read_var_string8(),
446 Err(SbeDecodeError::InvalidUtf8)
447 ));
448 }
449
450 #[rstest]
451 fn test_read_group_header() {
452 let buf = [24, 0, 3, 0, 0, 0];
454 let mut cursor = SbeCursor::new(&buf);
455
456 let (block_len, count) = cursor.read_group_header().unwrap();
457 assert_eq!(block_len, 24);
458 assert_eq!(count, 3);
459 assert_eq!(cursor.pos(), 6);
460 }
461
462 #[rstest]
463 fn test_read_group_header_too_large() {
464 let count = MAX_GROUP_SIZE + 1;
466 let mut buf = vec![24, 0]; buf.extend_from_slice(&count.to_le_bytes());
468 let mut cursor = SbeCursor::new(&buf);
469
470 assert!(matches!(
471 cursor.read_group_header(),
472 Err(SbeDecodeError::GroupSizeTooLarge { .. })
473 ));
474 }
475
476 #[rstest]
477 fn test_read_group() {
478 let mut buf = Vec::new();
480 buf.extend_from_slice(&100u32.to_le_bytes()); buf.extend_from_slice(&200u32.to_le_bytes()); let mut cursor = SbeCursor::new(&buf);
484 let items: Vec<u32> = cursor.read_group(4, 2, |c| c.read_u32_le()).unwrap();
485
486 assert_eq!(items, vec![100, 200]);
487 assert_eq!(cursor.pos(), 8);
488 }
489
490 #[rstest]
491 fn test_read_group_respects_block_length() {
492 let mut buf = Vec::new();
494 buf.extend_from_slice(&100u32.to_le_bytes());
495 buf.extend_from_slice(&[0, 0, 0, 0]); buf.extend_from_slice(&200u32.to_le_bytes());
497 buf.extend_from_slice(&[0, 0, 0, 0]); let mut cursor = SbeCursor::new(&buf);
500 let items: Vec<u32> = cursor.read_group(8, 2, |c| c.read_u32_le()).unwrap();
501
502 assert_eq!(items, vec![100, 200]);
503 assert_eq!(cursor.pos(), 16); }
505
506 #[rstest]
507 fn test_require_success() {
508 let buf = [1, 2, 3, 4];
509 let cursor = SbeCursor::new(&buf);
510
511 assert!(cursor.require(4).is_ok());
512 assert!(cursor.require(3).is_ok());
513 }
514
515 #[rstest]
516 fn test_require_failure() {
517 let buf = [1, 2];
518 let cursor = SbeCursor::new(&buf);
519
520 let err = cursor.require(5).unwrap_err();
521 assert!(matches!(
522 err,
523 SbeDecodeError::BufferTooShort {
524 expected: 5,
525 actual: 2
526 }
527 ));
528 }
529
530 #[rstest]
531 fn test_advance() {
532 let buf = [1, 2, 3, 4];
533 let mut cursor = SbeCursor::new(&buf);
534
535 cursor.advance(2).unwrap();
536 assert_eq!(cursor.pos(), 2);
537
538 cursor.advance(2).unwrap();
539 assert_eq!(cursor.pos(), 4);
540
541 assert!(cursor.advance(1).is_err());
542 }
543
544 #[rstest]
545 fn test_peek() {
546 let buf = [1, 2, 3, 4];
547 let mut cursor = SbeCursor::new(&buf);
548
549 assert_eq!(cursor.peek(), &[1, 2, 3, 4]);
550
551 cursor.advance(2).unwrap();
552 assert_eq!(cursor.peek(), &[3, 4]);
553 }
554
555 #[rstest]
556 fn test_reset() {
557 let buf = [1, 2, 3, 4];
558 let mut cursor = SbeCursor::new(&buf);
559
560 cursor.advance(3).unwrap();
561 assert_eq!(cursor.pos(), 3);
562
563 cursor.reset();
564 assert_eq!(cursor.pos(), 0);
565 }
566}