1use std::collections::HashMap;
17
18use ahash::AHashMap;
19use nautilus_common::{custom::CustomData, signal::Signal};
20use nautilus_model::{
21 accounts::{Account, AccountAny},
22 data::{Bar, DataType, QuoteTick, TradeTick},
23 events::{
24 AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
25 position::snapshot::PositionSnapshot,
26 },
27 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
28 instruments::{Instrument, InstrumentAny},
29 orders::{Order, OrderAny},
30 types::{AccountBalance, Currency, MarginBalance},
31};
32use sqlx::{PgPool, Row};
33
34use super::models::{
35 orders::OrderSnapshotModel,
36 positions::PositionSnapshotModel,
37 types::{CustomDataModel, SignalModel},
38};
39use crate::sql::models::{
40 accounts::AccountEventModel,
41 data::{BarModel, QuoteTickModel, TradeTickModel},
42 enums::{
43 AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
44 CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
45 },
46 general::{GeneralRow, OrderEventOrderClientIdCombination},
47 instruments::InstrumentAnyModel,
48 orders::OrderEventAnyModel,
49 types::CurrencyModel,
50};
51
/// Namespace type for the PostgreSQL queries defined in this module.
///
/// The struct carries no data; its `impl` block exposes associated async
/// functions that each receive the `PgPool` they should run against.
#[derive(Debug)]
pub struct DatabaseQueries;
54
55impl DatabaseQueries {
56 pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
62 sqlx::query("SELECT truncate_all_tables()")
63 .execute(pool)
64 .await
65 .map(|_| ())
66 .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
67 }
68
69 pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
75 sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
76 .bind(key)
77 .bind(value)
78 .execute(pool)
79 .await
80 .map(|_| ())
81 .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
82 }
83
84 pub async fn load(pool: &PgPool) -> anyhow::Result<HashMap<String, Vec<u8>>> {
90 sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
91 .fetch_all(pool)
92 .await
93 .map(|rows| {
94 let mut cache: HashMap<String, Vec<u8>> = HashMap::new();
95 for row in rows {
96 cache.insert(row.id, row.value);
97 }
98 cache
99 })
100 .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
101 }
102
103 pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
109 sqlx::query(
110 "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
111 )
112 .bind(currency.code.as_str())
113 .bind(i32::from(currency.precision))
114 .bind(i32::from(currency.iso4217))
115 .bind(currency.name.as_str())
116 .bind(CurrencyTypeModel(currency.currency_type))
117 .execute(pool)
118 .await
119 .map(|_| ())
120 .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
121 }
122
123 pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
129 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
130 .fetch_all(pool)
131 .await
132 .map(|rows| rows.into_iter().map(|row| row.0).collect())
133 .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
134 }
135
136 pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
142 sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
143 .bind(code)
144 .fetch_optional(pool)
145 .await
146 .map(|currency| currency.map(|row| row.0))
147 .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
148 }
149
150 pub async fn add_instrument(
156 pool: &PgPool,
157 kind: &str,
158 instrument: Box<dyn Instrument>,
159 ) -> anyhow::Result<()> {
160 sqlx::query(r#"
161 INSERT INTO "instrument" (
162 id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
163 multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
164 price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
165 min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
166 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
167 ON CONFLICT (id)
168 DO UPDATE
169 SET
170 kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
171 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
172 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
173 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32, ts_event = $33, updated_at = CURRENT_TIMESTAMP
174 "#)
175 .bind(instrument.id().to_string())
176 .bind(kind)
177 .bind(instrument.raw_symbol().to_string())
178 .bind(instrument.base_currency().map(|x| x.code.as_str()))
179 .bind(instrument.underlying().map(|x| x.to_string()))
180 .bind(instrument.quote_currency().code.as_str())
181 .bind(instrument.settlement_currency().code.as_str())
182 .bind(instrument.isin().map(|x| x.to_string()))
183 .bind(AssetClassModel(instrument.asset_class()))
184 .bind(instrument.exchange().map(|x| x.to_string()))
185 .bind(instrument.multiplier().to_string())
186 .bind(instrument.option_kind().map(|x| x.to_string()))
187 .bind(instrument.is_inverse())
188 .bind(instrument.strike_price().map(|x| x.to_string()))
189 .bind(instrument.activation_ns().map(|x| x.to_string()))
190 .bind(instrument.expiration_ns().map(|x| x.to_string()))
191 .bind(i32::from(instrument.price_precision()))
192 .bind(i32::from(instrument.size_precision()))
193 .bind(instrument.price_increment().to_string())
194 .bind(instrument.size_increment().to_string())
195 .bind(instrument.maker_fee().to_string())
196 .bind(instrument.taker_fee().to_string())
197 .bind(instrument.margin_init().to_string())
198 .bind(instrument.margin_maint().to_string())
199 .bind(instrument.lot_size().map(|x| x.to_string()))
200 .bind(instrument.max_quantity().map(|x| x.to_string()))
201 .bind(instrument.min_quantity().map(|x| x.to_string()))
202 .bind(instrument.max_notional().map(|x| x.to_string()))
203 .bind(instrument.min_notional().map(|x| x.to_string()))
204 .bind(instrument.max_price().map(|x| x.to_string()))
205 .bind(instrument.min_price().map(|x| x.to_string()))
206 .bind(instrument.ts_init().to_string())
207 .bind(instrument.ts_event().to_string())
208 .execute(pool)
209 .await
210 .map(|_| ())
211 .map_err(|e| anyhow::anyhow!(format!("Failed to insert item {} into instrument table: {:?}", instrument.id().to_string(), e)))
212 }
213
214 pub async fn load_instrument(
220 pool: &PgPool,
221 instrument_id: &InstrumentId,
222 ) -> anyhow::Result<Option<InstrumentAny>> {
223 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
224 .bind(instrument_id.to_string())
225 .fetch_optional(pool)
226 .await
227 .map(|instrument| instrument.map(|row| row.0))
228 .map_err(|e| {
229 anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
230 })
231 }
232
233 pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
239 sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
240 .fetch_all(pool)
241 .await
242 .map(|rows| rows.into_iter().map(|row| row.0).collect())
243 .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
244 }
245
246 pub async fn add_order(
256 pool: &PgPool,
257 _kind: &str,
258 updated: bool,
259 order: Box<dyn Order>,
260 client_id: Option<ClientId>,
261 ) -> anyhow::Result<()> {
262 if updated {
263 let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
264 .await
265 .unwrap();
266 assert!(
267 exists,
268 "OrderInitialized event does not exist for order: {}",
269 order.client_order_id()
270 );
271 }
272 match order.last_event().clone() {
273 OrderEventAny::Accepted(event) => {
274 Self::add_order_event(pool, Box::new(event), client_id).await
275 }
276 OrderEventAny::CancelRejected(event) => {
277 Self::add_order_event(pool, Box::new(event), client_id).await
278 }
279 OrderEventAny::Canceled(event) => {
280 Self::add_order_event(pool, Box::new(event), client_id).await
281 }
282 OrderEventAny::Denied(event) => {
283 Self::add_order_event(pool, Box::new(event), client_id).await
284 }
285 OrderEventAny::Emulated(event) => {
286 Self::add_order_event(pool, Box::new(event), client_id).await
287 }
288 OrderEventAny::Expired(event) => {
289 Self::add_order_event(pool, Box::new(event), client_id).await
290 }
291 OrderEventAny::Filled(event) => {
292 Self::add_order_event(pool, Box::new(event), client_id).await
293 }
294 OrderEventAny::Initialized(event) => {
295 Self::add_order_event(pool, Box::new(event), client_id).await
296 }
297 OrderEventAny::ModifyRejected(event) => {
298 Self::add_order_event(pool, Box::new(event), client_id).await
299 }
300 OrderEventAny::PendingCancel(event) => {
301 Self::add_order_event(pool, Box::new(event), client_id).await
302 }
303 OrderEventAny::PendingUpdate(event) => {
304 Self::add_order_event(pool, Box::new(event), client_id).await
305 }
306 OrderEventAny::Rejected(event) => {
307 Self::add_order_event(pool, Box::new(event), client_id).await
308 }
309 OrderEventAny::Released(event) => {
310 Self::add_order_event(pool, Box::new(event), client_id).await
311 }
312 OrderEventAny::Submitted(event) => {
313 Self::add_order_event(pool, Box::new(event), client_id).await
314 }
315 OrderEventAny::Updated(event) => {
316 Self::add_order_event(pool, Box::new(event), client_id).await
317 }
318 OrderEventAny::Triggered(event) => {
319 Self::add_order_event(pool, Box::new(event), client_id).await
320 }
321 }
322 }
323
324 pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
334 let mut transaction = pool.begin().await?;
335
336 sqlx::query(
339 r#"
340 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
341 "#,
342 )
343 .bind(snapshot.trader_id.to_string())
344 .execute(&mut *transaction)
345 .await
346 .map(|_| ())
347 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
348
349 sqlx::query(
350 r#"
351 INSERT INTO "order" (
352 id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
353 account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
354 trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
355 expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
356 is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
357 trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
358 parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
359 created_at, updated_at
360 ) VALUES (
361 $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
362 $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
363 $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
364 CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
365 )
366 ON CONFLICT (id)
367 DO UPDATE SET
368 trader_id = $2,
369 strategy_id = $3,
370 instrument_id = $4,
371 venue_order_id = $5,
372 position_id = $6,
373 account_id = $7,
374 last_trade_id = $8,
375 order_type = $9,
376 order_side = $10,
377 quantity = $11,
378 price = $12,
379 trigger_price = $13,
380 trigger_type = $14,
381 limit_offset = $15,
382 trailing_offset = $16,
383 trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
384 time_in_force = $18,
385 expire_time = $19,
386 filled_qty = $20,
387 liquidity_side = $21,
388 avg_px = $22,
389 slippage = $23,
390 commissions = $24,
391 status = $25,
392 is_post_only = $26,
393 is_reduce_only = $27,
394 is_quote_quantity = $28,
395 display_qty = $29,
396 emulation_trigger = $30,
397 trigger_instrument_id = $31,
398 contingency_type = $32,
399 order_list_id = $33,
400 linked_order_ids = $34,
401 parent_order_id = $35,
402 exec_algorithm_id = $36,
403 exec_algorithm_params = $37,
404 exec_spawn_id = $38,
405 tags = $39,
406 init_id = $40,
407 ts_init = $41,
408 ts_last = $42,
409 updated_at = CURRENT_TIMESTAMP
410 "#)
411 .bind(snapshot.client_order_id.to_string()) .bind(snapshot.trader_id.to_string())
413 .bind(snapshot.strategy_id.to_string())
414 .bind(snapshot.instrument_id.to_string())
415 .bind(snapshot.venue_order_id.map(|x| x.to_string()))
416 .bind(snapshot.position_id.map(|x| x.to_string()))
417 .bind(snapshot.account_id.map(|x| x.to_string()))
418 .bind(snapshot.last_trade_id.map(|x| x.to_string()))
419 .bind(snapshot.order_type.to_string())
420 .bind(snapshot.order_side.to_string())
421 .bind(snapshot.quantity.to_string())
422 .bind(snapshot.price.map(|x| x.to_string()))
423 .bind(snapshot.trigger_price.map(|x| x.to_string()))
424 .bind(snapshot.trigger_type.map(|x| x.to_string()))
425 .bind(snapshot.limit_offset.map(|x| x.to_string()))
426 .bind(snapshot.trailing_offset.map(|x| x.to_string()))
427 .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
428 .bind(snapshot.time_in_force.to_string())
429 .bind(snapshot.expire_time.map(|x| x.to_string()))
430 .bind(snapshot.filled_qty.to_string())
431 .bind(snapshot.liquidity_side.map(|x| x.to_string()))
432 .bind(snapshot.avg_px)
433 .bind(snapshot.slippage)
434 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
435 .bind(snapshot.status.to_string())
436 .bind(snapshot.is_post_only)
437 .bind(snapshot.is_reduce_only)
438 .bind(snapshot.is_quote_quantity)
439 .bind(snapshot.display_qty.map(|x| x.to_string()))
440 .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
441 .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
442 .bind(snapshot.contingency_type.map(|x| x.to_string()))
443 .bind(snapshot.order_list_id.map(|x| x.to_string()))
444 .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
445 .bind(snapshot.parent_order_id.map(|x| x.to_string()))
446 .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
447 .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
448 .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
449 .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
450 .bind(snapshot.init_id.to_string())
451 .bind(snapshot.ts_init.to_string())
452 .bind(snapshot.ts_last.to_string())
453 .execute(&mut *transaction)
454 .await
455 .map(|_| ())
456 .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
457
458 transaction
459 .commit()
460 .await
461 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
462 }
463
464 pub async fn load_order_snapshot(
470 pool: &PgPool,
471 client_order_id: &ClientOrderId,
472 ) -> anyhow::Result<Option<OrderSnapshot>> {
473 sqlx::query_as::<_, OrderSnapshotModel>(
474 r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
475 )
476 .bind(client_order_id.to_string())
477 .fetch_optional(pool)
478 .await
479 .map(|model| model.map(|m| m.0))
480 .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
481 }
482
483 pub async fn add_position_snapshot(
489 pool: &PgPool,
490 snapshot: PositionSnapshot,
491 ) -> anyhow::Result<()> {
492 let mut transaction = pool.begin().await?;
493
494 sqlx::query(
497 r#"
498 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
499 "#,
500 )
501 .bind(snapshot.trader_id.to_string())
502 .execute(&mut *transaction)
503 .await
504 .map(|_| ())
505 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
506
507 sqlx::query(r#"
508 INSERT INTO "position" (
509 id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
510 quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
511 duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
512 ) VALUES (
513 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
514 $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
515 )
516 ON CONFLICT (id)
517 DO UPDATE
518 SET
519 trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
520 peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
521 commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
522 "#)
523 .bind(snapshot.position_id.to_string())
524 .bind(snapshot.trader_id.to_string())
525 .bind(snapshot.strategy_id.to_string())
526 .bind(snapshot.instrument_id.to_string())
527 .bind(snapshot.account_id.to_string())
528 .bind(snapshot.opening_order_id.to_string())
529 .bind(snapshot.closing_order_id.map(|x| x.to_string()))
530 .bind(snapshot.entry.to_string())
531 .bind(snapshot.side.to_string())
532 .bind(snapshot.signed_qty)
533 .bind(snapshot.quantity.to_string())
534 .bind(snapshot.peak_qty.to_string())
535 .bind(snapshot.quote_currency.to_string())
536 .bind(snapshot.base_currency.map(|x| x.to_string()))
537 .bind(snapshot.settlement_currency.to_string())
538 .bind(snapshot.avg_px_open)
539 .bind(snapshot.avg_px_close)
540 .bind(snapshot.realized_return)
541 .bind(snapshot.realized_pnl.map(|x| x.to_string()))
542 .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
543 .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
544 .bind(snapshot.duration_ns.map(|x| x.to_string()))
545 .bind(snapshot.ts_opened.to_string())
546 .bind(snapshot.ts_closed.map(|x| x.to_string()))
547 .bind(snapshot.ts_init.to_string())
548 .bind(snapshot.ts_last.to_string())
549 .execute(&mut *transaction)
550 .await
551 .map(|_| ())
552 .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
553 transaction
554 .commit()
555 .await
556 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
557 }
558
559 pub async fn load_position_snapshot(
565 pool: &PgPool,
566 position_id: &PositionId,
567 ) -> anyhow::Result<Option<PositionSnapshot>> {
568 sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
569 .bind(position_id.to_string())
570 .fetch_optional(pool)
571 .await
572 .map(|model| model.map(|m| m.0))
573 .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
574 }
575
576 pub async fn check_if_order_initialized_exists(
582 pool: &PgPool,
583 client_order_id: ClientOrderId,
584 ) -> anyhow::Result<bool> {
585 sqlx::query(r#"
586 SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
587 "#)
588 .bind(client_order_id.to_string())
589 .fetch_one(pool)
590 .await
591 .map(|row| row.get(0))
592 .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
593 }
594
595 pub async fn check_if_account_event_exists(
601 pool: &PgPool,
602 account_id: AccountId,
603 ) -> anyhow::Result<bool> {
604 sqlx::query(
605 r#"
606 SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
607 "#,
608 )
609 .bind(account_id.to_string())
610 .fetch_one(pool)
611 .await
612 .map(|row| row.get(0))
613 .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
614 }
615
616 pub async fn add_order_event(
622 pool: &PgPool,
623 order_event: Box<dyn OrderEvent>,
624 client_id: Option<ClientId>,
625 ) -> anyhow::Result<()> {
626 let mut transaction = pool.begin().await?;
627
628 sqlx::query(
631 r#"
632 INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
633 "#,
634 )
635 .bind(order_event.trader_id().to_string())
636 .execute(&mut *transaction)
637 .await
638 .map(|_| ())
639 .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
640
641 if let Some(client_id) = client_id {
644 sqlx::query(
645 r#"
646 INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
647 "#,
648 )
649 .bind(client_id.to_string())
650 .execute(&mut *transaction)
651 .await
652 .map(|_| ())
653 .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
654 }
655
656 sqlx::query(r#"
657 INSERT INTO "order_event" (
658 id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
659 post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
660 trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
661 order_list_id, linked_order_ids, parent_order_id,
662 exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
663 ) VALUES (
664 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
665 $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
666 $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
667 )
668 ON CONFLICT (id)
669 DO UPDATE
670 SET
671 kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
672 quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
673 last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
674 emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
675 exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
676
677 "#)
678 .bind(order_event.id().to_string())
679 .bind(order_event.kind())
680 .bind(order_event.client_order_id().to_string())
681 .bind(order_event.order_type().map(|x| x.to_string()))
682 .bind(order_event.order_side().map(|x| x.to_string()))
683 .bind(order_event.trader_id().to_string())
684 .bind(client_id.map(|x| x.to_string()))
685 .bind(order_event.reason().map(|x| x.to_string()))
686 .bind(order_event.strategy_id().to_string())
687 .bind(order_event.instrument_id().to_string())
688 .bind(order_event.trade_id().map(|x| x.to_string()))
689 .bind(order_event.currency().map(|x| x.code.as_str()))
690 .bind(order_event.quantity().map(|x| x.to_string()))
691 .bind(order_event.time_in_force().map(|x| x.to_string()))
692 .bind(order_event.liquidity_side().map(|x| x.to_string()))
693 .bind(order_event.post_only())
694 .bind(order_event.reduce_only())
695 .bind(order_event.quote_quantity())
696 .bind(order_event.reconciliation())
697 .bind(order_event.price().map(|x| x.to_string()))
698 .bind(order_event.last_px().map(|x| x.to_string()))
699 .bind(order_event.last_qty().map(|x| x.to_string()))
700 .bind(order_event.trigger_price().map(|x| x.to_string()))
701 .bind(order_event.trigger_type().map(|x| x.to_string()))
702 .bind(order_event.limit_offset().map(|x| x.to_string()))
703 .bind(order_event.trailing_offset().map(|x| x.to_string()))
704 .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
705 .bind(order_event.expire_time().map(|x| x.to_string()))
706 .bind(order_event.display_qty().map(|x| x.to_string()))
707 .bind(order_event.emulation_trigger().map(|x| x.to_string()))
708 .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
709 .bind(order_event.contingency_type().map(|x| x.to_string()))
710 .bind(order_event.order_list_id().map(|x| x.to_string()))
711 .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
712 .bind(order_event.parent_order_id().map(|x| x.to_string()))
713 .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
714 .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
715 .bind(order_event.venue_order_id().map(|x| x.to_string()))
716 .bind(order_event.account_id().map(|x| x.to_string()))
717 .bind(order_event.position_id().map(|x| x.to_string()))
718 .bind(order_event.commission().map(|x| x.to_string()))
719 .bind(order_event.ts_event().to_string())
720 .bind(order_event.ts_init().to_string())
721 .execute(&mut *transaction)
722 .await
723 .map(|_| ())
724 .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
725 transaction
726 .commit()
727 .await
728 .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
729 }
730
731 pub async fn load_order_events(
737 pool: &PgPool,
738 client_order_id: &ClientOrderId,
739 ) -> anyhow::Result<Vec<OrderEventAny>> {
740 sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
741 .bind(client_order_id.to_string())
742 .fetch_all(pool)
743 .await
744 .map(|rows| rows.into_iter().map(|row| row.0).collect())
745 .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
746 }
747
748 pub async fn load_order(
758 pool: &PgPool,
759 client_order_id: &ClientOrderId,
760 ) -> anyhow::Result<Option<OrderAny>> {
761 let order_events = Self::load_order_events(pool, client_order_id).await;
762
763 match order_events {
764 Ok(order_events) => {
765 if order_events.is_empty() {
766 return Ok(None);
767 }
768 let order = OrderAny::from_events(order_events).unwrap();
769 Ok(Some(order))
770 }
771 Err(e) => anyhow::bail!("Failed to load order events: {e}"),
772 }
773 }
774
775 pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
785 let mut orders: Vec<OrderAny> = Vec::new();
786 let client_order_ids: Vec<ClientOrderId> = sqlx::query(
787 r#"
788 SELECT DISTINCT client_order_id FROM "order_event"
789 "#,
790 )
791 .fetch_all(pool)
792 .await
793 .map(|rows| {
794 rows.into_iter()
795 .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
796 .collect()
797 })
798 .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
799 for id in client_order_ids {
800 let order = Self::load_order(pool, &id).await.unwrap();
801 match order {
802 Some(order) => {
803 orders.push(order);
804 }
805 None => {
806 continue;
807 }
808 }
809 }
810 Ok(orders)
811 }
812
813 pub async fn add_account(
823 pool: &PgPool,
824 kind: &str,
825 updated: bool,
826 account: Box<dyn Account>,
827 ) -> anyhow::Result<()> {
828 if updated {
829 let exists = Self::check_if_account_event_exists(pool, account.id())
830 .await
831 .unwrap();
832 assert!(
833 exists,
834 "Account event does not exist for account: {}",
835 account.id()
836 );
837 }
838
839 let mut transaction = pool.begin().await?;
840
841 sqlx::query(
842 r#"
843 INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
844 "#,
845 )
846 .bind(account.id().to_string())
847 .execute(&mut *transaction)
848 .await
849 .map(|_| ())
850 .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
851
852 let account_event = account.last_event().unwrap();
853 sqlx::query(r#"
854 INSERT INTO "account_event" (
855 id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
856 ) VALUES (
857 $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
858 )
859 ON CONFLICT (id)
860 DO UPDATE
861 SET
862 kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
863 ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
864 "#)
865 .bind(account_event.event_id.to_string())
866 .bind(kind.to_string())
867 .bind(account_event.account_id.to_string())
868 .bind(account_event.base_currency.map(|x| x.code.as_str()))
869 .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
870 .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
871 .bind(account_event.is_reported)
872 .bind(account_event.ts_event.to_string())
873 .bind(account_event.ts_init.to_string())
874 .execute(&mut *transaction)
875 .await
876 .map(|_| ())
877 .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
878 transaction
879 .commit()
880 .await
881 .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
882 }
883
884 pub async fn load_account_events(
890 pool: &PgPool,
891 account_id: &AccountId,
892 ) -> anyhow::Result<Vec<AccountState>> {
893 sqlx::query_as::<_, AccountEventModel>(
894 r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
895 )
896 .bind(account_id.to_string())
897 .fetch_all(pool)
898 .await
899 .map(|rows| rows.into_iter().map(|row| row.0).collect())
900 .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
901 }
902
903 pub async fn load_account(
913 pool: &PgPool,
914 account_id: &AccountId,
915 ) -> anyhow::Result<Option<AccountAny>> {
916 let account_events = Self::load_account_events(pool, account_id).await;
917 match account_events {
918 Ok(account_events) => {
919 if account_events.is_empty() {
920 return Ok(None);
921 }
922 let account = AccountAny::from_events(account_events).unwrap();
923 Ok(Some(account))
924 }
925 Err(e) => anyhow::bail!("Failed to load account events: {e}"),
926 }
927 }
928
929 pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
939 let mut accounts: Vec<AccountAny> = Vec::new();
940 let account_ids: Vec<AccountId> = sqlx::query(
941 r#"
942 SELECT DISTINCT account_id FROM "account_event"
943 "#,
944 )
945 .fetch_all(pool)
946 .await
947 .map(|rows| {
948 rows.into_iter()
949 .map(|row| AccountId::from(row.get::<&str, _>(0)))
950 .collect()
951 })
952 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
953 for id in account_ids {
954 let account = Self::load_account(pool, &id).await.unwrap();
955 match account {
956 Some(account) => {
957 accounts.push(account);
958 }
959 None => {
960 continue;
961 }
962 }
963 }
964 Ok(accounts)
965 }
966
967 pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
973 sqlx::query(r#"
974 INSERT INTO "trade" (
975 instrument_id, price, quantity, aggressor_side, venue_trade_id,
976 ts_event, ts_init, created_at, updated_at
977 ) VALUES (
978 $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
979 )
980 ON CONFLICT (id)
981 DO UPDATE
982 SET
983 instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
984 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
985 "#)
986 .bind(trade.instrument_id.to_string())
987 .bind(trade.price.to_string())
988 .bind(trade.size.to_string())
989 .bind(AggressorSideModel(trade.aggressor_side))
990 .bind(trade.trade_id.to_string())
991 .bind(trade.ts_event.to_string())
992 .bind(trade.ts_init.to_string())
993 .execute(pool)
994 .await
995 .map(|_| ())
996 .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
997 }
998
999 pub async fn load_trades(
1005 pool: &PgPool,
1006 instrument_id: &InstrumentId,
1007 ) -> anyhow::Result<Vec<TradeTick>> {
1008 sqlx::query_as::<_, TradeTickModel>(
1009 r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1010 )
1011 .bind(instrument_id.to_string())
1012 .fetch_all(pool)
1013 .await
1014 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1015 .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1016 }
1017
1018 pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1024 sqlx::query(r#"
1025 INSERT INTO "quote" (
1026 instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1027 ) VALUES (
1028 $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1029 )
1030 ON CONFLICT (id)
1031 DO UPDATE
1032 SET
1033 instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1034 ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1035 "#)
1036 .bind(quote.instrument_id.to_string())
1037 .bind(quote.bid_price.to_string())
1038 .bind(quote.ask_price.to_string())
1039 .bind(quote.bid_size.to_string())
1040 .bind(quote.ask_size.to_string())
1041 .bind(quote.ts_event.to_string())
1042 .bind(quote.ts_init.to_string())
1043 .execute(pool)
1044 .await
1045 .map(|_| ())
1046 .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1047 }
1048
1049 pub async fn load_quotes(
1055 pool: &PgPool,
1056 instrument_id: &InstrumentId,
1057 ) -> anyhow::Result<Vec<QuoteTick>> {
1058 sqlx::query_as::<_, QuoteTickModel>(
1059 r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1060 )
1061 .bind(instrument_id.to_string())
1062 .fetch_all(pool)
1063 .await
1064 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1065 .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1066 }
1067
1068 pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1074 println!("Adding bar: {bar:?}");
1075 sqlx::query(r#"
1076 INSERT INTO "bar" (
1077 instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1078 ) VALUES (
1079 $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1080 )
1081 ON CONFLICT (id)
1082 DO UPDATE
1083 SET
1084 instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1085 open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1086 "#)
1087 .bind(bar.bar_type.instrument_id().to_string())
1088 .bind(bar.bar_type.spec().step.get() as i32)
1089 .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1090 .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1091 .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1092 .bind(bar.open.to_string())
1093 .bind(bar.high.to_string())
1094 .bind(bar.low.to_string())
1095 .bind(bar.close.to_string())
1096 .bind(bar.volume.to_string())
1097 .bind(bar.ts_event.to_string())
1098 .bind(bar.ts_init.to_string())
1099 .execute(pool)
1100 .await
1101 .map(|_| ())
1102 .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1103 }
1104
1105 pub async fn load_bars(
1111 pool: &PgPool,
1112 instrument_id: &InstrumentId,
1113 ) -> anyhow::Result<Vec<Bar>> {
1114 sqlx::query_as::<_, BarModel>(
1115 r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1116 )
1117 .bind(instrument_id.to_string())
1118 .fetch_all(pool)
1119 .await
1120 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1121 .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1122 }
1123
1124 pub async fn load_distinct_order_event_client_ids(
1130 pool: &PgPool,
1131 ) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1132 let mut map: AHashMap<ClientOrderId, ClientId> = AHashMap::new();
1133 let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1134 r#"
1135 SELECT DISTINCT
1136 client_order_id AS "client_order_id",
1137 client_id AS "client_id"
1138 FROM "order_event"
1139 "#,
1140 )
1141 .fetch_all(pool)
1142 .await
1143 .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1144 for id in result {
1145 map.insert(id.client_order_id, id.client_id);
1146 }
1147 Ok(map)
1148 }
1149
1150 pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1156 sqlx::query(
1157 r#"
1158 INSERT INTO "signal" (
1159 name, value, ts_event, ts_init, created_at, updated_at
1160 ) VALUES (
1161 $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1162 )
1163 ON CONFLICT (id)
1164 DO UPDATE
1165 SET
1166 name = $1, value = $2, ts_event = $3, ts_init = $4,
1167 updated_at = CURRENT_TIMESTAMP
1168 "#,
1169 )
1170 .bind(signal.name.to_string())
1171 .bind(signal.value.to_string())
1172 .bind(signal.ts_event.to_string())
1173 .bind(signal.ts_init.to_string())
1174 .execute(pool)
1175 .await
1176 .map(|_| ())
1177 .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1178 }
1179
1180 pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1186 sqlx::query_as::<_, SignalModel>(
1187 r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1188 )
1189 .bind(name)
1190 .fetch_all(pool)
1191 .await
1192 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1193 .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1194 }
1195
1196 pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1202 sqlx::query(
1203 r#"
1204 INSERT INTO "custom" (
1205 data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1206 ) VALUES (
1207 $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1208 )
1209 ON CONFLICT (id)
1210 DO UPDATE
1211 SET
1212 data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1213 updated_at = CURRENT_TIMESTAMP
1214 "#,
1215 )
1216 .bind(data.data_type.type_name().to_string())
1217 .bind(
1218 data.data_type
1219 .metadata()
1220 .as_ref()
1221 .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1222 )
1223 .bind(data.value.to_vec())
1224 .bind(data.ts_event.to_string())
1225 .bind(data.ts_init.to_string())
1226 .execute(pool)
1227 .await
1228 .map(|_| ())
1229 .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1230 }
1231
1232 pub async fn load_custom_data(
1238 pool: &PgPool,
1239 data_type: &DataType,
1240 ) -> anyhow::Result<Vec<CustomData>> {
1241 let metadata_json = data_type
1243 .metadata()
1244 .as_ref()
1245 .map_or(Ok(serde_json::Value::Null), |metadata| {
1246 serde_json::to_value(metadata)
1247 })?;
1248
1249 sqlx::query_as::<_, CustomDataModel>(
1250 r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1251 )
1252 .bind(data_type.type_name())
1253 .bind(metadata_json)
1254 .fetch_all(pool)
1255 .await
1256 .map(|rows| rows.into_iter().map(|row| row.0).collect())
1257 .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1258 }
1259}