Commit ada6109
Multiple changes/improvements:
1) sql db ops moved to a separate package
2) archiver conf as a separate type
3) "preload last N items" moved from cmd arg to config
4) add some comments
5) added function producing arch. item counts per year
tomachalek committed Aug 5, 2024
1 parent 3d362fa commit ada6109
Showing 18 changed files with 467 additions and 208 deletions.
apiserver.go: 3 changes (2 additions & 1 deletion)

@@ -47,12 +47,13 @@ func (api *apiServer) Start(ctx context.Context) {
 	engine.NoMethod(uniresp.NoMethodHandler)
 	engine.NoRoute(uniresp.NotFoundHandler)
 
-	handler := Actions{BgJob: api.arch}
+	handler := Actions{ArchKeeper: api.arch}
 
 	engine.GET("/overview", handler.Overview)
 	engine.GET("/record/:id", handler.GetRecord)
 	engine.GET("/validate/:id", handler.Validate)
 	engine.POST("/fix/:id", handler.Fix)
+	engine.POST("/dedup-reset", handler.DedupReset)
 
 	api.server = &http.Server{
 		Handler: engine,
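The new /dedup-reset route exposes the deduplicator reset added below (ArchKeeper.Reset). A minimal sketch of calling it; the address localhost:8080 is an assumption for illustration, the real listen address comes from the server configuration:

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Ask the server to drop and re-initialize its deduplication state.
	// The host/port are placeholders, not values from this commit.
	resp, err := http.Post("http://localhost:8080/dedup-reset", "application/json", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}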
archiver/archiver.go: 79 changes (54 additions & 25 deletions)

@@ -17,6 +17,7 @@
 package archiver
 
 import (
+	"camus/cncdb"
 	"camus/reporting"
 	"context"
 	"fmt"
@@ -25,19 +26,35 @@ import (
 	"github.com/rs/zerolog/log"
 )

+// ArchKeeper handles continuous operations related
+// to the concordance archive (contrary to the name, it
+// also covers word lists, paradigmatic queries and keyword
+// queries).
+// The main responsibility of ArchKeeper is to take queued
+// query IDs, fetch the corresponding records from Redis ASAP
+// and store them to the kontext_conc_persistence SQL table.
+// Due to the nature of the partitioning of the table, ArchKeeper
+// must also perform some deduplication to prevent extensive
+// growth of duplicate records. It is not expected that
+// ArchKeeper will catch 100% of duplicates, because there is
+// also a cleanup job that removes old unused records and
+// performs deduplication for each checked record. But
+// that job only affects records that are years old, so we still
+// need to prevent (at least some) recent duplicates to keep
+// the database reasonably small.
 type ArchKeeper struct {
-	redis              *RedisAdapter
-	db                 IMySQLOps
-	reporting          reporting.IReporting
-	checkInterval      time.Duration
-	checkIntervalChunk int
-	dedup              *Deduplicator
-	tz                 *time.Location
-	stats              reporting.OpStats
+	redis     *RedisAdapter
+	db        cncdb.IMySQLOps
+	reporting reporting.IReporting
+	conf      *Conf
+	dedup     *Deduplicator
+	tz        *time.Location
+	stats     reporting.OpStats
 }
 
+// Start starts the ArchKeeper service
 func (job *ArchKeeper) Start(ctx context.Context) {
-	ticker := time.NewTicker(job.checkInterval)
+	ticker := time.NewTicker(job.conf.CheckInterval())
 	go func() {
 		for {
 			select {
@@ -51,6 +68,7 @@ func (job *ArchKeeper) Start(ctx context.Context) {
 	}()
 }
 
+// Stop stops the ArchKeeper service
 func (job *ArchKeeper) Stop(ctx context.Context) error {
 	log.Warn().Msg("stopping ArchKeeper")
 	if err := job.dedup.OnClose(); err != nil {
@@ -59,19 +77,30 @@ func (job *ArchKeeper) Stop(ctx context.Context) error {
 	return nil
 }
 
-func (job *ArchKeeper) StoreToDisk() {
+// StoreToDisk stores current operations data from RAM
+// to a configured disk file.
+func (job *ArchKeeper) StoreToDisk() error {
 	return job.dedup.StoreToDisk()
 }
 
+// Reset clears current operations data stored in RAM
+// and initializes itself according to the configuration.
+func (job *ArchKeeper) Reset() error {
+	return job.dedup.Reset()
+}
+
+// GetStats returns statistics related to ArchKeeper operations.
+// We use it mainly for pushing stats to a TimescaleDB instance.
 func (job *ArchKeeper) GetStats() reporting.OpStats {
 	return job.stats
 }
 
-func (job *ArchKeeper) LoadRecordsByID(concID string) ([]ArchRecord, error) {
+func (job *ArchKeeper) LoadRecordsByID(concID string) ([]cncdb.ArchRecord, error) {
 	return job.db.LoadRecordsByID(concID)
 }
 
-func (job *ArchKeeper) handleImplicitReq(rec ArchRecord, item queueRecord, currStats *reporting.OpStats) bool {
+func (job *ArchKeeper) handleImplicitReq(
+	rec cncdb.ArchRecord, item queueRecord, currStats *reporting.OpStats) bool {
+
 	match, err := job.dedup.TestAndSolve(rec)
 	if err != nil {
@@ -106,7 +135,8 @@ func (job *ArchKeeper) handleImplicitReq(rec ArchRecord, item queueRecord, currS
 	return false
 }
 
-func (job *ArchKeeper) handleExplicitReq(rec ArchRecord, item queueRecord, currStats *reporting.OpStats) {
+func (job *ArchKeeper) handleExplicitReq(
+	rec cncdb.ArchRecord, item queueRecord, currStats *reporting.OpStats) {
 	exists, err := job.db.ContainsRecord(rec.ID)
 	if err != nil {
 		currStats.NumErrors++
@@ -132,7 +162,7 @@ func (job *ArchKeeper) handleExplicitReq(rec ArchRecord, item queueRecord, currS
 }
 
 func (job *ArchKeeper) performCheck() error {
-	items, err := job.redis.NextNItems(int64(job.checkIntervalChunk))
+	items, err := job.redis.NextNItems(int64(job.conf.CheckIntervalChunk))
 	log.Debug().
 		AnErr("error", err).
 		Int("itemsToProcess", len(items)).
@@ -175,26 +205,25 @@ func (job *ArchKeeper) performCheck() error {
 	return nil
 }
 
-func (job *ArchKeeper) DeduplicateInArchive(curr []ArchRecord, rec ArchRecord) (ArchRecord, error) {
+func (job *ArchKeeper) DeduplicateInArchive(
+	curr []cncdb.ArchRecord, rec cncdb.ArchRecord) (cncdb.ArchRecord, error) {
 	return job.db.DeduplicateInArchive(curr, rec)
 }
 
 func NewArchKeeper(
 	redis *RedisAdapter,
-	db IMySQLOps,
+	db cncdb.IMySQLOps,
 	dedup *Deduplicator,
 	reporting reporting.IReporting,
 	tz *time.Location,
-	checkInterval time.Duration,
-	checkIntervalChunk int,
+	conf *Conf,
 ) *ArchKeeper {
 	return &ArchKeeper{
-		redis:              redis,
-		db:                 db,
-		dedup:              dedup,
-		reporting:          reporting,
-		tz:                 tz,
-		checkInterval:      checkInterval,
-		checkIntervalChunk: checkIntervalChunk,
+		redis:     redis,
+		db:        db,
+		dedup:     dedup,
+		reporting: reporting,
+		tz:        tz,
+		conf:      conf,
 	}
 }
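With checkInterval and checkIntervalChunk folded into Conf, a call site now passes the whole config object. A minimal wiring sketch under stated assumptions: the redis, db, dedup, rep and tz values stand for whatever the application already constructs elsewhere, and the state file path is a placeholder; none of these names are introduced by this commit.

package main

import (
	"context"

	"camus/archiver"

	"github.com/rs/zerolog/log"
)

func main() {
	conf := &archiver.Conf{
		DDStateFilePath:    "/var/opt/camus/dedup.state", // placeholder path
		CheckIntervalSecs:  60,
		CheckIntervalChunk: 100,
		PreloadLastNItems:  500,
	}
	if err := conf.ValidateAndDefaults(); err != nil {
		log.Fatal().Err(err).Msg("invalid archiver configuration")
	}
	// redis, db, dedup, rep and tz come from the surrounding application
	// setup; they are placeholders here, not values this sketch constructs.
	ak := archiver.NewArchKeeper(redis, db, dedup, rep, tz, conf)
	ak.Start(context.Background())
}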
archiver/conf.go: 56 changes (37 additions & 19 deletions)

@@ -17,36 +17,54 @@
 package archiver
 
 import (
+	"camus/util"
 	"fmt"
+	"time"
 
 	"github.com/rs/zerolog/log"
 )

-type RedisConf struct {
-	Host             string `json:"host"`
-	Port             int    `json:"port"`
-	DB               int    `json:"db"`
-	Password         string `json:"password"`
-	QueueKey         string `json:"queueKey"`
-	FailedQueueKey   string `json:"failedQueueKey"`
-	FailedRecordsKey string `json:"failedRecordsKey"`
-}
+const (
+	dfltPreloadLastNItems = 500
+)
 
 type Conf struct {
 	DDStateFilePath    string `json:"ddStateFilePath"`
 	CheckIntervalSecs  int    `json:"checkIntervalSecs"`
 	CheckIntervalChunk int    `json:"checkIntervalChunk"`
+	PreloadLastNItems  int    `json:"preloadLastNItems"`
 }
 
-func (conf *RedisConf) ValidateAndDefaults() error {
-	if conf.DB == 0 {
-		return fmt.Errorf("missing Redis configuration: `db`")
-	}
-	if conf.QueueKey == "" {
-		return fmt.Errorf("missing Redis configuration: `queueKey`")
-	}
-	if conf.FailedQueueKey == "" {
-		conf.FailedQueueKey = conf.QueueKey + "_failed"
-		log.Warn().
-			Str("value", conf.FailedQueueKey).
-			Msg("Redis configuration `failedQueueKey` missing - using default")
-	}
-	if conf.FailedRecordsKey == "" {
-		return fmt.Errorf("missing Redis configuration: `failedRecordsKey`")
-	}
+func (conf *Conf) CheckInterval() time.Duration {
+	return time.Duration(conf.CheckIntervalSecs) * time.Second
+}
 
+func (conf *Conf) ValidateAndDefaults() error {
+	if conf == nil {
+		return fmt.Errorf("missing `archiver` section")
+	}
+	if conf.DDStateFilePath == "" {
+		return fmt.Errorf("missing path to deduplicator state file (ddStateFilePath)")
+	}
+
+	tmp, err := util.NearestPrime(conf.CheckIntervalSecs)
+	if err != nil {
+		return fmt.Errorf("failed to tune ops timing: %w", err)
+	}
+	if tmp != conf.CheckIntervalSecs {
+		log.Warn().
+			Int("oldValue", conf.CheckIntervalSecs).
+			Int("newValue", tmp).
+			Msg("tuned value of checkIntervalSecs so it cannot be easily overlapped by other timers")
+		conf.CheckIntervalSecs = tmp
+	}
+
+	if conf.PreloadLastNItems == 0 {
+		conf.PreloadLastNItems = dfltPreloadLastNItems
+		log.Warn().
+			Int("value", conf.PreloadLastNItems).
+			Msg("archiver value `preloadLastNItems` not set, using default")
+	}
+
 	return nil
 }
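For reference, an `archiver` config section matching the JSON tags above might look like this; the values are illustrative only, and a checkIntervalSecs of 60 would be retuned to a nearby prime (59 or 61, depending on tie-breaking) at startup:

"archiver": {
	"ddStateFilePath": "/var/opt/camus/dedup.state",
	"checkIntervalSecs": 60,
	"checkIntervalChunk": 100,
	"preloadLastNItems": 500
}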
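util.NearestPrime itself is not part of the visible hunks. The point of snapping the interval to a prime is that prime periods rarely share factors with the round-number periods (30 s, 60 s, 300 s) other periodic jobs typically use, so the timers drift apart instead of firing in lockstep. A minimal sketch of what such a helper could look like, purely an assumption about the real camus/util code:

package util

import "fmt"

// NearestPrime returns a prime closest to n.
// Illustrative sketch only; the actual camus/util implementation
// may differ, e.g. in tie-breaking or input bounds.
func NearestPrime(n int) (int, error) {
	if n < 2 {
		return 0, fmt.Errorf("cannot find a prime near %d", n)
	}
	isPrime := func(k int) bool {
		if k < 2 {
			return false
		}
		for d := 2; d*d <= k; d++ {
			if k%d == 0 {
				return false
			}
		}
		return true
	}
	// scan outward from n; the lower candidate wins ties here,
	// which keeps the tuned interval from growing
	for delta := 0; ; delta++ {
		if isPrime(n - delta) {
			return n - delta, nil
		}
		if isPrime(n + delta) {
			return n + delta, nil
		}
	}
}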
[diffs for the remaining 15 changed files were not loaded]