Skip to content

Commit

Permalink
Merge pull request #64 from ecmwf/feature/FDB-332_partialDate
Browse files Browse the repository at this point in the history
Feature/fdb 332 partial date
  • Loading branch information
danovaro authored Jan 29, 2025
2 parents f1ec62b + 7801ae5 commit 31e578e
Show file tree
Hide file tree
Showing 41 changed files with 718 additions and 119 deletions.
4 changes: 2 additions & 2 deletions src/fdb5/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ list( APPEND fdb5_srcs
types/TypeInteger.h
types/TypeMonth.cc
types/TypeMonth.h
types/TypeYear.cc
types/TypeYear.h
types/TypeParam.cc
types/TypeParam.h
types/TypesFactory.cc
Expand Down Expand Up @@ -415,14 +417,12 @@ ecbuild_add_library(
"${UUID_LIBRARIES}"

PRIVATE_INCLUDES
"${PMEM_INCLUDE_DIRS}"
"${LUSTREAPI_INCLUDE_DIRS}"
"${DAOS_INCLUDE_DIRS}"
"${DAOS_TESTS_INCLUDE_DIRS}"

PRIVATE_LIBS
${grib_handling_pkg}
${PMEM_LIBRARIES}
${LUSTREAPI_LIBRARIES}
"${DAOS_LIBRARIES}"
"${DAOS_TESTS_LIBRARIES}"
Expand Down
7 changes: 6 additions & 1 deletion src/fdb5/api/local/AxesVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ bool AxesVisitor::preVisitDatabase(const eckit::URI& uri, const Schema& schema)
}

bool AxesVisitor::visitDatabase(const Catalogue& catalogue) {
if (level_>1) {
EntryVisitor::visitDatabase(catalogue);
}

dbKey_ = catalogue.key();
axes_.wipe();
axes_.insert(dbKey_);
Expand All @@ -50,8 +54,9 @@ bool AxesVisitor::visitDatabase(const Catalogue& catalogue) {
}

bool AxesVisitor::visitIndex(const Index& index) {
EntryVisitor::visitIndex(index);

if (index.partialMatch(request_)) {
if (index.partialMatch(*rule_, request_)) {
IndexAxis tmpAxis;
tmpAxis.insert(index.key());
tmpAxis.sort();
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/api/local/DumpVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class DumpVisitor : public QueryVisitor<DumpElement> {
return true;
}

bool visitIndex(const Index& /*index*/) override { NOTIMP; }
bool visitIndex(const Index&) override { NOTIMP; }

void visitDatum(const Field& /*field*/, const Key& /*datumKey*/) override { NOTIMP; }

Expand Down
33 changes: 23 additions & 10 deletions src/fdb5/api/local/ListVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct ListVisitor : public QueryVisitor<ListElement> {

/// @todo remove this with better logic
bool preVisitDatabase(const eckit::URI& uri, const Schema& schema) override {

// If level == 1, avoid constructing the Catalogue/Store objects, so just interrogate the URIs
if (level_ == 1 && uri.scheme() == "toc") {
/// @todo only works with the toc backend
Expand All @@ -74,11 +75,16 @@ struct ListVisitor : public QueryVisitor<ListElement> {

bool ret = QueryVisitor::visitDatabase(catalogue);

ASSERT(currentCatalogue_->key().partialMatch(request_));
auto dbRequest = catalogue.rule().registry().canonicalise(request_);
if (!currentCatalogue_->key().partialMatch(dbRequest)) {
return false;
}

// Subselect the parts of the request
indexRequest_ = request_;
for (const auto& kv : currentCatalogue_->key()) { indexRequest_.unsetValues(kv.first); }
for (const auto& [k,v] : currentCatalogue_->key()) {
indexRequest_.unsetValues(k);
}

if (level_ == 1) {
queue_.emplace(currentCatalogue_->key(), 0);
Expand All @@ -96,15 +102,14 @@ struct ListVisitor : public QueryVisitor<ListElement> {
bool visitIndex(const Index& index) override {
QueryVisitor::visitIndex(index);

if (index.partialMatch(request_)) {
if (index.partialMatch(*rule_, request_)) {

// Subselect the parts of the request
datumRequest_ = indexRequest_;

for (const auto& kv : index.key()) { datumRequest_.unsetValues(kv.first); }

// Take into account any rule-specific behaviour in the request
datumRequest_ = rule_->registry().canonicalise(datumRequest_);
for (const auto& kv : index.key()) {
datumRequest_.unsetValues(kv.first);
}

if (level_ == 2) {
queue_.emplace(currentCatalogue_->key(), currentIndex_->key(), 0);
Expand All @@ -122,9 +127,17 @@ struct ListVisitor : public QueryVisitor<ListElement> {
ASSERT(currentCatalogue_);
ASSERT(currentIndex_);

if (datumKey.match(datumRequest_)) {
queue_.emplace(currentCatalogue_->key(), currentIndex_->key(), datumKey, field.stableLocation(),
field.timestamp());
// Take into account any rule-specific behaviour in the request
auto canonical = rule_->registry().canonicalise(request_);

if (datumKey.partialMatch(canonical)) {
for (const auto& k : datumKey.keys()) {
datumRequest_.unsetValues(k);
}
if (datumRequest_.parameters().size() == 0) {
queue_.emplace(currentCatalogue_->key(), currentIndex_->key(), datumKey, field.stableLocation(),
field.timestamp());
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/fdb5/daos/DaosCatalogue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ const Schema& DaosCatalogue::schema() const {

}

const Rule& DaosCatalogue::rule() const {
return *rule_;
}

void DaosCatalogue::loadSchema() {

eckit::Timer timer("DaosCatalogue::loadSchema()", eckit::Log::debug<fdb5::LibFdb5>());
Expand All @@ -102,6 +106,8 @@ void DaosCatalogue::loadSchema() {
std::istringstream stream{std::string(v.begin(), v.end())};
schema_.load(stream);

rule_ = &schema_.matchingRule(dbKey_);

}

WipeVisitor* DaosCatalogue::wipeVisitor(const Store& store, const metkit::mars::MarsRequest& request, std::ostream& out, bool doit, bool porcelain, bool unsafeWipeAll) const {
Expand Down
2 changes: 2 additions & 0 deletions src/fdb5/daos/DaosCatalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DaosCatalogue : public CatalogueImpl, public DaosCommon {
void dump(std::ostream& out, bool simple, const eckit::Configuration& conf) const override { NOTIMP; };
std::vector<eckit::PathName> metadataPaths() const override { NOTIMP; };
const Schema& schema() const override;
const Rule& rule() const override;

StatsReportVisitor* statsReportVisitor() const override { NOTIMP; };
PurgeVisitor* purgeVisitor(const Store& store) const override { NOTIMP; };
Expand All @@ -68,6 +69,7 @@ class DaosCatalogue : public CatalogueImpl, public DaosCommon {
private: // members

Schema schema_;
const RuleDatabase* rule_ {nullptr};

};

Expand Down
4 changes: 3 additions & 1 deletion src/fdb5/database/Catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Catalogue {

virtual std::unique_ptr<Store> buildStore() const = 0;
virtual const Schema& schema() const = 0;
virtual const Rule& rule() const = 0;

virtual bool selectIndex(const Key& idxKey) = 0;
virtual void deselectIndex() = 0;
Expand Down Expand Up @@ -259,7 +260,8 @@ class NullCatalogue : public Catalogue {
std::unique_ptr<Store> buildStore() const override { NOTIMP; }

const Schema& schema() const override { NOTIMP; }

const Rule& rule() const override { NOTIMP; }

bool selectIndex(const Key& idxKey) override { NOTIMP; }
void deselectIndex() override { NOTIMP; }

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/EntryVisitMechanism.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ bool EntryVisitor::visitDatabase(const Catalogue& catalogue) {
currentCatalogue_ = &catalogue;
currentStore_ = nullptr;
currentIndex_ = nullptr;
rule_ = nullptr;
rule_ = &currentCatalogue_->rule();
return true;
}

Expand Down
20 changes: 7 additions & 13 deletions src/fdb5/database/Index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,15 @@ void IndexBase::put(const Key& key, const Field& field) {
add(key, field);
}

// const TypesRegistry& IndexBase::registry() const {
// if (!registry_) {
// const auto& rule = catalogue_.schema().matchingRule(catalogue_.key(), key_);
// registry_ = std::ref(rule.registry());
// }
// return registry_.value().get();
// }
bool IndexBase::partialMatch(const Rule& rule, const metkit::mars::MarsRequest& request) const {

bool IndexBase::partialMatch(const metkit::mars::MarsRequest& request) const {
// rule is the Datum rule (3rd level)
// to match the index key, we need to canonicalise the request with the rule at Index level (2nd level) aka rule.parent()
auto canonical = rule.parent().registry().canonicalise(request);
if (!key_.partialMatch(canonical)) { return false; }

if (!key_.partialMatch(request)) { return false; }

if (!axes_.partialMatch(request)) { return false; }

return true;
canonical = rule.registry().canonicalise(request);
return axes_.partialMatch(canonical);
}

bool IndexBase::mayContain(const Key& key) const {
Expand Down
6 changes: 4 additions & 2 deletions src/fdb5/database/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ namespace fdb5 {

class Index;
class IndexLocationVisitor;
class Rule;
class Schema;

//----------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -87,7 +88,7 @@ class IndexBase : public eckit::Counted {
/// @note default args on virtual methods is not best practice; no guarantee that overrides will have same defaults
virtual void dump(std::ostream& out, const char* indent, bool simple = false, bool dumpFields = false) const = 0;

virtual bool partialMatch(const metkit::mars::MarsRequest& request) const;
virtual bool partialMatch(const Rule& rule, const metkit::mars::MarsRequest& request) const;
virtual bool mayContain(const Key& key) const;
virtual bool mayContainPartial(const Key& key) const;

Expand Down Expand Up @@ -177,7 +178,8 @@ class Index {
IndexBase* content() { return content_; }
const IndexBase* content() const { return content_; }

bool partialMatch(const metkit::mars::MarsRequest& request) const { return content_->partialMatch(request); }
bool partialMatch(const Rule& rule, const metkit::mars::MarsRequest& request) const { return content_->partialMatch(rule, request); }
// bool partialMatch(metkit::mars::MarsRequest& request) const { return content_->partialMatch(request); }
bool mayContain(const Key& key) const { return content_->mayContain(key); }
bool mayContainPartial(const Key& key) const { return content_->mayContainPartial(key); }

Expand Down
3 changes: 2 additions & 1 deletion src/fdb5/database/Key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ Key TypedKey::canonical() const {
Key key;
for (const auto& keyword : names()) {
const auto& value = get(keyword);
value.empty() ? key.push(keyword, value) : key.push(keyword, registry_.lookupType(keyword).toKey(value));
const Type& type = registry_.lookupType(keyword);
value.empty() ? key.push(type.alias(), value) : key.push(type.alias(), type.toKey(value));
}
return key;
}
Expand Down
6 changes: 3 additions & 3 deletions src/fdb5/database/Manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ std::set<std::string> Manager::engines(const metkit::mars::MarsRequest& rq, bool
} else {

// Match all possible expansions of the first level according to the schema
std::set<Key> keys;
std::map<Key, const Rule*> keys;
config_.schema().matchDatabase(rq, keys, "");

std::set<std::string> expandedKeys;
for (auto k = keys.begin(); k != keys.end(); ++k) {
expandedKeys.insert(k->valuesToString());
for (const auto& [k,r] : keys) {
expandedKeys.insert(k.valuesToString());
}

for (auto e = engineTypes.begin(); e != engineTypes.end(); ++e) {
Expand Down
1 change: 0 additions & 1 deletion src/fdb5/fdb5_config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// features

#cmakedefine fdb5_HAVE_LUSTRE
#cmakedefine fdb5_HAVE_PMEMFDB
#cmakedefine fdb5_HAVE_RADOSFDB
#cmakedefine fdb5_HAVE_TOCFDB
#cmakedefine fdb5_HAVE_DUMMY_DAOS
Expand Down
13 changes: 11 additions & 2 deletions src/fdb5/remote/client/RemoteCatalogue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Schema* fetchSchema(const Key& dbKey, const RemoteCatalogue& catalogue) {
//----------------------------------------------------------------------------------------------------------------------

RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config)
: CatalogueImpl(key, {}, config), // xxx what are control identifiers? Setting empty here...
: CatalogueImpl(key, {}, config),
Client({config.getString("host"), config.getInt("port")}, ""),
config_(config) { }

Expand Down Expand Up @@ -108,6 +108,7 @@ const Key RemoteCatalogue::currentIndexKey() {
void RemoteCatalogue::deselectIndex() {
currentIndexKey_ = Key();
}

const Schema& RemoteCatalogue::schema() const {
// lazy loading schema
if (!schema_) {
Expand All @@ -117,6 +118,14 @@ const Schema& RemoteCatalogue::schema() const {
return *schema_;
}

const Rule& RemoteCatalogue::rule() const {
// lazy loading rule
if (!rule_) {
rule_ = std::cref(schema().matchingRule(dbKey_));
}
return rule_.value().get();
}

void RemoteCatalogue::flush(size_t archivedFields) {

std::lock_guard<std::mutex> lock(archiveMutex_);
Expand Down Expand Up @@ -169,7 +178,7 @@ eckit::URI RemoteCatalogue::uri() const {
void RemoteCatalogue::loadSchema() {
// NB we're at the db level, so get the db schema. We will want to get the master schema beforehand.
// (outside of the catalogue)
if (!schema_) { schema_.reset(fetchSchema(dbKey_, *this)); }
schema();
}

bool RemoteCatalogue::handle(Message message, uint32_t requestID) {
Expand Down
2 changes: 2 additions & 0 deletions src/fdb5/remote/client/RemoteCatalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C
const Key currentIndexKey() override;
void deselectIndex() override;
const Schema& schema() const override;
const Rule& rule() const override;

std::vector<eckit::PathName> metadataPaths() const override;
void visitEntries(EntryVisitor& visitor, bool sorted = false) override;
Expand Down Expand Up @@ -91,6 +92,7 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C
private:

Key currentIndexKey_;
mutable std::optional<std::reference_wrapper<const RuleDatabase>> rule_;
mutable std::unique_ptr<Schema> schema_;

std::mutex archiveMutex_;
Expand Down
Loading

0 comments on commit 31e578e

Please sign in to comment.