nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::{
36 live::get_runtime,
37 logging::{log_task_started, log_task_stopped},
38};
39#[cfg(feature = "python")]
40use nautilus_core::python::{IntoPyObjectNautilusExt, call_python};
41use nautilus_core::{env::get_or_env_var, time::get_atomic_clock_realtime};
42use nautilus_model::identifiers::AccountId;
43use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
44#[cfg(feature = "python")]
45use pyo3::prelude::*;
46use tokio::task::JoinHandle;
47use tokio_tungstenite::tungstenite::stream::Mode;
48
49use super::{
50 messages::{FIX_DELIMITER, FixMessage},
51 parse::convert_to_order_status_report,
52};
53use crate::{
54 common::consts::COINBASE_INTX,
55 fix::{
56 messages::{fix_exec_type, fix_message_type, fix_tag},
57 parse::convert_to_fill_report,
58 },
59};
60
61#[cfg_attr(
62 feature = "python",
63 pyo3::pyclass(
64 module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx",
65 from_py_object
66 )
67)]
68#[derive(Debug, Clone)]
69pub struct CoinbaseIntxFixClient {
70 endpoint: String,
71 api_key: String,
72 api_secret: String,
73 api_passphrase: String,
74 portfolio_id: String,
75 sender_comp_id: String,
76 target_comp_id: String,
77 socket: Option<Arc<SocketClient>>,
78 connected: Arc<AtomicBool>,
79 logged_on: Arc<AtomicBool>,
80 seq_num: Arc<AtomicUsize>,
81 received_seq_num: Arc<AtomicUsize>,
82 heartbeat_secs: u64,
83 processing_task: Option<Arc<JoinHandle<()>>>,
84 heartbeat_task: Option<Arc<JoinHandle<()>>>,
85}
86
87impl CoinbaseIntxFixClient {
88 pub fn new(
94 endpoint: Option<String>,
95 api_key: Option<String>,
96 api_secret: Option<String>,
97 api_passphrase: Option<String>,
98 portfolio_id: Option<String>,
99 ) -> anyhow::Result<Self> {
100 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
101 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
102 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
103 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
104 let portfolio_id = get_or_env_var(portfolio_id, "COINBASE_INTX_PORTFOLIO_ID")?;
105 let sender_comp_id = api_key.clone();
106 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
109 endpoint,
110 api_key,
111 api_secret,
112 api_passphrase,
113 portfolio_id,
114 sender_comp_id,
115 target_comp_id,
116 socket: None,
117 connected: Arc::new(AtomicBool::new(false)),
118 logged_on: Arc::new(AtomicBool::new(false)),
119 seq_num: Arc::new(AtomicUsize::new(1)),
120 received_seq_num: Arc::new(AtomicUsize::new(0)),
121 heartbeat_secs: 10, processing_task: None,
123 heartbeat_task: None,
124 })
125 }
126
127 pub fn from_env() -> anyhow::Result<Self> {
134 Self::new(None, None, None, None, None)
135 }
136
137 #[must_use]
139 pub const fn endpoint(&self) -> &str {
140 self.endpoint.as_str()
141 }
142
143 #[must_use]
145 pub const fn api_key(&self) -> &str {
146 self.api_key.as_str()
147 }
148
149 #[must_use]
151 pub fn api_key_masked(&self) -> String {
152 nautilus_core::string::mask_api_key(&self.api_key)
153 }
154
155 #[must_use]
157 pub const fn portfolio_id(&self) -> &str {
158 self.portfolio_id.as_str()
159 }
160
161 #[must_use]
163 pub const fn sender_comp_id(&self) -> &str {
164 self.sender_comp_id.as_str()
165 }
166
167 #[must_use]
169 pub const fn target_comp_id(&self) -> &str {
170 self.target_comp_id.as_str()
171 }
172
173 #[must_use]
175 pub fn is_connected(&self) -> bool {
176 self.connected.load(Ordering::SeqCst)
177 }
178
179 #[must_use]
181 pub fn is_logged_on(&self) -> bool {
182 self.logged_on.load(Ordering::SeqCst)
183 }
184
185 pub async fn connect(
195 &mut self,
196 #[cfg(feature = "python")] handler: Py<PyAny>,
197 #[cfg(not(feature = "python"))] _handler: (),
198 ) -> anyhow::Result<()> {
199 let logged_on = self.logged_on.clone();
200 let seq_num = self.seq_num.clone();
201 let received_seq_num = self.received_seq_num.clone();
202 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
203
204 let handle_message = Arc::new(move |data: &[u8]| {
205 if let Ok(message) = FixMessage::parse(data) {
206 if let Some(msg_seq) = message.msg_seq_num() {
208 received_seq_num.store(msg_seq, Ordering::SeqCst);
209 }
210
211 if let Some(msg_type) = message.msg_type() {
213 match msg_type {
214 fix_message_type::LOGON => {
215 log::info!("Logon successful");
216 logged_on.store(true, Ordering::SeqCst);
217 }
218 fix_message_type::LOGOUT => {
219 log::info!("Received logout");
220 logged_on.store(false, Ordering::SeqCst);
221 }
222 fix_message_type::EXECUTION_REPORT => {
223 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
224 if matches!(
225 exec_type,
226 fix_exec_type::REJECTED
227 | fix_exec_type::NEW
228 | fix_exec_type::PENDING_NEW
229 ) {
230 log::debug!(
232 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
233 );
234 } else if matches!(
235 exec_type,
236 fix_exec_type::CANCELED
237 | fix_exec_type::EXPIRED
238 | fix_exec_type::REPLACED
239 ) {
240 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
242 match convert_to_order_status_report(
243 &message, account_id, ts_init,
244 ) {
245 #[cfg(feature = "python")]
246 Ok(report) => Python::attach(|py| {
247 call_python(
248 py,
249 &handler,
250 report.into_py_any_unwrap(py),
251 );
252 }),
253 #[cfg(not(feature = "python"))]
254 Ok(_report) => {
255 log::debug!(
256 "Order status report handled (Python disabled)"
257 );
258 }
259 Err(e) => {
260 log::error!(
261 "Failed to parse FIX execution report: {e}"
262 );
263 }
264 }
265 } else if exec_type == fix_exec_type::PARTIAL_FILL
266 || exec_type == fix_exec_type::FILL
267 {
268 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
270 match convert_to_fill_report(&message, account_id, ts_init) {
271 #[cfg(feature = "python")]
272 Ok(report) => Python::attach(|py| {
273 call_python(
274 py,
275 &handler,
276 report.into_py_any_unwrap(py),
277 );
278 }),
279 #[cfg(not(feature = "python"))]
280 Ok(_report) => {
281 log::debug!("Fill report handled (Python disabled)");
282 }
283 Err(e) => {
284 log::error!(
285 "Failed to parse FIX execution report: {e}"
286 );
287 }
288 }
289 } else {
290 log::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
291 }
292 }
293 }
294 _ => log::trace!("Received unexpected {message:?}"),
298 }
299 }
300 } else {
301 log::error!("Failed to parse FIX message");
302 }
303 });
304
305 let config = SocketConfig {
306 url: self.endpoint.clone(),
307 mode: Mode::Tls,
308 suffix: vec![FIX_DELIMITER],
309 message_handler: Some(handle_message),
310 heartbeat: None, reconnect_timeout_ms: Some(10000),
312 reconnect_delay_initial_ms: Some(5000),
313 reconnect_delay_max_ms: Some(30000),
314 reconnect_backoff_factor: Some(1.5),
315 reconnect_jitter_ms: Some(500),
316 reconnect_max_attempts: None,
317 connection_max_retries: None,
318 certs_dir: None,
319 };
320
321 let socket = match SocketClient::connect(
322 config, None, None, None, )
326 .await
327 {
328 Ok(socket) => socket,
329 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
330 };
331
332 let writer_tx = socket.writer_tx.clone();
333
334 self.socket = Some(Arc::new(socket));
335
336 self.send_logon().await?;
337
338 let connected_clone = self.connected.clone();
340 let logged_on_clone = self.logged_on.clone();
341 let heartbeat_secs = self.heartbeat_secs;
342 let client_clone = self.clone();
343
344 self.processing_task = Some(Arc::new(get_runtime().spawn(async move {
345 log_task_started("maintain-fix-connection");
346
347 let mut last_logon_attempt = std::time::Instant::now()
348 .checked_sub(Duration::from_secs(10))
349 .unwrap();
350
351 loop {
352 tokio::time::sleep(Duration::from_millis(100)).await;
353
354 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
356 {
357 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
359 log::info!("Connected without logon");
360 last_logon_attempt = std::time::Instant::now();
361
362 if let Err(e) = client_clone.send_logon().await {
363 log::error!("Failed to send logon: {e}");
364 }
365 }
366 }
367 }
368 })));
369
370 let logged_on_clone = self.logged_on.clone();
371 let sender_comp_id = self.sender_comp_id.clone();
372 let target_comp_id = self.target_comp_id.clone();
373
374 self.heartbeat_task = Some(Arc::new(get_runtime().spawn(async move {
375 log_task_started("heartbeat");
376 log::debug!("Heartbeat at {heartbeat_secs}s intervals");
377
378 let interval = Duration::from_secs(heartbeat_secs);
379
380 loop {
381 if logged_on_clone.load(Ordering::SeqCst) {
382 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
384 let now = chrono::Utc::now();
385 let msg =
386 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
387
388 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
389 log::error!("Failed to send heartbeat: {e}");
390 break;
391 }
392
393 log::trace!("Sent heartbeat");
394 } else {
395 log::debug!("No longer logged on, stopping heartbeat task");
397 break;
398 }
399
400 tokio::time::sleep(interval).await;
401 }
402
403 log_task_stopped("heartbeat");
404 })));
405
406 Ok(())
407 }
408
409 pub async fn close(&mut self) -> anyhow::Result<()> {
415 if self.is_logged_on()
417 && let Err(e) = self.send_logout("Normal logout").await
418 {
419 log::warn!("Failed to send logout message: {e}");
420 }
421
422 if let Some(socket) = &self.socket {
424 socket.close().await;
425 }
426
427 if let Some(task) = self.processing_task.take() {
429 task.abort();
430 }
431
432 if let Some(task) = self.heartbeat_task.take() {
434 task.abort();
435 }
436
437 self.connected.store(false, Ordering::SeqCst);
438 self.logged_on.store(false, Ordering::SeqCst);
439
440 Ok(())
441 }
442
443 async fn send_logon(&self) -> anyhow::Result<()> {
445 if self.socket.is_none() {
446 anyhow::bail!("Socket not connected".to_string());
447 }
448
449 self.seq_num.store(1, Ordering::SeqCst);
451
452 let now = chrono::Utc::now();
453 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
454 let passphrase = self.api_passphrase.clone();
455
456 let message = format!(
457 "{}{}{}{}",
458 timestamp, self.api_key, self.target_comp_id, passphrase
459 );
460
461 let decoded_secret = BASE64_STANDARD
463 .decode(&self.api_secret)
464 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
465
466 let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
467 let tag = hmac::sign(&key, message.as_bytes());
468 let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
469
470 let logon_msg = FixMessage::create_logon(
471 1, &self.sender_comp_id,
473 &self.target_comp_id,
474 self.heartbeat_secs,
475 &self.api_key,
476 &passphrase,
477 &encoded_signature,
478 &now,
479 );
480
481 if let Some(socket) = &self.socket {
482 log::info!("Logging on...");
483
484 match socket.send_bytes(logon_msg.to_bytes()).await {
485 Ok(()) => log::debug!("Sent logon message"),
486 Err(e) => log::error!("Error on logon: {e}"),
487 }
488 } else {
489 anyhow::bail!("Socket not connected".to_string());
490 }
491
492 let start = std::time::Instant::now();
493 while !self.is_logged_on() {
494 tokio::time::sleep(Duration::from_millis(100)).await;
495
496 if start.elapsed() > Duration::from_secs(10) {
497 anyhow::bail!("Logon timeout".to_string());
498 }
499 }
500
501 self.logged_on.store(true, Ordering::SeqCst);
502
503 Ok(())
504 }
505
506 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
508 if self.socket.is_none() {
509 anyhow::bail!("Socket not connected".to_string());
510 }
511
512 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
513 let now = chrono::Utc::now();
514
515 let logout_msg = FixMessage::create_logout(
516 seq_num,
517 &self.sender_comp_id,
518 &self.target_comp_id,
519 Some(text),
520 &now,
521 );
522
523 if let Some(socket) = &self.socket {
524 match socket.send_bytes(logout_msg.to_bytes()).await {
525 Ok(()) => log::debug!("Sent logout message"),
526 Err(e) => log::error!("Error on logout: {e}"),
527 }
528 } else {
529 anyhow::bail!("Socket not connected".to_string());
530 }
531
532 Ok(())
533 }
534}