Describe the bug
I created an Iceberg test table with pyiceberg, and when I scanned it right after creation it returned an empty list with no error, as expected.
However, after I created a sink for this test table in RisingWave and wrote some data, I can no longer load the table data: the scan throws an exception saying that a "field-id" is missing from the schema of the snap-***.avro manifest list file.
PS: I am using MinIO as the local S3 bucket implementation.
Error message/log
Traceback (most recent call last):
  File "C:\Users\*****\PycharmProjects\demoIceberg\load.py", line 13, in <module>
    con = table.scan(
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\table\__init__.py", line 904, in to_duckdb
    con.register(table_name, self.to_arrow())
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\table\__init__.py", line 889, in to_arrow
    self.plan_files(),
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\table\__init__.py", line 831, in plan_files
    for manifest_file in snapshot.manifests(io)
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\table\snapshots.py", line 107, in manifests
    return list(read_manifest_list(file))
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\manifest.py", line 371, in read_manifest_list
    with AvroFile[ManifestFile](
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\avro\file.py", line 171, in __enter__
    self.schema = self.header.get_schema()
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\avro\file.py", line 96, in get_schema
    return AvroSchemaConversion().avro_to_iceberg(avro_schema)
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\utils\schema_conversion.py", line 119, in avro_to_iceberg
    return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1)
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\utils\schema_conversion.py", line 119, in <listcomp>
    return Schema(*[self._convert_field(field) for field in avro_schema["fields"]], schema_id=1)
  File "C:\Users\*****\PycharmProjects\demoIceberg\venv\lib\site-packages\pyiceberg\utils\schema_conversion.py", line 224, in _convert_field
    raise ValueError(f"Cannot convert field, missing field-id: {field}")
ValueError: Cannot convert field, missing field-id: {'name': 'manifest_path', 'type': 'string'}
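The error shows that the fields of the manifest-list Avro schema (for example manifest_path) carry no "field-id" attribute, which pyiceberg's AvroSchemaConversion requires. To confirm this, the schema embedded in the snap-***.avro file can be dumped directly; the following is a minimal sketch, assuming fastavro is installed and the file has been downloaded from MinIO to a local path ("snap-local-copy.avro" below is only a placeholder):

# Sketch: inspect the writer schema of the downloaded manifest-list file.
import json
from fastavro import reader

with open("snap-local-copy.avro", "rb") as fo:  # placeholder path for the snap-***.avro file
    avro_reader = reader(fo)
    # The Avro header metadata contains the writer schema as JSON;
    # Iceberg-written manifest lists normally carry a "field-id" on every field.
    schema = json.loads(avro_reader.metadata["avro.schema"])
    for field in schema["fields"]:
        print(field.get("name"), "field-id" in field)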
To Reproduce
Create the table with pyiceberg
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "rest_backend",
    {
        "s3.endpoint": "http://",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "",
        "s3.secret-access-key": "***",
    }
)

catalog.create_tables()
catalog.create_namespace("demo")

from pyiceberg.schema import Schema
from pyiceberg.types import (
    TimestampType,
    StringType,
    NestedField,
    IntegerType,
)

schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=True),
    NestedField(field_id=3, name="age", field_type=IntegerType(), required=False)
)

from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

partition_spec = PartitionSpec(
    PartitionField(
        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
    )
)

from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))

catalog.create_table(
    identifier="demo.users",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
    properties={'format-version': '2'}
)
Create the table and sink in RisingWave
CREATE TABLE public.users (
    datetime timestamp,
    name varchar,
    age int4
);
CREATE SINK iceberg_sink FROM users
WITH (
    connector = 'iceberg',
    catalog.type = 'rest',
    type = 'upsert',
    s3.region = 'east-us',
    warehouse.path = 's3://pyiceberg',
    catalog.uri = 'http://',
    s3.endpoint = 'http://',
    s3.access.key = '',
    s3.secret.key = '',
    database.name = 'rest_backend',
    table.name = 'demo.users',
    primary_key = 'datetime'
);
-- insert some test data
insert into users values (now(), 'Jason', 12);
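At this point the sink should have committed a snapshot to the Iceberg table. To locate the snap-***.avro manifest list that the later traceback complains about, the snapshot can be inspected from pyiceberg; this is a minimal sketch, reusing the same catalog configuration as in the creation script above:

# Sketch: inspect the snapshot written by the RisingWave sink,
# using the `catalog` object configured earlier.
table = catalog.load_table("demo.users")
snapshot = table.current_snapshot()
if snapshot is None:
    print("no snapshot committed yet")
else:
    # manifest_list points at the snap-*.avro file whose schema
    # pyiceberg fails to convert in the traceback above.
    print(snapshot.snapshot_id, snapshot.manifest_list)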
Load the table data with pyiceberg
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "rest_backend",
    {
        "s3.endpoint": "http://",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "",
        "s3.secret-access-key": "***",
    }
)

table = catalog.load_table("demo.users")
con = table.scan().to_duckdb(table_name="users")
print(
    con.execute(
        "SELECT * from users"
    ).fetchall()
)
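Since the traceback shows to_duckdb() calling self.to_arrow() and failing inside plan_files(), the DuckDB step is not the trigger; a minimal variant that should hit the same manifest-reading path (a sketch, same catalog setup as above) is:

# Sketch: the same scan without DuckDB, to show the failure sits in
# pyiceberg's manifest reading rather than in the DuckDB registration.
table = catalog.load_table("demo.users")
arrow_table = table.scan().to_arrow()  # also goes through plan_files() / read_manifest_list()
print(arrow_table)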
Expected behavior
No response
How did you deploy RisingWave?
No response
The version of RisingWave
v1.5.1
Additional context
No response