Skip to content

Commit

Permalink
Update to version v2.11.1
Browse files Browse the repository at this point in the history
  • Loading branch information
olegator77 committed Aug 14, 2020
1 parent 0575789 commit 543f613
Show file tree
Hide file tree
Showing 59 changed files with 1,474 additions and 681 deletions.
20 changes: 17 additions & 3 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,11 @@ func (binding *Builtin) ModifyItem(ctx context.Context, nsHash int, namespace st
}

func (binding *Builtin) ModifyItemTx(txCtx *bindings.TxCtx, format int, data []byte, mode int, precepts []string, stateToken int) error {
select {
case <-txCtx.UserCtx.Done():
return txCtx.UserCtx.Err()
default:
}

ser1 := cjson.NewPoolSerializer()
defer ser1.Close()
Expand All @@ -265,11 +270,20 @@ func (binding *Builtin) ModifyItemTx(txCtx *bindings.TxCtx, format int, data []b
}

func (binding *Builtin) DeleteQueryTx(txCtx *bindings.TxCtx, rawQuery []byte) error {
return err2go(C.reindexer_delete_query_tx(binding.rx, C.uintptr_t(txCtx.Id), buf2c(rawQuery)))
select {
case <-txCtx.UserCtx.Done():
return txCtx.UserCtx.Err()
default:
return err2go(C.reindexer_delete_query_tx(binding.rx, C.uintptr_t(txCtx.Id), buf2c(rawQuery)))
}
}
func (binding *Builtin) UpdateQueryTx(txCtx *bindings.TxCtx, rawQuery []byte) error {

return err2go(C.reindexer_update_query_tx(binding.rx, C.uintptr_t(txCtx.Id), buf2c(rawQuery)))
select {
case <-txCtx.UserCtx.Done():
return txCtx.UserCtx.Err()
default:
return err2go(C.reindexer_update_query_tx(binding.rx, C.uintptr_t(txCtx.Id), buf2c(rawQuery)))
}
}

// ModifyItemTxAsync is not implemented for builtin binding
Expand Down
2 changes: 1 addition & 1 deletion bindings/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package bindings

const CInt32Max = int(^uint32(0) >> 1)

const ReindexerVersion = "v2.11.0"
const ReindexerVersion = "v2.11.1"

// public go consts from type_consts.h and reindexer_ctypes.h
const (
Expand Down
54 changes: 30 additions & 24 deletions bindings/cproto/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func seqNumIsValid(seqNum uint32) bool {
func (c *connection) deadlineTicker() {
timeout := time.Second * time.Duration(deadlineCheckPeriodSec)
ticker := time.NewTicker(timeout)
atomic.StoreUint32(&c.now, 0)
atomic.StoreUint32(&c.now, 1) // Starts from 1, so timeout value < 1s will not transform into 0 value deadline
for range ticker.C {
select {
case <-c.errCh:
Expand All @@ -168,21 +168,20 @@ func (c *connection) deadlineTicker() {
continue
}
deadline := atomic.LoadUint32(&c.requests[i].deadline)
if deadline != 0 && now >= deadline && atomic.CompareAndSwapUint32(&c.requests[i].deadline, deadline, 0) {
if atomic.LoadInt32(&c.requests[i].isAsync) != 0 {
c.requests[i].cmplLock.Lock()
if c.requests[i].cmpl != nil && deadline == atomic.LoadUint32(&c.requests[i].deadline) {
cmpl := c.requests[i].cmpl
c.requests[i].cmpl = nil
seqNum = atomic.LoadUint32(&c.requests[i].seqNum)
atomic.StoreUint32(&c.requests[i].seqNum, maxSeqNum)
atomic.StoreInt32(&c.requests[i].isAsync, 0)
c.requests[i].cmplLock.Unlock()
c.seqs <- nextSeqNum(seqNum)
cmpl(nil, context.DeadlineExceeded)
} else {
c.requests[i].cmplLock.Unlock()
}
if deadline != 0 && now >= deadline && atomic.LoadInt32(&c.requests[i].isAsync) != 0 {
c.requests[i].cmplLock.Lock()
if c.requests[i].cmpl != nil && atomic.CompareAndSwapUint32(&c.requests[i].deadline, deadline, 0) {
cmpl := c.requests[i].cmpl
c.requests[i].cmpl = nil
seqNum = atomic.LoadUint32(&c.requests[i].seqNum)
atomic.StoreUint32(&c.requests[i].seqNum, maxSeqNum)
atomic.StoreInt32(&c.requests[i].isAsync, 0)
c.requests[i].cmplLock.Unlock()
c.seqs <- nextSeqNum(seqNum)
fmt.Println("Canceling on deadline: ", deadline, ", id: ", seqNum)
cmpl(nil, context.DeadlineExceeded)
} else {
c.requests[i].cmplLock.Unlock()
}
}
}
Expand Down Expand Up @@ -385,15 +384,21 @@ func (c *connection) packRPC(cmd int, seq uint32, execTimeout int, args ...inter
in.ser.Close()
}

func (c *connection) awaitSeqNum(ctx context.Context) (seq uint32, remainingTimeout int, err error) {
func (c *connection) awaitSeqNum(ctx context.Context) (seq uint32, remainingTimeout time.Duration, err error) {
select {
case <-ctx.Done():
err = ctx.Err()
default:
}

select {
case seq = <-c.seqs:
if err = ctx.Err(); err != nil {
c.seqs <- seq
return
}
if execDeadline, ok := ctx.Deadline(); ok {
remainingTimeout = int(execDeadline.Sub(time.Now()) / time.Millisecond)
remainingTimeout = execDeadline.Sub(time.Now())
if remainingTimeout <= 0 {
c.seqs <- seq
err = context.DeadlineExceeded
Expand Down Expand Up @@ -431,15 +436,16 @@ func (c *connection) rpcCallAsync(ctx context.Context, cmd int, netTimeout uint3
reqID := seq % queueSize
c.requests[reqID].cmplLock.Lock()
c.requests[reqID].cmpl = cmpl
if timeout == 0 {
atomic.StoreUint32(&c.requests[reqID].deadline, 0)
} else {
atomic.StoreUint32(&c.requests[reqID].deadline, atomic.LoadUint32(&c.now)+uint32(timeout.Seconds()))
}
atomic.StoreInt32(&c.requests[reqID].isAsync, 1)
atomic.StoreUint32(&c.requests[reqID].seqNum, seq)
c.requests[reqID].cmplLock.Unlock()

if timeout != 0 {
atomic.StoreUint32(&c.requests[reqID].deadline, atomic.LoadUint32(&c.now)+uint32(timeout))
}

c.packRPC(cmd, seq, timeout, args...)
c.packRPC(cmd, seq, int(timeout.Milliseconds()), args...)

return
}
Expand All @@ -458,7 +464,7 @@ func (c *connection) rpcCall(ctx context.Context, cmd int, netTimeout uint32, ar
reply := c.requests[reqID].repl

atomic.StoreUint32(&c.requests[reqID].seqNum, seq)
c.packRPC(cmd, seq, timeout, args...)
c.packRPC(cmd, seq, int(timeout.Milliseconds()), args...)

for_loop:
for {
Expand Down
2 changes: 1 addition & 1 deletion bindings/cproto/cproto.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (binding *NetCProto) RollbackTx(txCtx *bindings.TxCtx) error {
if txCtx.Result == nil {
return nil
}
return txCtx.Result.(*NetBuffer).conn.rpcCallNoResults(txCtx.UserCtx, cmdRollbackTx, uint32(binding.timeouts.RequestTimeout/time.Second), int64(txCtx.Id))
return txCtx.Result.(*NetBuffer).conn.rpcCallNoResults(context.TODO(), cmdRollbackTx, uint32(binding.timeouts.RequestTimeout/time.Second), int64(txCtx.Id))
}

func (binding *NetCProto) ModifyItemTx(txCtx *bindings.TxCtx, format int, data []byte, mode int, precepts []string, stateToken int) error {
Expand Down
18 changes: 18 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
# Version 2.12.0 (14.08.2020)

# Core
- [fix] Increased performance of queries with custom sort order
- [fix] Fixed behavior of SQL UPDATE statement with true/false/null values
- [fix] Do not reset expire_after value of ttl indexes after copying tx
- [fix] Error loading of system records (indexes/tags/replState) after copying tx
- [fea] Return error for queries with EqualPosition to fields with IS NULL condition
- [fix] To config added default value of `optimization_sort_workers`
- [fix] Windows specific error with IN () condition executed by comparator

# Reindexer server
- [fea] Removed default limit=10 in GET :db/namespaces/:ns/items method
- [fea] Added `REINDEXER_NOREUSEIDLE` env var for disable server connection reuse

# Go connector
- [fix] Fixed async tx goroutines leak

# Version 2.11.0 (29.07.2020)

# Core
Expand Down
2 changes: 1 addition & 1 deletion cpp_src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ else()
option (LINK_RESOURCES "Link web resources as binary data" ON)
endif()

set (REINDEXER_VERSION_DEFAULT "2.11.0")
set (REINDEXER_VERSION_DEFAULT "2.11.1")

if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "RelWithDebInfo")
Expand Down
11 changes: 6 additions & 5 deletions cpp_src/cmd/reindexer_server/test/specs/items_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ def check_update_msgpack_response(self, data, itemsCount, updatedItems=0):
self.assertEqual(True, data['success'] == True, data)
if updatedItems > 0:
self.assertEqual(True, 'items' in data, data)
self.assertEqual(True, len(data['items']) == updatedItems, data['items'])
self.assertEqual(
True, len(data['items']) == updatedItems, data['items'])

def setUp(self):
super().setUp()
Expand Down Expand Up @@ -90,7 +91,8 @@ def test_update_item_msgpack(self):
item_body['index'] = i
items.append(item_body)

status, body = self.api_get_items(self.current_db, self.current_ns, self.EncodingType.MsgPack)
status, body = self.api_get_items(
self.current_db, self.current_ns, self.EncodingType.MsgPack)
self.assertEqual(True, status == self.API_STATUS['success'], body)
self.assertEqual(True, 'message' in body, body)
data = msgpack.loads(body.get('message'))
Expand All @@ -101,10 +103,10 @@ def test_update_item_msgpack(self):
packer = msgpack.Packer(use_bin_type=True, autoreset=False)
for i in range(0, len(items)):
packer.pack(items[i])

last_index = 'test_' + str(count)
precepts = [last_index + '=serial()']

new_items_body = packer.getbuffer()
status, body = self.api_update_item(
self.current_db, self.current_ns, new_items_body, precepts, self.EncodingType.MsgPack)
Expand All @@ -113,7 +115,6 @@ def test_update_item_msgpack(self):
data = msgpack.loads(body.get('message'))
self.check_update_msgpack_response(data, 10, 10)


def test_update_item(self):
"""Should be able to update item"""

Expand Down
7 changes: 7 additions & 0 deletions cpp_src/core/comparator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ class Comparator : public ComparatorVars {
void Bind(PayloadType type, int field);
void BindEqualPosition(int field, const VariantArray &val, CondType cond);
void BindEqualPosition(const TagsPath &tagsPath, const VariantArray &val, CondType cond);
void ClearDistinct() {
cmpInt.ClearDistinct();
cmpBool.ClearDistinct();
cmpInt64.ClearDistinct();
cmpDouble.ClearDistinct();
cmpString.ClearDistinct();
}

protected:
bool compare(const Variant &kr) {
Expand Down
8 changes: 7 additions & 1 deletion cpp_src/core/comparatorimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class ComparatorImpl {
}

void ExcludeDistinct(T value) { distS_->emplace(value); }
void ClearDistinct() {
if (distS_) distS_->clear();
}

h_vector<T, 1> values_;
intrusive_ptr<intrusive_atomic_rc_wrapper<fast_hash_set<T>>> valuesS_, distS_;
Expand All @@ -99,7 +102,7 @@ class ComparatorImpl {

void addValue(CondType cond, T value) {
if (cond == CondSet) {
valuesS_->emplace(value);
valuesS_->insert(value);
} else {
values_.push_back(value);
}
Expand Down Expand Up @@ -160,6 +163,9 @@ class ComparatorImpl<key_string> {
}

void ExcludeDistinct(p_string value) { distS_->emplace(value.getOrMakeKeyString()); }
void ClearDistinct() {
if (distS_) distS_->clear();
}

h_vector<key_string, 1> values_;
string_view cachedValueSV_;
Expand Down
Loading

0 comments on commit 543f613

Please sign in to comment.