Skip to content

Commit

Permalink
Merge pull request #16 from nyaruka/backfill-monthlies
Browse files Browse the repository at this point in the history
create monthly archives when doing backfills, add input and value to r…
  • Loading branch information
nicpottier authored May 22, 2018
2 parents 304d989 + 05e16ee commit 244c913
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 82 deletions.
89 changes: 63 additions & 26 deletions archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@ type Archive struct {
Dailies []*Archive
}

// endDate returns the end of the span this archive covers: StartDate advanced
// by one day for DayPeriod archives, or by one calendar month for any other period.
func (a *Archive) endDate() time.Time {
	if a.Period != DayPeriod {
		// anything that is not a daily archive is treated as a monthly one
		return a.StartDate.AddDate(0, 1, 0)
	}
	return a.StartDate.AddDate(0, 0, 1)
}

// coversDate reports whether d lies within this archive's span, i.e. d is not
// before StartDate and is strictly before endDate().
func (a *Archive) coversDate(d time.Time) bool {
	if a.StartDate.After(d) {
		// archive begins after d, so it cannot cover it
		return false
	}
	return d.Before(a.endDate())
}

// lookupActiveOrgs selects the id, name, created_on and is_anon columns of every
// org flagged is_active in orgs_org, ordered by id.
const lookupActiveOrgs = `SELECT id, name, created_on, is_anon FROM orgs_org WHERE is_active = TRUE order by id`

// GetActiveOrgs returns the active organizations sorted by id
Expand All @@ -103,7 +118,7 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB) ([]Org, error) {
return orgs, nil
}

const lookupOrgArchives = `SELECT id, start_date, period, archive_type, hash, size, record_count, url, rollup_id FROM archives_archive WHERE org_id = $1 AND archive_type = $2 ORDER BY start_date asc`
const lookupOrgArchives = `SELECT id, start_date, period, archive_type, hash, size, record_count, url, rollup_id FROM archives_archive WHERE org_id = $1 AND archive_type = $2 ORDER BY start_date asc, period desc`

// GetCurrentArchives returns all the current archives for the passed in org and record type
func GetCurrentArchives(ctx context.Context, db *sqlx.DB, org Org, archiveType ArchiveType) ([]*Archive, error) {
Expand All @@ -118,7 +133,6 @@ func GetCurrentArchives(ctx context.Context, db *sqlx.DB, org Org, archiveType A

// GetMissingDayArchives calculates what archives need to be generated for the passed in org this is calculated per day
func GetMissingDayArchives(archives []*Archive, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) {
// our first archive would be active days from today
endDate := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, -org.ActiveDays)
orgUTC := org.CreatedOn.In(time.UTC)
startDate := time.Date(orgUTC.Year(), orgUTC.Month(), orgUTC.Day(), 0, 0, 0, 0, time.UTC)
Expand All @@ -131,12 +145,12 @@ func GetMissingDayArchives(archives []*Archive, now time.Time, org Org, archiveT
existing := false

// advance our current archive idx until we are on our start date or later
for archiveIDX < len(archives) && (archives[archiveIDX].StartDate.Before(startDate) || archives[archiveIDX].Period == MonthPeriod) {
for archiveIDX < len(archives) && archives[archiveIDX].StartDate.Before(startDate) && !archives[archiveIDX].coversDate(startDate) {
archiveIDX++
}

// do we already have this archive?
if archiveIDX < len(archives) && archives[archiveIDX].StartDate.Equal(startDate) {
// do we already have an archive covering this date?
if archiveIDX < len(archives) && archives[archiveIDX].coversDate(startDate) {
existing = true
}

Expand All @@ -158,7 +172,7 @@ func GetMissingDayArchives(archives []*Archive, now time.Time, org Org, archiveT
return missing, nil
}

// GetMissingMonthArchives gets which archives need to be rolled up in the set that has been passed in
// GetMissingMonthArchives gets which monthly archives are currently missing for this org
func GetMissingMonthArchives(archives []*Archive, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) {
lastActive := now.AddDate(0, 0, -org.ActiveDays)
endDate := time.Date(lastActive.Year(), lastActive.Month(), 1, 0, 0, 0, 0, time.UTC)
Expand All @@ -174,11 +188,11 @@ func GetMissingMonthArchives(archives []*Archive, now time.Time, org Org, archiv
existing := false

// advance our current archive idx until we are on our start date or later
for archiveIDX < len(archives) && (archives[archiveIDX].StartDate.Before(startDate) || archives[archiveIDX].Period == DayPeriod) {
for archiveIDX < len(archives) && (archives[archiveIDX].StartDate.Before(startDate) || archives[archiveIDX].Period != MonthPeriod) {
archiveIDX++
}

// do we already have this archive?
// do we already have an archive covering this date?
if archiveIDX < len(archives) && archives[archiveIDX].StartDate.Equal(startDate) {
existing = true
}
Expand All @@ -202,8 +216,8 @@ func GetMissingMonthArchives(archives []*Archive, now time.Time, org Org, archiv
return missing, nil
}

// BuildMonthlyArchive builds a monthly archive from the files present on S3
func BuildMonthlyArchive(ctx context.Context, conf *Config, s3Client s3iface.S3API, archives []*Archive, month *Archive, now time.Time, org Org, archiveType ArchiveType) error {
// BuildRollupArchive builds a monthly archive from the files present on S3
func BuildRollupArchive(ctx context.Context, conf *Config, s3Client s3iface.S3API, archives []*Archive, month *Archive, now time.Time, org Org, archiveType ArchiveType) error {
start := time.Now()

log := logrus.WithFields(logrus.Fields{
Expand Down Expand Up @@ -383,7 +397,7 @@ FROM (
from jsonb_array_elements(fr.path :: jsonb) as path_row) as path_data
) as path,
(select coalesce(jsonb_agg(values_data.tmp_values), '{}'::jsonb) from (
select json_build_object(key, jsonb_build_object('name', value -> 'name', 'time', (value -> 'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid')) as tmp_values
select json_build_object(key, jsonb_build_object('name', value -> 'name', 'value', value -> 'value', 'input', value -> 'input', 'time', (value -> 'created_on')::text::timestamptz, 'category', value -> 'category', 'node', value -> 'node_uuid')) as tmp_values
FROM jsonb_each(fr.results :: jsonb)) as values_data
) as values,
CASE
Expand Down Expand Up @@ -454,6 +468,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"end_date": archive.endDate(),
"period": archive.Period,
})

Expand All @@ -478,12 +493,11 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
return err
}

endDate := archive.StartDate.Add(time.Hour * 24)
var rows *sqlx.Rows
if archive.ArchiveType == MessageType {
rows, err = tx.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, endDate)
rows, err = tx.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate())
} else if archive.ArchiveType == RunType {
rows, err = tx.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, endDate)
rows, err = tx.QueryxContext(ctx, lookupFlowRuns, archive.Org.IsAnon, archive.Org.ID, archive.StartDate, archive.endDate())
}
if err != nil {
return err
Expand Down Expand Up @@ -689,8 +703,8 @@ func DeleteArchiveFile(archive *Archive) error {
return nil
}

// BuildOrgDailyArchives builds all the montly archives for the passid in org
func BuildOrgDailyArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)
records := 0
created := make([]*Archive, 0, 1)
Expand All @@ -701,13 +715,35 @@ func BuildOrgDailyArchives(ctx context.Context, now time.Time, config *Config, d
return nil, fmt.Errorf("error getting current archives")
}

archives, err := GetMissingDayArchives(existing, now, org, archiveType)
if err != nil {
return nil, fmt.Errorf("error calculating tasks for type '%s'", archiveType)
var archives []*Archive
if len(existing) == 0 {
// no existing archives means this might be a backfill; figure out if there are full months we can build first
archives, err = GetMissingMonthArchives(existing, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error calculating missing monthly archives")
return nil, err
}

// then add in daily archives taking into account the monthly that will be built
daily, err := GetMissingDayArchives(archives, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error calculating missing daily archives")
return nil, err
}
for _, d := range daily {
archives = append(archives, d)
}
} else {
// figure out any missing day archives
archives, err = GetMissingDayArchives(existing, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error calculating missing daily archives")
return nil, err
}
}

for _, archive := range archives {
log = log.WithField("start_date", archive.StartDate).WithField("period", archive.Period).WithField("archive_type", archive.ArchiveType)
log = log.WithField("start_date", archive.StartDate).WithField("end_date", archive.endDate()).WithField("period", archive.Period).WithField("archive_type", archive.ArchiveType)
log.Info("starting archive")
err := CreateArchiveFile(ctx, db, archive, config.TempDir)
if err != nil {
Expand Down Expand Up @@ -754,8 +790,8 @@ func BuildOrgDailyArchives(ctx context.Context, now time.Time, config *Config, d
return created, nil
}

// BuildOrgMonthlyArchives builds all the montly archives for the passid in org
func BuildOrgMonthlyArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
// RollupOrgArchives rolls up monthly archives from our daily archives
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
log := logrus.WithField("org", org.Name).WithField("org_id", org.ID)
records := 0
created := make([]*Archive, 0, 1)
Expand All @@ -766,14 +802,15 @@ func BuildOrgMonthlyArchives(ctx context.Context, now time.Time, config *Config,
return nil, fmt.Errorf("error getting current archives")
}

// now build our monthlies
// get our missing monthly archives
archives, err := GetMissingMonthArchives(existing, now, org, archiveType)
if err != nil {
return nil, fmt.Errorf("error calculating missing monthly archives for type '%s'", archiveType)
}

// build them from rollups
for _, archive := range archives {
err = BuildMonthlyArchive(ctx, config, s3Client, existing, archive, now, org, archiveType)
err = BuildRollupArchive(ctx, config, s3Client, existing, archive, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error building monthly archive")
continue
Expand Down Expand Up @@ -816,12 +853,12 @@ func BuildOrgMonthlyArchives(ctx context.Context, now time.Time, config *Config,

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
created, err := BuildOrgDailyArchives(ctx, now, config, db, s3Client, org, archiveType)
created, err := CreateOrgArchives(ctx, now, config, db, s3Client, org, archiveType)
if err != nil {
return nil, err
}

monthlies, err := BuildOrgMonthlyArchives(ctx, now, config, db, s3Client, org, archiveType)
monthlies, err := RollupOrgArchives(ctx, now, config, db, s3Client, org, archiveType)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 244c913

Please sign in to comment.