1use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23 clients::ExecutionClient,
24 live::get_runtime,
25 messages::execution::{
26 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
27 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
28 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
29 },
30};
31use nautilus_core::{
32 UnixNanos,
33 time::{AtomicTime, get_atomic_clock_realtime},
34};
35use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
36use nautilus_model::{
37 accounts::AccountAny,
38 enums::{AccountType, OmsType, OrderSide},
39 events::OrderEventAny,
40 identifiers::{AccountId, ClientId, Venue},
41 orders::{Order, OrderAny},
42 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
43 types::{AccountBalance, MarginBalance},
44};
45use tokio::task::JoinHandle;
46use tokio_util::sync::CancellationToken;
47
48use crate::{
49 common::{consts::KRAKEN_VENUE, credential::KrakenCredential},
50 config::KrakenExecClientConfig,
51 http::KrakenFuturesHttpClient,
52 websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
53};
54
55const MUTEX_POISONED: &str = "mutex poisoned";
56
57#[allow(dead_code)]
62#[derive(Debug)]
63pub struct KrakenFuturesExecutionClient {
64 core: ExecutionClientCore,
65 clock: &'static AtomicTime,
66 config: KrakenExecClientConfig,
67 emitter: ExecutionEventEmitter,
68 http: KrakenFuturesHttpClient,
69 ws: KrakenFuturesWebSocketClient,
70 cancellation_token: CancellationToken,
71 ws_stream_handle: Option<JoinHandle<()>>,
72 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
73}
74
75impl KrakenFuturesExecutionClient {
76 pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
78 let clock = get_atomic_clock_realtime();
79 let emitter = ExecutionEventEmitter::new(
80 clock,
81 core.trader_id,
82 core.account_id,
83 AccountType::Margin,
84 None,
85 );
86
87 let cancellation_token = CancellationToken::new();
88
89 let http = KrakenFuturesHttpClient::new(
90 config.environment,
91 config.base_url.clone(),
92 config.timeout_secs,
93 None,
94 None,
95 None,
96 config.http_proxy.clone(),
97 config.max_requests_per_second,
98 )?;
99
100 let credential = KrakenCredential::new(config.api_key.clone(), config.api_secret.clone());
101 let ws = KrakenFuturesWebSocketClient::with_credentials(
102 config.ws_url(),
103 config.heartbeat_interval_secs,
104 Some(credential),
105 );
106
107 Ok(Self {
108 core,
109 clock,
110 config,
111 emitter,
112 http,
113 ws,
114 cancellation_token,
115 ws_stream_handle: None,
116 pending_tasks: Mutex::new(Vec::new()),
117 })
118 }
119
120 #[must_use]
122 pub fn clock(&self) -> &'static AtomicTime {
123 self.clock
124 }
125
126 #[must_use]
128 pub fn emitter(&self) -> &ExecutionEventEmitter {
129 &self.emitter
130 }
131
132 fn spawn_task<F>(&self, description: &'static str, fut: F)
133 where
134 F: Future<Output = anyhow::Result<()>> + Send + 'static,
135 {
136 let runtime = get_runtime();
137 let handle = runtime.spawn(async move {
138 if let Err(e) = fut.await {
139 log::warn!("{description} failed: {e:?}");
140 }
141 });
142
143 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
144 tasks.retain(|handle| !handle.is_finished());
145 tasks.push(handle);
146 }
147
148 fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) -> anyhow::Result<()> {
149 if order.is_closed() {
150 log::warn!(
151 "Cannot submit closed order: client_order_id={}",
152 order.client_order_id()
153 );
154 return Ok(());
155 }
156
157 let account_id = self.core.account_id;
158 let client_order_id = order.client_order_id();
159 let trader_id = order.trader_id();
160 let strategy_id = order.strategy_id();
161 let instrument_id = order.instrument_id();
162 let order_side = order.order_side();
163 let order_type = order.order_type();
164 let quantity = order.quantity();
165 let time_in_force = order.time_in_force();
166 let price = order.price();
167 let trigger_price = order.trigger_price();
168 let is_reduce_only = order.is_reduce_only();
169 let is_post_only = order.is_post_only();
170
171 log::debug!("OrderSubmitted: client_order_id={client_order_id}");
172 self.emitter.emit_order_submitted(order);
173
174 self.ws
175 .cache_client_order(client_order_id, None, instrument_id, trader_id, strategy_id);
176
177 let http = self.http.clone();
178 let ws = self.ws.clone();
179 let emitter = self.emitter.clone();
180 let clock = self.clock;
181
182 self.spawn_task(task_name, async move {
183 let result = http
184 .submit_order(
185 account_id,
186 instrument_id,
187 client_order_id,
188 order_side,
189 order_type,
190 quantity,
191 time_in_force,
192 price,
193 trigger_price,
194 is_reduce_only,
195 is_post_only,
196 )
197 .await;
198
199 match result {
200 Ok(report) => {
201 ws.cache_client_order(
204 client_order_id,
205 Some(report.venue_order_id),
206 instrument_id,
207 trader_id,
208 strategy_id,
209 );
210 Ok(())
211 }
212 Err(e) => {
213 let ts_event = clock.get_time_ns();
214 emitter.emit_order_rejected_event(
215 strategy_id,
216 instrument_id,
217 client_order_id,
218 &format!("{task_name} error: {e}"),
219 ts_event,
220 false,
221 );
222 Err(e)
223 }
224 }
225 });
226
227 Ok(())
228 }
229
230 fn cancel_single_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
231 let account_id = self.core.account_id;
232 let client_order_id = cmd.client_order_id;
233 let venue_order_id = cmd.venue_order_id;
234 let strategy_id = cmd.strategy_id;
235 let instrument_id = cmd.instrument_id;
236
237 log::info!(
238 "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
239 );
240
241 let http = self.http.clone();
242 let emitter = self.emitter.clone();
243 let clock = self.clock;
244
245 self.spawn_task("cancel_order", async move {
246 if let Err(e) = http
247 .cancel_order(
248 account_id,
249 instrument_id,
250 Some(client_order_id),
251 venue_order_id,
252 )
253 .await
254 {
255 log::error!("Cancel order failed: {e}");
256 let ts_event = clock.get_time_ns();
257 emitter.emit_order_cancel_rejected_event(
258 strategy_id,
259 instrument_id,
260 client_order_id,
261 venue_order_id,
262 &format!("cancel-order error: {e}"),
263 ts_event,
264 );
265 anyhow::bail!("Cancel order failed: {e}");
266 }
267 Ok(())
268 });
269
270 Ok(())
271 }
272
273 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
274 let mut rx = self
275 .ws
276 .take_output_rx()
277 .context("Failed to take futures WebSocket output receiver")?;
278 let emitter = self.emitter.clone();
279 let cancellation_token = self.cancellation_token.clone();
280
281 let handle = get_runtime().spawn(async move {
282 loop {
283 tokio::select! {
284 () = cancellation_token.cancelled() => {
285 log::debug!("Futures execution message handler cancelled");
286 break;
287 }
288 msg = rx.recv() => {
289 match msg {
290 Some(ws_msg) => {
291 Self::handle_ws_message(ws_msg, &emitter);
292 }
293 None => {
294 log::debug!("Futures execution WebSocket stream ended");
295 break;
296 }
297 }
298 }
299 }
300 }
301 });
302
303 self.ws_stream_handle = Some(handle);
304 Ok(())
305 }
306
307 fn handle_ws_message(msg: KrakenFuturesWsMessage, emitter: &ExecutionEventEmitter) {
308 match msg {
309 KrakenFuturesWsMessage::OrderAccepted(event) => {
310 emitter.send_order_event(OrderEventAny::Accepted(event));
311 }
312 KrakenFuturesWsMessage::OrderCanceled(event) => {
313 emitter.send_order_event(OrderEventAny::Canceled(event));
314 }
315 KrakenFuturesWsMessage::OrderExpired(event) => {
316 emitter.send_order_event(OrderEventAny::Expired(event));
317 }
318 KrakenFuturesWsMessage::OrderUpdated(event) => {
319 emitter.send_order_event(OrderEventAny::Updated(event));
320 }
321 KrakenFuturesWsMessage::OrderStatusReport(report) => {
322 emitter.send_order_status_report(*report);
323 }
324 KrakenFuturesWsMessage::FillReport(report) => {
325 emitter.send_fill_report(*report);
326 }
327 KrakenFuturesWsMessage::Reconnected => {
328 log::info!("Futures execution WebSocket reconnected");
329 }
330 KrakenFuturesWsMessage::BookDeltas(_)
332 | KrakenFuturesWsMessage::Quote(_)
333 | KrakenFuturesWsMessage::Trade(_)
334 | KrakenFuturesWsMessage::MarkPrice(_)
335 | KrakenFuturesWsMessage::IndexPrice(_)
336 | KrakenFuturesWsMessage::FundingRate(_) => {}
337 }
338 }
339
340 fn modify_single_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
341 let client_order_id = cmd.client_order_id;
342 let venue_order_id = cmd.venue_order_id;
343 let strategy_id = cmd.strategy_id;
344 let instrument_id = cmd.instrument_id;
345 let quantity = cmd.quantity;
346 let price = cmd.price;
347
348 log::info!(
349 "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
350 );
351
352 let http = self.http.clone();
353 let emitter = self.emitter.clone();
354 let clock = self.clock;
355
356 self.spawn_task("modify_order", async move {
357 if let Err(e) = http
358 .modify_order(
359 instrument_id,
360 Some(client_order_id),
361 venue_order_id,
362 quantity,
363 price,
364 None,
365 )
366 .await
367 {
368 log::error!("Modify order failed: {e}");
369 let ts_event = clock.get_time_ns();
370 emitter.emit_order_modify_rejected_event(
371 strategy_id,
372 instrument_id,
373 client_order_id,
374 venue_order_id,
375 &format!("modify-order error: {e}"),
376 ts_event,
377 );
378 anyhow::bail!("Modify order failed: {e}");
379 }
380 Ok(())
381 });
382
383 Ok(())
384 }
385}
386
387#[async_trait(?Send)]
388impl ExecutionClient for KrakenFuturesExecutionClient {
389 fn is_connected(&self) -> bool {
390 self.core.is_connected()
391 }
392
393 fn client_id(&self) -> ClientId {
394 self.core.client_id
395 }
396
397 fn account_id(&self) -> AccountId {
398 self.core.account_id
399 }
400
401 fn venue(&self) -> Venue {
402 *KRAKEN_VENUE
403 }
404
405 fn oms_type(&self) -> OmsType {
406 self.core.oms_type
407 }
408
409 fn get_account(&self) -> Option<AccountAny> {
410 self.core.cache().account(&self.core.account_id).cloned()
411 }
412
413 fn generate_account_state(
414 &self,
415 balances: Vec<AccountBalance>,
416 margins: Vec<MarginBalance>,
417 reported: bool,
418 ts_event: UnixNanos,
419 ) -> anyhow::Result<()> {
420 self.emitter
421 .emit_account_state(balances, margins, reported, ts_event);
422 Ok(())
423 }
424
425 fn start(&mut self) -> anyhow::Result<()> {
426 if self.core.is_started() {
427 return Ok(());
428 }
429
430 self.core.set_started();
431
432 log::info!(
433 "Started: client_id={}, account_id={}, product_type=Futures, environment={:?}",
434 self.core.client_id,
435 self.core.account_id,
436 self.config.environment
437 );
438 Ok(())
439 }
440
441 fn stop(&mut self) -> anyhow::Result<()> {
442 if self.core.is_stopped() {
443 return Ok(());
444 }
445
446 self.cancellation_token.cancel();
447 self.core.set_stopped();
448 self.core.set_disconnected();
449 log::info!("Stopped: client_id={}", self.core.client_id);
450 Ok(())
451 }
452
453 async fn connect(&mut self) -> anyhow::Result<()> {
454 if self.core.is_connected() {
455 return Ok(());
456 }
457
458 self.ws
459 .connect()
460 .await
461 .context("Failed to connect futures WebSocket")?;
462 self.ws
463 .wait_until_active(10.0)
464 .await
465 .context("Futures WebSocket failed to become active")?;
466
467 self.ws
468 .authenticate()
469 .await
470 .context("Failed to authenticate futures WebSocket")?;
471
472 self.ws.set_account_id(self.core.account_id);
473
474 self.ws
475 .subscribe_executions()
476 .await
477 .context("Failed to subscribe to executions")?;
478
479 self.spawn_message_handler()?;
480
481 log::info!("Futures WebSocket authenticated and subscribed to executions");
482
483 self.core.set_connected();
484 log::info!("Connected: client_id={}", self.core.client_id);
485 Ok(())
486 }
487
488 async fn disconnect(&mut self) -> anyhow::Result<()> {
489 if self.core.is_disconnected() {
490 return Ok(());
491 }
492
493 self.cancellation_token.cancel();
494
495 if let Some(handle) = self.ws_stream_handle.take() {
496 handle.abort();
497 }
498
499 let _ = self.ws.close().await;
500
501 self.cancellation_token = CancellationToken::new();
502 self.core.set_disconnected();
503 log::info!("Disconnected: client_id={}", self.core.client_id);
504 Ok(())
505 }
506
507 async fn generate_order_status_report(
508 &self,
509 cmd: &GenerateOrderStatusReport,
510 ) -> anyhow::Result<Option<OrderStatusReport>> {
511 log::debug!(
512 "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
513 cmd.venue_order_id,
514 cmd.client_order_id
515 );
516
517 let account_id = self.core.account_id;
518 let reports = self
519 .http
520 .request_order_status_reports(account_id, None, None, None, false)
521 .await?;
522
523 Ok(reports.into_iter().find(|r| {
525 cmd.venue_order_id
526 .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
527 || cmd
528 .client_order_id
529 .is_some_and(|id| r.client_order_id == Some(id))
530 }))
531 }
532
533 async fn generate_order_status_reports(
534 &self,
535 cmd: &GenerateOrderStatusReports,
536 ) -> anyhow::Result<Vec<OrderStatusReport>> {
537 log::debug!(
538 "Generating order status reports: instrument_id={:?}, open_only={}",
539 cmd.instrument_id,
540 cmd.open_only
541 );
542
543 let account_id = self.core.account_id;
544 self.http
545 .request_order_status_reports(account_id, cmd.instrument_id, None, None, cmd.open_only)
546 .await
547 }
548
549 async fn generate_fill_reports(
550 &self,
551 cmd: GenerateFillReports,
552 ) -> anyhow::Result<Vec<FillReport>> {
553 log::debug!(
554 "Generating fill reports: instrument_id={:?}",
555 cmd.instrument_id
556 );
557
558 let account_id = self.core.account_id;
559 self.http
560 .request_fill_reports(account_id, cmd.instrument_id, None, None)
561 .await
562 }
563
564 async fn generate_position_status_reports(
565 &self,
566 cmd: &GeneratePositionStatusReports,
567 ) -> anyhow::Result<Vec<PositionStatusReport>> {
568 log::debug!(
569 "Generating position status reports: instrument_id={:?}",
570 cmd.instrument_id
571 );
572
573 let account_id = self.core.account_id;
574 self.http
575 .request_position_status_reports(account_id, cmd.instrument_id)
576 .await
577 }
578
579 async fn generate_mass_status(
580 &self,
581 _lookback_mins: Option<u64>,
582 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
583 log::debug!("Generating mass status");
584
585 let account_id = self.core.account_id;
586 let order_reports = self
587 .http
588 .request_order_status_reports(account_id, None, None, None, true)
589 .await?;
590 let fill_reports = self
591 .http
592 .request_fill_reports(account_id, None, None, None)
593 .await?;
594 let position_reports = self
595 .http
596 .request_position_status_reports(account_id, None)
597 .await?;
598
599 let mut mass_status = ExecutionMassStatus::new(
600 self.core.client_id,
601 self.core.account_id,
602 *KRAKEN_VENUE,
603 self.clock.get_time_ns(),
604 None,
605 );
606 mass_status.add_order_reports(order_reports);
607 mass_status.add_fill_reports(fill_reports);
608 mass_status.add_position_reports(position_reports);
609
610 Ok(Some(mass_status))
611 }
612
613 fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
614 log::debug!("Querying account: {cmd:?}");
615
616 let account_id = self.core.account_id;
617 let http = self.http.clone();
618 let emitter = self.emitter.clone();
619
620 self.spawn_task("query_account", async move {
621 let account_state = http.request_account_state(account_id).await?;
622 emitter.emit_account_state(
623 account_state.balances.clone(),
624 account_state.margins.clone(),
625 account_state.is_reported,
626 account_state.ts_event,
627 );
628 Ok(())
629 });
630
631 Ok(())
632 }
633
634 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
635 log::debug!("Querying order: {cmd:?}");
636
637 let venue_order_id = cmd
638 .venue_order_id
639 .context("venue_order_id required for query_order")?;
640 let account_id = self.core.account_id;
641 let http = self.http.clone();
642 let emitter = self.emitter.clone();
643
644 self.spawn_task("query_order", async move {
645 let reports = http
646 .request_order_status_reports(account_id, None, None, None, true)
647 .await
648 .context("Failed to query order")?;
649
650 if let Some(report) = reports
651 .into_iter()
652 .find(|r| r.venue_order_id == venue_order_id)
653 {
654 emitter.send_order_status_report(report);
655 }
656 Ok(())
657 });
658
659 Ok(())
660 }
661
662 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
663 let order = self
664 .core
665 .cache()
666 .order(&cmd.client_order_id)
667 .cloned()
668 .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
669 self.submit_single_order(&order, "submit_order")
670 }
671
672 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
673 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
674
675 log::info!(
676 "Submitting order list: order_list_id={}, count={}",
677 cmd.order_list.id,
678 orders.len()
679 );
680
681 for order in &orders {
682 self.submit_single_order(order, "submit_order_list")?;
683 }
684
685 Ok(())
686 }
687
688 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
689 self.modify_single_order(cmd)
690 }
691
692 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
693 self.cancel_single_order(cmd)
694 }
695
696 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
697 let instrument_id = cmd.instrument_id;
698
699 if cmd.order_side == OrderSide::NoOrderSide {
700 log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
701
702 let http = self.http.clone();
703 let symbol = instrument_id.symbol.to_string();
704
705 self.spawn_task("cancel_all_orders", async move {
706 if let Err(e) = http.inner.cancel_all_orders(Some(symbol)).await {
707 log::error!("Cancel all orders failed: {e}");
708 anyhow::bail!("Cancel all orders failed: {e}");
709 }
710 Ok(())
711 });
712
713 return Ok(());
714 }
715
716 log::info!(
717 "Canceling all orders: instrument_id={instrument_id}, side={:?}",
718 cmd.order_side
719 );
720
721 let orders_to_cancel: Vec<_> = {
722 let cache = self.core.cache();
723 let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
724
725 open_orders
726 .into_iter()
727 .filter(|order| order.order_side() == cmd.order_side)
728 .filter_map(|order| {
729 Some((
730 order.venue_order_id()?,
731 order.client_order_id(),
732 order.instrument_id(),
733 order.strategy_id(),
734 ))
735 })
736 .collect()
737 };
738
739 let account_id = self.core.account_id;
740
741 for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
742 {
743 let http = self.http.clone();
744 let emitter = self.emitter.clone();
745 let clock = self.clock;
746
747 self.spawn_task("cancel_order_by_side", async move {
748 if let Err(e) = http
749 .cancel_order(
750 account_id,
751 order_instrument_id,
752 Some(client_order_id),
753 Some(venue_order_id),
754 )
755 .await
756 {
757 log::error!("Cancel order failed: {e}");
758 let ts_event = clock.get_time_ns();
759 emitter.emit_order_cancel_rejected_event(
760 strategy_id,
761 order_instrument_id,
762 client_order_id,
763 Some(venue_order_id),
764 &format!("cancel-order error: {e}"),
765 ts_event,
766 );
767 }
768 Ok(())
769 });
770 }
771
772 Ok(())
773 }
774
775 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
776 log::info!(
777 "Batch canceling orders: instrument_id={}, count={}",
778 cmd.instrument_id,
779 cmd.cancels.len()
780 );
781
782 for cancel in &cmd.cancels {
783 self.cancel_single_order(cancel)?;
784 }
785
786 Ok(())
787 }
788}
789
790#[cfg(test)]
791mod tests {
792 use std::{cell::RefCell, rc::Rc};
793
794 use nautilus_common::cache::Cache;
795 use nautilus_model::{
796 enums::AccountType,
797 identifiers::{AccountId, ClientId, TraderId},
798 };
799 use rstest::rstest;
800
801 use super::*;
802 use crate::{common::enums::KrakenProductType, config::KrakenExecClientConfig};
803
804 fn create_test_core() -> ExecutionClientCore {
805 let cache = Rc::new(RefCell::new(Cache::default()));
806 ExecutionClientCore::new(
807 TraderId::from("TESTER-001"),
808 ClientId::from("KRAKEN"),
809 *KRAKEN_VENUE,
810 OmsType::Netting,
811 AccountId::from("KRAKEN-001"),
812 AccountType::Margin,
813 None,
814 cache,
815 )
816 }
817
818 #[rstest]
819 fn test_futures_exec_client_new() {
820 let config = KrakenExecClientConfig {
821 product_type: KrakenProductType::Futures,
822 api_key: "test_key".to_string(),
823 api_secret: "test_secret".to_string(),
824 ..Default::default()
825 };
826
827 let client = KrakenFuturesExecutionClient::new(create_test_core(), config);
828 assert!(client.is_ok());
829
830 let client = client.unwrap();
831 assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
832 assert_eq!(client.account_id(), AccountId::from("KRAKEN-001"));
833 assert_eq!(client.venue(), *KRAKEN_VENUE);
834 assert!(!client.is_connected());
835 }
836
837 #[rstest]
838 fn test_futures_exec_client_start_stop() {
839 let config = KrakenExecClientConfig {
840 product_type: KrakenProductType::Futures,
841 api_key: "test_key".to_string(),
842 api_secret: "test_secret".to_string(),
843 ..Default::default()
844 };
845
846 let mut client = KrakenFuturesExecutionClient::new(create_test_core(), config).unwrap();
847
848 assert!(client.start().is_ok());
849 assert!(client.stop().is_ok());
850 assert!(!client.is_connected());
851 }
852}