1use std::{
17 collections::{HashMap, VecDeque},
18 time::{Duration, Instant},
19};
20
21use bytes::Bytes;
22use nautilus_common::{
23 cache::database::{CacheDatabaseAdapter, CacheMap},
24 custom::CustomData,
25 runtime::get_runtime,
26 signal::Signal,
27};
28use nautilus_core::UnixNanos;
29use nautilus_model::{
30 accounts::AccountAny,
31 data::{Bar, DataType, QuoteTick, TradeTick},
32 events::{OrderEventAny, OrderSnapshot, position::snapshot::PositionSnapshot},
33 identifiers::{
34 AccountId, ClientId, ClientOrderId, ComponentId, InstrumentId, PositionId, StrategyId,
35 VenueOrderId,
36 },
37 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
38 orderbook::OrderBook,
39 orders::{Order, OrderAny},
40 position::Position,
41 types::Currency,
42};
43use sqlx::{PgPool, postgres::PgConnectOptions};
44use tokio::try_join;
45use ustr::Ustr;
46
47use crate::sql::{
48 pg::{connect_pg, get_postgres_connect_options},
49 queries::DatabaseQueries,
50};
51
/// Cache database adapter backed by Postgres.
///
/// Read queries run against `pool`; writes are wrapped in a [`DatabaseQuery`]
/// and sent over `tx` to a background task spawned by `connect`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.infrastructure")
)]
pub struct PostgresCacheDatabase {
    /// Connection pool used by the load/flush/close operations of this adapter.
    pub pool: PgPool,
    /// Sending half of the unbounded channel consumed by the background writer task.
    tx: tokio::sync::mpsc::UnboundedSender<DatabaseQuery>,
    /// Join handle of the background writer task (awaited when closing).
    handle: tokio::task::JoinHandle<()>,
}
62
/// Commands sent to the background writer task, which buffers them and
/// executes each one against the database.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
pub enum DatabaseQuery {
    /// Stops the writer loop; anything still buffered is drained before the task ends.
    Close,
    /// Stores a general key/value item.
    Add(String, Vec<u8>),
    /// Stores a currency.
    AddCurrency(Currency),
    /// Stores an instrument.
    AddInstrument(InstrumentAny),
    /// Stores an order with an optional client ID; the flag is forwarded as the
    /// `updated` argument of the underlying query.
    AddOrder(OrderAny, Option<ClientId>, bool),
    /// Stores an order snapshot.
    AddOrderSnapshot(OrderSnapshot),
    /// Stores a position snapshot.
    AddPositionSnapshot(PositionSnapshot),
    /// Stores an account; the flag is forwarded as the `updated` argument of the
    /// underlying query.
    AddAccount(AccountAny, bool),
    /// Stores a signal.
    AddSignal(Signal),
    /// Stores a custom data item.
    AddCustom(CustomData),
    /// Stores a quote tick.
    AddQuote(QuoteTick),
    /// Stores a trade tick.
    AddTrade(TradeTick),
    /// Stores a bar.
    AddBar(Bar),
    /// Stores an order event for an existing order.
    UpdateOrder(OrderEventAny),
}
81
82impl PostgresCacheDatabase {
83 pub async fn connect(
84 host: Option<String>,
85 port: Option<u16>,
86 username: Option<String>,
87 password: Option<String>,
88 database: Option<String>,
89 ) -> Result<Self, sqlx::Error> {
90 let pg_connect_options =
91 get_postgres_connect_options(host, port, username, password, database);
92 let pool = connect_pg(pg_connect_options.clone().into()).await.unwrap();
93 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<DatabaseQuery>();
94
95 let handle = tokio::spawn(async move {
97 PostgresCacheDatabase::process_commands(rx, pg_connect_options.clone().into()).await;
98 });
99 Ok(PostgresCacheDatabase { pool, tx, handle })
100 }
101
102 async fn process_commands(
103 mut rx: tokio::sync::mpsc::UnboundedReceiver<DatabaseQuery>,
104 pg_connect_options: PgConnectOptions,
105 ) {
106 tracing::debug!("Starting cache processing");
107
108 let pool = connect_pg(pg_connect_options).await.unwrap();
109
110 let mut buffer: VecDeque<DatabaseQuery> = VecDeque::new();
112 let mut last_drain = Instant::now();
113
114 let buffer_interval = Duration::from_millis(0);
116
117 loop {
119 if last_drain.elapsed() >= buffer_interval && !buffer.is_empty() {
120 drain_buffer(&pool, &mut buffer).await;
121 last_drain = Instant::now();
122 } else {
123 match rx.recv().await {
124 Some(msg) => {
125 tracing::debug!("Received {msg:?}");
126 match msg {
127 DatabaseQuery::Close => break,
128 _ => buffer.push_back(msg),
129 }
130 }
131 None => {
132 tracing::debug!("Command channel closed");
133 break;
134 }
135 }
136 }
137 }
138
139 if !buffer.is_empty() {
141 drain_buffer(&pool, &mut buffer).await;
142 }
143
144 tracing::debug!("Stopped cache processing");
145 }
146}
147
148pub async fn get_pg_cache_database() -> anyhow::Result<PostgresCacheDatabase> {
149 let connect_options = get_postgres_connect_options(None, None, None, None, None);
150 Ok(PostgresCacheDatabase::connect(
151 Some(connect_options.host),
152 Some(connect_options.port),
153 Some(connect_options.username),
154 Some(connect_options.password),
155 Some(connect_options.database),
156 )
157 .await?)
158}
159
160#[allow(dead_code)]
161#[allow(unused)]
162#[async_trait::async_trait]
163impl CacheDatabaseAdapter for PostgresCacheDatabase {
164 fn close(&mut self) -> anyhow::Result<()> {
165 let pool = self.pool.clone();
166 let (tx, rx) = std::sync::mpsc::channel();
167
168 log::debug!("Closing connection pool");
169 tokio::task::block_in_place(|| {
170 get_runtime().block_on(async {
171 pool.close().await;
172 if let Err(e) = tx.send(()) {
173 log::error!("Error closing pool: {e:?}");
174 }
175 });
176 });
177
178 if let Err(e) = self.tx.send(DatabaseQuery::Close) {
180 log::error!("Error sending close: {e:?}");
181 }
182
183 log::debug!("Awaiting task 'cache-write'"); tokio::task::block_in_place(|| {
185 if let Err(e) = get_runtime().block_on(&mut self.handle) {
186 log::error!("Error awaiting task 'cache-write': {e:?}");
187 }
188 });
189
190 log::debug!("Closed");
191
192 Ok(rx.recv()?)
193 }
194
195 fn flush(&mut self) -> anyhow::Result<()> {
196 let pool = self.pool.clone();
197 let (tx, rx) = std::sync::mpsc::channel();
198
199 tokio::task::block_in_place(|| {
200 get_runtime().block_on(async {
201 if let Err(e) = DatabaseQueries::truncate(&pool).await {
202 log::error!("Error flushing pool: {e:?}");
203 }
204 if let Err(e) = tx.send(()) {
205 log::error!("Error sending flush result: {e:?}");
206 }
207 });
208 });
209
210 Ok(rx.recv()?)
211 }
212
213 async fn load_all(&self) -> anyhow::Result<CacheMap> {
214 let (currencies, instruments, synthetics, accounts, orders, positions) = try_join!(
215 self.load_currencies(),
216 self.load_instruments(),
217 self.load_synthetics(),
218 self.load_accounts(),
219 self.load_orders(),
220 self.load_positions()
221 )
222 .map_err(|e| anyhow::anyhow!("Error loading cache data: {}", e))?;
223
224 Ok(CacheMap {
225 currencies,
226 instruments,
227 synthetics,
228 accounts,
229 orders,
230 positions,
231 })
232 }
233
234 fn load(&self) -> anyhow::Result<HashMap<String, Bytes>> {
235 let pool = self.pool.clone();
236 let (tx, rx) = std::sync::mpsc::channel();
237 tokio::spawn(async move {
238 let result = DatabaseQueries::load(&pool).await;
239 match result {
240 Ok(items) => {
241 let mapping = items
242 .into_iter()
243 .map(|(k, v)| (k, Bytes::from(v)))
244 .collect();
245 if let Err(e) = tx.send(mapping) {
246 log::error!("Failed to send general items: {e:?}");
247 }
248 }
249 Err(e) => {
250 log::error!("Failed to load general items: {e:?}");
251 if let Err(e) = tx.send(HashMap::new()) {
252 log::error!("Failed to send empty general items: {e:?}");
253 }
254 }
255 }
256 });
257 Ok(rx.recv()?)
258 }
259
260 async fn load_currencies(&self) -> anyhow::Result<HashMap<Ustr, Currency>> {
261 let pool = self.pool.clone();
262 let (tx, rx) = std::sync::mpsc::channel();
263 tokio::spawn(async move {
264 let result = DatabaseQueries::load_currencies(&pool).await;
265 match result {
266 Ok(currencies) => {
267 let mapping = currencies
268 .into_iter()
269 .map(|currency| (currency.code, currency))
270 .collect();
271 if let Err(e) = tx.send(mapping) {
272 log::error!("Failed to send currencies: {e:?}");
273 }
274 }
275 Err(e) => {
276 log::error!("Failed to load currencies: {e:?}");
277 if let Err(e) = tx.send(HashMap::new()) {
278 log::error!("Failed to send empty currencies: {e:?}");
279 }
280 }
281 }
282 });
283 Ok(rx.recv()?)
284 }
285
286 async fn load_instruments(&self) -> anyhow::Result<HashMap<InstrumentId, InstrumentAny>> {
287 let pool = self.pool.clone();
288 let (tx, rx) = std::sync::mpsc::channel();
289 tokio::spawn(async move {
290 let result = DatabaseQueries::load_instruments(&pool).await;
291 match result {
292 Ok(instruments) => {
293 let mapping = instruments
294 .into_iter()
295 .map(|instrument| (instrument.id(), instrument))
296 .collect();
297 if let Err(e) = tx.send(mapping) {
298 log::error!("Failed to send instruments: {e:?}");
299 }
300 }
301 Err(e) => {
302 log::error!("Failed to load instruments: {e:?}");
303 if let Err(e) = tx.send(HashMap::new()) {
304 log::error!("Failed to send empty instruments: {e:?}");
305 }
306 }
307 }
308 });
309 Ok(rx.recv()?)
310 }
311
312 async fn load_synthetics(&self) -> anyhow::Result<HashMap<InstrumentId, SyntheticInstrument>> {
313 todo!()
314 }
315
316 async fn load_accounts(&self) -> anyhow::Result<HashMap<AccountId, AccountAny>> {
317 let pool = self.pool.clone();
318 let (tx, rx) = std::sync::mpsc::channel();
319 tokio::spawn(async move {
320 let result = DatabaseQueries::load_accounts(&pool).await;
321 match result {
322 Ok(accounts) => {
323 let mapping = accounts
324 .into_iter()
325 .map(|account| (account.id(), account))
326 .collect();
327 if let Err(e) = tx.send(mapping) {
328 log::error!("Failed to send accounts: {e:?}");
329 }
330 }
331 Err(e) => {
332 log::error!("Failed to load accounts: {e:?}");
333 if let Err(e) = tx.send(HashMap::new()) {
334 log::error!("Failed to send empty accounts: {e:?}");
335 }
336 }
337 }
338 });
339 Ok(rx.recv()?)
340 }
341
342 async fn load_orders(&self) -> anyhow::Result<HashMap<ClientOrderId, OrderAny>> {
343 let pool = self.pool.clone();
344 let (tx, rx) = std::sync::mpsc::channel();
345 tokio::spawn(async move {
346 let result = DatabaseQueries::load_orders(&pool).await;
347 match result {
348 Ok(orders) => {
349 let mapping = orders
350 .into_iter()
351 .map(|order| (order.client_order_id(), order))
352 .collect();
353 if let Err(e) = tx.send(mapping) {
354 log::error!("Failed to send orders: {e:?}");
355 }
356 }
357 Err(e) => {
358 log::error!("Failed to load orders: {e:?}");
359 if let Err(e) = tx.send(HashMap::new()) {
360 log::error!("Failed to send empty orders: {e:?}");
361 }
362 }
363 }
364 });
365 Ok(rx.recv()?)
366 }
367
368 async fn load_positions(&self) -> anyhow::Result<HashMap<PositionId, Position>> {
369 todo!()
370 }
371
372 fn load_index_order_position(&self) -> anyhow::Result<HashMap<ClientOrderId, Position>> {
373 todo!()
374 }
375
376 fn load_index_order_client(&self) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
377 let pool = self.pool.clone();
378 let (tx, rx) = std::sync::mpsc::channel();
379 tokio::spawn(async move {
380 let result = DatabaseQueries::load_distinct_order_event_client_ids(&pool).await;
381 match result {
382 Ok(currency) => {
383 if let Err(e) = tx.send(currency) {
384 log::error!("Failed to send load_index_order_client result: {e:?}");
385 }
386 }
387 Err(e) => {
388 log::error!("Failed to run query load_distinct_order_event_client_ids: {e:?}");
389 if let Err(e) = tx.send(HashMap::new()) {
390 log::error!("Failed to send empty load_index_order_client result: {e:?}");
391 }
392 }
393 }
394 });
395 Ok(rx.recv()?)
396 }
397
398 async fn load_currency(&self, code: &Ustr) -> anyhow::Result<Option<Currency>> {
399 let pool = self.pool.clone();
400 let code = code.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
402 tokio::spawn(async move {
403 let result = DatabaseQueries::load_currency(&pool, &code).await;
404 match result {
405 Ok(currency) => {
406 if let Err(e) = tx.send(currency) {
407 log::error!("Failed to send currency {code}: {e:?}");
408 }
409 }
410 Err(e) => {
411 log::error!("Failed to load currency {code}: {e:?}");
412 if let Err(e) = tx.send(None) {
413 log::error!("Failed to send None for currency {code}: {e:?}");
414 }
415 }
416 }
417 });
418 Ok(rx.recv()?)
419 }
420
421 async fn load_instrument(
422 &self,
423 instrument_id: &InstrumentId,
424 ) -> anyhow::Result<Option<InstrumentAny>> {
425 let pool = self.pool.clone();
426 let instrument_id = instrument_id.to_owned(); let (tx, rx) = std::sync::mpsc::channel();
428 tokio::spawn(async move {
429 let result = DatabaseQueries::load_instrument(&pool, &instrument_id).await;
430 match result {
431 Ok(instrument) => {
432 if let Err(e) = tx.send(instrument) {
433 log::error!("Failed to send instrument {instrument_id}: {e:?}");
434 }
435 }
436 Err(e) => {
437 log::error!("Failed to load instrument {instrument_id}: {e:?}");
438 if let Err(e) = tx.send(None) {
439 log::error!("Failed to send None for instrument {instrument_id}: {e:?}");
440 }
441 }
442 }
443 });
444 Ok(rx.recv()?)
445 }
446
447 async fn load_synthetic(
448 &self,
449 instrument_id: &InstrumentId,
450 ) -> anyhow::Result<Option<SyntheticInstrument>> {
451 todo!()
452 }
453
454 async fn load_account(&self, account_id: &AccountId) -> anyhow::Result<Option<AccountAny>> {
455 let pool = self.pool.clone();
456 let account_id = account_id.to_owned();
457 let (tx, rx) = std::sync::mpsc::channel();
458 tokio::spawn(async move {
459 let result = DatabaseQueries::load_account(&pool, &account_id).await;
460 match result {
461 Ok(account) => {
462 if let Err(e) = tx.send(account) {
463 log::error!("Failed to send account {account_id}: {e:?}");
464 }
465 }
466 Err(e) => {
467 log::error!("Failed to load account {account_id}: {e:?}");
468 if let Err(e) = tx.send(None) {
469 log::error!("Failed to send None for account {account_id}: {e:?}");
470 }
471 }
472 }
473 });
474 Ok(rx.recv()?)
475 }
476
477 async fn load_order(
478 &self,
479 client_order_id: &ClientOrderId,
480 ) -> anyhow::Result<Option<OrderAny>> {
481 let pool = self.pool.clone();
482 let client_order_id = client_order_id.to_owned();
483 let (tx, rx) = std::sync::mpsc::channel();
484 tokio::spawn(async move {
485 let result = DatabaseQueries::load_order(&pool, &client_order_id).await;
486 match result {
487 Ok(order) => {
488 if let Err(e) = tx.send(order) {
489 log::error!("Failed to send order {client_order_id}: {e:?}");
490 }
491 }
492 Err(e) => {
493 log::error!("Failed to load order {client_order_id}: {e:?}");
494 let _ = tx.send(None);
495 }
496 }
497 });
498 Ok(rx.recv()?)
499 }
500
501 async fn load_position(&self, position_id: &PositionId) -> anyhow::Result<Option<Position>> {
502 todo!()
503 }
504
505 fn load_actor(&self, component_id: &ComponentId) -> anyhow::Result<HashMap<String, Bytes>> {
506 todo!()
507 }
508
509 fn delete_actor(&self, component_id: &ComponentId) -> anyhow::Result<()> {
510 todo!()
511 }
512
513 fn load_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<HashMap<String, Bytes>> {
514 todo!()
515 }
516
517 fn delete_strategy(&self, component_id: &StrategyId) -> anyhow::Result<()> {
518 todo!()
519 }
520
521 fn add(&self, key: String, value: Bytes) -> anyhow::Result<()> {
522 let query = DatabaseQuery::Add(key, value.into());
523 self.tx
524 .send(query)
525 .map_err(|e| anyhow::anyhow!("Failed to send query to database message handler: {e}"))
526 }
527
528 fn add_currency(&self, currency: &Currency) -> anyhow::Result<()> {
529 let query = DatabaseQuery::AddCurrency(*currency);
530 self.tx.send(query).map_err(|e| {
531 anyhow::anyhow!("Failed to query add_currency to database message handler: {e}")
532 })
533 }
534
535 fn add_instrument(&self, instrument: &InstrumentAny) -> anyhow::Result<()> {
536 let query = DatabaseQuery::AddInstrument(instrument.clone());
537 self.tx.send(query).map_err(|e| {
538 anyhow::anyhow!("Failed to send query add_instrument to database message handler: {e}")
539 })
540 }
541
542 fn add_synthetic(&self, synthetic: &SyntheticInstrument) -> anyhow::Result<()> {
543 todo!()
544 }
545
546 fn add_account(&self, account: &AccountAny) -> anyhow::Result<()> {
547 let query = DatabaseQuery::AddAccount(account.clone(), false);
548 self.tx.send(query).map_err(|e| {
549 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
550 })
551 }
552
553 fn add_order(&self, order: &OrderAny, client_id: Option<ClientId>) -> anyhow::Result<()> {
554 let query = DatabaseQuery::AddOrder(order.clone(), client_id, false);
555 self.tx.send(query).map_err(|e| {
556 anyhow::anyhow!("Failed to send query add_order to database message handler: {e}")
557 })
558 }
559
560 fn add_order_snapshot(&self, snapshot: &OrderSnapshot) -> anyhow::Result<()> {
561 let query = DatabaseQuery::AddOrderSnapshot(snapshot.to_owned());
562 self.tx.send(query).map_err(|e| {
563 anyhow::anyhow!(
564 "Failed to send query add_order_snapshot to database message handler: {e}"
565 )
566 })
567 }
568
569 fn add_position(&self, position: &Position) -> anyhow::Result<()> {
570 todo!()
571 }
572
573 fn add_position_snapshot(&self, snapshot: &PositionSnapshot) -> anyhow::Result<()> {
574 let query = DatabaseQuery::AddPositionSnapshot(snapshot.to_owned());
575 self.tx.send(query).map_err(|e| {
576 anyhow::anyhow!(
577 "Failed to send query add_position_snapshot to database message handler: {e}"
578 )
579 })
580 }
581
582 fn add_order_book(&self, order_book: &OrderBook) -> anyhow::Result<()> {
583 todo!()
584 }
585
586 fn add_quote(&self, quote: &QuoteTick) -> anyhow::Result<()> {
587 let query = DatabaseQuery::AddQuote(quote.to_owned());
588 self.tx.send(query).map_err(|e| {
589 anyhow::anyhow!("Failed to send query add_quote to database message handler: {e}")
590 })
591 }
592
593 fn load_quotes(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<QuoteTick>> {
594 let pool = self.pool.clone();
595 let instrument_id = instrument_id.to_owned();
596 let (tx, rx) = std::sync::mpsc::channel();
597 tokio::spawn(async move {
598 let result = DatabaseQueries::load_quotes(&pool, &instrument_id).await;
599 match result {
600 Ok(quotes) => {
601 if let Err(e) = tx.send(quotes) {
602 log::error!("Failed to send quotes for instrument {instrument_id}: {e:?}");
603 }
604 }
605 Err(e) => {
606 log::error!("Failed to load quotes for instrument {instrument_id}: {e:?}");
607 if let Err(e) = tx.send(Vec::new()) {
608 log::error!(
609 "Failed to send empty quotes for instrument {instrument_id}: {e:?}"
610 );
611 }
612 }
613 }
614 });
615 Ok(rx.recv()?)
616 }
617
618 fn add_trade(&self, trade: &TradeTick) -> anyhow::Result<()> {
619 let query = DatabaseQuery::AddTrade(trade.to_owned());
620 self.tx.send(query).map_err(|e| {
621 anyhow::anyhow!("Failed to send query add_trade to database message handler: {e}")
622 })
623 }
624
625 fn load_trades(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<TradeTick>> {
626 let pool = self.pool.clone();
627 let instrument_id = instrument_id.to_owned();
628 let (tx, rx) = std::sync::mpsc::channel();
629 tokio::spawn(async move {
630 let result = DatabaseQueries::load_trades(&pool, &instrument_id).await;
631 match result {
632 Ok(trades) => {
633 if let Err(e) = tx.send(trades) {
634 log::error!("Failed to send trades for instrument {instrument_id}: {e:?}");
635 }
636 }
637 Err(e) => {
638 log::error!("Failed to load trades for instrument {instrument_id}: {e:?}");
639 if let Err(e) = tx.send(Vec::new()) {
640 log::error!(
641 "Failed to send empty trades for instrument {instrument_id}: {e:?}"
642 );
643 }
644 }
645 }
646 });
647 Ok(rx.recv()?)
648 }
649
650 fn add_bar(&self, bar: &Bar) -> anyhow::Result<()> {
651 let query = DatabaseQuery::AddBar(bar.to_owned());
652 self.tx.send(query).map_err(|e| {
653 anyhow::anyhow!("Failed to send query add_bar to database message handler: {e}")
654 })
655 }
656
657 fn load_bars(&self, instrument_id: &InstrumentId) -> anyhow::Result<Vec<Bar>> {
658 let pool = self.pool.clone();
659 let instrument_id = instrument_id.to_owned();
660 let (tx, rx) = std::sync::mpsc::channel();
661 tokio::spawn(async move {
662 let result = DatabaseQueries::load_bars(&pool, &instrument_id).await;
663 match result {
664 Ok(bars) => {
665 if let Err(e) = tx.send(bars) {
666 log::error!("Failed to send bars for instrument {instrument_id}: {e:?}");
667 }
668 }
669 Err(e) => {
670 log::error!("Failed to load bars for instrument {instrument_id}: {e:?}");
671 if let Err(e) = tx.send(Vec::new()) {
672 log::error!(
673 "Failed to send empty bars for instrument {instrument_id}: {e:?}"
674 );
675 }
676 }
677 }
678 });
679 Ok(rx.recv()?)
680 }
681
682 fn add_signal(&self, signal: &Signal) -> anyhow::Result<()> {
683 let query = DatabaseQuery::AddSignal(signal.to_owned());
684 self.tx.send(query).map_err(|e| {
685 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
686 })
687 }
688
689 fn load_signals(&self, name: &str) -> anyhow::Result<Vec<Signal>> {
690 let pool = self.pool.clone();
691 let name = name.to_owned();
692 let (tx, rx) = std::sync::mpsc::channel();
693 tokio::spawn(async move {
694 let result = DatabaseQueries::load_signals(&pool, &name).await;
695 match result {
696 Ok(signals) => {
697 if let Err(e) = tx.send(signals) {
698 log::error!("Failed to send signals for '{name}': {e:?}");
699 }
700 }
701 Err(e) => {
702 log::error!("Failed to load signals for '{name}': {e:?}");
703 if let Err(e) = tx.send(Vec::new()) {
704 log::error!("Failed to send empty signals for '{name}': {e:?}");
705 }
706 }
707 }
708 });
709 Ok(rx.recv()?)
710 }
711
712 fn add_custom_data(&self, data: &CustomData) -> anyhow::Result<()> {
713 let query = DatabaseQuery::AddCustom(data.to_owned());
714 self.tx.send(query).map_err(|e| {
715 anyhow::anyhow!("Failed to send query add_signal to database message handler: {e}")
716 })
717 }
718
719 fn load_custom_data(&self, data_type: &DataType) -> anyhow::Result<Vec<CustomData>> {
720 let pool = self.pool.clone();
721 let data_type = data_type.to_owned();
722 let (tx, rx) = std::sync::mpsc::channel();
723 tokio::spawn(async move {
724 let result = DatabaseQueries::load_custom_data(&pool, &data_type).await;
725 match result {
726 Ok(signals) => {
727 if let Err(e) = tx.send(signals) {
728 log::error!("Failed to send custom data for '{data_type}': {e:?}");
729 }
730 }
731 Err(e) => {
732 log::error!("Failed to load custom data for '{data_type}': {e:?}");
733 if let Err(e) = tx.send(Vec::new()) {
734 log::error!("Failed to send empty custom data for '{data_type}': {e:?}");
735 }
736 }
737 }
738 });
739 Ok(rx.recv()?)
740 }
741
742 fn load_order_snapshot(
743 &self,
744 client_order_id: &ClientOrderId,
745 ) -> anyhow::Result<Option<OrderSnapshot>> {
746 let pool = self.pool.clone();
747 let client_order_id = client_order_id.to_owned();
748 let (tx, rx) = std::sync::mpsc::channel();
749 tokio::spawn(async move {
750 let result = DatabaseQueries::load_order_snapshot(&pool, &client_order_id).await;
751 match result {
752 Ok(snapshot) => {
753 if let Err(e) = tx.send(snapshot) {
754 log::error!("Failed to send order snapshot {client_order_id}: {e:?}");
755 }
756 }
757 Err(e) => {
758 log::error!("Failed to load order snapshot {client_order_id}: {e:?}");
759 if let Err(e) = tx.send(None) {
760 log::error!(
761 "Failed to send None for order snapshot {client_order_id}: {e:?}"
762 );
763 }
764 }
765 }
766 });
767 Ok(rx.recv()?)
768 }
769
770 fn load_position_snapshot(
771 &self,
772 position_id: &PositionId,
773 ) -> anyhow::Result<Option<PositionSnapshot>> {
774 let pool = self.pool.clone();
775 let position_id = position_id.to_owned();
776 let (tx, rx) = std::sync::mpsc::channel();
777 tokio::spawn(async move {
778 let result = DatabaseQueries::load_position_snapshot(&pool, &position_id).await;
779 match result {
780 Ok(snapshot) => {
781 if let Err(e) = tx.send(snapshot) {
782 log::error!("Failed to send position snapshot {position_id}: {e:?}");
783 }
784 }
785 Err(e) => {
786 log::error!("Failed to load position snapshot {position_id}: {e:?}");
787 if let Err(e) = tx.send(None) {
788 log::error!(
789 "Failed to send None for position snapshot {position_id}: {e:?}"
790 );
791 }
792 }
793 }
794 });
795 Ok(rx.recv()?)
796 }
797
798 fn index_venue_order_id(
799 &self,
800 client_order_id: ClientOrderId,
801 venue_order_id: VenueOrderId,
802 ) -> anyhow::Result<()> {
803 todo!()
804 }
805
806 fn index_order_position(
807 &self,
808 client_order_id: ClientOrderId,
809 position_id: PositionId,
810 ) -> anyhow::Result<()> {
811 todo!()
812 }
813
814 fn update_actor(&self) -> anyhow::Result<()> {
815 todo!()
816 }
817
818 fn update_strategy(&self) -> anyhow::Result<()> {
819 todo!()
820 }
821
822 fn update_account(&self, account: &AccountAny) -> anyhow::Result<()> {
823 let query = DatabaseQuery::AddAccount(account.clone(), true);
824 self.tx.send(query).map_err(|e| {
825 anyhow::anyhow!("Failed to send query add_account to database message handler: {e}")
826 })
827 }
828
829 fn update_order(&self, event: &OrderEventAny) -> anyhow::Result<()> {
830 let query = DatabaseQuery::UpdateOrder(event.clone());
831 self.tx.send(query).map_err(|e| {
832 anyhow::anyhow!("Failed to send query update_order to database message handler: {e}")
833 })
834 }
835
836 fn update_position(&self, position: &Position) -> anyhow::Result<()> {
837 todo!()
838 }
839
840 fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
841 todo!()
842 }
843
844 fn snapshot_position_state(&self, position: &Position) -> anyhow::Result<()> {
845 todo!()
846 }
847
848 fn heartbeat(&self, timestamp: UnixNanos) -> anyhow::Result<()> {
849 todo!()
850 }
851}
852
853async fn drain_buffer(pool: &PgPool, buffer: &mut VecDeque<DatabaseQuery>) {
854 for cmd in buffer.drain(..) {
855 let result: anyhow::Result<()> = match cmd {
856 DatabaseQuery::Close => Ok(()),
857 DatabaseQuery::Add(key, value) => DatabaseQueries::add(pool, key, value).await,
858 DatabaseQuery::AddCurrency(currency) => {
859 DatabaseQueries::add_currency(pool, currency).await
860 }
861 DatabaseQuery::AddInstrument(instrument_any) => match instrument_any {
862 InstrumentAny::Betting(instrument) => {
863 DatabaseQueries::add_instrument(pool, "BETTING", Box::new(instrument)).await
864 }
865 InstrumentAny::BinaryOption(instrument) => {
866 DatabaseQueries::add_instrument(pool, "BINARY_OPTION", Box::new(instrument))
867 .await
868 }
869 InstrumentAny::CryptoFuture(instrument) => {
870 DatabaseQueries::add_instrument(pool, "CRYPTO_FUTURE", Box::new(instrument))
871 .await
872 }
873 InstrumentAny::CryptoOption(instrument) => {
874 DatabaseQueries::add_instrument(pool, "CRYPTO_OPTION", Box::new(instrument))
875 .await
876 }
877 InstrumentAny::CryptoPerpetual(instrument) => {
878 DatabaseQueries::add_instrument(pool, "CRYPTO_PERPETUAL", Box::new(instrument))
879 .await
880 }
881 InstrumentAny::CurrencyPair(instrument) => {
882 DatabaseQueries::add_instrument(pool, "CURRENCY_PAIR", Box::new(instrument))
883 .await
884 }
885 InstrumentAny::Equity(equity) => {
886 DatabaseQueries::add_instrument(pool, "EQUITY", Box::new(equity)).await
887 }
888 InstrumentAny::FuturesContract(instrument) => {
889 DatabaseQueries::add_instrument(pool, "FUTURES_CONTRACT", Box::new(instrument))
890 .await
891 }
892 InstrumentAny::FuturesSpread(instrument) => {
893 DatabaseQueries::add_instrument(pool, "FUTURES_SPREAD", Box::new(instrument))
894 .await
895 }
896 InstrumentAny::OptionContract(instrument) => {
897 DatabaseQueries::add_instrument(pool, "OPTION_CONTRACT", Box::new(instrument))
898 .await
899 }
900 InstrumentAny::OptionSpread(instrument) => {
901 DatabaseQueries::add_instrument(pool, "OPTION_SPREAD", Box::new(instrument))
902 .await
903 }
904 },
905 DatabaseQuery::AddOrder(order_any, client_id, updated) => match order_any {
906 OrderAny::Limit(order) => {
907 DatabaseQueries::add_order(pool, "LIMIT", updated, Box::new(order), client_id)
908 .await
909 }
910 OrderAny::LimitIfTouched(order) => {
911 DatabaseQueries::add_order(
912 pool,
913 "LIMIT_IF_TOUCHED",
914 updated,
915 Box::new(order),
916 client_id,
917 )
918 .await
919 }
920 OrderAny::Market(order) => {
921 DatabaseQueries::add_order(pool, "MARKET", updated, Box::new(order), client_id)
922 .await
923 }
924 OrderAny::MarketIfTouched(order) => {
925 DatabaseQueries::add_order(
926 pool,
927 "MARKET_IF_TOUCHED",
928 updated,
929 Box::new(order),
930 client_id,
931 )
932 .await
933 }
934 OrderAny::MarketToLimit(order) => {
935 DatabaseQueries::add_order(
936 pool,
937 "MARKET_TO_LIMIT",
938 updated,
939 Box::new(order),
940 client_id,
941 )
942 .await
943 }
944 OrderAny::StopLimit(order) => {
945 DatabaseQueries::add_order(
946 pool,
947 "STOP_LIMIT",
948 updated,
949 Box::new(order),
950 client_id,
951 )
952 .await
953 }
954 OrderAny::StopMarket(order) => {
955 DatabaseQueries::add_order(
956 pool,
957 "STOP_MARKET",
958 updated,
959 Box::new(order),
960 client_id,
961 )
962 .await
963 }
964 OrderAny::TrailingStopLimit(order) => {
965 DatabaseQueries::add_order(
966 pool,
967 "TRAILING_STOP_LIMIT",
968 updated,
969 Box::new(order),
970 client_id,
971 )
972 .await
973 }
974 OrderAny::TrailingStopMarket(order) => {
975 DatabaseQueries::add_order(
976 pool,
977 "TRAILING_STOP_MARKET",
978 updated,
979 Box::new(order),
980 client_id,
981 )
982 .await
983 }
984 },
985 DatabaseQuery::AddOrderSnapshot(snapshot) => {
986 DatabaseQueries::add_order_snapshot(pool, snapshot).await
987 }
988 DatabaseQuery::AddPositionSnapshot(snapshot) => {
989 DatabaseQueries::add_position_snapshot(pool, snapshot).await
990 }
991 DatabaseQuery::AddAccount(account_any, updated) => match account_any {
992 AccountAny::Cash(account) => {
993 DatabaseQueries::add_account(pool, "CASH", updated, Box::new(account)).await
994 }
995 AccountAny::Margin(account) => {
996 DatabaseQueries::add_account(pool, "MARGIN", updated, Box::new(account)).await
997 }
998 },
999 DatabaseQuery::AddSignal(signal) => DatabaseQueries::add_signal(pool, &signal).await,
1000 DatabaseQuery::AddCustom(data) => DatabaseQueries::add_custom_data(pool, &data).await,
1001 DatabaseQuery::AddQuote(quote) => DatabaseQueries::add_quote(pool, "e).await,
1002 DatabaseQuery::AddTrade(trade) => DatabaseQueries::add_trade(pool, &trade).await,
1003 DatabaseQuery::AddBar(bar) => DatabaseQueries::add_bar(pool, &bar).await,
1004 DatabaseQuery::UpdateOrder(event) => {
1005 DatabaseQueries::add_order_event(pool, event.into_boxed(), None).await
1006 }
1007 };
1008
1009 if let Err(e) = result {
1010 tracing::error!("Error on query: {e:?}");
1011 }
1012 }
1013}