Commit

fix 3
BePPPower committed Nov 19, 2024
1 parent 58c3070 commit c48f8dc
Showing 11 changed files with 255 additions and 39 deletions.
24 changes: 24 additions & 0 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -28,13 +28,37 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/serde/data_type_nullable_serde.h"

namespace doris {

namespace vectorized {
class IColumn;
#include "common/compile_check_begin.h"

Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int64_t start_idx,
int64_t end_idx, BufferWritable& bw,
FormatOptions& options) const {
SERIALIZE_COLUMN_TO_JSON();
}

Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
BufferWritable& bw,
FormatOptions& options) const {
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
return Status::OK();
}

Status DataTypeBitMapSerDe::deserialize_column_from_json_vector(
IColumn& column, std::vector<Slice>& slices, int* num_deserialized,
const FormatOptions& options) const {
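The serialize_one_cell_to_json implementations added in this commit (for BITMAP here, and for HLL and QUANTILE_STATE below) all emit a null literal whose spelling depends on nesting depth. A minimal standalone sketch of that convention, assuming NULL_IN_CSV_FOR_ORDINARY_TYPE is \N and NULL_IN_COMPLEX_TYPE is null as the in-code comment states (names below are illustrative, not the Doris API):

#include <iostream>
#include <string>

// Sketch only: mirrors the nesting-level check above, with assumed constants.
std::string null_literal(int nesting_level) {
    // Top-level (ordinary) columns render SQL NULL as \N in CSV output;
    // values nested inside ARRAY/MAP/STRUCT render as JSON-style null.
    return nesting_level >= 2 ? "null" : "\\N";
}

int main() {
    std::cout << null_literal(1) << '\n'; // \N   (ordinary column)
    std::cout << null_literal(2) << '\n'; // null (nested value)
    return 0;
}

Returning OK with a null literal, instead of the old NotSupported status, is what lets the text-format export paths handle these columns at all.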
8 changes: 2 additions & 6 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -36,14 +36,10 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {
DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {};

Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw,
- FormatOptions& options) const override {
- return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name());
- }
FormatOptions& options) const override;

Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx,
- BufferWritable& bw, FormatOptions& options) const override {
- return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name());
- }
BufferWritable& bw, FormatOptions& options) const override;

Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const override;
31 changes: 10 additions & 21 deletions be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -49,28 +49,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const IColumn& column, int64_t
Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
BufferWritable& bw,
FormatOptions& options) const {
- if (!options._output_object_data) {
- /**
- * For null values in ordinary types, we use \N to represent them;
- * for null values in nested types, we use null to represent them, just like the json format.
- */
- if (_nesting_level >= 2) {
- bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
- strlen(NULL_IN_COMPLEX_TYPE.c_str()));
- } else {
- bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
- strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
- }
- return Status::OK();
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
- auto col_row = check_column_const_set_readability(column, row_num);
- ColumnPtr ptr = col_row.first;
- row_num = col_row.second;
- auto& data = const_cast<HyperLogLog&>(assert_cast<const ColumnHLL&>(*ptr).get_element(row_num));
- std::unique_ptr<char[]> buf =
- std::make_unique_for_overwrite<char[]>(data.max_serialized_size());
- size_t size = data.serialize((uint8*)buf.get());
- bw.write(buf.get(), size);
return Status::OK();
}

20 changes: 19 additions & 1 deletion be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -142,7 +142,25 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons
orc::ColumnVectorBatch* orc_col_batch, int64_t start,
int64_t end,
std::vector<StringRef>& buffer_list) const {
- return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name());
auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
const auto& string_column = assert_cast<const ColumnString&>(column);

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
std::string_view string_ref = string_column.get_data_at(row_id).to_string_view();
auto* serialized_value = new std::string();
*serialized_value =
JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size());
auto len = serialized_value->length();
StringRef bufferRef(*serialized_value);
buffer_list.emplace_back(bufferRef);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data);
cur_batch->length[row_id] = len;
}
}

cur_batch->numElements = end - start;
return Status::OK();
}

void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value& target,
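Both new write_column_to_orc implementations (JSONB above, VARIANT in data_type_object_serde.cpp below) share one ownership pattern: each row's serialized string is heap-allocated so its bytes outlive the call, and buffer_list records a StringRef to each allocation so the caller can release it after the ORC batch is written. A self-contained sketch of just that pattern, under the assumption that orc::StringVectorBatch stores raw char* pointers into the values; FakeBatch and the cleanup site are stand-ins, not Doris or ORC API:

#include <iostream>
#include <string>
#include <vector>

struct FakeBatch { // stand-in for orc::StringVectorBatch
    std::vector<char*> data;
    std::vector<size_t> length;
};

int main() {
    std::vector<std::string*> buffer_list; // plays the role of buffer_list above
    FakeBatch batch;
    for (int row = 0; row < 3; ++row) {
        // Heap-allocate so the bytes stay valid after this loop iteration.
        auto* owned = new std::string("{\"row\":" + std::to_string(row) + "}");
        buffer_list.push_back(owned);
        batch.data.push_back(const_cast<char*>(owned->data()));
        batch.length.push_back(owned->size());
    }
    for (size_t i = 0; i < batch.data.size(); ++i) {
        std::cout << std::string(batch.data[i], batch.length[i]) << '\n';
    }
    for (auto* s : buffer_list) delete s; // freed only after the batch is consumed
    return 0;
}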
28 changes: 28 additions & 0 deletions be/src/vec/data_types/serde/data_type_object_serde.cpp
@@ -20,6 +20,7 @@
#include <rapidjson/stringbuffer.h>

#include <cstdint>
#include <string>

#include "common/exception.h"
#include "common/status.h"
@@ -164,6 +165,33 @@ void DataTypeObjectSerDe::write_column_to_arrow(const IColumn& column, const Nul
}
}

Status DataTypeObjectSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch, int64_t start,
int64_t end,
std::vector<StringRef>& buffer_list) const {
const auto* var = check_and_get_column<ColumnObject>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
auto* serialized_value = new std::string();
if (!var->serialize_one_row_to_string(row_id, serialized_value)) {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Failed to serialize variant {}",
var->dump_structure());
}
auto len = serialized_value->length();
StringRef bufferRef(*serialized_value);
buffer_list.emplace_back(bufferRef);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data);
cur_batch->length[row_id] = len;
}
}

cur_batch->numElements = end - start;
return Status::OK();
}

} // namespace vectorized

} // namespace doris
4 changes: 1 addition & 3 deletions be/src/vec/data_types/serde/data_type_object_serde.h
@@ -89,9 +89,7 @@ class DataTypeObjectSerDe : public DataTypeSerDe {
Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
int64_t start, int64_t end,
- std::vector<StringRef>& buffer_list) const override {
- return Status::NotSupported("write_column_to_orc with type " + column.get_name());
- }
std::vector<StringRef>& buffer_list) const override;

private:
template <bool is_binary_format>
16 changes: 14 additions & 2 deletions be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -33,6 +33,7 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/string_ref.h"
#include "vec/data_types/serde/data_type_nullable_serde.h"

namespace doris {

@@ -44,12 +45,23 @@

Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw,
FormatOptions& options) const override {
- return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name());
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
return Status::OK();
}

Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx,
BufferWritable& bw, FormatOptions& options) const override {
- return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name());
SERIALIZE_COLUMN_TO_JSON();
}
Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const override {
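SERIALIZE_COLUMN_TO_JSON() is a pre-existing Doris helper macro; this commit merely switches the quantile-state serde over to it. The sketch below is a plausible reading of what such a driver does — iterate the requested rows and delegate each cell to serialize_one_cell_to_json — not the macro's actual definition:

#include <cstdint>
#include <functional>
#include <iostream>
#include <sstream>

// Assumed shape of a column-to-JSON driver built on the per-cell serializer.
bool serialize_column_sketch(int64_t start_idx, int64_t end_idx, std::ostringstream& bw,
                             const std::function<bool(int64_t, std::ostringstream&)>& one_cell) {
    for (int64_t i = start_idx; i < end_idx; ++i) {
        if (i != start_idx) {
            bw << ','; // delimiter between cells
        }
        if (!one_cell(i, bw)) {
            return false; // propagate the first per-cell failure
        }
    }
    return true;
}

int main() {
    std::ostringstream bw;
    serialize_column_sketch(0, 3, bw, [](int64_t i, std::ostringstream& out) {
        out << "cell" << i;
        return true;
    });
    std::cout << bw.str() << '\n'; // cell0,cell1,cell2
    return 0;
}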
@@ -15,3 +15,11 @@
20220201 4 00000045010000000000000040 010114CAA737BD54146E 05010AAD0CDD7C590000
20220201 5 00000045010000000000000840 0101DCBC5BA258F9602C 05013A0C180F00000000

-- !select_load_orc --
20220201 0 \N \N \N
20220201 1 \N \N \N
20220201 2 \N \N \N
20220201 3 \N \N \N
20220201 4 \N \N \N
20220201 5 \N \N \N

@@ -0,0 +1,25 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_load_parquet --
20220201 0 {"k1":"100"} {"k1":"100"}
20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"}
20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"}
20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123}
20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"}
20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"}

-- !select_load_orc --
20220201 0 {"k1":"100"} {"k1":"100"}
20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"}
20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"}
20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123}
20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"}
20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"}

-- !select_load_orc --
20220201 0 {"k1":"100"} {"k1":"100"}
20220201 1 {"k1":"100","k2":"123"} {"k1":"100","k2":"123"}
20220201 2 {"k1":"100","abc":"567"} {"abc":"567","k1":"100"}
20220201 3 {"k1":"100","k3":123} {"k1":"100","k3":123}
20220201 4 {"k1":"100","doris":"nereids"} {"doris":"nereids","k1":"100"}
20220201 5 {"k1":"100","doris":"pipeline"} {"doris":"pipeline","k1":"100"}

@@ -69,23 +69,37 @@ suite("test_outfile_complex_type", "p0") {
"""

// parquet file format
def outfile_url = outfile_to_S3("parquet")
def format = "parquet"
def outfile_url = outfile_to_S3("${format}")
qt_select_load_parquet """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.parquet",
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "parquet",
"format" = "${format}",
"region" = "${region}"
);
"""

// orc file format
- outfile_url = outfile_to_S3("orc")
format = "orc"
outfile_url = outfile_to_S3("${format}")
qt_select_load_orc """ SELECT dt, id, hex(price), hex(hll_t), hex(device_id) FROM S3 (
- "uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.orc",
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
- "format" = "orc",
"format" = "${format}",
"region" = "${region}"
);
"""

// csv file format
format = "csv"
outfile_url = outfile_to_S3("${format}")
qt_select_load_orc """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "${format}",
"region" = "${region}"
);
"""
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_outfile_jsonb_and_variant", "p0") {
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");

def export_table_name = "test_outfile_jsonb_and_variant_table"
def outFilePath = "${bucket}/outfile/jsonb_and_variant/exp_"

def outfile_to_S3 = { format ->
// select ... into outfile ...
def res = sql """
SELECT * FROM ${export_table_name} t
INTO OUTFILE "s3://${outFilePath}"
FORMAT AS ${format}
PROPERTIES (
"s3.endpoint" = "${s3_endpoint}",
"s3.region" = "${region}",
"s3.secret_key"="${sk}",
"s3.access_key" = "${ak}"
);
"""

return res[0][3]
}

sql """ DROP TABLE IF EXISTS ${export_table_name} """
sql """
CREATE TABLE `${export_table_name}` (
`dt` int(11) NULL COMMENT "",
`id` int(11) NULL COMMENT "",
`json_col` JSON NULL COMMENT "",
`variant_col` variant NULL COMMENT ""
) ENGINE=OLAP
DISTRIBUTED BY HASH(`dt`)
PROPERTIES (
"replication_num" = "1"
);
"""

sql """
INSERT INTO `${export_table_name}` values
(20220201,0, '{"k1": "100"}', '{"k1": "100"}'),
(20220201,1, '{"k1": "100", "k2": "123"}', '{"k1": "100", "k2": "123"}'),
(20220201,2, '{"k1": "100", "abc": "567"}', '{"k1": "100", "abc": "567"}'),
(20220201,3, '{"k1": "100", "k3": 123}', '{"k1": "100", "k3": 123}'),
(20220201,4, '{"k1": "100", "doris": "nereids"}', '{"k1": "100", "doris": "nereids"}'),
(20220201,5, '{"k1": "100", "doris": "pipeline"}', '{"k1": "100", "doris": "pipeline"}');
"""

// parquet file format
def format = "parquet"
def outfile_url = outfile_to_S3("${format}")
qt_select_load_parquet """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "${format}",
"region" = "${region}"
);
"""

// orc file format
format = "orc"
outfile_url = outfile_to_S3("${format}")
qt_select_load_orc """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "${format}",
"region" = "${region}"
);
"""

// csv file format
format = "csv"
outfile_url = outfile_to_S3("${format}")
qt_select_load_orc """ SELECT * FROM S3 (
"uri" = "http://${bucket}.${s3_endpoint}${outfile_url.substring(5 + bucket.length(), outfile_url.length() - 1)}0.${format}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "${format}",
"region" = "${region}"
);
"""
}
