nautilus_network/
fix.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Simple FIX message buffer processor.
17
18use std::sync::Arc;
19
20use memchr::memchr;
21
22use crate::socket::TcpMessageHandler;
23
24const MIN_MESSAGE_SIZE: usize = 10; // Minimum length for "8=FIX" + "10=xxx|"
25const MAX_MESSAGE_SIZE: usize = 8192; // Max message size to prevent buffer bloat
26const CHECKSUM_LEN: usize = 7; // Length of "10=xxx|"
27const CHECKSUM_TAG: &[u8] = b"10=";
28const START_PATTERN: &[u8] = b"8=FIX";
29const START_CHAR: u8 = b'8';
30const DELIMITER: u8 = b'\x01';
31
32/// Processes a mutable byte buffer containing FIX protocol messages.
33///
34/// Extracts complete messages starting with "8=FIX" (supporting various FIX versions)
35/// and ending with "10=xxx|" (where xxx is a three-digit checksum), passes them to the
36/// provided handler, and removes them from the buffer, leaving incomplete data for
37/// future processing.
38///
39/// # Assumptions
40///
41/// - Fields are delimited by SOH (`\x01`).
42/// - The checksum field is "10=xxx|" where xxx is a three-digit ASCII number.
43/// - Messages are ASCII-encoded.
44///
45/// # Behavior
46///
47/// - Uses `memchr` for efficient message start detection.
48/// - Discards malformed data up to the next potential message start.
49/// - Retains incomplete messages in the buffer for additional data.
50/// - Enforces a maximum message size to prevent buffer overflow.
51///
52/// # Warning
53///
54/// This parser is designed for basic FIX message processing and does not support all features
55/// of the FIX protocol. Notably, it lacks handling for repeating groups and other advanced
56/// structures, which may be required for full protocol compliance in complex scenarios.
57pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &Arc<TcpMessageHandler>) {
58    let mut processed_to = 0;
59
60    while processed_to < buf.len() {
61        if buf.len() - processed_to < MIN_MESSAGE_SIZE {
62            break;
63        }
64
65        // Find the potential start of a FIX message
66        let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
67        if let Some(idx) = start_idx {
68            if idx + START_PATTERN.len() <= buf.len()
69                && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
70            {
71                // Search for message end
72                if let Some(end_pos) = find_message_end(&buf[idx..]) {
73                    let message_end = idx + end_pos;
74                    if message_end - idx > MAX_MESSAGE_SIZE {
75                        // Message exceeds max size, discard up to this point
76                        processed_to = idx + 1;
77                        continue;
78                    }
79                    let message = &buf[idx..message_end];
80                    handler(message); // Pass complete message to handler
81                    processed_to = message_end; // Update processed position
82                } else {
83                    // Incomplete message, wait for more data
84                    break;
85                }
86            } else {
87                // Invalid start pattern, discard data up to this point
88                processed_to = idx + 1;
89            }
90        } else {
91            // No message start found in the remaining buffer, clear it to avoid garbage buildup
92            buf.clear();
93            return;
94        }
95    }
96
97    // Remove all processed data from the buffer
98    if processed_to > 0 {
99        buf.drain(0..processed_to);
100    }
101}
102
103/// Locate the end of a FIX message. Searches for "10=xxx|" where xxx is a three-digit ASCII number.
104#[inline(always)]
105fn find_message_end(buf: &[u8]) -> Option<usize> {
106    let mut idx = 0;
107    while idx + CHECKSUM_LEN <= buf.len() {
108        if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
109            && buf[idx + 3].is_ascii_digit()
110            && buf[idx + 4].is_ascii_digit()
111            && buf[idx + 5].is_ascii_digit()
112            && buf[idx + 6] == DELIMITER
113        {
114            return Some(idx + CHECKSUM_LEN);
115        }
116        idx += 1;
117    }
118    None
119}
120
121#[cfg(test)]
122mod process_fix_buffer_tests {
123    use std::sync::{Arc, Mutex};
124
125    use rstest::rstest;
126
127    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};
128
129    #[rstest]
130    fn test_process_empty_buffer() {
131        let mut buffer = Vec::new();
132        let received = Arc::new(Mutex::new(Vec::new()));
133        let received_clone = received.clone();
134
135        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
136            received_clone.lock().unwrap().push(data.to_vec());
137        });
138
139        process_fix_buffer(&mut buffer, &handler);
140
141        // Buffer was empty, so no messages should be processed
142        assert!(received.lock().unwrap().is_empty());
143        assert!(buffer.is_empty());
144    }
145
146    #[rstest]
147    fn test_process_incomplete_message() {
148        // A partial FIX message without end
149        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();
150        let received = Arc::new(Mutex::new(Vec::new()));
151        let received_clone = received.clone();
152
153        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
154            received_clone.lock().unwrap().push(data.to_vec());
155        });
156
157        process_fix_buffer(&mut buffer, &handler);
158
159        // No complete message, nothing should be processed
160        assert!(received.lock().unwrap().is_empty());
161        // Buffer should be preserved for more data
162        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
163    }
164
165    #[rstest]
166    fn test_process_complete_message() {
167        // A complete FIX message
168        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
169        let received = Arc::new(Mutex::new(Vec::new()));
170        let received_clone = received.clone();
171
172        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
173            received_clone.lock().unwrap().push(data.to_vec());
174        });
175
176        process_fix_buffer(&mut buffer, &handler);
177
178        assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
179    }
180
181    #[rstest]
182    fn test_process_message_with_garbage_prefix() {
183        // Message with garbage before the FIX header
184        let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();
185        let received = Arc::new(Mutex::new(Vec::new()));
186        let received_clone = received.clone();
187
188        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
189            received_clone.lock().unwrap().push(data.to_vec());
190        });
191
192        process_fix_buffer(&mut buffer, &handler);
193
194        assert!(buffer.is_empty() || received.lock().unwrap().len() == 1);
195    }
196
197    #[rstest]
198    fn test_process_partial_checksum() {
199        // Message with partial checksum (missing the SOH)
200        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();
201        let received = Arc::new(Mutex::new(Vec::new()));
202        let received_clone = received.clone();
203
204        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
205            received_clone.lock().unwrap().push(data.to_vec());
206        });
207
208        process_fix_buffer(&mut buffer, &handler);
209
210        // No complete message, nothing should be processed
211        assert!(received.lock().unwrap().is_empty());
212        // Buffer should be preserved
213        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
214    }
215
216    #[rstest]
217    fn test_process_multiple_messages_single_call() {
218        // Two complete messages
219        let mut buffer =
220            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
221                .to_vec();
222        let received = Arc::new(Mutex::new(Vec::new()));
223        let received_clone = received.clone();
224
225        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
226            received_clone.lock().unwrap().push(data.to_vec());
227        });
228
229        process_fix_buffer(&mut buffer, &handler);
230
231        assert_eq!(received.lock().unwrap().len(), 2);
232        assert_eq!(
233            received.lock().unwrap()[0],
234            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
235        );
236        assert_eq!(
237            received.lock().unwrap()[1],
238            b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
239        );
240        assert!(buffer.is_empty());
241    }
242
243    #[rstest]
244    fn test_process_message_with_invalid_checksum() {
245        // Message with invalid checksum format (not 3 digits)
246        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();
247        let received = Arc::new(Mutex::new(Vec::new()));
248        let received_clone = received.clone();
249
250        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
251            received_clone.lock().unwrap().push(data.to_vec());
252        });
253
254        process_fix_buffer(&mut buffer, &handler);
255
256        // No message should be processed due to invalid checksum format
257        assert!(received.lock().unwrap().is_empty());
258        // Buffer should be preserved
259        assert_eq!(
260            buffer,
261            b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
262        );
263    }
264
265    #[rstest]
266    fn test_process_message_with_multiple_checksums() {
267        let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();
268        let received = Arc::new(Mutex::new(Vec::new()));
269        let received_clone = received.clone();
270
271        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
272            received_clone.lock().unwrap().push(data.to_vec());
273        });
274
275        process_fix_buffer(&mut buffer, &handler);
276
277        // One message processed, extra data retained
278        assert_eq!(received.lock().unwrap().len(), 1);
279        assert_eq!(
280            received.lock().unwrap()[0],
281            b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()
282        );
283        assert_eq!(buffer, b"10=456\x01".to_vec());
284    }
285
286    #[rstest]
287    fn test_process_large_buffer() {
288        let mut buffer = Vec::new();
289        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
290        for _ in 0..1000 {
291            buffer.extend_from_slice(message);
292        }
293        let received = Arc::new(Mutex::new(Vec::new()));
294        let received_clone = received.clone();
295
296        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
297            received_clone.lock().unwrap().push(data.to_vec());
298        });
299
300        process_fix_buffer(&mut buffer, &handler);
301
302        // 1000 messages processed, buffer empty
303        assert_eq!(received.lock().unwrap().len(), 1000);
304        assert!(buffer.is_empty());
305    }
306}