1use memchr::memchr;
19
20use crate::socket::TcpMessageHandler;
21
22const MIN_MESSAGE_SIZE: usize = 10; const MAX_MESSAGE_SIZE: usize = 8192; const CHECKSUM_LEN: usize = 7; const CHECKSUM_TAG: &[u8] = b"10=";
26const START_PATTERN: &[u8] = b"8=FIX";
27const START_CHAR: u8 = b'8';
28const DELIMITER: u8 = b'\x01';
29
30pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &TcpMessageHandler) {
56 let mut processed_to = 0;
57
58 while processed_to < buf.len() {
59 if buf.len() - processed_to < MIN_MESSAGE_SIZE {
60 break;
61 }
62
63 let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
65 if let Some(idx) = start_idx {
66 if idx + START_PATTERN.len() <= buf.len()
67 && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
68 {
69 if let Some(end_pos) = find_message_end(&buf[idx..]) {
71 let message_end = idx + end_pos;
72 let message_len = message_end - idx;
73
74 if message_len > MAX_MESSAGE_SIZE {
76 processed_to = idx + 1;
78 continue;
79 }
80
81 let message = &buf[idx..message_end];
82 handler(message); processed_to = message_end; } else {
85 break;
87 }
88 } else {
89 processed_to = idx + 1;
91 }
92 } else {
93 if buf.len() > START_PATTERN.len() - 1 {
97 let keep_from = buf.len() - (START_PATTERN.len() - 1);
98 buf.drain(0..keep_from);
99 }
100 return;
101 }
102 }
103
104 if processed_to > 0 {
106 buf.drain(0..processed_to);
107 }
108}
109
110#[inline(always)]
112fn find_message_end(buf: &[u8]) -> Option<usize> {
113 let mut idx = 0;
114 while idx + CHECKSUM_LEN <= buf.len() {
115 if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
116 && buf[idx + 3].is_ascii_digit()
117 && buf[idx + 4].is_ascii_digit()
118 && buf[idx + 5].is_ascii_digit()
119 && buf[idx + 6] == DELIMITER
120 {
121 return Some(idx + CHECKSUM_LEN);
122 }
123 idx += 1;
124 }
125 None
126}
127
128#[cfg(test)]
129mod process_fix_buffer_tests {
130 use std::sync::{Arc, Mutex};
131
132 use nautilus_core::MUTEX_POISONED;
133 use rstest::rstest;
134
135 use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};
136
137 #[rstest]
138 fn test_process_empty_buffer() {
139 let mut buffer = Vec::new();
140 let received = Arc::new(Mutex::new(Vec::new()));
141 let received_clone = received.clone();
142
143 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
144 received_clone
145 .lock()
146 .expect(MUTEX_POISONED)
147 .push(data.to_vec());
148 });
149
150 process_fix_buffer(&mut buffer, &handler);
151
152 assert!(received.lock().expect(MUTEX_POISONED).is_empty());
154 assert!(buffer.is_empty());
155 }
156
157 #[rstest]
158 fn test_process_incomplete_message() {
159 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();
161 let received = Arc::new(Mutex::new(Vec::new()));
162 let received_clone = received.clone();
163
164 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
165 received_clone
166 .lock()
167 .expect(MUTEX_POISONED)
168 .push(data.to_vec());
169 });
170
171 process_fix_buffer(&mut buffer, &handler);
172
173 assert!(received.lock().expect(MUTEX_POISONED).is_empty());
175 assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
177 }
178
179 #[rstest]
180 fn test_process_complete_message() {
181 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
183 let received = Arc::new(Mutex::new(Vec::new()));
184 let received_clone = received.clone();
185
186 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
187 received_clone
188 .lock()
189 .expect(MUTEX_POISONED)
190 .push(data.to_vec());
191 });
192
193 process_fix_buffer(&mut buffer, &handler);
194
195 assert!(buffer.is_empty() || received.lock().expect(MUTEX_POISONED).len() == 1);
196 }
197
198 #[rstest]
199 fn test_process_message_with_garbage_prefix() {
200 let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
202 let received = Arc::new(Mutex::new(Vec::new()));
203 let received_clone = received.clone();
204
205 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
206 received_clone
207 .lock()
208 .expect(MUTEX_POISONED)
209 .push(data.to_vec());
210 });
211
212 process_fix_buffer(&mut buffer, &handler);
213
214 assert!(buffer.is_empty() || received.lock().expect(MUTEX_POISONED).len() == 1);
215 }
216
217 #[rstest]
218 fn test_process_partial_checksum() {
219 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();
221 let received = Arc::new(Mutex::new(Vec::new()));
222 let received_clone = received.clone();
223
224 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
225 received_clone
226 .lock()
227 .expect(MUTEX_POISONED)
228 .push(data.to_vec());
229 });
230
231 process_fix_buffer(&mut buffer, &handler);
232
233 assert!(received.lock().expect(MUTEX_POISONED).is_empty());
235 assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
237 }
238
239 #[rstest]
240 fn test_process_multiple_messages_single_call() {
241 let mut buffer =
243 b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
244 .to_vec();
245 let received = Arc::new(Mutex::new(Vec::new()));
246 let received_clone = received.clone();
247
248 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
249 received_clone
250 .lock()
251 .expect(MUTEX_POISONED)
252 .push(data.to_vec());
253 });
254
255 process_fix_buffer(&mut buffer, &handler);
256
257 assert_eq!(received.lock().expect(MUTEX_POISONED).len(), 2);
258 assert_eq!(
259 received.lock().expect(MUTEX_POISONED)[0],
260 b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
261 );
262 assert_eq!(
263 received.lock().expect(MUTEX_POISONED)[1],
264 b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
265 );
266 assert!(buffer.is_empty());
267 }
268
269 #[rstest]
270 fn test_process_message_with_invalid_checksum() {
271 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();
273 let received = Arc::new(Mutex::new(Vec::new()));
274 let received_clone = received.clone();
275
276 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
277 received_clone
278 .lock()
279 .expect(MUTEX_POISONED)
280 .push(data.to_vec());
281 });
282
283 process_fix_buffer(&mut buffer, &handler);
284
285 assert!(received.lock().expect(MUTEX_POISONED).is_empty());
287 assert_eq!(
289 buffer,
290 b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
291 );
292 }
293
294 #[rstest]
295 fn test_process_message_with_multiple_checksums() {
296 let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();
297 let received = Arc::new(Mutex::new(Vec::new()));
298 let received_clone = received.clone();
299
300 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
301 received_clone
302 .lock()
303 .expect(MUTEX_POISONED)
304 .push(data.to_vec());
305 });
306
307 process_fix_buffer(&mut buffer, &handler);
308
309 assert_eq!(received.lock().expect(MUTEX_POISONED).len(), 1);
311 assert_eq!(
312 received.lock().expect(MUTEX_POISONED)[0],
313 b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()
314 );
315 assert_eq!(buffer, b"10=456\x01".to_vec());
316 }
317
318 #[rstest]
319 fn test_process_large_buffer() {
320 let mut buffer = Vec::new();
321 let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
322 for _ in 0..1000 {
323 buffer.extend_from_slice(message);
324 }
325 let received = Arc::new(Mutex::new(Vec::new()));
326 let received_clone = received.clone();
327
328 let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
329 received_clone
330 .lock()
331 .expect(MUTEX_POISONED)
332 .push(data.to_vec());
333 });
334
335 process_fix_buffer(&mut buffer, &handler);
336
337 assert_eq!(received.lock().expect(MUTEX_POISONED).len(), 1000);
339 assert!(buffer.is_empty());
340 }
341}