nautilus_deribit/websocket/
messages.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data structures for Deribit WebSocket JSON-RPC messages.
17
18use nautilus_model::{
19    data::{Data, OrderBookDeltas},
20    instruments::InstrumentAny,
21};
22use serde::{Deserialize, Serialize};
23use ustr::Ustr;
24
25use super::enums::{DeribitBookAction, DeribitBookMsgType, DeribitHeartbeatType};
26pub use crate::common::rpc::{DeribitJsonRpcError, DeribitJsonRpcRequest, DeribitJsonRpcResponse};
27use crate::websocket::error::DeribitWsError;
28
29/// JSON-RPC subscription notification from Deribit.
30#[derive(Debug, Clone, Deserialize)]
31pub struct DeribitSubscriptionNotification<T> {
32    /// JSON-RPC version.
33    pub jsonrpc: String,
34    /// Method name (always "subscription").
35    pub method: String,
36    /// Subscription parameters containing channel and data.
37    pub params: DeribitSubscriptionParams<T>,
38}
39
40/// Subscription notification parameters.
41#[derive(Debug, Clone, Deserialize)]
42pub struct DeribitSubscriptionParams<T> {
43    /// Channel name (e.g., "trades.BTC-PERPETUAL.raw").
44    pub channel: String,
45    /// Channel-specific data.
46    pub data: T,
47}
48
49/// Authentication request parameters for client_signature grant.
50#[derive(Debug, Clone, Serialize)]
51pub struct DeribitAuthParams {
52    /// Grant type (client_signature for HMAC auth).
53    pub grant_type: String,
54    /// Client ID (API key).
55    pub client_id: String,
56    /// Unix timestamp in milliseconds.
57    pub timestamp: u64,
58    /// HMAC-SHA256 signature.
59    pub signature: String,
60    /// Random nonce.
61    pub nonce: String,
62    /// Data string (empty for WebSocket auth).
63    pub data: String,
64    /// Optional scope for session-based authentication.
65    /// Use "session:name" for persistent session auth (allows skipping access_token in private requests).
66    /// Use "connection" (default) for per-connection auth (requires access_token in each private request).
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub scope: Option<String>,
69}
70
71/// Token refresh request parameters.
72#[derive(Debug, Clone, Serialize)]
73pub struct DeribitRefreshTokenParams {
74    /// Grant type (always "refresh_token").
75    pub grant_type: String,
76    /// The refresh token obtained from authentication.
77    pub refresh_token: String,
78}
79
80/// Authentication response result.
81#[derive(Debug, Clone, Deserialize)]
82pub struct DeribitAuthResult {
83    /// Access token.
84    pub access_token: String,
85    /// Token expiration time in seconds.
86    pub expires_in: u64,
87    /// Refresh token.
88    pub refresh_token: String,
89    /// Granted scope.
90    pub scope: String,
91    /// Token type (bearer).
92    pub token_type: String,
93    /// Enabled features.
94    #[serde(default)]
95    pub enabled_features: Vec<String>,
96}
97
98/// Subscription request parameters.
99#[derive(Debug, Clone, Serialize)]
100pub struct DeribitSubscribeParams {
101    /// List of channels to subscribe to.
102    pub channels: Vec<String>,
103}
104
105/// Subscription response result.
106#[derive(Debug, Clone, Deserialize)]
107pub struct DeribitSubscribeResult(pub Vec<String>);
108
109/// Heartbeat enable request parameters.
110#[derive(Debug, Clone, Serialize)]
111pub struct DeribitHeartbeatParams {
112    /// Heartbeat interval in seconds (minimum 10).
113    pub interval: u64,
114}
115
116/// Heartbeat notification data.
117#[derive(Debug, Clone, Deserialize)]
118pub struct DeribitHeartbeatData {
119    /// Heartbeat type.
120    #[serde(rename = "type")]
121    pub heartbeat_type: DeribitHeartbeatType,
122}
123
124/// Trade data from trades.{instrument}.raw channel.
125#[derive(Debug, Clone, Deserialize)]
126pub struct DeribitTradeMsg {
127    /// Trade ID.
128    pub trade_id: String,
129    /// Instrument name.
130    pub instrument_name: Ustr,
131    /// Trade price.
132    pub price: f64,
133    /// Trade amount (contracts).
134    pub amount: f64,
135    /// Trade direction ("buy" or "sell").
136    pub direction: String,
137    /// Trade timestamp in milliseconds.
138    pub timestamp: u64,
139    /// Trade sequence number.
140    pub trade_seq: u64,
141    /// Tick direction (0-3).
142    pub tick_direction: i8,
143    /// Index price at trade time.
144    pub index_price: f64,
145    /// Mark price at trade time.
146    pub mark_price: f64,
147    /// IV (for options).
148    pub iv: Option<f64>,
149    /// Liquidation indicator.
150    pub liquidation: Option<String>,
151    /// Combo trade ID (if part of combo).
152    pub combo_trade_id: Option<i64>,
153    /// Block trade ID.
154    pub block_trade_id: Option<String>,
155    /// Combo ID.
156    pub combo_id: Option<String>,
157}
158
159/// Order book data from book.{instrument}.raw channel.
160#[derive(Debug, Clone, Deserialize)]
161pub struct DeribitBookMsg {
162    /// Message type (snapshot or change).
163    #[serde(rename = "type")]
164    pub msg_type: DeribitBookMsgType,
165    /// Instrument name.
166    pub instrument_name: Ustr,
167    /// Timestamp in milliseconds.
168    pub timestamp: u64,
169    /// Change ID for sequence tracking.
170    pub change_id: u64,
171    /// Previous change ID (for delta validation).
172    pub prev_change_id: Option<u64>,
173    /// Bid levels: [action, price, amount] where action is "new" for snapshot, "new"/"change"/"delete" for change.
174    pub bids: Vec<Vec<serde_json::Value>>,
175    /// Ask levels: [action, price, amount] where action is "new" for snapshot, "new"/"change"/"delete" for change.
176    pub asks: Vec<Vec<serde_json::Value>>,
177}
178
179/// Parsed order book level.
180#[derive(Debug, Clone)]
181pub struct DeribitBookLevel {
182    /// Price level.
183    pub price: f64,
184    /// Amount at this level.
185    pub amount: f64,
186    /// Action for delta updates.
187    pub action: Option<DeribitBookAction>,
188}
189
190/// Ticker data from ticker.{instrument}.raw channel.
191#[derive(Debug, Clone, Deserialize)]
192pub struct DeribitTickerMsg {
193    /// Instrument name.
194    pub instrument_name: Ustr,
195    /// Timestamp in milliseconds.
196    pub timestamp: u64,
197    /// Best bid price.
198    pub best_bid_price: Option<f64>,
199    /// Best bid amount.
200    pub best_bid_amount: Option<f64>,
201    /// Best ask price.
202    pub best_ask_price: Option<f64>,
203    /// Best ask amount.
204    pub best_ask_amount: Option<f64>,
205    /// Last trade price.
206    pub last_price: Option<f64>,
207    /// Mark price.
208    pub mark_price: f64,
209    /// Index price.
210    pub index_price: f64,
211    /// Open interest.
212    pub open_interest: f64,
213    /// Current funding rate (perpetuals).
214    pub current_funding: Option<f64>,
215    /// Funding 8h rate (perpetuals).
216    pub funding_8h: Option<f64>,
217    /// Settlement price (expired instruments).
218    pub settlement_price: Option<f64>,
219    /// 24h volume.
220    pub volume: Option<f64>,
221    /// 24h volume in USD.
222    pub volume_usd: Option<f64>,
223    /// 24h high.
224    pub high: Option<f64>,
225    /// 24h low.
226    pub low: Option<f64>,
227    /// 24h price change.
228    pub price_change: Option<f64>,
229    /// State of the instrument.
230    pub state: String,
231    // Options-specific fields
232    /// Greeks (options).
233    pub greeks: Option<DeribitGreeks>,
234    /// Underlying price (options).
235    pub underlying_price: Option<f64>,
236    /// Underlying index (options).
237    pub underlying_index: Option<String>,
238}
239
240/// Greeks for options.
241#[derive(Debug, Clone, Deserialize)]
242pub struct DeribitGreeks {
243    pub delta: f64,
244    pub gamma: f64,
245    pub vega: f64,
246    pub theta: f64,
247    pub rho: f64,
248}
249
250/// Quote data from quote.{instrument} channel.
251#[derive(Debug, Clone, Deserialize)]
252pub struct DeribitQuoteMsg {
253    /// Instrument name.
254    pub instrument_name: Ustr,
255    /// Timestamp in milliseconds.
256    pub timestamp: u64,
257    /// Best bid price.
258    pub best_bid_price: f64,
259    /// Best bid amount.
260    pub best_bid_amount: f64,
261    /// Best ask price.
262    pub best_ask_price: f64,
263    /// Best ask amount.
264    pub best_ask_amount: f64,
265}
266
267/// Raw Deribit WebSocket message variants.
268#[derive(Debug, Clone)]
269pub enum DeribitWsMessage {
270    /// JSON-RPC response to a request.
271    Response(DeribitJsonRpcResponse<serde_json::Value>),
272    /// Subscription notification (trade, book, ticker data).
273    Notification(DeribitSubscriptionNotification<serde_json::Value>),
274    /// Heartbeat message.
275    Heartbeat(DeribitHeartbeatData),
276    /// JSON-RPC error.
277    Error(DeribitJsonRpcError),
278    /// Reconnection event (internal).
279    Reconnected,
280}
281
282/// Deribit WebSocket error for external consumers.
283#[derive(Debug, Clone, Serialize, Deserialize)]
284pub struct DeribitWebSocketError {
285    /// Error code from Deribit.
286    pub code: i64,
287    /// Error message.
288    pub message: String,
289    /// Timestamp when error occurred.
290    pub timestamp: u64,
291}
292
293impl From<DeribitJsonRpcError> for DeribitWebSocketError {
294    fn from(err: DeribitJsonRpcError) -> Self {
295        Self {
296            code: err.code,
297            message: err.message,
298            timestamp: 0,
299        }
300    }
301}
302
303/// Normalized Nautilus domain message after parsing.
304#[derive(Debug, Clone)]
305pub enum NautilusWsMessage {
306    /// Market data (trades, bars, quotes).
307    Data(Vec<Data>),
308    /// Order book deltas.
309    Deltas(OrderBookDeltas),
310    /// Instrument definition update.
311    Instrument(Box<InstrumentAny>),
312    /// Error from venue.
313    Error(DeribitWsError),
314    /// Unhandled/raw message for debugging.
315    Raw(serde_json::Value),
316    /// Reconnection completed.
317    Reconnected,
318    /// Authentication succeeded with tokens.
319    Authenticated(Box<DeribitAuthResult>),
320}
321
322/// Parses a raw JSON message into a DeribitWsMessage.
323///
324/// # Errors
325///
326/// Returns an error if JSON parsing fails or the message format is unrecognized.
327pub fn parse_raw_message(text: &str) -> Result<DeribitWsMessage, DeribitWsError> {
328    let value: serde_json::Value =
329        serde_json::from_str(text).map_err(|e| DeribitWsError::Json(e.to_string()))?;
330
331    // Check for subscription notification (has "method": "subscription")
332    if let Some(method) = value.get("method").and_then(|m| m.as_str()) {
333        if method == "subscription" {
334            let notification: DeribitSubscriptionNotification<serde_json::Value> =
335                serde_json::from_value(value).map_err(|e| DeribitWsError::Json(e.to_string()))?;
336            return Ok(DeribitWsMessage::Notification(notification));
337        }
338        // Check for heartbeat
339        if method == "heartbeat"
340            && let Some(params) = value.get("params")
341        {
342            let heartbeat: DeribitHeartbeatData = serde_json::from_value(params.clone())
343                .map_err(|e| DeribitWsError::Json(e.to_string()))?;
344            return Ok(DeribitWsMessage::Heartbeat(heartbeat));
345        }
346    }
347
348    // Check for JSON-RPC response (has "id" field)
349    if value.get("id").is_some() {
350        // Check for error response
351        if value.get("error").is_some() {
352            let response: DeribitJsonRpcResponse<serde_json::Value> =
353                serde_json::from_value(value.clone())
354                    .map_err(|e| DeribitWsError::Json(e.to_string()))?;
355            if let Some(err) = response.error {
356                return Ok(DeribitWsMessage::Error(err));
357            }
358        }
359        // Success response
360        let response: DeribitJsonRpcResponse<serde_json::Value> =
361            serde_json::from_value(value).map_err(|e| DeribitWsError::Json(e.to_string()))?;
362        return Ok(DeribitWsMessage::Response(response));
363    }
364
365    // Fallback: try to parse as generic response
366    let response: DeribitJsonRpcResponse<serde_json::Value> =
367        serde_json::from_value(value).map_err(|e| DeribitWsError::Json(e.to_string()))?;
368    Ok(DeribitWsMessage::Response(response))
369}
370
371/// Extracts the instrument name from a channel string.
372///
373/// For example: "trades.BTC-PERPETUAL.raw" -> "BTC-PERPETUAL"
374pub fn extract_instrument_from_channel(channel: &str) -> Option<&str> {
375    let parts: Vec<&str> = channel.split('.').collect();
376    if parts.len() >= 2 {
377        Some(parts[1])
378    } else {
379        None
380    }
381}
382
383#[cfg(test)]
384mod tests {
385    use rstest::rstest;
386
387    use super::*;
388
389    #[rstest]
390    fn test_parse_subscription_notification() {
391        let json = r#"{
392            "jsonrpc": "2.0",
393            "method": "subscription",
394            "params": {
395                "channel": "trades.BTC-PERPETUAL.raw",
396                "data": [{"trade_id": "123", "price": 50000.0}]
397            }
398        }"#;
399
400        let msg = parse_raw_message(json).unwrap();
401        assert!(matches!(msg, DeribitWsMessage::Notification(_)));
402    }
403
404    #[rstest]
405    fn test_parse_response() {
406        let json = r#"{
407            "jsonrpc": "2.0",
408            "id": 1,
409            "result": ["trades.BTC-PERPETUAL.raw"],
410            "testnet": true,
411            "usIn": 1234567890,
412            "usOut": 1234567891,
413            "usDiff": 1
414        }"#;
415
416        let msg = parse_raw_message(json).unwrap();
417        assert!(matches!(msg, DeribitWsMessage::Response(_)));
418    }
419
420    #[rstest]
421    fn test_parse_error_response() {
422        let json = r#"{
423            "jsonrpc": "2.0",
424            "id": 1,
425            "error": {
426                "code": 10028,
427                "message": "too_many_requests"
428            }
429        }"#;
430
431        let msg = parse_raw_message(json).unwrap();
432        assert!(matches!(msg, DeribitWsMessage::Error(_)));
433    }
434
435    #[rstest]
436    fn test_extract_instrument_from_channel() {
437        assert_eq!(
438            extract_instrument_from_channel("trades.BTC-PERPETUAL.raw"),
439            Some("BTC-PERPETUAL")
440        );
441        assert_eq!(
442            extract_instrument_from_channel("book.ETH-25DEC25.raw"),
443            Some("ETH-25DEC25")
444        );
445        assert_eq!(extract_instrument_from_channel("platform_state"), None);
446    }
447}