Skip to main content

nautilus_dydx/common/
instrument_cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Thread-safe instrument cache for dYdX adapter.
17//!
18//! This module provides a centralized cache for instrument data that is shared
19//! between HTTP client, WebSocket client, and execution client via `Arc`.
20//!
21//! # Design
22//!
23//! dYdX uses different identifiers in different contexts:
24//! - **InstrumentId** ("BTC-USD-PERP.DYDX"): Nautilus internal identifier (primary key)
25//! - **Market ticker** ("BTC-USD"): Used in public WebSocket channels
26//! - **clob_pair_id** (0, 1, 2...): Used in blockchain transactions and order messages
27//!
28//! This cache provides O(1) lookups by any of these identifiers through internal indices.
29//! Using `InstrumentId` as the primary key provides better type safety and eliminates
30//! redundant conversions.
31//!
32//! # Thread Safety
33//!
34//! All operations use `DashMap` for lock-free concurrent access. The cache can be
35//! safely shared across multiple async tasks via `Arc<InstrumentCache>`.
36
37use std::sync::atomic::{AtomicBool, Ordering};
38
39use dashmap::DashMap;
40use nautilus_model::{
41    identifiers::InstrumentId,
42    instruments::{Instrument, InstrumentAny},
43};
44use ustr::Ustr;
45
46use crate::{grpc::OrderMarketParams, http::models::PerpetualMarket};
47
48/// Thread-safe instrument cache with multiple lookup indices.
49///
50/// Shared between HTTP client, WebSocket client, and execution client via `Arc`.
51/// Provides O(1) lookups by `InstrumentId`, market ticker, or clob_pair_id.
52
53#[derive(Debug, Default)]
54pub struct InstrumentCache {
55    /// Primary storage: InstrumentId → InstrumentAny
56    instruments: DashMap<InstrumentId, InstrumentAny>,
57    /// Index: clob_pair_id (0, 1, 2...) → InstrumentId (direct lookup)
58    clob_pair_id_index: DashMap<u32, InstrumentId>,
59    /// Index: market ticker ("BTC-USD") → InstrumentId (direct lookup)
60    market_index: DashMap<Ustr, InstrumentId>,
61    /// Market parameters: InstrumentId → PerpetualMarket
62    market_params: DashMap<InstrumentId, PerpetualMarket>,
63    /// Whether cache has been initialized with instrument data
64    initialized: AtomicBool,
65}
66
67impl InstrumentCache {
68    /// Creates a new empty instrument cache.
69    #[must_use]
70    pub fn new() -> Self {
71        Self::default()
72    }
73
74    /// Inserts an instrument with its market data.
75    ///
76    /// This populates the primary storage and all lookup indices.
77    pub fn insert(&self, instrument: InstrumentAny, market: PerpetualMarket) {
78        let instrument_id = instrument.id();
79        let ticker = Ustr::from(&market.ticker);
80        let clob_pair_id = market.clob_pair_id;
81
82        // Primary storage
83        self.instruments.insert(instrument_id, instrument);
84
85        // Build indices for reverse lookups (now point directly to InstrumentId)
86        self.clob_pair_id_index.insert(clob_pair_id, instrument_id);
87        self.market_index.insert(ticker, instrument_id);
88
89        // Store full market params for order building
90        self.market_params.insert(instrument_id, market);
91    }
92
93    /// Bulk inserts instruments with their market data.
94    ///
95    /// Marks the cache as initialized after insertion.
96    pub fn insert_many(&self, items: Vec<(InstrumentAny, PerpetualMarket)>) {
97        for (instrument, market) in items {
98            self.insert(instrument, market);
99        }
100        self.initialized.store(true, Ordering::Release);
101    }
102
103    /// Clears all cached data.
104    ///
105    /// Useful for refreshing instruments from the API.
106    pub fn clear(&self) {
107        self.instruments.clear();
108        self.clob_pair_id_index.clear();
109        self.market_index.clear();
110        self.market_params.clear();
111        self.initialized.store(false, Ordering::Release);
112    }
113
114    /// Inserts an instrument without market data (InstrumentId lookup only).
115    ///
116    /// Use this for caching instruments when market params are not available.
117    /// Note: `get_by_clob_id()` and `get_by_market()` won't work for instruments
118    /// inserted this way - only `get()` by InstrumentId will work.
119    pub fn insert_instrument_only(&self, instrument: InstrumentAny) {
120        let instrument_id = instrument.id();
121        self.instruments.insert(instrument_id, instrument);
122    }
123
124    /// Bulk inserts instruments without market data.
125    ///
126    /// Marks the cache as initialized after insertion.
127    pub fn insert_instruments_only(&self, instruments: Vec<InstrumentAny>) {
128        for instrument in instruments {
129            self.insert_instrument_only(instrument);
130        }
131        self.initialized.store(true, Ordering::Release);
132    }
133
134    /// Gets an instrument by InstrumentId.
135    #[must_use]
136    pub fn get(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
137        self.instruments.get(instrument_id).map(|r| r.clone())
138    }
139
140    /// Gets an instrument by market ticker (e.g., "BTC-USD").
141    ///
142    /// This is the identifier used in public WebSocket channels.
143    #[must_use]
144    pub fn get_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
145        let ticker_ustr = Ustr::from(ticker);
146        self.market_index
147            .get(&ticker_ustr)
148            .and_then(|instrument_id| self.instruments.get(&*instrument_id).map(|r| r.clone()))
149    }
150
151    /// Gets an instrument by clob_pair_id (e.g., 0, 1, 2).
152    ///
153    /// This is the identifier used in blockchain transactions and order messages.
154    #[must_use]
155    pub fn get_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
156        self.clob_pair_id_index
157            .get(&clob_pair_id)
158            .and_then(|instrument_id| self.instruments.get(&*instrument_id).map(|r| r.clone()))
159    }
160
161    /// Gets an InstrumentId by clob_pair_id.
162    ///
163    /// Returns directly from index without cloning full instrument.
164    #[must_use]
165    pub fn get_id_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentId> {
166        self.clob_pair_id_index.get(&clob_pair_id).map(|r| *r)
167    }
168
169    /// Gets an InstrumentId by market ticker.
170    ///
171    /// Returns directly from index without cloning full instrument.
172    #[must_use]
173    pub fn get_id_by_market(&self, ticker: &str) -> Option<InstrumentId> {
174        let ticker_ustr = Ustr::from(ticker);
175        self.market_index.get(&ticker_ustr).map(|r| *r)
176    }
177
178    /// Gets full market parameters by InstrumentId.
179    ///
180    /// Returns the complete `PerpetualMarket` data including margin requirements,
181    /// quantization parameters, and current oracle price.
182    #[must_use]
183    pub fn get_market_params(&self, instrument_id: &InstrumentId) -> Option<PerpetualMarket> {
184        self.market_params.get(instrument_id).map(|r| r.clone())
185    }
186
187    /// Gets order market parameters for order building.
188    ///
189    /// Returns the subset of market data needed for constructing orders
190    /// (quantization, clob_pair_id, etc.).
191    #[must_use]
192    pub fn get_order_market_params(
193        &self,
194        instrument_id: &InstrumentId,
195    ) -> Option<OrderMarketParams> {
196        self.get_market_params(instrument_id)
197            .map(|market| OrderMarketParams {
198                atomic_resolution: market.atomic_resolution,
199                clob_pair_id: market.clob_pair_id,
200                oracle_price: Some(market.oracle_price),
201                quantum_conversion_exponent: market.quantum_conversion_exponent,
202                step_base_quantums: market.step_base_quantums,
203                subticks_per_tick: market.subticks_per_tick,
204            })
205    }
206
207    /// Updates oracle price for a market.
208    ///
209    /// Called when receiving price updates via WebSocket `v4_markets` channel.
210    pub fn update_oracle_price(&self, ticker: &str, oracle_price: rust_decimal::Decimal) {
211        let ticker_ustr = Ustr::from(ticker);
212        if let Some(instrument_id) = self.market_index.get(&ticker_ustr)
213            && let Some(mut market) = self.market_params.get_mut(&*instrument_id)
214        {
215            market.oracle_price = oracle_price;
216        }
217    }
218
219    /// Returns whether the cache has been initialized with instrument data.
220    #[must_use]
221    pub fn is_initialized(&self) -> bool {
222        self.initialized.load(Ordering::Acquire)
223    }
224
225    /// Returns the number of cached instruments.
226    #[must_use]
227    pub fn len(&self) -> usize {
228        self.instruments.len()
229    }
230
231    /// Returns whether the cache is empty.
232    #[must_use]
233    pub fn is_empty(&self) -> bool {
234        self.instruments.is_empty()
235    }
236
237    /// Returns all cached instruments.
238    ///
239    /// Useful for WebSocket handler initialization and instrument replay.
240    #[must_use]
241    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
242        self.instruments.iter().map(|r| r.clone()).collect()
243    }
244
245    /// Returns all InstrumentIds.
246    #[must_use]
247    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
248        self.instruments.iter().map(|r| r.value().id()).collect()
249    }
250
251    /// Checks if an instrument exists by InstrumentId.
252    #[must_use]
253    pub fn contains(&self, instrument_id: &InstrumentId) -> bool {
254        self.instruments.contains_key(instrument_id)
255    }
256
257    /// Checks if an instrument exists by clob_pair_id.
258    #[must_use]
259    pub fn contains_clob_id(&self, clob_pair_id: u32) -> bool {
260        self.clob_pair_id_index.contains_key(&clob_pair_id)
261    }
262
263    /// Checks if an instrument exists by market ticker (e.g., "BTC-USD").
264    #[must_use]
265    pub fn contains_market(&self, ticker: &str) -> bool {
266        let ticker_ustr = Ustr::from(ticker);
267        self.market_index.contains_key(&ticker_ustr)
268    }
269
270    /// Returns a HashMap of all instruments keyed by InstrumentId.
271    ///
272    /// This is useful for parsing functions that expect `HashMap<InstrumentId, InstrumentAny>`.
273    /// Note: Creates a snapshot copy, so frequent calls should be avoided.
274    #[must_use]
275    pub fn to_instrument_id_map(&self) -> std::collections::HashMap<InstrumentId, InstrumentAny> {
276        self.instruments
277            .iter()
278            .map(|entry| (entry.value().id(), entry.value().clone()))
279            .collect()
280    }
281
282    /// Returns a HashMap of oracle prices keyed by InstrumentId.
283    ///
284    /// This is useful for parsing functions like `parse_account_state` that need oracle prices.
285    /// Note: Creates a snapshot copy, so frequent calls should be avoided.
286    #[must_use]
287    pub fn to_oracle_prices_map(
288        &self,
289    ) -> std::collections::HashMap<InstrumentId, rust_decimal::Decimal> {
290        self.market_params
291            .iter()
292            .map(|entry| (*entry.key(), entry.value().oracle_price))
293            .collect()
294    }
295
296    /// Logs a warning about a missing instrument for a clob_pair_id, listing known mappings.
297    pub fn log_missing_clob_pair_id(&self, clob_pair_id: u32) {
298        let known: Vec<(u32, String)> = self
299            .clob_pair_id_index
300            .iter()
301            .map(|entry| (*entry.key(), entry.value().symbol.as_str().to_string()))
302            .collect();
303
304        log::warn!(
305            "Instrument for clob_pair_id {clob_pair_id} not found in cache. \
306             Known CLOB pair IDs and symbols: {known:?}"
307        );
308    }
309}
310
311#[cfg(test)]
312mod tests {
313    use nautilus_core::UnixNanos;
314    use nautilus_model::{
315        identifiers::{InstrumentId, Symbol, Venue},
316        instruments::{CryptoPerpetual, InstrumentAny},
317        types::{Currency, Price, Quantity},
318    };
319    use rstest::rstest;
320    use rust_decimal_macros::dec;
321    use ustr::Ustr;
322
323    use super::*;
324    use crate::common::enums::DydxMarketStatus;
325
326    fn create_test_instrument(symbol: &str) -> InstrumentAny {
327        let instrument_id = InstrumentId::new(Symbol::new(symbol), Venue::new("DYDX"));
328        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
329            instrument_id,
330            instrument_id.symbol,
331            Currency::BTC(),
332            Currency::USD(),
333            Currency::USD(),
334            false,
335            1,                       // price_precision
336            3,                       // size_precision
337            Price::new(0.1, 1),      // price_increment
338            Quantity::new(0.001, 3), // size_increment
339            None,                    // multiplier
340            None,                    // lot_size
341            None,                    // max_quantity
342            None,                    // min_quantity
343            None,                    // max_notional
344            None,                    // min_notional
345            None,                    // max_price
346            None,                    // min_price
347            None,                    // margin_init
348            None,                    // margin_maint
349            None,                    // maker_fee
350            None,                    // taker_fee
351            UnixNanos::default(),    // ts_event
352            UnixNanos::default(),    // ts_init
353        ))
354    }
355
356    fn create_test_market(ticker: &str, clob_pair_id: u32) -> PerpetualMarket {
357        PerpetualMarket {
358            clob_pair_id,
359            ticker: Ustr::from(ticker),
360            status: DydxMarketStatus::Active,
361            base_asset: Some(Ustr::from("BTC")),
362            quote_asset: Some(Ustr::from("USD")),
363            step_size: dec!(0.001),
364            tick_size: dec!(0.1),
365            index_price: Some(dec!(50000)),
366            oracle_price: dec!(50000),
367            price_change_24h: dec!(0),
368            next_funding_rate: dec!(0),
369            next_funding_at: None,
370            min_order_size: Some(dec!(0.001)),
371            market_type: None,
372            initial_margin_fraction: dec!(0.05),
373            maintenance_margin_fraction: dec!(0.03),
374            base_position_notional: None,
375            incremental_position_size: None,
376            incremental_initial_margin_fraction: None,
377            max_position_size: None,
378            open_interest: dec!(1000),
379            atomic_resolution: -10,
380            quantum_conversion_exponent: -9,
381            subticks_per_tick: 1000000,
382            step_base_quantums: 1000000,
383            is_reduce_only: false,
384        }
385    }
386
387    #[rstest]
388    fn test_insert_and_get() {
389        let cache = InstrumentCache::new();
390        let instrument = create_test_instrument("BTC-USD-PERP");
391        let instrument_id = instrument.id();
392        let market = create_test_market("BTC-USD", 0);
393
394        cache.insert(instrument, market);
395
396        // Get by InstrumentId
397        let retrieved = cache.get(&instrument_id);
398        assert!(retrieved.is_some());
399        assert_eq!(retrieved.unwrap().id().symbol.as_str(), "BTC-USD-PERP");
400    }
401
402    #[rstest]
403    fn test_get_by_market() {
404        let cache = InstrumentCache::new();
405        let instrument = create_test_instrument("BTC-USD-PERP");
406        let market = create_test_market("BTC-USD", 0);
407
408        cache.insert(instrument, market);
409
410        // Get by market ticker
411        let retrieved = cache.get_by_market("BTC-USD");
412        assert!(retrieved.is_some());
413        assert_eq!(retrieved.unwrap().id().symbol.as_str(), "BTC-USD-PERP");
414    }
415
416    #[rstest]
417    fn test_get_by_clob_id() {
418        let cache = InstrumentCache::new();
419        let instrument = create_test_instrument("BTC-USD-PERP");
420        let market = create_test_market("BTC-USD", 0);
421
422        cache.insert(instrument, market);
423
424        // Get by clob_pair_id
425        let retrieved = cache.get_by_clob_id(0);
426        assert!(retrieved.is_some());
427        assert_eq!(retrieved.unwrap().id().symbol.as_str(), "BTC-USD-PERP");
428
429        // Non-existent clob_pair_id
430        assert!(cache.get_by_clob_id(999).is_none());
431    }
432
433    #[rstest]
434    fn test_insert_many() {
435        let cache = InstrumentCache::new();
436
437        let items = vec![
438            (
439                create_test_instrument("BTC-USD-PERP"),
440                create_test_market("BTC-USD", 0),
441            ),
442            (
443                create_test_instrument("ETH-USD-PERP"),
444                create_test_market("ETH-USD", 1),
445            ),
446        ];
447
448        assert!(!cache.is_initialized());
449        cache.insert_many(items);
450        assert!(cache.is_initialized());
451
452        assert_eq!(cache.len(), 2);
453        assert!(cache.get_by_market("BTC-USD").is_some());
454        assert!(cache.get_by_market("ETH-USD").is_some());
455        assert!(cache.get_by_clob_id(0).is_some());
456        assert!(cache.get_by_clob_id(1).is_some());
457    }
458
459    #[rstest]
460    fn test_clear() {
461        let cache = InstrumentCache::new();
462        let instrument = create_test_instrument("BTC-USD-PERP");
463        let market = create_test_market("BTC-USD", 0);
464
465        cache.insert(instrument, market);
466        assert_eq!(cache.len(), 1);
467
468        cache.clear();
469        assert_eq!(cache.len(), 0);
470        assert!(!cache.is_initialized());
471    }
472
473    #[rstest]
474    fn test_get_market_params() {
475        let cache = InstrumentCache::new();
476        let instrument = create_test_instrument("BTC-USD-PERP");
477        let market = create_test_market("BTC-USD", 0);
478
479        cache.insert(instrument.clone(), market);
480
481        let params = cache.get_market_params(&instrument.id());
482        assert!(params.is_some());
483        let params = params.unwrap();
484        assert_eq!(params.clob_pair_id, 0);
485        assert_eq!(params.ticker, "BTC-USD");
486    }
487
488    #[rstest]
489    fn test_update_oracle_price() {
490        let cache = InstrumentCache::new();
491        let instrument = create_test_instrument("BTC-USD-PERP");
492        let market = create_test_market("BTC-USD", 0);
493
494        cache.insert(instrument.clone(), market);
495
496        // Initial oracle price
497        let params = cache.get_market_params(&instrument.id()).unwrap();
498        assert_eq!(params.oracle_price, dec!(50000));
499
500        // Update oracle price
501        cache.update_oracle_price("BTC-USD", dec!(55000));
502
503        let params = cache.get_market_params(&instrument.id()).unwrap();
504        assert_eq!(params.oracle_price, dec!(55000));
505    }
506
507    #[rstest]
508    fn test_to_oracle_prices_map() {
509        let cache = InstrumentCache::new();
510
511        let items = vec![
512            (
513                create_test_instrument("BTC-USD-PERP"),
514                create_test_market("BTC-USD", 0),
515            ),
516            (
517                create_test_instrument("ETH-USD-PERP"),
518                create_test_market("ETH-USD", 1),
519            ),
520        ];
521
522        cache.insert_many(items);
523
524        // Update one oracle price
525        cache.update_oracle_price("ETH-USD", dec!(3000));
526
527        let oracle_map = cache.to_oracle_prices_map();
528        assert_eq!(oracle_map.len(), 2);
529
530        // BTC-USD should have default 50000
531        let btc_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
532        assert_eq!(oracle_map.get(&btc_id), Some(&dec!(50000)));
533
534        // ETH-USD should have updated price 3000
535        let eth_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
536        assert_eq!(oracle_map.get(&eth_id), Some(&dec!(3000)));
537    }
538}