nautilus_network/
fix.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Simple FIX message buffer processor.
17
18use memchr::memchr;
19
20use crate::socket::TcpMessageHandler;
21
22const MIN_MESSAGE_SIZE: usize = 10; // Minimum unprocessed bytes needed to attempt a scan (below "8=FIX" + "10=xxx|" = 12)
23const MAX_MESSAGE_SIZE: usize = 8192; // Max message size to prevent buffer bloat
24const CHECKSUM_LEN: usize = 7; // Length of "10=xxx|", where "|" stands for the SOH delimiter
25const CHECKSUM_TAG: &[u8] = b"10="; // Tag prefix of the trailing checksum field
26const START_PATTERN: &[u8] = b"8=FIX"; // Prefix every message start must match
27const START_CHAR: u8 = b'8'; // First byte of `START_PATTERN`, used for the `memchr` scan
28const DELIMITER: u8 = b'\x01'; // SOH field delimiter
29
30/// Processes a mutable byte buffer containing FIX protocol messages.
31///
32/// Extracts complete messages starting with "8=FIX" (supporting various FIX versions)
33/// and ending with "10=xxx|" (where xxx is a three-digit checksum), passes them to the
34/// provided handler, and removes them from the buffer, leaving incomplete data for
35/// future processing.
36///
37/// # Assumptions
38///
39/// - Fields are delimited by SOH (`\x01`).
40/// - The checksum field is "10=xxx|" where xxx is a three-digit ASCII number.
41/// - Messages are ASCII-encoded.
42///
43/// # Behavior
44///
45/// - Uses `memchr` for efficient message start detection.
46/// - Discards malformed data up to the next potential message start.
47/// - Retains incomplete messages in the buffer for additional data.
48/// - Enforces a maximum message size to prevent buffer overflow.
49///
50/// # Warning
51///
52/// This parser is designed for basic FIX message processing and does not support all features
53/// of the FIX protocol. Notably, it lacks handling for repeating groups and other advanced
54/// structures, which may be required for full protocol compliance in complex scenarios.
55pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &TcpMessageHandler) {
56    let mut processed_to = 0;
57
58    while processed_to < buf.len() {
59        if buf.len() - processed_to < MIN_MESSAGE_SIZE {
60            break;
61        }
62
63        // Find the potential start of a FIX message
64        let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
65        if let Some(idx) = start_idx {
66            if idx + START_PATTERN.len() <= buf.len()
67                && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
68            {
69                // Search for message end
70                if let Some(end_pos) = find_message_end(&buf[idx..]) {
71                    let message_end = idx + end_pos;
72                    let message_len = message_end - idx;
73
74                    // Check if message exceeds max size
75                    if message_len > MAX_MESSAGE_SIZE {
76                        // Message exceeds max size, discard up to this point
77                        processed_to = idx + 1;
78                        continue;
79                    }
80
81                    let message = &buf[idx..message_end];
82                    handler(message); // Pass complete message to handler
83                    processed_to = message_end; // Update processed position
84                } else {
85                    // Incomplete message, wait for more data
86                    break;
87                }
88            } else {
89                // Invalid start pattern, discard data up to this point
90                processed_to = idx + 1;
91            }
92        } else {
93            // No message start found in the remaining buffer
94            // Keep last 4 bytes in case they contain a partial "8=FIX" pattern at buffer boundary
95            // This prevents discarding partial message starts that span buffer reads
96            if buf.len() > START_PATTERN.len() - 1 {
97                let keep_from = buf.len() - (START_PATTERN.len() - 1);
98                buf.drain(0..keep_from);
99            }
100            return;
101        }
102    }
103
104    // Remove all processed data from the buffer
105    if processed_to > 0 {
106        buf.drain(0..processed_to);
107    }
108}
109
110/// Locate the end of a FIX message. Searches for "10=xxx|" where xxx is a three-digit ASCII number.
111#[inline(always)]
112fn find_message_end(buf: &[u8]) -> Option<usize> {
113    let mut idx = 0;
114    while idx + CHECKSUM_LEN <= buf.len() {
115        if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
116            && buf[idx + 3].is_ascii_digit()
117            && buf[idx + 4].is_ascii_digit()
118            && buf[idx + 5].is_ascii_digit()
119            && buf[idx + 6] == DELIMITER
120        {
121            return Some(idx + CHECKSUM_LEN);
122        }
123        idx += 1;
124    }
125    None
126}
127
#[cfg(test)]
mod process_fix_buffer_tests {
    use std::sync::{Arc, Mutex};

    use nautilus_core::MUTEX_POISONED;
    use rstest::rstest;

    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};

    /// Runs `process_fix_buffer` over `buffer` and returns every message that was passed
    /// to the handler, in delivery order.
    fn process_and_collect(buffer: &mut Vec<u8>) -> Vec<Vec<u8>> {
        let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::default();
        let sink = Arc::clone(&received);

        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
            sink.lock().expect(MUTEX_POISONED).push(data.to_vec());
        });

        process_fix_buffer(buffer, &handler);

        let mut messages = received.lock().expect(MUTEX_POISONED);
        std::mem::take(&mut *messages)
    }

    #[rstest]
    fn test_process_empty_buffer() {
        let mut buffer = Vec::new();

        let received = process_and_collect(&mut buffer);

        // Buffer was empty, so no messages should be processed
        assert!(received.is_empty());
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_incomplete_message() {
        // A partial FIX message without end
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();

        let received = process_and_collect(&mut buffer);

        // No complete message, nothing should be processed
        assert!(received.is_empty());
        // Buffer should be preserved for more data
        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
    }

    #[rstest]
    fn test_process_complete_message() {
        // A complete FIX message
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();

        let received = process_and_collect(&mut buffer);

        // Exactly the one message is delivered and nothing is left behind
        assert_eq!(
            received,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_message_with_garbage_prefix() {
        // Message with garbage before the FIX header
        let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();

        let received = process_and_collect(&mut buffer);

        // The garbage is stripped: only the message itself reaches the handler
        assert_eq!(
            received,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_partial_checksum() {
        // Message with partial checksum (missing the SOH)
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();

        let received = process_and_collect(&mut buffer);

        // No complete message, nothing should be processed
        assert!(received.is_empty());
        // Buffer should be preserved
        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
    }

    #[rstest]
    fn test_process_multiple_messages_single_call() {
        // Two complete messages
        let mut buffer =
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
                .to_vec();

        let received = process_and_collect(&mut buffer);

        assert_eq!(received.len(), 2);
        assert_eq!(
            received[0],
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
        );
        assert_eq!(
            received[1],
            b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_message_with_invalid_checksum() {
        // Message with invalid checksum format (not 3 digits)
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();

        let received = process_and_collect(&mut buffer);

        // No message should be processed due to invalid checksum format
        assert!(received.is_empty());
        // Buffer should be preserved
        assert_eq!(
            buffer,
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
        );
    }

    #[rstest]
    fn test_process_message_with_multiple_checksums() {
        let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();

        let received = process_and_collect(&mut buffer);

        // One message processed, extra data retained
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec());
        assert_eq!(buffer, b"10=456\x01".to_vec());
    }

    #[rstest]
    fn test_process_large_buffer() {
        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
        let mut buffer = Vec::with_capacity(message.len() * 1000);
        for _ in 0..1000 {
            buffer.extend_from_slice(message);
        }

        let received = process_and_collect(&mut buffer);

        // 1000 messages processed, buffer empty
        assert_eq!(received.len(), 1000);
        assert!(buffer.is_empty());
    }
}
341}