From d8d98681e3560fbbee62b0d0ec6b4a14d390e380 Mon Sep 17 00:00:00 2001 From: Jason Adams Date: Wed, 1 May 2024 09:32:30 +1000 Subject: [PATCH 1/2] DATA-1506 State Diffs POC --- .../EthereumPubSubToBigQueryPipeline.java | 6 + .../ethereum/domain/BalanceDiff.java | 167 ++++++++++++++++ .../ethereum/domain/NonceDiff.java | 167 ++++++++++++++++ .../ethereum/domain/StorageDiff.java | 181 ++++++++++++++++++ .../fns/ConvertBalanceDiffsToTableRowsFn.java | 31 +++ .../fns/ConvertNonceDiffsToTableRowsFn.java | 31 +++ .../fns/ConvertStorageDiffsToTableRowsFn.java | 32 ++++ terraform/main.tf | 2 +- 8 files changed, 616 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/blockchainetl/ethereum/domain/BalanceDiff.java create mode 100644 src/main/java/io/blockchainetl/ethereum/domain/NonceDiff.java create mode 100644 src/main/java/io/blockchainetl/ethereum/domain/StorageDiff.java create mode 100644 src/main/java/io/blockchainetl/ethereum/fns/ConvertBalanceDiffsToTableRowsFn.java create mode 100644 src/main/java/io/blockchainetl/ethereum/fns/ConvertNonceDiffsToTableRowsFn.java create mode 100644 src/main/java/io/blockchainetl/ethereum/fns/ConvertStorageDiffsToTableRowsFn.java diff --git a/src/main/java/io/blockchainetl/ethereum/EthereumPubSubToBigQueryPipeline.java b/src/main/java/io/blockchainetl/ethereum/EthereumPubSubToBigQueryPipeline.java index f2fa61e..88707b5 100644 --- a/src/main/java/io/blockchainetl/ethereum/EthereumPubSubToBigQueryPipeline.java +++ b/src/main/java/io/blockchainetl/ethereum/EthereumPubSubToBigQueryPipeline.java @@ -9,6 +9,9 @@ import io.blockchainetl.ethereum.fns.ConvertTokenTransfersToTableRowsFn; import io.blockchainetl.ethereum.fns.ConvertTokensToTableRowsFn; import io.blockchainetl.ethereum.fns.ConvertTracesToTableRowsFn; +import io.blockchainetl.ethereum.fns.ConvertStorageDiffsToTableRowsFn; +import io.blockchainetl.ethereum.fns.ConvertBalanceDiffsToTableRowsFn; +import io.blockchainetl.ethereum.fns.ConvertNonceDiffsToTableRowsFn; import io.blockchainetl.ethereum.fns.ConvertTransactionsToTableRowsFn; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -40,6 +43,9 @@ static void runEthereumPipeline(PubSubToBigQueryPipelineOptions options) { entityConfigs.put("logs", ConvertLogsToTableRowsFn.class); entityConfigs.put("token_transfers", ConvertTokenTransfersToTableRowsFn.class); entityConfigs.put("traces", ConvertTracesToTableRowsFn.class); + entityConfigs.put("storage_diffs", ConvertStorageDiffsToTableRowsFn.class); + entityConfigs.put("balance_diffs", ConvertBalanceDiffsToTableRowsFn.class); + entityConfigs.put("nonce_diffs", ConvertNonceDiffsToTableRowsFn.class); entityConfigs.put("contracts", ConvertContractsToTableRowsFn.class); entityConfigs.put("tokens", ConvertTokensToTableRowsFn.class); runPipeline(options,chainConfigs, entityConfigs); diff --git a/src/main/java/io/blockchainetl/ethereum/domain/BalanceDiff.java b/src/main/java/io/blockchainetl/ethereum/domain/BalanceDiff.java new file mode 100644 index 0000000..25871f3 --- /dev/null +++ b/src/main/java/io/blockchainetl/ethereum/domain/BalanceDiff.java @@ -0,0 +1,167 @@ +package io.blockchainetl.ethereum.domain; + +import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; +import org.apache.avro.reflect.Nullable; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.math.BigInteger; + +@DefaultCoder(AvroCoder.class) +@JsonIgnoreProperties(ignoreUnknown = true) +public class BalanceDiff { + + @Nullable + @JsonProperty("address") + private String address; + + @Nullable + @JsonProperty("from_value") + private String fromValue; + + @Nullable + @JsonProperty("to_value") + private String toValue; + + @Nullable + @JsonProperty("transaction_hash") + private String transactionHash; + + @Nullable + @JsonProperty("transaction_index") + private Long transactionIndex; + + @Nullable + @JsonProperty("block_timestamp") + private Long blockTimestamp; + + @Nullable + @JsonProperty("block_number") + private Long blockNumber; + + @Nullable + @JsonProperty("block_hash") + private String blockHash; + + @Nullable + @JsonProperty("chain_id") + private Long chainId; + + public BalanceDiff() {} + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getFromValue() { + return fromValue; + } + + public void setFromValue(String fromValue) { + this.fromValue = fromValue; + } + + public String getToValue() { + return toValue; + } + + public void setToValue(String toValue) { + this.toValue = toValue; + } + + public String getTransactionHash() { + return transactionHash; + } + + public void setTransactionHash(String transactionHash) { + this.transactionHash = transactionHash; + } + + public Long getTransactionIndex() { + return transactionIndex; + } + + public void setTransactionIndex(Long transactionIndex) { + this.transactionIndex = transactionIndex; + } + + public Long getBlockTimestamp() { + return blockTimestamp; + } + + public void setBlockTimestamp(Long blockTimestamp) { + this.blockTimestamp = blockTimestamp; + } + + public Long getBlockNumber() { + return blockNumber; + } + + public void setBlockNumber(Long blockNumber) { + this.blockNumber = blockNumber; + } + + public String getBlockHash() { + return blockHash; + } + + public void setBlockHash(String blockHash) { + this.blockHash = blockHash; + } + + public Long getChainId() { + return chainId; + } + + public void setChainId(Long chainId) { + this.chainId = chainId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BalanceDiff that = (BalanceDiff) o; + return Objects.equal(address, that.address) && + Objects.equal(fromValue, that.fromValue) && + Objects.equal(toValue, that.toValue) && + Objects.equal(transactionHash, that.transactionHash) && + Objects.equal(transactionIndex, that.transactionIndex) && + Objects.equal(blockTimestamp, that.blockTimestamp) && + Objects.equal(blockNumber, that.blockNumber) && + Objects.equal(blockHash, that.blockHash) && + Objects.equal(chainId, that.chainId); + } + + @Override + public int hashCode() { + return Objects.hashCode(address, fromValue, toValue, transactionHash, transactionIndex, blockTimestamp, + blockNumber, blockHash, chainId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("address", address) + .add("fromValue", fromValue) + .add("toValue", toValue) + .add("transactionHash", transactionHash) + .add("transactionIndex", transactionIndex) + .add("blockTimestamp", blockTimestamp) + .add("blockNumber", blockNumber) + .add("blockHash", blockHash) + .add("chainId", chainId) + .toString(); + } +} diff --git a/src/main/java/io/blockchainetl/ethereum/domain/NonceDiff.java b/src/main/java/io/blockchainetl/ethereum/domain/NonceDiff.java new file mode 100644 index 0000000..29b26cd --- /dev/null +++ b/src/main/java/io/blockchainetl/ethereum/domain/NonceDiff.java @@ -0,0 +1,167 @@ +package io.blockchainetl.ethereum.domain; + +import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; +import org.apache.avro.reflect.Nullable; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.math.BigInteger; + +@DefaultCoder(AvroCoder.class) +@JsonIgnoreProperties(ignoreUnknown = true) +public class NonceDiff { + + @Nullable + @JsonProperty("address") + private String address; + + @Nullable + @JsonProperty("from_value") + private String fromValue; + + @Nullable + @JsonProperty("to_value") + private String toValue; + + @Nullable + @JsonProperty("transaction_hash") + private String transactionHash; + + @Nullable + @JsonProperty("transaction_index") + private Long transactionIndex; + + @Nullable + @JsonProperty("block_timestamp") + private Long blockTimestamp; + + @Nullable + @JsonProperty("block_number") + private Long blockNumber; + + @Nullable + @JsonProperty("block_hash") + private String blockHash; + + @Nullable + @JsonProperty("chain_id") + private Long chainId; + + public NonceDiff() {} + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getFromValue() { + return fromValue; + } + + public void setFromValue(String fromValue) { + this.fromValue = fromValue; + } + + public String getToValue() { + return toValue; + } + + public void setToValue(String toValue) { + this.toValue = toValue; + } + + public String getTransactionHash() { + return transactionHash; + } + + public void setTransactionHash(String transactionHash) { + this.transactionHash = transactionHash; + } + + public Long getTransactionIndex() { + return transactionIndex; + } + + public void setTransactionIndex(Long transactionIndex) { + this.transactionIndex = transactionIndex; + } + + public Long getBlockTimestamp() { + return blockTimestamp; + } + + public void setBlockTimestamp(Long blockTimestamp) { + this.blockTimestamp = blockTimestamp; + } + + public Long getBlockNumber() { + return blockNumber; + } + + public void setBlockNumber(Long blockNumber) { + this.blockNumber = blockNumber; + } + + public String getBlockHash() { + return blockHash; + } + + public void setBlockHash(String blockHash) { + this.blockHash = blockHash; + } + + public Long getChainId() { + return chainId; + } + + public void setChainId(Long chainId) { + this.chainId = chainId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NonceDiff that = (NonceDiff) o; + return Objects.equal(address, that.address) && + Objects.equal(fromValue, that.fromValue) && + Objects.equal(toValue, that.toValue) && + Objects.equal(transactionHash, that.transactionHash) && + Objects.equal(transactionIndex, that.transactionIndex) && + Objects.equal(blockTimestamp, that.blockTimestamp) && + Objects.equal(blockNumber, that.blockNumber) && + Objects.equal(blockHash, that.blockHash) && + Objects.equal(chainId, that.chainId); + } + + @Override + public int hashCode() { + return Objects.hashCode(address, fromValue, toValue, transactionHash, transactionIndex, blockTimestamp, + blockNumber, blockHash, chainId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("address", address) + .add("fromValue", fromValue) + .add("toValue", toValue) + .add("transactionHash", transactionHash) + .add("transactionIndex", transactionIndex) + .add("blockTimestamp", blockTimestamp) + .add("blockNumber", blockNumber) + .add("blockHash", blockHash) + .add("chainId", chainId) + .toString(); + } +} diff --git a/src/main/java/io/blockchainetl/ethereum/domain/StorageDiff.java b/src/main/java/io/blockchainetl/ethereum/domain/StorageDiff.java new file mode 100644 index 0000000..fc7359c --- /dev/null +++ b/src/main/java/io/blockchainetl/ethereum/domain/StorageDiff.java @@ -0,0 +1,181 @@ +package io.blockchainetl.ethereum.domain; + +import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; +import org.apache.avro.reflect.Nullable; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.annotate.JsonProperty; + +import java.math.BigInteger; + +@DefaultCoder(AvroCoder.class) +@JsonIgnoreProperties(ignoreUnknown = true) +public class StorageDiff { + + @Nullable + @JsonProperty("address") + private String address; + + @Nullable + @JsonProperty("slot") + private String slot; + + @Nullable + @JsonProperty("from_value") + private String fromValue; + + @Nullable + @JsonProperty("to_value") + private String toValue; + + @Nullable + @JsonProperty("transaction_hash") + private String transactionHash; + + @Nullable + @JsonProperty("transaction_index") + private Long transactionIndex; + + @Nullable + @JsonProperty("block_timestamp") + private Long blockTimestamp; + + @Nullable + @JsonProperty("block_number") + private Long blockNumber; + + @Nullable + @JsonProperty("block_hash") + private String blockHash; + + @Nullable + @JsonProperty("chain_id") + private Long chainId; + + public StorageDiff() {} + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public String getSlot() { + return slot; + } + + public void setSlot(String slot) { + this.slot = slot; + } + + public String getFromValue() { + return fromValue; + } + + public void setFromValue(String fromValue) { + this.fromValue = fromValue; + } + + public String getToValue() { + return toValue; + } + + public void setToValue(String toValue) { + this.toValue = toValue; + } + + public String getTransactionHash() { + return transactionHash; + } + + public void setTransactionHash(String transactionHash) { + this.transactionHash = transactionHash; + } + + public Long getTransactionIndex() { + return transactionIndex; + } + + public void setTransactionIndex(Long transactionIndex) { + this.transactionIndex = transactionIndex; + } + + public Long getBlockTimestamp() { + return blockTimestamp; + } + + public void setBlockTimestamp(Long blockTimestamp) { + this.blockTimestamp = blockTimestamp; + } + + public Long getBlockNumber() { + return blockNumber; + } + + public void setBlockNumber(Long blockNumber) { + this.blockNumber = blockNumber; + } + + public String getBlockHash() { + return blockHash; + } + + public void setBlockHash(String blockHash) { + this.blockHash = blockHash; + } + + public Long getChainId() { + return chainId; + } + + public void setChainId(Long chainId) { + this.chainId = chainId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StorageDiff that = (StorageDiff) o; + return Objects.equal(address, that.address) && + Objects.equal(slot, that.slot) && + Objects.equal(fromValue, that.fromValue) && + Objects.equal(toValue, that.toValue) && + Objects.equal(transactionHash, that.transactionHash) && + Objects.equal(transactionIndex, that.transactionIndex) && + Objects.equal(blockTimestamp, that.blockTimestamp) && + Objects.equal(blockNumber, that.blockNumber) && + Objects.equal(blockHash, that.blockHash) && + Objects.equal(chainId, that.chainId); + } + + @Override + public int hashCode() { + return Objects.hashCode(address, slot, fromValue, toValue, transactionHash, transactionIndex, blockTimestamp, + blockNumber, blockHash, chainId); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("address", address) + .add("slot", slot) + .add("fromValue", fromValue) + .add("toValue", toValue) + .add("transactionHash", transactionHash) + .add("transactionIndex", transactionIndex) + .add("blockTimestamp", blockTimestamp) + .add("blockNumber", blockNumber) + .add("blockHash", blockHash) + .add("chainId", chainId) + .toString(); + } +} diff --git a/src/main/java/io/blockchainetl/ethereum/fns/ConvertBalanceDiffsToTableRowsFn.java b/src/main/java/io/blockchainetl/ethereum/fns/ConvertBalanceDiffsToTableRowsFn.java new file mode 100644 index 0000000..23b346c --- /dev/null +++ b/src/main/java/io/blockchainetl/ethereum/fns/ConvertBalanceDiffsToTableRowsFn.java @@ -0,0 +1,31 @@ +package io.blockchainetl.ethereum.fns; + +import com.google.api.services.bigquery.model.TableRow; +import io.blockchainetl.common.fns.ConvertEntitiesToTableRowsFn; +import io.blockchainetl.common.utils.JsonUtils; +import io.blockchainetl.ethereum.domain.BalanceDiff; + +public class ConvertBalanceDiffsToTableRowsFn extends ConvertEntitiesToTableRowsFn { + + public ConvertBalanceDiffsToTableRowsFn(String startTimestamp, Long allowedTimestampSkewSeconds) { + super(startTimestamp, allowedTimestampSkewSeconds, "", false); + } + + public ConvertBalanceDiffsToTableRowsFn(String startTimestamp, Long allowedTimestampSkewSeconds, String logPrefix) { + super(startTimestamp, allowedTimestampSkewSeconds, logPrefix, false); + } + + @Override + protected void populateTableRowFields(TableRow row, String element) { + BalanceDiff balanceDiff = JsonUtils.parseJson(element, BalanceDiff.class); + + row.set("address", balanceDiff.getAddress()); + row.set("from_value", balanceDiff.getFromValue()); + row.set("to_value", balanceDiff.getToValue()); + row.set("transaction_hash", balanceDiff.getTransactionHash()); + row.set("transaction_index", balanceDiff.getTransactionIndex()); + row.set("block_number", balanceDiff.getBlockNumber()); + row.set("block_hash", balanceDiff.getBlockHash()); + row.set("chain_id", balanceDiff.getChainId()); + } +} diff --git a/src/main/java/io/blockchainetl/ethereum/fns/ConvertNonceDiffsToTableRowsFn.java b/src/main/java/io/blockchainetl/ethereum/fns/ConvertNonceDiffsToTableRowsFn.java new file mode 100644 index 0000000..bf89547 --- /dev/null +++ b/src/main/java/io/blockchainetl/ethereum/fns/ConvertNonceDiffsToTableRowsFn.java @@ -0,0 +1,31 @@ +package io.blockchainetl.ethereum.fns; + +import com.google.api.services.bigquery.model.TableRow; +import io.blockchainetl.common.fns.ConvertEntitiesToTableRowsFn; +import io.blockchainetl.common.utils.JsonUtils; +import io.blockchainetl.ethereum.domain.NonceDiff; + +public class ConvertNonceDiffsToTableRowsFn extends ConvertEntitiesToTableRowsFn { + + public ConvertNonceDiffsToTableRowsFn(String startTimestamp, Long allowedTimestampSkewSeconds) { + super(startTimestamp, allowedTimestampSkewSeconds, "", false); + } + + public ConvertNonceDiffsToTableRowsFn(String startTimestamp, Long allowedTimestampSkewSeconds, String logPrefix) { + super(startTimestamp, allowedTimestampSkewSeconds, logPrefix, false); + } + + @Override + protected void populateTableRowFields(TableRow row, String element) { + NonceDiff nonceDiff = JsonUtils.parseJson(element, NonceDiff.class); + + row.set("address", nonceDiff.getAddress()); + row.set("from_value", nonceDiff.getFromValue()); + row.set("to_value", nonceDiff.getToValue()); + row.set("transaction_hash", nonceDiff.getTransactionHash()); + row.set("transaction_index", nonceDiff.getTransactionIndex()); + row.set("block_number", nonceDiff.getBlockNumber()); + row.set("block_hash", nonceDiff.getBlockHash()); + row.set("chain_id", nonceDiff.getChainId()); + } +} diff --git a/src/main/java/io/blockchainetl/ethereum/fns/ConvertStorageDiffsToTableRowsFn.java b/src/main/java/io/blockchainetl/ethereum/fns/ConvertStorageDiffsToTableRowsFn.java new file mode 100644 index 0000000..e9ea131 --- /dev/null +++ b/src/main/java/io/blockchainetl/ethereum/fns/ConvertStorageDiffsToTableRowsFn.java @@ -0,0 +1,32 @@ +package io.blockchainetl.ethereum.fns; + +import com.google.api.services.bigquery.model.TableRow; +import io.blockchainetl.common.fns.ConvertEntitiesToTableRowsFn; +import io.blockchainetl.common.utils.JsonUtils; +import io.blockchainetl.ethereum.domain.StorageDiff; + +public class ConvertStorageDiffsToTableRowsFn extends ConvertEntitiesToTableRowsFn { + + public ConvertStorageDiffsToTableRowsFn(String startTimestamp, Long allowedTimestampSkewSeconds) { + super(startTimestamp, allowedTimestampSkewSeconds, "", false); + } + + public ConvertStorageDiffsToTableRowsFn(String startTimestamp, Long allowedTimestampSkewSeconds, String logPrefix) { + super(startTimestamp, allowedTimestampSkewSeconds, logPrefix, false); + } + + @Override + protected void populateTableRowFields(TableRow row, String element) { + StorageDiff storageDiff = JsonUtils.parseJson(element, StorageDiff.class); + + row.set("address", storageDiff.getAddress()); + row.set("slot", storageDiff.getSlot()); + row.set("from_value", storageDiff.getFromValue()); + row.set("to_value", storageDiff.getToValue()); + row.set("transaction_hash", storageDiff.getTransactionHash()); + row.set("transaction_index", storageDiff.getTransactionIndex()); + row.set("block_number", storageDiff.getBlockNumber()); + row.set("block_hash", storageDiff.getBlockHash()); + row.set("chain_id", storageDiff.getChainId()); + } +} diff --git a/terraform/main.tf b/terraform/main.tf index 7119175..ec982d4 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -112,7 +112,7 @@ resource "google_dataflow_flex_template_job" "zkevm_imtbl_mainnet_job" { parameters = { chainConfigFile = "/template/blockchain_zkevm_imtbl_mainnet_${terraform.workspace}.json" - allowedTimestampSkewSeconds = "5184000" + allowedTimestampSkewSeconds = "31536000" gcpTempLocation = "gs://${terraform.workspace}-im-data-imx-resource/ethereum-etl/zkevm-imtbl-mainnet-streaming/temp" tempLocation = "gs://${terraform.workspace}-im-data-imx-resource/ethereum-etl/zkevm-imtbl-mainnet-streaming/temp" project = "${terraform.workspace}-im-data" From cf216fbe03a7a19b98be2fb71999fdfbfb1c7397 Mon Sep 17 00:00:00 2001 From: Jason Adams Date: Thu, 9 May 2024 08:04:50 +1000 Subject: [PATCH 2/2] DATA-1506 State Diffs POC --- terraform/main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/terraform/main.tf b/terraform/main.tf index ec982d4..7119175 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -112,7 +112,7 @@ resource "google_dataflow_flex_template_job" "zkevm_imtbl_mainnet_job" { parameters = { chainConfigFile = "/template/blockchain_zkevm_imtbl_mainnet_${terraform.workspace}.json" - allowedTimestampSkewSeconds = "31536000" + allowedTimestampSkewSeconds = "5184000" gcpTempLocation = "gs://${terraform.workspace}-im-data-imx-resource/ethereum-etl/zkevm-imtbl-mainnet-streaming/temp" tempLocation = "gs://${terraform.workspace}-im-data-imx-resource/ethereum-etl/zkevm-imtbl-mainnet-streaming/temp" project = "${terraform.workspace}-im-data"