Skip to content

Commit

Permalink
simplify timed version (#664)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe authored Mar 21, 2024
1 parent 62e1809 commit 0d9caad
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 70 deletions.
94 changes: 36 additions & 58 deletions utils/timed_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"sync"
"time"

"go.uber.org/atomic"

"github.com/livekit/protocol/livekit"
)

Expand All @@ -33,19 +31,18 @@ const tickMask uint64 = (1 << tickBits) - 1
var epoch = time.Date(2000, 0, 0, 0, 0, 0, 0, time.UTC).UnixMicro()

type TimedVersionGenerator interface {
New() *TimedVersion
Next() TimedVersion
}

func timedVersionComponents(v uint64) (ts int64, ticks int32) {
return int64(v>>tickBits) + epoch, int32(v & tickMask)
func timedVersionComponents(v TimedVersion) (ts int64, ticks int32) {
return int64(v>>tickBits) + epoch, int32(uint64(v) & tickMask)
}

func timedVersionFromComponents(ts int64, ticks int32) TimedVersion {
if ts < epoch {
ts = epoch
}
return TimedVersion{v: *atomic.NewUint64((uint64(ts-epoch) << tickBits) | uint64(ticks))}
return TimedVersion((uint64(ts-epoch) << tickBits) | uint64(ticks))
}

type timedVersionGenerator struct {
Expand Down Expand Up @@ -90,19 +87,7 @@ func (g *timedVersionGenerator) Next() TimedVersion {
}
}

type TimedVersion struct {
v atomic.Uint64
}

func NewTimedVersionFromProto(proto *livekit.TimedVersion) *TimedVersion {
v := timedVersionFromComponents(proto.GetUnixMicro(), proto.GetTicks())
return &v
}

func NewTimedVersionFromTime(t time.Time) *TimedVersion {
v := timedVersionFromComponents(t.UnixMicro(), 0)
return &v
}
type TimedVersion uint64

func TimedVersionFromProto(proto *livekit.TimedVersion) TimedVersion {
return timedVersionFromComponents(proto.GetUnixMicro(), proto.GetTicks())
Expand All @@ -112,74 +97,67 @@ func TimedVersionFromTime(t time.Time) TimedVersion {
return timedVersionFromComponents(t.UnixMicro(), 0)
}

func (t *TimedVersion) Update(other *TimedVersion) bool {
func (t *TimedVersion) Update(other TimedVersion) bool {
return t.Upgrade(other)
}

func (t *TimedVersion) Upgrade(other *TimedVersion) bool {
return t.update(other, func(ov, prev uint64) bool { return ov > prev })
}

func (t *TimedVersion) Downgrade(other *TimedVersion) bool {
return t.update(other, func(ov, prev uint64) bool { return ov < prev })
func (t *TimedVersion) Upgrade(other TimedVersion) bool {
if *t < other {
*t = other
return true
}
return false
}

func (t *TimedVersion) update(other *TimedVersion, cmp func(ov, prev uint64) bool) bool {
ov := other.v.Load()
for {
prev := t.v.Load()
if !cmp(ov, prev) {
return false
}
if t.v.CompareAndSwap(prev, ov) {
return true
}
func (t *TimedVersion) Downgrade(other TimedVersion) bool {
if *t > other {
*t = other
return true
}
return false
}

func (t *TimedVersion) Store(other *TimedVersion) {
t.v.Store(other.v.Load())
func (t *TimedVersion) Store(other TimedVersion) {
*t = other
}

func (t *TimedVersion) Load() TimedVersion {
return TimedVersion{v: *atomic.NewUint64(t.v.Load())}
func (t TimedVersion) Load() TimedVersion {
return t
}

func (t *TimedVersion) After(other *TimedVersion) bool {
return t.v.Load() > other.v.Load()
func (t TimedVersion) After(other TimedVersion) bool {
return t > other
}

func (t *TimedVersion) Compare(other *TimedVersion) int {
ov := other.v.Load()
v := t.v.Load()
if v < ov {
func (t TimedVersion) Compare(other TimedVersion) int {
if t < other {
return -1
}
if v == ov {
if t == other {
return 0
}
return 1
}

func (t *TimedVersion) IsZero() bool {
return t.v.Load() == 0
func (t TimedVersion) IsZero() bool {
return t == 0
}

func (t *TimedVersion) ToProto() *livekit.TimedVersion {
ts, ticks := timedVersionComponents(t.v.Load())
func (t TimedVersion) ToProto() *livekit.TimedVersion {
ts, ticks := timedVersionComponents(t)
return &livekit.TimedVersion{
UnixMicro: ts,
Ticks: ticks,
}
}

func (t *TimedVersion) Time() time.Time {
ts, _ := timedVersionComponents(t.v.Load())
func (t TimedVersion) Time() time.Time {
ts, _ := timedVersionComponents(t)
return time.UnixMicro(ts)
}

func (t *TimedVersion) String() string {
ts, ticks := timedVersionComponents(t.v.Load())
func (t TimedVersion) String() string {
ts, ticks := timedVersionComponents(t)
return fmt.Sprintf("%d.%d", ts, ticks)
}

Expand All @@ -188,7 +166,7 @@ func (t TimedVersion) Value() (driver.Value, error) {
return nil, nil
}

ts, ticks := timedVersionComponents(t.v.Load())
ts, ticks := timedVersionComponents(t)
b := make([]byte, 0, 12)
b = binary.BigEndian.AppendUint64(b, uint64(ts))
b = binary.BigEndian.AppendUint32(b, uint32(ticks))
Expand All @@ -200,7 +178,7 @@ func (t *TimedVersion) Scan(src interface{}) (err error) {
case []byte:
switch len(b) {
case 0:
t.v.Store(0)
*t = 0
case 12:
ts := int64(binary.BigEndian.Uint64(b))
ticks := int32(binary.BigEndian.Uint32(b[8:]))
Expand All @@ -209,7 +187,7 @@ func (t *TimedVersion) Scan(src interface{}) (err error) {
return errors.New("(*TimedVersion).Scan: unsupported format")
}
case nil:
t.v.Store(0)
*t = 0
default:
return errors.New("(*TimedVersion).Scan: unsupported data type")
}
Expand Down
24 changes: 12 additions & 12 deletions utils/timed_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
func TestTimedVersion(t *testing.T) {
t.Run("timed versions are monotonic and comparable", func(t *testing.T) {
gen := NewDefaultTimedVersionGenerator()
tv1 := gen.New()
tv2 := gen.New()
tv3 := gen.New()
tv1 := gen.Next()
tv2 := gen.Next()
tv3 := gen.Next()

require.True(t, tv3.After(tv1))
require.True(t, tv3.After(tv2))
Expand All @@ -44,28 +44,28 @@ func TestTimedVersion(t *testing.T) {

t.Run("protobuf roundtrip", func(t *testing.T) {
gen := NewDefaultTimedVersionGenerator()
tv1 := gen.New()
tv2 := NewTimedVersionFromProto(tv1.ToProto())
require.Equal(t, tv1.v.Load(), tv2.v.Load())
tv1 := gen.Next()
tv2 := TimedVersionFromProto(tv1.ToProto())
require.Equal(t, tv1, tv2)
})

t.Run("from zero time yields epoch version", func(t *testing.T) {
gen := NewDefaultTimedVersionGenerator()
tv1 := NewTimedVersionFromTime(time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC))
tv2 := gen.New()
tv1 := TimedVersionFromTime(time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC))
tv2 := gen.Next()
require.True(t, tv2.After(tv1))
})

t.Run("time.Time roundtrip", func(t *testing.T) {
ts1 := time.Now().Round(time.Microsecond)
tv1 := NewTimedVersionFromTime(ts1)
tv1 := TimedVersionFromTime(ts1)
ts2 := tv1.Time()
tv2 := NewTimedVersionFromTime(ts2)
tv2 := TimedVersionFromTime(ts2)
require.Equal(t, ts1, ts2)
require.Equal(t, tv1.v.Load(), tv2.v.Load())
require.Equal(t, tv1, tv2)
})

t.Run("timed version from nil is zero", func(t *testing.T) {
require.True(t, NewTimedVersionFromProto(nil).IsZero())
require.True(t, TimedVersionFromProto(nil).IsZero())
})
}

0 comments on commit 0d9caad

Please sign in to comment.