nautilus_infrastructure/sql/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_common::{custom::CustomData, signal::Signal};
19use nautilus_model::{
20    accounts::{any::AccountAny, base::Account},
21    data::{Bar, DataType, QuoteTick, TradeTick},
22    events::{
23        position::snapshot::PositionSnapshot, AccountState, OrderEvent, OrderEventAny,
24        OrderSnapshot,
25    },
26    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
27    instruments::{Instrument, InstrumentAny},
28    orders::{Order, OrderAny},
29    types::{AccountBalance, Currency, MarginBalance},
30};
31use sqlx::{PgPool, Row};
32
33use super::models::{
34    orders::OrderSnapshotModel,
35    positions::PositionSnapshotModel,
36    types::{CustomDataModel, SignalModel},
37};
38use crate::sql::models::{
39    accounts::AccountEventModel,
40    data::{BarModel, QuoteTickModel, TradeTickModel},
41    enums::{
42        AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
43        CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
44    },
45    general::{GeneralRow, OrderEventOrderClientIdCombination},
46    instruments::InstrumentAnyModel,
47    orders::OrderEventAnyModel,
48    types::CurrencyModel,
49};
50
/// Stateless namespace type for the PostgreSQL query helpers that are implemented
/// as associated functions on it; each helper is handed the `PgPool` to run against.
#[derive(Debug)]
pub struct DatabaseQueries;
53
54impl DatabaseQueries {
55    pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
56        sqlx::query("SELECT truncate_all_tables()")
57            .execute(pool)
58            .await
59            .map(|_| ())
60            .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
61    }
62
63    pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
64        sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
65            .bind(key)
66            .bind(value)
67            .execute(pool)
68            .await
69            .map(|_| ())
70            .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
71    }
72
73    pub async fn load(pool: &PgPool) -> anyhow::Result<HashMap<String, Vec<u8>>> {
74        sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
75            .fetch_all(pool)
76            .await
77            .map(|rows| {
78                let mut cache: HashMap<String, Vec<u8>> = HashMap::new();
79                for row in rows {
80                    cache.insert(row.id, row.value);
81                }
82                cache
83            })
84            .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
85    }
86
87    pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
88        sqlx::query(
89            "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
90        )
91            .bind(currency.code.as_str())
92            .bind(currency.precision as i32)
93            .bind(currency.iso4217 as i32)
94            .bind(currency.name.as_str())
95            .bind(CurrencyTypeModel(currency.currency_type))
96            .execute(pool)
97            .await
98            .map(|_| ())
99            .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
100    }
101
102    pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
103        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
104            .fetch_all(pool)
105            .await
106            .map(|rows| rows.into_iter().map(|row| row.0).collect())
107            .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
108    }
109
110    pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
111        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
112            .bind(code)
113            .fetch_optional(pool)
114            .await
115            .map(|currency| currency.map(|row| row.0))
116            .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
117    }
118
119    pub async fn add_instrument(
120        pool: &PgPool,
121        kind: &str,
122        instrument: Box<dyn Instrument>,
123    ) -> anyhow::Result<()> {
124        sqlx::query(r#"
125            INSERT INTO "instrument" (
126                id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
127                multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
128                price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
129                min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
130            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
131            ON CONFLICT (id)
132            DO UPDATE
133            SET
134                kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
135                 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
136                 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
137                 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32,  ts_event = $33, updated_at = CURRENT_TIMESTAMP
138            "#)
139            .bind(instrument.id().to_string())
140            .bind(kind)
141            .bind(instrument.raw_symbol().to_string())
142            .bind(instrument.base_currency().map(|x| x.code.as_str()))
143            .bind(instrument.underlying().map(|x| x.to_string()))
144            .bind(instrument.quote_currency().code.as_str())
145            .bind(instrument.settlement_currency().code.as_str())
146            .bind(instrument.isin().map(|x| x.to_string()))
147            .bind(AssetClassModel(instrument.asset_class()))
148            .bind(instrument.exchange().map(|x| x.to_string()))
149            .bind(instrument.multiplier().to_string())
150            .bind(instrument.option_kind().map(|x| x.to_string()))
151            .bind(instrument.is_inverse())
152            .bind(instrument.strike_price().map(|x| x.to_string()))
153            .bind(instrument.activation_ns().map(|x| x.to_string()))
154            .bind(instrument.expiration_ns().map(|x| x.to_string()))
155            .bind(instrument.price_precision() as i32)
156            .bind(instrument.size_precision() as i32)
157            .bind(instrument.price_increment().to_string())
158            .bind(instrument.size_increment().to_string())
159            .bind(instrument.maker_fee().to_string())
160            .bind(instrument.taker_fee().to_string())
161            .bind(instrument.margin_init().to_string())
162            .bind(instrument.margin_maint().to_string())
163            .bind(instrument.lot_size().map(|x| x.to_string()))
164            .bind(instrument.max_quantity().map(|x| x.to_string()))
165            .bind(instrument.min_quantity().map(|x| x.to_string()))
166            .bind(instrument.max_notional().map(|x| x.to_string()))
167            .bind(instrument.min_notional().map(|x| x.to_string()))
168            .bind(instrument.max_price().map(|x| x.to_string()))
169            .bind(instrument.min_price().map(|x| x.to_string()))
170            .bind(instrument.ts_init().to_string())
171            .bind(instrument.ts_event().to_string())
172            .execute(pool)
173            .await
174            .map(|_| ())
175            .map_err(|e| anyhow::anyhow!(format!("Failed to insert item {} into instrument table: {:?}", instrument.id().to_string(), e)))
176    }
177
178    pub async fn load_instrument(
179        pool: &PgPool,
180        instrument_id: &InstrumentId,
181    ) -> anyhow::Result<Option<InstrumentAny>> {
182        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
183            .bind(instrument_id.to_string())
184            .fetch_optional(pool)
185            .await
186            .map(|instrument| instrument.map(|row| row.0))
187            .map_err(|e| {
188                anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
189            })
190    }
191
192    pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
193        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
194            .fetch_all(pool)
195            .await
196            .map(|rows| rows.into_iter().map(|row| row.0).collect())
197            .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
198    }
199
200    pub async fn add_order(
201        pool: &PgPool,
202        _kind: &str,
203        updated: bool,
204        order: Box<dyn Order>,
205        client_id: Option<ClientId>,
206    ) -> anyhow::Result<()> {
207        if updated {
208            let exists =
209                DatabaseQueries::check_if_order_initialized_exists(pool, order.client_order_id())
210                    .await
211                    .unwrap();
212            if !exists {
213                panic!(
214                    "OrderInitialized event does not exist for order: {}",
215                    order.client_order_id()
216                );
217            }
218        }
219        match order.last_event().clone() {
220            OrderEventAny::Accepted(event) => {
221                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
222            }
223            OrderEventAny::CancelRejected(event) => {
224                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
225            }
226            OrderEventAny::Canceled(event) => {
227                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
228            }
229            OrderEventAny::Denied(event) => {
230                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
231            }
232            OrderEventAny::Emulated(event) => {
233                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
234            }
235            OrderEventAny::Expired(event) => {
236                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
237            }
238            OrderEventAny::Filled(event) => {
239                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
240            }
241            OrderEventAny::Initialized(event) => {
242                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
243            }
244            OrderEventAny::ModifyRejected(event) => {
245                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
246            }
247            OrderEventAny::PendingCancel(event) => {
248                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
249            }
250            OrderEventAny::PendingUpdate(event) => {
251                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
252            }
253            OrderEventAny::Rejected(event) => {
254                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
255            }
256            OrderEventAny::Released(event) => {
257                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
258            }
259            OrderEventAny::Submitted(event) => {
260                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
261            }
262            OrderEventAny::Updated(event) => {
263                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
264            }
265            OrderEventAny::Triggered(event) => {
266                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
267            }
268            OrderEventAny::PartiallyFilled(event) => {
269                DatabaseQueries::add_order_event(pool, Box::new(event), client_id).await
270            }
271        }
272    }
273
274    pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
275        let mut transaction = pool.begin().await?;
276
277        // Insert trader if it does not exist
278        // TODO remove this when node and trader initialization is implemented
279        sqlx::query(
280            r#"
281            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
282            "#,
283        )
284        .bind(snapshot.trader_id.to_string())
285        .execute(&mut *transaction)
286        .await
287        .map(|_| ())
288        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
289
290        sqlx::query(
291            r#"
292            INSERT INTO "order" (
293                id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
294                account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
295                trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
296                expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
297                is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
298                trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
299                parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
300                created_at, updated_at
301            ) VALUES (
302                $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
303                $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
304                $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
305                CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
306            )
307            ON CONFLICT (id)
308            DO UPDATE SET
309                trader_id = $2,
310                strategy_id = $3,
311                instrument_id = $4,
312                venue_order_id = $5,
313                position_id = $6,
314                account_id = $7,
315                last_trade_id = $8,
316                order_type = $9,
317                order_side = $10,
318                quantity = $11,
319                price = $12,
320                trigger_price = $13,
321                trigger_type = $14,
322                limit_offset = $15,
323                trailing_offset = $16,
324                trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
325                time_in_force = $18,
326                expire_time = $19,
327                filled_qty = $20,
328                liquidity_side = $21,
329                avg_px = $22,
330                slippage = $23,
331                commissions = $24,
332                status = $25,
333                is_post_only = $26,
334                is_reduce_only = $27,
335                is_quote_quantity = $28,
336                display_qty = $29,
337                emulation_trigger = $30,
338                trigger_instrument_id = $31,
339                contingency_type = $32,
340                order_list_id = $33,
341                linked_order_ids = $34,
342                parent_order_id = $35,
343                exec_algorithm_id = $36,
344                exec_algorithm_params = $37,
345                exec_spawn_id = $38,
346                tags = $39,
347                init_id = $40,
348                ts_init = $41,
349                ts_last = $42,
350                updated_at = CURRENT_TIMESTAMP
351        "#)
352            .bind(snapshot.client_order_id.to_string())  // Used for both id and client_order_id
353            .bind(snapshot.trader_id.to_string())
354            .bind(snapshot.strategy_id.to_string())
355            .bind(snapshot.instrument_id.to_string())
356            .bind(snapshot.venue_order_id.map(|x| x.to_string()))
357            .bind(snapshot.position_id.map(|x| x.to_string()))
358            .bind(snapshot.account_id.map(|x| x.to_string()))
359            .bind(snapshot.last_trade_id.map(|x| x.to_string()))
360            .bind(snapshot.order_type.to_string())
361            .bind(snapshot.order_side.to_string())
362            .bind(snapshot.quantity.to_string())
363            .bind(snapshot.price.map(|x| x.to_string()))
364            .bind(snapshot.trigger_price.map(|x| x.to_string()))
365            .bind(snapshot.trigger_type.map(|x| x.to_string()))
366            .bind(snapshot.limit_offset.map(|x| x.to_string()))
367            .bind(snapshot.trailing_offset.map(|x| x.to_string()))
368            .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
369            .bind(snapshot.time_in_force.to_string())
370            .bind(snapshot.expire_time.map(|x| x.to_string()))
371            .bind(snapshot.filled_qty.to_string())
372            .bind(snapshot.liquidity_side.map(|x| x.to_string()))
373            .bind(snapshot.avg_px)
374            .bind(snapshot.slippage)
375            .bind(snapshot.commissions.iter().map(|x| x.to_string()).collect::<Vec<String>>())
376            .bind(snapshot.status.to_string())
377            .bind(snapshot.is_post_only)
378            .bind(snapshot.is_reduce_only)
379            .bind(snapshot.is_quote_quantity)
380            .bind(snapshot.display_qty.map(|x| x.to_string()))
381            .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
382            .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
383            .bind(snapshot.contingency_type.map(|x| x.to_string()))
384            .bind(snapshot.order_list_id.map(|x| x.to_string()))
385            .bind(snapshot.linked_order_ids.map(|x| x.iter().map(|x| x.to_string()).collect::<Vec<String>>()))
386            .bind(snapshot.parent_order_id.map(|x| x.to_string()))
387            .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
388            .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
389            .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
390            .bind(snapshot.tags.map(|x| x.iter().map(|x| x.to_string()).collect::<Vec<String>>()))
391            .bind(snapshot.init_id.to_string())
392            .bind(snapshot.ts_init.to_string())
393            .bind(snapshot.ts_last.to_string())
394            .execute(&mut *transaction)
395            .await
396            .map(|_| ())
397            .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
398
399        transaction
400            .commit()
401            .await
402            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
403    }
404
405    pub async fn load_order_snapshot(
406        pool: &PgPool,
407        client_order_id: &ClientOrderId,
408    ) -> anyhow::Result<Option<OrderSnapshot>> {
409        sqlx::query_as::<_, OrderSnapshotModel>(
410            r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
411        )
412        .bind(client_order_id.to_string())
413        .fetch_optional(pool)
414        .await
415        .map(|model| model.map(|m| m.0))
416        .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
417    }
418
419    pub async fn add_position_snapshot(
420        pool: &PgPool,
421        snapshot: PositionSnapshot,
422    ) -> anyhow::Result<()> {
423        let mut transaction = pool.begin().await?;
424
425        // Insert trader if it does not exist
426        // TODO remove this when node and trader initialization is implemented
427        sqlx::query(
428            r#"
429            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
430        "#,
431        )
432        .bind(snapshot.trader_id.to_string())
433        .execute(&mut *transaction)
434        .await
435        .map(|_| ())
436        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
437
438        sqlx::query(r#"
439            INSERT INTO "position" (
440                id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
441                quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
442                duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
443            ) VALUES (
444                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
445                $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
446            )
447            ON CONFLICT (id)
448            DO UPDATE
449            SET
450                trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
451                peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
452                commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
453        "#)
454            .bind(snapshot.position_id.to_string())
455            .bind(snapshot.trader_id.to_string())
456            .bind(snapshot.strategy_id.to_string())
457            .bind(snapshot.instrument_id.to_string())
458            .bind(snapshot.account_id.to_string())
459            .bind(snapshot.opening_order_id.to_string())
460            .bind(snapshot.closing_order_id.map(|x| x.to_string()))
461            .bind(snapshot.entry.to_string())
462            .bind(snapshot.side.to_string())
463            .bind(snapshot.signed_qty)
464            .bind(snapshot.quantity.to_string())
465            .bind(snapshot.peak_qty.to_string())
466            .bind(snapshot.quote_currency.to_string())
467            .bind(snapshot.base_currency.map(|x| x.to_string()))
468            .bind(snapshot.settlement_currency.to_string())
469            .bind(snapshot.avg_px_open)
470            .bind(snapshot.avg_px_close)
471            .bind(snapshot.realized_return)
472            .bind(snapshot.realized_pnl.map(|x| x.to_string()))
473            .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
474            .bind(snapshot.commissions.iter().map(|x| x.to_string()).collect::<Vec<String>>())
475            .bind(snapshot.duration_ns.map(|x| x.to_string()))
476            .bind(snapshot.ts_opened.to_string())
477            .bind(snapshot.ts_closed.map(|x| x.to_string()))
478            .bind(snapshot.ts_init.to_string())
479            .bind(snapshot.ts_last.to_string())
480            .execute(&mut *transaction)
481            .await
482            .map(|_| ())
483            .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
484        transaction
485            .commit()
486            .await
487            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
488    }
489
490    pub async fn load_position_snapshot(
491        pool: &PgPool,
492        position_id: &PositionId,
493    ) -> anyhow::Result<Option<PositionSnapshot>> {
494        sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
495            .bind(position_id.to_string())
496            .fetch_optional(pool)
497            .await
498            .map(|model| model.map(|m| m.0))
499            .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
500    }
501
502    pub async fn check_if_order_initialized_exists(
503        pool: &PgPool,
504        client_order_id: ClientOrderId,
505    ) -> anyhow::Result<bool> {
506        sqlx::query(r#"
507            SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
508        "#)
509            .bind(client_order_id.to_string())
510            .fetch_one(pool)
511            .await
512            .map(|row| row.get(0))
513            .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
514    }
515
516    pub async fn check_if_account_event_exists(
517        pool: &PgPool,
518        account_id: AccountId,
519    ) -> anyhow::Result<bool> {
520        sqlx::query(
521            r#"
522            SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
523        "#,
524        )
525        .bind(account_id.to_string())
526        .fetch_one(pool)
527        .await
528        .map(|row| row.get(0))
529        .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
530    }
531
532    pub async fn add_order_event(
533        pool: &PgPool,
534        order_event: Box<dyn OrderEvent>,
535        client_id: Option<ClientId>,
536    ) -> anyhow::Result<()> {
537        let mut transaction = pool.begin().await?;
538
539        // Insert trader if it does not exist
540        // TODO remove this when node and trader initialization is implemented
541        sqlx::query(
542            r#"
543            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
544        "#,
545        )
546        .bind(order_event.trader_id().to_string())
547        .execute(&mut *transaction)
548        .await
549        .map(|_| ())
550        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
551
552        // Insert client if it does not exist
553        // TODO remove this when client initialization is implemented
554        if let Some(client_id) = client_id {
555            sqlx::query(
556                r#"
557                INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
558            "#,
559            )
560            .bind(client_id.to_string())
561            .execute(&mut *transaction)
562            .await
563            .map(|_| ())
564            .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
565        }
566
567        sqlx::query(r#"
568            INSERT INTO "order_event" (
569                id, kind, client_order_id, order_type, order_side, trader_id, client_id, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
570                post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
571                trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
572                order_list_id, linked_order_ids, parent_order_id,
573                exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
574            ) VALUES (
575                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
576                $21, $22, $23, $24, $25::trailing_offset_type, $26, $27, $28, $29, $30, $31, $32, $33, $34,
577                $35, $36, $37, $38, $39, $40, $41, $42,  CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
578            )
579            ON CONFLICT (id)
580            DO UPDATE
581            SET
582                kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, strategy_id = $8, instrument_id = $9, trade_id = $10, currency = $11,
583                quantity = $12, time_in_force = $13, liquidity_side = $14, post_only = $15, reduce_only = $16, quote_quantity = $17, reconciliation = $18, price = $19, last_px = $20,
584                last_qty = $21, trigger_price = $22, trigger_type = $23, limit_offset = $24, trailing_offset = $25, trailing_offset_type = $26, expire_time = $27, display_qty = $28,
585                emulation_trigger = $29, trigger_instrument_id = $30, contingency_type = $31, order_list_id = $32, linked_order_ids = $33, parent_order_id = $34, exec_algorithm_id = $35,
586                exec_spawn_id = $36, venue_order_id = $37, account_id = $38, position_id = $39, commission = $40, ts_event = $41, ts_init = $42, updated_at = CURRENT_TIMESTAMP
587
588        "#)
589            .bind(order_event.id().to_string())
590            .bind(order_event.kind())
591            .bind(order_event.client_order_id().to_string())
592            .bind(order_event.order_type().map(|x| x.to_string()))
593            .bind(order_event.order_side().map(|x| x.to_string()))
594            .bind(order_event.trader_id().to_string())
595            .bind(client_id.map(|x| x.to_string()))
596            .bind(order_event.strategy_id().to_string())
597            .bind(order_event.instrument_id().to_string())
598            .bind(order_event.trade_id().map(|x| x.to_string()))
599            .bind(order_event.currency().map(|x| x.code.as_str()))
600            .bind(order_event.quantity().map(|x| x.to_string()))
601            .bind(order_event.time_in_force().map(|x| x.to_string()))
602            .bind(order_event.liquidity_side().map(|x| x.to_string()))
603            .bind(order_event.post_only())
604            .bind(order_event.reduce_only())
605            .bind(order_event.quote_quantity())
606            .bind(order_event.reconciliation())
607            .bind(order_event.price().map(|x| x.to_string()))
608            .bind(order_event.last_px().map(|x| x.to_string()))
609            .bind(order_event.last_qty().map(|x| x.to_string()))
610            .bind(order_event.trigger_price().map(|x| x.to_string()))
611            .bind(order_event.trigger_type().map(|x| x.to_string()))
612            .bind(order_event.limit_offset().map(|x| x.to_string()))
613            .bind(order_event.trailing_offset().map(|x| x.to_string()))
614            .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
615            .bind(order_event.expire_time().map(|x| x.to_string()))
616            .bind(order_event.display_qty().map(|x| x.to_string()))
617            .bind(order_event.emulation_trigger().map(|x| x.to_string()))
618            .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
619            .bind(order_event.contingency_type().map(|x| x.to_string()))
620            .bind(order_event.order_list_id().map(|x| x.to_string()))
621            .bind(order_event.linked_order_ids().map(|x| x.iter().map(|x| x.to_string()).collect::<Vec<String>>()))
622            .bind(order_event.parent_order_id().map(|x| x.to_string()))
623            .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
624            .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
625            .bind(order_event.venue_order_id().map(|x| x.to_string()))
626            .bind(order_event.account_id().map(|x| x.to_string()))
627            .bind(order_event.position_id().map(|x| x.to_string()))
628            .bind(order_event.commission().map(|x| x.to_string()))
629            .bind(order_event.ts_event().to_string())
630            .bind(order_event.ts_init().to_string())
631            .execute(&mut *transaction)
632            .await
633            .map(|_| ())
634            .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
635        transaction
636            .commit()
637            .await
638            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
639    }
640
641    pub async fn load_order_events(
642        pool: &PgPool,
643        client_order_id: &ClientOrderId,
644    ) -> anyhow::Result<Vec<OrderEventAny>> {
645        sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
646        .bind(client_order_id.to_string())
647        .fetch_all(pool)
648        .await
649        .map(|rows| rows.into_iter().map(|row| row.0).collect())
650        .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
651    }
652
653    pub async fn load_order(
654        pool: &PgPool,
655        client_order_id: &ClientOrderId,
656    ) -> anyhow::Result<Option<OrderAny>> {
657        let order_events = DatabaseQueries::load_order_events(pool, client_order_id).await;
658
659        match order_events {
660            Ok(order_events) => {
661                if order_events.is_empty() {
662                    return Ok(None);
663                }
664                let order = OrderAny::from_events(order_events).unwrap();
665                Ok(Some(order))
666            }
667            Err(e) => anyhow::bail!("Failed to load order events: {e}"),
668        }
669    }
670
671    pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
672        let mut orders: Vec<OrderAny> = Vec::new();
673        let client_order_ids: Vec<ClientOrderId> = sqlx::query(
674            r#"
675            SELECT DISTINCT client_order_id FROM "order_event"
676        "#,
677        )
678        .fetch_all(pool)
679        .await
680        .map(|rows| {
681            rows.into_iter()
682                .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
683                .collect()
684        })
685        .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
686        for id in client_order_ids {
687            let order = DatabaseQueries::load_order(pool, &id).await.unwrap();
688            match order {
689                Some(order) => {
690                    orders.push(order);
691                }
692                None => {
693                    continue;
694                }
695            }
696        }
697        Ok(orders)
698    }
699
700    pub async fn add_account(
701        pool: &PgPool,
702        kind: &str,
703        updated: bool,
704        account: Box<dyn Account>,
705    ) -> anyhow::Result<()> {
706        if updated {
707            let exists = DatabaseQueries::check_if_account_event_exists(pool, account.id())
708                .await
709                .unwrap();
710            if !exists {
711                panic!("Account event does not exist for account: {}", account.id());
712            }
713        }
714
715        let mut transaction = pool.begin().await?;
716
717        sqlx::query(
718            r#"
719            INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
720        "#,
721        )
722        .bind(account.id().to_string())
723        .execute(&mut *transaction)
724        .await
725        .map(|_| ())
726        .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
727
728        let account_event = account.last_event().unwrap();
729        sqlx::query(r#"
730            INSERT INTO "account_event" (
731                id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
732            ) VALUES (
733                $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
734            )
735            ON CONFLICT (id)
736            DO UPDATE
737            SET
738                kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
739                ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
740        "#)
741            .bind(account_event.event_id.to_string())
742            .bind(kind.to_string())
743            .bind(account_event.account_id.to_string())
744            .bind(account_event.base_currency.map(|x| x.code.as_str()))
745            .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
746            .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
747            .bind(account_event.is_reported)
748            .bind(account_event.ts_event.to_string())
749            .bind(account_event.ts_init.to_string())
750            .execute(&mut *transaction)
751            .await
752            .map(|_| ())
753            .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
754        transaction
755            .commit()
756            .await
757            .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
758    }
759
760    pub async fn load_account_events(
761        pool: &PgPool,
762        account_id: &AccountId,
763    ) -> anyhow::Result<Vec<AccountState>> {
764        sqlx::query_as::<_, AccountEventModel>(
765            r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
766        )
767        .bind(account_id.to_string())
768        .fetch_all(pool)
769        .await
770        .map(|rows| rows.into_iter().map(|row| row.0).collect())
771        .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
772    }
773
774    pub async fn load_account(
775        pool: &PgPool,
776        account_id: &AccountId,
777    ) -> anyhow::Result<Option<AccountAny>> {
778        let account_events = DatabaseQueries::load_account_events(pool, account_id).await;
779        match account_events {
780            Ok(account_events) => {
781                if account_events.is_empty() {
782                    return Ok(None);
783                }
784                let account = AccountAny::from_events(account_events).unwrap();
785                Ok(Some(account))
786            }
787            Err(e) => anyhow::bail!("Failed to load account events: {e}"),
788        }
789    }
790
791    pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
792        let mut accounts: Vec<AccountAny> = Vec::new();
793        let account_ids: Vec<AccountId> = sqlx::query(
794            r#"
795            SELECT DISTINCT account_id FROM "account_event"
796        "#,
797        )
798        .fetch_all(pool)
799        .await
800        .map(|rows| {
801            rows.into_iter()
802                .map(|row| AccountId::from(row.get::<&str, _>(0)))
803                .collect()
804        })
805        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
806        for id in account_ids {
807            let account = DatabaseQueries::load_account(pool, &id).await.unwrap();
808            match account {
809                Some(account) => {
810                    accounts.push(account);
811                }
812                None => {
813                    continue;
814                }
815            }
816        }
817        Ok(accounts)
818    }
819
820    pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
821        sqlx::query(r#"
822            INSERT INTO "trade" (
823                instrument_id, price, quantity, aggressor_side, venue_trade_id,
824                ts_event, ts_init, created_at, updated_at
825            ) VALUES (
826                $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
827            )
828            ON CONFLICT (id)
829            DO UPDATE
830            SET
831                instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
832                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
833        "#)
834            .bind(trade.instrument_id.to_string())
835            .bind(trade.price.to_string())
836            .bind(trade.size.to_string())
837            .bind(AggressorSideModel(trade.aggressor_side))
838            .bind(trade.trade_id.to_string())
839            .bind(trade.ts_event.to_string())
840            .bind(trade.ts_init.to_string())
841            .execute(pool)
842            .await
843            .map(|_| ())
844            .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
845    }
846
847    pub async fn load_trades(
848        pool: &PgPool,
849        instrument_id: &InstrumentId,
850    ) -> anyhow::Result<Vec<TradeTick>> {
851        sqlx::query_as::<_, TradeTickModel>(
852            r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
853        )
854        .bind(instrument_id.to_string())
855        .fetch_all(pool)
856        .await
857        .map(|rows| rows.into_iter().map(|row| row.0).collect())
858        .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
859    }
860
861    pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
862        sqlx::query(r#"
863            INSERT INTO "quote" (
864                instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
865            ) VALUES (
866                $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
867            )
868            ON CONFLICT (id)
869            DO UPDATE
870            SET
871                instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
872                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
873        "#)
874            .bind(quote.instrument_id.to_string())
875            .bind(quote.bid_price.to_string())
876            .bind(quote.ask_price.to_string())
877            .bind(quote.bid_size.to_string())
878            .bind(quote.ask_size.to_string())
879            .bind(quote.ts_event.to_string())
880            .bind(quote.ts_init.to_string())
881            .execute(pool)
882            .await
883            .map(|_| ())
884            .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
885    }
886
887    pub async fn load_quotes(
888        pool: &PgPool,
889        instrument_id: &InstrumentId,
890    ) -> anyhow::Result<Vec<QuoteTick>> {
891        sqlx::query_as::<_, QuoteTickModel>(
892            r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
893        )
894        .bind(instrument_id.to_string())
895        .fetch_all(pool)
896        .await
897        .map(|rows| rows.into_iter().map(|row| row.0).collect())
898        .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
899    }
900
901    pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
902        println!("Adding bar: {:?}", bar);
903        sqlx::query(r#"
904            INSERT INTO "bar" (
905                instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
906            ) VALUES (
907                $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
908            )
909            ON CONFLICT (id)
910            DO UPDATE
911            SET
912                instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
913                open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
914        "#)
915            .bind(bar.bar_type.instrument_id().to_string())
916            .bind(bar.bar_type.spec().step.get() as i32)
917            .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
918            .bind(PriceTypeModel(bar.bar_type.spec().price_type))
919            .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
920            .bind(bar.open.to_string())
921            .bind(bar.high.to_string())
922            .bind(bar.low.to_string())
923            .bind(bar.close.to_string())
924            .bind(bar.volume.to_string())
925            .bind(bar.ts_event.to_string())
926            .bind(bar.ts_init.to_string())
927            .execute(pool)
928            .await
929            .map(|_| ())
930            .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
931    }
932
933    pub async fn load_bars(
934        pool: &PgPool,
935        instrument_id: &InstrumentId,
936    ) -> anyhow::Result<Vec<Bar>> {
937        sqlx::query_as::<_, BarModel>(
938            r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
939        )
940        .bind(instrument_id.to_string())
941        .fetch_all(pool)
942        .await
943        .map(|rows| rows.into_iter().map(|row| row.0).collect())
944        .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
945    }
946
947    pub async fn load_distinct_order_event_client_ids(
948        pool: &PgPool,
949    ) -> anyhow::Result<HashMap<ClientOrderId, ClientId>> {
950        let mut map: HashMap<ClientOrderId, ClientId> = HashMap::new();
951        let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
952            r#"
953            SELECT DISTINCT
954                client_order_id AS "client_order_id",
955                client_id AS "client_id"
956            FROM "order_event"
957        "#,
958        )
959        .fetch_all(pool)
960        .await
961        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
962        for id in result {
963            map.insert(id.client_order_id, id.client_id);
964        }
965        Ok(map)
966    }
967
968    pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
969        sqlx::query(
970            r#"
971            INSERT INTO "signal" (
972                name, value, ts_event, ts_init, created_at, updated_at
973            ) VALUES (
974                $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
975            )
976            ON CONFLICT (id)
977            DO UPDATE
978            SET
979                name = $1, value = $2, ts_event = $3, ts_init = $4,
980                updated_at = CURRENT_TIMESTAMP
981        "#,
982        )
983        .bind(signal.name.to_string())
984        .bind(signal.value.to_string())
985        .bind(signal.ts_event.to_string())
986        .bind(signal.ts_init.to_string())
987        .execute(pool)
988        .await
989        .map(|_| ())
990        .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
991    }
992
993    pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
994        sqlx::query_as::<_, SignalModel>(
995            r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
996        )
997        .bind(name)
998        .fetch_all(pool)
999        .await
1000        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1001        .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1002    }
1003
1004    pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1005        sqlx::query(
1006            r#"
1007            INSERT INTO "custom" (
1008                data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1009            ) VALUES (
1010                $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1011            )
1012            ON CONFLICT (id)
1013            DO UPDATE
1014            SET
1015                data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1016                updated_at = CURRENT_TIMESTAMP
1017        "#,
1018        )
1019        .bind(data.data_type.type_name().to_string())
1020        .bind(
1021            data.data_type
1022                .metadata()
1023                .as_ref()
1024                .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1025        )
1026        .bind(data.value.to_vec())
1027        .bind(data.ts_event.to_string())
1028        .bind(data.ts_init.to_string())
1029        .execute(pool)
1030        .await
1031        .map(|_| ())
1032        .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1033    }
1034
1035    pub async fn load_custom_data(
1036        pool: &PgPool,
1037        data_type: &DataType,
1038    ) -> anyhow::Result<Vec<CustomData>> {
1039        // TODO: This metadata JSON could be more efficient at some point
1040        let metadata_json = data_type
1041            .metadata()
1042            .as_ref()
1043            .map_or(Ok(serde_json::Value::Null), |metadata| {
1044                serde_json::to_value(metadata)
1045            })?;
1046
1047        sqlx::query_as::<_, CustomDataModel>(
1048            r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1049        )
1050        .bind(data_type.type_name())
1051        .bind(metadata_json)
1052        .fetch_all(pool)
1053        .await
1054        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1055        .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1056    }
1057}