Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Implement basic CRC writer #4073

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

huan233usc
Copy link
Collaborator

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR adds the utility class for writing CRC files for delta table after commit. Integration will be done in the following PR.

How was this patch tested?

Added a new test suite

Does this PR introduce any user-facing changes?

No

Copy link
Collaborator

@scottsand-db scottsand-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Left some comments.

public class ChecksumWriter {

private static final Logger log = LoggerFactory.getLogger(ChecksumWriter.class);
public static StructType CRC_FILE_SCHEMA =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this CRC_FILE_SCHEMA be stored in https://github.com/delta-io/delta/pull/4077/files#diff-897131313222a4b4add04e6c677dc523ce53481fb5e784d7058e9d550a12159a VersionStats

(although I'd like VersionStats to be renamed)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can have a FULL_SCHEMA (that we use for writing) and a READ_SCHEMA that we use for reading

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I create a ChecksumUtils.java for holding shared code path like FULL_SCHEMA. In the end we may also want to make read schema also the same as FULL_SCHEMA. I will do a refactor later.

*
* @return true if checksum file is successfully written, false otherwise.
*/
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some logs stmts?

*
* @return true if checksum file is successfully written, false otherwise.
*/
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: txnId not tnxId

*
* @return true if checksum file is successfully written, false otherwise.
*/
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the transaction id? do all transactions have one?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah pretty sure this is optional (I think this is setTxnId?...)

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file-schema

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it is optional, changed the signature.

class MockCheckSumFileJsonWriter extends BaseMockJsonHandler {
var capturedCrcRow: Row = new GenericRow(new StructType(), new util.HashMap[Integer, AnyRef]);

override def writeJsonFileAtomically(filePath: String,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: weird indentation. Please see https://github.com/databricks/scala-style-guide

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed scala format plugin locally and re-run


def createTestProtocol(): Protocol = {
new Protocol(
/* minReaderVersion= */ 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use version 1, 2 not 0, 1.

we may assert in the future that these values are proper, in the protocol constructor; let's get ahead of that now

Copy link
Collaborator

@allisonport-db allisonport-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. For the tests (especially in kernel-api when we're testing inner utils try to make sure we cover all the cases based on the code added/written).

public class ChecksumWriter {

private static final Logger log = LoggerFactory.getLogger(ChecksumWriter.class);
public static StructType CRC_FILE_SCHEMA =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can have a FULL_SCHEMA (that we use for writing) and a READ_SCHEMA that we use for reading

.getJsonHandler()
.writeJsonFileAtomically(
newChecksumPath.toString(),
toCloseableIterator(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should be able to use Utils.singletonCloseableIterator here instead (cleaner since that's exactly what we want)

Comment on lines 46 to 49
this.protocol = protocol;
this.metadata = metadata;
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is metadata and protocol required non-null here? let's add requireNonNull to all these non-primitive args

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise we need null checks up in ChecksumWriter. buildCheckSumRow

*
* @return true if checksum file is successfully written, false otherwise.
*/
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah pretty sure this is optional (I think this is setTxnId?...)

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file-schema

OptionalLong.of(100),
OptionalLong.of(1))
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate the schema first

OptionalLong.of(100),
OptionalLong.of(1))
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate the checksum file path that it was written to

OptionalLong.of(100),
OptionalLong.of(1))
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate that no additional rows were written to the file (only 1 row was written and it has the schema we expect)

import java.util
import java.util.{Collections, HashMap, Optional, OptionalLong}

class ChecksumReadWriteSuite extends AnyFunSuite with MockEngineUtils {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests for numFiles or getTableSizeBytes aren't present ==> no checksum file is written

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also probably want to test that the entire write does not fail if the checksum write fails, but I think we can probably do this in the PR with the e2e integration

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also tests for when txnId is not present (let's make it optional as in the protocol)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done for numFiles or getTableSizeBytes aren't present ==> no checksum file is written and when txnId is not present.

for the second one I will make sure to add it in e2e tests.

Comment on lines 76 to 80
new ArrayValue() { // partitionColumns
override def getSize = 1

override def getElements: ColumnVector = singletonStringColumnVector("c3")
},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use VectorUtils.stringArrayValue here

Comment on lines 82 to 89
new MapValue() { // conf
override def getSize = 1

override def getKeys: ColumnVector = singletonStringColumnVector("delta.appendOnly")

override def getValues: ColumnVector =
singletonStringColumnVector("true")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here with VectorUtils.stringStringMapValue

@huan233usc huan233usc marked this pull request as draft January 29, 2025 20:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants