nautilus_tardis/http/
client.rs1use std::{env, time::Duration};
17
18use nautilus_core::{consts::USER_AGENT, UnixNanos};
19use nautilus_model::instruments::InstrumentAny;
20use reqwest::Response;
21
22use super::{
23 error::{Error, TardisErrorResponse},
24 models::InstrumentInfo,
25 parse::parse_instrument_any,
26 query::InstrumentFilter,
27 TARDIS_BASE_URL,
28};
29use crate::enums::Exchange;
30
31pub type Result<T> = std::result::Result<T, Error>;
32
33#[cfg_attr(
36 feature = "python",
37 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
38)]
39#[derive(Debug, Clone)]
40pub struct TardisHttpClient {
41 base_url: String,
42 api_key: String,
43 client: reqwest::Client,
44 normalize_symbols: bool,
45}
46
47impl TardisHttpClient {
48 pub fn new(
50 api_key: Option<&str>,
51 base_url: Option<&str>,
52 timeout_secs: Option<u64>,
53 normalize_symbols: bool,
54 ) -> anyhow::Result<Self> {
55 let api_key = match api_key {
56 Some(key) => key.to_string(),
57 None => env::var("TARDIS_API_KEY").map_err(|_| {
58 anyhow::anyhow!(
59 "API key must be provided or set in the 'TARDIS_API_KEY' environment variable"
60 )
61 })?,
62 };
63
64 let base_url = base_url.map_or_else(|| TARDIS_BASE_URL.to_string(), ToString::to_string);
65 let timeout = timeout_secs.map_or_else(|| Duration::from_secs(60), Duration::from_secs);
66
67 let client = reqwest::Client::builder()
68 .user_agent(USER_AGENT)
69 .timeout(timeout)
70 .build()?;
71
72 Ok(Self {
73 base_url,
74 api_key,
75 client,
76 normalize_symbols,
77 })
78 }
79
80 async fn handle_error_response<T>(resp: Response) -> Result<T> {
81 let status = resp.status().as_u16();
82 let error_text = resp.text().await.unwrap_or_default();
83
84 if let Ok(error) = serde_json::from_str::<TardisErrorResponse>(&error_text) {
85 Err(Error::ApiError {
86 status,
87 code: error.code,
88 message: error.message,
89 })
90 } else {
91 Err(Error::ApiError {
92 status,
93 code: 0,
94 message: error_text,
95 })
96 }
97 }
98
99 pub async fn instruments_info(
103 &self,
104 exchange: Exchange,
105 filter: Option<&InstrumentFilter>,
106 ) -> Result<Vec<InstrumentInfo>> {
107 let mut url = format!("{}/instruments/{exchange}", &self.base_url);
108 if let Some(filter) = filter {
109 if let Ok(filter_json) = serde_json::to_string(filter) {
110 url.push_str(&format!("?filter={}", urlencoding::encode(&filter_json)));
111 }
112 }
113 tracing::debug!("Requesting: {url}");
114
115 let resp = self
116 .client
117 .get(url)
118 .bearer_auth(&self.api_key)
119 .send()
120 .await?;
121 tracing::debug!("Response status: {}", resp.status());
122
123 if !resp.status().is_success() {
124 return Self::handle_error_response(resp).await;
125 }
126
127 let body = resp.text().await?;
128 tracing::trace!("{body}");
129
130 match serde_json::from_str(&body) {
131 Ok(parsed) => Ok(parsed),
132 Err(e) => {
133 tracing::error!("Failed to parse response: {}", e);
134 tracing::debug!("Response body was: {}", body);
135 Err(Error::ResponseParse(e.to_string()))
136 }
137 }
138 }
139
140 pub async fn instrument_info(
144 &self,
145 exchange: Exchange,
146 symbol: &str,
147 ) -> Result<InstrumentInfo> {
148 let url = format!("{}/instruments/{exchange}/{symbol}", &self.base_url);
149 tracing::debug!("Requesting {url}");
150
151 let resp = self
152 .client
153 .get(url)
154 .bearer_auth(&self.api_key)
155 .send()
156 .await?;
157 tracing::debug!("Response status: {}", resp.status());
158
159 if !resp.status().is_success() {
160 return Self::handle_error_response(resp).await;
161 }
162
163 let body = resp.text().await?;
164 tracing::trace!("{body}");
165
166 match serde_json::from_str(&body) {
167 Ok(parsed) => Ok(parsed),
168 Err(e) => {
169 tracing::error!("Failed to parse response: {}", e);
170 tracing::debug!("Response body was: {}", body);
171 Err(Error::ResponseParse(e.to_string()))
172 }
173 }
174 }
175
176 pub async fn instruments(
180 &self,
181 exchange: Exchange,
182 start: Option<u64>,
183 end: Option<u64>,
184 ts_init: Option<u64>,
185 filter: Option<&InstrumentFilter>,
186 ) -> Result<Vec<InstrumentAny>> {
187 let response = self.instruments_info(exchange, filter).await?;
188 let ts_init = ts_init.map(UnixNanos::from);
189
190 Ok(response
191 .into_iter()
192 .flat_map(|info| {
193 parse_instrument_any(info, start, end, ts_init, self.normalize_symbols)
194 })
195 .collect())
196 }
197
198 pub async fn instrument(
202 &self,
203 exchange: Exchange,
204 symbol: &str,
205 start: Option<u64>,
206 end: Option<u64>,
207 ts_init: Option<u64>,
208 ) -> Result<Vec<InstrumentAny>> {
209 let response = self.instrument_info(exchange, symbol).await?;
210 let ts_init = ts_init.map(UnixNanos::from);
211
212 Ok(parse_instrument_any(
213 response,
214 start,
215 end,
216 ts_init,
217 self.normalize_symbols,
218 ))
219 }
220}