Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
vitstn committed Oct 24, 2024
1 parent 2f985f5 commit fb1c9fe
Show file tree
Hide file tree
Showing 18 changed files with 306 additions and 33 deletions.
7 changes: 6 additions & 1 deletion ydb/library/yql/core/type_ann/type_ann_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7590,11 +7590,16 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
return IGraphTransformer::TStatus::Error;
}

cached.NormalizedName = description.NormalizedName;
cached.FunctionType = description.CallableType;
cached.RunConfigType = description.RunConfigType ? description.RunConfigType : ctx.Expr.MakeType<TVoidExprType>();
cached.NormalizedUserType = description.NormalizedUserType ? description.NormalizedUserType : ctx.Expr.MakeType<TVoidExprType>();
cached.SupportsBlocks = description.SupportsBlocks;
cached.IsStrict = description.IsStrict;

if (name != cached.NormalizedName) {
ctx.Types.UdfTypeCache[std::make_tuple(cached.NormalizedName, TString(typeConfig), userType)] = cached;
}
}

TStringBuf typeConfig = "";
Expand Down Expand Up @@ -7623,7 +7628,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
TStringBuf fileAlias = udfInfo ? udfInfo->FileAlias : ""_sb;
auto ret = ctx.Expr.Builder(input->Pos())
.Callable("Udf")
.Add(0, input->HeadPtr())
.Atom(0, cached.NormalizedName)
.Add(1, runConfigValue)
.Add(2, ExpandType(input->Pos(), *cached.NormalizedUserType, ctx.Expr))
.Atom(3, typeConfig)
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_type_annotation.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ enum class EBlockEngineMode {
};

struct TUdfCachedInfo {
TString NormalizedName;
const TTypeAnnotationNode* FunctionType = nullptr;
const TTypeAnnotationNode* RunConfigType = nullptr;
const TTypeAnnotationNode* NormalizedUserType = nullptr;
Expand Down
123 changes: 111 additions & 12 deletions ydb/library/yql/core/yql_udf_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,40 +80,133 @@ void AddResolveResultToRegistry(const TResolveResult& resolveResult, const TMap<
TUdfIndex::TUdfIndex() {
}

TUdfIndex::TUdfIndex(const TMap<TString, TResourceInfo::TPtr>& resources)
void TUdfIndex::SetCaseSentiveSearch(bool caseSensitive) {
CaseSensitive_ = caseSensitive;
}

TUdfIndex::TUdfIndex(const TMap<TString, TResourceInfo::TPtr>& resources, bool caseSensitive)
: Resources_(resources)
, CaseSensitive_(caseSensitive)
{

for (const auto& x : Resources_) {
ICaseModules_[to_lower(x.first)].insert(x.first);
}
}

bool TUdfIndex::ContainsModule(const TString& moduleName) const {
bool TUdfIndex::ContainsModuleStrict(const TString& moduleName) const {
return Resources_.contains(moduleName);
}

// Rewrites moduleName to the spelling under which the module is registered.
//
// Returns true when the name is registered as-is (moduleName is left untouched)
// or when case-insensitive search is enabled and exactly one registered module
// matches ignoring case (moduleName is replaced with that spelling).
// Returns false when nothing matches or when several registered modules differ
// only by case; moduleName is not modified in that case.
bool TUdfIndex::CanonizeModule(TString& moduleName) const {
    const bool exactMatch = Resources_.contains(moduleName);
    if (exactMatch || CaseSensitive_) {
        // Exact hit: already canonical. Case-sensitive miss: no fallback allowed.
        return exactMatch;
    }

    const auto p = ICaseModules_.FindPtr(to_lower(moduleName));
    if (p == nullptr) {
        return false;
    }

    // Entries of the case-insensitive table are expected to be non-empty.
    Y_ENSURE(p->size() > 0);
    if (p->size() != 1) {
        // More than one spelling shares this lower-cased name: ambiguous.
        return false;
    }

    moduleName = *p->begin();
    return true;
}

// Reports whether moduleName resolves to a registered module.
//
// An exact match always yields Found. Otherwise, if case-insensitive search is
// enabled, the lower-cased name is looked up: one matching spelling yields
// Found, several yield Ambigious. Everything else is NotFound.
TUdfIndex::EStatus TUdfIndex::ContainsModule(const TString& moduleName) const {
    if (Resources_.contains(moduleName)) {
        return EStatus::Found;
    }

    if (!CaseSensitive_) {
        if (const auto p = ICaseModules_.FindPtr(to_lower(moduleName))) {
            // Entries of the case-insensitive table are expected to be non-empty.
            Y_ENSURE(p->size() > 0);
            return p->size() == 1 ? EStatus::Found : EStatus::Ambigious;
        }
    }

    return EStatus::NotFound;
}

bool TUdfIndex::ContainsAnyModule(const TSet<TString>& modules) const {
return AnyOf(modules, [this](auto& m) {
return this->ContainsModule(m);
return Resources_.contains(m);
});
}

bool TUdfIndex::FindFunction(const TString& moduleName, const TString& functionName, TFunctionInfo& function) const {
auto r = FindResourceByModule(moduleName);
TUdfIndex::EStatus TUdfIndex::FindFunction(const TString& moduleName, const TString& functionName, TFunctionInfo& function) const {
auto r = Resources_.FindPtr(moduleName);
if (!r) {
return false;
if (CaseSensitive_) {
return EStatus::NotFound;
}

auto p = ICaseModules_.FindPtr(to_lower(moduleName));
if (!p) {
return EStatus::NotFound;
}

Y_ENSURE(p->size() > 0);
if (p->size() > 1) {
return EStatus::Ambigious;
}

r = Resources_.FindPtr(*p->begin());
Y_ENSURE(r);
}

auto f = r->Functions.FindPtr(functionName);
auto f = (*r)->Functions.FindPtr(functionName);
if (!f) {
return false;
if (CaseSensitive_) {
return EStatus::NotFound;
}

auto p = (*r)->ICaseFuncNames.FindPtr(to_lower(functionName));
if (!p) {
return EStatus::NotFound;
}

Y_ENSURE(p->size() > 0);
if (p->size() > 1) {
return EStatus::Ambigious;
}

f = (*r)->Functions.FindPtr(*p->begin());
Y_ENSURE(f);
}

function = *f;
return true;
return EStatus::Found;
}

// Returns the resource registered for moduleName, or nullptr if it cannot be
// resolved.
//
// The exact name is tried first. If that misses and case-insensitive search is
// enabled, the lower-cased name is looked up; the resource is returned only
// when exactly one registered spelling matches. An unknown name or an ambiguous
// case-insensitive match yields nullptr.
TResourceInfo::TPtr TUdfIndex::FindResourceByModule(const TString& moduleName) const {
    auto p = Resources_.FindPtr(moduleName);
    if (!p) {
        if (CaseSensitive_) {
            return nullptr;
        }

        auto n = ICaseModules_.FindPtr(to_lower(moduleName));
        if (!n) {
            // Nothing matches even ignoring case. FindPtr returned null, so the
            // size() calls below must not be reached.
            return nullptr;
        }

        Y_ENSURE(n->size() > 0);
        if (n->size() > 1) {
            // Several spellings differ only by case: refuse to pick one.
            return nullptr;
        }

        // Every name in the case-insensitive table must be a registered module.
        p = Resources_.FindPtr(*n->begin());
        Y_ENSURE(p);
    }

    return *p;
}

TSet<TResourceInfo::TPtr> TUdfIndex::FindResourcesByModules(const TSet<TString>& modules) const {
Expand All @@ -130,6 +223,11 @@ TSet<TResourceInfo::TPtr> TUdfIndex::FindResourcesByModules(const TSet<TString>&
// Removes every module of the given resource from the index, including the
// module's spelling in the case-insensitive name table. A lower-cased key is
// dropped from that table once its last spelling is removed.
void TUdfIndex::UnregisterResource(TResourceInfo::TPtr resource) {
    for (auto& m : resource->Modules) {
        Resources_.erase(m);

        // Look the key up once with find(): operator[] would insert an empty
        // set for an unknown key only to erase it again.
        const auto it = ICaseModules_.find(to_lower(m));
        if (it == ICaseModules_.end()) {
            continue;
        }

        it->second.erase(m);
        if (it->second.empty()) {
            ICaseModules_.erase(it);
        }
    }
    // resource pointer should be alive here to avoid problems with erase
}
Expand Down Expand Up @@ -170,11 +268,12 @@ void TUdfIndex::RegisterResource(const TResourceInfo::TPtr& resource, EOverrideM

for (auto& m : resource->Modules) {
Resources_.emplace(m, resource);
ICaseModules_[to_lower(m)].insert(m);
}
}

TIntrusivePtr<TUdfIndex> TUdfIndex::Clone() const {
return new TUdfIndex(Resources_);
return new TUdfIndex(Resources_, CaseSensitive_);
}

void TUdfIndex::RegisterResources(const TVector<TResourceInfo::TPtr>& resources, EOverrideMode mode) {
Expand Down
19 changes: 16 additions & 3 deletions ydb/library/yql/core/yql_udf_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ struct TResourceInfo : public TThrRefBase {
TDownloadLink Link;
TSet<TString> Modules;
TMap<TString, TFunctionInfo> Functions;
TMap<TString, TSet<TString>> ICaseFuncNames;

void SetFunctions(const TVector<TFunctionInfo>& functions) {
for (auto& f : functions) {
Functions.emplace(f.Name, f);
ICaseFuncNames[to_lower(f.Name)].insert(f.Name);
}
}
};
Expand All @@ -96,12 +98,21 @@ class TUdfIndex : public TThrRefBase {
RaiseError
};

// Outcome of a module or function lookup in the index.
enum class EStatus {
// The name resolved to exactly one registered entry (exact match, or a unique
// match ignoring case when case-insensitive search is enabled).
Found,
// No registered entry matches the name.
NotFound,
// Case-insensitive lookup matched more than one registered spelling.
Ambigious
};

public:
TUdfIndex();
bool ContainsModule(const TString& moduleName) const;
bool FindFunction(const TString& moduleName, const TString& functionName, TFunctionInfo& function) const;
void SetCaseSentiveSearch(bool caseSensitive);
bool CanonizeModule(TString& moduleName) const;
EStatus ContainsModule(const TString& moduleName) const;
EStatus FindFunction(const TString& moduleName, const TString& functionName, TFunctionInfo& function) const;
TResourceInfo::TPtr FindResourceByModule(const TString& moduleName) const;

bool ContainsModuleStrict(const TString& moduleName) const;
/*
New resource can contain already registered module.
In this case 'mode' will be used to resolve conflicts.
Expand All @@ -114,7 +125,7 @@ class TUdfIndex : public TThrRefBase {
TIntrusivePtr<TUdfIndex> Clone() const;

private:
explicit TUdfIndex(const TMap<TString, TResourceInfo::TPtr>& resources);
explicit TUdfIndex(const TMap<TString, TResourceInfo::TPtr>& resources, bool caseSensitive);

bool ContainsAnyModule(const TSet<TString>& modules) const;
TSet<TResourceInfo::TPtr> FindResourcesByModules(const TSet<TString>& modules) const;
Expand All @@ -123,6 +134,8 @@ class TUdfIndex : public TThrRefBase {
private:
// module => Resource
TMap<TString, TResourceInfo::TPtr> Resources_;
bool CaseSensitive_ = true;
TMap<TString, TSet<TString>> ICaseModules_;
};

void LoadRichMetadataToUdfIndex(const IUdfResolver& resolver, const TVector<TString>& paths, bool isTrusted, TUdfIndex::EOverrideMode mode, TUdfIndex& registry);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_udf_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class IUdfResolver : public TThrRefBase {
THashMap<TString, TString> SecureParams;

// output
TString NormalizedName;
const TTypeAnnotationNode* NormalizedUserType = nullptr;
const TTypeAnnotationNode* RunConfigType = nullptr;
const TTypeAnnotationNode* CallableType = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ class TOutProcUdfResolver : public IUdfResolver {
ctx.AddError(TIssue(udf->Pos, udfRes.GetError()));
hasErrors = true;
} else {
udf->NormalizedName = udf->Name;
udf->CallableType = ParseTypeFromYson(TStringBuf{udfRes.GetCallableType()}, ctx, udf->Pos);
if (!udf->CallableType) {
hasErrors = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ bool LoadFunctionsMetadata(const TVector<IUdfResolver::TFunction*>& functions,
continue;
}

udf.NormalizedName = udf.Name;
udf.CallableType = ConvertMiniKQLType(udf.Pos, funcInfo.FunctionType, ctx);
YQL_ENSURE(udf.CallableType);
if (funcInfo.RunConfigType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class TUdfResolverWithIndex : public IUdfResolver {
TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override {
with_lock(Lock_) {
TString moduleNameStr(moduleName);
if (!UdfIndex_->ContainsModule(moduleNameStr)) {
if (!UdfIndex_->ContainsModuleStrict(moduleNameStr)) {
return Nothing();
}

Expand Down Expand Up @@ -115,7 +115,7 @@ class TUdfResolverWithIndex : public IUdfResolver {

bool ContainsModule(const TStringBuf& moduleName) const override {
TString moduleNameStr = TString(moduleName);
if (UdfIndex_->ContainsModule(moduleNameStr)) {
if (UdfIndex_->ContainsModuleStrict(moduleNameStr)) {
return true;
}

Expand All @@ -142,17 +142,29 @@ class TUdfResolverWithIndex : public IUdfResolver {
*/

TString moduleNameStr = TString(moduleName);
if (!UdfIndex_->ContainsModule(moduleNameStr)) {
auto moduleStatus = UdfIndex_->ContainsModule(moduleNameStr);
if (moduleStatus == TUdfIndex::EStatus::NotFound) {
fallbackFunction = &function;
return true;
}

if (moduleStatus == TUdfIndex::EStatus::Ambigious) {
ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Ambigious module name: " << moduleName));
return false;
}

TFunctionInfo info;
if (!UdfIndex_->FindFunction(moduleNameStr, function.Name, info)) {
auto functionStatus = UdfIndex_->FindFunction(moduleNameStr, function.Name, info);
if (functionStatus == TUdfIndex::EStatus::NotFound) {
ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Function not found: " << function.Name));
return false;
}

if (functionStatus == TUdfIndex::EStatus::Ambigious) {
ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Ambigious function: " << function.Name));
return false;
}

TResourceFile::TPtr file = DownloadFileWithModule(moduleName, function.Pos, ctx);
if (!file) {
return false;
Expand All @@ -161,6 +173,7 @@ class TUdfResolverWithIndex : public IUdfResolver {
additionalImport = &file->Import_;

if (info.IsTypeAwareness) {
function.Name = info.Name;
fallbackFunction = &function;
return true;
}
Expand All @@ -170,6 +183,7 @@ class TUdfResolverWithIndex : public IUdfResolver {
return false;
}

function.NormalizedName = info.Name;
function.CallableType = ParseTypeFromYson(TStringBuf{info.CallableType}, ctx, function.Pos);
if (!function.CallableType) {
ctx.AddError(TIssue(function.Pos, TStringBuilder() << "Failed to build callable type from YSON for function " << function.Name));
Expand Down Expand Up @@ -205,26 +219,29 @@ class TUdfResolverWithIndex : public IUdfResolver {
TResourceFile::TPtr DownloadFileWithModule(const TStringBuf& module) const {
TString moduleName(module);

const auto it = DownloadedFiles_.find(module);
if (it != DownloadedFiles_.end()) {
return it->second;
}

auto resource = UdfIndex_->FindResourceByModule(moduleName);
if (!resource) {
ythrow yexception() << "No resource has been found for registered module " << moduleName;
}

auto canonizedModuleName = moduleName;
Y_ENSURE(UdfIndex_->CanonizeModule(canonizedModuleName));

const auto it = DownloadedFiles_.find(canonizedModuleName);
if (it != DownloadedFiles_.end()) {
return it->second;
}

// token is empty for urls for now
// assumption: file path is frozen already, no need to put into file storage
const TDownloadLink& downloadLink = resource->Link;
TFileLinkPtr link = downloadLink.IsUrl ? FileStorage_->PutUrl(downloadLink.Path, {}) : CreateFakeFileLink(downloadLink.Path, downloadLink.Md5);
TResourceFile::TPtr file = TResourceFile::Create(moduleName, resource->Modules, link);
TResourceFile::TPtr file = TResourceFile::Create(canonizedModuleName, resource->Modules, link);
for (auto& d : resource->Modules) {
auto p = DownloadedFiles_.emplace(d, file);
if (!p.second) {
// should not happen because UdfIndex handles conflicts
ythrow yexception() << "file already downloaded for module " << moduleName << ", conflicting path " << downloadLink.Path << ", existing local file " << p.first->second->Link_->GetPath();
ythrow yexception() << "file already downloaded for module " << canonizedModuleName << ", conflicting path " << downloadLink.Path << ", existing local file " << p.first->second->Link_->GetPath();
}
}

Expand Down
Loading

0 comments on commit fb1c9fe

Please sign in to comment.