-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] Implement basic CRC writer #4073
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Left some comments.
public class ChecksumWriter { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(ChecksumWriter.class); | ||
public static StructType CRC_FILE_SCHEMA = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this CRC_FILE_SCHEMA be stored in https://github.com/delta-io/delta/pull/4077/files#diff-897131313222a4b4add04e6c677dc523ce53481fb5e784d7058e9d550a12159a VersionStats
(although I'd like VersionStats to be renamed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can have a FULL_SCHEMA
(that we use for writing) and a READ_SCHEMA
that we use for reading
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I create a ChecksumUtils.java for holding shared code path like FULL_SCHEMA. In the end we may also want to make read schema also the same as FULL_SCHEMA. I will do a refactor later.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotHint.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java
Outdated
Show resolved
Hide resolved
* | ||
* @return true if checksum file is successfully written, false otherwise. | ||
*/ | ||
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some logs stmts?
kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumReadWriteSuite.scala
Show resolved
Hide resolved
* | ||
* @return true if checksum file is successfully written, false otherwise. | ||
*/ | ||
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: txnId
not tnxId
* | ||
* @return true if checksum file is successfully written, false otherwise. | ||
*/ | ||
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the transaction id? do all transactions have one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah pretty sure this is optional (I think this is setTxnId?...)
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file-schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it is optional, changed the signature.
class MockCheckSumFileJsonWriter extends BaseMockJsonHandler { | ||
var capturedCrcRow: Row = new GenericRow(new StructType(), new util.HashMap[Integer, AnyRef]); | ||
|
||
override def writeJsonFileAtomically(filePath: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: weird indentation. Please see https://github.com/databricks/scala-style-guide
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed scala format plugin locally and re-run
|
||
def createTestProtocol(): Protocol = { | ||
new Protocol( | ||
/* minReaderVersion= */ 0, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use version 1, 2 not 0, 1.
we may assert in the future that these values are proper, in the protocol constructor; let's get ahead of that now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments. For the tests (especially in kernel-api
when we're testing inner utils try to make sure we cover all the cases based on the code added/written).
public class ChecksumWriter { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(ChecksumWriter.class); | ||
public static StructType CRC_FILE_SCHEMA = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can have a FULL_SCHEMA
(that we use for writing) and a READ_SCHEMA
that we use for reading
.getJsonHandler() | ||
.writeJsonFileAtomically( | ||
newChecksumPath.toString(), | ||
toCloseableIterator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should be able to use Utils.singletonCloseableIterator
here instead (cleaner since that's exactly what we want)
this.protocol = protocol; | ||
this.metadata = metadata; | ||
this.tableSizeBytes = tableSizeBytes; | ||
this.numFiles = numFiles; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is metadata
and protocol
required non-null here? let's add requireNonNull
to all these non-primitive args
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
otherwise we need null checks up in ChecksumWriter. buildCheckSumRow
* | ||
* @return true if checksum file is successfully written, false otherwise. | ||
*/ | ||
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah pretty sure this is optional (I think this is setTxnId?...)
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file-schema
OptionalLong.of(100), | ||
OptionalLong.of(1)) | ||
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate the schema first
OptionalLong.of(100), | ||
OptionalLong.of(1)) | ||
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate the checksum file path that it was written to
OptionalLong.of(100), | ||
OptionalLong.of(1)) | ||
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate that no additional rows were written to the file (only 1 row was written and it has the schema we expect)
import java.util | ||
import java.util.{Collections, HashMap, Optional, OptionalLong} | ||
|
||
class ChecksumReadWriteSuite extends AnyFunSuite with MockEngineUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add tests for numFiles or getTableSizeBytes aren't present ==> no checksum file is written
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we also probably want to test that the entire write does not fail if the checksum write fails, but I think we can probably do this in the PR with the e2e integration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also tests for when txnId is not present (let's make it optional as in the protocol)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done for numFiles or getTableSizeBytes aren't present ==> no checksum file is written and when txnId is not present.
for the second one I will make sure to add it in e2e tests.
new ArrayValue() { // partitionColumns | ||
override def getSize = 1 | ||
|
||
override def getElements: ColumnVector = singletonStringColumnVector("c3") | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use VectorUtils.stringArrayValue
here
new MapValue() { // conf | ||
override def getSize = 1 | ||
|
||
override def getKeys: ColumnVector = singletonStringColumnVector("delta.appendOnly") | ||
|
||
override def getValues: ColumnVector = | ||
singletonStringColumnVector("true") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing here with VectorUtils.stringStringMapValue
Which Delta project/connector is this regarding?
Description
This PR adds the utility class for writing CRC files for delta table after commit. Integration will be done in the following PR.
How was this patch tested?
Added a new test suite
Does this PR introduce any user-facing changes?
No