Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/sui-indexer-alt-framework/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use url::Url;

use crate::Indexer;
use crate::IndexerArgs;
use crate::Result;
use crate::ingestion::ClientArgs;
use crate::ingestion::IngestionConfig;
use crate::metrics::IndexerMetrics;
Expand Down Expand Up @@ -145,7 +144,7 @@ impl IndexerClusterBuilder {
/// - Required fields are missing
/// - Database connection cannot be established
/// - Metrics registry creation fails
pub async fn build(self) -> Result<IndexerCluster> {
pub async fn build(self) -> anyhow::Result<IndexerCluster> {
let database_url = self.database_url.context("database_url is required")?;

tracing_subscriber::fmt::init();
Expand Down Expand Up @@ -191,7 +190,7 @@ impl IndexerCluster {
/// Starts the indexer and metrics service, returning a handle over the service's tasks.
/// The service will exit when the indexer has finished processing all the checkpoints it was
/// configured to process, or when it is instructed to shut down.
pub async fn run(self) -> Result<Service> {
pub async fn run(self) -> anyhow::Result<Service> {
let s_indexer = self.indexer.run().await?;
let s_metrics = self.metrics.run().await?;

Expand Down
40 changes: 11 additions & 29 deletions crates/sui-indexer-alt-framework/src/ingestion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ use crate::ingestion::error::Result;
use crate::ingestion::ingestion_client::CheckpointEnvelope;
use crate::ingestion::ingestion_client::IngestionClient;
use crate::ingestion::ingestion_client::IngestionClientArgs;
use crate::ingestion::streaming_client::CheckpointStream;
use crate::ingestion::ingestion_client::retry_transient_with_slow_monitor;
use crate::ingestion::streaming_client::CheckpointStreamingClient;
use crate::ingestion::streaming_client::GrpcStreamingClient;
use crate::ingestion::streaming_client::StreamingClientArgs;
use crate::metrics::IngestionMetrics;

use self::ingestion_client::retry_transient_with_slow_monitor;

mod broadcaster;
mod byte_count;
pub(crate) mod decode;
Expand All @@ -44,8 +42,6 @@ mod test_utils;

pub(crate) const MAX_GRPC_MESSAGE_SIZE_BYTES: usize = 128 * 1024 * 1024;

const OPERATION: &str = "latest_checkpoint_number";

/// Combined arguments for both ingestion and streaming clients.
/// This is a convenience wrapper that flattens both argument types.
#[derive(clap::Args, Clone, Debug, Default)]
Expand Down Expand Up @@ -151,6 +147,8 @@ impl IngestionService {
&self.ingestion_client
}

/// Return the latest checkpoint number known to the ingestion service, preferably via the
/// streaming client, and failing that via the ingestion client.
pub async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
let streaming_client = self.streaming_client.clone();
let ingestion_client = self.ingestion_client.clone();
Expand All @@ -163,8 +161,9 @@ impl IngestionService {
.map_err(|e| backoff::Error::transient(Error::LatestCheckpointError(e)))
}
};

Ok(retry_transient_with_slow_monitor(
OPERATION,
"latest_checkpoint_number",
future,
&self.metrics.ingested_latest_checkpoint_latency,
)
Expand Down Expand Up @@ -269,39 +268,22 @@ impl Default for IngestionConfig {
}
}

/// Try to get the latest checkpoint number from the streaming client first, falling back to
/// the ingestion client if the streaming client is unavailable or fails.
async fn latest_checkpoint_number(
streaming_client: &mut Option<impl CheckpointStreamingClient>,
streaming_client: &mut Option<impl CheckpointStreamingClient + Send>,
ingestion_client: &IngestionClient,
) -> anyhow::Result<u64> {
if let Some(streaming_client) = streaming_client.as_mut() {
match streaming_client.connect().await {
Ok(CheckpointStream { mut stream, .. }) => match stream.peek().await {
Some(Ok(checkpoint)) => {
return Ok(checkpoint.summary.sequence_number);
}
Some(Err(e)) => {
warn!(
operation = OPERATION,
"Failed to peek checkpoint stream: {e}"
);
}
None => {
warn!(
operation = OPERATION,
"Checkpoint stream ended unexpectedly"
);
}
},
match streaming_client.latest_checkpoint_number().await {
Ok(checkpoint_number) => return Ok(checkpoint_number),
Err(e) => {
warn!(
operation = OPERATION,
"Failed to connect streaming client: {e}"
operation = "latest_checkpoint_number",
"Failed to get latest checkpoint number from streaming client: {e}"
);
}
}
}

ingestion_client.latest_checkpoint_number().await
}

Expand Down
10 changes: 5 additions & 5 deletions crates/sui-indexer-alt-framework/src/ingestion/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::str::FromStr;

use anyhow::Context as _;
use anyhow::anyhow;
use async_trait::async_trait;
use prost_types::FieldMask;
Expand Down Expand Up @@ -60,11 +61,10 @@ impl IngestionClientTrait for RpcClient {
}

async fn latest_checkpoint_number(&self) -> anyhow::Result<u64> {
let response = get_service_info_request(self).await?;
let Some(latest_checkpoint_number) = response.checkpoint_height else {
return Err(anyhow!("Checkpoint height not found {response:?}"));
};
Ok(latest_checkpoint_number)
get_service_info_request(self)
.await?
.checkpoint_height
.context("Checkpoint height not found")
}
}

Expand Down
24 changes: 13 additions & 11 deletions crates/sui-indexer-alt-framework/src/ingestion/store_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,8 @@ use crate::ingestion::ingestion_client::CheckpointResult;
use crate::ingestion::ingestion_client::IngestionClientTrait;
use crate::types::full_checkpoint_content::Checkpoint;

/// Disable object_store's internal retries so that transient errors (429s, 5xx) propagate
/// immediately to the framework's own retry logic.
pub(super) fn retry_config() -> RetryConfig {
RetryConfig {
max_retries: 0,
..Default::default()
}
}
// Watermark object path, taken from sui-indexer-alt-object-store.
pub(crate) const WATERMARK_PATH: &str = "_metadata/watermark/checkpoint_blob.json";

pub struct StoreIngestionClient {
store: Arc<dyn ObjectStore>,
Expand All @@ -37,9 +31,6 @@ pub struct StoreIngestionClient {
total_ingested_bytes: Option<IntCounter>,
}

// Watermark object path, taken from sui-indexer-alt-object-store.
pub(crate) const WATERMARK_PATH: &str = "_metadata/watermark/checkpoint_blob.json";

#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct ObjectStoreWatermark {
pub checkpoint_hi_inclusive: u64,
Expand Down Expand Up @@ -82,8 +73,10 @@ impl StoreIngestionClient {
Err(Error::NotFound { .. }) => return Ok(None),
Err(e) => return Err(e).context(format!("error reading {WATERMARK_PATH}")),
};

let watermark: ObjectStoreWatermark =
serde_json::from_slice(&bytes).context(format!("error parsing {WATERMARK_PATH}"))?;

Ok(Some(watermark.checkpoint_hi_inclusive))
}
}
Expand Down Expand Up @@ -126,6 +119,15 @@ impl IngestionClientTrait for StoreIngestionClient {
}
}

/// Disable object_store's internal retries so that transient errors (429s, 5xx) propagate
/// immediately to the framework's own retry logic.
pub(super) fn retry_config() -> RetryConfig {
RetryConfig {
max_retries: 0,
..Default::default()
}
}

#[cfg(test)]
pub(crate) mod tests {
use axum::http::StatusCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ pub struct CheckpointStream {
pub trait CheckpointStreamingClient {
/// Returns the CheckpointStream and chain id.
async fn connect(&mut self) -> Result<CheckpointStream>;

/// Returns the latest checkpoint number available from the streaming source.
async fn latest_checkpoint_number(&mut self) -> Result<u64> {
let mut stream = self.connect().await?;

match stream.stream.next().await {
Some(Ok(checkpoint)) => Ok(checkpoint.summary.sequence_number),
Some(Err(e)) => Err(e),
None => Err(Error::StreamingError(anyhow!("Stream ended unexpectedly"))),
}
}
}

#[derive(clap::Args, Clone, Debug, Default)]
Expand Down
24 changes: 11 additions & 13 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::pipeline::sequential::SequentialConfig;
use crate::pipeline::sequential::{self};
use crate::service::Service;

pub use anyhow::Result;
pub use sui_field_count::FieldCount;
pub use sui_futures::service;
/// External users access the store trait through framework::store
Expand Down Expand Up @@ -210,7 +209,7 @@ impl<S: Store> Indexer<S> {
ingestion_config: IngestionConfig,
metrics_prefix: Option<&str>,
registry: &Registry,
) -> Result<Self> {
) -> anyhow::Result<Self> {
let IndexerArgs {
first_checkpoint,
last_checkpoint,
Expand All @@ -224,8 +223,7 @@ impl<S: Store> Indexer<S> {
IngestionService::new(client_args, ingestion_config, metrics_prefix, registry)?;

let latest_checkpoint = ingestion_service.latest_checkpoint_number().await?;

info!(latest_checkpoint, "Ingestion store state");
info!(latest_checkpoint);

Ok(Self {
store,
Expand Down Expand Up @@ -288,7 +286,7 @@ impl<S: Store> Indexer<S> {
/// respective watermarks.
///
/// Ingestion will stop after consuming the configured `last_checkpoint` if one is provided.
pub async fn run(self) -> Result<Service> {
pub async fn run(self) -> anyhow::Result<Service> {
if let Some(enabled_pipelines) = self.enabled_pipelines {
ensure!(
enabled_pipelines.is_empty(),
Expand Down Expand Up @@ -332,7 +330,7 @@ impl<S: Store> Indexer<S> {
&mut self,
pipeline_task: String,
retention: Option<u64>,
) -> Result<Option<u64>> {
) -> anyhow::Result<Option<u64>> {
ensure!(
self.added_pipelines.insert(P::NAME),
"Pipeline {:?} already added",
Expand Down Expand Up @@ -388,7 +386,7 @@ impl<S: ConcurrentStore> Indexer<S> {
&mut self,
handler: H,
config: ConcurrentConfig,
) -> Result<()> {
) -> anyhow::Result<()> {
let pipeline_task =
pipeline_task::<S>(H::NAME, self.task.as_ref().map(|t| t.task.as_str()))?;
let retention = config.pruner.as_ref().map(|p| p.retention);
Expand Down Expand Up @@ -425,7 +423,7 @@ impl<T: SequentialStore> Indexer<T> {
&mut self,
handler: H,
config: SequentialConfig,
) -> Result<()> {
) -> anyhow::Result<()> {
if self.task.is_some() {
bail!(
"Sequential pipelines do not support pipeline tasks. \
Expand Down Expand Up @@ -510,7 +508,7 @@ mod tests {
async fn process(
&self,
checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
) -> Result<Vec<Self::Value>> {
) -> anyhow::Result<Vec<Self::Value>> {
let cp_num = checkpoint.summary.sequence_number;

// Wait until the checkpoint is allowed to be processed
Expand Down Expand Up @@ -542,7 +540,7 @@ mod tests {
&self,
batch: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
) -> anyhow::Result<usize> {
for value in batch {
conn.0
.commit_data(Self::NAME, value.0, vec![value.0])
Expand All @@ -563,7 +561,7 @@ mod tests {
async fn process(
&self,
checkpoint: &Arc<sui_types::full_checkpoint_content::Checkpoint>,
) -> Result<Vec<Self::Value>> {
) -> anyhow::Result<Vec<Self::Value>> {
Ok(vec![MockValue(checkpoint.summary.sequence_number)])
}
}
Expand All @@ -586,7 +584,7 @@ mod tests {
&self,
batch: &Self::Batch,
conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
) -> anyhow::Result<usize> {
for value in batch {
conn.0
.commit_data(Self::NAME, value.0, vec![value.0])
Expand All @@ -609,7 +607,7 @@ mod tests {
&self,
_batch: &Self::Batch,
_conn: &mut <Self::Store as Store>::Connection<'a>,
) -> Result<usize> {
) -> anyhow::Result<usize> {
Ok(1)
}
}
Expand Down
Loading