diff --git a/apiserver.go b/apiserver.go
index b75f603..2e1238c 100644
--- a/apiserver.go
+++ b/apiserver.go
@@ -47,12 +47,13 @@ func (api *apiServer) Start(ctx context.Context) {
 	engine.NoMethod(uniresp.NoMethodHandler)
 	engine.NoRoute(uniresp.NotFoundHandler)
 
-	handler := Actions{BgJob: api.arch}
+	handler := Actions{ArchKeeper: api.arch}
 
 	engine.GET("/overview", handler.Overview)
 	engine.GET("/record/:id", handler.GetRecord)
 	engine.GET("/validate/:id", handler.Validate)
 	engine.POST("/fix/:id", handler.Fix)
+	engine.POST("/dedup-reset", handler.DedupReset)
 
 	api.server = &http.Server{
 		Handler: engine,
diff --git a/archiver/archiver.go b/archiver/archiver.go
index e4d3cdc..a64eef0 100644
--- a/archiver/archiver.go
+++ b/archiver/archiver.go
@@ -17,6 +17,7 @@
 package archiver
 
 import (
+	"camus/cncdb"
 	"camus/reporting"
 	"context"
 	"fmt"
@@ -25,19 +26,35 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
+// ArchKeeper handles continuous operations related
+// to the concordance archive (contrary to the name, the
+// archive also contains word lists, paradigmatic queries
+// and keyword queries).
+// The main responsibility of ArchKeeper is to pick up
+// queued query IDs, fetch their data from Redis ASAP
+// and store them to the kontext_conc_persistence SQL table.
+// Due to the way the table is partitioned, ArchKeeper
+// must also perform some deduplication to prevent extensive
+// growth of duplicate records. ArchKeeper is not expected
+// to catch 100% of duplicates: there is also a cleanup job
+// which removes old unused records and deduplicates each
+// record it checks. But since that job touches only records
+// that are years old, we still need to prevent (at least
+// some) recent duplicates to keep the database size
+// reasonable.
 type ArchKeeper struct {
-	redis              *RedisAdapter
-	db                 IMySQLOps
-	reporting          reporting.IReporting
-	checkInterval      time.Duration
-	checkIntervalChunk int
-	dedup              *Deduplicator
-	tz                 *time.Location
-	stats              reporting.OpStats
+	redis     *RedisAdapter
+	db        cncdb.IMySQLOps
+	reporting reporting.IReporting
+	conf      *Conf
+	dedup     *Deduplicator
+	tz        *time.Location
+	stats     reporting.OpStats
 }
 
+// Start starts the ArchKeeper service
 func (job *ArchKeeper) Start(ctx context.Context) {
-	ticker := time.NewTicker(job.checkInterval)
+	ticker := time.NewTicker(job.conf.CheckInterval())
 	go func() {
 		for {
 			select {
@@ -51,6 +68,7 @@ func (job *ArchKeeper) Start(ctx context.Context) {
 	}()
 }
 
+// Stop stops the ArchKeeper service
 func (job *ArchKeeper) Stop(ctx context.Context) error {
 	log.Warn().Msg("stopping ArchKeeper")
 	if err := job.dedup.OnClose(); err != nil {
@@ -59,19 +77,30 @@ func (job *ArchKeeper) Stop(ctx context.Context) error {
 	return nil
 }
 
-func (job *ArchKeeper) StoreToDisk() {
+// StoreToDisk stores current operations data from RAM
+// to a configured disk file.
+func (job *ArchKeeper) StoreToDisk() error {
+	return job.dedup.StoreToDisk()
+}
 
+// Reset clears current operations data stored in RAM
+// and re-initializes it according to the configuration.
+func (job *ArchKeeper) Reset() error {
+	return job.dedup.Reset()
 }
 
+// GetStats returns statistics related to ArchKeeper operations.
+// We use it mainly for pushing stats to a TimescaleDB instance.
 func (job *ArchKeeper) GetStats() reporting.OpStats {
 	return job.stats
 }
 
-func (job *ArchKeeper) LoadRecordsByID(concID string) ([]ArchRecord, error) {
+func (job *ArchKeeper) LoadRecordsByID(concID string) ([]cncdb.ArchRecord, error) {
 	return job.db.LoadRecordsByID(concID)
 }
 
-func (job *ArchKeeper) handleImplicitReq(rec ArchRecord, item queueRecord, currStats *reporting.OpStats) bool {
+func (job *ArchKeeper) handleImplicitReq(
+	rec cncdb.ArchRecord, item queueRecord, currStats *reporting.OpStats) bool {
 
 	match, err := job.dedup.TestAndSolve(rec)
 	if err != nil {
@@ -106,7 +135,8 @@ func (job *ArchKeeper) handleImplicitReq(rec ArchRecord, item queueRecord, currS
 	return false
 }
 
-func (job *ArchKeeper) handleExplicitReq(rec ArchRecord, item queueRecord, currStats *reporting.OpStats) {
+func (job *ArchKeeper) handleExplicitReq(
+	rec cncdb.ArchRecord, item queueRecord, currStats *reporting.OpStats) {
 
 	exists, err := job.db.ContainsRecord(rec.ID)
 	if err != nil {
 		currStats.NumErrors++
@@ -132,7 +162,7 @@ func (job *ArchKeeper) handleExplicitReq(rec ArchRecord, item queueRecord, currS
 }
 
 func (job *ArchKeeper) performCheck() error {
-	items, err := job.redis.NextNItems(int64(job.checkIntervalChunk))
+	items, err := job.redis.NextNItems(int64(job.conf.CheckIntervalChunk))
 	log.Debug().
 		AnErr("error", err).
 		Int("itemsToProcess", len(items)).
@@ -175,26 +205,25 @@
 	return nil
 }
 
-func (job *ArchKeeper) DeduplicateInArchive(curr []ArchRecord, rec ArchRecord) (ArchRecord, error) {
+func (job *ArchKeeper) DeduplicateInArchive(
+	curr []cncdb.ArchRecord, rec cncdb.ArchRecord) (cncdb.ArchRecord, error) {
 	return job.db.DeduplicateInArchive(curr, rec)
 }
 
 func NewArchKeeper(
 	redis *RedisAdapter,
-	db IMySQLOps,
+	db cncdb.IMySQLOps,
 	dedup *Deduplicator,
 	reporting reporting.IReporting,
 	tz *time.Location,
-	checkInterval time.Duration,
-	checkIntervalChunk int,
+	conf *Conf,
 ) *ArchKeeper {
 	return &ArchKeeper{
-		redis:              redis,
-		db:                 db,
-		dedup:              dedup,
-		reporting:          reporting,
-		tz:                 tz,
-		checkInterval:      checkInterval,
-		checkIntervalChunk: checkIntervalChunk,
+		redis:     redis,
+		db:        db,
+		dedup:     dedup,
+		reporting: reporting,
+		tz:        tz,
+		conf:      conf,
 	}
 }
diff --git a/archiver/conf.go b/archiver/conf.go
index b434454..74df13d 100644
--- a/archiver/conf.go
+++ b/archiver/conf.go
@@ -17,36 +17,54 @@
 package archiver
 
 import (
+	"camus/util"
 	"fmt"
+	"time"
 
 	"github.com/rs/zerolog/log"
 )
 
-type RedisConf struct {
-	Host             string `json:"host"`
-	Port             int    `json:"port"`
-	DB               int    `json:"db"`
-	Password         string `json:"password"`
-	QueueKey         string `json:"queueKey"`
-	FailedQueueKey   string `json:"failedQueueKey"`
-	FailedRecordsKey string `json:"failedRecordsKey"`
+const (
+	dfltPreloadLastNItems = 500
+)
+
+type Conf struct {
+	DDStateFilePath    string `json:"ddStateFilePath"`
+	CheckIntervalSecs  int    `json:"checkIntervalSecs"`
+	CheckIntervalChunk int    `json:"checkIntervalChunk"`
+	PreloadLastNItems  int    `json:"preloadLastNItems"`
 }
 
-func (conf *RedisConf) ValidateAndDefaults() error {
-	if conf.DB == 0 {
-		return fmt.Errorf("missing Redis configuration: `db`")
+func (conf *Conf) CheckInterval() time.Duration {
+	return time.Duration(conf.CheckIntervalSecs) * time.Second
+}
+
+func (conf *Conf) ValidateAndDefaults() error {
+	if conf == nil {
+		return fmt.Errorf("missing `archiver` section")
 	}
-	if conf.QueueKey == "" {
-		return fmt.Errorf("missing Redis configuration: `queueKey`")
+	if conf.DDStateFilePath == "" {
+		return fmt.Errorf("missing path to deduplicator state file (ddStateFilePath)")
 	}
-	if conf.FailedQueueKey == "" {
-		conf.FailedQueueKey = conf.QueueKey + "_failed"
+
+	tmp, err := util.NearestPrime(conf.CheckIntervalSecs)
+	if err != nil {
+		return fmt.Errorf("failed to tune ops timing: %w", err)
+	}
+	if tmp != conf.CheckIntervalSecs {
 		log.Warn().
-			Str("value", conf.FailedQueueKey).
-			Msg("Redis configuration `failedQueueKey` missing - using default")
+			Int("oldValue", conf.CheckIntervalSecs).
+			Int("newValue", tmp).
+			Msg("tuned value of checkIntervalSecs so it cannot be easily overlapped by other timers")
+		conf.CheckIntervalSecs = tmp
 	}
-	if conf.FailedRecordsKey == "" {
-		return fmt.Errorf("missing Redis configuration: `failedRecordsKey`")
+
+	if conf.PreloadLastNItems == 0 {
+		conf.PreloadLastNItems = dfltPreloadLastNItems
+		log.Warn().
+			Int("value", conf.PreloadLastNItems).
+			Msg("archiver value `preloadLastNItems` not set, using default")
 	}
+
 	return nil
 }
diff --git a/archiver/deduplicator.go b/archiver/deduplicator.go
index 6fba733..758b582 100644
--- a/archiver/deduplicator.go
+++ b/archiver/deduplicator.go
@@ -17,8 +17,10 @@
 package archiver
 
 import (
+	"camus/cncdb"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/bits-and-blooms/bloom"
@@ -32,20 +34,22 @@ const (
 )
 
 type Deduplicator struct {
-	items           *bloom.BloomFilter
-	concDB          IMySQLOps
-	tz              *time.Location
-	preloadLastN    int
-	storageFilePath string
+	knownIDs      *bloom.BloomFilter
+	knownIDsMutex *sync.RWMutex
+	concDB        cncdb.IMySQLOps
+	tz            *time.Location
+	conf          *Conf
 }
 
 func (dd *Deduplicator) StoreToDisk() error {
-	f, err := os.OpenFile(dd.storageFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+	f, err := os.OpenFile(dd.conf.DDStateFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
 	if err != nil {
 		return fmt.Errorf("failed to store deduplicator state to disk: %w", err)
 	}
 	defer f.Close()
-	_, err = dd.items.WriteTo(f)
+	dd.knownIDsMutex.Lock()
+	defer dd.knownIDsMutex.Unlock()
+	_, err = dd.knownIDs.WriteTo(f)
 	if err != nil {
 		return fmt.Errorf("failed to store deduplicator state to disk: %w", err)
 	}
@@ -57,12 +61,14 @@ func (dd *Deduplicator) OnClose() error {
 }
 
 func (dd *Deduplicator) LoadFromDisk() error {
-	f, err := os.Open(dd.storageFilePath)
+	f, err := os.Open(dd.conf.DDStateFilePath)
 	if err != nil {
 		return fmt.Errorf("failed to load deduplicator state from disk: %w", err)
 	}
 	defer f.Close()
-	_, err = dd.items.ReadFrom(f)
+	dd.knownIDsMutex.Lock()
+	defer dd.knownIDsMutex.Unlock()
+	_, err = dd.knownIDs.ReadFrom(f)
 	if err != nil {
 		return fmt.Errorf("failed to load deduplicator state from disk: %w", err)
 	}
@@ -70,33 +76,40 @@
 }
 
 func (dd *Deduplicator) Add(concID string) {
-	dd.items.AddString(concID)
+	dd.knownIDsMutex.Lock()
+	defer dd.knownIDsMutex.Unlock()
+	dd.knownIDs.AddString(concID)
 }
 
 func (dd *Deduplicator) Reset() error {
 	log.Warn().Msg("performing deduplicator reset")
-	dd.items.ClearAll()
-	if dd.preloadLastN > 0 {
-		return dd.PreloadLastNItems(dd.preloadLastN)
+	dd.knownIDsMutex.Lock()
+	dd.knownIDs.ClearAll()
+	// release the lock before preloading: preloadLastNItems calls Add,
+	// which acquires knownIDsMutex itself, and sync.RWMutex is not
+	// reentrant - keeping the lock held here would deadlock
+	dd.knownIDsMutex.Unlock()
+	if dd.conf.PreloadLastNItems > 0 {
+		return dd.preloadLastNItems()
 	}
 	return nil
 }
 
-func (dd *Deduplicator) PreloadLastNItems(num int) error {
-	dd.preloadLastN = num
-	items, err := dd.concDB.LoadRecentNRecords(num)
+func (dd *Deduplicator) preloadLastNItems() error {
+	items, err := dd.concDB.LoadRecentNRecords(dd.conf.PreloadLastNItems)
 	if err != nil {
-		return fmt.Errorf("failed to preload last N items: %w", err)
+		return fmt.Errorf("deduplicator failed to preload last N items: %w", err)
 	}
 	for _, item := range items {
 		dd.Add(item.ID)
 	}
-	log.Debug().Int("numItems", num).Msg("preloaded items for better deduplication")
+	log.Debug().
+		Int("numItems", dd.conf.PreloadLastNItems).
+		Msg("preloaded items for better deduplication")
 	return nil
 }
 
 func (dd *Deduplicator) TestRecord(concID string) bool {
-	return dd.items.TestString(concID)
+	dd.knownIDsMutex.RLock()
+	defer dd.knownIDsMutex.RUnlock()
+	return dd.knownIDs.TestString(concID)
 }
 
 // TestAndSolve looks for whether the record has been recently used and if so
 // then it merges the new record with the already stored variant(s).
@@ -105,8 +118,8 @@ func (dd *Deduplicator) TestRecord(concID string) bool {
 // The "recently used" means that we keep track of recently stored IDs and test
 // for them only. I.e. we do not perform full search in query persistence db
 // for each and every concID we want to store.
-func (dd *Deduplicator) TestAndSolve(newRec ArchRecord) (bool, error) {
-	if !dd.items.TestString(newRec.ID) {
+func (dd *Deduplicator) TestAndSolve(newRec cncdb.ArchRecord) (bool, error) {
+	if !dd.TestRecord(newRec.ID) {
 		return false, nil
 	}
 	recs, err := dd.concDB.LoadRecordsByID(newRec.ID)
@@ -123,11 +136,11 @@
 		Str("concId", newRec.ID).
 		Int("numVariants", len(recs)).
 		Msg("found archived record")
-	queryTest := make(map[string][]ArchRecord)
+	queryTest := make(map[string][]cncdb.ArchRecord)
 	for _, rec := range recs {
 		_, ok := queryTest[rec.Data]
 		if !ok {
-			queryTest[rec.Data] = make([]ArchRecord, 0, 10)
+			queryTest[rec.Data] = make([]cncdb.ArchRecord, 0, 10)
 		}
 		queryTest[rec.Data] = append(queryTest[rec.Data], rec)
 	}
@@ -154,15 +167,17 @@
 	return true, err
 }
 
-func NewDeduplicator(concDB IMySQLOps, loc *time.Location, stateFilePath string) (*Deduplicator, error) {
+func NewDeduplicator(
+	concDB cncdb.IMySQLOps, conf *Conf, loc *time.Location) (*Deduplicator, error) {
 	filter := bloom.NewWithEstimates(bloomFilterNumBits, bloomFilterProbCollision)
 	d := &Deduplicator{
-		tz:              loc,
-		items:           filter,
-		concDB:          concDB,
-		storageFilePath: stateFilePath,
+		tz:            loc,
+		knownIDs:      filter,
+		concDB:        concDB,
+		conf:          conf,
+		knownIDsMutex: &sync.RWMutex{},
 	}
-	isf, err := fs.IsFile(stateFilePath)
+	isf, err := fs.IsFile(conf.DDStateFilePath)
 	if err != nil {
 		return d, fmt.Errorf("failed to init Deduplicator: %w", err)
 	}
@@ -170,7 +185,7 @@ func NewDeduplicator(concDB IMySQLOps, loc *time.Location, stateFilePath string)
 		if err := d.LoadFromDisk(); err != nil {
 			return d, fmt.Errorf("failed to init Deduplicator: %w", err)
 		}
-		log.Info().Str("file", stateFilePath).Msg("loaded previously stored dedup. state")
+		log.Info().Str("file", conf.DDStateFilePath).Msg("loaded previously stored dedup. state")
state") } return d, nil } diff --git a/archiver/redis.go b/archiver/redis.go index 5855149..612f705 100644 --- a/archiver/redis.go +++ b/archiver/redis.go @@ -17,6 +17,7 @@ package archiver import ( + "camus/cncdb" "context" "encoding/json" "fmt" @@ -91,7 +92,7 @@ func (rd *RedisAdapter) NextNItems(n int64) ([]queueRecord, error) { return ans, nil } -func (rd *RedisAdapter) AddError(item queueRecord, rec *ArchRecord) error { +func (rd *RedisAdapter) AddError(item queueRecord, rec *cncdb.ArchRecord) error { itemJSON, err := json.Marshal(item) if err != nil { return fmt.Errorf("failed to add error record %s: %w", item.Key, err) @@ -114,12 +115,12 @@ func (rd *RedisAdapter) mkKey(id string) string { return fmt.Sprintf("concordance:%s", id) } -func (rd *RedisAdapter) GetConcRecord(id string) (ArchRecord, error) { +func (rd *RedisAdapter) GetConcRecord(id string) (cncdb.ArchRecord, error) { ans := rd.redis.Get(rd.ctx, rd.mkKey(id)) if ans.Err() != nil { - return ArchRecord{}, fmt.Errorf("failed to get concordance record: %w", ans.Err()) + return cncdb.ArchRecord{}, fmt.Errorf("failed to get concordance record: %w", ans.Err()) } - return ArchRecord{ + return cncdb.ArchRecord{ ID: id, Data: ans.Val(), }, nil diff --git a/archiver/redisconf.go b/archiver/redisconf.go new file mode 100644 index 0000000..b434454 --- /dev/null +++ b/archiver/redisconf.go @@ -0,0 +1,52 @@ +// Copyright 2024 Tomas Machalek +// Copyright 2024 Institute of the Czech National Corpus, +// Faculty of Arts, Charles University +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package archiver + +import ( + "fmt" + + "github.com/rs/zerolog/log" +) + +type RedisConf struct { + Host string `json:"host"` + Port int `json:"port"` + DB int `json:"db"` + Password string `json:"password"` + QueueKey string `json:"queueKey"` + FailedQueueKey string `json:"failedQueueKey"` + FailedRecordsKey string `json:"failedRecordsKey"` +} + +func (conf *RedisConf) ValidateAndDefaults() error { + if conf.DB == 0 { + return fmt.Errorf("missing Redis configuration: `db`") + } + if conf.QueueKey == "" { + return fmt.Errorf("missing Redis configuration: `queueKey`") + } + if conf.FailedQueueKey == "" { + conf.FailedQueueKey = conf.QueueKey + "_failed" + log.Warn(). + Str("value", conf.FailedQueueKey). + Msg("Redis configuration `failedQueueKey` missing - using default") + } + if conf.FailedRecordsKey == "" { + return fmt.Errorf("missing Redis configuration: `failedRecordsKey`") + } + return nil +} diff --git a/archiver/stats.go b/archiver/stats.go new file mode 100644 index 0000000..ff66b13 --- /dev/null +++ b/archiver/stats.go @@ -0,0 +1,78 @@ +// Copyright 2024 Tomas Machalek +// Copyright 2024 Institute of the Czech National Corpus, +// Faculty of Arts, Charles University +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package archiver
+
+import (
+	"camus/cncdb"
+	"encoding/json"
+	"fmt"
+	"time"
+)
+
+const (
+	yearStatsCacheKey = "camus_years_stats"
+)
+
+type CountPerYear struct {
+	Year  int `json:"year"`
+	Count int `json:"count"`
+}
+
+type YearsStats struct {
+	Years      []CountPerYear `json:"years"`
+	LastUpdate time.Time      `json:"lastUpdate"`
+}
+
+func (job *ArchKeeper) YearsStats(forceReload bool) (YearsStats, error) {
+	var cached string
+	var err error
+	var ans YearsStats
+	if !forceReload {
+		cached, err = job.redis.Get(yearStatsCacheKey)
+		if err != nil {
+			return ans, fmt.Errorf("failed to get cached years stats: %w", err)
+		}
+	}
+	if cached == "" {
+		data, err := job.db.GetArchSizesByYears(forceReload)
+		if err == cncdb.ErrTooDemandingQuery {
+			// outside the defined night time (and without forceReload),
+			// we report empty stats rather than running the expensive query
+			return ans, nil
+		} else if err != nil {
+			return ans, fmt.Errorf("failed to load years stats from db: %w", err)
+		}
+		ans.LastUpdate = time.Now().In(job.tz)
+		ans.Years = make([]CountPerYear, len(data))
+		for i, item := range data {
+			ans.Years[i] = CountPerYear{Year: item[0], Count: item[1]}
+		}
+		jsonData, err := json.Marshal(ans)
+		if err != nil {
+			return ans, fmt.Errorf("failed to marshal recent years stats data: %w", err)
+		}
+		if err := job.redis.Set(yearStatsCacheKey, jsonData); err != nil {
+			return ans, fmt.Errorf("failed to store recent years stats to cache: %w", err)
+		}
+	} else {
+		if err := json.Unmarshal([]byte(cached), &ans); err != nil {
+			return ans, fmt.Errorf("failed to unmarshal years stats from cache: %w", err)
+		}
+	}
+	return ans, nil
+}
diff --git a/archiver/types.go b/archiver/types.go
index 84b7f56..24d525b 100644
--- a/archiver/types.go
+++ b/archiver/types.go
@@ -17,16 +17,15 @@
 package archiver
 
 import (
+	"camus/cncdb"
 	"encoding/json"
-	"fmt"
-	"time"
 )
 
 // -------------------------
 
 type Deduplication struct {
-	NumMerged   int        `json:"numMerged"`
-	FinalRecord ArchRecord `json:"finalRecord"`
+	NumMerged   int              `json:"numMerged"`
+	FinalRecord cncdb.ArchRecord `json:"finalRecord"`
 	error       error
 }
 
@@ -41,9 +40,9 @@ func (dedup Deduplication) MarshalJSON() ([]byte, error) {
 	}
 	return json.Marshal(
 		struct {
-			NumMerged   int        `json:"numMerged"`
-			FinalRecord ArchRecord `json:"finalRecord"`
-			Error       string     `json:"error,omitempty"`
+			NumMerged   int              `json:"numMerged"`
+			FinalRecord cncdb.ArchRecord `json:"finalRecord"`
+			Error       string           `json:"error,omitempty"`
 		}{
 			NumMerged:   dedup.NumMerged,
 			FinalRecord: dedup.FinalRecord,
@@ -51,63 +50,3 @@ func (dedup Deduplication) MarshalJSON() ([]byte, error) {
 		},
 	)
 }
-
-// ----------------------------------
-
-type GeneralDataRecord map[string]any
-
-func (rec GeneralDataRecord) GetPrevID() string {
-	v, ok := rec["prev_id"]
-	if !ok {
-		return ""
-	}
-	typedV, ok := v.(string)
-	if !ok {
-		return ""
-	}
-	return typedV
-}
-
-func (rec GeneralDataRecord) GetCorpora() []string {
-	v, ok := rec["corpora"]
-	if !ok {
-		return []string{}
-	}
-	typedV, ok := v.([]string)
-	if !ok {
-		return []string{}
-	}
-	return typedV
-}
-
-func (rec GeneralDataRecord) GetQuery() []string {
-	v, ok := rec["q"]
-	if !ok {
-		return []string{}
-	}
-	typedV, ok := v.([]string)
-	if !ok {
-		return []string{}
-	}
-	return typedV
-}
-
-// ------------------------
-
-type ArchRecord struct {
-	ID         string
-	Data       string
-	Created    time.Time
-	NumAccess  int
-	LastAccess time.Time
-	Permanent  int
-}
-
-func (rec ArchRecord) FetchData() (GeneralDataRecord, error) {
-	ans := make(GeneralDataRecord)
-	err := json.Unmarshal([]byte(rec.Data), &ans)
-	if err != nil {
-		return GeneralDataRecord{}, fmt.Errorf("failed to fetch ArchRecord data: %w", err)
-	}
-	return ans, nil
-}
diff --git a/camus.go b/camus.go
index f80a2ad..6c2a415 100644
--- a/camus.go
+++ b/camus.go
@@ -19,6 +19,7 @@ package main
 import (
 	"camus/archiver"
 	"camus/cleaner"
+	"camus/cncdb"
 	"camus/cnf"
 	"camus/reporting"
 	"context"
@@ -54,32 +55,24 @@ type service interface {
 }
 
 func createArchiver(
-	db archiver.IMySQLOps,
+	db cncdb.IMySQLOps,
 	rdb *archiver.RedisAdapter,
 	reporting reporting.IReporting,
 	conf *cnf.Conf,
-	loadLastN int,
 ) *archiver.ArchKeeper {
-	dedup, err := archiver.NewDeduplicator(db, conf.TimezoneLocation(), conf.DDStateFilePath)
+	dedup, err := archiver.NewDeduplicator(db, conf.Archiver, conf.TimezoneLocation())
 	if err != nil {
 		log.Error().Err(err).Msg("Failed to initialize deduplicator")
 		os.Exit(1)
 		return nil
 	}
-	if loadLastN > 0 {
-		if err := dedup.PreloadLastNItems(loadLastN); err != nil {
-			log.Error().Err(err).Msg("Failed to preload items")
-			os.Exit(1)
-		}
-	}
 	return archiver.NewArchKeeper(
 		rdb,
 		db,
 		dedup,
 		reporting,
 		conf.TimezoneLocation(),
-		time.Duration(conf.CheckIntervalSecs)*time.Second,
-		conf.CheckIntervalChunk,
+		conf.Archiver,
 	)
 }
 
@@ -100,8 +93,6 @@ func main() {
 		fmt.Fprintf(os.Stderr, "%s [options] version\n", filepath.Base(os.Args[0]))
 		flag.PrintDefaults()
 	}
-	loadLastN := flag.Int(
-		"load-last-n", 0, "Load last N items from archive database to start with deduplication checking early")
 	dryRun := flag.Bool(
 		"dry-run", false, "If set, then instead of writing to database, Camus will just report operations to the log")
 	dryRunCleaner := flag.Bool(
@@ -131,7 +122,7 @@
 
 	switch action {
 	case "start":
-		db, err := archiver.DBOpen(conf.MySQL)
+		db, err := cncdb.DBOpen(conf.MySQL)
 		if err != nil {
 			log.Error().Err(err).Msg("Failed to open SQL database")
 			os.Exit(1)
@@ -156,20 +147,20 @@
 			reportingService = &reporting.DummyWriter{}
 		}
 
-		var dbOps archiver.IMySQLOps
-		dbOpsRaw := archiver.NewMySQLOps(db, conf.TimezoneLocation())
+		var dbOps cncdb.IMySQLOps
+		dbOpsRaw := cncdb.NewMySQLOps(db, conf.TimezoneLocation())
 		if *dryRun {
-			dbOps = archiver.NewMySQLDryRun(dbOpsRaw)
+			dbOps = cncdb.NewMySQLDryRun(dbOpsRaw)
 		} else {
 			dbOps = dbOpsRaw
 		}
 
-		arch := createArchiver(dbOps, rdb, reportingService, conf, *loadLastN)
+		arch := createArchiver(dbOps, rdb, reportingService, conf)
 
-		var cleanerDbOps archiver.IMySQLOps
+		var cleanerDbOps cncdb.IMySQLOps
 		if *dryRunCleaner {
-			cleanerDbOps = archiver.NewMySQLDryRun(dbOpsRaw)
+			cleanerDbOps = cncdb.NewMySQLDryRun(dbOpsRaw)
 		} else {
 			cleanerDbOps = dbOps
diff --git a/cleaner/cleaner.go b/cleaner/cleaner.go
index ecb81e4..02e8bf2 100644
--- a/cleaner/cleaner.go
+++ b/cleaner/cleaner.go
@@ -18,6 +18,7 @@ package cleaner
 
 import (
 	"camus/archiver"
+	"camus/cncdb"
 	"camus/reporting"
 	"context"
 	"fmt"
@@ -33,7 +34,7 @@ const (
 
 type Service struct {
 	conf             Conf
-	db               archiver.IMySQLOps
+	db               cncdb.IMySQLOps
 	rdb              *archiver.RedisAdapter
 	tz               *time.Location
 	cleanupRunning   bool
@@ -54,7 +55,7 @@ func (job *Service) Start(ctx context.Context) {
 				} else {
 					numProc := job.conf.NumProcessItemsPerTick
-					if TimeIsAtNight(t) {
+					if cncdb.TimeIsAtNight(t) {
 						numProc = job.conf.NumProcessItemsPerTickNight
 					}
 					err := job.performCleanup(numProc)
@@ -128,7 +129,7 @@ func (job *Service) performCleanup(itemsToProc int) error {
 				continue
 			}
 
-			err = archiver.ValidateQueryInstances(variants)
+			err = cncdb.ValidateQueryInstances(variants)
 			if err != nil {
 				log.Warn().
 					Err(err).
@@ -209,7 +210,7 @@ func (job *Service) performCleanup(itemsToProc int) error {
 }
 
 func NewService(
-	db archiver.IMySQLOps,
+	db cncdb.IMySQLOps,
 	rdb *archiver.RedisAdapter,
 	reporting reporting.IReporting,
 	conf Conf,
diff --git a/cleaner/conf.go b/cleaner/conf.go
index 0ca5ebe..887700a 100644
--- a/cleaner/conf.go
+++ b/cleaner/conf.go
@@ -31,10 +31,6 @@ const (
 	dfltNightItemsIncrease = 2
 )
 
-func TimeIsAtNight(t time.Time) bool {
-	return t.Hour() >= 22 || t.Hour() <= 5
-}
-
 type Conf struct {
 	CheckIntervalSecs      int `json:"checkIntervalSecs"`
 	NumProcessItemsPerTick int `json:"numProcessItemsPerTick"`
@@ -52,6 +48,9 @@ func (conf Conf) MinAgeUnvisited() time.Duration {
 }
 
 func (conf *Conf) ValidateAndDefaults(opsCheckIntervalSecs int) error {
+	if conf == nil {
+		return fmt.Errorf("missing `cleaner` section")
+	}
 	if conf.CheckIntervalSecs < minAllowedCheckInterval {
 		return fmt.Errorf(
 			"invalid value %d for checkIntervalSecs (must be >= %d)",
diff --git a/archiver/common.go b/cncdb/common.go
similarity index 91%
rename from archiver/common.go
rename to cncdb/common.go
index 51f3b47..7fc945b 100644
--- a/archiver/common.go
+++ b/cncdb/common.go
@@ -14,9 +14,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package archiver
+package cncdb
 
 import (
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -24,6 +25,14 @@ import (
 	"github.com/google/uuid"
 )
 
+var (
+	ErrTooDemandingQuery = errors.New("too demanding query")
+)
+
+func TimeIsAtNight(t time.Time) bool {
+	return t.Hour() >= 22 || t.Hour() <= 5
+}
+
 func MergeRecords(recs []ArchRecord, newRec ArchRecord, tz *time.Location) ArchRecord {
 	if len(recs) == 0 {
 		panic("cannot merge empty slice of ArchRecords")
diff --git a/archiver/mysql.go b/cncdb/mysql.go
similarity index 89%
rename from archiver/mysql.go
rename to cncdb/mysql.go
index b349a77..eedd5bf 100644
--- a/archiver/mysql.go
+++ b/cncdb/mysql.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package archiver
+package cncdb
 
 import (
 	"database/sql"
@@ -184,6 +184,28 @@ func (ops *MySQLOps) DeduplicateInArchive(curr []ArchRecord, rec ArchRecord) (Ar
 	return ans, nil
 }
 
+func (ops *MySQLOps) GetArchSizesByYears(forceLoad bool) ([][2]int, error) {
+	if !forceLoad && !TimeIsAtNight(time.Now().In(ops.tz)) {
+		return [][2]int{}, ErrTooDemandingQuery
+	}
+	rows, err := ops.db.Query(
+		"SELECT COUNT(*), YEAR(created) AS yc " +
+			"FROM kontext_conc_persistence " +
+			"GROUP BY YEAR(created) ORDER BY yc")
+	if err != nil {
+		return [][2]int{}, fmt.Errorf("failed to fetch arch. sizes: %w", err)
+	}
+	defer rows.Close()
+	ans := make([][2]int, 0, 30)
+	for rows.Next() {
+		var v, year int
+		if err := rows.Scan(&v, &year); err != nil {
+			return [][2]int{}, fmt.Errorf("failed to get values from arch. sizes row: %w", err)
sizes row: %w", err) + } + ans = append(ans, [2]int{year, v}) + } + return ans, nil +} + func NewMySQLOps(db *sql.DB, tz *time.Location) *MySQLOps { return &MySQLOps{ db: db, diff --git a/archiver/mysqldr.go b/cncdb/mysqldr.go similarity index 86% rename from archiver/mysqldr.go rename to cncdb/mysqldr.go index 4c6d304..2372215 100644 --- a/archiver/mysqldr.go +++ b/cncdb/mysqldr.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package archiver +package cncdb import ( "time" @@ -22,6 +22,9 @@ import ( "github.com/rs/zerolog/log" ) +// MySQLDryRun is a dry-run mode version of mysql adapter. It performs +// read operations just like normal adapter but any modifying operation +// just logs its information. type MySQLDryRun struct { db *MySQLOps } @@ -62,6 +65,10 @@ func (db *MySQLDryRun) DeduplicateInArchive(curr []ArchRecord, rec ArchRecord) ( return ArchRecord{}, nil } +func (ops *MySQLDryRun) GetArchSizesByYears(forceLoad bool) ([][2]int, error) { + return ops.db.GetArchSizesByYears(forceLoad) +} + func NewMySQLDryRun(ops *MySQLOps) *MySQLDryRun { return &MySQLDryRun{db: ops} } diff --git a/archiver/mysqli.go b/cncdb/mysqli.go similarity index 73% rename from archiver/mysqli.go rename to cncdb/mysqli.go index 71ec515..7562428 100644 --- a/archiver/mysqli.go +++ b/cncdb/mysqli.go @@ -14,10 +14,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package archiver +package cncdb import "time" +// IMySQLOps is an abstract interface for high level +// database operations. We need it mainly to allow +// injecting "dummy" database adapter for "dry-run" mode. type IMySQLOps interface { LoadRecentNRecords(num int) ([]ArchRecord, error) LoadRecordsFromDate(fromDate time.Time, maxItems int) ([]ArchRecord, error) @@ -27,4 +30,10 @@ type IMySQLOps interface { UpdateRecordStatus(id string, status int) error RemoveRecordsByID(concID string) error DeduplicateInArchive(curr []ArchRecord, rec ArchRecord) (ArchRecord, error) + + // GetArchSizesByYears + // Without forceReload, the function refuses to perform actual query outside + // defined night time. + // Returns list of pairs where FIRST item is always YEAR, the SECOND one is COUNT + GetArchSizesByYears(forceLoad bool) ([][2]int, error) } diff --git a/cncdb/types.go b/cncdb/types.go new file mode 100644 index 0000000..a6f1483 --- /dev/null +++ b/cncdb/types.go @@ -0,0 +1,81 @@ +// Copyright 2024 Tomas Machalek +// Copyright 2024 Institute of the Czech National Corpus, +// Faculty of Arts, Charles University +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package cncdb
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+)
+
+type GeneralDataRecord map[string]any
+
+func (rec GeneralDataRecord) GetPrevID() string {
+	v, ok := rec["prev_id"]
+	if !ok {
+		return ""
+	}
+	typedV, ok := v.(string)
+	if !ok {
+		return ""
+	}
+	return typedV
+}
+
+func (rec GeneralDataRecord) GetCorpora() []string {
+	v, ok := rec["corpora"]
+	if !ok {
+		return []string{}
+	}
+	// values decoded via json.Unmarshal come as []any, not []string
+	typedV, ok := v.([]any)
+	if !ok {
+		return []string{}
+	}
+	ans := make([]string, 0, len(typedV))
+	for _, item := range typedV {
+		if s, ok := item.(string); ok {
+			ans = append(ans, s)
+		}
+	}
+	return ans
+}
+
+func (rec GeneralDataRecord) GetQuery() []string {
+	v, ok := rec["q"]
+	if !ok {
+		return []string{}
+	}
+	// values decoded via json.Unmarshal come as []any, not []string
+	typedV, ok := v.([]any)
+	if !ok {
+		return []string{}
+	}
+	ans := make([]string, 0, len(typedV))
+	for _, item := range typedV {
+		if s, ok := item.(string); ok {
+			ans = append(ans, s)
+		}
+	}
+	return ans
+}
+
+// ----------------------------------
+
+type ArchRecord struct {
+	ID         string
+	Data       string
+	Created    time.Time
+	NumAccess  int
+	LastAccess time.Time
+	Permanent  int
+}
+
+func (rec ArchRecord) FetchData() (GeneralDataRecord, error) {
+	ans := make(GeneralDataRecord)
+	err := json.Unmarshal([]byte(rec.Data), &ans)
+	if err != nil {
+		return GeneralDataRecord{}, fmt.Errorf("failed to fetch ArchRecord data: %w", err)
+	}
+	return ans, nil
+}
diff --git a/cnf/conf.go b/cnf/conf.go
index 8d159c2..2600ce7 100644
--- a/cnf/conf.go
+++ b/cnf/conf.go
@@ -19,7 +19,7 @@ package cnf
 import (
 	"camus/archiver"
 	"camus/cleaner"
-	"camus/util"
+	"camus/cncdb"
 	"encoding/json"
 	"fmt"
 	"os"
@@ -50,10 +50,8 @@ type Conf struct {
 	LogFile            string              `json:"logFile"`
 	LogLevel           logging.LogLevel    `json:"logLevel"`
 	Redis              *archiver.RedisConf `json:"redis"`
-	MySQL              *archiver.DBConf    `json:"db"`
-	CheckIntervalSecs  int                 `json:"checkIntervalSecs"`
-	CheckIntervalChunk int                 `json:"checkIntervalChunk"`
-	DDStateFilePath    string              `json:"ddStateFilePath"`
+	MySQL              *cncdb.DBConf       `json:"db"`
+	Archiver           *archiver.Conf      `json:"archiver"`
 	Cleaner            cleaner.Conf        `json:"cleaner"`
 	Reporting          hltscl.PgConf       `json:"reporting"`
 }
@@ -105,26 +103,16 @@ func ValidateAndDefaults(conf *Conf) {
 		log.Fatal().Err(err).Msg("invalid time zone")
 	}
 
-	if conf.DDStateFilePath == "" {
-		log.Fatal().Msg("missing path to deduplicator state file (ddStateFilePath)")
-	}
-
-	tmp, err := util.NearestPrime(conf.CheckIntervalSecs)
-	if err != nil {
-		log.Fatal().Err(err).Msg("failed to tune ops timing")
-	}
-	if tmp != conf.CheckIntervalSecs {
-		log.Warn().
-			Int("oldValue", conf.CheckIntervalSecs).
-			Int("newValue", tmp).
- Msg("tuned value of checkIntervalSecs so it cannot be easily overlapped by other timers") - conf.CheckIntervalSecs = tmp - } if err := conf.Redis.ValidateAndDefaults(); err != nil { log.Fatal().Err(err).Msg("invalid Redis configuration") } - if err := conf.Cleaner.ValidateAndDefaults(conf.CheckIntervalSecs); err != nil { + if err := conf.Archiver.ValidateAndDefaults(); err != nil { + log.Fatal().Err(err).Msg("invalid archiver configuration") + } + + if err := conf.Cleaner.ValidateAndDefaults(conf.Archiver.CheckIntervalSecs); err != nil { log.Fatal().Err(err).Msg("invalid Clean configuration") } + } diff --git a/handler.go b/handler.go index 29cd6c8..3b02063 100644 --- a/handler.go +++ b/handler.go @@ -18,6 +18,7 @@ package main import ( "camus/archiver" + "camus/cncdb" "fmt" "net/http" "regexp" @@ -53,17 +54,27 @@ func (v visitedIds) IDList() []string { // ------ type Actions struct { - BgJob *archiver.ArchKeeper + ArchKeeper *archiver.ArchKeeper } func (a *Actions) Overview(ctx *gin.Context) { ans := make(map[string]any) - ans["archiver"] = a.BgJob.GetStats() + ans["archiver"] = a.ArchKeeper.GetStats() + var forceTotalsReload bool + if ctx.Query("forceReload") == "1" { + forceTotalsReload = true + } + totals, err := a.ArchKeeper.YearsStats(forceTotalsReload) + if err != nil { + uniresp.RespondWithErrorJSON(ctx, err, http.StatusInternalServerError) + return + } + ans["totals"] = totals uniresp.WriteJSONResponse(ctx.Writer, ans) } func (a *Actions) GetRecord(ctx *gin.Context) { - rec, err := a.BgJob.LoadRecordsByID(ctx.Param("id")) + rec, err := a.ArchKeeper.LoadRecordsByID(ctx.Param("id")) if err != nil { uniresp.RespondWithErrorJSON(ctx, err, http.StatusInternalServerError) // TODO return @@ -83,13 +94,13 @@ func (a *Actions) Validate(ctx *gin.Context) { ) return } - recs, err := a.BgJob.LoadRecordsByID(currID) + recs, err := a.ArchKeeper.LoadRecordsByID(currID) if err != nil { uniresp.RespondWithErrorJSON(ctx, err, http.StatusInternalServerError) // TODO return } queryVariants := make(map[string]int) - var reprData archiver.GeneralDataRecord + var reprData cncdb.GeneralDataRecord for _, rec := range recs { data, err := rec.FetchData() if err != nil { @@ -111,24 +122,24 @@ func (a *Actions) Validate(ctx *gin.Context) { uniresp.WriteJSONResponse( ctx.Writer, map[string]any{ - "message": "OK", + "ok": true, "visitedIds": visitedIDs.IDList(), }, ) } func (a *Actions) Fix(ctx *gin.Context) { - recs, err := a.BgJob.LoadRecordsByID(ctx.Param("id")) + recs, err := a.ArchKeeper.LoadRecordsByID(ctx.Param("id")) if err != nil { uniresp.RespondWithErrorJSON(ctx, err, http.StatusInternalServerError) // TODO return } - fixedRecs := make([]archiver.ArchRecord, len(recs)) + fixedRecs := make([]cncdb.ArchRecord, len(recs)) for i, rec := range recs { rec.Data = brokenConcRec1.ReplaceAllString(rec.Data, "") fixedRecs[i] = rec } - newRec, err := a.BgJob.DeduplicateInArchive(fixedRecs, fixedRecs[0]) + newRec, err := a.ArchKeeper.DeduplicateInArchive(fixedRecs, fixedRecs[0]) if err != nil { uniresp.RespondWithErrorJSON(ctx, err, http.StatusInternalServerError) // TODO return @@ -138,3 +149,11 @@ func (a *Actions) Fix(ctx *gin.Context) { ans["fixed"] = newRec uniresp.WriteJSONResponse(ctx.Writer, ans) } + +func (a *Actions) DedupReset(ctx *gin.Context) { + if err := a.ArchKeeper.Reset(); err != nil { + uniresp.RespondWithErrorJSON(ctx, err, http.StatusInternalServerError) + return + } + uniresp.WriteJSONResponse(ctx.Writer, map[string]any{"ok": true}) +}