1use std::{
21 fmt::Debug,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35 enums::{OrderSide, OrderType, TimeInForce},
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 types::{Price, Quantity},
39};
40use nautilus_network::{
41 backoff::ExponentialBackoff,
42 mode::ConnectionMode,
43 websocket::{
44 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45 channel_message_handler,
46 },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53 common::{
54 consts::{
55 BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
56 },
57 credential::Credential,
58 enums::{
59 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
60 BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
61 },
62 parse::{extract_raw_symbol, make_bybit_symbol},
63 symbol::BybitSymbol,
64 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
65 },
66 websocket::{
67 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
68 error::{BybitWsError, BybitWsResult},
69 handler::{FeedHandler, HandlerCommand},
70 messages::{
71 BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
72 BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
73 BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
74 NautilusWsMessage,
75 },
76 },
77};
78
/// Heartbeat interval (seconds) used when a constructor is not given an explicit value.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Milliseconds added to the current UTC timestamp to form the `expires` argument of a
/// WebSocket auth request.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
/// Maximum number of orders placed in a single batch chunk.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Shared, async-locked map keyed by symbol.
///
/// NOTE(review): the tuple of two optional strings presumably holds funding-related ticker
/// fields - confirm against the handler that populates it.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
85
86fn resolve_credential(
93 environment: BybitEnvironment,
94 api_key: Option<String>,
95 api_secret: Option<String>,
96) -> Option<Credential> {
97 let (api_key_env, api_secret_env) = match environment {
98 BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
99 BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
100 BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
101 };
102
103 let key = get_or_env_var_opt(api_key, api_key_env);
104 let secret = get_or_env_var_opt(api_secret, api_secret_env);
105
106 match (key, secret) {
107 (Some(k), Some(s)) => Some(Credential::new(k, s)),
108 _ => None,
109 }
110}
111
/// WebSocket client for Bybit public, private and trade streams.
///
/// A background task spawned by `connect` owns the feed handler; this struct keeps the
/// command sender, shared state and caches used to talk to that task.
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BybitWebSocketClient {
    /// Endpoint URL handed to the WebSocket configuration in `connect`.
    url: String,
    /// Bybit environment this client was constructed for.
    environment: BybitEnvironment,
    /// `Some` for public clients; the private and trade constructors set `None`.
    product_type: Option<BybitProductType>,
    /// Credentials resolved at construction; used to sign the re-auth request after a reconnect.
    credential: Option<Credential>,
    /// `true` for clients built by `new_private` / `new_trade`.
    requires_auth: bool,
    /// Authentication tracker, cloned into the feed handler on connect.
    auth_tracker: AuthTracker,
    /// Heartbeat interval passed to the WebSocket configuration.
    heartbeat: Option<u64>,
    /// Swappable handle to the connection-mode atomic; replaced with the live client's atomic
    /// in `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for handler commands. Constructors install a placeholder; `connect` replaces it.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output; taken by `stream`. Not carried over by `Clone`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: set to `true` by `close`, reset to `false` by `connect`.
    signal: Arc<AtomicBool>,
    /// Handle of the task spawned in `connect`. Not carried over by `Clone`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription bookkeeping, constructed with `BYBIT_WS_TOPIC_DELIMITER`.
    subscriptions: SubscriptionState,
    /// Set when an `Authenticated` message is seen; cleared on reconnect and on `close`.
    is_authenticated: Arc<AtomicBool>,
    /// Account ID copied into the feed handler when `connect` runs.
    account_id: Option<AccountId>,
    /// Shared level value; a value of 0 makes batch placement attach the broker referer.
    /// NOTE(review): "mm" presumably means market maker - confirm.
    mm_level: Arc<AtomicU8>,
    /// Copied into the feed handler when `connect` runs.
    bars_timestamp_on_close: bool,
    /// Instruments keyed by `instrument.symbol().inner()`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Per-symbol cache shared with the feed handler; cleared after every reconnect.
    funding_cache: FundingCache,
    /// Cancellation token created at construction and cloned along with the client.
    cancellation_token: CancellationToken,
}
136
137impl Debug for BybitWebSocketClient {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("BybitWebSocketClient")
140 .field("url", &self.url)
141 .field("environment", &self.environment)
142 .field("product_type", &self.product_type)
143 .field("requires_auth", &self.requires_auth)
144 .field("heartbeat", &self.heartbeat)
145 .field("confirmed_subscriptions", &self.subscriptions.len())
146 .finish()
147 }
148}
149
150impl Clone for BybitWebSocketClient {
151 fn clone(&self) -> Self {
152 Self {
153 url: self.url.clone(),
154 environment: self.environment,
155 product_type: self.product_type,
156 credential: self.credential.clone(),
157 requires_auth: self.requires_auth,
158 auth_tracker: self.auth_tracker.clone(),
159 heartbeat: self.heartbeat,
160 connection_mode: Arc::clone(&self.connection_mode),
161 cmd_tx: Arc::clone(&self.cmd_tx),
162 out_rx: None, signal: Arc::clone(&self.signal),
164 task_handle: None, subscriptions: self.subscriptions.clone(),
166 is_authenticated: Arc::clone(&self.is_authenticated),
167 account_id: self.account_id,
168 mm_level: Arc::clone(&self.mm_level),
169 bars_timestamp_on_close: self.bars_timestamp_on_close,
170 instruments_cache: Arc::clone(&self.instruments_cache),
171 funding_cache: Arc::clone(&self.funding_cache),
172 cancellation_token: self.cancellation_token.clone(),
173 }
174 }
175}
176
177impl BybitWebSocketClient {
178 #[must_use]
180 pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
181 Self::new_public_with(
182 BybitProductType::Linear,
183 BybitEnvironment::Mainnet,
184 url,
185 heartbeat,
186 )
187 }
188
189 #[must_use]
191 pub fn new_public_with(
192 product_type: BybitProductType,
193 environment: BybitEnvironment,
194 url: Option<String>,
195 heartbeat: Option<u64>,
196 ) -> Self {
197 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
201
202 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
203 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
204
205 Self {
206 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
207 environment,
208 product_type: Some(product_type),
209 credential: None,
210 requires_auth: false,
211 auth_tracker: AuthTracker::new(),
212 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
213 connection_mode,
214 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
215 out_rx: None,
216 signal: Arc::new(AtomicBool::new(false)),
217 task_handle: None,
218 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
219 is_authenticated: Arc::new(AtomicBool::new(false)),
220 instruments_cache: Arc::new(DashMap::new()),
221 account_id: None,
222 mm_level: Arc::new(AtomicU8::new(0)),
223 bars_timestamp_on_close: true,
224 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
225 cancellation_token: CancellationToken::new(),
226 }
227 }
228
229 #[must_use]
237 pub fn new_private(
238 environment: BybitEnvironment,
239 api_key: Option<String>,
240 api_secret: Option<String>,
241 url: Option<String>,
242 heartbeat: Option<u64>,
243 ) -> Self {
244 let credential = resolve_credential(environment, api_key, api_secret);
245
246 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
250
251 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
252 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
253
254 Self {
255 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
256 environment,
257 product_type: None,
258 credential,
259 requires_auth: true,
260 auth_tracker: AuthTracker::new(),
261 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
262 connection_mode,
263 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
264 out_rx: None,
265 signal: Arc::new(AtomicBool::new(false)),
266 task_handle: None,
267 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
268 is_authenticated: Arc::new(AtomicBool::new(false)),
269 instruments_cache: Arc::new(DashMap::new()),
270 account_id: None,
271 mm_level: Arc::new(AtomicU8::new(0)),
272 bars_timestamp_on_close: true,
273 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
274 cancellation_token: CancellationToken::new(),
275 }
276 }
277
278 #[must_use]
286 pub fn new_trade(
287 environment: BybitEnvironment,
288 api_key: Option<String>,
289 api_secret: Option<String>,
290 url: Option<String>,
291 heartbeat: Option<u64>,
292 ) -> Self {
293 let credential = resolve_credential(environment, api_key, api_secret);
294
295 let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
299
300 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
301 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
302
303 Self {
304 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
305 environment,
306 product_type: None,
307 credential,
308 requires_auth: true,
309 auth_tracker: AuthTracker::new(),
310 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
311 connection_mode,
312 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
313 out_rx: None,
314 signal: Arc::new(AtomicBool::new(false)),
315 task_handle: None,
316 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
317 is_authenticated: Arc::new(AtomicBool::new(false)),
318 instruments_cache: Arc::new(DashMap::new()),
319 account_id: None,
320 mm_level: Arc::new(AtomicU8::new(0)),
321 bars_timestamp_on_close: true,
322 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
323 cancellation_token: CancellationToken::new(),
324 }
325 }
326
/// Establishes the WebSocket connection and spawns the message-processing task.
///
/// Steps, in order:
/// 1. Clear the stop signal and build the WebSocket configuration (heartbeat ping included).
/// 2. Try to connect up to `MAX_RETRIES` times, each attempt bounded by
///    `CONNECTION_TIMEOUT_SECS`, sleeping for a backoff delay between attempts.
/// 3. Install fresh command/output channels, hand the client to the handler and replay any
///    cached instruments.
/// 4. Spawn the task that drives the `FeedHandler`, re-authenticates and resubscribes after
///    reconnects, and forwards all other messages to the output channel.
/// 5. If the client requires auth, run `authenticate_if_required`.
///
/// # Errors
///
/// Returns an error if the ping message cannot be serialized, the backoff cannot be
/// constructed, every connection attempt fails or times out, a handler command cannot be
/// sent, or authentication fails.
pub async fn connect(&mut self) -> BybitWsResult<()> {
    // Reset the stop flag so `is_active`/`is_closed` reflect the new connection.
    self.signal.store(false, Ordering::Relaxed);

    let (raw_handler, raw_rx) = channel_message_handler();

    let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
        // Intentionally empty: incoming ping payloads are ignored here.
    });

    // Application-level ping, sent as the heartbeat message.
    let ping_msg = serde_json::to_string(&BybitSubscription {
        op: BybitWsOperation::Ping,
        args: vec![],
    })?;

    let config = WebSocketConfig {
        url: self.url.clone(),
        headers: Self::default_headers(),
        heartbeat: self.heartbeat,
        heartbeat_msg: Some(ping_msg),
        reconnect_timeout_ms: Some(5_000),
        reconnect_delay_initial_ms: Some(500),
        reconnect_delay_max_ms: Some(5_000),
        reconnect_backoff_factor: Some(1.5),
        reconnect_jitter_ms: Some(250),
        reconnect_max_attempts: None,
    };

    // Limits for the INITIAL connection only; later reconnects use the `reconnect_*`
    // settings in `config` above.
    const MAX_RETRIES: u32 = 5;
    const CONNECTION_TIMEOUT_SECS: u64 = 10;

    // NOTE(review): arguments are presumably (initial delay, max delay, factor, jitter ms,
    // immediate-first flag) - confirm against `ExponentialBackoff::new`.
    let mut backoff = ExponentialBackoff::new(
        Duration::from_millis(500),
        Duration::from_millis(5000),
        2.0,
        250,
        false,
    )
    .map_err(|e| BybitWsError::ClientError(e.to_string()))?;

    // The initial empty value is overwritten by every failure branch before it is read.
    #[allow(unused_assignments)]
    let mut last_error = String::new();
    let mut attempt = 0;
    let client = loop {
        attempt += 1;

        // Outer `Result` is the timeout, inner `Result` is the connection attempt itself.
        match tokio::time::timeout(
            Duration::from_secs(CONNECTION_TIMEOUT_SECS),
            WebSocketClient::connect(
                config.clone(),
                Some(raw_handler.clone()),
                Some(ping_handler.clone()),
                None,
                vec![],
                None,
            ),
        )
        .await
        {
            Ok(Ok(client)) => {
                if attempt > 1 {
                    tracing::info!("WebSocket connection established after {attempt} attempts");
                }
                break client;
            }
            Ok(Err(e)) => {
                last_error = e.to_string();
                tracing::warn!(
                    attempt,
                    max_retries = MAX_RETRIES,
                    url = %self.url,
                    error = %last_error,
                    "WebSocket connection attempt failed"
                );
            }
            Err(_) => {
                last_error = format!(
                    "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
                );
                tracing::warn!(
                    attempt,
                    max_retries = MAX_RETRIES,
                    url = %self.url,
                    "WebSocket connection attempt timed out"
                );
            }
        }

        // Give up once the attempt budget is exhausted.
        if attempt >= MAX_RETRIES {
            return Err(BybitWsError::Transport(format!(
                "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
                If this is a DNS error, check your network configuration and DNS settings.",
                self.url,
                if last_error.is_empty() {
                    "unknown error"
                } else {
                    &last_error
                }
            )));
        }

        let delay = backoff.next_duration();
        tracing::debug!(
            "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
            attempt + 1
        );
        tokio::time::sleep(delay).await;
    };

    // Point the shared handle at the live client's connection-mode atomic.
    self.connection_mode.store(client.connection_mode_atomic());

    let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
    self.out_rx = Some(Arc::new(out_rx));

    // Replace the placeholder sender installed by the constructor with a live one.
    let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
    *self.cmd_tx.write().await = cmd_tx.clone();

    let cmd = HandlerCommand::SetClient(client);

    self.send_cmd(cmd).await?;

    // Replay instruments cached before the handler existed.
    if !self.instruments_cache.is_empty() {
        let cached_instruments: Vec<InstrumentAny> = self
            .instruments_cache
            .iter()
            .map(|entry| entry.value().clone())
            .collect();
        let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
        self.send_cmd(cmd).await?;
    }

    // Copies/clones moved into the spawned task. Plain values (`account_id`,
    // `product_type`, `bars_timestamp_on_close`) are snapshots taken at this point.
    let signal = Arc::clone(&self.signal);
    let subscriptions = self.subscriptions.clone();
    let credential = self.credential.clone();
    let requires_auth = self.requires_auth;
    let funding_cache = Arc::clone(&self.funding_cache);
    let account_id = self.account_id;
    let product_type = self.product_type;
    let bars_timestamp_on_close = self.bars_timestamp_on_close;
    let mm_level = Arc::clone(&self.mm_level);
    let cmd_tx_for_reconnect = cmd_tx.clone();
    let auth_tracker = self.auth_tracker.clone();
    let is_authenticated = Arc::clone(&self.is_authenticated);

    let stream_handle = get_runtime().spawn(async move {
        let mut handler = FeedHandler::new(
            signal.clone(),
            cmd_rx,
            raw_rx,
            out_tx.clone(),
            account_id,
            product_type,
            bars_timestamp_on_close,
            mm_level.clone(),
            auth_tracker,
            subscriptions.clone(),
            funding_cache.clone(),
        );

        // Re-sends a subscribe request for every topic returned by `all_topics()`.
        let resubscribe_all = || async {
            let topics = subscriptions.all_topics();

            if topics.is_empty() {
                return;
            }

            tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");

            for topic in &topics {
                subscriptions.mark_subscribe(topic.as_str());
            }

            // One payload per topic; topics that fail to serialize are skipped.
            let mut payloads = Vec::with_capacity(topics.len());
            for topic in &topics {
                let message = BybitSubscription {
                    op: BybitWsOperation::Subscribe,
                    args: vec![topic.clone()],
                };
                if let Ok(payload) = serde_json::to_string(&message) {
                    payloads.push(payload);
                }
            }

            let cmd = HandlerCommand::Subscribe { topics: payloads };

            if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                tracing::error!("Failed to send resubscribe command: {e}");
            }
        };

        loop {
            match handler.next().await {
                Some(NautilusWsMessage::Reconnected) => {
                    // Ignore reconnect notifications once a stop has been requested.
                    if signal.load(Ordering::Relaxed) {
                        continue;
                    }

                    tracing::info!("WebSocket reconnected");

                    // Rebuild the full topic strings from the confirmed (channel, symbols) map.
                    let confirmed_topics: Vec<String> = {
                        let confirmed = subscriptions.confirmed();
                        let mut topics = Vec::new();
                        for entry in confirmed.iter() {
                            let (channel, symbols) = entry.pair();
                            for symbol in symbols {
                                if symbol.is_empty() {
                                    topics.push(channel.to_string());
                                } else {
                                    topics.push(format!("{channel}.{symbol}"));
                                }
                            }
                        }
                        topics
                    };

                    if !confirmed_topics.is_empty() {
                        tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
                        for topic in confirmed_topics {
                            subscriptions.mark_failure(&topic);
                        }
                    }

                    // Drop cached funding values after the reconnect.
                    funding_cache.write().await.clear();

                    if requires_auth {
                        is_authenticated.store(false, Ordering::Relaxed);
                        tracing::debug!("Re-authenticating after reconnection");

                        // Without a credential nothing is sent, so no `Authenticated`
                        // message (and therefore no resubscribe) is triggered from here.
                        if let Some(cred) = &credential {
                            let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
                            let signature = cred.sign_websocket_auth(expires);

                            let auth_message = BybitAuthRequest {
                                op: BybitWsOperation::Auth,
                                args: vec![
                                    Value::String(cred.api_key().to_string()),
                                    Value::Number(expires.into()),
                                    Value::String(signature),
                                ],
                            };

                            if let Ok(payload) = serde_json::to_string(&auth_message) {
                                let cmd = HandlerCommand::Authenticate { payload };
                                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                                    tracing::error!(error = %e, "Failed to send reconnection auth command");
                                }
                            } else {
                                tracing::error!("Failed to serialize reconnection auth message");
                            }
                        }
                    }

                    // Authenticated clients resubscribe in the `Authenticated` arm below.
                    if !requires_auth {
                        tracing::debug!("No authentication required, resubscribing immediately");
                        resubscribe_all().await;
                    }

                    // Forward the reconnect notification to the stream consumer.
                    if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
                        tracing::debug!("Receiver dropped, stopping");
                        break;
                    }
                    continue;
                }
                Some(NautilusWsMessage::Authenticated) => {
                    // Consumed here; not forwarded to the output channel.
                    tracing::debug!("Authenticated, resubscribing");
                    is_authenticated.store(true, Ordering::Relaxed);
                    resubscribe_all().await;
                    continue;
                }
                Some(msg) => {
                    if out_tx.send(msg).is_err() {
                        tracing::error!("Failed to send message (receiver dropped)");
                        break;
                    }
                }
                None => {
                    // Either way the loop ends; only the log level differs.
                    if handler.is_stopped() {
                        tracing::debug!("Stop signal received, ending message processing");
                        break;
                    }
                    tracing::warn!("WebSocket stream ended unexpectedly");
                    break;
                }
            }
        }

        tracing::debug!("Handler task exiting");
    });

    self.task_handle = Some(Arc::new(stream_handle));

    // Initial authentication, performed only after the handler task is running.
    if requires_auth && let Err(e) = self.authenticate_if_required().await {
        return Err(e);
    }

    Ok(())
}
643
644 pub async fn close(&mut self) -> BybitWsResult<()> {
646 tracing::debug!("Starting close process");
647
648 self.signal.store(true, Ordering::Relaxed);
649
650 let cmd = HandlerCommand::Disconnect;
651 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
652 tracing::debug!(
653 "Failed to send disconnect command (handler may already be shut down): {e}"
654 );
655 }
656
657 if let Some(task_handle) = self.task_handle.take() {
658 match Arc::try_unwrap(task_handle) {
659 Ok(handle) => {
660 tracing::debug!("Waiting for task handle to complete");
661 match tokio::time::timeout(Duration::from_secs(2), handle).await {
662 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
663 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
664 Err(_) => {
665 tracing::warn!(
666 "Timeout waiting for task handle, task may still be running"
667 );
668 }
670 }
671 }
672 Err(arc_handle) => {
673 tracing::debug!(
674 "Cannot take ownership of task handle - other references exist, aborting task"
675 );
676 arc_handle.abort();
677 }
678 }
679 } else {
680 tracing::debug!("No task handle to await");
681 }
682
683 self.is_authenticated.store(false, Ordering::Relaxed);
684
685 tracing::debug!("Closed");
686
687 Ok(())
688 }
689
690 #[must_use]
692 pub fn is_active(&self) -> bool {
693 let connection_mode_arc = self.connection_mode.load();
694 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
695 && !self.signal.load(Ordering::Relaxed)
696 }
697
698 pub fn is_closed(&self) -> bool {
700 let connection_mode_arc = self.connection_mode.load();
701 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
702 || self.signal.load(Ordering::Relaxed)
703 }
704
705 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
711 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
712
713 tokio::time::timeout(timeout, async {
714 while !self.is_active() {
715 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
716 }
717 })
718 .await
719 .map_err(|_| {
720 BybitWsError::ClientError(format!(
721 "WebSocket connection timeout after {timeout_secs} seconds"
722 ))
723 })?;
724
725 Ok(())
726 }
727
728 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
730 if topics.is_empty() {
731 return Ok(());
732 }
733
734 tracing::debug!("Subscribing to topics: {topics:?}");
735
736 let mut topics_to_send = Vec::new();
738
739 for topic in topics {
740 if self.subscriptions.add_reference(&topic) {
742 self.subscriptions.mark_subscribe(&topic);
743 topics_to_send.push(topic.clone());
744 } else {
745 tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
746 }
747 }
748
749 if topics_to_send.is_empty() {
750 return Ok(());
751 }
752
753 let mut payloads = Vec::with_capacity(topics_to_send.len());
755 for topic in &topics_to_send {
756 let message = BybitSubscription {
757 op: BybitWsOperation::Subscribe,
758 args: vec![topic.clone()],
759 };
760 let payload = serde_json::to_string(&message).map_err(|e| {
761 BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
762 })?;
763 payloads.push(payload);
764 }
765
766 let cmd = HandlerCommand::Subscribe { topics: payloads };
767 self.cmd_tx
768 .read()
769 .await
770 .send(cmd)
771 .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
772
773 Ok(())
774 }
775
776 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
778 if topics.is_empty() {
779 return Ok(());
780 }
781
782 tracing::debug!("Attempting to unsubscribe from topics: {topics:?}");
783
784 if self.signal.load(Ordering::Relaxed) {
785 tracing::debug!("Shutdown signal detected, skipping unsubscribe");
786 return Ok(());
787 }
788
789 let mut topics_to_send = Vec::new();
791
792 for topic in topics {
793 if self.subscriptions.remove_reference(&topic) {
795 self.subscriptions.mark_unsubscribe(&topic);
796 topics_to_send.push(topic.clone());
797 } else {
798 tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
799 }
800 }
801
802 if topics_to_send.is_empty() {
803 return Ok(());
804 }
805
806 let mut payloads = Vec::with_capacity(topics_to_send.len());
808 for topic in &topics_to_send {
809 let message = BybitSubscription {
810 op: BybitWsOperation::Unsubscribe,
811 args: vec![topic.clone()],
812 };
813 if let Ok(payload) = serde_json::to_string(&message) {
814 payloads.push(payload);
815 }
816 }
817
818 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
819 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
820 tracing::debug!(error = %e, "Failed to send unsubscribe command");
821 }
822
823 Ok(())
824 }
825
826 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
832 let rx = self
833 .out_rx
834 .take()
835 .expect("Stream receiver already taken or client not connected");
836 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
837 async_stream::stream! {
838 while let Some(msg) = rx.recv().await {
839 yield msg;
840 }
841 }
842 }
843
/// Returns the value of `len()` on the subscription state.
///
/// NOTE(review): presumably this counts confirmed subscriptions only - confirm against
/// `SubscriptionState::len`.
#[must_use]
pub fn subscription_count(&self) -> usize {
    self.subscriptions.len()
}
849
/// Returns the credential held by this client, or `None` if none was resolved.
#[must_use]
pub fn credential(&self) -> Option<&Credential> {
    self.credential.as_ref()
}
855
856 pub fn cache_instrument(&self, instrument: InstrumentAny) {
860 self.instruments_cache
861 .insert(instrument.symbol().inner(), instrument.clone());
862
863 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
866 let cmd = HandlerCommand::UpdateInstrument(instrument);
867 if let Err(e) = cmd_tx.send(cmd) {
868 tracing::debug!("Failed to send instrument update to handler: {e}");
869 }
870 }
871 }
872
873 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
877 self.instruments_cache.clear();
878 let mut count = 0;
879
880 tracing::debug!("Initializing Bybit instrument cache");
881
882 for inst in instruments {
883 let symbol = inst.symbol().inner();
884 self.instruments_cache.insert(symbol, inst.clone());
885 tracing::debug!("Cached instrument: {symbol}");
886 count += 1;
887 }
888
889 tracing::info!("Bybit instrument cache initialized with {count} instruments");
890 }
891
/// Sets the account ID.
///
/// `connect` copies this value into the feed handler, so set it before connecting for it to
/// take effect on that connection.
pub fn set_account_id(&mut self, account_id: AccountId) {
    self.account_id = Some(account_id);
}
896
/// Stores `mm_level` in the shared atomic (relaxed ordering).
///
/// The atomic is shared with the feed handler and with clones of this client, so the new
/// value is visible to them without reconnecting.
/// NOTE(review): "mm" presumably means market maker - confirm.
pub fn set_mm_level(&self, mm_level: u8) {
    self.mm_level.store(mm_level, Ordering::Relaxed);
}
901
/// Sets the `bars_timestamp_on_close` flag.
///
/// `connect` copies this value into the feed handler, so set it before connecting for it to
/// take effect on that connection.
pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
    self.bars_timestamp_on_close = value;
}
909
/// Returns a reference to the shared instrument cache.
#[must_use]
pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
    &self.instruments_cache
}
915
/// Returns the account ID, or `None` if it has not been set.
#[must_use]
pub fn account_id(&self) -> Option<AccountId> {
    self.account_id
}
921
/// Returns the product type; `None` for clients built by `new_private` or `new_trade`.
#[must_use]
pub fn product_type(&self) -> Option<BybitProductType> {
    self.product_type
}
927
928 pub async fn subscribe_orderbook(
938 &self,
939 instrument_id: InstrumentId,
940 depth: u32,
941 ) -> BybitWsResult<()> {
942 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
943 let topic = format!(
944 "{}.{depth}.{raw_symbol}",
945 BybitWsPublicChannel::OrderBook.as_ref()
946 );
947 self.subscribe(vec![topic]).await
948 }
949
950 pub async fn unsubscribe_orderbook(
952 &self,
953 instrument_id: InstrumentId,
954 depth: u32,
955 ) -> BybitWsResult<()> {
956 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
957 let topic = format!(
958 "{}.{depth}.{raw_symbol}",
959 BybitWsPublicChannel::OrderBook.as_ref()
960 );
961 self.unsubscribe(vec![topic]).await
962 }
963
964 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
974 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
975 let topic = format!(
976 "{}.{raw_symbol}",
977 BybitWsPublicChannel::PublicTrade.as_ref()
978 );
979 self.subscribe(vec![topic]).await
980 }
981
982 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
984 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
985 let topic = format!(
986 "{}.{raw_symbol}",
987 BybitWsPublicChannel::PublicTrade.as_ref()
988 );
989 self.unsubscribe(vec![topic]).await
990 }
991
992 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1002 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1003 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1004 self.subscribe(vec![topic]).await
1005 }
1006
1007 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1009 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1010 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1011
1012 let symbol = self.product_type.map_or_else(
1014 || instrument_id.symbol.inner(),
1015 |pt| make_bybit_symbol(raw_symbol, pt),
1016 );
1017 self.funding_cache.write().await.remove(&symbol);
1018
1019 self.unsubscribe(vec![topic]).await
1020 }
1021
1022 pub async fn subscribe_klines(
1032 &self,
1033 instrument_id: InstrumentId,
1034 interval: impl Into<String>,
1035 ) -> BybitWsResult<()> {
1036 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1037 let topic = format!(
1038 "{}.{}.{raw_symbol}",
1039 BybitWsPublicChannel::Kline.as_ref(),
1040 interval.into()
1041 );
1042 self.subscribe(vec![topic]).await
1043 }
1044
1045 pub async fn unsubscribe_klines(
1047 &self,
1048 instrument_id: InstrumentId,
1049 interval: impl Into<String>,
1050 ) -> BybitWsResult<()> {
1051 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1052 let topic = format!(
1053 "{}.{}.{raw_symbol}",
1054 BybitWsPublicChannel::Kline.as_ref(),
1055 interval.into()
1056 );
1057 self.unsubscribe(vec![topic]).await
1058 }
1059
1060 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1070 if !self.requires_auth {
1071 return Err(BybitWsError::Authentication(
1072 "Order subscription requires authentication".to_string(),
1073 ));
1074 }
1075 self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1076 .await
1077 }
1078
1079 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1081 self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1082 .await
1083 }
1084
1085 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1095 if !self.requires_auth {
1096 return Err(BybitWsError::Authentication(
1097 "Execution subscription requires authentication".to_string(),
1098 ));
1099 }
1100 self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1101 .await
1102 }
1103
1104 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1106 self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1107 .await
1108 }
1109
1110 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1120 if !self.requires_auth {
1121 return Err(BybitWsError::Authentication(
1122 "Position subscription requires authentication".to_string(),
1123 ));
1124 }
1125 self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1126 .await
1127 }
1128
1129 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1131 self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1132 .await
1133 }
1134
1135 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1145 if !self.requires_auth {
1146 return Err(BybitWsError::Authentication(
1147 "Wallet subscription requires authentication".to_string(),
1148 ));
1149 }
1150 self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1151 .await
1152 }
1153
1154 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1156 self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1157 .await
1158 }
1159
1160 pub async fn place_order(
1170 &self,
1171 params: BybitWsPlaceOrderParams,
1172 client_order_id: ClientOrderId,
1173 trader_id: TraderId,
1174 strategy_id: StrategyId,
1175 instrument_id: InstrumentId,
1176 ) -> BybitWsResult<()> {
1177 if !self.is_authenticated.load(Ordering::Relaxed) {
1178 return Err(BybitWsError::Authentication(
1179 "Must be authenticated to place orders".to_string(),
1180 ));
1181 }
1182
1183 let cmd = HandlerCommand::PlaceOrder {
1184 params,
1185 client_order_id,
1186 trader_id,
1187 strategy_id,
1188 instrument_id,
1189 };
1190
1191 self.send_cmd(cmd).await
1192 }
1193
1194 pub async fn amend_order(
1204 &self,
1205 params: BybitWsAmendOrderParams,
1206 client_order_id: ClientOrderId,
1207 trader_id: TraderId,
1208 strategy_id: StrategyId,
1209 instrument_id: InstrumentId,
1210 venue_order_id: Option<VenueOrderId>,
1211 ) -> BybitWsResult<()> {
1212 if !self.is_authenticated.load(Ordering::Relaxed) {
1213 return Err(BybitWsError::Authentication(
1214 "Must be authenticated to amend orders".to_string(),
1215 ));
1216 }
1217
1218 let cmd = HandlerCommand::AmendOrder {
1219 params,
1220 client_order_id,
1221 trader_id,
1222 strategy_id,
1223 instrument_id,
1224 venue_order_id,
1225 };
1226
1227 self.send_cmd(cmd).await
1228 }
1229
1230 pub async fn cancel_order(
1240 &self,
1241 params: BybitWsCancelOrderParams,
1242 client_order_id: ClientOrderId,
1243 trader_id: TraderId,
1244 strategy_id: StrategyId,
1245 instrument_id: InstrumentId,
1246 venue_order_id: Option<VenueOrderId>,
1247 ) -> BybitWsResult<()> {
1248 if !self.is_authenticated.load(Ordering::Relaxed) {
1249 return Err(BybitWsError::Authentication(
1250 "Must be authenticated to cancel orders".to_string(),
1251 ));
1252 }
1253
1254 let cmd = HandlerCommand::CancelOrder {
1255 params,
1256 client_order_id,
1257 trader_id,
1258 strategy_id,
1259 instrument_id,
1260 venue_order_id,
1261 };
1262
1263 self.send_cmd(cmd).await
1264 }
1265
1266 pub async fn batch_place_orders(
1276 &self,
1277 trader_id: TraderId,
1278 strategy_id: StrategyId,
1279 orders: Vec<BybitWsPlaceOrderParams>,
1280 ) -> BybitWsResult<()> {
1281 if !self.is_authenticated.load(Ordering::Relaxed) {
1282 return Err(BybitWsError::Authentication(
1283 "Must be authenticated to place orders".to_string(),
1284 ));
1285 }
1286
1287 if orders.is_empty() {
1288 tracing::warn!("Batch place orders called with empty orders list");
1289 return Ok(());
1290 }
1291
1292 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1293 self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1294 .await?;
1295 }
1296
1297 Ok(())
1298 }
1299
1300 async fn batch_place_orders_chunk(
1301 &self,
1302 trader_id: TraderId,
1303 strategy_id: StrategyId,
1304 orders: Vec<BybitWsPlaceOrderParams>,
1305 ) -> BybitWsResult<()> {
1306 let category = orders[0].category;
1307 let batch_req_id = UUID4::new().to_string();
1308
1309 let mut batch_order_data = Vec::new();
1311 for order in &orders {
1312 if let Some(order_link_id_str) = &order.order_link_id {
1313 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1314 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1315 let instrument_id = self
1316 .instruments_cache
1317 .get(&cache_key)
1318 .map(|inst| inst.id())
1319 .ok_or_else(|| {
1320 BybitWsError::ClientError(format!(
1321 "Instrument {cache_key} not found in cache"
1322 ))
1323 })?;
1324 batch_order_data.push((
1325 client_order_id,
1326 (client_order_id, trader_id, strategy_id, instrument_id),
1327 ));
1328 }
1329 }
1330
1331 if !batch_order_data.is_empty() {
1332 let cmd = HandlerCommand::RegisterBatchPlace {
1333 req_id: batch_req_id.clone(),
1334 orders: batch_order_data,
1335 };
1336 let cmd_tx = self.cmd_tx.read().await;
1337 if let Err(e) = cmd_tx.send(cmd) {
1338 tracing::error!("Failed to send RegisterBatchPlace command: {e}");
1339 }
1340 }
1341
1342 let mm_level = self.mm_level.load(Ordering::Relaxed);
1343 let has_non_post_only = orders
1344 .iter()
1345 .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1346 let referer = if has_non_post_only || mm_level == 0 {
1347 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1348 } else {
1349 None
1350 };
1351
1352 let request_items: Vec<BybitWsBatchPlaceItem> = orders
1353 .into_iter()
1354 .map(|order| BybitWsBatchPlaceItem {
1355 symbol: order.symbol,
1356 side: order.side,
1357 order_type: order.order_type,
1358 qty: order.qty,
1359 is_leverage: order.is_leverage,
1360 market_unit: order.market_unit,
1361 price: order.price,
1362 time_in_force: order.time_in_force,
1363 order_link_id: order.order_link_id,
1364 reduce_only: order.reduce_only,
1365 close_on_trigger: order.close_on_trigger,
1366 trigger_price: order.trigger_price,
1367 trigger_by: order.trigger_by,
1368 trigger_direction: order.trigger_direction,
1369 tpsl_mode: order.tpsl_mode,
1370 take_profit: order.take_profit,
1371 stop_loss: order.stop_loss,
1372 tp_trigger_by: order.tp_trigger_by,
1373 sl_trigger_by: order.sl_trigger_by,
1374 sl_trigger_price: order.sl_trigger_price,
1375 tp_trigger_price: order.tp_trigger_price,
1376 sl_order_type: order.sl_order_type,
1377 tp_order_type: order.tp_order_type,
1378 sl_limit_price: order.sl_limit_price,
1379 tp_limit_price: order.tp_limit_price,
1380 })
1381 .collect();
1382
1383 let args = BybitWsBatchPlaceOrderArgs {
1384 category,
1385 request: request_items,
1386 };
1387
1388 let request = BybitWsRequest {
1389 req_id: Some(batch_req_id),
1390 op: BybitWsOrderRequestOp::CreateBatch,
1391 header: BybitWsHeader::with_referer(referer),
1392 args: vec![args],
1393 };
1394
1395 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1396
1397 self.send_text(&payload).await
1398 }
1399
1400 pub async fn batch_amend_orders(
1406 &self,
1407 #[allow(unused_variables)] trader_id: TraderId,
1408 #[allow(unused_variables)] strategy_id: StrategyId,
1409 orders: Vec<BybitWsAmendOrderParams>,
1410 ) -> BybitWsResult<()> {
1411 if !self.is_authenticated.load(Ordering::Relaxed) {
1412 return Err(BybitWsError::Authentication(
1413 "Must be authenticated to amend orders".to_string(),
1414 ));
1415 }
1416
1417 if orders.is_empty() {
1418 tracing::warn!("Batch amend orders called with empty orders list");
1419 return Ok(());
1420 }
1421
1422 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1423 self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1424 .await?;
1425 }
1426
1427 Ok(())
1428 }
1429
1430 async fn batch_amend_orders_chunk(
1431 &self,
1432 #[allow(unused_variables)] trader_id: TraderId,
1433 #[allow(unused_variables)] strategy_id: StrategyId,
1434 orders: Vec<BybitWsAmendOrderParams>,
1435 ) -> BybitWsResult<()> {
1436 let request = BybitWsRequest {
1437 req_id: None,
1438 op: BybitWsOrderRequestOp::AmendBatch,
1439 header: BybitWsHeader::now(),
1440 args: orders,
1441 };
1442
1443 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1444
1445 self.send_text(&payload).await
1446 }
1447
1448 pub async fn batch_cancel_orders(
1454 &self,
1455 trader_id: TraderId,
1456 strategy_id: StrategyId,
1457 orders: Vec<BybitWsCancelOrderParams>,
1458 ) -> BybitWsResult<()> {
1459 if !self.is_authenticated.load(Ordering::Relaxed) {
1460 return Err(BybitWsError::Authentication(
1461 "Must be authenticated to cancel orders".to_string(),
1462 ));
1463 }
1464
1465 if orders.is_empty() {
1466 tracing::warn!("Batch cancel orders called with empty orders list");
1467 return Ok(());
1468 }
1469
1470 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1471 self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1472 .await?;
1473 }
1474
1475 Ok(())
1476 }
1477
1478 async fn batch_cancel_orders_chunk(
1479 &self,
1480 trader_id: TraderId,
1481 strategy_id: StrategyId,
1482 orders: Vec<BybitWsCancelOrderParams>,
1483 ) -> BybitWsResult<()> {
1484 if orders.is_empty() {
1485 return Ok(());
1486 }
1487
1488 let category = orders[0].category;
1489 let batch_req_id = UUID4::new().to_string();
1490
1491 let mut validated_data = Vec::new();
1492
1493 for order in &orders {
1494 if let Some(order_link_id_str) = &order.order_link_id {
1495 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1496 let instrument_id = self
1497 .instruments_cache
1498 .get(&cache_key)
1499 .map(|inst| inst.id())
1500 .ok_or_else(|| {
1501 BybitWsError::ClientError(format!(
1502 "Instrument {cache_key} not found in cache"
1503 ))
1504 })?;
1505
1506 let venue_order_id = order
1507 .order_id
1508 .as_ref()
1509 .map(|id| VenueOrderId::from(id.as_str()));
1510
1511 validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1512 }
1513 }
1514
1515 let batch_cancel_data: Vec<_> = validated_data
1516 .iter()
1517 .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1518 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1519 (
1520 client_order_id,
1521 (
1522 client_order_id,
1523 trader_id,
1524 strategy_id,
1525 *instrument_id,
1526 *venue_order_id,
1527 ),
1528 )
1529 })
1530 .collect();
1531
1532 if !batch_cancel_data.is_empty() {
1533 let cmd = HandlerCommand::RegisterBatchCancel {
1534 req_id: batch_req_id.clone(),
1535 cancels: batch_cancel_data,
1536 };
1537 let cmd_tx = self.cmd_tx.read().await;
1538 if let Err(e) = cmd_tx.send(cmd) {
1539 tracing::error!("Failed to send RegisterBatchCancel command: {e}");
1540 }
1541 }
1542
1543 let request_items: Vec<BybitWsBatchCancelItem> = orders
1544 .into_iter()
1545 .map(|order| BybitWsBatchCancelItem {
1546 symbol: order.symbol,
1547 order_id: order.order_id,
1548 order_link_id: order.order_link_id,
1549 })
1550 .collect();
1551
1552 let args = BybitWsBatchCancelOrderArgs {
1553 category,
1554 request: request_items,
1555 };
1556
1557 let request = BybitWsRequest {
1558 req_id: Some(batch_req_id),
1559 op: BybitWsOrderRequestOp::CancelBatch,
1560 header: BybitWsHeader::now(),
1561 args: vec![args],
1562 };
1563
1564 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1565
1566 self.send_text(&payload).await
1567 }
1568
1569 #[allow(clippy::too_many_arguments)]
1575 pub async fn submit_order(
1576 &self,
1577 product_type: BybitProductType,
1578 trader_id: TraderId,
1579 strategy_id: StrategyId,
1580 instrument_id: InstrumentId,
1581 client_order_id: ClientOrderId,
1582 order_side: OrderSide,
1583 order_type: OrderType,
1584 quantity: Quantity,
1585 is_quote_quantity: bool,
1586 time_in_force: Option<TimeInForce>,
1587 price: Option<Price>,
1588 trigger_price: Option<Price>,
1589 post_only: Option<bool>,
1590 reduce_only: Option<bool>,
1591 is_leverage: bool,
1592 ) -> BybitWsResult<()> {
1593 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1594 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1595 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1596
1597 let bybit_side = match order_side {
1598 OrderSide::Buy => BybitOrderSide::Buy,
1599 OrderSide::Sell => BybitOrderSide::Sell,
1600 _ => {
1601 return Err(BybitWsError::ClientError(format!(
1602 "Invalid order side: {order_side:?}"
1603 )));
1604 }
1605 };
1606
1607 let (bybit_order_type, is_stop_order) = match order_type {
1609 OrderType::Market => (BybitOrderType::Market, false),
1610 OrderType::Limit => (BybitOrderType::Limit, false),
1611 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1612 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1613 _ => {
1614 return Err(BybitWsError::ClientError(format!(
1615 "Unsupported order type: {order_type:?}"
1616 )));
1617 }
1618 };
1619
1620 let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1621 None
1622 } else if post_only == Some(true) {
1623 Some(BybitTimeInForce::PostOnly)
1624 } else if let Some(tif) = time_in_force {
1625 Some(match tif {
1626 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1627 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1628 TimeInForce::Fok => BybitTimeInForce::Fok,
1629 _ => {
1630 return Err(BybitWsError::ClientError(format!(
1631 "Unsupported time in force: {tif:?}"
1632 )));
1633 }
1634 })
1635 } else {
1636 None
1637 };
1638
1639 let market_unit = if product_type == BybitProductType::Spot
1642 && bybit_order_type == BybitOrderType::Market
1643 {
1644 if is_quote_quantity {
1645 Some(BYBIT_QUOTE_COIN.to_string())
1646 } else {
1647 Some(BYBIT_BASE_COIN.to_string())
1648 }
1649 } else {
1650 None
1651 };
1652
1653 let is_leverage_value = if product_type == BybitProductType::Spot {
1655 Some(i32::from(is_leverage))
1656 } else {
1657 None
1658 };
1659
1660 let trigger_direction = if is_stop_order {
1663 match (order_type, order_side) {
1664 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1665 Some(BybitTriggerDirection::RisesTo as i32)
1666 }
1667 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1668 Some(BybitTriggerDirection::FallsTo as i32)
1669 }
1670 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1671 Some(BybitTriggerDirection::FallsTo as i32)
1672 }
1673 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1674 Some(BybitTriggerDirection::RisesTo as i32)
1675 }
1676 _ => None,
1677 }
1678 } else {
1679 None
1680 };
1681
1682 let params = if is_stop_order {
1683 BybitWsPlaceOrderParams {
1686 category: product_type,
1687 symbol: raw_symbol,
1688 side: bybit_side,
1689 order_type: bybit_order_type,
1690 qty: quantity.to_string(),
1691 is_leverage: is_leverage_value,
1692 market_unit: market_unit.clone(),
1693 price: price.map(|p| p.to_string()),
1694 time_in_force: bybit_tif,
1695 order_link_id: Some(client_order_id.to_string()),
1696 reduce_only: reduce_only.filter(|&r| r),
1697 close_on_trigger: None,
1698 trigger_price: trigger_price.map(|p| p.to_string()),
1699 trigger_by: Some(BybitTriggerType::LastPrice),
1700 trigger_direction,
1701 tpsl_mode: None, take_profit: None,
1703 stop_loss: None,
1704 tp_trigger_by: None,
1705 sl_trigger_by: None,
1706 sl_trigger_price: None, tp_trigger_price: None, sl_order_type: None,
1709 tp_order_type: None,
1710 sl_limit_price: None,
1711 tp_limit_price: None,
1712 }
1713 } else {
1714 BybitWsPlaceOrderParams {
1716 category: product_type,
1717 symbol: raw_symbol,
1718 side: bybit_side,
1719 order_type: bybit_order_type,
1720 qty: quantity.to_string(),
1721 is_leverage: is_leverage_value,
1722 market_unit,
1723 price: price.map(|p| p.to_string()),
1724 time_in_force: if bybit_order_type == BybitOrderType::Market {
1725 None
1726 } else {
1727 bybit_tif
1728 },
1729 order_link_id: Some(client_order_id.to_string()),
1730 reduce_only: reduce_only.filter(|&r| r),
1731 close_on_trigger: None,
1732 trigger_price: None,
1733 trigger_by: None,
1734 trigger_direction: None,
1735 tpsl_mode: None,
1736 take_profit: None,
1737 stop_loss: None,
1738 tp_trigger_by: None,
1739 sl_trigger_by: None,
1740 sl_trigger_price: None,
1741 tp_trigger_price: None,
1742 sl_order_type: None,
1743 tp_order_type: None,
1744 sl_limit_price: None,
1745 tp_limit_price: None,
1746 }
1747 };
1748
1749 self.place_order(
1750 params,
1751 client_order_id,
1752 trader_id,
1753 strategy_id,
1754 instrument_id,
1755 )
1756 .await
1757 }
1758
1759 #[allow(clippy::too_many_arguments)]
1765 pub async fn modify_order(
1766 &self,
1767 product_type: BybitProductType,
1768 trader_id: TraderId,
1769 strategy_id: StrategyId,
1770 instrument_id: InstrumentId,
1771 client_order_id: ClientOrderId,
1772 venue_order_id: Option<VenueOrderId>,
1773 quantity: Option<Quantity>,
1774 price: Option<Price>,
1775 ) -> BybitWsResult<()> {
1776 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1777 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1778 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1779
1780 let params = BybitWsAmendOrderParams {
1781 category: product_type,
1782 symbol: raw_symbol,
1783 order_id: venue_order_id.map(|id| id.to_string()),
1784 order_link_id: Some(client_order_id.to_string()),
1785 qty: quantity.map(|q| q.to_string()),
1786 price: price.map(|p| p.to_string()),
1787 trigger_price: None,
1788 take_profit: None,
1789 stop_loss: None,
1790 tp_trigger_by: None,
1791 sl_trigger_by: None,
1792 };
1793
1794 self.amend_order(
1795 params,
1796 client_order_id,
1797 trader_id,
1798 strategy_id,
1799 instrument_id,
1800 venue_order_id,
1801 )
1802 .await
1803 }
1804
1805 pub async fn cancel_order_by_id(
1811 &self,
1812 product_type: BybitProductType,
1813 trader_id: TraderId,
1814 strategy_id: StrategyId,
1815 instrument_id: InstrumentId,
1816 client_order_id: ClientOrderId,
1817 venue_order_id: Option<VenueOrderId>,
1818 ) -> BybitWsResult<()> {
1819 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1820 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1821 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1822
1823 let params = BybitWsCancelOrderParams {
1824 category: product_type,
1825 symbol: raw_symbol,
1826 order_id: venue_order_id.map(|id| id.to_string()),
1827 order_link_id: Some(client_order_id.to_string()),
1828 };
1829
1830 self.cancel_order(
1831 params,
1832 client_order_id,
1833 trader_id,
1834 strategy_id,
1835 instrument_id,
1836 venue_order_id,
1837 )
1838 .await
1839 }
1840
1841 #[allow(clippy::too_many_arguments)]
1843 pub fn build_place_order_params(
1844 &self,
1845 product_type: BybitProductType,
1846 instrument_id: InstrumentId,
1847 client_order_id: ClientOrderId,
1848 order_side: OrderSide,
1849 order_type: OrderType,
1850 quantity: Quantity,
1851 is_quote_quantity: bool,
1852 time_in_force: Option<TimeInForce>,
1853 price: Option<Price>,
1854 trigger_price: Option<Price>,
1855 post_only: Option<bool>,
1856 reduce_only: Option<bool>,
1857 is_leverage: bool,
1858 ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1859 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1860 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1861 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1862
1863 let bybit_side = match order_side {
1864 OrderSide::Buy => BybitOrderSide::Buy,
1865 OrderSide::Sell => BybitOrderSide::Sell,
1866 _ => {
1867 return Err(BybitWsError::ClientError(format!(
1868 "Invalid order side: {order_side:?}"
1869 )));
1870 }
1871 };
1872
1873 let (bybit_order_type, is_stop_order) = match order_type {
1874 OrderType::Market => (BybitOrderType::Market, false),
1875 OrderType::Limit => (BybitOrderType::Limit, false),
1876 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1877 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1878 _ => {
1879 return Err(BybitWsError::ClientError(format!(
1880 "Unsupported order type: {order_type:?}"
1881 )));
1882 }
1883 };
1884
1885 let bybit_tif = if post_only == Some(true) {
1886 Some(BybitTimeInForce::PostOnly)
1887 } else if let Some(tif) = time_in_force {
1888 Some(match tif {
1889 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1890 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1891 TimeInForce::Fok => BybitTimeInForce::Fok,
1892 _ => {
1893 return Err(BybitWsError::ClientError(format!(
1894 "Unsupported time in force: {tif:?}"
1895 )));
1896 }
1897 })
1898 } else {
1899 None
1900 };
1901
1902 let market_unit = if product_type == BybitProductType::Spot
1903 && bybit_order_type == BybitOrderType::Market
1904 {
1905 if is_quote_quantity {
1906 Some(BYBIT_QUOTE_COIN.to_string())
1907 } else {
1908 Some(BYBIT_BASE_COIN.to_string())
1909 }
1910 } else {
1911 None
1912 };
1913
1914 let is_leverage_value = if product_type == BybitProductType::Spot {
1916 Some(i32::from(is_leverage))
1917 } else {
1918 None
1919 };
1920
1921 let trigger_direction = if is_stop_order {
1924 match (order_type, order_side) {
1925 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1926 Some(BybitTriggerDirection::RisesTo as i32)
1927 }
1928 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1929 Some(BybitTriggerDirection::FallsTo as i32)
1930 }
1931 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1932 Some(BybitTriggerDirection::FallsTo as i32)
1933 }
1934 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1935 Some(BybitTriggerDirection::RisesTo as i32)
1936 }
1937 _ => None,
1938 }
1939 } else {
1940 None
1941 };
1942
1943 let params = if is_stop_order {
1944 BybitWsPlaceOrderParams {
1945 category: product_type,
1946 symbol: raw_symbol,
1947 side: bybit_side,
1948 order_type: bybit_order_type,
1949 qty: quantity.to_string(),
1950 is_leverage: is_leverage_value,
1951 market_unit,
1952 price: price.map(|p| p.to_string()),
1953 time_in_force: if bybit_order_type == BybitOrderType::Market {
1954 None
1955 } else {
1956 bybit_tif
1957 },
1958 order_link_id: Some(client_order_id.to_string()),
1959 reduce_only: reduce_only.filter(|&r| r),
1960 close_on_trigger: None,
1961 trigger_price: trigger_price.map(|p| p.to_string()),
1962 trigger_by: Some(BybitTriggerType::LastPrice),
1963 trigger_direction,
1964 tpsl_mode: None,
1965 take_profit: None,
1966 stop_loss: None,
1967 tp_trigger_by: None,
1968 sl_trigger_by: None,
1969 sl_trigger_price: None,
1970 tp_trigger_price: None,
1971 sl_order_type: None,
1972 tp_order_type: None,
1973 sl_limit_price: None,
1974 tp_limit_price: None,
1975 }
1976 } else {
1977 BybitWsPlaceOrderParams {
1978 category: product_type,
1979 symbol: raw_symbol,
1980 side: bybit_side,
1981 order_type: bybit_order_type,
1982 qty: quantity.to_string(),
1983 is_leverage: is_leverage_value,
1984 market_unit,
1985 price: price.map(|p| p.to_string()),
1986 time_in_force: if bybit_order_type == BybitOrderType::Market {
1987 None
1988 } else {
1989 bybit_tif
1990 },
1991 order_link_id: Some(client_order_id.to_string()),
1992 reduce_only: reduce_only.filter(|&r| r),
1993 close_on_trigger: None,
1994 trigger_price: None,
1995 trigger_by: None,
1996 trigger_direction: None,
1997 tpsl_mode: None,
1998 take_profit: None,
1999 stop_loss: None,
2000 tp_trigger_by: None,
2001 sl_trigger_by: None,
2002 sl_trigger_price: None,
2003 tp_trigger_price: None,
2004 sl_order_type: None,
2005 tp_order_type: None,
2006 sl_limit_price: None,
2007 tp_limit_price: None,
2008 }
2009 };
2010
2011 Ok(params)
2012 }
2013
2014 #[allow(clippy::too_many_arguments)]
2016 pub fn build_amend_order_params(
2017 &self,
2018 product_type: BybitProductType,
2019 instrument_id: InstrumentId,
2020 venue_order_id: Option<VenueOrderId>,
2021 client_order_id: Option<ClientOrderId>,
2022 quantity: Option<Quantity>,
2023 price: Option<Price>,
2024 ) -> BybitWsResult<BybitWsAmendOrderParams> {
2025 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2026 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2027 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2028
2029 Ok(BybitWsAmendOrderParams {
2030 category: product_type,
2031 symbol: raw_symbol,
2032 order_id: venue_order_id.map(|v| v.to_string()),
2033 order_link_id: client_order_id.map(|c| c.to_string()),
2034 qty: quantity.map(|q| q.to_string()),
2035 price: price.map(|p| p.to_string()),
2036 trigger_price: None,
2037 take_profit: None,
2038 stop_loss: None,
2039 tp_trigger_by: None,
2040 sl_trigger_by: None,
2041 })
2042 }
2043
2044 pub fn build_cancel_order_params(
2051 &self,
2052 product_type: BybitProductType,
2053 instrument_id: InstrumentId,
2054 venue_order_id: Option<VenueOrderId>,
2055 client_order_id: Option<ClientOrderId>,
2056 ) -> BybitWsResult<BybitWsCancelOrderParams> {
2057 if venue_order_id.is_none() && client_order_id.is_none() {
2058 return Err(BybitWsError::ClientError(
2059 "Either venue_order_id or client_order_id must be provided".to_string(),
2060 ));
2061 }
2062
2063 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2064 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2065 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2066
2067 Ok(BybitWsCancelOrderParams {
2068 category: product_type,
2069 symbol: raw_symbol,
2070 order_id: venue_order_id.map(|v| v.to_string()),
2071 order_link_id: client_order_id.map(|c| c.to_string()),
2072 })
2073 }
2074
2075 fn default_headers() -> Vec<(String, String)> {
2076 vec![
2077 ("Content-Type".to_string(), "application/json".to_string()),
2078 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2079 ]
2080 }
2081
2082 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2083 if !self.requires_auth {
2084 return Ok(());
2085 }
2086
2087 let credential = self.credential.as_ref().ok_or_else(|| {
2088 BybitWsError::Authentication("Credentials required for authentication".to_string())
2089 })?;
2090
2091 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2092 let signature = credential.sign_websocket_auth(expires);
2093
2094 let auth_message = BybitAuthRequest {
2095 op: BybitWsOperation::Auth,
2096 args: vec![
2097 Value::String(credential.api_key().to_string()),
2098 Value::Number(expires.into()),
2099 Value::String(signature),
2100 ],
2101 };
2102
2103 let payload = serde_json::to_string(&auth_message)?;
2104
2105 self.cmd_tx
2106 .read()
2107 .await
2108 .send(HandlerCommand::Authenticate { payload })
2109 .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2110
2111 Ok(())
2114 }
2115
2116 async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2117 let cmd = HandlerCommand::SendText {
2118 payload: text.to_string(),
2119 };
2120
2121 self.send_cmd(cmd).await
2122 }
2123
2124 async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2125 self.cmd_tx
2126 .read()
2127 .await
2128 .send(cmd)
2129 .map_err(|e| BybitWsError::Send(e.to_string()))
2130 }
2131}
2132
#[cfg(test)]
mod tests {
    // Unit tests covering message classification of JSON fixtures, subscription
    // state transitions, and the spot-specific fields of built place-order params.
    use rstest::rstest;

    use super::*;
    use crate::{
        common::testing::load_test_json,
        websocket::{classify_bybit_message, messages::BybitWsMessage},
    };

    // The order book snapshot fixture must classify as an `Orderbook` message.
    #[rstest]
    fn classify_orderbook_snapshot() {
        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
            .expect("invalid fixture");
        let message = classify_bybit_message(json);
        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
    }

    // The public trade fixture must classify as a `Trade` message.
    #[rstest]
    fn classify_trade_snapshot() {
        let json: Value =
            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
        let message = classify_bybit_message(json);
        assert!(matches!(message, BybitWsMessage::Trade(_)));
    }

    // The linear ticker fixture must classify as a `TickerLinear` message.
    #[rstest]
    fn classify_ticker_linear_snapshot() {
        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
            .expect("invalid fixture");
        let message = classify_bybit_message(json);
        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
    }

    // The option ticker fixture must classify as a `TickerOption` message.
    #[rstest]
    fn classify_ticker_option_snapshot() {
        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
            .expect("invalid fixture");
        let message = classify_bybit_message(json);
        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
    }

    // Subscribe, unsubscribe, then subscribe again: the topic must end up as the
    // single confirmed topic with nothing left pending.
    #[rstest]
    fn test_race_unsubscribe_failure_recovery() {
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "publicTrade.BTCUSDT";

        // Subscribe and confirm
        subscriptions.mark_subscribe(topic);
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 1);

        // Marking for unsubscribe drops the topic from the count and makes it pending
        subscriptions.mark_unsubscribe(topic);
        assert_eq!(subscriptions.len(), 0);
        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);

        // Confirm the unsubscribe, then run a full subscribe cycle again
        subscriptions.confirm_unsubscribe(topic);
        subscriptions.mark_subscribe(topic);
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 1);
        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
        assert!(subscriptions.pending_subscribe_topics().is_empty());

        // The topic is reported exactly once
        let all = subscriptions.all_topics();
        assert_eq!(all.len(), 1);
        assert!(all.contains(&topic.to_string()));
    }

    // A new subscribe is marked while an unsubscribe for the same topic is still
    // unconfirmed: the pending subscribe must survive the unsubscribe confirmation.
    #[rstest]
    fn test_race_resubscribe_before_unsubscribe_ack() {
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "orderbook.50.BTCUSDT";

        // Subscribe and confirm
        subscriptions.mark_subscribe(topic);
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 1);

        // Request the unsubscribe
        subscriptions.mark_unsubscribe(topic);
        assert_eq!(subscriptions.len(), 0);
        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);

        // Re-subscribe before the unsubscribe has been confirmed
        subscriptions.mark_subscribe(topic);
        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);

        // The unsubscribe confirmation must leave the pending subscribe in place
        subscriptions.confirm_unsubscribe(topic);
        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);

        // Confirming the subscribe completes the cycle
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 1);
        assert!(subscriptions.pending_subscribe_topics().is_empty());

        // The topic is reported exactly once
        let all = subscriptions.all_topics();
        assert_eq!(all.len(), 1);
        assert!(all.contains(&topic.to_string()));
    }

    // A subscribe confirmation arrives after the topic was already marked for
    // unsubscribe: it must not bring the topic back.
    #[rstest]
    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "tickers.ETHUSDT";

        // Subscribe is requested but not yet confirmed
        subscriptions.mark_subscribe(topic);
        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);

        // Unsubscribing moves the topic from pending-subscribe to pending-unsubscribe
        subscriptions.mark_unsubscribe(topic);
        assert!(subscriptions.pending_subscribe_topics().is_empty());
        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);

        // The late subscribe confirmation changes nothing
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 0);
        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);

        subscriptions.confirm_unsubscribe(topic);

        // Nothing is left in any state
        assert!(subscriptions.is_empty());
        assert!(subscriptions.all_topics().is_empty());
    }

    // `all_topics()` with one confirmed topic, one pending subscribe and one
    // pending unsubscribe: only the first two are reported.
    #[rstest]
    fn test_race_reconnection_with_pending_states() {
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);

        // Confirmed subscription
        let trade_btc = "publicTrade.BTCUSDT";
        subscriptions.mark_subscribe(trade_btc);
        subscriptions.confirm_subscribe(trade_btc);

        // Subscription that is still pending
        let trade_eth = "publicTrade.ETHUSDT";
        subscriptions.mark_subscribe(trade_eth);

        // Confirmed subscription that has since been marked for unsubscribe
        let book_btc = "orderbook.50.BTCUSDT";
        subscriptions.mark_subscribe(book_btc);
        subscriptions.confirm_subscribe(book_btc);
        subscriptions.mark_unsubscribe(book_btc);

        let topics_to_restore = subscriptions.all_topics();

        // The topic pending unsubscribe is excluded
        assert_eq!(topics_to_restore.len(), 2);
        assert!(topics_to_restore.contains(&trade_btc.to_string()));
        assert!(topics_to_restore.contains(&trade_eth.to_string()));
        assert!(!topics_to_restore.contains(&book_btc.to_string()));
    }

    // Repeating mark/confirm for an already confirmed topic must not duplicate it.
    #[rstest]
    fn test_race_duplicate_subscribe_messages_idempotent() {
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "publicTrade.BTCUSDT";

        // Subscribe and confirm
        subscriptions.mark_subscribe(topic);
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 1);

        // A second mark on a confirmed topic adds nothing to pending
        subscriptions.mark_subscribe(topic);
        assert!(subscriptions.pending_subscribe_topics().is_empty());
        assert_eq!(subscriptions.len(), 1);

        // A second confirmation leaves the count unchanged
        subscriptions.confirm_subscribe(topic);
        assert_eq!(subscriptions.len(), 1);

        // The topic is reported exactly once
        let all = subscriptions.all_topics();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0], topic);
    }

    // `is_leverage` is emitted as 1/0 for spot and omitted for every other product type.
    #[rstest]
    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
    #[case::option_with_leverage(BybitProductType::Option, true, None)]
    fn test_is_leverage_parameter(
        #[case] product_type: BybitProductType,
        #[case] is_leverage: bool,
        #[case] expected: Option<i32>,
    ) {
        // Pick an instrument ID string for the product type under test
        let symbol = match product_type {
            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
        };

        let instrument_id = InstrumentId::from(symbol);
        let client_order_id = ClientOrderId::from("test-order-1");
        let quantity = Quantity::from("1.0");

        // Only the param builder is exercised; `connect` is never called
        let client = BybitWebSocketClient::new_trade(
            BybitEnvironment::Testnet,
            Some("test-key".to_string()),
            Some("test-secret".to_string()),
            None,
            Some(20),
        );

        // Limit GTC buy at a fixed price; only `is_leverage` and the product type vary
        let params = client
            .build_place_order_params(
                product_type,
                instrument_id,
                client_order_id,
                OrderSide::Buy,
                OrderType::Limit,
                quantity,
                false,
                Some(TimeInForce::Gtc),
                Some(Price::from("50000.0")),
                None,
                None,
                None,
                is_leverage,
            )
            .expect("Failed to build params");

        assert_eq!(params.is_leverage, expected);
    }

    // `market_unit` is only set for spot market orders, selecting the quote or
    // base coin from `is_quote_quantity`.
    #[rstest]
    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
    fn test_is_quote_quantity_parameter(
        #[case] product_type: BybitProductType,
        #[case] order_type: OrderType,
        #[case] is_quote_quantity: bool,
        #[case] expected: Option<String>,
    ) {
        // Pick an instrument ID string for the product type under test
        let symbol = match product_type {
            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
        };

        let instrument_id = InstrumentId::from(symbol);
        let client_order_id = ClientOrderId::from("test-order-1");
        let quantity = Quantity::from("1.0");

        // Only the param builder is exercised; `connect` is never called
        let client = BybitWebSocketClient::new_trade(
            BybitEnvironment::Testnet,
            Some("test-key".to_string()),
            Some("test-secret".to_string()),
            None,
            Some(20),
        );

        // A price is only supplied for non-market orders
        let params = client
            .build_place_order_params(
                product_type,
                instrument_id,
                client_order_id,
                OrderSide::Buy,
                order_type,
                quantity,
                is_quote_quantity,
                Some(TimeInForce::Gtc),
                if order_type == OrderType::Market {
                    None
                } else {
                    Some(Price::from("50000.0"))
                },
                None,
                None,
                None,
                false,
            )
            .expect("Failed to build params");

        assert_eq!(params.market_unit, expected);
    }
}