1use memchr::memchr;
19
20use crate::socket::TcpMessageHandler;
21
/// Fewest buffered bytes worth scanning; anything shorter is left for a later call.
const MIN_MESSAGE_SIZE: usize = 10;
/// Longest message, in bytes, that is handed to the message handler.
const MAX_MESSAGE_SIZE: usize = 8192;
/// Tag prefix of the checksum field that closes a message.
const CHECKSUM_TAG: &[u8] = b"10=";
/// Width of the checksum field: the tag, three digits and the trailing delimiter.
const CHECKSUM_LEN: usize = CHECKSUM_TAG.len() + 3 + 1;
/// Byte sequence that opens a message.
const START_PATTERN: &[u8] = b"8=FIX";
/// First byte of [`START_PATTERN`], used to locate candidate message starts.
const START_CHAR: u8 = START_PATTERN[0];
/// Field separator byte (SOH, `0x01`).
const DELIMITER: u8 = b'\x01';
29
30pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &TcpMessageHandler) {
56 let mut processed_to = 0;
57
58 while processed_to < buf.len() {
59 if buf.len() - processed_to < MIN_MESSAGE_SIZE {
60 break;
61 }
62
63 let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
65 if let Some(idx) = start_idx {
66 if idx + START_PATTERN.len() <= buf.len()
67 && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
68 {
69 if let Some(end_pos) = find_message_end(&buf[idx..]) {
71 let message_end = idx + end_pos;
72 let message_len = message_end - idx;
73
74 if message_len > MAX_MESSAGE_SIZE {
76 processed_to = idx + 1;
78 continue;
79 }
80
81 let message = &buf[idx..message_end];
82 handler(message); processed_to = message_end; } else {
85 break;
87 }
88 } else {
89 processed_to = idx + 1;
91 }
92 } else {
93 buf.clear();
95 return;
96 }
97 }
98
99 if processed_to > 0 {
101 buf.drain(0..processed_to);
102 }
103}
104
105#[inline(always)]
107fn find_message_end(buf: &[u8]) -> Option<usize> {
108 let mut idx = 0;
109 while idx + CHECKSUM_LEN <= buf.len() {
110 if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
111 && buf[idx + 3].is_ascii_digit()
112 && buf[idx + 4].is_ascii_digit()
113 && buf[idx + 5].is_ascii_digit()
114 && buf[idx + 6] == DELIMITER
115 {
116 return Some(idx + CHECKSUM_LEN);
117 }
118 idx += 1;
119 }
120 None
121}
122
#[cfg(test)]
mod process_fix_buffer_tests {
    use std::sync::{Arc, Mutex};

    use rstest::rstest;

    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};

    /// Runs `process_fix_buffer` over a copy of `input` and returns the messages passed
    /// to the handler, in delivery order, together with the bytes left in the buffer.
    fn run(input: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
        let mut buffer = input.to_vec();
        let received = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&received);

        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
            sink.lock().unwrap().push(data.to_vec());
        });

        process_fix_buffer(&mut buffer, &handler);

        let messages = received.lock().unwrap().clone();
        (messages, buffer)
    }

    #[rstest]
    fn test_process_empty_buffer() {
        let (messages, remaining) = run(b"");

        assert!(messages.is_empty());
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_incomplete_message() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x01";

        let (messages, remaining) = run(input);

        // Nothing is delivered and the partial message waits for more data.
        assert!(messages.is_empty());
        assert_eq!(remaining, input.to_vec());
    }

    #[rstest]
    fn test_process_complete_message() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01";

        let (messages, remaining) = run(input);

        assert_eq!(messages, vec![input.to_vec()]);
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_message_with_garbage_prefix() {
        let (messages, remaining) = run(b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01");

        // The garbage is dropped; only the framed message reaches the handler.
        assert_eq!(
            messages,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_partial_checksum() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123";

        let (messages, remaining) = run(input);

        // The checksum lacks its closing delimiter, so the message is not complete yet.
        assert!(messages.is_empty());
        assert_eq!(remaining, input.to_vec());
    }

    #[rstest]
    fn test_process_multiple_messages_single_call() {
        let (messages, remaining) = run(
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01",
        );

        assert_eq!(
            messages,
            vec![
                b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec(),
                b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec(),
            ]
        );
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_message_with_invalid_checksum() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01";

        let (messages, remaining) = run(input);

        // A non-digit checksum never terminates the message, so it stays buffered.
        assert!(messages.is_empty());
        assert_eq!(remaining, input.to_vec());
    }

    #[rstest]
    fn test_process_message_with_multiple_checksums() {
        let (messages, remaining) = run(b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01");

        // The message ends at the first checksum; the short tail is left in the buffer.
        assert_eq!(messages, vec![b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()]);
        assert_eq!(remaining, b"10=456\x01".to_vec());
    }

    #[rstest]
    fn test_process_large_buffer() {
        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
        let input = message.repeat(1000);

        let (messages, remaining) = run(&input);

        assert_eq!(messages.len(), 1000);
        assert!(messages.iter().all(|received| received.as_slice() == &message[..]));
        assert!(remaining.is_empty());
    }
}