nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::{
36 live::get_runtime,
37 logging::{log_task_started, log_task_stopped},
38};
39#[cfg(feature = "python")]
40use nautilus_core::python::{IntoPyObjectNautilusExt, call_python};
41use nautilus_core::{env::get_or_env_var, time::get_atomic_clock_realtime};
42use nautilus_model::identifiers::AccountId;
43use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
44#[cfg(feature = "python")]
45use pyo3::prelude::*;
46use tokio::task::JoinHandle;
47use tokio_tungstenite::tungstenite::stream::Mode;
48
49use super::{
50 messages::{FIX_DELIMITER, FixMessage},
51 parse::convert_to_order_status_report,
52};
53use crate::{
54 common::consts::COINBASE_INTX,
55 fix::{
56 messages::{fix_exec_type, fix_message_type, fix_tag},
57 parse::convert_to_fill_report,
58 },
59};
60
61#[cfg_attr(
62 feature = "python",
63 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx")
64)]
65#[derive(Debug, Clone)]
66pub struct CoinbaseIntxFixClient {
67 endpoint: String,
68 api_key: String,
69 api_secret: String,
70 api_passphrase: String,
71 portfolio_id: String,
72 sender_comp_id: String,
73 target_comp_id: String,
74 socket: Option<Arc<SocketClient>>,
75 connected: Arc<AtomicBool>,
76 logged_on: Arc<AtomicBool>,
77 seq_num: Arc<AtomicUsize>,
78 received_seq_num: Arc<AtomicUsize>,
79 heartbeat_secs: u64,
80 processing_task: Option<Arc<JoinHandle<()>>>,
81 heartbeat_task: Option<Arc<JoinHandle<()>>>,
82}
83
84impl CoinbaseIntxFixClient {
85 pub fn new(
91 endpoint: Option<String>,
92 api_key: Option<String>,
93 api_secret: Option<String>,
94 api_passphrase: Option<String>,
95 portfolio_id: Option<String>,
96 ) -> anyhow::Result<Self> {
97 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
98 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
99 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
100 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
101 let portfolio_id = get_or_env_var(portfolio_id, "COINBASE_INTX_PORTFOLIO_ID")?;
102 let sender_comp_id = api_key.clone();
103 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
106 endpoint,
107 api_key,
108 api_secret,
109 api_passphrase,
110 portfolio_id,
111 sender_comp_id,
112 target_comp_id,
113 socket: None,
114 connected: Arc::new(AtomicBool::new(false)),
115 logged_on: Arc::new(AtomicBool::new(false)),
116 seq_num: Arc::new(AtomicUsize::new(1)),
117 received_seq_num: Arc::new(AtomicUsize::new(0)),
118 heartbeat_secs: 10, processing_task: None,
120 heartbeat_task: None,
121 })
122 }
123
124 pub fn from_env() -> anyhow::Result<Self> {
131 Self::new(None, None, None, None, None)
132 }
133
134 #[must_use]
136 pub const fn endpoint(&self) -> &str {
137 self.endpoint.as_str()
138 }
139
140 #[must_use]
142 pub const fn api_key(&self) -> &str {
143 self.api_key.as_str()
144 }
145
146 #[must_use]
148 pub fn api_key_masked(&self) -> String {
149 nautilus_core::string::mask_api_key(&self.api_key)
150 }
151
152 #[must_use]
154 pub const fn portfolio_id(&self) -> &str {
155 self.portfolio_id.as_str()
156 }
157
158 #[must_use]
160 pub const fn sender_comp_id(&self) -> &str {
161 self.sender_comp_id.as_str()
162 }
163
164 #[must_use]
166 pub const fn target_comp_id(&self) -> &str {
167 self.target_comp_id.as_str()
168 }
169
170 #[must_use]
172 pub fn is_connected(&self) -> bool {
173 self.connected.load(Ordering::SeqCst)
174 }
175
176 #[must_use]
178 pub fn is_logged_on(&self) -> bool {
179 self.logged_on.load(Ordering::SeqCst)
180 }
181
182 pub async fn connect(
192 &mut self,
193 #[cfg(feature = "python")] handler: Py<PyAny>,
194 #[cfg(not(feature = "python"))] _handler: (),
195 ) -> anyhow::Result<()> {
196 let logged_on = self.logged_on.clone();
197 let seq_num = self.seq_num.clone();
198 let received_seq_num = self.received_seq_num.clone();
199 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
200
201 let handle_message = Arc::new(move |data: &[u8]| {
202 if let Ok(message) = FixMessage::parse(data) {
203 if let Some(msg_seq) = message.msg_seq_num() {
205 received_seq_num.store(msg_seq, Ordering::SeqCst);
206 }
207
208 if let Some(msg_type) = message.msg_type() {
210 match msg_type {
211 fix_message_type::LOGON => {
212 log::info!("Logon successful");
213 logged_on.store(true, Ordering::SeqCst);
214 }
215 fix_message_type::LOGOUT => {
216 log::info!("Received logout");
217 logged_on.store(false, Ordering::SeqCst);
218 }
219 fix_message_type::EXECUTION_REPORT => {
220 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
221 if matches!(
222 exec_type,
223 fix_exec_type::REJECTED
224 | fix_exec_type::NEW
225 | fix_exec_type::PENDING_NEW
226 ) {
227 log::debug!(
229 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
230 );
231 } else if matches!(
232 exec_type,
233 fix_exec_type::CANCELED
234 | fix_exec_type::EXPIRED
235 | fix_exec_type::REPLACED
236 ) {
237 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
239 match convert_to_order_status_report(
240 &message, account_id, ts_init,
241 ) {
242 #[cfg(feature = "python")]
243 Ok(report) => Python::attach(|py| {
244 call_python(
245 py,
246 &handler,
247 report.into_py_any_unwrap(py),
248 );
249 }),
250 #[cfg(not(feature = "python"))]
251 Ok(_report) => {
252 log::debug!(
253 "Order status report handled (Python disabled)"
254 );
255 }
256 Err(e) => {
257 log::error!(
258 "Failed to parse FIX execution report: {e}"
259 );
260 }
261 }
262 } else if exec_type == fix_exec_type::PARTIAL_FILL
263 || exec_type == fix_exec_type::FILL
264 {
265 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
267 match convert_to_fill_report(&message, account_id, ts_init) {
268 #[cfg(feature = "python")]
269 Ok(report) => Python::attach(|py| {
270 call_python(
271 py,
272 &handler,
273 report.into_py_any_unwrap(py),
274 );
275 }),
276 #[cfg(not(feature = "python"))]
277 Ok(_report) => {
278 log::debug!("Fill report handled (Python disabled)");
279 }
280 Err(e) => {
281 log::error!(
282 "Failed to parse FIX execution report: {e}"
283 );
284 }
285 }
286 } else {
287 log::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
288 }
289 }
290 }
291 _ => log::trace!("Received unexpected {message:?}"),
295 }
296 }
297 } else {
298 log::error!("Failed to parse FIX message");
299 }
300 });
301
302 let config = SocketConfig {
303 url: self.endpoint.clone(),
304 mode: Mode::Tls,
305 suffix: vec![FIX_DELIMITER],
306 message_handler: Some(handle_message),
307 heartbeat: None, reconnect_timeout_ms: Some(10000),
309 reconnect_delay_initial_ms: Some(5000),
310 reconnect_delay_max_ms: Some(30000),
311 reconnect_backoff_factor: Some(1.5),
312 reconnect_jitter_ms: Some(500),
313 reconnect_max_attempts: None,
314 connection_max_retries: None,
315 certs_dir: None,
316 };
317
318 let socket = match SocketClient::connect(
319 config, None, None, None, )
323 .await
324 {
325 Ok(socket) => socket,
326 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
327 };
328
329 let writer_tx = socket.writer_tx.clone();
330
331 self.socket = Some(Arc::new(socket));
332
333 self.send_logon().await?;
334
335 let connected_clone = self.connected.clone();
337 let logged_on_clone = self.logged_on.clone();
338 let heartbeat_secs = self.heartbeat_secs;
339 let client_clone = self.clone();
340
341 self.processing_task = Some(Arc::new(get_runtime().spawn(async move {
342 log_task_started("maintain-fix-connection");
343
344 let mut last_logon_attempt = std::time::Instant::now()
345 .checked_sub(Duration::from_secs(10))
346 .unwrap();
347
348 loop {
349 tokio::time::sleep(Duration::from_millis(100)).await;
350
351 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
353 {
354 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
356 log::info!("Connected without logon");
357 last_logon_attempt = std::time::Instant::now();
358
359 if let Err(e) = client_clone.send_logon().await {
360 log::error!("Failed to send logon: {e}");
361 }
362 }
363 }
364 }
365 })));
366
367 let logged_on_clone = self.logged_on.clone();
368 let sender_comp_id = self.sender_comp_id.clone();
369 let target_comp_id = self.target_comp_id.clone();
370
371 self.heartbeat_task = Some(Arc::new(get_runtime().spawn(async move {
372 log_task_started("heartbeat");
373 log::debug!("Heartbeat at {heartbeat_secs}s intervals");
374
375 let interval = Duration::from_secs(heartbeat_secs);
376
377 loop {
378 if logged_on_clone.load(Ordering::SeqCst) {
379 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
381 let now = chrono::Utc::now();
382 let msg =
383 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
384
385 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
386 log::error!("Failed to send heartbeat: {e}");
387 break;
388 }
389
390 log::trace!("Sent heartbeat");
391 } else {
392 log::debug!("No longer logged on, stopping heartbeat task");
394 break;
395 }
396
397 tokio::time::sleep(interval).await;
398 }
399
400 log_task_stopped("heartbeat");
401 })));
402
403 Ok(())
404 }
405
406 pub async fn close(&mut self) -> anyhow::Result<()> {
412 if self.is_logged_on()
414 && let Err(e) = self.send_logout("Normal logout").await
415 {
416 log::warn!("Failed to send logout message: {e}");
417 }
418
419 if let Some(socket) = &self.socket {
421 socket.close().await;
422 }
423
424 if let Some(task) = self.processing_task.take() {
426 task.abort();
427 }
428
429 if let Some(task) = self.heartbeat_task.take() {
431 task.abort();
432 }
433
434 self.connected.store(false, Ordering::SeqCst);
435 self.logged_on.store(false, Ordering::SeqCst);
436
437 Ok(())
438 }
439
440 async fn send_logon(&self) -> anyhow::Result<()> {
442 if self.socket.is_none() {
443 anyhow::bail!("Socket not connected".to_string());
444 }
445
446 self.seq_num.store(1, Ordering::SeqCst);
448
449 let now = chrono::Utc::now();
450 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
451 let passphrase = self.api_passphrase.clone();
452
453 let message = format!(
454 "{}{}{}{}",
455 timestamp, self.api_key, self.target_comp_id, passphrase
456 );
457
458 let decoded_secret = BASE64_STANDARD
460 .decode(&self.api_secret)
461 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
462
463 let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
464 let tag = hmac::sign(&key, message.as_bytes());
465 let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
466
467 let logon_msg = FixMessage::create_logon(
468 1, &self.sender_comp_id,
470 &self.target_comp_id,
471 self.heartbeat_secs,
472 &self.api_key,
473 &passphrase,
474 &encoded_signature,
475 &now,
476 );
477
478 if let Some(socket) = &self.socket {
479 log::info!("Logging on...");
480
481 match socket.send_bytes(logon_msg.to_bytes()).await {
482 Ok(()) => log::debug!("Sent logon message"),
483 Err(e) => log::error!("Error on logon: {e}"),
484 }
485 } else {
486 anyhow::bail!("Socket not connected".to_string());
487 }
488
489 let start = std::time::Instant::now();
490 while !self.is_logged_on() {
491 tokio::time::sleep(Duration::from_millis(100)).await;
492
493 if start.elapsed() > Duration::from_secs(10) {
494 anyhow::bail!("Logon timeout".to_string());
495 }
496 }
497
498 self.logged_on.store(true, Ordering::SeqCst);
499
500 Ok(())
501 }
502
503 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
505 if self.socket.is_none() {
506 anyhow::bail!("Socket not connected".to_string());
507 }
508
509 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
510 let now = chrono::Utc::now();
511
512 let logout_msg = FixMessage::create_logout(
513 seq_num,
514 &self.sender_comp_id,
515 &self.target_comp_id,
516 Some(text),
517 &now,
518 );
519
520 if let Some(socket) = &self.socket {
521 match socket.send_bytes(logout_msg.to_bytes()).await {
522 Ok(()) => log::debug!("Sent logout message"),
523 Err(e) => log::error!("Error on logout: {e}"),
524 }
525 } else {
526 anyhow::bail!("Socket not connected".to_string());
527 }
528
529 Ok(())
530 }
531}