Skip to main content

nautilus_kraken/execution/
futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Futures execution client implementation.
17
18use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use nautilus_common::{
23    clients::ExecutionClient,
24    live::get_runtime,
25    messages::execution::{
26        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
27        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
28        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
29    },
30};
31use nautilus_core::{
32    UnixNanos,
33    time::{AtomicTime, get_atomic_clock_realtime},
34};
35use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
36use nautilus_model::{
37    accounts::AccountAny,
38    enums::{AccountType, OmsType, OrderSide},
39    events::OrderEventAny,
40    identifiers::{AccountId, ClientId, Venue},
41    orders::{Order, OrderAny},
42    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
43    types::{AccountBalance, MarginBalance},
44};
45use tokio::task::JoinHandle;
46use tokio_util::sync::CancellationToken;
47
48use crate::{
49    common::{consts::KRAKEN_VENUE, credential::KrakenCredential},
50    config::KrakenExecClientConfig,
51    http::KrakenFuturesHttpClient,
52    websocket::futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
53};
54
55const MUTEX_POISONED: &str = "mutex poisoned";
56
57/// Kraken Futures execution client.
58///
59/// Provides order management, account operations, and position management
60/// for Kraken Futures markets.
61#[allow(dead_code)]
62#[derive(Debug)]
63pub struct KrakenFuturesExecutionClient {
64    core: ExecutionClientCore,
65    clock: &'static AtomicTime,
66    config: KrakenExecClientConfig,
67    emitter: ExecutionEventEmitter,
68    http: KrakenFuturesHttpClient,
69    ws: KrakenFuturesWebSocketClient,
70    cancellation_token: CancellationToken,
71    ws_stream_handle: Option<JoinHandle<()>>,
72    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
73}
74
75impl KrakenFuturesExecutionClient {
76    /// Creates a new [`KrakenFuturesExecutionClient`].
77    pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
78        let clock = get_atomic_clock_realtime();
79        let emitter = ExecutionEventEmitter::new(
80            clock,
81            core.trader_id,
82            core.account_id,
83            AccountType::Margin,
84            None,
85        );
86
87        let cancellation_token = CancellationToken::new();
88
89        let http = KrakenFuturesHttpClient::new(
90            config.environment,
91            config.base_url.clone(),
92            config.timeout_secs,
93            None,
94            None,
95            None,
96            config.http_proxy.clone(),
97            config.max_requests_per_second,
98        )?;
99
100        let credential = KrakenCredential::new(config.api_key.clone(), config.api_secret.clone());
101        let ws = KrakenFuturesWebSocketClient::with_credentials(
102            config.ws_url(),
103            config.heartbeat_interval_secs,
104            Some(credential),
105        );
106
107        Ok(Self {
108            core,
109            clock,
110            config,
111            emitter,
112            http,
113            ws,
114            cancellation_token,
115            ws_stream_handle: None,
116            pending_tasks: Mutex::new(Vec::new()),
117        })
118    }
119
120    /// Returns a reference to the clock.
121    #[must_use]
122    pub fn clock(&self) -> &'static AtomicTime {
123        self.clock
124    }
125
126    /// Returns a reference to the event emitter.
127    #[must_use]
128    pub fn emitter(&self) -> &ExecutionEventEmitter {
129        &self.emitter
130    }
131
132    fn spawn_task<F>(&self, description: &'static str, fut: F)
133    where
134        F: Future<Output = anyhow::Result<()>> + Send + 'static,
135    {
136        let runtime = get_runtime();
137        let handle = runtime.spawn(async move {
138            if let Err(e) = fut.await {
139                log::warn!("{description} failed: {e:?}");
140            }
141        });
142
143        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
144        tasks.retain(|handle| !handle.is_finished());
145        tasks.push(handle);
146    }
147
148    fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) -> anyhow::Result<()> {
149        if order.is_closed() {
150            log::warn!(
151                "Cannot submit closed order: client_order_id={}",
152                order.client_order_id()
153            );
154            return Ok(());
155        }
156
157        let account_id = self.core.account_id;
158        let client_order_id = order.client_order_id();
159        let trader_id = order.trader_id();
160        let strategy_id = order.strategy_id();
161        let instrument_id = order.instrument_id();
162        let order_side = order.order_side();
163        let order_type = order.order_type();
164        let quantity = order.quantity();
165        let time_in_force = order.time_in_force();
166        let price = order.price();
167        let trigger_price = order.trigger_price();
168        let is_reduce_only = order.is_reduce_only();
169        let is_post_only = order.is_post_only();
170
171        log::debug!("OrderSubmitted: client_order_id={client_order_id}");
172        self.emitter.emit_order_submitted(order);
173
174        self.ws
175            .cache_client_order(client_order_id, None, instrument_id, trader_id, strategy_id);
176
177        let http = self.http.clone();
178        let ws = self.ws.clone();
179        let emitter = self.emitter.clone();
180        let clock = self.clock;
181
182        self.spawn_task(task_name, async move {
183            let result = http
184                .submit_order(
185                    account_id,
186                    instrument_id,
187                    client_order_id,
188                    order_side,
189                    order_type,
190                    quantity,
191                    time_in_force,
192                    price,
193                    trigger_price,
194                    is_reduce_only,
195                    is_post_only,
196                )
197                .await;
198
199            match result {
200                Ok(report) => {
201                    // Update cache with venue_order_id so cancel messages without
202                    // cli_ord_id can be mapped back to our orders
203                    ws.cache_client_order(
204                        client_order_id,
205                        Some(report.venue_order_id),
206                        instrument_id,
207                        trader_id,
208                        strategy_id,
209                    );
210                    Ok(())
211                }
212                Err(e) => {
213                    let ts_event = clock.get_time_ns();
214                    emitter.emit_order_rejected_event(
215                        strategy_id,
216                        instrument_id,
217                        client_order_id,
218                        &format!("{task_name} error: {e}"),
219                        ts_event,
220                        false,
221                    );
222                    Err(e)
223                }
224            }
225        });
226
227        Ok(())
228    }
229
230    fn cancel_single_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
231        let account_id = self.core.account_id;
232        let client_order_id = cmd.client_order_id;
233        let venue_order_id = cmd.venue_order_id;
234        let strategy_id = cmd.strategy_id;
235        let instrument_id = cmd.instrument_id;
236
237        log::info!(
238            "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
239        );
240
241        let http = self.http.clone();
242        let emitter = self.emitter.clone();
243        let clock = self.clock;
244
245        self.spawn_task("cancel_order", async move {
246            if let Err(e) = http
247                .cancel_order(
248                    account_id,
249                    instrument_id,
250                    Some(client_order_id),
251                    venue_order_id,
252                )
253                .await
254            {
255                log::error!("Cancel order failed: {e}");
256                let ts_event = clock.get_time_ns();
257                emitter.emit_order_cancel_rejected_event(
258                    strategy_id,
259                    instrument_id,
260                    client_order_id,
261                    venue_order_id,
262                    &format!("cancel-order error: {e}"),
263                    ts_event,
264                );
265                anyhow::bail!("Cancel order failed: {e}");
266            }
267            Ok(())
268        });
269
270        Ok(())
271    }
272
273    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
274        let mut rx = self
275            .ws
276            .take_output_rx()
277            .context("Failed to take futures WebSocket output receiver")?;
278        let emitter = self.emitter.clone();
279        let cancellation_token = self.cancellation_token.clone();
280
281        let handle = get_runtime().spawn(async move {
282            loop {
283                tokio::select! {
284                    () = cancellation_token.cancelled() => {
285                        log::debug!("Futures execution message handler cancelled");
286                        break;
287                    }
288                    msg = rx.recv() => {
289                        match msg {
290                            Some(ws_msg) => {
291                                Self::handle_ws_message(ws_msg, &emitter);
292                            }
293                            None => {
294                                log::debug!("Futures execution WebSocket stream ended");
295                                break;
296                            }
297                        }
298                    }
299                }
300            }
301        });
302
303        self.ws_stream_handle = Some(handle);
304        Ok(())
305    }
306
307    fn handle_ws_message(msg: KrakenFuturesWsMessage, emitter: &ExecutionEventEmitter) {
308        match msg {
309            KrakenFuturesWsMessage::OrderAccepted(event) => {
310                emitter.send_order_event(OrderEventAny::Accepted(event));
311            }
312            KrakenFuturesWsMessage::OrderCanceled(event) => {
313                emitter.send_order_event(OrderEventAny::Canceled(event));
314            }
315            KrakenFuturesWsMessage::OrderExpired(event) => {
316                emitter.send_order_event(OrderEventAny::Expired(event));
317            }
318            KrakenFuturesWsMessage::OrderUpdated(event) => {
319                emitter.send_order_event(OrderEventAny::Updated(event));
320            }
321            KrakenFuturesWsMessage::OrderStatusReport(report) => {
322                emitter.send_order_status_report(*report);
323            }
324            KrakenFuturesWsMessage::FillReport(report) => {
325                emitter.send_fill_report(*report);
326            }
327            KrakenFuturesWsMessage::Reconnected => {
328                log::info!("Futures execution WebSocket reconnected");
329            }
330            // Data messages are handled by the data client
331            KrakenFuturesWsMessage::BookDeltas(_)
332            | KrakenFuturesWsMessage::Quote(_)
333            | KrakenFuturesWsMessage::Trade(_)
334            | KrakenFuturesWsMessage::MarkPrice(_)
335            | KrakenFuturesWsMessage::IndexPrice(_)
336            | KrakenFuturesWsMessage::FundingRate(_) => {}
337        }
338    }
339
340    fn modify_single_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
341        let client_order_id = cmd.client_order_id;
342        let venue_order_id = cmd.venue_order_id;
343        let strategy_id = cmd.strategy_id;
344        let instrument_id = cmd.instrument_id;
345        let quantity = cmd.quantity;
346        let price = cmd.price;
347
348        log::info!(
349            "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
350        );
351
352        let http = self.http.clone();
353        let emitter = self.emitter.clone();
354        let clock = self.clock;
355
356        self.spawn_task("modify_order", async move {
357            if let Err(e) = http
358                .modify_order(
359                    instrument_id,
360                    Some(client_order_id),
361                    venue_order_id,
362                    quantity,
363                    price,
364                    None,
365                )
366                .await
367            {
368                log::error!("Modify order failed: {e}");
369                let ts_event = clock.get_time_ns();
370                emitter.emit_order_modify_rejected_event(
371                    strategy_id,
372                    instrument_id,
373                    client_order_id,
374                    venue_order_id,
375                    &format!("modify-order error: {e}"),
376                    ts_event,
377                );
378                anyhow::bail!("Modify order failed: {e}");
379            }
380            Ok(())
381        });
382
383        Ok(())
384    }
385}
386
387#[async_trait(?Send)]
388impl ExecutionClient for KrakenFuturesExecutionClient {
389    fn is_connected(&self) -> bool {
390        self.core.is_connected()
391    }
392
393    fn client_id(&self) -> ClientId {
394        self.core.client_id
395    }
396
397    fn account_id(&self) -> AccountId {
398        self.core.account_id
399    }
400
401    fn venue(&self) -> Venue {
402        *KRAKEN_VENUE
403    }
404
405    fn oms_type(&self) -> OmsType {
406        self.core.oms_type
407    }
408
409    fn get_account(&self) -> Option<AccountAny> {
410        self.core.cache().account(&self.core.account_id).cloned()
411    }
412
413    fn generate_account_state(
414        &self,
415        balances: Vec<AccountBalance>,
416        margins: Vec<MarginBalance>,
417        reported: bool,
418        ts_event: UnixNanos,
419    ) -> anyhow::Result<()> {
420        self.emitter
421            .emit_account_state(balances, margins, reported, ts_event);
422        Ok(())
423    }
424
425    fn start(&mut self) -> anyhow::Result<()> {
426        if self.core.is_started() {
427            return Ok(());
428        }
429
430        self.core.set_started();
431
432        log::info!(
433            "Started: client_id={}, account_id={}, product_type=Futures, environment={:?}",
434            self.core.client_id,
435            self.core.account_id,
436            self.config.environment
437        );
438        Ok(())
439    }
440
441    fn stop(&mut self) -> anyhow::Result<()> {
442        if self.core.is_stopped() {
443            return Ok(());
444        }
445
446        self.cancellation_token.cancel();
447        self.core.set_stopped();
448        self.core.set_disconnected();
449        log::info!("Stopped: client_id={}", self.core.client_id);
450        Ok(())
451    }
452
453    async fn connect(&mut self) -> anyhow::Result<()> {
454        if self.core.is_connected() {
455            return Ok(());
456        }
457
458        self.ws
459            .connect()
460            .await
461            .context("Failed to connect futures WebSocket")?;
462        self.ws
463            .wait_until_active(10.0)
464            .await
465            .context("Futures WebSocket failed to become active")?;
466
467        self.ws
468            .authenticate()
469            .await
470            .context("Failed to authenticate futures WebSocket")?;
471
472        self.ws.set_account_id(self.core.account_id);
473
474        self.ws
475            .subscribe_executions()
476            .await
477            .context("Failed to subscribe to executions")?;
478
479        self.spawn_message_handler()?;
480
481        log::info!("Futures WebSocket authenticated and subscribed to executions");
482
483        self.core.set_connected();
484        log::info!("Connected: client_id={}", self.core.client_id);
485        Ok(())
486    }
487
488    async fn disconnect(&mut self) -> anyhow::Result<()> {
489        if self.core.is_disconnected() {
490            return Ok(());
491        }
492
493        self.cancellation_token.cancel();
494
495        if let Some(handle) = self.ws_stream_handle.take() {
496            handle.abort();
497        }
498
499        let _ = self.ws.close().await;
500
501        self.cancellation_token = CancellationToken::new();
502        self.core.set_disconnected();
503        log::info!("Disconnected: client_id={}", self.core.client_id);
504        Ok(())
505    }
506
507    async fn generate_order_status_report(
508        &self,
509        cmd: &GenerateOrderStatusReport,
510    ) -> anyhow::Result<Option<OrderStatusReport>> {
511        log::debug!(
512            "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
513            cmd.venue_order_id,
514            cmd.client_order_id
515        );
516
517        let account_id = self.core.account_id;
518        let reports = self
519            .http
520            .request_order_status_reports(account_id, None, None, None, false)
521            .await?;
522
523        // Note: cmd.venue_order_id is typed as Option<ClientOrderId> in the message struct
524        Ok(reports.into_iter().find(|r| {
525            cmd.venue_order_id
526                .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
527                || cmd
528                    .client_order_id
529                    .is_some_and(|id| r.client_order_id == Some(id))
530        }))
531    }
532
533    async fn generate_order_status_reports(
534        &self,
535        cmd: &GenerateOrderStatusReports,
536    ) -> anyhow::Result<Vec<OrderStatusReport>> {
537        log::debug!(
538            "Generating order status reports: instrument_id={:?}, open_only={}",
539            cmd.instrument_id,
540            cmd.open_only
541        );
542
543        let account_id = self.core.account_id;
544        self.http
545            .request_order_status_reports(account_id, cmd.instrument_id, None, None, cmd.open_only)
546            .await
547    }
548
549    async fn generate_fill_reports(
550        &self,
551        cmd: GenerateFillReports,
552    ) -> anyhow::Result<Vec<FillReport>> {
553        log::debug!(
554            "Generating fill reports: instrument_id={:?}",
555            cmd.instrument_id
556        );
557
558        let account_id = self.core.account_id;
559        self.http
560            .request_fill_reports(account_id, cmd.instrument_id, None, None)
561            .await
562    }
563
564    async fn generate_position_status_reports(
565        &self,
566        cmd: &GeneratePositionStatusReports,
567    ) -> anyhow::Result<Vec<PositionStatusReport>> {
568        log::debug!(
569            "Generating position status reports: instrument_id={:?}",
570            cmd.instrument_id
571        );
572
573        let account_id = self.core.account_id;
574        self.http
575            .request_position_status_reports(account_id, cmd.instrument_id)
576            .await
577    }
578
579    async fn generate_mass_status(
580        &self,
581        _lookback_mins: Option<u64>,
582    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
583        log::debug!("Generating mass status");
584
585        let account_id = self.core.account_id;
586        let order_reports = self
587            .http
588            .request_order_status_reports(account_id, None, None, None, true)
589            .await?;
590        let fill_reports = self
591            .http
592            .request_fill_reports(account_id, None, None, None)
593            .await?;
594        let position_reports = self
595            .http
596            .request_position_status_reports(account_id, None)
597            .await?;
598
599        let mut mass_status = ExecutionMassStatus::new(
600            self.core.client_id,
601            self.core.account_id,
602            *KRAKEN_VENUE,
603            self.clock.get_time_ns(),
604            None,
605        );
606        mass_status.add_order_reports(order_reports);
607        mass_status.add_fill_reports(fill_reports);
608        mass_status.add_position_reports(position_reports);
609
610        Ok(Some(mass_status))
611    }
612
613    fn query_account(&self, cmd: &QueryAccount) -> anyhow::Result<()> {
614        log::debug!("Querying account: {cmd:?}");
615
616        let account_id = self.core.account_id;
617        let http = self.http.clone();
618        let emitter = self.emitter.clone();
619
620        self.spawn_task("query_account", async move {
621            let account_state = http.request_account_state(account_id).await?;
622            emitter.emit_account_state(
623                account_state.balances.clone(),
624                account_state.margins.clone(),
625                account_state.is_reported,
626                account_state.ts_event,
627            );
628            Ok(())
629        });
630
631        Ok(())
632    }
633
634    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
635        log::debug!("Querying order: {cmd:?}");
636
637        let venue_order_id = cmd
638            .venue_order_id
639            .context("venue_order_id required for query_order")?;
640        let account_id = self.core.account_id;
641        let http = self.http.clone();
642        let emitter = self.emitter.clone();
643
644        self.spawn_task("query_order", async move {
645            let reports = http
646                .request_order_status_reports(account_id, None, None, None, true)
647                .await
648                .context("Failed to query order")?;
649
650            if let Some(report) = reports
651                .into_iter()
652                .find(|r| r.venue_order_id == venue_order_id)
653            {
654                emitter.send_order_status_report(report);
655            }
656            Ok(())
657        });
658
659        Ok(())
660    }
661
662    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
663        let order = self
664            .core
665            .cache()
666            .order(&cmd.client_order_id)
667            .cloned()
668            .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
669        self.submit_single_order(&order, "submit_order")
670    }
671
672    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
673        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
674
675        log::info!(
676            "Submitting order list: order_list_id={}, count={}",
677            cmd.order_list.id,
678            orders.len()
679        );
680
681        for order in &orders {
682            self.submit_single_order(order, "submit_order_list")?;
683        }
684
685        Ok(())
686    }
687
688    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
689        self.modify_single_order(cmd)
690    }
691
692    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
693        self.cancel_single_order(cmd)
694    }
695
696    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
697        let instrument_id = cmd.instrument_id;
698
699        if cmd.order_side == OrderSide::NoOrderSide {
700            log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
701
702            let http = self.http.clone();
703            let symbol = instrument_id.symbol.to_string();
704
705            self.spawn_task("cancel_all_orders", async move {
706                if let Err(e) = http.inner.cancel_all_orders(Some(symbol)).await {
707                    log::error!("Cancel all orders failed: {e}");
708                    anyhow::bail!("Cancel all orders failed: {e}");
709                }
710                Ok(())
711            });
712
713            return Ok(());
714        }
715
716        log::info!(
717            "Canceling all orders: instrument_id={instrument_id}, side={:?}",
718            cmd.order_side
719        );
720
721        let orders_to_cancel: Vec<_> = {
722            let cache = self.core.cache();
723            let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
724
725            open_orders
726                .into_iter()
727                .filter(|order| order.order_side() == cmd.order_side)
728                .filter_map(|order| {
729                    Some((
730                        order.venue_order_id()?,
731                        order.client_order_id(),
732                        order.instrument_id(),
733                        order.strategy_id(),
734                    ))
735                })
736                .collect()
737        };
738
739        let account_id = self.core.account_id;
740
741        for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
742        {
743            let http = self.http.clone();
744            let emitter = self.emitter.clone();
745            let clock = self.clock;
746
747            self.spawn_task("cancel_order_by_side", async move {
748                if let Err(e) = http
749                    .cancel_order(
750                        account_id,
751                        order_instrument_id,
752                        Some(client_order_id),
753                        Some(venue_order_id),
754                    )
755                    .await
756                {
757                    log::error!("Cancel order failed: {e}");
758                    let ts_event = clock.get_time_ns();
759                    emitter.emit_order_cancel_rejected_event(
760                        strategy_id,
761                        order_instrument_id,
762                        client_order_id,
763                        Some(venue_order_id),
764                        &format!("cancel-order error: {e}"),
765                        ts_event,
766                    );
767                }
768                Ok(())
769            });
770        }
771
772        Ok(())
773    }
774
775    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
776        log::info!(
777            "Batch canceling orders: instrument_id={}, count={}",
778            cmd.instrument_id,
779            cmd.cancels.len()
780        );
781
782        for cancel in &cmd.cancels {
783            self.cancel_single_order(cancel)?;
784        }
785
786        Ok(())
787    }
788}
789
790#[cfg(test)]
791mod tests {
792    use std::{cell::RefCell, rc::Rc};
793
794    use nautilus_common::cache::Cache;
795    use nautilus_model::{
796        enums::AccountType,
797        identifiers::{AccountId, ClientId, TraderId},
798    };
799    use rstest::rstest;
800
801    use super::*;
802    use crate::{common::enums::KrakenProductType, config::KrakenExecClientConfig};
803
804    fn create_test_core() -> ExecutionClientCore {
805        let cache = Rc::new(RefCell::new(Cache::default()));
806        ExecutionClientCore::new(
807            TraderId::from("TESTER-001"),
808            ClientId::from("KRAKEN"),
809            *KRAKEN_VENUE,
810            OmsType::Netting,
811            AccountId::from("KRAKEN-001"),
812            AccountType::Margin,
813            None,
814            cache,
815        )
816    }
817
818    #[rstest]
819    fn test_futures_exec_client_new() {
820        let config = KrakenExecClientConfig {
821            product_type: KrakenProductType::Futures,
822            api_key: "test_key".to_string(),
823            api_secret: "test_secret".to_string(),
824            ..Default::default()
825        };
826
827        let client = KrakenFuturesExecutionClient::new(create_test_core(), config);
828        assert!(client.is_ok());
829
830        let client = client.unwrap();
831        assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
832        assert_eq!(client.account_id(), AccountId::from("KRAKEN-001"));
833        assert_eq!(client.venue(), *KRAKEN_VENUE);
834        assert!(!client.is_connected());
835    }
836
837    #[rstest]
838    fn test_futures_exec_client_start_stop() {
839        let config = KrakenExecClientConfig {
840            product_type: KrakenProductType::Futures,
841            api_key: "test_key".to_string(),
842            api_secret: "test_secret".to_string(),
843            ..Default::default()
844        };
845
846        let mut client = KrakenFuturesExecutionClient::new(create_test_core(), config).unwrap();
847
848        assert!(client.start().is_ok());
849        assert!(client.stop().is_ok());
850        assert!(!client.is_connected());
851    }
852}