1use std::{
21 fmt::Debug,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35 enums::{OrderSide, OrderType, TimeInForce},
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 types::{Price, Quantity},
39};
40use nautilus_network::{
41 backoff::ExponentialBackoff,
42 mode::ConnectionMode,
43 websocket::{
44 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45 channel_message_handler,
46 },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53 common::{
54 consts::{
55 BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
56 },
57 credential::Credential,
58 enums::{
59 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
60 BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
61 },
62 parse::{extract_raw_symbol, make_bybit_symbol},
63 symbol::BybitSymbol,
64 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
65 },
66 websocket::{
67 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
68 error::{BybitWsError, BybitWsResult},
69 handler::{FeedHandler, HandlerCommand},
70 messages::{
71 BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
72 BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
73 BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
74 NautilusWsMessage,
75 },
76 },
77};
78
/// Heartbeat interval (seconds) applied when a constructor receives no explicit heartbeat.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Milliseconds added to the current UTC timestamp to form the `expires` value of an auth request.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
/// Maximum number of orders passed to a single batch-place chunk.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Shared, async-locked map from symbol to a pair of optional strings.
///
/// NOTE(review): the pair presumably carries funding-related ticker values — confirm against
/// the feed handler, which is the consumer of this cache.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
85
86fn resolve_credential(
93 environment: BybitEnvironment,
94 api_key: Option<String>,
95 api_secret: Option<String>,
96) -> Option<Credential> {
97 let (api_key_env, api_secret_env) = match environment {
98 BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
99 BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
100 BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
101 };
102
103 let key = get_or_env_var_opt(api_key, api_key_env);
104 let secret = get_or_env_var_opt(api_secret, api_secret_env);
105
106 match (key, secret) {
107 (Some(k), Some(s)) => Some(Credential::new(k, s)),
108 _ => None,
109 }
110}
111
/// WebSocket client for Bybit.
///
/// Holds the connection configuration, the state shared with the background handler task,
/// and the channels used to exchange commands and messages with that task.
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BybitWebSocketClient {
    /// Endpoint URL the client connects to.
    url: String,
    /// Bybit environment chosen at construction.
    environment: BybitEnvironment,
    /// Product type; set by the public constructors, `None` for private and trade clients.
    product_type: Option<BybitProductType>,
    /// Credential used to sign authentication requests, when one could be resolved.
    credential: Option<Credential>,
    /// Whether this client authenticates (`true` for private and trade clients).
    requires_auth: bool,
    /// Authentication tracker; a clone is handed to the feed handler on connect.
    auth_tracker: AuthTracker,
    /// Heartbeat interval passed to the WebSocket configuration.
    heartbeat: Option<u64>,
    /// Swappable handle to the atomic connection mode; replaced on connect with the
    /// atomic owned by the underlying WebSocket client.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for handler commands; replaced with a fresh sender on every connect.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver of handler output; populated by `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: set by `close`, cleared by `connect`.
    signal: Arc<AtomicBool>,
    /// Join handle of the spawned handler task, if connected.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription bookkeeping shared with the handler task.
    subscriptions: SubscriptionState,
    /// Set once the handler reports authentication; cleared on reconnect and on close.
    is_authenticated: Arc<AtomicBool>,
    /// Account identifier copied into the handler when the task is spawned.
    account_id: Option<AccountId>,
    /// Market-maker level shared with the handler and read when building batch requests.
    mm_level: Arc<AtomicU8>,
    /// Flag copied into the handler when the task is spawned.
    bars_timestamp_on_close: bool,
    /// Instruments keyed by symbol.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Cache shared with the handler; cleared on reconnect.
    funding_cache: FundingCache,
    /// Cancellation token cloned alongside the client.
    /// NOTE(review): not used by the methods visible in this part of the file — confirm usage.
    cancellation_token: CancellationToken,
}
136
137impl Debug for BybitWebSocketClient {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct(stringify!(BybitWebSocketClient))
140 .field("url", &self.url)
141 .field("environment", &self.environment)
142 .field("product_type", &self.product_type)
143 .field("requires_auth", &self.requires_auth)
144 .field("heartbeat", &self.heartbeat)
145 .field("confirmed_subscriptions", &self.subscriptions.len())
146 .finish()
147 }
148}
149
150impl Clone for BybitWebSocketClient {
151 fn clone(&self) -> Self {
152 Self {
153 url: self.url.clone(),
154 environment: self.environment,
155 product_type: self.product_type,
156 credential: self.credential.clone(),
157 requires_auth: self.requires_auth,
158 auth_tracker: self.auth_tracker.clone(),
159 heartbeat: self.heartbeat,
160 connection_mode: Arc::clone(&self.connection_mode),
161 cmd_tx: Arc::clone(&self.cmd_tx),
162 out_rx: None, signal: Arc::clone(&self.signal),
164 task_handle: None, subscriptions: self.subscriptions.clone(),
166 is_authenticated: Arc::clone(&self.is_authenticated),
167 account_id: self.account_id,
168 mm_level: Arc::clone(&self.mm_level),
169 bars_timestamp_on_close: self.bars_timestamp_on_close,
170 instruments_cache: Arc::clone(&self.instruments_cache),
171 funding_cache: Arc::clone(&self.funding_cache),
172 cancellation_token: self.cancellation_token.clone(),
173 }
174 }
175}
176
177impl BybitWebSocketClient {
178 #[must_use]
180 pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
181 Self::new_public_with(
182 BybitProductType::Linear,
183 BybitEnvironment::Mainnet,
184 url,
185 heartbeat,
186 )
187 }
188
189 #[must_use]
191 pub fn new_public_with(
192 product_type: BybitProductType,
193 environment: BybitEnvironment,
194 url: Option<String>,
195 heartbeat: Option<u64>,
196 ) -> Self {
197 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
201
202 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
203 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
204
205 Self {
206 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
207 environment,
208 product_type: Some(product_type),
209 credential: None,
210 requires_auth: false,
211 auth_tracker: AuthTracker::new(),
212 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
213 connection_mode,
214 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
215 out_rx: None,
216 signal: Arc::new(AtomicBool::new(false)),
217 task_handle: None,
218 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
219 is_authenticated: Arc::new(AtomicBool::new(false)),
220 instruments_cache: Arc::new(DashMap::new()),
221 account_id: None,
222 mm_level: Arc::new(AtomicU8::new(0)),
223 bars_timestamp_on_close: true,
224 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
225 cancellation_token: CancellationToken::new(),
226 }
227 }
228
229 #[must_use]
237 pub fn new_private(
238 environment: BybitEnvironment,
239 api_key: Option<String>,
240 api_secret: Option<String>,
241 url: Option<String>,
242 heartbeat: Option<u64>,
243 ) -> Self {
244 let credential = resolve_credential(environment, api_key, api_secret);
245
246 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
250
251 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
252 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
253
254 Self {
255 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
256 environment,
257 product_type: None,
258 credential,
259 requires_auth: true,
260 auth_tracker: AuthTracker::new(),
261 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
262 connection_mode,
263 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
264 out_rx: None,
265 signal: Arc::new(AtomicBool::new(false)),
266 task_handle: None,
267 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
268 is_authenticated: Arc::new(AtomicBool::new(false)),
269 instruments_cache: Arc::new(DashMap::new()),
270 account_id: None,
271 mm_level: Arc::new(AtomicU8::new(0)),
272 bars_timestamp_on_close: true,
273 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
274 cancellation_token: CancellationToken::new(),
275 }
276 }
277
278 #[must_use]
286 pub fn new_trade(
287 environment: BybitEnvironment,
288 api_key: Option<String>,
289 api_secret: Option<String>,
290 url: Option<String>,
291 heartbeat: Option<u64>,
292 ) -> Self {
293 let credential = resolve_credential(environment, api_key, api_secret);
294
295 let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
299
300 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
301 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
302
303 Self {
304 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
305 environment,
306 product_type: None,
307 credential,
308 requires_auth: true,
309 auth_tracker: AuthTracker::new(),
310 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
311 connection_mode,
312 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
313 out_rx: None,
314 signal: Arc::new(AtomicBool::new(false)),
315 task_handle: None,
316 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
317 is_authenticated: Arc::new(AtomicBool::new(false)),
318 instruments_cache: Arc::new(DashMap::new()),
319 account_id: None,
320 mm_level: Arc::new(AtomicU8::new(0)),
321 bars_timestamp_on_close: true,
322 funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
323 cancellation_token: CancellationToken::new(),
324 }
325 }
326
327 pub async fn connect(&mut self) -> BybitWsResult<()> {
334 const MAX_RETRIES: u32 = 5;
335 const CONNECTION_TIMEOUT_SECS: u64 = 10;
336
337 self.signal.store(false, Ordering::Relaxed);
338
339 let (raw_handler, raw_rx) = channel_message_handler();
340
341 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
344 });
346
347 let ping_msg = serde_json::to_string(&BybitSubscription {
348 op: BybitWsOperation::Ping,
349 args: vec![],
350 })?;
351
352 let config = WebSocketConfig {
353 url: self.url.clone(),
354 headers: Self::default_headers(),
355 heartbeat: self.heartbeat,
356 heartbeat_msg: Some(ping_msg),
357 reconnect_timeout_ms: Some(5_000),
358 reconnect_delay_initial_ms: Some(500),
359 reconnect_delay_max_ms: Some(5_000),
360 reconnect_backoff_factor: Some(1.5),
361 reconnect_jitter_ms: Some(250),
362 reconnect_max_attempts: None,
363 };
364
365 let mut backoff = ExponentialBackoff::new(
368 Duration::from_millis(500),
369 Duration::from_millis(5000),
370 2.0,
371 250,
372 false,
373 )
374 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
375
376 #[allow(unused_assignments)]
377 let mut last_error = String::new();
378 let mut attempt = 0;
379 let client = loop {
380 attempt += 1;
381
382 match tokio::time::timeout(
383 Duration::from_secs(CONNECTION_TIMEOUT_SECS),
384 WebSocketClient::connect(
385 config.clone(),
386 Some(raw_handler.clone()),
387 Some(ping_handler.clone()),
388 None,
389 vec![],
390 None,
391 ),
392 )
393 .await
394 {
395 Ok(Ok(client)) => {
396 if attempt > 1 {
397 log::info!("WebSocket connection established after {attempt} attempts");
398 }
399 break client;
400 }
401 Ok(Err(e)) => {
402 last_error = e.to_string();
403 log::warn!(
404 "WebSocket connection attempt failed: attempt={attempt}, max_retries={MAX_RETRIES}, url={}, error={last_error}",
405 self.url
406 );
407 }
408 Err(_) => {
409 last_error = format!(
410 "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
411 );
412 log::warn!(
413 "WebSocket connection attempt timed out: attempt={attempt}, max_retries={MAX_RETRIES}, url={}",
414 self.url
415 );
416 }
417 }
418
419 if attempt >= MAX_RETRIES {
420 return Err(BybitWsError::Transport(format!(
421 "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
422 If this is a DNS error, check your network configuration and DNS settings.",
423 self.url,
424 if last_error.is_empty() {
425 "unknown error"
426 } else {
427 &last_error
428 }
429 )));
430 }
431
432 let delay = backoff.next_duration();
433 log::debug!(
434 "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
435 attempt + 1
436 );
437 tokio::time::sleep(delay).await;
438 };
439
440 self.connection_mode.store(client.connection_mode_atomic());
441
442 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
443 self.out_rx = Some(Arc::new(out_rx));
444
445 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
446 *self.cmd_tx.write().await = cmd_tx.clone();
447
448 let cmd = HandlerCommand::SetClient(client);
449
450 self.send_cmd(cmd).await?;
451
452 if !self.instruments_cache.is_empty() {
454 let cached_instruments: Vec<InstrumentAny> = self
455 .instruments_cache
456 .iter()
457 .map(|entry| entry.value().clone())
458 .collect();
459 let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
460 self.send_cmd(cmd).await?;
461 }
462
463 let signal = Arc::clone(&self.signal);
464 let subscriptions = self.subscriptions.clone();
465 let credential = self.credential.clone();
466 let requires_auth = self.requires_auth;
467 let funding_cache = Arc::clone(&self.funding_cache);
468 let account_id = self.account_id;
469 let product_type = self.product_type;
470 let bars_timestamp_on_close = self.bars_timestamp_on_close;
471 let mm_level = Arc::clone(&self.mm_level);
472 let cmd_tx_for_reconnect = cmd_tx.clone();
473 let auth_tracker = self.auth_tracker.clone();
474 let is_authenticated = Arc::clone(&self.is_authenticated);
475
476 let stream_handle = get_runtime().spawn(async move {
477 let mut handler = FeedHandler::new(
478 signal.clone(),
479 cmd_rx,
480 raw_rx,
481 out_tx.clone(),
482 account_id,
483 product_type,
484 bars_timestamp_on_close,
485 mm_level.clone(),
486 auth_tracker,
487 subscriptions.clone(),
488 funding_cache.clone(),
489 );
490
491 let resubscribe_all = || async {
493 let topics = subscriptions.all_topics();
494
495 if topics.is_empty() {
496 return;
497 }
498
499 log::debug!(
500 "Resubscribing to confirmed subscriptions: count={}",
501 topics.len()
502 );
503
504 for topic in &topics {
505 subscriptions.mark_subscribe(topic.as_str());
506 }
507
508 let mut payloads = Vec::with_capacity(topics.len());
509 for topic in &topics {
510 let message = BybitSubscription {
511 op: BybitWsOperation::Subscribe,
512 args: vec![topic.clone()],
513 };
514 if let Ok(payload) = serde_json::to_string(&message) {
515 payloads.push(payload);
516 }
517 }
518
519 let cmd = HandlerCommand::Subscribe { topics: payloads };
520
521 if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
522 log::error!("Failed to send resubscribe command: {e}");
523 }
524 };
525
526 loop {
528 match handler.next().await {
529 Some(NautilusWsMessage::Reconnected) => {
530 if signal.load(Ordering::Relaxed) {
531 continue;
532 }
533
534 log::info!("WebSocket reconnected");
535
536 let confirmed_topics: Vec<String> = {
538 let confirmed = subscriptions.confirmed();
539 let mut topics = Vec::new();
540 for entry in confirmed.iter() {
541 let (channel, symbols) = entry.pair();
542 for symbol in symbols {
543 if symbol.is_empty() {
544 topics.push(channel.to_string());
545 } else {
546 topics.push(format!("{channel}.{symbol}"));
547 }
548 }
549 }
550 topics
551 };
552
553 if !confirmed_topics.is_empty() {
554 log::debug!(
555 "Marking confirmed subscriptions as pending for replay: count={}",
556 confirmed_topics.len()
557 );
558 for topic in confirmed_topics {
559 subscriptions.mark_failure(&topic);
560 }
561 }
562
563 funding_cache.write().await.clear();
565
566 if requires_auth {
567 is_authenticated.store(false, Ordering::Relaxed);
568 log::debug!("Re-authenticating after reconnection");
569
570 if let Some(cred) = &credential {
571 let expires = chrono::Utc::now().timestamp_millis()
572 + WEBSOCKET_AUTH_WINDOW_MS;
573 let signature = cred.sign_websocket_auth(expires);
574
575 let auth_message = BybitAuthRequest {
576 op: BybitWsOperation::Auth,
577 args: vec![
578 Value::String(cred.api_key().to_string()),
579 Value::Number(expires.into()),
580 Value::String(signature),
581 ],
582 };
583
584 if let Ok(payload) = serde_json::to_string(&auth_message) {
585 let cmd = HandlerCommand::Authenticate { payload };
586 if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
587 log::error!(
588 "Failed to send reconnection auth command: error={e}"
589 );
590 }
591 } else {
592 log::error!("Failed to serialize reconnection auth message");
593 }
594 }
595 }
596
597 if !requires_auth {
600 log::debug!("No authentication required, resubscribing immediately");
601 resubscribe_all().await;
602 }
603
604 if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
606 log::debug!("Receiver dropped, stopping");
607 break;
608 }
609 continue;
610 }
611 Some(NautilusWsMessage::Authenticated) => {
612 log::debug!("Authenticated, resubscribing");
613 is_authenticated.store(true, Ordering::Relaxed);
614 resubscribe_all().await;
615 continue;
616 }
617 Some(msg) => {
618 if out_tx.send(msg).is_err() {
619 log::error!("Failed to send message (receiver dropped)");
620 break;
621 }
622 }
623 None => {
624 if handler.is_stopped() {
626 log::debug!("Stop signal received, ending message processing");
627 break;
628 }
629 log::warn!("WebSocket stream ended unexpectedly");
631 break;
632 }
633 }
634 }
635
636 log::debug!("Handler task exiting");
637 });
638
639 self.task_handle = Some(Arc::new(stream_handle));
640
641 if requires_auth && let Err(e) = self.authenticate_if_required().await {
642 return Err(e);
643 }
644
645 Ok(())
646 }
647
648 pub async fn close(&mut self) -> BybitWsResult<()> {
650 log::debug!("Starting close process");
651
652 self.signal.store(true, Ordering::Relaxed);
653
654 let cmd = HandlerCommand::Disconnect;
655 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
656 log::debug!(
657 "Failed to send disconnect command (handler may already be shut down): {e}"
658 );
659 }
660
661 if let Some(task_handle) = self.task_handle.take() {
662 match Arc::try_unwrap(task_handle) {
663 Ok(handle) => {
664 log::debug!("Waiting for task handle to complete");
665 match tokio::time::timeout(Duration::from_secs(2), handle).await {
666 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
667 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
668 Err(_) => {
669 log::warn!(
670 "Timeout waiting for task handle, task may still be running"
671 );
672 }
674 }
675 }
676 Err(arc_handle) => {
677 log::debug!(
678 "Cannot take ownership of task handle - other references exist, aborting task"
679 );
680 arc_handle.abort();
681 }
682 }
683 } else {
684 log::debug!("No task handle to await");
685 }
686
687 self.is_authenticated.store(false, Ordering::Relaxed);
688
689 log::debug!("Closed");
690
691 Ok(())
692 }
693
694 #[must_use]
696 pub fn is_active(&self) -> bool {
697 let connection_mode_arc = self.connection_mode.load();
698 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
699 && !self.signal.load(Ordering::Relaxed)
700 }
701
702 pub fn is_closed(&self) -> bool {
704 let connection_mode_arc = self.connection_mode.load();
705 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
706 || self.signal.load(Ordering::Relaxed)
707 }
708
709 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
715 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
716
717 tokio::time::timeout(timeout, async {
718 while !self.is_active() {
719 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
720 }
721 })
722 .await
723 .map_err(|_| {
724 BybitWsError::ClientError(format!(
725 "WebSocket connection timeout after {timeout_secs} seconds"
726 ))
727 })?;
728
729 Ok(())
730 }
731
732 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
734 if topics.is_empty() {
735 return Ok(());
736 }
737
738 log::debug!("Subscribing to topics: {topics:?}");
739
740 let mut topics_to_send = Vec::new();
742
743 for topic in topics {
744 if self.subscriptions.add_reference(&topic) {
746 self.subscriptions.mark_subscribe(&topic);
747 topics_to_send.push(topic.clone());
748 } else {
749 log::debug!("Already subscribed to {topic}, skipping duplicate subscription");
750 }
751 }
752
753 if topics_to_send.is_empty() {
754 return Ok(());
755 }
756
757 let mut payloads = Vec::with_capacity(topics_to_send.len());
759 for topic in &topics_to_send {
760 let message = BybitSubscription {
761 op: BybitWsOperation::Subscribe,
762 args: vec![topic.clone()],
763 };
764 let payload = serde_json::to_string(&message).map_err(|e| {
765 BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
766 })?;
767 payloads.push(payload);
768 }
769
770 let cmd = HandlerCommand::Subscribe { topics: payloads };
771 self.cmd_tx
772 .read()
773 .await
774 .send(cmd)
775 .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
776
777 Ok(())
778 }
779
780 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
782 if topics.is_empty() {
783 return Ok(());
784 }
785
786 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
787
788 if self.signal.load(Ordering::Relaxed) {
789 log::debug!("Shutdown signal detected, skipping unsubscribe");
790 return Ok(());
791 }
792
793 let mut topics_to_send = Vec::new();
795
796 for topic in topics {
797 if self.subscriptions.remove_reference(&topic) {
799 self.subscriptions.mark_unsubscribe(&topic);
800 topics_to_send.push(topic.clone());
801 } else {
802 log::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
803 }
804 }
805
806 if topics_to_send.is_empty() {
807 return Ok(());
808 }
809
810 let mut payloads = Vec::with_capacity(topics_to_send.len());
812 for topic in &topics_to_send {
813 let message = BybitSubscription {
814 op: BybitWsOperation::Unsubscribe,
815 args: vec![topic.clone()],
816 };
817 if let Ok(payload) = serde_json::to_string(&message) {
818 payloads.push(payload);
819 }
820 }
821
822 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
823 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
824 log::debug!("Failed to send unsubscribe command: error={e}");
825 }
826
827 Ok(())
828 }
829
830 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
836 let rx = self
837 .out_rx
838 .take()
839 .expect("Stream receiver already taken or client not connected");
840 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
841 async_stream::stream! {
842 while let Some(msg) = rx.recv().await {
843 yield msg;
844 }
845 }
846 }
847
    /// Returns the value of `len()` on the client's subscription state.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
853
    /// Returns the credential held by this client, if one was resolved at construction.
    #[must_use]
    pub fn credential(&self) -> Option<&Credential> {
        self.credential.as_ref()
    }
859
860 pub fn cache_instrument(&self, instrument: InstrumentAny) {
864 self.instruments_cache
865 .insert(instrument.symbol().inner(), instrument.clone());
866
867 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
870 let cmd = HandlerCommand::UpdateInstrument(instrument);
871 if let Err(e) = cmd_tx.send(cmd) {
872 log::debug!("Failed to send instrument update to handler: {e}");
873 }
874 }
875 }
876
877 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
881 self.instruments_cache.clear();
882 let mut count = 0;
883
884 log::debug!("Initializing Bybit instrument cache");
885
886 for inst in instruments {
887 let symbol = inst.symbol().inner();
888 self.instruments_cache.insert(symbol, inst.clone());
889 log::debug!("Cached instrument: {symbol}");
890 count += 1;
891 }
892
893 log::info!("Bybit instrument cache initialized with {count} instruments");
894 }
895
    /// Sets the account identifier.
    ///
    /// The value is copied into the handler when `connect` spawns it, so a change made
    /// afterwards applies to handler tasks created by later `connect` calls.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = Some(account_id);
    }
900
    /// Stores `mm_level` in the shared atomic level, which is also held by clones of this
    /// client and by the handler task.
    pub fn set_mm_level(&self, mm_level: u8) {
        self.mm_level.store(mm_level, Ordering::Relaxed);
    }
905
    /// Sets the `bars_timestamp_on_close` flag.
    ///
    /// The value is copied into the handler when `connect` spawns it, so a change made
    /// afterwards applies to handler tasks created by later `connect` calls.
    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
        self.bars_timestamp_on_close = value;
    }
913
    /// Returns a reference to the shared instrument cache, keyed by symbol.
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
919
    /// Returns the account identifier, if one has been set.
    #[must_use]
    pub fn account_id(&self) -> Option<AccountId> {
        self.account_id
    }
925
    /// Returns the product type; `None` for clients built by `new_private` or `new_trade`.
    #[must_use]
    pub fn product_type(&self) -> Option<BybitProductType> {
        self.product_type
    }
931
932 pub async fn subscribe_orderbook(
942 &self,
943 instrument_id: InstrumentId,
944 depth: u32,
945 ) -> BybitWsResult<()> {
946 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
947 let topic = format!(
948 "{}.{depth}.{raw_symbol}",
949 BybitWsPublicChannel::OrderBook.as_ref()
950 );
951 self.subscribe(vec![topic]).await
952 }
953
954 pub async fn unsubscribe_orderbook(
956 &self,
957 instrument_id: InstrumentId,
958 depth: u32,
959 ) -> BybitWsResult<()> {
960 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
961 let topic = format!(
962 "{}.{depth}.{raw_symbol}",
963 BybitWsPublicChannel::OrderBook.as_ref()
964 );
965 self.unsubscribe(vec![topic]).await
966 }
967
968 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
978 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
979 let topic = format!(
980 "{}.{raw_symbol}",
981 BybitWsPublicChannel::PublicTrade.as_ref()
982 );
983 self.subscribe(vec![topic]).await
984 }
985
986 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
988 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
989 let topic = format!(
990 "{}.{raw_symbol}",
991 BybitWsPublicChannel::PublicTrade.as_ref()
992 );
993 self.unsubscribe(vec![topic]).await
994 }
995
996 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1006 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1007 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1008 self.subscribe(vec![topic]).await
1009 }
1010
1011 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1013 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1014 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1015
1016 let symbol = self.product_type.map_or_else(
1018 || instrument_id.symbol.inner(),
1019 |pt| make_bybit_symbol(raw_symbol, pt),
1020 );
1021 self.funding_cache.write().await.remove(&symbol);
1022
1023 self.unsubscribe(vec![topic]).await
1024 }
1025
1026 pub async fn subscribe_klines(
1036 &self,
1037 instrument_id: InstrumentId,
1038 interval: impl Into<String>,
1039 ) -> BybitWsResult<()> {
1040 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1041 let topic = format!(
1042 "{}.{}.{raw_symbol}",
1043 BybitWsPublicChannel::Kline.as_ref(),
1044 interval.into()
1045 );
1046 self.subscribe(vec![topic]).await
1047 }
1048
1049 pub async fn unsubscribe_klines(
1051 &self,
1052 instrument_id: InstrumentId,
1053 interval: impl Into<String>,
1054 ) -> BybitWsResult<()> {
1055 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1056 let topic = format!(
1057 "{}.{}.{raw_symbol}",
1058 BybitWsPublicChannel::Kline.as_ref(),
1059 interval.into()
1060 );
1061 self.unsubscribe(vec![topic]).await
1062 }
1063
1064 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1074 if !self.requires_auth {
1075 return Err(BybitWsError::Authentication(
1076 "Order subscription requires authentication".to_string(),
1077 ));
1078 }
1079 self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1080 .await
1081 }
1082
1083 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1085 self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1086 .await
1087 }
1088
1089 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1099 if !self.requires_auth {
1100 return Err(BybitWsError::Authentication(
1101 "Execution subscription requires authentication".to_string(),
1102 ));
1103 }
1104 self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1105 .await
1106 }
1107
1108 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1110 self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1111 .await
1112 }
1113
1114 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1124 if !self.requires_auth {
1125 return Err(BybitWsError::Authentication(
1126 "Position subscription requires authentication".to_string(),
1127 ));
1128 }
1129 self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1130 .await
1131 }
1132
1133 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1135 self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1136 .await
1137 }
1138
1139 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1149 if !self.requires_auth {
1150 return Err(BybitWsError::Authentication(
1151 "Wallet subscription requires authentication".to_string(),
1152 ));
1153 }
1154 self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1155 .await
1156 }
1157
1158 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1160 self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1161 .await
1162 }
1163
1164 pub async fn place_order(
1174 &self,
1175 params: BybitWsPlaceOrderParams,
1176 client_order_id: ClientOrderId,
1177 trader_id: TraderId,
1178 strategy_id: StrategyId,
1179 instrument_id: InstrumentId,
1180 ) -> BybitWsResult<()> {
1181 if !self.is_authenticated.load(Ordering::Relaxed) {
1182 return Err(BybitWsError::Authentication(
1183 "Must be authenticated to place orders".to_string(),
1184 ));
1185 }
1186
1187 let cmd = HandlerCommand::PlaceOrder {
1188 params,
1189 client_order_id,
1190 trader_id,
1191 strategy_id,
1192 instrument_id,
1193 };
1194
1195 self.send_cmd(cmd).await
1196 }
1197
1198 pub async fn amend_order(
1208 &self,
1209 params: BybitWsAmendOrderParams,
1210 client_order_id: ClientOrderId,
1211 trader_id: TraderId,
1212 strategy_id: StrategyId,
1213 instrument_id: InstrumentId,
1214 venue_order_id: Option<VenueOrderId>,
1215 ) -> BybitWsResult<()> {
1216 if !self.is_authenticated.load(Ordering::Relaxed) {
1217 return Err(BybitWsError::Authentication(
1218 "Must be authenticated to amend orders".to_string(),
1219 ));
1220 }
1221
1222 let cmd = HandlerCommand::AmendOrder {
1223 params,
1224 client_order_id,
1225 trader_id,
1226 strategy_id,
1227 instrument_id,
1228 venue_order_id,
1229 };
1230
1231 self.send_cmd(cmd).await
1232 }
1233
1234 pub async fn cancel_order(
1244 &self,
1245 params: BybitWsCancelOrderParams,
1246 client_order_id: ClientOrderId,
1247 trader_id: TraderId,
1248 strategy_id: StrategyId,
1249 instrument_id: InstrumentId,
1250 venue_order_id: Option<VenueOrderId>,
1251 ) -> BybitWsResult<()> {
1252 if !self.is_authenticated.load(Ordering::Relaxed) {
1253 return Err(BybitWsError::Authentication(
1254 "Must be authenticated to cancel orders".to_string(),
1255 ));
1256 }
1257
1258 let cmd = HandlerCommand::CancelOrder {
1259 params,
1260 client_order_id,
1261 trader_id,
1262 strategy_id,
1263 instrument_id,
1264 venue_order_id,
1265 };
1266
1267 self.send_cmd(cmd).await
1268 }
1269
1270 pub async fn batch_place_orders(
1280 &self,
1281 trader_id: TraderId,
1282 strategy_id: StrategyId,
1283 orders: Vec<BybitWsPlaceOrderParams>,
1284 ) -> BybitWsResult<()> {
1285 if !self.is_authenticated.load(Ordering::Relaxed) {
1286 return Err(BybitWsError::Authentication(
1287 "Must be authenticated to place orders".to_string(),
1288 ));
1289 }
1290
1291 if orders.is_empty() {
1292 log::warn!("Batch place orders called with empty orders list");
1293 return Ok(());
1294 }
1295
1296 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1297 self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1298 .await?;
1299 }
1300
1301 Ok(())
1302 }
1303
1304 async fn batch_place_orders_chunk(
1305 &self,
1306 trader_id: TraderId,
1307 strategy_id: StrategyId,
1308 orders: Vec<BybitWsPlaceOrderParams>,
1309 ) -> BybitWsResult<()> {
1310 let category = orders[0].category;
1311 let batch_req_id = UUID4::new().to_string();
1312
1313 let mut batch_order_data = Vec::new();
1315 for order in &orders {
1316 if let Some(order_link_id_str) = &order.order_link_id {
1317 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1318 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1319 let instrument_id = self
1320 .instruments_cache
1321 .get(&cache_key)
1322 .map(|inst| inst.id())
1323 .ok_or_else(|| {
1324 BybitWsError::ClientError(format!(
1325 "Instrument {cache_key} not found in cache"
1326 ))
1327 })?;
1328 batch_order_data.push((
1329 client_order_id,
1330 (client_order_id, trader_id, strategy_id, instrument_id),
1331 ));
1332 }
1333 }
1334
1335 if !batch_order_data.is_empty() {
1336 let cmd = HandlerCommand::RegisterBatchPlace {
1337 req_id: batch_req_id.clone(),
1338 orders: batch_order_data,
1339 };
1340 let cmd_tx = self.cmd_tx.read().await;
1341 if let Err(e) = cmd_tx.send(cmd) {
1342 log::error!("Failed to send RegisterBatchPlace command: {e}");
1343 }
1344 }
1345
1346 let mm_level = self.mm_level.load(Ordering::Relaxed);
1347 let has_non_post_only = orders
1348 .iter()
1349 .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1350 let referer = if has_non_post_only || mm_level == 0 {
1351 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1352 } else {
1353 None
1354 };
1355
1356 let request_items: Vec<BybitWsBatchPlaceItem> = orders
1357 .into_iter()
1358 .map(|order| BybitWsBatchPlaceItem {
1359 symbol: order.symbol,
1360 side: order.side,
1361 order_type: order.order_type,
1362 qty: order.qty,
1363 is_leverage: order.is_leverage,
1364 market_unit: order.market_unit,
1365 price: order.price,
1366 time_in_force: order.time_in_force,
1367 order_link_id: order.order_link_id,
1368 reduce_only: order.reduce_only,
1369 close_on_trigger: order.close_on_trigger,
1370 trigger_price: order.trigger_price,
1371 trigger_by: order.trigger_by,
1372 trigger_direction: order.trigger_direction,
1373 tpsl_mode: order.tpsl_mode,
1374 take_profit: order.take_profit,
1375 stop_loss: order.stop_loss,
1376 tp_trigger_by: order.tp_trigger_by,
1377 sl_trigger_by: order.sl_trigger_by,
1378 sl_trigger_price: order.sl_trigger_price,
1379 tp_trigger_price: order.tp_trigger_price,
1380 sl_order_type: order.sl_order_type,
1381 tp_order_type: order.tp_order_type,
1382 sl_limit_price: order.sl_limit_price,
1383 tp_limit_price: order.tp_limit_price,
1384 })
1385 .collect();
1386
1387 let args = BybitWsBatchPlaceOrderArgs {
1388 category,
1389 request: request_items,
1390 };
1391
1392 let request = BybitWsRequest {
1393 req_id: Some(batch_req_id),
1394 op: BybitWsOrderRequestOp::CreateBatch,
1395 header: BybitWsHeader::with_referer(referer),
1396 args: vec![args],
1397 };
1398
1399 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1400
1401 self.send_text(&payload).await
1402 }
1403
1404 pub async fn batch_amend_orders(
1410 &self,
1411 #[allow(unused_variables)] trader_id: TraderId,
1412 #[allow(unused_variables)] strategy_id: StrategyId,
1413 orders: Vec<BybitWsAmendOrderParams>,
1414 ) -> BybitWsResult<()> {
1415 if !self.is_authenticated.load(Ordering::Relaxed) {
1416 return Err(BybitWsError::Authentication(
1417 "Must be authenticated to amend orders".to_string(),
1418 ));
1419 }
1420
1421 if orders.is_empty() {
1422 log::warn!("Batch amend orders called with empty orders list");
1423 return Ok(());
1424 }
1425
1426 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1427 self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1428 .await?;
1429 }
1430
1431 Ok(())
1432 }
1433
1434 async fn batch_amend_orders_chunk(
1435 &self,
1436 #[allow(unused_variables)] trader_id: TraderId,
1437 #[allow(unused_variables)] strategy_id: StrategyId,
1438 orders: Vec<BybitWsAmendOrderParams>,
1439 ) -> BybitWsResult<()> {
1440 let request = BybitWsRequest {
1441 req_id: None,
1442 op: BybitWsOrderRequestOp::AmendBatch,
1443 header: BybitWsHeader::now(),
1444 args: orders,
1445 };
1446
1447 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1448
1449 self.send_text(&payload).await
1450 }
1451
1452 pub async fn batch_cancel_orders(
1458 &self,
1459 trader_id: TraderId,
1460 strategy_id: StrategyId,
1461 orders: Vec<BybitWsCancelOrderParams>,
1462 ) -> BybitWsResult<()> {
1463 if !self.is_authenticated.load(Ordering::Relaxed) {
1464 return Err(BybitWsError::Authentication(
1465 "Must be authenticated to cancel orders".to_string(),
1466 ));
1467 }
1468
1469 if orders.is_empty() {
1470 log::warn!("Batch cancel orders called with empty orders list");
1471 return Ok(());
1472 }
1473
1474 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1475 self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1476 .await?;
1477 }
1478
1479 Ok(())
1480 }
1481
1482 async fn batch_cancel_orders_chunk(
1483 &self,
1484 trader_id: TraderId,
1485 strategy_id: StrategyId,
1486 orders: Vec<BybitWsCancelOrderParams>,
1487 ) -> BybitWsResult<()> {
1488 if orders.is_empty() {
1489 return Ok(());
1490 }
1491
1492 let category = orders[0].category;
1493 let batch_req_id = UUID4::new().to_string();
1494
1495 let mut validated_data = Vec::new();
1496
1497 for order in &orders {
1498 if let Some(order_link_id_str) = &order.order_link_id {
1499 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1500 let instrument_id = self
1501 .instruments_cache
1502 .get(&cache_key)
1503 .map(|inst| inst.id())
1504 .ok_or_else(|| {
1505 BybitWsError::ClientError(format!(
1506 "Instrument {cache_key} not found in cache"
1507 ))
1508 })?;
1509
1510 let venue_order_id = order
1511 .order_id
1512 .as_ref()
1513 .map(|id| VenueOrderId::from(id.as_str()));
1514
1515 validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1516 }
1517 }
1518
1519 let batch_cancel_data: Vec<_> = validated_data
1520 .iter()
1521 .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1522 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1523 (
1524 client_order_id,
1525 (
1526 client_order_id,
1527 trader_id,
1528 strategy_id,
1529 *instrument_id,
1530 *venue_order_id,
1531 ),
1532 )
1533 })
1534 .collect();
1535
1536 if !batch_cancel_data.is_empty() {
1537 let cmd = HandlerCommand::RegisterBatchCancel {
1538 req_id: batch_req_id.clone(),
1539 cancels: batch_cancel_data,
1540 };
1541 let cmd_tx = self.cmd_tx.read().await;
1542 if let Err(e) = cmd_tx.send(cmd) {
1543 log::error!("Failed to send RegisterBatchCancel command: {e}");
1544 }
1545 }
1546
1547 let request_items: Vec<BybitWsBatchCancelItem> = orders
1548 .into_iter()
1549 .map(|order| BybitWsBatchCancelItem {
1550 symbol: order.symbol,
1551 order_id: order.order_id,
1552 order_link_id: order.order_link_id,
1553 })
1554 .collect();
1555
1556 let args = BybitWsBatchCancelOrderArgs {
1557 category,
1558 request: request_items,
1559 };
1560
1561 let request = BybitWsRequest {
1562 req_id: Some(batch_req_id),
1563 op: BybitWsOrderRequestOp::CancelBatch,
1564 header: BybitWsHeader::now(),
1565 args: vec![args],
1566 };
1567
1568 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1569
1570 self.send_text(&payload).await
1571 }
1572
1573 #[allow(clippy::too_many_arguments)]
1579 pub async fn submit_order(
1580 &self,
1581 product_type: BybitProductType,
1582 trader_id: TraderId,
1583 strategy_id: StrategyId,
1584 instrument_id: InstrumentId,
1585 client_order_id: ClientOrderId,
1586 order_side: OrderSide,
1587 order_type: OrderType,
1588 quantity: Quantity,
1589 is_quote_quantity: bool,
1590 time_in_force: Option<TimeInForce>,
1591 price: Option<Price>,
1592 trigger_price: Option<Price>,
1593 post_only: Option<bool>,
1594 reduce_only: Option<bool>,
1595 is_leverage: bool,
1596 ) -> BybitWsResult<()> {
1597 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1598 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1599 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1600
1601 let bybit_side = match order_side {
1602 OrderSide::Buy => BybitOrderSide::Buy,
1603 OrderSide::Sell => BybitOrderSide::Sell,
1604 _ => {
1605 return Err(BybitWsError::ClientError(format!(
1606 "Invalid order side: {order_side:?}"
1607 )));
1608 }
1609 };
1610
1611 let (bybit_order_type, is_stop_order) = match order_type {
1613 OrderType::Market => (BybitOrderType::Market, false),
1614 OrderType::Limit => (BybitOrderType::Limit, false),
1615 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1616 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1617 _ => {
1618 return Err(BybitWsError::ClientError(format!(
1619 "Unsupported order type: {order_type:?}"
1620 )));
1621 }
1622 };
1623
1624 let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1625 None
1626 } else if post_only == Some(true) {
1627 Some(BybitTimeInForce::PostOnly)
1628 } else if let Some(tif) = time_in_force {
1629 Some(match tif {
1630 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1631 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1632 TimeInForce::Fok => BybitTimeInForce::Fok,
1633 _ => {
1634 return Err(BybitWsError::ClientError(format!(
1635 "Unsupported time in force: {tif:?}"
1636 )));
1637 }
1638 })
1639 } else {
1640 None
1641 };
1642
1643 let market_unit = if product_type == BybitProductType::Spot
1646 && bybit_order_type == BybitOrderType::Market
1647 {
1648 if is_quote_quantity {
1649 Some(BYBIT_QUOTE_COIN.to_string())
1650 } else {
1651 Some(BYBIT_BASE_COIN.to_string())
1652 }
1653 } else {
1654 None
1655 };
1656
1657 let is_leverage_value = if product_type == BybitProductType::Spot {
1659 Some(i32::from(is_leverage))
1660 } else {
1661 None
1662 };
1663
1664 let trigger_direction = if is_stop_order {
1667 match (order_type, order_side) {
1668 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1669 Some(BybitTriggerDirection::RisesTo as i32)
1670 }
1671 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1672 Some(BybitTriggerDirection::FallsTo as i32)
1673 }
1674 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1675 Some(BybitTriggerDirection::FallsTo as i32)
1676 }
1677 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1678 Some(BybitTriggerDirection::RisesTo as i32)
1679 }
1680 _ => None,
1681 }
1682 } else {
1683 None
1684 };
1685
1686 let params = if is_stop_order {
1687 BybitWsPlaceOrderParams {
1690 category: product_type,
1691 symbol: raw_symbol,
1692 side: bybit_side,
1693 order_type: bybit_order_type,
1694 qty: quantity.to_string(),
1695 is_leverage: is_leverage_value,
1696 market_unit: market_unit.clone(),
1697 price: price.map(|p| p.to_string()),
1698 time_in_force: bybit_tif,
1699 order_link_id: Some(client_order_id.to_string()),
1700 reduce_only: reduce_only.filter(|&r| r),
1701 close_on_trigger: None,
1702 trigger_price: trigger_price.map(|p| p.to_string()),
1703 trigger_by: Some(BybitTriggerType::LastPrice),
1704 trigger_direction,
1705 tpsl_mode: None, take_profit: None,
1707 stop_loss: None,
1708 tp_trigger_by: None,
1709 sl_trigger_by: None,
1710 sl_trigger_price: None, tp_trigger_price: None, sl_order_type: None,
1713 tp_order_type: None,
1714 sl_limit_price: None,
1715 tp_limit_price: None,
1716 }
1717 } else {
1718 BybitWsPlaceOrderParams {
1720 category: product_type,
1721 symbol: raw_symbol,
1722 side: bybit_side,
1723 order_type: bybit_order_type,
1724 qty: quantity.to_string(),
1725 is_leverage: is_leverage_value,
1726 market_unit,
1727 price: price.map(|p| p.to_string()),
1728 time_in_force: if bybit_order_type == BybitOrderType::Market {
1729 None
1730 } else {
1731 bybit_tif
1732 },
1733 order_link_id: Some(client_order_id.to_string()),
1734 reduce_only: reduce_only.filter(|&r| r),
1735 close_on_trigger: None,
1736 trigger_price: None,
1737 trigger_by: None,
1738 trigger_direction: None,
1739 tpsl_mode: None,
1740 take_profit: None,
1741 stop_loss: None,
1742 tp_trigger_by: None,
1743 sl_trigger_by: None,
1744 sl_trigger_price: None,
1745 tp_trigger_price: None,
1746 sl_order_type: None,
1747 tp_order_type: None,
1748 sl_limit_price: None,
1749 tp_limit_price: None,
1750 }
1751 };
1752
1753 self.place_order(
1754 params,
1755 client_order_id,
1756 trader_id,
1757 strategy_id,
1758 instrument_id,
1759 )
1760 .await
1761 }
1762
1763 #[allow(clippy::too_many_arguments)]
1769 pub async fn modify_order(
1770 &self,
1771 product_type: BybitProductType,
1772 trader_id: TraderId,
1773 strategy_id: StrategyId,
1774 instrument_id: InstrumentId,
1775 client_order_id: ClientOrderId,
1776 venue_order_id: Option<VenueOrderId>,
1777 quantity: Option<Quantity>,
1778 price: Option<Price>,
1779 ) -> BybitWsResult<()> {
1780 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1781 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1782 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1783
1784 let params = BybitWsAmendOrderParams {
1785 category: product_type,
1786 symbol: raw_symbol,
1787 order_id: venue_order_id.map(|id| id.to_string()),
1788 order_link_id: Some(client_order_id.to_string()),
1789 qty: quantity.map(|q| q.to_string()),
1790 price: price.map(|p| p.to_string()),
1791 trigger_price: None,
1792 take_profit: None,
1793 stop_loss: None,
1794 tp_trigger_by: None,
1795 sl_trigger_by: None,
1796 };
1797
1798 self.amend_order(
1799 params,
1800 client_order_id,
1801 trader_id,
1802 strategy_id,
1803 instrument_id,
1804 venue_order_id,
1805 )
1806 .await
1807 }
1808
1809 pub async fn cancel_order_by_id(
1815 &self,
1816 product_type: BybitProductType,
1817 trader_id: TraderId,
1818 strategy_id: StrategyId,
1819 instrument_id: InstrumentId,
1820 client_order_id: ClientOrderId,
1821 venue_order_id: Option<VenueOrderId>,
1822 ) -> BybitWsResult<()> {
1823 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1824 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1825 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1826
1827 let params = BybitWsCancelOrderParams {
1828 category: product_type,
1829 symbol: raw_symbol,
1830 order_id: venue_order_id.map(|id| id.to_string()),
1831 order_link_id: Some(client_order_id.to_string()),
1832 };
1833
1834 self.cancel_order(
1835 params,
1836 client_order_id,
1837 trader_id,
1838 strategy_id,
1839 instrument_id,
1840 venue_order_id,
1841 )
1842 .await
1843 }
1844
1845 #[allow(clippy::too_many_arguments)]
1847 pub fn build_place_order_params(
1848 &self,
1849 product_type: BybitProductType,
1850 instrument_id: InstrumentId,
1851 client_order_id: ClientOrderId,
1852 order_side: OrderSide,
1853 order_type: OrderType,
1854 quantity: Quantity,
1855 is_quote_quantity: bool,
1856 time_in_force: Option<TimeInForce>,
1857 price: Option<Price>,
1858 trigger_price: Option<Price>,
1859 post_only: Option<bool>,
1860 reduce_only: Option<bool>,
1861 is_leverage: bool,
1862 ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1863 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1864 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1865 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1866
1867 let bybit_side = match order_side {
1868 OrderSide::Buy => BybitOrderSide::Buy,
1869 OrderSide::Sell => BybitOrderSide::Sell,
1870 _ => {
1871 return Err(BybitWsError::ClientError(format!(
1872 "Invalid order side: {order_side:?}"
1873 )));
1874 }
1875 };
1876
1877 let (bybit_order_type, is_stop_order) = match order_type {
1878 OrderType::Market => (BybitOrderType::Market, false),
1879 OrderType::Limit => (BybitOrderType::Limit, false),
1880 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1881 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1882 _ => {
1883 return Err(BybitWsError::ClientError(format!(
1884 "Unsupported order type: {order_type:?}"
1885 )));
1886 }
1887 };
1888
1889 let bybit_tif = if post_only == Some(true) {
1890 Some(BybitTimeInForce::PostOnly)
1891 } else if let Some(tif) = time_in_force {
1892 Some(match tif {
1893 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1894 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1895 TimeInForce::Fok => BybitTimeInForce::Fok,
1896 _ => {
1897 return Err(BybitWsError::ClientError(format!(
1898 "Unsupported time in force: {tif:?}"
1899 )));
1900 }
1901 })
1902 } else {
1903 None
1904 };
1905
1906 let market_unit = if product_type == BybitProductType::Spot
1907 && bybit_order_type == BybitOrderType::Market
1908 {
1909 if is_quote_quantity {
1910 Some(BYBIT_QUOTE_COIN.to_string())
1911 } else {
1912 Some(BYBIT_BASE_COIN.to_string())
1913 }
1914 } else {
1915 None
1916 };
1917
1918 let is_leverage_value = if product_type == BybitProductType::Spot {
1920 Some(i32::from(is_leverage))
1921 } else {
1922 None
1923 };
1924
1925 let trigger_direction = if is_stop_order {
1928 match (order_type, order_side) {
1929 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1930 Some(BybitTriggerDirection::RisesTo as i32)
1931 }
1932 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1933 Some(BybitTriggerDirection::FallsTo as i32)
1934 }
1935 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1936 Some(BybitTriggerDirection::FallsTo as i32)
1937 }
1938 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1939 Some(BybitTriggerDirection::RisesTo as i32)
1940 }
1941 _ => None,
1942 }
1943 } else {
1944 None
1945 };
1946
1947 let params = if is_stop_order {
1948 BybitWsPlaceOrderParams {
1949 category: product_type,
1950 symbol: raw_symbol,
1951 side: bybit_side,
1952 order_type: bybit_order_type,
1953 qty: quantity.to_string(),
1954 is_leverage: is_leverage_value,
1955 market_unit,
1956 price: price.map(|p| p.to_string()),
1957 time_in_force: if bybit_order_type == BybitOrderType::Market {
1958 None
1959 } else {
1960 bybit_tif
1961 },
1962 order_link_id: Some(client_order_id.to_string()),
1963 reduce_only: reduce_only.filter(|&r| r),
1964 close_on_trigger: None,
1965 trigger_price: trigger_price.map(|p| p.to_string()),
1966 trigger_by: Some(BybitTriggerType::LastPrice),
1967 trigger_direction,
1968 tpsl_mode: None,
1969 take_profit: None,
1970 stop_loss: None,
1971 tp_trigger_by: None,
1972 sl_trigger_by: None,
1973 sl_trigger_price: None,
1974 tp_trigger_price: None,
1975 sl_order_type: None,
1976 tp_order_type: None,
1977 sl_limit_price: None,
1978 tp_limit_price: None,
1979 }
1980 } else {
1981 BybitWsPlaceOrderParams {
1982 category: product_type,
1983 symbol: raw_symbol,
1984 side: bybit_side,
1985 order_type: bybit_order_type,
1986 qty: quantity.to_string(),
1987 is_leverage: is_leverage_value,
1988 market_unit,
1989 price: price.map(|p| p.to_string()),
1990 time_in_force: if bybit_order_type == BybitOrderType::Market {
1991 None
1992 } else {
1993 bybit_tif
1994 },
1995 order_link_id: Some(client_order_id.to_string()),
1996 reduce_only: reduce_only.filter(|&r| r),
1997 close_on_trigger: None,
1998 trigger_price: None,
1999 trigger_by: None,
2000 trigger_direction: None,
2001 tpsl_mode: None,
2002 take_profit: None,
2003 stop_loss: None,
2004 tp_trigger_by: None,
2005 sl_trigger_by: None,
2006 sl_trigger_price: None,
2007 tp_trigger_price: None,
2008 sl_order_type: None,
2009 tp_order_type: None,
2010 sl_limit_price: None,
2011 tp_limit_price: None,
2012 }
2013 };
2014
2015 Ok(params)
2016 }
2017
2018 #[allow(clippy::too_many_arguments)]
2020 pub fn build_amend_order_params(
2021 &self,
2022 product_type: BybitProductType,
2023 instrument_id: InstrumentId,
2024 venue_order_id: Option<VenueOrderId>,
2025 client_order_id: Option<ClientOrderId>,
2026 quantity: Option<Quantity>,
2027 price: Option<Price>,
2028 ) -> BybitWsResult<BybitWsAmendOrderParams> {
2029 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2030 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2031 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2032
2033 Ok(BybitWsAmendOrderParams {
2034 category: product_type,
2035 symbol: raw_symbol,
2036 order_id: venue_order_id.map(|v| v.to_string()),
2037 order_link_id: client_order_id.map(|c| c.to_string()),
2038 qty: quantity.map(|q| q.to_string()),
2039 price: price.map(|p| p.to_string()),
2040 trigger_price: None,
2041 take_profit: None,
2042 stop_loss: None,
2043 tp_trigger_by: None,
2044 sl_trigger_by: None,
2045 })
2046 }
2047
2048 pub fn build_cancel_order_params(
2055 &self,
2056 product_type: BybitProductType,
2057 instrument_id: InstrumentId,
2058 venue_order_id: Option<VenueOrderId>,
2059 client_order_id: Option<ClientOrderId>,
2060 ) -> BybitWsResult<BybitWsCancelOrderParams> {
2061 if venue_order_id.is_none() && client_order_id.is_none() {
2062 return Err(BybitWsError::ClientError(
2063 "Either venue_order_id or client_order_id must be provided".to_string(),
2064 ));
2065 }
2066
2067 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2068 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2069 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2070
2071 Ok(BybitWsCancelOrderParams {
2072 category: product_type,
2073 symbol: raw_symbol,
2074 order_id: venue_order_id.map(|v| v.to_string()),
2075 order_link_id: client_order_id.map(|c| c.to_string()),
2076 })
2077 }
2078
2079 fn default_headers() -> Vec<(String, String)> {
2080 vec![
2081 ("Content-Type".to_string(), "application/json".to_string()),
2082 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2083 ]
2084 }
2085
2086 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2087 if !self.requires_auth {
2088 return Ok(());
2089 }
2090
2091 let credential = self.credential.as_ref().ok_or_else(|| {
2092 BybitWsError::Authentication("Credentials required for authentication".to_string())
2093 })?;
2094
2095 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2096 let signature = credential.sign_websocket_auth(expires);
2097
2098 let auth_message = BybitAuthRequest {
2099 op: BybitWsOperation::Auth,
2100 args: vec![
2101 Value::String(credential.api_key().to_string()),
2102 Value::Number(expires.into()),
2103 Value::String(signature),
2104 ],
2105 };
2106
2107 let payload = serde_json::to_string(&auth_message)?;
2108
2109 self.cmd_tx
2110 .read()
2111 .await
2112 .send(HandlerCommand::Authenticate { payload })
2113 .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2114
2115 Ok(())
2118 }
2119
2120 async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2121 let cmd = HandlerCommand::SendText {
2122 payload: text.to_string(),
2123 };
2124
2125 self.send_cmd(cmd).await
2126 }
2127
2128 async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2129 self.cmd_tx
2130 .read()
2131 .await
2132 .send(cmd)
2133 .map_err(|e| BybitWsError::Send(e.to_string()))
2134 }
2135}
2136
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        common::testing::load_test_json,
        websocket::{classify_bybit_message, messages::BybitWsMessage},
    };

    /// Loads the named JSON fixture and runs it through `classify_bybit_message`.
    fn classify_fixture(file_name: &str) -> BybitWsMessage {
        let raw = load_test_json(file_name);
        let json: Value = serde_json::from_str(&raw).expect("invalid fixture");
        classify_bybit_message(json)
    }

    /// Returns the instrument ID string used in these tests for `product_type`.
    fn sample_symbol(product_type: BybitProductType) -> &'static str {
        match product_type {
            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
        }
    }

    /// Builds a testnet trade client with dummy credentials.
    fn testnet_trade_client() -> BybitWebSocketClient {
        BybitWebSocketClient::new_trade(
            BybitEnvironment::Testnet,
            Some("test-key".to_string()),
            Some("test-secret".to_string()),
            None,
            Some(20),
        )
    }

    #[rstest]
    fn classify_orderbook_snapshot() {
        let classified = classify_fixture("ws_orderbook_snapshot.json");
        assert!(matches!(classified, BybitWsMessage::Orderbook(_)));
    }

    #[rstest]
    fn classify_trade_snapshot() {
        let classified = classify_fixture("ws_public_trade.json");
        assert!(matches!(classified, BybitWsMessage::Trade(_)));
    }

    #[rstest]
    fn classify_ticker_linear_snapshot() {
        let classified = classify_fixture("ws_ticker_linear.json");
        assert!(matches!(classified, BybitWsMessage::TickerLinear(_)));
    }

    #[rstest]
    fn classify_ticker_option_snapshot() {
        let classified = classify_fixture("ws_ticker_option.json");
        assert!(matches!(classified, BybitWsMessage::TickerOption(_)));
    }

    #[rstest]
    fn test_race_unsubscribe_failure_recovery() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "publicTrade.BTCUSDT";

        // Start from a confirmed subscription.
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        // Requesting an unsubscribe moves the topic to pending-unsubscribe.
        state.mark_unsubscribe(topic);
        assert_eq!(state.len(), 0);
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        // Recovery sequence: clear the pending unsubscribe, then re-subscribe.
        state.confirm_unsubscribe(topic);
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);
        assert!(state.pending_unsubscribe_topics().is_empty());
        assert!(state.pending_subscribe_topics().is_empty());

        let restored = state.all_topics();
        assert_eq!(restored.len(), 1);
        assert!(restored.contains(&topic.to_string()));
    }

    #[rstest]
    fn test_race_resubscribe_before_unsubscribe_ack() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "orderbook.50.BTCUSDT";

        // Start from a confirmed subscription.
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        // Unsubscribe is requested but not yet acknowledged.
        state.mark_unsubscribe(topic);
        assert_eq!(state.len(), 0);
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        // A new subscribe arrives before the unsubscribe acknowledgement.
        state.mark_subscribe(topic);
        assert_eq!(state.pending_subscribe_topics(), vec![topic]);

        // The late unsubscribe acknowledgement must not drop the pending subscribe.
        state.confirm_unsubscribe(topic);
        assert!(state.pending_unsubscribe_topics().is_empty());
        assert_eq!(state.pending_subscribe_topics(), vec![topic]);

        // Confirming the subscribe brings the topic back.
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);
        assert!(state.pending_subscribe_topics().is_empty());

        let restored = state.all_topics();
        assert_eq!(restored.len(), 1);
        assert!(restored.contains(&topic.to_string()));
    }

    #[rstest]
    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "tickers.ETHUSDT";

        // Subscribe is requested but not yet confirmed.
        state.mark_subscribe(topic);
        assert_eq!(state.pending_subscribe_topics(), vec![topic]);

        // Unsubscribing replaces the pending subscribe with a pending unsubscribe.
        state.mark_unsubscribe(topic);
        assert!(state.pending_subscribe_topics().is_empty());
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        // A late subscribe confirmation must not resurrect the topic.
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 0);
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        state.confirm_unsubscribe(topic);

        assert!(state.is_empty());
        assert!(state.all_topics().is_empty());
    }

    #[rstest]
    fn test_race_reconnection_with_pending_states() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);

        // Confirmed subscription.
        let trade_btc = "publicTrade.BTCUSDT";
        state.mark_subscribe(trade_btc);
        state.confirm_subscribe(trade_btc);

        // Subscription still pending confirmation.
        let trade_eth = "publicTrade.ETHUSDT";
        state.mark_subscribe(trade_eth);

        // Confirmed subscription with a pending unsubscribe.
        let book_btc = "orderbook.50.BTCUSDT";
        state.mark_subscribe(book_btc);
        state.confirm_subscribe(book_btc);
        state.mark_unsubscribe(book_btc);

        let topics_to_restore = state.all_topics();

        // Confirmed and pending subscriptions are reported; the topic that is
        // being unsubscribed is not.
        assert_eq!(topics_to_restore.len(), 2);
        assert!(topics_to_restore.contains(&trade_btc.to_string()));
        assert!(topics_to_restore.contains(&trade_eth.to_string()));
        assert!(!topics_to_restore.contains(&book_btc.to_string()));
    }

    #[rstest]
    fn test_race_duplicate_subscribe_messages_idempotent() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "publicTrade.BTCUSDT";

        // Start from a confirmed subscription.
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        // Subscribing again to a confirmed topic adds no pending entry.
        state.mark_subscribe(topic);
        assert!(state.pending_subscribe_topics().is_empty());
        assert_eq!(state.len(), 1);

        // A duplicate confirmation leaves the count unchanged.
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        let restored = state.all_topics();
        assert_eq!(restored.len(), 1);
        assert_eq!(restored[0], topic);
    }

    #[rstest]
    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
    #[case::option_with_leverage(BybitProductType::Option, true, None)]
    fn test_is_leverage_parameter(
        #[case] product_type: BybitProductType,
        #[case] is_leverage: bool,
        #[case] expected: Option<i32>,
    ) {
        let instrument_id = InstrumentId::from(sample_symbol(product_type));
        let client_order_id = ClientOrderId::from("test-order-1");
        let quantity = Quantity::from("1.0");

        let client = testnet_trade_client();

        let built = client.build_place_order_params(
            product_type,
            instrument_id,
            client_order_id,
            OrderSide::Buy,
            OrderType::Limit,
            quantity,
            false,
            Some(TimeInForce::Gtc),
            Some(Price::from("50000.0")),
            None,
            None,
            None,
            is_leverage,
        );
        let params = built.expect("Failed to build params");

        assert_eq!(params.is_leverage, expected);
    }

    #[rstest]
    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
    fn test_is_quote_quantity_parameter(
        #[case] product_type: BybitProductType,
        #[case] order_type: OrderType,
        #[case] is_quote_quantity: bool,
        #[case] expected: Option<String>,
    ) {
        let instrument_id = InstrumentId::from(sample_symbol(product_type));
        let client_order_id = ClientOrderId::from("test-order-1");
        let quantity = Quantity::from("1.0");

        let client = testnet_trade_client();

        // Market orders are built without a price; every other type gets one.
        let price = if order_type == OrderType::Market {
            None
        } else {
            Some(Price::from("50000.0"))
        };

        let built = client.build_place_order_params(
            product_type,
            instrument_id,
            client_order_id,
            OrderSide::Buy,
            order_type,
            quantity,
            is_quote_quantity,
            Some(TimeInForce::Gtc),
            price,
            None,
            None,
            None,
            false,
        );
        let params = built.expect("Failed to build params");

        assert_eq!(params.market_unit, expected);
    }
}