1use std::{
21 fmt::Debug,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35 data::BarType,
36 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, PriceType, TimeInForce},
37 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
38 instruments::{Instrument, InstrumentAny},
39 types::{Price, Quantity},
40};
41use nautilus_network::{
42 backoff::ExponentialBackoff,
43 mode::ConnectionMode,
44 websocket::{
45 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
46 channel_message_handler,
47 },
48};
49use serde_json::Value;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::{
55 consts::{
56 BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
57 },
58 credential::Credential,
59 enums::{
60 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
61 BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
62 },
63 parse::{bar_spec_to_bybit_interval, extract_raw_symbol, make_bybit_symbol},
64 symbol::BybitSymbol,
65 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
66 },
67 websocket::{
68 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
69 error::{BybitWsError, BybitWsResult},
70 handler::{FeedHandler, HandlerCommand},
71 messages::{
72 BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
73 BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
74 BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
75 NautilusWsMessage,
76 },
77 },
78};
79
/// Heartbeat interval in seconds used by the constructors when the caller passes `None`.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Milliseconds added to the current UTC timestamp to form the `expires` value of an
/// authentication request.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
/// Maximum number of orders handed to a single batch chunk by `batch_place_orders`.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Shared, async-locked map from symbol to a pair of optional strings.
///
/// NOTE(review): the pair presumably holds funding-related values parsed from ticker
/// messages — confirm against the feed handler, which is the consumer of this cache.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
86
87fn resolve_credential(
94 environment: BybitEnvironment,
95 api_key: Option<String>,
96 api_secret: Option<String>,
97) -> Option<Credential> {
98 let (api_key_env, api_secret_env) = match environment {
99 BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
100 BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
101 BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
102 };
103
104 let key = get_or_env_var_opt(api_key, api_key_env);
105 let secret = get_or_env_var_opt(api_secret, api_secret_env);
106
107 match (key, secret) {
108 (Some(k), Some(s)) => Some(Credential::new(k, s)),
109 _ => None,
110 }
111}
112
/// WebSocket client for Bybit streams.
///
/// Instances are built with `new_public`, `new_public_with`, `new_private` or `new_trade`;
/// no connection is opened until `connect` is called.
#[cfg_attr(feature = "python", pyo3::pyclass(from_py_object))]
pub struct BybitWebSocketClient {
    /// WebSocket endpoint URL.
    url: String,
    /// Bybit environment (mainnet, testnet or demo) the client targets.
    environment: BybitEnvironment,
    /// Product type; set by the public constructors, `None` for private/trade clients.
    product_type: Option<BybitProductType>,
    /// API credential, if one could be resolved at construction time.
    credential: Option<Credential>,
    /// Whether this client authenticates after connecting (true for private/trade clients).
    requires_auth: bool,
    /// Tracks authentication state; shared with the feed handler task.
    auth_tracker: AuthTracker,
    /// Heartbeat interval in seconds passed to the underlying WebSocket configuration.
    heartbeat: Option<u64>,
    /// Swappable handle to the connection mode atomic of the current underlying client.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; replaced with a live channel in `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver of parsed messages; populated by `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Shutdown flag: set to `true` by `close` and reset to `false` by `connect`.
    signal: Arc<AtomicBool>,
    /// Handle of the spawned message-processing task, if connected.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription state shared with the handler task.
    subscriptions: SubscriptionState,
    /// Account identifier handed to the feed handler when `connect` runs.
    account_id: Option<AccountId>,
    /// Shared level value handed to the feed handler.
    /// NOTE(review): presumably a market-maker level — confirm against the handler.
    mm_level: Arc<AtomicU8>,
    /// Flag handed to the feed handler when `connect` runs.
    /// NOTE(review): presumably selects close-time bar timestamps — confirm.
    bars_timestamp_on_close: bool,
    /// Instruments keyed by symbol; replayed to the handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by kline topic string.
    bar_types_cache: Arc<DashMap<String, BarType>>,
    /// Per-symbol cache shared with the handler; cleared after a reconnect.
    funding_cache: FundingCache,
    /// Cancellation token cloned along with the client.
    cancellation_token: CancellationToken,
}
137
138impl Debug for BybitWebSocketClient {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 f.debug_struct(stringify!(BybitWebSocketClient))
141 .field("url", &self.url)
142 .field("environment", &self.environment)
143 .field("product_type", &self.product_type)
144 .field("requires_auth", &self.requires_auth)
145 .field("heartbeat", &self.heartbeat)
146 .field("confirmed_subscriptions", &self.subscriptions.len())
147 .finish()
148 }
149}
150
151impl Clone for BybitWebSocketClient {
152 fn clone(&self) -> Self {
153 Self {
154 url: self.url.clone(),
155 environment: self.environment,
156 product_type: self.product_type,
157 credential: self.credential.clone(),
158 requires_auth: self.requires_auth,
159 auth_tracker: self.auth_tracker.clone(),
160 heartbeat: self.heartbeat,
161 connection_mode: Arc::clone(&self.connection_mode),
162 cmd_tx: Arc::clone(&self.cmd_tx),
163 out_rx: None, signal: Arc::clone(&self.signal),
165 task_handle: None, subscriptions: self.subscriptions.clone(),
167 account_id: self.account_id,
168 mm_level: Arc::clone(&self.mm_level),
169 bars_timestamp_on_close: self.bars_timestamp_on_close,
170 instruments_cache: Arc::clone(&self.instruments_cache),
171 bar_types_cache: Arc::clone(&self.bar_types_cache),
172 funding_cache: Arc::clone(&self.funding_cache),
173 cancellation_token: self.cancellation_token.clone(),
174 }
175 }
176}
177
178impl BybitWebSocketClient {
179 #[must_use]
181 pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
182 Self::new_public_with(
183 BybitProductType::Linear,
184 BybitEnvironment::Mainnet,
185 url,
186 heartbeat,
187 )
188 }
189
190 #[must_use]
192 pub fn new_public_with(
193 product_type: BybitProductType,
194 environment: BybitEnvironment,
195 url: Option<String>,
196 heartbeat: Option<u64>,
197 ) -> Self {
198 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
202
203 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
204 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
205
206 Self {
207 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
208 environment,
209 product_type: Some(product_type),
210 credential: None,
211 requires_auth: false,
212 auth_tracker: AuthTracker::new(),
213 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
214 connection_mode,
215 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
216 out_rx: None,
217 signal: Arc::new(AtomicBool::new(false)),
218 task_handle: None,
219 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
220 instruments_cache: Arc::new(DashMap::new()),
221 bar_types_cache: Arc::new(DashMap::new()),
222 account_id: None,
223 mm_level: Arc::new(AtomicU8::new(0)),
224 bars_timestamp_on_close: true,
225 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
226 cancellation_token: CancellationToken::new(),
227 }
228 }
229
230 #[must_use]
238 pub fn new_private(
239 environment: BybitEnvironment,
240 api_key: Option<String>,
241 api_secret: Option<String>,
242 url: Option<String>,
243 heartbeat: Option<u64>,
244 ) -> Self {
245 let credential = resolve_credential(environment, api_key, api_secret);
246
247 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
251
252 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
253 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
254
255 Self {
256 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
257 environment,
258 product_type: None,
259 credential,
260 requires_auth: true,
261 auth_tracker: AuthTracker::new(),
262 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
263 connection_mode,
264 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
265 out_rx: None,
266 signal: Arc::new(AtomicBool::new(false)),
267 task_handle: None,
268 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
269 instruments_cache: Arc::new(DashMap::new()),
270 bar_types_cache: Arc::new(DashMap::new()),
271 account_id: None,
272 mm_level: Arc::new(AtomicU8::new(0)),
273 bars_timestamp_on_close: true,
274 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
275 cancellation_token: CancellationToken::new(),
276 }
277 }
278
279 #[must_use]
287 pub fn new_trade(
288 environment: BybitEnvironment,
289 api_key: Option<String>,
290 api_secret: Option<String>,
291 url: Option<String>,
292 heartbeat: Option<u64>,
293 ) -> Self {
294 let credential = resolve_credential(environment, api_key, api_secret);
295
296 let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
300
301 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
302 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
303
304 Self {
305 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
306 environment,
307 product_type: None,
308 credential,
309 requires_auth: true,
310 auth_tracker: AuthTracker::new(),
311 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
312 connection_mode,
313 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
314 out_rx: None,
315 signal: Arc::new(AtomicBool::new(false)),
316 task_handle: None,
317 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
318 instruments_cache: Arc::new(DashMap::new()),
319 bar_types_cache: Arc::new(DashMap::new()),
320 account_id: None,
321 mm_level: Arc::new(AtomicU8::new(0)),
322 bars_timestamp_on_close: true,
323 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
324 cancellation_token: CancellationToken::new(),
325 }
326 }
327
    /// Connects to the WebSocket endpoint and spawns the message-processing task.
    ///
    /// The initial connection is attempted up to `MAX_RETRIES` times, each attempt bounded by
    /// `CONNECTION_TIMEOUT_SECS`, with an exponential backoff sleep between attempts. On
    /// success the client installs fresh command/output channels, hands the connection to the
    /// feed handler, replays any cached instruments, spawns the handler loop on the shared
    /// runtime and, for clients that require it, authenticates.
    ///
    /// # Errors
    ///
    /// Returns an error if the ping message cannot be serialized, the backoff cannot be
    /// constructed, every connection attempt fails or times out, a command cannot be sent
    /// to the handler, or authentication fails.
    pub async fn connect(&mut self) -> BybitWsResult<()> {
        const MAX_RETRIES: u32 = 5;
        const CONNECTION_TIMEOUT_SECS: u64 = 10;

        // Clear any shutdown flag left behind by a previous `close`.
        self.signal.store(false, Ordering::Relaxed);

        let (raw_handler, raw_rx) = channel_message_handler();

        // No-op ping callback: the payload is ignored.
        // NOTE(review): presumably pings are answered elsewhere (e.g. in the handler) — confirm.
        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
        });

        // Application-level ping sent as the heartbeat message.
        let ping_msg = serde_json::to_string(&BybitSubscription {
            op: BybitWsOperation::Ping,
            args: vec![],
        })?;

        let config = WebSocketConfig {
            url: self.url.clone(),
            headers: Self::default_headers(),
            heartbeat: self.heartbeat,
            heartbeat_msg: Some(ping_msg),
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: Some(500),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(1.5),
            reconnect_jitter_ms: Some(250),
            reconnect_max_attempts: None,
        };

        // Backoff for the initial connection attempts only (500 ms up to 5 s, factor 2.0).
        // NOTE(review): `250` and `false` are presumably jitter (ms) and an
        // "immediate first retry" flag — confirm against `ExponentialBackoff::new`.
        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(500),
            Duration::from_millis(5000),
            2.0,
            250,
            false,
        )
        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;

        #[allow(unused_assignments)]
        let mut last_error = String::new();
        let mut attempt = 0;
        let client = loop {
            attempt += 1;

            match tokio::time::timeout(
                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
                WebSocketClient::connect(
                    config.clone(),
                    Some(raw_handler.clone()),
                    Some(ping_handler.clone()),
                    None,
                    vec![],
                    None,
                ),
            )
            .await
            {
                // Connected within the timeout.
                Ok(Ok(client)) => {
                    if attempt > 1 {
                        log::info!("WebSocket connection established after {attempt} attempts");
                    }
                    break client;
                }
                // The connection attempt itself failed.
                Ok(Err(e)) => {
                    last_error = e.to_string();
                    log::warn!(
                        "WebSocket connection attempt failed: attempt={attempt}, max_retries={MAX_RETRIES}, url={}, error={last_error}",
                        self.url
                    );
                }
                // The attempt did not complete within `CONNECTION_TIMEOUT_SECS`.
                Err(_) => {
                    last_error = format!(
                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
                    );
                    log::warn!(
                        "WebSocket connection attempt timed out: attempt={attempt}, max_retries={MAX_RETRIES}, url={}",
                        self.url
                    );
                }
            }

            // Give up once the retry budget is exhausted, reporting the last failure.
            if attempt >= MAX_RETRIES {
                return Err(BybitWsError::Transport(format!(
                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
                    If this is a DNS error, check your network configuration and DNS settings.",
                    self.url,
                    if last_error.is_empty() {
                        "unknown error"
                    } else {
                        &last_error
                    }
                )));
            }

            let delay = backoff.next_duration();
            log::debug!(
                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
                attempt + 1
            );
            tokio::time::sleep(delay).await;
        };

        // Track the connection mode of the freshly connected client.
        self.connection_mode.store(client.connection_mode_atomic());

        // Output channel: the receiver is stored for `stream`, the sender moves into the task.
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Replace the placeholder command sender with one wired to the new handler.
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        let cmd = HandlerCommand::SetClient(client);

        self.send_cmd(cmd).await?;

        // Replay instruments cached before the connection existed.
        if !self.instruments_cache.is_empty() {
            let cached_instruments: Vec<InstrumentAny> = self
                .instruments_cache
                .iter()
                .map(|entry| entry.value().clone())
                .collect();
            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
            self.send_cmd(cmd).await?;
        }

        // Snapshot or share everything the spawned task needs; `account_id`, `product_type`
        // and `bars_timestamp_on_close` are copied by value at this point.
        let signal = Arc::clone(&self.signal);
        let subscriptions = self.subscriptions.clone();
        let credential = self.credential.clone();
        let requires_auth = self.requires_auth;
        let funding_cache = Arc::clone(&self.funding_cache);
        let bar_types_cache = Arc::clone(&self.bar_types_cache);
        let account_id = self.account_id;
        let product_type = self.product_type;
        let bars_timestamp_on_close = self.bars_timestamp_on_close;
        let mm_level = Arc::clone(&self.mm_level);
        let cmd_tx_for_reconnect = cmd_tx.clone();
        let auth_tracker = self.auth_tracker.clone();
        let auth_tracker_for_handler = auth_tracker.clone();

        let stream_handle = get_runtime().spawn(async move {
            let mut handler = FeedHandler::new(
                signal.clone(),
                cmd_rx,
                raw_rx,
                out_tx.clone(),
                account_id,
                product_type,
                bars_timestamp_on_close,
                mm_level.clone(),
                auth_tracker_for_handler,
                subscriptions.clone(),
                funding_cache.clone(),
                bar_types_cache.clone(),
            );

            // Re-sends a subscribe request for every topic returned by `all_topics`,
            // one payload per topic, through the handler command channel.
            let resubscribe_all = || async {
                let topics = subscriptions.all_topics();

                if topics.is_empty() {
                    return;
                }

                log::debug!(
                    "Resubscribing to confirmed subscriptions: count={}",
                    topics.len()
                );

                for topic in &topics {
                    subscriptions.mark_subscribe(topic.as_str());
                }

                let mut payloads = Vec::with_capacity(topics.len());
                for topic in &topics {
                    let message = BybitSubscription {
                        op: BybitWsOperation::Subscribe,
                        args: vec![topic.clone()],
                    };
                    // Topics whose request fails to serialize are skipped.
                    if let Ok(payload) = serde_json::to_string(&message) {
                        payloads.push(payload);
                    }
                }

                let cmd = HandlerCommand::Subscribe { topics: payloads };

                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                    log::error!("Failed to send resubscribe command: {e}");
                }
            };

            loop {
                match handler.next().await {
                    Some(NautilusWsMessage::Reconnected) => {
                        // Ignore reconnect notifications once shutdown has been requested.
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }

                        log::info!("WebSocket reconnected");

                        // Rebuild the topic strings of all confirmed subscriptions.
                        let confirmed_topics: Vec<String> = {
                            let confirmed = subscriptions.confirmed();
                            let mut topics = Vec::new();
                            for entry in confirmed.iter() {
                                let (channel, symbols) = entry.pair();
                                for symbol in symbols {
                                    if symbol.is_empty() {
                                        topics.push(channel.to_string());
                                    } else {
                                        topics.push(format!("{channel}.{symbol}"));
                                    }
                                }
                            }
                            topics
                        };

                        if !confirmed_topics.is_empty() {
                            log::debug!(
                                "Marking confirmed subscriptions as pending for replay: count={}",
                                confirmed_topics.len()
                            );
                            for topic in confirmed_topics {
                                subscriptions.mark_failure(&topic);
                            }
                        }

                        // Drop cached per-symbol values from the previous session.
                        funding_cache.write().await.clear();

                        if requires_auth {
                            log::debug!("Re-authenticating after reconnection");

                            if let Some(cred) = &credential {
                                let _rx = auth_tracker.begin();

                                // The signed expiry lies `WEBSOCKET_AUTH_WINDOW_MS` in the future.
                                let expires = chrono::Utc::now().timestamp_millis()
                                    + WEBSOCKET_AUTH_WINDOW_MS;
                                let signature = cred.sign_websocket_auth(expires);

                                let auth_message = BybitAuthRequest {
                                    op: BybitWsOperation::Auth,
                                    args: vec![
                                        Value::String(cred.api_key().to_string()),
                                        Value::Number(expires.into()),
                                        Value::String(signature),
                                    ],
                                };

                                if let Ok(payload) = serde_json::to_string(&auth_message) {
                                    let cmd = HandlerCommand::Authenticate { payload };
                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                                        log::error!(
                                            "Failed to send reconnection auth command: error={e}"
                                        );
                                    }
                                } else {
                                    log::error!("Failed to serialize reconnection auth message");
                                }
                            }
                        }

                        // Authenticated clients resubscribe later, on the `Authenticated` message.
                        if !requires_auth {
                            log::debug!("No authentication required, resubscribing immediately");
                            resubscribe_all().await;
                        }

                        // Forward the reconnect notification to the consumer of `stream`.
                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
                            log::debug!("Receiver dropped, stopping");
                            break;
                        }
                        continue;
                    }
                    // Consumed here rather than forwarded: it only triggers resubscription.
                    Some(NautilusWsMessage::Authenticated) => {
                        log::debug!("Authenticated, resubscribing");
                        resubscribe_all().await;
                        continue;
                    }
                    // Every other message is forwarded unchanged.
                    Some(msg) => {
                        if out_tx.send(msg).is_err() {
                            log::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    None => {
                        if handler.is_stopped() {
                            log::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        log::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }

            log::debug!("Handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        // Initial authentication for private/trade clients.
        if requires_auth && let Err(e) = self.authenticate_if_required().await {
            return Err(e);
        }

        Ok(())
    }
651
652 pub async fn close(&mut self) -> BybitWsResult<()> {
654 log::debug!("Starting close process");
655
656 self.signal.store(true, Ordering::Relaxed);
657
658 let cmd = HandlerCommand::Disconnect;
659 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
660 log::debug!(
661 "Failed to send disconnect command (handler may already be shut down): {e}"
662 );
663 }
664
665 if let Some(task_handle) = self.task_handle.take() {
666 match Arc::try_unwrap(task_handle) {
667 Ok(handle) => {
668 log::debug!("Waiting for task handle to complete");
669 match tokio::time::timeout(Duration::from_secs(2), handle).await {
670 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
671 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
672 Err(_) => {
673 log::warn!(
674 "Timeout waiting for task handle, task may still be running"
675 );
676 }
678 }
679 }
680 Err(arc_handle) => {
681 log::debug!(
682 "Cannot take ownership of task handle - other references exist, aborting task"
683 );
684 arc_handle.abort();
685 }
686 }
687 } else {
688 log::debug!("No task handle to await");
689 }
690
691 self.auth_tracker.invalidate();
692
693 log::debug!("Closed");
694
695 Ok(())
696 }
697
698 #[must_use]
700 pub fn is_active(&self) -> bool {
701 let connection_mode_arc = self.connection_mode.load();
702 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
703 && !self.signal.load(Ordering::Relaxed)
704 }
705
706 pub fn is_closed(&self) -> bool {
708 let connection_mode_arc = self.connection_mode.load();
709 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
710 || self.signal.load(Ordering::Relaxed)
711 }
712
713 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
719 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
720
721 tokio::time::timeout(timeout, async {
722 while !self.is_active() {
723 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
724 }
725 })
726 .await
727 .map_err(|_| {
728 BybitWsError::ClientError(format!(
729 "WebSocket connection timeout after {timeout_secs} seconds"
730 ))
731 })?;
732
733 Ok(())
734 }
735
736 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
738 if topics.is_empty() {
739 return Ok(());
740 }
741
742 log::debug!("Subscribing to topics: {topics:?}");
743
744 let mut topics_to_send = Vec::new();
746
747 for topic in topics {
748 if self.subscriptions.add_reference(&topic) {
750 self.subscriptions.mark_subscribe(&topic);
751 topics_to_send.push(topic.clone());
752 } else {
753 log::debug!("Already subscribed to {topic}, skipping duplicate subscription");
754 }
755 }
756
757 if topics_to_send.is_empty() {
758 return Ok(());
759 }
760
761 let mut payloads = Vec::with_capacity(topics_to_send.len());
763 for topic in &topics_to_send {
764 let message = BybitSubscription {
765 op: BybitWsOperation::Subscribe,
766 args: vec![topic.clone()],
767 };
768 let payload = serde_json::to_string(&message).map_err(|e| {
769 BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
770 })?;
771 payloads.push(payload);
772 }
773
774 let cmd = HandlerCommand::Subscribe { topics: payloads };
775 self.cmd_tx
776 .read()
777 .await
778 .send(cmd)
779 .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
780
781 Ok(())
782 }
783
784 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
786 if topics.is_empty() {
787 return Ok(());
788 }
789
790 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
791
792 if self.signal.load(Ordering::Relaxed) {
793 log::debug!("Shutdown signal detected, skipping unsubscribe");
794 return Ok(());
795 }
796
797 let mut topics_to_send = Vec::new();
799
800 for topic in topics {
801 if self.subscriptions.remove_reference(&topic) {
803 self.subscriptions.mark_unsubscribe(&topic);
804 topics_to_send.push(topic.clone());
805 } else {
806 log::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
807 }
808 }
809
810 if topics_to_send.is_empty() {
811 return Ok(());
812 }
813
814 let mut payloads = Vec::with_capacity(topics_to_send.len());
816 for topic in &topics_to_send {
817 let message = BybitSubscription {
818 op: BybitWsOperation::Unsubscribe,
819 args: vec![topic.clone()],
820 };
821 if let Ok(payload) = serde_json::to_string(&message) {
822 payloads.push(payload);
823 }
824 }
825
826 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
827 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
828 log::debug!("Failed to send unsubscribe command: error={e}");
829 }
830
831 Ok(())
832 }
833
834 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
840 let rx = self
841 .out_rx
842 .take()
843 .expect("Stream receiver already taken or client not connected");
844 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
845 async_stream::stream! {
846 while let Some(msg) = rx.recv().await {
847 yield msg;
848 }
849 }
850 }
851
852 #[must_use]
854 pub fn subscription_count(&self) -> usize {
855 self.subscriptions.len()
856 }
857
858 #[must_use]
860 pub fn credential(&self) -> Option<&Credential> {
861 self.credential.as_ref()
862 }
863
864 pub fn cache_instrument(&self, instrument: InstrumentAny) {
868 self.instruments_cache
869 .insert(instrument.symbol().inner(), instrument.clone());
870
871 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
874 let cmd = HandlerCommand::UpdateInstrument(instrument);
875 if let Err(e) = cmd_tx.send(cmd) {
876 log::debug!("Failed to send instrument update to handler: {e}");
877 }
878 }
879 }
880
881 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
885 self.instruments_cache.clear();
886 let mut count = 0;
887
888 log::debug!("Initializing Bybit instrument cache");
889
890 for inst in instruments {
891 let symbol = inst.symbol().inner();
892 self.instruments_cache.insert(symbol, inst.clone());
893 log::debug!("Cached instrument: {symbol}");
894 count += 1;
895 }
896
897 log::info!("Bybit instrument cache initialized with {count} instruments");
898 }
899
900 pub fn set_account_id(&mut self, account_id: AccountId) {
902 self.account_id = Some(account_id);
903 }
904
905 pub fn set_mm_level(&self, mm_level: u8) {
907 self.mm_level.store(mm_level, Ordering::Relaxed);
908 }
909
910 pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
915 self.bars_timestamp_on_close = value;
916 }
917
918 #[must_use]
920 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
921 &self.instruments_cache
922 }
923
924 #[must_use]
926 pub fn account_id(&self) -> Option<AccountId> {
927 self.account_id
928 }
929
930 #[must_use]
932 pub fn product_type(&self) -> Option<BybitProductType> {
933 self.product_type
934 }
935
936 pub async fn subscribe_orderbook(
946 &self,
947 instrument_id: InstrumentId,
948 depth: u32,
949 ) -> BybitWsResult<()> {
950 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
951 let topic = format!(
952 "{}.{depth}.{raw_symbol}",
953 BybitWsPublicChannel::OrderBook.as_ref()
954 );
955 self.subscribe(vec![topic]).await
956 }
957
958 pub async fn unsubscribe_orderbook(
960 &self,
961 instrument_id: InstrumentId,
962 depth: u32,
963 ) -> BybitWsResult<()> {
964 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
965 let topic = format!(
966 "{}.{depth}.{raw_symbol}",
967 BybitWsPublicChannel::OrderBook.as_ref()
968 );
969 self.unsubscribe(vec![topic]).await
970 }
971
972 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
982 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
983 let topic = format!(
984 "{}.{raw_symbol}",
985 BybitWsPublicChannel::PublicTrade.as_ref()
986 );
987 self.subscribe(vec![topic]).await
988 }
989
990 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
992 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
993 let topic = format!(
994 "{}.{raw_symbol}",
995 BybitWsPublicChannel::PublicTrade.as_ref()
996 );
997 self.unsubscribe(vec![topic]).await
998 }
999
1000 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1010 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1011 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1012 self.subscribe(vec![topic]).await
1013 }
1014
1015 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1017 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1018 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1019
1020 let symbol = self.product_type.map_or_else(
1022 || instrument_id.symbol.inner(),
1023 |pt| make_bybit_symbol(raw_symbol, pt),
1024 );
1025 self.funding_cache.write().await.remove(&symbol);
1026
1027 self.unsubscribe(vec![topic]).await
1028 }
1029
1030 pub async fn subscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1040 let spec = bar_type.spec();
1041
1042 if spec.price_type != PriceType::Last {
1044 return Err(BybitWsError::ClientError(format!(
1045 "Bybit bars only support LAST price type, received {}",
1046 spec.price_type
1047 )));
1048 }
1049 if bar_type.aggregation_source() != AggregationSource::External {
1050 return Err(BybitWsError::ClientError(format!(
1051 "Bybit bars only support EXTERNAL aggregation source, received {}",
1052 bar_type.aggregation_source()
1053 )));
1054 }
1055
1056 let step = spec.step.get();
1058 if spec.aggregation == BarAggregation::Minute && step >= 60 {
1059 let hours = step / 60;
1060 return Err(BybitWsError::ClientError(format!(
1061 "Invalid bar type: {step}-MINUTE not supported, use {hours}-HOUR instead"
1062 )));
1063 }
1064
1065 let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1066 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1067
1068 let instrument_id = bar_type.instrument_id();
1069 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1070 let topic = format!(
1071 "{}.{}.{raw_symbol}",
1072 BybitWsPublicChannel::Kline.as_ref(),
1073 interval
1074 );
1075
1076 if self.subscriptions.get_reference_count(&topic) == 0 {
1078 self.bar_types_cache.insert(topic.clone(), bar_type);
1079 }
1080
1081 self.subscribe(vec![topic]).await
1082 }
1083
1084 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1086 let spec = bar_type.spec();
1087 let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1088 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1089
1090 let instrument_id = bar_type.instrument_id();
1091 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1092 let topic = format!(
1093 "{}.{}.{raw_symbol}",
1094 BybitWsPublicChannel::Kline.as_ref(),
1095 interval
1096 );
1097
1098 if self.subscriptions.get_reference_count(&topic) == 1 {
1100 self.bar_types_cache.remove(&topic);
1101 }
1102
1103 self.unsubscribe(vec![topic]).await
1104 }
1105
1106 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1116 if !self.requires_auth {
1117 return Err(BybitWsError::Authentication(
1118 "Order subscription requires authentication".to_string(),
1119 ));
1120 }
1121 self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1122 .await
1123 }
1124
1125 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1127 self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1128 .await
1129 }
1130
1131 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1141 if !self.requires_auth {
1142 return Err(BybitWsError::Authentication(
1143 "Execution subscription requires authentication".to_string(),
1144 ));
1145 }
1146 self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1147 .await
1148 }
1149
1150 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1152 self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1153 .await
1154 }
1155
1156 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1166 if !self.requires_auth {
1167 return Err(BybitWsError::Authentication(
1168 "Position subscription requires authentication".to_string(),
1169 ));
1170 }
1171 self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1172 .await
1173 }
1174
1175 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1177 self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1178 .await
1179 }
1180
1181 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1191 if !self.requires_auth {
1192 return Err(BybitWsError::Authentication(
1193 "Wallet subscription requires authentication".to_string(),
1194 ));
1195 }
1196 self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1197 .await
1198 }
1199
1200 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1202 self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1203 .await
1204 }
1205
1206 pub async fn place_order(
1216 &self,
1217 params: BybitWsPlaceOrderParams,
1218 client_order_id: ClientOrderId,
1219 trader_id: TraderId,
1220 strategy_id: StrategyId,
1221 instrument_id: InstrumentId,
1222 ) -> BybitWsResult<()> {
1223 if !self.auth_tracker.is_authenticated() {
1224 return Err(BybitWsError::Authentication(
1225 "Must be authenticated to place orders".to_string(),
1226 ));
1227 }
1228
1229 let cmd = HandlerCommand::PlaceOrder {
1230 params,
1231 client_order_id,
1232 trader_id,
1233 strategy_id,
1234 instrument_id,
1235 };
1236
1237 self.send_cmd(cmd).await
1238 }
1239
1240 pub async fn amend_order(
1250 &self,
1251 params: BybitWsAmendOrderParams,
1252 client_order_id: ClientOrderId,
1253 trader_id: TraderId,
1254 strategy_id: StrategyId,
1255 instrument_id: InstrumentId,
1256 venue_order_id: Option<VenueOrderId>,
1257 ) -> BybitWsResult<()> {
1258 if !self.auth_tracker.is_authenticated() {
1259 return Err(BybitWsError::Authentication(
1260 "Must be authenticated to amend orders".to_string(),
1261 ));
1262 }
1263
1264 let cmd = HandlerCommand::AmendOrder {
1265 params,
1266 client_order_id,
1267 trader_id,
1268 strategy_id,
1269 instrument_id,
1270 venue_order_id,
1271 };
1272
1273 self.send_cmd(cmd).await
1274 }
1275
1276 pub async fn cancel_order(
1286 &self,
1287 params: BybitWsCancelOrderParams,
1288 client_order_id: ClientOrderId,
1289 trader_id: TraderId,
1290 strategy_id: StrategyId,
1291 instrument_id: InstrumentId,
1292 venue_order_id: Option<VenueOrderId>,
1293 ) -> BybitWsResult<()> {
1294 if !self.auth_tracker.is_authenticated() {
1295 return Err(BybitWsError::Authentication(
1296 "Must be authenticated to cancel orders".to_string(),
1297 ));
1298 }
1299
1300 let cmd = HandlerCommand::CancelOrder {
1301 params,
1302 client_order_id,
1303 trader_id,
1304 strategy_id,
1305 instrument_id,
1306 venue_order_id,
1307 };
1308
1309 self.send_cmd(cmd).await
1310 }
1311
1312 pub async fn batch_place_orders(
1322 &self,
1323 trader_id: TraderId,
1324 strategy_id: StrategyId,
1325 orders: Vec<BybitWsPlaceOrderParams>,
1326 ) -> BybitWsResult<()> {
1327 if !self.auth_tracker.is_authenticated() {
1328 return Err(BybitWsError::Authentication(
1329 "Must be authenticated to place orders".to_string(),
1330 ));
1331 }
1332
1333 if orders.is_empty() {
1334 log::warn!("Batch place orders called with empty orders list");
1335 return Ok(());
1336 }
1337
1338 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1339 self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1340 .await?;
1341 }
1342
1343 Ok(())
1344 }
1345
1346 async fn batch_place_orders_chunk(
1347 &self,
1348 trader_id: TraderId,
1349 strategy_id: StrategyId,
1350 orders: Vec<BybitWsPlaceOrderParams>,
1351 ) -> BybitWsResult<()> {
1352 let category = orders[0].category;
1353 let batch_req_id = UUID4::new().to_string();
1354
1355 let mut batch_order_data = Vec::new();
1357 for order in &orders {
1358 if let Some(order_link_id_str) = &order.order_link_id {
1359 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1360 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1361 let instrument_id = self
1362 .instruments_cache
1363 .get(&cache_key)
1364 .map(|inst| inst.id())
1365 .ok_or_else(|| {
1366 BybitWsError::ClientError(format!(
1367 "Instrument {cache_key} not found in cache"
1368 ))
1369 })?;
1370 batch_order_data.push((
1371 client_order_id,
1372 (client_order_id, trader_id, strategy_id, instrument_id),
1373 ));
1374 }
1375 }
1376
1377 if !batch_order_data.is_empty() {
1378 let cmd = HandlerCommand::RegisterBatchPlace {
1379 req_id: batch_req_id.clone(),
1380 orders: batch_order_data,
1381 };
1382 let cmd_tx = self.cmd_tx.read().await;
1383 if let Err(e) = cmd_tx.send(cmd) {
1384 log::error!("Failed to send RegisterBatchPlace command: {e}");
1385 }
1386 }
1387
1388 let mm_level = self.mm_level.load(Ordering::Relaxed);
1389 let has_non_post_only = orders
1390 .iter()
1391 .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1392 let referer = if has_non_post_only || mm_level == 0 {
1393 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1394 } else {
1395 None
1396 };
1397
1398 let request_items: Vec<BybitWsBatchPlaceItem> = orders
1399 .into_iter()
1400 .map(|order| BybitWsBatchPlaceItem {
1401 symbol: order.symbol,
1402 side: order.side,
1403 order_type: order.order_type,
1404 qty: order.qty,
1405 is_leverage: order.is_leverage,
1406 market_unit: order.market_unit,
1407 price: order.price,
1408 time_in_force: order.time_in_force,
1409 order_link_id: order.order_link_id,
1410 reduce_only: order.reduce_only,
1411 close_on_trigger: order.close_on_trigger,
1412 trigger_price: order.trigger_price,
1413 trigger_by: order.trigger_by,
1414 trigger_direction: order.trigger_direction,
1415 tpsl_mode: order.tpsl_mode,
1416 take_profit: order.take_profit,
1417 stop_loss: order.stop_loss,
1418 tp_trigger_by: order.tp_trigger_by,
1419 sl_trigger_by: order.sl_trigger_by,
1420 sl_trigger_price: order.sl_trigger_price,
1421 tp_trigger_price: order.tp_trigger_price,
1422 sl_order_type: order.sl_order_type,
1423 tp_order_type: order.tp_order_type,
1424 sl_limit_price: order.sl_limit_price,
1425 tp_limit_price: order.tp_limit_price,
1426 })
1427 .collect();
1428
1429 let args = BybitWsBatchPlaceOrderArgs {
1430 category,
1431 request: request_items,
1432 };
1433
1434 let request = BybitWsRequest {
1435 req_id: Some(batch_req_id),
1436 op: BybitWsOrderRequestOp::CreateBatch,
1437 header: BybitWsHeader::with_referer(referer),
1438 args: vec![args],
1439 };
1440
1441 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1442
1443 self.send_text(&payload).await
1444 }
1445
1446 pub async fn batch_amend_orders(
1452 &self,
1453 #[allow(unused_variables)] trader_id: TraderId,
1454 #[allow(unused_variables)] strategy_id: StrategyId,
1455 orders: Vec<BybitWsAmendOrderParams>,
1456 ) -> BybitWsResult<()> {
1457 if !self.auth_tracker.is_authenticated() {
1458 return Err(BybitWsError::Authentication(
1459 "Must be authenticated to amend orders".to_string(),
1460 ));
1461 }
1462
1463 if orders.is_empty() {
1464 log::warn!("Batch amend orders called with empty orders list");
1465 return Ok(());
1466 }
1467
1468 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1469 self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1470 .await?;
1471 }
1472
1473 Ok(())
1474 }
1475
1476 async fn batch_amend_orders_chunk(
1477 &self,
1478 #[allow(unused_variables)] trader_id: TraderId,
1479 #[allow(unused_variables)] strategy_id: StrategyId,
1480 orders: Vec<BybitWsAmendOrderParams>,
1481 ) -> BybitWsResult<()> {
1482 let request = BybitWsRequest {
1483 req_id: None,
1484 op: BybitWsOrderRequestOp::AmendBatch,
1485 header: BybitWsHeader::now(),
1486 args: orders,
1487 };
1488
1489 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1490
1491 self.send_text(&payload).await
1492 }
1493
1494 pub async fn batch_cancel_orders(
1500 &self,
1501 trader_id: TraderId,
1502 strategy_id: StrategyId,
1503 orders: Vec<BybitWsCancelOrderParams>,
1504 ) -> BybitWsResult<()> {
1505 if !self.auth_tracker.is_authenticated() {
1506 return Err(BybitWsError::Authentication(
1507 "Must be authenticated to cancel orders".to_string(),
1508 ));
1509 }
1510
1511 if orders.is_empty() {
1512 log::warn!("Batch cancel orders called with empty orders list");
1513 return Ok(());
1514 }
1515
1516 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1517 self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1518 .await?;
1519 }
1520
1521 Ok(())
1522 }
1523
1524 async fn batch_cancel_orders_chunk(
1525 &self,
1526 trader_id: TraderId,
1527 strategy_id: StrategyId,
1528 orders: Vec<BybitWsCancelOrderParams>,
1529 ) -> BybitWsResult<()> {
1530 if orders.is_empty() {
1531 return Ok(());
1532 }
1533
1534 let category = orders[0].category;
1535 let batch_req_id = UUID4::new().to_string();
1536
1537 let mut validated_data = Vec::new();
1538
1539 for order in &orders {
1540 if let Some(order_link_id_str) = &order.order_link_id {
1541 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1542 let instrument_id = self
1543 .instruments_cache
1544 .get(&cache_key)
1545 .map(|inst| inst.id())
1546 .ok_or_else(|| {
1547 BybitWsError::ClientError(format!(
1548 "Instrument {cache_key} not found in cache"
1549 ))
1550 })?;
1551
1552 let venue_order_id = order
1553 .order_id
1554 .as_ref()
1555 .map(|id| VenueOrderId::from(id.as_str()));
1556
1557 validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1558 }
1559 }
1560
1561 let batch_cancel_data: Vec<_> = validated_data
1562 .iter()
1563 .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1564 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1565 (
1566 client_order_id,
1567 (
1568 client_order_id,
1569 trader_id,
1570 strategy_id,
1571 *instrument_id,
1572 *venue_order_id,
1573 ),
1574 )
1575 })
1576 .collect();
1577
1578 if !batch_cancel_data.is_empty() {
1579 let cmd = HandlerCommand::RegisterBatchCancel {
1580 req_id: batch_req_id.clone(),
1581 cancels: batch_cancel_data,
1582 };
1583 let cmd_tx = self.cmd_tx.read().await;
1584 if let Err(e) = cmd_tx.send(cmd) {
1585 log::error!("Failed to send RegisterBatchCancel command: {e}");
1586 }
1587 }
1588
1589 let request_items: Vec<BybitWsBatchCancelItem> = orders
1590 .into_iter()
1591 .map(|order| BybitWsBatchCancelItem {
1592 symbol: order.symbol,
1593 order_id: order.order_id,
1594 order_link_id: order.order_link_id,
1595 })
1596 .collect();
1597
1598 let args = BybitWsBatchCancelOrderArgs {
1599 category,
1600 request: request_items,
1601 };
1602
1603 let request = BybitWsRequest {
1604 req_id: Some(batch_req_id),
1605 op: BybitWsOrderRequestOp::CancelBatch,
1606 header: BybitWsHeader::now(),
1607 args: vec![args],
1608 };
1609
1610 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1611
1612 self.send_text(&payload).await
1613 }
1614
1615 #[allow(clippy::too_many_arguments)]
1621 pub async fn submit_order(
1622 &self,
1623 product_type: BybitProductType,
1624 trader_id: TraderId,
1625 strategy_id: StrategyId,
1626 instrument_id: InstrumentId,
1627 client_order_id: ClientOrderId,
1628 order_side: OrderSide,
1629 order_type: OrderType,
1630 quantity: Quantity,
1631 is_quote_quantity: bool,
1632 time_in_force: Option<TimeInForce>,
1633 price: Option<Price>,
1634 trigger_price: Option<Price>,
1635 post_only: Option<bool>,
1636 reduce_only: Option<bool>,
1637 is_leverage: bool,
1638 ) -> BybitWsResult<()> {
1639 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1640 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1641 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1642
1643 let bybit_side = match order_side {
1644 OrderSide::Buy => BybitOrderSide::Buy,
1645 OrderSide::Sell => BybitOrderSide::Sell,
1646 _ => {
1647 return Err(BybitWsError::ClientError(format!(
1648 "Invalid order side: {order_side:?}"
1649 )));
1650 }
1651 };
1652
1653 let (bybit_order_type, is_stop_order) = match order_type {
1655 OrderType::Market => (BybitOrderType::Market, false),
1656 OrderType::Limit => (BybitOrderType::Limit, false),
1657 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1658 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1659 _ => {
1660 return Err(BybitWsError::ClientError(format!(
1661 "Unsupported order type: {order_type:?}"
1662 )));
1663 }
1664 };
1665
1666 let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1667 None
1668 } else if post_only == Some(true) {
1669 Some(BybitTimeInForce::PostOnly)
1670 } else if let Some(tif) = time_in_force {
1671 Some(match tif {
1672 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1673 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1674 TimeInForce::Fok => BybitTimeInForce::Fok,
1675 _ => {
1676 return Err(BybitWsError::ClientError(format!(
1677 "Unsupported time in force: {tif:?}"
1678 )));
1679 }
1680 })
1681 } else {
1682 None
1683 };
1684
1685 let market_unit = if product_type == BybitProductType::Spot
1688 && bybit_order_type == BybitOrderType::Market
1689 {
1690 if is_quote_quantity {
1691 Some(BYBIT_QUOTE_COIN.to_string())
1692 } else {
1693 Some(BYBIT_BASE_COIN.to_string())
1694 }
1695 } else {
1696 None
1697 };
1698
1699 let is_leverage_value = if product_type == BybitProductType::Spot {
1701 Some(i32::from(is_leverage))
1702 } else {
1703 None
1704 };
1705
1706 let trigger_direction = if is_stop_order {
1709 match (order_type, order_side) {
1710 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1711 Some(BybitTriggerDirection::RisesTo as i32)
1712 }
1713 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1714 Some(BybitTriggerDirection::FallsTo as i32)
1715 }
1716 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1717 Some(BybitTriggerDirection::FallsTo as i32)
1718 }
1719 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1720 Some(BybitTriggerDirection::RisesTo as i32)
1721 }
1722 _ => None,
1723 }
1724 } else {
1725 None
1726 };
1727
1728 let params = if is_stop_order {
1729 BybitWsPlaceOrderParams {
1732 category: product_type,
1733 symbol: raw_symbol,
1734 side: bybit_side,
1735 order_type: bybit_order_type,
1736 qty: quantity.to_string(),
1737 is_leverage: is_leverage_value,
1738 market_unit: market_unit.clone(),
1739 price: price.map(|p| p.to_string()),
1740 time_in_force: bybit_tif,
1741 order_link_id: Some(client_order_id.to_string()),
1742 reduce_only: reduce_only.filter(|&r| r),
1743 close_on_trigger: None,
1744 trigger_price: trigger_price.map(|p| p.to_string()),
1745 trigger_by: Some(BybitTriggerType::LastPrice),
1746 trigger_direction,
1747 tpsl_mode: None, take_profit: None,
1749 stop_loss: None,
1750 tp_trigger_by: None,
1751 sl_trigger_by: None,
1752 sl_trigger_price: None, tp_trigger_price: None, sl_order_type: None,
1755 tp_order_type: None,
1756 sl_limit_price: None,
1757 tp_limit_price: None,
1758 }
1759 } else {
1760 BybitWsPlaceOrderParams {
1762 category: product_type,
1763 symbol: raw_symbol,
1764 side: bybit_side,
1765 order_type: bybit_order_type,
1766 qty: quantity.to_string(),
1767 is_leverage: is_leverage_value,
1768 market_unit,
1769 price: price.map(|p| p.to_string()),
1770 time_in_force: if bybit_order_type == BybitOrderType::Market {
1771 None
1772 } else {
1773 bybit_tif
1774 },
1775 order_link_id: Some(client_order_id.to_string()),
1776 reduce_only: reduce_only.filter(|&r| r),
1777 close_on_trigger: None,
1778 trigger_price: None,
1779 trigger_by: None,
1780 trigger_direction: None,
1781 tpsl_mode: None,
1782 take_profit: None,
1783 stop_loss: None,
1784 tp_trigger_by: None,
1785 sl_trigger_by: None,
1786 sl_trigger_price: None,
1787 tp_trigger_price: None,
1788 sl_order_type: None,
1789 tp_order_type: None,
1790 sl_limit_price: None,
1791 tp_limit_price: None,
1792 }
1793 };
1794
1795 self.place_order(
1796 params,
1797 client_order_id,
1798 trader_id,
1799 strategy_id,
1800 instrument_id,
1801 )
1802 .await
1803 }
1804
1805 #[allow(clippy::too_many_arguments)]
1811 pub async fn modify_order(
1812 &self,
1813 product_type: BybitProductType,
1814 trader_id: TraderId,
1815 strategy_id: StrategyId,
1816 instrument_id: InstrumentId,
1817 client_order_id: ClientOrderId,
1818 venue_order_id: Option<VenueOrderId>,
1819 quantity: Option<Quantity>,
1820 price: Option<Price>,
1821 ) -> BybitWsResult<()> {
1822 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1823 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1824 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1825
1826 let params = BybitWsAmendOrderParams {
1827 category: product_type,
1828 symbol: raw_symbol,
1829 order_id: venue_order_id.map(|id| id.to_string()),
1830 order_link_id: Some(client_order_id.to_string()),
1831 qty: quantity.map(|q| q.to_string()),
1832 price: price.map(|p| p.to_string()),
1833 trigger_price: None,
1834 take_profit: None,
1835 stop_loss: None,
1836 tp_trigger_by: None,
1837 sl_trigger_by: None,
1838 };
1839
1840 self.amend_order(
1841 params,
1842 client_order_id,
1843 trader_id,
1844 strategy_id,
1845 instrument_id,
1846 venue_order_id,
1847 )
1848 .await
1849 }
1850
1851 pub async fn cancel_order_by_id(
1857 &self,
1858 product_type: BybitProductType,
1859 trader_id: TraderId,
1860 strategy_id: StrategyId,
1861 instrument_id: InstrumentId,
1862 client_order_id: ClientOrderId,
1863 venue_order_id: Option<VenueOrderId>,
1864 ) -> BybitWsResult<()> {
1865 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1866 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1867 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1868
1869 let params = BybitWsCancelOrderParams {
1870 category: product_type,
1871 symbol: raw_symbol,
1872 order_id: venue_order_id.map(|id| id.to_string()),
1873 order_link_id: Some(client_order_id.to_string()),
1874 };
1875
1876 self.cancel_order(
1877 params,
1878 client_order_id,
1879 trader_id,
1880 strategy_id,
1881 instrument_id,
1882 venue_order_id,
1883 )
1884 .await
1885 }
1886
1887 #[allow(clippy::too_many_arguments)]
1889 pub fn build_place_order_params(
1890 &self,
1891 product_type: BybitProductType,
1892 instrument_id: InstrumentId,
1893 client_order_id: ClientOrderId,
1894 order_side: OrderSide,
1895 order_type: OrderType,
1896 quantity: Quantity,
1897 is_quote_quantity: bool,
1898 time_in_force: Option<TimeInForce>,
1899 price: Option<Price>,
1900 trigger_price: Option<Price>,
1901 post_only: Option<bool>,
1902 reduce_only: Option<bool>,
1903 is_leverage: bool,
1904 take_profit: Option<Price>,
1905 stop_loss: Option<Price>,
1906 ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1907 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1908 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1909 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1910
1911 let bybit_side = match order_side {
1912 OrderSide::Buy => BybitOrderSide::Buy,
1913 OrderSide::Sell => BybitOrderSide::Sell,
1914 _ => {
1915 return Err(BybitWsError::ClientError(format!(
1916 "Invalid order side: {order_side:?}"
1917 )));
1918 }
1919 };
1920
1921 let (bybit_order_type, is_stop_order) = match order_type {
1922 OrderType::Market => (BybitOrderType::Market, false),
1923 OrderType::Limit => (BybitOrderType::Limit, false),
1924 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1925 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1926 _ => {
1927 return Err(BybitWsError::ClientError(format!(
1928 "Unsupported order type: {order_type:?}"
1929 )));
1930 }
1931 };
1932
1933 let bybit_tif = if post_only == Some(true) {
1934 Some(BybitTimeInForce::PostOnly)
1935 } else if let Some(tif) = time_in_force {
1936 Some(match tif {
1937 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1938 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1939 TimeInForce::Fok => BybitTimeInForce::Fok,
1940 _ => {
1941 return Err(BybitWsError::ClientError(format!(
1942 "Unsupported time in force: {tif:?}"
1943 )));
1944 }
1945 })
1946 } else {
1947 None
1948 };
1949
1950 let market_unit = if product_type == BybitProductType::Spot
1951 && bybit_order_type == BybitOrderType::Market
1952 {
1953 if is_quote_quantity {
1954 Some(BYBIT_QUOTE_COIN.to_string())
1955 } else {
1956 Some(BYBIT_BASE_COIN.to_string())
1957 }
1958 } else {
1959 None
1960 };
1961
1962 let is_leverage_value = if product_type == BybitProductType::Spot {
1964 Some(i32::from(is_leverage))
1965 } else {
1966 None
1967 };
1968
1969 let trigger_direction = if is_stop_order {
1972 match (order_type, order_side) {
1973 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1974 Some(BybitTriggerDirection::RisesTo as i32)
1975 }
1976 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1977 Some(BybitTriggerDirection::FallsTo as i32)
1978 }
1979 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1980 Some(BybitTriggerDirection::FallsTo as i32)
1981 }
1982 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1983 Some(BybitTriggerDirection::RisesTo as i32)
1984 }
1985 _ => None,
1986 }
1987 } else {
1988 None
1989 };
1990
1991 let params = if is_stop_order {
1992 BybitWsPlaceOrderParams {
1993 category: product_type,
1994 symbol: raw_symbol,
1995 side: bybit_side,
1996 order_type: bybit_order_type,
1997 qty: quantity.to_string(),
1998 is_leverage: is_leverage_value,
1999 market_unit,
2000 price: price.map(|p| p.to_string()),
2001 time_in_force: if bybit_order_type == BybitOrderType::Market {
2002 None
2003 } else {
2004 bybit_tif
2005 },
2006 order_link_id: Some(client_order_id.to_string()),
2007 reduce_only: reduce_only.filter(|&r| r),
2008 close_on_trigger: None,
2009 trigger_price: trigger_price.map(|p| p.to_string()),
2010 trigger_by: Some(BybitTriggerType::LastPrice),
2011 trigger_direction,
2012 tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
2013 Some("Full".to_string())
2014 } else {
2015 None
2016 },
2017 take_profit: take_profit.map(|p| p.to_string()),
2018 stop_loss: stop_loss.map(|p| p.to_string()),
2019 tp_trigger_by: take_profit.map(|_| BybitTriggerType::LastPrice),
2020 sl_trigger_by: stop_loss.map(|_| BybitTriggerType::LastPrice),
2021 sl_trigger_price: None,
2022 tp_trigger_price: None,
2023 sl_order_type: None,
2024 tp_order_type: None,
2025 sl_limit_price: None,
2026 tp_limit_price: None,
2027 }
2028 } else {
2029 BybitWsPlaceOrderParams {
2030 category: product_type,
2031 symbol: raw_symbol,
2032 side: bybit_side,
2033 order_type: bybit_order_type,
2034 qty: quantity.to_string(),
2035 is_leverage: is_leverage_value,
2036 market_unit,
2037 price: price.map(|p| p.to_string()),
2038 time_in_force: if bybit_order_type == BybitOrderType::Market {
2039 None
2040 } else {
2041 bybit_tif
2042 },
2043 order_link_id: Some(client_order_id.to_string()),
2044 reduce_only: reduce_only.filter(|&r| r),
2045 close_on_trigger: None,
2046 trigger_price: None,
2047 trigger_by: None,
2048 trigger_direction: None,
2049 tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
2050 Some("Full".to_string())
2051 } else {
2052 None
2053 },
2054 take_profit: take_profit.map(|p| p.to_string()),
2055 stop_loss: stop_loss.map(|p| p.to_string()),
2056 tp_trigger_by: take_profit.map(|_| BybitTriggerType::LastPrice),
2057 sl_trigger_by: stop_loss.map(|_| BybitTriggerType::LastPrice),
2058 sl_trigger_price: None,
2059 tp_trigger_price: None,
2060 sl_order_type: None,
2061 tp_order_type: None,
2062 sl_limit_price: None,
2063 tp_limit_price: None,
2064 }
2065 };
2066
2067 Ok(params)
2068 }
2069
2070 #[allow(clippy::too_many_arguments)]
2072 pub fn build_amend_order_params(
2073 &self,
2074 product_type: BybitProductType,
2075 instrument_id: InstrumentId,
2076 venue_order_id: Option<VenueOrderId>,
2077 client_order_id: Option<ClientOrderId>,
2078 quantity: Option<Quantity>,
2079 price: Option<Price>,
2080 ) -> BybitWsResult<BybitWsAmendOrderParams> {
2081 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2082 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2083 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2084
2085 Ok(BybitWsAmendOrderParams {
2086 category: product_type,
2087 symbol: raw_symbol,
2088 order_id: venue_order_id.map(|v| v.to_string()),
2089 order_link_id: client_order_id.map(|c| c.to_string()),
2090 qty: quantity.map(|q| q.to_string()),
2091 price: price.map(|p| p.to_string()),
2092 trigger_price: None,
2093 take_profit: None,
2094 stop_loss: None,
2095 tp_trigger_by: None,
2096 sl_trigger_by: None,
2097 })
2098 }
2099
2100 pub fn build_cancel_order_params(
2107 &self,
2108 product_type: BybitProductType,
2109 instrument_id: InstrumentId,
2110 venue_order_id: Option<VenueOrderId>,
2111 client_order_id: Option<ClientOrderId>,
2112 ) -> BybitWsResult<BybitWsCancelOrderParams> {
2113 if venue_order_id.is_none() && client_order_id.is_none() {
2114 return Err(BybitWsError::ClientError(
2115 "Either venue_order_id or client_order_id must be provided".to_string(),
2116 ));
2117 }
2118
2119 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2120 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2121 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2122
2123 Ok(BybitWsCancelOrderParams {
2124 category: product_type,
2125 symbol: raw_symbol,
2126 order_id: venue_order_id.map(|v| v.to_string()),
2127 order_link_id: client_order_id.map(|c| c.to_string()),
2128 })
2129 }
2130
2131 fn default_headers() -> Vec<(String, String)> {
2132 vec![
2133 ("Content-Type".to_string(), "application/json".to_string()),
2134 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2135 ]
2136 }
2137
2138 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2139 if !self.requires_auth {
2140 return Ok(());
2141 }
2142
2143 let credential = self.credential.as_ref().ok_or_else(|| {
2144 BybitWsError::Authentication("Credentials required for authentication".to_string())
2145 })?;
2146
2147 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2148 let signature = credential.sign_websocket_auth(expires);
2149
2150 let auth_message = BybitAuthRequest {
2151 op: BybitWsOperation::Auth,
2152 args: vec![
2153 Value::String(credential.api_key().to_string()),
2154 Value::Number(expires.into()),
2155 Value::String(signature),
2156 ],
2157 };
2158
2159 let payload = serde_json::to_string(&auth_message)?;
2160
2161 let _rx = self.auth_tracker.begin();
2163
2164 self.cmd_tx
2165 .read()
2166 .await
2167 .send(HandlerCommand::Authenticate { payload })
2168 .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2169
2170 Ok(())
2173 }
2174
2175 async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2176 let cmd = HandlerCommand::SendText {
2177 payload: text.to_string(),
2178 };
2179
2180 self.send_cmd(cmd).await
2181 }
2182
2183 async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2184 self.cmd_tx
2185 .read()
2186 .await
2187 .send(cmd)
2188 .map_err(|e| BybitWsError::Send(e.to_string()))
2189 }
2190}
2191
2192#[cfg(test)]
2193mod tests {
2194 use rstest::rstest;
2195
2196 use super::*;
2197 use crate::{
2198 common::testing::load_test_json,
2199 websocket::{classify_bybit_message, messages::BybitWsMessage},
2200 };
2201
2202 #[rstest]
2203 fn classify_orderbook_snapshot() {
2204 let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2205 .expect("invalid fixture");
2206 let message = classify_bybit_message(json);
2207 assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2208 }
2209
2210 #[rstest]
2211 fn classify_trade_snapshot() {
2212 let json: Value =
2213 serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2214 let message = classify_bybit_message(json);
2215 assert!(matches!(message, BybitWsMessage::Trade(_)));
2216 }
2217
2218 #[rstest]
2219 fn classify_ticker_linear_snapshot() {
2220 let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2221 .expect("invalid fixture");
2222 let message = classify_bybit_message(json);
2223 assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2224 }
2225
2226 #[rstest]
2227 fn classify_ticker_option_snapshot() {
2228 let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2229 .expect("invalid fixture");
2230 let message = classify_bybit_message(json);
2231 assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2232 }
2233
2234 #[rstest]
2235 fn test_race_unsubscribe_failure_recovery() {
2236 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "publicTrade.BTCUSDT";
2244
2245 subscriptions.mark_subscribe(topic);
2247 subscriptions.confirm_subscribe(topic);
2248 assert_eq!(subscriptions.len(), 1);
2249
2250 subscriptions.mark_unsubscribe(topic);
2252 assert_eq!(subscriptions.len(), 0);
2253 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2254
2255 subscriptions.confirm_unsubscribe(topic); subscriptions.mark_subscribe(topic); subscriptions.confirm_subscribe(topic); assert_eq!(subscriptions.len(), 1);
2263 assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2264 assert!(subscriptions.pending_subscribe_topics().is_empty());
2265
2266 let all = subscriptions.all_topics();
2268 assert_eq!(all.len(), 1);
2269 assert!(all.contains(&topic.to_string()));
2270 }
2271
2272 #[rstest]
2273 fn test_race_resubscribe_before_unsubscribe_ack() {
2274 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "orderbook.50.BTCUSDT";
2280
2281 subscriptions.mark_subscribe(topic);
2283 subscriptions.confirm_subscribe(topic);
2284 assert_eq!(subscriptions.len(), 1);
2285
2286 subscriptions.mark_unsubscribe(topic);
2288 assert_eq!(subscriptions.len(), 0);
2289 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2290
2291 subscriptions.mark_subscribe(topic);
2293 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2294
2295 subscriptions.confirm_unsubscribe(topic);
2297 assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2298 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2299
2300 subscriptions.confirm_subscribe(topic);
2302 assert_eq!(subscriptions.len(), 1);
2303 assert!(subscriptions.pending_subscribe_topics().is_empty());
2304
2305 let all = subscriptions.all_topics();
2307 assert_eq!(all.len(), 1);
2308 assert!(all.contains(&topic.to_string()));
2309 }
2310
2311 #[rstest]
2312 fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2313 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "tickers.ETHUSDT";
2318
2319 subscriptions.mark_subscribe(topic);
2321 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2322
2323 subscriptions.mark_unsubscribe(topic);
2325 assert!(subscriptions.pending_subscribe_topics().is_empty()); assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2327
2328 subscriptions.confirm_subscribe(topic);
2330 assert_eq!(subscriptions.len(), 0); assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2332
2333 subscriptions.confirm_unsubscribe(topic);
2335
2336 assert!(subscriptions.is_empty());
2338 assert!(subscriptions.all_topics().is_empty());
2339 }
2340
2341 #[rstest]
2342 fn test_race_reconnection_with_pending_states() {
2343 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let trade_btc = "publicTrade.BTCUSDT";
2349 subscriptions.mark_subscribe(trade_btc);
2350 subscriptions.confirm_subscribe(trade_btc);
2351
2352 let trade_eth = "publicTrade.ETHUSDT";
2354 subscriptions.mark_subscribe(trade_eth);
2355
2356 let book_btc = "orderbook.50.BTCUSDT";
2358 subscriptions.mark_subscribe(book_btc);
2359 subscriptions.confirm_subscribe(book_btc);
2360 subscriptions.mark_unsubscribe(book_btc);
2361
2362 let topics_to_restore = subscriptions.all_topics();
2364
2365 assert_eq!(topics_to_restore.len(), 2);
2367 assert!(topics_to_restore.contains(&trade_btc.to_string()));
2368 assert!(topics_to_restore.contains(&trade_eth.to_string()));
2369 assert!(!topics_to_restore.contains(&book_btc.to_string())); }
2371
2372 #[rstest]
2373 fn test_race_duplicate_subscribe_messages_idempotent() {
2374 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "publicTrade.BTCUSDT";
2379
2380 subscriptions.mark_subscribe(topic);
2382 subscriptions.confirm_subscribe(topic);
2383 assert_eq!(subscriptions.len(), 1);
2384
2385 subscriptions.mark_subscribe(topic);
2387 assert!(subscriptions.pending_subscribe_topics().is_empty()); assert_eq!(subscriptions.len(), 1); subscriptions.confirm_subscribe(topic);
2392 assert_eq!(subscriptions.len(), 1);
2393
2394 let all = subscriptions.all_topics();
2396 assert_eq!(all.len(), 1);
2397 assert_eq!(all[0], topic);
2398 }
2399
2400 #[rstest]
2401 #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2402 #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2403 #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2404 #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2405 #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2406 #[case::option_with_leverage(BybitProductType::Option, true, None)]
2407 fn test_is_leverage_parameter(
2408 #[case] product_type: BybitProductType,
2409 #[case] is_leverage: bool,
2410 #[case] expected: Option<i32>,
2411 ) {
2412 let symbol = match product_type {
2413 BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2414 BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2415 BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2416 BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2417 };
2418
2419 let instrument_id = InstrumentId::from(symbol);
2420 let client_order_id = ClientOrderId::from("test-order-1");
2421 let quantity = Quantity::from("1.0");
2422
2423 let client = BybitWebSocketClient::new_trade(
2424 BybitEnvironment::Testnet,
2425 Some("test-key".to_string()),
2426 Some("test-secret".to_string()),
2427 None,
2428 Some(20),
2429 );
2430
2431 let params = client
2432 .build_place_order_params(
2433 product_type,
2434 instrument_id,
2435 client_order_id,
2436 OrderSide::Buy,
2437 OrderType::Limit,
2438 quantity,
2439 false, Some(TimeInForce::Gtc),
2441 Some(Price::from("50000.0")),
2442 None,
2443 None,
2444 None,
2445 is_leverage,
2446 None, None, )
2449 .expect("Failed to build params");
2450
2451 assert_eq!(params.is_leverage, expected);
2452 }
2453
2454 #[rstest]
2455 #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2456 #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2457 #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2458 #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2459 #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2460 #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2461 fn test_is_quote_quantity_parameter(
2462 #[case] product_type: BybitProductType,
2463 #[case] order_type: OrderType,
2464 #[case] is_quote_quantity: bool,
2465 #[case] expected: Option<String>,
2466 ) {
2467 let symbol = match product_type {
2468 BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2469 BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2470 BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2471 BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2472 };
2473
2474 let instrument_id = InstrumentId::from(symbol);
2475 let client_order_id = ClientOrderId::from("test-order-1");
2476 let quantity = Quantity::from("1.0");
2477
2478 let client = BybitWebSocketClient::new_trade(
2479 BybitEnvironment::Testnet,
2480 Some("test-key".to_string()),
2481 Some("test-secret".to_string()),
2482 None,
2483 Some(20),
2484 );
2485
2486 let params = client
2487 .build_place_order_params(
2488 product_type,
2489 instrument_id,
2490 client_order_id,
2491 OrderSide::Buy,
2492 order_type,
2493 quantity,
2494 is_quote_quantity,
2495 Some(TimeInForce::Gtc),
2496 if order_type == OrderType::Market {
2497 None
2498 } else {
2499 Some(Price::from("50000.0"))
2500 },
2501 None,
2502 None,
2503 None,
2504 false,
2505 None, None, )
2508 .expect("Failed to build params");
2509
2510 assert_eq!(params.market_unit, expected);
2511 }
2512}