From cde0087557f843e8cfde7c073ed3eb3e610dce86 Mon Sep 17 00:00:00 2001 From: Termina1 Date: Mon, 16 Dec 2024 12:32:33 +0200 Subject: [PATCH] Revert "reduce lock duration" This reverts commit 9e004557c0f5f457f37a81f18b4c99abc1c9df0a. --- chotki.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/chotki.go b/chotki.go index 5ca8656..0746146 100644 --- a/chotki.go +++ b/chotki.go @@ -496,15 +496,12 @@ func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, exce // Here new packets are timestamped and queued for save func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error) { cho.lock.Lock() + defer cho.lock.Unlock() if cho.db == nil { - cho.lock.Unlock() return rdx.BadId, ErrClosed } - id = (cho.last & ^rdx.OffMask) + rdx.ProInc - cho.last = id - cho.lock.Unlock() i := protocol.Record('I', id.ZipBytes()) r := protocol.Record('R', ref.ZipBytes()) packet := protocol.Record(lit, i, r, protocol.Join(body...)) @@ -566,6 +563,13 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error) return parseErr } + if id.Src() == cho.src && id > cho.last { + if id.Off() != 0 { + return rdx.ErrBadPacket + } + cho.last = id + } + pb, noApply := pebble.Batch{}, false cho.log.DebugCtx(ctx, "new packet", "type", string(lit), "packet", id.String())