nautilus_network/
fix.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Simple FIX message buffer processor.
17
18use memchr::memchr;
19
20use crate::socket::TcpMessageHandler;
21
22const MIN_MESSAGE_SIZE: usize = 10; // Minimum length for "8=FIX" + "10=xxx|"
23const MAX_MESSAGE_SIZE: usize = 8192; // Max message size to prevent buffer bloat
24const CHECKSUM_LEN: usize = 7; // Length of "10=xxx|"
25const CHECKSUM_TAG: &[u8] = b"10=";
26const START_PATTERN: &[u8] = b"8=FIX";
27const START_CHAR: u8 = b'8';
28const DELIMITER: u8 = b'\x01';
29
30/// Processes a mutable byte buffer containing FIX protocol messages.
31///
32/// Extracts complete messages starting with "8=FIX" (supporting various FIX versions)
33/// and ending with "10=xxx|" (where xxx is a three-digit checksum), passes them to the
34/// provided handler, and removes them from the buffer, leaving incomplete data for
35/// future processing.
36///
37/// # Assumptions
38///
39/// - Fields are delimited by SOH (`\x01`).
40/// - The checksum field is "10=xxx|" where xxx is a three-digit ASCII number.
41/// - Messages are ASCII-encoded.
42///
43/// # Behavior
44///
45/// - Uses `memchr` for efficient message start detection.
46/// - Discards malformed data up to the next potential message start.
47/// - Retains incomplete messages in the buffer for additional data.
48/// - Enforces a maximum message size to prevent buffer overflow.
49///
50/// # Warning
51///
52/// This parser is designed for basic FIX message processing and does not support all features
53/// of the FIX protocol. Notably, it lacks handling for repeating groups and other advanced
54/// structures, which may be required for full protocol compliance in complex scenarios.
55pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &TcpMessageHandler) {
56    let mut processed_to = 0;
57
58    while processed_to < buf.len() {
59        if buf.len() - processed_to < MIN_MESSAGE_SIZE {
60            break;
61        }
62
63        // Find the potential start of a FIX message
64        let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
65        if let Some(idx) = start_idx {
66            if idx + START_PATTERN.len() <= buf.len()
67                && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
68            {
69                // Search for message end
70                if let Some(end_pos) = find_message_end(&buf[idx..]) {
71                    let message_end = idx + end_pos;
72                    let message_len = message_end - idx;
73
74                    // Check if message exceeds max size
75                    if message_len > MAX_MESSAGE_SIZE {
76                        // Message exceeds max size, discard up to this point
77                        processed_to = idx + 1;
78                        continue;
79                    }
80
81                    let message = &buf[idx..message_end];
82                    handler(message); // Pass complete message to handler
83                    processed_to = message_end; // Update processed position
84                } else {
85                    // Incomplete message, wait for more data
86                    break;
87                }
88            } else {
89                // Invalid start pattern, discard data up to this point
90                processed_to = idx + 1;
91            }
92        } else {
93            // No message start found in the remaining buffer
94            // Keep last 4 bytes in case they contain a partial "8=FIX" pattern at buffer boundary
95            // This prevents discarding partial message starts that span buffer reads
96            if buf.len() > START_PATTERN.len() - 1 {
97                let keep_from = buf.len() - (START_PATTERN.len() - 1);
98                buf.drain(0..keep_from);
99            }
100            return;
101        }
102    }
103
104    // Remove all processed data from the buffer
105    if processed_to > 0 {
106        buf.drain(0..processed_to);
107    }
108}
109
110/// Locate the end of a FIX message. Searches for "10=xxx|" where xxx is a three-digit ASCII number.
111#[inline(always)]
112fn find_message_end(buf: &[u8]) -> Option<usize> {
113    let mut idx = 0;
114    while idx + CHECKSUM_LEN <= buf.len() {
115        if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
116            && buf[idx + 3].is_ascii_digit()
117            && buf[idx + 4].is_ascii_digit()
118            && buf[idx + 5].is_ascii_digit()
119            && buf[idx + 6] == DELIMITER
120        {
121            return Some(idx + CHECKSUM_LEN);
122        }
123        idx += 1;
124    }
125    None
126}
127
128#[cfg(test)]
129mod process_fix_buffer_tests {
130    use std::sync::{Arc, Mutex};
131
132    use rstest::rstest;
133
134    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};
135
136    #[rstest]
137    fn test_process_empty_buffer() {
138        let mut buffer = Vec::new();
139        let received = Arc::new(Mutex::new(Vec::new()));
140        let received_clone = received.clone();
141
142        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
143            received_clone.lock().unwrap().push(data.to_vec());
144        });
145
146        process_fix_buffer(&mut buffer, &handler);
147
148        // Buffer was empty, so no messages should be processed
149        assert!(received.lock().unwrap().is_empty());
150        assert!(buffer.is_empty());
151    }
152
153    #[rstest]
154    fn test_process_incomplete_message() {
155        // A partial FIX message without end
156        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();
157        let received = Arc::new(Mutex::new(Vec::new()));
158        let received_clone = received.clone();
159
160        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
161            received_clone.lock().unwrap().push(data.to_vec());
162        });
163
164        process_fix_buffer(&mut buffer, &handler);
165
166        // No complete message, nothing should be processed
167        assert!(received.lock().unwrap().is_empty());
168        // Buffer should be preserved for more data
169        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
170    }
171
172    #[rstest]
173    fn test_process_complete_message() {
174        // A complete FIX message
175        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
176        let received = Arc::new(Mutex::new(Vec::new()));
177        let received_clone = received.clone();
178
179        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
180            received_clone.lock().unwrap().push(data.to_vec());
181        });
182
183        process_fix_buffer(&mut buffer, &handler);
184
185        assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
186    }
187
188    #[rstest]
189    fn test_process_message_with_garbage_prefix() {
190        // Message with garbage before the FIX header
191        let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
192        let received = Arc::new(Mutex::new(Vec::new()));
193        let received_clone = received.clone();
194
195        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
196            received_clone.lock().unwrap().push(data.to_vec());
197        });
198
199        process_fix_buffer(&mut buffer, &handler);
200
201        assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
202    }
203
204    #[rstest]
205    fn test_process_partial_checksum() {
206        // Message with partial checksum (missing the SOH)
207        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();
208        let received = Arc::new(Mutex::new(Vec::new()));
209        let received_clone = received.clone();
210
211        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
212            received_clone.lock().unwrap().push(data.to_vec());
213        });
214
215        process_fix_buffer(&mut buffer, &handler);
216
217        // No complete message, nothing should be processed
218        assert!(received.lock().unwrap().is_empty());
219        // Buffer should be preserved
220        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
221    }
222
223    #[rstest]
224    fn test_process_multiple_messages_single_call() {
225        // Two complete messages
226        let mut buffer =
227            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
228                .to_vec();
229        let received = Arc::new(Mutex::new(Vec::new()));
230        let received_clone = received.clone();
231
232        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
233            received_clone.lock().unwrap().push(data.to_vec());
234        });
235
236        process_fix_buffer(&mut buffer, &handler);
237
238        assert_eq!(received.lock().unwrap().len(), 2);
239        assert_eq!(
240            received.lock().unwrap()[0],
241            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
242        );
243        assert_eq!(
244            received.lock().unwrap()[1],
245            b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
246        );
247        assert!(buffer.is_empty());
248    }
249
250    #[rstest]
251    fn test_process_message_with_invalid_checksum() {
252        // Message with invalid checksum format (not 3 digits)
253        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();
254        let received = Arc::new(Mutex::new(Vec::new()));
255        let received_clone = received.clone();
256
257        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
258            received_clone.lock().unwrap().push(data.to_vec());
259        });
260
261        process_fix_buffer(&mut buffer, &handler);
262
263        // No message should be processed due to invalid checksum format
264        assert!(received.lock().unwrap().is_empty());
265        // Buffer should be preserved
266        assert_eq!(
267            buffer,
268            b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
269        );
270    }
271
272    #[rstest]
273    fn test_process_message_with_multiple_checksums() {
274        let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();
275        let received = Arc::new(Mutex::new(Vec::new()));
276        let received_clone = received.clone();
277
278        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
279            received_clone.lock().unwrap().push(data.to_vec());
280        });
281
282        process_fix_buffer(&mut buffer, &handler);
283
284        // One message processed, extra data retained
285        assert_eq!(received.lock().unwrap().len(), 1);
286        assert_eq!(
287            received.lock().unwrap()[0],
288            b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()
289        );
290        assert_eq!(buffer, b"10=456\x01".to_vec());
291    }
292
293    #[rstest]
294    fn test_process_large_buffer() {
295        let mut buffer = Vec::new();
296        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
297        for _ in 0..1000 {
298            buffer.extend_from_slice(message);
299        }
300        let received = Arc::new(Mutex::new(Vec::new()));
301        let received_clone = received.clone();
302
303        let handler: TcpMessageHandler = Arc::new(move |data: &[u8]| {
304            received_clone.lock().unwrap().push(data.to_vec());
305        });
306
307        process_fix_buffer(&mut buffer, &handler);
308
309        // 1000 messages processed, buffer empty
310        assert_eq!(received.lock().unwrap().len(), 1000);
311        assert!(buffer.is_empty());
312    }
313}