Skip to content

Commit

Permalink
Add mergedb subcommand for merging multiple separate multi runs (#53)
Browse files Browse the repository at this point in the history
* Add new wait pkg.

* Add neaten.Touch, DeleteAllPrefixedDirEntries and MergeDGUTDBDirectories().

* Upgrade golangci-lint and disable dot-import rule.

---------

Co-authored-by: Ash Holland <[email protected]>
  • Loading branch information
sb10 and sersorrel authored Jan 17, 2024
1 parent 284dc5a commit 37ab138
Show file tree
Hide file tree
Showing 15 changed files with 945 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.54.2
version: v1.55.2
only-new-issues: true
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ linters-settings:
simple: true
range-loops: true
for-loops: true
revive:
rules:
- name: dot-imports
disabled: true
unparam:
check-exported: true
unused:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ bench: export CGO_ENABLED = 1
bench:
go test -tags netgo --count 1 -run Bench -bench=. ./...

# curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.50.1
# curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.55.2
lint: export CGO_ENABLED = 1
lint:
@golangci-lint run
Expand Down
6 changes: 3 additions & 3 deletions basedirs/basedirs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var basedirMDTRegexp = regexp.MustCompile(`\/mdt\d(\/|\z)`)
// BaseDirs is used to summarise disk usage information by base directory and
// group or user.
type BaseDirs struct {
dir string
dbPath string
tree *dgut.Tree
quotas *Quotas
ch codec.Handle
Expand All @@ -58,14 +58,14 @@ type BaseDirs struct {

// NewCreator returns a BaseDirs that lets you create a database summarising
// usage information by base directory, taken from the given tree and quotas.
func NewCreator(dir string, tree *dgut.Tree, quotas *Quotas) (*BaseDirs, error) {
func NewCreator(dbPath string, tree *dgut.Tree, quotas *Quotas) (*BaseDirs, error) {
mp, err := getMountPoints()
if err != nil {
return nil, err
}

return &BaseDirs{
dir: dir,
dbPath: dbPath,
tree: tree,
quotas: quotas,
ch: new(codec.BincHandle),
Expand Down
84 changes: 84 additions & 0 deletions basedirs/basedirs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,18 @@ import (
internaldb "github.com/wtsi-ssg/wrstat/v4/internal/db"
"github.com/wtsi-ssg/wrstat/v4/internal/fixtimes"
"github.com/wtsi-ssg/wrstat/v4/summary"
bolt "go.etcd.io/bbolt"
)

func TestBaseDirs(t *testing.T) { //nolint:gocognit
csvPath := internaldata.CreateQuotasCSV(t, `1,/lustre/scratch125,4000000000,20
2,/lustre/scratch125,300,30
2,/lustre/scratch123,400,40
77777,/lustre/scratch125,500,50
1,/nfs/scratch125,4000000000,20
2,/nfs/scratch125,300,30
2,/nfs/scratch123,400,40
77777,/nfs/scratch125,500,50
`)

Convey("Given a Tree and Quotas you can make a BaseDirs", t, func() {
Expand Down Expand Up @@ -768,6 +773,85 @@ func TestBaseDirs(t *testing.T) { //nolint:gocognit
So(wso, ShouldEqual, expectedProjectASubDirUsage)
})
})

Convey("and merge with another database", func() {
_, newFiles := internaldata.FakeFilesForDGUTDBForBasedirsTesting(gid, uid)
for i := range newFiles {
newFiles[i].Path = "/nfs" + newFiles[i].Path[7:]
}

newTree, err := internaldb.CreateDGUTDBFromFakeFiles(t, newFiles)
So(err, ShouldBeNil)

newDBPath := filepath.Join(dir, "newdir.db")

newBd, err := NewCreator(newDBPath, newTree, quotas)
So(err, ShouldBeNil)
So(bd, ShouldNotBeNil)

newBd.mountPoints = mountPoints{
"/nfs/scratch123/",
"/nfs/scratch125/",
}

err = newBd.CreateDatabase(yesterday)
So(err, ShouldBeNil)

outputDBPath := filepath.Join(dir, "merged.db")

err = MergeDBs(dbPath, newDBPath, outputDBPath)
So(err, ShouldBeNil)

db, err := bolt.Open(outputDBPath, dbOpenMode, &bolt.Options{
ReadOnly: true,
})

So(err, ShouldBeNil)
defer db.Close()

countKeys := func(bucket string) (int, int) {
lustreKeys, nfsKeys := 0, 0

db.View(func(tx *bolt.Tx) error { //nolint:errcheck
bucket := tx.Bucket([]byte(bucket))

return bucket.ForEach(func(k, _ []byte) error {
if strings.Contains(string(k), "/lustre/") {
lustreKeys++
}
if strings.Contains(string(k), "/nfs/") {
nfsKeys++
}

return nil
})
})

return lustreKeys, nfsKeys
}

expectedKeys := 6

lustreKeys, nfsKeys := countKeys(groupUsageBucket)
So(lustreKeys, ShouldEqual, expectedKeys)
So(nfsKeys, ShouldEqual, expectedKeys)

lustreKeys, nfsKeys = countKeys(groupHistoricalBucket)
So(lustreKeys, ShouldEqual, 5)
So(nfsKeys, ShouldEqual, 5)

lustreKeys, nfsKeys = countKeys(groupSubDirsBucket)
So(lustreKeys, ShouldEqual, expectedKeys)
So(nfsKeys, ShouldEqual, expectedKeys)

lustreKeys, nfsKeys = countKeys(userUsageBucket)
So(lustreKeys, ShouldEqual, expectedKeys)
So(nfsKeys, ShouldEqual, expectedKeys)

lustreKeys, nfsKeys = countKeys(userSubDirsBucket)
So(lustreKeys, ShouldEqual, expectedKeys)
So(nfsKeys, ShouldEqual, expectedKeys)
})
})
})
}
Expand Down
89 changes: 84 additions & 5 deletions basedirs/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,7 @@ type Usage struct {
// Provide a time that will be used as the date when appending to the historical
// data.
func (b *BaseDirs) CreateDatabase(historyDate time.Time) error {
db, err := bolt.Open(b.dir, dbOpenMode, &bolt.Options{
NoFreelistSync: true,
NoGrowSync: true,
FreelistType: bolt.FreelistMapType,
})
db, err := openDB(b.dbPath)
if err != nil {
return err
}
Expand All @@ -112,6 +108,14 @@ func (b *BaseDirs) CreateDatabase(historyDate time.Time) error {
return db.Close()
}

func openDB(dbPath string) (*bolt.DB, error) {
return bolt.Open(dbPath, dbOpenMode, &bolt.Options{
NoFreelistSync: true,
NoGrowSync: true,
FreelistType: bolt.FreelistMapType,
})
}

func (b *BaseDirs) updateDatabase(historyDate time.Time, gids, uids []uint32) func(*bolt.Tx) error { //nolint:gocognit
return func(tx *bolt.Tx) error {
if err := clearUsageBuckets(tx); err != nil {
Expand Down Expand Up @@ -566,3 +570,78 @@ func (b *BaseDirs) history(bucket *bolt.Bucket, gid uint32, path string) ([]Hist

return history, err
}

// MergeDBs merges the basedirs.db database at the given A and B paths and
// creates a new database file at outputPath.
func MergeDBs(pathA, pathB, outputPath string) error { //nolint:funlen
var (
err error
dbA, dbB, dbC *bolt.DB
)

closeDB := func(db *bolt.DB) {
errc := db.Close()
if err == nil {
err = errc
}
}

dbA, err = openDB(pathA)
if err != nil {
return err
}

defer closeDB(dbA)

dbB, err = openDB(pathB)
if err != nil {
return err
}

defer closeDB(dbB)

dbC, err = openDB(outputPath)
if err != nil {
return err
}

defer closeDB(dbC)

err = dbC.Update(func(tx *bolt.Tx) error {
err = transferAllBucketContents(tx, dbA)
if err != nil {
return err
}

return transferAllBucketContents(tx, dbB)
})

return err
}

func transferAllBucketContents(utx *bolt.Tx, source *bolt.DB) error {
if err := createBucketsIfNotExist(utx); err != nil {
return err
}

return source.View(func(vtx *bolt.Tx) error {
for _, bucket := range []string{groupUsageBucket, groupHistoricalBucket,
groupSubDirsBucket, userUsageBucket, userSubDirsBucket} {
err := transferBucketContents(vtx, utx, bucket)
if err != nil {
return err
}
}

return nil
})
}

func transferBucketContents(vtx, utx *bolt.Tx, bucketName string) error {
sourceBucket := vtx.Bucket([]byte(bucketName))
destBucket := utx.Bucket([]byte(bucketName))

return sourceBucket.ForEach(func(k, v []byte) error {
return destBucket.Put(k, v)
})
}
Loading

0 comments on commit 37ab138

Please sign in to comment.