Skip to content

Commit

Permalink
feat: update celestia integration
Browse files Browse the repository at this point in the history
  • Loading branch information
AllFi committed Nov 27, 2024
1 parent ec1c755 commit 4beda6d
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 247 deletions.
414 changes: 305 additions & 109 deletions da-indexer/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions da-indexer/da-indexer-logic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ sea-orm = { version = "0.12.2", features = [
"postgres-array",
] }

celestia-rpc = "0.1.1"
celestia-types = "0.1.1"
celestia-rpc = "0.7.0"
celestia-types = "0.7.0"
tokio = { version = "1", features = ["full"] }
hex = "0.4.3"
lazy_static = "1.4.0"
sha3 = "0.10.8"
futures = "0.3"
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }
jsonrpsee = { version = "0.24.7", features = ["client-core", "macros"] }
serde = "1.0"
serde_with = "3.6.1"
serde_json = "1.0.96"
async-trait = "0.1"
http = "0.2.9"
http = "1.1.0"
tonic = { version = "0.7", features = ["tls", "tls-roots"] }
prost = "0.10"
ethabi = "18.0"
Expand Down
22 changes: 13 additions & 9 deletions da-indexer/da-indexer-logic/src/celestia/da.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::celestia::rpc_client::ShareV2Client;
use crate::{
celestia::{repository::blobs, rpc_client},
indexer::{Job, DA},
};
use anyhow::Result;
use async_trait::async_trait;
use celestia_rpc::{Client, HeaderClient, ShareClient};
use celestia_rpc::{Client, HeaderClient};
use celestia_types::{Blob, ExtendedHeader};
use sea_orm::{DatabaseConnection, TransactionTrait};
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};

use crate::{
celestia::{repository::blobs, rpc_client},
indexer::{Job, DA},
};

use super::{job::CelestiaJob, parser, repository::blocks, settings::IndexerSettings};

pub struct CelestiaDA {
Expand Down Expand Up @@ -53,12 +53,16 @@ impl CelestiaDA {
}

async fn get_blobs_by_height(&self, height: u64) -> Result<(ExtendedHeader, Vec<Blob>)> {
// TODO: it seems possible to avoid this request with new Celestia API
let header = self.client.header_get_by_height(height).await?;
let mut blobs = vec![];

let mut blobs = vec![];
if parser::maybe_contains_blobs(&header.dah) {
let eds = self.client.share_get_eds(&header).await?;
blobs = parser::parse_eds(&eds, header.dah.square_len())?;
let eds = self
.client
.share_get_eds_v2(height, header.header.version.app)
.await?;
blobs = parser::parse_eds(&eds, header.header.version.app)?;
}

Ok((header, blobs))
Expand Down
83 changes: 10 additions & 73 deletions da-indexer/da-indexer-logic/src/celestia/parser.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{Error, Result};
use anyhow::Result;
use celestia_types::{
blob::Blob, consts::appconsts, nmt::Namespace, Commitment, DataAvailabilityHeader,
ExtendedDataSquare, Share,
blob::Blob, nmt::Namespace, AppVersion, DataAvailabilityHeader, ExtendedDataSquare,
};

lazy_static! {
Expand All @@ -17,81 +16,19 @@ lazy_static! {

/// Checks if the DataAvailabilityHeader might contain blobs.
pub fn maybe_contains_blobs(dah: &DataAvailabilityHeader) -> bool {
dah.row_roots.iter().any(|row| {
dah.row_roots().iter().any(|row| {
*PAY_FOR_BLOB_NAMESPACE >= row.min_namespace().into()
&& *PAY_FOR_BLOB_NAMESPACE <= row.max_namespace().into()
})
}

/// Extracts blobs from the ExtendedDataSquare.
/// The format described here: https://github.com/celestiaorg/celestia-app/blob/main/specs/src/specs/shares.md
pub fn parse_eds(eds: &ExtendedDataSquare, width: usize) -> Result<Vec<Blob>> {
// sanity check
if width * width != eds.data_square.len() {
return Err(Error::msg("data square length mismatch"));
}
pub fn parse_eds(eds: &ExtendedDataSquare, app_version: u64) -> Result<Vec<Blob>> {
let app_version = AppVersion::from_u64(app_version)
.ok_or_else(|| anyhow::anyhow!("invalid or unsupported app_version: {app_version}"))?;

let mut blobs: Vec<Blob> = vec![];
let mut sequence_length = 0;
let mut parsed_length = 0;

for row in eds.data_square.chunks(width).take(width / 2) {
for share in row.iter().take(width / 2) {
let share = Share::from_raw(share)?;
let ns = share.namespace();

if ns == *TAIL_PADDING_NAMESPACE {
break;
}

if ns.is_reserved_on_celestia() {
continue;
}

let info_byte = share.info_byte();

let mut share_data;
if info_byte.is_sequence_start() {
assert!(parsed_length == sequence_length);

sequence_length = share.sequence_length().unwrap() as usize;
parsed_length = 0;

if sequence_length == 0
&& blobs.last().is_some()
&& blobs.last().unwrap().namespace == ns
{
// Namespace Padding Share, should be ignored
continue;
}

blobs.push(Blob {
namespace: ns,
data: vec![0; sequence_length],
share_version: info_byte.version(),
commitment: Commitment([0; 32]),
});

// first share: skip info byte and sequence length
share_data = &share.data()[1 + appconsts::SEQUENCE_LEN_BYTES..];
} else {
// continuation share: skip info byte
share_data = &share.data()[1..];
}

let data_length = share_data.len().min(sequence_length - parsed_length);
share_data = &share_data[..data_length];

let last_blob = blobs.last_mut().unwrap();
last_blob.data[parsed_length..(parsed_length + data_length)]
.copy_from_slice(share_data);
parsed_length += data_length;

if parsed_length == sequence_length {
last_blob.commitment =
Commitment::from_blob(ns, info_byte.version(), &last_blob.data)?;
}
}
}
Ok(blobs)
Blob::reconstruct_all(eds.data_square(), app_version).map_err(|err| {
tracing::error!("failed to parse EDS: {:?}", err);
anyhow::anyhow!(err)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub async fn upsert_many<C: ConnectionTrait>(
blobs: Vec<CelestiaBlob>,
) -> Result<(), anyhow::Error> {
let blobs = blobs.into_iter().map(|blob| {
// TODO: do we need to store blob index?
let model = Model {
id: compute_id(height, &blob.commitment.0),
height: height as i64,
Expand Down
52 changes: 50 additions & 2 deletions da-indexer/da-indexer-logic/src/celestia/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use celestia_rpc::{Error, Result};
use std::future::Future;

use celestia_rpc::Error;
use celestia_types::{AppVersion, ExtendedDataSquare};
use http::{header, HeaderValue};
use jsonrpsee::{
core::client::{self, ClientT},
http_client::{HeaderMap, HttpClientBuilder},
ws_client::WsClientBuilder,
};
Expand All @@ -12,7 +16,7 @@ pub async fn new_celestia_client(
auth_token: Option<&str>,
max_request_size: u32,
max_response_size: u32,
) -> Result<celestia_rpc::Client> {
) -> celestia_rpc::Result<celestia_rpc::Client> {
let mut headers = HeaderMap::new();

if let Some(token) = auth_token {
Expand Down Expand Up @@ -42,3 +46,47 @@ pub async fn new_celestia_client(

Ok(client)
}

// celestia_rpc::Client doesn't support new version of share.GetEDS method
// so we need to implement it manually
pub mod rpc {
use celestia_types::eds::RawExtendedDataSquare;
use jsonrpsee::proc_macros::rpc;

#[rpc(client)]
pub trait ShareV2 {
#[method(name = "share.GetEDS")]
async fn share_get_eds_v2(
&self,
height: u64,
) -> Result<RawExtendedDataSquare, client::Error>;
}
}

pub trait ShareV2Client: ClientT {
/// GetEDS gets the full EDS identified by the given root.
fn share_get_eds_v2<'a, 'b, 'fut>(
&'a self,
height: u64,
app_version: u64,
) -> impl Future<Output = Result<ExtendedDataSquare, client::Error>> + Send + 'fut
where
'a: 'fut,
'b: 'fut,
Self: Sized + Sync + 'fut,
{
async move {
let app_version = AppVersion::from_u64(app_version).ok_or_else(|| {
let e = format!("Invalid or unsupported AppVersion: {app_version}");
client::Error::Custom(e)
})?;

let raw_eds = rpc::ShareV2Client::share_get_eds_v2(self, height).await?;

ExtendedDataSquare::from_raw(raw_eds, app_version)
.map_err(|e| client::Error::Custom(e.to_string()))
}
}
}

impl<T> ShareV2Client for T where T: ClientT {}
14 changes: 12 additions & 2 deletions da-indexer/da-indexer-logic/src/celestia/tests/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use celestia_types::{nmt::Namespace, Blob as CelestiaBlob, Commitment};
use celestia_types::{
consts::appconsts::subtree_root_threshold, nmt::Namespace, AppVersion, Blob as CelestiaBlob,
Commitment,
};

use crate::celestia::{
repository::{blobs, blocks},
Expand Down Expand Up @@ -49,12 +52,19 @@ fn celestia_blob(seed: u32) -> CelestiaBlob {
Namespace::new(0, &[&[0_u8; 18], &sha3("namespace", seed)[..10]].concat()).unwrap();
let data = sha3("data", seed).to_vec();
let share_version = 0;
let commitment = Commitment::from_blob(namespace, share_version, &data).unwrap();
let commitment = Commitment::from_blob(
namespace,
&data,
share_version,
subtree_root_threshold(AppVersion::latest()),
)
.unwrap();
CelestiaBlob {
namespace,
data,
share_version,
commitment,
index: None,
}
}

Expand Down

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion da-indexer/da-indexer-logic/src/celestia/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod blobs;
pub mod blocks;
pub mod l2_router;
pub mod parser;

use blockscout_service_launcher::test_database::TestDbGuard;

Expand Down
45 changes: 0 additions & 45 deletions da-indexer/da-indexer-logic/src/celestia/tests/parser.rs

This file was deleted.

0 comments on commit 4beda6d

Please sign in to comment.