nautilus_network/
fix.rs
1use std::sync::Arc;
19
20use memchr::memchr;
21
22use crate::socket::TcpMessageHandler;
23
24const MIN_MESSAGE_SIZE: usize = 10; const MAX_MESSAGE_SIZE: usize = 8192; const CHECKSUM_LEN: usize = 7; const CHECKSUM_TAG: &[u8] = b"10=";
28const START_PATTERN: &[u8] = b"8=FIX";
29const START_CHAR: u8 = b'8';
30const DELIMITER: u8 = b'\x01';
31
32pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &Arc<TcpMessageHandler>) {
58 let mut processed_to = 0;
59
60 while processed_to < buf.len() {
61 if buf.len() - processed_to < MIN_MESSAGE_SIZE {
62 break;
63 }
64
65 let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
67 if let Some(idx) = start_idx {
68 if idx + START_PATTERN.len() <= buf.len()
69 && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
70 {
71 if let Some(end_pos) = find_message_end(&buf[idx..]) {
73 let message_end = idx + end_pos;
74 if message_end - idx > MAX_MESSAGE_SIZE {
75 processed_to = idx + 1;
77 continue;
78 }
79 let message = &buf[idx..message_end];
80 handler(message); processed_to = message_end; } else {
83 break;
85 }
86 } else {
87 processed_to = idx + 1;
89 }
90 } else {
91 buf.clear();
93 return;
94 }
95 }
96
97 if processed_to > 0 {
99 buf.drain(0..processed_to);
100 }
101}
102
103#[inline(always)]
105fn find_message_end(buf: &[u8]) -> Option<usize> {
106 let mut idx = 0;
107 while idx + CHECKSUM_LEN <= buf.len() {
108 if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
109 && buf[idx + 3].is_ascii_digit()
110 && buf[idx + 4].is_ascii_digit()
111 && buf[idx + 5].is_ascii_digit()
112 && buf[idx + 6] == DELIMITER
113 {
114 return Some(idx + CHECKSUM_LEN);
115 }
116 idx += 1;
117 }
118 None
119}
120
121#[cfg(test)]
122mod process_fix_buffer_tests {
123 use std::sync::{Arc, Mutex};
124
125 use rstest::rstest;
126
127 use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};
128
129 #[rstest]
130 fn test_process_empty_buffer() {
131 let mut buffer = Vec::new();
132 let received = Arc::new(Mutex::new(Vec::new()));
133 let received_clone = received.clone();
134
135 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
136 received_clone.lock().unwrap().push(data.to_vec());
137 });
138
139 process_fix_buffer(&mut buffer, &handler);
140
141 assert!(received.lock().unwrap().is_empty());
143 assert!(buffer.is_empty());
144 }
145
146 #[rstest]
147 fn test_process_incomplete_message() {
148 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();
150 let received = Arc::new(Mutex::new(Vec::new()));
151 let received_clone = received.clone();
152
153 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
154 received_clone.lock().unwrap().push(data.to_vec());
155 });
156
157 process_fix_buffer(&mut buffer, &handler);
158
159 assert!(received.lock().unwrap().is_empty());
161 assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
163 }
164
165 #[rstest]
166 fn test_process_complete_message() {
167 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
169 let received = Arc::new(Mutex::new(Vec::new()));
170 let received_clone = received.clone();
171
172 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
173 received_clone.lock().unwrap().push(data.to_vec());
174 });
175
176 process_fix_buffer(&mut buffer, &handler);
177
178 assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
179 }
180
181 #[rstest]
182 fn test_process_message_with_garbage_prefix() {
183 let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
185 let received = Arc::new(Mutex::new(Vec::new()));
186 let received_clone = received.clone();
187
188 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
189 received_clone.lock().unwrap().push(data.to_vec());
190 });
191
192 process_fix_buffer(&mut buffer, &handler);
193
194 assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
195 }
196
197 #[rstest]
198 fn test_process_partial_checksum() {
199 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();
201 let received = Arc::new(Mutex::new(Vec::new()));
202 let received_clone = received.clone();
203
204 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
205 received_clone.lock().unwrap().push(data.to_vec());
206 });
207
208 process_fix_buffer(&mut buffer, &handler);
209
210 assert!(received.lock().unwrap().is_empty());
212 assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
214 }
215
216 #[rstest]
217 fn test_process_multiple_messages_single_call() {
218 let mut buffer =
220 b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
221 .to_vec();
222 let received = Arc::new(Mutex::new(Vec::new()));
223 let received_clone = received.clone();
224
225 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
226 received_clone.lock().unwrap().push(data.to_vec());
227 });
228
229 process_fix_buffer(&mut buffer, &handler);
230
231 assert_eq!(received.lock().unwrap().len(), 2);
232 assert_eq!(
233 received.lock().unwrap()[0],
234 b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
235 );
236 assert_eq!(
237 received.lock().unwrap()[1],
238 b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
239 );
240 assert!(buffer.is_empty());
241 }
242
243 #[rstest]
244 fn test_process_message_with_invalid_checksum() {
245 let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();
247 let received = Arc::new(Mutex::new(Vec::new()));
248 let received_clone = received.clone();
249
250 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
251 received_clone.lock().unwrap().push(data.to_vec());
252 });
253
254 process_fix_buffer(&mut buffer, &handler);
255
256 assert!(received.lock().unwrap().is_empty());
258 assert_eq!(
260 buffer,
261 b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
262 );
263 }
264
265 #[rstest]
266 fn test_process_message_with_multiple_checksums() {
267 let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();
268 let received = Arc::new(Mutex::new(Vec::new()));
269 let received_clone = received.clone();
270
271 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
272 received_clone.lock().unwrap().push(data.to_vec());
273 });
274
275 process_fix_buffer(&mut buffer, &handler);
276
277 assert_eq!(received.lock().unwrap().len(), 1);
279 assert_eq!(
280 received.lock().unwrap()[0],
281 b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()
282 );
283 assert_eq!(buffer, b"10=456\x01".to_vec());
284 }
285
286 #[rstest]
287 fn test_process_large_buffer() {
288 let mut buffer = Vec::new();
289 let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
290 for _ in 0..1000 {
291 buffer.extend_from_slice(message);
292 }
293 let received = Arc::new(Mutex::new(Vec::new()));
294 let received_clone = received.clone();
295
296 let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
297 received_clone.lock().unwrap().push(data.to_vec());
298 });
299
300 process_fix_buffer(&mut buffer, &handler);
301
302 assert_eq!(received.lock().unwrap().len(), 1000);
304 assert!(buffer.is_empty());
305 }
306}