Skip to main content

nautilus_coinbase_intx/fix/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! FIX Client for the Coinbase International Drop Copy Endpoint.
17//!
18//! This implementation focuses specifically on processing execution reports
19//! via the FIX protocol, leveraging the existing `SocketClient` for TCP/TLS connectivity.
20//!
21//! # Warning
22//!
23//! **Not a full FIX engine**: This client supports only the Coinbase International Drop Copy
24//! endpoint and lacks general-purpose FIX functionality.
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicUsize, Ordering},
29    },
30    time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::{
36    live::get_runtime,
37    logging::{log_task_started, log_task_stopped},
38};
39#[cfg(feature = "python")]
40use nautilus_core::python::{IntoPyObjectNautilusExt, call_python};
41use nautilus_core::{env::get_or_env_var, time::get_atomic_clock_realtime};
42use nautilus_model::identifiers::AccountId;
43use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
44#[cfg(feature = "python")]
45use pyo3::prelude::*;
46use tokio::task::JoinHandle;
47use tokio_tungstenite::tungstenite::stream::Mode;
48
49use super::{
50    messages::{FIX_DELIMITER, FixMessage},
51    parse::convert_to_order_status_report,
52};
53use crate::{
54    common::consts::COINBASE_INTX,
55    fix::{
56        messages::{fix_exec_type, fix_message_type, fix_tag},
57        parse::convert_to_fill_report,
58    },
59};
60
61#[cfg_attr(
62    feature = "python",
63    pyo3::pyclass(
64        module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx",
65        from_py_object
66    )
67)]
68#[derive(Debug, Clone)]
69pub struct CoinbaseIntxFixClient {
70    endpoint: String,
71    api_key: String,
72    api_secret: String,
73    api_passphrase: String,
74    portfolio_id: String,
75    sender_comp_id: String,
76    target_comp_id: String,
77    socket: Option<Arc<SocketClient>>,
78    connected: Arc<AtomicBool>,
79    logged_on: Arc<AtomicBool>,
80    seq_num: Arc<AtomicUsize>,
81    received_seq_num: Arc<AtomicUsize>,
82    heartbeat_secs: u64,
83    processing_task: Option<Arc<JoinHandle<()>>>,
84    heartbeat_task: Option<Arc<JoinHandle<()>>>,
85}
86
87impl CoinbaseIntxFixClient {
88    /// Creates a new [`CoinbaseIntxFixClient`] instance.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if required environment variables or parameters are missing.
93    pub fn new(
94        endpoint: Option<String>,
95        api_key: Option<String>,
96        api_secret: Option<String>,
97        api_passphrase: Option<String>,
98        portfolio_id: Option<String>,
99    ) -> anyhow::Result<Self> {
100        let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
101        let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
102        let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
103        let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
104        let portfolio_id = get_or_env_var(portfolio_id, "COINBASE_INTX_PORTFOLIO_ID")?;
105        let sender_comp_id = api_key.clone();
106        let target_comp_id = "CBINTLDC".to_string(); // Drop Copy endpoint
107
108        Ok(Self {
109            endpoint,
110            api_key,
111            api_secret,
112            api_passphrase,
113            portfolio_id,
114            sender_comp_id,
115            target_comp_id,
116            socket: None,
117            connected: Arc::new(AtomicBool::new(false)),
118            logged_on: Arc::new(AtomicBool::new(false)),
119            seq_num: Arc::new(AtomicUsize::new(1)),
120            received_seq_num: Arc::new(AtomicUsize::new(0)),
121            heartbeat_secs: 10, // Default (probably no need to change)
122            processing_task: None,
123            heartbeat_task: None,
124        })
125    }
126
127    /// Creates a new authenticated [`CoinbaseIntxFixClient`] instance using
128    /// environment variables and the default Coinbase International FIX drop copy endpoint.
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if required environment variables are not set.
133    pub fn from_env() -> anyhow::Result<Self> {
134        Self::new(None, None, None, None, None)
135    }
136
137    /// Returns the FIX endpoint being used by the client.
138    #[must_use]
139    pub const fn endpoint(&self) -> &str {
140        self.endpoint.as_str()
141    }
142
143    /// Returns the public API key being used by the client.
144    #[must_use]
145    pub const fn api_key(&self) -> &str {
146        self.api_key.as_str()
147    }
148
149    /// Returns a masked version of the API key for logging purposes.
150    #[must_use]
151    pub fn api_key_masked(&self) -> String {
152        nautilus_core::string::mask_api_key(&self.api_key)
153    }
154
155    /// Returns the Coinbase International portfolio ID being used by the client.
156    #[must_use]
157    pub const fn portfolio_id(&self) -> &str {
158        self.portfolio_id.as_str()
159    }
160
161    /// Returns the sender company ID being used by the client.
162    #[must_use]
163    pub const fn sender_comp_id(&self) -> &str {
164        self.sender_comp_id.as_str()
165    }
166
167    /// Returns the target company ID being used by the client.
168    #[must_use]
169    pub const fn target_comp_id(&self) -> &str {
170        self.target_comp_id.as_str()
171    }
172
173    /// Checks if the client is connected.
174    #[must_use]
175    pub fn is_connected(&self) -> bool {
176        self.connected.load(Ordering::SeqCst)
177    }
178
179    /// Checks if the client is logged on.
180    #[must_use]
181    pub fn is_logged_on(&self) -> bool {
182        self.logged_on.load(Ordering::SeqCst)
183    }
184
185    /// Connects to the Coinbase International FIX Drop Copy endpoint.
186    ///
187    /// # Panics
188    ///
189    /// Panics if time calculation or unwrap logic inside fails during logon retry setup.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if network connection or FIX logon fails.
194    pub async fn connect(
195        &mut self,
196        #[cfg(feature = "python")] handler: Py<PyAny>,
197        #[cfg(not(feature = "python"))] _handler: (),
198    ) -> anyhow::Result<()> {
199        let logged_on = self.logged_on.clone();
200        let seq_num = self.seq_num.clone();
201        let received_seq_num = self.received_seq_num.clone();
202        let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
203
204        let handle_message = Arc::new(move |data: &[u8]| {
205            if let Ok(message) = FixMessage::parse(data) {
206                // Update received sequence number
207                if let Some(msg_seq) = message.msg_seq_num() {
208                    received_seq_num.store(msg_seq, Ordering::SeqCst);
209                }
210
211                // Process message based on type
212                if let Some(msg_type) = message.msg_type() {
213                    match msg_type {
214                        fix_message_type::LOGON => {
215                            log::info!("Logon successful");
216                            logged_on.store(true, Ordering::SeqCst);
217                        }
218                        fix_message_type::LOGOUT => {
219                            log::info!("Received logout");
220                            logged_on.store(false, Ordering::SeqCst);
221                        }
222                        fix_message_type::EXECUTION_REPORT => {
223                            if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
224                                if matches!(
225                                    exec_type,
226                                    fix_exec_type::REJECTED
227                                        | fix_exec_type::NEW
228                                        | fix_exec_type::PENDING_NEW
229                                ) {
230                                    // These order events are already handled by the client
231                                    log::debug!(
232                                        "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
233                                    );
234                                } else if matches!(
235                                    exec_type,
236                                    fix_exec_type::CANCELED
237                                        | fix_exec_type::EXPIRED
238                                        | fix_exec_type::REPLACED
239                                ) {
240                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
241                                    let ts_init = clock.get_time_ns();
242                                    match convert_to_order_status_report(
243                                        &message, account_id, ts_init,
244                                    ) {
245                                        #[cfg(feature = "python")]
246                                        Ok(report) => Python::attach(|py| {
247                                            call_python(
248                                                py,
249                                                &handler,
250                                                report.into_py_any_unwrap(py),
251                                            );
252                                        }),
253                                        #[cfg(not(feature = "python"))]
254                                        Ok(_report) => {
255                                            log::debug!(
256                                                "Order status report handled (Python disabled)"
257                                            );
258                                        }
259                                        Err(e) => {
260                                            log::error!(
261                                                "Failed to parse FIX execution report: {e}"
262                                            );
263                                        }
264                                    }
265                                } else if exec_type == fix_exec_type::PARTIAL_FILL
266                                    || exec_type == fix_exec_type::FILL
267                                {
268                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
269                                    let ts_init = clock.get_time_ns();
270                                    match convert_to_fill_report(&message, account_id, ts_init) {
271                                        #[cfg(feature = "python")]
272                                        Ok(report) => Python::attach(|py| {
273                                            call_python(
274                                                py,
275                                                &handler,
276                                                report.into_py_any_unwrap(py),
277                                            );
278                                        }),
279                                        #[cfg(not(feature = "python"))]
280                                        Ok(_report) => {
281                                            log::debug!("Fill report handled (Python disabled)");
282                                        }
283                                        Err(e) => {
284                                            log::error!(
285                                                "Failed to parse FIX execution report: {e}"
286                                            );
287                                        }
288                                    }
289                                } else {
290                                    log::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
291                                }
292                            }
293                        }
294                        // These can be HEARTBEAT or TEST_REQUEST messages,
295                        // ideally we'd respond to these with a heartbeat
296                        // including tag 112 TestReqID.
297                        _ => log::trace!("Received unexpected {message:?}"),
298                    }
299                }
300            } else {
301                log::error!("Failed to parse FIX message");
302            }
303        });
304
305        let config = SocketConfig {
306            url: self.endpoint.clone(),
307            mode: Mode::Tls,
308            suffix: vec![FIX_DELIMITER],
309            message_handler: Some(handle_message),
310            heartbeat: None, // Using FIX heartbeats
311            reconnect_timeout_ms: Some(10000),
312            reconnect_delay_initial_ms: Some(5000),
313            reconnect_delay_max_ms: Some(30000),
314            reconnect_backoff_factor: Some(1.5),
315            reconnect_jitter_ms: Some(500),
316            reconnect_max_attempts: None,
317            connection_max_retries: None,
318            certs_dir: None,
319        };
320
321        let socket = match SocketClient::connect(
322            config, None, // post_connection
323            None, // post_reconnection
324            None, // post_disconnection
325        )
326        .await
327        {
328            Ok(socket) => socket,
329            Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
330        };
331
332        let writer_tx = socket.writer_tx.clone();
333
334        self.socket = Some(Arc::new(socket));
335
336        self.send_logon().await?;
337
338        // Create task to monitor connection and send logon after reconnect
339        let connected_clone = self.connected.clone();
340        let logged_on_clone = self.logged_on.clone();
341        let heartbeat_secs = self.heartbeat_secs;
342        let client_clone = self.clone();
343
344        self.processing_task = Some(Arc::new(get_runtime().spawn(async move {
345            log_task_started("maintain-fix-connection");
346
347            let mut last_logon_attempt = std::time::Instant::now()
348                .checked_sub(Duration::from_secs(10))
349                .unwrap();
350
351            loop {
352                tokio::time::sleep(Duration::from_millis(100)).await;
353
354                // Check if connected but not logged on
355                if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
356                {
357                    // Rate limit logon attempts
358                    if last_logon_attempt.elapsed() > Duration::from_secs(10) {
359                        log::info!("Connected without logon");
360                        last_logon_attempt = std::time::Instant::now();
361
362                        if let Err(e) = client_clone.send_logon().await {
363                            log::error!("Failed to send logon: {e}");
364                        }
365                    }
366                }
367            }
368        })));
369
370        let logged_on_clone = self.logged_on.clone();
371        let sender_comp_id = self.sender_comp_id.clone();
372        let target_comp_id = self.target_comp_id.clone();
373
374        self.heartbeat_task = Some(Arc::new(get_runtime().spawn(async move {
375            log_task_started("heartbeat");
376            log::debug!("Heartbeat at {heartbeat_secs}s intervals");
377
378            let interval = Duration::from_secs(heartbeat_secs);
379
380            loop {
381                if logged_on_clone.load(Ordering::SeqCst) {
382                    // Create new heartbeat message
383                    let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
384                    let now = chrono::Utc::now();
385                    let msg =
386                        FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
387
388                    if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
389                        log::error!("Failed to send heartbeat: {e}");
390                        break;
391                    }
392
393                    log::trace!("Sent heartbeat");
394                } else {
395                    // No longer logged on
396                    log::debug!("No longer logged on, stopping heartbeat task");
397                    break;
398                }
399
400                tokio::time::sleep(interval).await;
401            }
402
403            log_task_stopped("heartbeat");
404        })));
405
406        Ok(())
407    }
408
409    /// Closes the connection.
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if logout or socket closure fails.
414    pub async fn close(&mut self) -> anyhow::Result<()> {
415        // Send logout message if connected
416        if self.is_logged_on()
417            && let Err(e) = self.send_logout("Normal logout").await
418        {
419            log::warn!("Failed to send logout message: {e}");
420        }
421
422        // Close socket
423        if let Some(socket) = &self.socket {
424            socket.close().await;
425        }
426
427        // Cancel processing task
428        if let Some(task) = self.processing_task.take() {
429            task.abort();
430        }
431
432        // Cancel heartbeat task
433        if let Some(task) = self.heartbeat_task.take() {
434            task.abort();
435        }
436
437        self.connected.store(false, Ordering::SeqCst);
438        self.logged_on.store(false, Ordering::SeqCst);
439
440        Ok(())
441    }
442
443    /// Send a logon message
444    async fn send_logon(&self) -> anyhow::Result<()> {
445        if self.socket.is_none() {
446            anyhow::bail!("Socket not connected".to_string());
447        }
448
449        // Reset sequence number
450        self.seq_num.store(1, Ordering::SeqCst);
451
452        let now = chrono::Utc::now();
453        let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
454        let passphrase = self.api_passphrase.clone();
455
456        let message = format!(
457            "{}{}{}{}",
458            timestamp, self.api_key, self.target_comp_id, passphrase
459        );
460
461        // Create signature
462        let decoded_secret = BASE64_STANDARD
463            .decode(&self.api_secret)
464            .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
465
466        let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
467        let tag = hmac::sign(&key, message.as_bytes());
468        let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
469
470        let logon_msg = FixMessage::create_logon(
471            1, // Always use 1 for new logon with reset
472            &self.sender_comp_id,
473            &self.target_comp_id,
474            self.heartbeat_secs,
475            &self.api_key,
476            &passphrase,
477            &encoded_signature,
478            &now,
479        );
480
481        if let Some(socket) = &self.socket {
482            log::info!("Logging on...");
483
484            match socket.send_bytes(logon_msg.to_bytes()).await {
485                Ok(()) => log::debug!("Sent logon message"),
486                Err(e) => log::error!("Error on logon: {e}"),
487            }
488        } else {
489            anyhow::bail!("Socket not connected".to_string());
490        }
491
492        let start = std::time::Instant::now();
493        while !self.is_logged_on() {
494            tokio::time::sleep(Duration::from_millis(100)).await;
495
496            if start.elapsed() > Duration::from_secs(10) {
497                anyhow::bail!("Logon timeout".to_string());
498            }
499        }
500
501        self.logged_on.store(true, Ordering::SeqCst);
502
503        Ok(())
504    }
505
506    /// Sends a logout message.
507    async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
508        if self.socket.is_none() {
509            anyhow::bail!("Socket not connected".to_string());
510        }
511
512        let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
513        let now = chrono::Utc::now();
514
515        let logout_msg = FixMessage::create_logout(
516            seq_num,
517            &self.sender_comp_id,
518            &self.target_comp_id,
519            Some(text),
520            &now,
521        );
522
523        if let Some(socket) = &self.socket {
524            match socket.send_bytes(logout_msg.to_bytes()).await {
525                Ok(()) => log::debug!("Sent logout message"),
526                Err(e) => log::error!("Error on logout: {e}"),
527            }
528        } else {
529            anyhow::bail!("Socket not connected".to_string());
530        }
531
532        Ok(())
533    }
534}