nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::logging::{log_task_started, log_task_stopped};
36#[cfg(feature = "python")]
37use nautilus_core::python::IntoPyObjectNautilusExt;
38use nautilus_core::{env::get_or_env_var, time::get_atomic_clock_realtime};
39use nautilus_model::identifiers::AccountId;
40use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
41#[cfg(feature = "python")]
42use pyo3::prelude::*;
43use tokio::task::JoinHandle;
44use tokio_tungstenite::tungstenite::stream::Mode;
45
46use super::{
47 messages::{FIX_DELIMITER, FixMessage},
48 parse::convert_to_order_status_report,
49};
50use crate::{
51 common::consts::COINBASE_INTX,
52 fix::{
53 messages::{fix_exec_type, fix_message_type, fix_tag},
54 parse::convert_to_fill_report,
55 },
56};
57
58#[cfg_attr(
59 feature = "python",
60 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
61)]
62#[derive(Debug, Clone)]
63pub struct CoinbaseIntxFixClient {
64 endpoint: String,
65 api_key: String,
66 api_secret: String,
67 api_passphrase: String,
68 portfolio_id: String,
69 sender_comp_id: String,
70 target_comp_id: String,
71 socket: Option<Arc<SocketClient>>,
72 connected: Arc<AtomicBool>,
73 logged_on: Arc<AtomicBool>,
74 seq_num: Arc<AtomicUsize>,
75 received_seq_num: Arc<AtomicUsize>,
76 heartbeat_secs: u64,
77 processing_task: Option<Arc<JoinHandle<()>>>,
78 heartbeat_task: Option<Arc<JoinHandle<()>>>,
79}
80
81impl CoinbaseIntxFixClient {
82 pub fn new(
88 endpoint: Option<String>,
89 api_key: Option<String>,
90 api_secret: Option<String>,
91 api_passphrase: Option<String>,
92 portfolio_id: Option<String>,
93 ) -> anyhow::Result<Self> {
94 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
95 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
96 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
97 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
98 let portfolio_id = get_or_env_var(portfolio_id, "COINBASE_INTX_PORTFOLIO_ID")?;
99 let sender_comp_id = api_key.clone();
100 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
103 endpoint,
104 api_key,
105 api_secret,
106 api_passphrase,
107 portfolio_id,
108 sender_comp_id,
109 target_comp_id,
110 socket: None,
111 connected: Arc::new(AtomicBool::new(false)),
112 logged_on: Arc::new(AtomicBool::new(false)),
113 seq_num: Arc::new(AtomicUsize::new(1)),
114 received_seq_num: Arc::new(AtomicUsize::new(0)),
115 heartbeat_secs: 10, processing_task: None,
117 heartbeat_task: None,
118 })
119 }
120
121 pub fn from_env() -> anyhow::Result<Self> {
128 Self::new(None, None, None, None, None)
129 }
130
131 #[must_use]
133 pub const fn endpoint(&self) -> &str {
134 self.endpoint.as_str()
135 }
136
137 #[must_use]
139 pub const fn api_key(&self) -> &str {
140 self.api_key.as_str()
141 }
142
143 #[must_use]
145 pub fn api_key_masked(&self) -> String {
146 nautilus_core::string::mask_api_key(&self.api_key)
147 }
148
149 #[must_use]
151 pub const fn portfolio_id(&self) -> &str {
152 self.portfolio_id.as_str()
153 }
154
155 #[must_use]
157 pub const fn sender_comp_id(&self) -> &str {
158 self.sender_comp_id.as_str()
159 }
160
161 #[must_use]
163 pub const fn target_comp_id(&self) -> &str {
164 self.target_comp_id.as_str()
165 }
166
167 #[must_use]
169 pub fn is_connected(&self) -> bool {
170 self.connected.load(Ordering::SeqCst)
171 }
172
173 #[must_use]
175 pub fn is_logged_on(&self) -> bool {
176 self.logged_on.load(Ordering::SeqCst)
177 }
178
179 pub async fn connect(
189 &mut self,
190 #[cfg(feature = "python")] handler: Py<PyAny>,
191 #[cfg(not(feature = "python"))] _handler: (),
192 ) -> anyhow::Result<()> {
193 let logged_on = self.logged_on.clone();
194 let seq_num = self.seq_num.clone();
195 let received_seq_num = self.received_seq_num.clone();
196 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
197
198 let handle_message = Arc::new(move |data: &[u8]| {
199 if let Ok(message) = FixMessage::parse(data) {
200 if let Some(msg_seq) = message.msg_seq_num() {
202 received_seq_num.store(msg_seq, Ordering::SeqCst);
203 }
204
205 if let Some(msg_type) = message.msg_type() {
207 match msg_type {
208 fix_message_type::LOGON => {
209 tracing::info!("Logon successful");
210 logged_on.store(true, Ordering::SeqCst);
211 }
212 fix_message_type::LOGOUT => {
213 tracing::info!("Received logout");
214 logged_on.store(false, Ordering::SeqCst);
215 }
216 fix_message_type::EXECUTION_REPORT => {
217 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
218 if matches!(
219 exec_type,
220 fix_exec_type::REJECTED
221 | fix_exec_type::NEW
222 | fix_exec_type::PENDING_NEW
223 ) {
224 tracing::debug!(
226 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
227 );
228 } else if matches!(
229 exec_type,
230 fix_exec_type::CANCELED
231 | fix_exec_type::EXPIRED
232 | fix_exec_type::REPLACED
233 ) {
234 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
236 match convert_to_order_status_report(
237 &message, account_id, ts_init,
238 ) {
239 #[cfg(feature = "python")]
240 Ok(report) => Python::attach(|py| {
241 call_python(
242 py,
243 &handler,
244 report.into_py_any_unwrap(py),
245 );
246 }),
247 #[cfg(not(feature = "python"))]
248 Ok(_report) => {
249 tracing::debug!(
250 "Order status report handled (Python disabled)"
251 );
252 }
253 Err(e) => {
254 tracing::error!(
255 "Failed to parse FIX execution report: {e}"
256 );
257 }
258 }
259 } else if exec_type == fix_exec_type::PARTIAL_FILL
260 || exec_type == fix_exec_type::FILL
261 {
262 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
264 match convert_to_fill_report(&message, account_id, ts_init) {
265 #[cfg(feature = "python")]
266 Ok(report) => Python::attach(|py| {
267 call_python(
268 py,
269 &handler,
270 report.into_py_any_unwrap(py),
271 );
272 }),
273 #[cfg(not(feature = "python"))]
274 Ok(_report) => {
275 tracing::debug!(
276 "Fill report handled (Python disabled)"
277 );
278 }
279 Err(e) => {
280 tracing::error!(
281 "Failed to parse FIX execution report: {e}"
282 );
283 }
284 }
285 } else {
286 tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
287 }
288 }
289 }
290 _ => tracing::trace!("Received unexpected {message:?}"),
294 }
295 }
296 } else {
297 tracing::error!("Failed to parse FIX message");
298 }
299 });
300
301 let config = SocketConfig {
302 url: self.endpoint.clone(),
303 mode: Mode::Tls,
304 suffix: vec![FIX_DELIMITER],
305 message_handler: Some(handle_message),
306 heartbeat: None, reconnect_timeout_ms: Some(10000),
308 reconnect_delay_initial_ms: Some(5000),
309 reconnect_delay_max_ms: Some(30000),
310 reconnect_backoff_factor: Some(1.5),
311 reconnect_jitter_ms: Some(500),
312 reconnect_max_attempts: None,
313 connection_max_retries: None,
314 certs_dir: None,
315 };
316
317 let socket = match SocketClient::connect(
318 config, None, None, None, )
322 .await
323 {
324 Ok(socket) => socket,
325 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
326 };
327
328 let writer_tx = socket.writer_tx.clone();
329
330 self.socket = Some(Arc::new(socket));
331
332 self.send_logon().await?;
333
334 let connected_clone = self.connected.clone();
336 let logged_on_clone = self.logged_on.clone();
337 let heartbeat_secs = self.heartbeat_secs;
338 let client_clone = self.clone();
339
340 self.processing_task = Some(Arc::new(tokio::spawn(async move {
341 log_task_started("maintain-fix-connection");
342
343 let mut last_logon_attempt = std::time::Instant::now()
344 .checked_sub(Duration::from_secs(10))
345 .unwrap();
346
347 loop {
348 tokio::time::sleep(Duration::from_millis(100)).await;
349
350 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
352 {
353 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
355 tracing::info!("Connected without logon");
356 last_logon_attempt = std::time::Instant::now();
357
358 if let Err(e) = client_clone.send_logon().await {
359 tracing::error!("Failed to send logon: {e}");
360 }
361 }
362 }
363 }
364 })));
365
366 let logged_on_clone = self.logged_on.clone();
367 let sender_comp_id = self.sender_comp_id.clone();
368 let target_comp_id = self.target_comp_id.clone();
369
370 self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
371 log_task_started("heartbeat");
372 tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
373
374 let interval = Duration::from_secs(heartbeat_secs);
375
376 loop {
377 if logged_on_clone.load(Ordering::SeqCst) {
378 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
380 let now = chrono::Utc::now();
381 let msg =
382 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
383
384 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
385 tracing::error!("Failed to send heartbeat: {e}");
386 break;
387 }
388
389 tracing::trace!("Sent heartbeat");
390 } else {
391 tracing::debug!("No longer logged on, stopping heartbeat task");
393 break;
394 }
395
396 tokio::time::sleep(interval).await;
397 }
398
399 log_task_stopped("heartbeat");
400 })));
401
402 Ok(())
403 }
404
405 pub async fn close(&mut self) -> anyhow::Result<()> {
411 if self.is_logged_on()
413 && let Err(e) = self.send_logout("Normal logout").await
414 {
415 tracing::warn!("Failed to send logout message: {e}");
416 }
417
418 if let Some(socket) = &self.socket {
420 socket.close().await;
421 }
422
423 if let Some(task) = self.processing_task.take() {
425 task.abort();
426 }
427
428 if let Some(task) = self.heartbeat_task.take() {
430 task.abort();
431 }
432
433 self.connected.store(false, Ordering::SeqCst);
434 self.logged_on.store(false, Ordering::SeqCst);
435
436 Ok(())
437 }
438
439 async fn send_logon(&self) -> anyhow::Result<()> {
441 if self.socket.is_none() {
442 anyhow::bail!("Socket not connected".to_string());
443 }
444
445 self.seq_num.store(1, Ordering::SeqCst);
447
448 let now = chrono::Utc::now();
449 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
450 let passphrase = self.api_passphrase.clone();
451
452 let message = format!(
453 "{}{}{}{}",
454 timestamp, self.api_key, self.target_comp_id, passphrase
455 );
456
457 let decoded_secret = BASE64_STANDARD
459 .decode(&self.api_secret)
460 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
461
462 let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
463 let tag = hmac::sign(&key, message.as_bytes());
464 let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
465
466 let logon_msg = FixMessage::create_logon(
467 1, &self.sender_comp_id,
469 &self.target_comp_id,
470 self.heartbeat_secs,
471 &self.api_key,
472 &passphrase,
473 &encoded_signature,
474 &now,
475 );
476
477 if let Some(socket) = &self.socket {
478 tracing::info!("Logging on...");
479
480 match socket.send_bytes(logon_msg.to_bytes()).await {
481 Ok(()) => tracing::debug!("Sent logon message"),
482 Err(e) => tracing::error!("Error on logon: {e}"),
483 }
484 } else {
485 anyhow::bail!("Socket not connected".to_string());
486 }
487
488 let start = std::time::Instant::now();
489 while !self.is_logged_on() {
490 tokio::time::sleep(Duration::from_millis(100)).await;
491
492 if start.elapsed() > Duration::from_secs(10) {
493 anyhow::bail!("Logon timeout".to_string());
494 }
495 }
496
497 self.logged_on.store(true, Ordering::SeqCst);
498
499 Ok(())
500 }
501
502 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
504 if self.socket.is_none() {
505 anyhow::bail!("Socket not connected".to_string());
506 }
507
508 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
509 let now = chrono::Utc::now();
510
511 let logout_msg = FixMessage::create_logout(
512 seq_num,
513 &self.sender_comp_id,
514 &self.target_comp_id,
515 Some(text),
516 &now,
517 );
518
519 if let Some(socket) = &self.socket {
520 match socket.send_bytes(logout_msg.to_bytes()).await {
521 Ok(()) => tracing::debug!("Sent logout message"),
522 Err(e) => tracing::error!("Error on logout: {e}"),
523 }
524 } else {
525 anyhow::bail!("Socket not connected".to_string());
526 }
527
528 Ok(())
529 }
530}
531
/// Invokes the Python `callback` with `py_obj` as its single positional argument.
///
/// The callback's return value is discarded; an error raised by the call is
/// logged and not propagated.
#[cfg(feature = "python")]
pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
    match callback.call1(py, (py_obj,)) {
        Ok(_) => {}
        Err(e) => tracing::error!("Error calling Python: {e}"),
    }
}