// nautilus_binance/common/sbe/cursor.rs
use super::error::{MAX_GROUP_SIZE, SbeDecodeError};
19
20#[derive(Debug, Clone)]
25pub struct SbeCursor<'a> {
26 buf: &'a [u8],
27 pos: usize,
28}
29
30impl<'a> SbeCursor<'a> {
31 #[must_use]
33 pub const fn new(buf: &'a [u8]) -> Self {
34 Self { buf, pos: 0 }
35 }
36
37 #[must_use]
39 pub const fn new_at(buf: &'a [u8], pos: usize) -> Self {
40 Self { buf, pos }
41 }
42
43 #[must_use]
45 pub const fn pos(&self) -> usize {
46 self.pos
47 }
48
49 #[must_use]
51 pub const fn remaining(&self) -> usize {
52 self.buf.len().saturating_sub(self.pos)
53 }
54
55 #[must_use]
57 pub const fn buffer(&self) -> &'a [u8] {
58 self.buf
59 }
60
61 #[must_use]
63 pub fn peek(&self) -> &'a [u8] {
64 &self.buf[self.pos..]
65 }
66
67 pub fn require(&self, n: usize) -> Result<(), SbeDecodeError> {
73 if self.remaining() < n {
74 return Err(SbeDecodeError::BufferTooShort {
75 expected: self.pos + n,
76 actual: self.buf.len(),
77 });
78 }
79 Ok(())
80 }
81
82 pub fn advance(&mut self, n: usize) -> Result<(), SbeDecodeError> {
88 self.require(n)?;
89 self.pos += n;
90 Ok(())
91 }
92
93 pub fn skip(&mut self, n: usize) {
99 self.pos += n;
100 }
101
102 pub fn reset(&mut self) {
104 self.pos = 0;
105 }
106
107 pub fn set_pos(&mut self, pos: usize) {
109 self.pos = pos;
110 }
111
112 pub fn read_u8(&mut self) -> Result<u8, SbeDecodeError> {
114 self.require(1)?;
115 let value = self.buf[self.pos];
116 self.pos += 1;
117 Ok(value)
118 }
119
120 pub fn read_i8(&mut self) -> Result<i8, SbeDecodeError> {
122 self.require(1)?;
123 let value = self.buf[self.pos] as i8;
124 self.pos += 1;
125 Ok(value)
126 }
127
128 pub fn read_u16_le(&mut self) -> Result<u16, SbeDecodeError> {
130 self.require(2)?;
131 let value = u16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
132 self.pos += 2;
133 Ok(value)
134 }
135
136 pub fn read_i16_le(&mut self) -> Result<i16, SbeDecodeError> {
138 self.require(2)?;
139 let value = i16::from_le_bytes([self.buf[self.pos], self.buf[self.pos + 1]]);
140 self.pos += 2;
141 Ok(value)
142 }
143
144 pub fn read_u32_le(&mut self) -> Result<u32, SbeDecodeError> {
146 self.require(4)?;
147 let value = u32::from_le_bytes([
148 self.buf[self.pos],
149 self.buf[self.pos + 1],
150 self.buf[self.pos + 2],
151 self.buf[self.pos + 3],
152 ]);
153 self.pos += 4;
154 Ok(value)
155 }
156
157 pub fn read_i32_le(&mut self) -> Result<i32, SbeDecodeError> {
159 self.require(4)?;
160 let value = i32::from_le_bytes([
161 self.buf[self.pos],
162 self.buf[self.pos + 1],
163 self.buf[self.pos + 2],
164 self.buf[self.pos + 3],
165 ]);
166 self.pos += 4;
167 Ok(value)
168 }
169
170 pub fn read_u64_le(&mut self) -> Result<u64, SbeDecodeError> {
172 self.require(8)?;
173 let value = u64::from_le_bytes([
174 self.buf[self.pos],
175 self.buf[self.pos + 1],
176 self.buf[self.pos + 2],
177 self.buf[self.pos + 3],
178 self.buf[self.pos + 4],
179 self.buf[self.pos + 5],
180 self.buf[self.pos + 6],
181 self.buf[self.pos + 7],
182 ]);
183 self.pos += 8;
184 Ok(value)
185 }
186
187 pub fn read_i64_le(&mut self) -> Result<i64, SbeDecodeError> {
189 self.require(8)?;
190 let value = i64::from_le_bytes([
191 self.buf[self.pos],
192 self.buf[self.pos + 1],
193 self.buf[self.pos + 2],
194 self.buf[self.pos + 3],
195 self.buf[self.pos + 4],
196 self.buf[self.pos + 5],
197 self.buf[self.pos + 6],
198 self.buf[self.pos + 7],
199 ]);
200 self.pos += 8;
201 Ok(value)
202 }
203
204 pub fn read_optional_i64_le(&mut self) -> Result<Option<i64>, SbeDecodeError> {
206 let value = self.read_i64_le()?;
207 Ok(if value == i64::MIN { None } else { Some(value) })
208 }
209
210 pub fn read_bytes(&mut self, n: usize) -> Result<&'a [u8], SbeDecodeError> {
212 self.require(n)?;
213 let slice = &self.buf[self.pos..self.pos + n];
214 self.pos += n;
215 Ok(slice)
216 }
217
218 pub fn read_var_string8(&mut self) -> Result<String, SbeDecodeError> {
222 let len = self.read_u8()? as usize;
223 if len == 0 {
224 return Ok(String::new());
225 }
226 self.require(len)?;
227 let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
228 .map_err(|_| SbeDecodeError::InvalidUtf8)?
229 .to_string();
230 self.pos += len;
231 Ok(s)
232 }
233
234 pub fn read_var_string8_ref(&mut self) -> Result<&'a str, SbeDecodeError> {
236 let len = self.read_u8()? as usize;
237 if len == 0 {
238 return Ok("");
239 }
240 self.require(len)?;
241 let s = std::str::from_utf8(&self.buf[self.pos..self.pos + len])
242 .map_err(|_| SbeDecodeError::InvalidUtf8)?;
243 self.pos += len;
244 Ok(s)
245 }
246
247 pub fn read_group_header(&mut self) -> Result<(u16, u32), SbeDecodeError> {
251 let block_length = self.read_u16_le()?;
252 let num_in_group = self.read_u32_le()?;
253
254 if num_in_group > MAX_GROUP_SIZE {
255 return Err(SbeDecodeError::GroupSizeTooLarge {
256 count: num_in_group,
257 max: MAX_GROUP_SIZE,
258 });
259 }
260
261 Ok((block_length, num_in_group))
262 }
263
264 pub fn read_group_header_16(&mut self) -> Result<(u16, u16), SbeDecodeError> {
268 let block_length = self.read_u16_le()?;
269 let num_in_group = self.read_u16_le()?;
270
271 if u32::from(num_in_group) > MAX_GROUP_SIZE {
272 return Err(SbeDecodeError::GroupSizeTooLarge {
273 count: u32::from(num_in_group),
274 max: MAX_GROUP_SIZE,
275 });
276 }
277
278 Ok((block_length, num_in_group))
279 }
280
281 pub fn read_group<T, F>(
286 &mut self,
287 block_length: u16,
288 num_in_group: u32,
289 mut decode_item: F,
290 ) -> Result<Vec<T>, SbeDecodeError>
291 where
292 F: FnMut(&mut Self) -> Result<T, SbeDecodeError>,
293 {
294 let block_len = block_length as usize;
295 let count = num_in_group as usize;
296
297 self.require(count * block_len)?;
299
300 let mut items = Vec::with_capacity(count);
301 for _ in 0..count {
302 let item_start = self.pos;
303 let item = decode_item(self)?;
304 items.push(item);
305
306 self.pos = item_start + block_len;
308 }
309
310 Ok(items)
311 }
312}
313
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A fresh cursor starts at offset zero with the whole buffer unread.
    #[rstest]
    fn test_new_starts_at_zero() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);

        assert_eq!(cur.pos(), 0);
        assert_eq!(cur.remaining(), 4);
    }

    /// `new_at` honours the requested starting offset.
    #[rstest]
    fn test_new_at_starts_at_offset() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new_at(&data, 2);

        assert_eq!(cur.pos(), 2);
        assert_eq!(cur.remaining(), 2);
    }

    /// Bytes are returned in order and reading past the end fails.
    #[rstest]
    fn test_read_u8() {
        let data = [0x42, 0xFF];
        let mut cur = SbeCursor::new(&data);

        let first = cur.read_u8().unwrap();
        assert_eq!(first, 0x42);
        assert_eq!(cur.pos(), 1);

        let second = cur.read_u8().unwrap();
        assert_eq!(second, 0xFF);
        assert_eq!(cur.pos(), 2);

        assert!(cur.read_u8().is_err());
    }

    /// 0x7F and 0x80 decode to the extremes of `i8`.
    #[rstest]
    fn test_read_i8() {
        let data = [0x7F, 0x80];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_i8().unwrap(), 127);
        assert_eq!(cur.read_i8().unwrap(), -128);
    }

    /// The low byte comes first in the encoded form.
    #[rstest]
    fn test_read_u16_le() {
        let data = [0x34, 0x12];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_u16_le().unwrap(), 0x1234);
        assert_eq!(cur.pos(), 2);
    }

    /// A negative `i64` round-trips through its little-endian bytes.
    #[rstest]
    fn test_read_i64_le() {
        let expected: i64 = -1234567890123456789;
        let data = expected.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_i64_le().unwrap(), expected);
        assert_eq!(cur.pos(), 8);
    }

    /// `i64::MIN` is decoded as an absent value.
    #[rstest]
    fn test_read_optional_i64_null() {
        let data = i64::MIN.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_optional_i64_le().unwrap(), None);
    }

    /// Any other value is decoded as present.
    #[rstest]
    fn test_read_optional_i64_present() {
        let data = 12345_i64.to_le_bytes();
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_optional_i64_le().unwrap(), Some(12345));
    }

    /// One length byte followed by the payload.
    #[rstest]
    fn test_read_var_string8() {
        let data = b"\x05hello";
        let mut cur = SbeCursor::new(data);

        assert_eq!(cur.read_var_string8().unwrap(), "hello");
        assert_eq!(cur.pos(), 6);
    }

    /// A zero length yields an empty string and consumes only the length byte.
    #[rstest]
    fn test_read_var_string8_empty() {
        let data = [0];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.read_var_string8().unwrap(), "");
        assert_eq!(cur.pos(), 1);
    }

    /// A payload that is not UTF-8 is rejected.
    #[rstest]
    fn test_read_var_string8_invalid_utf8() {
        let data = [2, 0xFF, 0xFE];
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_var_string8();
        assert!(matches!(outcome, Err(SbeDecodeError::InvalidUtf8)));
    }

    /// Header layout: u16 block length, then u32 count.
    #[rstest]
    fn test_read_group_header() {
        let data = [24, 0, 3, 0, 0, 0];
        let mut cur = SbeCursor::new(&data);

        let header = cur.read_group_header().unwrap();
        assert_eq!(header.0, 24);
        assert_eq!(header.1, 3);
        assert_eq!(cur.pos(), 6);
    }

    /// A count one above the limit is rejected.
    #[rstest]
    fn test_read_group_header_too_large() {
        let oversized = MAX_GROUP_SIZE + 1;
        let mut data = vec![24, 0];
        data.extend_from_slice(&oversized.to_le_bytes());
        let mut cur = SbeCursor::new(&data);

        let outcome = cur.read_group_header();
        assert!(matches!(
            outcome,
            Err(SbeDecodeError::GroupSizeTooLarge { .. })
        ));
    }

    /// Two tightly packed 4-byte items.
    #[rstest]
    fn test_read_group() {
        let data = [100u32.to_le_bytes(), 200u32.to_le_bytes()].concat();
        let mut cur = SbeCursor::new(&data);

        let decoded: Vec<u32> = cur.read_group(4, 2, |c| c.read_u32_le()).unwrap();

        assert_eq!(decoded, vec![100, 200]);
        assert_eq!(cur.pos(), 8);
    }

    /// Each item occupies 8 bytes although the decoder only reads 4 of them.
    #[rstest]
    fn test_read_group_respects_block_length() {
        let padding = [0u8; 4];
        let data = [
            100u32.to_le_bytes(),
            padding,
            200u32.to_le_bytes(),
            padding,
        ]
        .concat();
        let mut cur = SbeCursor::new(&data);

        let decoded: Vec<u32> = cur.read_group(8, 2, |c| c.read_u32_le()).unwrap();

        assert_eq!(decoded, vec![100, 200]);
        assert_eq!(cur.pos(), 16);
    }

    /// Requests that fit in the buffer succeed.
    #[rstest]
    fn test_require_success() {
        let data = [1, 2, 3, 4];
        let cur = SbeCursor::new(&data);

        assert!(cur.require(4).is_ok());
        assert!(cur.require(3).is_ok());
    }

    /// The error carries the needed and the actual buffer length.
    #[rstest]
    fn test_require_failure() {
        let data = [1, 2];
        let cur = SbeCursor::new(&data);

        let failure = cur.require(5).unwrap_err();
        assert!(matches!(
            failure,
            SbeDecodeError::BufferTooShort {
                expected: 5,
                actual: 2
            }
        ));
    }

    /// `advance` moves the offset and refuses to step past the end.
    #[rstest]
    fn test_advance() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 2);

        cur.advance(2).unwrap();
        assert_eq!(cur.pos(), 4);

        assert!(cur.advance(1).is_err());
    }

    /// `peek` exposes the unread tail without consuming it.
    #[rstest]
    fn test_peek() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        assert_eq!(cur.peek(), &[1, 2, 3, 4]);

        cur.advance(2).unwrap();
        assert_eq!(cur.peek(), &[3, 4]);
    }

    /// `reset` rewinds to offset zero.
    #[rstest]
    fn test_reset() {
        let data = [1, 2, 3, 4];
        let mut cur = SbeCursor::new(&data);

        cur.advance(3).unwrap();
        assert_eq!(cur.pos(), 3);

        cur.reset();
        assert_eq!(cur.pos(), 0);
    }
}