1use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::StreamExt;
23use nautilus_common::{
24 clients::ExecutionClient,
25 live::get_runtime,
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
29 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
30 },
31};
32use nautilus_core::{
33 UnixNanos,
34 time::{AtomicTime, get_atomic_clock_realtime},
35};
36use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
37use nautilus_model::{
38 accounts::AccountAny,
39 enums::{AccountType, OmsType, OrderSide},
40 events::OrderEventAny,
41 identifiers::{AccountId, ClientId, Venue},
42 orders::{Order, OrderAny},
43 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
44 types::{AccountBalance, MarginBalance},
45};
46use tokio::task::JoinHandle;
47use tokio_util::sync::CancellationToken;
48
49use crate::{
50 common::consts::KRAKEN_VENUE,
51 config::KrakenExecClientConfig,
52 http::KrakenSpotHttpClient,
53 websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
54};
55
56const MUTEX_POISONED: &str = "mutex poisoned";
57
58#[allow(dead_code)]
62#[derive(Debug)]
63pub struct KrakenSpotExecutionClient {
64 core: ExecutionClientCore,
65 clock: &'static AtomicTime,
66 config: KrakenExecClientConfig,
67 emitter: ExecutionEventEmitter,
68 http: KrakenSpotHttpClient,
69 ws: KrakenSpotWebSocketClient,
70 cancellation_token: CancellationToken,
71 ws_stream_handle: Option<JoinHandle<()>>,
72 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
73}
74
75impl KrakenSpotExecutionClient {
76 pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
78 let clock = get_atomic_clock_realtime();
79 let emitter = ExecutionEventEmitter::new(
80 clock,
81 core.trader_id,
82 core.account_id,
83 AccountType::Margin,
84 None,
85 );
86
87 let cancellation_token = CancellationToken::new();
88
89 let http = KrakenSpotHttpClient::new(
90 config.environment,
91 config.base_url.clone(),
92 config.timeout_secs,
93 None,
94 None,
95 None,
96 config.http_proxy.clone(),
97 config.max_requests_per_second,
98 )?;
99
100 let data_config = crate::config::KrakenDataClientConfig {
101 api_key: Some(config.api_key.clone()),
102 api_secret: Some(config.api_secret.clone()),
103 product_type: config.product_type,
104 environment: config.environment,
105 base_url: config.base_url.clone(),
106 ws_public_url: None,
107 ws_private_url: Some(config.ws_url()),
108 http_proxy: config.http_proxy.clone(),
109 ws_proxy: config.ws_proxy.clone(),
110 timeout_secs: config.timeout_secs,
111 heartbeat_interval_secs: config.heartbeat_interval_secs,
112 max_requests_per_second: config.max_requests_per_second,
113 };
114 let ws = KrakenSpotWebSocketClient::new(data_config, cancellation_token.clone());
115
116 Ok(Self {
117 core,
118 clock,
119 config,
120 emitter,
121 http,
122 ws,
123 cancellation_token,
124 ws_stream_handle: None,
125 pending_tasks: Mutex::new(Vec::new()),
126 })
127 }
128
129 #[must_use]
131 pub fn clock(&self) -> &'static AtomicTime {
132 self.clock
133 }
134
135 #[must_use]
137 pub fn emitter(&self) -> &ExecutionEventEmitter {
138 &self.emitter
139 }
140
141 fn spawn_task<F>(&self, description: &'static str, fut: F)
142 where
143 F: Future<Output = anyhow::Result<()>> + Send + 'static,
144 {
145 let runtime = get_runtime();
146 let handle = runtime.spawn(async move {
147 if let Err(e) = fut.await {
148 log::warn!("{description} failed: {e:?}");
149 }
150 });
151
152 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
153 tasks.retain(|handle| !handle.is_finished());
154 tasks.push(handle);
155 }
156
157 fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) -> anyhow::Result<()> {
158 if order.is_closed() {
159 log::warn!(
160 "Cannot submit closed order: client_order_id={}",
161 order.client_order_id()
162 );
163 return Ok(());
164 }
165
166 let account_id = self.core.account_id;
167 let client_order_id = order.client_order_id();
168 let trader_id = order.trader_id();
169 let strategy_id = order.strategy_id();
170 let instrument_id = order.instrument_id();
171 let order_side = order.order_side();
172 let order_type = order.order_type();
173 let quantity = order.quantity();
174 let time_in_force = order.time_in_force();
175 let expire_time = order.expire_time();
176 let price = order.price();
177 let trigger_price = order.trigger_price();
178 let is_reduce_only = order.is_reduce_only();
179 let is_post_only = order.is_post_only();
180
181 log::debug!("OrderSubmitted: client_order_id={client_order_id}");
182 self.emitter.emit_order_submitted(order);
183
184 self.ws
185 .cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
186
187 let http = self.http.clone();
188 let emitter = self.emitter.clone();
189 let clock = self.clock;
190
191 self.spawn_task(task_name, async move {
192 let result = http
193 .submit_order(
194 account_id,
195 instrument_id,
196 client_order_id,
197 order_side,
198 order_type,
199 quantity,
200 time_in_force,
201 expire_time,
202 price,
203 trigger_price,
204 is_reduce_only,
205 is_post_only,
206 )
207 .await;
208
209 if let Err(e) = result {
210 let ts_event = clock.get_time_ns();
211 emitter.emit_order_rejected_event(
212 strategy_id,
213 instrument_id,
214 client_order_id,
215 &format!("{task_name} error: {e}"),
216 ts_event,
217 false,
218 );
219 return Err(e);
220 }
221
222 Ok(())
223 });
224
225 Ok(())
226 }
227
228 fn cancel_single_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
229 let account_id = self.core.account_id;
230 let client_order_id = cmd.client_order_id;
231 let venue_order_id = cmd.venue_order_id;
232 let strategy_id = cmd.strategy_id;
233 let instrument_id = cmd.instrument_id;
234
235 log::info!(
236 "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
237 );
238
239 let http = self.http.clone();
240 let emitter = self.emitter.clone();
241 let clock = self.clock;
242
243 self.spawn_task("cancel_order", async move {
244 if let Err(e) = http
245 .cancel_order(
246 account_id,
247 instrument_id,
248 Some(client_order_id),
249 venue_order_id,
250 )
251 .await
252 {
253 log::error!("Cancel order failed: {e}");
254 let ts_event = clock.get_time_ns();
255 emitter.emit_order_cancel_rejected_event(
256 strategy_id,
257 instrument_id,
258 client_order_id,
259 venue_order_id,
260 &format!("cancel-order error: {e}"),
261 ts_event,
262 );
263 anyhow::bail!("Cancel order failed: {e}");
264 }
265 Ok(())
266 });
267
268 Ok(())
269 }
270
271 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
272 let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
273 let emitter = self.emitter.clone();
274 let cancellation_token = self.cancellation_token.clone();
275
276 let handle = get_runtime().spawn(async move {
277 tokio::pin!(stream);
278
279 loop {
280 tokio::select! {
281 () = cancellation_token.cancelled() => {
282 log::debug!("Spot execution message handler cancelled");
283 break;
284 }
285 msg = stream.next() => {
286 match msg {
287 Some(ws_msg) => {
288 Self::handle_ws_message(ws_msg, &emitter);
289 }
290 None => {
291 log::debug!("Spot execution WebSocket stream ended");
292 break;
293 }
294 }
295 }
296 }
297 }
298 });
299
300 self.ws_stream_handle = Some(handle);
301 Ok(())
302 }
303
304 fn modify_single_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
305 let client_order_id = cmd.client_order_id;
306 let venue_order_id = cmd.venue_order_id;
307 let strategy_id = cmd.strategy_id;
308 let instrument_id = cmd.instrument_id;
309 let quantity = cmd.quantity;
310 let price = cmd.price;
311
312 log::info!(
313 "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
314 );
315
316 let http = self.http.clone();
317 let emitter = self.emitter.clone();
318 let clock = self.clock;
319
320 self.spawn_task("modify_order", async move {
321 if let Err(e) = http
322 .modify_order(
323 instrument_id,
324 Some(client_order_id),
325 venue_order_id,
326 quantity,
327 price,
328 None,
329 )
330 .await
331 {
332 log::error!("Modify order failed: {e}");
333 let ts_event = clock.get_time_ns();
334 emitter.emit_order_modify_rejected_event(
335 strategy_id,
336 instrument_id,
337 client_order_id,
338 venue_order_id,
339 &format!("modify-order error: {e}"),
340 ts_event,
341 );
342 anyhow::bail!("Modify order failed: {e}");
343 }
344 Ok(())
345 });
346
347 Ok(())
348 }
349
350 fn handle_ws_message(msg: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
351 match msg {
352 NautilusWsMessage::OrderRejected(event) => {
353 emitter.send_order_event(OrderEventAny::Rejected(event));
354 }
355 NautilusWsMessage::OrderAccepted(event) => {
356 emitter.send_order_event(OrderEventAny::Accepted(event));
357 }
358 NautilusWsMessage::OrderCanceled(event) => {
359 emitter.send_order_event(OrderEventAny::Canceled(event));
360 }
361 NautilusWsMessage::OrderExpired(event) => {
362 emitter.send_order_event(OrderEventAny::Expired(event));
363 }
364 NautilusWsMessage::OrderUpdated(event) => {
365 emitter.send_order_event(OrderEventAny::Updated(event));
366 }
367 NautilusWsMessage::OrderStatusReport(report) => {
368 emitter.send_order_status_report(*report);
369 }
370 NautilusWsMessage::FillReport(report) => {
371 emitter.send_fill_report(*report);
372 }
373 NautilusWsMessage::Reconnected => {
374 log::info!("Spot execution WebSocket reconnected");
375 }
376 NautilusWsMessage::Data(_) | NautilusWsMessage::Deltas(_) => {}
378 }
379 }
380}
381
382#[async_trait(?Send)]
383impl ExecutionClient for KrakenSpotExecutionClient {
384 fn is_connected(&self) -> bool {
385 self.core.is_connected()
386 }
387
388 fn client_id(&self) -> ClientId {
389 self.core.client_id
390 }
391
392 fn account_id(&self) -> AccountId {
393 self.core.account_id
394 }
395
396 fn venue(&self) -> Venue {
397 *KRAKEN_VENUE
398 }
399
400 fn oms_type(&self) -> OmsType {
401 self.core.oms_type
402 }
403
404 fn get_account(&self) -> Option<AccountAny> {
405 self.core.cache().account(&self.core.account_id).cloned()
406 }
407
408 fn generate_account_state(
409 &self,
410 balances: Vec<AccountBalance>,
411 margins: Vec<MarginBalance>,
412 reported: bool,
413 ts_event: UnixNanos,
414 ) -> anyhow::Result<()> {
415 self.emitter
416 .emit_account_state(balances, margins, reported, ts_event);
417 Ok(())
418 }
419
420 fn start(&mut self) -> anyhow::Result<()> {
421 if self.core.is_started() {
422 return Ok(());
423 }
424
425 self.core.set_started();
426
427 log::info!(
428 "Started: client_id={}, account_id={}, product_type=Spot, environment={:?}",
429 self.core.client_id,
430 self.core.account_id,
431 self.config.environment
432 );
433 Ok(())
434 }
435
436 fn stop(&mut self) -> anyhow::Result<()> {
437 if self.core.is_stopped() {
438 return Ok(());
439 }
440
441 self.cancellation_token.cancel();
442 self.core.set_stopped();
443 self.core.set_disconnected();
444 log::info!("Stopped: client_id={}", self.core.client_id);
445 Ok(())
446 }
447
448 async fn connect(&mut self) -> anyhow::Result<()> {
449 if self.core.is_connected() {
450 return Ok(());
451 }
452
453 self.ws
454 .connect()
455 .await
456 .context("Failed to connect spot WebSocket")?;
457 self.ws
458 .wait_until_active(10.0)
459 .await
460 .context("Spot WebSocket failed to become active")?;
461
462 self.ws
463 .authenticate()
464 .await
465 .context("Failed to authenticate spot WebSocket")?;
466
467 self.ws.set_account_id(self.core.account_id);
468
469 self.ws
470 .subscribe_executions(true, true)
471 .await
472 .context("Failed to subscribe to executions")?;
473
474 self.spawn_message_handler()?;
475
476 log::info!("Spot WebSocket authenticated and subscribed to executions");
477
478 self.core.set_connected();
479 log::info!("Connected: client_id={}", self.core.client_id);
480 Ok(())
481 }
482
483 async fn disconnect(&mut self) -> anyhow::Result<()> {
484 if self.core.is_disconnected() {
485 return Ok(());
486 }
487
488 self.cancellation_token.cancel();
489
490 if let Some(handle) = self.ws_stream_handle.take() {
491 handle.abort();
492 }
493
494 let _ = self.ws.close().await;
495
496 self.cancellation_token = CancellationToken::new();
497 self.core.set_disconnected();
498 log::info!("Disconnected: client_id={}", self.core.client_id);
499 Ok(())
500 }
501
502 async fn generate_order_status_report(
503 &self,
504 cmd: &GenerateOrderStatusReport,
505 ) -> anyhow::Result<Option<OrderStatusReport>> {
506 log::debug!(
507 "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
508 cmd.venue_order_id,
509 cmd.client_order_id
510 );
511
512 let account_id = self.core.account_id;
513 let reports = self
514 .http
515 .request_order_status_reports(account_id, None, None, None, false)
516 .await?;
517
518 Ok(reports.into_iter().find(|r| {
520 cmd.venue_order_id
521 .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
522 || cmd
523 .client_order_id
524 .is_some_and(|id| r.client_order_id == Some(id))
525 }))
526 }
527
528 async fn generate_order_status_reports(
529 &self,
530 cmd: &GenerateOrderStatusReports,
531 ) -> anyhow::Result<Vec<OrderStatusReport>> {
532 log::debug!(
533 "Generating order status reports: instrument_id={:?}, open_only={}",
534 cmd.instrument_id,
535 cmd.open_only
536 );
537
538 let account_id = self.core.account_id;
539 self.http
540 .request_order_status_reports(account_id, cmd.instrument_id, None, None, cmd.open_only)
541 .await
542 }
543
544 async fn generate_fill_reports(
545 &self,
546 cmd: GenerateFillReports,
547 ) -> anyhow::Result<Vec<FillReport>> {
548 log::debug!(
549 "Generating fill reports: instrument_id={:?}",
550 cmd.instrument_id
551 );
552
553 let account_id = self.core.account_id;
554 self.http
555 .request_fill_reports(account_id, cmd.instrument_id, None, None)
556 .await
557 }
558
559 async fn generate_position_status_reports(
560 &self,
561 cmd: &GeneratePositionStatusReports,
562 ) -> anyhow::Result<Vec<PositionStatusReport>> {
563 log::debug!(
564 "Generating position status reports: instrument_id={:?}",
565 cmd.instrument_id
566 );
567
568 let account_id = self.core.account_id;
569 self.http
570 .request_position_status_reports(account_id, cmd.instrument_id)
571 .await
572 }
573
574 async fn generate_mass_status(
575 &self,
576 _lookback_mins: Option<u64>,
577 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
578 log::debug!("Generating mass status");
579
580 let account_id = self.core.account_id;
581 let order_reports = self
582 .http
583 .request_order_status_reports(account_id, None, None, None, true)
584 .await?;
585 let fill_reports = self
586 .http
587 .request_fill_reports(account_id, None, None, None)
588 .await?;
589 let position_reports = self
590 .http
591 .request_position_status_reports(account_id, None)
592 .await?;
593
594 let mut mass_status = ExecutionMassStatus::new(
595 self.core.client_id,
596 self.core.account_id,
597 *KRAKEN_VENUE,
598 self.clock.get_time_ns(),
599 None,
600 );
601 mass_status.add_order_reports(order_reports);
602 mass_status.add_fill_reports(fill_reports);
603 mass_status.add_position_reports(position_reports);
604
605 Ok(Some(mass_status))
606 }
607
608 fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
609 log::debug!("Querying account: {cmd:?}");
610
611 let account_id = self.core.account_id;
612 let http = self.http.clone();
613 let emitter = self.emitter.clone();
614
615 self.spawn_task("query_account", async move {
616 let account_state = http.request_account_state(account_id).await?;
617 emitter.emit_account_state(
618 account_state.balances.clone(),
619 account_state.margins.clone(),
620 account_state.is_reported,
621 account_state.ts_event,
622 );
623 Ok(())
624 });
625
626 Ok(())
627 }
628
629 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
630 log::debug!("Querying order: {cmd:?}");
631
632 let venue_order_id = cmd
633 .venue_order_id
634 .context("venue_order_id required for query_order")?;
635 let account_id = self.core.account_id;
636 let http = self.http.clone();
637 let emitter = self.emitter.clone();
638
639 self.spawn_task("query_order", async move {
640 let reports = http
641 .request_order_status_reports(account_id, None, None, None, true)
642 .await
643 .context("Failed to query order")?;
644
645 if let Some(report) = reports
646 .into_iter()
647 .find(|r| r.venue_order_id == venue_order_id)
648 {
649 emitter.send_order_status_report(report);
650 }
651 Ok(())
652 });
653
654 Ok(())
655 }
656
657 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
658 let order = self
659 .core
660 .cache()
661 .order(&cmd.client_order_id)
662 .cloned()
663 .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
664 self.submit_single_order(&order, "submit_order")
665 }
666
667 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
668 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
669
670 log::info!(
671 "Submitting order list: order_list_id={}, count={}",
672 cmd.order_list.id,
673 orders.len()
674 );
675
676 for order in &orders {
677 self.submit_single_order(order, "submit_order_list")?;
678 }
679
680 Ok(())
681 }
682
683 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
684 self.modify_single_order(cmd)
685 }
686
687 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
688 self.cancel_single_order(cmd)
689 }
690
691 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
692 let instrument_id = cmd.instrument_id;
693
694 if cmd.order_side == OrderSide::NoOrderSide {
695 log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
696
697 let http = self.http.clone();
698
699 self.spawn_task("cancel_all_orders", async move {
700 if let Err(e) = http.inner.cancel_all_orders().await {
701 log::error!("Cancel all orders failed: {e}");
702 anyhow::bail!("Cancel all orders failed: {e}");
703 }
704 Ok(())
705 });
706
707 return Ok(());
708 }
709
710 log::info!(
711 "Canceling all orders: instrument_id={instrument_id}, side={:?}",
712 cmd.order_side
713 );
714
715 let orders_to_cancel: Vec<_> = {
716 let cache = self.core.cache();
717 let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
718
719 open_orders
720 .into_iter()
721 .filter(|order| order.order_side() == cmd.order_side)
722 .filter_map(|order| {
723 Some((
724 order.venue_order_id()?,
725 order.client_order_id(),
726 order.instrument_id(),
727 order.strategy_id(),
728 ))
729 })
730 .collect()
731 };
732
733 let account_id = self.core.account_id;
734
735 for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
736 {
737 let http = self.http.clone();
738 let emitter = self.emitter.clone();
739 let clock = self.clock;
740
741 self.spawn_task("cancel_order_by_side", async move {
742 if let Err(e) = http
743 .cancel_order(
744 account_id,
745 order_instrument_id,
746 Some(client_order_id),
747 Some(venue_order_id),
748 )
749 .await
750 {
751 log::error!("Cancel order failed: {e}");
752 let ts_event = clock.get_time_ns();
753 emitter.emit_order_cancel_rejected_event(
754 strategy_id,
755 order_instrument_id,
756 client_order_id,
757 Some(venue_order_id),
758 &format!("cancel-order error: {e}"),
759 ts_event,
760 );
761 }
762 Ok(())
763 });
764 }
765
766 Ok(())
767 }
768
769 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
770 log::info!(
771 "Batch canceling orders: instrument_id={}, count={}",
772 cmd.instrument_id,
773 cmd.cancels.len()
774 );
775
776 for cancel in &cmd.cancels {
777 self.cancel_single_order(cancel)?;
778 }
779
780 Ok(())
781 }
782}
783
784#[cfg(test)]
785mod tests {
786 use std::{cell::RefCell, rc::Rc};
787
788 use nautilus_common::cache::Cache;
789 use nautilus_model::{
790 enums::AccountType,
791 identifiers::{AccountId, ClientId, TraderId},
792 };
793 use rstest::rstest;
794
795 use super::*;
796 use crate::{common::enums::KrakenProductType, config::KrakenExecClientConfig};
797
798 fn create_test_core() -> ExecutionClientCore {
799 let cache = Rc::new(RefCell::new(Cache::default()));
800 ExecutionClientCore::new(
801 TraderId::from("TESTER-001"),
802 ClientId::from("KRAKEN"),
803 *KRAKEN_VENUE,
804 OmsType::Hedging,
805 AccountId::from("KRAKEN-001"),
806 AccountType::Margin,
807 None,
808 cache,
809 )
810 }
811
812 #[rstest]
813 fn test_spot_exec_client_new() {
814 let config = KrakenExecClientConfig {
815 product_type: KrakenProductType::Spot,
816 api_key: "test_key".to_string(),
817 api_secret: "test_secret".to_string(),
818 ..Default::default()
819 };
820
821 let client = KrakenSpotExecutionClient::new(create_test_core(), config);
822 assert!(client.is_ok());
823
824 let client = client.unwrap();
825 assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
826 assert_eq!(client.account_id(), AccountId::from("KRAKEN-001"));
827 assert_eq!(client.venue(), *KRAKEN_VENUE);
828 assert!(!client.is_connected());
829 }
830
831 #[rstest]
832 fn test_spot_exec_client_start_stop() {
833 let config = KrakenExecClientConfig {
834 product_type: KrakenProductType::Spot,
835 api_key: "test_key".to_string(),
836 api_secret: "test_secret".to_string(),
837 ..Default::default()
838 };
839
840 let mut client = KrakenSpotExecutionClient::new(create_test_core(), config).unwrap();
841
842 assert!(client.start().is_ok());
843 assert!(client.stop().is_ok());
844 assert!(!client.is_connected());
845 }
846}