Skip to content

Commit

Permalink
example hook: dataset metadata validation (#7752)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozkatz authored May 20, 2024
1 parent aa9784d commit 9564812
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 6 deletions.
20 changes: 18 additions & 2 deletions docs/howto/hooks/lua.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,22 @@ gs.write_fuse_symlink(source, destination, mount_info)
-- Symlink: "/home/user/gcs-mount/exported/path/to/object" -> "/home/user/gcs-mount/lakefs/data/abc/def"
```

### `hook`

A set of utilities to aid in writing user-friendly hooks.

### `hook/fail(message)`

Will abort the current hook's execution with the given message. This is similar to using `error()`, but is typically used to distinguish
generic runtime errors (an API call that returned an unexpected response) from an explicit failure of the calling hook.

When called, errors will appear without a stacktrace, and the error message will be directly the one given as `message`.

```lua
> hook = require("hook")
> hook.fail("this hook shall not pass because of: " .. reason)
```

### `lakefs`

The Lua Hook library allows calling back to the lakeFS API using the identity of the user that triggered the action.
Expand Down Expand Up @@ -811,8 +827,8 @@ Returns a table for the given path string with the following structure:
Receives a variable number of strings and returns a joined string that represents a path:

```lua
> require("path")
> path.join("path/", "to", "a", "file.data")
> path = require("path")
> path.join("/", "path/", "to", "a", "file.data")
path/to/a/file.data
```

Expand Down
145 changes: 145 additions & 0 deletions examples/hooks/dataset_validator.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
--[[
Validate the existence of mandatory metadata describing a dataset.
A metadata file should exist either in the same directory as the modified dataset, or in any parent directory.
The closest metadata file would take precedence (i.e. same folder > parent > 2nd parent).
# Example hook definition (_lakefs_actions/validate_dataset_fields.yaml):
name: Validate Dataset Fields
description: Validate the existence of mandatory metadata describing a dataset.
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: validate_datasets
    type: lua
    properties:
      script_path: scripts/dataset_validator.lua
      args:
        prefix: 'datasets/'
        metadata_file_name: dataset_metadata.yaml
        fields:
          - name: contains_pii
            required: true
            type: boolean
          - name: approval_link
            required: true
            type: string
            match_pattern: 'https?:\/\/.*'
          - name: rank
            required: true
            type: number
          - name: department
            type: string
            choices: ['hr', 'it', 'other']
]]

path = require("path")
regexp = require("regexp")
yaml = require("encoding/yaml")

lakefs = require("lakefs")
hook = require("hook")

--- Tell whether `value` is one of the entries of the `choices` sequence.
-- @param choices sequence (1..n) of allowed values
-- @param value   candidate to look for
-- @return true if some entry equals `value`, false otherwise
function is_a_valid_choice(choices, value)
    local index = 1
    local candidate = choices[index]
    -- walk the sequence until its first nil entry
    while candidate ~= nil do
        if candidate == value then
            return true
        end
        index = index + 1
        candidate = choices[index]
    end
    return false
end

--- Validate one metadata value against its field descriptor.
-- Fails the hook (via hook.fail) on the first violated rule.
-- @param field_descriptor table with `name` and optional `required`, `type`,
--        `choices` and `match_pattern` entries (see the header example)
-- @param value    the value read from the metadata file, or nil when absent
-- @param filename metadata file path, used as a prefix in failure messages
function check_field(field_descriptor, value, filename)
    local name = field_descriptor.name

    -- A missing value is only a failure for required fields. For optional
    -- fields there is nothing left to validate, so stop here: running the
    -- type/choices checks against nil would reject every absent optional field.
    if value == nil then
        if field_descriptor.required then
            hook.fail(filename .. ": field '" .. name .. "' is required but no value given")
        end
        return
    end

    -- check type is correct (descriptor types use Lua's type() names)
    if field_descriptor.type ~= nil and type(value) ~= field_descriptor.type then
        hook.fail(filename .. ": field '" .. name .. "' should be of type " .. field_descriptor.type)
    end

    -- check choices
    if field_descriptor.choices ~= nil and not is_a_valid_choice(field_descriptor.choices, value) then
        hook.fail(filename .. ": field '" .. name .. "' should be one of '" .. table.concat(field_descriptor.choices, ", ") .. "'")
    end

    -- check pattern: only text can be matched, so reject other types first
    local pattern = field_descriptor.match_pattern
    if pattern ~= nil then
        if type(value) ~= "string" then
            hook.fail(filename .. ": field " .. name .. " should be text (got '" .. type(value) .. "') and match pattern '" .. pattern .. "'")
        elseif not regexp.match(pattern, value) then
            hook.fail(filename .. ": field " .. name .. " should match pattern '" .. pattern .. "'")
        end
    end
end


-- main flow
-- Page through the diff between the target branch and the source ref and, for
-- every added object, locate the nearest metadata file by walking up its
-- parent directories. The hook fails if an added object has none.
local after = ""
local has_more = true
-- Cache keyed by metadata file path: the parsed YAML when the file exists,
-- or `false` when a lookup returned 404 (so it is not fetched again).
local metadata_files = {}
while has_more do
    local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, args.prefix)
    if code ~= 200 then
        error("could not diff: " .. resp.message)
    end
    for _, result in pairs(resp.results) do
        print("" .. result.type .. " " .. result.path)
        if result.type == "added" then
            local current = result.path
            -- path of the metadata file that covers this object ("" = none found)
            local descriptor_for_file = ""

            -- find nearest metadata file
            while true do
                local parsed = path.parse(current)
                if not parsed.parent or parsed.parent == "" then
                    -- reached the top without finding a metadata file
                    break
                end
                local current_descriptor = path.join("/", parsed.parent, args.metadata_file_name)
                local cached = metadata_files[current_descriptor]
                if cached then
                    -- cache hit
                    descriptor_for_file = current_descriptor
                    break
                elseif cached == nil then
                    -- cache miss
                    -- attempt to fetch it
                    local status, body = lakefs.get_object(action.repository_id, action.source_ref, current_descriptor)
                    if status == 200 then
                        metadata_files[current_descriptor] = yaml.unmarshal(body)
                        descriptor_for_file = current_descriptor
                        break
                    elseif status ~= 404 then
                        error("failed to look up metadata file: '" .. current_descriptor .. "', HTTP " .. tostring(status))
                    else
                        -- indicates this doesn't exist, no need to look it up again
                        metadata_files[current_descriptor] = false
                    end
                end
                -- cached == false falls through: known to be missing, try the parent

                current = parsed.parent
            end

            -- check if we found a descriptor
            if descriptor_for_file == "" then
                hook.fail("No dataset metadata found for file: " .. result.path)
            end
        end
    end
    -- pagination
    has_more = resp.pagination.has_more
    after = resp.pagination.next_offset
end

-- now let's review all the metadata files for this commit:
-- metadata_files also holds `false` placeholders for paths that were looked up
-- and found missing (404); those must be skipped, since indexing `false`
-- raises a runtime error instead of validating anything.
for metadata_filename, metadata_file in pairs(metadata_files) do
    if metadata_file ~= false then
        for _, field_descriptor in ipairs(args.fields) do
            check_field(field_descriptor, metadata_file[field_descriptor.name], metadata_filename)
        end
    end
end
14 changes: 13 additions & 1 deletion pkg/actions/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

"github.com/Shopify/go-lua"
"github.com/spf13/viper"

lualibs "github.com/treeverse/lakefs/pkg/actions/lua"
"github.com/treeverse/lakefs/pkg/actions/lua/hook"
"github.com/treeverse/lakefs/pkg/actions/lua/lakefs"
luautil "github.com/treeverse/lakefs/pkg/actions/lua/util"
"github.com/treeverse/lakefs/pkg/api/apiutil"
Expand Down Expand Up @@ -151,11 +153,21 @@ func (h *LuaHook) Run(ctx context.Context, record graveler.HookRecord, buf *byte
}

// LuaRun loads code as a chunk named name into l and executes it.
// A load failure is returned with the string found on top of the Lua stack
// appended to it. An execution failure is passed through hook.Unwrap before
// being returned.
func LuaRun(l *lua.State, code, name string) error {
// Push the global `debug` table, then its `traceback` field, and remember
// the stack index of that field so it can be handed to ProtectedCall below.
l.Global("debug")
l.Field(-1, "traceback")
traceback := l.Top()
// mode is left as the empty string when passed to LoadBuffer.
var mode string
if err := lua.LoadBuffer(l, code, name, mode); err != nil {
// If the value on top of the stack converts to a string, attach it to the error.
v, ok := l.ToString(l.Top())
if ok {
err = fmt.Errorf("%w: %s", err, v)
}
return err
}
// NOTE(review): this page is a diff rendered without +/- markers; the next
// line looks like the pre-change statement that the lines after it replace.
// Read literally, it makes everything below unreachable. Confirm against the
// committed file.
return l.ProtectedCall(0, lua.MultipleReturns, 0)
// Run the chunk with no arguments, passing the saved traceback index as the
// third argument (presumably go-lua's message-handler index; TODO confirm).
if err := l.ProtectedCall(0, lua.MultipleReturns, traceback); err != nil {
return hook.Unwrap(err)
}
return nil
}

func (h *LuaHook) collectMetrics(l *lua.State) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/actions/lua/hook/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package hook

import (
"strings"

"github.com/Shopify/go-lua"
)

// helpers for writing lua actions

// ErrHookFailure is the error type for a failure that a hook raised on
// purpose, as opposed to a generic error that happened while it was running.
// Its underlying string is the failure message.
type ErrHookFailure string

// Error implements the error interface by returning the message unchanged.
func (failure ErrHookFailure) Error() string {
	message := string(failure)
	return message
}

// Open makes the helper library available to Lua code in l under the module
// name "hook".
func Open(l *lua.State) {
	// loader builds the library table from the registered functions and
	// reports one return value.
	loader := func(state *lua.State) int {
		lua.NewLibrary(state, library)
		return 1
	}
	lua.Require(l, "hook", loader, false)
	// Drop the single value left on the stack once Require returns.
	l.Pop(1)
}

// library lists the functions exported by the "hook" module.
var library = []lua.RegistryFunction{
	{
		Name:     "fail",
		Function: fail,
	},
}

// fail implements hook.fail(message): it raises a Lua error whose text is the
// message wrapped in <HookFailure>...</HookFailure> markers, which Unwrap
// looks for later.
func fail(l *lua.State) int {
	message := lua.CheckString(l, 1)
	lua.Errorf(l, "<HookFailure>%s</HookFailure>", message)
	// Reaching this line means Errorf returned, which this code treats as impossible.
	panic("unreachable")
}

// Unwrap converts a Lua runtime error produced by hook.fail into an
// ErrHookFailure carrying only the original message. Any other error,
// including nil, is returned unchanged.
func Unwrap(err error) error {
	// Only Lua runtime errors can carry the failure markers.
	switch err.(type) {
	case lua.RuntimeError, *lua.RuntimeError:
		// inspect the message below
	default:
		return err
	}

	_, tail, tagged := strings.Cut(err.Error(), "<HookFailure>")
	if !tagged {
		return err
	}
	// Keep what sits between the markers; if the closing marker is absent,
	// Cut yields the whole tail.
	message, _, _ := strings.Cut(tail, "</HookFailure>")
	return ErrHookFailure(message)
}
75 changes: 75 additions & 0 deletions pkg/actions/lua/hook/lib_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package hook_test

import (
"testing"

"github.com/treeverse/lakefs/pkg/actions/lua/hook"

"github.com/Shopify/go-lua"
)

// scriptWithExplicitFailure is a Lua script that aborts through hook.fail.
const scriptWithExplicitFailure = `
hook = require("hook")
hook.fail("this hook shall not pass")
`

// scriptWithExplicitError is a Lua script that raises a plain error() instead
// of going through hook.fail.
const scriptWithExplicitError = `
error("oh no")
`

// scriptWithSyntaxError is a Lua script that does not parse: Lua has no `+=`
// operator.
const scriptWithSyntaxError = `
local a = 15
a += "a"
`

// TestUnwrap checks that hook.Unwrap extracts the message of a hook.fail
// error and leaves every other error (plain error(), syntax error, nil)
// unchanged.
func TestUnwrap(t *testing.T) {
	t.Run("explicit fail", func(t *testing.T) {
		l := lua.NewState()
		lua.OpenLibraries(l)
		hook.Open(l)
		err := lua.DoString(l, scriptWithExplicitFailure)
		if err == nil {
			// Fatal, not Error: the calls below would dereference a nil error.
			t.Fatal("expected error but got none!")
		}
		before := err
		after := hook.Unwrap(before)
		if after.Error() != "this hook shall not pass" {
			t.Errorf("could not unwrap lua hook error, got %s", after.Error())
		}
	})
	t.Run("regular error", func(t *testing.T) {
		l := lua.NewState()
		lua.OpenLibraries(l)
		hook.Open(l)
		err := lua.DoString(l, scriptWithExplicitError)
		if err == nil {
			// Fatal, not Error: the calls below would dereference a nil error.
			t.Fatal("expected error but got none!")
		}
		before := err
		after := hook.Unwrap(err)
		if after.Error() != before.Error() {
			t.Error("unwrapping things not returned by hook.fail should not change the error")
		}
	})
	t.Run("syntax error", func(t *testing.T) {
		l := lua.NewState()
		lua.OpenLibraries(l)
		hook.Open(l)
		err := lua.DoString(l, scriptWithSyntaxError)
		if err == nil {
			// Fatal, not Error: the calls below would dereference a nil error.
			t.Fatal("expected error but got none!")
		}
		before := err
		after := hook.Unwrap(err)
		if after.Error() != before.Error() {
			t.Error("unwrapping things not returned by hook.fail should not change the error")
		}
	})
	t.Run("nil error", func(t *testing.T) {
		after := hook.Unwrap(nil)
		if after != nil {
			t.Error("unwrapping nil should return nil")
		}
	})
}
3 changes: 3 additions & 0 deletions pkg/actions/lua/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/Shopify/go-lua"

"github.com/treeverse/lakefs/pkg/actions/lua/crypto/aes"
"github.com/treeverse/lakefs/pkg/actions/lua/crypto/hmac"
"github.com/treeverse/lakefs/pkg/actions/lua/crypto/sha256"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/treeverse/lakefs/pkg/actions/lua/encoding/parquet"
"github.com/treeverse/lakefs/pkg/actions/lua/encoding/yaml"
"github.com/treeverse/lakefs/pkg/actions/lua/formats"
"github.com/treeverse/lakefs/pkg/actions/lua/hook"
"github.com/treeverse/lakefs/pkg/actions/lua/net/http"
"github.com/treeverse/lakefs/pkg/actions/lua/net/url"
"github.com/treeverse/lakefs/pkg/actions/lua/path"
Expand Down Expand Up @@ -45,6 +47,7 @@ func Open(l *lua.State, ctx context.Context, cfg OpenSafeConfig) {
aes.Open(l)
parquet.Open(l)
path.Open(l)
hook.Open(l)
aws.Open(l, ctx)
gcloud.Open(l, ctx)
azure.Open(l, ctx)
Expand Down
5 changes: 5 additions & 0 deletions pkg/actions/lua/path/path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func TestParse(t *testing.T) {
ExpectedBasename: "bar",
ExpectedParent: "",
},
{
Input: "bar/",
ExpectedBasename: "bar",
ExpectedParent: "",
},
{
Input: "/bar",
ExpectedBasename: "bar",
Expand Down
10 changes: 8 additions & 2 deletions pkg/actions/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/treeverse/lakefs/pkg/actions/lua/hook"

"github.com/antonmedv/expr"
"github.com/hashicorp/go-multierror"
"github.com/treeverse/lakefs/pkg/auth"
Expand Down Expand Up @@ -374,8 +376,12 @@ func (s *StoreService) runTasks(ctx context.Context, record graveler.HookRecord,
if task.Err != nil {
_, _ = fmt.Fprintf(&buf, "Error: %s\n", task.Err)
// wrap error with more information
task.Err = fmt.Errorf("hook run id '%s' failed on action '%s' hook '%s': %w",
task.HookRunID, task.Action.Name, task.HookID, task.Err)
if _, ok := task.Err.(hook.ErrHookFailure); ok {
task.Err = fmt.Errorf("%s: %w", task.HookID, task.Err)
} else {
task.Err = fmt.Errorf("hook run id '%s' failed on action '%s' hook '%s': %w",
task.HookRunID, task.Action.Name, task.HookID, task.Err)
}
}

err := hookOutputWriter.OutputWrite(ctx, &buf, int64(buf.Len()))
Expand Down
Loading

0 comments on commit 9564812

Please sign in to comment.