Skip to main content

nautilus_kraken/execution/
spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Spot execution client implementation.
17
18use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::StreamExt;
23use nautilus_common::{
24    clients::ExecutionClient,
25    live::get_runtime,
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
29        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
30    },
31};
32use nautilus_core::{
33    UnixNanos,
34    time::{AtomicTime, get_atomic_clock_realtime},
35};
36use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
37use nautilus_model::{
38    accounts::AccountAny,
39    enums::{AccountType, OmsType, OrderSide},
40    events::OrderEventAny,
41    identifiers::{AccountId, ClientId, Venue},
42    orders::{Order, OrderAny},
43    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
44    types::{AccountBalance, MarginBalance},
45};
46use tokio::task::JoinHandle;
47use tokio_util::sync::CancellationToken;
48
49use crate::{
50    common::consts::KRAKEN_VENUE,
51    config::KrakenExecClientConfig,
52    http::KrakenSpotHttpClient,
53    websocket::spot_v2::{client::KrakenSpotWebSocketClient, messages::NautilusWsMessage},
54};
55
56const MUTEX_POISONED: &str = "mutex poisoned";
57
58/// Kraken Spot execution client.
59///
60/// Provides order management and account operations for Kraken Spot markets.
61#[allow(dead_code)]
62#[derive(Debug)]
63pub struct KrakenSpotExecutionClient {
64    core: ExecutionClientCore,
65    clock: &'static AtomicTime,
66    config: KrakenExecClientConfig,
67    emitter: ExecutionEventEmitter,
68    http: KrakenSpotHttpClient,
69    ws: KrakenSpotWebSocketClient,
70    cancellation_token: CancellationToken,
71    ws_stream_handle: Option<JoinHandle<()>>,
72    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
73}
74
75impl KrakenSpotExecutionClient {
76    /// Creates a new [`KrakenSpotExecutionClient`].
77    pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
78        let clock = get_atomic_clock_realtime();
79        let emitter = ExecutionEventEmitter::new(
80            clock,
81            core.trader_id,
82            core.account_id,
83            AccountType::Margin,
84            None,
85        );
86
87        let cancellation_token = CancellationToken::new();
88
89        let http = KrakenSpotHttpClient::new(
90            config.environment,
91            config.base_url.clone(),
92            config.timeout_secs,
93            None,
94            None,
95            None,
96            config.http_proxy.clone(),
97            config.max_requests_per_second,
98        )?;
99
100        let data_config = crate::config::KrakenDataClientConfig {
101            api_key: Some(config.api_key.clone()),
102            api_secret: Some(config.api_secret.clone()),
103            product_type: config.product_type,
104            environment: config.environment,
105            base_url: config.base_url.clone(),
106            ws_public_url: None,
107            ws_private_url: Some(config.ws_url()),
108            http_proxy: config.http_proxy.clone(),
109            ws_proxy: config.ws_proxy.clone(),
110            timeout_secs: config.timeout_secs,
111            heartbeat_interval_secs: config.heartbeat_interval_secs,
112            max_requests_per_second: config.max_requests_per_second,
113        };
114        let ws = KrakenSpotWebSocketClient::new(data_config, cancellation_token.clone());
115
116        Ok(Self {
117            core,
118            clock,
119            config,
120            emitter,
121            http,
122            ws,
123            cancellation_token,
124            ws_stream_handle: None,
125            pending_tasks: Mutex::new(Vec::new()),
126        })
127    }
128
129    /// Returns a reference to the clock.
130    #[must_use]
131    pub fn clock(&self) -> &'static AtomicTime {
132        self.clock
133    }
134
135    /// Returns a reference to the event emitter.
136    #[must_use]
137    pub fn emitter(&self) -> &ExecutionEventEmitter {
138        &self.emitter
139    }
140
141    fn spawn_task<F>(&self, description: &'static str, fut: F)
142    where
143        F: Future<Output = anyhow::Result<()>> + Send + 'static,
144    {
145        let runtime = get_runtime();
146        let handle = runtime.spawn(async move {
147            if let Err(e) = fut.await {
148                log::warn!("{description} failed: {e:?}");
149            }
150        });
151
152        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
153        tasks.retain(|handle| !handle.is_finished());
154        tasks.push(handle);
155    }
156
157    fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) -> anyhow::Result<()> {
158        if order.is_closed() {
159            log::warn!(
160                "Cannot submit closed order: client_order_id={}",
161                order.client_order_id()
162            );
163            return Ok(());
164        }
165
166        let account_id = self.core.account_id;
167        let client_order_id = order.client_order_id();
168        let trader_id = order.trader_id();
169        let strategy_id = order.strategy_id();
170        let instrument_id = order.instrument_id();
171        let order_side = order.order_side();
172        let order_type = order.order_type();
173        let quantity = order.quantity();
174        let time_in_force = order.time_in_force();
175        let expire_time = order.expire_time();
176        let price = order.price();
177        let trigger_price = order.trigger_price();
178        let is_reduce_only = order.is_reduce_only();
179        let is_post_only = order.is_post_only();
180
181        log::debug!("OrderSubmitted: client_order_id={client_order_id}");
182        self.emitter.emit_order_submitted(order);
183
184        self.ws
185            .cache_client_order(client_order_id, instrument_id, trader_id, strategy_id);
186
187        let http = self.http.clone();
188        let emitter = self.emitter.clone();
189        let clock = self.clock;
190
191        self.spawn_task(task_name, async move {
192            let result = http
193                .submit_order(
194                    account_id,
195                    instrument_id,
196                    client_order_id,
197                    order_side,
198                    order_type,
199                    quantity,
200                    time_in_force,
201                    expire_time,
202                    price,
203                    trigger_price,
204                    is_reduce_only,
205                    is_post_only,
206                )
207                .await;
208
209            if let Err(e) = result {
210                let ts_event = clock.get_time_ns();
211                emitter.emit_order_rejected_event(
212                    strategy_id,
213                    instrument_id,
214                    client_order_id,
215                    &format!("{task_name} error: {e}"),
216                    ts_event,
217                    false,
218                );
219                return Err(e);
220            }
221
222            Ok(())
223        });
224
225        Ok(())
226    }
227
228    fn cancel_single_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
229        let account_id = self.core.account_id;
230        let client_order_id = cmd.client_order_id;
231        let venue_order_id = cmd.venue_order_id;
232        let strategy_id = cmd.strategy_id;
233        let instrument_id = cmd.instrument_id;
234
235        log::info!(
236            "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
237        );
238
239        let http = self.http.clone();
240        let emitter = self.emitter.clone();
241        let clock = self.clock;
242
243        self.spawn_task("cancel_order", async move {
244            if let Err(e) = http
245                .cancel_order(
246                    account_id,
247                    instrument_id,
248                    Some(client_order_id),
249                    venue_order_id,
250                )
251                .await
252            {
253                log::error!("Cancel order failed: {e}");
254                let ts_event = clock.get_time_ns();
255                emitter.emit_order_cancel_rejected_event(
256                    strategy_id,
257                    instrument_id,
258                    client_order_id,
259                    venue_order_id,
260                    &format!("cancel-order error: {e}"),
261                    ts_event,
262                );
263                anyhow::bail!("Cancel order failed: {e}");
264            }
265            Ok(())
266        });
267
268        Ok(())
269    }
270
271    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
272        let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
273        let emitter = self.emitter.clone();
274        let cancellation_token = self.cancellation_token.clone();
275
276        let handle = get_runtime().spawn(async move {
277            tokio::pin!(stream);
278
279            loop {
280                tokio::select! {
281                    () = cancellation_token.cancelled() => {
282                        log::debug!("Spot execution message handler cancelled");
283                        break;
284                    }
285                    msg = stream.next() => {
286                        match msg {
287                            Some(ws_msg) => {
288                                Self::handle_ws_message(ws_msg, &emitter);
289                            }
290                            None => {
291                                log::debug!("Spot execution WebSocket stream ended");
292                                break;
293                            }
294                        }
295                    }
296                }
297            }
298        });
299
300        self.ws_stream_handle = Some(handle);
301        Ok(())
302    }
303
304    fn modify_single_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
305        let client_order_id = cmd.client_order_id;
306        let venue_order_id = cmd.venue_order_id;
307        let strategy_id = cmd.strategy_id;
308        let instrument_id = cmd.instrument_id;
309        let quantity = cmd.quantity;
310        let price = cmd.price;
311
312        log::info!(
313            "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
314        );
315
316        let http = self.http.clone();
317        let emitter = self.emitter.clone();
318        let clock = self.clock;
319
320        self.spawn_task("modify_order", async move {
321            if let Err(e) = http
322                .modify_order(
323                    instrument_id,
324                    Some(client_order_id),
325                    venue_order_id,
326                    quantity,
327                    price,
328                    None,
329                )
330                .await
331            {
332                log::error!("Modify order failed: {e}");
333                let ts_event = clock.get_time_ns();
334                emitter.emit_order_modify_rejected_event(
335                    strategy_id,
336                    instrument_id,
337                    client_order_id,
338                    venue_order_id,
339                    &format!("modify-order error: {e}"),
340                    ts_event,
341                );
342                anyhow::bail!("Modify order failed: {e}");
343            }
344            Ok(())
345        });
346
347        Ok(())
348    }
349
350    fn handle_ws_message(msg: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
351        match msg {
352            NautilusWsMessage::OrderRejected(event) => {
353                emitter.send_order_event(OrderEventAny::Rejected(event));
354            }
355            NautilusWsMessage::OrderAccepted(event) => {
356                emitter.send_order_event(OrderEventAny::Accepted(event));
357            }
358            NautilusWsMessage::OrderCanceled(event) => {
359                emitter.send_order_event(OrderEventAny::Canceled(event));
360            }
361            NautilusWsMessage::OrderExpired(event) => {
362                emitter.send_order_event(OrderEventAny::Expired(event));
363            }
364            NautilusWsMessage::OrderUpdated(event) => {
365                emitter.send_order_event(OrderEventAny::Updated(event));
366            }
367            NautilusWsMessage::OrderStatusReport(report) => {
368                emitter.send_order_status_report(*report);
369            }
370            NautilusWsMessage::FillReport(report) => {
371                emitter.send_fill_report(*report);
372            }
373            NautilusWsMessage::Reconnected => {
374                log::info!("Spot execution WebSocket reconnected");
375            }
376            // Data messages are handled by the data client
377            NautilusWsMessage::Data(_) | NautilusWsMessage::Deltas(_) => {}
378        }
379    }
380}
381
382#[async_trait(?Send)]
383impl ExecutionClient for KrakenSpotExecutionClient {
384    fn is_connected(&self) -> bool {
385        self.core.is_connected()
386    }
387
388    fn client_id(&self) -> ClientId {
389        self.core.client_id
390    }
391
392    fn account_id(&self) -> AccountId {
393        self.core.account_id
394    }
395
396    fn venue(&self) -> Venue {
397        *KRAKEN_VENUE
398    }
399
400    fn oms_type(&self) -> OmsType {
401        self.core.oms_type
402    }
403
404    fn get_account(&self) -> Option<AccountAny> {
405        self.core.cache().account(&self.core.account_id).cloned()
406    }
407
408    fn generate_account_state(
409        &self,
410        balances: Vec<AccountBalance>,
411        margins: Vec<MarginBalance>,
412        reported: bool,
413        ts_event: UnixNanos,
414    ) -> anyhow::Result<()> {
415        self.emitter
416            .emit_account_state(balances, margins, reported, ts_event);
417        Ok(())
418    }
419
420    fn start(&mut self) -> anyhow::Result<()> {
421        if self.core.is_started() {
422            return Ok(());
423        }
424
425        self.core.set_started();
426
427        log::info!(
428            "Started: client_id={}, account_id={}, product_type=Spot, environment={:?}",
429            self.core.client_id,
430            self.core.account_id,
431            self.config.environment
432        );
433        Ok(())
434    }
435
436    fn stop(&mut self) -> anyhow::Result<()> {
437        if self.core.is_stopped() {
438            return Ok(());
439        }
440
441        self.cancellation_token.cancel();
442        self.core.set_stopped();
443        self.core.set_disconnected();
444        log::info!("Stopped: client_id={}", self.core.client_id);
445        Ok(())
446    }
447
448    async fn connect(&mut self) -> anyhow::Result<()> {
449        if self.core.is_connected() {
450            return Ok(());
451        }
452
453        self.ws
454            .connect()
455            .await
456            .context("Failed to connect spot WebSocket")?;
457        self.ws
458            .wait_until_active(10.0)
459            .await
460            .context("Spot WebSocket failed to become active")?;
461
462        self.ws
463            .authenticate()
464            .await
465            .context("Failed to authenticate spot WebSocket")?;
466
467        self.ws.set_account_id(self.core.account_id);
468
469        self.ws
470            .subscribe_executions(true, true)
471            .await
472            .context("Failed to subscribe to executions")?;
473
474        self.spawn_message_handler()?;
475
476        log::info!("Spot WebSocket authenticated and subscribed to executions");
477
478        self.core.set_connected();
479        log::info!("Connected: client_id={}", self.core.client_id);
480        Ok(())
481    }
482
483    async fn disconnect(&mut self) -> anyhow::Result<()> {
484        if self.core.is_disconnected() {
485            return Ok(());
486        }
487
488        self.cancellation_token.cancel();
489
490        if let Some(handle) = self.ws_stream_handle.take() {
491            handle.abort();
492        }
493
494        let _ = self.ws.close().await;
495
496        self.cancellation_token = CancellationToken::new();
497        self.core.set_disconnected();
498        log::info!("Disconnected: client_id={}", self.core.client_id);
499        Ok(())
500    }
501
502    async fn generate_order_status_report(
503        &self,
504        cmd: &GenerateOrderStatusReport,
505    ) -> anyhow::Result<Option<OrderStatusReport>> {
506        log::debug!(
507            "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
508            cmd.venue_order_id,
509            cmd.client_order_id
510        );
511
512        let account_id = self.core.account_id;
513        let reports = self
514            .http
515            .request_order_status_reports(account_id, None, None, None, false)
516            .await?;
517
518        // Note: cmd.venue_order_id is typed as Option<ClientOrderId> in the message struct
519        Ok(reports.into_iter().find(|r| {
520            cmd.venue_order_id
521                .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
522                || cmd
523                    .client_order_id
524                    .is_some_and(|id| r.client_order_id == Some(id))
525        }))
526    }
527
528    async fn generate_order_status_reports(
529        &self,
530        cmd: &GenerateOrderStatusReports,
531    ) -> anyhow::Result<Vec<OrderStatusReport>> {
532        log::debug!(
533            "Generating order status reports: instrument_id={:?}, open_only={}",
534            cmd.instrument_id,
535            cmd.open_only
536        );
537
538        let account_id = self.core.account_id;
539        self.http
540            .request_order_status_reports(account_id, cmd.instrument_id, None, None, cmd.open_only)
541            .await
542    }
543
544    async fn generate_fill_reports(
545        &self,
546        cmd: GenerateFillReports,
547    ) -> anyhow::Result<Vec<FillReport>> {
548        log::debug!(
549            "Generating fill reports: instrument_id={:?}",
550            cmd.instrument_id
551        );
552
553        let account_id = self.core.account_id;
554        self.http
555            .request_fill_reports(account_id, cmd.instrument_id, None, None)
556            .await
557    }
558
559    async fn generate_position_status_reports(
560        &self,
561        cmd: &GeneratePositionStatusReports,
562    ) -> anyhow::Result<Vec<PositionStatusReport>> {
563        log::debug!(
564            "Generating position status reports: instrument_id={:?}",
565            cmd.instrument_id
566        );
567
568        let account_id = self.core.account_id;
569        self.http
570            .request_position_status_reports(account_id, cmd.instrument_id)
571            .await
572    }
573
574    async fn generate_mass_status(
575        &self,
576        _lookback_mins: Option<u64>,
577    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
578        log::debug!("Generating mass status");
579
580        let account_id = self.core.account_id;
581        let order_reports = self
582            .http
583            .request_order_status_reports(account_id, None, None, None, true)
584            .await?;
585        let fill_reports = self
586            .http
587            .request_fill_reports(account_id, None, None, None)
588            .await?;
589        let position_reports = self
590            .http
591            .request_position_status_reports(account_id, None)
592            .await?;
593
594        let mut mass_status = ExecutionMassStatus::new(
595            self.core.client_id,
596            self.core.account_id,
597            *KRAKEN_VENUE,
598            self.clock.get_time_ns(),
599            None,
600        );
601        mass_status.add_order_reports(order_reports);
602        mass_status.add_fill_reports(fill_reports);
603        mass_status.add_position_reports(position_reports);
604
605        Ok(Some(mass_status))
606    }
607
608    fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
609        log::debug!("Querying account: {cmd:?}");
610
611        let account_id = self.core.account_id;
612        let http = self.http.clone();
613        let emitter = self.emitter.clone();
614
615        self.spawn_task("query_account", async move {
616            let account_state = http.request_account_state(account_id).await?;
617            emitter.emit_account_state(
618                account_state.balances.clone(),
619                account_state.margins.clone(),
620                account_state.is_reported,
621                account_state.ts_event,
622            );
623            Ok(())
624        });
625
626        Ok(())
627    }
628
629    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
630        log::debug!("Querying order: {cmd:?}");
631
632        let venue_order_id = cmd
633            .venue_order_id
634            .context("venue_order_id required for query_order")?;
635        let account_id = self.core.account_id;
636        let http = self.http.clone();
637        let emitter = self.emitter.clone();
638
639        self.spawn_task("query_order", async move {
640            let reports = http
641                .request_order_status_reports(account_id, None, None, None, true)
642                .await
643                .context("Failed to query order")?;
644
645            if let Some(report) = reports
646                .into_iter()
647                .find(|r| r.venue_order_id == venue_order_id)
648            {
649                emitter.send_order_status_report(report);
650            }
651            Ok(())
652        });
653
654        Ok(())
655    }
656
657    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
658        let order = self
659            .core
660            .cache()
661            .order(&cmd.client_order_id)
662            .cloned()
663            .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
664        self.submit_single_order(&order, "submit_order")
665    }
666
667    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
668        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
669
670        log::info!(
671            "Submitting order list: order_list_id={}, count={}",
672            cmd.order_list.id,
673            orders.len()
674        );
675
676        for order in &orders {
677            self.submit_single_order(order, "submit_order_list")?;
678        }
679
680        Ok(())
681    }
682
683    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
684        self.modify_single_order(cmd)
685    }
686
687    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
688        self.cancel_single_order(cmd)
689    }
690
691    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
692        let instrument_id = cmd.instrument_id;
693
694        if cmd.order_side == OrderSide::NoOrderSide {
695            log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
696
697            let http = self.http.clone();
698
699            self.spawn_task("cancel_all_orders", async move {
700                if let Err(e) = http.inner.cancel_all_orders().await {
701                    log::error!("Cancel all orders failed: {e}");
702                    anyhow::bail!("Cancel all orders failed: {e}");
703                }
704                Ok(())
705            });
706
707            return Ok(());
708        }
709
710        log::info!(
711            "Canceling all orders: instrument_id={instrument_id}, side={:?}",
712            cmd.order_side
713        );
714
715        let orders_to_cancel: Vec<_> = {
716            let cache = self.core.cache();
717            let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
718
719            open_orders
720                .into_iter()
721                .filter(|order| order.order_side() == cmd.order_side)
722                .filter_map(|order| {
723                    Some((
724                        order.venue_order_id()?,
725                        order.client_order_id(),
726                        order.instrument_id(),
727                        order.strategy_id(),
728                    ))
729                })
730                .collect()
731        };
732
733        let account_id = self.core.account_id;
734
735        for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
736        {
737            let http = self.http.clone();
738            let emitter = self.emitter.clone();
739            let clock = self.clock;
740
741            self.spawn_task("cancel_order_by_side", async move {
742                if let Err(e) = http
743                    .cancel_order(
744                        account_id,
745                        order_instrument_id,
746                        Some(client_order_id),
747                        Some(venue_order_id),
748                    )
749                    .await
750                {
751                    log::error!("Cancel order failed: {e}");
752                    let ts_event = clock.get_time_ns();
753                    emitter.emit_order_cancel_rejected_event(
754                        strategy_id,
755                        order_instrument_id,
756                        client_order_id,
757                        Some(venue_order_id),
758                        &format!("cancel-order error: {e}"),
759                        ts_event,
760                    );
761                }
762                Ok(())
763            });
764        }
765
766        Ok(())
767    }
768
769    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
770        log::info!(
771            "Batch canceling orders: instrument_id={}, count={}",
772            cmd.instrument_id,
773            cmd.cancels.len()
774        );
775
776        for cancel in &cmd.cancels {
777            self.cancel_single_order(cancel)?;
778        }
779
780        Ok(())
781    }
782}
783
784#[cfg(test)]
785mod tests {
786    use std::{cell::RefCell, rc::Rc};
787
788    use nautilus_common::cache::Cache;
789    use nautilus_model::{
790        enums::AccountType,
791        identifiers::{AccountId, ClientId, TraderId},
792    };
793    use rstest::rstest;
794
795    use super::*;
796    use crate::{common::enums::KrakenProductType, config::KrakenExecClientConfig};
797
798    fn create_test_core() -> ExecutionClientCore {
799        let cache = Rc::new(RefCell::new(Cache::default()));
800        ExecutionClientCore::new(
801            TraderId::from("TESTER-001"),
802            ClientId::from("KRAKEN"),
803            *KRAKEN_VENUE,
804            OmsType::Hedging,
805            AccountId::from("KRAKEN-001"),
806            AccountType::Margin,
807            None,
808            cache,
809        )
810    }
811
812    #[rstest]
813    fn test_spot_exec_client_new() {
814        let config = KrakenExecClientConfig {
815            product_type: KrakenProductType::Spot,
816            api_key: "test_key".to_string(),
817            api_secret: "test_secret".to_string(),
818            ..Default::default()
819        };
820
821        let client = KrakenSpotExecutionClient::new(create_test_core(), config);
822        assert!(client.is_ok());
823
824        let client = client.unwrap();
825        assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
826        assert_eq!(client.account_id(), AccountId::from("KRAKEN-001"));
827        assert_eq!(client.venue(), *KRAKEN_VENUE);
828        assert!(!client.is_connected());
829    }
830
831    #[rstest]
832    fn test_spot_exec_client_start_stop() {
833        let config = KrakenExecClientConfig {
834            product_type: KrakenProductType::Spot,
835            api_key: "test_key".to_string(),
836            api_secret: "test_secret".to_string(),
837            ..Default::default()
838        };
839
840        let mut client = KrakenSpotExecutionClient::new(create_test_core(), config).unwrap();
841
842        assert!(client.start().is_ok());
843        assert!(client.stop().is_ok());
844        assert!(!client.is_connected());
845    }
846}