Fix CTAS for non-hdfs storages, also fixes multi storage cases #256
base: main
Conversation
Force-pushed from e15e1dc to 1a49f14
Agree with the Storage API changes. One piece of feedback on not introducing Iceberg at the REST layer.
public boolean pathExists(String path) {
  try {
    return fs.exists(new Path(path));
  } catch (IOException e) {
Does fs.exists throw an IOException for non-existing paths? That's weird.
Looks like it returns true or false: https://github.com/apache/hadoop/blob/cd2cffe73f909a106ba47653acf525220f2665cf/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L1860
But it could throw an IOException if getFileStatus() throws one.
I am wondering if it could throw an IOException in scenarios where HDFS is unavailable but the file exists?
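For reference, a minimal sketch of the behavior being discussed: fs.exists() swallows the FileNotFoundException for a missing path and returns false, so an IOException here signals an access failure (e.g. the NameNode is unreachable) rather than "path not found". Rethrowing as an unchecked exception is just one possible choice, shown for illustration and not necessarily what this PR does:

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsPathChecker {
  private final FileSystem fs;

  public HdfsPathChecker(FileSystem fs) {
    this.fs = fs;
  }

  public boolean pathExists(String path) {
    try {
      // exists() returns false for a missing path; the FileNotFoundException from
      // getFileStatus() is swallowed inside FileSystem.exists().
      return fs.exists(new Path(path));
    } catch (IOException e) {
      // Only real I/O failures reach here (e.g. HDFS unavailable), not missing paths.
      throw new UncheckedIOException("Failed to check existence of path: " + path, e);
    }
  }
}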
        storageManager, dbIdFromProps, tblIdFromProps, tableUUIDProperty);
if (TableType.REPLICA_TABLE != tableType && !doesPathExist(previousPath)) {
  log.error("Previous tableLocation: {} doesn't exist", previousPath);
  Storage storage = catalog.resolveStorage(TableIdentifier.of(dbIdFromProps, tblIdFromProps));
resolveStorage will use selectStorage if the entry doesn't exist in HTS. That code is not guaranteed to be idempotent during migration, so let's avoid using selectStorage again in the consecutive COMMIT/PutSnapshot call.
Can we instead use tableLocation and derive the storage from there? i.e.:
if s3://a/b/c -> S3Storage
if abs://a/b/c -> ABSStorage
if a/b/c -> HDFSStorage
Done, please check. There is a catch though: both local and HDFS storage have the same path format (no scheme prefix).
@Override
public boolean pathExists(String path) {
  // TODO: Support pathExists on ADLS
Can we do this as part of a separate PR? DLFC is not initialized: #148
Sure
nit: should we then throw a not-implemented exception instead?
UnsupportedOperationException is what we usually throw for such cases. I've rephrased the exception message to mention it's not implemented yet.
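A minimal sketch of what that looks like; the real class name and exception message in the PR may differ:

// Sketch of an ADLS client that does not support the existence check yet.
public class AdlsStorageClientSketch {
  public boolean pathExists(String path) {
    // ADLS support is tracked separately (see #148); not implemented yet.
    throw new UnsupportedOperationException(
        "pathExists is not yet implemented for ADLS storage: " + path);
  }
}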
@@ -55,4 +56,11 @@ public S3Client getNativeClient() {
  public StorageType.Type getStorageType() {
    return S3_TYPE;
  }

  @Override
  public boolean pathExists(String path) {
What is the intended usage of this existence check?
- Do you want to check whether the bucket exists or not?
- Or, do you want to check whether any blob for the table has been created yet or not?
The two checks above are very different. For 1 you should check the existence of the bucket. For 2 you should check the existence of objects with a prefix, the way you are doing it now.
I would imagine that you intend to check that the bucket or root location has been created, and hence 1 and not 2. Is that right?
The intention is to check whether the directory exists, so it is check 2.
The current code passes the absolute path up to the table level:
-> blobfs://bucket_name/data/openhouse/db/t-uuid (blobfs)
-> s3://bucket_name/data/openhouse/db/t-uuid (s3)
-> /data/openhouse/db/t-uuid (hdfs)
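Since this is check 2 (any object under the table prefix), one way to do it with the AWS SDK v2 is to list with the prefix and a key limit of 1. The bucket/prefix values shown are illustrative, not the exact code in this PR:

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;

public class S3PrefixChecker {
  // "Check 2": does any object exist under the given key prefix?
  public static boolean prefixExists(S3Client s3, String bucket, String prefix) {
    ListObjectsV2Request request =
        ListObjectsV2Request.builder()
            .bucket(bucket)   // e.g. "bucket_name"
            .prefix(prefix)   // e.g. "data/openhouse/db/t-uuid"
            .maxKeys(1)       // a single match is enough to prove existence
            .build();
    ListObjectsV2Response response = s3.listObjectsV2(request);
    return response.keyCount() != null && response.keyCount() > 0;
  }
}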
This is no longer valid. See #256 (comment)
@HotSushi @jainlavina Addressed the comments. Some changes in the latest commit
 * @param path absolute path to a file including scheme
 * @return true if path exists else false
 */
boolean fileExists(String path);
Suggested change:
-  boolean fileExists(String path);
+  boolean exists(Path path);
 * @return true if path exists else false
 */
@Override
public boolean fileExists(String path) {
This could be a default impl and go to BaseStorageClient.
Let's keep BaseStorageClient FileSystem-agnostic.
 * scheme. Scheme is not prefix for local and hdfs storage. See:
 * https://github.com/linkedin/openhouse/issues/121
 *
 * @param path absolute path to a file including scheme
So this comment is not always true, because the scheme may not have been passed along for HDFS files?
In that case, maybe clarify that the path may or may not have a scheme specified. If the scheme is not specified, it is assumed to be HDFS. For all other storage types, the scheme must be specified.
Updated
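The updated wording isn't reproduced in this thread, but a doc comment along the lines of the suggestion above might read roughly like this (a sketch, not the exact javadoc in the PR):

/**
 * Checks if the path exists on the backend storage.
 *
 * <p>The path may or may not include a scheme. If no scheme is specified, the path is assumed
 * to be on HDFS (or local) storage; for all other storage types the scheme must be present.
 * See: https://github.com/linkedin/openhouse/issues/121
 *
 * @param path absolute path to a file, optionally including the scheme
 * @return true if the path exists, false otherwise
 */
boolean fileExists(String path);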
public Storage getStorageFromPath(String path) {
  for (Storage storage : storages) {
    if (storage.isConfigured()) {
      if (StorageType.LOCAL.equals(storage.getType())) {
Shouldn't this be a fallback only if the path does not start with any other configured endpoint?
What if the path is an S3 storage path, but local storage is also configured for some other tables?
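A sketch of the ordering suggested above: match configured storages by their endpoint (scheme) first, and treat local/HDFS, whose paths carry no scheme, only as the fallback. Storage, StorageType, the storages field, and the getEndpoint() accessor are assumed from the surrounding code and may differ in the actual PR:

public Storage getStorageFromPath(String path) {
  Storage schemelessFallback = null;
  for (Storage storage : storages) {
    if (!storage.isConfigured()) {
      continue;
    }
    if (StorageType.LOCAL.equals(storage.getType())
        || StorageType.HDFS.equals(storage.getType())) {
      // Local and HDFS paths have no scheme prefix; remember them only as the fallback.
      schemelessFallback = storage;
      continue;
    }
    if (path.startsWith(storage.getClient().getEndpoint())) {
      // e.g. s3://... or abs://... matched against the configured endpoint.
      return storage;
    }
  }
  if (schemelessFallback != null) {
    return schemelessFallback;
  }
  throw new IllegalArgumentException("No configured storage matches path: " + path);
}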
@@ -48,4 +49,20 @@ public FileSystem getNativeClient() {
  public StorageType.Type getStorageType() {
    return HDFS_TYPE;
  }

  /**
   * Checks if the path exists on the backend storage. Scheme is not prefix for local and hdfs
nit: I am having a hard time parsing the language "scheme is not prefix". Would you please consider paraphrasing to something like "Scheme is not specified in the path for local and hdfs storage".
public boolean fileExists(String path) {
  try {
    HeadObjectRequest headObjectRequest =
        HeadObjectRequest.builder().bucket(getRootPrefix()).key(path).build();
Did you check in the headObject API spec whether passing the full path, including scheme and bucket name, as the key will work? It usually expects the object key without the bucket name.
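For reference, headObject takes the bucket name and the object key as separate fields, and the key must not include the scheme or the bucket. A sketch of splitting an absolute path before calling it; the splitting logic is illustrative only, and exception mapping can vary across SDK versions:

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

public class S3ObjectChecker {
  public static boolean objectExists(S3Client s3, String absolutePath) {
    // e.g. absolutePath = "s3://bucket_name/data/openhouse/db/t-uuid/file.avro"
    String withoutScheme = absolutePath.replaceFirst("^[a-zA-Z][a-zA-Z0-9+.-]*://", "");
    int firstSlash = withoutScheme.indexOf('/');
    String bucket = withoutScheme.substring(0, firstSlash);   // "bucket_name"
    String key = withoutScheme.substring(firstSlash + 1);     // "data/openhouse/db/t-uuid/file.avro"
    try {
      s3.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
      return true;
    } catch (NoSuchKeyException e) {
      // Thrown when the object does not exist under that key.
      return false;
    }
  }
}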
Summary
This PR introduces the following changes:
While extracting the UUID from a snapshot, the code constructs the database path excluding the endpoint (scheme) when checking whether it is a prefix of the manifestList that is part of the snapshot.
Example
ManifestList (from snapshot): s3://bucket-name/database/table-uuid/file.avro
Database prefix: bucket-name/database (not a prefix of above)
Fix: Strip the endpoint (scheme) from the manifest list by resolving the correct storage from the tableLocation.
After fix
ManifestList stripped: bucket-name/database/table-uuid/file.avro
Database prefix: bucket-name/database (is a prefix of above)
There are assumptions in the code that the storage is always the cluster default. This fails when the default is a storage without a scheme (HDFS) and the db.table is stored in a storage with a scheme (S3, BlobFs).
Fix: Resolve the correct storage by extracting the tableLocation from the table props and checking the scheme (endpoint).
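A minimal sketch of the first fix above (the prefix check after stripping the endpoint); the method name and the way the endpoint is obtained are illustrative only:

// Sketch: check whether a manifest list path falls under the database prefix
// once the storage endpoint (scheme) has been stripped.
static boolean isManifestUnderDatabase(String manifestListPath, String databasePrefix, String endpoint) {
  // e.g. manifestListPath = "s3://bucket-name/database/table-uuid/file.avro"
  //      endpoint         = "s3://"
  //      databasePrefix   = "bucket-name/database"
  String stripped =
      manifestListPath.startsWith(endpoint)
          ? manifestListPath.substring(endpoint.length())
          : manifestListPath; // HDFS/local paths carry no scheme to strip
  return stripped.startsWith(databasePrefix);
}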
Changes
For all the boxes checked, please include additional details of the changes made in this pull request.
Testing Done
For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.
Additional Information
For all the boxes checked, include additional details of the changes made in this pull request.