
Bug: the metadata of iceberg is corrupted after iceberg sink. #17976

Closed
chenzl25 opened this issue Aug 8, 2024 · 1 comment · Fixed by #17975 or #18020
chenzl25 (Contributor) commented Aug 8, 2024
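
Steps to reproduce. First, use spark-sql (Hadoop catalog on the S3-compatible endpoint below) to drop any existing copy of the target Iceberg table: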

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,mysql:mysql-connector-java:8.0.33,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=s3a://hummock001/ \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
    -S -e "DROP TABLE demo.demo_db.t4"
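
Recreate it as a fresh Iceberg v2 table: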


spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,mysql:mysql-connector-java:8.0.33,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=s3a://hummock001/ \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
    -S -e "CREATE TABLE demo.demo_db.t4 (a BIGINT) TBLPROPERTIES ('format-version'='2')"
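
In RisingWave, create a table, an upsert Iceberg sink on it, and an Iceberg source pointing at the same table: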


create table t4(a bigint primary key);

create sink t4_sink from t4
with (
    connector = 'iceberg',
    type='upsert',
    primary_key = 'a',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='t4'
);


create source iceberg_t4
with (
    connector = 'iceberg',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.region = 'us-east-1',
    database.name='demo_db',
    table.name='t4'
);
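
Insert a few rows, flushing after each statement so that each insert is checkpointed and committed to Iceberg separately: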




insert into t4 values(1);flush;
insert into t4 values(2);flush;
insert into t4 values(3);flush;
insert into t4 values(4);flush;
insert into t4 values(5);flush;
insert into t4 values(6);flush;
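
With the sink still in place, open a spark-sql shell against the same warehouse and run Iceberg's table-maintenance procedures: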


spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,mysql:mysql-connector-java:8.0.33,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.demo.type=hadoop \
    --conf spark.sql.catalog.demo.warehouse=s3a://hummock001/ \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
    --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
    -S


-- Compact small data files, then consolidate manifests.
CALL demo.system.rewrite_data_files(table => 'demo_db.t4');
CALL demo.system.rewrite_manifests(table => 'demo_db.t4');
-- Tag the current snapshot and check that it shows up in the refs metadata table.
ALTER TABLE demo.demo_db.t4 CREATE TAG tt;
select * from demo.demo_db.t4.refs;
-- Drop old snapshots, then delete files not reachable from any table metadata.
CALL demo.system.expire_snapshots(table => 'demo_db.t4');
CALL demo.system.remove_orphan_files(table => 'demo_db.t4');
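
(Presumably this is where things go wrong: expire_snapshots and remove_orphan_files delete files and rewrite metadata while the RisingWave sink still holds an older view of the table, so the sink's next commit no longer lines up with what is actually on storage.)

Back in RisingWave, push one more row through the sink: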


insert into t4 values(7);flush;

-- error happens when reading the table back from spark-sql
select count(*) from demo.demo_db.t4;
fuyufjh (Member) commented Aug 12, 2024

From my understanding, all these data lakes are designed to provide atomic transactions and thereby solve the problem of concurrent modification.

However, #17975 only partially solves it, in a best-effort way. Is it possible to solve it completely, for example, by compare-and-setting Iceberg's metadata version atomically?
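
For context, Iceberg's own commit path is already a compare-and-set: every metadata change goes through an optimistic commit that swaps the table's metadata version atomically, and a writer that loses the race gets a CommitFailedException rather than clobbering the table. Below is a minimal sketch of that protocol through the Iceberg Java API; the class name and the elided data file are hypothetical, while the endpoint, credentials, and identifiers are taken from the repro above. This is an illustration of the CAS semantics, not RisingWave's actual sink code.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class CasCommitSketch {
    public static void main(String[] args) {
        // Same Hadoop-catalog-on-S3A layout as the repro.
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", "http://127.0.0.1:9301");
        conf.set("fs.s3a.access.key", "hummockadmin");
        conf.set("fs.s3a.secret.key", "hummockadmin");
        conf.set("fs.s3a.path.style.access", "true");

        HadoopCatalog catalog = new HadoopCatalog(conf, "s3a://hummock001/");
        Table table = catalog.loadTable(TableIdentifier.of("demo_db", "t4"));

        AppendFiles append = table.newAppend();
        // append.appendFile(dataFile);  // data files elided in this sketch

        try {
            // commit() applies the update against the metadata version it read
            // and atomically publishes the next version; if another writer has
            // already advanced the version, the CAS fails instead of overwriting.
            append.commit();
        } catch (CommitFailedException e) {
            // Lost the race: refresh to the new base version and retry the update.
            table.refresh();
        }
    }
}

Worth noting: remove_orphan_files sits outside this protocol. It lists storage directly and deletes files that no committed metadata references, so files written by a still-in-flight commit can be misclassified as orphans no matter how the commit itself is synchronized.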
