nautilus_coinbase_intx/fix/
client.rs1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use aws_lc_rs::hmac;
34use base64::prelude::*;
35use nautilus_common::logging::{log_task_started, log_task_stopped};
36#[cfg(feature = "python")]
37use nautilus_core::python::IntoPyObjectNautilusExt;
38use nautilus_core::{env::get_env_var, time::get_atomic_clock_realtime};
39use nautilus_model::identifiers::AccountId;
40use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
41#[cfg(feature = "python")]
42use pyo3::prelude::*;
43use tokio::task::JoinHandle;
44use tokio_tungstenite::tungstenite::stream::Mode;
45
46use super::{
47 messages::{FIX_DELIMITER, FixMessage},
48 parse::convert_to_order_status_report,
49};
50use crate::{
51 common::consts::COINBASE_INTX,
52 fix::{
53 messages::{fix_exec_type, fix_message_type, fix_tag},
54 parse::convert_to_fill_report,
55 },
56};
57
58#[cfg_attr(
59 feature = "python",
60 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
61)]
62#[derive(Debug, Clone)]
63pub struct CoinbaseIntxFixClient {
64 endpoint: String,
65 api_key: String,
66 api_secret: String,
67 api_passphrase: String,
68 portfolio_id: String,
69 sender_comp_id: String,
70 target_comp_id: String,
71 socket: Option<Arc<SocketClient>>,
72 connected: Arc<AtomicBool>,
73 logged_on: Arc<AtomicBool>,
74 seq_num: Arc<AtomicUsize>,
75 received_seq_num: Arc<AtomicUsize>,
76 heartbeat_secs: u64,
77 processing_task: Option<Arc<JoinHandle<()>>>,
78 heartbeat_task: Option<Arc<JoinHandle<()>>>,
79}
80
81impl CoinbaseIntxFixClient {
82 pub fn new(
88 endpoint: Option<String>,
89 api_key: Option<String>,
90 api_secret: Option<String>,
91 api_passphrase: Option<String>,
92 portfolio_id: Option<String>,
93 ) -> anyhow::Result<Self> {
94 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
95 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
96 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
97 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
98 let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
99 let sender_comp_id = api_key.to_string();
100 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
103 endpoint,
104 api_key,
105 api_secret,
106 api_passphrase,
107 portfolio_id,
108 sender_comp_id,
109 target_comp_id,
110 socket: None,
111 connected: Arc::new(AtomicBool::new(false)),
112 logged_on: Arc::new(AtomicBool::new(false)),
113 seq_num: Arc::new(AtomicUsize::new(1)),
114 received_seq_num: Arc::new(AtomicUsize::new(0)),
115 heartbeat_secs: 10, processing_task: None,
117 heartbeat_task: None,
118 })
119 }
120
121 pub fn from_env() -> anyhow::Result<Self> {
128 Self::new(None, None, None, None, None)
129 }
130
131 #[must_use]
133 pub const fn endpoint(&self) -> &str {
134 self.endpoint.as_str()
135 }
136
137 #[must_use]
139 pub const fn api_key(&self) -> &str {
140 self.api_key.as_str()
141 }
142
143 #[must_use]
145 pub const fn portfolio_id(&self) -> &str {
146 self.portfolio_id.as_str()
147 }
148
149 #[must_use]
151 pub const fn sender_comp_id(&self) -> &str {
152 self.sender_comp_id.as_str()
153 }
154
155 #[must_use]
157 pub const fn target_comp_id(&self) -> &str {
158 self.target_comp_id.as_str()
159 }
160
161 #[must_use]
163 pub fn is_connected(&self) -> bool {
164 self.connected.load(Ordering::SeqCst)
165 }
166
167 #[must_use]
169 pub fn is_logged_on(&self) -> bool {
170 self.logged_on.load(Ordering::SeqCst)
171 }
172
173 pub async fn connect(
183 &mut self,
184 #[cfg(feature = "python")] handler: PyObject,
185 #[cfg(not(feature = "python"))] _handler: (),
186 ) -> anyhow::Result<()> {
187 let logged_on = self.logged_on.clone();
188 let seq_num = self.seq_num.clone();
189 let received_seq_num = self.received_seq_num.clone();
190 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
191
192 let handle_message = Arc::new(move |data: &[u8]| {
193 if let Ok(message) = FixMessage::parse(data) {
194 if let Some(msg_seq) = message.msg_seq_num() {
196 received_seq_num.store(msg_seq, Ordering::SeqCst);
197 }
198
199 if let Some(msg_type) = message.msg_type() {
201 match msg_type {
202 fix_message_type::LOGON => {
203 tracing::info!("Logon successful");
204 logged_on.store(true, Ordering::SeqCst);
205 }
206 fix_message_type::LOGOUT => {
207 tracing::info!("Received logout");
208 logged_on.store(false, Ordering::SeqCst);
209 }
210 fix_message_type::EXECUTION_REPORT => {
211 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
212 if matches!(
213 exec_type,
214 fix_exec_type::REJECTED
215 | fix_exec_type::NEW
216 | fix_exec_type::PENDING_NEW
217 ) {
218 tracing::debug!(
220 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
221 );
222 } else if matches!(
223 exec_type,
224 fix_exec_type::CANCELED
225 | fix_exec_type::EXPIRED
226 | fix_exec_type::REPLACED
227 ) {
228 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
230 match convert_to_order_status_report(
231 &message, account_id, ts_init,
232 ) {
233 #[cfg(feature = "python")]
234 Ok(report) => Python::with_gil(|py| {
235 call_python(
236 py,
237 &handler,
238 report.into_py_any_unwrap(py),
239 );
240 }),
241 #[cfg(not(feature = "python"))]
242 Ok(_report) => {
243 tracing::debug!(
244 "Order status report handled (Python disabled)"
245 );
246 }
247 Err(e) => {
248 tracing::error!(
249 "Failed to parse FIX execution report: {e}"
250 );
251 }
252 }
253 } else if exec_type == fix_exec_type::PARTIAL_FILL
254 || exec_type == fix_exec_type::FILL
255 {
256 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
258 match convert_to_fill_report(&message, account_id, ts_init) {
259 #[cfg(feature = "python")]
260 Ok(report) => Python::with_gil(|py| {
261 call_python(
262 py,
263 &handler,
264 report.into_py_any_unwrap(py),
265 );
266 }),
267 #[cfg(not(feature = "python"))]
268 Ok(_report) => {
269 tracing::debug!(
270 "Fill report handled (Python disabled)"
271 );
272 }
273 Err(e) => {
274 tracing::error!(
275 "Failed to parse FIX execution report: {e}"
276 );
277 }
278 }
279 } else {
280 tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
281 }
282 }
283 }
284 _ => tracing::trace!("Recieved unexpected {message:?}"),
288 }
289 }
290 } else {
291 tracing::error!("Failed to parse FIX message");
292 }
293 });
294
295 let config = SocketConfig {
296 url: self.endpoint.clone(),
297 mode: Mode::Tls,
298 suffix: vec![FIX_DELIMITER],
299 message_handler: Some(handle_message),
300 heartbeat: None, reconnect_timeout_ms: Some(10000),
302 reconnect_delay_initial_ms: Some(5000),
303 reconnect_delay_max_ms: Some(30000),
304 reconnect_backoff_factor: Some(1.5),
305 reconnect_jitter_ms: Some(500),
306 certs_dir: None,
307 };
308
309 let socket = match SocketClient::connect(
310 config, None, None, None, )
314 .await
315 {
316 Ok(socket) => socket,
317 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
318 };
319
320 let writer_tx = socket.writer_tx.clone();
321
322 self.socket = Some(Arc::new(socket));
323
324 self.send_logon().await?;
325
326 let connected_clone = self.connected.clone();
328 let logged_on_clone = self.logged_on.clone();
329 let heartbeat_secs = self.heartbeat_secs;
330 let client_clone = self.clone();
331
332 self.processing_task = Some(Arc::new(tokio::spawn(async move {
333 log_task_started("maintain-fix-connection");
334
335 let mut last_logon_attempt = std::time::Instant::now()
336 .checked_sub(Duration::from_secs(10))
337 .unwrap();
338
339 loop {
340 tokio::time::sleep(Duration::from_millis(100)).await;
341
342 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
344 {
345 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
347 tracing::info!("Connected without logon");
348 last_logon_attempt = std::time::Instant::now();
349
350 if let Err(e) = client_clone.send_logon().await {
351 tracing::error!("Failed to send logon: {e}");
352 }
353 }
354 }
355 }
356 })));
357
358 let logged_on_clone = self.logged_on.clone();
359 let sender_comp_id = self.sender_comp_id.clone();
360 let target_comp_id = self.target_comp_id.clone();
361
362 self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
363 log_task_started("heartbeat");
364 tracing::debug!("Heartbeat at {heartbeat_secs}s intervals");
365
366 let interval = Duration::from_secs(heartbeat_secs);
367
368 loop {
369 if logged_on_clone.load(Ordering::SeqCst) {
370 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
372 let now = chrono::Utc::now();
373 let msg =
374 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
375
376 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
377 tracing::error!("Failed to send heartbeat: {e}");
378 break;
379 }
380
381 tracing::trace!("Sent heartbeat");
382 } else {
383 tracing::debug!("No longer logged on, stopping heartbeat task");
385 break;
386 }
387
388 tokio::time::sleep(interval).await;
389 }
390
391 log_task_stopped("heartbeat");
392 })));
393
394 Ok(())
395 }
396
397 pub async fn close(&mut self) -> anyhow::Result<()> {
403 if self.is_logged_on()
405 && let Err(e) = self.send_logout("Normal logout").await
406 {
407 tracing::warn!("Failed to send logout message: {e}");
408 }
409
410 if let Some(socket) = &self.socket {
412 socket.close().await;
413 }
414
415 if let Some(task) = self.processing_task.take() {
417 task.abort();
418 }
419
420 if let Some(task) = self.heartbeat_task.take() {
422 task.abort();
423 }
424
425 self.connected.store(false, Ordering::SeqCst);
426 self.logged_on.store(false, Ordering::SeqCst);
427
428 Ok(())
429 }
430
431 async fn send_logon(&self) -> anyhow::Result<()> {
433 if self.socket.is_none() {
434 anyhow::bail!("Socket not connected".to_string());
435 }
436
437 self.seq_num.store(1, Ordering::SeqCst);
439
440 let now = chrono::Utc::now();
441 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
442 let passphrase = self.api_passphrase.clone();
443
444 let message = format!(
445 "{}{}{}{}",
446 timestamp, self.api_key, self.target_comp_id, passphrase
447 );
448
449 let decoded_secret = BASE64_STANDARD
451 .decode(&self.api_secret)
452 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
453
454 let key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
455 let tag = hmac::sign(&key, message.as_bytes());
456 let encoded_signature = BASE64_STANDARD.encode(tag.as_ref());
457
458 let logon_msg = FixMessage::create_logon(
459 1, &self.sender_comp_id,
461 &self.target_comp_id,
462 self.heartbeat_secs,
463 &self.api_key,
464 &passphrase,
465 &encoded_signature,
466 &now,
467 );
468
469 if let Some(socket) = &self.socket {
470 tracing::info!("Logging on...");
471
472 match socket.send_bytes(logon_msg.to_bytes()).await {
473 Ok(()) => tracing::debug!("Sent logon message"),
474 Err(e) => tracing::error!("Error on logon: {e}"),
475 }
476 } else {
477 anyhow::bail!("Socket not connected".to_string());
478 }
479
480 let start = std::time::Instant::now();
481 while !self.is_logged_on() {
482 tokio::time::sleep(Duration::from_millis(100)).await;
483
484 if start.elapsed() > Duration::from_secs(10) {
485 anyhow::bail!("Logon timeout".to_string());
486 }
487 }
488
489 self.logged_on.store(true, Ordering::SeqCst);
490
491 Ok(())
492 }
493
494 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
496 if self.socket.is_none() {
497 anyhow::bail!("Socket not connected".to_string());
498 }
499
500 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
501 let now = chrono::Utc::now();
502
503 let logout_msg = FixMessage::create_logout(
504 seq_num,
505 &self.sender_comp_id,
506 &self.target_comp_id,
507 Some(text),
508 &now,
509 );
510
511 if let Some(socket) = &self.socket {
512 match socket.send_bytes(logout_msg.to_bytes()).await {
513 Ok(()) => tracing::debug!("Sent logout message"),
514 Err(e) => tracing::error!("Error on logout: {e}"),
515 }
516 } else {
517 anyhow::bail!("Socket not connected".to_string());
518 }
519
520 Ok(())
521 }
522}
523
/// Invokes the Python `callback` with `py_obj` as its single positional argument.
///
/// An exception raised by the callback is logged as an error and not propagated.
#[cfg(feature = "python")]
pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
    let Err(e) = callback.call1(py, (py_obj,)) else {
        return;
    };
    tracing::error!("Error calling Python: {e}");
}