// nautilus_data/engine/runner.rs
use std::{cell::RefCell, collections::VecDeque, rc::Rc};
use nautilus_common::messages::data::{DataClientResponse, DataResponse};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use super::DataEngine;
/// Drives a [`DataEngine`] from a queue of pending data client output.
///
/// Each implementation owns its queue and hands out a handle to it through
/// [`Runner::get_sender`]; [`Runner::run`] is what moves queued items into the
/// engine.
pub trait Runner {
/// Type of the handle returned by [`Runner::get_sender`].
type Sender;
/// Creates a runner.
fn new() -> Self;
/// Runs the runner against `engine`.
///
/// Both implementations in this file take queued `DataClientResponse` values
/// one at a time and forward each to `engine`.
fn run(&mut self, engine: &mut DataEngine);
/// Returns a value of the associated [`Runner::Sender`] type.
fn get_sender(&self) -> Self::Sender;
}
// NOTE(review): no implementor or caller of this trait is visible in this part
// of the file — confirm where it is used.
/// Sink that accepts a single [`DataResponse`].
pub trait SendResponse {
/// Hands `resp` to the implementor.
///
/// The method takes `&self` and returns nothing, so a failure cannot be
/// reported to the caller through this signature.
fn send(&self, resp: DataResponse);
}
/// Shared double-ended queue of [`DataClientResponse`] values.
///
/// `Rc` lets several owners hold the same queue and `RefCell` enforces the
/// borrow rules at runtime instead of compile time. `Rc` is not `Send`, so
/// every handle stays on the thread that created the queue.
/// `BacktestRunner::run` consumes the queue from the front.
pub type DataResponseQueue = Rc<RefCell<VecDeque<DataClientResponse>>>;
// NOTE(review): presumably intended for backtesting, going by the name — confirm.
/// [`Runner`] backed by an in-memory [`DataResponseQueue`].
pub struct BacktestRunner {
/// Pending items; `get_sender` returns a clone of this same handle.
queue: DataResponseQueue,
}
impl Runner for BacktestRunner {
    /// Producers receive another handle to the very same queue.
    type Sender = DataResponseQueue;

    /// Creates a runner whose queue starts out empty.
    fn new() -> Self {
        Self {
            queue: Rc::new(RefCell::new(VecDeque::new())),
        }
    }

    /// Drains the queue front-to-back, forwarding every item to `engine`.
    ///
    /// `DataClientResponse::Response` goes to `engine.response` and
    /// `DataClientResponse::Data` goes to `engine.process_data`. Items pushed
    /// onto the queue while an earlier item is being handled are picked up by
    /// the same call; the method returns once the queue is empty.
    fn run(&mut self, engine: &mut DataEngine) {
        loop {
            // Pop in a statement of its own. A `RefMut` created inside a
            // `while let` scrutinee is a temporary that stays alive for the
            // whole loop body, which would keep the queue mutably borrowed
            // while the engine runs and make any push through a sender handle
            // panic with "already borrowed". Here the guard is dropped at the
            // end of this `let`, before the engine is called.
            let next = self.queue.as_ref().borrow_mut().pop_front();
            match next {
                Some(DataClientResponse::Response(resp)) => engine.response(resp),
                Some(DataClientResponse::Data(data)) => engine.process_data(data),
                None => break,
            }
        }
    }

    /// Returns a new handle to the queue that `run` drains.
    fn get_sender(&self) -> Self::Sender {
        Rc::clone(&self.queue)
    }
}
/// [`Runner`] backed by a Tokio unbounded MPSC channel.
pub struct LiveRunner {
/// Sending half of the channel; `get_sender` returns clones of it.
resp_tx: UnboundedSender<DataClientResponse>,
/// Receiving half of the channel; `run` reads from it.
resp_rx: UnboundedReceiver<DataClientResponse>,
}
impl Runner for LiveRunner {
    /// Producers receive a clone of the channel's sending half.
    type Sender = UnboundedSender<DataClientResponse>;

    /// Opens a fresh unbounded channel and keeps both halves.
    fn new() -> Self {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<DataClientResponse>();
        Self {
            resp_tx: sender,
            resp_rx: receiver,
        }
    }

    /// Blocks the calling thread on the channel and forwards every received
    /// item to `engine`, returning once `blocking_recv` yields `None`.
    ///
    /// `DataClientResponse::Response` goes to `engine.response` and
    /// `DataClientResponse::Data` goes to `engine.process_data`.
    ///
    /// # Panics
    ///
    /// Tokio documents `blocking_recv` as panicking when it is called from
    /// within an asynchronous execution context.
    fn run(&mut self, engine: &mut DataEngine) {
        // NOTE(review): this runner keeps its own `resp_tx`, so the channel
        // cannot close through senders being dropped while `self` is alive —
        // confirm how this loop is expected to terminate.
        loop {
            match self.resp_rx.blocking_recv() {
                Some(DataClientResponse::Response(response)) => engine.response(response),
                Some(DataClientResponse::Data(payload)) => engine.process_data(payload),
                None => break,
            }
        }
    }

    /// Returns a clone of the sending half feeding `run`.
    fn get_sender(&self) -> Self::Sender {
        self.resp_tx.clone()
    }
}