nautilus_tardis/http/
client.rs1use std::{env, fmt::Debug, time::Duration};
17
18use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT};
19use nautilus_cryptography::providers::install_cryptographic_provider;
20use nautilus_model::instruments::InstrumentAny;
21use reqwest::Response;
22
23use super::{
24 TARDIS_BASE_URL,
25 error::{Error, TardisErrorResponse},
26 instruments::is_available,
27 models::TardisInstrumentInfo,
28 parse::parse_instrument_any,
29 query::InstrumentFilter,
30};
31use crate::{
32 common::{consts::TARDIS_API_KEY, credential::Credential},
33 enums::TardisExchange,
34};
35
36pub type Result<T> = std::result::Result<T, Error>;
37
38#[cfg_attr(
41 feature = "python",
42 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.tardis")
43)]
44#[derive(Clone)]
45pub struct TardisHttpClient {
46 base_url: String,
47 credential: Option<Credential>,
48 client: reqwest::Client,
49 normalize_symbols: bool,
50}
51
52impl Debug for TardisHttpClient {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct(stringify!(TardisHttpClient))
55 .field("base_url", &self.base_url)
56 .field(
57 "credential",
58 &self.credential.as_ref().map(|_| "<redacted>"),
59 )
60 .field("normalize_symbols", &self.normalize_symbols)
61 .finish()
62 }
63}
64
65impl TardisHttpClient {
66 pub fn new(
73 api_key: Option<&str>,
74 base_url: Option<&str>,
75 timeout_secs: Option<u64>,
76 normalize_symbols: bool,
77 ) -> anyhow::Result<Self> {
78 let credential = match api_key {
79 Some(key) => Some(Credential::new(key)),
80 None => env::var(TARDIS_API_KEY).ok().map(Credential::new),
81 };
82
83 if credential.is_none() {
84 anyhow::bail!(
85 "API key must be provided or set in the '{TARDIS_API_KEY}' environment variable"
86 );
87 }
88
89 let base_url = base_url.map_or_else(|| TARDIS_BASE_URL.to_string(), ToString::to_string);
90 let timeout = timeout_secs.map_or_else(|| Duration::from_secs(60), Duration::from_secs);
91
92 install_cryptographic_provider();
93 let client = reqwest::Client::builder()
94 .user_agent(NAUTILUS_USER_AGENT)
95 .timeout(timeout)
96 .build()?;
97
98 Ok(Self {
99 base_url,
100 credential,
101 client,
102 normalize_symbols,
103 })
104 }
105
106 #[must_use]
108 pub const fn credential(&self) -> Option<&Credential> {
109 self.credential.as_ref()
110 }
111
112 async fn handle_error_response<T>(resp: Response) -> Result<T> {
113 let status = resp.status().as_u16();
114 let error_text = match resp.text().await {
115 Ok(text) => text,
116 Err(e) => {
117 log::warn!("Failed to extract error response body: {e}");
118 String::from("Failed to extract error response")
119 }
120 };
121
122 if let Ok(error) = serde_json::from_str::<TardisErrorResponse>(&error_text) {
123 Err(Error::ApiError {
124 status,
125 code: error.code,
126 message: error.message,
127 })
128 } else {
129 Err(Error::ApiError {
130 status,
131 code: 0,
132 message: error_text,
133 })
134 }
135 }
136
137 pub async fn instruments_info(
145 &self,
146 exchange: TardisExchange,
147 symbol: Option<&str>,
148 filter: Option<&InstrumentFilter>,
149 ) -> Result<Vec<TardisInstrumentInfo>> {
150 let mut url = format!("{}/instruments/{exchange}", &self.base_url);
151 if let Some(symbol) = symbol {
152 url.push_str(&format!("/{symbol}"));
153 }
154 if let Some(filter) = filter
155 && let Ok(filter_json) = serde_json::to_string(filter)
156 {
157 url.push_str(&format!("?filter={}", urlencoding::encode(&filter_json)));
158 }
159 log::debug!("Requesting: {url}");
160
161 let resp = self
162 .client
163 .get(url)
164 .bearer_auth(self.credential.as_ref().map_or("", |c| c.api_key()))
165 .send()
166 .await?;
167 log::debug!("Response status: {}", resp.status());
168
169 if !resp.status().is_success() {
170 return Self::handle_error_response(resp).await;
171 }
172
173 let body = resp.text().await?;
174 log::trace!("{body}");
175
176 if let Ok(instrument) = serde_json::from_str::<TardisInstrumentInfo>(&body) {
177 return Ok(vec![instrument]);
178 }
179
180 match serde_json::from_str(&body) {
181 Ok(parsed) => Ok(parsed),
182 Err(e) => {
183 log::error!("Failed to parse response: {e}");
184 log::debug!("Response body was: {body}");
185 Err(Error::ResponseParse(e.to_string()))
186 }
187 }
188 }
189
190 #[allow(clippy::too_many_arguments)]
198 pub async fn instruments(
199 &self,
200 exchange: TardisExchange,
201 symbol: Option<&str>,
202 filter: Option<&InstrumentFilter>,
203 start: Option<UnixNanos>,
204 end: Option<UnixNanos>,
205 available_offset: Option<UnixNanos>,
206 effective: Option<UnixNanos>,
207 ts_init: Option<UnixNanos>,
208 ) -> Result<Vec<InstrumentAny>> {
209 let response = self.instruments_info(exchange, symbol, filter).await?;
210
211 Ok(response
212 .into_iter()
213 .filter(|info| is_available(info, start, end, available_offset, effective))
214 .flat_map(|info| parse_instrument_any(info, effective, ts_init, self.normalize_symbols))
215 .collect())
216 }
217}