nautilus_coinbase_intx/fix/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! FIX Client for the Coinbase International Drop Copy Endpoint.
17//!
18//! This implementation focuses specifically on processing execution reports
19//! via the FIX protocol, leveraging the existing `SocketClient` for TCP/TLS connectivity.
20//!
21//! # Warning
22//!
23//! **Not a full FIX engine**: This client supports only the Coinbase International Drop Copy
24//! endpoint and lacks general-purpose FIX functionality.
25use std::{
26    sync::{
27        Arc,
28        atomic::{AtomicBool, AtomicUsize, Ordering},
29    },
30    time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::{
36    live::get_runtime,
37    logging::{log_task_started, log_task_stopped},
38};
39#[cfg(feature = "python")]
40use nautilus_core::python::IntoPyObjectNautilusExt;
41use nautilus_core::{env::get_or_env_var, time::get_atomic_clock_realtime};
42use nautilus_model::identifiers::AccountId;
43use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
44#[cfg(feature = "python")]
45use pyo3::prelude::*;
46use tokio::task::JoinHandle;
47use tokio_tungstenite::tungstenite::stream::Mode;
48
49use super::{
50    messages::{FIX_DELIMITER, FixMessage},
51    parse::convert_to_order_status_report,
52};
53use crate::{
54    common::consts::COINBASE_INTX,
55    fix::{
56        messages::{fix_exec_type, fix_message_type, fix_tag},
57        parse::convert_to_fill_report,
58    },
59};
60
/// FIX client for the Coinbase International Drop Copy endpoint.
///
/// Holds the credentials, session identifiers and shared session state (flags and
/// sequence counters wrapped in `Arc` so that clones and background tasks observe the
/// same values).
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx")
)]
#[derive(Debug, Clone)]
pub struct CoinbaseIntxFixClient {
    /// Address of the FIX endpoint, used as the socket URL when connecting.
    endpoint: String,
    /// API key; also used as the sender comp ID and in the logon signature.
    api_key: String,
    /// Base64-encoded API secret, decoded and used as the HMAC-SHA256 signing key at logon.
    api_secret: String,
    /// API passphrase, included in the logon signature and logon message.
    api_passphrase: String,
    /// Portfolio ID, used to build the account ID attached to generated reports.
    portfolio_id: String,
    /// Sender comp ID placed on outgoing FIX messages (initialized from the API key).
    sender_comp_id: String,
    /// Target comp ID placed on outgoing FIX messages (the Drop Copy endpoint ID).
    target_comp_id: String,
    /// Underlying socket, populated by `connect`.
    socket: Option<Arc<SocketClient>>,
    /// Connection flag reported by `is_connected`; cleared by `close`.
    connected: Arc<AtomicBool>,
    /// Set when a Logon message is received and cleared when a Logout message is received.
    logged_on: Arc<AtomicBool>,
    /// Outgoing message sequence counter; reset to 1 on every logon.
    seq_num: Arc<AtomicUsize>,
    /// Sequence number of the most recently parsed incoming message.
    received_seq_num: Arc<AtomicUsize>,
    /// Heartbeat interval in seconds, sent in the logon and used by the heartbeat task.
    heartbeat_secs: u64,
    /// Handle of the background task which re-sends the logon when needed.
    processing_task: Option<Arc<JoinHandle<()>>>,
    /// Handle of the background task which sends FIX heartbeats.
    heartbeat_task: Option<Arc<JoinHandle<()>>>,
}
83
84impl CoinbaseIntxFixClient {
85    /// Creates a new [`CoinbaseIntxFixClient`] instance.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if required environment variables or parameters are missing.
90    pub fn new(
91        endpoint: Option<String>,
92        api_key: Option<String>,
93        api_secret: Option<String>,
94        api_passphrase: Option<String>,
95        portfolio_id: Option<String>,
96    ) -> anyhow::Result<Self> {
97        let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
98        let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
99        let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
100        let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
101        let portfolio_id = get_or_env_var(portfolio_id, "COINBASE_INTX_PORTFOLIO_ID")?;
102        let sender_comp_id = api_key.clone();
103        let target_comp_id = "CBINTLDC".to_string(); // Drop Copy endpoint
104
105        Ok(Self {
106            endpoint,
107            api_key,
108            api_secret,
109            api_passphrase,
110            portfolio_id,
111            sender_comp_id,
112            target_comp_id,
113            socket: None,
114            connected: Arc::new(AtomicBool::new(false)),
115            logged_on: Arc::new(AtomicBool::new(false)),
116            seq_num: Arc::new(AtomicUsize::new(1)),
117            received_seq_num: Arc::new(AtomicUsize::new(0)),
118            heartbeat_secs: 10, // Default (probably no need to change)
119            processing_task: None,
120            heartbeat_task: None,
121        })
122    }
123
124    /// Creates a new authenticated [`CoinbaseIntxFixClient`] instance using
125    /// environment variables and the default Coinbase International FIX drop copy endpoint.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if required environment variables are not set.
130    pub fn from_env() -> anyhow::Result<Self> {
131        Self::new(None, None, None, None, None)
132    }
133
134    /// Returns the FIX endpoint being used by the client.
135    #[must_use]
136    pub const fn endpoint(&self) -> &str {
137        self.endpoint.as_str()
138    }
139
140    /// Returns the public API key being used by the client.
141    #[must_use]
142    pub const fn api_key(&self) -> &str {
143        self.api_key.as_str()
144    }
145
146    /// Returns a masked version of the API key for logging purposes.
147    #[must_use]
148    pub fn api_key_masked(&self) -> String {
149        nautilus_core::string::mask_api_key(&self.api_key)
150    }
151
152    /// Returns the Coinbase International portfolio ID being used by the client.
153    #[must_use]
154    pub const fn portfolio_id(&self) -> &str {
155        self.portfolio_id.as_str()
156    }
157
158    /// Returns the sender company ID being used by the client.
159    #[must_use]
160    pub const fn sender_comp_id(&self) -> &str {
161        self.sender_comp_id.as_str()
162    }
163
164    /// Returns the target company ID being used by the client.
165    #[must_use]
166    pub const fn target_comp_id(&self) -> &str {
167        self.target_comp_id.as_str()
168    }
169
170    /// Checks if the client is connected.
171    #[must_use]
172    pub fn is_connected(&self) -> bool {
173        self.connected.load(Ordering::SeqCst)
174    }
175
176    /// Checks if the client is logged on.
177    #[must_use]
178    pub fn is_logged_on(&self) -> bool {
179        self.logged_on.load(Ordering::SeqCst)
180    }
181
182    /// Connects to the Coinbase International FIX Drop Copy endpoint.
183    ///
184    /// # Panics
185    ///
186    /// Panics if time calculation or unwrap logic inside fails during logon retry setup.
187    ///
188    /// # Errors
189    ///
190    /// Returns an error if network connection or FIX logon fails.
191    pub async fn connect(
192        &mut self,
193        #[cfg(feature = "python")] handler: Py<PyAny>,
194        #[cfg(not(feature = "python"))] _handler: (),
195    ) -> anyhow::Result<()> {
196        let logged_on = self.logged_on.clone();
197        let seq_num = self.seq_num.clone();
198        let received_seq_num = self.received_seq_num.clone();
199        let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
200
201        let handle_message = Arc::new(move |data: &[u8]| {
202            if let Ok(message) = FixMessage::parse(data) {
203                // Update received sequence number
204                if let Some(msg_seq) = message.msg_seq_num() {
205                    received_seq_num.store(msg_seq, Ordering::SeqCst);
206                }
207
208                // Process message based on type
209                if let Some(msg_type) = message.msg_type() {
210                    match msg_type {
211                        fix_message_type::LOGON => {
212                            tracing::info!("Logon successful");
213                            logged_on.store(true, Ordering::SeqCst);
214                        }
215                        fix_message_type::LOGOUT => {
216                            tracing::info!("Received logout");
217                            logged_on.store(false, Ordering::SeqCst);
218                        }
219                        fix_message_type::EXECUTION_REPORT => {
220                            if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
221                                if matches!(
222                                    exec_type,
223                                    fix_exec_type::REJECTED
224                                        | fix_exec_type::NEW
225                                        | fix_exec_type::PENDING_NEW
226                                ) {
227                                    // These order events are already handled by the client
228                                    tracing::debug!(
229                                        "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
230                                    );
231                                } else if matches!(
232                                    exec_type,
233                                    fix_exec_type::CANCELED
234                                        | fix_exec_type::EXPIRED
235                                        | fix_exec_type::REPLACED
236                                ) {
237                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
238                                    let ts_init = clock.get_time_ns();
239                                    match convert_to_order_status_report(
240                                        &message, account_id, ts_init,
241                                    ) {
242                                        #[cfg(feature = "python")]
243                                        Ok(report) => Python::attach(|py| {
244                                            call_python(
245                                                py,
246                                                &handler,
247                                                report.into_py_any_unwrap(py),
248                                            );
249                                        }),
250                                        #[cfg(not(feature = "python"))]
251                                        Ok(_report) => {
252                                            tracing::debug!(
253                                                "Order status report handled (Python disabled)"
254                                            );
255                                        }
256                                        Err(e) => {
257                                            tracing::error!(
258                                                "Failed to parse FIX execution report: {e}"
259                                            );
260                                        }
261                                    }
262                                } else if exec_type == fix_exec_type::PARTIAL_FILL
263                                    || exec_type == fix_exec_type::FILL
264                                {
265                                    let clock = get_atomic_clock_realtime(); // TODO: Optimize
266                                    let ts_init = clock.get_time_ns();
267                                    match convert_to_fill_report(&message, account_id, ts_init) {
268                                        #[cfg(feature = "python")]
269                                        Ok(report) => Python::attach(|py| {
270                                            call_python(
271                                                py,
272                                                &handler,
273                                                report.into_py_any_unwrap(py),
274                                            );
275                                        }),
276                                        #[cfg(not(feature = "python"))]
277                                        Ok(_report) => {
278                                            tracing::debug!(
279                                                "Fill report handled (Python disabled)"
280                                            );
281                                        }
282                                        Err(e) => {
283                                            tracing::error!(
284                                                "Failed to parse FIX execution report: {e}"
285                                            );
286                                        }
287                                    }
288                                } else {
289                                    tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
290                                }
291                            }
292                        }
293                        // These can be HEARTBEAT or TEST_REQUEST messages,
294                        // ideally we'd respond to these with a heartbeat
295                        // including tag 112 TestReqID.
296                        _ => tracing::trace!("Received unexpected {message:?}"),
297                    }
298                }
299            } else {
300                tracing::error!("Failed to parse FIX message");
301            }
302        });
303
304        let config = SocketConfig {
305            url: self.endpoint.clone(),
306            mode: Mode::Tls,
307            suffix: vec![FIX_DELIMITER],
308            message_handler: Some(handle_message),
309            heartbeat: None, // Using FIX heartbeats
310            reconnect_timeout_ms: Some(10000),
311            reconnect_delay_initial_ms: Some(5000),
312            reconnect_delay_max_ms: Some(30000),
313            reconnect_backoff_factor: Some(1.5),
314            reconnect_jitter_ms: Some(500),
315            reconnect_max_attempts: None,
316            connection_max_retries: None,
317            certs_dir: None,
318        };
319
320        let socket = match SocketClient::connect(
321            config, None, // post_connection
322            None, // post_reconnection
323            None, // post_disconnection
324        )
325        .await
326        {
327            Ok(socket) => socket,
328            Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
329        };
330
331        let writer_tx = socket.writer_tx.clone();
332
333        self.socket = Some(Arc::new(socket));
334
335        self.send_logon().await?;
336
337        // Create task to monitor connection and send logon after reconnect
338        let connected_clone = self.connected.clone();
339        let logged_on_clone = self.logged_on.clone();
340        let heartbeat_secs = self.heartbeat_secs;
341        let client_clone = self.clone();
342
343        self.processing_task = Some(Arc::new(get_runtime().spawn(async move {
344            log_task_started("maintain-fix-connection");
345
346            let mut last_logon_attempt = std::time::Instant::now()
347                .checked_sub(Duration::from_secs(10))
348                .unwrap();
349
350            loop {
351                tokio::time::sleep(Duration::from_millis(100)).await;
352
353                // Check if connected but not logged on
354                if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
355                {
356                    // Rate limit logon attempts
357                    if last_logon_attempt.elapsed() > Duration::from_secs(10) {
358                        tracing::info!("Connected without logon");
359                        last_logon_attempt = std::time::Instant::now();
360
361                        if let Err(e) = client_clone.send_logon().await {
362                            tracing::error!("Failed to send logon: {e}");
363                        }
364                    }
365                }
366            }
367        })));
368
369        let logged_on_clone = self.logged_on.clone();
370        let sender_comp_id = self.sender_comp_id.clone();
371        let target_comp_id = self.target_comp_id.clone();
372
373        self.heartbeat_task = Some(Arc::new(get_runtime().spawn(async move {
374            log_task_started("heartbeat");
375            tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
376
377            let interval = Duration::from_secs(heartbeat_secs);
378
379            loop {
380                if logged_on_clone.load(Ordering::SeqCst) {
381                    // Create new heartbeat message
382                    let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
383                    let now = chrono::Utc::now();
384                    let msg =
385                        FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
386
387                    if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
388                        tracing::error!("Failed to send heartbeat: {e}");
389                        break;
390                    }
391
392                    tracing::trace!("Sent heartbeat");
393                } else {
394                    // No longer logged on
395                    tracing::debug!("No longer logged on, stopping heartbeat task");
396                    break;
397                }
398
399                tokio::time::sleep(interval).await;
400            }
401
402            log_task_stopped("heartbeat");
403        })));
404
405        Ok(())
406    }
407
408    /// Closes the connection.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if logout or socket closure fails.
413    pub async fn close(&mut self) -> anyhow::Result<()> {
414        // Send logout message if connected
415        if self.is_logged_on()
416            && let Err(e) = self.send_logout("Normal logout").await
417        {
418            tracing::warn!("Failed to send logout message: {e}");
419        }
420
421        // Close socket
422        if let Some(socket) = &self.socket {
423            socket.close().await;
424        }
425
426        // Cancel processing task
427        if let Some(task) = self.processing_task.take() {
428            task.abort();
429        }
430
431        // Cancel heartbeat task
432        if let Some(task) = self.heartbeat_task.take() {
433            task.abort();
434        }
435
436        self.connected.store(false, Ordering::SeqCst);
437        self.logged_on.store(false, Ordering::SeqCst);
438
439        Ok(())
440    }
441
442    /// Send a logon message
443    async fn send_logon(&self) -> anyhow::Result<()> {
444        if self.socket.is_none() {
445            anyhow::bail!("Socket not connected".to_string());
446        }
447
448        // Reset sequence number
449        self.seq_num.store(1, Ordering::SeqCst);
450
451        let now = chrono::Utc::now();
452        let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
453        let passphrase = self.api_passphrase.clone();
454
455        let message = format!(
456            "{}{}{}{}",
457            timestamp, self.api_key, self.target_comp_id, passphrase
458        );
459
460        // Create signature
461        let decoded_secret = BASE64_STANDARD
462            .decode(&self.api_secret)
463            .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
464
465        let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
466        let tag = hmac::sign(&key, message.as_bytes());
467        let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
468
469        let logon_msg = FixMessage::create_logon(
470            1, // Always use 1 for new logon with reset
471            &self.sender_comp_id,
472            &self.target_comp_id,
473            self.heartbeat_secs,
474            &self.api_key,
475            &passphrase,
476            &encoded_signature,
477            &now,
478        );
479
480        if let Some(socket) = &self.socket {
481            tracing::info!("Logging on...");
482
483            match socket.send_bytes(logon_msg.to_bytes()).await {
484                Ok(()) => tracing::debug!("Sent logon message"),
485                Err(e) => tracing::error!("Error on logon: {e}"),
486            }
487        } else {
488            anyhow::bail!("Socket not connected".to_string());
489        }
490
491        let start = std::time::Instant::now();
492        while !self.is_logged_on() {
493            tokio::time::sleep(Duration::from_millis(100)).await;
494
495            if start.elapsed() > Duration::from_secs(10) {
496                anyhow::bail!("Logon timeout".to_string());
497            }
498        }
499
500        self.logged_on.store(true, Ordering::SeqCst);
501
502        Ok(())
503    }
504
505    /// Sends a logout message.
506    async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
507        if self.socket.is_none() {
508            anyhow::bail!("Socket not connected".to_string());
509        }
510
511        let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
512        let now = chrono::Utc::now();
513
514        let logout_msg = FixMessage::create_logout(
515            seq_num,
516            &self.sender_comp_id,
517            &self.target_comp_id,
518            Some(text),
519            &now,
520        );
521
522        if let Some(socket) = &self.socket {
523            match socket.send_bytes(logout_msg.to_bytes()).await {
524                Ok(()) => tracing::debug!("Sent logout message"),
525                Err(e) => tracing::error!("Error on logout: {e}"),
526            }
527        } else {
528            anyhow::bail!("Socket not connected".to_string());
529        }
530
531        Ok(())
532    }
533}
534
// Lives here rather than in core, because core must not depend on tracing
/// Invokes the Python `callback` with `py_obj` as its single positional argument.
///
/// The return value is discarded; an error raised by the call is logged, not propagated.
#[cfg(feature = "python")]
pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
    match callback.call1(py, (py_obj,)) {
        Ok(_) => {}
        Err(e) => tracing::error!("Error calling Python: {e}"),
    }
}
541}