Skip to content

Commit

Permalink
[CORE-4444] schema_registry: Switch raw_string to iobuf
Browse files Browse the repository at this point in the history
Also fixes [CORE-684] [CORE-4446] [CORE-4447]

Signed-off-by: Ben Pope <[email protected]>
(cherry picked from commit e7fab4a)

Conflicts:
  src/v/pandaproxy/schema_registry/json.cc (no JSON schema)
  src/v/pandaproxy/schema_registry/types.h (no formatter)
  • Loading branch information
BenPope committed Jul 17, 2024
1 parent 5f1e3ef commit 8b68ac5
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 54 deletions.
57 changes: 37 additions & 20 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@

#include "pandaproxy/schema_registry/avro.h"

#include "bytes/streambuf.h"
#include "json/allocator.h"
#include "json/chunked_input_stream.h"
#include "json/document.h"
#include "json/encodings.h"
#include "json/stringbuffer.h"
#include "json/json.h"
#include "json/types.h"
#include "json/writer.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/sharded_store.h"
#include "pandaproxy/schema_registry/types.h"
#include "strings/string_switch.h"

#include <seastar/core/coroutine.hh>
Expand All @@ -30,6 +31,7 @@
#include <avro/Compiler.hh>
#include <avro/Exception.hh>
#include <avro/GenericDatum.hh>
#include <avro/Stream.hh>
#include <avro/Types.hh>
#include <avro/ValidSchema.hh>
#include <boost/outcome/std_result.hpp>
Expand All @@ -43,6 +45,10 @@
#include <stack>
#include <string_view>

// Make every name from the global ::json namespace visible through
// pandaproxy::json.
// NOTE(review): presumably needed so that `json::...` written inside
// namespace pandaproxy (e.g. json::Document, json::chunked_input_stream,
// json::chunked_buffer in this file) still resolves to the ::json helpers
// when a pandaproxy::json namespace is also in scope -- confirm.
namespace pandaproxy::json {
using namespace ::json;
}

namespace pandaproxy::schema_registry {

namespace {
Expand Down Expand Up @@ -421,7 +427,10 @@ std::ostream& operator<<(std::ostream& os, const avro_schema_definition& def) {
}

canonical_schema_definition::raw_string avro_schema_definition::raw() const {
return canonical_schema_definition::raw_string{_impl.toJson(false)};
iobuf_ostream os;
_impl.toJson(os.ostream());
return canonical_schema_definition::raw_string{
json::minify(std::move(os).buf())};
}

ss::sstring avro_schema_definition::name() const {
Expand All @@ -436,17 +445,22 @@ class collected_schema {
bool insert(ss::sstring name, canonical_schema_definition def) {
bool inserted = _names.insert(std::move(name)).second;
if (inserted) {
_schemas.push_back(std::move(def).raw()());
_schemas.push_back(std::move(def).raw());
}
return inserted;
}
ss::sstring flatten() {
return fmt::format("{}", fmt::join(_schemas, "\n"));
canonical_schema_definition::raw_string flatten() && {
iobuf out;
for (auto& s : _schemas) {
out.append(std::move(s));
out.append("\n", 1);
}
return canonical_schema_definition::raw_string{std::move(out)};
}

private:
absl::flat_hash_set<ss::sstring> _names;
std::vector<ss::sstring> _schemas;
std::vector<canonical_schema_definition::raw_string> _schemas;
};

ss::future<collected_schema> collect_schema(
Expand All @@ -473,11 +487,10 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) {
auto name = schema.sub()();
auto schema_refs = schema.def().refs();
auto refs = co_await collect_schema(store, {}, name, std::move(schema));
auto def = refs.flatten();
iobuf_istream sis{std::move(refs).flatten()()};
auto is = avro::istreamInputStream(sis.istream());
co_return avro_schema_definition{
avro::compileJsonSchemaFromMemory(
reinterpret_cast<const uint8_t*>(def.data()), def.length()),
std::move(schema_refs)};
avro::compileJsonSchemaFromStream(*is), std::move(schema_refs)};
} catch (const avro::Exception& e) {
ex = e;
}
Expand All @@ -492,12 +505,12 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {
json::Document doc;
constexpr auto flags = rapidjson::kParseDefaultFlags
| rapidjson::kParseStopWhenDoneFlag;
const auto& raw = def.raw()();
if (raw.empty()) {
if (def.raw()().empty()) {
auto ec = error_code::schema_empty;
return error_info{ec, make_error_code(ec).message()};
}
doc.Parse<flags>(raw.data(), raw.size());
json::chunked_input_stream is{def.shared_raw()()};
doc.ParseStream<flags>(is);
if (doc.HasParseError()) {
return error_info{
error_code::schema_invalid,
Expand All @@ -509,21 +522,25 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {
sanitize_context ctx{.alloc = doc.GetAllocator()};
auto res = sanitize(doc, ctx);
if (res.has_error()) {
// TODO BP: Prevent this linearization
iobuf_parser p(std::move(def).raw()());
return error_info{
res.assume_error().code(),
fmt::format("{} {}", res.assume_error().message(), raw)};
fmt::format(
"{} {}",
res.assume_error().message(),
p.read_string(p.bytes_left()))};
}

json::StringBuffer str_buf;
str_buf.Reserve(raw.size());
json::Writer<json::StringBuffer> w{str_buf};
json::chunked_buffer buf;
json::Writer<json::chunked_buffer> w{buf};

if (!doc.Accept(w)) {
return error_info{error_code::schema_invalid, "Invalid schema"};
}

return canonical_schema_definition{
std::string_view{str_buf.GetString(), str_buf.GetSize()},
canonical_schema_definition::raw_string{std::move(buf).as_iobuf()},
schema_type::avro,
def.refs()};
}
Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,8 @@ ss::future<ctx_server<service>::reply_t> get_subject_versions_version_schema(
auto get_res = co_await rq.service().schema_store().get_subject_schema(
sub, version, inc_del);

rp.rep->write_body("json", get_res.schema.def().raw()());
rp.rep->write_body(
"json", ppj::as_body_writer(std::move(get_res.schema).def().raw()()));
co_return rp;
}

Expand All @@ -543,7 +544,7 @@ get_subject_versions_version_referenced_by(
auto references = co_await rq.service().schema_store().referenced_by(
sub, version);

rp.rep->write_body("json", ppj::rjson_serialize(references));
rp.rep->write_body("json", ppj::rjson_serialize(std::move(references)));
co_return rp;
}

Expand Down
19 changes: 9 additions & 10 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "pandaproxy/schema_registry/protobuf.h"

#include "base/vlog.h"
#include "bytes/streambuf.h"
#include "kafka/protocol/errors.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/errors.h"
Expand Down Expand Up @@ -201,8 +202,8 @@ class dp_error_collector final : public pb::DescriptorPool::ErrorCollector {
class schema_def_input_stream : public pb::io::ZeroCopyInputStream {
public:
explicit schema_def_input_stream(const canonical_schema_definition& def)
: _str(def.raw())
, _impl{_str().data(), static_cast<int>(_str().size())} {}
: _is{def.shared_raw()}
, _impl{&_is.istream()} {}

bool Next(const void** data, int* size) override {
return _impl.Next(data, size);
Expand All @@ -212,8 +213,8 @@ class schema_def_input_stream : public pb::io::ZeroCopyInputStream {
int64_t ByteCount() const override { return _impl.ByteCount(); }

private:
canonical_schema_definition::raw_string _str;
pb::io::ArrayInputStream _impl;
iobuf_istream _is;
pb::io::IstreamInputStream _impl;
};

class parser {
Expand All @@ -231,13 +232,9 @@ class parser {
// Attempt parse a .proto file
if (!_parser.Parse(&t, &_fdp)) {
// base64 decode the schema
std::string_view b64_def{
schema.def().raw()().data(), schema.def().raw()().size()};
auto bytes_def = base64_to_bytes(b64_def);

iobuf_istream is{base64_to_iobuf(schema.def().raw()())};
// Attempt parse as an encoded FileDescriptorProto.pb
if (!_fdp.ParseFromArray(
bytes_def.data(), static_cast<int>(bytes_def.size()))) {
if (!_fdp.ParseFromIstream(&is.istream())) {
throw as_exception(error_collector.error());
}
}
Expand Down Expand Up @@ -326,6 +323,7 @@ struct protobuf_schema_definition::impl {
* messages
*/
ss::sstring debug_string() const {
// TODO BP: Prevent this linearization
auto s = fd->DebugString();

// reordering not required if no package or no dependencies
Expand Down Expand Up @@ -353,6 +351,7 @@ struct protobuf_schema_definition::impl {
auto imports = trim(sv.substr(imports_pos, imports_len));
auto footer = trim(sv.substr(package_pos + package.length()));

// TODO BP: Prevent this linearization
return ssx::sformat(
"{}\n{}\n\n{}\n\n{}\n", header, package, imports, footer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ class post_subject_versions_request_handler
auto sv = std::string_view{str, len};
switch (_state) {
case state::schema: {
iobuf buf;
buf.append(sv.data(), sv.size());
_schema.def = unparsed_schema_definition::raw_string{
ss::sstring{sv}};
std::move(buf)};
_state = state::record;
return true;
}
Expand Down
1 change: 0 additions & 1 deletion src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#pragma once

#include "base/vlog.h"
#include "bytes/iobuf_parser.h"
#include "json/iobuf_writer.h"
#include "json/json.h"
#include "json/types.h"
Expand Down
3 changes: 2 additions & 1 deletion src/v/pandaproxy/schema_registry/test/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ BOOST_AUTO_TEST_CASE(test_make_schema_definition) {
auto res = pps::make_schema_definition<json::UTF8<>>(example_avro_schema);

BOOST_REQUIRE(res);
BOOST_REQUIRE_EQUAL(res.value()(), minified_avro_schema);
auto str = to_string(std::move(res).value());
BOOST_REQUIRE_EQUAL(str, minified_avro_schema);
}

BOOST_AUTO_TEST_CASE(test_make_schema_definition_failure) {
Expand Down
8 changes: 6 additions & 2 deletions src/v/pandaproxy/schema_registry/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "types.h"

#include "util.h"

#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
Expand Down Expand Up @@ -47,7 +49,8 @@ std::ostream& operator<<(
os,
"type: {}, definition: {}, references: {}",
to_string_view(def.type()),
def.raw(),
// TODO BP: Prevent this linearization
to_string(def.shared_raw()),
def.refs());
return os;
}
Expand All @@ -59,7 +62,8 @@ std::ostream& operator<<(
os,
"type: {}, definition: {}, references: {}",
to_string_view(def.type()),
def.raw(),
// TODO BP: Prevent this linearization
to_string(def.shared_raw()),
def.refs());
return os;
}
Expand Down
35 changes: 28 additions & 7 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "base/outcome.h"
#include "base/seastarx.h"
#include "json/iobuf_writer.h"
#include "kafka/protocol/errors.h"
#include "model/metadata.h"
#include "strings/string_switch.h"
Expand Down Expand Up @@ -117,7 +118,13 @@ template<typename Tag>
class typed_schema_definition {
public:
using tag = Tag;
using raw_string = named_type<ss::sstring, tag>;
// Tagged wrapper around an iobuf (named_type<iobuf, tag>), used as the
// storage type for a schema definition's raw contents.
struct raw_string : named_type<iobuf, tag> {
// Default-constructs the underlying named_type.
raw_string() = default;
// Takes ownership of an existing buffer by moving it into the wrapper.
explicit raw_string(iobuf&& buf) noexcept
: named_type<iobuf, tag>{std::move(buf)} {}
// Builds the wrapped buffer from text via iobuf::from(sv).
explicit raw_string(std::string_view sv)
: named_type<iobuf, tag>{iobuf::from(sv)} {}
};
using references = std::vector<schema_reference>;

typed_schema_definition() = default;
Expand All @@ -131,13 +138,13 @@ class typed_schema_definition {

template<typename T>
typed_schema_definition(T&& def, schema_type type)
: _def{ss::sstring{std::forward<T>(def)}}
: _def{std::forward<T>(def)}
, _type{type}
, _refs{} {}

template<typename T>
typed_schema_definition(T&& def, schema_type type, references refs)
: _def{ss::sstring{std::forward<T>(def)}}
: _def{std::forward<T>(def)}
, _type{type}
, _refs{std::move(refs)} {}

Expand All @@ -152,9 +159,9 @@ class typed_schema_definition {

// Lvalue overload: returns a const reference to the stored raw definition.
const raw_string& raw() const& { return _def; }
// Rvalue overload: moves the stored raw definition out of this object,
// e.g. `std::move(def).raw()`.
raw_string raw() && { return std::move(_def); }
raw_string shared_raw() const& {
// temporarily implemented with copy before the type is changed
return _def;
raw_string shared_raw() const {
auto& buf = const_cast<iobuf&>(_def());
return raw_string{buf.share(0, buf.size_bytes())};
}

const references& refs() const& { return _refs; }
Expand All @@ -164,7 +171,9 @@ class typed_schema_definition {
return {shared_raw(), type(), refs()};
}

typed_schema_definition copy() const { return {_def, type(), refs()}; }
// Returns a new definition built from the result of `_def().copy()`, the
// same schema type, and the references returned by refs().
typed_schema_definition copy() const {
return {raw_string{_def().copy()}, type(), refs()};
}

auto destructure() && {
return make_tuple(std::move(_def), _type, std::move(_refs));
Expand Down Expand Up @@ -484,3 +493,15 @@ from_string_view<compatibility_level>(std::string_view sv) {
}

} // namespace pandaproxy::schema_registry

namespace json {

template<typename Buffer>
void rjson_serialize(
json::iobuf_writer<Buffer>& w,
const pandaproxy::schema_registry::canonical_schema_definition::raw_string&
def) {
w.String(def());
}

} // namespace json
Loading

0 comments on commit 8b68ac5

Please sign in to comment.