nautilus_network/
fix.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Simple FIX message buffer processor.
17
18use memchr::memchr;
19
20use crate::socket::TcpMessageHandler;
21
22const MIN_MESSAGE_SIZE: usize = 10; // Minimum length for "8=FIX" + "10=xxx|"
23const MAX_MESSAGE_SIZE: usize = 8192; // Max message size to prevent buffer bloat
24const CHECKSUM_LEN: usize = 7; // Length of "10=xxx|"
25const CHECKSUM_TAG: &[u8] = b"10=";
26const START_PATTERN: &[u8] = b"8=FIX";
27const START_CHAR: u8 = b'8';
28const DELIMITER: u8 = b'\x01';
29
30/// Processes a mutable byte buffer containing FIX protocol messages.
31///
32/// Extracts complete messages starting with "8=FIX" (supporting various FIX versions)
33/// and ending with "10=xxx|" (where xxx is a three-digit checksum), passes them to the
34/// provided handler, and removes them from the buffer, leaving incomplete data for
35/// future processing.
36///
37/// # Assumptions
38///
39/// - Fields are delimited by SOH (`\x01`).
40/// - The checksum field is "10=xxx|" where xxx is a three-digit ASCII number.
41/// - Messages are ASCII-encoded.
42///
43/// # Behavior
44///
45/// - Uses `memchr` for efficient message start detection.
46/// - Discards malformed data up to the next potential message start.
47/// - Retains incomplete messages in the buffer for additional data.
48/// - Enforces a maximum message size to prevent buffer overflow.
49///
50/// # Warning
51///
52/// This parser is designed for basic FIX message processing and does not support all features
53/// of the FIX protocol. Notably, it lacks handling for repeating groups and other advanced
54/// structures, which may be required for full protocol compliance in complex scenarios.
55pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &TcpMessageHandler) {
56    let mut processed_to = 0;
57
58    while processed_to < buf.len() {
59        if buf.len() - processed_to < MIN_MESSAGE_SIZE {
60            break;
61        }
62
63        // Find the potential start of a FIX message
64        let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
65        if let Some(idx) = start_idx {
66            if idx + START_PATTERN.len() <= buf.len()
67                && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
68            {
69                // Search for message end
70                if let Some(end_pos) = find_message_end(&buf[idx..]) {
71                    let message_end = idx + end_pos;
72                    let message_len = message_end - idx;
73
74                    // Check if message exceeds max size
75                    if message_len > MAX_MESSAGE_SIZE {
76                        // Message exceeds max size, discard up to this point
77                        processed_to = idx + 1;
78                        continue;
79                    }
80
81                    let message = &buf[idx..message_end];
82                    handler(message); // Pass complete message to handler
83                    processed_to = message_end; // Update processed position
84                } else {
85                    // Incomplete message, wait for more data
86                    break;
87                }
88            } else {
89                // Invalid start pattern, discard data up to this point
90                processed_to = idx + 1;
91            }
92        } else {
93            // No message start found in the remaining buffer, clear it to avoid garbage buildup
94            buf.clear();
95            return;
96        }
97    }
98
99    // Remove all processed data from the buffer
100    if processed_to > 0 {
101        buf.drain(0..processed_to);
102    }
103}
104
105/// Locate the end of a FIX message. Searches for "10=xxx|" where xxx is a three-digit ASCII number.
106#[inline(always)]
107fn find_message_end(buf: &[u8]) -> Option<usize> {
108    let mut idx = 0;
109    while idx + CHECKSUM_LEN <= buf.len() {
110        if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
111            && buf[idx + 3].is_ascii_digit()
112            && buf[idx + 4].is_ascii_digit()
113            && buf[idx + 5].is_ascii_digit()
114            && buf[idx + 6] == DELIMITER
115        {
116            return Some(idx + CHECKSUM_LEN);
117        }
118        idx += 1;
119    }
120    None
121}
122
123#[cfg(test)]
124mod process_fix_buffer_tests {
125    use std::sync::{Arc, Mutex};
126
127    use rstest::rstest;
128
129    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};
130
131    #[rstest]
132    fn test_process_empty_buffer() {
133        let mut buffer = Vec::new();
134        let received = Arc::new(Mutex::new(Vec::new()));
135        let received_clone = received.clone();
136
137        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
138            received_clone.lock().unwrap().push(data.to_vec());
139        });
140
141        process_fix_buffer(&mut buffer, &handler);
142
143        // Buffer was empty, so no messages should be processed
144        assert!(received.lock().unwrap().is_empty());
145        assert!(buffer.is_empty());
146    }
147
148    #[rstest]
149    fn test_process_incomplete_message() {
150        // A partial FIX message without end
151        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();
152        let received = Arc::new(Mutex::new(Vec::new()));
153        let received_clone = received.clone();
154
155        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
156            received_clone.lock().unwrap().push(data.to_vec());
157        });
158
159        process_fix_buffer(&mut buffer, &handler);
160
161        // No complete message, nothing should be processed
162        assert!(received.lock().unwrap().is_empty());
163        // Buffer should be preserved for more data
164        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
165    }
166
167    #[rstest]
168    fn test_process_complete_message() {
169        // A complete FIX message
170        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
171        let received = Arc::new(Mutex::new(Vec::new()));
172        let received_clone = received.clone();
173
174        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
175            received_clone.lock().unwrap().push(data.to_vec());
176        });
177
178        process_fix_buffer(&mut buffer, &handler);
179
180        assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
181    }
182
183    #[rstest]
184    fn test_process_message_with_garbage_prefix() {
185        // Message with garbage before the FIX header
186        let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
187        let received = Arc::new(Mutex::new(Vec::new()));
188        let received_clone = received.clone();
189
190        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
191            received_clone.lock().unwrap().push(data.to_vec());
192        });
193
194        process_fix_buffer(&mut buffer, &handler);
195
196        assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
197    }
198
199    #[rstest]
200    fn test_process_partial_checksum() {
201        // Message with partial checksum (missing the SOH)
202        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();
203        let received = Arc::new(Mutex::new(Vec::new()));
204        let received_clone = received.clone();
205
206        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
207            received_clone.lock().unwrap().push(data.to_vec());
208        });
209
210        process_fix_buffer(&mut buffer, &handler);
211
212        // No complete message, nothing should be processed
213        assert!(received.lock().unwrap().is_empty());
214        // Buffer should be preserved
215        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
216    }
217
218    #[rstest]
219    fn test_process_multiple_messages_single_call() {
220        // Two complete messages
221        let mut buffer =
222            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
223                .to_vec();
224        let received = Arc::new(Mutex::new(Vec::new()));
225        let received_clone = received.clone();
226
227        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
228            received_clone.lock().unwrap().push(data.to_vec());
229        });
230
231        process_fix_buffer(&mut buffer, &handler);
232
233        assert_eq!(received.lock().unwrap().len(), 2);
234        assert_eq!(
235            received.lock().unwrap()[0],
236            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
237        );
238        assert_eq!(
239            received.lock().unwrap()[1],
240            b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
241        );
242        assert!(buffer.is_empty());
243    }
244
245    #[rstest]
246    fn test_process_message_with_invalid_checksum() {
247        // Message with invalid checksum format (not 3 digits)
248        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();
249        let received = Arc::new(Mutex::new(Vec::new()));
250        let received_clone = received.clone();
251
252        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
253            received_clone.lock().unwrap().push(data.to_vec());
254        });
255
256        process_fix_buffer(&mut buffer, &handler);
257
258        // No message should be processed due to invalid checksum format
259        assert!(received.lock().unwrap().is_empty());
260        // Buffer should be preserved
261        assert_eq!(
262            buffer,
263            b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
264        );
265    }
266
267    #[rstest]
268    fn test_process_message_with_multiple_checksums() {
269        let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();
270        let received = Arc::new(Mutex::new(Vec::new()));
271        let received_clone = received.clone();
272
273        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
274            received_clone.lock().unwrap().push(data.to_vec());
275        });
276
277        process_fix_buffer(&mut buffer, &handler);
278
279        // One message processed, extra data retained
280        assert_eq!(received.lock().unwrap().len(), 1);
281        assert_eq!(
282            received.lock().unwrap()[0],
283            b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()
284        );
285        assert_eq!(buffer, b"10=456\x01".to_vec());
286    }
287
288    #[rstest]
289    fn test_process_large_buffer() {
290        let mut buffer = Vec::new();
291        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
292        for _ in 0..1000 {
293            buffer.extend_from_slice(message);
294        }
295        let received = Arc::new(Mutex::new(Vec::new()));
296        let received_clone = received.clone();
297
298        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
299            received_clone.lock().unwrap().push(data.to_vec());
300        });
301
302        process_fix_buffer(&mut buffer, &handler);
303
304        // 1000 messages processed, buffer empty
305        assert_eq!(received.lock().unwrap().len(), 1000);
306        assert!(buffer.is_empty());
307    }
308}