diff --git a/hack/ccp/Dockerfile b/hack/ccp/Dockerfile index 836c87674..fcba8ec4a 100644 --- a/hack/ccp/Dockerfile +++ b/hack/ccp/Dockerfile @@ -1,6 +1,6 @@ # syntax = docker/dockerfile:1.4 -FROM golang:1.22.2-alpine AS build +FROM golang:1.22.3-alpine AS build ARG VERSION_PATH=github.com/artefactual/archivematica/hack/ccp/internal/version ARG VERSION_NUMBER ARG VERSION_GIT_COMMIT diff --git a/hack/ccp/go.mod b/hack/ccp/go.mod index 21e54ba9a..e7ba7e598 100644 --- a/hack/ccp/go.mod +++ b/hack/ccp/go.mod @@ -14,12 +14,14 @@ require ( github.com/go-sql-driver/mysql v1.8.1 github.com/gohugoio/hugo v0.125.7 github.com/google/uuid v1.6.0 + github.com/hashicorp/go-retryablehttp v0.7.6 github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145 github.com/peterbourgon/ff/v3 v3.4.0 github.com/rs/cors v1.11.0 github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a github.com/testcontainers/testcontainers-go v0.30.0 github.com/testcontainers/testcontainers-go/modules/mysql v0.30.0 + go.artefactual.dev/ssclient v0.1.0 go.artefactual.dev/tools v0.10.0 go.starlark.net v0.0.0-20240411212711-9b43f0afd521 go.uber.org/goleak v1.3.0 @@ -41,10 +43,12 @@ require ( github.com/bep/godartsass/v2 v2.0.0 // indirect github.com/bep/golibsass v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cjlapao/common-go v0.0.39 // indirect github.com/cli/safeexec v1.0.1 // indirect github.com/containerd/containerd v1.7.12 // indirect github.com/containerd/log v0.1.0 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v25.0.5+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect @@ -57,10 +61,17 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/go-cmp v0.6.0 // indirect + 
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/klauspost/compress v1.16.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/microsoft/kiota-abstractions-go v1.6.0 // indirect + github.com/microsoft/kiota-http-go v1.3.3 // indirect + github.com/microsoft/kiota-serialization-form-go v1.0.0 // indirect + github.com/microsoft/kiota-serialization-json-go v1.0.7 // indirect + github.com/microsoft/kiota-serialization-multipart-go v1.0.0 // indirect + github.com/microsoft/kiota-serialization-text-go v1.0.0 // indirect github.com/mitchellh/hashstructure v1.1.0 // indirect github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/sys/sequential v0.5.0 // indirect @@ -72,12 +83,15 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/shirou/gopsutil/v3 v3.23.12 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect + github.com/std-uritemplate/std-uritemplate/go v0.0.55 // indirect + github.com/stretchr/testify v1.9.0 // indirect github.com/tdewolff/parse/v2 v2.7.13 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect @@ -94,4 +108,5 @@ require ( golang.org/x/tools v0.20.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7 // indirect google.golang.org/grpc v1.62.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/hack/ccp/go.sum b/hack/ccp/go.sum index 51cea2ef8..f3f8c7c6e 100644 --- 
a/hack/ccp/go.sum +++ b/hack/ccp/go.sum @@ -55,6 +55,8 @@ github.com/bep/tmc v0.5.1/go.mod h1:tGYHN8fS85aJPhDLgXETVKp+PR382OvFi2+q2GkGsq0= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cjlapao/common-go v0.0.39 h1:bAAUrj2B9v0kMzbAOhzjSmiyDy+rd56r2sy7oEiQLlA= +github.com/cjlapao/common-go v0.0.39/go.mod h1:M3dzazLjTjEtZJbbxoA5ZDiGCiHmpwqW9l4UWaddwOA= github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME= github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s= github.com/cli/safeexec v1.0.0/go.mod h1:Z/D4tTN8Vs5gXYHDCbaM1S/anmEDnJb1iW0+EJ5zx3Q= @@ -170,6 +172,12 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0Q github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= github.com/hairyhenderson/go-codeowners v0.4.0 h1:Wx/tRXb07sCyHeC8mXfio710Iu35uAy5KYiBdLHdv4Q= github.com/hairyhenderson/go-codeowners v0.4.0/go.mod h1:iJgZeCt+W/GzXo5uchFCqvVHZY2T4TAIpvuVlKVkLxc= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= +github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= +github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-retryablehttp v0.7.6 h1:TwRYfx2z2C4cLbXmT8I5PgP/xmuqASDyiVuGYfs9GZM= +github.com/hashicorp/go-retryablehttp v0.7.6/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod 
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY= @@ -211,6 +219,18 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/microsoft/kiota-abstractions-go v1.6.0 h1:qbGBNMU0/o5myKbikCBXJFohVCFrrpx2cO15Rta2WyA= +github.com/microsoft/kiota-abstractions-go v1.6.0/go.mod h1:7YH20ZbRWXGfHSSvdHkdztzgCB9mRdtFx13+hrYIEpo= +github.com/microsoft/kiota-http-go v1.3.3 h1:FKjK5BLFONu5eIBxtrkirkixFQmcPwitZ8iwZHKbESo= +github.com/microsoft/kiota-http-go v1.3.3/go.mod h1:IWw/PwtBs/GYz+Pa75gPW7MFNFv0aKPFsLw5WqzL1SE= +github.com/microsoft/kiota-serialization-form-go v1.0.0 h1:UNdrkMnLFqUCccQZerKjblsyVgifS11b3WCx+eFEsAI= +github.com/microsoft/kiota-serialization-form-go v1.0.0/go.mod h1:h4mQOO6KVTNciMF6azi1J9QB19ujSw3ULKcSNyXXOMA= +github.com/microsoft/kiota-serialization-json-go v1.0.7 h1:yMbckSTPrjZdM4EMXgzLZSA3CtDaUBI350u0VoYRz7Y= +github.com/microsoft/kiota-serialization-json-go v1.0.7/go.mod h1:1krrY7DYl3ivPIzl4xTaBpew6akYNa8/Tal8g+kb0cc= +github.com/microsoft/kiota-serialization-multipart-go v1.0.0 h1:3O5sb5Zj+moLBiJympbXNaeV07K0d46IfuEd5v9+pBs= +github.com/microsoft/kiota-serialization-multipart-go v1.0.0/go.mod h1:yauLeBTpANk4L03XD985akNysG24SnRJGaveZf+p4so= +github.com/microsoft/kiota-serialization-text-go v1.0.0 h1:XOaRhAXy+g8ZVpcq7x7a0jlETWnWrEum0RhmbYrTFnA= +github.com/microsoft/kiota-serialization-text-go v1.0.0/go.mod h1:sM1/C6ecnQ7IquQOGUrUldaO5wj+9+v7G2W3sQ3fy6M= github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145 h1:6kTCi6p3Hd6JYROnq+1UOdewoXj90zKKDQPlsHYTSEs= github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145/go.mod 
h1:77Th6O6AZfMU6i5hLJnjN5xxUBoio7LN0aOyxGhqV1U= github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0= @@ -276,6 +296,8 @@ github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8= github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY= github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= +github.com/std-uritemplate/std-uritemplate/go v0.0.55 h1:muSH037g97K7U2f94G9LUuE8tZlJsoSSrPsO9V281WY= +github.com/std-uritemplate/std-uritemplate/go v0.0.55/go.mod h1:rG/bqh/ThY4xE5de7Rap3vaDkYUT76B0GPJ0loYeTTc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -311,6 +333,8 @@ github.com/yuin/goldmark-emoji v1.0.2 h1:c/RgTShNgHTtc6xdz2KKI74jJr6rWi7FPgnP9GA github.com/yuin/goldmark-emoji v1.0.2/go.mod h1:RhP/RWpexdp+KHs7ghKnifRoIs/Bq4nDS7tRbCkOwKY= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.artefactual.dev/ssclient v0.1.0 h1:6fDwCOia9DFI8bZ2lZlCMSuoctwFY7P7XiMbJLmJB9E= +go.artefactual.dev/ssclient v0.1.0/go.mod h1:ImaAHtgGIbKlnrOUzczBMmltNVbhYkKZ7ujUjfBtUj8= go.artefactual.dev/tools v0.10.0 h1:+LeZS5oHupAQBXvLQ4aGIuZyqf7zCpD7s3UpyDl9zn4= go.artefactual.dev/tools v0.10.0/go.mod h1:PIy0RtC45gC4sASb4r26g0aCU24kSWIp+mcV1p2gtpY= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= @@ -435,6 +459,8 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 
v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/hack/ccp/internal/api/admin/admin.go b/hack/ccp/internal/api/admin/admin.go index 534759535..12d997d89 100644 --- a/hack/ccp/internal/api/admin/admin.go +++ b/hack/ccp/internal/api/admin/admin.go @@ -2,6 +2,7 @@ package admin import ( "context" + "errors" "net" "net/http" "time" @@ -90,10 +91,47 @@ func (s *Server) Run() error { return nil } +// validateCreatePackageRequest validates the request. +// +// TODO: use https://github.com/bufbuild/protovalidate. 
+func validateCreatePackageRequest(msg *adminv1.CreatePackageRequest) error { + if msg.Name == "" { + return errors.New("name is empty") + } + + hasPaths := false + for _, item := range msg.Path { + if len(item) > 0 { + hasPaths = true + break + } + } + if !hasPaths { + return errors.New("path is empty") + } + + if msg.Type == adminv1.TransferType_TRANSFER_TYPE_UNSPECIFIED { + return errors.New("type is unspecified") + } + + return nil +} + func (s *Server) CreatePackage(ctx context.Context, req *connect.Request[adminv1.CreatePackageRequest]) (*connect.Response[adminv1.CreatePackageResponse], error) { - return connect.NewResponse(&adminv1.CreatePackageResponse{ - Id: uuid.New().String(), - }), nil + if err := validateCreatePackageRequest(req.Msg); err != nil { + return nil, connect.NewError(connect.CodeInvalidArgument, err) + } + + resp := &adminv1.CreatePackageResponse{} + + if pkg, err := s.ctrl.Submit(ctx, req.Msg); err != nil { + return nil, connect.NewError(connect.CodeUnknown, err) + } else { + s.logger.Info("TODO: return identifier", "pkg", pkg.Name()) + resp.Id = uuid.New().String() + } + + return connect.NewResponse(resp), nil } func (s *Server) ApproveTransfer(ctx context.Context, req *connect.Request[adminv1.ApproveTransferRequest]) (*connect.Response[adminv1.ApproveTransferResponse], error) { diff --git a/hack/ccp/internal/controller/controller.go b/hack/ccp/internal/controller/controller.go index c10882ada..45eaf24d6 100644 --- a/hack/ccp/internal/controller/controller.go +++ b/hack/ccp/internal/controller/controller.go @@ -9,10 +9,10 @@ import ( "time" "github.com/artefactual-labs/gearmin" - "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" "golang.org/x/sync/errgroup" + adminv1 "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1" "github.com/artefactual/archivematica/hack/ccp/internal/store" "github.com/artefactual/archivematica/hack/ccp/internal/workflow" ) @@ -94,17 +94,46 @@ func (c *Controller) 
Run() error { return nil } -func (c *Controller) HandleWatchedDirEvents(evs []fsnotify.Event) { - for _, ev := range evs { - if ev.Op&fsnotify.Create == fsnotify.Create { - if err := c.handle(ev.Name); err != nil { - c.logger.Info("Failed to handle event.", "err", err, "path", ev.Name) - } - } +// Submit a new package to the queue. +func (c *Controller) Submit(ctx context.Context, req *adminv1.CreatePackageRequest) (*Package, error) { + // 1. Create Package (Transfer). + // transfer = models.Transfer.objects.create(**kwargs) + // if not processing_configuration_file_exists(processing_config): processing_config = "default" + // transfer.set_processing_configuration(processing_config) + // transfer.update_active_agent(user_id) + // 2. Create temporary directory inside sharedDir/tmp. + // tmpdir = mkdtemp(dir=os.path.join(_get_setting("SHARED_DIRECTORY"), "tmp")) + // 3. Identify starting point. + // starting_point = PACKAGE_TYPE_STARTING_POINTS.get(type_) + // 4. Start creation. + // params = (transfer, name, path, tmpdir, starting_point) + // if auto_approve: + // params = params + (workflow, package_queue) + // result = executor.submit(_start_package_transfer_with_auto_approval, *params) + // else: + // result = executor.submit(_start_package_transfer, *params) + // 5. Adjust permissions? + // result.add_done_callback(lambda f: os.chmod(tmpdir, 0o770)) + + pkg, err := NewTransferPackage(c.groupCtx, c.logger.WithName("package"), c.store, c.sharedDir, req) + if err != nil { + return nil, fmt.Errorf("create package: %v", err) } + + c.queue(pkg) + c.pick() // Start work right away, we don't want to wait for the next tick. + + return pkg, nil } -func (c *Controller) handle(path string) error { +// Notify the controller of a new path observed in the watched directory. 
+func (c *Controller) Notify(path string) (err error) { + defer func() { + if err != nil { + err = fmt.Errorf("notify: %v", err) + } + }() + rel, err := filepath.Rel(c.watchedDir, path) if err != nil { return err @@ -113,36 +142,33 @@ func (c *Controller) handle(path string) error { dir, _ := filepath.Split(rel) dir = trim(dir) - var match bool - for _, wd := range c.wf.WatchedDirectories { - if trim(wd.Path) == dir { - match = true - c.logger.V(2).Info("Identified new package.", "path", path, "type", wd.UnitType) - c.queue(path, wd) - c.pick() + var wd *workflow.WatchedDirectory + for _, item := range c.wf.WatchedDirectories { + if trim(item.Path) == dir { + wd = item break } } - if !match { + if wd == nil { return fmt.Errorf("unmatched event") } - return nil -} - -func (c *Controller) queue(path string, wd *workflow.WatchedDirectory) { - ctx, cancel := context.WithTimeout(c.groupCtx, time.Minute) - defer cancel() + c.logger.V(2).Info("Identified new package.", "path", path, "type", wd.UnitType) logger := c.logger.WithName("package").WithValues("wd", wd.Path, "path", path) - p, err := NewPackage(ctx, logger, c.store, path, c.sharedDir, wd) - if err != nil { - logger.Error(err, "Failed to create new package.") - return + if pkg, err := NewPackage(c.groupCtx, logger, c.store, c.sharedDir, path, wd); err != nil { + return err + } else { + c.queue(pkg) + c.pick() // Start work right away, we don't want to wait for the next tick. 
} + return nil +} + +func (c *Controller) queue(pkg *Package) { c.mu.Lock() - c.queuedPackages = append(c.queuedPackages, p) + c.queuedPackages = append(c.queuedPackages, pkg) c.mu.Unlock() } diff --git a/hack/ccp/internal/controller/package.go b/hack/ccp/internal/controller/package.go index 93f58c0fa..075a87f95 100644 --- a/hack/ccp/internal/controller/package.go +++ b/hack/ccp/internal/controller/package.go @@ -16,6 +16,7 @@ import ( "github.com/go-logr/logr" "github.com/google/uuid" + adminv1 "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1" "github.com/artefactual/archivematica/hack/ccp/internal/python" "github.com/artefactual/archivematica/hack/ccp/internal/store" "github.com/artefactual/archivematica/hack/ccp/internal/store/enums" @@ -41,47 +42,75 @@ type Package struct { // Current path, populated by hydrate(). path string - // Whether the package was submitted as a directory. - isDir bool - - // Watched directory workflow document. + // Watched directory workflow document. Used by the iterator to discover + // the starting chain. watchedAt *workflow.WatchedDirectory // User decisinon manager decision decision } -func NewPackage(ctx context.Context, logger logr.Logger, store store.Store, path, sharedDir string, wd *workflow.WatchedDirectory) (*Package, error) { +func newPackage(logger logr.Logger, store store.Store, sharedDir string) *Package { + return &Package{ + logger: logger, + store: store, + sharedDir: joinPath(sharedDir, ""), + } +} + +// NewPackage creates a new package after a path observed in a watched directory. 
+func NewPackage(ctx context.Context, logger logr.Logger, store store.Store, sharedDir, path string, wd *workflow.WatchedDirectory) (*Package, error) { fi, err := os.Stat(path) if err != nil { return nil, fmt.Errorf("stat: %v", err) } + isDir := fi.IsDir() - p := &Package{ - logger: logger, - store: store, - path: path, - isDir: fi.IsDir(), - sharedDir: joinPath(sharedDir, ""), - watchedAt: wd, - } + pkg := newPackage(logger, store, sharedDir) + pkg.path = path + pkg.watchedAt = wd switch { case wd.UnitType == "Transfer": - p.unit = &Transfer{pkg: p} - case wd.UnitType == "SIP" && p.isDir: - p.unit = &SIP{pkg: p} - case wd.UnitType == "DIP" && p.isDir: - p.unit = &DIP{pkg: p} + pkg.unit = &Transfer{pkg: pkg} + case wd.UnitType == "SIP" && isDir: + pkg.unit = &SIP{pkg: pkg} + case wd.UnitType == "DIP" && isDir: + pkg.unit = &DIP{pkg: pkg} default: - return nil, fmt.Errorf("unexpected type given for file %q (dir: %t)", path, p.isDir) + return nil, fmt.Errorf("unexpected type given for file %q (dir: %t)", path, isDir) } - if err := p.hydrate(ctx, path, wd.Path); err != nil { + if err := pkg.hydrate(ctx, path, wd.Path); err != nil { + return nil, fmt.Errorf("hydrate: %v", err) + } + + return pkg, nil +} + +// NewTransferPackage creates a new package after an API request. +func NewTransferPackage(ctx context.Context, logger logr.Logger, store store.Store, sharedDir string, req *adminv1.CreatePackageRequest) (*Package, error) { + pkg := &Package{ + logger: logger, + store: store, + } + + pkg.unit = &Transfer{pkg: pkg} + + tmpDir, err := os.MkdirTemp(filepath.Join(sharedDir, "tmp"), "") + if err != nil { + return nil, err + } + + transferType := Transfers.WithName("standard") + + logger.Info("Here we are.", "tmpdir", tmpDir, "transferType", transferType) + + if err := pkg.hydrate(ctx, "", ""); err != nil { return nil, fmt.Errorf("hydrate: %v", err) } - return p, nil + return pkg, nil } // Path returns the real (no share dir vars) path to the package. 
diff --git a/hack/ccp/internal/controller/source.go b/hack/ccp/internal/controller/source.go new file mode 100644 index 000000000..ee4e1229d --- /dev/null +++ b/hack/ccp/internal/controller/source.go @@ -0,0 +1,155 @@ +package controller + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/artefactual/archivematica/hack/ccp/internal/derrors" + "github.com/artefactual/archivematica/hack/ccp/internal/ssclient" +) + +// This file contains functions that relate to the process of retrieving +// transfers from transfer source locations that may or may not be provided by +// Archivematica Storage Service. This is built after what we have in the +// `server.packages` Python module. + +// StartTransfer starts a transfer. +// +// The transfer is deposited into the internal processing directory and the +// workflow is triggered manually. +// +// This method does not rely on the activeTransfer watched directory. It does +// not prompt the user to accept the transfer because we go directly into the +// next chain link. +func StartTransfer(sharedDir, tmpDir, name, path string) error { + destRel, destAbs, src := determineTransferPaths(sharedDir, tmpDir, name, path) + fmt.Println(destRel, destAbs, src) + + copyFromTransferSources(nil) + + tsrc, tdst := "", "" + dst, err := moveToInternalSharedDir(sharedDir, tsrc, tdst) + if err != nil { + return err + } + + fmt.Println("The transfer is now in the internal processing directory!", dst) + + // TODO: update transfer.currentlocation in the database. + // TODO: schedule job chain. + + return nil +} + +// StartTransferWithWatchedDir starts a transfer using watched directories. +// +// This means copying the transfer into one of the standard watched dirs. +// MCPServer will continue the processing and prompt the user once the +// contents in the watched directory are detected by the watched directory +// observer. +// +// With this method of starting a transfer, the workflow requires user approval. 
+// This allows for adding metadata to the transfer before accepting it. +func StartTransferWithWatchedDir() { + panic("not implemented") + // _determine_transfer_paths + // _copy_from_transfer_sources + // _move_to_internal_shared_dir + // update transfer.currentlocation with the new destination +} + +func locationPath(locPath string) (id, path string) { + if before, after, found := strings.Cut(locPath, ":"); found { + id = before + path = after + } else { + id = before + } + + return id, path +} + +// determineTransferPaths +// +// name and path are part of the client transfer request. +func determineTransferPaths(sharedDir, tmpDir, name, path string) (string, string, string) { + archived := false + lpath := strings.ToLower(path) + if strings.HasSuffix(lpath, ".zip") || + strings.HasSuffix(lpath, ".tgz") || + strings.HasSuffix(lpath, ".tar.gz") { + archived = true + } + + var ( + transferDir string + destAbs string + ) + + if archived { + transferDir = tmpDir + _, p := locationPath(path) + destAbs = filepath.Join(tmpDir, filepath.Base(p)) + } else { + path = joinPath(path, "") // Copy contents of dir but not dir. + destAbs = filepath.Join(tmpDir, name) + transferDir = destAbs + } + + destRel := strings.Replace(transferDir, sharedDir, "", 1) + + return destRel, destAbs, path +} + +// moveToInternalSharedDir moves a transfer into an internal directory. +func moveToInternalSharedDir(sharedDir, path, dest string) (_ string, err error) { + defer derrors.Add(&err, "moveToInternalSharedDir(%s, %s, %s)", sharedDir, path, dest) + + // Validate path. 
+ if path == "" { + return "", errors.New("no path provided") + } + if strings.Contains(path, "..") { + return "", errors.New("illegal path") + } + if _, err := os.Stat(path); os.IsNotExist(err) { + return "", errors.New("path does not exist") + } + + var ( + attempt = 0 + suggested = filepath.Join(dest, filepath.Base(path)) + newPath = suggested + ) + for { + if _, err := os.Stat(newPath); os.IsNotExist(err) { + if err := os.Rename(path, newPath); os.IsExist(err) { + goto incr // Magic! + } else if err != nil { + return "", err + } + + return newPath, nil // Success! + } + + incr: + attempt++ + if attempt > 1000 { + return "", fmt.Errorf("reached max. number of attempts: %d", attempt) + } + + ext := filepath.Ext(dest) + base := strings.TrimSuffix(suggested, ext) + newPath = fmt.Sprintf("%s_%d%s", base, attempt, ext) + } +} + +func copyFromTransferSources(c ssclient.Client) { + // - processing_location = storage_service.get_first_location(purpose="CP") + // - transfer_sources = storage_service.get_location(purpose="TS") + // - _default_transfer_source_location_uuid + // - storage_service.copy_files(location, processing_location, files) +} diff --git a/hack/ccp/internal/controller/source_test.go b/hack/ccp/internal/controller/source_test.go new file mode 100644 index 000000000..ff04da6f3 --- /dev/null +++ b/hack/ccp/internal/controller/source_test.go @@ -0,0 +1,110 @@ +package controller + +import ( + "path/filepath" + "testing" + + "gotest.tools/v3/assert" + "gotest.tools/v3/fs" +) + +func TestDetermineTransferPaths(t *testing.T) { + t.Parallel() + + type args struct { + sharedDir, tmpDir, name, path string + } + + type want struct { + destRel, destAbs, src string + } + + type test struct { + args args + want want + } + + tests := []test{ + { + args{ + sharedDir: "/var/archivematica/sharedDirectory", + tmpDir: "/var/archivematica/sharedDirectory/tmp/tmp.12345", + name: "Name1", + path: "/var/source/transfer.tar.gz", + }, + want{ + destRel: "/tmp/tmp.12345", + 
destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345", + src: "/var/source/transfer.tar.gz", + }, + }, + { + args{ + sharedDir: "/var/archivematica/sharedDirectory", + tmpDir: "/var/archivematica/sharedDirectory/tmp/tmp.12345", + name: "Name2", + path: "/var/source/transfer", + }, + want{ + destRel: "/tmp/tmp.12345/Name2", + destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345/Name2", + src: "/var/source/transfer/", + }, + }, + { + args{ + sharedDir: "/var/archivematica/sharedDirectory", + tmpDir: "/var/archivematica/sharedDirectory/tmp/tmp.12345", + name: "NameWithLocation", + path: "cae8fe7a-0ad4-495f-abf5-9d3dbd71ba36:/var/source/transfer.tar.gz", + }, + want{ + destRel: "/tmp/tmp.12345", + destAbs: "/var/archivematica/sharedDirectory/tmp/tmp.12345/transfer.tar.gz", + src: "cae8fe7a-0ad4-495f-abf5-9d3dbd71ba36:/var/source/transfer.tar.gz", + }, + }, + } + for _, tc := range tests { + t.Run(tc.args.name, func(t *testing.T) { + t.Parallel() + + transferRel, filePath, path := determineTransferPaths( + tc.args.sharedDir, tc.args.tmpDir, tc.args.name, tc.args.path, + ) + + assert.Equal(t, transferRel, tc.want.destRel, "unexpected transferRel") + assert.Equal(t, filePath, tc.want.destAbs, "unexpected filePath") + assert.Equal(t, path, tc.want.src, "unexpected path") + }) + } +} + +func TestMoveToInternalSharedDir(t *testing.T) { + t.Parallel() + + tmpDir := fs.NewDir(t, "ccp", + fs.WithDir("source", + fs.WithDir("Images", + fs.WithFile("MARBLES.TGA", "contents"), + ), + ), + fs.WithDir("sharedDir", + fs.WithDir("deposits", + fs.WithDir("Images"), + fs.WithDir("Images_1"), + fs.WithDir("Images_2"), + ), + ), + ) + + dest, err := moveToInternalSharedDir( + tmpDir.Join("sharedDir"), + tmpDir.Join("source", "Images"), + tmpDir.Join("sharedDir", "deposits"), + ) + + assert.NilError(t, err) + assert.Equal(t, dest, tmpDir.Join("sharedDir", "deposits", filepath.Base(dest))) + assert.Assert(t, fs.Equal(dest, fs.Expected(t, fs.WithFile("MARBLES.TGA", "contents"), 
fs.MatchAnyFileMode))) +} diff --git a/hack/ccp/internal/controller/transfer.go b/hack/ccp/internal/controller/transfer.go index 98db276c4..2555dfde2 100644 --- a/hack/ccp/internal/controller/transfer.go +++ b/hack/ccp/internal/controller/transfer.go @@ -2,9 +2,17 @@ package controller import ( "github.com/google/uuid" + + adminv1 "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1" ) type TransferType struct { + // Name of the transfer type. + Name string + + // Type in the transfer type enum. + Type adminv1.TransferType + // WatcheDir is the watched directory used to trigger this type of transfer. WatchedDir string @@ -33,9 +41,31 @@ func (t TransferTypes) Decide(linkID uuid.UUID) uuid.UUID { return uuid.Nil } +func (t TransferTypes) WithName(name string) *TransferType { + for _, item := range t { + if item.Name == name { + return &item + } + } + + return nil +} + +func (t TransferTypes) WithType(tt adminv1.TransferType) *TransferType { + for _, item := range t { + if item.Type == tt { + return &item + } + } + + return nil +} + // List of transfer types supported by Archivematica. 
var Transfers TransferTypes = []TransferType{ { + Name: "standard", + Type: adminv1.TransferType_TRANSFER_TYPE_STANDARD, WatchedDir: "activeTransfers/standardTransfer", Chain: uuid.MustParse("6953950b-c101-4f4c-a0c3-0cd0684afe5e"), Link: uuid.MustParse("045c43ae-d6cf-44f7-97d6-c8a602748565"), @@ -43,36 +73,50 @@ var Transfers TransferTypes = []TransferType{ Decision: uuid.MustParse("b4567e89-9fea-4256-99f5-a88987026488"), }, { + Name: "zipfile", + Type: adminv1.TransferType_TRANSFER_TYPE_ZIP_FILE, WatchedDir: "activeTransfers/zippedDirectory", Chain: uuid.MustParse("f3caceff-5ad5-4bad-b98c-e73f8cd03450"), Link: uuid.MustParse("541f5994-73b0-45bb-9cb5-367c06a21be7"), }, { + Name: "unzipped bag", + Type: adminv1.TransferType_TRANSFER_TYPE_UNZIPPED_BAG, WatchedDir: "activeTransfers/baggitDirectory", Chain: uuid.MustParse("c75ef451-2040-4511-95ac-3baa0f019b48"), Link: uuid.MustParse("154dd501-a344-45a9-97e3-b30093da35f5"), }, { + Name: "zipped bag", + Type: adminv1.TransferType_TRANSFER_TYPE_ZIPPED_BAG, WatchedDir: "activeTransfers/baggitZippedDirectory", Chain: uuid.MustParse("167dc382-4ab1-4051-8e22-e7f1c1bf3e6f"), Link: uuid.MustParse("3229e01f-adf3-4294-85f7-4acb01b3fbcf"), }, { + Name: "dspace", + Type: adminv1.TransferType_TRANSFER_TYPE_DSPACE, WatchedDir: "activeTransfers/Dspace", Chain: uuid.MustParse("1cb2ef0e-afe8-45b5-8d8f-a1e120f06605"), Link: uuid.MustParse("bda96b35-48c7-44fc-9c9e-d7c5a05016c1"), }, { + Name: "maildir", + Type: adminv1.TransferType_TRANSFER_TYPE_MAILDIR, WatchedDir: "activeTransfers/maildir", Chain: uuid.MustParse("d381cf76-9313-415f-98a1-55c91e4d78e0"), Link: uuid.MustParse("da2d650e-8ce3-4b9a-ac97-8ca4744b019f"), }, { + Name: "TRIM", + Type: adminv1.TransferType_TRANSFER_TYPE_TRIM, WatchedDir: "activeTransfers/TRIM", Chain: uuid.MustParse("e4a59e3e-3dba-4eb5-9cf1-c1fb3ae61fa9"), Link: uuid.MustParse("2483c25a-ade8-4566-a259-c6c37350d0d6"), }, { + Name: "dataverse", + Type: adminv1.TransferType_TRANSFER_TYPE_DATAVERSE, WatchedDir: 
"activeTransfers/dataverseTransfer", Chain: uuid.MustParse("10c00bc8-8fc2-419f-b593-cf5518695186"), Link: uuid.MustParse("0af6b163-5455-4a76-978b-e35cc9ee445f"), diff --git a/hack/ccp/internal/derrors/derrors.go b/hack/ccp/internal/derrors/derrors.go new file mode 100644 index 000000000..692eea385 --- /dev/null +++ b/hack/ccp/internal/derrors/derrors.go @@ -0,0 +1,40 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package derrors provides helpers that add context to an error, either +// keeping the original error unwrappable (Wrap) or not (Add). +package derrors + +import "fmt" + +// Add adds context to the error. +// The result cannot be unwrapped to recover the original error. +// It does nothing when *errp == nil. +// +// Example: +// +// defer derrors.Add(&err, "copy(%s, %s)", src, dst) +// +// See Wrap for an equivalent function that allows +// the result to be unwrapped. +func Add(errp *error, format string, args ...any) { + if *errp != nil { + *errp = fmt.Errorf("%s: %v", fmt.Sprintf(format, args...), *errp) + } +} + +// Wrap adds context to the error and allows unwrapping the result to recover +// the original error. It does nothing when *errp == nil. +// +// Example: +// +// defer derrors.Wrap(&err, "copy(%s, %s)", src, dst) +// +// See Add for an equivalent function that does not allow +// the result to be unwrapped. 
+func Wrap(errp *error, format string, args ...any) { + if *errp != nil { + *errp = fmt.Errorf("%s: %w", fmt.Sprintf(format, args...), *errp) + } +} diff --git a/hack/ccp/internal/servercmd/watcher.go b/hack/ccp/internal/servercmd/watcher.go index dd4ade190..97d0cab9a 100644 --- a/hack/ccp/internal/servercmd/watcher.go +++ b/hack/ccp/internal/servercmd/watcher.go @@ -6,14 +6,18 @@ import ( "path/filepath" "time" + "github.com/fsnotify/fsnotify" "github.com/go-logr/logr" "github.com/gohugoio/hugo/watcher" - "github.com/artefactual/archivematica/hack/ccp/internal/controller" "github.com/artefactual/archivematica/hack/ccp/internal/workflow" ) -func watch(logger logr.Logger, ctrl *controller.Controller, wf *workflow.Document, path string) (*watcher.Batcher, error) { +type observer interface { + Notify(path string) error +} + +func watch(logger logr.Logger, o observer, wf *workflow.Document, path string) (*watcher.Batcher, error) { w, err := watcher.New(500*time.Millisecond, 700*time.Millisecond, false) if err != nil { return nil, err @@ -45,7 +49,7 @@ func watch(logger logr.Logger, ctrl *controller.Controller, wf *workflow.Documen for { select { case evs := <-w.Events: - ctrl.HandleWatchedDirEvents(evs) + notify(logger, o, evs) case err := <-w.Errors(): if err != nil { logger.V(1).Info("Error while watching.", "err", err) @@ -57,3 +61,16 @@ func watch(logger logr.Logger, ctrl *controller.Controller, wf *workflow.Documen return w, nil } + +// notify forwards the path of every Create event in evs to the observer. +// Errors returned by the observer are logged, not propagated. +func notify(logger logr.Logger, o observer, evs []fsnotify.Event) { + for _, ev := range evs { + if ev.Op&fsnotify.Create != fsnotify.Create { + continue + } + if err := o.Notify(ev.Name); err != nil { + logger.Error(err, "Failed to notify controller with a new event.", "path", ev.Name) + } + } +} diff --git a/hack/ccp/internal/ssclient/ssclient.go b/hack/ccp/internal/ssclient/ssclient.go new file mode 100644 index 000000000..9cf685672 --- /dev/null +++ b/hack/ccp/internal/ssclient/ssclient.go @@ -0,0 +1,131 @@ +package 
ssclient + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/hashicorp/go-retryablehttp" + ssclientlib "go.artefactual.dev/ssclient" + "go.artefactual.dev/ssclient/kiota" + + "github.com/artefactual/archivematica/hack/ccp/internal/store" +) + +type Pipeline struct { + ID uuid.UUID + URI string +} + +type Location struct { + ID uuid.UUID + Purpose string + Path string +} + +// Client wraps go.artefactual.dev/ssclient. +type Client interface { + ReadPipeline(ctx context.Context, id uuid.UUID) (*Pipeline, error) + ReadLocation(ctx context.Context, purpose string) ([]*Location, error) + ReadDefaultLocation(ctx context.Context, purpose string) (*Location, error) + CopyFiles(ctx context.Context, l *Location, files []string) error +} + +// clientImpl implements Client. It uses the store to read the pipeline ID and +// it caches the pipeline details to avoid hitting the server too often. +type clientImpl struct { + client *kiota.Client + store store.Store + + // Cached pipeline and its last retrieval timestamp, both protected by mu. 
+ p *Pipeline + ts time.Time + mu sync.RWMutex +} + +var _ Client = (*clientImpl)(nil) + +func NewClient(store store.Store, baseURL, username, key string) (*clientImpl, error) { + stdClient := retryablehttp.NewClient().StandardClient() + k, err := ssclientlib.New(stdClient, baseURL, username, key) + if err != nil { + return nil, err + } + + c := &clientImpl{client: k, store: store} + + return c, nil +} + +func (c *clientImpl) ReadPipeline(ctx context.Context, id uuid.UUID) (*Pipeline, error) { + m, err := c.client.Api().V2().Pipeline().ByUuid(id.String()).Get(ctx, nil) + if err != nil { + return nil, err + } + + uri, rawID := m.GetResourceUri(), m.GetUuid() + if uri == nil || rawID == nil { // Both getters return pointers; dereferencing nil would panic. + return nil, fmt.Errorf("pipeline %s: response lacks resource URI or UUID", id) + } + + pipelineID, err := uuid.Parse(*rawID) + if err != nil { + return nil, err + } + + return &Pipeline{ID: pipelineID, URI: *uri}, nil +} + +func (c *clientImpl) ReadLocation(ctx context.Context, purpose string) ([]*Location, error) { + p, err := c.pipeline(ctx) + if err != nil { + return nil, err + } + fmt.Println(p.ID) + + return nil, nil +} + +func (c *clientImpl) ReadDefaultLocation(ctx context.Context, purpose string) (*Location, error) { + return nil, nil +} + +func (c *clientImpl) CopyFiles(ctx context.Context, l *Location, files []string) error { + p, err := c.pipeline(ctx) + if err != nil { + return err + } + fmt.Println(p.URI) + + return nil +} + +// pipeline returns the details of the current pipeline. 
+func (c *clientImpl) pipeline(ctx context.Context) (Pipeline, error) { + const ttl = time.Second * 15 + + c.mu.RLock() // Shared lock: the cache is only read here. + if c.p != nil && time.Since(c.ts) <= ttl { + defer c.mu.RUnlock() + return *c.p, nil + } + c.mu.RUnlock() + + pipelineID, err := c.store.ReadPipelineID(ctx) + if err != nil { + return Pipeline{}, err + } + p, err := c.ReadPipeline(ctx, pipelineID) + if err != nil { + return Pipeline{}, err + } + + c.mu.Lock() // Exclusive lock for the write; RLock followed by Unlock is a run-time error. + c.p = p + c.ts = time.Now() + c.mu.Unlock() + + return *p, nil // Use the local value: c.p may be replaced once the lock is released. +} diff --git a/hack/ccp/internal/ssclient/ssclient_test.go b/hack/ccp/internal/ssclient/ssclient_test.go new file mode 100644 index 000000000..c97a98a94 --- /dev/null +++ b/hack/ccp/internal/ssclient/ssclient_test.go @@ -0,0 +1,21 @@ +package ssclient_test + +import ( + "testing" + + "go.uber.org/mock/gomock" + "gotest.tools/v3/assert" + + "github.com/artefactual/archivematica/hack/ccp/internal/ssclient" + "github.com/artefactual/archivematica/hack/ccp/internal/store/fake" +) + +func TestClient(t *testing.T) { + t.Parallel() + + store := fake.NewMockStore(gomock.NewController(t)) + + c, err := ssclient.NewClient(store, "bu", "u", "k") + assert.NilError(t, err) + assert.Assert(t, c != nil) +} diff --git a/hack/ccp/internal/store/mysql.go b/hack/ccp/internal/store/mysql.go index 0ad7cd39e..9e24f349c 100644 --- a/hack/ccp/internal/store/mysql.go +++ b/hack/ccp/internal/store/mysql.go @@ -55,7 +55,7 @@ func connectToMySQL(logger logr.Logger, dsn string) (*sql.DB, error) { // mysqlstoreImpl implements the Store interface. While most queries are built // using sqlc, there are some cases where more dynamism is required where we -// are using the goqu SQL builder, e.g. UpdateUnitStatus. +// are using the goqu SQL builder, e.g. UpdatePackageStatus. 
type mysqlStoreImpl struct { logger logr.Logger pool *sql.DB @@ -159,7 +159,7 @@ func (s *mysqlStoreImpl) CreateTasks(ctx context.Context, tasks []*Task) (err er } func (s *mysqlStoreImpl) UpdatePackageStatus(ctx context.Context, id uuid.UUID, packageType enums.PackageType, status enums.PackageStatus) (err error) { - defer wrap(&err, "UpdateUnitStatus(%s, %s, %s)", id, packageType, status) + defer wrap(&err, "UpdatePackageStatus(%s, %s, %s)", id, packageType, status) if !packageType.IsValid() { return fmt.Errorf("invalid type: %v", err) @@ -184,7 +184,7 @@ func (s *mysqlStoreImpl) UpdatePackageStatus(ctx context.Context, id uuid.UUID, } values := goqu.Record{ - "status": status, + "status": int(status), } if status == enums.PackageStatusCompletedSuccessfully { values["completed_at"] = time.Now()