diff --git a/internal/channel/channel.go b/internal/channel/channel.go index a318ac69f..5fa5658f2 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -4,16 +4,19 @@ import ( "context" "errors" "fmt" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/contracts" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/pkg/plugin" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "net/url" ) type Channel struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"name"` Type string `db:"type"` Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information @@ -27,6 +30,19 @@ type Channel struct { pluginCtxCancel func() } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. +func (c *Channel) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", c.ID) + encoder.AddString("name", c.Name) + encoder.AddString("type", c.Type) + return nil +} + +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. +func (c *Channel) IncrementalInitAndValidate() error { + return ValidateType(c.Type) +} + // newConfig helps to store the channel's updated properties type newConfig struct { ctype string diff --git a/internal/config/baseconf/incremental_config.go b/internal/config/baseconf/incremental_config.go new file mode 100644 index 000000000..18f3a4cc2 --- /dev/null +++ b/internal/config/baseconf/incremental_config.go @@ -0,0 +1,45 @@ +package baseconf + +import ( + "github.com/icinga/icinga-go-library/types" +) + +// IncrementalDbEntry contains the changed_at and deleted columns as struct fields. +// +// This type partially implements config.IncrementalConfigurable with GetChangedAt and IsDeleted. 
Thus, it can be +// embedded in other types with the _`db:",inline"`_ struct tag. However, most structs might want to embed the +// IncrementalPkDbEntry struct instead. +type IncrementalDbEntry struct { + ChangedAt types.UnixMilli `db:"changed_at"` + Deleted types.Bool `db:"deleted"` +} + +// GetChangedAt returns the changed_at value of this entry from the database. +// +// It is required by the config.IncrementalConfigurable interface. +func (i IncrementalDbEntry) GetChangedAt() types.UnixMilli { + return i.ChangedAt +} + +// IsDeleted indicates if this entry is marked as deleted in the database. +// +// It is required by the config.IncrementalConfigurable interface. +func (i IncrementalDbEntry) IsDeleted() bool { + return i.Deleted.Valid && i.Deleted.Bool +} + +// IncrementalPkDbEntry implements a single primary key named id of a generic type next to IncrementalDbEntry. +// +// This type embeds IncrementalDbEntry and adds a single column/value id field, getting one step closer to implementing +// the config.IncrementalConfigurable interface. Thus, it needs to be embedded with the _`db:",inline"`_ struct tag. +type IncrementalPkDbEntry[PK comparable] struct { + IncrementalDbEntry `db:",inline"` + ID PK `db:"id"` +} + +// GetPrimaryKey returns the id of this entry from the database. +// +// It is required by the config.IncrementalConfigurable interface. 
+func (i IncrementalPkDbEntry[PK]) GetPrimaryKey() PK { + return i.ID +} diff --git a/internal/config/channel.go b/internal/config/channel.go index 769a919f3..04cc4c63f 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -3,84 +3,30 @@ package config import ( "context" "github.com/icinga/icinga-notifications/internal/channel" - "github.com/jmoiron/sqlx" "go.uber.org/zap" ) -func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { - var channelPtr *channel.Channel - stmt := r.db.BuildSelectStmt(channelPtr, channelPtr) - r.logger.Debugf("Executing query %q", stmt) - - var channels []*channel.Channel - if err := tx.SelectContext(ctx, &channels, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - channelsById := make(map[int64]*channel.Channel) - for _, c := range channels { - channelLogger := r.logger.With( - zap.Int64("id", c.ID), - zap.String("name", c.Name), - zap.String("type", c.Type), - ) - if channelsById[c.ID] != nil { - channelLogger.Warnw("ignoring duplicate config for channel type") - } else if err := channel.ValidateType(c.Type); err != nil { - channelLogger.Errorw("Cannot load channel config", zap.Error(err)) - } else { - channelsById[c.ID] = c - - channelLogger.Debugw("loaded channel config") - } - } - - if r.Channels != nil { - // mark no longer existing channels for deletion - for id := range r.Channels { - if _, ok := channelsById[id]; !ok { - channelsById[id] = nil - } - } - } - - r.pending.Channels = channelsById - - return nil -} - +// applyPendingChannels synchronizes changed channels. 
func (r *RuntimeConfig) applyPendingChannels() { - if r.Channels == nil { - r.Channels = make(map[int64]*channel.Channel) - } - - for id, pendingChannel := range r.pending.Channels { - if pendingChannel == nil { - r.Channels[id].Logger.Info("Channel has been removed") - r.Channels[id].Stop() - - delete(r.Channels, id) - } else if currentChannel := r.Channels[id]; currentChannel != nil { - // Currently, the whole config is reloaded from the database frequently, replacing everything. - // Prevent restarting the plugin processes every time by explicitly checking for config changes. - // The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5 - // is solved properly. - if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config { - currentChannel.Type = pendingChannel.Type - currentChannel.Name = pendingChannel.Name - currentChannel.Config = pendingChannel.Config - - currentChannel.Restart() - } - } else { - pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With( - zap.Int64("id", pendingChannel.ID), - zap.String("name", pendingChannel.Name))) - - r.Channels[id] = pendingChannel - } - } - - r.pending.Channels = nil + incrementalApplyPending( + r, + &r.Channels, &r.configChange.Channels, + func(newElement *channel.Channel) error { + newElement.Start(context.TODO(), r.logs.GetChildLogger("channel").With( + zap.Int64("id", newElement.ID), + zap.String("name", newElement.Name))) + return nil + }, + func(curElement, update *channel.Channel) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + curElement.Type = update.Type + curElement.Config = update.Config + curElement.Restart() + return nil + }, + func(delElement *channel.Channel) error { + delElement.Stop() + return nil + }) } diff --git a/internal/config/contact.go b/internal/config/contact.go index bb72efad4..9a272aec9 100644 --- 
a/internal/config/contact.go +++ b/internal/config/contact.go @@ -1,62 +1,57 @@ package config import ( - "context" + "fmt" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error { - var contactPtr *recipient.Contact - stmt := r.db.BuildSelectStmt(contactPtr, contactPtr) - r.logger.Debugf("Executing query %q", stmt) - - var contacts []*recipient.Contact - if err := tx.SelectContext(ctx, &contacts, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - contactsByID := make(map[int64]*recipient.Contact) - for _, c := range contacts { - contactsByID[c.ID] = c - - r.logger.Debugw("loaded contact config", - zap.Int64("id", c.ID), - zap.String("name", c.FullName)) - } - - if r.Contacts != nil { - // mark no longer existing contacts for deletion - for id := range r.Contacts { - if _, ok := contactsByID[id]; !ok { - contactsByID[id] = nil +// applyPendingContacts synchronizes changed contacts +func (r *RuntimeConfig) applyPendingContacts() { + incrementalApplyPending( + r, + &r.Contacts, &r.configChange.Contacts, + nil, + func(curElement, update *recipient.Contact) error { + curElement.ChangedAt = update.ChangedAt + curElement.FullName = update.FullName + curElement.Username = update.Username + curElement.DefaultChannelID = update.DefaultChannelID + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.ContactAddresses, &r.configChange.ContactAddresses, + func(newElement *recipient.Address) error { + contact, ok := r.Contacts[newElement.ContactID] + if !ok { + return fmt.Errorf("contact address refers unknown contact %d", newElement.ContactID) } - } - } - - r.pending.Contacts = contactsByID - return nil -} - -func (r *RuntimeConfig) applyPendingContacts() { - if r.Contacts == nil { - r.Contacts = make(map[int64]*recipient.Contact) - } + contact.Addresses = append(contact.Addresses, newElement) + return nil + }, + 
func(curElement, update *recipient.Address) error { + if curElement.ContactID != update.ContactID { + return errRemoveAndAddInstead + } - for id, pendingContact := range r.pending.Contacts { - if pendingContact == nil { - delete(r.Contacts, id) - } else if currentContact := r.Contacts[id]; currentContact != nil { - currentContact.FullName = pendingContact.FullName - currentContact.Username = pendingContact.Username - currentContact.DefaultChannelID = pendingContact.DefaultChannelID - } else { - r.Contacts[id] = pendingContact - } - } + curElement.ChangedAt = update.ChangedAt + curElement.Type = update.Type + curElement.Address = update.Address + return nil + }, + func(delElement *recipient.Address) error { + contact, ok := r.Contacts[delElement.ContactID] + if !ok { + return nil + } - r.pending.Contacts = nil + contact.Addresses = slices.DeleteFunc(contact.Addresses, func(address *recipient.Address) bool { + return address.ID == delElement.ID + }) + return nil + }) } diff --git a/internal/config/contact_address.go b/internal/config/contact_address.go deleted file mode 100644 index f89f82f0a..000000000 --- a/internal/config/contact_address.go +++ /dev/null @@ -1,112 +0,0 @@ -package config - -import ( - "context" - "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" - "slices" -) - -func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error { - var addressPtr *recipient.Address - stmt := r.db.BuildSelectStmt(addressPtr, addressPtr) - r.logger.Debugf("Executing query %q", stmt) - - var addresses []*recipient.Address - if err := tx.SelectContext(ctx, &addresses, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - addressesById := make(map[int64]*recipient.Address) - for _, a := range addresses { - addressesById[a.ID] = a - r.logger.Debugw("loaded contact_address config", - zap.Int64("id", a.ID), - zap.Int64("contact_id", a.ContactID), - zap.String("type", a.Type), - 
zap.String("address", a.Address), - ) - } - - if r.ContactAddresses != nil { - // mark no longer existing contacts for deletion - for id := range r.ContactAddresses { - if _, ok := addressesById[id]; !ok { - addressesById[id] = nil - } - } - } - - r.pending.ContactAddresses = addressesById - - return nil -} - -func (r *RuntimeConfig) applyPendingContactAddresses() { - if r.ContactAddresses == nil { - r.ContactAddresses = make(map[int64]*recipient.Address) - } - - for id, pendingAddress := range r.pending.ContactAddresses { - currentAddress := r.ContactAddresses[id] - - if pendingAddress == nil { - r.removeContactAddress(currentAddress) - } else if currentAddress != nil { - r.updateContactAddress(currentAddress, pendingAddress) - } else { - r.addContactAddress(pendingAddress) - } - } - - r.pending.ContactAddresses = nil -} - -func (r *RuntimeConfig) addContactAddress(addr *recipient.Address) { - contact := r.Contacts[addr.ContactID] - if contact != nil { - if i := slices.Index(contact.Addresses, addr); i < 0 { - contact.Addresses = append(contact.Addresses, addr) - - r.logger.Debugw("added new address to contact", - zap.Any("contact", contact), - zap.Any("address", addr)) - } - } - - r.ContactAddresses[addr.ID] = addr -} - -func (r *RuntimeConfig) updateContactAddress(addr, pending *recipient.Address) { - contactChanged := addr.ContactID != pending.ContactID - - if contactChanged { - r.removeContactAddress(addr) - } - - addr.ContactID = pending.ContactID - addr.Type = pending.Type - addr.Address = pending.Address - - if contactChanged { - r.addContactAddress(addr) - } - - r.logger.Debugw("updated contact address", zap.Any("address", addr)) -} - -func (r *RuntimeConfig) removeContactAddress(addr *recipient.Address) { - if contact := r.Contacts[addr.ContactID]; contact != nil { - if i := slices.Index(contact.Addresses, addr); i >= 0 { - contact.Addresses = slices.Delete(contact.Addresses, i, i+1) - - r.logger.Debugw("removed address from contact", - zap.Any("contact", 
contact), - zap.Any("address", addr)) - } - } - - delete(r.ContactAddresses, addr.ID) -} diff --git a/internal/config/group.go b/internal/config/group.go index 433162aaf..45bab4e75 100644 --- a/internal/config/group.go +++ b/internal/config/group.go @@ -1,100 +1,56 @@ package config import ( - "context" + "fmt" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { - var groupPtr *recipient.Group - stmt := r.db.BuildSelectStmt(groupPtr, groupPtr) - r.logger.Debugf("Executing query %q", stmt) - - var groups []*recipient.Group - if err := tx.SelectContext(ctx, &groups, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - groupsById := make(map[int64]*recipient.Group) - for _, g := range groups { - groupsById[g.ID] = g - - r.logger.Debugw("loaded group config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - type ContactgroupMember struct { - GroupId int64 `db:"contactgroup_id"` - ContactId int64 `db:"contact_id"` - } - - var memberPtr *ContactgroupMember - stmt = r.db.BuildSelectStmt(memberPtr, memberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*ContactgroupMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, m := range members { - memberLogger := r.logger.With( - zap.Int64("contact_id", m.ContactId), - zap.Int64("contactgroup_id", m.GroupId), - ) - - if g := groupsById[m.GroupId]; g == nil { - memberLogger.Warnw("ignoring member for unknown contactgroup_id") - } else { - g.MemberIDs = append(g.MemberIDs, m.ContactId) - - memberLogger.Debugw("loaded contact group member", - zap.String("contactgroup_name", g.Name)) - } - } - - if r.Groups != nil { - // mark no longer existing groups for deletion - for id := range r.Groups { - if _, ok := groupsById[id]; !ok { - groupsById[id] = nil - } - } - } - - 
r.pending.Groups = groupsById - - return nil -} - +// applyPendingGroups synchronizes changed groups. func (r *RuntimeConfig) applyPendingGroups() { - if r.Groups == nil { - r.Groups = make(map[int64]*recipient.Group) - } + incrementalApplyPending( + r, + &r.Groups, &r.configChange.Groups, + nil, + func(curElement, update *recipient.Group) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.groupMembers, &r.configChange.groupMembers, + func(newElement *recipient.GroupMember) error { + group, ok := r.Groups[newElement.GroupId] + if !ok { + return fmt.Errorf("group member refers unknown group %d", newElement.GroupId) + } - for id, pendingGroup := range r.pending.Groups { - if pendingGroup == nil { - delete(r.Groups, id) - } else { - pendingGroup.Members = make([]*recipient.Contact, 0, len(pendingGroup.MemberIDs)) - for _, contactID := range pendingGroup.MemberIDs { - if c := r.Contacts[contactID]; c != nil { - pendingGroup.Members = append(pendingGroup.Members, c) - } + contact, ok := r.Contacts[newElement.ContactId] + if !ok { + return fmt.Errorf("group member refers unknown contact %d", newElement.ContactId) } - if currentGroup := r.Groups[id]; currentGroup != nil { - *currentGroup = *pendingGroup - } else { - r.Groups[id] = pendingGroup + group.Members = append(group.Members, contact) + return nil + }, + func(element, update *recipient.GroupMember) error { + // The only fields in GroupMember are - next to ChangedAt and Deleted - GroupId and ContactId. As those two + // fields are the primary key, changing one would result in another primary key, which is technically not an + // update anymore. 
+ return fmt.Errorf("group membership entry cannot change") + }, + func(delElement *recipient.GroupMember) error { + group, ok := r.Groups[delElement.GroupId] + if !ok { + return nil } - } - } - r.pending.Groups = nil + group.Members = slices.DeleteFunc(group.Members, func(contact *recipient.Contact) bool { + return contact.ID == delElement.ContactId + }) + return nil + }) } diff --git a/internal/config/incremental_sync.go b/internal/config/incremental_sync.go new file mode 100644 index 000000000..5959ec4aa --- /dev/null +++ b/internal/config/incremental_sync.go @@ -0,0 +1,261 @@ +package config + +import ( + "context" + "errors" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/jmoiron/sqlx" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "time" +) + +// IncrementalConfigurable specifies Getter methods required for types supporting incremental configuration loading. +type IncrementalConfigurable[PK comparable] interface { + zapcore.ObjectMarshaler + + // GetPrimaryKey returns the primary key value. + GetPrimaryKey() PK + + // GetChangedAt returns the changed_at value. + GetChangedAt() types.UnixMilli + + // IsDeleted returns if this entry was marked as deleted. + IsDeleted() bool +} + +// IncrementalConfigurableInitAndValidatable defines a single method for new and updated elements to allow both +// initialization and validation, to be used within incrementalFetch. +type IncrementalConfigurableInitAndValidatable interface { + // IncrementalInitAndValidate allows both to initialize and validates with an optional error. + // + // If an error is returned, the incrementalFetch function aborts the element in question. + IncrementalInitAndValidate() error +} + +// incrementalFetch queries all recently changed elements of BaseT and stores them in changeConfigSetField. +// +// The RuntimeConfig.configChangeTimestamps map contains the last known timestamp for each BaseT table. 
Only those +// elements where the changed_at SQL column is greater than the stored timestamp will be fetched and stored in the +// temporary RuntimeConfig.configChange ConfigSet. Later on, incrementalApplyPending merges it into the main ConfigSet. +func incrementalFetch[ + BaseT any, + PK comparable, + T interface { + *BaseT + IncrementalConfigurable[PK] + }, +](ctx context.Context, tx *sqlx.Tx, r *RuntimeConfig, changeConfigSetField *map[PK]T) error { + startTime := time.Now() + + var typePtr T + + tableName := database.TableName(typePtr) + changedAt, hasChangedAt := r.configChangeTimestamps[tableName] + + stmtLogger := r.logger.With(zap.String("table", tableName)) + + var ( + stmt = r.db.BuildSelectStmt(typePtr, typePtr) + stmtArgs []any + ) + if hasChangedAt { + stmtLogger = stmtLogger.With(zap.Time("changed_at", changedAt.Time())) + stmt += ` WHERE "changed_at" > ?` + stmtArgs = []any{changedAt} + } + + stmt = r.db.Rebind(stmt + ` ORDER BY "changed_at"`) + stmtLogger = stmtLogger.With(zap.String("query", stmt)) + + var ts []T + if err := tx.SelectContext(ctx, &ts, stmt, stmtArgs...); err != nil { + stmtLogger.Errorw("Cannot execute query to fetch incremental config updates", zap.Error(err)) + return err + } + + *changeConfigSetField = make(map[PK]T) + countDel, countErr, countLoad := 0, 0, 0 + for _, t := range ts { + r.configChangeTimestamps[tableName] = t.GetChangedAt() + + logger := r.logger.With(zap.String("table", tableName), zap.Inline(t)) + + if t.IsDeleted() { + if !hasChangedAt { + // This is a special case for the first synchronization or each run when nothing is stored in the + // database yet. Unfortunately, it is not possible to add a "WHERE "deleted" = 'n'" to the query above + // as newer deleted elements would be skipped in the first run, but being read in a subsequent run. 
+ logger.Debug("Skipping deleted element as no prior configuration is loaded") + continue + } + + countDel++ + logger.Debug("Marking entry as deleted") + (*changeConfigSetField)[t.GetPrimaryKey()] = nil + continue + } + + if t, ok := any(t).(IncrementalConfigurableInitAndValidatable); ok { + err := t.IncrementalInitAndValidate() + if err != nil { + countErr++ + logger.Errorw("Cannot validate entry, skipping element", zap.Error(err)) + continue + } + } + + countLoad++ + logger.Debug("Loaded entry") + (*changeConfigSetField)[t.GetPrimaryKey()] = t + } + + stmtLogger = stmtLogger.With("took", time.Since(startTime)) + if countDel > 0 || countErr > 0 || countLoad > 0 { + stmtLogger.Debugw("Fetched incremental configuration updates", + zap.Int("deleted_elements", countDel), + zap.Int("faulty_elements", countErr), + zap.Int("loaded_elements", countLoad)) + } else { + stmtLogger.Debug("No configuration updates are available") + } + + return nil +} + +// errRemoveAndAddInstead is a special non-error which might be expected from incrementalApplyPending's updateFn to +// signal that the current element should be updated by being deleted through the deleteFn first and added again by the +// createFn hook function. +var errRemoveAndAddInstead = errors.New("re-adding by invoking the deletion function followed by the creation function") + +// incrementalApplyPending merges the incremental change from RuntimeConfig.configChange into the main ConfigSet. +// +// The recently fetched incremental change can be of three different types: +// - Newly created elements. Therefore, the createFn callback function will be called upon it, allowing both further +// initialization and also aborting by returning an error. +// - Changed elements. The updateFn callback function receives the current and the updated element, expecting the +// implementation to synchronize the necessary changes into the current element. This hook is allowed to return an +// error as well. 
However, it might also return the special errRemoveAndAddInstead, resulting in the old element to +// be deleted first and then re-added, with optional calls to the other two callbacks included. +// - Finally, deleted elements. Additional cleanup might be performed by the deleteFn. +// +// If no specific callback action is necessary, each function can be nil. A nil updateFn results in the same behavior as +// errRemoveAndAddInstead. +func incrementalApplyPending[ + BaseT any, + PK comparable, + T interface { + *BaseT + IncrementalConfigurable[PK] + }, +]( + r *RuntimeConfig, + configSetField, changeConfigSetField *map[PK]T, + createFn func(newElement T) error, + updateFn func(curElement, update T) error, + deleteFn func(delElement T) error, +) { + startTime := time.Now() + tableName := database.TableName(T(nil)) + countErr, countDelSkip, countDel, countUpdate, countNew := 0, 0, 0, 0, 0 + + if *configSetField == nil { + *configSetField = make(map[PK]T) + } + + createAction := func(id PK, newT T) error { + if createFn != nil { + if err := createFn(newT); err != nil { + countErr++ + return fmt.Errorf("creation callback error, %w", err) + } + } + (*configSetField)[id] = newT + countNew++ + return nil + } + + deleteAction := func(id PK, oldT T) error { + defer delete(*configSetField, id) + countDel++ + if deleteFn != nil { + if err := deleteFn(oldT); err != nil { + countErr++ + return fmt.Errorf("deletion callback error, %w", err) + } + } + return nil + } + + for id, newT := range *changeConfigSetField { + oldT, oldExists := (*configSetField)[id] + + logger := r.logger.With( + zap.String("table", tableName), + zap.Any("id", id)) + + if newT == nil && !oldExists { + countDelSkip++ + logger.Warn("Skipping unknown marked as deleted configuration element") + } else if newT == nil { + logger := logger.With(zap.Object("deleted", oldT)) + if err := deleteAction(id, oldT); err != nil { + logger.Errorw("Deleting configuration element failed", zap.Error(err)) + } else { + 
logger.Debug("Deleted configuration element") + } + } else if oldExists { + logger := logger.With(zap.Object("old", oldT), zap.Object("update", newT)) + reAdd := updateFn == nil + if updateFn != nil { + if err := updateFn(oldT, newT); errors.Is(err, errRemoveAndAddInstead) { + reAdd = true + } else if err != nil { + logger.Errorw("Updating callback failed", zap.Error(err)) + countErr++ + continue + } + } + if reAdd { + logger.Debug("Invoking update by removing and re-adding element") + if err := deleteAction(id, oldT); err != nil { + logger.Errorw("Deleting the old element during re-adding failed", zap.Error(err)) + continue + } + if err := createAction(id, newT); err != nil { + logger.Errorw("Creating the new element during re-adding failed", zap.Error(err)) + continue + } + } + countUpdate++ + logger.Debug("Updated known configuration element") + } else { + logger := logger.With(zap.Object("new", newT)) + if err := createAction(id, newT); err != nil { + logger.Errorw("Creating configuration element failed", zap.Error(err)) + } else { + logger.Debug("Created new configuration element") + } + } + } + + *changeConfigSetField = nil + appliedChanges := countErr > 0 || countDelSkip > 0 || countDel > 0 || countUpdate > 0 || countNew > 0 + + logger := r.logger.With( + zap.String("table", tableName), + zap.Duration("took", time.Since(startTime))) + if appliedChanges { + logger.Infow("Applied configuration updates", + zap.Int("faulty_elements", countErr), + zap.Int("deleted_unknown_elements", countDelSkip), + zap.Int("deleted_elements", countDel), + zap.Int("updated_elements", countUpdate), + zap.Int("new_elements", countNew)) + r.configChangeAvailable = true + } else { + logger.Debug("No configuration updates available to be applied") + } +} diff --git a/internal/config/rule.go b/internal/config/rule.go index 10012cb58..59b69c943 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -1,195 +1,114 @@ package config import ( - "context" - 
"github.com/icinga/icinga-notifications/internal/filter" + "fmt" "github.com/icinga/icinga-notifications/internal/rule" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" "slices" ) -func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { - var rulePtr *rule.Rule - stmt := r.db.BuildSelectStmt(rulePtr, rulePtr) - r.logger.Debugf("Executing query %q", stmt) - - var rules []*rule.Rule - if err := tx.SelectContext(ctx, &rules, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - rulesByID := make(map[int64]*rule.Rule) - for _, ru := range rules { - ruleLogger := r.logger.With(zap.Inline(ru)) - - if ru.ObjectFilterExpr.Valid { - f, err := filter.Parse(ru.ObjectFilterExpr.String) - if err != nil { - ruleLogger.Warnw("ignoring rule as parsing object_filter failed", zap.Error(err)) - continue +// applyPendingRules synchronizes changed rules. +func (r *RuntimeConfig) applyPendingRules() { + incrementalApplyPending( + r, + &r.Rules, &r.configChange.Rules, + func(newElement *rule.Rule) error { + if newElement.TimePeriodID.Valid { + tp, ok := r.TimePeriods[newElement.TimePeriodID.Int64] + if !ok { + return fmt.Errorf("rule refers unknown time period %d", newElement.TimePeriodID.Int64) + } + newElement.TimePeriod = tp } - ru.ObjectFilter = f - } - - ru.Escalations = make(map[int64]*rule.Escalation) - - rulesByID[ru.ID] = ru - ruleLogger.Debugw("loaded rule config") - } - - var escalationPtr *rule.Escalation - stmt = r.db.BuildSelectStmt(escalationPtr, escalationPtr) - r.logger.Debugf("Executing query %q", stmt) - - var escalations []*rule.Escalation - if err := tx.SelectContext(ctx, &escalations, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - escalationsByID := make(map[int64]*rule.Escalation) - for _, escalation := range escalations { - escalationLogger := r.logger.With(zap.Inline(escalation)) - - rule := rulesByID[escalation.RuleID] - if rule == nil { - escalationLogger.Warnw("ignoring escalation for unknown rule_id") - 
continue - } - - if escalation.ConditionExpr.Valid { - cond, err := filter.Parse(escalation.ConditionExpr.String) - if err != nil { - escalationLogger.Warnw("ignoring escalation, failed to parse condition", zap.Error(err)) - continue + newElement.Escalations = make(map[int64]*rule.Escalation) + return nil + }, + func(curElement, update *rule.Rule) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + + curElement.TimePeriodID = update.TimePeriodID + if curElement.TimePeriodID.Valid { + tp, ok := r.TimePeriods[curElement.TimePeriodID.Int64] + if !ok { + return fmt.Errorf("rule refers unknown time period %d", curElement.TimePeriodID.Int64) + } + curElement.TimePeriod = tp + } else { + curElement.TimePeriod = nil } - escalation.Condition = cond - } - - if escalation.FallbackForID.Valid { - // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) - escalationLogger.Warnw("ignoring fallback escalation (not yet implemented)") - continue - } - - rule.Escalations[escalation.ID] = escalation - escalationsByID[escalation.ID] = escalation - escalationLogger.Debugw("loaded escalation config") - } - - var recipientPtr *rule.EscalationRecipient - stmt = r.db.BuildSelectStmt(recipientPtr, recipientPtr) - r.logger.Debugf("Executing query %q", stmt) - - var recipients []*rule.EscalationRecipient - if err := tx.SelectContext(ctx, &recipients, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, recipient := range recipients { - recipientLogger := r.logger.With( - zap.Int64("id", recipient.ID), - zap.Int64("escalation_id", recipient.EscalationID), - zap.Int64("channel_id", recipient.ChannelID.Int64)) - - escalation := escalationsByID[recipient.EscalationID] - if escalation == nil { - recipientLogger.Warnw("ignoring recipient for unknown escalation") - } else { - escalation.Recipients = append(escalation.Recipients, recipient) - recipientLogger.Debugw("loaded escalation recipient config") - } - } - - if r.Rules 
!= nil { - // mark no longer existing rules for deletion - for id := range r.Rules { - if _, ok := rulesByID[id]; !ok { - rulesByID[id] = nil + // ObjectFilter{,Expr} are being initialized by config.IncrementalConfigurableInitAndValidatable. + curElement.ObjectFilter = update.ObjectFilter + curElement.ObjectFilterExpr = update.ObjectFilterExpr + + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.ruleEscalations, &r.configChange.ruleEscalations, + func(newElement *rule.Escalation) error { + elementRule, ok := r.Rules[newElement.RuleID] + if !ok { + return fmt.Errorf("rule escalation refers unknown rule %d", newElement.RuleID) } - } - } - - r.pending.Rules = rulesByID - - return nil -} - -func (r *RuntimeConfig) applyPendingRules() { - if r.Rules == nil { - r.Rules = make(map[int64]*rule.Rule) - } - - for id, pendingRule := range r.pending.Rules { - if pendingRule == nil { - delete(r.Rules, id) - } else { - ruleLogger := r.logger.With(zap.Inline(pendingRule)) - if pendingRule.TimePeriodID.Valid { - if p := r.TimePeriods[pendingRule.TimePeriodID.Int64]; p == nil { - ruleLogger.Warnw("ignoring rule with unknown timeperiod_id") - continue - } else { - pendingRule.TimePeriod = p - } + elementRule.Escalations[newElement.ID] = newElement + return nil + }, + func(curElement, update *rule.Escalation) error { + if curElement.RuleID != update.RuleID { + return errRemoveAndAddInstead } - for _, escalation := range pendingRule.Escalations { - for i, recipient := range escalation.Recipients { - recipientLogger := r.logger.With( - zap.Int64("id", recipient.ID), - zap.Int64("escalation_id", recipient.EscalationID), - zap.Int64("channel_id", recipient.ChannelID.Int64), - zap.Inline(recipient.Key)) - - if recipient.ContactID.Valid { - id := recipient.ContactID.Int64 - if c := r.Contacts[id]; c != nil { - recipient.Recipient = c - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else if 
recipient.GroupID.Valid { - id := recipient.GroupID.Int64 - if g := r.Groups[id]; g != nil { - recipient.Recipient = g - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else if recipient.ScheduleID.Valid { - id := recipient.ScheduleID.Int64 - if s := r.Schedules[id]; s != nil { - recipient.Recipient = s - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } + curElement.ChangedAt = update.ChangedAt + curElement.NameRaw = update.NameRaw + // Condition{,Expr} are being initialized by config.IncrementalConfigurableInitAndValidatable. + curElement.Condition = update.Condition + curElement.ConditionExpr = update.ConditionExpr + // TODO: synchronize Fallback{ForID,s} when implemented + + return nil + }, + func(delElement *rule.Escalation) error { + elementRule, ok := r.Rules[delElement.RuleID] + if !ok { + return nil + } - escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(r *rule.EscalationRecipient) bool { - return r == nil - }) + delete(elementRule.Escalations, delElement.ID) + return nil + }) + + incrementalApplyPending( + r, + &r.ruleEscalationRecipients, &r.configChange.ruleEscalationRecipients, + func(newElement *rule.EscalationRecipient) error { + newElement.Recipient = r.GetRecipient(newElement.Key) + if newElement.Recipient == nil { + return fmt.Errorf("rule escalation recipient is missing or unknown") } - if currentRule := r.Rules[id]; currentRule != nil { - *currentRule = *pendingRule - } else { - r.Rules[id] = pendingRule + escalation := r.GetRuleEscalation(newElement.EscalationID) + if escalation == nil { + return fmt.Errorf("rule escalation recipient refers to unknown escalation %d", newElement.EscalationID) + } + escalation.Recipients = append(escalation.Recipients, newElement) + + return nil + }, + 
nil, + func(delElement *rule.EscalationRecipient) error { + escalation := r.GetRuleEscalation(delElement.EscalationID) + if escalation == nil { + return nil } - } - } - r.pending.Rules = nil + escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(recipient *rule.EscalationRecipient) bool { + return recipient.EscalationID == delElement.EscalationID + }) + return nil + }) } diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 69b38188d..4c47d843c 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -6,11 +6,11 @@ import ( "errors" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/timeperiod" - "github.com/jmoiron/sqlx" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" "strconv" @@ -29,8 +29,13 @@ type RuntimeConfig struct { // This became necessary due to circular imports, either with the incident or icinga2 package. EventStreamLaunchFunc func(source *Source) - // pending contains changes to config objects that are to be applied to the embedded live config. - pending ConfigSet + // configChange contains incremental changes to config objects to be merged into the live configuration. + // + // It will be both created and deleted within RuntimeConfig.UpdateFromDatabase. To keep track of the known state, + // the last known timestamp of each ConfigSet type is stored within configChangeTimestamps. 
+ configChange *ConfigSet + configChangeAvailable bool + configChangeTimestamps map[string]types.UnixMilli logs *logging.Logging logger *logging.Logger @@ -48,6 +53,8 @@ func NewRuntimeConfig( return &RuntimeConfig{ EventStreamLaunchFunc: esLaunch, + configChangeTimestamps: make(map[string]types.UnixMilli), + logs: logs, logger: logs.GetChildLogger("runtime-updates"), db: db, @@ -63,19 +70,39 @@ type ConfigSet struct { Schedules map[int64]*recipient.Schedule Rules map[int64]*rule.Rule Sources map[int64]*Source + + // The following fields contain intermediate values, necessary for the incremental config synchronization. + // Furthermore, they allow accessing intermediate tables as everything is referred by pointers. + groupMembers map[recipient.GroupMemberKey]*recipient.GroupMember + timePeriodEntries map[int64]*timeperiod.Entry + scheduleRotations map[int64]*recipient.Rotation + scheduleRotationMembers map[int64]*recipient.RotationMember + ruleEscalations map[int64]*rule.Escalation + ruleEscalationRecipients map[int64]*rule.EscalationRecipient } func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error { - err := r.fetchFromDatabase(ctx) - if err != nil { + startTime := time.Now() + defer func() { + r.logger.Debugw("Finished configuration synchronization", zap.Duration("took", time.Since(startTime))) + }() + + r.logger.Debug("Synchronizing configuration with database") + + r.configChange = &ConfigSet{} + r.configChangeAvailable = false + defer func() { r.configChange = nil }() + + if err := r.fetchFromDatabase(ctx); err != nil { return err } r.applyPending() - - err = r.debugVerify() - if err != nil { - panic(err) + if r.configChangeAvailable { + r.logger.Debug("Synchronizing applied configuration changes, verifying state") + if err := r.debugVerify(); err != nil { + r.logger.Fatalw("Newly synchronized configuration failed verification", zap.Error(err)) + } } return nil @@ -88,10 +115,8 @@ func (r *RuntimeConfig) PeriodicUpdates(ctx context.Context, 
interval time.Durat for { select { case <-ticker.C: - r.logger.Debug("periodically updating config") - err := r.UpdateFromDatabase(ctx) - if err != nil { - r.logger.Errorw("periodic config update failed, continuing with previous config", zap.Error(err)) + if err := r.UpdateFromDatabase(ctx); err != nil { + r.logger.Errorw("Periodic configuration synchronization failed", zap.Error(err)) } case <-ctx.Done(): return @@ -201,12 +226,6 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg } func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { - r.logger.Debug("fetching configuration from database") - start := time.Now() - - // Reset all pending state to start from a clean state. - r.pending = ConfigSet{} - tx, err := r.db.BeginTxx(ctx, &sql.TxOptions{ Isolation: sql.LevelRepeatableRead, ReadOnly: true, @@ -217,42 +236,46 @@ func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { // The transaction is only used for reading, never has to be committed. 
defer func() { _ = tx.Rollback() }() - updateFuncs := []func(ctx context.Context, tx *sqlx.Tx) error{ - r.fetchChannels, - r.fetchContacts, - r.fetchContactAddresses, - r.fetchGroups, - r.fetchTimePeriods, - r.fetchSchedules, - r.fetchRules, - r.fetchSources, + fetchFns := []func() error{ + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Channels) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Contacts) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ContactAddresses) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Groups) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.groupMembers) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Schedules) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.scheduleRotations) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.scheduleRotationMembers) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.TimePeriods) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.timePeriodEntries) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Rules) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEscalations) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEscalationRecipients) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Sources) }, } - for _, f := range updateFuncs { - if err := f(ctx, tx); err != nil { + for _, f := range fetchFns { + if err := f(); err != nil { return err } } - r.logger.Debugw("fetched configuration from database", zap.Duration("took", time.Since(start))) - return nil } +// applyPending synchronizes all changes. 
func (r *RuntimeConfig) applyPending() { r.mu.Lock() defer r.mu.Unlock() - r.logger.Debug("applying pending configuration") - start := time.Now() - - r.applyPendingChannels() - r.applyPendingContacts() - r.applyPendingContactAddresses() - r.applyPendingGroups() - r.applyPendingTimePeriods() - r.applyPendingSchedules() - r.applyPendingRules() - r.applyPendingSources() - - r.logger.Debugw("applied pending configuration", zap.Duration("took", time.Since(start))) + applyFns := []func(){ + r.applyPendingChannels, + r.applyPendingContacts, + r.applyPendingGroups, + r.applyPendingSchedules, + r.applyPendingTimePeriods, + r.applyPendingRules, + r.applyPendingSources, + } + for _, f := range applyFns { + f() + } } diff --git a/internal/config/schedule.go b/internal/config/schedule.go index af3c7557b..a1fb7db58 100644 --- a/internal/config/schedule.go +++ b/internal/config/schedule.go @@ -1,172 +1,114 @@ package config import ( - "context" + "fmt" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/timeperiod" - "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { - var schedulePtr *recipient.Schedule - stmt := r.db.BuildSelectStmt(schedulePtr, schedulePtr) - r.logger.Debugf("Executing query %q", stmt) - - var schedules []*recipient.Schedule - if err := tx.SelectContext(ctx, &schedules, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - schedulesById := make(map[int64]*recipient.Schedule) - for _, g := range schedules { - schedulesById[g.ID] = g - - r.logger.Debugw("loaded schedule config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - var rotationPtr *recipient.Rotation - stmt = r.db.BuildSelectStmt(rotationPtr, rotationPtr) - r.logger.Debugf("Executing query %q", stmt) - - var rotations []*recipient.Rotation - if err := tx.SelectContext(ctx, &rotations, stmt); err != nil { - r.logger.Errorln(err) - 
return err - } - - rotationsById := make(map[int64]*recipient.Rotation) - for _, rotation := range rotations { - rotationLogger := r.logger.With(zap.Object("rotation", rotation)) - - if schedule := schedulesById[rotation.ScheduleID]; schedule == nil { - rotationLogger.Warnw("ignoring schedule rotation for unknown schedule_id") - } else { - rotationsById[rotation.ID] = rotation - schedule.Rotations = append(schedule.Rotations, rotation) - - rotationLogger.Debugw("loaded schedule rotation") - } - } - - var rotationMemberPtr *recipient.RotationMember - stmt = r.db.BuildSelectStmt(rotationMemberPtr, rotationMemberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*recipient.RotationMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - rotationMembersById := make(map[int64]*recipient.RotationMember) - for _, member := range members { - memberLogger := r.logger.With(zap.Object("rotation_member", member)) - - if rotation := rotationsById[member.RotationID]; rotation == nil { - memberLogger.Warnw("ignoring rotation member for unknown rotation_member_id") - } else { - member.TimePeriodEntries = make(map[int64]*timeperiod.Entry) - rotation.Members = append(rotation.Members, member) - rotationMembersById[member.ID] = member - - memberLogger.Debugw("loaded schedule rotation member") - } - } - - var entryPtr *timeperiod.Entry - stmt = r.db.BuildSelectStmt(entryPtr, entryPtr) + " WHERE rotation_member_id IS NOT NULL" - r.logger.Debugf("Executing query %q", stmt) - - var entries []*timeperiod.Entry - if err := tx.SelectContext(ctx, &entries, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, entry := range entries { - var member *recipient.RotationMember - if entry.RotationMemberID.Valid { - member = rotationMembersById[entry.RotationMemberID.Int64] - } - - if member == nil { - r.logger.Warnw("ignoring entry for unknown rotation_member_id", - zap.Int64("timeperiod_entry_id", 
entry.ID), - zap.Int64("timeperiod_id", entry.TimePeriodID)) - continue - } - - err := entry.Init() - if err != nil { - r.logger.Warnw("ignoring time period entry", - zap.Object("entry", entry), - zap.Error(err)) - continue - } - - member.TimePeriodEntries[entry.ID] = entry - } - - for _, schedule := range schedulesById { - schedule.RefreshRotations() - } - - if r.Schedules != nil { - // mark no longer existing schedules for deletion - for id := range r.Schedules { - if _, ok := schedulesById[id]; !ok { - schedulesById[id] = nil +// applyPendingSchedules synchronizes changed schedules. +func (r *RuntimeConfig) applyPendingSchedules() { + // Set of schedules (by id) which Rotation was altered and where RefreshRotations must be called. + updatedScheduleIds := make(map[int64]struct{}) + + incrementalApplyPending( + r, + &r.Schedules, &r.configChange.Schedules, + nil, + func(curElement, update *recipient.Schedule) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.scheduleRotations, &r.configChange.scheduleRotations, + func(newElement *recipient.Rotation) error { + schedule, ok := r.Schedules[newElement.ScheduleID] + if !ok { + return fmt.Errorf("rotation refers to unknown schedule %d", newElement.ScheduleID) } - } - } - r.pending.Schedules = schedulesById + schedule.Rotations = append(schedule.Rotations, newElement) + updatedScheduleIds[schedule.ID] = struct{}{} + return nil + }, + func(curElement, update *recipient.Rotation) error { + if curElement.ScheduleID != update.ScheduleID { + return errRemoveAndAddInstead + } - return nil -} + curElement.ChangedAt = update.ChangedAt + curElement.ActualHandoff = update.ActualHandoff + curElement.Priority = update.Priority + curElement.Name = update.Name + + updatedScheduleIds[curElement.ScheduleID] = struct{}{} + return nil + }, + func(delElement *recipient.Rotation) error { + schedule, ok := r.Schedules[delElement.ScheduleID] + if 
!ok { + return nil + } -func (r *RuntimeConfig) applyPendingSchedules() { - if r.Schedules == nil { - r.Schedules = make(map[int64]*recipient.Schedule) - } + schedule.Rotations = slices.DeleteFunc(schedule.Rotations, func(rotation *recipient.Rotation) bool { + return rotation.ID == delElement.ID + }) + updatedScheduleIds[schedule.ID] = struct{}{} + return nil + }) + + incrementalApplyPending( + r, + &r.scheduleRotationMembers, &r.configChange.scheduleRotationMembers, + func(newElement *recipient.RotationMember) error { + rotation, ok := r.scheduleRotations[newElement.RotationID] + if !ok { + return fmt.Errorf("schedule rotation member refers unknown rotation %d", newElement.RotationID) + } - for id, pendingSchedule := range r.pending.Schedules { - if pendingSchedule == nil { - delete(r.Schedules, id) - } else { - for _, rotation := range pendingSchedule.Rotations { - for _, member := range rotation.Members { - memberLogger := r.logger.With( - zap.Object("rotation", rotation), - zap.Object("rotation_member", member)) + rotation.Members = append(rotation.Members, newElement) + updatedScheduleIds[rotation.ScheduleID] = struct{}{} - if member.ContactID.Valid { - member.Contact = r.Contacts[member.ContactID.Int64] - if member.Contact == nil { - memberLogger.Warnw("rotation member has an unknown contact_id") - } - } + if newElement.ContactID.Valid { + newElement.Contact, ok = r.Contacts[newElement.ContactID.Int64] + if !ok { + return fmt.Errorf("schedule rotation member refers unknown contact %d", newElement.ContactID.Int64) + } + } - if member.ContactGroupID.Valid { - member.ContactGroup = r.Groups[member.ContactGroupID.Int64] - if member.ContactGroup == nil { - memberLogger.Warnw("rotation member has an unknown contactgroup_id") - } - } + if newElement.ContactGroupID.Valid { + newElement.ContactGroup, ok = r.Groups[newElement.ContactGroupID.Int64] + if !ok { + return fmt.Errorf("schedule rotation member refers unknown contact group %d", newElement.ContactGroupID.Int64) 
} } - if currentSchedule := r.Schedules[id]; currentSchedule != nil { - *currentSchedule = *pendingSchedule - } else { - r.Schedules[id] = pendingSchedule + newElement.TimePeriodEntries = make(map[int64]*timeperiod.Entry) + return nil + }, + nil, + func(delElement *recipient.RotationMember) error { + rotation, ok := r.scheduleRotations[delElement.RotationID] + if !ok { + return nil } - } - } - r.pending.Schedules = nil + rotation.Members = slices.DeleteFunc(rotation.Members, func(member *recipient.RotationMember) bool { + return member.ID == delElement.ID + }) + updatedScheduleIds[rotation.ScheduleID] = struct{}{} + return nil + }) + + for id := range updatedScheduleIds { + schedule := r.Schedules[id] + r.logger.Debugw("Refreshing schedule rotations", zap.Inline(schedule)) + schedule.RefreshRotations() + } } diff --git a/internal/config/source.go b/internal/config/source.go index df986859f..affb9620b 100644 --- a/internal/config/source.go +++ b/internal/config/source.go @@ -3,8 +3,8 @@ package config import ( "context" "github.com/icinga/icinga-go-library/types" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "github.com/icinga/icinga-notifications/internal/config/baseconf" + "go.uber.org/zap/zapcore" ) // SourceTypeIcinga2 represents the "icinga2" Source Type for Event Stream API sources. @@ -12,7 +12,8 @@ const SourceTypeIcinga2 = "icinga2" // Source entry within the ConfigSet to describe a source. type Source struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Type string `db:"type"` Name string `db:"name"` @@ -29,109 +30,31 @@ type Source struct { Icinga2SourceCancel context.CancelFunc `db:"-" json:"-"` } -// fieldEquals checks if this Source's database fields are equal to those of another Source. 
-func (source *Source) fieldEquals(other *Source) bool { - boolEq := func(a, b types.Bool) bool { return (!a.Valid && !b.Valid) || (a == b) } - stringEq := func(a, b types.String) bool { return (!a.Valid && !b.Valid) || (a == b) } - - return source.ID == other.ID && - source.Type == other.Type && - source.Name == other.Name && - stringEq(source.ListenerPasswordHash, other.ListenerPasswordHash) && - stringEq(source.Icinga2BaseURL, other.Icinga2BaseURL) && - stringEq(source.Icinga2AuthUser, other.Icinga2AuthUser) && - stringEq(source.Icinga2AuthPass, other.Icinga2AuthPass) && - stringEq(source.Icinga2CAPem, other.Icinga2CAPem) && - stringEq(source.Icinga2CommonName, other.Icinga2CommonName) && - boolEq(source.Icinga2InsecureTLS, other.Icinga2InsecureTLS) -} - -// stop this Source's worker; currently only Icinga Event Stream API Client. -func (source *Source) stop() { - if source.Type == SourceTypeIcinga2 && source.Icinga2SourceCancel != nil { - source.Icinga2SourceCancel() - source.Icinga2SourceCancel = nil - } -} - -func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error { - var sourcePtr *Source - stmt := r.db.BuildSelectStmt(sourcePtr, sourcePtr) - r.logger.Debugf("Executing query %q", stmt) - - var sources []*Source - if err := tx.SelectContext(ctx, &sources, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - sourcesById := make(map[int64]*Source) - for _, s := range sources { - sourceLogger := r.logger.With( - zap.Int64("id", s.ID), - zap.String("name", s.Name), - zap.String("type", s.Type), - ) - if sourcesById[s.ID] != nil { - sourceLogger.Error("Ignoring duplicate config for source ID") - continue - } - - sourcesById[s.ID] = s - sourceLogger.Debug("loaded source config") - } - - if r.Sources != nil { - // mark no longer existing sources for deletion - for id := range r.Sources { - if _, ok := sourcesById[id]; !ok { - sourcesById[id] = nil - } - } - } - - r.pending.Sources = sourcesById - +// MarshalLogObject implements the 
zapcore.ObjectMarshaler interface. +func (source *Source) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", source.ID) + encoder.AddString("type", source.Type) + encoder.AddString("name", source.Name) return nil } +// applyPendingSources synchronizes changed sources. func (r *RuntimeConfig) applyPendingSources() { - if r.Sources == nil { - r.Sources = make(map[int64]*Source) - } - - for id, pendingSource := range r.pending.Sources { - logger := r.logger.With(zap.Int64("id", id)) - currentSource := r.Sources[id] - - // Compare the pending source with an optional existing source; instruct the Event Source Client, if necessary. - if pendingSource == nil && currentSource != nil { - logger.Info("Source has been removed") - - currentSource.stop() - delete(r.Sources, id) - continue - } else if pendingSource != nil && currentSource != nil { - if currentSource.fieldEquals(pendingSource) { - continue + incrementalApplyPending( + r, + &r.Sources, &r.configChange.Sources, + func(newElement *Source) error { + if newElement.Type == SourceTypeIcinga2 { + r.EventStreamLaunchFunc(newElement) } - - logger.Info("Source has been updated") - currentSource.stop() - } else if pendingSource != nil && currentSource == nil { - logger.Info("Source has been added") - } else { - // Neither an active nor a pending source? 
- logger.Error("Cannot applying pending configuration: neither an active nor a pending source") - continue - } - - if pendingSource.Type == SourceTypeIcinga2 { - r.EventStreamLaunchFunc(pendingSource) - } - - r.Sources[id] = pendingSource - } - - r.pending.Sources = nil + return nil + }, + nil, + func(delElement *Source) error { + if delElement.Type == SourceTypeIcinga2 && delElement.Icinga2SourceCancel != nil { + delElement.Icinga2SourceCancel() + delElement.Icinga2SourceCancel = nil + } + return nil + }) } diff --git a/internal/config/timeperiod.go b/internal/config/timeperiod.go index 9263c52d4..411467a18 100644 --- a/internal/config/timeperiod.go +++ b/internal/config/timeperiod.go @@ -1,100 +1,63 @@ package config import ( - "context" "fmt" "github.com/icinga/icinga-notifications/internal/timeperiod" - "github.com/jmoiron/sqlx" - "go.uber.org/zap" + "slices" ) -func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error { - var timePeriodPtr *timeperiod.TimePeriod - stmt := r.db.BuildSelectStmt(timePeriodPtr, timePeriodPtr) - r.logger.Debugf("Executing query %q", stmt) - - var timePeriods []*timeperiod.TimePeriod - if err := tx.SelectContext(ctx, &timePeriods, stmt); err != nil { - r.logger.Errorln(err) - return err - } - timePeriodsById := make(map[int64]*timeperiod.TimePeriod) - for _, period := range timePeriods { - timePeriodsById[period.ID] = period - } - - var entryPtr *timeperiod.Entry - stmt = r.db.BuildSelectStmt(entryPtr, entryPtr) - r.logger.Debugf("Executing query %q", stmt) - - var entries []*timeperiod.Entry - if err := tx.SelectContext(ctx, &entries, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - for _, entry := range entries { - p := timePeriodsById[entry.TimePeriodID] - if p == nil { - r.logger.Warnw("ignoring entry for unknown timeperiod_id", - zap.Int64("timeperiod_entry_id", entry.ID), - zap.Int64("timeperiod_id", entry.TimePeriodID)) - continue - } - - if p.Name == "" { - p.Name = fmt.Sprintf("Time 
Period #%d", entry.TimePeriodID) - } - - err := entry.Init() - if err != nil { - r.logger.Warnw("ignoring time period entry", - zap.Object("entry", entry), - zap.Error(err)) - continue - } - - p.Entries = append(p.Entries, entry) +// applyPendingTimePeriods synchronizes changed time periods. +func (r *RuntimeConfig) applyPendingTimePeriods() { + incrementalApplyPending( + r, + &r.TimePeriods, &r.configChange.TimePeriods, + nil, + func(curElement, update *timeperiod.TimePeriod) error { + curElement.ChangedAt = update.ChangedAt + curElement.Name = update.Name + return nil + }, + nil) + + incrementalApplyPending( + r, + &r.timePeriodEntries, &r.configChange.timePeriodEntries, + func(newElement *timeperiod.Entry) error { + period, ok := r.TimePeriods[newElement.TimePeriodID] + if !ok { + return fmt.Errorf("time period entry refers unknown time period %d", newElement.TimePeriodID) + } - r.logger.Debugw("loaded time period entry", - zap.Object("timeperiod", p), - zap.Object("entry", entry)) - } + period.Entries = append(period.Entries, newElement) - for _, p := range timePeriodsById { - if p.Name == "" { - p.Name = fmt.Sprintf("Time Period #%d (empty)", p.ID) - } - } + // rotation_member_id is nullable for future standalone timeperiods + if newElement.RotationMemberID.Valid { + rotationMember, ok := r.scheduleRotationMembers[newElement.RotationMemberID.Int64] + if !ok { + return fmt.Errorf("time period entry refers unknown rotation member %d", newElement.RotationMemberID.Int64) + } - if r.TimePeriods != nil { - // mark no longer existing time periods for deletion - for id := range r.TimePeriods { - if _, ok := timePeriodsById[id]; !ok { - timePeriodsById[id] = nil + rotationMember.TimePeriodEntries[newElement.ID] = newElement } - } - } - - r.pending.TimePeriods = timePeriodsById - - return nil -} -func (r *RuntimeConfig) applyPendingTimePeriods() { - if r.TimePeriods == nil { - r.TimePeriods = make(map[int64]*timeperiod.TimePeriod) - } + return nil + }, + nil, + 
func(delElement *timeperiod.Entry) error { + period, ok := r.TimePeriods[delElement.TimePeriodID] + if ok { + period.Entries = slices.DeleteFunc(period.Entries, func(entry *timeperiod.Entry) bool { + return entry.ID == delElement.ID + }) + } - for id, pendingTimePeriod := range r.pending.TimePeriods { - if pendingTimePeriod == nil { - delete(r.TimePeriods, id) - } else if currentTimePeriod := r.TimePeriods[id]; currentTimePeriod != nil { - *currentTimePeriod = *pendingTimePeriod - } else { - r.TimePeriods[id] = pendingTimePeriod - } - } + if delElement.RotationMemberID.Valid { + rotationMember, ok := r.scheduleRotationMembers[delElement.RotationMemberID.Int64] + if ok { + delete(rotationMember.TimePeriodEntries, delElement.ID) + } + } - r.pending.TimePeriods = nil + return nil + }) } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index ddad59956..18b8b0a49 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -92,7 +92,11 @@ func (i *Incident) ID() int64 { } func (i *Incident) HasManager() bool { - for _, state := range i.Recipients { + for recipientKey, state := range i.Recipients { + if i.runtimeConfig.GetRecipient(recipientKey) == nil { + i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) + continue + } if state.Role == RoleManager { return true } @@ -410,10 +414,6 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 } for _, r := range i.runtimeConfig.Rules { - if !r.IsActive.Valid || !r.IsActive.Bool { - continue - } - if _, ok := i.Rules[r.ID]; !ok { matched, err := r.Eval(i.Object) if err != nil { @@ -474,8 +474,8 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, for rID := range i.Rules { r := i.runtimeConfig.Rules[rID] - - if r == nil || !r.IsActive.Valid || !r.IsActive.Bool { + if r == nil { + i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) 
continue } @@ -525,6 +525,11 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Escalation) error { for _, escalation := range escalations { r := i.runtimeConfig.Rules[escalation.RuleID] + if r == nil { + i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) + continue + } + i.logger.Infow("Rule reached escalation", zap.Object("rule", r), zap.Object("escalation", escalation)) state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} @@ -570,6 +575,10 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error { for _, notification := range notifications { contact := i.runtimeConfig.Contacts[notification.ContactID] + if contact == nil { + i.logger.Debugw("Incident refers unknown contact, might got deleted", zap.Int64("contact_id", notification.ContactID)) + continue + } if i.notifyContact(contact, ev, notification.ChannelID) != nil { notification.State = NotificationStateFailed @@ -692,6 +701,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { for escalationID := range i.EscalationState { escalation := i.runtimeConfig.GetRuleEscalation(escalationID) if escalation == nil { + i.logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) continue } @@ -704,6 +714,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { for recipientKey, state := range i.Recipients { r := i.runtimeConfig.GetRecipient(recipientKey) if r == nil { + i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } diff --git a/internal/incident/incidents_test.go 
b/internal/incident/incidents_test.go index b4b35c0de..2cf4c0e07 100644 --- a/internal/incident/incidents_test.go +++ b/internal/incident/incidents_test.go @@ -23,7 +23,14 @@ func TestLoadOpenIncidents(t *testing.T) { db := testutils.GetTestDB(ctx, t) // Insert a dummy source for our test cases! - source := config.Source{Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + source := config.Source{ + Type: "notifications", + Name: "Icinga Notifications", + Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}, + } + source.ChangedAt = types.UnixMilli(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)) + source.Deleted = types.Bool{Bool: false, Valid: true} + err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { id, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, source, "id"), source) require.NoError(t, err, "populating source table should not fail") diff --git a/internal/object/objects_test.go b/internal/object/objects_test.go index 35f521972..25075bb67 100644 --- a/internal/object/objects_test.go +++ b/internal/object/objects_test.go @@ -21,11 +21,12 @@ func TestRestoreMutedObjects(t *testing.T) { var sourceID int64 err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { args := map[string]any{ - "type": "notifications", - "name": "Icinga Notifications", + "type": "notifications", + "name": "Icinga Notifications", + "changed_at": 1720702049000, } // We can't use config.Source here unfortunately due to cyclic import error! 
- id, err := utils.InsertAndFetchId(ctx, tx, `INSERT INTO source (type, name) VALUES (:type, :name)`, args) + id, err := utils.InsertAndFetchId(ctx, tx, `INSERT INTO source (type, name, changed_at) VALUES (:type, :name, :changed_at)`, args) require.NoError(t, err, "populating source table should not fail") sourceID = id diff --git a/internal/recipient/contact.go b/internal/recipient/contact.go index 82732f1f6..bf85810e9 100644 --- a/internal/recipient/contact.go +++ b/internal/recipient/contact.go @@ -2,12 +2,14 @@ package recipient import ( "database/sql" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "go.uber.org/zap/zapcore" "time" ) type Contact struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + FullName string `db:"full_name"` Username sql.NullString `db:"username"` DefaultChannelID int64 `db:"default_channel_id"` @@ -33,12 +35,20 @@ func (c *Contact) MarshalLogObject(encoder zapcore.ObjectEncoder) error { var _ Recipient = (*Contact)(nil) type Address struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + ContactID int64 `db:"contact_id"` Type string `db:"type"` Address string `db:"address"` } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. 
+func (a *Address) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", a.ID) + encoder.AddInt64("contact_id", a.ContactID) + return nil +} + func (a *Address) TableName() string { return "contact_address" } diff --git a/internal/recipient/group.go b/internal/recipient/group.go index 243dde7b9..ea355d863 100644 --- a/internal/recipient/group.go +++ b/internal/recipient/group.go @@ -1,15 +1,16 @@ package recipient import ( + "github.com/icinga/icinga-notifications/internal/config/baseconf" "go.uber.org/zap/zapcore" "time" ) type Group struct { - ID int64 `db:"id"` - Name string `db:"name"` - Members []*Contact `db:"-"` - MemberIDs []int64 `db:"-"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + + Name string `db:"name"` + Members []*Contact `db:"-"` } func (g *Group) GetContactsAt(t time.Time) []*Contact { @@ -32,4 +33,31 @@ func (g *Group) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +// GroupMemberKey represents the combined primary key of GroupMember. +type GroupMemberKey struct { + GroupId int64 `db:"contactgroup_id"` + ContactId int64 `db:"contact_id"` +} + +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. +func (g *GroupMemberKey) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("contactgroup_id", g.GroupId) + encoder.AddInt64("contact_id", g.ContactId) + return nil +} + +type GroupMember struct { + GroupMemberKey `db:",inline"` + baseconf.IncrementalDbEntry `db:",inline"` +} + +func (g *GroupMember) TableName() string { + return "contactgroup_member" +} + +// GetPrimaryKey is required by the config.IncrementalConfigurable interface. 
+func (g *GroupMember) GetPrimaryKey() GroupMemberKey { + return g.GroupMemberKey +} + var _ Recipient = (*Group)(nil) diff --git a/internal/recipient/rotations.go b/internal/recipient/rotations.go index ea28da6f7..b2e684a72 100644 --- a/internal/recipient/rotations.go +++ b/internal/recipient/rotations.go @@ -26,12 +26,16 @@ func (r *rotationResolver) update(rotations []*Rotation) { // Group sortedByHandoff by priority using a temporary map with the priority as key. prioMap := make(map[int32]*rotationsWithPriority) for _, rotation := range rotations { - p := prioMap[rotation.Priority] + if !rotation.Priority.Valid { + continue + } + + p := prioMap[rotation.Priority.Int32] if p == nil { p = &rotationsWithPriority{ - priority: rotation.Priority, + priority: rotation.Priority.Int32, } - prioMap[rotation.Priority] = p + prioMap[rotation.Priority.Int32] = p } p.sortedByHandoff = append(p.sortedByHandoff, rotation) diff --git a/internal/recipient/rotations_test.go b/internal/recipient/rotations_test.go index 82bf3640b..7c2645d88 100644 --- a/internal/recipient/rotations_test.go +++ b/internal/recipient/rotations_test.go @@ -41,7 +41,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { // Weekend rotation starting 2024, alternating between contacts contactWeekend2024a and contactWeekend2024b { ActualHandoff: types.UnixMilli(parse("2024-01-01")), - Priority: 0, + Priority: sql.NullInt32{Int32: 0, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekend2024a, @@ -71,7 +71,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { // alternating between contacts contactWeekend2025a and contactWeekend2025b { ActualHandoff: types.UnixMilli(parse("2025-01-01")), - Priority: 0, + Priority: sql.NullInt32{Int32: 0, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekend2025a, @@ -101,7 +101,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { // with an override for 12 to 14 o'clock with contactWeekdayNoon. 
{ ActualHandoff: types.UnixMilli(parse("2024-01-01")), - Priority: 1, + Priority: sql.NullInt32{Int32: 1, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekdayNoon, @@ -117,7 +117,7 @@ func Test_rotationResolver_getCurrentRotations(t *testing.T) { }, }, { ActualHandoff: types.UnixMilli(parse("2024-01-01")), - Priority: 2, + Priority: sql.NullInt32{Int32: 2, Valid: true}, Members: []*RotationMember{ { Contact: contactWeekday, diff --git a/internal/recipient/schedule.go b/internal/recipient/schedule.go index d66ef027d..e1ae79252 100644 --- a/internal/recipient/schedule.go +++ b/internal/recipient/schedule.go @@ -3,13 +3,15 @@ package recipient import ( "database/sql" "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/timeperiod" "go.uber.org/zap/zapcore" "time" ) type Schedule struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"name"` Rotations []*Rotation `db:"-"` @@ -32,10 +34,11 @@ func (s *Schedule) MarshalLogObject(encoder zapcore.ObjectEncoder) error { } type Rotation struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + ScheduleID int64 `db:"schedule_id"` ActualHandoff types.UnixMilli `db:"actual_handoff"` - Priority int32 `db:"priority"` + Priority sql.NullInt32 `db:"priority"` Name string `db:"name"` Members []*RotationMember `db:"-"` } @@ -44,13 +47,16 @@ type Rotation struct { func (r *Rotation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) encoder.AddInt64("schedule_id", r.ScheduleID) - encoder.AddInt32("priority", r.Priority) + if r.Priority.Valid { + encoder.AddInt32("priority", r.Priority.Int32) + } encoder.AddString("name", r.Name) return nil } type RotationMember struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + RotationID int64 `db:"rotation_id"` Contact *Contact `db:"-"` 
ContactID sql.NullInt64 `db:"contact_id"` @@ -59,6 +65,7 @@ type RotationMember struct { TimePeriodEntries map[int64]*timeperiod.Entry `db:"-"` } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. func (r *RotationMember) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) encoder.AddInt64("rotation_id", r.RotationID) diff --git a/internal/rule/escalation.go b/internal/rule/escalation.go index 823648cf7..40e14a178 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/escalation.go @@ -2,6 +2,8 @@ package rule import ( "database/sql" + "fmt" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap/zapcore" @@ -10,7 +12,8 @@ import ( ) type Escalation struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + RuleID int64 `db:"rule_id"` NameRaw sql.NullString `db:"name"` Condition filter.Filter `db:"-"` @@ -21,6 +24,25 @@ type Escalation struct { Recipients []*EscalationRecipient `db:"-"` } +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. +func (e *Escalation) IncrementalInitAndValidate() error { + if e.ConditionExpr.Valid { + cond, err := filter.Parse(e.ConditionExpr.String) + if err != nil { + return err + } + + e.Condition = cond + } + + if e.FallbackForID.Valid { + // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) + return fmt.Errorf("ignoring fallback escalation (not yet implemented)") + } + + return nil +} + // MarshalLogObject implements the zapcore.ObjectMarshaler interface. 
// // This allows us to use `zap.Inline(escalation)` or `zap.Object("rule_escalation", escalation)` wherever @@ -93,13 +115,24 @@ func (e *Escalation) TableName() string { } type EscalationRecipient struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + EscalationID int64 `db:"rule_escalation_id"` ChannelID sql.NullInt64 `db:"channel_id"` recipient.Key `db:",inline"` Recipient recipient.Recipient `db:"-"` } +// MarshalLogObject implements the zapcore.ObjectMarshaler interface. +func (r *EscalationRecipient) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", r.ID) + encoder.AddInt64("rule_escalation_id", r.EscalationID) + if r.ChannelID.Valid { + encoder.AddInt64("channel_id", r.ChannelID.Int64) + } + return r.Key.MarshalLogObject(encoder) +} + func (r *EscalationRecipient) TableName() string { return "rule_escalation_recipient" } diff --git a/internal/rule/rule.go b/internal/rule/rule.go index 22b3c0500..8b1fbcef1 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -2,6 +2,7 @@ package rule import ( "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/timeperiod" @@ -10,8 +11,8 @@ import ( ) type Rule struct { - ID int64 `db:"id"` - IsActive types.Bool `db:"is_active"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"name"` TimePeriod *timeperiod.TimePeriod `db:"-"` TimePeriodID types.Int `db:"timeperiod_id"` @@ -20,6 +21,20 @@ type Rule struct { Escalations map[int64]*Escalation `db:"-"` } +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. 
+func (r *Rule) IncrementalInitAndValidate() error { + if r.ObjectFilterExpr.Valid { + f, err := filter.Parse(r.ObjectFilterExpr.String) + if err != nil { + return err + } + + r.ObjectFilter = f + } + + return nil +} + // MarshalLogObject implements the zapcore.ObjectMarshaler interface. func (r *Rule) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) diff --git a/internal/timeperiod/timeperiod.go b/internal/timeperiod/timeperiod.go index e5718845b..431858969 100644 --- a/internal/timeperiod/timeperiod.go +++ b/internal/timeperiod/timeperiod.go @@ -2,7 +2,9 @@ package timeperiod import ( "database/sql" + "fmt" "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config/baseconf" "github.com/pkg/errors" "github.com/teambition/rrule-go" "go.uber.org/zap/zapcore" @@ -10,11 +12,19 @@ import ( ) type TimePeriod struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + Name string `db:"-"` Entries []*Entry `db:"-"` } +func (p *TimePeriod) IncrementalInitAndValidate() error { + if p.Name == "" { + p.Name = fmt.Sprintf("Time Period #%d", p.ID) + } + return nil +} + func (p *TimePeriod) TableName() string { return "timeperiod" } @@ -57,7 +67,8 @@ func (p *TimePeriod) NextTransition(base time.Time) time.Time { } type Entry struct { - ID int64 `db:"id"` + baseconf.IncrementalPkDbEntry[int64] `db:",inline"` + TimePeriodID int64 `db:"timeperiod_id"` StartTime types.UnixMilli `db:"start_time"` EndTime types.UnixMilli `db:"end_time"` @@ -69,6 +80,11 @@ type Entry struct { rrule *rrule.RRule } +// IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. +func (e *Entry) IncrementalInitAndValidate() error { + return e.Init() +} + // TableName implements the contracts.TableNamer interface. 
func (e *Entry) TableName() string { return "timeperiod_entry" @@ -77,12 +93,16 @@ func (e *Entry) TableName() string { // MarshalLogObject implements the zapcore.ObjectMarshaler interface. func (e *Entry) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", e.ID) + encoder.AddInt64("timeperiod_id", e.TimePeriodID) encoder.AddTime("start", e.StartTime.Time()) encoder.AddTime("end", e.EndTime.Time()) encoder.AddString("timezone", e.Timezone) if e.RRule.Valid { encoder.AddString("rrule", e.RRule.String) } + if e.RotationMemberID.Valid { + encoder.AddInt64("rotation_member_id", e.RotationMemberID.Int64) + } return nil } diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 4002b281b..16518e490 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -48,55 +48,86 @@ CREATE TABLE channel ( -- for now type determines the implementation, in the future, this will need a reference to a concrete -- implementation to allow multiple implementations of a sms channel for example, probably even user-provided ones + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_channel PRIMARY KEY (id) ); +CREATE INDEX idx_channel_changed_at ON channel(changed_at); + CREATE TABLE contact ( id bigserial, full_name citext NOT NULL, username citext, -- reference to web user default_channel_id bigint NOT NULL REFERENCES channel(id), + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contact PRIMARY KEY (id), + + -- As the username is unique, it must be NULLed for deletion via "deleted = 'y'" UNIQUE (username) ); +CREATE INDEX idx_contact_changed_at ON contact(changed_at); + CREATE TABLE contact_address ( id bigserial, contact_id bigint NOT NULL REFERENCES contact(id), type text NOT NULL, -- 'phone', 'email', ... address text NOT NULL, -- phone number, email address, ... 
- CONSTRAINT pk_contact_address PRIMARY KEY (id), - UNIQUE (contact_id, type) -- constraint may be relaxed in the future to support multiple addresses per type + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + + CONSTRAINT pk_contact_address PRIMARY KEY (id) ); +CREATE INDEX idx_contact_address_changed_at ON contact_address(changed_at); + CREATE TABLE contactgroup ( id bigserial, name citext NOT NULL, + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contactgroup PRIMARY KEY (id) ); +CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); + CREATE TABLE contactgroup_member ( contactgroup_id bigint NOT NULL REFERENCES contactgroup(id), contact_id bigint NOT NULL REFERENCES contact(id), + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contactgroup_member PRIMARY KEY (contactgroup_id, contact_id) ); +CREATE INDEX idx_contactgroup_member_changed_at ON contactgroup_member(changed_at); + CREATE TABLE schedule ( id bigserial, name citext NOT NULL, + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_schedule PRIMARY KEY (id) ); +CREATE INDEX idx_schedule_changed_at ON schedule(changed_at); + CREATE TABLE rotation ( id bigserial, schedule_id bigint NOT NULL REFERENCES schedule(id), -- the lower the more important, starting at 0, avoids the need to re-index upon addition - priority integer NOT NULL, + priority integer, name text NOT NULL, mode rotation_type NOT NULL, -- JSON with rotation-specific attributes @@ -105,34 +136,51 @@ CREATE TABLE rotation ( -- A date in the format 'YYYY-MM-DD' when the first handoff should happen. -- It is a string as handoffs are restricted to happen only once per day - first_handoff date NOT NULL, + first_handoff date, -- Set to the actual time of the first handoff. -- If this is in the past during creation of the rotation, it is set to the creation time. 
-- Used by Web to avoid showing shifts that never happened actual_handoff bigint NOT NULL, - -- each schedule can only have one rotation with a given priority starting at a given date + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + + -- Each schedule can only have one rotation with a given priority starting at a given date. + -- Columns priority and first_handoff must be NULLed for deletion via "deleted = 'y'" (schedule_id is NOT NULL). UNIQUE (schedule_id, priority, first_handoff), + CHECK (deleted = 'y' OR priority IS NOT NULL AND first_handoff IS NOT NULL), CONSTRAINT pk_rotation PRIMARY KEY (id) ); +CREATE INDEX idx_rotation_changed_at ON rotation(changed_at); + CREATE TABLE timeperiod ( id bigserial, owned_by_rotation_id bigint REFERENCES rotation(id), -- nullable for future standalone timeperiods + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_timeperiod PRIMARY KEY (id) ); +CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); + CREATE TABLE rotation_member ( id bigserial, rotation_id bigint NOT NULL REFERENCES rotation(id), contact_id bigint REFERENCES contact(id), contactgroup_id bigint REFERENCES contactgroup(id), - position integer NOT NULL, + position integer, + + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', - UNIQUE (rotation_id, position), -- each position in a rotation can only be used once + -- Each position in a rotation can only be used once. + -- Column position must be NULLed for deletion via "deleted = 'y'". + UNIQUE (rotation_id, position), -- Two UNIQUE constraints prevent duplicate memberships of the same contact or contactgroup in a single rotation. -- Multiple NULLs are not considered to be duplicates, so rows with a contact_id but no contactgroup_id are @@ -140,11 +188,15 @@ CREATE TABLE rotation_member ( -- ensures that each row has only non-NULL values in one of these constraints.
UNIQUE (rotation_id, contact_id), UNIQUE (rotation_id, contactgroup_id), - CHECK (num_nonnulls(contact_id, contactgroup_id) = 1), + + CONSTRAINT ck_rotation_member_either_contact_id_or_contactgroup_id CHECK (num_nonnulls(contact_id, contactgroup_id) = 1), + CONSTRAINT ck_rotation_member_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), CONSTRAINT pk_rotation_member PRIMARY KEY (id) ); +CREATE INDEX idx_rotation_member_changed_at ON rotation_member(changed_at); + CREATE TABLE timeperiod_entry ( id bigserial, timeperiod_id bigint NOT NULL REFERENCES timeperiod(id), @@ -156,9 +208,14 @@ CREATE TABLE timeperiod_entry ( timezone text NOT NULL, -- e.g. 'Europe/Berlin', relevant for evaluating rrule (DST changes differ between zones) rrule text, -- recurrence rule (RFC5545) + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_timeperiod_entry PRIMARY KEY (id) ); +CREATE INDEX idx_timeperiod_entry_changed_at ON timeperiod_entry(changed_at); + CREATE TABLE source ( id bigserial, -- The type "icinga2" is special and requires (at least some of) the icinga2_ prefixed columns. @@ -184,6 +241,9 @@ CREATE TABLE source ( icinga2_common_name text, icinga2_insecure_tls boolenum NOT NULL DEFAULT 'n', + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + -- The hash is a PHP password_hash with PASSWORD_DEFAULT algorithm, defaulting to bcrypt. This check roughly ensures -- that listener_password_hash can only be populated with bcrypt hashes. 
-- https://icinga.com/docs/icinga-web/latest/doc/20-Advanced-Topics/#manual-user-creation-for-database-authentication-backend @@ -193,6 +253,8 @@ CREATE TABLE source ( CONSTRAINT pk_source PRIMARY KEY (id) ); +CREATE INDEX idx_source_changed_at ON source(changed_at); + CREATE TABLE object ( id bytea NOT NULL, -- SHA256 of identifying tags and the source.id source_id bigint NOT NULL REFERENCES source(id), @@ -258,25 +320,38 @@ CREATE TABLE rule ( name citext NOT NULL, timeperiod_id bigint REFERENCES timeperiod(id), object_filter text, - is_active boolenum NOT NULL DEFAULT 'y', + + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', CONSTRAINT pk_rule PRIMARY KEY (id) ); +CREATE INDEX idx_rule_changed_at ON rule(changed_at); + CREATE TABLE rule_escalation ( id bigserial, rule_id bigint NOT NULL REFERENCES rule(id), - position integer NOT NULL, + position integer, condition text, name citext, -- if not set, recipients are used as a fallback for display purposes fallback_for bigint REFERENCES rule_escalation(id), + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule_escalation PRIMARY KEY (id), + -- Each position in a rule can only be used once.
+ -- Column position must be NULLed for deletion via "deleted = 'y'" UNIQUE (rule_id, position), - CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)) + + CONSTRAINT ck_rule_escalation_not_both_condition_and_fallback_for CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)), + CONSTRAINT ck_rule_escalation_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL) ); +CREATE INDEX idx_rule_escalation_changed_at ON rule_escalation(changed_at); + CREATE TABLE rule_escalation_recipient ( id bigserial, rule_escalation_id bigint NOT NULL REFERENCES rule_escalation(id), @@ -285,11 +360,16 @@ CREATE TABLE rule_escalation_recipient ( schedule_id bigint REFERENCES schedule(id), channel_id bigint REFERENCES channel(id), + changed_at bigint NOT NULL, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule_escalation_recipient PRIMARY KEY (id), CHECK (num_nonnulls(contact_id, contactgroup_id, schedule_id) = 1) ); +CREATE INDEX idx_rule_escalation_recipient_changed_at ON rule_escalation_recipient(changed_at); + CREATE TABLE incident ( id bigserial, object_id bytea NOT NULL REFERENCES object(id), diff --git a/schema/pgsql/upgrades/032.sql b/schema/pgsql/upgrades/032.sql new file mode 100644 index 000000000..2362a34c2 --- /dev/null +++ b/schema/pgsql/upgrades/032.sql @@ -0,0 +1,100 @@ +ALTER TABLE channel + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_channel_changed_at ON channel(changed_at); + +ALTER TABLE contact + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_contact_changed_at ON contact(changed_at); + +ALTER TABLE contact_address + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + DROP CONSTRAINT contact_address_contact_id_type_key; + +CREATE INDEX idx_contact_address_changed_at ON contact_address(changed_at); + +ALTER TABLE contactgroup + ADD COLUMN 
changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); + +ALTER TABLE contactgroup_member + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_contactgroup_member_changed_at ON contactgroup_member(changed_at); + +ALTER TABLE schedule + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_schedule_changed_at ON schedule(changed_at); + +ALTER TABLE rotation + ALTER COLUMN priority DROP NOT NULL, + ALTER COLUMN first_handoff DROP NOT NULL, + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + ADD CHECK (deleted = 'y' OR priority IS NOT NULL AND first_handoff IS NOT NULL); + +CREATE INDEX idx_rotation_changed_at ON rotation(changed_at); + +ALTER TABLE timeperiod + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); + +ALTER TABLE rotation_member + RENAME CONSTRAINT rotation_member_check TO ck_rotation_member_either_contact_id_or_contactgroup_id; + +ALTER TABLE rotation_member + ALTER COLUMN position DROP NOT NULL, + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + ADD CONSTRAINT ck_rotation_member_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL); + +CREATE INDEX idx_rotation_member_changed_at ON rotation_member(changed_at); + +ALTER TABLE timeperiod_entry + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_timeperiod_entry_changed_at ON timeperiod_entry(changed_at); + +ALTER TABLE source + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_source_changed_at ON source(changed_at); + +ALTER TABLE rule + ADD COLUMN changed_at 
bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_rule_changed_at ON rule(changed_at); + +UPDATE rule SET deleted = 'y' WHERE is_active = 'n'; +ALTER TABLE rule DROP COLUMN is_active; + +ALTER TABLE rule_escalation + RENAME CONSTRAINT rule_escalation_check TO ck_rule_escalation_not_both_condition_and_fallback_for; + +ALTER TABLE rule_escalation + ALTER COLUMN position DROP NOT NULL, + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n', + ADD CONSTRAINT ck_rule_escalation_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL); + +CREATE INDEX idx_rule_escalation_changed_at ON rule_escalation(changed_at); + +ALTER TABLE rule_escalation_recipient + ADD COLUMN changed_at bigint NOT NULL, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +CREATE INDEX idx_rule_escalation_recipient_changed_at ON rule_escalation_recipient(changed_at);