nautilus_infrastructure/sql/
queries.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use nautilus_common::{custom::CustomData, signal::Signal};
18use nautilus_model::{
19    accounts::{Account, AccountAny},
20    data::{Bar, DataType, QuoteTick, TradeTick},
21    events::{
22        AccountState, OrderEvent, OrderEventAny, OrderSnapshot,
23        position::snapshot::PositionSnapshot,
24    },
25    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId},
26    instruments::{Instrument, InstrumentAny},
27    orders::{Order, OrderAny},
28    types::{AccountBalance, Currency, MarginBalance},
29};
30use sqlx::{PgPool, Row};
31
32use super::models::{
33    orders::OrderSnapshotModel,
34    positions::PositionSnapshotModel,
35    types::{CustomDataModel, SignalModel},
36};
37use crate::sql::models::{
38    accounts::AccountEventModel,
39    data::{BarModel, QuoteTickModel, TradeTickModel},
40    enums::{
41        AggregationSourceModel, AggressorSideModel, AssetClassModel, BarAggregationModel,
42        CurrencyTypeModel, PriceTypeModel, TrailingOffsetTypeModel,
43    },
44    general::{GeneralRow, OrderEventOrderClientIdCombination},
45    instruments::InstrumentAnyModel,
46    orders::OrderEventAnyModel,
47    types::CurrencyModel,
48};
49
/// Unit type that groups the asynchronous Postgres query helpers defined in its `impl` block.
///
/// The type holds no state; each helper receives the `PgPool` to operate on as an argument.
#[derive(Debug)]
pub struct DatabaseQueries;
52
53impl DatabaseQueries {
54    /// Truncates all tables in the cache database via the provided Postgres `pool`.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if the TRUNCATE operation fails.
59    pub async fn truncate(pool: &PgPool) -> anyhow::Result<()> {
60        sqlx::query("SELECT truncate_all_tables()")
61            .execute(pool)
62            .await
63            .map(|_| ())
64            .map_err(|e| anyhow::anyhow!("Failed to truncate tables: {e}"))
65    }
66
67    /// Inserts a raw key-value entry into the `general` table via the provided `pool`.
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if the INSERT operation fails.
72    pub async fn add(pool: &PgPool, key: String, value: Vec<u8>) -> anyhow::Result<()> {
73        sqlx::query("INSERT INTO general (id, value) VALUES ($1, $2)")
74            .bind(key)
75            .bind(value)
76            .execute(pool)
77            .await
78            .map(|_| ())
79            .map_err(|e| anyhow::anyhow!("Failed to insert into general table: {e}"))
80    }
81
82    /// Loads all entries from the `general` table via the provided `pool`.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if the SELECT operation fails.
87    pub async fn load(pool: &PgPool) -> anyhow::Result<AHashMap<String, Vec<u8>>> {
88        sqlx::query_as::<_, GeneralRow>("SELECT * FROM general")
89            .fetch_all(pool)
90            .await
91            .map(|rows| {
92                let mut cache: AHashMap<String, Vec<u8>> = AHashMap::new();
93                for row in rows {
94                    cache.insert(row.id, row.value);
95                }
96                cache
97            })
98            .map_err(|e| anyhow::anyhow!("Failed to load general table: {e}"))
99    }
100
101    /// Inserts or ignores a `Currency` row via the provided `pool`.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if the INSERT operation fails.
106    pub async fn add_currency(pool: &PgPool, currency: Currency) -> anyhow::Result<()> {
107        sqlx::query(
108            "INSERT INTO currency (id, precision, iso4217, name, currency_type) VALUES ($1, $2, $3, $4, $5::currency_type) ON CONFLICT (id) DO NOTHING"
109        )
110            .bind(currency.code.as_str())
111            .bind(i32::from(currency.precision))
112            .bind(i32::from(currency.iso4217))
113            .bind(currency.name.as_str())
114            .bind(CurrencyTypeModel(currency.currency_type))
115            .execute(pool)
116            .await
117            .map(|_| ())
118            .map_err(|e| anyhow::anyhow!("Failed to insert into currency table: {e}"))
119    }
120
121    /// Loads all `Currency` entries via the provided `pool`.
122    ///
123    /// # Errors
124    ///
125    /// Returns an error if the SELECT operation fails.
126    pub async fn load_currencies(pool: &PgPool) -> anyhow::Result<Vec<Currency>> {
127        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency ORDER BY id ASC")
128            .fetch_all(pool)
129            .await
130            .map(|rows| rows.into_iter().map(|row| row.0).collect())
131            .map_err(|e| anyhow::anyhow!("Failed to load currencies: {e}"))
132    }
133
134    /// Loads a single `Currency` entry by `code` via the provided `pool`.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if the SELECT operation fails.
139    pub async fn load_currency(pool: &PgPool, code: &str) -> anyhow::Result<Option<Currency>> {
140        sqlx::query_as::<_, CurrencyModel>("SELECT * FROM currency WHERE id = $1")
141            .bind(code)
142            .fetch_optional(pool)
143            .await
144            .map(|currency| currency.map(|row| row.0))
145            .map_err(|e| anyhow::anyhow!("Failed to load currency: {e}"))
146    }
147
148    /// Inserts or updates an `InstrumentAny` entry via the provided `pool`.
149    ///
150    /// # Errors
151    ///
152    /// Returns an error if the INSERT or UPDATE operation fails.
153    pub async fn add_instrument(
154        pool: &PgPool,
155        kind: &str,
156        instrument: Box<dyn Instrument>,
157    ) -> anyhow::Result<()> {
158        sqlx::query(r#"
159            INSERT INTO "instrument" (
160                id, kind, raw_symbol, base_currency, underlying, quote_currency, settlement_currency, isin, asset_class, exchange,
161                multiplier, option_kind, is_inverse, strike_price, activation_ns, expiration_ns, price_precision, size_precision,
162                price_increment, size_increment, maker_fee, taker_fee, margin_init, margin_maint, lot_size, max_quantity, min_quantity, max_notional,
163                min_notional, max_price, min_price, ts_init, ts_event, created_at, updated_at
164            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::asset_class, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
165            ON CONFLICT (id)
166            DO UPDATE
167            SET
168                kind = $2, raw_symbol = $3, base_currency= $4, underlying = $5, quote_currency = $6, settlement_currency = $7, isin = $8, asset_class = $9, exchange = $10,
169                 multiplier = $11, option_kind = $12, is_inverse = $13, strike_price = $14, activation_ns = $15, expiration_ns = $16 , price_precision = $17, size_precision = $18,
170                 price_increment = $19, size_increment = $20, maker_fee = $21, taker_fee = $22, margin_init = $23, margin_maint = $24, lot_size = $25, max_quantity = $26,
171                 min_quantity = $27, max_notional = $28, min_notional = $29, max_price = $30, min_price = $31, ts_init = $32,  ts_event = $33, updated_at = CURRENT_TIMESTAMP
172            "#)
173            .bind(instrument.id().to_string())
174            .bind(kind)
175            .bind(instrument.raw_symbol().to_string())
176            .bind(instrument.base_currency().map(|x| x.code.as_str()))
177            .bind(instrument.underlying().map(|x| x.to_string()))
178            .bind(instrument.quote_currency().code.as_str())
179            .bind(instrument.settlement_currency().code.as_str())
180            .bind(instrument.isin().map(|x| x.to_string()))
181            .bind(AssetClassModel(instrument.asset_class()))
182            .bind(instrument.exchange().map(|x| x.to_string()))
183            .bind(instrument.multiplier().to_string())
184            .bind(instrument.option_kind().map(|x| x.to_string()))
185            .bind(instrument.is_inverse())
186            .bind(instrument.strike_price().map(|x| x.to_string()))
187            .bind(instrument.activation_ns().map(|x| x.to_string()))
188            .bind(instrument.expiration_ns().map(|x| x.to_string()))
189            .bind(i32::from(instrument.price_precision()))
190            .bind(i32::from(instrument.size_precision()))
191            .bind(instrument.price_increment().to_string())
192            .bind(instrument.size_increment().to_string())
193            .bind(instrument.maker_fee().to_string())
194            .bind(instrument.taker_fee().to_string())
195            .bind(instrument.margin_init().to_string())
196            .bind(instrument.margin_maint().to_string())
197            .bind(instrument.lot_size().map(|x| x.to_string()))
198            .bind(instrument.max_quantity().map(|x| x.to_string()))
199            .bind(instrument.min_quantity().map(|x| x.to_string()))
200            .bind(instrument.max_notional().map(|x| x.to_string()))
201            .bind(instrument.min_notional().map(|x| x.to_string()))
202            .bind(instrument.max_price().map(|x| x.to_string()))
203            .bind(instrument.min_price().map(|x| x.to_string()))
204            .bind(instrument.ts_init().to_string())
205            .bind(instrument.ts_event().to_string())
206            .execute(pool)
207            .await
208            .map(|_| ())
209            .map_err(|e| anyhow::anyhow!("Failed to insert item {} into instrument table: {:?}", instrument.id(), e))
210    }
211
212    /// Loads a single `InstrumentAny` entry by `instrument_id` via the provided `pool`.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if the SELECT operation fails.
217    pub async fn load_instrument(
218        pool: &PgPool,
219        instrument_id: &InstrumentId,
220    ) -> anyhow::Result<Option<InstrumentAny>> {
221        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument WHERE id = $1")
222            .bind(instrument_id.to_string())
223            .fetch_optional(pool)
224            .await
225            .map(|instrument| instrument.map(|row| row.0))
226            .map_err(|e| {
227                anyhow::anyhow!("Failed to load instrument with id {instrument_id},error is: {e}")
228            })
229    }
230
231    /// Loads all `InstrumentAny` entries via the provided `pool`.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if the SELECT operation fails.
236    pub async fn load_instruments(pool: &PgPool) -> anyhow::Result<Vec<InstrumentAny>> {
237        sqlx::query_as::<_, InstrumentAnyModel>("SELECT * FROM instrument")
238            .fetch_all(pool)
239            .await
240            .map(|rows| rows.into_iter().map(|row| row.0).collect())
241            .map_err(|e| anyhow::anyhow!("Failed to load instruments: {e}"))
242    }
243
244    /// Inserts or updates an `OrderAny` entry via the provided `pool`.
245    ///
246    /// # Errors
247    ///
248    /// Returns an error if the SQL INSERT or UPDATE operation fails.
249    ///
250    /// # Panics
251    ///
252    /// Panics if the order initialization existence check unwraps `None` after awaiting.
253    pub async fn add_order(
254        pool: &PgPool,
255        _kind: &str,
256        updated: bool,
257        order: Box<dyn Order>,
258        client_id: Option<ClientId>,
259    ) -> anyhow::Result<()> {
260        if updated {
261            let exists = Self::check_if_order_initialized_exists(pool, order.client_order_id())
262                .await
263                .unwrap();
264            assert!(
265                exists,
266                "OrderInitialized event does not exist for order: {}",
267                order.client_order_id()
268            );
269        }
270        match order.last_event().clone() {
271            OrderEventAny::Accepted(event) => {
272                Self::add_order_event(pool, Box::new(event), client_id).await
273            }
274            OrderEventAny::CancelRejected(event) => {
275                Self::add_order_event(pool, Box::new(event), client_id).await
276            }
277            OrderEventAny::Canceled(event) => {
278                Self::add_order_event(pool, Box::new(event), client_id).await
279            }
280            OrderEventAny::Denied(event) => {
281                Self::add_order_event(pool, Box::new(event), client_id).await
282            }
283            OrderEventAny::Emulated(event) => {
284                Self::add_order_event(pool, Box::new(event), client_id).await
285            }
286            OrderEventAny::Expired(event) => {
287                Self::add_order_event(pool, Box::new(event), client_id).await
288            }
289            OrderEventAny::Filled(event) => {
290                Self::add_order_event(pool, Box::new(event), client_id).await
291            }
292            OrderEventAny::Initialized(event) => {
293                Self::add_order_event(pool, Box::new(event), client_id).await
294            }
295            OrderEventAny::ModifyRejected(event) => {
296                Self::add_order_event(pool, Box::new(event), client_id).await
297            }
298            OrderEventAny::PendingCancel(event) => {
299                Self::add_order_event(pool, Box::new(event), client_id).await
300            }
301            OrderEventAny::PendingUpdate(event) => {
302                Self::add_order_event(pool, Box::new(event), client_id).await
303            }
304            OrderEventAny::Rejected(event) => {
305                Self::add_order_event(pool, Box::new(event), client_id).await
306            }
307            OrderEventAny::Released(event) => {
308                Self::add_order_event(pool, Box::new(event), client_id).await
309            }
310            OrderEventAny::Submitted(event) => {
311                Self::add_order_event(pool, Box::new(event), client_id).await
312            }
313            OrderEventAny::Updated(event) => {
314                Self::add_order_event(pool, Box::new(event), client_id).await
315            }
316            OrderEventAny::Triggered(event) => {
317                Self::add_order_event(pool, Box::new(event), client_id).await
318            }
319        }
320    }
321
322    /// Inserts an `OrderSnapshot` entry via the provided `pool`.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the SQL INSERT operation fails.
327    ///
328    /// # Panics
329    ///
330    /// Panics if serialization of `snapshot.exec_algorithm_params` fails.
331    pub async fn add_order_snapshot(pool: &PgPool, snapshot: OrderSnapshot) -> anyhow::Result<()> {
332        let mut transaction = pool.begin().await?;
333
334        // Insert trader if it does not exist
335        // TODO remove this when node and trader initialization is implemented
336        sqlx::query(
337            r#"
338            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
339            "#,
340        )
341        .bind(snapshot.trader_id.to_string())
342        .execute(&mut *transaction)
343        .await
344        .map(|_| ())
345        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
346
347        sqlx::query(
348            r#"
349            INSERT INTO "order" (
350                id, trader_id, strategy_id, instrument_id, client_order_id, venue_order_id, position_id,
351                account_id, last_trade_id, order_type, order_side, quantity, price, trigger_price,
352                trigger_type, limit_offset, trailing_offset, trailing_offset_type, time_in_force,
353                expire_time, filled_qty, liquidity_side, avg_px, slippage, commissions, status,
354                is_post_only, is_reduce_only, is_quote_quantity, display_qty, emulation_trigger,
355                trigger_instrument_id, contingency_type, order_list_id, linked_order_ids,
356                parent_order_id, exec_algorithm_id, exec_algorithm_params, exec_spawn_id, tags, init_id, ts_init, ts_last,
357                created_at, updated_at
358            ) VALUES (
359                $1, $2, $3, $4, $1, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16,
360                $17::TRAILING_OFFSET_TYPE, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28,
361                $29, $30, $31, $32, $33, $34, $35, $36, $37, $38, $39, $40, $41, $42,
362                CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
363            )
364            ON CONFLICT (id)
365            DO UPDATE SET
366                trader_id = $2,
367                strategy_id = $3,
368                instrument_id = $4,
369                venue_order_id = $5,
370                position_id = $6,
371                account_id = $7,
372                last_trade_id = $8,
373                order_type = $9,
374                order_side = $10,
375                quantity = $11,
376                price = $12,
377                trigger_price = $13,
378                trigger_type = $14,
379                limit_offset = $15,
380                trailing_offset = $16,
381                trailing_offset_type = $17::TRAILING_OFFSET_TYPE,
382                time_in_force = $18,
383                expire_time = $19,
384                filled_qty = $20,
385                liquidity_side = $21,
386                avg_px = $22,
387                slippage = $23,
388                commissions = $24,
389                status = $25,
390                is_post_only = $26,
391                is_reduce_only = $27,
392                is_quote_quantity = $28,
393                display_qty = $29,
394                emulation_trigger = $30,
395                trigger_instrument_id = $31,
396                contingency_type = $32,
397                order_list_id = $33,
398                linked_order_ids = $34,
399                parent_order_id = $35,
400                exec_algorithm_id = $36,
401                exec_algorithm_params = $37,
402                exec_spawn_id = $38,
403                tags = $39,
404                init_id = $40,
405                ts_init = $41,
406                ts_last = $42,
407                updated_at = CURRENT_TIMESTAMP
408        "#)
409            .bind(snapshot.client_order_id.to_string())  // Used for both id and client_order_id
410            .bind(snapshot.trader_id.to_string())
411            .bind(snapshot.strategy_id.to_string())
412            .bind(snapshot.instrument_id.to_string())
413            .bind(snapshot.venue_order_id.map(|x| x.to_string()))
414            .bind(snapshot.position_id.map(|x| x.to_string()))
415            .bind(snapshot.account_id.map(|x| x.to_string()))
416            .bind(snapshot.last_trade_id.map(|x| x.to_string()))
417            .bind(snapshot.order_type.to_string())
418            .bind(snapshot.order_side.to_string())
419            .bind(snapshot.quantity.to_string())
420            .bind(snapshot.price.map(|x| x.to_string()))
421            .bind(snapshot.trigger_price.map(|x| x.to_string()))
422            .bind(snapshot.trigger_type.map(|x| x.to_string()))
423            .bind(snapshot.limit_offset.map(|x| x.to_string()))
424            .bind(snapshot.trailing_offset.map(|x| x.to_string()))
425            .bind(snapshot.trailing_offset_type.map(|x| x.to_string()))
426            .bind(snapshot.time_in_force.to_string())
427            .bind(snapshot.expire_time.map(|x| x.to_string()))
428            .bind(snapshot.filled_qty.to_string())
429            .bind(snapshot.liquidity_side.map(|x| x.to_string()))
430            .bind(snapshot.avg_px)
431            .bind(snapshot.slippage)
432            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
433            .bind(snapshot.status.to_string())
434            .bind(snapshot.is_post_only)
435            .bind(snapshot.is_reduce_only)
436            .bind(snapshot.is_quote_quantity)
437            .bind(snapshot.display_qty.map(|x| x.to_string()))
438            .bind(snapshot.emulation_trigger.map(|x| x.to_string()))
439            .bind(snapshot.trigger_instrument_id.map(|x| x.to_string()))
440            .bind(snapshot.contingency_type.map(|x| x.to_string()))
441            .bind(snapshot.order_list_id.map(|x| x.to_string()))
442            .bind(snapshot.linked_order_ids.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
443            .bind(snapshot.parent_order_id.map(|x| x.to_string()))
444            .bind(snapshot.exec_algorithm_id.map(|x| x.to_string()))
445            .bind(snapshot.exec_algorithm_params.map(|x| serde_json::to_value(x).unwrap()))
446            .bind(snapshot.exec_spawn_id.map(|x| x.to_string()))
447            .bind(snapshot.tags.map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
448            .bind(snapshot.init_id.to_string())
449            .bind(snapshot.ts_init.to_string())
450            .bind(snapshot.ts_last.to_string())
451            .execute(&mut *transaction)
452            .await
453            .map(|_| ())
454            .map_err(|e| anyhow::anyhow!("Failed to insert into order table: {e}"))?;
455
456        transaction
457            .commit()
458            .await
459            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
460    }
461
462    /// Loads an `OrderSnapshot` entry by client order ID via the provided `pool`.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the SQL SELECT or deserialization fails.
467    pub async fn load_order_snapshot(
468        pool: &PgPool,
469        client_order_id: &ClientOrderId,
470    ) -> anyhow::Result<Option<OrderSnapshot>> {
471        sqlx::query_as::<_, OrderSnapshotModel>(
472            r#"SELECT * FROM "order" WHERE client_order_id = $1"#,
473        )
474        .bind(client_order_id.to_string())
475        .fetch_optional(pool)
476        .await
477        .map(|model| model.map(|m| m.0))
478        .map_err(|e| anyhow::anyhow!("Failed to load order snapshot: {e}"))
479    }
480
481    /// Inserts or updates a `PositionSnapshot` entry via the provided `pool`.
482    ///
483    /// # Errors
484    ///
485    /// Returns an error if the SQL INSERT or UPDATE operation fails, or if beginning the transaction fails.
486    pub async fn add_position_snapshot(
487        pool: &PgPool,
488        snapshot: PositionSnapshot,
489    ) -> anyhow::Result<()> {
490        let mut transaction = pool.begin().await?;
491
492        // Insert trader if it does not exist
493        // TODO remove this when node and trader initialization is implemented
494        sqlx::query(
495            r#"
496            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
497        "#,
498        )
499        .bind(snapshot.trader_id.to_string())
500        .execute(&mut *transaction)
501        .await
502        .map(|_| ())
503        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
504
505        sqlx::query(r#"
506            INSERT INTO "position" (
507                id, trader_id, strategy_id, instrument_id, account_id, opening_order_id, closing_order_id, entry, side, signed_qty, quantity, peak_qty,
508                quote_currency, base_currency, settlement_currency, avg_px_open, avg_px_close, realized_return, realized_pnl, unrealized_pnl, commissions,
509                duration_ns, ts_opened, ts_closed, ts_init, ts_last, created_at, updated_at
510            ) VALUES (
511                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
512                $21, $22, $23, $24, $25, $26, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
513            )
514            ON CONFLICT (id)
515            DO UPDATE
516            SET
517                trader_id = $2, strategy_id = $3, instrument_id = $4, account_id = $5, opening_order_id = $6, closing_order_id = $7, entry = $8, side = $9, signed_qty = $10, quantity = $11,
518                peak_qty = $12, quote_currency = $13, base_currency = $14, settlement_currency = $15, avg_px_open = $16, avg_px_close = $17, realized_return = $18, realized_pnl = $19, unrealized_pnl = $20,
519                commissions = $21, duration_ns = $22, ts_opened = $23, ts_closed = $24, ts_init = $25, ts_last = $26, updated_at = CURRENT_TIMESTAMP
520        "#)
521            .bind(snapshot.position_id.to_string())
522            .bind(snapshot.trader_id.to_string())
523            .bind(snapshot.strategy_id.to_string())
524            .bind(snapshot.instrument_id.to_string())
525            .bind(snapshot.account_id.to_string())
526            .bind(snapshot.opening_order_id.to_string())
527            .bind(snapshot.closing_order_id.map(|x| x.to_string()))
528            .bind(snapshot.entry.to_string())
529            .bind(snapshot.side.to_string())
530            .bind(snapshot.signed_qty)
531            .bind(snapshot.quantity.to_string())
532            .bind(snapshot.peak_qty.to_string())
533            .bind(snapshot.quote_currency.to_string())
534            .bind(snapshot.base_currency.map(|x| x.to_string()))
535            .bind(snapshot.settlement_currency.to_string())
536            .bind(snapshot.avg_px_open)
537            .bind(snapshot.avg_px_close)
538            .bind(snapshot.realized_return)
539            .bind(snapshot.realized_pnl.map(|x| x.to_string()))
540            .bind(snapshot.unrealized_pnl.map(|x| x.to_string()))
541            .bind(snapshot.commissions.iter().map(ToString::to_string).collect::<Vec<String>>())
542            .bind(snapshot.duration_ns.map(|x| x.to_string()))
543            .bind(snapshot.ts_opened.to_string())
544            .bind(snapshot.ts_closed.map(|x| x.to_string()))
545            .bind(snapshot.ts_init.to_string())
546            .bind(snapshot.ts_last.to_string())
547            .execute(&mut *transaction)
548            .await
549            .map(|_| ())
550            .map_err(|e| anyhow::anyhow!("Failed to insert into position table: {e}"))?;
551        transaction
552            .commit()
553            .await
554            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
555    }
556
557    /// Loads a `PositionSnapshot` entry by `position_id` via the provided `pool`.
558    ///
559    /// # Errors
560    ///
561    /// Returns an error if the SQL SELECT or deserialization fails.
562    pub async fn load_position_snapshot(
563        pool: &PgPool,
564        position_id: &PositionId,
565    ) -> anyhow::Result<Option<PositionSnapshot>> {
566        sqlx::query_as::<_, PositionSnapshotModel>(r#"SELECT * FROM "position" WHERE id = $1"#)
567            .bind(position_id.to_string())
568            .fetch_optional(pool)
569            .await
570            .map(|model| model.map(|m| m.0))
571            .map_err(|e| anyhow::anyhow!("Failed to load position snapshot: {e}"))
572    }
573
574    /// Checks if an `OrderInitialized` event exists for the given `client_order_id` via the provided `pool`.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if the SQL SELECT operation fails.
579    pub async fn check_if_order_initialized_exists(
580        pool: &PgPool,
581        client_order_id: ClientOrderId,
582    ) -> anyhow::Result<bool> {
583        sqlx::query(r#"
584            SELECT EXISTS(SELECT 1 FROM "order_event" WHERE client_order_id = $1 AND kind = 'OrderInitialized')
585        "#)
586            .bind(client_order_id.to_string())
587            .fetch_one(pool)
588            .await
589            .map(|row| row.get(0))
590            .map_err(|e| anyhow::anyhow!("Failed to check if order initialized exists: {e}"))
591    }
592
593    /// Checks if any account event exists for the given `account_id` via the provided `pool`.
594    ///
595    /// # Errors
596    ///
597    /// Returns an error if the SQL SELECT operation fails.
598    pub async fn check_if_account_event_exists(
599        pool: &PgPool,
600        account_id: AccountId,
601    ) -> anyhow::Result<bool> {
602        sqlx::query(
603            r#"
604            SELECT EXISTS(SELECT 1 FROM "account_event" WHERE account_id = $1)
605        "#,
606        )
607        .bind(account_id.to_string())
608        .fetch_one(pool)
609        .await
610        .map(|row| row.get(0))
611        .map_err(|e| anyhow::anyhow!("Failed to check if account event exists: {e}"))
612    }
613
614    /// Inserts or updates an order event entry via the provided `pool`.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if the SQL INSERT or UPDATE operation fails.
619    pub async fn add_order_event(
620        pool: &PgPool,
621        order_event: Box<dyn OrderEvent>,
622        client_id: Option<ClientId>,
623    ) -> anyhow::Result<()> {
624        let mut transaction = pool.begin().await?;
625
626        // Insert trader if it does not exist
627        // TODO remove this when node and trader initialization is implemented
628        sqlx::query(
629            r#"
630            INSERT INTO "trader" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
631        "#,
632        )
633        .bind(order_event.trader_id().to_string())
634        .execute(&mut *transaction)
635        .await
636        .map(|_| ())
637        .map_err(|e| anyhow::anyhow!("Failed to insert into trader table: {e}"))?;
638
639        // Insert client if it does not exist
640        // TODO remove this when client initialization is implemented
641        if let Some(client_id) = client_id {
642            sqlx::query(
643                r#"
644                INSERT INTO "client" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
645            "#,
646            )
647            .bind(client_id.to_string())
648            .execute(&mut *transaction)
649            .await
650            .map(|_| ())
651            .map_err(|e| anyhow::anyhow!("Failed to insert into client table: {e}"))?;
652        }
653
654        sqlx::query(r#"
655            INSERT INTO "order_event" (
656                id, kind, client_order_id, order_type, order_side, trader_id, client_id, reason, strategy_id, instrument_id, trade_id, currency, quantity, time_in_force, liquidity_side,
657                post_only, reduce_only, quote_quantity, reconciliation, price, last_px, last_qty, trigger_price, trigger_type, limit_offset, trailing_offset,
658                trailing_offset_type, expire_time, display_qty, emulation_trigger, trigger_instrument_id, contingency_type,
659                order_list_id, linked_order_ids, parent_order_id,
660                exec_algorithm_id, exec_spawn_id, venue_order_id, account_id, position_id, commission, ts_event, ts_init, created_at, updated_at
661            ) VALUES (
662                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
663                $21, $22, $23, $24, $25, $26::trailing_offset_type, $27, $28, $29, $30, $31, $32, $33, $34,
664                $35, $36, $37, $38, $39, $40, $41, $42, $43, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
665            )
666            ON CONFLICT (id)
667            DO UPDATE
668            SET
669                kind = $2, client_order_id = $3, order_type = $4, order_side=$5, trader_id = $6, client_id = $7, reason = $8, strategy_id = $9, instrument_id = $10, trade_id = $11, currency = $12,
670                quantity = $13, time_in_force = $14, liquidity_side = $15, post_only = $16, reduce_only = $17, quote_quantity = $18, reconciliation = $19, price = $20, last_px = $21,
671                last_qty = $22, trigger_price = $23, trigger_type = $24, limit_offset = $25, trailing_offset = $26, trailing_offset_type = $27, expire_time = $28, display_qty = $29,
672                emulation_trigger = $30, trigger_instrument_id = $31, contingency_type = $32, order_list_id = $33, linked_order_ids = $34, parent_order_id = $35, exec_algorithm_id = $36,
673                exec_spawn_id = $37, venue_order_id = $38, account_id = $39, position_id = $40, commission = $41, ts_event = $42, ts_init = $43, updated_at = CURRENT_TIMESTAMP
674
675        "#)
676            .bind(order_event.id().to_string())
677            .bind(order_event.kind())
678            .bind(order_event.client_order_id().to_string())
679            .bind(order_event.order_type().map(|x| x.to_string()))
680            .bind(order_event.order_side().map(|x| x.to_string()))
681            .bind(order_event.trader_id().to_string())
682            .bind(client_id.map(|x| x.to_string()))
683            .bind(order_event.reason().map(|x| x.to_string()))
684            .bind(order_event.strategy_id().to_string())
685            .bind(order_event.instrument_id().to_string())
686            .bind(order_event.trade_id().map(|x| x.to_string()))
687            .bind(order_event.currency().map(|x| x.code.as_str()))
688            .bind(order_event.quantity().map(|x| x.to_string()))
689            .bind(order_event.time_in_force().map(|x| x.to_string()))
690            .bind(order_event.liquidity_side().map(|x| x.to_string()))
691            .bind(order_event.post_only())
692            .bind(order_event.reduce_only())
693            .bind(order_event.quote_quantity())
694            .bind(order_event.reconciliation())
695            .bind(order_event.price().map(|x| x.to_string()))
696            .bind(order_event.last_px().map(|x| x.to_string()))
697            .bind(order_event.last_qty().map(|x| x.to_string()))
698            .bind(order_event.trigger_price().map(|x| x.to_string()))
699            .bind(order_event.trigger_type().map(|x| x.to_string()))
700            .bind(order_event.limit_offset().map(|x| x.to_string()))
701            .bind(order_event.trailing_offset().map(|x| x.to_string()))
702            .bind(order_event.trailing_offset_type().map(TrailingOffsetTypeModel))
703            .bind(order_event.expire_time().map(|x| x.to_string()))
704            .bind(order_event.display_qty().map(|x| x.to_string()))
705            .bind(order_event.emulation_trigger().map(|x| x.to_string()))
706            .bind(order_event.trigger_instrument_id().map(|x| x.to_string()))
707            .bind(order_event.contingency_type().map(|x| x.to_string()))
708            .bind(order_event.order_list_id().map(|x| x.to_string()))
709            .bind(order_event.linked_order_ids().map(|x| x.iter().map(ToString::to_string).collect::<Vec<String>>()))
710            .bind(order_event.parent_order_id().map(|x| x.to_string()))
711            .bind(order_event.exec_algorithm_id().map(|x| x.to_string()))
712            .bind(order_event.exec_spawn_id().map(|x| x.to_string()))
713            .bind(order_event.venue_order_id().map(|x| x.to_string()))
714            .bind(order_event.account_id().map(|x| x.to_string()))
715            .bind(order_event.position_id().map(|x| x.to_string()))
716            .bind(order_event.commission().map(|x| x.to_string()))
717            .bind(order_event.ts_event().to_string())
718            .bind(order_event.ts_init().to_string())
719            .execute(&mut *transaction)
720            .await
721            .map(|_| ())
722            .map_err(|e| anyhow::anyhow!("Failed to insert into order_event table: {e}"))?;
723        transaction
724            .commit()
725            .await
726            .map_err(|e| anyhow::anyhow!("Failed to commit transaction: {e}"))
727    }
728
729    /// Loads all order events for a `client_order_id` via the provided `pool`.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if the SQL SELECT or deserialization fails.
734    pub async fn load_order_events(
735        pool: &PgPool,
736        client_order_id: &ClientOrderId,
737    ) -> anyhow::Result<Vec<OrderEventAny>> {
738        sqlx::query_as::<_, OrderEventAnyModel>(r#"SELECT * FROM "order_event" event WHERE event.client_order_id = $1 ORDER BY created_at ASC"#)
739        .bind(client_order_id.to_string())
740        .fetch_all(pool)
741        .await
742        .map(|rows| rows.into_iter().map(|row| row.0).collect())
743        .map_err(|e| anyhow::anyhow!("Failed to load order events: {e}"))
744    }
745
746    /// Loads and assembles a complete `OrderAny` for a `client_order_id` via the provided `pool`.
747    ///
748    /// # Errors
749    ///
750    /// Returns an error if assembling events or SQL operations fail.
751    ///
752    /// # Panics
753    ///
754    /// Panics if assembling the order from events fails.
755    pub async fn load_order(
756        pool: &PgPool,
757        client_order_id: &ClientOrderId,
758    ) -> anyhow::Result<Option<OrderAny>> {
759        let order_events = Self::load_order_events(pool, client_order_id).await;
760
761        match order_events {
762            Ok(order_events) => {
763                if order_events.is_empty() {
764                    return Ok(None);
765                }
766                let order = OrderAny::from_events(order_events).unwrap();
767                Ok(Some(order))
768            }
769            Err(e) => anyhow::bail!("Failed to load order events: {e}"),
770        }
771    }
772
773    /// Loads and assembles all `OrderAny` entries via the provided `pool`.
774    ///
775    /// # Errors
776    ///
777    /// Returns an error if loading events or SQL operations fail.
778    ///
779    /// # Panics
780    ///
781    /// Panics if loading or assembling any individual order fails.
782    pub async fn load_orders(pool: &PgPool) -> anyhow::Result<Vec<OrderAny>> {
783        let mut orders: Vec<OrderAny> = Vec::new();
784        let client_order_ids: Vec<ClientOrderId> = sqlx::query(
785            r#"
786            SELECT DISTINCT client_order_id FROM "order_event"
787        "#,
788        )
789        .fetch_all(pool)
790        .await
791        .map(|rows| {
792            rows.into_iter()
793                .map(|row| ClientOrderId::from(row.get::<&str, _>(0)))
794                .collect()
795        })
796        .map_err(|e| anyhow::anyhow!("Failed to load order ids: {e}"))?;
797        for id in client_order_ids {
798            let order = Self::load_order(pool, &id).await.unwrap();
799            match order {
800                Some(order) => {
801                    orders.push(order);
802                }
803                None => {
804                    continue;
805                }
806            }
807        }
808        Ok(orders)
809    }
810
811    /// Inserts or updates an `AccountAny` entry via the provided `pool`.
812    ///
813    /// # Errors
814    ///
815    /// Returns an error if the SQL INSERT or UPDATE operation fails.
816    ///
817    /// # Panics
818    ///
819    /// Panics if checking for existing account event unwrap fails.
820    pub async fn add_account(
821        pool: &PgPool,
822        kind: &str,
823        updated: bool,
824        account: Box<dyn Account>,
825    ) -> anyhow::Result<()> {
826        if updated {
827            let exists = Self::check_if_account_event_exists(pool, account.id())
828                .await
829                .unwrap();
830            assert!(
831                exists,
832                "Account event does not exist for account: {}",
833                account.id()
834            );
835        }
836
837        let mut transaction = pool.begin().await?;
838
839        sqlx::query(
840            r#"
841            INSERT INTO "account" (id) VALUES ($1) ON CONFLICT (id) DO NOTHING
842        "#,
843        )
844        .bind(account.id().to_string())
845        .execute(&mut *transaction)
846        .await
847        .map(|_| ())
848        .map_err(|e| anyhow::anyhow!("Failed to insert into account table: {e}"))?;
849
850        let account_event = account.last_event().unwrap();
851        sqlx::query(r#"
852            INSERT INTO "account_event" (
853                id, kind, account_id, base_currency, balances, margins, is_reported, ts_event, ts_init, created_at, updated_at
854            ) VALUES (
855                $1, $2, $3, $4, $5, $6, $7, $8, $9, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
856            )
857            ON CONFLICT (id)
858            DO UPDATE
859            SET
860                kind = $2, account_id = $3, base_currency = $4, balances = $5, margins = $6, is_reported = $7,
861                ts_event = $8, ts_init = $9, updated_at = CURRENT_TIMESTAMP
862        "#)
863            .bind(account_event.event_id.to_string())
864            .bind(kind.to_string())
865            .bind(account_event.account_id.to_string())
866            .bind(account_event.base_currency.map(|x| x.code.as_str()))
867            .bind(serde_json::to_value::<Vec<AccountBalance>>(account_event.balances).unwrap())
868            .bind(serde_json::to_value::<Vec<MarginBalance>>(account_event.margins).unwrap())
869            .bind(account_event.is_reported)
870            .bind(account_event.ts_event.to_string())
871            .bind(account_event.ts_init.to_string())
872            .execute(&mut *transaction)
873            .await
874            .map(|_| ())
875            .map_err(|e| anyhow::anyhow!("Failed to insert into account_event table: {e}"))?;
876        transaction
877            .commit()
878            .await
879            .map_err(|e| anyhow::anyhow!("Failed to commit add_account transaction: {e}"))
880    }
881
882    /// Loads all account events for `account_id` via the provided `pool`.
883    ///
884    /// # Errors
885    ///
886    /// Returns an error if the SQL SELECT or deserialization fails.
887    pub async fn load_account_events(
888        pool: &PgPool,
889        account_id: &AccountId,
890    ) -> anyhow::Result<Vec<AccountState>> {
891        sqlx::query_as::<_, AccountEventModel>(
892            r#"SELECT * FROM "account_event" WHERE account_id = $1 ORDER BY created_at ASC"#,
893        )
894        .bind(account_id.to_string())
895        .fetch_all(pool)
896        .await
897        .map(|rows| rows.into_iter().map(|row| row.0).collect())
898        .map_err(|e| anyhow::anyhow!("Failed to load account events: {e}"))
899    }
900
901    /// Loads and assembles a complete `AccountAny` for `account_id` via the provided `pool`.
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if assembling events or SQL operations fail.
906    ///
907    /// # Panics
908    ///
909    /// Panics if assembling the account from events fails.
910    pub async fn load_account(
911        pool: &PgPool,
912        account_id: &AccountId,
913    ) -> anyhow::Result<Option<AccountAny>> {
914        let account_events = Self::load_account_events(pool, account_id).await;
915        match account_events {
916            Ok(account_events) => {
917                if account_events.is_empty() {
918                    return Ok(None);
919                }
920                let account = AccountAny::from_events(account_events).unwrap();
921                Ok(Some(account))
922            }
923            Err(e) => anyhow::bail!("Failed to load account events: {e}"),
924        }
925    }
926
927    /// Loads and assembles all `AccountAny` entries via the provided `pool`.
928    ///
929    /// # Errors
930    ///
931    /// Returns an error if loading events or SQL operations fail.
932    ///
933    /// # Panics
934    ///
935    /// Panics if loading or assembling any individual account fails.
936    pub async fn load_accounts(pool: &PgPool) -> anyhow::Result<Vec<AccountAny>> {
937        let mut accounts: Vec<AccountAny> = Vec::new();
938        let account_ids: Vec<AccountId> = sqlx::query(
939            r#"
940            SELECT DISTINCT account_id FROM "account_event"
941        "#,
942        )
943        .fetch_all(pool)
944        .await
945        .map(|rows| {
946            rows.into_iter()
947                .map(|row| AccountId::from(row.get::<&str, _>(0)))
948                .collect()
949        })
950        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
951        for id in account_ids {
952            let account = Self::load_account(pool, &id).await.unwrap();
953            match account {
954                Some(account) => {
955                    accounts.push(account);
956                }
957                None => {
958                    continue;
959                }
960            }
961        }
962        Ok(accounts)
963    }
964
965    /// Inserts a `TradeTick` entry via the provided `pool`.
966    ///
967    /// # Errors
968    ///
969    /// Returns an error if the SQL INSERT operation fails.
970    pub async fn add_trade(pool: &PgPool, trade: &TradeTick) -> anyhow::Result<()> {
971        sqlx::query(r#"
972            INSERT INTO "trade" (
973                instrument_id, price, quantity, aggressor_side, venue_trade_id,
974                ts_event, ts_init, created_at, updated_at
975            ) VALUES (
976                $1, $2, $3, $4::aggressor_side, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
977            )
978            ON CONFLICT (id)
979            DO UPDATE
980            SET
981                instrument_id = $1, price = $2, quantity = $3, aggressor_side = $4, venue_trade_id = $5,
982                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
983        "#)
984            .bind(trade.instrument_id.to_string())
985            .bind(trade.price.to_string())
986            .bind(trade.size.to_string())
987            .bind(AggressorSideModel(trade.aggressor_side))
988            .bind(trade.trade_id.to_string())
989            .bind(trade.ts_event.to_string())
990            .bind(trade.ts_init.to_string())
991            .execute(pool)
992            .await
993            .map(|_| ())
994            .map_err(|e| anyhow::anyhow!("Failed to insert into trade table: {e}"))
995    }
996
997    /// Loads all `TradeTick` entries for `instrument_id` via the provided `pool`.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the SQL SELECT or deserialization fails.
1002    pub async fn load_trades(
1003        pool: &PgPool,
1004        instrument_id: &InstrumentId,
1005    ) -> anyhow::Result<Vec<TradeTick>> {
1006        sqlx::query_as::<_, TradeTickModel>(
1007            r#"SELECT * FROM "trade" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1008        )
1009        .bind(instrument_id.to_string())
1010        .fetch_all(pool)
1011        .await
1012        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1013        .map_err(|e| anyhow::anyhow!("Failed to load trades: {e}"))
1014    }
1015
1016    /// Inserts a `QuoteTick` entry via the provided `pool`.
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns an error if the SQL INSERT operation fails.
1021    pub async fn add_quote(pool: &PgPool, quote: &QuoteTick) -> anyhow::Result<()> {
1022        sqlx::query(r#"
1023            INSERT INTO "quote" (
1024                instrument_id, bid_price, ask_price, bid_size, ask_size, ts_event, ts_init, created_at, updated_at
1025            ) VALUES (
1026                $1, $2, $3, $4, $5, $6, $7, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1027            )
1028            ON CONFLICT (id)
1029            DO UPDATE
1030            SET
1031                instrument_id = $1, bid_price = $2, ask_price = $3, bid_size = $4, ask_size = $5,
1032                ts_event = $6, ts_init = $7, updated_at = CURRENT_TIMESTAMP
1033        "#)
1034            .bind(quote.instrument_id.to_string())
1035            .bind(quote.bid_price.to_string())
1036            .bind(quote.ask_price.to_string())
1037            .bind(quote.bid_size.to_string())
1038            .bind(quote.ask_size.to_string())
1039            .bind(quote.ts_event.to_string())
1040            .bind(quote.ts_init.to_string())
1041            .execute(pool)
1042            .await
1043            .map(|_| ())
1044            .map_err(|e| anyhow::anyhow!("Failed to insert into quote table: {e}"))
1045    }
1046
1047    /// Loads all `QuoteTick` entries for `instrument_id` via the provided `pool`.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if the SQL SELECT or deserialization fails.
1052    pub async fn load_quotes(
1053        pool: &PgPool,
1054        instrument_id: &InstrumentId,
1055    ) -> anyhow::Result<Vec<QuoteTick>> {
1056        sqlx::query_as::<_, QuoteTickModel>(
1057            r#"SELECT * FROM "quote" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1058        )
1059        .bind(instrument_id.to_string())
1060        .fetch_all(pool)
1061        .await
1062        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1063        .map_err(|e| anyhow::anyhow!("Failed to load quotes: {e}"))
1064    }
1065
1066    /// Inserts a `Bar` entry via the provided `pool`.
1067    ///
1068    /// # Errors
1069    ///
1070    /// Returns an error if the SQL INSERT operation fails.
1071    pub async fn add_bar(pool: &PgPool, bar: &Bar) -> anyhow::Result<()> {
1072        println!("Adding bar: {bar:?}");
1073        sqlx::query(r#"
1074            INSERT INTO "bar" (
1075                instrument_id, step, bar_aggregation, price_type, aggregation_source, open, high, low, close, volume, ts_event, ts_init, created_at, updated_at
1076            ) VALUES (
1077                $1, $2, $3::bar_aggregation, $4::price_type, $5::aggregation_source, $6, $7, $8, $9, $10, $11, $12, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1078            )
1079            ON CONFLICT (id)
1080            DO UPDATE
1081            SET
1082                instrument_id = $1, step = $2, bar_aggregation = $3::bar_aggregation, price_type = $4::price_type, aggregation_source = $5::aggregation_source,
1083                open = $6, high = $7, low = $8, close = $9, volume = $10, ts_event = $11, ts_init = $12, updated_at = CURRENT_TIMESTAMP
1084        "#)
1085            .bind(bar.bar_type.instrument_id().to_string())
1086            .bind(bar.bar_type.spec().step.get() as i32)
1087            .bind(BarAggregationModel(bar.bar_type.spec().aggregation))
1088            .bind(PriceTypeModel(bar.bar_type.spec().price_type))
1089            .bind(AggregationSourceModel(bar.bar_type.aggregation_source()))
1090            .bind(bar.open.to_string())
1091            .bind(bar.high.to_string())
1092            .bind(bar.low.to_string())
1093            .bind(bar.close.to_string())
1094            .bind(bar.volume.to_string())
1095            .bind(bar.ts_event.to_string())
1096            .bind(bar.ts_init.to_string())
1097            .execute(pool)
1098            .await
1099            .map(|_| ())
1100            .map_err(|e| anyhow::anyhow!("Failed to insert into bar table: {e}"))
1101    }
1102
1103    /// Loads all `Bar` entries for `instrument_id` via the provided `pool`.
1104    ///
1105    /// # Errors
1106    ///
1107    /// Returns an error if the SQL SELECT or deserialization fails.
1108    pub async fn load_bars(
1109        pool: &PgPool,
1110        instrument_id: &InstrumentId,
1111    ) -> anyhow::Result<Vec<Bar>> {
1112        sqlx::query_as::<_, BarModel>(
1113            r#"SELECT * FROM "bar" WHERE instrument_id = $1 ORDER BY ts_event ASC"#,
1114        )
1115        .bind(instrument_id.to_string())
1116        .fetch_all(pool)
1117        .await
1118        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1119        .map_err(|e| anyhow::anyhow!("Failed to load bars: {e}"))
1120    }
1121
1122    /// Loads all distinct client order IDs from order events via the provided `pool`.
1123    ///
1124    /// # Errors
1125    ///
1126    /// Returns an error if the SQL SELECT or iteration fails.
1127    pub async fn load_distinct_order_event_client_ids(
1128        pool: &PgPool,
1129    ) -> anyhow::Result<AHashMap<ClientOrderId, ClientId>> {
1130        let mut map: AHashMap<ClientOrderId, ClientId> = AHashMap::new();
1131        let result = sqlx::query_as::<_, OrderEventOrderClientIdCombination>(
1132            r#"
1133            SELECT DISTINCT
1134                client_order_id AS "client_order_id",
1135                client_id AS "client_id"
1136            FROM "order_event"
1137        "#,
1138        )
1139        .fetch_all(pool)
1140        .await
1141        .map_err(|e| anyhow::anyhow!("Failed to load account ids: {e}"))?;
1142        for id in result {
1143            map.insert(id.client_order_id, id.client_id);
1144        }
1145        Ok(map)
1146    }
1147
1148    /// Inserts a `Signal` entry via the provided `pool`.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if the SQL INSERT operation fails.
1153    pub async fn add_signal(pool: &PgPool, signal: &Signal) -> anyhow::Result<()> {
1154        sqlx::query(
1155            r#"
1156            INSERT INTO "signal" (
1157                name, value, ts_event, ts_init, created_at, updated_at
1158            ) VALUES (
1159                $1, $2, $3, $4, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1160            )
1161            ON CONFLICT (id)
1162            DO UPDATE
1163            SET
1164                name = $1, value = $2, ts_event = $3, ts_init = $4,
1165                updated_at = CURRENT_TIMESTAMP
1166        "#,
1167        )
1168        .bind(signal.name.to_string())
1169        .bind(signal.value.clone())
1170        .bind(signal.ts_event.to_string())
1171        .bind(signal.ts_init.to_string())
1172        .execute(pool)
1173        .await
1174        .map(|_| ())
1175        .map_err(|e| anyhow::anyhow!("Failed to insert into signal table: {e}"))
1176    }
1177
1178    /// Loads all `Signal` entries by `name` via the provided `pool`.
1179    ///
1180    /// # Errors
1181    ///
1182    /// Returns an error if the SQL SELECT or deserialization fails.
1183    pub async fn load_signals(pool: &PgPool, name: &str) -> anyhow::Result<Vec<Signal>> {
1184        sqlx::query_as::<_, SignalModel>(
1185            r#"SELECT * FROM "signal" WHERE name = $1 ORDER BY ts_init ASC"#,
1186        )
1187        .bind(name)
1188        .fetch_all(pool)
1189        .await
1190        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1191        .map_err(|e| anyhow::anyhow!("Failed to load signals: {e}"))
1192    }
1193
1194    /// Inserts a `CustomData` entry via the provided `pool`.
1195    ///
1196    /// # Errors
1197    ///
1198    /// Returns an error if the SQL INSERT operation fails.
1199    pub async fn add_custom_data(pool: &PgPool, data: &CustomData) -> anyhow::Result<()> {
1200        sqlx::query(
1201            r#"
1202            INSERT INTO "custom" (
1203                data_type, metadata, value, ts_event, ts_init, created_at, updated_at
1204            ) VALUES (
1205                $1, $2, $3, $4, $5, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
1206            )
1207            ON CONFLICT (id)
1208            DO UPDATE
1209            SET
1210                data_type = $1, metadata = $2, value = $3, ts_event = $4, ts_init = $5,
1211                updated_at = CURRENT_TIMESTAMP
1212        "#,
1213        )
1214        .bind(data.data_type.type_name().to_string())
1215        .bind(
1216            data.data_type
1217                .metadata()
1218                .as_ref()
1219                .map_or_else(|| Ok(serde_json::Value::Null), serde_json::to_value)?,
1220        )
1221        .bind(data.value.to_vec())
1222        .bind(data.ts_event.to_string())
1223        .bind(data.ts_init.to_string())
1224        .execute(pool)
1225        .await
1226        .map(|_| ())
1227        .map_err(|e| anyhow::anyhow!("Failed to insert into custom table: {e}"))
1228    }
1229
1230    /// Loads all `CustomData` entries of `data_type` via the provided `pool`.
1231    ///
1232    /// # Errors
1233    ///
1234    /// Returns an error if the SQL SELECT or deserialization fails.
1235    pub async fn load_custom_data(
1236        pool: &PgPool,
1237        data_type: &DataType,
1238    ) -> anyhow::Result<Vec<CustomData>> {
1239        // TODO: This metadata JSON could be more efficient at some point
1240        let metadata_json = data_type
1241            .metadata()
1242            .as_ref()
1243            .map_or(Ok(serde_json::Value::Null), |metadata| {
1244                serde_json::to_value(metadata)
1245            })?;
1246
1247        sqlx::query_as::<_, CustomDataModel>(
1248            r#"SELECT * FROM "custom" WHERE data_type = $1 AND metadata = $2 ORDER BY ts_init ASC"#,
1249        )
1250        .bind(data_type.type_name())
1251        .bind(metadata_json)
1252        .fetch_all(pool)
1253        .await
1254        .map(|rows| rows.into_iter().map(|row| row.0).collect())
1255        .map_err(|e| anyhow::anyhow!("Failed to load custom data: {e}"))
1256    }
1257}