1use std::{cell::Ref, fmt::Display};
17
18use nautilus_common::{
19 clock::Clock,
20 messages::{
21 DataEvent,
22 data::{
23 BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
24 QuotesResponse, TradesResponse,
25 },
26 },
27};
28use nautilus_core::{UUID4, UnixNanos};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31 data::{
32 Bar, BarType, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas_API,
33 OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
34 },
35 identifiers::{ClientId, InstrumentId, Venue},
36 instruments::{Instrument, InstrumentAny},
37 orderbook::OrderBook,
38};
39
40#[async_trait::async_trait]
41pub trait LiveDataClient: DataClient {
42 fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<DataEvent>;
43
44 fn get_clock(&self) -> Ref<'_, dyn Clock>;
45
46 fn send_delta(&self, delta: OrderBookDelta) {
47 self.send_data(Data::Delta(delta));
48 }
49
50 fn send_deltas(&self, deltas: OrderBookDeltas_API) {
51 self.send_data(Data::Deltas(deltas));
52 }
53
54 fn send_depth10(&self, depth: OrderBookDepth10) {
55 self.send_data(Data::Depth10(Box::new(depth)));
56 }
57
58 fn send_quote(&self, quote: QuoteTick) {
59 self.send_data(Data::Quote(quote));
60 }
61
62 fn send_trade(&self, trade: TradeTick) {
63 self.send_data(Data::Trade(trade));
64 }
65
66 fn send_bar(&self, bar: Bar) {
67 self.send_data(Data::Bar(bar));
68 }
69
70 fn send_mark_price(&self, mark_price: MarkPriceUpdate) {
71 self.send_data(Data::MarkPriceUpdate(mark_price));
72 }
73
74 fn send_index_price(&self, index_price: IndexPriceUpdate) {
75 self.send_data(Data::IndexPriceUpdate(index_price));
76 }
77
78 fn send_instrument_close(&self, close: InstrumentClose) {
79 self.send_data(Data::InstrumentClose(close));
80 }
81
82 fn send_data(&self, data: Data) {
83 if let Err(e) = self.get_message_channel().send(DataEvent::Data(data)) {
84 log_send_error(&self.client_id(), &e);
85 }
86 }
87
88 fn send_instrument_response(
89 &self,
90 instrument: InstrumentAny,
91 correlation_id: UUID4,
92 start: Option<UnixNanos>,
93 end: Option<UnixNanos>,
94 ) {
95 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
96 correlation_id,
97 self.client_id(),
98 instrument.id(),
99 instrument,
100 start,
101 end,
102 self.get_clock().timestamp_ns(),
103 None,
104 )));
105
106 self.send_response(response);
107 }
108
109 fn send_instruments_response(
110 &self,
111 venue: Venue,
112 instruments: Vec<InstrumentAny>,
113 correlation_id: UUID4,
114 start: Option<UnixNanos>,
115 end: Option<UnixNanos>,
116 ) {
117 let response = DataResponse::Instruments(InstrumentsResponse::new(
118 correlation_id,
119 self.client_id(),
120 venue,
121 instruments,
122 start,
123 end,
124 self.get_clock().timestamp_ns(),
125 None,
126 ));
127
128 self.send_response(response);
129 }
130
131 fn send_book_response(
132 &self,
133 book: OrderBook,
134 correlation_id: UUID4,
135 start: Option<UnixNanos>,
136 end: Option<UnixNanos>,
137 ) {
138 let response = DataResponse::Book(BookResponse::new(
139 correlation_id,
140 self.client_id(),
141 book.instrument_id,
142 book,
143 start,
144 end,
145 self.get_clock().timestamp_ns(),
146 None,
147 ));
148
149 self.send_response(response);
150 }
151
152 fn send_quotes_response(
153 &self,
154 instrument_id: InstrumentId,
155 quotes: Vec<QuoteTick>,
156 correlation_id: UUID4,
157 start: Option<UnixNanos>,
158 end: Option<UnixNanos>,
159 ) {
160 let response = DataResponse::Quotes(QuotesResponse::new(
161 correlation_id,
162 self.client_id(),
163 instrument_id,
164 quotes,
165 start,
166 end,
167 self.get_clock().timestamp_ns(),
168 None,
169 ));
170
171 self.send_response(response);
172 }
173
174 fn send_trades_response(
175 &self,
176 instrument_id: InstrumentId,
177 trades: Vec<TradeTick>,
178 correlation_id: UUID4,
179 start: Option<UnixNanos>,
180 end: Option<UnixNanos>,
181 ) {
182 let response = DataResponse::Trades(TradesResponse::new(
183 correlation_id,
184 self.client_id(),
185 instrument_id,
186 trades,
187 start,
188 end,
189 self.get_clock().timestamp_ns(),
190 None,
191 ));
192
193 self.send_response(response);
194 }
195
196 fn send_bars(
197 &self,
198 bar_type: BarType,
199 bars: Vec<Bar>,
200 correlation_id: UUID4,
201 start: Option<UnixNanos>,
202 end: Option<UnixNanos>,
203 ) {
204 let response = DataResponse::Bars(BarsResponse::new(
205 correlation_id,
206 self.client_id(),
207 bar_type,
208 bars,
209 start,
210 end,
211 self.get_clock().timestamp_ns(),
212 None,
213 ));
214
215 self.send_response(response);
216 }
217
218 fn send_response(&self, response: DataResponse) {
219 if let Err(e) = self
220 .get_message_channel()
221 .send(DataEvent::Response(response))
222 {
223 log_send_error(&self.client_id(), &e);
224 }
225 }
226}
227
228#[inline(always)]
229fn log_send_error<E: Display>(client_id: &ClientId, e: &E) {
230 log::error!("DataClient-{client_id} failed to send message: {e}");
231}