Skip to content

Commit

Permalink
Start controller submission work
Browse files Browse the repository at this point in the history
This commit implements a new Submit() method to be used by the Connect API when
we're creating a new package. Most of the work is focused on the deposit of the
transfer into the shared directory which involves talking to the Archivematica
Storage Service using the ssclient package.

There's more work to do, I haven't tested the RPC yet. I've imported the latest
version of go.artefactual.dev/ssclient (v0.1.0).
  • Loading branch information
sevein committed May 11, 2024
1 parent cdf7ed1 commit 665f5ad
Show file tree
Hide file tree
Showing 14 changed files with 709 additions and 59 deletions.
2 changes: 1 addition & 1 deletion hack/ccp/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax = docker/dockerfile:1.4

FROM golang:1.22.2-alpine AS build
FROM golang:1.22.3-alpine AS build
ARG VERSION_PATH=github.com/artefactual/archivematica/hack/ccp/internal/version
ARG VERSION_NUMBER
ARG VERSION_GIT_COMMIT
Expand Down
15 changes: 15 additions & 0 deletions hack/ccp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ require (
github.com/go-sql-driver/mysql v1.8.1
github.com/gohugoio/hugo v0.125.7
github.com/google/uuid v1.6.0
github.com/hashicorp/go-retryablehttp v0.7.6
github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145
github.com/peterbourgon/ff/v3 v3.4.0
github.com/rs/cors v1.11.0
github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a
github.com/testcontainers/testcontainers-go v0.30.0
github.com/testcontainers/testcontainers-go/modules/mysql v0.30.0
go.artefactual.dev/ssclient v0.1.0
go.artefactual.dev/tools v0.10.0
go.starlark.net v0.0.0-20240411212711-9b43f0afd521
go.uber.org/goleak v1.3.0
Expand All @@ -41,10 +43,12 @@ require (
github.com/bep/godartsass/v2 v2.0.0 // indirect
github.com/bep/golibsass v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cjlapao/common-go v0.0.39 // indirect
github.com/cli/safeexec v1.0.1 // indirect
github.com/containerd/containerd v1.7.12 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v25.0.5+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
Expand All @@ -57,10 +61,17 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/klauspost/compress v1.16.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/microsoft/kiota-abstractions-go v1.6.0 // indirect
github.com/microsoft/kiota-http-go v1.3.3 // indirect
github.com/microsoft/kiota-serialization-form-go v1.0.0 // indirect
github.com/microsoft/kiota-serialization-json-go v1.0.7 // indirect
github.com/microsoft/kiota-serialization-multipart-go v1.0.0 // indirect
github.com/microsoft/kiota-serialization-text-go v1.0.0 // indirect
github.com/mitchellh/hashstructure v1.1.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
Expand All @@ -72,12 +83,15 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/std-uritemplate/std-uritemplate/go v0.0.55 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/tdewolff/parse/v2 v2.7.13 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
Expand All @@ -94,4 +108,5 @@ require (
golang.org/x/tools v0.20.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311173647-c811ad7063a7 // indirect
google.golang.org/grpc v1.62.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
26 changes: 26 additions & 0 deletions hack/ccp/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ github.com/bep/tmc v0.5.1/go.mod h1:tGYHN8fS85aJPhDLgXETVKp+PR382OvFi2+q2GkGsq0=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cjlapao/common-go v0.0.39 h1:bAAUrj2B9v0kMzbAOhzjSmiyDy+rd56r2sy7oEiQLlA=
github.com/cjlapao/common-go v0.0.39/go.mod h1:M3dzazLjTjEtZJbbxoA5ZDiGCiHmpwqW9l4UWaddwOA=
github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME=
github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
github.com/cli/safeexec v1.0.0/go.mod h1:Z/D4tTN8Vs5gXYHDCbaM1S/anmEDnJb1iW0+EJ5zx3Q=
Expand Down Expand Up @@ -170,6 +172,12 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0Q
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/hairyhenderson/go-codeowners v0.4.0 h1:Wx/tRXb07sCyHeC8mXfio710Iu35uAy5KYiBdLHdv4Q=
github.com/hairyhenderson/go-codeowners v0.4.0/go.mod h1:iJgZeCt+W/GzXo5uchFCqvVHZY2T4TAIpvuVlKVkLxc=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-retryablehttp v0.7.6 h1:TwRYfx2z2C4cLbXmT8I5PgP/xmuqASDyiVuGYfs9GZM=
github.com/hashicorp/go-retryablehttp v0.7.6/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY=
Expand Down Expand Up @@ -211,6 +219,18 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/microsoft/kiota-abstractions-go v1.6.0 h1:qbGBNMU0/o5myKbikCBXJFohVCFrrpx2cO15Rta2WyA=
github.com/microsoft/kiota-abstractions-go v1.6.0/go.mod h1:7YH20ZbRWXGfHSSvdHkdztzgCB9mRdtFx13+hrYIEpo=
github.com/microsoft/kiota-http-go v1.3.3 h1:FKjK5BLFONu5eIBxtrkirkixFQmcPwitZ8iwZHKbESo=
github.com/microsoft/kiota-http-go v1.3.3/go.mod h1:IWw/PwtBs/GYz+Pa75gPW7MFNFv0aKPFsLw5WqzL1SE=
github.com/microsoft/kiota-serialization-form-go v1.0.0 h1:UNdrkMnLFqUCccQZerKjblsyVgifS11b3WCx+eFEsAI=
github.com/microsoft/kiota-serialization-form-go v1.0.0/go.mod h1:h4mQOO6KVTNciMF6azi1J9QB19ujSw3ULKcSNyXXOMA=
github.com/microsoft/kiota-serialization-json-go v1.0.7 h1:yMbckSTPrjZdM4EMXgzLZSA3CtDaUBI350u0VoYRz7Y=
github.com/microsoft/kiota-serialization-json-go v1.0.7/go.mod h1:1krrY7DYl3ivPIzl4xTaBpew6akYNa8/Tal8g+kb0cc=
github.com/microsoft/kiota-serialization-multipart-go v1.0.0 h1:3O5sb5Zj+moLBiJympbXNaeV07K0d46IfuEd5v9+pBs=
github.com/microsoft/kiota-serialization-multipart-go v1.0.0/go.mod h1:yauLeBTpANk4L03XD985akNysG24SnRJGaveZf+p4so=
github.com/microsoft/kiota-serialization-text-go v1.0.0 h1:XOaRhAXy+g8ZVpcq7x7a0jlETWnWrEum0RhmbYrTFnA=
github.com/microsoft/kiota-serialization-text-go v1.0.0/go.mod h1:sM1/C6ecnQ7IquQOGUrUldaO5wj+9+v7G2W3sQ3fy6M=
github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145 h1:6kTCi6p3Hd6JYROnq+1UOdewoXj90zKKDQPlsHYTSEs=
github.com/mikespook/gearman-go v0.0.0-20220520031403-2a518e866145/go.mod h1:77Th6O6AZfMU6i5hLJnjN5xxUBoio7LN0aOyxGhqV1U=
github.com/mitchellh/hashstructure v1.1.0 h1:P6P1hdjqAAknpY/M1CGipelZgp+4y9ja9kmUZPXP+H0=
Expand Down Expand Up @@ -276,6 +296,8 @@ github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/std-uritemplate/std-uritemplate/go v0.0.55 h1:muSH037g97K7U2f94G9LUuE8tZlJsoSSrPsO9V281WY=
github.com/std-uritemplate/std-uritemplate/go v0.0.55/go.mod h1:rG/bqh/ThY4xE5de7Rap3vaDkYUT76B0GPJ0loYeTTc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down Expand Up @@ -311,6 +333,8 @@ github.com/yuin/goldmark-emoji v1.0.2 h1:c/RgTShNgHTtc6xdz2KKI74jJr6rWi7FPgnP9GA
github.com/yuin/goldmark-emoji v1.0.2/go.mod h1:RhP/RWpexdp+KHs7ghKnifRoIs/Bq4nDS7tRbCkOwKY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.artefactual.dev/ssclient v0.1.0 h1:6fDwCOia9DFI8bZ2lZlCMSuoctwFY7P7XiMbJLmJB9E=
go.artefactual.dev/ssclient v0.1.0/go.mod h1:ImaAHtgGIbKlnrOUzczBMmltNVbhYkKZ7ujUjfBtUj8=
go.artefactual.dev/tools v0.10.0 h1:+LeZS5oHupAQBXvLQ4aGIuZyqf7zCpD7s3UpyDl9zn4=
go.artefactual.dev/tools v0.10.0/go.mod h1:PIy0RtC45gC4sASb4r26g0aCU24kSWIp+mcV1p2gtpY=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
Expand Down Expand Up @@ -435,6 +459,8 @@ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGm
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
Expand Down
44 changes: 41 additions & 3 deletions hack/ccp/internal/api/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package admin

import (
"context"
"errors"
"net"
"net/http"
"time"
Expand Down Expand Up @@ -90,10 +91,47 @@ func (s *Server) Run() error {
return nil
}

// validateCreatePackageRequets validates the request.
//
// TODO: use https://github.com/bufbuild/protovalidate.
func validateCreatePackageRequest(msg *adminv1.CreatePackageRequest) error {
if msg.Name == "" {
return errors.New("name is empty")
}

hasPaths := false
for _, item := range msg.Path {
if len(item) > 0 {
hasPaths = true
break
}
}
if !hasPaths {
return errors.New("path is empty")
}

if msg.Type == adminv1.TransferType_TRANSFER_TYPE_UNSPECIFIED {
return errors.New("type is unspecified")
}

return nil
}

func (s *Server) CreatePackage(ctx context.Context, req *connect.Request[adminv1.CreatePackageRequest]) (*connect.Response[adminv1.CreatePackageResponse], error) {
return connect.NewResponse(&adminv1.CreatePackageResponse{
Id: uuid.New().String(),
}), nil
if err := validateCreatePackageRequest(req.Msg); err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}

resp := &adminv1.CreatePackageResponse{}

if pkg, err := s.ctrl.Submit(ctx, req.Msg); err != nil {
return nil, connect.NewError(connect.CodeUnknown, err)
} else {
s.logger.Info("TODO: return identifier", "pkg", pkg.Name())
resp.Id = uuid.New().String()
}

return connect.NewResponse(resp), nil
}

func (s *Server) ApproveTransfer(ctx context.Context, req *connect.Request[adminv1.ApproveTransferRequest]) (*connect.Response[adminv1.ApproveTransferResponse], error) {
Expand Down
82 changes: 54 additions & 28 deletions hack/ccp/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"time"

"github.com/artefactual-labs/gearmin"
"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"

adminv1 "github.com/artefactual/archivematica/hack/ccp/internal/api/gen/archivematica/ccp/admin/v1beta1"
"github.com/artefactual/archivematica/hack/ccp/internal/store"
"github.com/artefactual/archivematica/hack/ccp/internal/workflow"
)
Expand Down Expand Up @@ -94,17 +94,46 @@ func (c *Controller) Run() error {
return nil
}

func (c *Controller) HandleWatchedDirEvents(evs []fsnotify.Event) {
for _, ev := range evs {
if ev.Op&fsnotify.Create == fsnotify.Create {
if err := c.handle(ev.Name); err != nil {
c.logger.Info("Failed to handle event.", "err", err, "path", ev.Name)
}
}
// Submit a new package to the queue.
func (c *Controller) Submit(ctx context.Context, req *adminv1.CreatePackageRequest) (*Package, error) {
// 1. Create Package (Transfer).
// transfer = models.Transfer.objects.create(**kwargs)
// if not processing_configuration_file_exists(processing_config): processing_config = "default"
// transfer.set_processing_configuration(processing_config)
// transfer.update_active_agent(user_id)
// 2. Create temporary directory inside sharedDir/tmp.
// tmpdir = mkdtemp(dir=os.path.join(_get_setting("SHARED_DIRECTORY"), "tmp"))
// 3. Identify starting point.
// starting_point = PACKAGE_TYPE_STARTING_POINTS.get(type_)
// 4. Start creation.
// params = (transfer, name, path, tmpdir, starting_point)
// if auto_approve:
// params = params + (workflow, package_queue)
// result = executor.submit(_start_package_transfer_with_auto_approval, *params)
// else:
// result = executor.submit(_start_package_transfer, *params)
// 5. Adjust permissions?
// result.add_done_callback(lambda f: os.chmod(tmpdir, 0o770))

pkg, err := NewTransferPackage(c.groupCtx, c.logger.WithName("package"), c.store, c.sharedDir, req)
if err != nil {
return nil, fmt.Errorf("create package: %v", err)
}

c.queue(pkg)
c.pick() // Start work right away, we don't want to wait for the next tick.

return pkg, nil
}

func (c *Controller) handle(path string) error {
// Notify the controller of a new with a slice of filesystem events.
func (c *Controller) Notify(path string) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("notify: %v", err)
}
}()

rel, err := filepath.Rel(c.watchedDir, path)
if err != nil {
return err
Expand All @@ -113,36 +142,33 @@ func (c *Controller) handle(path string) error {
dir, _ := filepath.Split(rel)
dir = trim(dir)

var match bool
for _, wd := range c.wf.WatchedDirectories {
if trim(wd.Path) == dir {
match = true
c.logger.V(2).Info("Identified new package.", "path", path, "type", wd.UnitType)
c.queue(path, wd)
c.pick()
var wd *workflow.WatchedDirectory
for _, item := range c.wf.WatchedDirectories {
if trim(item.Path) == dir {
wd = item
break
}
}
if !match {
if wd == nil {
return fmt.Errorf("unmatched event")
}

return nil
}

func (c *Controller) queue(path string, wd *workflow.WatchedDirectory) {
ctx, cancel := context.WithTimeout(c.groupCtx, time.Minute)
defer cancel()
c.logger.V(2).Info("Identified new package.", "path", path, "type", wd.UnitType)

logger := c.logger.WithName("package").WithValues("wd", wd.Path, "path", path)
p, err := NewPackage(ctx, logger, c.store, path, c.sharedDir, wd)
if err != nil {
logger.Error(err, "Failed to create new package.")
return
if pkg, err := NewPackage(c.groupCtx, logger, c.store, c.sharedDir, path, wd); err != nil {
return err
} else {
c.queue(pkg)
c.pick() // Start work right away, we don't want to wait for the next tick.
}

return nil
}

func (c *Controller) queue(pkg *Package) {
c.mu.Lock()
c.queuedPackages = append(c.queuedPackages, p)
c.queuedPackages = append(c.queuedPackages, pkg)
c.mu.Unlock()
}

Expand Down
Loading

0 comments on commit 665f5ad

Please sign in to comment.