1use std::{
21 fmt::Debug,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, AtomicU8, Ordering},
25 },
26 time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::runtime::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT};
34use nautilus_model::{
35 enums::{OrderSide, OrderType, TimeInForce},
36 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 types::{Price, Quantity},
39};
40use nautilus_network::{
41 backoff::ExponentialBackoff,
42 mode::ConnectionMode,
43 websocket::{
44 AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45 channel_message_handler,
46 },
47};
48use serde_json::Value;
49use tokio::sync::RwLock;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54 common::{
55 consts::{
56 BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
57 },
58 credential::Credential,
59 enums::{
60 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
61 BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
62 },
63 parse::{extract_raw_symbol, make_bybit_symbol},
64 symbol::BybitSymbol,
65 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
66 },
67 websocket::{
68 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
69 error::{BybitWsError, BybitWsResult},
70 handler::{FeedHandler, HandlerCommand},
71 messages::{
72 BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
73 BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
74 BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
75 NautilusWsMessage,
76 },
77 },
78};
79
/// Heartbeat interval applied by the constructors when the caller passes `None`.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Offset in milliseconds added to the current UTC timestamp to compute the
/// `expires` value signed into the re-authentication request after a reconnect.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
/// Maximum number of orders submitted per chunk by `batch_place_orders`.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Shared, async-locked map keyed by symbol holding a pair of optional strings.
///
/// NOTE(review): the meaning of the two strings is not visible in this file section
/// (presumably funding-related values used for de-duplication) — confirm against the handler.
type FundingCache = Arc<RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
86
/// WebSocket client for Bybit public, private and trade endpoints.
///
/// The client owns the connection configuration and talks to a background feed handler
/// task through a command channel; parsed messages come back through an output channel
/// that is exposed via [`BybitWebSocketClient::stream`].
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BybitWebSocketClient {
    /// WebSocket endpoint URL used by `connect`.
    url: String,
    /// Bybit environment used to derive the default URL in the constructors.
    environment: BybitEnvironment,
    /// Product type; set by the public constructors, `None` for private/trade clients.
    product_type: Option<BybitProductType>,
    /// API credential; set by the private/trade constructors, `None` for public clients.
    credential: Option<Credential>,
    /// Whether the connection must authenticate (true for private/trade clients).
    requires_auth: bool,
    /// Authentication tracker handed to the feed handler on `connect`.
    auth_tracker: AuthTracker,
    /// Heartbeat interval passed to the WebSocket config (defaults to `DEFAULT_HEARTBEAT_SECS`).
    heartbeat: Option<u64>,
    /// Connection mode atomic; replaced with the connected client's atomic in `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for handler commands; starts as a placeholder and is replaced in `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output; created in `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: cleared in `connect`, set in `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the spawned handler task; awaited (or aborted) in `close`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription bookkeeping shared with the handler task.
    subscriptions: SubscriptionState,
    /// Set when the handler reports `Authenticated`; cleared on reconnect and on `close`.
    is_authenticated: Arc<AtomicBool>,
    /// Account identifier passed to the feed handler, if set.
    account_id: Option<AccountId>,
    /// Level consulted when deciding whether to attach the broker referer to batch orders.
    /// NOTE(review): presumably a market-maker tier — confirm.
    mm_level: Arc<AtomicU8>,
    /// Instruments keyed by symbol; replayed to the handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Cache shared with the handler; cleared on reconnect.
    funding_cache: FundingCache,
    /// Cancellation token shared between clones.
    cancellation_token: CancellationToken,
}
110
111impl Debug for BybitWebSocketClient {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("BybitWebSocketClient")
114 .field("url", &self.url)
115 .field("environment", &self.environment)
116 .field("product_type", &self.product_type)
117 .field("requires_auth", &self.requires_auth)
118 .field("heartbeat", &self.heartbeat)
119 .field("confirmed_subscriptions", &self.subscriptions.len())
120 .finish()
121 }
122}
123
124impl Clone for BybitWebSocketClient {
125 fn clone(&self) -> Self {
126 Self {
127 url: self.url.clone(),
128 environment: self.environment,
129 product_type: self.product_type,
130 credential: self.credential.clone(),
131 requires_auth: self.requires_auth,
132 auth_tracker: self.auth_tracker.clone(),
133 heartbeat: self.heartbeat,
134 connection_mode: Arc::clone(&self.connection_mode),
135 cmd_tx: Arc::clone(&self.cmd_tx),
136 out_rx: None, signal: Arc::clone(&self.signal),
138 task_handle: None, subscriptions: self.subscriptions.clone(),
140 is_authenticated: Arc::clone(&self.is_authenticated),
141 account_id: self.account_id,
142 mm_level: Arc::clone(&self.mm_level),
143 instruments_cache: Arc::clone(&self.instruments_cache),
144 funding_cache: Arc::clone(&self.funding_cache),
145 cancellation_token: self.cancellation_token.clone(),
146 }
147 }
148}
149
150impl BybitWebSocketClient {
151 #[must_use]
153 pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
154 Self::new_public_with(
155 BybitProductType::Linear,
156 BybitEnvironment::Mainnet,
157 url,
158 heartbeat,
159 )
160 }
161
162 #[must_use]
164 pub fn new_public_with(
165 product_type: BybitProductType,
166 environment: BybitEnvironment,
167 url: Option<String>,
168 heartbeat: Option<u64>,
169 ) -> Self {
170 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
174
175 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
176 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
177
178 Self {
179 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
180 environment,
181 product_type: Some(product_type),
182 credential: None,
183 requires_auth: false,
184 auth_tracker: AuthTracker::new(),
185 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
186 connection_mode,
187 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
188 out_rx: None,
189 signal: Arc::new(AtomicBool::new(false)),
190 task_handle: None,
191 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
192 is_authenticated: Arc::new(AtomicBool::new(false)),
193 instruments_cache: Arc::new(DashMap::new()),
194 account_id: None,
195 funding_cache: Arc::new(RwLock::new(AHashMap::new())),
196 cancellation_token: CancellationToken::new(),
197 mm_level: Arc::new(AtomicU8::new(0)),
198 }
199 }
200
201 #[must_use]
203 pub fn new_private(
204 environment: BybitEnvironment,
205 credential: Credential,
206 url: Option<String>,
207 heartbeat: Option<u64>,
208 ) -> Self {
209 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
213
214 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
215 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
216
217 Self {
218 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
219 environment,
220 product_type: None,
221 credential: Some(credential),
222 requires_auth: true,
223 auth_tracker: AuthTracker::new(),
224 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
225 connection_mode,
226 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
227 out_rx: None,
228 signal: Arc::new(AtomicBool::new(false)),
229 task_handle: None,
230 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
231 is_authenticated: Arc::new(AtomicBool::new(false)),
232 instruments_cache: Arc::new(DashMap::new()),
233 account_id: None,
234 funding_cache: Arc::new(RwLock::new(AHashMap::new())),
235 cancellation_token: CancellationToken::new(),
236 mm_level: Arc::new(AtomicU8::new(0)),
237 }
238 }
239
240 #[must_use]
242 pub fn new_trade(
243 environment: BybitEnvironment,
244 credential: Credential,
245 url: Option<String>,
246 heartbeat: Option<u64>,
247 ) -> Self {
248 let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
252
253 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
254 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
255
256 Self {
257 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
258 environment,
259 product_type: None,
260 credential: Some(credential),
261 requires_auth: true,
262 auth_tracker: AuthTracker::new(),
263 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
264 connection_mode,
265 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
266 out_rx: None,
267 signal: Arc::new(AtomicBool::new(false)),
268 task_handle: None,
269 subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
270 is_authenticated: Arc::new(AtomicBool::new(false)),
271 instruments_cache: Arc::new(DashMap::new()),
272 account_id: None,
273 funding_cache: Arc::new(RwLock::new(AHashMap::new())),
274 cancellation_token: CancellationToken::new(),
275 mm_level: Arc::new(AtomicU8::new(0)),
276 }
277 }
278
    /// Connects to the WebSocket endpoint and starts the background handler task.
    ///
    /// Steps, in order:
    /// 1. Clears the stop signal and builds the WebSocket configuration.
    /// 2. Attempts to connect up to `MAX_RETRIES` times, each attempt bounded by
    ///    `CONNECTION_TIMEOUT_SECS`, sleeping with exponential backoff between attempts.
    /// 3. Installs fresh command/output channels and hands the connected client to the handler.
    /// 4. Replays any cached instruments to the handler.
    /// 5. Spawns the task that drives the `FeedHandler`, forwards messages, and handles
    ///    re-authentication and resubscription after a reconnect.
    /// 6. Authenticates when the client requires it.
    ///
    /// # Errors
    ///
    /// Returns an error if the ping message cannot be serialized, the backoff cannot be
    /// constructed, all connection attempts fail or time out, a handler command cannot be
    /// sent, or authentication fails.
    pub async fn connect(&mut self) -> BybitWsResult<()> {
        // Clear any stop request left over from a previous `close`.
        self.signal.store(false, Ordering::Relaxed);

        let (raw_handler, raw_rx) = channel_message_handler();

        // The ping handler is a no-op closure: the payload is ignored.
        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
        });

        // Heartbeat message: a subscription-shaped request with the `Ping` operation.
        let ping_msg = serde_json::to_string(&BybitSubscription {
            op: BybitWsOperation::Ping,
            args: vec![],
        })?;

        let config = WebSocketConfig {
            url: self.url.clone(),
            headers: Self::default_headers(),
            message_handler: Some(raw_handler),
            heartbeat: self.heartbeat,
            heartbeat_msg: Some(ping_msg),
            ping_handler: Some(ping_handler),
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: Some(500),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(1.5),
            reconnect_jitter_ms: Some(250),
            reconnect_max_attempts: None,
        };

        // Limits for the initial connection only; reconnects use the config values above.
        const MAX_RETRIES: u32 = 5;
        const CONNECTION_TIMEOUT_SECS: u64 = 10;

        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(500),
            Duration::from_millis(5000),
            2.0,
            250,
            false,
        )
        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;

        // The initial empty value is always overwritten before it is read.
        #[allow(unused_assignments)]
        let mut last_error = String::new();
        let mut attempt = 0;
        // Retry loop: breaks with the connected client, or returns after `MAX_RETRIES` failures.
        let client = loop {
            attempt += 1;

            match tokio::time::timeout(
                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
                WebSocketClient::connect(config.clone(), None, vec![], None),
            )
            .await
            {
                Ok(Ok(client)) => {
                    if attempt > 1 {
                        tracing::info!("WebSocket connection established after {attempt} attempts");
                    }
                    break client;
                }
                // The connect call itself returned an error.
                Ok(Err(e)) => {
                    last_error = e.to_string();
                    tracing::warn!(
                        attempt,
                        max_retries = MAX_RETRIES,
                        url = %self.url,
                        error = %last_error,
                        "WebSocket connection attempt failed"
                    );
                }
                // The attempt exceeded `CONNECTION_TIMEOUT_SECS`.
                Err(_) => {
                    last_error = format!(
                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
                    );
                    tracing::warn!(
                        attempt,
                        max_retries = MAX_RETRIES,
                        url = %self.url,
                        "WebSocket connection attempt timed out"
                    );
                }
            }

            if attempt >= MAX_RETRIES {
                return Err(BybitWsError::Transport(format!(
                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
                    If this is a DNS error, check your network configuration and DNS settings.",
                    self.url,
                    if last_error.is_empty() {
                        "unknown error"
                    } else {
                        &last_error
                    }
                )));
            }

            let delay = backoff.next_duration();
            tracing::debug!(
                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
                attempt + 1
            );
            tokio::time::sleep(delay).await;
        };

        // Track the connected client's mode so `is_active` / `is_closed` reflect it.
        self.connection_mode.store(client.connection_mode_atomic());

        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Replace the placeholder (or previous) command sender with one wired to the new handler.
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        let cmd = HandlerCommand::SetClient(client);

        self.send_cmd(cmd).await?;

        // Replay instruments cached before this connection so the handler knows about them.
        if !self.instruments_cache.is_empty() {
            let cached_instruments: Vec<InstrumentAny> = self
                .instruments_cache
                .iter()
                .map(|entry| entry.value().clone())
                .collect();
            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
            self.send_cmd(cmd).await?;
        }

        // Values moved into the spawned handler task.
        let signal = Arc::clone(&self.signal);
        let subscriptions = self.subscriptions.clone();
        let credential = self.credential.clone();
        let requires_auth = self.requires_auth;
        let funding_cache = Arc::clone(&self.funding_cache);
        let account_id = self.account_id;
        let product_type = self.product_type;
        let mm_level = Arc::clone(&self.mm_level);
        let cmd_tx_for_reconnect = cmd_tx.clone();
        let auth_tracker = self.auth_tracker.clone();
        let is_authenticated = Arc::clone(&self.is_authenticated);

        let stream_handle = get_runtime().spawn(async move {
            let mut handler = FeedHandler::new(
                signal.clone(),
                cmd_rx,
                raw_rx,
                out_tx.clone(),
                account_id,
                product_type,
                mm_level.clone(),
                auth_tracker,
                subscriptions.clone(),
                funding_cache.clone(),
            );

            // Re-sends a subscribe request for every topic reported by `all_topics`.
            // Topics whose payload fails to serialize are skipped without an error.
            let resubscribe_all = || async {
                let topics = subscriptions.all_topics();

                if topics.is_empty() {
                    return;
                }

                tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");

                for topic in &topics {
                    subscriptions.mark_subscribe(topic.as_str());
                }

                // One subscribe message per topic, sent to the handler as a single command.
                let mut payloads = Vec::with_capacity(topics.len());
                for topic in &topics {
                    let message = BybitSubscription {
                        op: BybitWsOperation::Subscribe,
                        args: vec![topic.clone()],
                    };
                    if let Ok(payload) = serde_json::to_string(&message) {
                        payloads.push(payload);
                    }
                }

                let cmd = HandlerCommand::Subscribe { topics: payloads };

                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                    tracing::error!("Failed to send resubscribe command: {e}");
                }
            };

            loop {
                match handler.next().await {
                    Some(NautilusWsMessage::Reconnected) => {
                        // Ignore reconnect notifications once a stop has been requested.
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }

                        tracing::info!("WebSocket reconnected");

                        // Rebuild full topic strings from the confirmed (channel, symbol) pairs;
                        // an empty symbol means the topic is the channel name alone.
                        let confirmed_topics: Vec<String> = {
                            let confirmed = subscriptions.confirmed();
                            let mut topics = Vec::new();
                            for entry in confirmed.iter() {
                                let (channel, symbols) = entry.pair();
                                for symbol in symbols.iter() {
                                    if symbol.is_empty() {
                                        topics.push(channel.to_string());
                                    } else {
                                        topics.push(format!("{channel}.{symbol}"));
                                    }
                                }
                            }
                            topics
                        };

                        if !confirmed_topics.is_empty() {
                            tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
                            for topic in confirmed_topics {
                                subscriptions.mark_failure(&topic);
                            }
                        }

                        // Drop cached values from the previous connection.
                        funding_cache.write().await.clear();

                        if requires_auth {
                            is_authenticated.store(false, Ordering::Relaxed);
                            tracing::debug!("Re-authenticating after reconnection");

                            if let Some(cred) = &credential {
                                // Sign an expiry timestamp `WEBSOCKET_AUTH_WINDOW_MS` in the future.
                                let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
                                let signature = cred.sign_websocket_auth(expires);

                                let auth_message = BybitAuthRequest {
                                    op: BybitWsOperation::Auth,
                                    args: vec![
                                        Value::String(cred.api_key().to_string()),
                                        Value::Number(expires.into()),
                                        Value::String(signature),
                                    ],
                                };

                                if let Ok(payload) = serde_json::to_string(&auth_message) {
                                    let cmd = HandlerCommand::Authenticate { payload };
                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                                        tracing::error!(error = %e, "Failed to send reconnection auth command");
                                    }
                                } else {
                                    tracing::error!("Failed to serialize reconnection auth message");
                                }
                            }
                        }

                        // Authenticated clients resubscribe later, in the `Authenticated` arm.
                        if !requires_auth {
                            tracing::debug!("No authentication required, resubscribing immediately");
                            resubscribe_all().await;
                        }

                        // Forward the reconnect notification to the consumer of the stream.
                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
                            tracing::debug!("Receiver dropped, stopping");
                            break;
                        }
                        continue;
                    }
                    // Not forwarded to the output channel; used only to trigger resubscription.
                    Some(NautilusWsMessage::Authenticated) => {
                        tracing::debug!("Authenticated, resubscribing");
                        is_authenticated.store(true, Ordering::Relaxed);
                        resubscribe_all().await;
                        continue;
                    }
                    Some(msg) => {
                        if out_tx.send(msg).is_err() {
                            tracing::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    // The handler produced no more messages; both cases end the task.
                    None => {
                        if handler.is_stopped() {
                            tracing::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        tracing::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }

            tracing::debug!("Handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        // Initial authentication for private/trade clients.
        if requires_auth && let Err(e) = self.authenticate_if_required().await {
            return Err(e);
        }

        Ok(())
    }
588
589 pub async fn close(&mut self) -> BybitWsResult<()> {
591 tracing::debug!("Starting close process");
592
593 self.signal.store(true, Ordering::Relaxed);
594
595 let cmd = HandlerCommand::Disconnect;
596 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
597 tracing::debug!(
598 "Failed to send disconnect command (handler may already be shut down): {e}"
599 );
600 }
601
602 if let Some(task_handle) = self.task_handle.take() {
603 match Arc::try_unwrap(task_handle) {
604 Ok(handle) => {
605 tracing::debug!("Waiting for task handle to complete");
606 match tokio::time::timeout(Duration::from_secs(2), handle).await {
607 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
608 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
609 Err(_) => {
610 tracing::warn!(
611 "Timeout waiting for task handle, task may still be running"
612 );
613 }
615 }
616 }
617 Err(arc_handle) => {
618 tracing::debug!(
619 "Cannot take ownership of task handle - other references exist, aborting task"
620 );
621 arc_handle.abort();
622 }
623 }
624 } else {
625 tracing::debug!("No task handle to await");
626 }
627
628 self.is_authenticated.store(false, Ordering::Relaxed);
629
630 tracing::debug!("Closed");
631
632 Ok(())
633 }
634
635 #[must_use]
637 pub fn is_active(&self) -> bool {
638 let connection_mode_arc = self.connection_mode.load();
639 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
640 && !self.signal.load(Ordering::Relaxed)
641 }
642
643 pub fn is_closed(&self) -> bool {
645 let connection_mode_arc = self.connection_mode.load();
646 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
647 || self.signal.load(Ordering::Relaxed)
648 }
649
650 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
656 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
657
658 tokio::time::timeout(timeout, async {
659 while !self.is_active() {
660 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
661 }
662 })
663 .await
664 .map_err(|_| {
665 BybitWsError::ClientError(format!(
666 "WebSocket connection timeout after {timeout_secs} seconds"
667 ))
668 })?;
669
670 Ok(())
671 }
672
673 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
675 if topics.is_empty() {
676 return Ok(());
677 }
678
679 tracing::debug!("Subscribing to topics: {topics:?}");
680
681 let mut topics_to_send = Vec::new();
683
684 for topic in topics {
685 if self.subscriptions.add_reference(&topic) {
687 self.subscriptions.mark_subscribe(&topic);
688 topics_to_send.push(topic.clone());
689 } else {
690 tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
691 }
692 }
693
694 if topics_to_send.is_empty() {
695 return Ok(());
696 }
697
698 let mut payloads = Vec::with_capacity(topics_to_send.len());
700 for topic in &topics_to_send {
701 let message = BybitSubscription {
702 op: BybitWsOperation::Subscribe,
703 args: vec![topic.clone()],
704 };
705 let payload = serde_json::to_string(&message).map_err(|e| {
706 BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
707 })?;
708 payloads.push(payload);
709 }
710
711 let cmd = HandlerCommand::Subscribe { topics: payloads };
712 self.cmd_tx
713 .read()
714 .await
715 .send(cmd)
716 .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
717
718 Ok(())
719 }
720
721 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
723 if topics.is_empty() {
724 return Ok(());
725 }
726
727 tracing::debug!("Attempting to unsubscribe from topics: {topics:?}");
728
729 if self.signal.load(Ordering::Relaxed) {
730 tracing::debug!("Shutdown signal detected, skipping unsubscribe");
731 return Ok(());
732 }
733
734 let mut topics_to_send = Vec::new();
736
737 for topic in topics {
738 if self.subscriptions.remove_reference(&topic) {
740 self.subscriptions.mark_unsubscribe(&topic);
741 topics_to_send.push(topic.clone());
742 } else {
743 tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
744 }
745 }
746
747 if topics_to_send.is_empty() {
748 return Ok(());
749 }
750
751 let mut payloads = Vec::with_capacity(topics_to_send.len());
753 for topic in &topics_to_send {
754 let message = BybitSubscription {
755 op: BybitWsOperation::Unsubscribe,
756 args: vec![topic.clone()],
757 };
758 if let Ok(payload) = serde_json::to_string(&message) {
759 payloads.push(payload);
760 }
761 }
762
763 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
764 if let Err(e) = self.cmd_tx.read().await.send(cmd) {
765 tracing::debug!(error = %e, "Failed to send unsubscribe command");
766 }
767
768 Ok(())
769 }
770
771 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
777 let rx = self
778 .out_rx
779 .take()
780 .expect("Stream receiver already taken or client not connected");
781 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
782 async_stream::stream! {
783 while let Some(msg) = rx.recv().await {
784 yield msg;
785 }
786 }
787 }
788
789 #[must_use]
791 pub fn subscription_count(&self) -> usize {
792 self.subscriptions.len()
793 }
794
795 #[must_use]
797 pub fn credential(&self) -> Option<&Credential> {
798 self.credential.as_ref()
799 }
800
801 pub fn cache_instrument(&self, instrument: InstrumentAny) {
805 self.instruments_cache
806 .insert(instrument.symbol().inner(), instrument.clone());
807
808 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
811 let cmd = HandlerCommand::UpdateInstrument(instrument);
812 if let Err(e) = cmd_tx.send(cmd) {
813 tracing::debug!("Failed to send instrument update to handler: {e}");
814 }
815 }
816 }
817
818 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
822 self.instruments_cache.clear();
823 let mut count = 0;
824
825 tracing::debug!("Initializing Bybit instrument cache");
826
827 for inst in instruments {
828 let symbol = inst.symbol().inner();
829 self.instruments_cache.insert(symbol, inst.clone());
830 tracing::debug!("Cached instrument: {symbol}");
831 count += 1;
832 }
833
834 tracing::info!("Bybit instrument cache initialized with {count} instruments");
835 }
836
837 pub fn set_account_id(&mut self, account_id: AccountId) {
839 self.account_id = Some(account_id);
840 }
841
842 pub fn set_mm_level(&self, mm_level: u8) {
844 self.mm_level.store(mm_level, Ordering::Relaxed);
845 }
846
847 #[must_use]
849 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
850 &self.instruments_cache
851 }
852
853 #[must_use]
855 pub fn account_id(&self) -> Option<AccountId> {
856 self.account_id
857 }
858
859 #[must_use]
861 pub fn product_type(&self) -> Option<BybitProductType> {
862 self.product_type
863 }
864
865 pub async fn subscribe_orderbook(
875 &self,
876 instrument_id: InstrumentId,
877 depth: u32,
878 ) -> BybitWsResult<()> {
879 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
880 let topic = format!(
881 "{}.{depth}.{raw_symbol}",
882 BybitWsPublicChannel::OrderBook.as_ref()
883 );
884 self.subscribe(vec![topic]).await
885 }
886
887 pub async fn unsubscribe_orderbook(
889 &self,
890 instrument_id: InstrumentId,
891 depth: u32,
892 ) -> BybitWsResult<()> {
893 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
894 let topic = format!(
895 "{}.{depth}.{raw_symbol}",
896 BybitWsPublicChannel::OrderBook.as_ref()
897 );
898 self.unsubscribe(vec![topic]).await
899 }
900
901 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
911 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
912 let topic = format!(
913 "{}.{raw_symbol}",
914 BybitWsPublicChannel::PublicTrade.as_ref()
915 );
916 self.subscribe(vec![topic]).await
917 }
918
919 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
921 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
922 let topic = format!(
923 "{}.{raw_symbol}",
924 BybitWsPublicChannel::PublicTrade.as_ref()
925 );
926 self.unsubscribe(vec![topic]).await
927 }
928
929 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
939 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
940 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
941 self.subscribe(vec![topic]).await
942 }
943
944 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
946 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
947 let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
948
949 let symbol = self.product_type.map_or_else(
951 || instrument_id.symbol.inner(),
952 |pt| make_bybit_symbol(raw_symbol, pt),
953 );
954 self.funding_cache.write().await.remove(&symbol);
955
956 self.unsubscribe(vec![topic]).await
957 }
958
959 pub async fn subscribe_klines(
969 &self,
970 instrument_id: InstrumentId,
971 interval: impl Into<String>,
972 ) -> BybitWsResult<()> {
973 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
974 let topic = format!(
975 "{}.{}.{raw_symbol}",
976 BybitWsPublicChannel::Kline.as_ref(),
977 interval.into()
978 );
979 self.subscribe(vec![topic]).await
980 }
981
982 pub async fn unsubscribe_klines(
984 &self,
985 instrument_id: InstrumentId,
986 interval: impl Into<String>,
987 ) -> BybitWsResult<()> {
988 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
989 let topic = format!(
990 "{}.{}.{raw_symbol}",
991 BybitWsPublicChannel::Kline.as_ref(),
992 interval.into()
993 );
994 self.unsubscribe(vec![topic]).await
995 }
996
997 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1007 if !self.requires_auth {
1008 return Err(BybitWsError::Authentication(
1009 "Order subscription requires authentication".to_string(),
1010 ));
1011 }
1012 self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1013 .await
1014 }
1015
1016 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1018 self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1019 .await
1020 }
1021
1022 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1032 if !self.requires_auth {
1033 return Err(BybitWsError::Authentication(
1034 "Execution subscription requires authentication".to_string(),
1035 ));
1036 }
1037 self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1038 .await
1039 }
1040
1041 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1043 self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1044 .await
1045 }
1046
1047 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1057 if !self.requires_auth {
1058 return Err(BybitWsError::Authentication(
1059 "Position subscription requires authentication".to_string(),
1060 ));
1061 }
1062 self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1063 .await
1064 }
1065
1066 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1068 self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1069 .await
1070 }
1071
1072 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1082 if !self.requires_auth {
1083 return Err(BybitWsError::Authentication(
1084 "Wallet subscription requires authentication".to_string(),
1085 ));
1086 }
1087 self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1088 .await
1089 }
1090
1091 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1093 self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1094 .await
1095 }
1096
1097 pub async fn place_order(
1107 &self,
1108 params: BybitWsPlaceOrderParams,
1109 client_order_id: ClientOrderId,
1110 trader_id: TraderId,
1111 strategy_id: StrategyId,
1112 instrument_id: InstrumentId,
1113 ) -> BybitWsResult<()> {
1114 if !self.is_authenticated.load(Ordering::Relaxed) {
1115 return Err(BybitWsError::Authentication(
1116 "Must be authenticated to place orders".to_string(),
1117 ));
1118 }
1119
1120 let cmd = HandlerCommand::PlaceOrder {
1121 params,
1122 client_order_id,
1123 trader_id,
1124 strategy_id,
1125 instrument_id,
1126 };
1127
1128 self.send_cmd(cmd).await
1129 }
1130
1131 pub async fn amend_order(
1141 &self,
1142 params: BybitWsAmendOrderParams,
1143 client_order_id: ClientOrderId,
1144 trader_id: TraderId,
1145 strategy_id: StrategyId,
1146 instrument_id: InstrumentId,
1147 venue_order_id: Option<VenueOrderId>,
1148 ) -> BybitWsResult<()> {
1149 if !self.is_authenticated.load(Ordering::Relaxed) {
1150 return Err(BybitWsError::Authentication(
1151 "Must be authenticated to amend orders".to_string(),
1152 ));
1153 }
1154
1155 let cmd = HandlerCommand::AmendOrder {
1156 params,
1157 client_order_id,
1158 trader_id,
1159 strategy_id,
1160 instrument_id,
1161 venue_order_id,
1162 };
1163
1164 self.send_cmd(cmd).await
1165 }
1166
1167 pub async fn cancel_order(
1177 &self,
1178 params: BybitWsCancelOrderParams,
1179 client_order_id: ClientOrderId,
1180 trader_id: TraderId,
1181 strategy_id: StrategyId,
1182 instrument_id: InstrumentId,
1183 venue_order_id: Option<VenueOrderId>,
1184 ) -> BybitWsResult<()> {
1185 if !self.is_authenticated.load(Ordering::Relaxed) {
1186 return Err(BybitWsError::Authentication(
1187 "Must be authenticated to cancel orders".to_string(),
1188 ));
1189 }
1190
1191 let cmd = HandlerCommand::CancelOrder {
1192 params,
1193 client_order_id,
1194 trader_id,
1195 strategy_id,
1196 instrument_id,
1197 venue_order_id,
1198 };
1199
1200 self.send_cmd(cmd).await
1201 }
1202
1203 pub async fn batch_place_orders(
1213 &self,
1214 trader_id: TraderId,
1215 strategy_id: StrategyId,
1216 orders: Vec<BybitWsPlaceOrderParams>,
1217 ) -> BybitWsResult<()> {
1218 if !self.is_authenticated.load(Ordering::Relaxed) {
1219 return Err(BybitWsError::Authentication(
1220 "Must be authenticated to place orders".to_string(),
1221 ));
1222 }
1223
1224 if orders.is_empty() {
1225 tracing::warn!("Batch place orders called with empty orders list");
1226 return Ok(());
1227 }
1228
1229 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1230 self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1231 .await?;
1232 }
1233
1234 Ok(())
1235 }
1236
1237 async fn batch_place_orders_chunk(
1238 &self,
1239 trader_id: TraderId,
1240 strategy_id: StrategyId,
1241 orders: Vec<BybitWsPlaceOrderParams>,
1242 ) -> BybitWsResult<()> {
1243 let category = orders[0].category;
1244
1245 let batch_req_id = UUID4::new().to_string();
1246
1247 let mut batch_order_data = Vec::new();
1249 for order in &orders {
1250 if let Some(order_link_id_str) = &order.order_link_id {
1251 let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1252 let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1253 let instrument_id = self
1254 .instruments_cache
1255 .get(&cache_key)
1256 .map(|inst| inst.id())
1257 .ok_or_else(|| {
1258 BybitWsError::ClientError(format!(
1259 "Instrument {} not found in cache",
1260 cache_key
1261 ))
1262 })?;
1263 batch_order_data.push((
1264 client_order_id,
1265 (client_order_id, trader_id, strategy_id, instrument_id),
1266 ));
1267 }
1268 }
1269
1270 if !batch_order_data.is_empty() {
1271 let cmd = HandlerCommand::RegisterBatchPlace {
1272 req_id: batch_req_id.clone(),
1273 orders: batch_order_data,
1274 };
1275 let cmd_tx = self.cmd_tx.read().await;
1276 if let Err(e) = cmd_tx.send(cmd) {
1277 tracing::error!("Failed to send RegisterBatchPlace command: {e}");
1278 }
1279 }
1280
1281 let mm_level = self.mm_level.load(Ordering::Relaxed);
1282 let has_non_post_only = orders
1283 .iter()
1284 .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1285 let referer = if has_non_post_only || mm_level == 0 {
1286 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1287 } else {
1288 None
1289 };
1290
1291 let request_items: Vec<BybitWsBatchPlaceItem> = orders
1292 .into_iter()
1293 .map(|order| BybitWsBatchPlaceItem {
1294 symbol: order.symbol,
1295 side: order.side,
1296 order_type: order.order_type,
1297 qty: order.qty,
1298 is_leverage: order.is_leverage,
1299 market_unit: order.market_unit,
1300 price: order.price,
1301 time_in_force: order.time_in_force,
1302 order_link_id: order.order_link_id,
1303 reduce_only: order.reduce_only,
1304 close_on_trigger: order.close_on_trigger,
1305 trigger_price: order.trigger_price,
1306 trigger_by: order.trigger_by,
1307 trigger_direction: order.trigger_direction,
1308 tpsl_mode: order.tpsl_mode,
1309 take_profit: order.take_profit,
1310 stop_loss: order.stop_loss,
1311 tp_trigger_by: order.tp_trigger_by,
1312 sl_trigger_by: order.sl_trigger_by,
1313 sl_trigger_price: order.sl_trigger_price,
1314 tp_trigger_price: order.tp_trigger_price,
1315 sl_order_type: order.sl_order_type,
1316 tp_order_type: order.tp_order_type,
1317 sl_limit_price: order.sl_limit_price,
1318 tp_limit_price: order.tp_limit_price,
1319 })
1320 .collect();
1321
1322 let args = BybitWsBatchPlaceOrderArgs {
1323 category,
1324 request: request_items,
1325 };
1326
1327 let request = BybitWsRequest {
1328 req_id: Some(batch_req_id),
1329 op: BybitWsOrderRequestOp::CreateBatch,
1330 header: BybitWsHeader::with_referer(referer),
1331 args: vec![args],
1332 };
1333
1334 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1335
1336 self.send_text(&payload).await
1337 }
1338
1339 pub async fn batch_amend_orders(
1345 &self,
1346 #[allow(unused_variables)] trader_id: TraderId,
1347 #[allow(unused_variables)] strategy_id: StrategyId,
1348 orders: Vec<BybitWsAmendOrderParams>,
1349 ) -> BybitWsResult<()> {
1350 if !self.is_authenticated.load(Ordering::Relaxed) {
1351 return Err(BybitWsError::Authentication(
1352 "Must be authenticated to amend orders".to_string(),
1353 ));
1354 }
1355
1356 if orders.is_empty() {
1357 tracing::warn!("Batch amend orders called with empty orders list");
1358 return Ok(());
1359 }
1360
1361 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1362 self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1363 .await?;
1364 }
1365
1366 Ok(())
1367 }
1368
1369 async fn batch_amend_orders_chunk(
1370 &self,
1371 #[allow(unused_variables)] trader_id: TraderId,
1372 #[allow(unused_variables)] strategy_id: StrategyId,
1373 orders: Vec<BybitWsAmendOrderParams>,
1374 ) -> BybitWsResult<()> {
1375 let request = BybitWsRequest {
1376 req_id: None,
1377 op: BybitWsOrderRequestOp::AmendBatch,
1378 header: BybitWsHeader::now(),
1379 args: orders,
1380 };
1381
1382 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1383
1384 self.send_text(&payload).await
1385 }
1386
1387 pub async fn batch_cancel_orders(
1393 &self,
1394 orders: Vec<BybitWsCancelOrderParams>,
1395 ) -> BybitWsResult<()> {
1396 if !self.is_authenticated.load(Ordering::Relaxed) {
1397 return Err(BybitWsError::Authentication(
1398 "Must be authenticated to cancel orders".to_string(),
1399 ));
1400 }
1401
1402 if orders.is_empty() {
1403 tracing::warn!("Batch cancel orders called with empty orders list");
1404 return Ok(());
1405 }
1406
1407 for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1408 self.batch_cancel_orders_chunk(chunk.to_vec()).await?;
1409 }
1410
1411 Ok(())
1412 }
1413
1414 async fn batch_cancel_orders_chunk(
1415 &self,
1416 orders: Vec<BybitWsCancelOrderParams>,
1417 ) -> BybitWsResult<()> {
1418 let category = orders[0].category;
1420
1421 let request_items: Vec<BybitWsBatchCancelItem> = orders
1422 .into_iter()
1423 .map(|order| BybitWsBatchCancelItem {
1424 symbol: order.symbol,
1425 order_id: order.order_id,
1426 order_link_id: order.order_link_id,
1427 })
1428 .collect();
1429
1430 let args = BybitWsBatchCancelOrderArgs {
1431 category,
1432 request: request_items,
1433 };
1434
1435 let request = BybitWsRequest {
1436 req_id: None,
1437 op: BybitWsOrderRequestOp::CancelBatch,
1438 header: BybitWsHeader::now(),
1439 args: vec![args],
1440 };
1441
1442 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1443
1444 self.send_text(&payload).await
1445 }
1446
1447 #[allow(clippy::too_many_arguments)]
1453 pub async fn submit_order(
1454 &self,
1455 product_type: BybitProductType,
1456 trader_id: TraderId,
1457 strategy_id: StrategyId,
1458 instrument_id: InstrumentId,
1459 client_order_id: ClientOrderId,
1460 order_side: OrderSide,
1461 order_type: OrderType,
1462 quantity: Quantity,
1463 is_quote_quantity: bool,
1464 time_in_force: Option<TimeInForce>,
1465 price: Option<Price>,
1466 trigger_price: Option<Price>,
1467 post_only: Option<bool>,
1468 reduce_only: Option<bool>,
1469 is_leverage: bool,
1470 ) -> BybitWsResult<()> {
1471 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1472 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1473 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1474
1475 let bybit_side = match order_side {
1476 OrderSide::Buy => BybitOrderSide::Buy,
1477 OrderSide::Sell => BybitOrderSide::Sell,
1478 _ => {
1479 return Err(BybitWsError::ClientError(format!(
1480 "Invalid order side: {order_side:?}"
1481 )));
1482 }
1483 };
1484
1485 let (bybit_order_type, is_stop_order) = match order_type {
1487 OrderType::Market => (BybitOrderType::Market, false),
1488 OrderType::Limit => (BybitOrderType::Limit, false),
1489 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1490 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1491 _ => {
1492 return Err(BybitWsError::ClientError(format!(
1493 "Unsupported order type: {order_type:?}"
1494 )));
1495 }
1496 };
1497
1498 let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1499 None
1500 } else if post_only == Some(true) {
1501 Some(BybitTimeInForce::PostOnly)
1502 } else if let Some(tif) = time_in_force {
1503 Some(match tif {
1504 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1505 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1506 TimeInForce::Fok => BybitTimeInForce::Fok,
1507 _ => {
1508 return Err(BybitWsError::ClientError(format!(
1509 "Unsupported time in force: {tif:?}"
1510 )));
1511 }
1512 })
1513 } else {
1514 None
1515 };
1516
1517 let market_unit = if product_type == BybitProductType::Spot
1520 && bybit_order_type == BybitOrderType::Market
1521 {
1522 if is_quote_quantity {
1523 Some(BYBIT_QUOTE_COIN.to_string())
1524 } else {
1525 Some(BYBIT_BASE_COIN.to_string())
1526 }
1527 } else {
1528 None
1529 };
1530
1531 let is_leverage_value = if product_type == BybitProductType::Spot {
1533 Some(i32::from(is_leverage))
1534 } else {
1535 None
1536 };
1537
1538 let trigger_direction = if is_stop_order {
1541 match (order_type, order_side) {
1542 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1543 Some(BybitTriggerDirection::RisesTo as i32)
1544 }
1545 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1546 Some(BybitTriggerDirection::FallsTo as i32)
1547 }
1548 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1549 Some(BybitTriggerDirection::FallsTo as i32)
1550 }
1551 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1552 Some(BybitTriggerDirection::RisesTo as i32)
1553 }
1554 _ => None,
1555 }
1556 } else {
1557 None
1558 };
1559
1560 let params = if is_stop_order {
1561 BybitWsPlaceOrderParams {
1564 category: product_type,
1565 symbol: raw_symbol,
1566 side: bybit_side,
1567 order_type: bybit_order_type,
1568 qty: quantity.to_string(),
1569 is_leverage: is_leverage_value,
1570 market_unit: market_unit.clone(),
1571 price: price.map(|p| p.to_string()),
1572 time_in_force: bybit_tif,
1573 order_link_id: Some(client_order_id.to_string()),
1574 reduce_only: reduce_only.filter(|&r| r),
1575 close_on_trigger: None,
1576 trigger_price: trigger_price.map(|p| p.to_string()),
1577 trigger_by: Some(BybitTriggerType::LastPrice),
1578 trigger_direction,
1579 tpsl_mode: None, take_profit: None,
1581 stop_loss: None,
1582 tp_trigger_by: None,
1583 sl_trigger_by: None,
1584 sl_trigger_price: None, tp_trigger_price: None, sl_order_type: None,
1587 tp_order_type: None,
1588 sl_limit_price: None,
1589 tp_limit_price: None,
1590 }
1591 } else {
1592 BybitWsPlaceOrderParams {
1594 category: product_type,
1595 symbol: raw_symbol,
1596 side: bybit_side,
1597 order_type: bybit_order_type,
1598 qty: quantity.to_string(),
1599 is_leverage: is_leverage_value,
1600 market_unit,
1601 price: price.map(|p| p.to_string()),
1602 time_in_force: if bybit_order_type == BybitOrderType::Market {
1603 None
1604 } else {
1605 bybit_tif
1606 },
1607 order_link_id: Some(client_order_id.to_string()),
1608 reduce_only: reduce_only.filter(|&r| r),
1609 close_on_trigger: None,
1610 trigger_price: None,
1611 trigger_by: None,
1612 trigger_direction: None,
1613 tpsl_mode: None,
1614 take_profit: None,
1615 stop_loss: None,
1616 tp_trigger_by: None,
1617 sl_trigger_by: None,
1618 sl_trigger_price: None,
1619 tp_trigger_price: None,
1620 sl_order_type: None,
1621 tp_order_type: None,
1622 sl_limit_price: None,
1623 tp_limit_price: None,
1624 }
1625 };
1626
1627 self.place_order(
1628 params,
1629 client_order_id,
1630 trader_id,
1631 strategy_id,
1632 instrument_id,
1633 )
1634 .await
1635 }
1636
1637 #[allow(clippy::too_many_arguments)]
1643 pub async fn modify_order(
1644 &self,
1645 product_type: BybitProductType,
1646 trader_id: TraderId,
1647 strategy_id: StrategyId,
1648 instrument_id: InstrumentId,
1649 client_order_id: ClientOrderId,
1650 venue_order_id: Option<VenueOrderId>,
1651 quantity: Option<Quantity>,
1652 price: Option<Price>,
1653 ) -> BybitWsResult<()> {
1654 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1655 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1656 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1657
1658 let params = BybitWsAmendOrderParams {
1659 category: product_type,
1660 symbol: raw_symbol,
1661 order_id: venue_order_id.map(|id| id.to_string()),
1662 order_link_id: Some(client_order_id.to_string()),
1663 qty: quantity.map(|q| q.to_string()),
1664 price: price.map(|p| p.to_string()),
1665 trigger_price: None,
1666 take_profit: None,
1667 stop_loss: None,
1668 tp_trigger_by: None,
1669 sl_trigger_by: None,
1670 };
1671
1672 self.amend_order(
1673 params,
1674 client_order_id,
1675 trader_id,
1676 strategy_id,
1677 instrument_id,
1678 venue_order_id,
1679 )
1680 .await
1681 }
1682
1683 pub async fn cancel_order_by_id(
1689 &self,
1690 product_type: BybitProductType,
1691 trader_id: TraderId,
1692 strategy_id: StrategyId,
1693 instrument_id: InstrumentId,
1694 client_order_id: ClientOrderId,
1695 venue_order_id: Option<VenueOrderId>,
1696 ) -> BybitWsResult<()> {
1697 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1698 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1699 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1700
1701 let params = BybitWsCancelOrderParams {
1702 category: product_type,
1703 symbol: raw_symbol,
1704 order_id: venue_order_id.map(|id| id.to_string()),
1705 order_link_id: Some(client_order_id.to_string()),
1706 };
1707
1708 self.cancel_order(
1709 params,
1710 client_order_id,
1711 trader_id,
1712 strategy_id,
1713 instrument_id,
1714 venue_order_id,
1715 )
1716 .await
1717 }
1718
1719 pub async fn batch_cancel_orders_by_id(
1725 &self,
1726 product_type: BybitProductType,
1727 trader_id: TraderId,
1728 strategy_id: StrategyId,
1729 instrument_ids: Vec<InstrumentId>,
1730 venue_order_ids: Vec<Option<VenueOrderId>>,
1731 client_order_ids: Vec<Option<ClientOrderId>>,
1732 ) -> BybitWsResult<()> {
1733 if instrument_ids.len() != venue_order_ids.len()
1734 || instrument_ids.len() != client_order_ids.len()
1735 {
1736 return Err(BybitWsError::ClientError(
1737 "instrument_ids, venue_order_ids, and client_order_ids must have the same length"
1738 .to_string(),
1739 ));
1740 }
1741
1742 let batch_req_id = UUID4::new().to_string();
1743
1744 let mut params_vec = Vec::new();
1745 let mut batch_cancel_data = Vec::new();
1746
1747 for ((instrument_id, venue_order_id), client_order_id) in instrument_ids
1748 .into_iter()
1749 .zip(venue_order_ids.into_iter())
1750 .zip(client_order_ids.into_iter())
1751 {
1752 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1753 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1754 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1755
1756 let params = BybitWsCancelOrderParams {
1757 category: product_type,
1758 symbol: raw_symbol,
1759 order_id: venue_order_id.map(|id| id.to_string()),
1760 order_link_id: client_order_id.as_ref().map(|id| id.to_string()),
1761 };
1762
1763 params_vec.push(params);
1764
1765 if let Some(client_order_id) = client_order_id {
1766 batch_cancel_data.push((
1767 client_order_id,
1768 (
1769 client_order_id,
1770 trader_id,
1771 strategy_id,
1772 instrument_id,
1773 venue_order_id,
1774 ),
1775 ));
1776 }
1777 }
1778
1779 if !batch_cancel_data.is_empty() {
1780 let cmd = HandlerCommand::RegisterBatchCancel {
1781 req_id: batch_req_id.clone(),
1782 cancels: batch_cancel_data,
1783 };
1784 let cmd_tx = self.cmd_tx.read().await;
1785 if let Err(e) = cmd_tx.send(cmd) {
1786 tracing::error!("Failed to send RegisterBatchCancel command: {e}");
1787 }
1788 }
1789
1790 self.batch_cancel_orders_with_req_id(params_vec, batch_req_id)
1791 .await
1792 }
1793
1794 async fn batch_cancel_orders_with_req_id(
1796 &self,
1797 orders: Vec<BybitWsCancelOrderParams>,
1798 req_id: String,
1799 ) -> BybitWsResult<()> {
1800 if !self.is_authenticated.load(Ordering::Relaxed) {
1801 return Err(BybitWsError::Authentication(
1802 "Must be authenticated to cancel orders".to_string(),
1803 ));
1804 }
1805
1806 if orders.is_empty() {
1807 return Ok(());
1808 }
1809
1810 if orders.len() > 20 {
1811 return Err(BybitWsError::ClientError(
1812 "Batch cancel limit is 20 orders per request".to_string(),
1813 ));
1814 }
1815
1816 let category = orders[0].category;
1818
1819 let request_items: Vec<BybitWsBatchCancelItem> = orders
1820 .into_iter()
1821 .map(|order| BybitWsBatchCancelItem {
1822 symbol: order.symbol,
1823 order_id: order.order_id,
1824 order_link_id: order.order_link_id,
1825 })
1826 .collect();
1827
1828 let args = BybitWsBatchCancelOrderArgs {
1829 category,
1830 request: request_items,
1831 };
1832
1833 let request = BybitWsRequest {
1834 req_id: Some(req_id),
1835 op: BybitWsOrderRequestOp::CancelBatch,
1836 header: BybitWsHeader::now(),
1837 args: vec![args],
1838 };
1839
1840 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1841
1842 self.send_text(&payload).await
1843 }
1844
1845 #[allow(clippy::too_many_arguments)]
1847 pub fn build_place_order_params(
1848 &self,
1849 product_type: BybitProductType,
1850 instrument_id: InstrumentId,
1851 client_order_id: ClientOrderId,
1852 order_side: OrderSide,
1853 order_type: OrderType,
1854 quantity: Quantity,
1855 is_quote_quantity: bool,
1856 time_in_force: Option<TimeInForce>,
1857 price: Option<Price>,
1858 trigger_price: Option<Price>,
1859 post_only: Option<bool>,
1860 reduce_only: Option<bool>,
1861 is_leverage: bool,
1862 ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1863 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1864 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1865 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1866
1867 let bybit_side = match order_side {
1868 OrderSide::Buy => BybitOrderSide::Buy,
1869 OrderSide::Sell => BybitOrderSide::Sell,
1870 _ => {
1871 return Err(BybitWsError::ClientError(format!(
1872 "Invalid order side: {order_side:?}"
1873 )));
1874 }
1875 };
1876
1877 let (bybit_order_type, is_stop_order) = match order_type {
1878 OrderType::Market => (BybitOrderType::Market, false),
1879 OrderType::Limit => (BybitOrderType::Limit, false),
1880 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1881 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1882 _ => {
1883 return Err(BybitWsError::ClientError(format!(
1884 "Unsupported order type: {order_type:?}"
1885 )));
1886 }
1887 };
1888
1889 let bybit_tif = if post_only == Some(true) {
1890 Some(BybitTimeInForce::PostOnly)
1891 } else if let Some(tif) = time_in_force {
1892 Some(match tif {
1893 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1894 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1895 TimeInForce::Fok => BybitTimeInForce::Fok,
1896 _ => {
1897 return Err(BybitWsError::ClientError(format!(
1898 "Unsupported time in force: {tif:?}"
1899 )));
1900 }
1901 })
1902 } else {
1903 None
1904 };
1905
1906 let market_unit = if product_type == BybitProductType::Spot
1907 && bybit_order_type == BybitOrderType::Market
1908 {
1909 if is_quote_quantity {
1910 Some(BYBIT_QUOTE_COIN.to_string())
1911 } else {
1912 Some(BYBIT_BASE_COIN.to_string())
1913 }
1914 } else {
1915 None
1916 };
1917
1918 let is_leverage_value = if product_type == BybitProductType::Spot {
1920 Some(i32::from(is_leverage))
1921 } else {
1922 None
1923 };
1924
1925 let trigger_direction = if is_stop_order {
1928 match (order_type, order_side) {
1929 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1930 Some(BybitTriggerDirection::RisesTo as i32)
1931 }
1932 (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1933 Some(BybitTriggerDirection::FallsTo as i32)
1934 }
1935 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1936 Some(BybitTriggerDirection::FallsTo as i32)
1937 }
1938 (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1939 Some(BybitTriggerDirection::RisesTo as i32)
1940 }
1941 _ => None,
1942 }
1943 } else {
1944 None
1945 };
1946
1947 let params = if is_stop_order {
1948 BybitWsPlaceOrderParams {
1949 category: product_type,
1950 symbol: raw_symbol,
1951 side: bybit_side,
1952 order_type: bybit_order_type,
1953 qty: quantity.to_string(),
1954 is_leverage: is_leverage_value,
1955 market_unit,
1956 price: price.map(|p| p.to_string()),
1957 time_in_force: if bybit_order_type == BybitOrderType::Market {
1958 None
1959 } else {
1960 bybit_tif
1961 },
1962 order_link_id: Some(client_order_id.to_string()),
1963 reduce_only: reduce_only.filter(|&r| r),
1964 close_on_trigger: None,
1965 trigger_price: trigger_price.map(|p| p.to_string()),
1966 trigger_by: Some(BybitTriggerType::LastPrice),
1967 trigger_direction,
1968 tpsl_mode: None,
1969 take_profit: None,
1970 stop_loss: None,
1971 tp_trigger_by: None,
1972 sl_trigger_by: None,
1973 sl_trigger_price: None,
1974 tp_trigger_price: None,
1975 sl_order_type: None,
1976 tp_order_type: None,
1977 sl_limit_price: None,
1978 tp_limit_price: None,
1979 }
1980 } else {
1981 BybitWsPlaceOrderParams {
1982 category: product_type,
1983 symbol: raw_symbol,
1984 side: bybit_side,
1985 order_type: bybit_order_type,
1986 qty: quantity.to_string(),
1987 is_leverage: is_leverage_value,
1988 market_unit,
1989 price: price.map(|p| p.to_string()),
1990 time_in_force: if bybit_order_type == BybitOrderType::Market {
1991 None
1992 } else {
1993 bybit_tif
1994 },
1995 order_link_id: Some(client_order_id.to_string()),
1996 reduce_only: reduce_only.filter(|&r| r),
1997 close_on_trigger: None,
1998 trigger_price: None,
1999 trigger_by: None,
2000 trigger_direction: None,
2001 tpsl_mode: None,
2002 take_profit: None,
2003 stop_loss: None,
2004 tp_trigger_by: None,
2005 sl_trigger_by: None,
2006 sl_trigger_price: None,
2007 tp_trigger_price: None,
2008 sl_order_type: None,
2009 tp_order_type: None,
2010 sl_limit_price: None,
2011 tp_limit_price: None,
2012 }
2013 };
2014
2015 Ok(params)
2016 }
2017
2018 #[allow(clippy::too_many_arguments)]
2020 pub fn build_amend_order_params(
2021 &self,
2022 product_type: BybitProductType,
2023 instrument_id: InstrumentId,
2024 venue_order_id: Option<VenueOrderId>,
2025 client_order_id: Option<ClientOrderId>,
2026 quantity: Option<Quantity>,
2027 price: Option<Price>,
2028 ) -> BybitWsResult<BybitWsAmendOrderParams> {
2029 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2030 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2031 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2032
2033 Ok(BybitWsAmendOrderParams {
2034 category: product_type,
2035 symbol: raw_symbol,
2036 order_id: venue_order_id.map(|v| v.to_string()),
2037 order_link_id: client_order_id.map(|c| c.to_string()),
2038 qty: quantity.map(|q| q.to_string()),
2039 price: price.map(|p| p.to_string()),
2040 trigger_price: None,
2041 take_profit: None,
2042 stop_loss: None,
2043 tp_trigger_by: None,
2044 sl_trigger_by: None,
2045 })
2046 }
2047
2048 fn default_headers() -> Vec<(String, String)> {
2049 vec![
2050 ("Content-Type".to_string(), "application/json".to_string()),
2051 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2052 ]
2053 }
2054
2055 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2056 if !self.requires_auth {
2057 return Ok(());
2058 }
2059
2060 let credential = self.credential.as_ref().ok_or_else(|| {
2061 BybitWsError::Authentication("Credentials required for authentication".to_string())
2062 })?;
2063
2064 let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2065 let signature = credential.sign_websocket_auth(expires);
2066
2067 let auth_message = BybitAuthRequest {
2068 op: BybitWsOperation::Auth,
2069 args: vec![
2070 Value::String(credential.api_key().to_string()),
2071 Value::Number(expires.into()),
2072 Value::String(signature),
2073 ],
2074 };
2075
2076 let payload = serde_json::to_string(&auth_message)?;
2077
2078 self.cmd_tx
2079 .read()
2080 .await
2081 .send(HandlerCommand::Authenticate { payload })
2082 .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2083
2084 Ok(())
2087 }
2088
2089 async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2090 let cmd = HandlerCommand::SendText {
2091 payload: text.to_string(),
2092 };
2093
2094 self.send_cmd(cmd).await
2095 }
2096
2097 async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2098 self.cmd_tx
2099 .read()
2100 .await
2101 .send(cmd)
2102 .map_err(|e| BybitWsError::Send(e.to_string()))
2103 }
2104}
2105
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{
        common::testing::load_test_json,
        websocket::{handler::FeedHandler, messages::BybitWsMessage},
    };

    /// Loads a JSON fixture and runs it through the feed handler's message classifier,
    /// panicking with `expectation` when classification yields nothing.
    fn classify_fixture(fixture: &str, expectation: &str) -> BybitWsMessage {
        let json: Value = serde_json::from_str(&load_test_json(fixture)).expect("invalid fixture");
        FeedHandler::classify_bybit_message(&json).expect(expectation)
    }

    /// Returns the instrument ID string used by the parameter tests for each product type.
    fn symbol_for(product_type: BybitProductType) -> &'static str {
        match product_type {
            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
        }
    }

    /// Builds a testnet trade client with dummy credentials for parameter-building tests.
    fn test_trade_client() -> BybitWebSocketClient {
        BybitWebSocketClient::new_trade(
            BybitEnvironment::Testnet,
            Credential::new("test-key", "test-secret"),
            None,
            Some(20),
        )
    }

    #[rstest]
    fn classify_orderbook_snapshot() {
        let message = classify_fixture("ws_orderbook_snapshot.json", "expected orderbook message");
        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
    }

    #[rstest]
    fn classify_trade_snapshot() {
        let message = classify_fixture("ws_public_trade.json", "expected trade message");
        assert!(matches!(message, BybitWsMessage::Trade(_)));
    }

    #[rstest]
    fn classify_ticker_linear_snapshot() {
        let message = classify_fixture("ws_ticker_linear.json", "expected ticker message");
        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
    }

    #[rstest]
    fn classify_ticker_option_snapshot() {
        let message = classify_fixture("ws_ticker_option.json", "expected ticker message");
        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
    }

    #[rstest]
    fn test_race_unsubscribe_failure_recovery() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "publicTrade.BTCUSDT";

        // Establish a confirmed subscription.
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        // Requesting an unsubscribe removes the topic from the confirmed count.
        state.mark_unsubscribe(topic);
        assert_eq!(state.len(), 0);
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        // Acknowledge the unsubscribe, then subscribe and confirm again.
        state.confirm_unsubscribe(topic);
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);
        assert!(state.pending_unsubscribe_topics().is_empty());
        assert!(state.pending_subscribe_topics().is_empty());

        let restored = state.all_topics();
        assert_eq!(restored.len(), 1);
        assert!(restored.contains(&topic.to_string()));
    }

    #[rstest]
    fn test_race_resubscribe_before_unsubscribe_ack() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "orderbook.50.BTCUSDT";

        // Establish a confirmed subscription.
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        // Request an unsubscribe.
        state.mark_unsubscribe(topic);
        assert_eq!(state.len(), 0);
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        // Re-subscribe before the unsubscribe has been acknowledged.
        state.mark_subscribe(topic);
        assert_eq!(state.pending_subscribe_topics(), vec![topic]);

        // The late unsubscribe acknowledgement must not clear the pending subscribe.
        state.confirm_unsubscribe(topic);
        assert!(state.pending_unsubscribe_topics().is_empty());
        assert_eq!(state.pending_subscribe_topics(), vec![topic]);

        // Confirming the subscribe makes the topic active again.
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);
        assert!(state.pending_subscribe_topics().is_empty());

        let restored = state.all_topics();
        assert_eq!(restored.len(), 1);
        assert!(restored.contains(&topic.to_string()));
    }

    #[rstest]
    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "tickers.ETHUSDT";

        // Subscribe request is pending.
        state.mark_subscribe(topic);
        assert_eq!(state.pending_subscribe_topics(), vec![topic]);

        // Unsubscribing before the confirmation moves the topic to pending-unsubscribe.
        state.mark_unsubscribe(topic);
        assert!(state.pending_subscribe_topics().is_empty());
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        // A late subscribe confirmation must not activate the topic.
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 0);
        assert_eq!(state.pending_unsubscribe_topics(), vec![topic]);

        state.confirm_unsubscribe(topic);

        assert!(state.is_empty());
        assert!(state.all_topics().is_empty());
    }

    #[rstest]
    fn test_race_reconnection_with_pending_states() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);

        // Confirmed subscription.
        let trade_btc = "publicTrade.BTCUSDT";
        state.mark_subscribe(trade_btc);
        state.confirm_subscribe(trade_btc);

        // Subscription still awaiting confirmation.
        let trade_eth = "publicTrade.ETHUSDT";
        state.mark_subscribe(trade_eth);

        // Confirmed subscription with an unsubscribe in flight.
        let book_btc = "orderbook.50.BTCUSDT";
        state.mark_subscribe(book_btc);
        state.confirm_subscribe(book_btc);
        state.mark_unsubscribe(book_btc);

        let topics_to_restore = state.all_topics();

        // Confirmed and pending-subscribe topics are reported; pending-unsubscribe is not.
        assert_eq!(topics_to_restore.len(), 2);
        assert!(topics_to_restore.contains(&trade_btc.to_string()));
        assert!(topics_to_restore.contains(&trade_eth.to_string()));
        assert!(!topics_to_restore.contains(&book_btc.to_string()));
    }

    #[rstest]
    fn test_race_duplicate_subscribe_messages_idempotent() {
        let state = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let topic = "publicTrade.BTCUSDT";

        // Establish a confirmed subscription.
        state.mark_subscribe(topic);
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        // Marking an already confirmed topic again must not create a pending entry.
        state.mark_subscribe(topic);
        assert!(state.pending_subscribe_topics().is_empty());
        assert_eq!(state.len(), 1);

        // A duplicate confirmation leaves the count unchanged.
        state.confirm_subscribe(topic);
        assert_eq!(state.len(), 1);

        let restored = state.all_topics();
        assert_eq!(restored.len(), 1);
        assert_eq!(restored[0], topic);
    }

    #[rstest]
    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
    #[case::option_with_leverage(BybitProductType::Option, true, None)]
    fn test_is_leverage_parameter(
        #[case] product_type: BybitProductType,
        #[case] is_leverage: bool,
        #[case] expected: Option<i32>,
    ) {
        let instrument_id = InstrumentId::from(symbol_for(product_type));
        let client_order_id = ClientOrderId::from("test-order-1");
        let quantity = Quantity::from("1.0");

        let client = test_trade_client();

        let built = client.build_place_order_params(
            product_type,
            instrument_id,
            client_order_id,
            OrderSide::Buy,
            OrderType::Limit,
            quantity,
            false,
            Some(TimeInForce::Gtc),
            Some(Price::from("50000.0")),
            None,
            None,
            None,
            is_leverage,
        );
        let params = built.expect("Failed to build params");

        assert_eq!(params.is_leverage, expected);
    }

    #[rstest]
    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
    fn test_is_quote_quantity_parameter(
        #[case] product_type: BybitProductType,
        #[case] order_type: OrderType,
        #[case] is_quote_quantity: bool,
        #[case] expected: Option<String>,
    ) {
        let instrument_id = InstrumentId::from(symbol_for(product_type));
        let client_order_id = ClientOrderId::from("test-order-1");
        let quantity = Quantity::from("1.0");

        let client = test_trade_client();

        // Market orders are built without a price; every other type gets a fixed one.
        let price = if order_type == OrderType::Market {
            None
        } else {
            Some(Price::from("50000.0"))
        };

        let built = client.build_place_order_params(
            product_type,
            instrument_id,
            client_order_id,
            OrderSide::Buy,
            order_type,
            quantity,
            is_quote_quantity,
            Some(TimeInForce::Gtc),
            price,
            None,
            None,
            None,
            false,
        );
        let params = built.expect("Failed to build params");

        assert_eq!(params.market_unit, expected);
    }
}