nautilus_coinbase_intx/fix/
client.rs
1use std::{
26 sync::{
27 Arc,
28 atomic::{AtomicBool, AtomicUsize, Ordering},
29 },
30 time::Duration,
31};
32
33use base64::prelude::*;
34use nautilus_core::{python::IntoPyObjectNautilusExt, time::get_atomic_clock_realtime};
35use nautilus_model::identifiers::AccountId;
36use nautilus_network::socket::{SocketClient, SocketConfig, WriterCommand};
37use pyo3::prelude::*;
38use ring::hmac;
39use tokio::task::JoinHandle;
40use tokio_tungstenite::tungstenite::stream::Mode;
41
42use super::{
43 messages::{FIX_DELIMITER, FixMessage},
44 parse::convert_to_order_status_report,
45};
46use crate::{
47 common::{consts::COINBASE_INTX, credential::get_env_var},
48 fix::{
49 messages::{fix_exec_type, fix_message_type, fix_tag},
50 parse::convert_to_fill_report,
51 },
52};
53
54#[pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")]
55#[derive(Clone)]
56pub struct CoinbaseIntxFixClient {
57 endpoint: String,
58 api_key: String,
59 api_secret: String,
60 api_passphrase: String,
61 portfolio_id: String,
62 sender_comp_id: String,
63 target_comp_id: String,
64 socket: Option<Arc<SocketClient>>,
65 connected: Arc<AtomicBool>,
66 logged_on: Arc<AtomicBool>,
67 seq_num: Arc<AtomicUsize>,
68 received_seq_num: Arc<AtomicUsize>,
69 heartbeat_secs: u64,
70 processing_task: Option<Arc<JoinHandle<()>>>,
71 heartbeat_task: Option<Arc<JoinHandle<()>>>,
72}
73
74impl CoinbaseIntxFixClient {
75 pub fn new(
77 endpoint: Option<String>,
78 api_key: Option<String>,
79 api_secret: Option<String>,
80 api_passphrase: Option<String>,
81 portfolio_id: Option<String>,
82 ) -> anyhow::Result<Self> {
83 let endpoint = endpoint.unwrap_or("fix.international.coinbase.com:6130".to_string());
84 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
85 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
86 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
87 let portfolio_id = portfolio_id.unwrap_or(get_env_var("COINBASE_INTX_PORTFOLIO_ID")?);
88 let sender_comp_id = api_key.to_string();
89 let target_comp_id = "CBINTLDC".to_string(); Ok(Self {
92 endpoint,
93 api_key,
94 api_secret,
95 api_passphrase,
96 portfolio_id,
97 sender_comp_id,
98 target_comp_id,
99 socket: None,
100 connected: Arc::new(AtomicBool::new(false)),
101 logged_on: Arc::new(AtomicBool::new(false)),
102 seq_num: Arc::new(AtomicUsize::new(1)),
103 received_seq_num: Arc::new(AtomicUsize::new(0)),
104 heartbeat_secs: 10, processing_task: None,
106 heartbeat_task: None,
107 })
108 }
109
110 pub fn from_env() -> anyhow::Result<Self> {
113 Self::new(None, None, None, None, None)
114 }
115
116 pub fn endpoint(&self) -> &str {
118 self.endpoint.as_str()
119 }
120
121 pub fn api_key(&self) -> &str {
123 self.api_key.as_str()
124 }
125
126 pub fn portfolio_id(&self) -> &str {
128 self.portfolio_id.as_str()
129 }
130
131 pub fn sender_comp_id(&self) -> &str {
133 self.sender_comp_id.as_str()
134 }
135
136 pub fn target_comp_id(&self) -> &str {
138 self.target_comp_id.as_str()
139 }
140
141 pub fn is_connected(&self) -> bool {
143 self.connected.load(Ordering::SeqCst)
144 }
145
146 pub fn is_logged_on(&self) -> bool {
148 self.logged_on.load(Ordering::SeqCst)
149 }
150
151 pub async fn connect(&mut self, handler: PyObject) -> anyhow::Result<()> {
153 let config = SocketConfig {
154 url: self.endpoint.clone(),
155 mode: Mode::Tls,
156 suffix: vec![FIX_DELIMITER],
157 py_handler: None, heartbeat: None, reconnect_timeout_ms: Some(10000),
160 reconnect_delay_initial_ms: Some(5000),
161 reconnect_delay_max_ms: Some(30000),
162 reconnect_backoff_factor: Some(1.5),
163 reconnect_jitter_ms: Some(500),
164 certs_dir: None,
165 };
166
167 let logged_on = self.logged_on.clone();
168 let seq_num = self.seq_num.clone();
169 let received_seq_num = self.received_seq_num.clone();
170 let account_id = AccountId::new(format!("{COINBASE_INTX}-{}", self.portfolio_id));
171
172 let handle_message = Arc::new(move |data: &[u8]| {
173 if let Ok(message) = FixMessage::parse(data) {
174 if let Some(msg_seq) = message.msg_seq_num() {
176 received_seq_num.store(msg_seq, Ordering::SeqCst);
177 }
178
179 if let Some(msg_type) = message.msg_type() {
181 match msg_type {
182 fix_message_type::LOGON => {
183 tracing::info!("Logon successful");
184 logged_on.store(true, Ordering::SeqCst);
185 }
186 fix_message_type::LOGOUT => {
187 tracing::info!("Received logout");
188 logged_on.store(false, Ordering::SeqCst);
189 }
190 fix_message_type::EXECUTION_REPORT => {
191 if let Some(exec_type) = message.get_field(fix_tag::EXEC_TYPE) {
192 if matches!(
193 exec_type,
194 fix_exec_type::REJECTED
195 | fix_exec_type::NEW
196 | fix_exec_type::PENDING_NEW
197 ) {
198 tracing::debug!(
200 "Received execution report for EXEC_TYPE {exec_type} (not handling here)"
201 );
202 } else if matches!(
203 exec_type,
204 fix_exec_type::CANCELED
205 | fix_exec_type::EXPIRED
206 | fix_exec_type::REPLACED
207 ) {
208 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
210 match convert_to_order_status_report(
211 &message, account_id, ts_init,
212 ) {
213 Ok(report) => Python::with_gil(|py| {
214 call_python(
215 py,
216 &handler,
217 report.into_py_any_unwrap(py),
218 );
219 }),
220 Err(e) => {
221 tracing::error!(
222 "Failed to parse FIX execution report: {e}"
223 );
224 }
225 }
226 } else if exec_type == fix_exec_type::PARTIAL_FILL
227 || exec_type == fix_exec_type::FILL
228 {
229 let clock = get_atomic_clock_realtime(); let ts_init = clock.get_time_ns();
231 match convert_to_fill_report(&message, account_id, ts_init) {
232 Ok(report) => Python::with_gil(|py| {
233 call_python(
234 py,
235 &handler,
236 report.into_py_any_unwrap(py),
237 );
238 }),
239 Err(e) => {
240 tracing::error!(
241 "Failed to parse FIX execution report: {e}"
242 );
243 }
244 }
245 } else {
246 tracing::warn!("Unhandled EXEC_TYPE {exec_type}: {message:?}");
247 }
248 }
249 }
250 _ => tracing::trace!("Recieved unexpected {message:?}"),
254 }
255 }
256 } else {
257 tracing::error!("Failed to parse FIX message");
258 }
259 });
260
261 let socket =
262 match SocketClient::connect(config, Some(handle_message), None, None, None).await {
263 Ok(socket) => socket,
264 Err(e) => anyhow::bail!("Failed to connect to FIX endpoint: {e:?}"),
265 };
266
267 let writer_tx = socket.writer_tx.clone();
268
269 self.socket = Some(Arc::new(socket));
270
271 self.send_logon().await?;
272
273 let connected_clone = self.connected.clone();
275 let logged_on_clone = self.logged_on.clone();
276 let heartbeat_secs = self.heartbeat_secs;
277 let client_clone = self.clone();
278
279 self.processing_task = Some(Arc::new(tokio::spawn(async move {
280 tracing::debug!("Started task 'maintain FIX connection'");
281
282 let mut last_logon_attempt = std::time::Instant::now() - Duration::from_secs(10);
283
284 loop {
285 tokio::time::sleep(Duration::from_millis(100)).await;
286
287 if connected_clone.load(Ordering::SeqCst) && !logged_on_clone.load(Ordering::SeqCst)
289 {
290 if last_logon_attempt.elapsed() > Duration::from_secs(10) {
292 tracing::info!("Connected without logon");
293 last_logon_attempt = std::time::Instant::now();
294
295 if let Err(e) = client_clone.send_logon().await {
296 tracing::error!("Failed to send logon: {e}");
297 }
298 }
299 }
300 }
301 })));
302
303 let logged_on_clone = self.logged_on.clone();
304 let sender_comp_id = self.sender_comp_id.clone();
305 let target_comp_id = self.target_comp_id.clone();
306
307 self.heartbeat_task = Some(Arc::new(tokio::spawn(async move {
308 tracing::debug!("Started task 'FIX heartbeat' at {heartbeat_secs}s intervals");
309 let interval = Duration::from_secs(heartbeat_secs);
310
311 loop {
312 if logged_on_clone.load(Ordering::SeqCst) {
313 let seq = seq_num.fetch_add(1, Ordering::SeqCst) + 1;
315 let now = chrono::Utc::now();
316 let msg =
317 FixMessage::create_heartbeat(seq, &sender_comp_id, &target_comp_id, &now);
318
319 if let Err(e) = writer_tx.send(WriterCommand::Send(msg.to_bytes().into())) {
320 tracing::error!("Failed to send heartbeat: {e}");
321 break;
322 }
323
324 tracing::trace!("Sent heartbeat");
325 } else {
326 tracing::debug!("No longer logged on, stopping heartbeat task");
328 break;
329 }
330
331 tokio::time::sleep(interval).await;
332 }
333
334 tracing::debug!("Stopped task 'FIX heartbeat'");
335 })));
336
337 Ok(())
338 }
339
340 pub async fn close(&mut self) -> anyhow::Result<()> {
342 if self.is_logged_on() {
344 if let Err(e) = self.send_logout("Normal logout").await {
345 tracing::warn!("Failed to send logout message: {e}");
346 }
347 }
348
349 if let Some(socket) = &self.socket {
351 socket.close().await;
352 }
353
354 if let Some(task) = self.processing_task.take() {
356 task.abort();
357 }
358
359 if let Some(task) = self.heartbeat_task.take() {
361 task.abort();
362 }
363
364 self.connected.store(false, Ordering::SeqCst);
365 self.logged_on.store(false, Ordering::SeqCst);
366
367 Ok(())
368 }
369
370 async fn send_logon(&self) -> anyhow::Result<()> {
372 if self.socket.is_none() {
373 anyhow::bail!("Socket not connected".to_string());
374 }
375
376 self.seq_num.store(1, Ordering::SeqCst);
378
379 let now = chrono::Utc::now();
380 let timestamp = now.format("%Y%m%d-%H:%M:%S.%3f").to_string();
381 let passphrase = self.api_passphrase.clone();
382
383 let message = format!(
384 "{}{}{}{}",
385 timestamp, self.api_key, self.target_comp_id, passphrase
386 );
387
388 let decoded_secret = BASE64_STANDARD
390 .decode(&self.api_secret)
391 .map_err(|e| anyhow::anyhow!("Invalid base64 secret key: {e}"))?;
392
393 let hmac_key = hmac::Key::new(hmac::HMAC_SHA256, &decoded_secret);
394 let signature = hmac::sign(&hmac_key, message.as_bytes());
395 let encoded_signature = BASE64_STANDARD.encode(signature);
396
397 let logon_msg = FixMessage::create_logon(
398 1, &self.sender_comp_id,
400 &self.target_comp_id,
401 self.heartbeat_secs,
402 &self.api_key,
403 &passphrase,
404 &encoded_signature,
405 &now,
406 );
407
408 if let Some(socket) = &self.socket {
409 tracing::info!("Logging on...");
410
411 match socket.send_bytes(logon_msg.to_bytes()).await {
412 Ok(_) => tracing::debug!("Sent logon message"),
413 Err(e) => tracing::error!("Error on logon: {e}"),
414 }
415 } else {
416 anyhow::bail!("Socket not connected".to_string());
417 }
418
419 let start = std::time::Instant::now();
420 while !self.is_logged_on() {
421 tokio::time::sleep(Duration::from_millis(100)).await;
422
423 if start.elapsed() > Duration::from_secs(10) {
424 anyhow::bail!("Logon timeout".to_string());
425 }
426 }
427
428 self.logged_on.store(true, Ordering::SeqCst);
429
430 Ok(())
431 }
432
433 async fn send_logout(&self, text: &str) -> anyhow::Result<()> {
435 if self.socket.is_none() {
436 anyhow::bail!("Socket not connected".to_string());
437 }
438
439 let seq_num = self.seq_num.fetch_add(1, Ordering::SeqCst);
440 let now = chrono::Utc::now();
441
442 let logout_msg = FixMessage::create_logout(
443 seq_num,
444 &self.sender_comp_id,
445 &self.target_comp_id,
446 Some(text),
447 &now,
448 );
449
450 if let Some(socket) = &self.socket {
451 match socket.send_bytes(logout_msg.to_bytes()).await {
452 Ok(_) => tracing::debug!("Sent logout message"),
453 Err(e) => tracing::error!("Error on logout: {e}"),
454 }
455 } else {
456 anyhow::bail!("Socket not connected".to_string());
457 }
458
459 Ok(())
460 }
461}
462
463pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
465 if let Err(e) = callback.call1(py, (py_obj,)) {
466 tracing::error!("Error calling Python: {e}");
467 }
468}