Skip to content

Commit

Permalink
Performance enhancements for DB v6 writes (#2394)
Browse files Browse the repository at this point in the history
* cache entry IDs when writing to the DB

Signed-off-by: Alex Goodman <[email protected]>

* update json schema for db search cmds

Signed-off-by: Alex Goodman <[email protected]>

* keep transactions simple and add package write cache

Signed-off-by: Alex Goodman <[email protected]>

* ensure query planner accounts for indexes

Signed-off-by: Alex Goodman <[email protected]>

* account for cleaning OS values

Signed-off-by: Alex Goodman <[email protected]>

---------

Signed-off-by: Alex Goodman <[email protected]>
  • Loading branch information
wagoodman authored Jan 23, 2025
1 parent de80302 commit 78db49c
Show file tree
Hide file tree
Showing 20 changed files with 1,070 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c *CPE) String() string {
if c == nil {
return ""
}

return v6.Cpe(*c).String()
}

Expand Down
101 changes: 79 additions & 22 deletions grype/db/internal/gormadapter/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var writerStatements = []string{
var heavyWriteStatements = []string{
`PRAGMA cache_size = -1073741824`, // ~1 GB (negative means treat as bytes not page count); one caveat is to not pick a value that risks swapping behavior, negating performance gains
`PRAGMA mmap_size = 1073741824`, // ~1 GB; the maximum size of the memory-mapped I/O buffer (to access the database file as if it were a part of the process’s virtual memory)
`PRAGMA defer_foreign_keys = ON`, // defer enforcement of foreign key constraints until the end of the transaction (to avoid the overhead of checking constraints for each row)
}

var readConnectionOptions = []string{
Expand Down Expand Up @@ -146,69 +147,125 @@ func Open(path string, options ...Option) (*gorm.DB, error) {
return nil, fmt.Errorf("unable to connect to DB: %w", err)
}

return prepareDB(dbObj, cfg)
return cfg.prepareDB(dbObj)
}

func prepareDB(dbObj *gorm.DB, cfg config) (*gorm.DB, error) {
if cfg.writable {
log.Trace("using writable DB statements")
if err := applyStatements(dbObj, writerStatements); err != nil {
func (c config) prepareDB(dbObj *gorm.DB) (*gorm.DB, error) {
if c.writable {
log.Debug("using writable DB statements")
if err := c.applyStatements(dbObj, writerStatements); err != nil {
return nil, fmt.Errorf("unable to apply DB writer statements: %w", err)
}
}

if cfg.truncate && cfg.allowLargeMemoryFootprint {
log.Trace("using large memory footprint DB statements")
if err := applyStatements(dbObj, heavyWriteStatements); err != nil {
if c.truncate && c.allowLargeMemoryFootprint {
log.Debug("using large memory footprint DB statements")
if err := c.applyStatements(dbObj, heavyWriteStatements); err != nil {
return nil, fmt.Errorf("unable to apply DB heavy writer statements: %w", err)
}
}

if len(commonStatements) > 0 {
if err := applyStatements(dbObj, commonStatements); err != nil {
if err := c.applyStatements(dbObj, commonStatements); err != nil {
return nil, fmt.Errorf("unable to apply DB common statements: %w", err)
}
}

if len(cfg.statements) > 0 {
if err := applyStatements(dbObj, cfg.statements); err != nil {
if len(c.statements) > 0 {
if err := c.applyStatements(dbObj, c.statements); err != nil {
return nil, fmt.Errorf("unable to apply DB custom statements: %w", err)
}
}

if len(cfg.models) > 0 && cfg.writable {
log.Trace("applying DB migrations")
if err := dbObj.AutoMigrate(cfg.models...); err != nil {
if len(c.models) > 0 && c.writable {
log.Debug("applying DB migrations")
if err := dbObj.AutoMigrate(c.models...); err != nil {
return nil, fmt.Errorf("unable to migrate: %w", err)
}
// now that there are potentially new models and indexes, analyze the DB to ensure the query planner is up-to-date
if err := dbObj.Exec("ANALYZE").Error; err != nil {
return nil, fmt.Errorf("unable to analyze DB: %w", err)
}
}

if len(cfg.initialData) > 0 && cfg.truncate {
log.Trace("writing initial data")
for _, d := range cfg.initialData {
if len(c.initialData) > 0 && c.truncate {
log.Debug("writing initial data")
for _, d := range c.initialData {
if err := dbObj.Create(d).Error; err != nil {
return nil, fmt.Errorf("unable to create initial data: %w", err)
}
}
}

if cfg.debug {
if c.debug {
dbObj = dbObj.Debug()
}

return dbObj, nil
}

func applyStatements(db *gorm.DB, statements []string) error {
func (c config) applyStatements(db *gorm.DB, statements []string) error {
for _, sqlStmt := range statements {
db.Exec(sqlStmt)
if db.Error != nil {
return fmt.Errorf("unable to execute (%s): %w", sqlStmt, db.Error)
if err := db.Exec(sqlStmt).Error; err != nil {
return fmt.Errorf("unable to execute (%s): %w", sqlStmt, err)
}
if strings.HasPrefix(sqlStmt, "PRAGMA") {
name, value, err := c.pragmaNameValue(sqlStmt)
if err != nil {
return fmt.Errorf("unable to parse PRAGMA statement: %w", err)
}

var result string
if err := db.Raw("PRAGMA " + name + ";").Scan(&result).Error; err != nil {
return fmt.Errorf("unable to verify PRAGMA %q: %w", name, err)
}

if !strings.EqualFold(result, value) {
if value == "ON" && result == "1" {
continue
}
if value == "OFF" && result == "0" {
continue
}
return fmt.Errorf("PRAGMA %q was not set to %q (%q)", name, value, result)
}
}
}
return nil
}

func (c config) pragmaNameValue(sqlStmt string) (string, string, error) {
sqlStmt = strings.TrimSuffix(strings.TrimSpace(sqlStmt), ";") // remove the trailing semicolon
if strings.Count(sqlStmt, ";") > 0 {
return "", "", fmt.Errorf("PRAGMA statements should not contain semicolons: %q", sqlStmt)
}

// check if the pragma was set, parse the pragma name and value from the statement. This is because
// sqlite will not return errors when there are issues with the pragma key or value, but it will
// be inconsistent with the expected value if you explicitly check
var name, value string

clean := strings.TrimPrefix(sqlStmt, "PRAGMA")
fields := strings.SplitN(clean, "=", 2)
if len(fields) == 2 {
name = strings.ToLower(strings.TrimSpace(fields[0]))
value = strings.TrimSpace(fields[1])
} else {
return "", "", fmt.Errorf("unable to parse PRAGMA statement: %q", sqlStmt)
}

if c.memory && name == "mmap_size" {
// memory only DBs do not have mmap capability
value = ""
}

if name == "" {
return "", "", fmt.Errorf("unable to parse name from PRAGMA statement: %q", sqlStmt)
}

return name, value, nil
}

func deleteDB(path string) error {
if _, err := os.Stat(path); err == nil {
if err := os.Remove(path); err != nil {
Expand Down
95 changes: 95 additions & 0 deletions grype/db/internal/gormadapter/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,98 @@ func TestPrepareWritableDB(t *testing.T) {
require.Contains(t, err.Error(), "unable to create parent directory")
})
}

func TestPragmaNameValue(t *testing.T) {
tests := []struct {
name string
cfg config
input string
wantName string
wantValue string
wantErr require.ErrorAssertionFunc
}{
{
name: "basic pragma",
cfg: config{memory: false},
input: "PRAGMA journal_mode=WAL",
wantName: "journal_mode",
wantValue: "WAL",
},
{
name: "pragma with spaces",
cfg: config{memory: false},
input: "PRAGMA cache_size = 2000 ",
wantName: "cache_size",
wantValue: "2000",
},
{
name: "pragma with trailing semicolon",
cfg: config{memory: false},
input: "PRAGMA synchronous=NORMAL;",
wantName: "synchronous",
wantValue: "NORMAL",
},
{
name: "pragma with multiple semicolons",
cfg: config{memory: false},
input: "PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;",
wantErr: require.Error,
},
{
name: "invalid pragma format",
cfg: config{memory: false},
input: "PRAGMA invalid_format",
wantErr: require.Error,
},
{
name: "mmap_size pragma with memory DB",
cfg: config{memory: true},
input: "PRAGMA mmap_size=1000",
wantName: "mmap_size",
wantValue: "", // should be empty for memory DB
},
{
name: "mmap_size pragma with regular DB",
cfg: config{memory: false},
input: "PRAGMA mmap_size=1000",
wantName: "mmap_size",
wantValue: "1000",
},
{
name: "pragma with numeric value",
cfg: config{memory: false},
input: "PRAGMA page_size=4096",
wantName: "page_size",
wantValue: "4096",
},
{
name: "pragma with mixed case",
cfg: config{memory: false},
input: "PRAGMA Journal_Mode=WAL",
wantName: "journal_mode",
wantValue: "WAL",
},
{
name: "empty pragma",
cfg: config{memory: false},
input: "PRAGMA =value",
wantErr: require.Error,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.wantErr == nil {
tt.wantErr = require.NoError
}

gotName, gotValue, err := tt.cfg.pragmaNameValue(tt.input)
tt.wantErr(t, err)

if err == nil {
require.Equal(t, tt.wantName, gotName)
require.Equal(t, tt.wantValue, gotValue)
}
})
}
}
61 changes: 59 additions & 2 deletions grype/db/v6/affected_cpe_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,70 @@ func newAffectedCPEStore(db *gorm.DB, bs *blobStore) *affectedCPEStore {

// AddAffectedCPEs adds one or more affected CPEs to the store
func (s *affectedCPEStore) AddAffectedCPEs(packages ...*AffectedCPEHandle) error {
if err := s.addCpes(packages...); err != nil {
return fmt.Errorf("unable to add CPEs from affected package CPEs: %w", err)
}
for _, pkg := range packages {
if err := s.blobStore.addBlobable(pkg); err != nil {
return fmt.Errorf("unable to add affected package blob: %w", err)
}

if err := s.db.Create(pkg).Error; err != nil {
return fmt.Errorf("unable to add affected CPE: %w", err)
if err := s.db.Omit("CPE").Create(pkg).Error; err != nil {
return fmt.Errorf("unable to add affected CPEs: %w", err)
}
}
return nil
}

func (s *affectedCPEStore) addCpes(packages ...*AffectedCPEHandle) error { // nolint:dupl
cacheInst, ok := cacheFromContext(s.db.Statement.Context)
if !ok {
return fmt.Errorf("unable to fetch CPE cache from context")
}

var final []*Cpe
byCacheKey := make(map[string][]*Cpe)
for _, p := range packages {
if p.CPE != nil {
key := p.CPE.cacheKey()
if existingID, ok := cacheInst.getID(p.CPE); ok {
// seen in a previous transaction...
p.CpeID = existingID
} else if _, ok := byCacheKey[key]; !ok {
// not seen within this transaction
final = append(final, p.CPE)
}
byCacheKey[key] = append(byCacheKey[key], p.CPE)
}
}

if len(final) == 0 {
return nil
}

if err := s.db.Create(final).Error; err != nil {
return fmt.Errorf("unable to create CPE records: %w", err)
}

// update the cache with the new records
for _, ref := range final {
cacheInst.set(ref)
}

// update all references with the IDs from the cache
for _, refs := range byCacheKey {
for _, ref := range refs {
id, ok := cacheInst.getID(ref)
if ok {
ref.setRowID(id)
}
}
}

// update the parent objects with the FK ID
for _, p := range packages {
if p.CPE != nil {
p.CpeID = p.CPE.ID
}
}
return nil
Expand Down
12 changes: 8 additions & 4 deletions grype/db/v6/affected_cpe_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestAffectedCPEStore_AddAffectedCPEs(t *testing.T) {
},
Name: "CVE-2023-5678",
},
CpeID: 1,
CPE: &Cpe{
Part: "a",
Vendor: "vendor-1",
Expand Down Expand Up @@ -148,8 +147,7 @@ func TestAffectedCPEStore_PreventDuplicateCPEs(t *testing.T) {
ID: "nvd",
},
},
CpeID: 1,
CPE: &Cpe{
CPE: &Cpe{ // ID = 1
Part: "a",
Vendor: "vendor-1",
Product: "product-1",
Expand All @@ -171,8 +169,9 @@ func TestAffectedCPEStore_PreventDuplicateCPEs(t *testing.T) {
ID: "nvd",
},
},
CpeID: 2,
CpeID: 2, // for testing explicitly set to 2, but this is unrealistic
CPE: &Cpe{
ID: 2, // different, again, unrealistic but useful for testing
Part: "a", // same
Vendor: "vendor-1", // same
Product: "product-1", // same
Expand All @@ -199,6 +198,11 @@ func TestAffectedCPEStore_PreventDuplicateCPEs(t *testing.T) {
PreloadVulnerability: true,
})
require.NoError(t, err)

// the CPEs should be the same, and the store should reconcile the IDs
duplicateCPE.CpeID = cpe1.CpeID
duplicateCPE.CPE.ID = cpe1.CPE.ID

expected := []AffectedCPEHandle{*cpe1, *duplicateCPE}
require.Len(t, actualHandles, len(expected), "expected both handles to be stored")
if d := cmp.Diff(expected, actualHandles); d != "" {
Expand Down
Loading

0 comments on commit 78db49c

Please sign in to comment.