Skip to content

Commit

Permalink
Update to version v2.12.0
Browse files Browse the repository at this point in the history
  • Loading branch information
graveart committed Sep 2, 2020
1 parent 035210d commit 84c223b
Show file tree
Hide file tree
Showing 130 changed files with 4,695 additions and 1,197 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ git:
- go get github.com/golang/snappy
- go get github.com/stretchr/testify/assert
- go get github.com/iancoleman/orderedmap
- go get github.com/hashicorp/golang-lru
script:
- cd ${BUILD_DIR} && cmake ${CMAKE_OPTS} ${TRAVIS_BUILD_DIR} && make -j4
- ctest --verbose
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
57 changes: 16 additions & 41 deletions bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *r

resultp := rdSer.readRawtItemParams()

ns.cacheLock.Lock()
delete(ns.cacheItems, resultp.id)
ns.cacheLock.Unlock()
ns.cacheItems.Remove(resultp.id)

if len(precepts) > 0 && (resultp.cptr != 0 || resultp.data != nil) && reflect.TypeOf(item).Kind() == reflect.Ptr {
nsArrEntry := nsArrayEntry{ns, ns.cjsonState.Copy()}
Expand Down Expand Up @@ -139,56 +137,46 @@ func (db *reindexerImpl) getMeta(ctx context.Context, namespace, key string) ([]

func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) {
useCache := item == nil && (ns.deepCopyIface || allowUnsafe) && !nonCacheableData
hasCache := false
needCopy := ns.deepCopyIface && !allowUnsafe
var err error

if useCache {
ns.cacheLock.RLock()
hasCache = ns.cacheItems != nil
if !hasCache {
ns.cacheLock.RUnlock()
}
}
if useCache && hasCache {
if citem, ok := ns.cacheItems[params.id]; ok && citem.version == params.version {
if useCache && ns.cacheItems != nil {
if citem, ok := ns.cacheItems.Get(params.id); ok && citem.version == params.version {
item = citem.item
ns.cacheLock.RUnlock()
} else {
ns.cacheLock.RUnlock()
item = reflect.New(ns.rtype).Interface()
dec := ns.localCjsonState.NewDecoder(item, logger)
var dataSize uint64
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
dataSize, err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
dataSize, err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
return item, err
}
ns.cacheLock.Lock()
if citem, ok := ns.cacheItems[params.id]; ok {

if citem, ok := ns.cacheItems.Get(params.id); ok {
if citem.version == params.version {
item = citem.item
} else if citem.version < params.version {
ns.cacheItems[params.id] = cacheItem{item: item, version: params.version}
ns.cacheItems.Add(params.id, &cacheItem{item: item, version: params.version, size: dataSize})
}
} else {
ns.cacheItems[params.id] = cacheItem{item: item, version: params.version}
ns.cacheItems.Add(params.id, &cacheItem{item: item, version: params.version, size: dataSize})
}
ns.cacheLock.Unlock()
}
} else {
if item == nil {
item = reflect.New(ns.rtype).Interface()
}
dec := ns.localCjsonState.NewDecoder(item, logger)
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
_, err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
_, err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
Expand Down Expand Up @@ -389,19 +377,14 @@ func (db *reindexerImpl) deleteQuery(ctx context.Context, q *Query) (int, error)
// skip total count
rawQueryParams := ser.readRawQueryParams()

ns.cacheLock.Lock()
for i := 0; i < rawQueryParams.count; i++ {
params := ser.readRawtItemParams()
if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 {
panic("Internal error: joined items in delete query result")
}
// Update cache
if _, ok := ns.cacheItems[params.id]; ok {
delete(ns.cacheItems, params.id)
}

ns.cacheItems.Remove(params.id)
}
ns.cacheLock.Unlock()
if !ser.Eof() {
panic("Internal error: data after end of delete query result")
}
Expand All @@ -428,19 +411,15 @@ func (db *reindexerImpl) updateQuery(ctx context.Context, q *Query) *Iterator {
ns.cjsonState.ReadPayloadType(&ser.Serializer)
})

ns.cacheLock.Lock()
for i := 0; i < rawQueryParams.count; i++ {
params := ser.readRawtItemParams()
if (rawQueryParams.flags&bindings.ResultsWithJoined) != 0 && ser.GetVarUInt() != 0 {
panic("Internal error: joined items in update query result")
}
// Update cache
if _, ok := ns.cacheItems[params.id]; ok {
delete(ns.cacheItems, params.id)
}

ns.cacheItems.Remove(params.id)
}
ns.cacheLock.Unlock()

if !ser.Eof() {
panic("Internal error: data after end of update query result")
}
Expand Down Expand Up @@ -473,11 +452,7 @@ func (db *reindexerImpl) resetCachesCtx(ctx context.Context) {
}
db.lock.RUnlock()
for _, ns := range nsArray {
ns.cacheLock.Lock()
if ns.cacheItems != nil {
ns.cacheItems = make(map[int]cacheItem)
}
ns.cacheLock.Unlock()
ns.cacheItems.Reset(ns.opts.objCacheSize)
ns.cjsonState.Reset()
db.query(ns.name).Limit(0).ExecCtx(ctx).Close()
}
Expand Down
43 changes: 25 additions & 18 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ func (binding *Builtin) Init(u []url.URL, options ...interface{}) error {
return err2go(C.reindexer_connect(binding.rx, str2c(u[0].Path), opts, str2c(bindings.ReindexerVersion)))
}

func (binding *Builtin) StartWatchOnCtx(ctx context.Context) (CCtxWrapper, error) {
if binding.ctxWatcher != nil {
return binding.ctxWatcher.StartWatchOnCtx(ctx)
}
return CCtxWrapper{}, bindings.NewError("rq: builtin binding is not initialized properly", bindings.ErrNotValid)
}

func (binding *Builtin) Clone() bindings.RawBinding {
return &Builtin{}
}
Expand Down Expand Up @@ -238,7 +245,7 @@ func (binding *Builtin) ModifyItem(ctx context.Context, nsHash int, namespace st
}
packedArgs := ser1.Bytes()

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -299,7 +306,7 @@ func (binding *Builtin) OpenNamespace(ctx context.Context, namespace string, ena
options: C.uint16_t(storageOptions),
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -308,7 +315,7 @@ func (binding *Builtin) OpenNamespace(ctx context.Context, namespace string, ena
return err2go(C.reindexer_open_namespace(binding.rx, str2c(namespace), opts, ctxInfo.cCtx))
}
func (binding *Builtin) CloseNamespace(ctx context.Context, namespace string) error {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -318,7 +325,7 @@ func (binding *Builtin) CloseNamespace(ctx context.Context, namespace string) er
}

func (binding *Builtin) DropNamespace(ctx context.Context, namespace string) error {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -328,7 +335,7 @@ func (binding *Builtin) DropNamespace(ctx context.Context, namespace string) err
}

func (binding *Builtin) TruncateNamespace(ctx context.Context, namespace string) error {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -338,7 +345,7 @@ func (binding *Builtin) TruncateNamespace(ctx context.Context, namespace string)
}

func (binding *Builtin) RenameNamespace(ctx context.Context, srcNs string, dstNs string) error {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -353,7 +360,7 @@ func (binding *Builtin) EnableStorage(ctx context.Context, path string) error {
path += "/"
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -368,7 +375,7 @@ func (binding *Builtin) AddIndex(ctx context.Context, namespace string, indexDef
return err
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -384,7 +391,7 @@ func (binding *Builtin) SetSchema(ctx context.Context, namespace string, schema
return err
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -401,7 +408,7 @@ func (binding *Builtin) UpdateIndex(ctx context.Context, namespace string, index
return err
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -412,7 +419,7 @@ func (binding *Builtin) UpdateIndex(ctx context.Context, namespace string, index
}

func (binding *Builtin) DropIndex(ctx context.Context, namespace, index string) error {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -422,7 +429,7 @@ func (binding *Builtin) DropIndex(ctx context.Context, namespace, index string)
}

func (binding *Builtin) PutMeta(ctx context.Context, namespace, key, data string) error {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return err
}
Expand All @@ -432,7 +439,7 @@ func (binding *Builtin) PutMeta(ctx context.Context, namespace, key, data string
}

func (binding *Builtin) GetMeta(ctx context.Context, namespace, key string) (bindings.RawBuffer, error) {
ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return nil, err
}
Expand All @@ -448,7 +455,7 @@ func (binding *Builtin) Select(ctx context.Context, query string, asJson bool, p
defer func() { <-binding.cgoLimiter }()
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -479,7 +486,7 @@ func (binding *Builtin) CommitTx(txCtx *bindings.TxCtx) (bindings.RawBuffer, err
defer func() { <-binding.cgoLimiter }()
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(txCtx.UserCtx)
ctxInfo, err := binding.StartWatchOnCtx(txCtx.UserCtx)
if err != nil {
return nil, err
}
Expand All @@ -502,7 +509,7 @@ func (binding *Builtin) SelectQuery(ctx context.Context, data []byte, asJson boo
defer func() { <-binding.cgoLimiter }()
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return nil, err
}
Expand All @@ -518,7 +525,7 @@ func (binding *Builtin) DeleteQuery(ctx context.Context, nsHash int, data []byte
defer func() { <-binding.cgoLimiter }()
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return nil, err
}
Expand All @@ -534,7 +541,7 @@ func (binding *Builtin) UpdateQuery(ctx context.Context, nsHash int, data []byte
defer func() { <-binding.cgoLimiter }()
}

ctxInfo, err := binding.ctxWatcher.StartWatchOnCtx(ctx)
ctxInfo, err := binding.StartWatchOnCtx(ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion bindings/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package bindings

const CInt32Max = int(^uint32(0) >> 1)

const ReindexerVersion = "v2.11.1"
const ReindexerVersion = "v2.12.0"

// public go consts from type_consts.h and reindexer_ctypes.h
const (
Expand Down
8 changes: 3 additions & 5 deletions bindings/cproto/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ type connection struct {
now uint32
termCh chan struct{}

requests [queueSize]requestInfo
enableSnappy int32
isServerChanged bool
requests [queueSize]requestInfo
enableSnappy int32
}

func newConnection(ctx context.Context, owner *NetCProto) (c *connection, err error) {
Expand Down Expand Up @@ -178,7 +177,6 @@ func (c *connection) deadlineTicker() {
atomic.StoreInt32(&c.requests[i].isAsync, 0)
c.requests[i].cmplLock.Unlock()
c.seqs <- nextSeqNum(seqNum)
fmt.Println("Canceling on deadline: ", deadline, ", id: ", seqNum)
cmpl(nil, context.DeadlineExceeded)
} else {
c.requests[i].cmplLock.Unlock()
Expand Down Expand Up @@ -224,7 +222,7 @@ func (c *connection) login(ctx context.Context, owner *NetCProto) (err error) {
serverStartTS := buf.args[1].(int64)
old := atomic.SwapInt64(&owner.serverStartTime, serverStartTS)
if old != 0 && old != serverStartTS {
c.isServerChanged = true
atomic.StoreInt32(&c.owner.isServerChanged, 1)
}
}
return
Expand Down
4 changes: 3 additions & 1 deletion bindings/cproto/cproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Logger interface {
type NetCProto struct {
dsn dsn
pool pool
isServerChanged int32
onChangeCallback func()
serverStartTime int64
retryAttempts bindings.OptionRetryAttempts
Expand Down Expand Up @@ -456,7 +457,8 @@ func (binding *NetCProto) getConn(ctx context.Context) (conn *connection, err er
if err != nil {
return nil, err
}
if conn.isServerChanged {

if atomic.CompareAndSwapInt32(&binding.isServerChanged, 1, 0) {
binding.onChangeCallback()
}
return conn, nil
Expand Down
6 changes: 6 additions & 0 deletions bindings/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ type Status struct {
Err error
CProto StatusCProto
Builtin StatusBuiltin
Cache StatusCache
}

type StatusCache struct {
CurSize int64
MaxSize int64
}

type StatusCProto struct {
Expand Down
Loading

0 comments on commit 84c223b

Please sign in to comment.