nautilus_coinbase_intx/fix/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! FIX Client for the Coinbase International Drop Copy Endpoint.
17//!
18//! This implementation focuses specifically on processing execution reports
19//! via the FIX protocol, leveraging the existing `SocketClient` for TCP/TLS connectivity.
20//!
21//! # Warning
22//!
23//! **Not a full FIX engine**: This client supports only the Coinbase International Drop Copy
24//! endpoint and lacks general-purpose FIX functionality.
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicUsize, Ordering},
29    },
30    time::Duration,
31};
32
33use base64::prelude::*;
34use nautilus_core::{python::IntoPyObjectNautilusExt, time::get_atomic_clock_realtime};
35use nautilus_model::identifiers::AccountId;
36use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
37use pyo3::prelude::*;
38use ring::hmac;
39use tokio::task::JoinHandle;
40use tokio_tungstenite::tungstenite::stream::Mode;
41
42use super::{
43    messages::{FIX_DELIMITER, FixMessage},
44    parse::convert_to_order_status_report,
45};
46use crate::{
47    common::{consts::COINBASE_INTX, credential::get_env_var},
48    fix::{
49        messages::{fix_exec_type, fix_message_type, fix_tag},
50        parse::convert_to_fill_report,
51    },
52};
53
/// FIX client for the Coinbase International Drop Copy session.
///
/// The socket, state flags, sequence counters and task handles are held behind `Arc`,
/// so clones of the client share them.
#[pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")]
#[derive(Clone)]
pub struct CoinbaseIntxFixClient {
    /// Address of the FIX endpoint, used as the socket URL when connecting.
    endpoint: String,
    /// Public API key; also used as the FIX sender comp ID and in the logon signature.
    api_key: String,
    /// Base64-encoded API secret used to key the HMAC-SHA256 logon signature.
    api_secret: String,
    /// API passphrase included in the logon message and in its signature.
    api_passphrase: String,
    /// Portfolio ID, used to build the account ID attached to generated reports.
    portfolio_id: String,
    /// FIX sender comp ID (a copy of the API key).
    sender_comp_id: String,
    /// FIX target comp ID (`CBINTLDC` for the Drop Copy endpoint).
    target_comp_id: String,
    /// Underlying socket client, populated by `connect`.
    socket: Option<Arc<SocketClient>>,
    /// Connection flag reported by `is_connected`; cleared on `close`.
    connected: Arc<AtomicBool>,
    /// Set when a Logon message is received; cleared on Logout or `close`.
    logged_on: Arc<AtomicBool>,
    /// Outbound message sequence counter (reset to 1 on each logon).
    seq_num: Arc<AtomicUsize>,
    /// Sequence number of the most recently parsed inbound message.
    received_seq_num: Arc<AtomicUsize>,
    /// Heartbeat interval in seconds, sent in the logon and used by the heartbeat task.
    heartbeat_secs: u64,
    /// Handle of the background task which re-sends the logon when required.
    processing_task: Option<Arc<JoinHandle<()>>>,
    /// Handle of the background task which sends FIX heartbeats.
    heartbeat_task: Option<Arc<JoinHandle<()>>>,
}
73
74impl CoinbaseIntxFixClient {
75    /// Creates a new [`CoinbaseIntxFixClient`] instance.
76    pub fn new(
77        endpoint: Option<String>,
78        api_key: Option<String>,
79        api_secret: Option<String>,
80        api_passphrase: Option<String>,
81        portfolio_id: Option<String>,
82    ) -> anyhow::Result<Self> {
83        let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
84        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
85        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
86        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
87        let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
88        let sender_comp_id = api_key.to_string();
89        let target_comp_id = "CBINTLDC".to_string(); // Drop Copy endpoint
90
91        Ok(Self {
92            endpoint,
93            api_key,
94            api_secret,
95            api_passphrase,
96            portfolio_id,
97            sender_comp_id,
98            target_comp_id,
99            socket: None,
100            connected: Arc::new(AtomicBool::new(false)),
101            logged_on: Arc::new(AtomicBool::new(false)),
102            seq_num: Arc::new(AtomicUsize::new(1)),
103            received_seq_num: Arc::new(AtomicUsize::new(0)),
104            heartbeat_secs: 10, // Default (probably no need to change)
105            processing_task: None,
106            heartbeat_task: None,
107        })
108    }
109
110    /// Creates a new authenticated [`CoinbaseIntxFixClient`] instance using
111    /// environment variables and the default Coinbase International FIX drop copy endpoint.
112    pub fn from_env() -> anyhow::Result<Self> {
113        Self::new(None, None, None, None, None)
114    }
115
116    /// Returns the FIX endpoint being used by the client.
117    pub fn endpoint(&self) -> &str {
118        self.endpoint.as_str()
119    }
120
121    /// Returns the public API key being used by the client.
122    pub fn api_key(&self) -> &str {
123        self.api_key.as_str()
124    }
125
126    /// Returns the Coinbase International portfolio ID being used by the client.
127    pub fn portfolio_id(&self) -> &str {
128        self.portfolio_id.as_str()
129    }
130
131    /// Returns the sender company ID being used by the client.
132    pub fn sender_comp_id(&self) -> &str {
133        self.sender_comp_id.as_str()
134    }
135
136    /// Returns the target company ID being used by the client.
137    pub fn target_comp_id(&self) -> &str {
138        self.target_comp_id.as_str()
139    }
140
141    /// Checks if the client is connected.
142    pub fn is_connected(&self) -> bool {
143        self.connected.load(Ordering::SeqCst)
144    }
145
146    /// Checks if the client is logged on.
147    pub fn is_logged_on(&self) -> bool {
148        self.logged_on.load(Ordering::SeqCst)
149    }
150
151    /// Connects to the Coinbase International FIX Drop Copy endpoint.
152    pub async fn connect(&mut self, handler: PyObject) -> anyhow::Result<()> {
153        let config = SocketConfig {
154            url: self.endpoint.clone(),
155            mode: Mode::Tls,
156            suffix: vec![FIX_DELIMITER],
157            py_handler: None, // Using handler from arg (TODO: refactor this config pattern)
158            heartbeat: None,  // Using FIX heartbeats
159            reconnect_timeout_ms: Some(10000),
160            reconnect_delay_initial_ms: Some(5000),
161            reconnect_delay_max_ms: Some(30000),
162            reconnect_backoff_factor: Some(1.5),
163            reconnect_jitter_ms: Some(500),
164            certs_dir: None,
165        };
166
167        let logged_on = self.logged_on.clone();
168        let seq_num = self.seq_num.clone();
169        let received_seq_num = self.received_seq_num.clone();
170        let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
171
172        let handle_message = Arc::new(move |data: &[u8]| {
173            if let Ok(message) = FixMessage::parse(data) {
174                // Update received sequence number
175                if let Some(msg_seq) = message.msg_seq_num() {
176                    received_seq_num.store(msg_seq, Ordering::SeqCst);
177                }
178
179                // Process message based on type
180                if let Some(msg_type) = message.msg_type() {
181                    match msg_type {
182                        fix_message_type::LOGON => {
183                            tracing::info!("Logon successful");
184                            logged_on.store(true, Ordering::SeqCst);
185                        }
186                        fix_message_type::LOGOUT => {
187                            tracing::info!("Received logout");
188                            logged_on.store(false, Ordering::SeqCst);
189                        }
190                        fix_message_type::EXECUTION_REPORT => {
191                            if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
192                                if matches!(
193                                    exec_type,
194                                    fix_exec_type::REJECTED
195                                        | fix_exec_type::NEW
196                                        | fix_exec_type::PENDING_NEW
197                                ) {
198                                    // These order events are already handled by the client
199                                    tracing::debug!(
200                                        "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
201                                    );
202                                } else if matches!(
203                                    exec_type,
204                                    fix_exec_type::CANCELED
205                                        | fix_exec_type::EXPIRED
206                                        | fix_exec_type::REPLACED
207                                ) {
208                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
209                                    let ts_init = clock.get_time_ns();
210                                    match convert_to_order_status_report(
211                                        &message, account_id, ts_init,
212                                    ) {
213                                        Ok(report) => Python::with_gil(|py| {
214                                            call_python(
215                                                py,
216                                                &handler,
217                                                report.into_py_any_unwrap(py),
218                                            );
219                                        }),
220                                        Err(e) => {
221                                            tracing::error!(
222                                                "Failed to parse FIX execution report: {e}"
223                                            );
224                                        }
225                                    }
226                                } else if exec_type == fix_exec_type::PARTIAL_FILL
227                                    || exec_type == fix_exec_type::FILL
228                                {
229                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
230                                    let ts_init = clock.get_time_ns();
231                                    match convert_to_fill_report(&message, account_id, ts_init) {
232                                        Ok(report) => Python::with_gil(|py| {
233                                            call_python(
234                                                py,
235                                                &handler,
236                                                report.into_py_any_unwrap(py),
237                                            );
238                                        }),
239                                        Err(e) => {
240                                            tracing::error!(
241                                                "Failed to parse FIX execution report: {e}"
242                                            );
243                                        }
244                                    }
245                                } else {
246                                    tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
247                                }
248                            }
249                        }
250                        // These can be HEARTBEAT or TEST_REQUEST messages,
251                        // ideally we'd respond to these with a heartbeat
252                        // including tag 112 TestReqID.
253                        _ => tracing::trace!("Recieved unexpected {message:?}"),
254                    }
255                }
256            } else {
257                tracing::error!("Failed to parse FIX message");
258            }
259        });
260
261        let socket =
262            match SocketClient::connect(config, Some(handle_message), None, None, None).await {
263                Ok(socket) => socket,
264                Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
265            };
266
267        let writer_tx = socket.writer_tx.clone();
268
269        self.socket = Some(Arc::new(socket));
270
271        self.send_logon().await?;
272
273        // Create task to monitor connection and send logon after reconnect
274        let connected_clone = self.connected.clone();
275        let logged_on_clone = self.logged_on.clone();
276        let heartbeat_secs = self.heartbeat_secs;
277        let client_clone = self.clone();
278
279        self.processing_task = Some(Arc::new(tokio::spawn(async move {
280            tracing::debug!("Started task 'maintain FIX connection'");
281
282            let mut last_logon_attempt = std::time::Instant::now() - Duration::from_secs(10);
283
284            loop {
285                tokio::time::sleep(Duration::from_millis(100)).await;
286
287                // Check if connected but not logged on
288                if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
289                {
290                    // Rate limit logon attempts
291                    if last_logon_attempt.elapsed() > Duration::from_secs(10) {
292                        tracing::info!("Connected without logon");
293                        last_logon_attempt = std::time::Instant::now();
294
295                        if let Err(e) = client_clone.send_logon().await {
296                            tracing::error!("Failed to send logon: {e}");
297                        }
298                    }
299                }
300            }
301        })));
302
303        let logged_on_clone = self.logged_on.clone();
304        let sender_comp_id = self.sender_comp_id.clone();
305        let target_comp_id = self.target_comp_id.clone();
306
307        self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
308            tracing::debug!("Started task 'FIX heartbeat' at {heartbeat_secs}s intervals");
309            let interval = Duration::from_secs(heartbeat_secs);
310
311            loop {
312                if logged_on_clone.load(Ordering::SeqCst) {
313                    // Create new heartbeat message
314                    let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
315                    let now = chrono::Utc::now();
316                    let msg =
317                        FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
318
319                    if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
320                        tracing::error!("Failed to send heartbeat: {e}");
321                        break;
322                    }
323
324                    tracing::trace!("Sent heartbeat");
325                } else {
326                    // No longer logged on
327                    tracing::debug!("No longer logged on, stopping heartbeat task");
328                    break;
329                }
330
331                tokio::time::sleep(interval).await;
332            }
333
334            tracing::debug!("Stopped task 'FIX heartbeat'");
335        })));
336
337        Ok(())
338    }
339
340    /// Closes the connection.
341    pub async fn close(&mut self) -> anyhow::Result<()> {
342        // Send logout message if connected
343        if self.is_logged_on() {
344            if let Err(e) = self.send_logout("Normal logout").await {
345                tracing::warn!("Failed to send logout message: {e}");
346            }
347        }
348
349        // Close socket
350        if let Some(socket) = &self.socket {
351            socket.close().await;
352        }
353
354        // Cancel processing task
355        if let Some(task) = self.processing_task.take() {
356            task.abort();
357        }
358
359        // Cancel heartbeat task
360        if let Some(task) = self.heartbeat_task.take() {
361            task.abort();
362        }
363
364        self.connected.store(false, Ordering::SeqCst);
365        self.logged_on.store(false, Ordering::SeqCst);
366
367        Ok(())
368    }
369
370    /// Send a logon message
371    async fn send_logon(&self) -> anyhow::Result<()> {
372        if self.socket.is_none() {
373            anyhow::bail!("Socket not connected".to_string());
374        }
375
376        // Reset sequence number
377        self.seq_num.store(1, Ordering::SeqCst);
378
379        let now = chrono::Utc::now();
380        let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
381        let passphrase = self.api_passphrase.clone();
382
383        let message = format!(
384            "{}{}{}{}",
385            timestamp, self.api_key, self.target_comp_id, passphrase
386        );
387
388        // Create signature
389        let decoded_secret = BASE64_STANDARD
390            .decode(&self.api_secret)
391            .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
392
393        let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
394        let signature = hmac::sign(&hmac_key, message.as_bytes());
395        let encoded_signature = BASE64_STANDARD.encode(signature);
396
397        let logon_msg = FixMessage::create_logon(
398            1, // Always use 1 for new logon with reset
399            &self.sender_comp_id,
400            &self.target_comp_id,
401            self.heartbeat_secs,
402            &self.api_key,
403            &passphrase,
404            &encoded_signature,
405            &now,
406        );
407
408        if let Some(socket) = &self.socket {
409            tracing::info!("Logging on...");
410
411            match socket.send_bytes(logon_msg.to_bytes()).await {
412                Ok(_) => tracing::debug!("Sent logon message"),
413                Err(e) => tracing::error!("Error on logon: {e}"),
414            }
415        } else {
416            anyhow::bail!("Socket not connected".to_string());
417        }
418
419        let start = std::time::Instant::now();
420        while !self.is_logged_on() {
421            tokio::time::sleep(Duration::from_millis(100)).await;
422
423            if start.elapsed() > Duration::from_secs(10) {
424                anyhow::bail!("Logon timeout".to_string());
425            }
426        }
427
428        self.logged_on.store(true, Ordering::SeqCst);
429
430        Ok(())
431    }
432
433    /// Sends a logout message.
434    async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
435        if self.socket.is_none() {
436            anyhow::bail!("Socket not connected".to_string());
437        }
438
439        let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
440        let now = chrono::Utc::now();
441
442        let logout_msg = FixMessage::create_logout(
443            seq_num,
444            &self.sender_comp_id,
445            &self.target_comp_id,
446            Some(text),
447            &now,
448        );
449
450        if let Some(socket) = &self.socket {
451            match socket.send_bytes(logout_msg.to_bytes()).await {
452                Ok(_) => tracing::debug!("Sent logout message"),
453                Err(e) => tracing::error!("Error on logout: {e}"),
454            }
455        } else {
456            anyhow::bail!("Socket not connected".to_string());
457        }
458
459        Ok(())
460    }
461}
462
463// Can't be moved to core because we don't want to depend on tracing there
464pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
465    if let Err(e) = callback.call1(py, (py_obj,)) {
466        tracing::error!("Error calling Python: {e}");
467    }
468}