1use memchr::memchr;
19
20use crate::socket::TcpMessageHandler;
21
/// Fewest unprocessed bytes that must be buffered before a message is looked for.
const MIN_MESSAGE_SIZE: usize = 10;

/// Longest message, in bytes, that is handed to the handler; longer ones are skipped.
const MAX_MESSAGE_SIZE: usize = 8192;

/// Byte length of a complete checksum field: the `10=` tag, three digits and the delimiter.
const CHECKSUM_LEN: usize = 7;

/// Tag prefix that introduces the checksum field.
const CHECKSUM_TAG: &[u8] = b"10=";

/// Byte sequence every message must begin with.
const START_PATTERN: &[u8] = b"8=FIX";

/// First byte of `START_PATTERN`, used to locate candidate message starts quickly.
const START_CHAR: u8 = b'8';

/// Field delimiter (SOH, `0x01`).
const DELIMITER: u8 = b'\x01';
29
30pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &TcpMessageHandler) {
56 let mut processed_to = 0;
57
58 while processed_to < buf.len() {
59 if buf.len() - processed_to < MIN_MESSAGE_SIZE {
60 break;
61 }
62
63 let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
65 if let Some(idx) = start_idx {
66 if idx + START_PATTERN.len() <= buf.len()
67 && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
68 {
69 if let Some(end_pos) = find_message_end(&buf[idx..]) {
71 let message_end = idx + end_pos;
72 let message_len = message_end - idx;
73
74 if message_len > MAX_MESSAGE_SIZE {
76 processed_to = idx + 1;
78 continue;
79 }
80
81 let message = &buf[idx..message_end];
82 handler(message); processed_to = message_end; } else {
85 break;
87 }
88 } else {
89 processed_to = idx + 1;
91 }
92 } else {
93 if buf.len() > START_PATTERN.len() - 1 {
97 let keep_from = buf.len() - (START_PATTERN.len() - 1);
98 buf.drain(0..keep_from);
99 }
100 return;
101 }
102 }
103
104 if processed_to > 0 {
106 buf.drain(0..processed_to);
107 }
108}
109
110#[inline(always)]
112fn find_message_end(buf: &[u8]) -> Option<usize> {
113 let mut idx = 0;
114 while idx + CHECKSUM_LEN <= buf.len() {
115 if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
116 && buf[idx + 3].is_ascii_digit()
117 && buf[idx + 4].is_ascii_digit()
118 && buf[idx + 5].is_ascii_digit()
119 && buf[idx + 6] == DELIMITER
120 {
121 return Some(idx + CHECKSUM_LEN);
122 }
123 idx += 1;
124 }
125 None
126}
127
#[cfg(test)]
mod process_fix_buffer_tests {
    use std::sync::{Arc, Mutex};

    use rstest::rstest;

    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};

    /// Feeds `input` through `process_fix_buffer` once.
    ///
    /// Returns the messages passed to the handler, in order, together with the bytes left
    /// in the buffer afterwards.
    fn run(input: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
        let mut buffer = input.to_vec();
        let received = Arc::new(Mutex::new(Vec::new()));
        let received_clone = received.clone();

        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
            received_clone.lock().unwrap().push(data.to_vec());
        });

        process_fix_buffer(&mut buffer, &handler);

        let messages = received.lock().unwrap().clone();
        (messages, buffer)
    }

    #[rstest]
    fn test_process_empty_buffer() {
        let (messages, buffer) = run(b"");

        assert!(messages.is_empty());
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_incomplete_message() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x01";

        let (messages, buffer) = run(input);

        // Nothing is delivered and the partial message is kept for the next read.
        assert!(messages.is_empty());
        assert_eq!(buffer, input.to_vec());
    }

    #[rstest]
    fn test_process_complete_message() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01";

        let (messages, buffer) = run(input);

        assert_eq!(messages, vec![input.to_vec()]);
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_message_with_garbage_prefix() {
        let (messages, buffer) = run(b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01");

        // The garbage is dropped; only the message itself reaches the handler.
        assert_eq!(
            messages,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_partial_checksum() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123";

        let (messages, buffer) = run(input);

        // The closing delimiter is missing, so the message is not complete yet.
        assert!(messages.is_empty());
        assert_eq!(buffer, input.to_vec());
    }

    #[rstest]
    fn test_process_multiple_messages_single_call() {
        let (messages, buffer) = run(
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01",
        );

        assert_eq!(
            messages,
            vec![
                b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec(),
                b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec(),
            ]
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_message_with_invalid_checksum() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01";

        let (messages, buffer) = run(input);

        // A non-numeric checksum is not recognised as a message end.
        assert!(messages.is_empty());
        assert_eq!(buffer, input.to_vec());
    }

    #[rstest]
    fn test_process_message_with_multiple_checksums() {
        let (messages, buffer) = run(b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01");

        // The message ends at the first checksum; the second one stays buffered.
        assert_eq!(messages, vec![b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()]);
        assert_eq!(buffer, b"10=456\x01".to_vec());
    }

    #[rstest]
    fn test_process_large_buffer() {
        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
        let input = message.repeat(1000);

        let (messages, buffer) = run(&input);

        assert_eq!(messages.len(), 1000);
        assert!(messages.iter().all(|m| m[..] == message[..]));
        assert!(buffer.is_empty());
    }
}