1use std::{
21 fmt::Debug,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::runtime::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35 enums::{OrderSide, OrderType, TimeInForce},
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 types::{Price, Quantity},
39};
40use nautilus_network::{
41 backoff::ExponentialBackoff,
42 mode::ConnectionMode,
43 websocket::{
44 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45 channel_message_handler,
46 },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53 common::{
54 consts::{
55 BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
56 },
57 credential::Credential,
58 enums::{
59 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
60 BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
61 },
62 parse::{extract_raw_symbol, make_bybit_symbol},
63 symbol::BybitSymbol,
64 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
65 },
66 websocket::{
67 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
68 error::{BybitWsError, BybitWsResult},
69 handler::{FeedHandler, HandlerCommand},
70 messages::{
71 BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
72 BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
73 BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
74 NautilusWsMessage,
75 },
76 },
77};
78
79const DEFAULT_HEARTBEAT_SECS: u64 = 20;
80const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
81const BATCH_PROCESSING_LIMIT: usize = 20;
82
83type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
85
86fn resolve_credential(
93 environment: BybitEnvironment,
94 api_key: Option<String>,
95 api_secret: Option<String>,
96) -> Option<Credential> {
97 let (api_key_env, api_secret_env) = match environment {
98 BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
99 BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
100 BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
101 };
102
103 let key = get_or_env_var_opt(api_key, api_key_env);
104 let secret = get_or_env_var_opt(api_secret, api_secret_env);
105
106 match (key, secret) {
107 (Some(k), Some(s)) => Some(Credential::new(k, s)),
108 _ => None,
109 }
110}
111
112#[cfg_attr(feature = "python", pyo3::pyclass)]
114pub struct BybitWebSocketClient {
115 url: String,
116 environment: BybitEnvironment,
117 product_type: Option<BybitProductType>,
118 credential: Option<Credential>,
119 requires_auth: bool,
120 auth_tracker: AuthTracker,
121 heartbeat: Option<u64>,
122 connection_mode: Arc<ArcSwap<AtomicU8>>,
123 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
124 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
125 signal: Arc<AtomicBool>,
126 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
127 subscriptions: SubscriptionState,
128 is_authenticated: Arc<AtomicBool>,
129 account_id: Option<AccountId>,
130 mm_level: Arc<AtomicU8>,
131 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
132 funding_cache: FundingCache,
133 cancellation_token: CancellationToken,
134}
135
136impl Debug for BybitWebSocketClient {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("BybitWebSocketClient")
139 .field("url", &self.url)
140 .field("environment", &self.environment)
141 .field("product_type", &self.product_type)
142 .field("requires_auth", &self.requires_auth)
143 .field("heartbeat", &self.heartbeat)
144 .field("confirmed_subscriptions", &self.subscriptions.len())
145 .finish()
146 }
147}
148
149impl Clone for BybitWebSocketClient {
150 fn clone(&self) -> Self {
151 Self {
152 url: self.url.clone(),
153 environment: self.environment,
154 product_type: self.product_type,
155 credential: self.credential.clone(),
156 requires_auth: self.requires_auth,
157 auth_tracker: self.auth_tracker.clone(),
158 heartbeat: self.heartbeat,
159 connection_mode: Arc::clone(&self.connection_mode),
160 cmd_tx: Arc::clone(&self.cmd_tx),
161 out_rx: None, signal: Arc::clone(&self.signal),
163 task_handle: None, subscriptions: self.subscriptions.clone(),
165 is_authenticated: Arc::clone(&self.is_authenticated),
166 account_id: self.account_id,
167 mm_level: Arc::clone(&self.mm_level),
168 instruments_cache: Arc::clone(&self.instruments_cache),
169 funding_cache: Arc::clone(&self.funding_cache),
170 cancellation_token: self.cancellation_token.clone(),
171 }
172 }
173}
174
175impl BybitWebSocketClient {
176 #[must_use]
178 pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
179 Self::new_public_with(
180 BybitProductType::Linear,
181 BybitEnvironment::Mainnet,
182 url,
183 heartbeat,
184 )
185 }
186
187 #[must_use]
189 pub fn new_public_with(
190 product_type: BybitProductType,
191 environment: BybitEnvironment,
192 url: Option<String>,
193 heartbeat: Option<u64>,
194 ) -> Self {
195 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
199
200 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
201 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
202
203 Self {
204 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
205 environment,
206 product_type: Some(product_type),
207 credential: None,
208 requires_auth: false,
209 auth_tracker: AuthTracker::new(),
210 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
211 connection_mode,
212 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
213 out_rx: None,
214 signal: Arc::new(AtomicBool::new(false)),
215 task_handle: None,
216 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
217 is_authenticated: Arc::new(AtomicBool::new(false)),
218 instruments_cache: Arc::new(DashMap::new()),
219 account_id: None,
220 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
221 cancellation_token: CancellationToken::new(),
222 mm_level: Arc::new(AtomicU8::new(0)),
223 }
224 }
225
226 #[must_use]
234 pub fn new_private(
235 environment: BybitEnvironment,
236 api_key: Option<String>,
237 api_secret: Option<String>,
238 url: Option<String>,
239 heartbeat: Option<u64>,
240 ) -> Self {
241 let credential = resolve_credential(environment, api_key, api_secret);
242
243 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
247
248 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
249 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
250
251 Self {
252 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
253 environment,
254 product_type: None,
255 credential,
256 requires_auth: true,
257 auth_tracker: AuthTracker::new(),
258 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
259 connection_mode,
260 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
261 out_rx: None,
262 signal: Arc::new(AtomicBool::new(false)),
263 task_handle: None,
264 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
265 is_authenticated: Arc::new(AtomicBool::new(false)),
266 instruments_cache: Arc::new(DashMap::new()),
267 account_id: None,
268 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
269 cancellation_token: CancellationToken::new(),
270 mm_level: Arc::new(AtomicU8::new(0)),
271 }
272 }
273
274 #[must_use]
282 pub fn new_trade(
283 environment: BybitEnvironment,
284 api_key: Option<String>,
285 api_secret: Option<String>,
286 url: Option<String>,
287 heartbeat: Option<u64>,
288 ) -> Self {
289 let credential = resolve_credential(environment, api_key, api_secret);
290
291 let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
295
296 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
297 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
298
299 Self {
300 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
301 environment,
302 product_type: None,
303 credential,
304 requires_auth: true,
305 auth_tracker: AuthTracker::new(),
306 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
307 connection_mode,
308 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
309 out_rx: None,
310 signal: Arc::new(AtomicBool::new(false)),
311 task_handle: None,
312 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
313 is_authenticated: Arc::new(AtomicBool::new(false)),
314 instruments_cache: Arc::new(DashMap::new()),
315 account_id: None,
316 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
317 cancellation_token: CancellationToken::new(),
318 mm_level: Arc::new(AtomicU8::new(0)),
319 }
320 }
321
322 pub async fn connect(&mut self) -> BybitWsResult<()> {
329 self.signal.store(false, Ordering::Relaxed);
330
331 let (raw_handler, raw_rx) = channel_message_handler();
332
333 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
336 });
338
339 let ping_msg = serde_json::to_string(&BybitSubscription {
340 op: BybitWsOperation::Ping,
341 args: vec![],
342 })?;
343
344 let config = WebSocketConfig {
345 url: self.url.clone(),
346 headers: Self::default_headers(),
347 message_handler: Some(raw_handler),
348 heartbeat: self.heartbeat,
349 heartbeat_msg: Some(ping_msg),
350 ping_handler: Some(ping_handler),
351 reconnect_timeout_ms: Some(5_000),
352 reconnect_delay_initial_ms: Some(500),
353 reconnect_delay_max_ms: Some(5_000),
354 reconnect_backoff_factor: Some(1.5),
355 reconnect_jitter_ms: Some(250),
356 reconnect_max_attempts: None,
357 };
358
359 const MAX_RETRIES: u32 = 5;
362 const CONNECTION_TIMEOUT_SECS: u64 = 10;
363
364 let mut backoff = ExponentialBackoff::new(
365 Duration::from_millis(500),
366 Duration::from_millis(5000),
367 2.0,
368 250,
369 false,
370 )
371 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
372
373 #[allow(unused_assignments)]
374 let mut last_error = String::new();
375 let mut attempt = 0;
376 let client = loop {
377 attempt += 1;
378
379 match tokio::time::timeout(
380 Duration::from_secs(CONNECTION_TIMEOUT_SECS),
381 WebSocketClient::connect(config.clone(), None, vec![], None),
382 )
383 .await
384 {
385 Ok(Ok(client)) => {
386 if attempt > 1 {
387 tracing::info!("WebSocket connection established after {attempt} attempts");
388 }
389 break client;
390 }
391 Ok(Err(e)) => {
392 last_error = e.to_string();
393 tracing::warn!(
394 attempt,
395 max_retries = MAX_RETRIES,
396 url = %self.url,
397 error = %last_error,
398 "WebSocket connection attempt failed"
399 );
400 }
401 Err(_) => {
402 last_error = format!(
403 "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
404 );
405 tracing::warn!(
406 attempt,
407 max_retries = MAX_RETRIES,
408 url = %self.url,
409 "WebSocket connection attempt timed out"
410 );
411 }
412 }
413
414 if attempt >= MAX_RETRIES {
415 return Err(BybitWsError::Transport(format!(
416 "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
417 If this is a DNS error, check your network configuration and DNS settings.",
418 self.url,
419 if last_error.is_empty() {
420 "unknown error"
421 } else {
422 &last_error
423 }
424 )));
425 }
426
427 let delay = backoff.next_duration();
428 tracing::debug!(
429 "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
430 attempt + 1
431 );
432 tokio::time::sleep(delay).await;
433 };
434
435 self.connection_mode.store(client.connection_mode_atomic());
436
437 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
438 self.out_rx = Some(Arc::new(out_rx));
439
440 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
441 *self.cmd_tx.write().await = cmd_tx.clone();
442
443 let cmd = HandlerCommand::SetClient(client);
444
445 self.send_cmd(cmd).await?;
446
447 if !self.instruments_cache.is_empty() {
449 let cached_instruments: Vec<InstrumentAny> = self
450 .instruments_cache
451 .iter()
452 .map(|entry| entry.value().clone())
453 .collect();
454 let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
455 self.send_cmd(cmd).await?;
456 }
457
458 let signal = Arc::clone(&self.signal);
459 let subscriptions = self.subscriptions.clone();
460 let credential = self.credential.clone();
461 let requires_auth = self.requires_auth;
462 let funding_cache = Arc::clone(&self.funding_cache);
463 let account_id = self.account_id;
464 let product_type = self.product_type;
465 let mm_level = Arc::clone(&self.mm_level);
466 let cmd_tx_for_reconnect = cmd_tx.clone();
467 let auth_tracker = self.auth_tracker.clone();
468 let is_authenticated = Arc::clone(&self.is_authenticated);
469
470 let stream_handle = get_runtime().spawn(async move {
471 let mut handler = FeedHandler::new(
472 signal.clone(),
473 cmd_rx,
474 raw_rx,
475 out_tx.clone(),
476 account_id,
477 product_type,
478 mm_level.clone(),
479 auth_tracker,
480 subscriptions.clone(),
481 funding_cache.clone(),
482 );
483
484 let resubscribe_all = || async {
486 let topics = subscriptions.all_topics();
487
488 if topics.is_empty() {
489 return;
490 }
491
492 tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
493
494 for topic in &topics {
495 subscriptions.mark_subscribe(topic.as_str());
496 }
497
498 let mut payloads = Vec::with_capacity(topics.len());
499 for topic in &topics {
500 let message = BybitSubscription {
501 op: BybitWsOperation::Subscribe,
502 args: vec![topic.clone()],
503 };
504 if let Ok(payload) = serde_json::to_string(&message) {
505 payloads.push(payload);
506 }
507 }
508
509 let cmd = HandlerCommand::Subscribe { topics: payloads };
510
511 if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
512 tracing::error!("Failed to send resubscribe command: {e}");
513 }
514 };
515
516 loop {
518 match handler.next().await {
519 Some(NautilusWsMessage::Reconnected) => {
520 if signal.load(Ordering::Relaxed) {
521 continue;
522 }
523
524 tracing::info!("WebSocket reconnected");
525
526 let confirmed_topics: Vec<String> = {
528 let confirmed = subscriptions.confirmed();
529 let mut topics = Vec::new();
530 for entry in confirmed.iter() {
531 let (channel, symbols) = entry.pair();
532 for symbol in symbols {
533 if symbol.is_empty() {
534 topics.push(channel.to_string());
535 } else {
536 topics.push(format!("{channel}.{symbol}"));
537 }
538 }
539 }
540 topics
541 };
542
543 if !confirmed_topics.is_empty() {
544 tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
545 for topic in confirmed_topics {
546 subscriptions.mark_failure(&topic);
547 }
548 }
549
550 funding_cache.write().await.clear();
552
553 if requires_auth {
554 is_authenticated.store(false, Ordering::Relaxed);
555 tracing::debug!("Re-authenticating after reconnection");
556
557 if let Some(cred) = &credential {
558 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
559 let signature = cred.sign_websocket_auth(expires);
560
561 let auth_message = BybitAuthRequest {
562 op: BybitWsOperation::Auth,
563 args: vec![
564 Value::String(cred.api_key().to_string()),
565 Value::Number(expires.into()),
566 Value::String(signature),
567 ],
568 };
569
570 if let Ok(payload) = serde_json::to_string(&auth_message) {
571 let cmd = HandlerCommand::Authenticate { payload };
572 if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
573 tracing::error!(error = %e, "Failed to send reconnection auth command");
574 }
575 } else {
576 tracing::error!("Failed to serialize reconnection auth message");
577 }
578 }
579 }
580
581 if !requires_auth {
584 tracing::debug!("No authentication required, resubscribing immediately");
585 resubscribe_all().await;
586 }
587
588 if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
590 tracing::debug!("Receiver dropped, stopping");
591 break;
592 }
593 continue;
594 }
595 Some(NautilusWsMessage::Authenticated) => {
596 tracing::debug!("Authenticated, resubscribing");
597 is_authenticated.store(true, Ordering::Relaxed);
598 resubscribe_all().await;
599 continue;
600 }
601 Some(msg) => {
602 if out_tx.send(msg).is_err() {
603 tracing::error!("Failed to send message (receiver dropped)");
604 break;
605 }
606 }
607 None => {
608 if handler.is_stopped() {
610 tracing::debug!("Stop signal received, ending message processing");
611 break;
612 }
613 tracing::warn!("WebSocket stream ended unexpectedly");
615 break;
616 }
617 }
618 }
619
620 tracing::debug!("Handler task exiting");
621 });
622
623 self.task_handle = Some(Arc::new(stream_handle));
624
625 if requires_auth && let Err(e) = self.authenticate_if_required().await {
626 return Err(e);
627 }
628
629 Ok(())
630 }
631
632 pub async fn close(&mut self) -> BybitWsResult<()> {
634 tracing::debug!("Starting close process");
635
636 self.signal.store(true, Ordering::Relaxed);
637
638 let cmd = HandlerCommand::Disconnect;
639 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
640 tracing::debug!(
641 "Failed to send disconnect command (handler may already be shut down): {e}"
642 );
643 }
644
645 if let Some(task_handle) = self.task_handle.take() {
646 match Arc::try_unwrap(task_handle) {
647 Ok(handle) => {
648 tracing::debug!("Waiting for task handle to complete");
649 match tokio::time::timeout(Duration::from_secs(2), handle).await {
650 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
651 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
652 Err(_) => {
653 tracing::warn!(
654 "Timeout waiting for task handle, task may still be running"
655 );
656 }
658 }
659 }
660 Err(arc_handle) => {
661 tracing::debug!(
662 "Cannot take ownership of task handle - other references exist, aborting task"
663 );
664 arc_handle.abort();
665 }
666 }
667 } else {
668 tracing::debug!("No task handle to await");
669 }
670
671 self.is_authenticated.store(false, Ordering::Relaxed);
672
673 tracing::debug!("Closed");
674
675 Ok(())
676 }
677
678 #[must_use]
680 pub fn is_active(&self) -> bool {
681 let connection_mode_arc = self.connection_mode.load();
682 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
683 && !self.signal.load(Ordering::Relaxed)
684 }
685
686 pub fn is_closed(&self) -> bool {
688 let connection_mode_arc = self.connection_mode.load();
689 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
690 || self.signal.load(Ordering::Relaxed)
691 }
692
693 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
699 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
700
701 tokio::time::timeout(timeout, async {
702 while !self.is_active() {
703 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
704 }
705 })
706 .await
707 .map_err(|_| {
708 BybitWsError::ClientError(format!(
709 "WebSocket connection timeout after {timeout_secs} seconds"
710 ))
711 })?;
712
713 Ok(())
714 }
715
716 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
718 if topics.is_empty() {
719 return Ok(());
720 }
721
722 tracing::debug!("Subscribing to topics: {topics:?}");
723
724 let mut topics_to_send = Vec::new();
726
727 for topic in topics {
728 if self.subscriptions.add_reference(&topic) {
730 self.subscriptions.mark_subscribe(&topic);
731 topics_to_send.push(topic.clone());
732 } else {
733 tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
734 }
735 }
736
737 if topics_to_send.is_empty() {
738 return Ok(());
739 }
740
741 let mut payloads = Vec::with_capacity(topics_to_send.len());
743 for topic in &topics_to_send {
744 let message = BybitSubscription {
745 op: BybitWsOperation::Subscribe,
746 args: vec![topic.clone()],
747 };
748 let payload = serde_json::to_string(&message).map_err(|e| {
749 BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
750 })?;
751 payloads.push(payload);
752 }
753
754 let cmd = HandlerCommand::Subscribe { topics: payloads };
755 self.cmd_tx
756 .read()
757 .await
758 .send(cmd)
759 .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
760
761 Ok(())
762 }
763
764 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
766 if topics.is_empty() {
767 return Ok(());
768 }
769
770 tracing::debug!("Attempting to unsubscribe from topics: {topics:?}");
771
772 if self.signal.load(Ordering::Relaxed) {
773 tracing::debug!("Shutdown signal detected, skipping unsubscribe");
774 return Ok(());
775 }
776
777 let mut topics_to_send = Vec::new();
779
780 for topic in topics {
781 if self.subscriptions.remove_reference(&topic) {
783 self.subscriptions.mark_unsubscribe(&topic);
784 topics_to_send.push(topic.clone());
785 } else {
786 tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
787 }
788 }
789
790 if topics_to_send.is_empty() {
791 return Ok(());
792 }
793
794 let mut payloads = Vec::with_capacity(topics_to_send.len());
796 for topic in &topics_to_send {
797 let message = BybitSubscription {
798 op: BybitWsOperation::Unsubscribe,
799 args: vec![topic.clone()],
800 };
801 if let Ok(payload) = serde_json::to_string(&message) {
802 payloads.push(payload);
803 }
804 }
805
806 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
807 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
808 tracing::debug!(error = %e, "Failed to send unsubscribe command");
809 }
810
811 Ok(())
812 }
813
814 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
820 let rx = self
821 .out_rx
822 .take()
823 .expect("Stream receiver already taken or client not connected");
824 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
825 async_stream::stream! {
826 while let Some(msg) = rx.recv().await {
827 yield msg;
828 }
829 }
830 }
831
832 #[must_use]
834 pub fn subscription_count(&self) -> usize {
835 self.subscriptions.len()
836 }
837
838 #[must_use]
840 pub fn credential(&self) -> Option<&Credential> {
841 self.credential.as_ref()
842 }
843
844 pub fn cache_instrument(&self, instrument: InstrumentAny) {
848 self.instruments_cache
849 .insert(instrument.symbol().inner(), instrument.clone());
850
851 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
854 let cmd = HandlerCommand::UpdateInstrument(instrument);
855 if let Err(e) = cmd_tx.send(cmd) {
856 tracing::debug!("Failed to send instrument update to handler: {e}");
857 }
858 }
859 }
860
861 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
865 self.instruments_cache.clear();
866 let mut count = 0;
867
868 tracing::debug!("Initializing Bybit instrument cache");
869
870 for inst in instruments {
871 let symbol = inst.symbol().inner();
872 self.instruments_cache.insert(symbol, inst.clone());
873 tracing::debug!("Cached instrument: {symbol}");
874 count += 1;
875 }
876
877 tracing::info!("Bybit instrument cache initialized with {count} instruments");
878 }
879
880 pub fn set_account_id(&mut self, account_id: AccountId) {
882 self.account_id = Some(account_id);
883 }
884
885 pub fn set_mm_level(&self, mm_level: u8) {
887 self.mm_level.store(mm_level, Ordering::Relaxed);
888 }
889
890 #[must_use]
892 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
893 &self.instruments_cache
894 }
895
896 #[must_use]
898 pub fn account_id(&self) -> Option<AccountId> {
899 self.account_id
900 }
901
902 #[must_use]
904 pub fn product_type(&self) -> Option<BybitProductType> {
905 self.product_type
906 }
907
908 pub async fn subscribe_orderbook(
918 &self,
919 instrument_id: InstrumentId,
920 depth: u32,
921 ) -> BybitWsResult<()> {
922 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
923 let topic = format!(
924 "{}.{depth}.{raw_symbol}",
925 BybitWsPublicChannel::OrderBook.as_ref()
926 );
927 self.subscribe(vec![topic]).await
928 }
929
930 pub async fn unsubscribe_orderbook(
932 &self,
933 instrument_id: InstrumentId,
934 depth: u32,
935 ) -> BybitWsResult<()> {
936 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
937 let topic = format!(
938 "{}.{depth}.{raw_symbol}",
939 BybitWsPublicChannel::OrderBook.as_ref()
940 );
941 self.unsubscribe(vec![topic]).await
942 }
943
944 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
954 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
955 let topic = format!(
956 "{}.{raw_symbol}",
957 BybitWsPublicChannel::PublicTrade.as_ref()
958 );
959 self.subscribe(vec![topic]).await
960 }
961
962 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
964 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
965 let topic = format!(
966 "{}.{raw_symbol}",
967 BybitWsPublicChannel::PublicTrade.as_ref()
968 );
969 self.unsubscribe(vec![topic]).await
970 }
971
972 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
982 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
983 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
984 self.subscribe(vec![topic]).await
985 }
986
987 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
989 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
990 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
991
992 let symbol = self.product_type.map_or_else(
994 || instrument_id.symbol.inner(),
995 |pt| make_bybit_symbol(raw_symbol, pt),
996 );
997 self.funding_cache.write().await.remove(&symbol);
998
999 self.unsubscribe(vec![topic]).await
1000 }
1001
1002 pub async fn subscribe_klines(
1012 &self,
1013 instrument_id: InstrumentId,
1014 interval: impl Into<String>,
1015 ) -> BybitWsResult<()> {
1016 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1017 let topic = format!(
1018 "{}.{}.{raw_symbol}",
1019 BybitWsPublicChannel::Kline.as_ref(),
1020 interval.into()
1021 );
1022 self.subscribe(vec![topic]).await
1023 }
1024
1025 pub async fn unsubscribe_klines(
1027 &self,
1028 instrument_id: InstrumentId,
1029 interval: impl Into<String>,
1030 ) -> BybitWsResult<()> {
1031 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1032 let topic = format!(
1033 "{}.{}.{raw_symbol}",
1034 BybitWsPublicChannel::Kline.as_ref(),
1035 interval.into()
1036 );
1037 self.unsubscribe(vec![topic]).await
1038 }
1039
1040 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1050 if !self.requires_auth {
1051 return Err(BybitWsError::Authentication(
1052 "Order subscription requires authentication".to_string(),
1053 ));
1054 }
1055 self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1056 .await
1057 }
1058
1059 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1061 self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1062 .await
1063 }
1064
1065 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1075 if !self.requires_auth {
1076 return Err(BybitWsError::Authentication(
1077 "Execution subscription requires authentication".to_string(),
1078 ));
1079 }
1080 self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1081 .await
1082 }
1083
1084 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1086 self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1087 .await
1088 }
1089
1090 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1100 if !self.requires_auth {
1101 return Err(BybitWsError::Authentication(
1102 "Position subscription requires authentication".to_string(),
1103 ));
1104 }
1105 self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1106 .await
1107 }
1108
1109 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1111 self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1112 .await
1113 }
1114
1115 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1125 if !self.requires_auth {
1126 return Err(BybitWsError::Authentication(
1127 "Wallet subscription requires authentication".to_string(),
1128 ));
1129 }
1130 self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1131 .await
1132 }
1133
1134 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1136 self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1137 .await
1138 }
1139
1140 pub async fn place_order(
1150 &self,
1151 params: BybitWsPlaceOrderParams,
1152 client_order_id: ClientOrderId,
1153 trader_id: TraderId,
1154 strategy_id: StrategyId,
1155 instrument_id: InstrumentId,
1156 ) -> BybitWsResult<()> {
1157 if !self.is_authenticated.load(Ordering::Relaxed) {
1158 return Err(BybitWsError::Authentication(
1159 "Must be authenticated to place orders".to_string(),
1160 ));
1161 }
1162
1163 let cmd = HandlerCommand::PlaceOrder {
1164 params,
1165 client_order_id,
1166 trader_id,
1167 strategy_id,
1168 instrument_id,
1169 };
1170
1171 self.send_cmd(cmd).await
1172 }
1173
1174 pub async fn amend_order(
1184 &self,
1185 params: BybitWsAmendOrderParams,
1186 client_order_id: ClientOrderId,
1187 trader_id: TraderId,
1188 strategy_id: StrategyId,
1189 instrument_id: InstrumentId,
1190 venue_order_id: Option<VenueOrderId>,
1191 ) -> BybitWsResult<()> {
1192 if !self.is_authenticated.load(Ordering::Relaxed) {
1193 return Err(BybitWsError::Authentication(
1194 "Must be authenticated to amend orders".to_string(),
1195 ));
1196 }
1197
1198 let cmd = HandlerCommand::AmendOrder {
1199 params,
1200 client_order_id,
1201 trader_id,
1202 strategy_id,
1203 instrument_id,
1204 venue_order_id,
1205 };
1206
1207 self.send_cmd(cmd).await
1208 }
1209
1210 pub async fn cancel_order(
1220 &self,
1221 params: BybitWsCancelOrderParams,
1222 client_order_id: ClientOrderId,
1223 trader_id: TraderId,
1224 strategy_id: StrategyId,
1225 instrument_id: InstrumentId,
1226 venue_order_id: Option<VenueOrderId>,
1227 ) -> BybitWsResult<()> {
1228 if !self.is_authenticated.load(Ordering::Relaxed) {
1229 return Err(BybitWsError::Authentication(
1230 "Must be authenticated to cancel orders".to_string(),
1231 ));
1232 }
1233
1234 let cmd = HandlerCommand::CancelOrder {
1235 params,
1236 client_order_id,
1237 trader_id,
1238 strategy_id,
1239 instrument_id,
1240 venue_order_id,
1241 };
1242
1243 self.send_cmd(cmd).await
1244 }
1245
1246 pub async fn batch_place_orders(
1256 &self,
1257 trader_id: TraderId,
1258 strategy_id: StrategyId,
1259 orders: Vec<BybitWsPlaceOrderParams>,
1260 ) -> BybitWsResult<()> {
1261 if !self.is_authenticated.load(Ordering::Relaxed) {
1262 return Err(BybitWsError::Authentication(
1263 "Must be authenticated to place orders".to_string(),
1264 ));
1265 }
1266
1267 if orders.is_empty() {
1268 tracing::warn!("Batch place orders called with empty orders list");
1269 return Ok(());
1270 }
1271
1272 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1273 self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1274 .await?;
1275 }
1276
1277 Ok(())
1278 }
1279
1280 async fn batch_place_orders_chunk(
1281 &self,
1282 trader_id: TraderId,
1283 strategy_id: StrategyId,
1284 orders: Vec<BybitWsPlaceOrderParams>,
1285 ) -> BybitWsResult<()> {
1286 let category = orders[0].category;
1287 let batch_req_id = UUID4::new().to_string();
1288
1289 let mut batch_order_data = Vec::new();
1291 for order in &orders {
1292 if let Some(order_link_id_str) = &order.order_link_id {
1293 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1294 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1295 let instrument_id = self
1296 .instruments_cache
1297 .get(&cache_key)
1298 .map(|inst| inst.id())
1299 .ok_or_else(|| {
1300 BybitWsError::ClientError(format!(
1301 "Instrument {cache_key} not found in cache"
1302 ))
1303 })?;
1304 batch_order_data.push((
1305 client_order_id,
1306 (client_order_id, trader_id, strategy_id, instrument_id),
1307 ));
1308 }
1309 }
1310
1311 if !batch_order_data.is_empty() {
1312 let cmd = HandlerCommand::RegisterBatchPlace {
1313 req_id: batch_req_id.clone(),
1314 orders: batch_order_data,
1315 };
1316 let cmd_tx = self.cmd_tx.read().await;
1317 if let Err(e) = cmd_tx.send(cmd) {
1318 tracing::error!("Failed to send RegisterBatchPlace command: {e}");
1319 }
1320 }
1321
1322 let mm_level = self.mm_level.load(Ordering::Relaxed);
1323 let has_non_post_only = orders
1324 .iter()
1325 .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1326 let referer = if has_non_post_only || mm_level == 0 {
1327 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1328 } else {
1329 None
1330 };
1331
1332 let request_items: Vec<BybitWsBatchPlaceItem> = orders
1333 .into_iter()
1334 .map(|order| BybitWsBatchPlaceItem {
1335 symbol: order.symbol,
1336 side: order.side,
1337 order_type: order.order_type,
1338 qty: order.qty,
1339 is_leverage: order.is_leverage,
1340 market_unit: order.market_unit,
1341 price: order.price,
1342 time_in_force: order.time_in_force,
1343 order_link_id: order.order_link_id,
1344 reduce_only: order.reduce_only,
1345 close_on_trigger: order.close_on_trigger,
1346 trigger_price: order.trigger_price,
1347 trigger_by: order.trigger_by,
1348 trigger_direction: order.trigger_direction,
1349 tpsl_mode: order.tpsl_mode,
1350 take_profit: order.take_profit,
1351 stop_loss: order.stop_loss,
1352 tp_trigger_by: order.tp_trigger_by,
1353 sl_trigger_by: order.sl_trigger_by,
1354 sl_trigger_price: order.sl_trigger_price,
1355 tp_trigger_price: order.tp_trigger_price,
1356 sl_order_type: order.sl_order_type,
1357 tp_order_type: order.tp_order_type,
1358 sl_limit_price: order.sl_limit_price,
1359 tp_limit_price: order.tp_limit_price,
1360 })
1361 .collect();
1362
1363 let args = BybitWsBatchPlaceOrderArgs {
1364 category,
1365 request: request_items,
1366 };
1367
1368 let request = BybitWsRequest {
1369 req_id: Some(batch_req_id),
1370 op: BybitWsOrderRequestOp::CreateBatch,
1371 header: BybitWsHeader::with_referer(referer),
1372 args: vec![args],
1373 };
1374
1375 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1376
1377 self.send_text(&payload).await
1378 }
1379
1380 pub async fn batch_amend_orders(
1386 &self,
1387 #[allow(unused_variables)] trader_id: TraderId,
1388 #[allow(unused_variables)] strategy_id: StrategyId,
1389 orders: Vec<BybitWsAmendOrderParams>,
1390 ) -> BybitWsResult<()> {
1391 if !self.is_authenticated.load(Ordering::Relaxed) {
1392 return Err(BybitWsError::Authentication(
1393 "Must be authenticated to amend orders".to_string(),
1394 ));
1395 }
1396
1397 if orders.is_empty() {
1398 tracing::warn!("Batch amend orders called with empty orders list");
1399 return Ok(());
1400 }
1401
1402 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1403 self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1404 .await?;
1405 }
1406
1407 Ok(())
1408 }
1409
1410 async fn batch_amend_orders_chunk(
1411 &self,
1412 #[allow(unused_variables)] trader_id: TraderId,
1413 #[allow(unused_variables)] strategy_id: StrategyId,
1414 orders: Vec<BybitWsAmendOrderParams>,
1415 ) -> BybitWsResult<()> {
1416 let request = BybitWsRequest {
1417 req_id: None,
1418 op: BybitWsOrderRequestOp::AmendBatch,
1419 header: BybitWsHeader::now(),
1420 args: orders,
1421 };
1422
1423 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1424
1425 self.send_text(&payload).await
1426 }
1427
1428 pub async fn batch_cancel_orders(
1434 &self,
1435 trader_id: TraderId,
1436 strategy_id: StrategyId,
1437 orders: Vec<BybitWsCancelOrderParams>,
1438 ) -> BybitWsResult<()> {
1439 if !self.is_authenticated.load(Ordering::Relaxed) {
1440 return Err(BybitWsError::Authentication(
1441 "Must be authenticated to cancel orders".to_string(),
1442 ));
1443 }
1444
1445 if orders.is_empty() {
1446 tracing::warn!("Batch cancel orders called with empty orders list");
1447 return Ok(());
1448 }
1449
1450 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1451 self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1452 .await?;
1453 }
1454
1455 Ok(())
1456 }
1457
1458 async fn batch_cancel_orders_chunk(
1459 &self,
1460 trader_id: TraderId,
1461 strategy_id: StrategyId,
1462 orders: Vec<BybitWsCancelOrderParams>,
1463 ) -> BybitWsResult<()> {
1464 if orders.is_empty() {
1465 return Ok(());
1466 }
1467
1468 let category = orders[0].category;
1469 let batch_req_id = UUID4::new().to_string();
1470
1471 let mut validated_data = Vec::new();
1472
1473 for order in &orders {
1474 if let Some(order_link_id_str) = &order.order_link_id {
1475 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1476 let instrument_id = self
1477 .instruments_cache
1478 .get(&cache_key)
1479 .map(|inst| inst.id())
1480 .ok_or_else(|| {
1481 BybitWsError::ClientError(format!(
1482 "Instrument {cache_key} not found in cache"
1483 ))
1484 })?;
1485
1486 let venue_order_id = order
1487 .order_id
1488 .as_ref()
1489 .map(|id| VenueOrderId::from(id.as_str()));
1490
1491 validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1492 }
1493 }
1494
1495 let batch_cancel_data: Vec<_> = validated_data
1496 .iter()
1497 .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1498 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1499 (
1500 client_order_id,
1501 (
1502 client_order_id,
1503 trader_id,
1504 strategy_id,
1505 *instrument_id,
1506 *venue_order_id,
1507 ),
1508 )
1509 })
1510 .collect();
1511
1512 if !batch_cancel_data.is_empty() {
1513 let cmd = HandlerCommand::RegisterBatchCancel {
1514 req_id: batch_req_id.clone(),
1515 cancels: batch_cancel_data,
1516 };
1517 let cmd_tx = self.cmd_tx.read().await;
1518 if let Err(e) = cmd_tx.send(cmd) {
1519 tracing::error!("Failed to send RegisterBatchCancel command: {e}");
1520 }
1521 }
1522
1523 let request_items: Vec<BybitWsBatchCancelItem> = orders
1524 .into_iter()
1525 .map(|order| BybitWsBatchCancelItem {
1526 symbol: order.symbol,
1527 order_id: order.order_id,
1528 order_link_id: order.order_link_id,
1529 })
1530 .collect();
1531
1532 let args = BybitWsBatchCancelOrderArgs {
1533 category,
1534 request: request_items,
1535 };
1536
1537 let request = BybitWsRequest {
1538 req_id: Some(batch_req_id),
1539 op: BybitWsOrderRequestOp::CancelBatch,
1540 header: BybitWsHeader::now(),
1541 args: vec![args],
1542 };
1543
1544 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1545
1546 self.send_text(&payload).await
1547 }
1548
1549 #[allow(clippy::too_many_arguments)]
1555 pub async fn submit_order(
1556 &self,
1557 product_type: BybitProductType,
1558 trader_id: TraderId,
1559 strategy_id: StrategyId,
1560 instrument_id: InstrumentId,
1561 client_order_id: ClientOrderId,
1562 order_side: OrderSide,
1563 order_type: OrderType,
1564 quantity: Quantity,
1565 is_quote_quantity: bool,
1566 time_in_force: Option<TimeInForce>,
1567 price: Option<Price>,
1568 trigger_price: Option<Price>,
1569 post_only: Option<bool>,
1570 reduce_only: Option<bool>,
1571 is_leverage: bool,
1572 ) -> BybitWsResult<()> {
1573 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1574 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1575 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1576
1577 let bybit_side = match order_side {
1578 OrderSide::Buy => BybitOrderSide::Buy,
1579 OrderSide::Sell => BybitOrderSide::Sell,
1580 _ => {
1581 return Err(BybitWsError::ClientError(format!(
1582 "Invalid order side: {order_side:?}"
1583 )));
1584 }
1585 };
1586
1587 let (bybit_order_type, is_stop_order) = match order_type {
1589 OrderType::Market => (BybitOrderType::Market, false),
1590 OrderType::Limit => (BybitOrderType::Limit, false),
1591 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1592 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1593 _ => {
1594 return Err(BybitWsError::ClientError(format!(
1595 "Unsupported order type: {order_type:?}"
1596 )));
1597 }
1598 };
1599
1600 let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1601 None
1602 } else if post_only == Some(true) {
1603 Some(BybitTimeInForce::PostOnly)
1604 } else if let Some(tif) = time_in_force {
1605 Some(match tif {
1606 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1607 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1608 TimeInForce::Fok => BybitTimeInForce::Fok,
1609 _ => {
1610 return Err(BybitWsError::ClientError(format!(
1611 "Unsupported time in force: {tif:?}"
1612 )));
1613 }
1614 })
1615 } else {
1616 None
1617 };
1618
1619 let market_unit = if product_type == BybitProductType::Spot
1622 && bybit_order_type == BybitOrderType::Market
1623 {
1624 if is_quote_quantity {
1625 Some(BYBIT_QUOTE_COIN.to_string())
1626 } else {
1627 Some(BYBIT_BASE_COIN.to_string())
1628 }
1629 } else {
1630 None
1631 };
1632
1633 let is_leverage_value = if product_type == BybitProductType::Spot {
1635 Some(i32::from(is_leverage))
1636 } else {
1637 None
1638 };
1639
1640 let trigger_direction = if is_stop_order {
1643 match (order_type, order_side) {
1644 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1645 Some(BybitTriggerDirection::RisesTo as i32)
1646 }
1647 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1648 Some(BybitTriggerDirection::FallsTo as i32)
1649 }
1650 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1651 Some(BybitTriggerDirection::FallsTo as i32)
1652 }
1653 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1654 Some(BybitTriggerDirection::RisesTo as i32)
1655 }
1656 _ => None,
1657 }
1658 } else {
1659 None
1660 };
1661
1662 let params = if is_stop_order {
1663 BybitWsPlaceOrderParams {
1666 category: product_type,
1667 symbol: raw_symbol,
1668 side: bybit_side,
1669 order_type: bybit_order_type,
1670 qty: quantity.to_string(),
1671 is_leverage: is_leverage_value,
1672 market_unit: market_unit.clone(),
1673 price: price.map(|p| p.to_string()),
1674 time_in_force: bybit_tif,
1675 order_link_id: Some(client_order_id.to_string()),
1676 reduce_only: reduce_only.filter(|&r| r),
1677 close_on_trigger: None,
1678 trigger_price: trigger_price.map(|p| p.to_string()),
1679 trigger_by: Some(BybitTriggerType::LastPrice),
1680 trigger_direction,
1681 tpsl_mode: None, take_profit: None,
1683 stop_loss: None,
1684 tp_trigger_by: None,
1685 sl_trigger_by: None,
1686 sl_trigger_price: None, tp_trigger_price: None, sl_order_type: None,
1689 tp_order_type: None,
1690 sl_limit_price: None,
1691 tp_limit_price: None,
1692 }
1693 } else {
1694 BybitWsPlaceOrderParams {
1696 category: product_type,
1697 symbol: raw_symbol,
1698 side: bybit_side,
1699 order_type: bybit_order_type,
1700 qty: quantity.to_string(),
1701 is_leverage: is_leverage_value,
1702 market_unit,
1703 price: price.map(|p| p.to_string()),
1704 time_in_force: if bybit_order_type == BybitOrderType::Market {
1705 None
1706 } else {
1707 bybit_tif
1708 },
1709 order_link_id: Some(client_order_id.to_string()),
1710 reduce_only: reduce_only.filter(|&r| r),
1711 close_on_trigger: None,
1712 trigger_price: None,
1713 trigger_by: None,
1714 trigger_direction: None,
1715 tpsl_mode: None,
1716 take_profit: None,
1717 stop_loss: None,
1718 tp_trigger_by: None,
1719 sl_trigger_by: None,
1720 sl_trigger_price: None,
1721 tp_trigger_price: None,
1722 sl_order_type: None,
1723 tp_order_type: None,
1724 sl_limit_price: None,
1725 tp_limit_price: None,
1726 }
1727 };
1728
1729 self.place_order(
1730 params,
1731 client_order_id,
1732 trader_id,
1733 strategy_id,
1734 instrument_id,
1735 )
1736 .await
1737 }
1738
1739 #[allow(clippy::too_many_arguments)]
1745 pub async fn modify_order(
1746 &self,
1747 product_type: BybitProductType,
1748 trader_id: TraderId,
1749 strategy_id: StrategyId,
1750 instrument_id: InstrumentId,
1751 client_order_id: ClientOrderId,
1752 venue_order_id: Option<VenueOrderId>,
1753 quantity: Option<Quantity>,
1754 price: Option<Price>,
1755 ) -> BybitWsResult<()> {
1756 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1757 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1758 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1759
1760 let params = BybitWsAmendOrderParams {
1761 category: product_type,
1762 symbol: raw_symbol,
1763 order_id: venue_order_id.map(|id| id.to_string()),
1764 order_link_id: Some(client_order_id.to_string()),
1765 qty: quantity.map(|q| q.to_string()),
1766 price: price.map(|p| p.to_string()),
1767 trigger_price: None,
1768 take_profit: None,
1769 stop_loss: None,
1770 tp_trigger_by: None,
1771 sl_trigger_by: None,
1772 };
1773
1774 self.amend_order(
1775 params,
1776 client_order_id,
1777 trader_id,
1778 strategy_id,
1779 instrument_id,
1780 venue_order_id,
1781 )
1782 .await
1783 }
1784
1785 pub async fn cancel_order_by_id(
1791 &self,
1792 product_type: BybitProductType,
1793 trader_id: TraderId,
1794 strategy_id: StrategyId,
1795 instrument_id: InstrumentId,
1796 client_order_id: ClientOrderId,
1797 venue_order_id: Option<VenueOrderId>,
1798 ) -> BybitWsResult<()> {
1799 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1800 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1801 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1802
1803 let params = BybitWsCancelOrderParams {
1804 category: product_type,
1805 symbol: raw_symbol,
1806 order_id: venue_order_id.map(|id| id.to_string()),
1807 order_link_id: Some(client_order_id.to_string()),
1808 };
1809
1810 self.cancel_order(
1811 params,
1812 client_order_id,
1813 trader_id,
1814 strategy_id,
1815 instrument_id,
1816 venue_order_id,
1817 )
1818 .await
1819 }
1820
1821 #[allow(clippy::too_many_arguments)]
1823 pub fn build_place_order_params(
1824 &self,
1825 product_type: BybitProductType,
1826 instrument_id: InstrumentId,
1827 client_order_id: ClientOrderId,
1828 order_side: OrderSide,
1829 order_type: OrderType,
1830 quantity: Quantity,
1831 is_quote_quantity: bool,
1832 time_in_force: Option<TimeInForce>,
1833 price: Option<Price>,
1834 trigger_price: Option<Price>,
1835 post_only: Option<bool>,
1836 reduce_only: Option<bool>,
1837 is_leverage: bool,
1838 ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1839 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1840 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1841 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1842
1843 let bybit_side = match order_side {
1844 OrderSide::Buy => BybitOrderSide::Buy,
1845 OrderSide::Sell => BybitOrderSide::Sell,
1846 _ => {
1847 return Err(BybitWsError::ClientError(format!(
1848 "Invalid order side: {order_side:?}"
1849 )));
1850 }
1851 };
1852
1853 let (bybit_order_type, is_stop_order) = match order_type {
1854 OrderType::Market => (BybitOrderType::Market, false),
1855 OrderType::Limit => (BybitOrderType::Limit, false),
1856 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1857 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1858 _ => {
1859 return Err(BybitWsError::ClientError(format!(
1860 "Unsupported order type: {order_type:?}"
1861 )));
1862 }
1863 };
1864
1865 let bybit_tif = if post_only == Some(true) {
1866 Some(BybitTimeInForce::PostOnly)
1867 } else if let Some(tif) = time_in_force {
1868 Some(match tif {
1869 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1870 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1871 TimeInForce::Fok => BybitTimeInForce::Fok,
1872 _ => {
1873 return Err(BybitWsError::ClientError(format!(
1874 "Unsupported time in force: {tif:?}"
1875 )));
1876 }
1877 })
1878 } else {
1879 None
1880 };
1881
1882 let market_unit = if product_type == BybitProductType::Spot
1883 && bybit_order_type == BybitOrderType::Market
1884 {
1885 if is_quote_quantity {
1886 Some(BYBIT_QUOTE_COIN.to_string())
1887 } else {
1888 Some(BYBIT_BASE_COIN.to_string())
1889 }
1890 } else {
1891 None
1892 };
1893
1894 let is_leverage_value = if product_type == BybitProductType::Spot {
1896 Some(i32::from(is_leverage))
1897 } else {
1898 None
1899 };
1900
1901 let trigger_direction = if is_stop_order {
1904 match (order_type, order_side) {
1905 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1906 Some(BybitTriggerDirection::RisesTo as i32)
1907 }
1908 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1909 Some(BybitTriggerDirection::FallsTo as i32)
1910 }
1911 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1912 Some(BybitTriggerDirection::FallsTo as i32)
1913 }
1914 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1915 Some(BybitTriggerDirection::RisesTo as i32)
1916 }
1917 _ => None,
1918 }
1919 } else {
1920 None
1921 };
1922
1923 let params = if is_stop_order {
1924 BybitWsPlaceOrderParams {
1925 category: product_type,
1926 symbol: raw_symbol,
1927 side: bybit_side,
1928 order_type: bybit_order_type,
1929 qty: quantity.to_string(),
1930 is_leverage: is_leverage_value,
1931 market_unit,
1932 price: price.map(|p| p.to_string()),
1933 time_in_force: if bybit_order_type == BybitOrderType::Market {
1934 None
1935 } else {
1936 bybit_tif
1937 },
1938 order_link_id: Some(client_order_id.to_string()),
1939 reduce_only: reduce_only.filter(|&r| r),
1940 close_on_trigger: None,
1941 trigger_price: trigger_price.map(|p| p.to_string()),
1942 trigger_by: Some(BybitTriggerType::LastPrice),
1943 trigger_direction,
1944 tpsl_mode: None,
1945 take_profit: None,
1946 stop_loss: None,
1947 tp_trigger_by: None,
1948 sl_trigger_by: None,
1949 sl_trigger_price: None,
1950 tp_trigger_price: None,
1951 sl_order_type: None,
1952 tp_order_type: None,
1953 sl_limit_price: None,
1954 tp_limit_price: None,
1955 }
1956 } else {
1957 BybitWsPlaceOrderParams {
1958 category: product_type,
1959 symbol: raw_symbol,
1960 side: bybit_side,
1961 order_type: bybit_order_type,
1962 qty: quantity.to_string(),
1963 is_leverage: is_leverage_value,
1964 market_unit,
1965 price: price.map(|p| p.to_string()),
1966 time_in_force: if bybit_order_type == BybitOrderType::Market {
1967 None
1968 } else {
1969 bybit_tif
1970 },
1971 order_link_id: Some(client_order_id.to_string()),
1972 reduce_only: reduce_only.filter(|&r| r),
1973 close_on_trigger: None,
1974 trigger_price: None,
1975 trigger_by: None,
1976 trigger_direction: None,
1977 tpsl_mode: None,
1978 take_profit: None,
1979 stop_loss: None,
1980 tp_trigger_by: None,
1981 sl_trigger_by: None,
1982 sl_trigger_price: None,
1983 tp_trigger_price: None,
1984 sl_order_type: None,
1985 tp_order_type: None,
1986 sl_limit_price: None,
1987 tp_limit_price: None,
1988 }
1989 };
1990
1991 Ok(params)
1992 }
1993
1994 #[allow(clippy::too_many_arguments)]
1996 pub fn build_amend_order_params(
1997 &self,
1998 product_type: BybitProductType,
1999 instrument_id: InstrumentId,
2000 venue_order_id: Option<VenueOrderId>,
2001 client_order_id: Option<ClientOrderId>,
2002 quantity: Option<Quantity>,
2003 price: Option<Price>,
2004 ) -> BybitWsResult<BybitWsAmendOrderParams> {
2005 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2006 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2007 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2008
2009 Ok(BybitWsAmendOrderParams {
2010 category: product_type,
2011 symbol: raw_symbol,
2012 order_id: venue_order_id.map(|v| v.to_string()),
2013 order_link_id: client_order_id.map(|c| c.to_string()),
2014 qty: quantity.map(|q| q.to_string()),
2015 price: price.map(|p| p.to_string()),
2016 trigger_price: None,
2017 take_profit: None,
2018 stop_loss: None,
2019 tp_trigger_by: None,
2020 sl_trigger_by: None,
2021 })
2022 }
2023
2024 pub fn build_cancel_order_params(
2031 &self,
2032 product_type: BybitProductType,
2033 instrument_id: InstrumentId,
2034 venue_order_id: Option<VenueOrderId>,
2035 client_order_id: Option<ClientOrderId>,
2036 ) -> BybitWsResult<BybitWsCancelOrderParams> {
2037 if venue_order_id.is_none() && client_order_id.is_none() {
2038 return Err(BybitWsError::ClientError(
2039 "Either venue_order_id or client_order_id must be provided".to_string(),
2040 ));
2041 }
2042
2043 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2044 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2045 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2046
2047 Ok(BybitWsCancelOrderParams {
2048 category: product_type,
2049 symbol: raw_symbol,
2050 order_id: venue_order_id.map(|v| v.to_string()),
2051 order_link_id: client_order_id.map(|c| c.to_string()),
2052 })
2053 }
2054
2055 fn default_headers() -> Vec<(String, String)> {
2056 vec![
2057 ("Content-Type".to_string(), "application/json".to_string()),
2058 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2059 ]
2060 }
2061
2062 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2063 if !self.requires_auth {
2064 return Ok(());
2065 }
2066
2067 let credential = self.credential.as_ref().ok_or_else(|| {
2068 BybitWsError::Authentication("Credentials required for authentication".to_string())
2069 })?;
2070
2071 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2072 let signature = credential.sign_websocket_auth(expires);
2073
2074 let auth_message = BybitAuthRequest {
2075 op: BybitWsOperation::Auth,
2076 args: vec![
2077 Value::String(credential.api_key().to_string()),
2078 Value::Number(expires.into()),
2079 Value::String(signature),
2080 ],
2081 };
2082
2083 let payload = serde_json::to_string(&auth_message)?;
2084
2085 self.cmd_tx
2086 .read()
2087 .await
2088 .send(HandlerCommand::Authenticate { payload })
2089 .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2090
2091 Ok(())
2094 }
2095
2096 async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2097 let cmd = HandlerCommand::SendText {
2098 payload: text.to_string(),
2099 };
2100
2101 self.send_cmd(cmd).await
2102 }
2103
2104 async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2105 self.cmd_tx
2106 .read()
2107 .await
2108 .send(cmd)
2109 .map_err(|e| BybitWsError::Send(e.to_string()))
2110 }
2111}
2112
2113#[cfg(test)]
2114mod tests {
2115 use rstest::rstest;
2116
2117 use super::*;
2118 use crate::{
2119 common::testing::load_test_json,
2120 websocket::{classify_bybit_message, messages::BybitWsMessage},
2121 };
2122
2123 #[rstest]
2124 fn classify_orderbook_snapshot() {
2125 let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2126 .expect("invalid fixture");
2127 let message = classify_bybit_message(json);
2128 assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2129 }
2130
2131 #[rstest]
2132 fn classify_trade_snapshot() {
2133 let json: Value =
2134 serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2135 let message = classify_bybit_message(json);
2136 assert!(matches!(message, BybitWsMessage::Trade(_)));
2137 }
2138
2139 #[rstest]
2140 fn classify_ticker_linear_snapshot() {
2141 let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2142 .expect("invalid fixture");
2143 let message = classify_bybit_message(json);
2144 assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2145 }
2146
2147 #[rstest]
2148 fn classify_ticker_option_snapshot() {
2149 let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2150 .expect("invalid fixture");
2151 let message = classify_bybit_message(json);
2152 assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2153 }
2154
2155 #[rstest]
2156 fn test_race_unsubscribe_failure_recovery() {
2157 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "publicTrade.BTCUSDT";
2165
2166 subscriptions.mark_subscribe(topic);
2168 subscriptions.confirm_subscribe(topic);
2169 assert_eq!(subscriptions.len(), 1);
2170
2171 subscriptions.mark_unsubscribe(topic);
2173 assert_eq!(subscriptions.len(), 0);
2174 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2175
2176 subscriptions.confirm_unsubscribe(topic); subscriptions.mark_subscribe(topic); subscriptions.confirm_subscribe(topic); assert_eq!(subscriptions.len(), 1);
2184 assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2185 assert!(subscriptions.pending_subscribe_topics().is_empty());
2186
2187 let all = subscriptions.all_topics();
2189 assert_eq!(all.len(), 1);
2190 assert!(all.contains(&topic.to_string()));
2191 }
2192
2193 #[rstest]
2194 fn test_race_resubscribe_before_unsubscribe_ack() {
2195 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "orderbook.50.BTCUSDT";
2201
2202 subscriptions.mark_subscribe(topic);
2204 subscriptions.confirm_subscribe(topic);
2205 assert_eq!(subscriptions.len(), 1);
2206
2207 subscriptions.mark_unsubscribe(topic);
2209 assert_eq!(subscriptions.len(), 0);
2210 assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2211
2212 subscriptions.mark_subscribe(topic);
2214 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2215
2216 subscriptions.confirm_unsubscribe(topic);
2218 assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2219 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2220
2221 subscriptions.confirm_subscribe(topic);
2223 assert_eq!(subscriptions.len(), 1);
2224 assert!(subscriptions.pending_subscribe_topics().is_empty());
2225
2226 let all = subscriptions.all_topics();
2228 assert_eq!(all.len(), 1);
2229 assert!(all.contains(&topic.to_string()));
2230 }
2231
2232 #[rstest]
2233 fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2234 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "tickers.ETHUSDT";
2239
2240 subscriptions.mark_subscribe(topic);
2242 assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2243
2244 subscriptions.mark_unsubscribe(topic);
2246 assert!(subscriptions.pending_subscribe_topics().is_empty()); assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2248
2249 subscriptions.confirm_subscribe(topic);
2251 assert_eq!(subscriptions.len(), 0); assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2253
2254 subscriptions.confirm_unsubscribe(topic);
2256
2257 assert!(subscriptions.is_empty());
2259 assert!(subscriptions.all_topics().is_empty());
2260 }
2261
2262 #[rstest]
2263 fn test_race_reconnection_with_pending_states() {
2264 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let trade_btc = "publicTrade.BTCUSDT";
2270 subscriptions.mark_subscribe(trade_btc);
2271 subscriptions.confirm_subscribe(trade_btc);
2272
2273 let trade_eth = "publicTrade.ETHUSDT";
2275 subscriptions.mark_subscribe(trade_eth);
2276
2277 let book_btc = "orderbook.50.BTCUSDT";
2279 subscriptions.mark_subscribe(book_btc);
2280 subscriptions.confirm_subscribe(book_btc);
2281 subscriptions.mark_unsubscribe(book_btc);
2282
2283 let topics_to_restore = subscriptions.all_topics();
2285
2286 assert_eq!(topics_to_restore.len(), 2);
2288 assert!(topics_to_restore.contains(&trade_btc.to_string()));
2289 assert!(topics_to_restore.contains(&trade_eth.to_string()));
2290 assert!(!topics_to_restore.contains(&book_btc.to_string())); }
2292
2293 #[rstest]
2294 fn test_race_duplicate_subscribe_messages_idempotent() {
2295 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); let topic = "publicTrade.BTCUSDT";
2300
2301 subscriptions.mark_subscribe(topic);
2303 subscriptions.confirm_subscribe(topic);
2304 assert_eq!(subscriptions.len(), 1);
2305
2306 subscriptions.mark_subscribe(topic);
2308 assert!(subscriptions.pending_subscribe_topics().is_empty()); assert_eq!(subscriptions.len(), 1); subscriptions.confirm_subscribe(topic);
2313 assert_eq!(subscriptions.len(), 1);
2314
2315 let all = subscriptions.all_topics();
2317 assert_eq!(all.len(), 1);
2318 assert_eq!(all[0], topic);
2319 }
2320
2321 #[rstest]
2322 #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2323 #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2324 #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2325 #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2326 #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2327 #[case::option_with_leverage(BybitProductType::Option, true, None)]
2328 fn test_is_leverage_parameter(
2329 #[case] product_type: BybitProductType,
2330 #[case] is_leverage: bool,
2331 #[case] expected: Option<i32>,
2332 ) {
2333 let symbol = match product_type {
2334 BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2335 BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2336 BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2337 BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2338 };
2339
2340 let instrument_id = InstrumentId::from(symbol);
2341 let client_order_id = ClientOrderId::from("test-order-1");
2342 let quantity = Quantity::from("1.0");
2343
2344 let client = BybitWebSocketClient::new_trade(
2345 BybitEnvironment::Testnet,
2346 Some("test-key".to_string()),
2347 Some("test-secret".to_string()),
2348 None,
2349 Some(20),
2350 );
2351
2352 let params = client
2353 .build_place_order_params(
2354 product_type,
2355 instrument_id,
2356 client_order_id,
2357 OrderSide::Buy,
2358 OrderType::Limit,
2359 quantity,
2360 false, Some(TimeInForce::Gtc),
2362 Some(Price::from("50000.0")),
2363 None,
2364 None,
2365 None,
2366 is_leverage,
2367 )
2368 .expect("Failed to build params");
2369
2370 assert_eq!(params.is_leverage, expected);
2371 }
2372
2373 #[rstest]
2374 #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2375 #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2376 #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2377 #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2378 #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2379 #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2380 fn test_is_quote_quantity_parameter(
2381 #[case] product_type: BybitProductType,
2382 #[case] order_type: OrderType,
2383 #[case] is_quote_quantity: bool,
2384 #[case] expected: Option<String>,
2385 ) {
2386 let symbol = match product_type {
2387 BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2388 BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2389 BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2390 BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2391 };
2392
2393 let instrument_id = InstrumentId::from(symbol);
2394 let client_order_id = ClientOrderId::from("test-order-1");
2395 let quantity = Quantity::from("1.0");
2396
2397 let client = BybitWebSocketClient::new_trade(
2398 BybitEnvironment::Testnet,
2399 Some("test-key".to_string()),
2400 Some("test-secret".to_string()),
2401 None,
2402 Some(20),
2403 );
2404
2405 let params = client
2406 .build_place_order_params(
2407 product_type,
2408 instrument_id,
2409 client_order_id,
2410 OrderSide::Buy,
2411 order_type,
2412 quantity,
2413 is_quote_quantity,
2414 Some(TimeInForce::Gtc),
2415 if order_type == OrderType::Market {
2416 None
2417 } else {
2418 Some(Price::from("50000.0"))
2419 },
2420 None,
2421 None,
2422 None,
2423 false,
2424 )
2425 .expect("Failed to build params");
2426
2427 assert_eq!(params.market_unit, expected);
2428 }
2429}