nautilus_tardis/http/
client.rs1use std::{env, fmt::Debug, time::Duration};
17
18use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT};
19use nautilus_model::instruments::InstrumentAny;
20use reqwest::Response;
21
22use super::{
23 TARDIS_BASE_URL,
24 error::{Error, TardisErrorResponse},
25 instruments::is_available,
26 models::TardisInstrumentInfo,
27 parse::parse_instrument_any,
28 query::InstrumentFilter,
29};
30use crate::{common::credential::Credential, enums::TardisExchange};
31
32pub type Result<T> = std::result::Result<T, Error>;
33
34#[cfg_attr(
37 feature = "python",
38 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
39)]
40#[derive(Clone)]
41pub struct TardisHttpClient {
42 base_url: String,
43 credential: Option<Credential>,
44 client: reqwest::Client,
45 normalize_symbols: bool,
46}
47
48impl Debug for TardisHttpClient {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 f.debug_struct("TardisHttpClient")
51 .field("base_url", &self.base_url)
52 .field(
53 "credential",
54 &self.credential.as_ref().map(|_| "<redacted>"),
55 )
56 .field("normalize_symbols", &self.normalize_symbols)
57 .finish()
58 }
59}
60
61impl TardisHttpClient {
62 pub fn new(
69 api_key: Option<&str>,
70 base_url: Option<&str>,
71 timeout_secs: Option<u64>,
72 normalize_symbols: bool,
73 ) -> anyhow::Result<Self> {
74 let credential = match api_key {
75 Some(key) => Some(Credential::new(key)),
76 None => env::var("TARDIS_API_KEY").ok().map(Credential::new),
77 };
78
79 if credential.is_none() {
80 anyhow::bail!(
81 "API key must be provided or set in the 'TARDIS_API_KEY' environment variable"
82 );
83 }
84
85 let base_url = base_url.map_or_else(|| TARDIS_BASE_URL.to_string(), ToString::to_string);
86 let timeout = timeout_secs.map_or_else(|| Duration::from_secs(60), Duration::from_secs);
87
88 let client = reqwest::Client::builder()
89 .user_agent(NAUTILUS_USER_AGENT)
90 .timeout(timeout)
91 .build()?;
92
93 Ok(Self {
94 base_url,
95 credential,
96 client,
97 normalize_symbols,
98 })
99 }
100
101 #[must_use]
103 pub const fn credential(&self) -> Option<&Credential> {
104 self.credential.as_ref()
105 }
106
107 async fn handle_error_response<T>(resp: Response) -> Result<T> {
108 let status = resp.status().as_u16();
109 let error_text = match resp.text().await {
110 Ok(text) => text,
111 Err(e) => {
112 tracing::warn!("Failed to extract error response body: {e}");
113 String::from("Failed to extract error response")
114 }
115 };
116
117 if let Ok(error) = serde_json::from_str::<TardisErrorResponse>(&error_text) {
118 Err(Error::ApiError {
119 status,
120 code: error.code,
121 message: error.message,
122 })
123 } else {
124 Err(Error::ApiError {
125 status,
126 code: 0,
127 message: error_text,
128 })
129 }
130 }
131
132 pub async fn instruments_info(
140 &self,
141 exchange: TardisExchange,
142 symbol: Option<&str>,
143 filter: Option<&InstrumentFilter>,
144 ) -> Result<Vec<TardisInstrumentInfo>> {
145 let mut url = format!("{}/instruments/{exchange}", &self.base_url);
146 if let Some(symbol) = symbol {
147 url.push_str(&format!("/{symbol}"));
148 }
149 if let Some(filter) = filter
150 && let Ok(filter_json) = serde_json::to_string(filter)
151 {
152 url.push_str(&format!("?filter={}", urlencoding::encode(&filter_json)));
153 }
154 tracing::debug!("Requesting: {url}");
155
156 let resp = self
157 .client
158 .get(url)
159 .bearer_auth(self.credential.as_ref().map_or("", |c| c.api_key()))
160 .send()
161 .await?;
162 tracing::debug!("Response status: {}", resp.status());
163
164 if !resp.status().is_success() {
165 return Self::handle_error_response(resp).await;
166 }
167
168 let body = resp.text().await?;
169 tracing::trace!("{body}");
170
171 if let Ok(instrument) = serde_json::from_str::<TardisInstrumentInfo>(&body) {
172 return Ok(vec![instrument]);
173 }
174
175 match serde_json::from_str(&body) {
176 Ok(parsed) => Ok(parsed),
177 Err(e) => {
178 tracing::error!("Failed to parse response: {e}");
179 tracing::debug!("Response body was: {body}");
180 Err(Error::ResponseParse(e.to_string()))
181 }
182 }
183 }
184
185 #[allow(clippy::too_many_arguments)]
193 pub async fn instruments(
194 &self,
195 exchange: TardisExchange,
196 symbol: Option<&str>,
197 filter: Option<&InstrumentFilter>,
198 start: Option<UnixNanos>,
199 end: Option<UnixNanos>,
200 available_offset: Option<UnixNanos>,
201 effective: Option<UnixNanos>,
202 ts_init: Option<UnixNanos>,
203 ) -> Result<Vec<InstrumentAny>> {
204 let response = self.instruments_info(exchange, symbol, filter).await?;
205
206 Ok(response
207 .into_iter()
208 .filter(|info| is_available(info, start, end, available_offset, effective))
209 .flat_map(|info| parse_instrument_any(info, effective, ts_init, self.normalize_symbols))
210 .collect())
211 }
212}