Skip to content

Commit

Permalink
fix: prevent silently dropped writes with overlapping shards (influxd…
Browse files Browse the repository at this point in the history
…ata#21946)



Co-authored-by: Geoffrey Wossum <[email protected]>
  • Loading branch information
danxmoran and gwossum authored Jul 27, 2021
1 parent 9f13f1c commit 9d81f4a
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21839](https://github.com/influxdata/influxdb/pull/21839): Fix display and parsing of `influxd upgrade` CLI prompts in PowerShell.
1. [21850](https://github.com/influxdata/influxdb/pull/21850): Systemd unit should block on startup until http endpoint is ready
1. [21925](https://github.com/influxdata/influxdb/pull/21925): Upgrade to golang-jwt 3.2.1.
1. [21946](https://github.com/influxdata/influxdb/pull/21946): Prevent silently dropped writes when there are overlapping shards.

## v2.0.7 [2021-06-04]

Expand Down
42 changes: 42 additions & 0 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,45 @@ func TestLauncher_UpdateRetentionPolicy(t *testing.T) {
})
}
}

func TestLauncher_OverlappingShards(t *testing.T) {
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
defer l.ShutdownOrFail(t, ctx)

bkt := influxdb.Bucket{Name: "test", ShardGroupDuration: time.Hour, OrgID: l.Org.ID}
require.NoError(t, l.BucketService(t).CreateBucket(ctx, &bkt))

req := l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, bkt.ID),
"m,s=0 n=0 1626416520000000000\nm,s=0 n=1 1626420120000000000\n")
resp, err := nethttp.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

newDur := humanize.Day
_, err = l.BucketService(t).UpdateBucket(ctx, bkt.ID, influxdb.BucketUpdate{ShardGroupDuration: &newDur})
require.NoError(t, err)

req = l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, bkt.ID),
// NOTE: The 3rd point's timestamp is chronologically earlier than the other two points, but it
// must come after the others in the request to trigger the overlapping-shard bug. If it comes
// first in the request, the bug is avoided because:
// 1. The point-writer sees there is no shard for the earlier point, and creates a new 24h shard-group
// 2. The new 24 group covers the timestamps of the remaining 2 points, so the writer doesn't bother looking
// for existing shards that also cover the timestamp
// 3. With only 1 shard mapped to the 3 points, there is no overlap to trigger the bug
"m,s=0 n=0 1626416520000000000\nm,s=0 n=1 1626420120000000000\nm,s=1 n=1 1626412920000000000\n")
resp, err = nethttp.DefaultClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())

query := `from(bucket:"test") |> range(start:2000-01-01T00:00:00Z,stop:2050-01-01T00:00:00Z)` +
` |> drop(columns:["_start","_stop"])`
exp := `,result,table,_time,_value,_field,_measurement,s` + "\r\n" +
`,_result,0,2021-07-16T06:22:00Z,0,n,m,0` + "\r\n" +
`,_result,0,2021-07-16T07:22:00Z,1,n,m,0` + "\r\n" +
`,_result,1,2021-07-16T05:22:00Z,1,n,m,1` + "\r\n\r\n"

buf, err := http.SimpleQuery(l.URL(), query, l.Org.Name, l.Auth.Token)
require.NoError(t, err)
require.Equal(t, exp, string(buf))
}
80 changes: 66 additions & 14 deletions v1/coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
}

// Holds all the shard groups and shards that are required for writes.
list := make(sgList, 0, 8)
list := sgList{items: make(meta.ShardGroupInfos, 0, 8)}
min := time.Unix(0, models.MinNanoTime)
if rp.Duration > 0 {
min = time.Now().Add(-rp.Duration)
Expand All @@ -220,7 +220,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
if sg == nil {
return nil, errors.New("nil shard group")
}
list = list.Append(*sg)
list.Add(*sg)
}

mapping := NewShardMapping(len(wp.Points))
Expand All @@ -242,10 +242,21 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

// sgList is a wrapper around a meta.ShardGroupInfos where we can also check
// if a given time is covered by any of the shard groups in the list.
type sgList meta.ShardGroupInfos
type sgList struct {
items meta.ShardGroupInfos

// needsSort indicates if items has been modified without a sort operation.
needsSort bool

// earliest is the last begin time of any item in items.
earliest time.Time

// latest is the greatest end time of any item in items.
latest time.Time
}

func (l sgList) Covers(t time.Time) bool {
if len(l) == 0 {
if len(l.items) == 0 {
return false
}
return l.ShardGroupAt(t) != nil
Expand All @@ -261,20 +272,61 @@ func (l sgList) Covers(t time.Time) bool {
// - a shard group with the earliest end time;
// - (assuming identical end times) the shard group with the earliest start time.
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {
idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) })

// We couldn't find a shard group the point falls into.
if idx == len(l) || t.Before(l[idx].StartTime) {
if l.items.Len() == 0 {
return nil
}
return &l[idx]

// find the earliest shardgroup that could contain this point using binary search.
if l.needsSort {
sort.Sort(l.items)
l.needsSort = false
}
idx := sort.Search(l.items.Len(), func(i int) bool { return l.items[i].EndTime.After(t) })

// Check if sort.Search actually found the proper shard. It feels like we should also
// be checking l.items[idx].EndTime, but sort.Search was looking at that field for us.
if idx == l.items.Len() || t.Before(l.items[idx].StartTime) {
// This could mean we are looking for a time not in the list, or we have
// overlaping shards. Overlapping shards do not work with binary searches
// on 1d arrays. You have to use an interval tree, but that's a lot of
// work for what is hopefully a rare event. Instead, we'll check if t
// should be in l, and perform a linear search if it is. This way we'll
// do the correct thing, it may just take a little longer. If we don't
// do this, then we may non-silently drop writes we should have accepted.

if t.Before(l.earliest) || t.After(l.latest) {
// t is not in range, we can avoid going through the linear search.
return nil
}

// Oh no, we've probably got overlapping shards. Perform a linear search.
for idx = 0; idx < l.items.Len(); idx++ {
if l.items[idx].Contains(t) {
// Found it!
break
}
}
if idx == l.items.Len() {
// We did not find a shard which contained t. This is very strange.
return nil
}
}

return &l.items[idx]
}

// Append appends a shard group to the list, and returns a sorted list.
func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
next := append(l, sgi)
sort.Sort(meta.ShardGroupInfos(next))
return next
// Add appends a shard group to the list, updating the earliest/latest times of the list if needed.
func (l *sgList) Add(sgi meta.ShardGroupInfo) {
l.items = append(l.items, sgi)
l.needsSort = true

// Update our earliest and latest times for l.items
if l.earliest.IsZero() || l.earliest.After(sgi.StartTime) {
l.earliest = sgi.StartTime
}
if l.latest.IsZero() || l.latest.Before(sgi.EndTime) {
l.latest = sgi.EndTime
}
}

// WritePoints writes the data to the underlying storage. consistencyLevel and user are only used for clustered scenarios
Expand Down
53 changes: 52 additions & 1 deletion v1/coordinator/points_writer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package coordinator
import (
"testing"
"time"

"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/require"
)

func TestSgList_ShardGroupAt(t *testing.T) {
Expand All @@ -11,14 +14,18 @@ func TestSgList_ShardGroupAt(t *testing.T) {
return base.Add(time.Duration(24*n) * time.Hour)
}

list := sgList{
items := meta.ShardGroupInfos{
{ID: 1, StartTime: day(0), EndTime: day(1)},
{ID: 2, StartTime: day(1), EndTime: day(2)},
{ID: 3, StartTime: day(2), EndTime: day(3)},
// SG day 3 to day 4 missing...
{ID: 4, StartTime: day(4), EndTime: day(5)},
{ID: 5, StartTime: day(5), EndTime: day(6)},
}
var list sgList
for _, i := range items {
list.Add(i)
}

examples := []struct {
T time.Time
Expand All @@ -44,3 +51,47 @@ func TestSgList_ShardGroupAt(t *testing.T) {
}
}
}

func TestSgList_ShardGroupAtOverlapping(t *testing.T) {
base := time.Date(2016, 10, 19, 0, 0, 0, 0, time.UTC)
hour := func(n int) time.Time {
return base.Add(time.Duration(n) * time.Hour)
}
day := func(n int) time.Time {
return base.Add(time.Duration(24*n) * time.Hour)
}

items := meta.ShardGroupInfos{
{ID: 1, StartTime: hour(5), EndTime: hour(6)},
{ID: 2, StartTime: hour(6), EndTime: hour(7)},
// Day-long shard overlaps with the two hour-long shards.
{ID: 3, StartTime: base, EndTime: day(1)},
}
var list sgList
for _, i := range items {
list.Add(i)
}

examples := []struct {
T time.Time
ShardGroupID uint64 // 0 will indicate we don't expect a shard group
}{
{T: base.Add(-time.Minute), ShardGroupID: 0}, // Before any SG
{T: base, ShardGroupID: 3},
{T: hour(5), ShardGroupID: 1},
{T: hour(7).Add(-time.Minute), ShardGroupID: 2},
{T: hour(8), ShardGroupID: 3},
{T: day(2), ShardGroupID: 0}, // No matching SG
}

for _, example := range examples {
t.Run(example.T.String(), func(t *testing.T) {
sg := list.ShardGroupAt(example.T)
var id uint64
if sg != nil {
id = sg.ID
}
require.Equal(t, example.ShardGroupID, id)
})
}
}

0 comments on commit 9d81f4a

Please sign in to comment.