Skip to content

Commit 96b7b0e

Browse files
committed
ethereum: Encapsulate adapter field and use HashMap for health lookups
- Make `adapter` field private on EthereumNetworkAdapter, add getter - Replace Vec-based health checker lookup with HashMap<String, Arc<Health>> for O(1) lookups instead of O(n*m) - Remove redundant empty check in select_weighted_adapter; WeightedIndex already returns Err for empty input, falling through to random selection - Replace struct literal construction in tests with ::new() calls - Add explicit assertions that health scores start at 1.0
1 parent 419aeb7 commit 96b7b0e

1 file changed

Lines changed: 75 additions & 68 deletions

File tree

chain/ethereum/src/network.rs

Lines changed: 75 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use graph::prelude::rand::{
1414
Rng,
1515
};
1616
use itertools::Itertools;
17+
use std::collections::HashMap;
1718
use std::sync::Arc;
1819

1920
pub use graph::impl_slog_value;
@@ -29,7 +30,7 @@ pub const DEFAULT_ADAPTER_ERROR_RETEST_PERCENT: f64 = 0.2;
2930
pub struct EthereumNetworkAdapter {
3031
endpoint_metrics: Arc<EndpointMetrics>,
3132
pub capabilities: NodeCapabilities,
32-
pub adapter: Arc<EthereumAdapter>,
33+
adapter: Arc<EthereumAdapter>,
3334
/// The maximum number of times this adapter can be used. We use the
3435
/// strong_count on `adapter` to determine whether the adapter is above
3536
/// that limit. That's a somewhat imprecise but convenient way to
@@ -70,6 +71,10 @@ impl EthereumNetworkAdapter {
7071
}
7172
}
7273

74+
pub fn adapter(&self) -> &Arc<EthereumAdapter> {
75+
&self.adapter
76+
}
77+
7378
#[cfg(debug_assertions)]
7479
fn is_call_only(&self) -> bool {
7580
self.adapter.is_call_only()
@@ -97,7 +102,7 @@ pub struct EthereumNetworkAdapters {
97102
// Percentage of request that should be used to retest errored adapters.
98103
retest_percent: f64,
99104
weighted: bool,
100-
health_checkers: Vec<Arc<Health>>,
105+
health_checkers: HashMap<String, Arc<Health>>,
101106
}
102107

103108
impl EthereumNetworkAdapters {
@@ -108,7 +113,7 @@ impl EthereumNetworkAdapters {
108113
call_only_adapters: vec![],
109114
retest_percent: DEFAULT_ADAPTER_ERROR_RETEST_PERCENT,
110115
weighted: false,
111-
health_checkers: vec![],
116+
health_checkers: HashMap::new(),
112117
}
113118
}
114119

@@ -135,7 +140,7 @@ impl EthereumNetworkAdapters {
135140
ProviderCheckStrategy::MarkAsValid,
136141
);
137142

138-
Self::new(chain_id, provider, call_only, None, false, vec![])
143+
Self::new(chain_id, provider, call_only, None, false, HashMap::new())
139144
}
140145

141146
pub fn new(
@@ -144,7 +149,7 @@ impl EthereumNetworkAdapters {
144149
call_only_adapters: Vec<EthereumNetworkAdapter>,
145150
retest_percent: Option<f64>,
146151
weighted: bool,
147-
health_checkers: Vec<Arc<Health>>,
152+
health_checkers: HashMap<String, Arc<Health>>,
148153
) -> Self {
149154
#[cfg(debug_assertions)]
150155
call_only_adapters.iter().for_each(|a| {
@@ -233,7 +238,7 @@ impl EthereumNetworkAdapters {
233238
.max_by_key(|a| a.current_error_count())
234239
.filter(|a| a.current_error_count() > 0)
235240
{
236-
return Ok(most_errored.adapter.clone());
241+
return Ok(most_errored.adapter().clone());
237242
}
238243
}
239244

@@ -267,29 +272,21 @@ impl EthereumNetworkAdapters {
267272
input: &[&EthereumNetworkAdapter],
268273
required_capabilities: &NodeCapabilities,
269274
) -> Result<Arc<EthereumAdapter>, Error> {
270-
if input.is_empty() {
271-
return Err(anyhow!(
272-
"A matching Ethereum network with {:?} was not found.",
273-
required_capabilities
274-
));
275-
}
276-
277275
let weights: Vec<_> = input
278276
.iter()
279277
.map(|a| {
280-
let health_checker = self
278+
let score = self
281279
.health_checkers
282-
.iter()
283-
.find(|h| h.provider() == a.provider());
284-
let score = health_checker.map_or(1.0, |h| h.score());
280+
.get(a.provider())
281+
.map_or(1.0, |h| h.score());
285282
a.weight * score
286283
})
287284
.collect();
288285
if let Ok(dist) = WeightedIndex::new(&weights) {
289286
let idx = dist.sample(&mut rand::rng());
290-
Ok(input[idx].adapter.clone())
287+
Ok(input[idx].adapter().clone())
291288
} else {
292-
// Fallback to random selection if weights are invalid
289+
// Fallback to random selection if weights are invalid (e.g., all zeros or empty)
293290
Self::select_random_adapter(input, required_capabilities)
294291
}
295292
}
@@ -303,12 +300,9 @@ impl EthereumNetworkAdapters {
303300
input: &[&EthereumNetworkAdapter],
304301
required_capabilities: &NodeCapabilities,
305302
) -> Result<Arc<EthereumAdapter>, Error> {
306-
let choices = input
307-
.iter()
308-
.copied()
309-
.choose_multiple(&mut rand::rng(), 3);
303+
let choices = input.iter().copied().choose_multiple(&mut rand::rng(), 3);
310304
if let Some(adapter) = choices.iter().min_by_key(|a| a.current_error_count()) {
311-
Ok(adapter.adapter.clone())
305+
Ok(adapter.adapter().clone())
312306
} else {
313307
Err(anyhow!(
314308
"A matching Ethereum network with {:?} was not found.",
@@ -349,7 +343,7 @@ impl EthereumNetworkAdapters {
349343
.await
350344
.map(|mut adapters| adapters.next())
351345
.unwrap_or_default()
352-
.map(|ethereum_network_adapter| ethereum_network_adapter.adapter.clone())
346+
.map(|ethereum_network_adapter| ethereum_network_adapter.adapter().clone())
353347
}
354348

355349
/// call_or_cheapest will bypass ProviderManagers' validation in order to remain non async.
@@ -381,21 +375,21 @@ impl EthereumNetworkAdapters {
381375
let adapters = self
382376
.call_only_adapters
383377
.iter()
384-
.min_by_key(|x| Arc::strong_count(&x.adapter))
378+
.min_by_key(|x| Arc::strong_count(x.adapter()))
385379
.ok_or(anyhow!("no available call only endpoints"))?;
386380

387381
// TODO: This will probably blow up a lot sooner than [limit] amount of
388382
// subgraphs, since we probably use a few instances.
389383
if !adapters
390384
.limit
391-
.has_capacity(Arc::strong_count(&adapters.adapter))
385+
.has_capacity(Arc::strong_count(adapters.adapter()))
392386
{
393387
bail!("call only adapter has reached the concurrency limit");
394388
}
395389

396390
// Cloning here ensure we have the correct count at any given time, if we return a reference it can be cloned later
397391
// which could cause a high number of endpoints to be given away before accounting for them.
398-
Ok(Some(adapters.adapter.clone()))
392+
Ok(Some(adapters.adapter().clone()))
399393
}
400394
}
401395

@@ -413,6 +407,7 @@ mod tests {
413407
use graph::{
414408
endpoint::EndpointMetrics, firehose::SubgraphLimit, prelude::MetricsRegistry, url::Url,
415409
};
410+
use std::collections::HashMap;
416411
use std::sync::Arc;
417412

418413
use crate::{
@@ -823,26 +818,26 @@ mod tests {
823818
SubgraphLimit::Unlimited
824819
};
825820

826-
no_retest_adapters.push(EthereumNetworkAdapter {
827-
endpoint_metrics: metrics.clone(),
828-
capabilities: NodeCapabilities {
821+
no_retest_adapters.push(EthereumNetworkAdapter::new(
822+
metrics.clone(),
823+
NodeCapabilities {
829824
archive: true,
830825
traces: false,
831826
},
832-
adapter: adapter.clone(),
833-
limit: limit.clone(),
834-
weight: 1.0,
835-
});
836-
always_retest_adapters.push(EthereumNetworkAdapter {
837-
endpoint_metrics: metrics.clone(),
838-
capabilities: NodeCapabilities {
827+
adapter.clone(),
828+
limit.clone(),
829+
1.0,
830+
));
831+
always_retest_adapters.push(EthereumNetworkAdapter::new(
832+
metrics.clone(),
833+
NodeCapabilities {
839834
archive: true,
840835
traces: false,
841836
},
842837
adapter,
843838
limit,
844-
weight: 1.0,
845-
});
839+
1.0,
840+
));
846841
});
847842
let manager = ProviderManager::<EthereumNetworkAdapter>::new(
848843
logger,
@@ -864,7 +859,7 @@ mod tests {
864859
vec![],
865860
Some(0f64),
866861
false,
867-
vec![],
862+
HashMap::new(),
868863
);
869864

870865
let always_retest_adapters = EthereumNetworkAdapters::new(
@@ -873,7 +868,7 @@ mod tests {
873868
vec![],
874869
Some(1f64),
875870
false,
876-
vec![],
871+
HashMap::new(),
877872
);
878873

879874
assert_eq!(
@@ -921,36 +916,35 @@ mod tests {
921916
metrics.report_for_test(&ProviderName::from(error_provider), false);
922917

923918
let mut no_retest_adapters = vec![];
924-
no_retest_adapters.push(EthereumNetworkAdapter {
925-
endpoint_metrics: metrics.clone(),
926-
capabilities: NodeCapabilities {
919+
no_retest_adapters.push(EthereumNetworkAdapter::new(
920+
metrics.clone(),
921+
NodeCapabilities {
927922
archive: true,
928923
traces: false,
929924
},
930-
adapter: fake_adapter(&logger, error_provider, &provider_metrics, &metrics, false)
931-
.await,
932-
limit: SubgraphLimit::Unlimited,
933-
weight: 1.0,
934-
});
925+
fake_adapter(&logger, error_provider, &provider_metrics, &metrics, false).await,
926+
SubgraphLimit::Unlimited,
927+
1.0,
928+
));
935929

936930
let mut always_retest_adapters = vec![];
937-
always_retest_adapters.push(EthereumNetworkAdapter {
938-
endpoint_metrics: metrics.clone(),
939-
capabilities: NodeCapabilities {
931+
always_retest_adapters.push(EthereumNetworkAdapter::new(
932+
metrics.clone(),
933+
NodeCapabilities {
940934
archive: true,
941935
traces: false,
942936
},
943-
adapter: fake_adapter(
937+
fake_adapter(
944938
&logger,
945939
no_error_provider,
946940
&provider_metrics,
947941
&metrics,
948942
false,
949943
)
950944
.await,
951-
limit: SubgraphLimit::Unlimited,
952-
weight: 1.0,
953-
});
945+
SubgraphLimit::Unlimited,
946+
1.0,
947+
));
954948
let manager = ProviderManager::<EthereumNetworkAdapter>::new(
955949
logger.clone(),
956950
always_retest_adapters
@@ -966,7 +960,7 @@ mod tests {
966960
vec![],
967961
Some(1f64),
968962
false,
969-
vec![],
963+
HashMap::new(),
970964
);
971965

972966
assert_eq!(
@@ -996,7 +990,7 @@ mod tests {
996990
vec![],
997991
Some(0f64),
998992
false,
999-
vec![],
993+
HashMap::new(),
1000994
);
1001995
assert_eq!(
1002996
no_retest_adapters
@@ -1011,31 +1005,31 @@ mod tests {
10111005
);
10121006

10131007
let mut no_available_adapter = vec![];
1014-
no_available_adapter.push(EthereumNetworkAdapter {
1015-
endpoint_metrics: metrics.clone(),
1016-
capabilities: NodeCapabilities {
1008+
no_available_adapter.push(EthereumNetworkAdapter::new(
1009+
metrics.clone(),
1010+
NodeCapabilities {
10171011
archive: true,
10181012
traces: false,
10191013
},
1020-
adapter: fake_adapter(
1014+
fake_adapter(
10211015
&logger,
10221016
no_error_provider,
10231017
&provider_metrics,
10241018
&metrics,
10251019
false,
10261020
)
10271021
.await,
1028-
limit: SubgraphLimit::Disabled,
1029-
weight: 1.0,
1030-
});
1022+
SubgraphLimit::Disabled,
1023+
1.0,
1024+
));
10311025
let manager = ProviderManager::new(
10321026
logger,
10331027
vec![(chain_id.clone(), no_available_adapter.to_vec())].into_iter(),
10341028
ProviderCheckStrategy::MarkAsValid,
10351029
);
10361030

10371031
let no_available_adapter =
1038-
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false, vec![]);
1032+
EthereumNetworkAdapters::new(chain_id, manager, vec![], None, false, HashMap::new());
10391033
let res = no_available_adapter
10401034
.cheapest_with(&NodeCapabilities {
10411035
archive: true,
@@ -1141,7 +1135,20 @@ mod tests {
11411135
let health_checker1 = Arc::new(Health::new(adapter1.clone()));
11421136
let health_checker2 = Arc::new(Health::new(adapter2.clone()));
11431137

1144-
adapters.health_checkers = vec![health_checker1.clone(), health_checker2.clone()];
1138+
// Verify health checkers start with a perfect score of 1.0
1139+
assert_eq!(health_checker1.score(), 1.0);
1140+
assert_eq!(health_checker2.score(), 1.0);
1141+
1142+
let mut health_map = HashMap::new();
1143+
health_map.insert(
1144+
health_checker1.provider().to_string(),
1145+
health_checker1.clone(),
1146+
);
1147+
health_map.insert(
1148+
health_checker2.provider().to_string(),
1149+
health_checker2.clone(),
1150+
);
1151+
adapters.health_checkers = health_map;
11451152
adapters.weighted = true;
11461153

11471154
let mut adapter1_count = 0;

0 commit comments

Comments
 (0)