Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reading multi-source compressed JSONL files #17161

Merged
merged 40 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
3517b26
partial work
shrshi Oct 18, 2024
1cc6f46
compressed input datasource
shrshi Oct 23, 2024
1f46223
formatting
shrshi Oct 23, 2024
334ef06
improving the datasoruce
shrshi Oct 24, 2024
839bdda
cleanup
shrshi Oct 24, 2024
42a4b1b
slow path for some compression formats
shrshi Oct 24, 2024
cff583b
merge
shrshi Oct 24, 2024
c3b6cb3
cleanup
shrshi Oct 24, 2024
e116fa7
remove include
shrshi Oct 24, 2024
3cd7c1d
pr feedback
shrshi Oct 25, 2024
dc7471c
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Oct 25, 2024
4bd817e
reorg and cleanup
shrshi Nov 6, 2024
51a382e
update function name
shrshi Nov 6, 2024
ac596a2
merge
shrshi Nov 6, 2024
9ae4c78
pr reviews
shrshi Nov 6, 2024
a6faf3d
storing compressed buf as datasource buf
shrshi Nov 6, 2024
89679b4
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Nov 7, 2024
92facb5
pr reviews
shrshi Nov 8, 2024
2c3fd04
Merge branch 'gzip-read-json-bug' of github.com:shrshi/cudf into gzip…
shrshi Nov 8, 2024
751c724
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Nov 9, 2024
bfd8013
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Nov 9, 2024
3a1be8a
pr reviews
shrshi Nov 12, 2024
0107d5d
Merge branch 'gzip-read-json-bug' of github.com:shrshi/cudf into gzip…
shrshi Nov 12, 2024
aa7c6bf
formatting
shrshi Nov 12, 2024
1d55339
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Nov 12, 2024
6fd47ca
cpp tests for compressed json io
shrshi Nov 13, 2024
5753c62
formatting
shrshi Nov 13, 2024
c95ea76
modified byte range tests
shrshi Nov 13, 2024
0bb1b45
fixing zip uncomp
shrshi Nov 13, 2024
c667552
moving test
shrshi Nov 13, 2024
971d32c
removing unnecessary file
shrshi Nov 13, 2024
3ab020a
cleanup
shrshi Nov 13, 2024
aec45a6
large strings snappy test failure
shrshi Nov 13, 2024
998265a
Merge branch 'branch-24.12' into gzip-read-json-bug
shrshi Nov 13, 2024
a0e3069
rename cu -> cpp
shrshi Nov 14, 2024
0d210af
Merge branch 'gzip-read-json-bug' of github.com:shrshi/cudf into gzip…
shrshi Nov 14, 2024
0c905d3
minor fix
shrshi Nov 14, 2024
27af753
formatting
shrshi Nov 14, 2024
e22684f
partial work
shrshi Nov 15, 2024
7460756
pr reviews - moving comp and uncomp to detail namespace
shrshi Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ add_library(
src/io/avro/avro_gpu.cu
src/io/avro/reader_impl.cu
src/io/comp/brotli_dict.cpp
src/io/comp/comp.cpp
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
Expand Down
117 changes: 117 additions & 0 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "comp.hpp"

#include "io/utilities/hostdevice_vector.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <zlib.h> // compress

namespace cudf {
namespace io {

/**
 * @brief GZIP host compressor (includes header)
 *
 * Compresses the whole input with a single zlib `deflate(Z_FINISH)` call into a buffer sized by
 * `deflateBound`, then trims the buffer to the number of bytes zlib actually produced.
 *
 * @param src Uncompressed host buffer
 *
 * @return Vector holding the gzip-framed compressed bytes
 */
std::vector<std::uint8_t> compress_gzip(host_span<uint8_t const> src)
{
  // Releases zlib's internal deflate state if an error check below throws, so a failed
  // compression does not leak the stream.
  struct deflate_guard {
    explicit deflate_guard(z_stream* s) : strm{s} {}
    ~deflate_guard()
    {
      if (strm != nullptr) { deflateEnd(strm); }
    }
    deflate_guard(deflate_guard const&)            = delete;
    deflate_guard& operator=(deflate_guard const&) = delete;
    z_stream* strm;
  };

  // zlib tracks the available input in a `uInt` counter. Reject sizes that do not survive the
  // conversion instead of silently compressing only part of the input.
  auto const src_size = static_cast<uInt>(src.size());
  CUDF_EXPECTS(src_size == src.size(), "GZIP DEFLATE compression input is too large.");

  z_stream zs{};
  zs.zalloc   = Z_NULL;
  zs.zfree    = Z_NULL;
  zs.opaque   = Z_NULL;
  zs.avail_in = src_size;
  // z_stream::next_in may be declared as a pointer to non-const, hence the cast.
  zs.next_in   = const_cast<unsigned char*>(src.data());
  zs.avail_out = 0;
  zs.next_out  = nullptr;

  constexpr int windowbits    = 15;
  constexpr int gzip_encoding = 16;  // OR-ed into windowBits to request gzip framing
  constexpr int mem_level     = 8;
  int ret                     = deflateInit2(&zs,
                         Z_DEFAULT_COMPRESSION,
                         Z_DEFLATED,
                         windowbits | gzip_encoding,
                         mem_level,
                         Z_DEFAULT_STRATEGY);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression initialization failed.");
  deflate_guard guard{&zs};

  // Upper bound on the compressed size; it must also fit zlib's `uInt` output counter.
  uLong const bound       = deflateBound(&zs, src_size);
  auto const dst_capacity = static_cast<uInt>(bound);
  CUDF_EXPECTS(dst_capacity == bound, "GZIP DEFLATE compression input is too large.");

  std::vector<uint8_t> dst(bound);
  zs.avail_out = dst_capacity;
  zs.next_out  = dst.data();

  ret = deflate(&zs, Z_FINISH);
  CUDF_EXPECTS(ret == Z_STREAM_END, "GZIP DEFLATE compression failed due to insufficient space!");
  // Only one deflate call was made, so total_out is the full compressed size.
  dst.resize(zs.total_out);

  // Success path: end the stream explicitly so its return code can still be checked.
  guard.strm = nullptr;
  ret        = deflateEnd(&zs);
  CUDF_EXPECTS(ret == Z_OK, "GZIP DEFLATE compression failed at deallocation");

  return dst;
}

/**
 * @brief SNAPPY device compressor
 *
 * Copies the input to device memory, compresses it as a single chunk through
 * `nvcomp::batched_compress`, and copies the compressed bytes back to the host.
 *
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Vector holding the compressed bytes, trimmed to the size reported by the compressor
 */
std::vector<std::uint8_t> compress_snappy(host_span<uint8_t const> src,
                                          rmm::cuda_stream_view stream)
{
  auto const d_src =
    detail::make_device_uvector_async(src, stream, cudf::get_current_device_resource_ref());
  // NOTE(review): the output buffer is only as large as the input. Incompressible data may need
  // more room than that; confirm whether this should be sized from the compressor's maximum
  // output size instead.
  rmm::device_uvector<uint8_t> d_dst(src.size(), stream);

  // Single-element batch: one input chunk, one output chunk, one result slot.
  cudf::detail::hostdevice_vector<device_span<uint8_t const>> inputs(1, stream);
  inputs[0] = d_src;
  inputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<device_span<uint8_t>> outputs(1, stream);
  outputs[0] = d_dst;
  outputs.host_to_device_async(stream);

  cudf::detail::hostdevice_vector<cudf::io::compression_result> hd_status(1, stream);
  hd_status[0] = {};
  hd_status.host_to_device_async(stream);

  nvcomp::batched_compress(nvcomp::compression_type::SNAPPY, inputs, outputs, hd_status, stream);

  stream.synchronize();
  hd_status.device_to_host_sync(stream);
  CUDF_EXPECTS(hd_status[0].status == cudf::io::compression_status::SUCCESS,
               "snappy compression failed");

  // Return only the bytes the compressor produced; the rest of d_dst is not part of the
  // compressed stream. `bytes_written` is declared with compression_result outside this file and
  // is expected to hold the compressed size of the chunk.
  auto const compressed_size = static_cast<std::size_t>(hd_status[0].bytes_written);
  CUDF_EXPECTS(compressed_size <= d_dst.size(),
               "snappy compression reported an invalid output size");

  std::vector<uint8_t> dst(compressed_size);
  cudf::detail::cuda_memcpy(host_span<uint8_t>{dst},
                            device_span<uint8_t const>{d_dst.data(), compressed_size},
                            stream);
  return dst;
}

// Entry point for host-buffer compression: forwards to the compressor that matches the
// requested format. GZIP runs on the host; SNAPPY is handed the CUDA stream. Every other
// format is reported as unsupported.
std::vector<std::uint8_t> compress(compression_type compression,
                                   host_span<uint8_t const> src,
                                   rmm::cuda_stream_view stream)
{
  CUDF_FUNC_RANGE();

  if (compression == compression_type::GZIP) { return compress_gzip(src); }
  if (compression == compression_type::SNAPPY) { return compress_snappy(src, stream); }

  CUDF_FAIL("Unsupported compression type");
}

} // namespace io
} // namespace cudf
43 changes: 43 additions & 0 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/utilities/span.hpp>

#include <memory>
#include <string>
#include <vector>

namespace CUDF_EXPORT cudf {
namespace io {
vuule marked this conversation as resolved.
Show resolved Hide resolved

/**
 * @brief Compresses a system memory buffer.
 *
 * The implementation handles `compression_type::GZIP` and `compression_type::SNAPPY`; any other
 * type is reported as unsupported via `CUDF_FAIL`.
 *
 * @param compression Type of compression to apply to the input data
 * @param src Uncompressed host buffer
 * @param stream CUDA stream used for device memory operations and kernel launches
 *
 * @return Vector containing the compressed output
 */
std::vector<uint8_t> compress(compression_type compression,
host_span<uint8_t const> src,
rmm::cuda_stream_view stream);

} // namespace io
} // namespace CUDF_EXPORT cudf
30 changes: 26 additions & 4 deletions cpp/src/io/comp/io_uncomp.hpp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't mind merging this file and comp.hpp. Definitely not required in this PR :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will open a follow-on PR for this

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022, NVIDIA CORPORATION.
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,7 +25,7 @@

using cudf::host_span;

namespace cudf {
namespace CUDF_EXPORT cudf {
namespace io {
vuule marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand All @@ -36,13 +36,35 @@ namespace io {
*
* @return Vector containing the Decompressed output
*/
std::vector<uint8_t> decompress(compression_type compression, host_span<uint8_t const> src);
[[nodiscard]] std::vector<uint8_t> decompress(compression_type compression,
host_span<uint8_t const> src);

/**
* @brief Decompresses a system memory buffer.
*
* @param compression Type of compression of the input data
* @param src Compressed host buffer
* @param dst Destination host span to place decompressed buffer
* @param stream CUDA stream used for device memory operations and kernel launches
*
* @return Size of decompressed output
*/
size_t decompress(compression_type compression,
host_span<uint8_t const> src,
host_span<uint8_t> dst,
rmm::cuda_stream_view stream);

/**
* @brief Returns the size of the decompressed output without actually decompressing the input
* buffer. If the decompressed size cannot be determined a priori, returns zero.
*
* @param compression Type of compression of the input data
* @param src Compressed host buffer
*
* @return Size of decompressed output
*/
size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src);

/**
* @brief GZIP header flags
* See https://tools.ietf.org/html/rfc1952
Expand All @@ -56,4 +78,4 @@ constexpr uint8_t fcomment = 0x10; // Comment present
}; // namespace GZIPHeaderFlag

} // namespace io
} // namespace cudf
} // namespace CUDF_EXPORT cudf
Loading
Loading