From 3820073a8a6951fccc354a610da3ca14b633e8c9 Mon Sep 17 00:00:00 2001 From: rentiansheng Date: Fri, 4 Jun 2021 20:40:18 +0800 Subject: [PATCH] fix: use namespace-regix config has not authorized bug issue #524 --- monstache.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/monstache.go b/monstache.go index c487c20..83356fd 100644 --- a/monstache.go +++ b/monstache.go @@ -521,6 +521,10 @@ func (config *configOptions) dynamicDirectReadList() bool { return len(config.DirectReadNs) == 1 && config.DirectReadNs[0] == "" } +func (config *configOptions) dynamicChangeStreamList() bool { + return len(config.ChangeStreamNs) == 1 && config.ChangeStreamNs[0] == "" +} + func (config *configOptions) ignoreDatabaseForDirectReads(db string) bool { return db == "local" || db == "admin" || db == "config" || db == config.ConfigDatabaseName } @@ -529,6 +533,14 @@ func (config *configOptions) ignoreCollectionForDirectReads(col string) bool { return strings.HasPrefix(col, "system.") } +func (config *configOptions) ignoreDatabaseForChangeStreamReads(db string) bool { + return config.ignoreDatabaseForDirectReads(db) +} + +func (config *configOptions) ignoreCollectionForChangeStreamReads(col string) bool { + return config.ignoreCollectionForDirectReads(col) +} + func afterBulk(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { if response == nil || !response.Errors { return @@ -4573,7 +4585,6 @@ func (ic *indexClient) buildDynamicDirectReadNs(filter gtm.OpFilter) (names []st } } } - if len(names) == 0 { warnLog.Println("Dynamic direct read candidates: NONE") } else { @@ -4582,6 +4593,59 @@ func (ic *indexClient) buildDynamicDirectReadNs(filter gtm.OpFilter) (names []st return } +func (ic *indexClient) buildDynamicChangeStreamNs(filter gtm.OpFilter) (names []string) { + client, config := ic.mongo, ic.config + if config.DirectReadExcludeRegex != "" { + filter = gtm.ChainOpFilters(filterInverseWithRegex(config.NsRegex), filter) + } + if config.DirectReadIncludeRegex != "" { + filter = gtm.ChainOpFilters(filterWithRegex(config.NsRegex), filter) + } + + dbs, err := client.ListDatabaseNames(context.Background(), bson.M{}) + if err != nil { + errorLog.Fatalf("Failed to read database names for dynamic direct reads: %s", err) + } + uniqueNSMap := make(map[string]struct{}, 0) + // has dynamic rules, watch database + // at the same time match the exact table name as much as possible + for _, d := range dbs { + if config.ignoreDatabaseForChangeStreamReads(d) { + continue + } + + uniqueNSMap[d] = struct{}{} + db := client.Database(d) + cols, err := db.ListCollectionNames(context.Background(), bson.M{}) + if err != nil { + errorLog.Fatalf("Failed to read db %s collection names for dynamic direct reads: %s", d, err) + return + } + for _, c := range cols { + if config. ignoreCollectionForChangeStreamReads(c) { + continue + } + ns := strings.Join([]string{d, c}, ".") + if filter(>m.Op{Namespace: ns}) { + names = append(names, ns) + } else { + infoLog.Printf("Excluding collection [%s] for dynamic direct reads", ns) + } + } + } + + // has dynamic rules, watch database + for name := range uniqueNSMap { + names = append(names, name ) + } + if len(names) == 0 { + warnLog.Println("Dynamic change stream read candidates: NONE") + } else { + infoLog.Printf("Dynamic change stream read candidates: %v", names) + } + return +} + func (ic *indexClient) parseBufferDuration() time.Duration { config := ic.config gtmBufferDuration, err := time.ParseDuration(config.GtmSettings.BufferDuration) @@ -4617,9 +4681,11 @@ func (ic *indexClient) buildGtmOptions() *gtm.Options { directReadFilter = gtm.ChainOpFilters(filterArray...) after := ic.buildTimestampGen() token := ic.buildTokenGen() + if config.dynamicDirectReadList() { config.DirectReadNs = ic.buildDynamicDirectReadNs(nsFilter) } + if config.DirectReadStateful { var err error config.DirectReadNs, err = ic.filterDirectReadNamespaces(config.DirectReadNs) @@ -4627,6 +4693,11 @@ func (ic *indexClient) buildGtmOptions() *gtm.Options { errorLog.Fatalf("Error retrieving direct read state: %s", err) } } + if config.dynamicChangeStreamList() { + config.ChangeStreamNs = ic.buildDynamicChangeStreamNs(nsFilter) + } + + gtmOpts := >m.Options{ After: after, Token: token,