Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use namespace-regex config has not authorized bug #525

Open
wants to merge 1 commit into
base: rel6
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion monstache.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ func (config *configOptions) dynamicDirectReadList() bool {
return len(config.DirectReadNs) == 1 && config.DirectReadNs[0] == ""
}

func (config *configOptions) dynamicChangeStreamList() bool {
return len(config.ChangeStreamNs) == 1 && config.ChangeStreamNs[0] == ""
}

func (config *configOptions) ignoreDatabaseForDirectReads(db string) bool {
return db == "local" || db == "admin" || db == "config" || db == config.ConfigDatabaseName
}
Expand All @@ -529,6 +533,14 @@ func (config *configOptions) ignoreCollectionForDirectReads(col string) bool {
return strings.HasPrefix(col, "system.")
}

func (config *configOptions) ignoreDatabaseForChangeStreamReads(db string) bool {
return config.ignoreDatabaseForDirectReads(db)
}

func (config *configOptions) ignoreCollectionForChangeStreamReads(col string) bool {
return config.ignoreCollectionForDirectReads(col)
}

func afterBulk(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if response == nil || !response.Errors {
return
Expand Down Expand Up @@ -4573,7 +4585,6 @@ func (ic *indexClient) buildDynamicDirectReadNs(filter gtm.OpFilter) (names []st
}
}
}

if len(names) == 0 {
warnLog.Println("Dynamic direct read candidates: NONE")
} else {
Expand All @@ -4582,6 +4593,59 @@ func (ic *indexClient) buildDynamicDirectReadNs(filter gtm.OpFilter) (names []st
return
}

func (ic *indexClient) buildDynamicChangeStreamNs(filter gtm.OpFilter) (names []string) {
client, config := ic.mongo, ic.config
if config.DirectReadExcludeRegex != "" {
filter = gtm.ChainOpFilters(filterInverseWithRegex(config.NsRegex), filter)
}
if config.DirectReadIncludeRegex != "" {
filter = gtm.ChainOpFilters(filterWithRegex(config.NsRegex), filter)
}

dbs, err := client.ListDatabaseNames(context.Background(), bson.M{})
if err != nil {
errorLog.Fatalf("Failed to read database names for dynamic direct reads: %s", err)
}
uniqueNSMap := make(map[string]struct{}, 0)
// has dynamic rules, watch database
// at the same time match the exact table name as much as possible
for _, d := range dbs {
if config.ignoreDatabaseForChangeStreamReads(d) {
continue
}

uniqueNSMap[d] = struct{}{}
db := client.Database(d)
cols, err := db.ListCollectionNames(context.Background(), bson.M{})
if err != nil {
errorLog.Fatalf("Failed to read db %s collection names for dynamic direct reads: %s", d, err)
return
}
for _, c := range cols {
if config. ignoreCollectionForChangeStreamReads(c) {
continue
}
ns := strings.Join([]string{d, c}, ".")
if filter(&gtm.Op{Namespace: ns}) {
names = append(names, ns)
} else {
infoLog.Printf("Excluding collection [%s] for dynamic direct reads", ns)
}
}
}

// has dynamic rules, watch database
for name := range uniqueNSMap {
names = append(names, name )
}
if len(names) == 0 {
warnLog.Println("Dynamic change stream read candidates: NONE")
} else {
infoLog.Printf("Dynamic change stream read candidates: %v", names)
}
return
}

func (ic *indexClient) parseBufferDuration() time.Duration {
config := ic.config
gtmBufferDuration, err := time.ParseDuration(config.GtmSettings.BufferDuration)
Expand Down Expand Up @@ -4617,16 +4681,23 @@ func (ic *indexClient) buildGtmOptions() *gtm.Options {
directReadFilter = gtm.ChainOpFilters(filterArray...)
after := ic.buildTimestampGen()
token := ic.buildTokenGen()

if config.dynamicDirectReadList() {
config.DirectReadNs = ic.buildDynamicDirectReadNs(nsFilter)
}

if config.DirectReadStateful {
var err error
config.DirectReadNs, err = ic.filterDirectReadNamespaces(config.DirectReadNs)
if err != nil {
errorLog.Fatalf("Error retrieving direct read state: %s", err)
}
}
if config.dynamicChangeStreamList() {
config.ChangeStreamNs = ic.buildDynamicChangeStreamNs(nsFilter)
}


gtmOpts := &gtm.Options{
After: after,
Token: token,
Expand Down