nautilus_tardis/http/
client.rs1use std::{env, fmt::Debug, time::Duration};
17
18use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT};
19use nautilus_cryptography::providers::install_cryptographic_provider;
20use nautilus_model::instruments::InstrumentAny;
21use reqwest::Response;
22
23use super::{
24 TARDIS_BASE_URL,
25 error::{Error, TardisErrorResponse},
26 instruments::is_available,
27 models::TardisInstrumentInfo,
28 parse::parse_instrument_any,
29 query::InstrumentFilter,
30};
31use crate::{common::credential::Credential, enums::TardisExchange};
32
/// Result alias used by the Tardis HTTP client, with the module's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
34
/// HTTP client for the Tardis instruments metadata endpoints.
///
/// When the `python` feature is enabled the type is also exposed as a pyo3 class
/// in the `nautilus_trader.core.nautilus_pyo3.adapters` module.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
#[derive(Clone)]
pub struct TardisHttpClient {
    /// Base URL that endpoint paths are appended to.
    base_url: String,
    /// API credential whose key is sent as the bearer token on requests.
    credential: Option<Credential>,
    /// Underlying `reqwest` client used to issue requests.
    client: reqwest::Client,
    /// Flag forwarded to `parse_instrument_any` when converting instrument metadata.
    normalize_symbols: bool,
}
48
49impl Debug for TardisHttpClient {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 f.debug_struct("TardisHttpClient")
52 .field("base_url", &self.base_url)
53 .field(
54 "credential",
55 &self.credential.as_ref().map(|_| "<redacted>"),
56 )
57 .field("normalize_symbols", &self.normalize_symbols)
58 .finish()
59 }
60}
61
62impl TardisHttpClient {
63 pub fn new(
70 api_key: Option<&str>,
71 base_url: Option<&str>,
72 timeout_secs: Option<u64>,
73 normalize_symbols: bool,
74 ) -> anyhow::Result<Self> {
75 let credential = match api_key {
76 Some(key) => Some(Credential::new(key)),
77 None => env::var("TARDIS_API_KEY").ok().map(Credential::new),
78 };
79
80 if credential.is_none() {
81 anyhow::bail!(
82 "API key must be provided or set in the 'TARDIS_API_KEY' environment variable"
83 );
84 }
85
86 let base_url = base_url.map_or_else(|| TARDIS_BASE_URL.to_string(), ToString::to_string);
87 let timeout = timeout_secs.map_or_else(|| Duration::from_secs(60), Duration::from_secs);
88
89 install_cryptographic_provider();
90 let client = reqwest::Client::builder()
91 .user_agent(NAUTILUS_USER_AGENT)
92 .timeout(timeout)
93 .build()?;
94
95 Ok(Self {
96 base_url,
97 credential,
98 client,
99 normalize_symbols,
100 })
101 }
102
103 #[must_use]
105 pub const fn credential(&self) -> Option<&Credential> {
106 self.credential.as_ref()
107 }
108
109 async fn handle_error_response<T>(resp: Response) -> Result<T> {
110 let status = resp.status().as_u16();
111 let error_text = match resp.text().await {
112 Ok(text) => text,
113 Err(e) => {
114 tracing::warn!("Failed to extract error response body: {e}");
115 String::from("Failed to extract error response")
116 }
117 };
118
119 if let Ok(error) = serde_json::from_str::<TardisErrorResponse>(&error_text) {
120 Err(Error::ApiError {
121 status,
122 code: error.code,
123 message: error.message,
124 })
125 } else {
126 Err(Error::ApiError {
127 status,
128 code: 0,
129 message: error_text,
130 })
131 }
132 }
133
134 pub async fn instruments_info(
142 &self,
143 exchange: TardisExchange,
144 symbol: Option<&str>,
145 filter: Option<&InstrumentFilter>,
146 ) -> Result<Vec<TardisInstrumentInfo>> {
147 let mut url = format!("{}/instruments/{exchange}", &self.base_url);
148 if let Some(symbol) = symbol {
149 url.push_str(&format!("/{symbol}"));
150 }
151 if let Some(filter) = filter
152 && let Ok(filter_json) = serde_json::to_string(filter)
153 {
154 url.push_str(&format!("?filter={}", urlencoding::encode(&filter_json)));
155 }
156 tracing::debug!("Requesting: {url}");
157
158 let resp = self
159 .client
160 .get(url)
161 .bearer_auth(self.credential.as_ref().map_or("", |c| c.api_key()))
162 .send()
163 .await?;
164 tracing::debug!("Response status: {}", resp.status());
165
166 if !resp.status().is_success() {
167 return Self::handle_error_response(resp).await;
168 }
169
170 let body = resp.text().await?;
171 tracing::trace!("{body}");
172
173 if let Ok(instrument) = serde_json::from_str::<TardisInstrumentInfo>(&body) {
174 return Ok(vec![instrument]);
175 }
176
177 match serde_json::from_str(&body) {
178 Ok(parsed) => Ok(parsed),
179 Err(e) => {
180 tracing::error!("Failed to parse response: {e}");
181 tracing::debug!("Response body was: {body}");
182 Err(Error::ResponseParse(e.to_string()))
183 }
184 }
185 }
186
187 #[allow(clippy::too_many_arguments)]
195 pub async fn instruments(
196 &self,
197 exchange: TardisExchange,
198 symbol: Option<&str>,
199 filter: Option<&InstrumentFilter>,
200 start: Option<UnixNanos>,
201 end: Option<UnixNanos>,
202 available_offset: Option<UnixNanos>,
203 effective: Option<UnixNanos>,
204 ts_init: Option<UnixNanos>,
205 ) -> Result<Vec<InstrumentAny>> {
206 let response = self.instruments_info(exchange, symbol, filter).await?;
207
208 Ok(response
209 .into_iter()
210 .filter(|info| is_available(info, start, end, available_offset, effective))
211 .flat_map(|info| parse_instrument_any(info, effective, ts_init, self.normalize_symbols))
212 .collect())
213 }
214}