From c1591ae3e82b53fad3e8702715d31ae571531b52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marius=20Andr=C3=A9=20Elsfjordstrand=20Beck?= Date: Fri, 3 Nov 2023 13:53:30 +0100 Subject: [PATCH] Initial commit --- .gitattributes | 1 + .github/ISSUE_TEMPLATE/bug-report.md | 34 ++++ .github/ISSUE_TEMPLATE/feature-request.md | 17 ++ .github/dependabot.yaml | 31 ++++ .github/workflows/main.yaml | 51 ++++++ .github/workflows/release.yaml | 48 +++++ .gitignore | 1 + .golangci.yaml | 118 +++++++++++++ Dockerfile | 20 +++ LICENSE | 202 ++++++++++++++++++++++ README.md | 30 ++++ go.mod | 27 +++ go.sum | 91 ++++++++++ internal/checksum/checksum.go | 70 ++++++++ internal/checksum/checksum_test.go | 86 +++++++++ internal/fai/fai.go | 161 +++++++++++++++++ internal/fai/fai_test.go | 63 +++++++ internal/fai/options.go | 158 +++++++++++++++++ internal/fai/options_test.go | 77 +++++++++ internal/log/noop.go | 34 ++++ internal/metrics/metrics.go | 42 +++++ internal/queue/queue.go | 78 +++++++++ internal/queue/queue_test.go | 100 +++++++++++ internal/warc/testdata/invalid.warc.gz | 3 + internal/warc/testdata/valid.warc.gz | 3 + internal/warc/validate.go | 55 ++++++ internal/warc/validate_test.go | 58 +++++++ main.go | 86 +++++++++ 28 files changed, 1745 insertions(+) create mode 100644 .gitattributes create mode 100644 .github/ISSUE_TEMPLATE/bug-report.md create mode 100644 .github/ISSUE_TEMPLATE/feature-request.md create mode 100644 .github/dependabot.yaml create mode 100644 .github/workflows/main.yaml create mode 100644 .github/workflows/release.yaml create mode 100644 .gitignore create mode 100644 .golangci.yaml create mode 100644 Dockerfile create mode 100644 LICENSE create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/checksum/checksum.go create mode 100644 internal/checksum/checksum_test.go create mode 100644 internal/fai/fai.go create mode 100644 internal/fai/fai_test.go create mode 100644 internal/fai/options.go create mode 100644 internal/fai/options_test.go create mode 100644 internal/log/noop.go create mode 100644 internal/metrics/metrics.go create mode 100644 internal/queue/queue.go create mode 100644 internal/queue/queue_test.go create mode 100644 internal/warc/testdata/invalid.warc.gz create mode 100644 internal/warc/testdata/valid.warc.gz create mode 100644 internal/warc/validate.go create mode 100644 internal/warc/validate_test.go create mode 100644 main.go diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..8c6421b --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.warc.gz filter=lfs diff=lfs merge=lfs -text diff --git a/.github/ISSUE_TEMPLATE/bug-report.md b/.github/ISSUE_TEMPLATE/bug-report.md new file mode 100644 index 0000000..9e244d1 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug-report.md @@ -0,0 +1,34 @@ +--- +name: Bug Report +about: Create a bug report to help us improve +labels: "bug" +--- + +**Bug Report** + + + +**To Reproduce** + +```shell +# A standalone program is preferred +``` + +**Expected Behavior** + + + +**Actual Behavior** + + + +**Your Environment** + + diff --git a/.github/ISSUE_TEMPLATE/feature-request.md b/.github/ISSUE_TEMPLATE/feature-request.md new file mode 100644 index 0000000..cc7d1ac --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature-request.md @@ -0,0 +1,17 @@ +--- +name: Feature request +about: Propose a new feature +labels: "feature" +--- + +**Motivation** + + + +**Feature** + + diff --git a/.github/dependabot.yaml b/.github/dependabot.yaml new file mode 100644 index 0000000..a77d668 --- /dev/null +++ b/.github/dependabot.yaml @@ -0,0 +1,31 @@ +version: 2 +updates: + - package-ecosystem: "gomod" + directory: "/" + schedule: + interval: "weekly" + labels: + - "dependencies" + commit-message: + prefix: "build" + include: "scope" + + - package-ecosystem: "docker" + directory: "/" + schedule: + interval: "weekly" + labels: + - "dependencies" + commit-message: + prefix: "build" + include: "scope" + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + labels: + - "dependencies" + commit-message: + prefix: "build" + include: "scope" diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml new file mode 100644 index 0000000..ca2fbf4 --- /dev/null +++ b/.github/workflows/main.yaml @@ -0,0 +1,51 @@ +name: Main +on: push + +jobs: + test: + name: Test + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + lfs: true + - name: Setup go + uses: actions/setup-go@v4 + with: + go-version: "1.21" + cache: false + - name: Build + run: "go build ./..." + - name: Test + run: "go test ./..." + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + - name: Setup go + uses: actions/setup-go@v4 + with: + go-version: "1.21" + cache: false + - name: Lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.55.1 + args: --config=.golangci.yaml + codeqL-build: + name: CodeQL build + runs-on: ubuntu-latest + permissions: + security-events: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..a8b7210 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,48 @@ +name: Release + +on: + push: + branches: + - main + tags: + - "v[0-9]+.[0-9]+.[0-9]+**" + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + +jobs: + build: + name: Create and release docker image + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + + steps: + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Extract metadata (tags, labels, version) for Docker + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=semver,pattern={{version}} + type=ref,event=branch + type=ref,event=pr + + - name: Log in to the container registry + uses: docker/login-action@v3 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5 + with: + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3852366 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/fai diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..415485c --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,118 @@ +output: + format: github-actions + print-issued-lines: true + print-linter-name: true + uniq-by-line: true + sort-results: true + +linters-settings: + gofmt: + simplify: true + +issues: + max-issues-per-linter: 0 + max-same-issues: 0 + +severity: + case-sensitive: true + +linters: + enable-all: true + disable: + # TODO(https://github.com/nlnwa/go_container/issues/3): The following + # sub-linters should be evaluated if they are going to be enabled or not. + - asasalint + - asciicheck + - bidichk + - bodyclose + - containedctx + - cyclop + - deadcode + - decorder + - depguard + - dogsled + - dupl + - dupword + - errchkjson + - execinquery + - exhaustive + - exhaustivestruct + - exhaustruct + - exportloopref + - forbidigo + - forcetypeassert + - funlen + - gci + - ginkgolinter + - gocheckcompilerdirectives + - gochecknoglobals + - gochecknoinits + - gocognit + - goconst + - gocritic + - gocyclo + - godot + - godox + - goerr113 + - gofumpt + - goheader + - goimports + - golint + - gomnd + - gomoddirectives + - gomodguard + - goprintffuncname + - gosec + - gosmopolitan + - grouper + - ifshort + - importas + - interfacebloat + - interfacer + - ireturn + - lll + - loggercheck + - maintidx + - makezero + - maligned + - mirror + - misspell + - musttag + - nakedret + - nestif + - nilerr + - nilnil + - nlreturn + - noctx + - nolintlint + - nonamedreturns + - nosnakecase + - nosprintfhostport + - paralleltest + - prealloc + - predeclared + - promlinter + - reassign + - revive + - rowserrcheck + - scopelint + - sqlclosecheck + - structcheck + - stylecheck + - tagalign + - tagliatelle + - tenv + - testableexamples + - testpackage + - thelper + - tparallel + - unconvert + - unparam + - usestdlibvars + - varcheck + - varnamelen + - wastedassign + - whitespace + - wrapcheck + - wsl + - zerologlint diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..01d27c6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,20 @@ +FROM golang:1.21 AS build + +WORKDIR /go/src/app + +COPY go.mod go.sum ./ + +RUN go mod download + +COPY . . + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -trimpath -ldflags "-s -w" . + + +FROM gcr.io/distroless/static-debian12:latest + +COPY --from=build /go/src/app/fai /fai + +EXPOSE 8081 + +ENTRYPOINT ["/fai"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a5381e8 --- /dev/null +++ b/README.md @@ -0,0 +1,30 @@ +# First Article Inspection (FAI) + +FAI loops trough files in a given directory that match a given pattern +and for every matching file: + +- creates a checksum file +- validates the file as a WARC-file +- logs the name, size, checksum and validation status +- updates a file size histogram metric and a validation error counter metric +- moves the file and the corresponding checksum file to a target directory + +```text +Usage of fai: + -concurrency int + number of concurrent files processed (default [number of CPU cores]) + -invalid-target-dir string + path to target directory where invalid files and their corresponding checksum files will be moved to + -metrics-port int + port to expose metrics on (default 8081) + -pattern string + glob pattern used to match filenames in source directory (default "*") + -sleep duration + sleep duration between directory listings, set to 0 to only do a single run (default 5s) + -source-dir string + path to source directory + -tmp-dir string + path to directory where temporary buffer files will be stored + -valid-target-dir string + path to target directory where valid files and their corresponding checksum files will be moved to +``` \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..acfab46 --- /dev/null +++ b/go.mod @@ -0,0 +1,27 @@ +module github.com/nlnwa/fai + +go 1.21 + +require ( + github.com/nlnwa/gowarc v1.1.1 + github.com/prometheus/client_golang v1.17.0 +) + +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.10.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/google/uuid v1.4.0 // indirect + github.com/klauspost/compress v1.17.2 // indirect + github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect + github.com/nlnwa/whatwg-url v0.4.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/common v0.45.0 // indirect + github.com/prometheus/procfs v0.12.0 // indirect + github.com/prometheus/prometheus v0.47.2 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9aa5190 --- /dev/null +++ b/go.sum @@ -0,0 +1,91 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.5.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= +github.com/bits-and-blooms/bitset v1.10.0 h1:ePXTeiPEazB5+opbv5fr8umg2R/1NlzgDsyepwsSr88= +github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= +github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4= +github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= +github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= +github.com/nlnwa/gowarc v1.1.1 h1:C0kWp8aX3E/+/TvaqlO6I86drYO8qdZDGbJLD+aJilk= +github.com/nlnwa/gowarc v1.1.1/go.mod h1:aFbCuh7ZaGlKO2IAH7GbMgp0b2K8kzK2cb0zWsXWp/8= +github.com/nlnwa/whatwg-url v0.4.0 h1:B3kFb5EL7KILeBkhrlQvFi41Ex0p4ropVA9brt5ungI= +github.com/nlnwa/whatwg-url v0.4.0/go.mod h1:pLzpJjFPtA+n7RCLvp0GBxvDHa/2ckNCBK9mfEeNOMQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= +github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/prometheus/prometheus v0.47.2 h1:jWcnuQHz1o1Wu3MZ6nMJDuTI0kU5yJp9pkxh8XEkNvI= +github.com/prometheus/prometheus v0.47.2/go.mod h1:J/bmOSjgH7lFxz2gZhrWEZs2i64vMS+HIuZfmYNhJ/M= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/checksum/checksum.go b/internal/checksum/checksum.go new file mode 100644 index 0000000..cfcf9ac --- /dev/null +++ b/internal/checksum/checksum.go @@ -0,0 +1,70 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package checksum + +import ( + "crypto/md5" + "encoding/hex" + "io" + "os" +) + +// MD5Sum returns the md5 checksum of the given file encoded as a hex string. +func MD5Sum(filepath string) (string, error) { + f, err := os.Open(filepath) + if err != nil { + return "", err + } + defer f.Close() + + h := md5.New() + if _, err := io.Copy(h, f); err != nil { + return "", err + } + + return hex.EncodeToString(h.Sum(nil)), nil +} + +// separator is used to separate the checksum from the filepath in the checksum file. +const separator = " " + +// CreateChecksumFile creates a checksum file for the given file. +// It returns the path to the created checksum file. +func CreateChecksumFile(file string, checksum string, extension string) (string, error) { + // Don't allow empty checksum + if checksum == "" { + panic("checksum is empty") + } + + checksumFile := file + extension + content := checksum + separator + file + "\n" + + // Create checksum file + f, err := os.Create(checksumFile) + if err != nil { + return "", err + } + defer f.Close() + + // Write content to checksum file + _, err = f.WriteString(content) + if err != nil { + return "", err + } + + return checksumFile, nil +} diff --git a/internal/checksum/checksum_test.go b/internal/checksum/checksum_test.go new file mode 100644 index 0000000..dd3d41f --- /dev/null +++ b/internal/checksum/checksum_test.go @@ -0,0 +1,86 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package checksum + +import ( + "os" + "testing" +) + +func TestMD5Sum(t *testing.T) { + tmpDir := t.TempDir() + f, err := os.CreateTemp(tmpDir, "test") + + if err != nil { + t.Fatalf("failed to create test file: %v", err) + } + _, err = f.WriteString("Test string") + if err != nil { + t.Fatalf("failed to write to test file: %v", err) + } + testFile := f.Name() + defer os.Remove(testFile) + defer f.Close() + + got, err := MD5Sum(testFile) + if err != nil { + t.Errorf("expected no error, but got %v", err) + } + want := "0fd3dbec9730101bff92acc820befc34" + if got != want { + t.Errorf("expected %s, but got %s", want, got) + } +} + +func TestCreateChecksumFile(t *testing.T) { + tmpDir := t.TempDir() + f, err := os.CreateTemp(tmpDir, "test") + + if err != nil { + t.Fatalf("failed to create test file: %v", err) + } + _, err = f.WriteString("Test string") + if err != nil { + t.Fatalf("failed to write to test file: %v", err) + } + testFile := f.Name() + defer os.Remove(testFile) + defer f.Close() + + wantHash := "0fd3dbec9730101bff92acc820befc34" + fileHash, err := MD5Sum(testFile) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + checksumFile, err := CreateChecksumFile(testFile, fileHash, ".md5") + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + defer os.Remove(checksumFile) + + b, err := os.ReadFile(checksumFile) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + + got := string(b) + want := wantHash + separator + testFile + "\n" + + if got != want { + t.Errorf("expected %s, got %s", want, got) + } +} diff --git a/internal/fai/fai.go b/internal/fai/fai.go new file mode 100644 index 0000000..63d0832 --- /dev/null +++ b/internal/fai/fai.go @@ -0,0 +1,161 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fai + +import ( + "context" + "errors" + "log/slog" + "os" + "path/filepath" + "time" + + "github.com/nlnwa/fai/internal/checksum" + "github.com/nlnwa/fai/internal/metrics" + "github.com/nlnwa/fai/internal/queue" + "github.com/nlnwa/fai/internal/warc" +) + +type fAI struct { + sourceDir string + validTargetDir string + invalidTargetDir string + tmpDir string + concurrency int + sleep time.Duration + globPattern string + logger *slog.Logger +} + +func New(options ...Option) (*fAI, error) { + opts, err := validateOptions(options...) + if err != nil { + return nil, err + } + + return &fAI{ + sourceDir: opts.sourceDir, + validTargetDir: opts.validTargetDir, + invalidTargetDir: opts.invalidTargetDir, + tmpDir: opts.tmpDir, + concurrency: opts.concurrency, + sleep: opts.sleep, + globPattern: opts.globPattern, + logger: opts.logger, + }, nil +} + +const checksumFileSuffix = ".md5" + +func (f *fAI) processFile(file string) { + if _, err := os.Stat(file); errors.Is(err, os.ErrNotExist) { + // file does not exist so skip it + return + } + + // calculate checksum + md5sum, err := checksum.MD5Sum(file) + if err != nil { + f.logger.Error("Failed to calculate checksum", "file", file, "error", err) + return + } + // create checksum file + checksumFile, err := checksum.CreateChecksumFile(file, md5sum, checksumFileSuffix) + if err != nil { + f.logger.Error("Failed to create checksum file", "file", file, "error", err) + return + } + + // validate file + isValid, err := warc.IsValid(file, filepath.Join(f.tmpDir, "buffer")) + if err != nil { + f.logger.Error("Failed to validate file", "file", file, "error", err) + return + } + + targetDir := f.validTargetDir + if !isValid { + targetDir = f.invalidTargetDir + metrics.ValidationError() + } + + newChecksumFile := filepath.Join(targetDir, filepath.Base(checksumFile)) + newFile := filepath.Join(targetDir, filepath.Base(file)) + + // Move checksum file and file to target directory. + // + // The order is important because a failed move of the checksum file + // will result in the file being checksummed again (ok). If the file + // is moved first and the checksum file fails to move then the + // checksum file will never be created (not ok). + + // move checksum file to new location + err = os.Rename(checksumFile, newChecksumFile) + if err != nil { + f.logger.Error("Failed to move checksum file", "source", checksumFile, "target", newChecksumFile, "error", err) + return + } + + // move file to new location + err = os.Rename(file, newFile) + if err != nil { + f.logger.Error("Failed to move file", "source", file, "target", newFile, "error", err) + return + } + + // get file size + fileInfo, err := os.Stat(newFile) + if err != nil { + f.logger.Error("Failed to get file size", "file", newFile, "error", err) + return + } + fileSizeBytes := fileInfo.Size() + + metrics.Size(fileSizeBytes) + + f.logger.Info("Processed file", "file", newFile, "size", fileSizeBytes, "md5", md5sum, "valid", isValid) +} + +// Run starts the FAI. +// It will run until the context is cancelled or stop after one pass if the sleep duration is zero. +func (f *fAI) Run(ctx context.Context) { + f.logger.Info("Starting FAI", "sourceDir", f.sourceDir, "validTargetDir", f.validTargetDir, "invalidTargetDir", f.invalidTargetDir, "concurrency", f.concurrency, "sleep", f.sleep) + + queue := queue.NewWorkQueue(f.processFile, f.concurrency) + defer queue.CloseAndWait() + + for { + files, _ := filepath.Glob(f.globPattern) + for _, file := range files { + select { + case <-ctx.Done(): + return + default: + queue.Add(file) + } + } + select { + case <-ctx.Done(): + return + case <-time.After(f.sleep): + // do a single pass if sleep duration is zero + if f.sleep == 0 { + return + } + } + } +} diff --git a/internal/fai/fai_test.go b/internal/fai/fai_test.go new file mode 100644 index 0000000..6310b9b --- /dev/null +++ b/internal/fai/fai_test.go @@ -0,0 +1,63 @@ +package fai + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +// createTestFile creates a test file in the given directory +func createTestFile(t *testing.T, dir string) *os.File { + t.Helper() + f, _ := os.CreateTemp(dir, "testfile") + return f +} + +func TestRun(t *testing.T) { + sourceDir := t.TempDir() + targetDir := t.TempDir() + + testFiles := []struct { + file *os.File + expectedDir string + isValid bool + }{ + { + file: createTestFile(t, sourceDir), + expectedDir: targetDir, + }, + { + file: createTestFile(t, sourceDir), + expectedDir: targetDir, + }, + } + + fai, err := New( + WithConcurrency(len(testFiles)), + WithSleep(0), + WithSourceDir(sourceDir), + WithValidTargetDir(targetDir), + WithInvalidTargetDir(targetDir), + WithTmpDir(t.TempDir()), + ) + if err != nil { + t.Fatalf("failed to create fai: %v", err) + } + + // run fai + fai.Run(context.Background()) + + for _, testFile := range testFiles { + // check that test file has been moved + _, err = os.Stat(filepath.Join(testFile.expectedDir, filepath.Base(testFile.file.Name()))) + if err != nil { + t.Errorf("failed to stat test file '%v'", err) + } + // check that checksum file has been created and moved + _, err = os.Stat(filepath.Join(testFile.expectedDir, filepath.Base(testFile.file.Name())+".md5")) + if err != nil { + t.Errorf("failed to stat checksum file: %v", err) + } + } +} diff --git a/internal/fai/options.go b/internal/fai/options.go new file mode 100644 index 0000000..b1087e8 --- /dev/null +++ b/internal/fai/options.go @@ -0,0 +1,158 @@ +package fai + +import ( + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + "runtime" + "strings" + "time" + + "github.com/nlnwa/fai/internal/log" +) + +type options struct { + sourceDir string + validTargetDir string + invalidTargetDir string + tmpDir string + concurrency int + sleep time.Duration + globPattern string + logger *slog.Logger +} + +func defaultOptions() *options { + return &options{ + sourceDir: "", + validTargetDir: "", + invalidTargetDir: "", + tmpDir: "", + concurrency: runtime.NumCPU(), + sleep: 1 * time.Second, + globPattern: "*", + logger: log.Noop(), + } +} + +func validateOptions(options ...Option) (*options, error) { + o := defaultOptions() + for _, opt := range options { + opt(o) + } + + if o.concurrency < 1 { + return nil, fmt.Errorf("concurrency must be greater than 0") + } + + var err error + + if (o.sourceDir == o.validTargetDir || o.sourceDir == o.invalidTargetDir) && (strings.HasSuffix(o.globPattern, "*") || strings.HasSuffix(o.globPattern, checksumFileSuffix)) { + return nil, fmt.Errorf("source and target directories cannot be the same when glob pattern is a wildcard or glob pattern ends with %s", checksumFileSuffix) + } + + // make sure source, temp and target directories are absolute paths + o.sourceDir, err = filepath.Abs(o.sourceDir) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path of source directory: %w", err) + } + if info, err := os.Stat(o.sourceDir); errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("source directory does not exist: %w", err) + } else if !info.IsDir() { + return nil, fmt.Errorf("source directory is not a directory: %w", err) + } + + o.validTargetDir, err = filepath.Abs(o.validTargetDir) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path of valid target directory: %w", err) + } + if info, err := os.Stat(o.validTargetDir); errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("valid target directory does not exist: %w", err) + } else if !info.IsDir() { + return nil, fmt.Errorf("valid target directory is not a directory: %w", err) + } + o.invalidTargetDir, err = filepath.Abs(o.invalidTargetDir) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path of invalid target directory: %w", err) + } + if info, err := os.Stat(o.invalidTargetDir); errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("invalid target directory does not exist: %w", err) + } else if !info.IsDir() { + return nil, fmt.Errorf("invalid target directory is not a directory: %w", err) + } + o.tmpDir, err = filepath.Abs(o.tmpDir) + if err != nil { + return nil, fmt.Errorf("failed to get absolute path of tmp directory: %w", err) + } + if info, err := os.Stat(o.tmpDir); errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("tmp directory does not exist: %w", err) + } else if !info.IsDir() { + return nil, fmt.Errorf("tmp directory is not a directory: %w", err) + } + + o.globPattern = filepath.Join(o.sourceDir, o.globPattern) + + // Test glob pattern is valid + _, err = filepath.Glob(o.globPattern) + if err != nil { + return nil, fmt.Errorf("invalid glob pattern: %w", err) + } + + if o.logger == nil { + return nil, fmt.Errorf("logger cannot be nil") + } + + return o, nil +} + +type Option func(opts *options) + +func WithSourceDir(dir string) Option { + return func(opts *options) { + opts.sourceDir = dir + } +} + +func WithValidTargetDir(dir string) Option { + return func(opts *options) { + opts.validTargetDir = dir + } +} + +func WithInvalidTargetDir(dir string) Option { + return func(opts *options) { + opts.invalidTargetDir = dir + } +} + +func WithTmpDir(dir string) Option { + return func(opts *options) { + opts.tmpDir = dir + } +} + +func WithConcurrency(concurrency int) Option { + return func(opts *options) { + opts.concurrency = concurrency + } +} + +func WithSleep(sleep time.Duration) Option { + return func(opts *options) { + opts.sleep = sleep + } +} + +func WithGlobPattern(globPattern string) Option { + return func(opts *options) { + opts.globPattern = globPattern + } +} + +func WithLogger(logger *slog.Logger) Option { + return func(opts *options) { + opts.logger = logger + } +} diff --git a/internal/fai/options_test.go b/internal/fai/options_test.go new file mode 100644 index 0000000..c20caea --- /dev/null +++ b/internal/fai/options_test.go @@ -0,0 +1,77 @@ +package fai + +import ( + "os" + "testing" +) + +func TestValidateOptions(t *testing.T) { + testDir := t.TempDir() + testFile, err := os.CreateTemp(testDir, "test") + if err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + // Test glob pattern is a wildcard and source and valid target directory are the same + _, err = validateOptions( + WithSourceDir(testDir), + WithValidTargetDir(testDir), + WithGlobPattern("*"), + ) + if err == nil { + t.Error("expected error when source and valid target directory are the same and glob pattern is '*'") + } + + // Test glob pattern is a wildcard and source and invalid target directory are the same + _, err = validateOptions( + WithSourceDir(testDir), + WithInvalidTargetDir(testDir), + WithGlobPattern("*"), + ) + if err == nil { + t.Error("expected error when source and invalid target directory are the same and glob pattern is '*'") + } + + // Test glob pattern ends with checksum file suffix and source and target directory are the same + _, err = validateOptions( + WithSourceDir(testDir), + WithValidTargetDir(testDir), + WithGlobPattern("*."+checksumFileSuffix), + ) + if err == nil { + t.Errorf("expected error when source and target directory are the same and glob pattern ends with '%s'", checksumFileSuffix) + } + + // Test valid target directory is not a directory + _, err = validateOptions( + WithSourceDir(testDir), + WithValidTargetDir(testFile.Name()), + ) + if err == nil { + t.Error("expected error when valid target directory is not a directory") + } + + // Test concurrency cannot be zero + _, err = validateOptions( + WithConcurrency(0), + ) + if err == nil { + t.Error("expected error when concurrency is zero") + } + + // Test glob pattern is invalid + _, err = validateOptions( + WithGlobPattern("["), + ) + if err == nil { + t.Error("expected error when glob pattern is invalid") + } + + // Test logger cannot be nil + _, err = validateOptions( + WithLogger(nil), + ) + if err == nil { + t.Error("expected error when logger is nil") + } +} diff --git a/internal/log/noop.go b/internal/log/noop.go new file mode 100644 index 0000000..87d56a8 --- /dev/null +++ b/internal/log/noop.go @@ -0,0 +1,34 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package log + +import ( + "context" + "log/slog" +) + +// noopHandler is a slog.Handler that does nothing. +type noopHandler struct{} + +func (n noopHandler) Enabled(context.Context, slog.Level) bool { return false } +func (n noopHandler) Handle(context.Context, slog.Record) error { return nil } +func (n noopHandler) WithAttrs([]slog.Attr) slog.Handler { return n } +func (n noopHandler) WithGroup(string) slog.Handler { return n } + +func Noop() *slog.Logger { + return slog.New(noopHandler{}) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000..8612c08 --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,42 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var filesize = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "file_size_bytes", + Help: "Size of files in bytes.", + // 1MB, 100MB, 500MB, 1GB + Buckets: []float64{1000000, 100000000, 500000000, 1000000000}, +}) + +var validationError = promauto.NewCounter(prometheus.CounterOpts{ + Name: "validation_error", + Help: "Number of files with validation errors.", +}) + +func ValidationError() { + validationError.Inc() +} + +func Size(size int64) { + filesize.Observe(float64(size)) +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..930771c --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,78 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue + +import ( + "sync" +) + +// workQueue is a queue of jobs that are executed by a number of workers. +type workQueue struct { + queue chan string + wg sync.WaitGroup + m sync.RWMutex + hm map[string]struct{} +} + +// NewWorkQueue creates a new work queue and adds the given number of workers. +func NewWorkQueue(execute func(string), concurrency int) *workQueue { + iw := &workQueue{ + queue: make(chan string, concurrency), + hm: make(map[string]struct{}, concurrency), + } + + for i := 0; i < concurrency; i++ { + iw.wg.Add(1) + go func() { + defer iw.wg.Done() + for job := range iw.queue { + execute(job) + iw.m.Lock() + delete(iw.hm, job) + iw.m.Unlock() + } + }() + } + + return iw +} + +// CloseAndWait closes the queue and waits for all workers to complete. +func (iw *workQueue) CloseAndWait() { + // close queue + close(iw.queue) + // and wait for queue to be drained + iw.wg.Wait() +} + +// Add adds a job to the queue. +// If the job is already in the queue, it will be ignored. +// If the queue is full, it will block until there is room. +// If the queue is closed, it will panic. +func (iw *workQueue) Add(job string) { + iw.m.Lock() + // check if job is already in queue + if _, ok := iw.hm[job]; ok { + iw.m.Unlock() + return + } + // add job to queue + iw.hm[job] = struct{}{} + iw.m.Unlock() + + iw.queue <- job +} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go new file mode 100644 index 0000000..40825bf --- /dev/null +++ b/internal/queue/queue_test.go @@ -0,0 +1,100 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package queue + +import ( + "math/rand" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestWorkQueue(t *testing.T) { + concurrency := 10000 + jobs := 1000000 + executed := new(atomic.Int32) + + var m sync.Mutex + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + getTimeout := func() time.Duration { + m.Lock() + defer m.Unlock() + return time.Duration(r.Intn(10)) * time.Millisecond + } + + perJobFn := func(path string) { + time.Sleep(getTimeout()) + executed.Add(1) + } + + queue := NewWorkQueue(perJobFn, concurrency) + for i := 0; i < jobs; i++ { + queue.Add(strconv.Itoa(i)) + } + + queue.CloseAndWait() + + if len(queue.hm) != 0 { + t.Errorf("expected queue to be empty, but got %d jobs", len(queue.hm)) + } + if executed.Load() != int32(jobs) { + t.Errorf("expected %d jobs to have been executed, but got %d", jobs, executed.Load()) + } +} + +func TestAddToClosedWorkQueue(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic") + } + }() + queue := NewWorkQueue(func(string) {}, 1) + queue.CloseAndWait() + queue.Add("this should panic") +} + +func TestAddSameJobToWorkQueue(t *testing.T) { + executed := new(atomic.Int32) + + perJobFn := func(path string) { + time.Sleep(10 * time.Millisecond) + executed.Add(1) + } + + queue := NewWorkQueue(perJobFn, 2) + + // add same job 100 times + // since each job takes 10 ms to execute, only one job should be expected to have been + // executed because 100 jobs should have time to be added to the queue before the first + // job is finished + for i := 0; i < 100; i++ { + queue.Add("job") + } + + queue.CloseAndWait() + + // only one job should have been executed + want := int32(1) + got := executed.Load() + + if got != want { + t.Errorf("expected %d jobs to have been executed, but got %d", want, got) + } +} diff --git a/internal/warc/testdata/invalid.warc.gz b/internal/warc/testdata/invalid.warc.gz new file mode 100644 index 0000000..ac47116 --- /dev/null +++ b/internal/warc/testdata/invalid.warc.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:2fffc1398c7712c34afda2f6a4dea5072a48357954176d775499ac31cd6cdbc4 +size 3649 diff --git a/internal/warc/testdata/valid.warc.gz b/internal/warc/testdata/valid.warc.gz new file mode 100644 index 0000000..f3a4350 --- /dev/null +++ b/internal/warc/testdata/valid.warc.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9d3f830f757a3733d56de5b76b5a3ccabb5de4cfdfed2559205b9c719ee6cbdb +size 1779 diff --git a/internal/warc/validate.go b/internal/warc/validate.go new file mode 100644 index 0000000..514e498 --- /dev/null +++ b/internal/warc/validate.go @@ -0,0 +1,55 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package warc + +import ( + "errors" + "io" + + "github.com/nlnwa/gowarc" +) + +func IsValid(file, tmpDir string) (bool, error) { + wf, err := gowarc.NewWarcFileReader(file, 0, + gowarc.WithBufferTmpDir(tmpDir), + ) + if err != nil { + return false, err + } + defer wf.Close() + + for { + wr, _, validation, err := wf.Next() + if errors.Is(err, io.EOF) { + return validation.Valid(), nil + } + if err != nil { + *validation = append(*validation, err) + } + func() { + defer wr.Close() + err = wr.ValidateDigest(validation) + if err != nil { + *validation = append(*validation, err) + } + }() + // stop processing if we have found an invalid record + if !validation.Valid() { + return false, nil + } + } +} diff --git a/internal/warc/validate_test.go b/internal/warc/validate_test.go new file mode 100644 index 0000000..e6c556b --- /dev/null +++ b/internal/warc/validate_test.go @@ -0,0 +1,58 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package warc + +import ( + "testing" +) + +const ( + validWarcFile = "testdata/valid.warc.gz" + invalidWarcFile = "testdata/invalid.warc.gz" +) + +func TestMain(m *testing.M) { + m.Run() +} + +func TestIsValid(t *testing.T) { + tests := []struct { + file string + isValid bool + }{ + { + file: validWarcFile, + isValid: true, + }, + { + file: invalidWarcFile, + isValid: false, + }, + } + + for _, test := range tests { + t.Run(test.file, func(t *testing.T) { + isValid, err := IsValid(test.file, "") + if err != nil { + t.Errorf("expected no error, got: %v", err) + } + if isValid != test.isValid { + t.Errorf("expected %v, got: %v", test.isValid, isValid) + } + }) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..72931a1 --- /dev/null +++ b/main.go @@ -0,0 +1,86 @@ +/* + * Copyright 2023 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "flag" + "fmt" + "log/slog" + "net/http" + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "github.com/nlnwa/fai/internal/fai" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +func main() { + sourceDir := "" + validTargetDir := "" + invalidTargetDir := "" + tmpDir := "" + + concurrency := runtime.NumCPU() + sleep := 5 * time.Second + pattern := "*" + metricsPort := 8081 + + flag.StringVar(&sourceDir, "source-dir", sourceDir, "path to source directory") + flag.StringVar(&validTargetDir, "valid-target-dir", validTargetDir, "path to target directory where valid files and their corresponding checksum files will be moved to") + flag.StringVar(&invalidTargetDir, "invalid-target-dir", invalidTargetDir, "path to target directory where invalid files and their corresponding checksum files will be moved to") + flag.StringVar(&tmpDir, "tmp-dir", tmpDir, "path to directory where temporary buffer files will be stored") + flag.IntVar(&concurrency, "concurrency", concurrency, "number of concurrent files processed") + flag.DurationVar(&sleep, "sleep", sleep, "sleep duration between directory listings, set to 0 to only do a single run") + flag.StringVar(&pattern, "pattern", pattern, "glob pattern used to match filenames in source directory") + flag.IntVar(&metricsPort, "metrics-port", metricsPort, "port to expose metrics on") + flag.Parse() + + logger := slog.Default() + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + go func() { + defer cancel() + http.Handle("/metrics", promhttp.Handler()) + err := http.ListenAndServe(fmt.Sprintf(":%d", metricsPort), nil) + if err != nil { + logger.Error("Failed to start metrics server", "error", err) + } + }() + + f, err := fai.New( + fai.WithSourceDir(sourceDir), + fai.WithValidTargetDir(validTargetDir), + fai.WithInvalidTargetDir(invalidTargetDir), + fai.WithTmpDir(tmpDir), + fai.WithConcurrency(concurrency), + fai.WithSleep(sleep), + fai.WithGlobPattern(pattern), + fai.WithLogger(logger), + ) + if err != nil { + logger.Error("", "error", err) + os.Exit(1) + } + + f.Run(ctx) +}