1#![allow(unsafe_code)]
23
24use std::{
25 cell::{RefCell, UnsafeCell},
26 fmt::Debug,
27 rc::Rc,
28};
29
30use ahash::{AHashMap, AHashSet};
31use nautilus_model::identifiers::{ComponentId, TraderId};
32use ustr::Ustr;
33
34use crate::{
35 actor::{Actor, registry::get_actor_registry},
36 cache::Cache,
37 clock::Clock,
38 enums::{ComponentState, ComponentTrigger},
39};
40
41pub trait Component {
43 fn component_id(&self) -> ComponentId;
45
46 fn state(&self) -> ComponentState;
48
49 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()>;
55
56 fn is_ready(&self) -> bool {
58 self.state() == ComponentState::Ready
59 }
60
61 fn not_running(&self) -> bool {
63 !self.is_running()
64 }
65
66 fn is_running(&self) -> bool {
68 self.state() == ComponentState::Running
69 }
70
71 fn is_stopped(&self) -> bool {
73 self.state() == ComponentState::Stopped
74 }
75
76 fn is_degraded(&self) -> bool {
78 self.state() == ComponentState::Degraded
79 }
80
81 fn is_faulted(&self) -> bool {
83 self.state() == ComponentState::Faulted
84 }
85
86 fn is_disposed(&self) -> bool {
88 self.state() == ComponentState::Disposed
89 }
90
91 fn register(
97 &mut self,
98 trader_id: TraderId,
99 clock: Rc<RefCell<dyn Clock>>,
100 cache: Rc<RefCell<Cache>>,
101 ) -> anyhow::Result<()>;
102
103 fn initialize(&mut self) -> anyhow::Result<()> {
109 self.transition_state(ComponentTrigger::Initialize)
110 }
111
112 fn start(&mut self) -> anyhow::Result<()> {
118 self.transition_state(ComponentTrigger::Start)?; if let Err(e) = self.on_start() {
121 log_error(&e);
122 return Err(e); }
124
125 self.transition_state(ComponentTrigger::StartCompleted)?;
126
127 Ok(())
128 }
129
130 fn stop(&mut self) -> anyhow::Result<()> {
136 self.transition_state(ComponentTrigger::Stop)?; if let Err(e) = self.on_stop() {
139 log_error(&e);
140 return Err(e); }
142
143 self.transition_state(ComponentTrigger::StopCompleted)?;
144
145 Ok(())
146 }
147
148 fn resume(&mut self) -> anyhow::Result<()> {
154 self.transition_state(ComponentTrigger::Resume)?; if let Err(e) = self.on_resume() {
157 log_error(&e);
158 return Err(e); }
160
161 self.transition_state(ComponentTrigger::ResumeCompleted)?;
162
163 Ok(())
164 }
165
166 fn degrade(&mut self) -> anyhow::Result<()> {
172 self.transition_state(ComponentTrigger::Degrade)?; if let Err(e) = self.on_degrade() {
175 log_error(&e);
176 return Err(e); }
178
179 self.transition_state(ComponentTrigger::DegradeCompleted)?;
180
181 Ok(())
182 }
183
184 fn fault(&mut self) -> anyhow::Result<()> {
190 self.transition_state(ComponentTrigger::Fault)?; if let Err(e) = self.on_fault() {
193 log_error(&e);
194 return Err(e); }
196
197 self.transition_state(ComponentTrigger::FaultCompleted)?;
198
199 Ok(())
200 }
201
202 fn reset(&mut self) -> anyhow::Result<()> {
208 self.transition_state(ComponentTrigger::Reset)?; if let Err(e) = self.on_reset() {
211 log_error(&e);
212 return Err(e); }
214
215 self.transition_state(ComponentTrigger::ResetCompleted)?;
216
217 Ok(())
218 }
219
220 fn dispose(&mut self) -> anyhow::Result<()> {
226 self.transition_state(ComponentTrigger::Dispose)?; if let Err(e) = self.on_dispose() {
229 log_error(&e);
230 return Err(e); }
232
233 self.transition_state(ComponentTrigger::DisposeCompleted)?;
234
235 Ok(())
236 }
237
238 fn on_start(&mut self) -> anyhow::Result<()> {
244 log::warn!(
245 "The `on_start` handler was called when not overridden, \
246 it's expected that any actions required when stopping the component \
247 occur here, such as unsubscribing from data",
248 );
249 Ok(())
250 }
251
252 fn on_stop(&mut self) -> anyhow::Result<()> {
258 log::warn!(
259 "The `on_stop` handler was called when not overridden, \
260 it's expected that any actions required when stopping the component \
261 occur here, such as unsubscribing from data",
262 );
263 Ok(())
264 }
265
266 fn on_resume(&mut self) -> anyhow::Result<()> {
272 log::warn!(
273 "The `on_resume` handler was called when not overridden, \
274 it's expected that any actions required when resuming the component \
275 following a stop occur here"
276 );
277 Ok(())
278 }
279
280 fn on_reset(&mut self) -> anyhow::Result<()> {
286 log::warn!(
287 "The `on_reset` handler was called when not overridden, \
288 it's expected that any actions required when resetting the component \
289 occur here, such as resetting indicators and other state"
290 );
291 Ok(())
292 }
293
294 fn on_dispose(&mut self) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 fn on_degrade(&mut self) -> anyhow::Result<()> {
309 Ok(())
310 }
311
312 fn on_fault(&mut self) -> anyhow::Result<()> {
318 Ok(())
319 }
320}
321
322fn log_error(e: &anyhow::Error) {
323 log::error!("{e}");
324}
325
326#[rustfmt::skip]
327impl ComponentState {
328 pub fn transition(&mut self, trigger: &ComponentTrigger) -> anyhow::Result<Self> {
334 let new_state = match (&self, trigger) {
335 (Self::PreInitialized, ComponentTrigger::Initialize) => Self::Ready,
336 (Self::Ready, ComponentTrigger::Reset) => Self::Resetting,
337 (Self::Ready, ComponentTrigger::Start) => Self::Starting,
338 (Self::Ready, ComponentTrigger::Dispose) => Self::Disposing,
339 (Self::Resetting, ComponentTrigger::ResetCompleted) => Self::Ready,
340 (Self::Starting, ComponentTrigger::StartCompleted) => Self::Running,
341 (Self::Starting, ComponentTrigger::Stop) => Self::Stopping,
342 (Self::Starting, ComponentTrigger::Fault) => Self::Faulting,
343 (Self::Running, ComponentTrigger::Stop) => Self::Stopping,
344 (Self::Running, ComponentTrigger::Degrade) => Self::Degrading,
345 (Self::Running, ComponentTrigger::Fault) => Self::Faulting,
346 (Self::Resuming, ComponentTrigger::Stop) => Self::Stopping,
347 (Self::Resuming, ComponentTrigger::ResumeCompleted) => Self::Running,
348 (Self::Resuming, ComponentTrigger::Fault) => Self::Faulting,
349 (Self::Stopping, ComponentTrigger::StopCompleted) => Self::Stopped,
350 (Self::Stopping, ComponentTrigger::Fault) => Self::Faulting,
351 (Self::Stopped, ComponentTrigger::Reset) => Self::Resetting,
352 (Self::Stopped, ComponentTrigger::Resume) => Self::Resuming,
353 (Self::Stopped, ComponentTrigger::Dispose) => Self::Disposing,
354 (Self::Stopped, ComponentTrigger::Fault) => Self::Faulting,
355 (Self::Degrading, ComponentTrigger::DegradeCompleted) => Self::Degraded,
356 (Self::Degraded, ComponentTrigger::Resume) => Self::Resuming,
357 (Self::Degraded, ComponentTrigger::Stop) => Self::Stopping,
358 (Self::Degraded, ComponentTrigger::Fault) => Self::Faulting,
359 (Self::Disposing, ComponentTrigger::DisposeCompleted) => Self::Disposed,
360 (Self::Faulting, ComponentTrigger::FaultCompleted) => Self::Faulted,
361 _ => anyhow::bail!("Invalid state trigger {self} -> {trigger}"),
362 };
363 Ok(new_state)
364 }
365}
366
367thread_local! {
368 static COMPONENT_REGISTRY: ComponentRegistry = ComponentRegistry::new();
369}
370
371pub struct ComponentRegistry {
376 components: RefCell<AHashMap<Ustr, Rc<UnsafeCell<dyn Component>>>>,
377 borrows: RefCell<AHashSet<Ustr>>,
378}
379
380impl Debug for ComponentRegistry {
381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382 let components_ref = self.components.borrow();
383 let keys: Vec<&Ustr> = components_ref.keys().collect();
384 f.debug_struct(stringify!(ComponentRegistry))
385 .field("components", &keys)
386 .field("active_borrows", &self.borrows.borrow().len())
387 .finish()
388 }
389}
390
391impl Default for ComponentRegistry {
392 fn default() -> Self {
393 Self::new()
394 }
395}
396
397impl ComponentRegistry {
398 pub fn new() -> Self {
399 Self {
400 components: RefCell::new(AHashMap::new()),
401 borrows: RefCell::new(AHashSet::new()),
402 }
403 }
404
405 pub fn insert(&self, id: Ustr, component: Rc<UnsafeCell<dyn Component>>) {
406 self.components.borrow_mut().insert(id, component);
407 }
408
409 pub fn get(&self, id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
410 self.components.borrow().get(id).cloned()
411 }
412
413 pub fn is_borrowed(&self, id: &Ustr) -> bool {
415 self.borrows.borrow().contains(id)
416 }
417
418 fn try_borrow(&self, id: Ustr) -> bool {
420 let mut borrows = self.borrows.borrow_mut();
421 if borrows.contains(&id) {
422 false
423 } else {
424 borrows.insert(id);
425 true
426 }
427 }
428
429 fn release_borrow(&self, id: &Ustr) {
431 self.borrows.borrow_mut().remove(id);
432 }
433}
434
435struct BorrowGuard {
440 id: Ustr,
441}
442
443impl BorrowGuard {
444 fn new(id: Ustr) -> Self {
445 Self { id }
446 }
447}
448
449impl Drop for BorrowGuard {
450 fn drop(&mut self) {
451 get_component_registry().release_borrow(&self.id);
452 }
453}
454
455pub fn get_component_registry() -> &'static ComponentRegistry {
457 COMPONENT_REGISTRY.with(|registry| unsafe {
458 std::mem::transmute::<&ComponentRegistry, &'static ComponentRegistry>(registry)
461 })
462}
463
464pub fn register_component<T>(component: T) -> Rc<UnsafeCell<T>>
466where
467 T: Component + 'static,
468{
469 let component_id = component.component_id().inner();
470 let component_ref = Rc::new(UnsafeCell::new(component));
471
472 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
474 get_component_registry().insert(component_id, component_trait_ref);
475
476 component_ref
477}
478
479pub fn register_component_actor<T>(component: T) -> Rc<UnsafeCell<T>>
481where
482 T: Component + Actor + 'static,
483{
484 let component_id = component.component_id().inner();
485 let actor_id = component.id();
486 let component_ref = Rc::new(UnsafeCell::new(component));
487
488 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = component_ref.clone();
490 get_component_registry().insert(component_id, component_trait_ref);
491
492 let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = component_ref.clone();
494 get_actor_registry().insert(actor_id, actor_trait_ref);
495
496 component_ref
497}
498
499pub fn start_component(id: &Ustr) -> anyhow::Result<()> {
507 let registry = get_component_registry();
508 let component_ref = registry
509 .get(id)
510 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
511
512 if !registry.try_borrow(*id) {
513 anyhow::bail!(
514 "Component '{id}' is already mutably borrowed. \
515 This would create aliasing mutable references (undefined behavior)."
516 );
517 }
518
519 let _guard = BorrowGuard::new(*id);
520
521 unsafe {
523 let component = &mut *component_ref.get();
524 component.start()
525 }
526}
527
528pub fn stop_component(id: &Ustr) -> anyhow::Result<()> {
536 let registry = get_component_registry();
537 let component_ref = registry
538 .get(id)
539 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
540
541 if !registry.try_borrow(*id) {
542 anyhow::bail!(
543 "Component '{id}' is already mutably borrowed. \
544 This would create aliasing mutable references (undefined behavior)."
545 );
546 }
547
548 let _guard = BorrowGuard::new(*id);
549
550 unsafe {
552 let component = &mut *component_ref.get();
553 component.stop()
554 }
555}
556
557pub fn reset_component(id: &Ustr) -> anyhow::Result<()> {
565 let registry = get_component_registry();
566 let component_ref = registry
567 .get(id)
568 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
569
570 if !registry.try_borrow(*id) {
571 anyhow::bail!(
572 "Component '{id}' is already mutably borrowed. \
573 This would create aliasing mutable references (undefined behavior)."
574 );
575 }
576
577 let _guard = BorrowGuard::new(*id);
578
579 unsafe {
581 let component = &mut *component_ref.get();
582 component.reset()
583 }
584}
585
586pub fn dispose_component(id: &Ustr) -> anyhow::Result<()> {
594 let registry = get_component_registry();
595 let component_ref = registry
596 .get(id)
597 .ok_or_else(|| anyhow::anyhow!("Component '{id}' not found in global registry"))?;
598
599 if !registry.try_borrow(*id) {
600 anyhow::bail!(
601 "Component '{id}' is already mutably borrowed. \
602 This would create aliasing mutable references (undefined behavior)."
603 );
604 }
605
606 let _guard = BorrowGuard::new(*id);
607
608 unsafe {
610 let component = &mut *component_ref.get();
611 component.dispose()
612 }
613}
614
615pub fn get_component(id: &Ustr) -> Option<Rc<UnsafeCell<dyn Component>>> {
617 get_component_registry().get(id)
618}
619
/// Removes every registered component and every borrow mark from the current
/// thread's registry (test builds only).
#[cfg(test)]
pub fn clear_component_registry() {
    let ComponentRegistry { components, borrows } = get_component_registry();
    components.borrow_mut().clear();
    borrows.borrow_mut().clear();
}
627
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};

    use rstest::rstest;

    use super::*;

    /// Minimal component whose `on_start` hook panics when its flag is set.
    struct TestComponent {
        id: ComponentId,
        state: ComponentState,
        should_panic: &'static AtomicBool,
    }

    impl TestComponent {
        /// Creates a component named `name`, already in the `Ready` state.
        fn new(name: &str, should_panic: &'static AtomicBool) -> Self {
            Self {
                id: ComponentId::new(name),
                state: ComponentState::Ready,
                should_panic,
            }
        }
    }

    impl Component for TestComponent {
        fn component_id(&self) -> ComponentId {
            self.id
        }

        fn state(&self) -> ComponentState {
            self.state
        }

        fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
            self.state = self.state.transition(&trigger)?;
            Ok(())
        }

        fn register(
            &mut self,
            _trader_id: TraderId,
            _clock: Rc<RefCell<dyn Clock>>,
            _cache: Rc<RefCell<Cache>>,
        ) -> anyhow::Result<()> {
            Ok(())
        }

        // The panic is deliberate: it exercises borrow release during unwinding
        #[allow(clippy::panic_in_result_fn)]
        fn on_start(&mut self) -> anyhow::Result<()> {
            if self.should_panic.load(Ordering::SeqCst) {
                panic!("Intentional panic for testing");
            }
            Ok(())
        }
    }

    static NO_PANIC: AtomicBool = AtomicBool::new(false);
    static DO_PANIC: AtomicBool = AtomicBool::new(true);

    #[rstest]
    fn test_component_borrow_tracking_prevents_double_borrow() {
        clear_component_registry();

        let id = Ustr::from("test-component-1");
        let component = TestComponent::new("test-component-1", &NO_PANIC);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        // Simulate a lifecycle call that is still in progress on this component
        assert!(get_component_registry().try_borrow(id));

        let err = start_component(&id).expect_err("start must be rejected while borrowed");
        assert!(err.to_string().contains("already mutably borrowed"));

        // The rejected call must not release a borrow it never acquired
        assert!(get_component_registry().is_borrowed(&id));
        get_component_registry().release_borrow(&id);

        // Once the borrow is released, sequential lifecycle calls succeed
        let result1 = start_component(&id);
        assert!(result1.is_ok());

        let result2 = stop_component(&id);
        assert!(result2.is_ok());
    }

    #[rstest]
    fn test_component_borrow_released_after_lifecycle_call() {
        clear_component_registry();

        let id = Ustr::from("test-component-2");
        let component = TestComponent::new("test-component-2", &NO_PANIC);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        // Check the result so an unrelated failure cannot masquerade as a pass
        assert!(start_component(&id).is_ok());

        assert!(!get_component_registry().is_borrowed(&id));
    }

    #[rstest]
    fn test_component_borrow_released_on_panic() {
        clear_component_registry();

        let id = Ustr::from("test-component-panic");
        let component = TestComponent::new("test-component-panic", &DO_PANIC);
        let component_id = component.id.inner();

        let component_ref = Rc::new(UnsafeCell::new(component));
        get_component_registry().insert(component_id, component_ref);

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _ = start_component(&id);
        }));
        assert!(result.is_err(), "Expected panic from on_start");

        // The guard's `Drop` runs during unwinding and must clear the mark
        assert!(
            !get_component_registry().is_borrowed(&id),
            "Borrow was not released after panic"
        );
    }
}