Skip to main content

nautilus_bybit/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Bybit adapter.
17
18use std::{
19    future::Future,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28    clients::ExecutionClient,
29    live::{get_runtime, runner::get_exec_event_sender},
30    messages::execution::{
31        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
33        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
34        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
35        SubmitOrderList,
36    },
37};
38use nautilus_core::{
39    MUTEX_POISONED, UnixNanos,
40    env::get_or_env_var,
41    time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45    accounts::AccountAny,
46    enums::{OmsType, OrderSide, OrderType, TimeInForce},
47    events::OrderEventAny,
48    identifiers::{AccountId, ClientId, InstrumentId, Venue},
49    orders::Order,
50    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51    types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54use ustr::Ustr;
55
56use crate::{
57    common::{
58        consts::BYBIT_VENUE,
59        enums::{
60            BybitAccountType, BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType,
61            BybitTimeInForce,
62        },
63        parse::extract_raw_symbol,
64    },
65    config::BybitExecClientConfig,
66    http::client::BybitHttpClient,
67    websocket::{
68        client::BybitWebSocketClient,
69        messages::{
70            BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams,
71            NautilusWsMessage,
72        },
73    },
74};
75
76/// Live execution client for Bybit.
77#[derive(Debug)]
78pub struct BybitExecutionClient {
79    core: ExecutionClientCore,
80    clock: &'static AtomicTime,
81    config: BybitExecClientConfig,
82    emitter: ExecutionEventEmitter,
83    http_client: BybitHttpClient,
84    ws_private: BybitWebSocketClient,
85    ws_trade: BybitWebSocketClient,
86    ws_private_stream_handle: Option<JoinHandle<()>>,
87    ws_trade_stream_handle: Option<JoinHandle<()>>,
88    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
89}
90
91impl BybitExecutionClient {
92    /// Creates a new [`BybitExecutionClient`].
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the client fails to initialize.
97    pub fn new(core: ExecutionClientCore, config: BybitExecClientConfig) -> anyhow::Result<Self> {
98        let api_key = get_or_env_var(config.api_key.clone(), "BYBIT_API_KEY")?;
99        let api_secret = get_or_env_var(config.api_secret.clone(), "BYBIT_API_SECRET")?;
100
101        let http_client = BybitHttpClient::with_credentials(
102            api_key.clone(),
103            api_secret.clone(),
104            Some(config.http_base_url()),
105            config.http_timeout_secs,
106            config.max_retries,
107            config.retry_delay_initial_ms,
108            config.retry_delay_max_ms,
109            config.recv_window_ms,
110            config.http_proxy_url.clone(),
111        )?;
112
113        let ws_private = BybitWebSocketClient::new_private(
114            config.environment,
115            Some(api_key.clone()),
116            Some(api_secret.clone()),
117            Some(config.ws_private_url()),
118            config.heartbeat_interval_secs,
119        );
120
121        let ws_trade = BybitWebSocketClient::new_trade(
122            config.environment,
123            Some(api_key),
124            Some(api_secret),
125            Some(config.ws_trade_url()),
126            config.heartbeat_interval_secs,
127        );
128
129        let clock = get_atomic_clock_realtime();
130        let emitter = ExecutionEventEmitter::new(
131            clock,
132            core.trader_id,
133            core.account_id,
134            core.account_type,
135            None,
136        );
137
138        Ok(Self {
139            core,
140            clock,
141            config,
142            emitter,
143            http_client,
144            ws_private,
145            ws_trade,
146            ws_private_stream_handle: None,
147            ws_trade_stream_handle: None,
148            pending_tasks: Mutex::new(Vec::new()),
149        })
150    }
151
152    fn product_types(&self) -> Vec<BybitProductType> {
153        if self.config.product_types.is_empty() {
154            vec![BybitProductType::Linear]
155        } else {
156            self.config.product_types.clone()
157        }
158    }
159
160    async fn refresh_account_state(&self) -> anyhow::Result<()> {
161        let account_state = self
162            .http_client
163            .request_account_state(BybitAccountType::Unified, self.core.account_id)
164            .await
165            .context("failed to request Bybit account state")?;
166
167        self.emitter.send_account_state(account_state);
168        Ok(())
169    }
170
171    fn update_account_state(&self) -> anyhow::Result<()> {
172        let runtime = get_runtime();
173        runtime.block_on(self.refresh_account_state())
174    }
175
176    fn spawn_task<F>(&self, description: &'static str, fut: F)
177    where
178        F: Future<Output = anyhow::Result<()>> + Send + 'static,
179    {
180        let runtime = get_runtime();
181        let handle = runtime.spawn(async move {
182            if let Err(e) = fut.await {
183                log::warn!("{description} failed: {e:?}");
184            }
185        });
186
187        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
188        tasks.retain(|handle| !handle.is_finished());
189        tasks.push(handle);
190    }
191
192    fn abort_pending_tasks(&self) {
193        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
194        for handle in tasks.drain(..) {
195            handle.abort();
196        }
197    }
198
199    /// Polls the cache until the account is registered or timeout is reached.
200    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
201        let account_id = self.core.account_id;
202
203        if self.core.cache().account(&account_id).is_some() {
204            log::info!("Account {account_id} registered");
205            return Ok(());
206        }
207
208        let start = Instant::now();
209        let timeout = Duration::from_secs_f64(timeout_secs);
210        let interval = Duration::from_millis(10);
211
212        loop {
213            tokio::time::sleep(interval).await;
214
215            if self.core.cache().account(&account_id).is_some() {
216                log::info!("Account {account_id} registered");
217                return Ok(());
218            }
219
220            if start.elapsed() >= timeout {
221                anyhow::bail!(
222                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
223                );
224            }
225        }
226    }
227
228    fn get_product_type_for_instrument(&self, instrument_id: InstrumentId) -> BybitProductType {
229        // Determine product type from instrument symbol
230        let symbol = instrument_id.symbol.as_str();
231        if symbol.ends_with("-SPOT") || (!symbol.contains('-') && !symbol.contains("PERP")) {
232            BybitProductType::Spot
233        } else if symbol.ends_with("-OPTION") {
234            BybitProductType::Option
235        } else if symbol.contains("USD") && !symbol.contains("USDT") && !symbol.contains("USDC") {
236            BybitProductType::Inverse
237        } else {
238            BybitProductType::Linear
239        }
240    }
241
242    fn map_order_type(order_type: OrderType) -> anyhow::Result<BybitOrderType> {
243        match order_type {
244            OrderType::Market => Ok(BybitOrderType::Market),
245            OrderType::Limit => Ok(BybitOrderType::Limit),
246            _ => anyhow::bail!("unsupported order type for Bybit: {order_type}"),
247        }
248    }
249
250    fn map_time_in_force(tif: TimeInForce, is_post_only: bool) -> BybitTimeInForce {
251        if is_post_only {
252            return BybitTimeInForce::PostOnly;
253        }
254        match tif {
255            TimeInForce::Gtc => BybitTimeInForce::Gtc,
256            TimeInForce::Ioc => BybitTimeInForce::Ioc,
257            TimeInForce::Fok => BybitTimeInForce::Fok,
258            _ => BybitTimeInForce::Gtc,
259        }
260    }
261}
262
263#[async_trait(?Send)]
264impl ExecutionClient for BybitExecutionClient {
265    fn is_connected(&self) -> bool {
266        self.core.is_connected()
267    }
268
269    fn client_id(&self) -> ClientId {
270        self.core.client_id
271    }
272
273    fn account_id(&self) -> AccountId {
274        self.core.account_id
275    }
276
277    fn venue(&self) -> Venue {
278        *BYBIT_VENUE
279    }
280
281    fn oms_type(&self) -> OmsType {
282        self.core.oms_type
283    }
284
285    fn get_account(&self) -> Option<AccountAny> {
286        self.core.cache().account(&self.core.account_id).cloned()
287    }
288
289    async fn connect(&mut self) -> anyhow::Result<()> {
290        if self.core.is_connected() {
291            return Ok(());
292        }
293
294        let product_types = self.product_types();
295
296        if !self.core.instruments_initialized() {
297            let mut all_instruments = Vec::new();
298            for product_type in &product_types {
299                let instruments = self
300                    .http_client
301                    .request_instruments(*product_type, None)
302                    .await
303                    .with_context(|| {
304                        format!("failed to request Bybit instruments for {product_type:?}")
305                    })?;
306
307                if instruments.is_empty() {
308                    log::warn!("No instruments returned for {product_type:?}");
309                    continue;
310                }
311
312                log::info!("Loaded {} {product_type:?} instruments", instruments.len());
313
314                self.http_client.cache_instruments(instruments.clone());
315                all_instruments.extend(instruments);
316            }
317
318            if !all_instruments.is_empty() {
319                self.ws_private.cache_instruments(all_instruments.clone());
320                self.ws_trade.cache_instruments(all_instruments);
321            }
322            self.core.set_instruments_initialized();
323        }
324
325        self.ws_private.set_account_id(self.core.account_id);
326        self.ws_trade.set_account_id(self.core.account_id);
327
328        self.ws_private.connect().await?;
329        self.ws_private.wait_until_active(10.0).await?;
330        log::info!("Connected to private WebSocket");
331
332        if self.ws_private_stream_handle.is_none() {
333            let stream = self.ws_private.stream();
334            let emitter = self.emitter.clone();
335            let handle = get_runtime().spawn(async move {
336                pin_mut!(stream);
337                while let Some(message) = stream.next().await {
338                    dispatch_ws_message(message, &emitter);
339                }
340            });
341            self.ws_private_stream_handle = Some(handle);
342        }
343
344        // Demo environment does not support Trade WebSocket API
345        if self.config.environment == BybitEnvironment::Demo {
346            log::warn!("Demo mode: Trade WebSocket not available, orders use HTTP REST API");
347        } else {
348            self.ws_trade.connect().await?;
349            self.ws_trade.wait_until_active(10.0).await?;
350            log::info!("Connected to trade WebSocket");
351
352            if self.ws_trade_stream_handle.is_none() {
353                let stream = self.ws_trade.stream();
354                let emitter = self.emitter.clone();
355                let handle = get_runtime().spawn(async move {
356                    pin_mut!(stream);
357                    while let Some(message) = stream.next().await {
358                        dispatch_ws_message(message, &emitter);
359                    }
360                });
361                self.ws_trade_stream_handle = Some(handle);
362            }
363        }
364
365        self.ws_private.subscribe_orders().await?;
366        self.ws_private.subscribe_executions().await?;
367        self.ws_private.subscribe_positions().await?;
368        self.ws_private.subscribe_wallet().await?;
369
370        let account_state = self
371            .http_client
372            .request_account_state(BybitAccountType::Unified, self.core.account_id)
373            .await
374            .context("failed to request Bybit account state")?;
375
376        if !account_state.balances.is_empty() {
377            log::info!(
378                "Received account state with {} balance(s)",
379                account_state.balances.len()
380            );
381        }
382        self.emitter.send_account_state(account_state);
383
384        self.await_account_registered(30.0).await?;
385
386        self.core.set_connected();
387        log::info!("Connected: client_id={}", self.core.client_id);
388        Ok(())
389    }
390
391    async fn disconnect(&mut self) -> anyhow::Result<()> {
392        if self.core.is_disconnected() {
393            return Ok(());
394        }
395
396        self.abort_pending_tasks();
397        self.http_client.cancel_all_requests();
398
399        if let Err(e) = self.ws_private.close().await {
400            log::warn!("Error closing private websocket: {e:?}");
401        }
402
403        if let Err(e) = self.ws_trade.close().await {
404            log::warn!("Error closing trade websocket: {e:?}");
405        }
406
407        if let Some(handle) = self.ws_private_stream_handle.take() {
408            handle.abort();
409        }
410
411        if let Some(handle) = self.ws_trade_stream_handle.take() {
412            handle.abort();
413        }
414
415        self.core.set_disconnected();
416        log::info!("Disconnected: client_id={}", self.core.client_id);
417        Ok(())
418    }
419
420    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
421        self.update_account_state()
422    }
423
424    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
425        log::debug!(
426            "query_order not implemented for Bybit execution client (client_order_id={})",
427            cmd.client_order_id
428        );
429        Ok(())
430    }
431
432    fn generate_account_state(
433        &self,
434        balances: Vec<AccountBalance>,
435        margins: Vec<MarginBalance>,
436        reported: bool,
437        ts_event: UnixNanos,
438    ) -> anyhow::Result<()> {
439        self.emitter
440            .emit_account_state(balances, margins, reported, ts_event);
441        Ok(())
442    }
443
444    fn start(&mut self) -> anyhow::Result<()> {
445        if self.core.is_started() {
446            return Ok(());
447        }
448
449        let sender = get_exec_event_sender();
450        self.emitter.set_sender(sender);
451        self.core.set_started();
452
453        let http_client = self.http_client.clone();
454        let mut ws_private = self.ws_private.clone();
455        let mut ws_trade = self.ws_trade.clone();
456        let product_types = self.config.product_types.clone();
457
458        get_runtime().spawn(async move {
459            let mut all_instruments = Vec::new();
460            for product_type in product_types {
461                match http_client.request_instruments(product_type, None).await {
462                    Ok(instruments) => {
463                        if instruments.is_empty() {
464                            log::warn!("No instruments returned for {product_type:?}");
465                            continue;
466                        }
467                        http_client.cache_instruments(instruments.clone());
468                        all_instruments.extend(instruments);
469                    }
470                    Err(e) => {
471                        log::error!("Failed to request instruments for {product_type:?}: {e}");
472                    }
473                }
474            }
475
476            if all_instruments.is_empty() {
477                log::warn!(
478                    "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
479                );
480            } else {
481                ws_private.cache_instruments(all_instruments.clone());
482                ws_trade.cache_instruments(all_instruments);
483                log::info!("Instruments initialized");
484            }
485        });
486
487        log::info!(
488            "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, environment={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
489            self.core.client_id,
490            self.core.account_id,
491            self.core.account_type,
492            self.config.product_types,
493            self.config.environment,
494            self.config.http_proxy_url,
495            self.config.ws_proxy_url,
496        );
497        Ok(())
498    }
499
500    fn stop(&mut self) -> anyhow::Result<()> {
501        if self.core.is_stopped() {
502            return Ok(());
503        }
504
505        self.core.set_stopped();
506        self.core.set_disconnected();
507        if let Some(handle) = self.ws_private_stream_handle.take() {
508            handle.abort();
509        }
510        if let Some(handle) = self.ws_trade_stream_handle.take() {
511            handle.abort();
512        }
513        self.abort_pending_tasks();
514        log::info!("Stopped: client_id={}", self.core.client_id);
515        Ok(())
516    }
517
518    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
519        let order = {
520            let cache = self.core.cache();
521            let order = cache
522                .order(&cmd.client_order_id)
523                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
524
525            if order.is_closed() {
526                log::warn!("Cannot submit closed order {}", order.client_order_id());
527                return Ok(());
528            }
529
530            order.clone()
531        };
532
533        // Validate order params before emitting submitted event
534        if let Err(e) = BybitOrderSide::try_from(order.order_side()) {
535            self.emitter.emit_order_denied(&order, &e.to_string());
536            return Ok(());
537        }
538        if let Err(e) = Self::map_order_type(order.order_type()) {
539            self.emitter.emit_order_denied(&order, &e.to_string());
540            return Ok(());
541        }
542
543        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
544        self.emitter.emit_order_submitted(&order);
545
546        let instrument_id = order.instrument_id();
547        let product_type = self.get_product_type_for_instrument(instrument_id);
548        let client_order_id = order.client_order_id();
549        let strategy_id = order.strategy_id();
550        let emitter = self.emitter.clone();
551        let clock = self.clock;
552
553        if self.config.environment == BybitEnvironment::Demo {
554            let http_client = self.http_client.clone();
555            let account_id = self.core.account_id;
556            let order_side = order.order_side();
557            let order_type = order.order_type();
558            let quantity = order.quantity();
559            let time_in_force = order.time_in_force();
560            let price = order.price();
561            let trigger_price = order.trigger_price();
562            let post_only = order.is_post_only();
563            let reduce_only = order.is_reduce_only();
564
565            self.spawn_task("submit_order_http", async move {
566                let result = http_client
567                    .submit_order(
568                        account_id,
569                        product_type,
570                        instrument_id,
571                        client_order_id,
572                        order_side,
573                        order_type,
574                        quantity,
575                        Some(time_in_force),
576                        price,
577                        trigger_price,
578                        Some(post_only),
579                        reduce_only,
580                        false, // is_quote_quantity
581                        false, // is_leverage
582                    )
583                    .await;
584
585                if let Err(e) = result {
586                    let ts_event = clock.get_time_ns();
587                    emitter.emit_order_rejected_event(
588                        strategy_id,
589                        instrument_id,
590                        client_order_id,
591                        &format!("submit-order-error: {e}"),
592                        ts_event,
593                        false,
594                    );
595                    anyhow::bail!("submit order failed: {e}");
596                }
597
598                Ok(())
599            });
600
601            return Ok(());
602        }
603
604        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
605        let bybit_side = BybitOrderSide::try_from(order.order_side())?;
606        let bybit_order_type = Self::map_order_type(order.order_type())?;
607
608        let params = BybitWsPlaceOrderParams {
609            category: product_type,
610            symbol: Ustr::from(raw_symbol),
611            side: bybit_side,
612            order_type: bybit_order_type,
613            qty: order.quantity().to_string(),
614            is_leverage: None,
615            market_unit: None,
616            price: order.price().map(|p| p.to_string()),
617            time_in_force: Some(Self::map_time_in_force(
618                order.time_in_force(),
619                order.is_post_only(),
620            )),
621            order_link_id: Some(order.client_order_id().to_string()),
622            reduce_only: if order.is_reduce_only() {
623                Some(true)
624            } else {
625                None
626            },
627            close_on_trigger: None,
628            trigger_price: order.trigger_price().map(|p| p.to_string()),
629            trigger_by: None,
630            trigger_direction: None,
631            tpsl_mode: None,
632            take_profit: None,
633            stop_loss: None,
634            tp_trigger_by: None,
635            sl_trigger_by: None,
636            sl_trigger_price: None,
637            tp_trigger_price: None,
638            sl_order_type: None,
639            tp_order_type: None,
640            sl_limit_price: None,
641            tp_limit_price: None,
642        };
643
644        let ws_trade = self.ws_trade.clone();
645        let trader_id = self.core.trader_id;
646
647        self.spawn_task("submit_order", async move {
648            let result = ws_trade
649                .place_order(
650                    params,
651                    client_order_id,
652                    trader_id,
653                    strategy_id,
654                    instrument_id,
655                )
656                .await
657                .map_err(|e| anyhow::anyhow!("submit order failed: {e}"));
658
659            if let Err(e) = result {
660                let ts_event = clock.get_time_ns();
661                emitter.emit_order_rejected_event(
662                    strategy_id,
663                    instrument_id,
664                    client_order_id,
665                    &format!("submit-order-error: {e}"),
666                    ts_event,
667                    false,
668                );
669                return Err(e);
670            }
671
672            Ok(())
673        });
674
675        Ok(())
676    }
677
678    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
679        log::warn!(
680            "submit_order_list not yet implemented for Bybit execution client (got {} orders)",
681            cmd.order_list.client_order_ids.len()
682        );
683        Ok(())
684    }
685
686    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
687        let instrument_id = cmd.instrument_id;
688        let product_type = self.get_product_type_for_instrument(instrument_id);
689        let client_order_id = cmd.client_order_id;
690        let strategy_id = cmd.strategy_id;
691        let venue_order_id = cmd.venue_order_id;
692        let emitter = self.emitter.clone();
693        let clock = self.clock;
694
695        if self.config.environment == BybitEnvironment::Demo {
696            let http_client = self.http_client.clone();
697            let account_id = self.core.account_id;
698            let quantity = cmd.quantity;
699            let price = cmd.price;
700
701            self.spawn_task("modify_order_http", async move {
702                let result = http_client
703                    .modify_order(
704                        account_id,
705                        product_type,
706                        instrument_id,
707                        Some(client_order_id),
708                        venue_order_id,
709                        quantity,
710                        price,
711                    )
712                    .await;
713
714                if let Err(e) = result {
715                    let ts_event = clock.get_time_ns();
716                    emitter.emit_order_modify_rejected_event(
717                        strategy_id,
718                        instrument_id,
719                        client_order_id,
720                        venue_order_id,
721                        &format!("modify-order-error: {e}"),
722                        ts_event,
723                    );
724                    anyhow::bail!("modify order failed: {e}");
725                }
726
727                Ok(())
728            });
729
730            return Ok(());
731        }
732
733        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
734
735        let params = BybitWsAmendOrderParams {
736            category: product_type,
737            symbol: Ustr::from(raw_symbol),
738            order_id: cmd.venue_order_id.map(|v| v.to_string()),
739            order_link_id: Some(cmd.client_order_id.to_string()),
740            qty: cmd.quantity.map(|q| q.to_string()),
741            price: cmd.price.map(|p| p.to_string()),
742            trigger_price: None,
743            take_profit: None,
744            stop_loss: None,
745            tp_trigger_by: None,
746            sl_trigger_by: None,
747        };
748
749        let ws_trade = self.ws_trade.clone();
750        let trader_id = cmd.trader_id;
751
752        self.spawn_task("modify_order", async move {
753            let result = ws_trade
754                .amend_order(
755                    params,
756                    client_order_id,
757                    trader_id,
758                    strategy_id,
759                    instrument_id,
760                    venue_order_id,
761                )
762                .await
763                .map_err(|e| anyhow::anyhow!("modify order failed: {e}"));
764
765            if let Err(e) = result {
766                let ts_event = clock.get_time_ns();
767                emitter.emit_order_modify_rejected_event(
768                    strategy_id,
769                    instrument_id,
770                    client_order_id,
771                    venue_order_id,
772                    &format!("modify-order-error: {e}"),
773                    ts_event,
774                );
775                return Err(e);
776            }
777
778            Ok(())
779        });
780
781        Ok(())
782    }
783
784    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
785        let instrument_id = cmd.instrument_id;
786        let product_type = self.get_product_type_for_instrument(instrument_id);
787        let client_order_id = cmd.client_order_id;
788        let strategy_id = cmd.strategy_id;
789        let venue_order_id = cmd.venue_order_id;
790        let emitter = self.emitter.clone();
791        let clock = self.clock;
792
793        if self.config.environment == BybitEnvironment::Demo {
794            let http_client = self.http_client.clone();
795            let account_id = self.core.account_id;
796
797            self.spawn_task("cancel_order_http", async move {
798                let result = http_client
799                    .cancel_order(
800                        account_id,
801                        product_type,
802                        instrument_id,
803                        Some(client_order_id),
804                        venue_order_id,
805                    )
806                    .await;
807
808                if let Err(e) = result {
809                    let ts_event = clock.get_time_ns();
810                    emitter.emit_order_cancel_rejected_event(
811                        strategy_id,
812                        instrument_id,
813                        client_order_id,
814                        venue_order_id,
815                        &format!("cancel-order-error: {e}"),
816                        ts_event,
817                    );
818                    anyhow::bail!("cancel order failed: {e}");
819                }
820
821                Ok(())
822            });
823
824            return Ok(());
825        }
826
827        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
828
829        let params = BybitWsCancelOrderParams {
830            category: product_type,
831            symbol: Ustr::from(raw_symbol),
832            order_id: cmd.venue_order_id.map(|v| v.to_string()),
833            order_link_id: Some(cmd.client_order_id.to_string()),
834        };
835
836        let ws_trade = self.ws_trade.clone();
837        let trader_id = cmd.trader_id;
838
839        self.spawn_task("cancel_order", async move {
840            let result = ws_trade
841                .cancel_order(
842                    params,
843                    client_order_id,
844                    trader_id,
845                    strategy_id,
846                    instrument_id,
847                    venue_order_id,
848                )
849                .await
850                .map_err(|e| anyhow::anyhow!("cancel order failed: {e}"));
851
852            if let Err(e) = result {
853                let ts_event = clock.get_time_ns();
854                emitter.emit_order_cancel_rejected_event(
855                    strategy_id,
856                    instrument_id,
857                    client_order_id,
858                    venue_order_id,
859                    &format!("cancel-order-error: {e}"),
860                    ts_event,
861                );
862                return Err(e);
863            }
864
865            Ok(())
866        });
867
868        Ok(())
869    }
870
871    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
872        if cmd.order_side != OrderSide::NoOrderSide {
873            log::warn!(
874                "Bybit does not support order_side filtering for cancel all orders; \
875                ignoring order_side={:?} and canceling all orders",
876                cmd.order_side,
877            );
878        }
879
880        let instrument_id = cmd.instrument_id;
881        let product_type = self.get_product_type_for_instrument(instrument_id);
882        let account_id = self.core.account_id;
883        let http_client = self.http_client.clone();
884
885        self.spawn_task("cancel_all_orders", async move {
886            match http_client
887                .cancel_all_orders(account_id, product_type, instrument_id)
888                .await
889            {
890                Ok(reports) => {
891                    for report in reports {
892                        log::debug!("Cancelled order: {report:?}");
893                    }
894                }
895                Err(e) => {
896                    log::error!("Failed to cancel all orders for {instrument_id}: {e}");
897                }
898            }
899            Ok(())
900        });
901
902        Ok(())
903    }
904
905    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
906        if cmd.cancels.is_empty() {
907            return Ok(());
908        }
909
910        let instrument_id = cmd.instrument_id;
911        let product_type = self.get_product_type_for_instrument(instrument_id);
912
913        // Demo mode: cancel individually via HTTP (batch not supported)
914        if self.config.environment == BybitEnvironment::Demo {
915            let http_client = self.http_client.clone();
916            let account_id = self.core.account_id;
917            let strategy_id = cmd.strategy_id;
918            let emitter = self.emitter.clone();
919            let clock = self.clock;
920            let cancels: Vec<_> = cmd
921                .cancels
922                .iter()
923                .map(|c| (c.client_order_id, c.venue_order_id))
924                .collect();
925
926            self.spawn_task("batch_cancel_orders_http", async move {
927                for (client_order_id, venue_order_id) in cancels {
928                    if let Err(e) = http_client
929                        .cancel_order(
930                            account_id,
931                            product_type,
932                            instrument_id,
933                            Some(client_order_id),
934                            venue_order_id,
935                        )
936                        .await
937                    {
938                        let ts_event = clock.get_time_ns();
939                        emitter.emit_order_cancel_rejected_event(
940                            strategy_id,
941                            instrument_id,
942                            client_order_id,
943                            venue_order_id,
944                            &format!("cancel-order-error: {e}"),
945                            ts_event,
946                        );
947                    }
948                }
949                Ok(())
950            });
951
952            return Ok(());
953        }
954
955        let raw_symbol = Ustr::from(extract_raw_symbol(instrument_id.symbol.as_str()));
956
957        let mut cancel_params = Vec::with_capacity(cmd.cancels.len());
958        for cancel in &cmd.cancels {
959            cancel_params.push(BybitWsCancelOrderParams {
960                category: product_type,
961                symbol: raw_symbol,
962                order_id: cancel.venue_order_id.map(|v| v.to_string()),
963                order_link_id: Some(cancel.client_order_id.to_string()),
964            });
965        }
966
967        let ws_trade = self.ws_trade.clone();
968        let trader_id = cmd.trader_id;
969        let strategy_id = cmd.strategy_id;
970
971        self.spawn_task("batch_cancel_orders", async move {
972            ws_trade
973                .batch_cancel_orders(trader_id, strategy_id, cancel_params)
974                .await?;
975            Ok(())
976        });
977
978        Ok(())
979    }
980
981    async fn generate_order_status_report(
982        &self,
983        cmd: &GenerateOrderStatusReport,
984    ) -> anyhow::Result<Option<OrderStatusReport>> {
985        let Some(instrument_id) = cmd.instrument_id else {
986            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
987            return Ok(None);
988        };
989
990        let product_type = self.get_product_type_for_instrument(instrument_id);
991
992        let mut reports = self
993            .http_client
994            .request_order_status_reports(
995                self.core.account_id,
996                product_type,
997                Some(instrument_id),
998                false,
999                None,
1000                None,
1001                None,
1002            )
1003            .await?;
1004
1005        if let Some(client_order_id) = cmd.client_order_id {
1006            reports.retain(|report| report.client_order_id == Some(client_order_id));
1007        }
1008
1009        if let Some(venue_order_id) = cmd.venue_order_id {
1010            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1011        }
1012
1013        Ok(reports.into_iter().next())
1014    }
1015
1016    async fn generate_order_status_reports(
1017        &self,
1018        cmd: &GenerateOrderStatusReports,
1019    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1020        let mut reports = Vec::new();
1021
1022        if let Some(instrument_id) = cmd.instrument_id {
1023            let product_type = self.get_product_type_for_instrument(instrument_id);
1024            let mut fetched = self
1025                .http_client
1026                .request_order_status_reports(
1027                    self.core.account_id,
1028                    product_type,
1029                    Some(instrument_id),
1030                    cmd.open_only,
1031                    None,
1032                    None,
1033                    None,
1034                )
1035                .await?;
1036            reports.append(&mut fetched);
1037        } else {
1038            for product_type in self.product_types() {
1039                let mut fetched = self
1040                    .http_client
1041                    .request_order_status_reports(
1042                        self.core.account_id,
1043                        product_type,
1044                        None,
1045                        cmd.open_only,
1046                        None,
1047                        None,
1048                        None,
1049                    )
1050                    .await?;
1051                reports.append(&mut fetched);
1052            }
1053        }
1054
1055        if let Some(start) = cmd.start {
1056            reports.retain(|r| r.ts_last >= start);
1057        }
1058        if let Some(end) = cmd.end {
1059            reports.retain(|r| r.ts_last <= end);
1060        }
1061
1062        Ok(reports)
1063    }
1064
1065    async fn generate_fill_reports(
1066        &self,
1067        cmd: GenerateFillReports,
1068    ) -> anyhow::Result<Vec<FillReport>> {
1069        let start_ms = nanos_to_millis(cmd.start);
1070        let end_ms = nanos_to_millis(cmd.end);
1071        let mut reports = Vec::new();
1072
1073        if let Some(instrument_id) = cmd.instrument_id {
1074            let product_type = self.get_product_type_for_instrument(instrument_id);
1075            let mut fetched = self
1076                .http_client
1077                .request_fill_reports(
1078                    self.core.account_id,
1079                    product_type,
1080                    Some(instrument_id),
1081                    start_ms,
1082                    end_ms,
1083                    None,
1084                )
1085                .await?;
1086            reports.append(&mut fetched);
1087        } else {
1088            for product_type in self.product_types() {
1089                let mut fetched = self
1090                    .http_client
1091                    .request_fill_reports(
1092                        self.core.account_id,
1093                        product_type,
1094                        None,
1095                        start_ms,
1096                        end_ms,
1097                        None,
1098                    )
1099                    .await?;
1100                reports.append(&mut fetched);
1101            }
1102        }
1103
1104        if let Some(venue_order_id) = cmd.venue_order_id {
1105            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1106        }
1107
1108        Ok(reports)
1109    }
1110
1111    async fn generate_position_status_reports(
1112        &self,
1113        cmd: &GeneratePositionStatusReports,
1114    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1115        let mut reports = Vec::new();
1116
1117        if let Some(instrument_id) = cmd.instrument_id {
1118            let product_type = self.get_product_type_for_instrument(instrument_id);
1119
1120            // Skip Spot - positions API only supports derivatives
1121            if product_type != BybitProductType::Spot {
1122                let mut fetched = self
1123                    .http_client
1124                    .request_position_status_reports(
1125                        self.core.account_id,
1126                        product_type,
1127                        Some(instrument_id),
1128                    )
1129                    .await?;
1130                reports.append(&mut fetched);
1131            }
1132        } else {
1133            for product_type in self.product_types() {
1134                // Skip Spot - positions API only supports derivatives
1135                if product_type == BybitProductType::Spot {
1136                    continue;
1137                }
1138                let mut fetched = self
1139                    .http_client
1140                    .request_position_status_reports(self.core.account_id, product_type, None)
1141                    .await?;
1142                reports.append(&mut fetched);
1143            }
1144        }
1145
1146        Ok(reports)
1147    }
1148
1149    async fn generate_mass_status(
1150        &self,
1151        lookback_mins: Option<u64>,
1152    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1153        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1154
1155        let ts_now = self.clock.get_time_ns();
1156
1157        let start = lookback_mins.map(|mins| {
1158            let lookback_ns = mins * 60 * 1_000_000_000;
1159            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1160        });
1161
1162        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1163            .ts_init(ts_now)
1164            .open_only(false)
1165            .start(start)
1166            .build()
1167            .map_err(|e| anyhow::anyhow!("{e}"))?;
1168
1169        let fill_cmd = GenerateFillReportsBuilder::default()
1170            .ts_init(ts_now)
1171            .start(start)
1172            .build()
1173            .map_err(|e| anyhow::anyhow!("{e}"))?;
1174
1175        let position_cmd = GeneratePositionStatusReportsBuilder::default()
1176            .ts_init(ts_now)
1177            .start(start)
1178            .build()
1179            .map_err(|e| anyhow::anyhow!("{e}"))?;
1180
1181        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1182            self.generate_order_status_reports(&order_cmd),
1183            self.generate_fill_reports(fill_cmd),
1184            self.generate_position_status_reports(&position_cmd),
1185        )?;
1186
1187        log::info!("Received {} OrderStatusReports", order_reports.len());
1188        log::info!("Received {} FillReports", fill_reports.len());
1189        log::info!("Received {} PositionReports", position_reports.len());
1190
1191        let mut mass_status = ExecutionMassStatus::new(
1192            self.core.client_id,
1193            self.core.account_id,
1194            *BYBIT_VENUE,
1195            ts_now,
1196            None,
1197        );
1198
1199        mass_status.add_order_reports(order_reports);
1200        mass_status.add_fill_reports(fill_reports);
1201        mass_status.add_position_reports(position_reports);
1202
1203        Ok(Some(mass_status))
1204    }
1205}
1206
1207/// Dispatches a WebSocket message using the event emitter.
1208fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1209    match message {
1210        NautilusWsMessage::AccountState(state) => {
1211            emitter.send_account_state(state);
1212        }
1213        NautilusWsMessage::PositionStatusReport(report) => {
1214            emitter.send_position_report(report);
1215        }
1216        NautilusWsMessage::OrderStatusReports(reports) => {
1217            log::debug!("Processing {} order status report(s)", reports.len());
1218            for report in reports {
1219                emitter.send_order_status_report(report);
1220            }
1221        }
1222        NautilusWsMessage::FillReports(reports) => {
1223            log::debug!("Processing {} fill report(s)", reports.len());
1224            for report in reports {
1225                emitter.send_fill_report(report);
1226            }
1227        }
1228        NautilusWsMessage::OrderRejected(event) => {
1229            emitter.send_order_event(OrderEventAny::Rejected(event));
1230        }
1231        NautilusWsMessage::OrderCancelRejected(event) => {
1232            emitter.send_order_event(OrderEventAny::CancelRejected(event));
1233        }
1234        NautilusWsMessage::OrderModifyRejected(event) => {
1235            emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1236        }
1237        NautilusWsMessage::Error(e) => {
1238            log::warn!("Websocket error: code={} message={}", e.code, e.message);
1239        }
1240        NautilusWsMessage::Reconnected => {
1241            log::info!("Websocket reconnected");
1242        }
1243        NautilusWsMessage::Authenticated => {
1244            log::debug!("Websocket authenticated");
1245        }
1246        NautilusWsMessage::Deltas(_)
1247        | NautilusWsMessage::Data(_)
1248        | NautilusWsMessage::FundingRates(_)
1249        | NautilusWsMessage::MarkPrices(_)
1250        | NautilusWsMessage::IndexPrices(_) => {
1251            log::debug!("Ignoring websocket data message");
1252        }
1253    }
1254}
1255
1256fn nanos_to_millis(value: Option<UnixNanos>) -> Option<i64> {
1257    value.map(|nanos| (nanos.as_u64() / 1_000_000) as i64)
1258}