nautilus_live/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::Ref, fmt::Display};
17
18use nautilus_common::{
19    clock::Clock,
20    messages::{
21        DataEvent,
22        data::{
23            BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
24            QuotesResponse, TradesResponse,
25        },
26    },
27};
28use nautilus_core::{UUID4, UnixNanos};
29use nautilus_data::client::DataClient;
30use nautilus_model::{
31    data::{
32        Bar, BarType, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas_API,
33        OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
34    },
35    identifiers::{ClientId, InstrumentId, Venue},
36    instruments::{Instrument, InstrumentAny},
37    orderbook::OrderBook,
38};
39
40#[async_trait::async_trait]
41pub trait LiveDataClient: DataClient {
42    fn get_message_channel(&self) -> tokio::sync::mpsc::UnboundedSender<DataEvent>;
43
44    fn get_clock(&self) -> Ref<'_, dyn Clock>;
45
46    fn send_delta(&self, delta: OrderBookDelta) {
47        self.send_data(Data::Delta(delta));
48    }
49
50    fn send_deltas(&self, deltas: OrderBookDeltas_API) {
51        self.send_data(Data::Deltas(deltas));
52    }
53
54    fn send_depth10(&self, depth: OrderBookDepth10) {
55        self.send_data(Data::Depth10(Box::new(depth)));
56    }
57
58    fn send_quote(&self, quote: QuoteTick) {
59        self.send_data(Data::Quote(quote));
60    }
61
62    fn send_trade(&self, trade: TradeTick) {
63        self.send_data(Data::Trade(trade));
64    }
65
66    fn send_bar(&self, bar: Bar) {
67        self.send_data(Data::Bar(bar));
68    }
69
70    fn send_mark_price(&self, mark_price: MarkPriceUpdate) {
71        self.send_data(Data::MarkPriceUpdate(mark_price));
72    }
73
74    fn send_index_price(&self, index_price: IndexPriceUpdate) {
75        self.send_data(Data::IndexPriceUpdate(index_price));
76    }
77
78    fn send_instrument_close(&self, close: InstrumentClose) {
79        self.send_data(Data::InstrumentClose(close));
80    }
81
82    fn send_data(&self, data: Data) {
83        if let Err(e) = self.get_message_channel().send(DataEvent::Data(data)) {
84            log_send_error(&self.client_id(), &e);
85        }
86    }
87
88    fn send_instrument_response(
89        &self,
90        instrument: InstrumentAny,
91        correlation_id: UUID4,
92        start: Option<UnixNanos>,
93        end: Option<UnixNanos>,
94    ) {
95        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
96            correlation_id,
97            self.client_id(),
98            instrument.id(),
99            instrument,
100            start,
101            end,
102            self.get_clock().timestamp_ns(),
103            None,
104        )));
105
106        self.send_response(response);
107    }
108
109    fn send_instruments_response(
110        &self,
111        venue: Venue,
112        instruments: Vec<InstrumentAny>,
113        correlation_id: UUID4,
114        start: Option<UnixNanos>,
115        end: Option<UnixNanos>,
116    ) {
117        let response = DataResponse::Instruments(InstrumentsResponse::new(
118            correlation_id,
119            self.client_id(),
120            venue,
121            instruments,
122            start,
123            end,
124            self.get_clock().timestamp_ns(),
125            None,
126        ));
127
128        self.send_response(response);
129    }
130
131    fn send_book_response(
132        &self,
133        book: OrderBook,
134        correlation_id: UUID4,
135        start: Option<UnixNanos>,
136        end: Option<UnixNanos>,
137    ) {
138        let response = DataResponse::Book(BookResponse::new(
139            correlation_id,
140            self.client_id(),
141            book.instrument_id,
142            book,
143            start,
144            end,
145            self.get_clock().timestamp_ns(),
146            None,
147        ));
148
149        self.send_response(response);
150    }
151
152    fn send_quotes_response(
153        &self,
154        instrument_id: InstrumentId,
155        quotes: Vec<QuoteTick>,
156        correlation_id: UUID4,
157        start: Option<UnixNanos>,
158        end: Option<UnixNanos>,
159    ) {
160        let response = DataResponse::Quotes(QuotesResponse::new(
161            correlation_id,
162            self.client_id(),
163            instrument_id,
164            quotes,
165            start,
166            end,
167            self.get_clock().timestamp_ns(),
168            None,
169        ));
170
171        self.send_response(response);
172    }
173
174    fn send_trades_response(
175        &self,
176        instrument_id: InstrumentId,
177        trades: Vec<TradeTick>,
178        correlation_id: UUID4,
179        start: Option<UnixNanos>,
180        end: Option<UnixNanos>,
181    ) {
182        let response = DataResponse::Trades(TradesResponse::new(
183            correlation_id,
184            self.client_id(),
185            instrument_id,
186            trades,
187            start,
188            end,
189            self.get_clock().timestamp_ns(),
190            None,
191        ));
192
193        self.send_response(response);
194    }
195
196    fn send_bars(
197        &self,
198        bar_type: BarType,
199        bars: Vec<Bar>,
200        correlation_id: UUID4,
201        start: Option<UnixNanos>,
202        end: Option<UnixNanos>,
203    ) {
204        let response = DataResponse::Bars(BarsResponse::new(
205            correlation_id,
206            self.client_id(),
207            bar_type,
208            bars,
209            start,
210            end,
211            self.get_clock().timestamp_ns(),
212            None,
213        ));
214
215        self.send_response(response);
216    }
217
218    fn send_response(&self, response: DataResponse) {
219        if let Err(e) = self
220            .get_message_channel()
221            .send(DataEvent::Response(response))
222        {
223            log_send_error(&self.client_id(), &e);
224        }
225    }
226}
227
228#[inline(always)]
229fn log_send_error<E: Display>(client_id: &ClientId, e: &E) {
230    log::error!("DataClient-{client_id} failed to send message: {e}");
231}