1use std::{
19 fs,
20 num::NonZeroU64,
21 path::PathBuf,
22 str::FromStr,
23 sync::{Arc, RwLock},
24};
25
26use ahash::AHashMap;
27use databento::{
28 dbn::{self, decode::DbnMetadata},
29 historical::timeseries::GetRangeParams,
30};
31use indexmap::IndexMap;
32use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::AtomicTime};
33use nautilus_model::{
34 data::{Bar, Data, InstrumentStatus, OrderBookDepth10, QuoteTick, TradeTick},
35 enums::BarAggregation,
36 identifiers::{InstrumentId, Symbol, Venue},
37 instruments::InstrumentAny,
38 types::Currency,
39};
40
41use crate::{
42 common::get_date_time_range,
43 decode::{
44 decode_imbalance_msg, decode_instrument_def_msg, decode_mbp10_msg, decode_record,
45 decode_statistics_msg, decode_status_msg,
46 },
47 symbology::{
48 MetadataCache, check_consistent_symbology, decode_nautilus_instrument_id,
49 infer_symbology_type, instrument_id_to_symbol_string,
50 },
51 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
52};
53
54#[derive(Debug, Clone)]
59pub struct DatabentoHistoricalClient {
60 pub key: String,
61 clock: &'static AtomicTime,
62 inner: Arc<tokio::sync::Mutex<databento::HistoricalClient>>,
63 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
64 symbol_venue_map: Arc<RwLock<AHashMap<Symbol, Venue>>>,
65 use_exchange_as_venue: bool,
66}
67
68#[derive(Debug)]
70pub struct RangeQueryParams {
71 pub dataset: String,
72 pub symbols: Vec<String>,
73 pub start: UnixNanos,
74 pub end: Option<UnixNanos>,
75 pub limit: Option<u64>,
76 pub price_precision: Option<u8>,
77}
78
/// Start and end of a dataset's available range, as returned by `get_dataset_range`.
///
/// Both bounds are the string form of the values in the Databento response.
#[derive(Debug, Clone)]
pub struct DatasetRange {
    /// Start of the available range.
    pub start: String,
    /// End of the available range.
    pub end: String,
}
85
86impl DatabentoHistoricalClient {
87 pub fn new(
93 key: String,
94 publishers_filepath: PathBuf,
95 clock: &'static AtomicTime,
96 use_exchange_as_venue: bool,
97 ) -> anyhow::Result<Self> {
98 let client = databento::HistoricalClient::builder()
99 .user_agent_extension(NAUTILUS_USER_AGENT.into())
100 .key(key.clone())
101 .map_err(|e| anyhow::anyhow!("Failed to create client builder: {e}"))?
102 .build()
103 .map_err(|e| anyhow::anyhow!("Failed to build client: {e}"))?;
104
105 let file_content = fs::read_to_string(publishers_filepath)?;
106 let publishers_vec: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
107
108 let publisher_venue_map = publishers_vec
109 .into_iter()
110 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
111 .collect::<IndexMap<u16, Venue>>();
112
113 Ok(Self {
114 clock,
115 inner: Arc::new(tokio::sync::Mutex::new(client)),
116 publisher_venue_map: Arc::new(publisher_venue_map),
117 symbol_venue_map: Arc::new(RwLock::new(AHashMap::new())),
118 key,
119 use_exchange_as_venue,
120 })
121 }
122
123 pub async fn get_dataset_range(&self, dataset: &str) -> anyhow::Result<DatasetRange> {
129 let mut client = self.inner.lock().await;
130 let response = client
131 .metadata()
132 .get_dataset_range(dataset)
133 .await
134 .map_err(|e| anyhow::anyhow!("Failed to get dataset range: {e}"))?;
135
136 Ok(DatasetRange {
137 start: response.start.to_string(),
138 end: response.end.to_string(),
139 })
140 }
141
142 pub async fn get_range_instruments(
148 &self,
149 params: RangeQueryParams,
150 ) -> anyhow::Result<Vec<InstrumentAny>> {
151 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
152 check_consistent_symbology(&symbols)?;
153
154 let first_symbol = params
155 .symbols
156 .first()
157 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
158 let stype_in = infer_symbology_type(first_symbol);
159 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
160 let time_range = get_date_time_range(params.start, end)?;
161
162 let range_params = GetRangeParams::builder()
163 .dataset(params.dataset)
164 .date_time_range(time_range)
165 .symbols(symbols)
166 .stype_in(stype_in)
167 .schema(dbn::Schema::Definition)
168 .limit(params.limit.and_then(NonZeroU64::new))
169 .build();
170
171 let mut client = self.inner.lock().await;
172 let mut decoder = client
173 .timeseries()
174 .get_range(&range_params)
175 .await
176 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
177
178 let metadata = decoder.metadata().clone();
179 let mut metadata_cache = MetadataCache::new(metadata);
180 let mut instruments = Vec::new();
181
182 while let Ok(Some(msg)) = decoder.decode_record::<dbn::InstrumentDefMsg>().await {
183 let record = dbn::RecordRef::from(msg);
184 let sym_map = self
185 .symbol_venue_map
186 .read()
187 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
188 let mut instrument_id = decode_nautilus_instrument_id(
189 &record,
190 &mut metadata_cache,
191 &self.publisher_venue_map,
192 &sym_map,
193 )?;
194
195 if self.use_exchange_as_venue && instrument_id.venue == Venue::GLBX() {
196 let exchange = msg
197 .exchange()
198 .map_err(|e| anyhow::anyhow!("Missing exchange in record: {e}"))?;
199 let venue = Venue::from_code(exchange)
200 .map_err(|e| anyhow::anyhow!("Venue not found for exchange {exchange}: {e}"))?;
201 instrument_id.venue = venue;
202 }
203
204 match decode_instrument_def_msg(msg, instrument_id, None) {
205 Ok(instrument) => instruments.push(instrument),
206 Err(e) => tracing::error!("Failed to decode instrument: {e:?}"),
207 }
208 }
209
210 Ok(instruments)
211 }
212
213 pub async fn get_range_quotes(
219 &self,
220 params: RangeQueryParams,
221 schema: Option<String>,
222 ) -> anyhow::Result<Vec<QuoteTick>> {
223 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
224 check_consistent_symbology(&symbols)?;
225
226 let first_symbol = params
227 .symbols
228 .first()
229 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
230 let stype_in = infer_symbology_type(first_symbol);
231 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
232 let time_range = get_date_time_range(params.start, end)?;
233 let schema = schema.unwrap_or_else(|| "mbp-1".to_string());
234 let dbn_schema = dbn::Schema::from_str(&schema)?;
235
236 match dbn_schema {
237 dbn::Schema::Mbp1
238 | dbn::Schema::Bbo1S
239 | dbn::Schema::Bbo1M
240 | dbn::Schema::Cmbp1
241 | dbn::Schema::Cbbo1S
242 | dbn::Schema::Cbbo1M => (),
243 _ => anyhow::bail!(
244 "Invalid schema. Must be one of: mbp-1, bbo-1s, bbo-1m, cmbp-1, cbbo-1s, cbbo-1m"
245 ),
246 }
247
248 let range_params = GetRangeParams::builder()
249 .dataset(params.dataset)
250 .date_time_range(time_range)
251 .symbols(symbols)
252 .stype_in(stype_in)
253 .schema(dbn_schema)
254 .limit(params.limit.and_then(NonZeroU64::new))
255 .build();
256
257 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
258
259 let mut client = self.inner.lock().await;
260 let mut decoder = client
261 .timeseries()
262 .get_range(&range_params)
263 .await
264 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
265
266 let metadata = decoder.metadata().clone();
267 let mut metadata_cache = MetadataCache::new(metadata);
268 let mut result: Vec<QuoteTick> = Vec::new();
269
270 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
271 let sym_map = self
272 .symbol_venue_map
273 .read()
274 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
275 let instrument_id = decode_nautilus_instrument_id(
276 &record,
277 &mut metadata_cache,
278 &self.publisher_venue_map,
279 &sym_map,
280 )?;
281
282 let (data, _) = decode_record(
283 &record,
284 instrument_id,
285 price_precision,
286 None,
287 false, true,
289 )?;
290
291 match data {
292 Some(Data::Quote(quote)) => result.push(quote),
293 None => {} _ => anyhow::bail!("Invalid data element not `QuoteTick`, was {data:?}"),
295 }
296 Ok(())
297 };
298
299 match dbn_schema {
300 dbn::Schema::Mbp1 => {
301 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp1Msg>().await {
302 process_record(dbn::RecordRef::from(msg))?;
303 }
304 }
305 dbn::Schema::Cmbp1 => {
306 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Cmbp1Msg>().await {
307 process_record(dbn::RecordRef::from(msg))?;
308 }
309 }
310 dbn::Schema::Bbo1M => {
311 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1MMsg>().await {
312 process_record(dbn::RecordRef::from(msg))?;
313 }
314 }
315 dbn::Schema::Bbo1S => {
316 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Bbo1SMsg>().await {
317 process_record(dbn::RecordRef::from(msg))?;
318 }
319 }
320 dbn::Schema::Cbbo1S | dbn::Schema::Cbbo1M => {
321 while let Ok(Some(msg)) = decoder.decode_record::<dbn::CbboMsg>().await {
322 process_record(dbn::RecordRef::from(msg))?;
323 }
324 }
325 _ => anyhow::bail!("Invalid schema {dbn_schema}"),
326 }
327
328 Ok(result)
329 }
330
331 pub async fn get_range_order_book_depth10(
337 &self,
338 params: RangeQueryParams,
339 depth: Option<usize>,
340 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
341 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
342 check_consistent_symbology(&symbols)?;
343
344 let first_symbol = params
345 .symbols
346 .first()
347 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
348 let stype_in = infer_symbology_type(first_symbol);
349 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
350 let time_range = get_date_time_range(params.start, end)?;
351
352 let _depth = depth.unwrap_or(10);
354 if _depth != 10 {
355 anyhow::bail!("Only depth=10 is currently supported for order book depths");
356 }
357
358 let range_params = GetRangeParams::builder()
359 .dataset(params.dataset)
360 .date_time_range(time_range)
361 .symbols(symbols)
362 .stype_in(stype_in)
363 .schema(dbn::Schema::Mbp10)
364 .limit(params.limit.and_then(NonZeroU64::new))
365 .build();
366
367 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
368
369 let mut client = self.inner.lock().await;
370 let mut decoder = client
371 .timeseries()
372 .get_range(&range_params)
373 .await
374 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
375
376 let metadata = decoder.metadata().clone();
377 let mut metadata_cache = MetadataCache::new(metadata);
378 let mut result: Vec<OrderBookDepth10> = Vec::new();
379
380 let mut process_record = |record: dbn::RecordRef| -> anyhow::Result<()> {
381 let sym_map = self
382 .symbol_venue_map
383 .read()
384 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
385 let instrument_id = decode_nautilus_instrument_id(
386 &record,
387 &mut metadata_cache,
388 &self.publisher_venue_map,
389 &sym_map,
390 )?;
391
392 if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
393 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, None)?;
394 result.push(depth);
395 }
396
397 Ok(())
398 };
399
400 while let Ok(Some(msg)) = decoder.decode_record::<dbn::Mbp10Msg>().await {
401 process_record(dbn::RecordRef::from(msg))?;
402 }
403
404 Ok(result)
405 }
406
407 pub async fn get_range_trades(
413 &self,
414 params: RangeQueryParams,
415 ) -> anyhow::Result<Vec<TradeTick>> {
416 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
417 check_consistent_symbology(&symbols)?;
418
419 let first_symbol = params
420 .symbols
421 .first()
422 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
423 let stype_in = infer_symbology_type(first_symbol);
424 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
425 let time_range = get_date_time_range(params.start, end)?;
426
427 let range_params = GetRangeParams::builder()
428 .dataset(params.dataset)
429 .date_time_range(time_range)
430 .symbols(symbols)
431 .stype_in(stype_in)
432 .schema(dbn::Schema::Trades)
433 .limit(params.limit.and_then(NonZeroU64::new))
434 .build();
435
436 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
437
438 let mut client = self.inner.lock().await;
439 let mut decoder = client
440 .timeseries()
441 .get_range(&range_params)
442 .await
443 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
444
445 let metadata = decoder.metadata().clone();
446 let mut metadata_cache = MetadataCache::new(metadata);
447 let mut result: Vec<TradeTick> = Vec::new();
448
449 while let Ok(Some(msg)) = decoder.decode_record::<dbn::TradeMsg>().await {
450 let record = dbn::RecordRef::from(msg);
451 let sym_map = self
452 .symbol_venue_map
453 .read()
454 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
455 let instrument_id = decode_nautilus_instrument_id(
456 &record,
457 &mut metadata_cache,
458 &self.publisher_venue_map,
459 &sym_map,
460 )?;
461
462 let (data, _) = decode_record(
463 &record,
464 instrument_id,
465 price_precision,
466 None,
467 false, true,
469 )?;
470
471 match data {
472 Some(Data::Trade(trade)) => {
473 result.push(trade);
474 }
475 _ => anyhow::bail!("Invalid data element not `TradeTick`, was {data:?}"),
476 }
477 }
478
479 Ok(result)
480 }
481
482 pub async fn get_range_bars(
488 &self,
489 params: RangeQueryParams,
490 aggregation: BarAggregation,
491 timestamp_on_close: bool,
492 ) -> anyhow::Result<Vec<Bar>> {
493 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
494 check_consistent_symbology(&symbols)?;
495
496 let first_symbol = params
497 .symbols
498 .first()
499 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
500 let stype_in = infer_symbology_type(first_symbol);
501 let schema = match aggregation {
502 BarAggregation::Second => dbn::Schema::Ohlcv1S,
503 BarAggregation::Minute => dbn::Schema::Ohlcv1M,
504 BarAggregation::Hour => dbn::Schema::Ohlcv1H,
505 BarAggregation::Day => dbn::Schema::Ohlcv1D,
506 _ => anyhow::bail!("Invalid `BarAggregation` for request, was {aggregation}"),
507 };
508
509 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
510 let time_range = get_date_time_range(params.start, end)?;
511
512 let range_params = GetRangeParams::builder()
513 .dataset(params.dataset)
514 .date_time_range(time_range)
515 .symbols(symbols)
516 .stype_in(stype_in)
517 .schema(schema)
518 .limit(params.limit.and_then(NonZeroU64::new))
519 .build();
520
521 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
522
523 let mut client = self.inner.lock().await;
524 let mut decoder = client
525 .timeseries()
526 .get_range(&range_params)
527 .await
528 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
529
530 let metadata = decoder.metadata().clone();
531 let mut metadata_cache = MetadataCache::new(metadata);
532 let mut result: Vec<Bar> = Vec::new();
533
534 while let Ok(Some(msg)) = decoder.decode_record::<dbn::OhlcvMsg>().await {
535 let record = dbn::RecordRef::from(msg);
536 let sym_map = self
537 .symbol_venue_map
538 .read()
539 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
540 let instrument_id = decode_nautilus_instrument_id(
541 &record,
542 &mut metadata_cache,
543 &self.publisher_venue_map,
544 &sym_map,
545 )?;
546
547 let (data, _) = decode_record(
548 &record,
549 instrument_id,
550 price_precision,
551 None,
552 false, timestamp_on_close,
554 )?;
555
556 match data {
557 Some(Data::Bar(bar)) => {
558 result.push(bar);
559 }
560 _ => anyhow::bail!("Invalid data element not `Bar`, was {data:?}"),
561 }
562 }
563
564 Ok(result)
565 }
566
567 pub async fn get_range_imbalance(
573 &self,
574 params: RangeQueryParams,
575 ) -> anyhow::Result<Vec<DatabentoImbalance>> {
576 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
577 check_consistent_symbology(&symbols)?;
578
579 let first_symbol = params
580 .symbols
581 .first()
582 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
583 let stype_in = infer_symbology_type(first_symbol);
584 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
585 let time_range = get_date_time_range(params.start, end)?;
586
587 let range_params = GetRangeParams::builder()
588 .dataset(params.dataset)
589 .date_time_range(time_range)
590 .symbols(symbols)
591 .stype_in(stype_in)
592 .schema(dbn::Schema::Imbalance)
593 .limit(params.limit.and_then(NonZeroU64::new))
594 .build();
595
596 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
597
598 let mut client = self.inner.lock().await;
599 let mut decoder = client
600 .timeseries()
601 .get_range(&range_params)
602 .await
603 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
604
605 let metadata = decoder.metadata().clone();
606 let mut metadata_cache = MetadataCache::new(metadata);
607 let mut result: Vec<DatabentoImbalance> = Vec::new();
608
609 while let Ok(Some(msg)) = decoder.decode_record::<dbn::ImbalanceMsg>().await {
610 let record = dbn::RecordRef::from(msg);
611 let sym_map = self
612 .symbol_venue_map
613 .read()
614 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
615 let instrument_id = decode_nautilus_instrument_id(
616 &record,
617 &mut metadata_cache,
618 &self.publisher_venue_map,
619 &sym_map,
620 )?;
621
622 let imbalance = decode_imbalance_msg(msg, instrument_id, price_precision, None)?;
623 result.push(imbalance);
624 }
625
626 Ok(result)
627 }
628
629 pub async fn get_range_statistics(
635 &self,
636 params: RangeQueryParams,
637 ) -> anyhow::Result<Vec<DatabentoStatistics>> {
638 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
639 check_consistent_symbology(&symbols)?;
640
641 let first_symbol = params
642 .symbols
643 .first()
644 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
645 let stype_in = infer_symbology_type(first_symbol);
646 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
647 let time_range = get_date_time_range(params.start, end)?;
648
649 let range_params = GetRangeParams::builder()
650 .dataset(params.dataset)
651 .date_time_range(time_range)
652 .symbols(symbols)
653 .stype_in(stype_in)
654 .schema(dbn::Schema::Statistics)
655 .limit(params.limit.and_then(NonZeroU64::new))
656 .build();
657
658 let price_precision = params.price_precision.unwrap_or(Currency::USD().precision);
659
660 let mut client = self.inner.lock().await;
661 let mut decoder = client
662 .timeseries()
663 .get_range(&range_params)
664 .await
665 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
666
667 let metadata = decoder.metadata().clone();
668 let mut metadata_cache = MetadataCache::new(metadata);
669 let mut result: Vec<DatabentoStatistics> = Vec::new();
670
671 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatMsg>().await {
672 let record = dbn::RecordRef::from(msg);
673 let sym_map = self
674 .symbol_venue_map
675 .read()
676 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
677 let instrument_id = decode_nautilus_instrument_id(
678 &record,
679 &mut metadata_cache,
680 &self.publisher_venue_map,
681 &sym_map,
682 )?;
683
684 let statistics = decode_statistics_msg(msg, instrument_id, price_precision, None)?;
685 result.push(statistics);
686 }
687
688 Ok(result)
689 }
690
691 pub async fn get_range_status(
697 &self,
698 params: RangeQueryParams,
699 ) -> anyhow::Result<Vec<InstrumentStatus>> {
700 let symbols: Vec<&str> = params.symbols.iter().map(String::as_str).collect();
701 check_consistent_symbology(&symbols)?;
702
703 let first_symbol = params
704 .symbols
705 .first()
706 .ok_or_else(|| anyhow::anyhow!("No symbols provided"))?;
707 let stype_in = infer_symbology_type(first_symbol);
708 let end = params.end.unwrap_or_else(|| self.clock.get_time_ns());
709 let time_range = get_date_time_range(params.start, end)?;
710
711 let range_params = GetRangeParams::builder()
712 .dataset(params.dataset)
713 .date_time_range(time_range)
714 .symbols(symbols)
715 .stype_in(stype_in)
716 .schema(dbn::Schema::Status)
717 .limit(params.limit.and_then(NonZeroU64::new))
718 .build();
719
720 let mut client = self.inner.lock().await;
721 let mut decoder = client
722 .timeseries()
723 .get_range(&range_params)
724 .await
725 .map_err(|e| anyhow::anyhow!("Failed to get range: {e}"))?;
726
727 let metadata = decoder.metadata().clone();
728 let mut metadata_cache = MetadataCache::new(metadata);
729 let mut result: Vec<InstrumentStatus> = Vec::new();
730
731 while let Ok(Some(msg)) = decoder.decode_record::<dbn::StatusMsg>().await {
732 let record = dbn::RecordRef::from(msg);
733 let sym_map = self
734 .symbol_venue_map
735 .read()
736 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
737 let instrument_id = decode_nautilus_instrument_id(
738 &record,
739 &mut metadata_cache,
740 &self.publisher_venue_map,
741 &sym_map,
742 )?;
743
744 let status = decode_status_msg(msg, instrument_id, None)?;
745 result.push(status);
746 }
747
748 Ok(result)
749 }
750
751 pub fn prepare_symbols_from_instrument_ids(
757 &self,
758 instrument_ids: &[InstrumentId],
759 ) -> anyhow::Result<Vec<String>> {
760 let mut symbol_venue_map = self
761 .symbol_venue_map
762 .write()
763 .map_err(|e| anyhow::anyhow!("symbol_venue_map lock poisoned: {e}"))?;
764
765 let symbols: Vec<String> = instrument_ids
766 .iter()
767 .map(|instrument_id| {
768 instrument_id_to_symbol_string(*instrument_id, &mut symbol_venue_map)
769 })
770 .collect();
771
772 Ok(symbols)
773 }
774}