nautilus_infrastructure/python/sql/
cache.rsuse std::collections::HashMap;
use bytes::Bytes;
use nautilus_common::{
cache::database::CacheDatabaseAdapter, custom::CustomData, runtime::get_runtime, signal::Signal,
};
use nautilus_core::python::to_pyruntime_err;
use nautilus_model::{
data::{bar::Bar, quote::QuoteTick, trade::TradeTick, DataType},
events::position::snapshot::PositionSnapshot,
identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId},
python::{
account::{convert_account_any_to_pyobject, convert_pyobject_to_account_any},
events::order::pyobject_to_order_event,
instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
orders::{convert_order_any_to_pyobject, convert_pyobject_to_order_any},
},
types::currency::Currency,
};
use pyo3::prelude::*;
use crate::sql::{cache::PostgresCacheDatabase, queries::DatabaseQueries};
#[pymethods]
impl PostgresCacheDatabase {
#[staticmethod]
#[pyo3(name = "connect")]
#[pyo3(signature = (host=None, port=None, username=None, password=None, database=None))]
fn py_connect(
host: Option<String>,
port: Option<u16>,
username: Option<String>,
password: Option<String>,
database: Option<String>,
) -> PyResult<Self> {
let result = get_runtime().block_on(async {
PostgresCacheDatabase::connect(host, port, username, password, database).await
});
result.map_err(to_pyruntime_err)
}
#[pyo3(name = "close")]
fn py_close(&mut self) -> PyResult<()> {
self.close().map_err(to_pyruntime_err)
}
#[pyo3(name = "flush_db")]
fn py_flush_db(&mut self) -> PyResult<()> {
self.flush().map_err(to_pyruntime_err)
}
#[pyo3(name = "load")]
fn py_load(&self) -> PyResult<HashMap<String, Vec<u8>>> {
get_runtime()
.block_on(async { DatabaseQueries::load(&self.pool).await })
.map_err(to_pyruntime_err)
}
#[pyo3(name = "load_currency")]
fn py_load_currency(&self, code: &str) -> PyResult<Option<Currency>> {
let result = get_runtime()
.block_on(async { DatabaseQueries::load_currency(&self.pool, code).await });
result.map_err(to_pyruntime_err)
}
#[pyo3(name = "load_currencies")]
fn py_load_currencies(&self) -> PyResult<Vec<Currency>> {
let result =
get_runtime().block_on(async { DatabaseQueries::load_currencies(&self.pool).await });
result.map_err(to_pyruntime_err)
}
#[pyo3(name = "load_instrument")]
fn py_load_instrument(
&self,
py: Python,
instrument_id: InstrumentId,
) -> PyResult<Option<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_instrument(&self.pool, &instrument_id)
.await
.unwrap();
match result {
Some(instrument) => {
let py_object = instrument_any_to_pyobject(py, instrument)?;
Ok(Some(py_object))
}
None => Ok(None),
}
})
}
#[pyo3(name = "load_instruments")]
fn py_load_instruments(&self, py: Python) -> PyResult<Vec<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_instruments(&self.pool).await.unwrap();
let mut instruments = Vec::new();
for instrument in result {
let py_object = instrument_any_to_pyobject(py, instrument)?;
instruments.push(py_object);
}
Ok(instruments)
})
}
#[pyo3(name = "load_order")]
fn py_load_order(
&self,
py: Python,
client_order_id: ClientOrderId,
) -> PyResult<Option<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_order(&self.pool, &client_order_id)
.await
.unwrap();
match result {
Some(order) => {
let py_object = convert_order_any_to_pyobject(py, order)?;
Ok(Some(py_object))
}
None => Ok(None),
}
})
}
#[pyo3(name = "load_account")]
fn py_load_account(&self, py: Python, account_id: AccountId) -> PyResult<Option<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_account(&self.pool, &account_id)
.await
.unwrap();
match result {
Some(account) => {
let py_object = convert_account_any_to_pyobject(py, account)?;
Ok(Some(py_object))
}
None => Ok(None),
}
})
}
#[pyo3(name = "load_quotes")]
fn py_load_quotes(&self, py: Python, instrument_id: InstrumentId) -> PyResult<Vec<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_quotes(&self.pool, &instrument_id)
.await
.unwrap();
let mut quotes = Vec::new();
for quote in result {
let py_object = quote.into_py(py);
quotes.push(py_object);
}
Ok(quotes)
})
}
#[pyo3(name = "load_trades")]
fn py_load_trades(&self, py: Python, instrument_id: InstrumentId) -> PyResult<Vec<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_trades(&self.pool, &instrument_id)
.await
.unwrap();
let mut trades = Vec::new();
for trade in result {
let py_object = trade.into_py(py);
trades.push(py_object);
}
Ok(trades)
})
}
#[pyo3(name = "load_bars")]
fn py_load_bars(&self, py: Python, instrument_id: InstrumentId) -> PyResult<Vec<PyObject>> {
get_runtime().block_on(async {
let result = DatabaseQueries::load_bars(&self.pool, &instrument_id)
.await
.unwrap();
let mut bars = Vec::new();
for bar in result {
let py_object = bar.into_py(py);
bars.push(py_object);
}
Ok(bars)
})
}
#[pyo3(name = "load_signals")]
fn py_load_signals(&self, name: &str) -> PyResult<Vec<Signal>> {
get_runtime().block_on(async {
DatabaseQueries::load_signals(&self.pool, name)
.await
.map_err(to_pyruntime_err)
})
}
#[pyo3(name = "load_custom_data")]
fn py_load_custom_data(&self, data_type: DataType) -> PyResult<Vec<CustomData>> {
get_runtime().block_on(async {
DatabaseQueries::load_custom_data(&self.pool, &data_type)
.await
.map_err(to_pyruntime_err)
})
}
#[pyo3(name = "add")]
fn py_add(&self, key: String, value: Vec<u8>) -> PyResult<()> {
self.add(key, Bytes::from(value)).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_currency")]
fn py_add_currency(&self, currency: Currency) -> PyResult<()> {
self.add_currency(¤cy).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_instrument")]
fn py_add_instrument(&self, py: Python, instrument: PyObject) -> PyResult<()> {
let instrument_any = pyobject_to_instrument_any(py, instrument)?;
self.add_instrument(&instrument_any)
.map_err(to_pyruntime_err)
}
#[pyo3(name = "add_order")]
#[pyo3(signature = (order, client_id=None))]
fn py_add_order(
&self,
py: Python,
order: PyObject,
client_id: Option<ClientId>,
) -> PyResult<()> {
let order_any = convert_pyobject_to_order_any(py, order)?;
self.add_order(&order_any, client_id)
.map_err(to_pyruntime_err)
}
#[pyo3(name = "add_position_snapshot")]
fn py_add_position_snapshot(&self, snapshot: PositionSnapshot) -> PyResult<()> {
self.add_position_snapshot(&snapshot)
.map_err(to_pyruntime_err)
}
#[pyo3(name = "add_account")]
fn py_add_account(&self, py: Python, account: PyObject) -> PyResult<()> {
let account_any = convert_pyobject_to_account_any(py, account)?;
self.add_account(&account_any).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_quote")]
fn py_add_quote(&self, quote: QuoteTick) -> PyResult<()> {
self.add_quote("e).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_trade")]
fn py_add_trade(&self, trade: TradeTick) -> PyResult<()> {
self.add_trade(&trade).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_bar")]
fn py_add_bar(&self, bar: Bar) -> PyResult<()> {
self.add_bar(&bar).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_signal")]
fn py_add_signal(&self, signal: Signal) -> PyResult<()> {
self.add_signal(&signal).map_err(to_pyruntime_err)
}
#[pyo3(name = "add_custom_data")]
fn py_add_custom_data(&self, data: CustomData) -> PyResult<()> {
self.add_custom_data(&data).map_err(to_pyruntime_err)
}
#[pyo3(name = "update_order")]
fn py_update_order(&self, py: Python, order_event: PyObject) -> PyResult<()> {
let event = pyobject_to_order_event(py, order_event)?;
self.update_order(&event).map_err(to_pyruntime_err)
}
#[pyo3(name = "update_account")]
fn py_update_account(&self, py: Python, order: PyObject) -> PyResult<()> {
let order_any = convert_pyobject_to_account_any(py, order)?;
self.update_account(&order_any).map_err(to_pyruntime_err)
}
}