Skip to content

Commit

Permalink
feat: add hook for optimizing series reads based on authorizer (#25207)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwossum authored Aug 2, 2024
1 parent 7333da9 commit 2cf2103
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 4 deletions.
13 changes: 13 additions & 0 deletions internal/authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type AuthorizerMock struct {
AuthorizeQueryFn func(database string, query *influxql.Query) error
AuthorizeSeriesReadFn func(database string, measurement []byte, tags models.Tags) bool
AuthorizeSeriesWriteFn func(database string, measurement []byte, tags models.Tags) bool
OptimizeSeriesReadFn func(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, query.FineAuthorizer, error)
}

// AuthorizeDatabase determines if the provided privilege is sufficient to
Expand All @@ -32,6 +33,14 @@ func (a *AuthorizerMock) AuthorizeSeriesRead(database string, measurement []byte
return a.AuthorizeSeriesReadFn(database, measurement, tags)
}

// OptimizeSeriesRead optimizes series read queries based on the authorizer.
func (a *AuthorizerMock) OptimizeSeriesRead(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, query.FineAuthorizer, error) {
if a.OptimizeSeriesReadFn != nil {
return a.OptimizeSeriesReadFn(database, measurement, expr)
}
return expr, a, nil
}

// AuthorizeSeriesWrite determines if the series comprising measurement and tags
// can be written to, on the provided database.
func (a *AuthorizerMock) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
Expand All @@ -41,3 +50,7 @@ func (a *AuthorizerMock) AuthorizeSeriesWrite(database string, measurement []byt
func (a *AuthorizerMock) IsOpen() bool {
return false
}

func (a *AuthorizerMock) IsVoid() bool {
return false
}
46 changes: 46 additions & 0 deletions query/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,17 @@ type FineAuthorizer interface {
// AuthorizeSeriesRead determines if a series is authorized for reading
AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool

// OptimizeSeriesRead produces an optimized authorizer-aware WHERE expression and updated authorizer.
OptimizeSeriesRead(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, FineAuthorizer, error)

// AuthorizeSeriesWrite determines if a series is authorized for writing
AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool

// IsOpen guarantees that the other methods of a FineAuthorizer always return true.
IsOpen() bool

// IsVoid guarantees that Authorize methods of a FineAuthorizer always return false.
IsVoid() bool
}

// OpenAuthorizer is the Authorizer used when authorization is disabled.
Expand All @@ -109,16 +115,49 @@ func (a openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte,
return true
}

// OptimizeSeriesRead is a no-op for openAuthorizer.
func (a openAuthorizer) OptimizeSeriesRead(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, FineAuthorizer, error) {
return expr, a, nil
}

// AuthorizeSeriesWrite allows access to any series.
func (a openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
}

func (a openAuthorizer) IsOpen() bool { return true }

func (a openAuthorizer) IsVoid() bool { return false }

// AuthorizeSeriesRead allows any query to execute.
func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { return nil }

// VoidAuthorizer is the Authorizer used when no access is possible.
// It disallows all operations.
type voidAuthorizer struct{}

// VoidAuthorizer can be shared by all goroutines.
var VoidAuthorizer = voidAuthorizer{}

// AuthorizeSeriesRead allows access to no series.
func (a voidAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
return false
}

// OptimizeSeriesRead is a no-op for voidAuthorizer.
func (a voidAuthorizer) OptimizeSeriesRead(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, FineAuthorizer, error) {
return expr, a, nil
}

// AuthorizeSeriesWrite allows access no series.
func (a voidAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return false
}

func (a voidAuthorizer) IsOpen() bool { return false }

func (a voidAuthorizer) IsVoid() bool { return true }

// AuthorizerIsOpen returns true if the provided Authorizer is guaranteed to
// authorize anything. A nil Authorizer returns true for this function, and this
// function should be preferred over directly checking if an Authorizer is nil
Expand All @@ -127,6 +166,13 @@ func AuthorizerIsOpen(a FineAuthorizer) bool {
return a == nil || a.IsOpen()
}

// AuthorizerIsVoid returns true if the provided Authorizer is guaranteed to
// not authorize anything. A nil Authorizer acts as an openAuthorizer, and thus
// not a void authorizer.
func AuthorizerIsVoid(a FineAuthorizer) bool {
return a != nil && a.IsVoid()
}

// ExecutionOptions contains the options for executing a query.
type ExecutionOptions struct {
// The database the query is running against.
Expand Down
6 changes: 6 additions & 0 deletions query/subquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,18 @@ func (*openAuthorizer) AuthorizeQuery(database string, query *influxql.Query) er
func (*openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
return true
}
func (o *openAuthorizer) OptimizeSeriesRead(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, query.FineAuthorizer, error) {
return expr, o, nil
}
func (*openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
}
func (*openAuthorizer) IsOpen() bool {
return true
}
func (*openAuthorizer) IsVoid() bool {
return false
}

// Ensure that the subquery gets passed the query authorizer.
func TestSubquery_Authorizer(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,11 @@ func (u *UserInfo) AuthorizeSeriesRead(database string, measurement []byte, tags
return true
}

// OptimizeSeriesRead optimizes series read based on the authorizer.
func (u *UserInfo) OptimizeSeriesRead(database string, measurement []byte, expr influxql.Expr) (influxql.Expr, query.FineAuthorizer, error) {
return expr, u, nil
}

// AuthorizeSeriesWrite is used to limit access per-series (enterprise only)
func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
Expand All @@ -1679,6 +1684,11 @@ func (u *UserInfo) IsOpen() bool {
return true
}

// IsVoid is a method on FineAuthorizer to indicate all fine auth is permitted and short circuit some checks.
func (u *UserInfo) IsVoid() bool {
return false
}

// AuthorizeUnrestricted identifies the admin user
//
// Only the pprof endpoint uses this, we should prefer to have explicit permissioning instead.
Expand Down
22 changes: 20 additions & 2 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2908,6 +2908,7 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,
}

// Iterate all series to collect tag values.
authIsOpen := query.AuthorizerIsOpen(auth)
for {
e, err := itr.Next()
if err != nil {
Expand All @@ -2933,7 +2934,7 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,
continue
}

if auth != nil {
if !authIsOpen {
name, tags := ParseSeriesKey(buf)
if len(name) == 0 {
continue
Expand Down Expand Up @@ -2970,7 +2971,7 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.FineAuthorizer, name []byte,
}

// MeasurementTagKeyValuesByExpr returns a set of tag values filtered by an expression.
func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool) ([][]string, error) {
func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name []byte, keys []string, expr influxql.Expr, keysSorted bool, log *zap.Logger) ([][]string, error) {
if len(keys) == 0 {
return nil, nil
}
Expand All @@ -2981,6 +2982,23 @@ func (is IndexSet) MeasurementTagKeyValuesByExpr(auth query.FineAuthorizer, name
sort.Strings(keys)
}

if auth != nil {
// OptimizeSeriesRead is a new step. In the extremely unlikely case it returns
// an error, we don't want to abort the query. This would mean tha a query that worked on
// a previous version suddenly breaks on a newer version, all because we tried and failed
// to speed it up. The original expr and auth will still yield the correct answer in the
// same time as the previous versions.
if newExpr, newAuth, err := auth.OptimizeSeriesRead(is.Database(), name, expr); err == nil {
auth = newAuth
expr = newExpr
} else {
log.Error("MeasurementTagKeyValuesByExpr: error in OptimizeSeriesRead, using unoptimized expr and auth", zap.Error(err))
}
}
if query.AuthorizerIsVoid(auth) {
return results, nil
}

release := is.SeriesFile.Retain()
defer release()

Expand Down
4 changes: 2 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1857,7 +1857,7 @@ func (s *Store) TagKeys(ctx context.Context, auth query.FineAuthorizer, shardIDs
sort.Strings(keys)

// Filter against tag values, skip if no values exist.
values, err := is.MeasurementTagKeyValuesByExpr(auth, name, keys, filterExpr, true)
values, err := is.MeasurementTagKeyValuesByExpr(auth, name, keys, filterExpr, true, s.Logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2079,7 +2079,7 @@ func (s *Store) TagValues(ctx context.Context, auth query.FineAuthorizer, shardI
// get all the tag values for each key in the keyset.
// Each slice in the results contains the sorted values associated
// associated with each tag key for the measurement from the key set.
if result.values, err = is.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true); err != nil {
if result.values, err = is.MeasurementTagKeyValuesByExpr(auth, name, result.keys, filterExpr, true, s.Logger); err != nil {
return nil, err
}

Expand Down

0 comments on commit 2cf2103

Please sign in to comment.