nautilus_blockchain/python/
config.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for blockchain configuration.
17
18use std::sync::Arc;
19
20use nautilus_infrastructure::sql::pg::PostgresConnectOptions;
21use nautilus_model::defi::{Chain, DexType};
22use pyo3::prelude::*;
23
24use crate::config::{BlockchainDataClientConfig, DexPoolFilters};
25
26#[pymethods]
27#[pyo3_stub_gen::derive::gen_stub_pymethods(module = "nautilus_trader.adapters.blockchain")]
28impl DexPoolFilters {
29    /// Creates a new `DexPoolFilters` instance.
30    #[new]
31    #[must_use]
32    pub fn py_new(remove_pools_with_empty_erc20_fields: Option<bool>) -> Self {
33        Self::new(remove_pools_with_empty_erc20_fields)
34    }
35}
36
37#[pymethods]
38#[pyo3_stub_gen::derive::gen_stub_pymethods(module = "nautilus_trader.adapters.blockchain")]
39impl BlockchainDataClientConfig {
40    /// Creates a new `BlockchainDataClientConfig` instance.
41    #[new]
42    #[allow(clippy::too_many_arguments)]
43    #[pyo3(signature = (chain, dex_ids, http_rpc_url, rpc_requests_per_second=None, multicall_calls_per_rpc_request=None, wss_rpc_url=None, use_hypersync_for_live_data=true, from_block=None, pool_filters=None, postgres_cache_database_config=None))]
44    fn py_new(
45        #[gen_stub(
46            override_type(
47                type_repr = "nautilus_trader.model.Chain",
48                imports = ("nautilus_trader.model",),
49            ),
50        )]
51        chain: &Chain,
52        #[gen_stub(
53            override_type(
54                type_repr = "typing.Sequence[nautilus_trader.model.DexType]",
55                imports = ("typing", "nautilus_trader.model"),
56            ),
57        )]
58        dex_ids: Vec<DexType>,
59        http_rpc_url: String,
60        rpc_requests_per_second: Option<u32>,
61        multicall_calls_per_rpc_request: Option<u32>,
62        wss_rpc_url: Option<String>,
63        use_hypersync_for_live_data: bool,
64        from_block: Option<u64>,
65        pool_filters: Option<DexPoolFilters>,
66        #[gen_stub(
67            override_type(
68                type_repr = "typing.Optional[nautilus_trader.infrastructure.PostgresConnectOptions]",
69                imports = ("typing", "nautilus_trader.infrastructure"),
70            ),
71        )]
72        postgres_cache_database_config: Option<PostgresConnectOptions>,
73    ) -> Self {
74        Self::new(
75            Arc::new(chain.clone()),
76            dex_ids,
77            http_rpc_url,
78            rpc_requests_per_second,
79            multicall_calls_per_rpc_request,
80            wss_rpc_url,
81            use_hypersync_for_live_data,
82            from_block,
83            pool_filters,
84            postgres_cache_database_config,
85        )
86    }
87
88    /// Returns the chain configuration.
89    #[getter]
90    #[gen_stub(
91        override_return_type(
92            type_repr = "nautilus_trader.model.Chain",
93            imports = ("nautilus_trader.model",),
94        ),
95    )]
96    fn chain(&self) -> Chain {
97        (*self.chain).clone()
98    }
99
100    /// Returns the HTTP RPC URL.
101    #[getter]
102    fn http_rpc_url(&self) -> String {
103        self.http_rpc_url.clone()
104    }
105
106    /// Returns the WebSocket RPC URL.
107    #[getter]
108    fn wss_rpc_url(&self) -> Option<String> {
109        self.wss_rpc_url.clone()
110    }
111
112    /// Returns the RPC requests per second limit.
113    #[getter]
114    const fn rpc_requests_per_second(&self) -> Option<u32> {
115        self.rpc_requests_per_second
116    }
117
118    /// Returns whether to use HyperSync for live data.
119    #[getter]
120    const fn use_hypersync_for_live_data(&self) -> bool {
121        self.use_hypersync_for_live_data
122    }
123
124    /// Returns the starting block for sync.
125    #[getter]
126    #[allow(clippy::wrong_self_convention)]
127    const fn from_block(&self) -> Option<u64> {
128        self.from_block
129    }
130
131    /// Returns a string representation of the configuration.
132    fn __repr__(&self) -> String {
133        format!(
134            "BlockchainDataClientConfig(chain={:?}, http_rpc_url={}, wss_rpc_url={:?}, use_hypersync_for_live_data={}, from_block={:?})",
135            self.chain.name,
136            self.http_rpc_url,
137            self.wss_rpc_url,
138            self.use_hypersync_for_live_data,
139            self.from_block
140        )
141    }
142}