1use std::{
17 collections::{HashMap, VecDeque},
18 time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23 cache::database::{CacheDatabaseAdapter, CacheMap},
24 custom::CustomData,
25 runtime::get_runtime,
26 signal::Signal,
27};
28use nautilus_core::UnixNanos;
29use nautilus_model::{
30 accounts::AccountAny,
31 data::{Bar, DataType, QuoteTick, TradeTick},
32 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
33 identifiers::{
34 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
35 VenueOrderId,
36 },
37 instruments::{InstrumentAny, SyntheticInstrument},
38 orderbook::OrderBook,
39 orders::OrderAny,
40 position::Position,
41 types::Currency,
42};
43use sqlx::{PgPool, postgres::PgConnectOptions};
44use tokio::try_join;
45use ustr::Ustr;
46
47use crate::sql::{
48 pg::{connect_pg, get_postgres_connect_options},
49 queries::DatabaseQueries,
50};
51
52#[derive(Debug)]
53#[cfg_attr(
54 feature = "python",
55 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
56)]
57pub struct PostgresCacheDatabase {
58 pub pool: PgPool,
59 tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
60 handle: tokio::task::JoinHandle<()>,
61}
62
63#[allow(clippy::large_enum_variant)]
64#[derive(Debug, Clone)]
65pub enum DatabaseQuery {
66 Close,
67 Add(String, Vec<u8>),
68 AddCurrency(Currency),
69 AddInstrument(InstrumentAny),
70 AddOrder(OrderAny, Option<ClientId>, bool),
71 AddOrderSnapshot(OrderSnapshot),
72 AddPositionSnapshot(PositionSnapshot),
73 AddAccount(AccountAny, bool),
74 AddSignal(Signal),
75 AddCustom(CustomData),
76 AddQuote(QuoteTick),
77 AddTrade(TradeTick),
78 AddBar(Bar),
79 UpdateOrder(OrderEventAny),
80}
81
82impl PostgresCacheDatabase {
83 pub async fn connect(
84 host: Option<String>,
85 port: Option<u16>,
86 username: Option<String>,
87 password: Option<String>,
88 database: Option<String>,
89 ) -> Result<Self, sqlx::Error> {
90 let pg_connect_options =
91 get_postgres_connect_options(host, port, username, password, database);
92 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
93 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
94
95 let handle = tokio::spawn(async move {
97 PostgresCacheDatabase::process_commands(rx, pg_connect_options.clone().into()).await;
98 });
99 Ok(PostgresCacheDatabase { pool, tx, handle })
100 }
101
102 async fn process_commands(
103 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
104 pg_connect_options: PgConnectOptions,
105 ) {
106 tracing::debug!("Starting cache processing");
107
108 let pool = connect_pg(pg_connect_options).await.unwrap();
109
110 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
112 let mut last_drain = Instant::now();
113
114 let buffer_interval = Duration::from_millis(0);
116
117 loop {
119 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
120 drain_buffer(&pool, &mut buffer).await;
121 last_drain = Instant::now();
122 } else {
123 match rx.recv().await {
124 Some(msg) => match msg {
125 DatabaseQuery::Close => break,
126 _ => buffer.push_back(msg),
127 },
128 None => break, }
130 }
131 }
132
133 if !buffer.is_empty() {
135 drain_buffer(&pool, &mut buffer).await;
136 }
137
138 tracing::debug!("Stopped cache processing");
139 }
140}
141
142pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
143 let connect_options = get_postgres_connect_options(None, None, None, None, None);
144 Ok(PostgresCacheDatabase::connect(
145 Some(connect_options.host),
146 Some(connect_options.port),
147 Some(connect_options.username),
148 Some(connect_options.password),
149 Some(connect_options.database),
150 )
151 .await?)
152}
153
154#[allow(dead_code)]
155#[allow(unused)]
156#[async_trait::async_trait]
157impl CacheDatabaseAdapter for PostgresCacheDatabase {
158 fn close(&mut self) -> anyhow::Result<()> {
159 let pool = self.pool.clone();
160 let (tx, rx) = std::sync::mpsc::channel();
161
162 log::debug!("Closing connection pool");
163 tokio::task::block_in_place(|| {
164 get_runtime().block_on(async {
165 pool.close().await;
166 if let Err(e) = tx.send(()) {
167 log::error!("Error closing pool: {e:?}");
168 }
169 });
170 });
171
172 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
174 log::error!("Error sending close: {e:?}");
175 }
176
177 log::debug!("Awaiting task 'cache-write'"); tokio::task::block_in_place(|| {
179 if let Err(e) = get_runtime().block_on(&mut self.handle) {
180 log::error!("Error awaiting task 'cache-write': {e:?}");
181 }
182 });
183
184 log::debug!("Closed");
185
186 Ok(rx.recv()?)
187 }
188
189 fn flush(&mut self) -> anyhow::Result<()> {
190 let pool = self.pool.clone();
191 let (tx, rx) = std::sync::mpsc::channel();
192
193 tokio::task::block_in_place(|| {
194 get_runtime().block_on(async {
195 if let Err(e) = DatabaseQueries::truncate(&pool).await {
196 log::error!("Error flushing pool: {e:?}");
197 }
198 if let Err(e) = tx.send(()) {
199 log::error!("Error sending flush result: {e:?}");
200 }
201 });
202 });
203
204 Ok(rx.recv()?)
205 }
206
207 async fn load_all(&self) -> anyhow::Result<CacheMap> {
208 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
209 self.load_currencies(),
210 self.load_instruments(),
211 self.load_synthetics(),
212 self.load_accounts(),
213 self.load_orders(),
214 self.load_positions()
215 )
216 .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
217
218 Ok(CacheMap {
219 currencies,
220 instruments,
221 synthetics,
222 accounts,
223 orders,
224 positions,
225 })
226 }
227
228 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
229 let pool = self.pool.clone();
230 let (tx, rx) = std::sync::mpsc::channel();
231 tokio::spawn(async move {
232 let result = DatabaseQueries::load(&pool).await;
233 match result {
234 Ok(items) => {
235 let mapping = items
236 .into_iter()
237 .map(|(k, v)| (k, Bytes::from(v)))
238 .collect();
239 if let Err(e) = tx.send(mapping) {
240 log::error!("Failed to send general items: {e:?}");
241 }
242 }
243 Err(e) => {
244 log::error!("Failed to load general items: {e:?}");
245 if let Err(e) = tx.send(HashMap::new()) {
246 log::error!("Failed to send empty general items: {e:?}");
247 }
248 }
249 }
250 });
251 Ok(rx.recv()?)
252 }
253
254 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
255 let pool = self.pool.clone();
256 let (tx, rx) = std::sync::mpsc::channel();
257 tokio::spawn(async move {
258 let result = DatabaseQueries::load_currencies(&pool).await;
259 match result {
260 Ok(currencies) => {
261 let mapping = currencies
262 .into_iter()
263 .map(|currency| (currency.code, currency))
264 .collect();
265 if let Err(e) = tx.send(mapping) {
266 log::error!("Failed to send currencies: {e:?}");
267 }
268 }
269 Err(e) => {
270 log::error!("Failed to load currencies: {e:?}");
271 if let Err(e) = tx.send(HashMap::new()) {
272 log::error!("Failed to send empty currencies: {e:?}");
273 }
274 }
275 }
276 });
277 Ok(rx.recv()?)
278 }
279
280 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
281 let pool = self.pool.clone();
282 let (tx, rx) = std::sync::mpsc::channel();
283 tokio::spawn(async move {
284 let result = DatabaseQueries::load_instruments(&pool).await;
285 match result {
286 Ok(instruments) => {
287 let mapping = instruments
288 .into_iter()
289 .map(|instrument| (instrument.id(), instrument))
290 .collect();
291 if let Err(e) = tx.send(mapping) {
292 log::error!("Failed to send instruments: {e:?}");
293 }
294 }
295 Err(e) => {
296 log::error!("Failed to load instruments: {e:?}");
297 if let Err(e) = tx.send(HashMap::new()) {
298 log::error!("Failed to send empty instruments: {e:?}");
299 }
300 }
301 }
302 });
303 Ok(rx.recv()?)
304 }
305
306 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
307 todo!()
308 }
309
310 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
311 let pool = self.pool.clone();
312 let (tx, rx) = std::sync::mpsc::channel();
313 tokio::spawn(async move {
314 let result = DatabaseQueries::load_accounts(&pool).await;
315 match result {
316 Ok(accounts) => {
317 let mapping = accounts
318 .into_iter()
319 .map(|account| (account.id(), account))
320 .collect();
321 if let Err(e) = tx.send(mapping) {
322 log::error!("Failed to send accounts: {e:?}");
323 }
324 }
325 Err(e) => {
326 log::error!("Failed to load accounts: {e:?}");
327 if let Err(e) = tx.send(HashMap::new()) {
328 log::error!("Failed to send empty accounts: {e:?}");
329 }
330 }
331 }
332 });
333 Ok(rx.recv()?)
334 }
335
336 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
337 let pool = self.pool.clone();
338 let (tx, rx) = std::sync::mpsc::channel();
339 tokio::spawn(async move {
340 let result = DatabaseQueries::load_orders(&pool).await;
341 match result {
342 Ok(orders) => {
343 let mapping = orders
344 .into_iter()
345 .map(|order| (order.client_order_id(), order))
346 .collect();
347 if let Err(e) = tx.send(mapping) {
348 log::error!("Failed to send orders: {e:?}");
349 }
350 }
351 Err(e) => {
352 log::error!("Failed to load orders: {e:?}");
353 if let Err(e) = tx.send(HashMap::new()) {
354 log::error!("Failed to send empty orders: {e:?}");
355 }
356 }
357 }
358 });
359 Ok(rx.recv()?)
360 }
361
362 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
363 todo!()
364 }
365
366 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
367 todo!()
368 }
369
370 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
371 let pool = self.pool.clone();
372 let (tx, rx) = std::sync::mpsc::channel();
373 tokio::spawn(async move {
374 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
375 match result {
376 Ok(currency) => {
377 if let Err(e) = tx.send(currency) {
378 log::error!("Failed to send load_index_order_client result: {e:?}");
379 }
380 }
381 Err(e) => {
382 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
383 if let Err(e) = tx.send(HashMap::new()) {
384 log::error!("Failed to send empty load_index_order_client result: {e:?}");
385 }
386 }
387 }
388 });
389 Ok(rx.recv()?)
390 }
391
392 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
393 let pool = self.pool.clone();
394 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
396 tokio::spawn(async move {
397 let result = DatabaseQueries::load_currency(&pool, &code).await;
398 match result {
399 Ok(currency) => {
400 if let Err(e) = tx.send(currency) {
401 log::error!("Failed to send currency {code}: {e:?}");
402 }
403 }
404 Err(e) => {
405 log::error!("Failed to load currency {code}: {e:?}");
406 if let Err(e) = tx.send(None) {
407 log::error!("Failed to send None for currency {code}: {e:?}");
408 }
409 }
410 }
411 });
412 Ok(rx.recv()?)
413 }
414
415 async fn load_instrument(
416 &self,
417 instrument_id: &InstrumentId,
418 ) -> anyhow::Result<Option<InstrumentAny>> {
419 let pool = self.pool.clone();
420 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
422 tokio::spawn(async move {
423 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
424 match result {
425 Ok(instrument) => {
426 if let Err(e) = tx.send(instrument) {
427 log::error!("Failed to send instrument {instrument_id}: {e:?}");
428 }
429 }
430 Err(e) => {
431 log::error!("Failed to load instrument {instrument_id}: {e:?}");
432 if let Err(e) = tx.send(None) {
433 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
434 }
435 }
436 }
437 });
438 Ok(rx.recv()?)
439 }
440
441 async fn load_synthetic(
442 &self,
443 instrument_id: &InstrumentId,
444 ) -> anyhow::Result<Option<SyntheticInstrument>> {
445 todo!()
446 }
447
448 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
449 let pool = self.pool.clone();
450 let account_id = account_id.to_owned();
451 let (tx, rx) = std::sync::mpsc::channel();
452 tokio::spawn(async move {
453 let result = DatabaseQueries::load_account(&pool, &account_id).await;
454 match result {
455 Ok(account) => {
456 if let Err(e) = tx.send(account) {
457 log::error!("Failed to send account {account_id}: {e:?}");
458 }
459 }
460 Err(e) => {
461 log::error!("Failed to load account {account_id}: {e:?}");
462 if let Err(e) = tx.send(None) {
463 log::error!("Failed to send None for account {account_id}: {e:?}");
464 }
465 }
466 }
467 });
468 Ok(rx.recv()?)
469 }
470
471 async fn load_order(
472 &self,
473 client_order_id: &ClientOrderId,
474 ) -> anyhow::Result<Option<OrderAny>> {
475 let pool = self.pool.clone();
476 let client_order_id = client_order_id.to_owned();
477 let (tx, rx) = std::sync::mpsc::channel();
478 tokio::spawn(async move {
479 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
480 match result {
481 Ok(order) => {
482 if let Err(e) = tx.send(order) {
483 log::error!("Failed to send order {client_order_id}: {e:?}");
484 }
485 }
486 Err(e) => {
487 log::error!("Failed to load order {client_order_id}: {e:?}");
488 let _ = tx.send(None);
489 }
490 }
491 });
492 Ok(rx.recv()?)
493 }
494
495 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
496 todo!()
497 }
498
499 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
500 todo!()
501 }
502
503 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
504 todo!()
505 }
506
507 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
508 todo!()
509 }
510
511 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
512 todo!()
513 }
514
515 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
516 let query = DatabaseQuery::Add(key, value.into());
517 self.tx
518 .send(query)
519 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
520 }
521
522 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
523 let query = DatabaseQuery::AddCurrency(*currency);
524 self.tx.send(query).map_err(|e| {
525 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
526 })
527 }
528
529 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
530 let query = DatabaseQuery::AddInstrument(instrument.clone());
531 self.tx.send(query).map_err(|e| {
532 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
533 })
534 }
535
536 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
537 todo!()
538 }
539
540 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
541 let query = DatabaseQuery::AddAccount(account.clone(), false);
542 self.tx.send(query).map_err(|e| {
543 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
544 })
545 }
546
547 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
548 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
549 self.tx.send(query).map_err(|e| {
550 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
551 })
552 }
553
554 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
555 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
556 self.tx.send(query).map_err(|e| {
557 anyhow::anyhow!(
558 "Failed to send query add_order_snapshot to database message handler: {e}"
559 )
560 })
561 }
562
563 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
564 todo!()
565 }
566
567 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
568 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
569 self.tx.send(query).map_err(|e| {
570 anyhow::anyhow!(
571 "Failed to send query add_position_snapshot to database message handler: {e}"
572 )
573 })
574 }
575
576 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
577 todo!()
578 }
579
580 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
581 let query = DatabaseQuery::AddQuote(quote.to_owned());
582 self.tx.send(query).map_err(|e| {
583 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
584 })
585 }
586
587 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
588 let pool = self.pool.clone();
589 let instrument_id = instrument_id.to_owned();
590 let (tx, rx) = std::sync::mpsc::channel();
591 tokio::spawn(async move {
592 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
593 match result {
594 Ok(quotes) => {
595 if let Err(e) = tx.send(quotes) {
596 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
597 }
598 }
599 Err(e) => {
600 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
601 if let Err(e) = tx.send(Vec::new()) {
602 log::error!(
603 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
604 );
605 }
606 }
607 }
608 });
609 Ok(rx.recv()?)
610 }
611
612 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
613 let query = DatabaseQuery::AddTrade(trade.to_owned());
614 self.tx.send(query).map_err(|e| {
615 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
616 })
617 }
618
619 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
620 let pool = self.pool.clone();
621 let instrument_id = instrument_id.to_owned();
622 let (tx, rx) = std::sync::mpsc::channel();
623 tokio::spawn(async move {
624 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
625 match result {
626 Ok(trades) => {
627 if let Err(e) = tx.send(trades) {
628 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
629 }
630 }
631 Err(e) => {
632 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
633 if let Err(e) = tx.send(Vec::new()) {
634 log::error!(
635 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
636 );
637 }
638 }
639 }
640 });
641 Ok(rx.recv()?)
642 }
643
644 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
645 let query = DatabaseQuery::AddBar(bar.to_owned());
646 self.tx.send(query).map_err(|e| {
647 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
648 })
649 }
650
651 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
652 let pool = self.pool.clone();
653 let instrument_id = instrument_id.to_owned();
654 let (tx, rx) = std::sync::mpsc::channel();
655 tokio::spawn(async move {
656 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
657 match result {
658 Ok(bars) => {
659 if let Err(e) = tx.send(bars) {
660 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
661 }
662 }
663 Err(e) => {
664 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
665 if let Err(e) = tx.send(Vec::new()) {
666 log::error!(
667 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
668 );
669 }
670 }
671 }
672 });
673 Ok(rx.recv()?)
674 }
675
676 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
677 let query = DatabaseQuery::AddSignal(signal.to_owned());
678 self.tx.send(query).map_err(|e| {
679 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
680 })
681 }
682
683 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
684 let pool = self.pool.clone();
685 let name = name.to_owned();
686 let (tx, rx) = std::sync::mpsc::channel();
687 tokio::spawn(async move {
688 let result = DatabaseQueries::load_signals(&pool, &name).await;
689 match result {
690 Ok(signals) => {
691 if let Err(e) = tx.send(signals) {
692 log::error!("Failed to send signals for '{name}': {e:?}");
693 }
694 }
695 Err(e) => {
696 log::error!("Failed to load signals for '{name}': {e:?}");
697 if let Err(e) = tx.send(Vec::new()) {
698 log::error!("Failed to send empty signals for '{name}': {e:?}");
699 }
700 }
701 }
702 });
703 Ok(rx.recv()?)
704 }
705
706 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
707 let query = DatabaseQuery::AddCustom(data.to_owned());
708 self.tx.send(query).map_err(|e| {
709 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
710 })
711 }
712
713 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
714 let pool = self.pool.clone();
715 let data_type = data_type.to_owned();
716 let (tx, rx) = std::sync::mpsc::channel();
717 tokio::spawn(async move {
718 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
719 match result {
720 Ok(signals) => {
721 if let Err(e) = tx.send(signals) {
722 log::error!("Failed to send custom data for '{data_type}': {e:?}");
723 }
724 }
725 Err(e) => {
726 log::error!("Failed to load custom data for '{data_type}': {e:?}");
727 if let Err(e) = tx.send(Vec::new()) {
728 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
729 }
730 }
731 }
732 });
733 Ok(rx.recv()?)
734 }
735
736 fn load_order_snapshot(
737 &self,
738 client_order_id: &ClientOrderId,
739 ) -> anyhow::Result<Option<OrderSnapshot>> {
740 let pool = self.pool.clone();
741 let client_order_id = client_order_id.to_owned();
742 let (tx, rx) = std::sync::mpsc::channel();
743 tokio::spawn(async move {
744 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
745 match result {
746 Ok(snapshot) => {
747 if let Err(e) = tx.send(snapshot) {
748 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
749 }
750 }
751 Err(e) => {
752 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
753 if let Err(e) = tx.send(None) {
754 log::error!(
755 "Failed to send None for order snapshot {client_order_id}: {e:?}"
756 );
757 }
758 }
759 }
760 });
761 Ok(rx.recv()?)
762 }
763
764 fn load_position_snapshot(
765 &self,
766 position_id: &PositionId,
767 ) -> anyhow::Result<Option<PositionSnapshot>> {
768 let pool = self.pool.clone();
769 let position_id = position_id.to_owned();
770 let (tx, rx) = std::sync::mpsc::channel();
771 tokio::spawn(async move {
772 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
773 match result {
774 Ok(snapshot) => {
775 if let Err(e) = tx.send(snapshot) {
776 log::error!("Failed to send position snapshot {position_id}: {e:?}");
777 }
778 }
779 Err(e) => {
780 log::error!("Failed to load position snapshot {position_id}: {e:?}");
781 if let Err(e) = tx.send(None) {
782 log::error!(
783 "Failed to send None for position snapshot {position_id}: {e:?}"
784 );
785 }
786 }
787 }
788 });
789 Ok(rx.recv()?)
790 }
791
792 fn index_venue_order_id(
793 &self,
794 client_order_id: ClientOrderId,
795 venue_order_id: VenueOrderId,
796 ) -> anyhow::Result<()> {
797 todo!()
798 }
799
800 fn index_order_position(
801 &self,
802 client_order_id: ClientOrderId,
803 position_id: PositionId,
804 ) -> anyhow::Result<()> {
805 todo!()
806 }
807
808 fn update_actor(&self) -> anyhow::Result<()> {
809 todo!()
810 }
811
812 fn update_strategy(&self) -> anyhow::Result<()> {
813 todo!()
814 }
815
816 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
817 let query = DatabaseQuery::AddAccount(account.clone(), true);
818 self.tx.send(query).map_err(|e| {
819 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
820 })
821 }
822
823 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
824 let query = DatabaseQuery::UpdateOrder(event.clone());
825 self.tx.send(query).map_err(|e| {
826 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
827 })
828 }
829
830 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
831 todo!()
832 }
833
834 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
835 todo!()
836 }
837
838 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
839 todo!()
840 }
841
842 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
843 todo!()
844 }
845}
846
847async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
848 for cmd in buffer.drain(..) {
849 let result: anyhow::Result<()> = match cmd {
850 DatabaseQuery::Close => Ok(()),
851 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
852 DatabaseQuery::AddCurrency(currency) => {
853 DatabaseQueries::add_currency(pool, currency).await
854 }
855 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
856 InstrumentAny::Betting(instrument) => {
857 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
858 }
859 InstrumentAny::BinaryOption(instrument) => {
860 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
861 .await
862 }
863 InstrumentAny::CryptoFuture(instrument) => {
864 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
865 .await
866 }
867 InstrumentAny::CryptoPerpetual(instrument) => {
868 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
869 .await
870 }
871 InstrumentAny::CurrencyPair(instrument) => {
872 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
873 .await
874 }
875 InstrumentAny::Equity(equity) => {
876 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
877 }
878 InstrumentAny::FuturesContract(instrument) => {
879 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
880 .await
881 }
882 InstrumentAny::FuturesSpread(instrument) => {
883 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
884 .await
885 }
886 InstrumentAny::OptionContract(instrument) => {
887 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
888 .await
889 }
890 InstrumentAny::OptionSpread(instrument) => {
891 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
892 .await
893 }
894 },
895 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
896 OrderAny::Limit(order) => {
897 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
898 .await
899 }
900 OrderAny::LimitIfTouched(order) => {
901 DatabaseQueries::add_order(
902 pool,
903 "LIMIT_IF_TOUCHED",
904 updated,
905 Box::new(order),
906 client_id,
907 )
908 .await
909 }
910 OrderAny::Market(order) => {
911 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
912 .await
913 }
914 OrderAny::MarketIfTouched(order) => {
915 DatabaseQueries::add_order(
916 pool,
917 "MARKET_IF_TOUCHED",
918 updated,
919 Box::new(order),
920 client_id,
921 )
922 .await
923 }
924 OrderAny::MarketToLimit(order) => {
925 DatabaseQueries::add_order(
926 pool,
927 "MARKET_TO_LIMIT",
928 updated,
929 Box::new(order),
930 client_id,
931 )
932 .await
933 }
934 OrderAny::StopLimit(order) => {
935 DatabaseQueries::add_order(
936 pool,
937 "STOP_LIMIT",
938 updated,
939 Box::new(order),
940 client_id,
941 )
942 .await
943 }
944 OrderAny::StopMarket(order) => {
945 DatabaseQueries::add_order(
946 pool,
947 "STOP_MARKET",
948 updated,
949 Box::new(order),
950 client_id,
951 )
952 .await
953 }
954 OrderAny::TrailingStopLimit(order) => {
955 DatabaseQueries::add_order(
956 pool,
957 "TRAILING_STOP_LIMIT",
958 updated,
959 Box::new(order),
960 client_id,
961 )
962 .await
963 }
964 OrderAny::TrailingStopMarket(order) => {
965 DatabaseQueries::add_order(
966 pool,
967 "TRAILING_STOP_MARKET",
968 updated,
969 Box::new(order),
970 client_id,
971 )
972 .await
973 }
974 },
975 DatabaseQuery::AddOrderSnapshot(snapshot) => {
976 DatabaseQueries::add_order_snapshot(pool, snapshot).await
977 }
978 DatabaseQuery::AddPositionSnapshot(snapshot) => {
979 DatabaseQueries::add_position_snapshot(pool, snapshot).await
980 }
981 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
982 AccountAny::Cash(account) => {
983 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
984 }
985 AccountAny::Margin(account) => {
986 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
987 }
988 },
989 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
990 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
991 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
992 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
993 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
994 DatabaseQuery::UpdateOrder(event) => {
995 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
996 }
997 };
998
999 if let Err(e) = result {
1000 tracing::error!("Error on query: {e:?}");
1001 }
1002 }
1003}