nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::{
36 live::get_runtime,
37 logging::{log_task_started, log_task_stopped},
38};
39#[cfg(feature = "python")]
40use nautilus_core::python::IntoPyObjectNautilusExt;
41use nautilus_core::{env::get_or_env_var, time::get_atomic_clock_realtime};
42use nautilus_model::identifiers::AccountId;
43use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
44#[cfg(feature = "python")]
45use pyo3::prelude::*;
46use tokio::task::JoinHandle;
47use tokio_tungstenite::tungstenite::stream::Mode;
48
49use super::{
50 messages::{FIX_DELIMITER, FixMessage},
51 parse::convert_to_order_status_report,
52};
53use crate::{
54 common::consts::COINBASE_INTX,
55 fix::{
56 messages::{fix_exec_type, fix_message_type, fix_tag},
57 parse::convert_to_fill_report,
58 },
59};
60
61#[cfg_attr(
62 feature = "python",
63 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.coinbase_intx")
64)]
65#[derive(Debug, Clone)]
66pub struct CoinbaseIntxFixClient {
67 endpoint: String,
68 api_key: String,
69 api_secret: String,
70 api_passphrase: String,
71 portfolio_id: String,
72 sender_comp_id: String,
73 target_comp_id: String,
74 socket: Option<Arc<SocketClient>>,
75 connected: Arc<AtomicBool>,
76 logged_on: Arc<AtomicBool>,
77 seq_num: Arc<AtomicUsize>,
78 received_seq_num: Arc<AtomicUsize>,
79 heartbeat_secs: u64,
80 processing_task: Option<Arc<JoinHandle<()>>>,
81 heartbeat_task: Option<Arc<JoinHandle<()>>>,
82}
83
84impl CoinbaseIntxFixClient {
85 pub fn new(
91 endpoint: Option<String>,
92 api_key: Option<String>,
93 api_secret: Option<String>,
94 api_passphrase: Option<String>,
95 portfolio_id: Option<String>,
96 ) -> anyhow::Result<Self> {
97 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
98 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
99 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
100 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
101 let portfolio_id = get_or_env_var(portfolio_id, "COINBASE_INTX_PORTFOLIO_ID")?;
102 let sender_comp_id = api_key.clone();
103 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
106 endpoint,
107 api_key,
108 api_secret,
109 api_passphrase,
110 portfolio_id,
111 sender_comp_id,
112 target_comp_id,
113 socket: None,
114 connected: Arc::new(AtomicBool::new(false)),
115 logged_on: Arc::new(AtomicBool::new(false)),
116 seq_num: Arc::new(AtomicUsize::new(1)),
117 received_seq_num: Arc::new(AtomicUsize::new(0)),
118 heartbeat_secs: 10, processing_task: None,
120 heartbeat_task: None,
121 })
122 }
123
124 pub fn from_env() -> anyhow::Result<Self> {
131 Self::new(None, None, None, None, None)
132 }
133
134 #[must_use]
136 pub const fn endpoint(&self) -> &str {
137 self.endpoint.as_str()
138 }
139
140 #[must_use]
142 pub const fn api_key(&self) -> &str {
143 self.api_key.as_str()
144 }
145
146 #[must_use]
148 pub fn api_key_masked(&self) -> String {
149 nautilus_core::string::mask_api_key(&self.api_key)
150 }
151
152 #[must_use]
154 pub const fn portfolio_id(&self) -> &str {
155 self.portfolio_id.as_str()
156 }
157
158 #[must_use]
160 pub const fn sender_comp_id(&self) -> &str {
161 self.sender_comp_id.as_str()
162 }
163
164 #[must_use]
166 pub const fn target_comp_id(&self) -> &str {
167 self.target_comp_id.as_str()
168 }
169
170 #[must_use]
172 pub fn is_connected(&self) -> bool {
173 self.connected.load(Ordering::SeqCst)
174 }
175
176 #[must_use]
178 pub fn is_logged_on(&self) -> bool {
179 self.logged_on.load(Ordering::SeqCst)
180 }
181
182 pub async fn connect(
192 &mut self,
193 #[cfg(feature = "python")] handler: Py<PyAny>,
194 #[cfg(not(feature = "python"))] _handler: (),
195 ) -> anyhow::Result<()> {
196 let logged_on = self.logged_on.clone();
197 let seq_num = self.seq_num.clone();
198 let received_seq_num = self.received_seq_num.clone();
199 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
200
201 let handle_message = Arc::new(move |data: &[u8]| {
202 if let Ok(message) = FixMessage::parse(data) {
203 if let Some(msg_seq) = message.msg_seq_num() {
205 received_seq_num.store(msg_seq, Ordering::SeqCst);
206 }
207
208 if let Some(msg_type) = message.msg_type() {
210 match msg_type {
211 fix_message_type::LOGON => {
212 tracing::info!("Logon successful");
213 logged_on.store(true, Ordering::SeqCst);
214 }
215 fix_message_type::LOGOUT => {
216 tracing::info!("Received logout");
217 logged_on.store(false, Ordering::SeqCst);
218 }
219 fix_message_type::EXECUTION_REPORT => {
220 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
221 if matches!(
222 exec_type,
223 fix_exec_type::REJECTED
224 | fix_exec_type::NEW
225 | fix_exec_type::PENDING_NEW
226 ) {
227 tracing::debug!(
229 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
230 );
231 } else if matches!(
232 exec_type,
233 fix_exec_type::CANCELED
234 | fix_exec_type::EXPIRED
235 | fix_exec_type::REPLACED
236 ) {
237 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
239 match convert_to_order_status_report(
240 &message, account_id, ts_init,
241 ) {
242 #[cfg(feature = "python")]
243 Ok(report) => Python::attach(|py| {
244 call_python(
245 py,
246 &handler,
247 report.into_py_any_unwrap(py),
248 );
249 }),
250 #[cfg(not(feature = "python"))]
251 Ok(_report) => {
252 tracing::debug!(
253 "Order status report handled (Python disabled)"
254 );
255 }
256 Err(e) => {
257 tracing::error!(
258 "Failed to parse FIX execution report: {e}"
259 );
260 }
261 }
262 } else if exec_type == fix_exec_type::PARTIAL_FILL
263 || exec_type == fix_exec_type::FILL
264 {
265 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
267 match convert_to_fill_report(&message, account_id, ts_init) {
268 #[cfg(feature = "python")]
269 Ok(report) => Python::attach(|py| {
270 call_python(
271 py,
272 &handler,
273 report.into_py_any_unwrap(py),
274 );
275 }),
276 #[cfg(not(feature = "python"))]
277 Ok(_report) => {
278 tracing::debug!(
279 "Fill report handled (Python disabled)"
280 );
281 }
282 Err(e) => {
283 tracing::error!(
284 "Failed to parse FIX execution report: {e}"
285 );
286 }
287 }
288 } else {
289 tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
290 }
291 }
292 }
293 _ => tracing::trace!("Received unexpected {message:?}"),
297 }
298 }
299 } else {
300 tracing::error!("Failed to parse FIX message");
301 }
302 });
303
304 let config = SocketConfig {
305 url: self.endpoint.clone(),
306 mode: Mode::Tls,
307 suffix: vec![FIX_DELIMITER],
308 message_handler: Some(handle_message),
309 heartbeat: None, reconnect_timeout_ms: Some(10000),
311 reconnect_delay_initial_ms: Some(5000),
312 reconnect_delay_max_ms: Some(30000),
313 reconnect_backoff_factor: Some(1.5),
314 reconnect_jitter_ms: Some(500),
315 reconnect_max_attempts: None,
316 connection_max_retries: None,
317 certs_dir: None,
318 };
319
320 let socket = match SocketClient::connect(
321 config, None, None, None, )
325 .await
326 {
327 Ok(socket) => socket,
328 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
329 };
330
331 let writer_tx = socket.writer_tx.clone();
332
333 self.socket = Some(Arc::new(socket));
334
335 self.send_logon().await?;
336
337 let connected_clone = self.connected.clone();
339 let logged_on_clone = self.logged_on.clone();
340 let heartbeat_secs = self.heartbeat_secs;
341 let client_clone = self.clone();
342
343 self.processing_task = Some(Arc::new(get_runtime().spawn(async move {
344 log_task_started("maintain-fix-connection");
345
346 let mut last_logon_attempt = std::time::Instant::now()
347 .checked_sub(Duration::from_secs(10))
348 .unwrap();
349
350 loop {
351 tokio::time::sleep(Duration::from_millis(100)).await;
352
353 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
355 {
356 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
358 tracing::info!("Connected without logon");
359 last_logon_attempt = std::time::Instant::now();
360
361 if let Err(e) = client_clone.send_logon().await {
362 tracing::error!("Failed to send logon: {e}");
363 }
364 }
365 }
366 }
367 })));
368
369 let logged_on_clone = self.logged_on.clone();
370 let sender_comp_id = self.sender_comp_id.clone();
371 let target_comp_id = self.target_comp_id.clone();
372
373 self.heartbeat_task = Some(Arc::new(get_runtime().spawn(async move {
374 log_task_started("heartbeat");
375 tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
376
377 let interval = Duration::from_secs(heartbeat_secs);
378
379 loop {
380 if logged_on_clone.load(Ordering::SeqCst) {
381 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
383 let now = chrono::Utc::now();
384 let msg =
385 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
386
387 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
388 tracing::error!("Failed to send heartbeat: {e}");
389 break;
390 }
391
392 tracing::trace!("Sent heartbeat");
393 } else {
394 tracing::debug!("No longer logged on, stopping heartbeat task");
396 break;
397 }
398
399 tokio::time::sleep(interval).await;
400 }
401
402 log_task_stopped("heartbeat");
403 })));
404
405 Ok(())
406 }
407
408 pub async fn close(&mut self) -> anyhow::Result<()> {
414 if self.is_logged_on()
416 && let Err(e) = self.send_logout("Normal logout").await
417 {
418 tracing::warn!("Failed to send logout message: {e}");
419 }
420
421 if let Some(socket) = &self.socket {
423 socket.close().await;
424 }
425
426 if let Some(task) = self.processing_task.take() {
428 task.abort();
429 }
430
431 if let Some(task) = self.heartbeat_task.take() {
433 task.abort();
434 }
435
436 self.connected.store(false, Ordering::SeqCst);
437 self.logged_on.store(false, Ordering::SeqCst);
438
439 Ok(())
440 }
441
442 async fn send_logon(&self) -> anyhow::Result<()> {
444 if self.socket.is_none() {
445 anyhow::bail!("Socket not connected".to_string());
446 }
447
448 self.seq_num.store(1, Ordering::SeqCst);
450
451 let now = chrono::Utc::now();
452 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
453 let passphrase = self.api_passphrase.clone();
454
455 let message = format!(
456 "{}{}{}{}",
457 timestamp, self.api_key, self.target_comp_id, passphrase
458 );
459
460 let decoded_secret = BASE64_STANDARD
462 .decode(&self.api_secret)
463 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
464
465 let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
466 let tag = hmac::sign(&key, message.as_bytes());
467 let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
468
469 let logon_msg = FixMessage::create_logon(
470 1, &self.sender_comp_id,
472 &self.target_comp_id,
473 self.heartbeat_secs,
474 &self.api_key,
475 &passphrase,
476 &encoded_signature,
477 &now,
478 );
479
480 if let Some(socket) = &self.socket {
481 tracing::info!("Logging on...");
482
483 match socket.send_bytes(logon_msg.to_bytes()).await {
484 Ok(()) => tracing::debug!("Sent logon message"),
485 Err(e) => tracing::error!("Error on logon: {e}"),
486 }
487 } else {
488 anyhow::bail!("Socket not connected".to_string());
489 }
490
491 let start = std::time::Instant::now();
492 while !self.is_logged_on() {
493 tokio::time::sleep(Duration::from_millis(100)).await;
494
495 if start.elapsed() > Duration::from_secs(10) {
496 anyhow::bail!("Logon timeout".to_string());
497 }
498 }
499
500 self.logged_on.store(true, Ordering::SeqCst);
501
502 Ok(())
503 }
504
505 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
507 if self.socket.is_none() {
508 anyhow::bail!("Socket not connected".to_string());
509 }
510
511 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
512 let now = chrono::Utc::now();
513
514 let logout_msg = FixMessage::create_logout(
515 seq_num,
516 &self.sender_comp_id,
517 &self.target_comp_id,
518 Some(text),
519 &now,
520 );
521
522 if let Some(socket) = &self.socket {
523 match socket.send_bytes(logout_msg.to_bytes()).await {
524 Ok(()) => tracing::debug!("Sent logout message"),
525 Err(e) => tracing::error!("Error on logout: {e}"),
526 }
527 } else {
528 anyhow::bail!("Socket not connected".to_string());
529 }
530
531 Ok(())
532 }
533}
534
535#[cfg(feature = "python")]
537pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
538 if let Err(e) = callback.call1(py, (py_obj,)) {
539 tracing::error!("Error calling Python: {e}");
540 }
541}