From 84c223b8f0c9569c6c96325c2789ef4b26aaad4b Mon Sep 17 00:00:00 2001 From: Alexandr Shelepin Date: Wed, 2 Sep 2020 14:31:33 +0300 Subject: [PATCH] Update to version v2.12.0 --- .travis.yml | 1 + LICENSE | 2 +- bindings.go | 57 +- bindings/builtin/builtin.go | 43 +- bindings/consts.go | 2 +- bindings/cproto/connection.go | 8 +- bindings/cproto/cproto.go | 4 +- bindings/interface.go | 6 + changelog.md | 26 +- cjson/creflect.go | 33 +- cjson/decoder.go | 65 +- cpp_src/CMakeLists.txt | 7 +- cpp_src/client/namespace.cc | 2 +- cpp_src/client/namespace.h | 2 +- cpp_src/client/reindexer.cc | 5 +- cpp_src/client/reindexer.h | 10 +- cpp_src/client/rpcclient.cc | 57 +- cpp_src/client/rpcclient.h | 4 +- cpp_src/client/transaction.cc | 1 - cpp_src/client/transaction.h | 7 +- cpp_src/cmake/modules/FindGperftools.cmake | 2 +- cpp_src/cmake/modules/FindJemalloc.cmake | 2 +- cpp_src/cmake/modules/FindSnappy.cmake | 2 +- .../reindexer_server/contrib/config.yml.in | 2 + .../cmd/reindexer_server/contrib/sysvinit.in | 2 +- .../reindexer_server/test/mocks/users.json | 2 +- .../test/specs/mixins/helper_mixin.py | 35 +- .../cmd/reindexer_tool/commandsprocessor.cc | 45 +- .../cmd/reindexer_tool/commandsprocessor.h | 5 +- .../cmd/reindexer_tool/contrib/testdb.tar.bz2 | Bin 977235 -> 977234 bytes cpp_src/core/cjson/jschemachecker.cc | 188 ++ cpp_src/core/cjson/jschemachecker.h | 63 + cpp_src/core/dbconfig.cc | 4 + cpp_src/core/dbconfig.h | 1 + cpp_src/core/expressiontree.h | 50 +- cpp_src/core/expressiontree.md | 306 ++++ cpp_src/core/ft/ft_fast/dataprocessor.cc | 2 +- cpp_src/core/ft/numtotext.cc | 2 +- cpp_src/core/ft/numtotext.h | 2 +- cpp_src/core/ft/stopwords/stop.h | 2 +- cpp_src/core/ft/typos.cc | 2 +- cpp_src/core/iclientsstats.cc | 1 + cpp_src/core/iclientsstats.h | 22 +- cpp_src/core/index/indextext/fastindextext.cc | 2 +- cpp_src/core/item.cc | 4 +- cpp_src/core/item.h | 4 +- cpp_src/core/itemimpl.cc | 30 +- cpp_src/core/itemimpl.h | 2 +- cpp_src/core/namespace/namespace.cc | 4 + cpp_src/core/namespace/namespaceimpl.cc | 21 +- cpp_src/core/namespace/namespaceimpl.h | 3 +- cpp_src/core/nsselecter/querypreprocessor.cc | 2 +- .../nsselecter/selectiteratorcontainer.cc | 10 +- .../core/nsselecter/selectiteratorcontainer.h | 4 +- cpp_src/core/payload/payloadtype.h | 2 + cpp_src/core/query/dsl/dslencoder.cc | 6 +- cpp_src/core/query/dsl/dslparser.cc | 84 +- cpp_src/core/query/dsl/query.json.h | 1 + cpp_src/core/query/query.h | 1 - cpp_src/core/query/sql/sqlparser.cc | 3 +- cpp_src/core/reindexer.cc | 5 +- cpp_src/core/reindexer.h | 10 +- cpp_src/core/reindexerimpl.cc | 13 +- cpp_src/core/reindexerimpl.h | 3 +- cpp_src/core/schema.cc | 27 +- cpp_src/core/schema.h | 12 +- cpp_src/core/transaction.cc | 4 +- cpp_src/core/transaction.h | 4 +- cpp_src/core/type_consts.h | 17 + cpp_src/estl/span.h | 2 +- cpp_src/estl/string_view.h | 2 +- cpp_src/estl/trivial_reverse_iterator.h | 2 +- cpp_src/gtests/bench/dict.txt | 2 +- cpp_src/gtests/tests/API/base_tests.cc | 316 ++-- .../gtests/tests/fixtures/clientsstats_api.cc | 20 + .../gtests/tests/fixtures/clientsstats_api.h | 1 + .../gtests/tests/fixtures/replication_api.cc | 2 + .../gtests/tests/fixtures/replication_api.h | 3 + .../tests/fixtures/replication_load_api.h | 38 + .../gtests/tests/fixtures/servercontrol.cc | 24 + cpp_src/gtests/tests/fixtures/servercontrol.h | 2 + .../gtests/tests/unit/clientsstats_test.cc | 76 +- .../gtests/tests/unit/equalposition_tests.cc | 58 + .../unit/replication_master_master_test.cc | 45 +- cpp_src/gtests/tests/unit/replication_test.cc | 333 ++++ .../gtests/tests/unit/schema_check_test.cc | 244 +++ cpp_src/net/cproto/serverconnection.cc | 2 +- cpp_src/net/http/router.h | 2 +- cpp_src/replicator/replicator.cc | 56 +- cpp_src/replicator/updatesobserver.cc | 162 +- cpp_src/replicator/updatesobserver.h | 59 +- cpp_src/replicator/walrecord.cc | 4 +- cpp_src/replicator/walselecter.cc | 2 +- cpp_src/replicator/waltracker.cc | 76 +- cpp_src/replicator/waltracker.h | 40 +- cpp_src/server/clientsstats.cc | 20 +- cpp_src/server/clientsstats.h | 18 +- cpp_src/server/config.cc | 5 + cpp_src/server/config.h | 1 + cpp_src/server/contrib/CMakeLists.txt | 45 + cpp_src/server/contrib/dslshemagenerator.py | 134 ++ cpp_src/server/contrib/server.md | 342 +++- cpp_src/server/contrib/server.yml | 1506 +++++++++++------ cpp_src/server/dbmanager.cc | 18 +- cpp_src/server/dbmanager.h | 2 +- cpp_src/server/httpserver.cc | 355 +++- cpp_src/server/httpserver.h | 57 +- cpp_src/server/rpcserver.cc | 83 +- cpp_src/server/rpcserver.h | 9 +- cpp_src/server/serverimpl.cc | 3 +- cpp_src/tools/stringstools.cc | 2 +- cpp_src/vendor/atoi/atoi.h | 2 +- cpp_src/vendor/base64/base64.c | 2 +- dependencies.sh | 33 +- describer.go | 2 + iterator.go | 2 +- query.go | 2 +- readme.md | 20 +- reindexer.go | 15 +- reindexer_impl.go | 97 +- replication.md | 10 + test/cache_items_test.go | 87 + test/config_test.go | 1 + test/ft/dict.txt | 2 +- .../full_text_search_basic_test_data.json | 2 +- .../full_text_search_ranking_test_data.json | 2 +- ...ch_ranking_test_data_for_future_needs.json | 2 +- test/multi_dsn_test.go | 59 + test/tx_test.go | 11 +- tx.go | 6 +- 130 files changed, 4695 insertions(+), 1197 deletions(-) create mode 100644 cpp_src/core/cjson/jschemachecker.cc create mode 100644 cpp_src/core/cjson/jschemachecker.h create mode 100644 cpp_src/core/expressiontree.md create mode 100644 cpp_src/core/query/dsl/query.json.h create mode 100644 cpp_src/gtests/tests/unit/schema_check_test.cc create mode 100644 cpp_src/server/contrib/CMakeLists.txt create mode 100644 cpp_src/server/contrib/dslshemagenerator.py create mode 100644 test/cache_items_test.go diff --git a/.travis.yml b/.travis.yml index dc716586f..5aa7604c9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -40,6 +40,7 @@ git: - go get github.com/golang/snappy - go get github.com/stretchr/testify/assert - go get github.com/iancoleman/orderedmap + - go get github.com/hashicorp/golang-lru script: - cd ${BUILD_DIR} && cmake ${CMAKE_OPTS} ${TRAVIS_BUILD_DIR} && make -j4 - ctest --verbose diff --git a/LICENSE b/LICENSE index 415ca0259..0d03d7763 100644 --- a/LICENSE +++ b/LICENSE @@ -199,4 +199,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. + limitations under the License. \ No newline at end of file diff --git a/bindings.go b/bindings.go index 3ec3eff64..5f1e92c42 100644 --- a/bindings.go +++ b/bindings.go @@ -65,9 +65,7 @@ func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *r resultp := rdSer.readRawtItemParams() - ns.cacheLock.Lock() - delete(ns.cacheItems, resultp.id) - ns.cacheLock.Unlock() + ns.cacheItems.Remove(resultp.id) if len(precepts) > 0 && (resultp.cptr != 0 || resultp.data != nil) && reflect.TypeOf(item).Kind() == reflect.Ptr { nsArrEntry := nsArrayEntry{ns, ns.cjsonState.Copy()} @@ -139,46 +137,36 @@ func (db *reindexerImpl) getMeta(ctx context.Context, namespace, key string) ([] func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) { useCache := item == nil && (ns.deepCopyIface || allowUnsafe) && !nonCacheableData - hasCache := false needCopy := ns.deepCopyIface && !allowUnsafe var err error - if useCache { - ns.cacheLock.RLock() - hasCache = ns.cacheItems != nil - if !hasCache { - ns.cacheLock.RUnlock() - } - } - if useCache && hasCache { - if citem, ok := ns.cacheItems[params.id]; ok && citem.version == params.version { + if useCache && ns.cacheItems != nil { + if citem, ok := ns.cacheItems.Get(params.id); ok && citem.version == params.version { item = citem.item - ns.cacheLock.RUnlock() } else { - ns.cacheLock.RUnlock() item = reflect.New(ns.rtype).Interface() dec := ns.localCjsonState.NewDecoder(item, logger) + var dataSize uint64 if params.cptr != 0 { - err = dec.DecodeCPtr(params.cptr, item) + dataSize, err = dec.DecodeCPtr(params.cptr, item) } else if params.data != nil { - err = dec.Decode(params.data, item) + dataSize, err = dec.Decode(params.data, item) } else { panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name)) } if err != nil { return item, err } - ns.cacheLock.Lock() - if citem, ok := ns.cacheItems[params.id]; ok { + + if citem, ok := ns.cacheItems.Get(params.id); ok { if citem.version == params.version { item = citem.item } else if citem.version < params.version { - ns.cacheItems[params.id] = cacheItem{item: item, version: params.version} + ns.cacheItems.Add(params.id, &cacheItem{item: item, version: params.version, size: dataSize}) } } else { - ns.cacheItems[params.id] = cacheItem{item: item, version: params.version} + ns.cacheItems.Add(params.id, &cacheItem{item: item, version: params.version, size: dataSize}) } - ns.cacheLock.Unlock() } } else { if item == nil { @@ -186,9 +174,9 @@ func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, } dec := ns.localCjsonState.NewDecoder(item, logger) if params.cptr != 0 { - err = dec.DecodeCPtr(params.cptr, item) + _, err = dec.DecodeCPtr(params.cptr, item) } else if params.data != nil { - err = dec.Decode(params.data, item) + _, err = dec.Decode(params.data, item) } else { panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name)) } @@ -389,19 +377,14 @@ func (db *reindexerImpl) deleteQuery(ctx context.Context, q *Query) (int, error) // skip total count rawQueryParams := ser.readRawQueryParams() - ns.cacheLock.Lock() for i := 0; i < rawQueryParams.count; i++ { params := ser.readRawtItemParams() if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 { panic("Internal error: joined items in delete query result") } // Update cache - if _, ok := ns.cacheItems[params.id]; ok { - delete(ns.cacheItems, params.id) - } - + ns.cacheItems.Remove(params.id) } - ns.cacheLock.Unlock() if !ser.Eof() { panic("Internal error: data after end of delete query result") } @@ -428,19 +411,15 @@ func (db *reindexerImpl) updateQuery(ctx context.Context, q *Query) *Iterator { ns.cjsonState.ReadPayloadType(&ser.Serializer) }) - ns.cacheLock.Lock() for i := 0; i < rawQueryParams.count; i++ { params := ser.readRawtItemParams() if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 { panic("Internal error: joined items in update query result") } // Update cache - if _, ok := ns.cacheItems[params.id]; ok { - delete(ns.cacheItems, params.id) - } - + ns.cacheItems.Remove(params.id) } - ns.cacheLock.Unlock() + if !ser.Eof() { panic("Internal error: data after end of update query result") } @@ -473,11 +452,7 @@ func (db *reindexerImpl) resetCachesCtx(ctx context.Context) { } db.lock.RUnlock() for _, ns := range nsArray { - ns.cacheLock.Lock() - if ns.cacheItems != nil { - ns.cacheItems = make(map[int]cacheItem) - } - ns.cacheLock.Unlock() + ns.cacheItems.Reset(ns.opts.objCacheSize) ns.cjsonState.Reset() db.query(ns.name).Limit(0).ExecCtx(ctx).Close() } diff --git a/bindings/builtin/builtin.go b/bindings/builtin/builtin.go index a5ad4c88e..913e9c019 100644 --- a/bindings/builtin/builtin.go +++ b/bindings/builtin/builtin.go @@ -197,6 +197,13 @@ func (binding *Builtin) Init(u []url.URL, options ...interface{}) error { return err2go(C.reindexer_connect(binding.rx, str2c(u[0].Path), opts, str2c(bindings.ReindexerVersion))) } +func (binding *Builtin) StartWatchOnCtx(ctx context.Context) (CCtxWrapper, error) { + if binding.ctxWatcher != nil { + return binding.ctxWatcher.StartWatchOnCtx(ctx) + } + return CCtxWrapper{}, bindings.NewError("rq: builtin binding is not initialized properly", bindings.ErrNotValid) +} + func (binding *Builtin) Clone() bindings.RawBinding { return &Builtin{} } @@ -238,7 +245,7 @@ func (binding *Builtin) ModifyItem(ctx context.Context, nsHash int, namespace st } packedArgs := ser1.Bytes() - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return nil, err } @@ -299,7 +306,7 @@ func (binding *Builtin) OpenNamespace(ctx context.Context, namespace string, ena options: C.uint16_t(storageOptions), } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -308,7 +315,7 @@ func (binding *Builtin) OpenNamespace(ctx context.Context, namespace string, ena return err2go(C.reindexer_open_namespace(binding.rx, str2c(namespace), opts, ctxInfo.cCtx)) } func (binding *Builtin) CloseNamespace(ctx context.Context, namespace string) error { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -318,7 +325,7 @@ func (binding *Builtin) CloseNamespace(ctx context.Context, namespace string) er } func (binding *Builtin) DropNamespace(ctx context.Context, namespace string) error { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -328,7 +335,7 @@ func (binding *Builtin) DropNamespace(ctx context.Context, namespace string) err } func (binding *Builtin) TruncateNamespace(ctx context.Context, namespace string) error { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -338,7 +345,7 @@ func (binding *Builtin) TruncateNamespace(ctx context.Context, namespace string) } func (binding *Builtin) RenameNamespace(ctx context.Context, srcNs string, dstNs string) error { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -353,7 +360,7 @@ func (binding *Builtin) EnableStorage(ctx context.Context, path string) error { path += "/" } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -368,7 +375,7 @@ func (binding *Builtin) AddIndex(ctx context.Context, namespace string, indexDef return err } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -384,7 +391,7 @@ func (binding *Builtin) SetSchema(ctx context.Context, namespace string, schema return err } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -401,7 +408,7 @@ func (binding *Builtin) UpdateIndex(ctx context.Context, namespace string, index return err } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -412,7 +419,7 @@ func (binding *Builtin) UpdateIndex(ctx context.Context, namespace string, index } func (binding *Builtin) DropIndex(ctx context.Context, namespace, index string) error { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -422,7 +429,7 @@ func (binding *Builtin) DropIndex(ctx context.Context, namespace, index string) } func (binding *Builtin) PutMeta(ctx context.Context, namespace, key, data string) error { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return err } @@ -432,7 +439,7 @@ func (binding *Builtin) PutMeta(ctx context.Context, namespace, key, data string } func (binding *Builtin) GetMeta(ctx context.Context, namespace, key string) (bindings.RawBuffer, error) { - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return nil, err } @@ -448,7 +455,7 @@ func (binding *Builtin) Select(ctx context.Context, query string, asJson bool, p defer func() { <-binding.cgoLimiter }() } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return nil, err } @@ -479,7 +486,7 @@ func (binding *Builtin) CommitTx(txCtx *bindings.TxCtx) (bindings.RawBuffer, err defer func() { <-binding.cgoLimiter }() } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(txCtx.UserCtx) + ctxInfo, err := binding.StartWatchOnCtx(txCtx.UserCtx) if err != nil { return nil, err } @@ -502,7 +509,7 @@ func (binding *Builtin) SelectQuery(ctx context.Context, data []byte, asJson boo defer func() { <-binding.cgoLimiter }() } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return nil, err } @@ -518,7 +525,7 @@ func (binding *Builtin) DeleteQuery(ctx context.Context, nsHash int, data []byte defer func() { <-binding.cgoLimiter }() } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return nil, err } @@ -534,7 +541,7 @@ func (binding *Builtin) UpdateQuery(ctx context.Context, nsHash int, data []byte defer func() { <-binding.cgoLimiter }() } - ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx) + ctxInfo, err := binding.StartWatchOnCtx(ctx) if err != nil { return nil, err } diff --git a/bindings/consts.go b/bindings/consts.go index 68390c707..d76c242f1 100644 --- a/bindings/consts.go +++ b/bindings/consts.go @@ -2,7 +2,7 @@ package bindings const CInt32Max = int(^uint32(0) >> 1) -const ReindexerVersion = "v2.11.1" +const ReindexerVersion = "v2.12.0" // public go consts from type_consts.h and reindexer_ctypes.h const ( diff --git a/bindings/cproto/connection.go b/bindings/cproto/connection.go index 7189ac599..a0a8889fe 100644 --- a/bindings/cproto/connection.go +++ b/bindings/cproto/connection.go @@ -103,9 +103,8 @@ type connection struct { now uint32 termCh chan struct{} - requests [queueSize]requestInfo - enableSnappy int32 - isServerChanged bool + requests [queueSize]requestInfo + enableSnappy int32 } func newConnection(ctx context.Context, owner *NetCProto) (c *connection, err error) { @@ -178,7 +177,6 @@ func (c *connection) deadlineTicker() { atomic.StoreInt32(&c.requests[i].isAsync, 0) c.requests[i].cmplLock.Unlock() c.seqs <- nextSeqNum(seqNum) - fmt.Println("Canceling on deadline: ", deadline, ", id: ", seqNum) cmpl(nil, context.DeadlineExceeded) } else { c.requests[i].cmplLock.Unlock() @@ -224,7 +222,7 @@ func (c *connection) login(ctx context.Context, owner *NetCProto) (err error) { serverStartTS := buf.args[1].(int64) old := atomic.SwapInt64(&owner.serverStartTime, serverStartTS) if old != 0 && old != serverStartTS { - c.isServerChanged = true + atomic.StoreInt32(&c.owner.isServerChanged, 1) } } return diff --git a/bindings/cproto/cproto.go b/bindings/cproto/cproto.go index bf10432ed..56c78a128 100644 --- a/bindings/cproto/cproto.go +++ b/bindings/cproto/cproto.go @@ -39,6 +39,7 @@ type Logger interface { type NetCProto struct { dsn dsn pool pool + isServerChanged int32 onChangeCallback func() serverStartTime int64 retryAttempts bindings.OptionRetryAttempts @@ -456,7 +457,8 @@ func (binding *NetCProto) getConn(ctx context.Context) (conn *connection, err er if err != nil { return nil, err } - if conn.isServerChanged { + + if atomic.CompareAndSwapInt32(&binding.isServerChanged, 1, 0) { binding.onChangeCallback() } return conn, nil diff --git a/bindings/interface.go b/bindings/interface.go index c5d44a78a..0dacec11a 100644 --- a/bindings/interface.go +++ b/bindings/interface.go @@ -279,6 +279,12 @@ type Status struct { Err error CProto StatusCProto Builtin StatusBuiltin + Cache StatusCache +} + +type StatusCache struct { + CurSize int64 + MaxSize int64 } type StatusCProto struct { diff --git a/changelog.md b/changelog.md index edcad49f0..4cbc22876 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,28 @@ +# Version 2.12.0 (02.09.2020) + +## Core +- [fea] WAL size settings via #config namespace +- [fea] WAL updates filtering +- [fea] Return error for queries with EqualPosition to fields with IS NULL condition + +# Reindexer server +- [fea] Transactions HTTP API +- [fea] Add RPC-transactions limit for each connection +- [fea] More advanced JSON DSL validation for queries + +# Go connector +- [fea] Items cache size limits setting +- [fix] Fix SEGFAULT on unauthorized builtin server + +# Version 2.11.2 (25.08.2020) + +# Core +- [fix] Fulltext snippet location in text +- [fix] Unneeded force replication fixed + +# Go connector +- [fix] Reset state on reconnect to different server + # Version 2.11.1 (14.08.2020) # Core @@ -5,7 +30,6 @@ - [fix] Fixed behavior of SQL UPDATE statement with true/false/null values - [fix] Do not reset expire_after value of ttl indexes after copying tx - [fix] Error loading of system records (indexes/tags/replState) after copying tx -- [fea] Return error for queries with EqualPosition to fields with IS NULL condition - [fix] To config added default value of `optimization_sort_workers` - [fix] Windows specific error with IN () condition executed by comparator diff --git a/cjson/creflect.go b/cjson/creflect.go index 5d074896b..f9325f42f 100644 --- a/cjson/creflect.go +++ b/cjson/creflect.go @@ -149,7 +149,7 @@ func (pl *payloadIface) getArrayLen(field int) int { } // get c reflect value and set to go reflect valie -func (pl *payloadIface) getValue(field int, idx int, v reflect.Value) { +func (pl *payloadIface) getValue(field int, idx int, v reflect.Value, dataSize *uint64) { k := v.Type().Kind() switch pl.t.Fields[field].Type { @@ -180,9 +180,15 @@ func (pl *payloadIface) getValue(field int, idx int, v reflect.Value) { default: panic(fmt.Errorf("Unknown key value type %d", pl.t.Fields[field].Type)) } + + if k == reflect.String { + *dataSize += uint64(v.Type().Size()) + uint64(v.Len()) + } else { + *dataSize += uint64(v.Type().Size()) + } } -func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Value) { +func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Value, dataSize *uint64) { if cnt == 0 { return @@ -201,46 +207,55 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val for i := 0; i < cnt; i++ { (*a)[i] = int(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]uint: *a = make([]uint, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = uint(pu[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]int16: *a = make([]int16, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = int16(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]uint16: *a = make([]uint16, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = uint16(pu[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]int32: *a = make([]int32, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = int32(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]uint32: *a = make([]uint32, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = uint32(pu[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]int8: *a = make([]int8, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = int8(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]uint8: *a = make([]uint8, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = uint8(pu[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]bool: *a = make([]bool, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = bool(pi[i] != 0) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) default: slice := reflect.MakeSlice(v.Type(), cnt, cnt) switch v.Type().Elem().Kind() { @@ -255,6 +270,7 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val sv.SetUint(uint64(pu[i])) } } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: for i := 0; i < cnt; i++ { sv := slice.Index(i) @@ -266,6 +282,7 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val sv.SetInt(int64(pi[i])) } } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) default: panic(fmt.Errorf("Can't set []int to []%s", v.Type().Elem().Kind().String())) } @@ -277,22 +294,26 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val pi := (*[1 << 27]int64)(ptr)[:l:l] *a = make([]int64, cnt, cnt) copy(*a, pi) + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]uint64: pi := (*[1 << 27]uint64)(ptr)[:l:l] *a = make([]uint64, cnt, cnt) copy(*a, pi) + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]int: pi := (*[1 << 27]int64)(ptr)[:l:l] *a = make([]int, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = int(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]uint: pi := (*[1 << 27]uint64)(ptr)[:l:l] *a = make([]uint, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = uint(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) default: slice := reflect.MakeSlice(v.Type(), cnt, cnt) switch v.Type().Elem().Kind() { @@ -308,6 +329,7 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val sv.SetUint(uint64(pi[i])) } } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: pi := (*[1 << 27]int64)(ptr)[:l:l] for i := 0; i < cnt; i++ { @@ -320,6 +342,7 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val sv.SetInt(int64(pi[i])) } } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) default: panic(fmt.Errorf("Can't set []int64 to []%s", v.Type().Elem().Kind().String())) } @@ -333,11 +356,13 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val for i := 0; i < cnt; i++ { (*a)[i] = float64(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case *[]float32: *a = make([]float32, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = float32(pi[i]) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) default: slice := reflect.MakeSlice(v.Type(), cnt, cnt) for i := 0; i < cnt; i++ { @@ -351,6 +376,7 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val } } v.Set(slice) + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) } case valueBool: pb := (*[1 << 27]Cbool)(ptr)[:l:l] @@ -374,11 +400,13 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val } v.Set(slice) } + *dataSize += uint64(cnt) * uint64(v.Type().Elem().Size()) case valueString: if a, ok := v.Addr().Interface().(*[]string); ok { *a = make([]string, cnt, cnt) for i := 0; i < cnt; i++ { (*a)[i] = pl.getString(field, i+startIdx) + *dataSize += uint64(len((*a)[i])) } } else { slice := reflect.MakeSlice(v.Type(), cnt, cnt) @@ -392,6 +420,7 @@ func (pl *payloadIface) getArray(field int, startIdx int, cnt int, v reflect.Val } else { sv.SetString(s) } + *dataSize += uint64(len(s)) } v.Set(slice) } diff --git a/cjson/decoder.go b/cjson/decoder.go index 346ba003a..fe14be5c3 100644 --- a/cjson/decoder.go +++ b/cjson/decoder.go @@ -232,7 +232,7 @@ func mkValue(ctagType int) (v reflect.Value) { return v } -func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect.Value, fieldsoutcnt []int, cctagsPath []int) { +func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect.Value, fieldsoutcnt []int, cctagsPath []int, dataSize *uint64) { atag := carraytag(rdser.GetUInt32()) count := atag.Count() subtag := atag.Tag() @@ -273,12 +273,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = int(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*int)(ptr)[:count:count] for i := 0; i < count; i++ { u := int(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Uint: if !isPtr { @@ -286,12 +288,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = uint(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*uint)(ptr)[:count:count] for i := 0; i < count; i++ { u := uint(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Int64: if !isPtr { @@ -299,12 +303,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = int64(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*int64)(ptr)[:count:count] for i := 0; i < count; i++ { u := int64(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Uint64: if !isPtr { @@ -312,12 +318,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = uint64(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*uint64)(ptr)[:count:count] for i := 0; i < count; i++ { u := uint64(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Int32: if !isPtr { @@ -325,12 +333,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = int32(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*int32)(ptr)[:count:count] for i := 0; i < count; i++ { u := int32(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Uint32: if !isPtr { @@ -338,12 +348,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = uint32(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*uint32)(ptr)[:count:count] for i := 0; i < count; i++ { u := uint32(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Int16: if !isPtr { @@ -351,12 +363,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = int16(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*int16)(ptr)[:count:count] for i := 0; i < count; i++ { u := int16(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Uint16: if !isPtr { @@ -364,12 +378,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = uint16(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*uint16)(ptr)[:count:count] for i := 0; i < count; i++ { u := uint16(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Int8: if !isPtr { @@ -377,12 +393,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = int8(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*int8)(ptr)[:count:count] for i := 0; i < count; i++ { u := int8(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Uint8: if !isPtr { @@ -390,12 +408,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = uint8(asInt(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*uint8)(ptr)[:count:count] for i := 0; i < count; i++ { u := uint8(asInt(rdser, subtag)) sl[i] = &u } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Float32: if !isPtr { @@ -403,12 +423,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = float32(asFloat(rdser, subtag)) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*float32)(ptr)[:count:count] for i := 0; i < count; i++ { f := float32(asFloat(rdser, subtag)) sl[i] = &f } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Float64: if !isPtr { @@ -416,12 +438,14 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = asFloat(rdser, subtag) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*float64)(ptr)[:count:count] for i := 0; i < count; i++ { f := asFloat(rdser, subtag) sl[i] = &f } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.Bool: if !isPtr { @@ -429,24 +453,28 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = rdser.GetVarUInt() != 0 } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) } else { sl := (*[1 << 28]*bool)(ptr)[:count:count] for i := 0; i < count; i++ { b := rdser.GetVarUInt() != 0 sl[i] = &b } + *dataSize += uint64(count) * uint64(v.Type().Elem().Elem().Size()) } case reflect.String: if !isPtr { sl := (*[1 << 27]string)(ptr)[:count:count] for i := 0; i < count; i++ { sl[i] = asString(rdser, subtag) + *dataSize += uint64(len(sl[i])) } } else { sl := (*[1 << 28]*string)(ptr)[:count:count] for i := 0; i < count; i++ { s := asString(rdser, subtag) sl[i] = &s + *dataSize += uint64(len(s)) } } case reflect.Interface: @@ -454,12 +482,13 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. for i := 0; i < count; i++ { sl[i] = asIface(rdser, subtag) } + *dataSize += uint64(count) * uint64(v.Type().Elem().Size()) default: panic(fmt.Errorf("Internal error - can't decode array of type %s", v.Type().Elem().Kind().String())) } } else { for i := 0; i < count; i++ { - dec.decodeValue(pl, rdser, v.Index(i), fieldsoutcnt, cctagsPath) + dec.decodeValue(pl, rdser, v.Index(i), fieldsoutcnt, cctagsPath, dataSize) } } if k == reflect.Interface { @@ -468,7 +497,7 @@ func (dec *Decoder) decodeSlice(pl *payloadIface, rdser *Serializer, v *reflect. } } -func (dec *Decoder) decodeValue(pl *payloadIface, rdser *Serializer, v reflect.Value, fieldsoutcnt []int, cctagsPath []int) bool { +func (dec *Decoder) decodeValue(pl *payloadIface, rdser *Serializer, v reflect.Value, fieldsoutcnt []int, cctagsPath []int, dataSize *uint64) bool { ctag := ctag(rdser.GetVarUInt()) ctagType := ctag.Type() @@ -562,36 +591,45 @@ func (dec *Decoder) decodeValue(pl *payloadIface, rdser *Serializer, v reflect.V switch ctagType { case TAG_ARRAY: count := int(rdser.GetVarUInt()) - pl.getArray(ctagField, *cnt, count, v) + pl.getArray(ctagField, *cnt, count, v, dataSize) *cnt += count + *dataSize += uint64(v.Type().Size()) default: - pl.getValue(ctagField, *cnt, v) + pl.getValue(ctagField, *cnt, v, dataSize) (*cnt)++ } } else { // get data from serialized tuple switch ctagType { case TAG_ARRAY: - dec.decodeSlice(pl, rdser, &v, fieldsoutcnt, cctagsPath) + dec.decodeSlice(pl, rdser, &v, fieldsoutcnt, cctagsPath, dataSize) + *dataSize += uint64(v.Type().Size()) case TAG_OBJECT: - for dec.decodeValue(pl, rdser, v, fieldsoutcnt, cctagsPath) { + for dec.decodeValue(pl, rdser, v, fieldsoutcnt, cctagsPath, dataSize) { + } + if k == reflect.Map { + *dataSize += uint64(v.Type().Size()) } case TAG_STRING: str := rdser.GetVString() switch { case k == reflect.String: v.SetString(str) + *dataSize += uint64(v.Len()) + uint64(v.Type().Size()) case k == reflect.Slice, k == reflect.Array: b, e := base64.StdEncoding.DecodeString(str) if e != nil { panic(fmt.Errorf("Can't base64 decode %s", str)) } v.SetBytes(b) + *dataSize += uint64(len(b)) case k == reflect.Interface: v.Set(reflect.ValueOf(str)) + *dataSize += uint64(len(str)) case k == reflect.Struct && v.Type().String() == "time.Time": tm, _ := time.Parse(time.RFC3339Nano, str) v.Set(reflect.ValueOf(tm)) + *dataSize += uint64(v.Type().Size()) default: panic(fmt.Errorf("Can't set string to %s", v.Type().Kind().String())) } @@ -608,6 +646,7 @@ func (dec *Decoder) decodeValue(pl *payloadIface, rdser *Serializer, v reflect.V case reflect.Bool: v.SetBool(asInt(rdser, ctagType) != 0) } + *dataSize += uint64(v.Type().Size()) } } @@ -633,7 +672,7 @@ func (dec *Decoder) decodeValue(pl *payloadIface, rdser *Serializer, v reflect.V return true } -func (dec *Decoder) DecodeCPtr(cptr uintptr, dest interface{}) (err error) { +func (dec *Decoder) DecodeCPtr(cptr uintptr, dest interface{}) (dataSize uint64, err error) { pl := &payloadIface{p: cptr, t: &dec.state.payloadType} @@ -666,14 +705,14 @@ func (dec *Decoder) DecodeCPtr(cptr uintptr, dest interface{}) (err error) { fieldsoutcnt := make([]int, 64, 64) ctagsPath := make([]int, 0, 8) - dec.decodeValue(pl, ser, reflect.ValueOf(dest), fieldsoutcnt, ctagsPath) + dec.decodeValue(pl, ser, reflect.ValueOf(dest), fieldsoutcnt, ctagsPath, &dataSize) if !ser.Eof() { panic(fmt.Errorf("Internal error - left unparsed data")) } - return err + return dataSize, err } -func (dec *Decoder) Decode(cjson []byte, dest interface{}) (err error) { +func (dec *Decoder) Decode(cjson []byte, dest interface{}) (dataSize uint64, err error) { dec.state.lock.RLock() defer dec.state.lock.RUnlock() @@ -701,9 +740,9 @@ func (dec *Decoder) Decode(cjson []byte, dest interface{}) (err error) { fieldsoutcnt := make([]int, 64, 64) ctagsPath := make([]int, 0, 8) - dec.decodeValue(nil, ser, reflect.ValueOf(dest), fieldsoutcnt, ctagsPath) + dec.decodeValue(nil, ser, reflect.ValueOf(dest), fieldsoutcnt, ctagsPath, &dataSize) // if !ser.Eof() { // panic(fmt.Errorf("Internal error - left unparsed data")) // } - return err + return dataSize, err } diff --git a/cpp_src/CMakeLists.txt b/cpp_src/CMakeLists.txt index 96e462846..94ea1b830 100644 --- a/cpp_src/CMakeLists.txt +++ b/cpp_src/CMakeLists.txt @@ -6,6 +6,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) include(CMakeToolsHelpers OPTIONAL) include(ExternalProject) include(ProcessorCount) + set(CMAKE_DISABLE_IN_SOURCE_BUILD ON) option (WITH_ASAN "Enable AddressSanitized build" OFF) @@ -21,7 +22,7 @@ else() option (LINK_RESOURCES "Link web resources as binary data" ON) endif() -set (REINDEXER_VERSION_DEFAULT "2.11.1") +set (REINDEXER_VERSION_DEFAULT "2.12.0") if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "RelWithDebInfo") @@ -98,8 +99,12 @@ list(APPEND REINDEXER_LIBRARIES reindexer) add_library(${TARGET} STATIC ${HDRS} ${SRCS} ${VENDORS}) add_definitions(-DREINDEX_CORE_BUILD=1) + + # add_definitions(-DREINDEX_FT_EXTRA_DEBUG=1) +add_subdirectory(server/contrib) + ## Dependencies # tcmalloc diff --git a/cpp_src/client/namespace.cc b/cpp_src/client/namespace.cc index c48c5d5d4..ef7c9d49b 100644 --- a/cpp_src/client/namespace.cc +++ b/cpp_src/client/namespace.cc @@ -14,4 +14,4 @@ Item Namespace::NewItem() { } } // namespace client -} // namespace reindexer +} // namespace reindexer \ No newline at end of file diff --git a/cpp_src/client/namespace.h b/cpp_src/client/namespace.h index 91cc73ac2..e5ccd060a 100644 --- a/cpp_src/client/namespace.h +++ b/cpp_src/client/namespace.h @@ -25,4 +25,4 @@ class Namespace { }; } // namespace client -} // namespace reindexer +} // namespace reindexer \ No newline at end of file diff --git a/cpp_src/client/reindexer.cc b/cpp_src/client/reindexer.cc index 7703935b0..0225767be 100644 --- a/cpp_src/client/reindexer.cc +++ b/cpp_src/client/reindexer.cc @@ -52,7 +52,10 @@ Error Reindexer::DropIndex(string_view nsName, const IndexDef& index) { return i Error Reindexer::SetSchema(string_view nsName, string_view schema) { return impl_->SetSchema(nsName, schema, ctx_); } Error Reindexer::EnumNamespaces(vector& defs, EnumNamespacesOpts opts) { return impl_->EnumNamespaces(defs, opts, ctx_); } Error Reindexer::EnumDatabases(vector& dbList) { return impl_->EnumDatabases(dbList, ctx_); } -Error Reindexer::SubscribeUpdates(IUpdatesObserver* observer, bool subscribe) { return impl_->SubscribeUpdates(observer, subscribe); } +Error Reindexer::SubscribeUpdates(IUpdatesObserver* observer, const UpdatesFilters& filters, SubscriptionOpts opts) { + return impl_->SubscribeUpdates(observer, filters, opts); +} +Error Reindexer::UnsubscribeUpdates(IUpdatesObserver* observer) { return impl_->UnsubscribeUpdates(observer); } Error Reindexer::GetSqlSuggestions(const string_view sqlQuery, int pos, vector& suggests) { return impl_->GetSqlSuggestions(sqlQuery, pos, suggests); } diff --git a/cpp_src/client/reindexer.h b/cpp_src/client/reindexer.h index ca87dc181..3859eb85f 100644 --- a/cpp_src/client/reindexer.h +++ b/cpp_src/client/reindexer.h @@ -12,6 +12,7 @@ namespace reindexer { class IUpdatesObserver; +class UpdatesFilters; namespace client { using std::vector; @@ -159,8 +160,13 @@ class Reindexer { Error EnumMeta(string_view nsName, vector &keys); /// Subscribe to updates of database /// @param observer - Observer interface, which will receive updates - /// @param subscribe - true: subscribe, false: unsubscribe - Error SubscribeUpdates(IUpdatesObserver *observer, bool subscribe); + /// @param filters - Subscription filters set + /// @param opts - Subscription options (allows to either add new filters or reset them) + Error SubscribeUpdates(IUpdatesObserver *observer, const UpdatesFilters &filters, SubscriptionOpts opts = SubscriptionOpts()); + /// Unsubscribe from updates of database + /// Cancelation context doesn't affect this call + /// @param observer - Observer interface, which will be unsubscribed updates + Error UnsubscribeUpdates(IUpdatesObserver *observer); /// Get possible suggestions for token (set by 'pos') in Sql query. /// @param sqlQuery - sql query. /// @param pos - position in sql query for suggestions. diff --git a/cpp_src/client/rpcclient.cc b/cpp_src/client/rpcclient.cc index 5b2ff14a2..b7db3e114 100644 --- a/cpp_src/client/rpcclient.cc +++ b/cpp_src/client/rpcclient.cc @@ -311,6 +311,30 @@ Error RPCClient::modifyItemAsync(string_view nsName, Item* item, int mode, cprot return errOK; } +Error RPCClient::subscribeImpl(bool subscribe) { + Error err; + auto updatesConn = updatesConn_.load(); + if (subscribe) { + UpdatesFilters filter = observers_.GetMergedFilter(); + WrSerializer ser; + filter.GetJSON(ser); + if (updatesConn) { + err = updatesConn->Call({cproto::kCmdSubscribeUpdates, config_.RequestTimeout, milliseconds(0)}, 1, ser.Slice()).Status(); + } else { + auto conn = getConn(); + err = conn->Call({cproto::kCmdSubscribeUpdates, config_.RequestTimeout, milliseconds(0)}, 1, ser.Slice()).Status(); + if (err.ok()) { + updatesConn_ = conn; + } + conn->SetUpdatesHandler([this](RPCAnswer&& ans, cproto::ClientConnection* conn) { onUpdates(ans, conn); }); + } + } else if (updatesConn) { + err = updatesConn->Call({cproto::kCmdSubscribeUpdates, config_.RequestTimeout, milliseconds(0)}, 0).Status(); + updatesConn_ = nullptr; + } + return err; +} + Item RPCClient::NewItem(string_view nsName) { try { auto ns = getNamespace(nsName); @@ -574,27 +598,14 @@ Error RPCClient::EnumDatabases(vector& dbList, const InternalRdxContext& } } -Error RPCClient::SubscribeUpdates(IUpdatesObserver* observer, bool subscribe) { - if (subscribe) { - observers_.Add(observer); - } else { - observers_.Delete(observer); - } - subscribe = !observers_.empty(); - Error err; - auto updatesConn = updatesConn_.load(); - if (subscribe && !updatesConn) { - auto conn = getConn(); - err = conn->Call({cproto::kCmdSubscribeUpdates, config_.RequestTimeout, milliseconds(0)}, 1).Status(); - if (err.ok()) { - updatesConn_ = conn; - } - conn->SetUpdatesHandler([this](RPCAnswer&& ans, cproto::ClientConnection* conn) { onUpdates(ans, conn); }); - } else if (!subscribe && updatesConn) { - err = updatesConn->Call({cproto::kCmdSubscribeUpdates, config_.RequestTimeout, milliseconds(0)}, 0).Status(); - updatesConn_ = nullptr; - } - return err; +Error RPCClient::SubscribeUpdates(IUpdatesObserver* observer, const UpdatesFilters& filters, SubscriptionOpts opts) { + observers_.Add(observer, filters, opts); + return subscribeImpl(true); +} + +Error RPCClient::UnsubscribeUpdates(IUpdatesObserver* observer) { + observers_.Delete(observer); + return subscribeImpl(!observers_.empty()); } Error RPCClient::GetSqlSuggestions(string_view query, int pos, std::vector& suggests) { @@ -758,12 +769,12 @@ Transaction RPCClient::NewTransaction(string_view nsName, const InternalRdxConte net::cproto::ClientConnection* conn = getConn(); auto res = conn->Call({cproto::kCmdStartTransaction, config_.RequestTimeout, ctx.execTimeout()}, nsName); auto err = res.Status(); - auto args = res.GetArgs(1); if (err.ok()) { + auto args = res.GetArgs(1); return Transaction(this, conn, int64_t(args[0]), config_.RequestTimeout, ctx.execTimeout(), std::string(nsName.data(), nsName.size())); } - return Transaction(); + return Transaction(std::move(err)); } Error RPCClient::CommitTransaction(Transaction& tr, const InternalRdxContext& ctx) { diff --git a/cpp_src/client/rpcclient.h b/cpp_src/client/rpcclient.h index fe0712f3a..49198de2f 100644 --- a/cpp_src/client/rpcclient.h +++ b/cpp_src/client/rpcclient.h @@ -76,7 +76,8 @@ class RPCClient { Error GetMeta(string_view nsName, const string &key, string &data, const InternalRdxContext &ctx); Error PutMeta(string_view nsName, const string &key, const string_view &data, const InternalRdxContext &ctx); Error EnumMeta(string_view nsName, vector &keys, const InternalRdxContext &ctx); - Error SubscribeUpdates(IUpdatesObserver *observer, bool subscribe); + Error SubscribeUpdates(IUpdatesObserver *observer, const UpdatesFilters &filters, SubscriptionOpts opts = SubscriptionOpts()); + Error UnsubscribeUpdates(IUpdatesObserver *observer); Error GetSqlSuggestions(string_view query, int pos, std::vector &suggests); Error Status(); @@ -99,6 +100,7 @@ class RPCClient { Error modifyItem(string_view nsName, Item &item, int mode, seconds netTimeout, const InternalRdxContext &ctx); Error modifyItemAsync(string_view nsName, Item *item, int mode, cproto::ClientConnection *, seconds netTimeout, const InternalRdxContext &ctx); + Error subscribeImpl(bool subscribe); Namespace *getNamespace(string_view nsName); Error startWorkers(); Error addConnectEntry(const string &dsn, const client::ConnectOpts &opts, size_t idx); diff --git a/cpp_src/client/transaction.cc b/cpp_src/client/transaction.cc index 1885c173e..41a06603c 100644 --- a/cpp_src/client/transaction.cc +++ b/cpp_src/client/transaction.cc @@ -17,7 +17,6 @@ void Transaction::Modify(Query&& query) { } throw Error(errLogic, "Connection pointer in transaction is nullptr."); } -bool Transaction::IsFree() { return (conn_ == nullptr); } void Transaction::addTxItem(Item&& item, ItemModifyMode mode) { auto itData = item.GetJSON(); diff --git a/cpp_src/client/transaction.h b/cpp_src/client/transaction.h index e5b7fb3d5..e27844c65 100644 --- a/cpp_src/client/transaction.h +++ b/cpp_src/client/transaction.h @@ -25,12 +25,13 @@ class Transaction { void Modify(Item&& item, ItemModifyMode mode) { addTxItem(std::move(item), mode); } void Modify(Query&& query); - bool IsFree(); + bool IsFree() const { return (conn_ == nullptr) || !status_.ok(); } Item NewItem(); + Error Status() const { return status_; } private: friend class RPCClient; - Transaction() {} + Transaction(Error status) : status_(std::move(status)) {} Transaction(RPCClient* rpcClient, net::cproto::ClientConnection* conn, int64_t txId, std::chrono::seconds RequestTimeout, std::chrono::milliseconds execTimeout, std::string nsName) : txId_(txId), @@ -45,6 +46,7 @@ class Transaction { txId_ = -1; rpcClient_ = nullptr; conn_ = nullptr; + status_ = errOK; } int64_t txId_ = -1; @@ -53,6 +55,7 @@ class Transaction { std::chrono::seconds RequestTimeout_; std::chrono::milliseconds execTimeout_; std::string nsName_; + Error status_; }; } // namespace client diff --git a/cpp_src/cmake/modules/FindGperftools.cmake b/cpp_src/cmake/modules/FindGperftools.cmake index aa2aade4e..a50a3a6f9 100644 --- a/cpp_src/cmake/modules/FindGperftools.cmake +++ b/cpp_src/cmake/modules/FindGperftools.cmake @@ -49,4 +49,4 @@ mark_as_advanced( GPERFTOOLS_TCMALLOC_AND_PROFILER GPERFTOOLS_LIBRARIES GPERFTOOLS_INCLUDE_DIR) - + \ No newline at end of file diff --git a/cpp_src/cmake/modules/FindJemalloc.cmake b/cpp_src/cmake/modules/FindJemalloc.cmake index e9f3a9bbc..70da50b36 100755 --- a/cpp_src/cmake/modules/FindJemalloc.cmake +++ b/cpp_src/cmake/modules/FindJemalloc.cmake @@ -52,4 +52,4 @@ if ( UNIX ) endif ( JEMALLOC_LIBRARY ) mark_as_advanced( JEMALLOC_FOUND JEMALLOC_LIBRARY JEMALLOC_EXTRA_LIBRARIES JEMALLOC_INCLUDE_DIR ) -endif (UNIX) +endif (UNIX) \ No newline at end of file diff --git a/cpp_src/cmake/modules/FindSnappy.cmake b/cpp_src/cmake/modules/FindSnappy.cmake index ef88a3c1c..0a7fbe235 100644 --- a/cpp_src/cmake/modules/FindSnappy.cmake +++ b/cpp_src/cmake/modules/FindSnappy.cmake @@ -78,4 +78,4 @@ endif () mark_as_advanced( SNAPPY_LIBRARY SNAPPY_INCLUDE_DIR -) +) \ No newline at end of file diff --git a/cpp_src/cmd/reindexer_server/contrib/config.yml.in b/cpp_src/cmd/reindexer_server/contrib/config.yml.in index 15540badc..293d3cb11 100644 --- a/cpp_src/cmd/reindexer_server/contrib/config.yml.in +++ b/cpp_src/cmd/reindexer_server/contrib/config.yml.in @@ -12,6 +12,8 @@ net: webroot: ${REINDEXER_INSTALL_PREFIX}/share/reindexer/web # Enables authorization via login/password (requires users.yml file in db directory) security: false + # Idle timeout for http transactions + tx_idle_timeout: 600 # Logger configuration logger: diff --git a/cpp_src/cmd/reindexer_server/contrib/sysvinit.in b/cpp_src/cmd/reindexer_server/contrib/sysvinit.in index 7878feb88..684e1ef67 100755 --- a/cpp_src/cmd/reindexer_server/contrib/sysvinit.in +++ b/cpp_src/cmd/reindexer_server/contrib/sysvinit.in @@ -95,4 +95,4 @@ case "$1" in ;; esac -exit 0 +exit 0 \ No newline at end of file diff --git a/cpp_src/cmd/reindexer_server/test/mocks/users.json b/cpp_src/cmd/reindexer_server/test/mocks/users.json index bb044f0d9..7149ea729 100644 --- a/cpp_src/cmd/reindexer_server/test/mocks/users.json +++ b/cpp_src/cmd/reindexer_server/test/mocks/users.json @@ -23,4 +23,4 @@ "*": "owner" } } -} +} \ No newline at end of file diff --git a/cpp_src/cmd/reindexer_server/test/specs/mixins/helper_mixin.py b/cpp_src/cmd/reindexer_server/test/specs/mixins/helper_mixin.py index 4ebaf8381..15b340a37 100644 --- a/cpp_src/cmd/reindexer_server/test/specs/mixins/helper_mixin.py +++ b/cpp_src/cmd/reindexer_server/test/specs/mixins/helper_mixin.py @@ -204,20 +204,27 @@ def helper_query_dsl_joined_construct(self, namespace, join_type, op, limit=10, def helper_query_dsl_construct(self, namespace, limit=10, offset=0, distinct=[], req_total='disabled', filters=[], sort={}, joined=[], merged=[], aggregations=[], select_filter=[], select_functions=[]): - return { - 'namespace': namespace, - 'limit': limit, - 'offset': offset, - 'distinct': distinct, - 'req_total': req_total, - 'filters': filters, - 'sort': sort, - 'joined': joined, - 'merged': merged, - 'select_filter': select_filter, - 'select_functions': select_functions, - 'aggregations': aggregations - } + ret = {'namespace': namespace, + 'limit': limit, + 'offset': offset, + 'req_total': req_total} + if distinct: + ret['distinct'] = distinct + if filters: + ret['filters'] = filters + if sort: + ret['sort'] = sort + if joined: + ret['joined'] = joined + if merged: + ret['merged'] = merged + if aggregations: + ret['aggregations'] = aggregations + if select_filter: + ret['select_filter'] = select_filter + if select_functions: + ret['select_functions'] = select_functions + return ret def helper_msg_role_status(self, status): msg = "Role: {role}. Status: {status}".format( diff --git a/cpp_src/cmd/reindexer_tool/commandsprocessor.cc b/cpp_src/cmd/reindexer_tool/commandsprocessor.cc index 0e9464a31..cc0de3a5c 100644 --- a/cpp_src/cmd/reindexer_tool/commandsprocessor.cc +++ b/cpp_src/cmd/reindexer_tool/commandsprocessor.cc @@ -555,9 +555,49 @@ Error CommandsProcessor::commandSubscribe(const string& command) { LineParser parser(command); parser.NextToken(); - bool on = !iequals(parser.NextToken(), "off"); + reindexer::UpdatesFilters filters; + auto token = parser.NextToken(); + if (iequals(token, "off")) { + return db_.UnsubscribeUpdates(this); + } else if (token.empty() || iequals(token, "on")) { + return db_.SubscribeUpdates(this, filters); + } + std::vector nsInSubscription; + while (!token.empty()) { + filters.AddFilter(token, reindexer::UpdatesFilters::Filter()); + nsInSubscription.emplace_back(token); + token = parser.NextToken(); + } - return db_.SubscribeUpdates(this, on); + auto err = db_.SubscribeUpdates(this, filters); + if (!err.ok()) { + return err; + } + vector allNsDefs; + err = db_.EnumNamespaces(allNsDefs, reindexer::EnumNamespacesOpts().WithClosed()); + if (!err.ok()) { + return err; + } + for (auto& ns : allNsDefs) { + for (auto it = nsInSubscription.begin(); it != nsInSubscription.end();) { + if (*it == ns.name) { + it = nsInSubscription.erase(it); + } else { + ++it; + } + } + } + if (!nsInSubscription.empty()) { + std::cout << "WARNING: You have subscribed for non-existing namespace updates: "; + for (auto it = nsInSubscription.begin(); it != nsInSubscription.end(); ++it) { + if (it != nsInSubscription.begin()) { + std::cout << ", "; + } + std::cout << *it; + } + std::cout << std::endl; + } + return errOK; } template @@ -683,6 +723,7 @@ void CommandsProcessor::addCommandsSuggestions(std::string const& c } } else if (token == "\\subscribe") { checkForCommandNameMatch(parser.NextToken(), {"on", "off"}, suggestions); + checkForNsNameMatch(parser.NextToken(), suggestions); } else if (token == "\\databases") { token = parser.NextToken(); if (token == "use") { diff --git a/cpp_src/cmd/reindexer_tool/commandsprocessor.h b/cpp_src/cmd/reindexer_tool/commandsprocessor.h index a53079a40..c4ab416cd 100644 --- a/cpp_src/cmd/reindexer_tool/commandsprocessor.h +++ b/cpp_src/cmd/reindexer_tool/commandsprocessor.h @@ -170,9 +170,12 @@ class CommandsProcessor : public reindexer::IUpdatesObserver { Syntax: \bench