nautilus_infrastructure/sql/
pg.rsuse derive_builder::Builder;
use sqlx::{postgres::PgConnectOptions, ConnectOptions, PgPool};
#[derive(Debug, Clone, Builder)]
#[builder(default)]
pub struct PostgresConnectOptions {
pub host: String,
pub port: u16,
pub username: String,
pub password: String,
pub database: String,
}
impl PostgresConnectOptions {
pub fn new(
host: String,
port: u16,
username: String,
password: String,
database: String,
) -> Self {
Self {
host,
port,
username,
password,
database,
}
}
}
impl Default for PostgresConnectOptions {
fn default() -> Self {
PostgresConnectOptions::new(
String::from("localhost"),
5432,
String::from("nautilus"),
String::from("pass"),
String::from("nautilus"),
)
}
}
impl From<PostgresConnectOptions> for PgConnectOptions {
fn from(opt: PostgresConnectOptions) -> Self {
PgConnectOptions::new()
.host(opt.host.as_str())
.port(opt.port)
.username(opt.username.as_str())
.password(opt.password.as_str())
.database(opt.database.as_str())
.disable_statement_logging()
}
}
pub fn get_postgres_connect_options(
host: Option<String>,
port: Option<u16>,
username: Option<String>,
password: Option<String>,
database: Option<String>,
) -> anyhow::Result<PostgresConnectOptions> {
let host = match host.or_else(|| std::env::var("POSTGRES_HOST").ok()) {
Some(host) => host,
None => anyhow::bail!("No host provided from argument or POSTGRES_HOST env variable"),
};
let port = match port.or_else(|| {
std::env::var("POSTGRES_PORT")
.map(|port| port.parse::<u16>().unwrap())
.ok()
}) {
Some(port) => port,
None => anyhow::bail!("No port provided from argument or POSTGRES_PORT env variable"),
};
let username = match username.or_else(|| std::env::var("POSTGRES_USERNAME").ok()) {
Some(username) => username,
None => {
anyhow::bail!("No username provided from argument or POSTGRES_USERNAME env variable")
}
};
let database = match database.or_else(|| std::env::var("POSTGRES_DATABASE").ok()) {
Some(database) => database,
None => {
anyhow::bail!("No database provided from argument or POSTGRES_DATABASE env variable")
}
};
let password = match password.or_else(|| std::env::var("POSTGRES_PASSWORD").ok()) {
Some(password) => password,
None => {
anyhow::bail!("No password provided from argument or POSTGRES_PASSWORD env variable")
}
};
Ok(PostgresConnectOptions::new(
host, port, username, password, database,
))
}
pub async fn connect_pg(options: PgConnectOptions) -> anyhow::Result<PgPool> {
Ok(PgPool::connect_with(options).await?)
}
fn get_schema_dir() -> anyhow::Result<String> {
std::env::var("SCHEMA_DIR").or_else(|_| {
let nautilus_git_repo_name = "nautilus_trader";
let binding = std::env::current_dir().unwrap();
let current_dir = binding.to_str().unwrap();
match current_dir.find(nautilus_git_repo_name){
Some(index) => {
let schema_path = current_dir[0..index + nautilus_git_repo_name.len()].to_string() + "/schema";
Ok(schema_path)
}
None => anyhow::bail!("Could not calculate schema dir from current directory path or SCHEMA_DIR env variable")
}
})
}
pub async fn init_postgres(
pg: &PgPool,
database: String,
password: String,
schema_dir: Option<String>,
) -> anyhow::Result<()> {
log::info!("Initializing Postgres database with target permissions and schema");
match sqlx::query("CREATE SCHEMA IF NOT EXISTS public;")
.execute(pg)
.await
{
Ok(_) => log::info!("Schema public created successfully"),
Err(e) => log::error!("Error creating schema public: {:?}", e),
}
match sqlx::query(format!("CREATE ROLE {} PASSWORD '{}' LOGIN;", database, password).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("Role {} created successfully", database),
Err(e) => {
if e.to_string().contains("already exists") {
log::info!("Role {} already exists", database);
} else {
log::error!("Error creating role {}: {:?}", database, e);
}
}
}
let schema_dir = schema_dir.unwrap_or_else(|| get_schema_dir().unwrap());
let mut sql_files =
std::fs::read_dir(schema_dir)?.collect::<Result<Vec<_>, std::io::Error>>()?;
for file in &mut sql_files {
let file_name = file.file_name();
log::info!("Executing schema file: {:?}", file_name);
let file_path = file.path();
let sql_content = std::fs::read_to_string(file_path.clone())?;
let delimiter = match file_name.to_str() {
Some("functions.sql") => "$$ LANGUAGE plpgsql;",
_ => ";",
};
let sql_statements = sql_content
.split(delimiter)
.filter(|s| !s.trim().is_empty())
.map(|s| format!("{}{}", s, delimiter));
for sql_statement in sql_statements {
sqlx::query(&sql_statement)
.execute(pg)
.await
.map_err(|err| {
if err.to_string().contains("already exists") {
log::info!("Already exists error on statement, skipping");
} else {
panic!(
"Error executing statement {} with error: {:?}",
sql_statement, err
)
}
})
.unwrap();
}
}
match sqlx::query(format!("GRANT CONNECT ON DATABASE {0} TO {0};", database).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("Connect privileges granted to role {}", database),
Err(e) => log::error!(
"Error granting connect privileges to role {}: {:?}",
database,
e
),
}
match sqlx::query(format!("GRANT ALL PRIVILEGES ON SCHEMA public TO {};", database).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("All schema privileges granted to role {}", database),
Err(e) => log::error!(
"Error granting all privileges to role {}: {:?}",
database,
e
),
}
match sqlx::query(
format!(
"GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {};",
database
)
.as_str(),
)
.execute(pg)
.await
{
Ok(_) => log::info!("All tables privileges granted to role {}", database),
Err(e) => log::error!(
"Error granting all privileges to role {}: {:?}",
database,
e
),
}
match sqlx::query(
format!(
"GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {};",
database
)
.as_str(),
)
.execute(pg)
.await
{
Ok(_) => log::info!("All sequences privileges granted to role {}", database),
Err(e) => log::error!(
"Error granting all privileges to role {}: {:?}",
database,
e
),
}
match sqlx::query(
format!(
"GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO {};",
database
)
.as_str(),
)
.execute(pg)
.await
{
Ok(_) => log::info!("All functions privileges granted to role {}", database),
Err(e) => log::error!(
"Error granting all privileges to role {}: {:?}",
database,
e
),
}
Ok(())
}
pub async fn drop_postgres(pg: &PgPool, database: String) -> anyhow::Result<()> {
match sqlx::query(format!("DROP OWNED BY {}", database).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("Dropped owned objects by role {}", database),
Err(e) => log::error!("Error dropping owned by role {}: {:?}", database, e),
}
match sqlx::query(format!("REVOKE CONNECT ON DATABASE {0} FROM {0};", database).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("Revoked connect privileges from role {}", database),
Err(e) => log::error!(
"Error revoking connect privileges from role {}: {:?}",
database,
e
),
}
match sqlx::query(format!("REVOKE ALL PRIVILEGES ON DATABASE {0} FROM {0};", database).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("Revoked all privileges from role {}", database),
Err(e) => log::error!(
"Error revoking all privileges from role {}: {:?}",
database,
e
),
}
match sqlx::query("DROP SCHEMA IF EXISTS public CASCADE")
.execute(pg)
.await
{
Ok(_) => log::info!("Dropped schema public"),
Err(e) => log::error!("Error dropping schema public: {:?}", e),
}
match sqlx::query(format!("DROP ROLE IF EXISTS {};", database).as_str())
.execute(pg)
.await
{
Ok(_) => log::info!("Dropped role {}", database),
Err(e) => log::error!("Error dropping role {}: {:?}", database, e),
}
Ok(())
}