1use std::{
19 future::Future,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28 clients::ExecutionClient,
29 live::{get_runtime, runner::get_exec_event_sender},
30 messages::execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
33 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
34 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
35 SubmitOrderList,
36 },
37};
38use nautilus_core::{
39 MUTEX_POISONED, UnixNanos,
40 env::get_or_env_var,
41 time::{AtomicTime, get_atomic_clock_realtime},
42};
43use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
44use nautilus_model::{
45 accounts::AccountAny,
46 enums::{OmsType, OrderSide, OrderType, TimeInForce},
47 events::OrderEventAny,
48 identifiers::{AccountId, ClientId, InstrumentId, Venue},
49 orders::Order,
50 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51 types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54use ustr::Ustr;
55
56use crate::{
57 common::{
58 consts::BYBIT_VENUE,
59 enums::{
60 BybitAccountType, BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType,
61 BybitTimeInForce,
62 },
63 parse::extract_raw_symbol,
64 },
65 config::BybitExecClientConfig,
66 http::client::BybitHttpClient,
67 websocket::{
68 client::BybitWebSocketClient,
69 messages::{
70 BybitWsAmendOrderParams, BybitWsCancelOrderParams, BybitWsPlaceOrderParams,
71 NautilusWsMessage,
72 },
73 },
74};
75
/// Execution client for the Bybit venue.
///
/// Combines the shared execution-client core with an HTTP client, two
/// WebSocket clients (private and trade) and the handles of the background
/// tasks this client spawns on the shared runtime.
#[derive(Debug)]
pub struct BybitExecutionClient {
    /// Shared execution-client state: identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Clock used to timestamp locally generated rejection events and mass status.
    clock: &'static AtomicTime,
    /// Venue configuration (environment, product types, URLs, retry settings).
    config: BybitExecClientConfig,
    /// Emitter used to publish account state, order events and reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for instruments, account state, reports and (in Demo) order commands.
    http_client: BybitHttpClient,
    /// Private WebSocket client; subscribed to orders, executions, positions and wallet.
    ws_private: BybitWebSocketClient,
    /// Trade WebSocket client used to place, amend and cancel orders outside Demo.
    ws_trade: BybitWebSocketClient,
    /// Task forwarding private WebSocket messages to the emitter, if running.
    ws_private_stream_handle: Option<JoinHandle<()>>,
    /// Task forwarding trade WebSocket messages to the emitter, if running.
    ws_trade_stream_handle: Option<JoinHandle<()>>,
    /// Handles of command tasks started via `spawn_task`; aborted on stop/disconnect.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
90
91impl BybitExecutionClient {
92 pub fn new(core: ExecutionClientCore, config: BybitExecClientConfig) -> anyhow::Result<Self> {
98 let api_key = get_or_env_var(config.api_key.clone(), "BYBIT_API_KEY")?;
99 let api_secret = get_or_env_var(config.api_secret.clone(), "BYBIT_API_SECRET")?;
100
101 let http_client = BybitHttpClient::with_credentials(
102 api_key.clone(),
103 api_secret.clone(),
104 Some(config.http_base_url()),
105 config.http_timeout_secs,
106 config.max_retries,
107 config.retry_delay_initial_ms,
108 config.retry_delay_max_ms,
109 config.recv_window_ms,
110 config.http_proxy_url.clone(),
111 )?;
112
113 let ws_private = BybitWebSocketClient::new_private(
114 config.environment,
115 Some(api_key.clone()),
116 Some(api_secret.clone()),
117 Some(config.ws_private_url()),
118 config.heartbeat_interval_secs,
119 );
120
121 let ws_trade = BybitWebSocketClient::new_trade(
122 config.environment,
123 Some(api_key),
124 Some(api_secret),
125 Some(config.ws_trade_url()),
126 config.heartbeat_interval_secs,
127 );
128
129 let clock = get_atomic_clock_realtime();
130 let emitter = ExecutionEventEmitter::new(
131 clock,
132 core.trader_id,
133 core.account_id,
134 core.account_type,
135 None,
136 );
137
138 Ok(Self {
139 core,
140 clock,
141 config,
142 emitter,
143 http_client,
144 ws_private,
145 ws_trade,
146 ws_private_stream_handle: None,
147 ws_trade_stream_handle: None,
148 pending_tasks: Mutex::new(Vec::new()),
149 })
150 }
151
152 fn product_types(&self) -> Vec<BybitProductType> {
153 if self.config.product_types.is_empty() {
154 vec![BybitProductType::Linear]
155 } else {
156 self.config.product_types.clone()
157 }
158 }
159
160 async fn refresh_account_state(&self) -> anyhow::Result<()> {
161 let account_state = self
162 .http_client
163 .request_account_state(BybitAccountType::Unified, self.core.account_id)
164 .await
165 .context("failed to request Bybit account state")?;
166
167 self.emitter.send_account_state(account_state);
168 Ok(())
169 }
170
171 fn update_account_state(&self) -> anyhow::Result<()> {
172 let runtime = get_runtime();
173 runtime.block_on(self.refresh_account_state())
174 }
175
176 fn spawn_task<F>(&self, description: &'static str, fut: F)
177 where
178 F: Future<Output = anyhow::Result<()>> + Send + 'static,
179 {
180 let runtime = get_runtime();
181 let handle = runtime.spawn(async move {
182 if let Err(e) = fut.await {
183 log::warn!("{description} failed: {e:?}");
184 }
185 });
186
187 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
188 tasks.retain(|handle| !handle.is_finished());
189 tasks.push(handle);
190 }
191
192 fn abort_pending_tasks(&self) {
193 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
194 for handle in tasks.drain(..) {
195 handle.abort();
196 }
197 }
198
199 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
201 let account_id = self.core.account_id;
202
203 if self.core.cache().account(&account_id).is_some() {
204 log::info!("Account {account_id} registered");
205 return Ok(());
206 }
207
208 let start = Instant::now();
209 let timeout = Duration::from_secs_f64(timeout_secs);
210 let interval = Duration::from_millis(10);
211
212 loop {
213 tokio::time::sleep(interval).await;
214
215 if self.core.cache().account(&account_id).is_some() {
216 log::info!("Account {account_id} registered");
217 return Ok(());
218 }
219
220 if start.elapsed() >= timeout {
221 anyhow::bail!(
222 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
223 );
224 }
225 }
226 }
227
228 fn get_product_type_for_instrument(&self, instrument_id: InstrumentId) -> BybitProductType {
229 let symbol = instrument_id.symbol.as_str();
231 if symbol.ends_with("-SPOT") || (!symbol.contains('-') && !symbol.contains("PERP")) {
232 BybitProductType::Spot
233 } else if symbol.ends_with("-OPTION") {
234 BybitProductType::Option
235 } else if symbol.contains("USD") && !symbol.contains("USDT") && !symbol.contains("USDC") {
236 BybitProductType::Inverse
237 } else {
238 BybitProductType::Linear
239 }
240 }
241
242 fn map_order_type(order_type: OrderType) -> anyhow::Result<BybitOrderType> {
243 match order_type {
244 OrderType::Market => Ok(BybitOrderType::Market),
245 OrderType::Limit => Ok(BybitOrderType::Limit),
246 _ => anyhow::bail!("unsupported order type for Bybit: {order_type}"),
247 }
248 }
249
250 fn map_time_in_force(tif: TimeInForce, is_post_only: bool) -> BybitTimeInForce {
251 if is_post_only {
252 return BybitTimeInForce::PostOnly;
253 }
254 match tif {
255 TimeInForce::Gtc => BybitTimeInForce::Gtc,
256 TimeInForce::Ioc => BybitTimeInForce::Ioc,
257 TimeInForce::Fok => BybitTimeInForce::Fok,
258 _ => BybitTimeInForce::Gtc,
259 }
260 }
261}
262
263#[async_trait(?Send)]
264impl ExecutionClient for BybitExecutionClient {
    /// Reports whether the core is flagged as connected.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }

    /// Returns the client identifier held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }

    /// Returns the account identifier held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }

    /// Returns the Bybit venue constant.
    fn venue(&self) -> Venue {
        *BYBIT_VENUE
    }

    /// Returns the order management system type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }

    /// Returns a clone of this client's account from the cache, if present.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
288
    /// Connects the client to Bybit; a no-op when already connected.
    ///
    /// Steps, in order:
    /// 1. Load instruments for each product type over HTTP (only if the core
    ///    has not yet marked instruments as initialized) and cache them on the
    ///    HTTP and WebSocket clients.
    /// 2. Connect the private WebSocket and start forwarding its messages.
    /// 3. Outside the Demo environment, connect the trade WebSocket and start
    ///    forwarding its messages.
    /// 4. Subscribe to orders, executions, positions and wallet on the private socket.
    /// 5. Request and emit the account state, then wait for the account to
    ///    appear in the cache before marking the core as connected.
    ///
    /// # Errors
    ///
    /// Returns an error if any instrument request, WebSocket connect/subscribe,
    /// the account state request, or the account registration wait fails.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.core.is_connected() {
            return Ok(());
        }

        let product_types = self.product_types();

        if !self.core.instruments_initialized() {
            let mut all_instruments = Vec::new();
            for product_type in &product_types {
                let instruments = self
                    .http_client
                    .request_instruments(*product_type, None)
                    .await
                    .with_context(|| {
                        format!("failed to request Bybit instruments for {product_type:?}")
                    })?;

                if instruments.is_empty() {
                    log::warn!("No instruments returned for {product_type:?}");
                    continue;
                }

                log::info!("Loaded {} {product_type:?} instruments", instruments.len());

                self.http_client.cache_instruments(instruments.clone());
                all_instruments.extend(instruments);
            }

            if !all_instruments.is_empty() {
                self.ws_private.cache_instruments(all_instruments.clone());
                self.ws_trade.cache_instruments(all_instruments);
            }
            // Marked initialized even when nothing was loaded, so this step is not retried.
            self.core.set_instruments_initialized();
        }

        self.ws_private.set_account_id(self.core.account_id);
        self.ws_trade.set_account_id(self.core.account_id);

        self.ws_private.connect().await?;
        // NOTE(review): 10.0 is presumably a timeout in seconds — confirm against the client.
        self.ws_private.wait_until_active(10.0).await?;
        log::info!("Connected to private WebSocket");

        // Only one forwarding task per socket; the handle is cleared on stop/disconnect.
        if self.ws_private_stream_handle.is_none() {
            let stream = self.ws_private.stream();
            let emitter = self.emitter.clone();
            let handle = get_runtime().spawn(async move {
                pin_mut!(stream);
                while let Some(message) = stream.next().await {
                    dispatch_ws_message(message, &emitter);
                }
            });
            self.ws_private_stream_handle = Some(handle);
        }

        if self.config.environment == BybitEnvironment::Demo {
            // In Demo the order commands in this client go through the HTTP client instead.
            log::warn!("Demo mode: Trade WebSocket not available, orders use HTTP REST API");
        } else {
            self.ws_trade.connect().await?;
            self.ws_trade.wait_until_active(10.0).await?;
            log::info!("Connected to trade WebSocket");

            if self.ws_trade_stream_handle.is_none() {
                let stream = self.ws_trade.stream();
                let emitter = self.emitter.clone();
                let handle = get_runtime().spawn(async move {
                    pin_mut!(stream);
                    while let Some(message) = stream.next().await {
                        dispatch_ws_message(message, &emitter);
                    }
                });
                self.ws_trade_stream_handle = Some(handle);
            }
        }

        self.ws_private.subscribe_orders().await?;
        self.ws_private.subscribe_executions().await?;
        self.ws_private.subscribe_positions().await?;
        self.ws_private.subscribe_wallet().await?;

        let account_state = self
            .http_client
            .request_account_state(BybitAccountType::Unified, self.core.account_id)
            .await
            .context("failed to request Bybit account state")?;

        if !account_state.balances.is_empty() {
            log::info!(
                "Received account state with {} balance(s)",
                account_state.balances.len()
            );
        }
        self.emitter.send_account_state(account_state);

        // Wait up to 30 seconds for the emitted state to register the account in the cache.
        self.await_account_registered(30.0).await?;

        self.core.set_connected();
        log::info!("Connected: client_id={}", self.core.client_id);
        Ok(())
    }
390
    /// Disconnects the client; a no-op when already disconnected.
    ///
    /// Aborts tracked command tasks, cancels in-flight HTTP requests, closes
    /// both WebSocket clients (close errors are logged, not returned), aborts
    /// the stream-forwarding tasks and marks the core as disconnected.
    async fn disconnect(&mut self) -> anyhow::Result<()> {
        if self.core.is_disconnected() {
            return Ok(());
        }

        self.abort_pending_tasks();
        self.http_client.cancel_all_requests();

        // Best-effort close: a failure on one socket must not prevent the rest of the teardown.
        if let Err(e) = self.ws_private.close().await {
            log::warn!("Error closing private websocket: {e:?}");
        }

        if let Err(e) = self.ws_trade.close().await {
            log::warn!("Error closing trade websocket: {e:?}");
        }

        // Taking the handles lets a later `connect` spawn fresh forwarding tasks.
        if let Some(handle) = self.ws_private_stream_handle.take() {
            handle.abort();
        }

        if let Some(handle) = self.ws_trade_stream_handle.take() {
            handle.abort();
        }

        self.core.set_disconnected();
        log::info!("Disconnected: client_id={}", self.core.client_id);
        Ok(())
    }
419
    /// Handles an account query by delegating to `update_account_state`.
    ///
    /// The command payload itself is not inspected.
    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
        self.update_account_state()
    }
423
    /// Not implemented: logs the request at debug level and returns `Ok(())`.
    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
        log::debug!(
            "query_order not implemented for Bybit execution client (client_order_id={})",
            cmd.client_order_id
        );
        Ok(())
    }
431
    /// Emits an account state built from the supplied balances and margins.
    ///
    /// All arguments are passed unchanged to the emitter; always returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
443
444 fn start(&mut self) -> anyhow::Result<()> {
445 if self.core.is_started() {
446 return Ok(());
447 }
448
449 let sender = get_exec_event_sender();
450 self.emitter.set_sender(sender);
451 self.core.set_started();
452
453 let http_client = self.http_client.clone();
454 let mut ws_private = self.ws_private.clone();
455 let mut ws_trade = self.ws_trade.clone();
456 let product_types = self.config.product_types.clone();
457
458 get_runtime().spawn(async move {
459 let mut all_instruments = Vec::new();
460 for product_type in product_types {
461 match http_client.request_instruments(product_type, None).await {
462 Ok(instruments) => {
463 if instruments.is_empty() {
464 log::warn!("No instruments returned for {product_type:?}");
465 continue;
466 }
467 http_client.cache_instruments(instruments.clone());
468 all_instruments.extend(instruments);
469 }
470 Err(e) => {
471 log::error!("Failed to request instruments for {product_type:?}: {e}");
472 }
473 }
474 }
475
476 if all_instruments.is_empty() {
477 log::warn!(
478 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
479 );
480 } else {
481 ws_private.cache_instruments(all_instruments.clone());
482 ws_trade.cache_instruments(all_instruments);
483 log::info!("Instruments initialized");
484 }
485 });
486
487 log::info!(
488 "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, environment={:?}, http_proxy_url={:?}, ws_proxy_url={:?}",
489 self.core.client_id,
490 self.core.account_id,
491 self.core.account_type,
492 self.config.product_types,
493 self.config.environment,
494 self.config.http_proxy_url,
495 self.config.ws_proxy_url,
496 );
497 Ok(())
498 }
499
500 fn stop(&mut self) -> anyhow::Result<()> {
501 if self.core.is_stopped() {
502 return Ok(());
503 }
504
505 self.core.set_stopped();
506 self.core.set_disconnected();
507 if let Some(handle) = self.ws_private_stream_handle.take() {
508 handle.abort();
509 }
510 if let Some(handle) = self.ws_trade_stream_handle.take() {
511 handle.abort();
512 }
513 self.abort_pending_tasks();
514 log::info!("Stopped: client_id={}", self.core.client_id);
515 Ok(())
516 }
517
    /// Submits the cached order referenced by `cmd` to Bybit.
    ///
    /// The order is looked up in the cache; a closed order is skipped with a
    /// warning. An order whose side or type cannot be mapped to Bybit is denied
    /// via the emitter. Otherwise an "order submitted" event is emitted and the
    /// request is sent on a background task: over HTTP in the Demo
    /// environment, over the trade WebSocket otherwise. A failed request emits
    /// an "order rejected" event from within that task.
    ///
    /// # Errors
    ///
    /// Returns an error if the order is not found in the cache. Transport
    /// failures are reported through events and task logging, not this result.
    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
        // Clone the order inside a block so the cache access ends before events are emitted.
        let order = {
            let cache = self.core.cache();
            let order = cache
                .order(&cmd.client_order_id)
                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;

            if order.is_closed() {
                log::warn!("Cannot submit closed order {}", order.client_order_id());
                return Ok(());
            }

            order.clone()
        };

        // Validate the mappings up front so unsupported orders are denied before
        // any "submitted" event is emitted.
        if let Err(e) = BybitOrderSide::try_from(order.order_side()) {
            self.emitter.emit_order_denied(&order, &e.to_string());
            return Ok(());
        }
        if let Err(e) = Self::map_order_type(order.order_type()) {
            self.emitter.emit_order_denied(&order, &e.to_string());
            return Ok(());
        }

        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
        self.emitter.emit_order_submitted(&order);

        let instrument_id = order.instrument_id();
        let product_type = self.get_product_type_for_instrument(instrument_id);
        let client_order_id = order.client_order_id();
        let strategy_id = order.strategy_id();
        let emitter = self.emitter.clone();
        let clock = self.clock;

        // Demo: the trade WebSocket is not connected in this environment, so use HTTP.
        if self.config.environment == BybitEnvironment::Demo {
            let http_client = self.http_client.clone();
            let account_id = self.core.account_id;
            let order_side = order.order_side();
            let order_type = order.order_type();
            let quantity = order.quantity();
            let time_in_force = order.time_in_force();
            let price = order.price();
            let trigger_price = order.trigger_price();
            let post_only = order.is_post_only();
            let reduce_only = order.is_reduce_only();

            self.spawn_task("submit_order_http", async move {
                // NOTE(review): the meaning of the two trailing `false` flags is defined by
                // `BybitHttpClient::submit_order` — confirm against its signature.
                let result = http_client
                    .submit_order(
                        account_id,
                        product_type,
                        instrument_id,
                        client_order_id,
                        order_side,
                        order_type,
                        quantity,
                        Some(time_in_force),
                        price,
                        trigger_price,
                        Some(post_only),
                        reduce_only,
                        false,
                        false,
                    )
                    .await;

                if let Err(e) = result {
                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        &format!("submit-order-error: {e}"),
                        ts_event,
                        false,
                    );
                    anyhow::bail!("submit order failed: {e}");
                }

                Ok(())
            });

            return Ok(());
        }

        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
        // Both conversions were already checked above, so `?` is not expected to fire here.
        let bybit_side = BybitOrderSide::try_from(order.order_side())?;
        let bybit_order_type = Self::map_order_type(order.order_type())?;

        let params = BybitWsPlaceOrderParams {
            category: product_type,
            symbol: Ustr::from(raw_symbol),
            side: bybit_side,
            order_type: bybit_order_type,
            qty: order.quantity().to_string(),
            is_leverage: None,
            market_unit: None,
            price: order.price().map(|p| p.to_string()),
            time_in_force: Some(Self::map_time_in_force(
                order.time_in_force(),
                order.is_post_only(),
            )),
            order_link_id: Some(order.client_order_id().to_string()),
            // Only set when true; otherwise the field is left as `None`.
            reduce_only: if order.is_reduce_only() {
                Some(true)
            } else {
                None
            },
            close_on_trigger: None,
            trigger_price: order.trigger_price().map(|p| p.to_string()),
            trigger_by: None,
            trigger_direction: None,
            tpsl_mode: None,
            take_profit: None,
            stop_loss: None,
            tp_trigger_by: None,
            sl_trigger_by: None,
            sl_trigger_price: None,
            tp_trigger_price: None,
            sl_order_type: None,
            tp_order_type: None,
            sl_limit_price: None,
            tp_limit_price: None,
        };

        let ws_trade = self.ws_trade.clone();
        let trader_id = self.core.trader_id;

        self.spawn_task("submit_order", async move {
            let result = ws_trade
                .place_order(
                    params,
                    client_order_id,
                    trader_id,
                    strategy_id,
                    instrument_id,
                )
                .await
                .map_err(|e| anyhow::anyhow!("submit order failed: {e}"));

            if let Err(e) = result {
                let ts_event = clock.get_time_ns();
                emitter.emit_order_rejected_event(
                    strategy_id,
                    instrument_id,
                    client_order_id,
                    &format!("submit-order-error: {e}"),
                    ts_event,
                    false,
                );
                return Err(e);
            }

            Ok(())
        });

        Ok(())
    }
677
    /// Not implemented: logs a warning with the number of orders and returns `Ok(())`.
    ///
    /// No order in the list is submitted.
    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
        log::warn!(
            "submit_order_list not yet implemented for Bybit execution client (got {} orders)",
            cmd.order_list.client_order_ids.len()
        );
        Ok(())
    }
685
    /// Requests a quantity and/or price change for an existing order.
    ///
    /// The request is sent on a background task: over HTTP in the Demo
    /// environment, over the trade WebSocket otherwise. A failed request emits
    /// an "order modify rejected" event from within that task. This method
    /// itself always returns `Ok(())`.
    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
        let instrument_id = cmd.instrument_id;
        let product_type = self.get_product_type_for_instrument(instrument_id);
        let client_order_id = cmd.client_order_id;
        let strategy_id = cmd.strategy_id;
        let venue_order_id = cmd.venue_order_id;
        let emitter = self.emitter.clone();
        let clock = self.clock;

        // Demo: the trade WebSocket is not connected in this environment, so use HTTP.
        if self.config.environment == BybitEnvironment::Demo {
            let http_client = self.http_client.clone();
            let account_id = self.core.account_id;
            let quantity = cmd.quantity;
            let price = cmd.price;

            self.spawn_task("modify_order_http", async move {
                let result = http_client
                    .modify_order(
                        account_id,
                        product_type,
                        instrument_id,
                        Some(client_order_id),
                        venue_order_id,
                        quantity,
                        price,
                    )
                    .await;

                if let Err(e) = result {
                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_modify_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        venue_order_id,
                        &format!("modify-order-error: {e}"),
                        ts_event,
                    );
                    anyhow::bail!("modify order failed: {e}");
                }

                Ok(())
            });

            return Ok(());
        }

        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());

        // Only quantity and price are forwarded; the remaining amend fields are left unset.
        let params = BybitWsAmendOrderParams {
            category: product_type,
            symbol: Ustr::from(raw_symbol),
            order_id: cmd.venue_order_id.map(|v| v.to_string()),
            order_link_id: Some(cmd.client_order_id.to_string()),
            qty: cmd.quantity.map(|q| q.to_string()),
            price: cmd.price.map(|p| p.to_string()),
            trigger_price: None,
            take_profit: None,
            stop_loss: None,
            tp_trigger_by: None,
            sl_trigger_by: None,
        };

        let ws_trade = self.ws_trade.clone();
        let trader_id = cmd.trader_id;

        self.spawn_task("modify_order", async move {
            let result = ws_trade
                .amend_order(
                    params,
                    client_order_id,
                    trader_id,
                    strategy_id,
                    instrument_id,
                    venue_order_id,
                )
                .await
                .map_err(|e| anyhow::anyhow!("modify order failed: {e}"));

            if let Err(e) = result {
                let ts_event = clock.get_time_ns();
                emitter.emit_order_modify_rejected_event(
                    strategy_id,
                    instrument_id,
                    client_order_id,
                    venue_order_id,
                    &format!("modify-order-error: {e}"),
                    ts_event,
                );
                return Err(e);
            }

            Ok(())
        });

        Ok(())
    }
783
    /// Requests cancellation of a single order.
    ///
    /// The request is sent on a background task: over HTTP in the Demo
    /// environment, over the trade WebSocket otherwise. A failed request emits
    /// an "order cancel rejected" event from within that task. This method
    /// itself always returns `Ok(())`.
    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
        let instrument_id = cmd.instrument_id;
        let product_type = self.get_product_type_for_instrument(instrument_id);
        let client_order_id = cmd.client_order_id;
        let strategy_id = cmd.strategy_id;
        let venue_order_id = cmd.venue_order_id;
        let emitter = self.emitter.clone();
        let clock = self.clock;

        // Demo: the trade WebSocket is not connected in this environment, so use HTTP.
        if self.config.environment == BybitEnvironment::Demo {
            let http_client = self.http_client.clone();
            let account_id = self.core.account_id;

            self.spawn_task("cancel_order_http", async move {
                let result = http_client
                    .cancel_order(
                        account_id,
                        product_type,
                        instrument_id,
                        Some(client_order_id),
                        venue_order_id,
                    )
                    .await;

                if let Err(e) = result {
                    let ts_event = clock.get_time_ns();
                    emitter.emit_order_cancel_rejected_event(
                        strategy_id,
                        instrument_id,
                        client_order_id,
                        venue_order_id,
                        &format!("cancel-order-error: {e}"),
                        ts_event,
                    );
                    anyhow::bail!("cancel order failed: {e}");
                }

                Ok(())
            });

            return Ok(());
        }

        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());

        // The client order ID is always sent; the venue order ID only when known.
        let params = BybitWsCancelOrderParams {
            category: product_type,
            symbol: Ustr::from(raw_symbol),
            order_id: cmd.venue_order_id.map(|v| v.to_string()),
            order_link_id: Some(cmd.client_order_id.to_string()),
        };

        let ws_trade = self.ws_trade.clone();
        let trader_id = cmd.trader_id;

        self.spawn_task("cancel_order", async move {
            let result = ws_trade
                .cancel_order(
                    params,
                    client_order_id,
                    trader_id,
                    strategy_id,
                    instrument_id,
                    venue_order_id,
                )
                .await
                .map_err(|e| anyhow::anyhow!("cancel order failed: {e}"));

            if let Err(e) = result {
                let ts_event = clock.get_time_ns();
                emitter.emit_order_cancel_rejected_event(
                    strategy_id,
                    instrument_id,
                    client_order_id,
                    venue_order_id,
                    &format!("cancel-order-error: {e}"),
                    ts_event,
                );
                return Err(e);
            }

            Ok(())
        });

        Ok(())
    }
870
871 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
872 if cmd.order_side != OrderSide::NoOrderSide {
873 log::warn!(
874 "Bybit does not support order_side filtering for cancel all orders; \
875 ignoring order_side={:?} and canceling all orders",
876 cmd.order_side,
877 );
878 }
879
880 let instrument_id = cmd.instrument_id;
881 let product_type = self.get_product_type_for_instrument(instrument_id);
882 let account_id = self.core.account_id;
883 let http_client = self.http_client.clone();
884
885 self.spawn_task("cancel_all_orders", async move {
886 match http_client
887 .cancel_all_orders(account_id, product_type, instrument_id)
888 .await
889 {
890 Ok(reports) => {
891 for report in reports {
892 log::debug!("Cancelled order: {report:?}");
893 }
894 }
895 Err(e) => {
896 log::error!("Failed to cancel all orders for {instrument_id}: {e}");
897 }
898 }
899 Ok(())
900 });
901
902 Ok(())
903 }
904
905 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
906 if cmd.cancels.is_empty() {
907 return Ok(());
908 }
909
910 let instrument_id = cmd.instrument_id;
911 let product_type = self.get_product_type_for_instrument(instrument_id);
912
913 if self.config.environment == BybitEnvironment::Demo {
915 let http_client = self.http_client.clone();
916 let account_id = self.core.account_id;
917 let strategy_id = cmd.strategy_id;
918 let emitter = self.emitter.clone();
919 let clock = self.clock;
920 let cancels: Vec<_> = cmd
921 .cancels
922 .iter()
923 .map(|c| (c.client_order_id, c.venue_order_id))
924 .collect();
925
926 self.spawn_task("batch_cancel_orders_http", async move {
927 for (client_order_id, venue_order_id) in cancels {
928 if let Err(e) = http_client
929 .cancel_order(
930 account_id,
931 product_type,
932 instrument_id,
933 Some(client_order_id),
934 venue_order_id,
935 )
936 .await
937 {
938 let ts_event = clock.get_time_ns();
939 emitter.emit_order_cancel_rejected_event(
940 strategy_id,
941 instrument_id,
942 client_order_id,
943 venue_order_id,
944 &format!("cancel-order-error: {e}"),
945 ts_event,
946 );
947 }
948 }
949 Ok(())
950 });
951
952 return Ok(());
953 }
954
955 let raw_symbol = Ustr::from(extract_raw_symbol(instrument_id.symbol.as_str()));
956
957 let mut cancel_params = Vec::with_capacity(cmd.cancels.len());
958 for cancel in &cmd.cancels {
959 cancel_params.push(BybitWsCancelOrderParams {
960 category: product_type,
961 symbol: raw_symbol,
962 order_id: cancel.venue_order_id.map(|v| v.to_string()),
963 order_link_id: Some(cancel.client_order_id.to_string()),
964 });
965 }
966
967 let ws_trade = self.ws_trade.clone();
968 let trader_id = cmd.trader_id;
969 let strategy_id = cmd.strategy_id;
970
971 self.spawn_task("batch_cancel_orders", async move {
972 ws_trade
973 .batch_cancel_orders(trader_id, strategy_id, cancel_params)
974 .await?;
975 Ok(())
976 });
977
978 Ok(())
979 }
980
981 async fn generate_order_status_report(
982 &self,
983 cmd: &GenerateOrderStatusReport,
984 ) -> anyhow::Result<Option<OrderStatusReport>> {
985 let Some(instrument_id) = cmd.instrument_id else {
986 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
987 return Ok(None);
988 };
989
990 let product_type = self.get_product_type_for_instrument(instrument_id);
991
992 let mut reports = self
993 .http_client
994 .request_order_status_reports(
995 self.core.account_id,
996 product_type,
997 Some(instrument_id),
998 false,
999 None,
1000 None,
1001 None,
1002 )
1003 .await?;
1004
1005 if let Some(client_order_id) = cmd.client_order_id {
1006 reports.retain(|report| report.client_order_id == Some(client_order_id));
1007 }
1008
1009 if let Some(venue_order_id) = cmd.venue_order_id {
1010 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1011 }
1012
1013 Ok(reports.into_iter().next())
1014 }
1015
1016 async fn generate_order_status_reports(
1017 &self,
1018 cmd: &GenerateOrderStatusReports,
1019 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1020 let mut reports = Vec::new();
1021
1022 if let Some(instrument_id) = cmd.instrument_id {
1023 let product_type = self.get_product_type_for_instrument(instrument_id);
1024 let mut fetched = self
1025 .http_client
1026 .request_order_status_reports(
1027 self.core.account_id,
1028 product_type,
1029 Some(instrument_id),
1030 cmd.open_only,
1031 None,
1032 None,
1033 None,
1034 )
1035 .await?;
1036 reports.append(&mut fetched);
1037 } else {
1038 for product_type in self.product_types() {
1039 let mut fetched = self
1040 .http_client
1041 .request_order_status_reports(
1042 self.core.account_id,
1043 product_type,
1044 None,
1045 cmd.open_only,
1046 None,
1047 None,
1048 None,
1049 )
1050 .await?;
1051 reports.append(&mut fetched);
1052 }
1053 }
1054
1055 if let Some(start) = cmd.start {
1056 reports.retain(|r| r.ts_last >= start);
1057 }
1058 if let Some(end) = cmd.end {
1059 reports.retain(|r| r.ts_last <= end);
1060 }
1061
1062 Ok(reports)
1063 }
1064
1065 async fn generate_fill_reports(
1066 &self,
1067 cmd: GenerateFillReports,
1068 ) -> anyhow::Result<Vec<FillReport>> {
1069 let start_ms = nanos_to_millis(cmd.start);
1070 let end_ms = nanos_to_millis(cmd.end);
1071 let mut reports = Vec::new();
1072
1073 if let Some(instrument_id) = cmd.instrument_id {
1074 let product_type = self.get_product_type_for_instrument(instrument_id);
1075 let mut fetched = self
1076 .http_client
1077 .request_fill_reports(
1078 self.core.account_id,
1079 product_type,
1080 Some(instrument_id),
1081 start_ms,
1082 end_ms,
1083 None,
1084 )
1085 .await?;
1086 reports.append(&mut fetched);
1087 } else {
1088 for product_type in self.product_types() {
1089 let mut fetched = self
1090 .http_client
1091 .request_fill_reports(
1092 self.core.account_id,
1093 product_type,
1094 None,
1095 start_ms,
1096 end_ms,
1097 None,
1098 )
1099 .await?;
1100 reports.append(&mut fetched);
1101 }
1102 }
1103
1104 if let Some(venue_order_id) = cmd.venue_order_id {
1105 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
1106 }
1107
1108 Ok(reports)
1109 }
1110
1111 async fn generate_position_status_reports(
1112 &self,
1113 cmd: &GeneratePositionStatusReports,
1114 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1115 let mut reports = Vec::new();
1116
1117 if let Some(instrument_id) = cmd.instrument_id {
1118 let product_type = self.get_product_type_for_instrument(instrument_id);
1119
1120 if product_type != BybitProductType::Spot {
1122 let mut fetched = self
1123 .http_client
1124 .request_position_status_reports(
1125 self.core.account_id,
1126 product_type,
1127 Some(instrument_id),
1128 )
1129 .await?;
1130 reports.append(&mut fetched);
1131 }
1132 } else {
1133 for product_type in self.product_types() {
1134 if product_type == BybitProductType::Spot {
1136 continue;
1137 }
1138 let mut fetched = self
1139 .http_client
1140 .request_position_status_reports(self.core.account_id, product_type, None)
1141 .await?;
1142 reports.append(&mut fetched);
1143 }
1144 }
1145
1146 Ok(reports)
1147 }
1148
1149 async fn generate_mass_status(
1150 &self,
1151 lookback_mins: Option<u64>,
1152 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1153 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1154
1155 let ts_now = self.clock.get_time_ns();
1156
1157 let start = lookback_mins.map(|mins| {
1158 let lookback_ns = mins * 60 * 1_000_000_000;
1159 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1160 });
1161
1162 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1163 .ts_init(ts_now)
1164 .open_only(false)
1165 .start(start)
1166 .build()
1167 .map_err(|e| anyhow::anyhow!("{e}"))?;
1168
1169 let fill_cmd = GenerateFillReportsBuilder::default()
1170 .ts_init(ts_now)
1171 .start(start)
1172 .build()
1173 .map_err(|e| anyhow::anyhow!("{e}"))?;
1174
1175 let position_cmd = GeneratePositionStatusReportsBuilder::default()
1176 .ts_init(ts_now)
1177 .start(start)
1178 .build()
1179 .map_err(|e| anyhow::anyhow!("{e}"))?;
1180
1181 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
1182 self.generate_order_status_reports(&order_cmd),
1183 self.generate_fill_reports(fill_cmd),
1184 self.generate_position_status_reports(&position_cmd),
1185 )?;
1186
1187 log::info!("Received {} OrderStatusReports", order_reports.len());
1188 log::info!("Received {} FillReports", fill_reports.len());
1189 log::info!("Received {} PositionReports", position_reports.len());
1190
1191 let mut mass_status = ExecutionMassStatus::new(
1192 self.core.client_id,
1193 self.core.account_id,
1194 *BYBIT_VENUE,
1195 ts_now,
1196 None,
1197 );
1198
1199 mass_status.add_order_reports(order_reports);
1200 mass_status.add_fill_reports(fill_reports);
1201 mass_status.add_position_reports(position_reports);
1202
1203 Ok(Some(mass_status))
1204 }
1205}
1206
1207fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1209 match message {
1210 NautilusWsMessage::AccountState(state) => {
1211 emitter.send_account_state(state);
1212 }
1213 NautilusWsMessage::PositionStatusReport(report) => {
1214 emitter.send_position_report(report);
1215 }
1216 NautilusWsMessage::OrderStatusReports(reports) => {
1217 log::debug!("Processing {} order status report(s)", reports.len());
1218 for report in reports {
1219 emitter.send_order_status_report(report);
1220 }
1221 }
1222 NautilusWsMessage::FillReports(reports) => {
1223 log::debug!("Processing {} fill report(s)", reports.len());
1224 for report in reports {
1225 emitter.send_fill_report(report);
1226 }
1227 }
1228 NautilusWsMessage::OrderRejected(event) => {
1229 emitter.send_order_event(OrderEventAny::Rejected(event));
1230 }
1231 NautilusWsMessage::OrderCancelRejected(event) => {
1232 emitter.send_order_event(OrderEventAny::CancelRejected(event));
1233 }
1234 NautilusWsMessage::OrderModifyRejected(event) => {
1235 emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1236 }
1237 NautilusWsMessage::Error(e) => {
1238 log::warn!("Websocket error: code={} message={}", e.code, e.message);
1239 }
1240 NautilusWsMessage::Reconnected => {
1241 log::info!("Websocket reconnected");
1242 }
1243 NautilusWsMessage::Authenticated => {
1244 log::debug!("Websocket authenticated");
1245 }
1246 NautilusWsMessage::Deltas(_)
1247 | NautilusWsMessage::Data(_)
1248 | NautilusWsMessage::FundingRates(_)
1249 | NautilusWsMessage::MarkPrices(_)
1250 | NautilusWsMessage::IndexPrices(_) => {
1251 log::debug!("Ignoring websocket data message");
1252 }
1253 }
1254}
1255
1256fn nanos_to_millis(value: Option<UnixNanos>) -> Option<i64> {
1257 value.map(|nanos| (nanos.as_u64() / 1_000_000) as i64)
1258}