diff --git a/storage/store/partial_kv.go b/storage/store/partial_kv.go index 64d1f180f..e04299ac8 100644 --- a/storage/store/partial_kv.go +++ b/storage/store/partial_kv.go @@ -130,8 +130,32 @@ func (p *PartialKV) ReadOps() []byte { return data } -func applyOps(in []byte, store *baseStore) error { +func valueToFloat64(value []byte) float64 { + return math.Float64frombits(binary.LittleEndian.Uint64(value)) +} +func valueToInt64(value []byte) int64 { + big := new(big.Int) + big.SetString(string(value), 10) + return big.Int64() +} + +func valueToBigInt(value []byte) *big.Int { + big := new(big.Int) + big.SetString(string(value), 10) + return big +} + +func valueToBigDecimal(value []byte) (decimal.Decimal, error) { + val := decimal.Decimal{} + err := val.UnmarshalBinary(value) + if err != nil { + return decimal.Decimal{}, err + } + return val, nil +} + +func applyOps(in []byte, store *baseStore) error { ops := &pbssinternal.Operations{} if err := proto.Unmarshal(in, ops); err != nil { return err @@ -152,58 +176,40 @@ func applyOps(in []byte, store *baseStore) error { case pbssinternal.Operation_DELETE_PREFIX: store.DeletePrefix(op.Ord, op.Key) case pbssinternal.Operation_SET_MAX_BIG_INT: - big := new(big.Int) - big.SetString(string(op.Value), 10) - store.SetMaxBigInt(op.Ord, op.Key, big) + store.SetMaxBigInt(op.Ord, op.Key, valueToBigInt(op.Value)) case pbssinternal.Operation_SET_MAX_INT64: - big := new(big.Int) - big.SetString(string(op.Value), 10) - val := big.Int64() - store.SetMaxInt64(op.Ord, op.Key, val) + store.SetMaxInt64(op.Ord, op.Key, valueToInt64(op.Value)) case pbssinternal.Operation_SET_MAX_FLOAT64: - val := math.Float64frombits(binary.LittleEndian.Uint64(op.Value)) - store.SetMaxFloat64(op.Ord, op.Key, val) + store.SetMaxFloat64(op.Ord, op.Key, valueToFloat64(op.Value)) case pbssinternal.Operation_SET_MAX_BIG_DECIMAL: - val := decimal.Decimal{} - err := val.UnmarshalBinary(op.Value) + val, err := valueToBigDecimal(op.Value) if err != nil { return err } store.SetMaxBigDecimal(op.Ord, op.Key, val) case pbssinternal.Operation_SET_MIN_BIG_INT: - big := new(big.Int) - big.SetString(string(op.Value), 10) - store.SetMinBigInt(op.Ord, op.Key, big) + store.SetMinBigInt(op.Ord, op.Key, valueToBigInt(op.Value)) case pbssinternal.Operation_SET_MIN_INT64: big := new(big.Int) big.SetBytes(op.Value) val := big.Int64() store.SetMinInt64(op.Ord, op.Key, val) case pbssinternal.Operation_SET_MIN_FLOAT64: - val := math.Float64frombits(binary.LittleEndian.Uint64(op.Value)) - store.SetMinFloat64(op.Ord, op.Key, val) + store.SetMinFloat64(op.Ord, op.Key, valueToFloat64(op.Value)) case pbssinternal.Operation_SET_MIN_BIG_DECIMAL: - val := decimal.Decimal{} - err := val.UnmarshalBinary(op.Value) + val, err := valueToBigDecimal(op.Value) if err != nil { return err } store.SetMinBigDecimal(op.Ord, op.Key, val) case pbssinternal.Operation_SUM_BIG_INT: - big := new(big.Int) - big.SetString(string(op.Value), 10) - store.SumBigInt(op.Ord, op.Key, big) + store.SumBigInt(op.Ord, op.Key, valueToBigInt(op.Value)) case pbssinternal.Operation_SUM_INT64: - big := new(big.Int) - big.SetString(string(op.Value), 10) - val := big.Int64() - store.SumInt64(op.Ord, op.Key, val) + store.SumInt64(op.Ord, op.Key, valueToInt64(op.Value)) case pbssinternal.Operation_SUM_FLOAT64: - val := math.Float64frombits(binary.LittleEndian.Uint64(op.Value)) - store.SumFloat64(op.Ord, op.Key, val) + store.SumFloat64(op.Ord, op.Key, valueToFloat64(op.Value)) case pbssinternal.Operation_SUM_BIG_DECIMAL: - val := decimal.Decimal{} - err := val.UnmarshalBinary(op.Value) + val, err := valueToBigDecimal(op.Value) if err != nil { return err } @@ -228,6 +234,30 @@ func cloneBytes(b []byte) []byte { return out } +func bigIntToBytes(i *big.Int) []byte { + return []byte(i.String()) +} + +func float64ToBytes(f float64) []byte { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], math.Float64bits(f)) + return buf[:] +} + +func int64ToBytes(i int64) []byte { + big := new(big.Int) + big.SetInt64(i) + return []byte(big.String()) +} + +func bigDecimalToBytes(d decimal.Decimal) []byte { + val, err := d.MarshalBinary() + if err != nil { + panic(err) + } + return val +} + func (p *PartialKV) Set(ord uint64, key string, value string) { p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET, @@ -288,48 +318,39 @@ func (p *PartialKV) SetMaxBigInt(ord uint64, key string, value *big.Int) { Type: pbssinternal.Operation_SET_MAX_BIG_INT, Ord: ord, Key: key, - Value: []byte(value.String()), + Value: bigIntToBytes(value), }) p.baseStore.SetMaxBigInt(ord, key, value) } func (p *PartialKV) SetMaxInt64(ord uint64, key string, value int64) { - big := new(big.Int) - big.SetInt64(value) - p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET_MAX_INT64, Ord: ord, Key: key, - Value: []byte(big.String()), + Value: int64ToBytes(value), }) p.baseStore.SetMaxInt64(ord, key, value) } func (p *PartialKV) SetMaxFloat64(ord uint64, key string, value float64) { - var buf [8]byte - binary.LittleEndian.PutUint64(buf[:], math.Float64bits(value)) p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET_MAX_FLOAT64, Ord: ord, Key: key, - Value: buf[:], + Value: float64ToBytes(value), }) p.baseStore.SetMaxFloat64(ord, key, value) } func (p *PartialKV) SetMaxBigDecimal(ord uint64, key string, value decimal.Decimal) { - val, err := value.MarshalBinary() - if err != nil { - panic(err) - } p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET_MAX_BIG_DECIMAL, Ord: ord, Key: key, - Value: val, + Value: bigDecimalToBytes(value), }) p.baseStore.SetMaxBigDecimal(ord, key, value) @@ -340,47 +361,38 @@ func (p *PartialKV) SetMinBigInt(ord uint64, key string, value *big.Int) { Type: pbssinternal.Operation_SET_MIN_BIG_INT, Ord: ord, Key: key, - Value: []byte(value.String()), + Value: bigIntToBytes(value), }) p.baseStore.SetMinBigInt(ord, key, value) } func (p *PartialKV) SetMinInt64(ord uint64, key string, value int64) { - big := new(big.Int) - big.SetInt64(value) p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET_MIN_INT64, Ord: ord, Key: key, - Value: []byte(big.String()), + Value: int64ToBytes(value), }) - p.baseStore.SetMinInt64(ord, key, value) } func (p *PartialKV) SetMinFloat64(ord uint64, key string, value float64) { - var buf [8]byte - binary.LittleEndian.PutUint64(buf[:], math.Float64bits(value)) p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET_MIN_FLOAT64, Ord: ord, Key: key, - Value: buf[:], + Value: float64ToBytes(value), }) p.baseStore.SetMinFloat64(ord, key, value) } func (p *PartialKV) SetMinBigDecimal(ord uint64, key string, value decimal.Decimal) { - val, err := value.MarshalBinary() - if err != nil { - panic(err) - } p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SET_MIN_BIG_DECIMAL, Ord: ord, Key: key, - Value: val, + Value: bigDecimalToBytes(value), }) p.baseStore.SetMinBigDecimal(ord, key, value) @@ -391,49 +403,40 @@ func (p *PartialKV) SumBigInt(ord uint64, key string, value *big.Int) { Type: pbssinternal.Operation_SUM_BIG_INT, Ord: ord, Key: key, - Value: []byte(value.String()), + Value: bigIntToBytes(value), }) p.baseStore.SumBigInt(ord, key, value) } func (p *PartialKV) SumInt64(ord uint64, key string, value int64) { - big := new(big.Int) - big.SetInt64(value) - p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SUM_INT64, Ord: ord, Key: key, - Value: []byte(big.String()), + Value: int64ToBytes(value), }) p.baseStore.SumInt64(ord, key, value) } func (p *PartialKV) SumFloat64(ord uint64, key string, value float64) { - var buf [8]byte - binary.LittleEndian.PutUint64(buf[:], math.Float64bits(value)) p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SUM_FLOAT64, Ord: ord, Key: key, - Value: buf[:], + Value: float64ToBytes(value), }) p.baseStore.SumFloat64(ord, key, value) } func (p *PartialKV) SumBigDecimal(ord uint64, key string, value decimal.Decimal) { - val, err := value.MarshalBinary() - if err != nil { - panic(err) - } p.operations.Operations = append(p.operations.Operations, &pbssinternal.Operation{ Type: pbssinternal.Operation_SUM_BIG_DECIMAL, Ord: ord, Key: key, - Value: val, + Value: bigDecimalToBytes(value), }) p.baseStore.SumBigDecimal(ord, key, value) diff --git a/storage/store/partial_kv_test.go b/storage/store/partial_kv_test.go index cb0bf6232..7cd00624a 100644 --- a/storage/store/partial_kv_test.go +++ b/storage/store/partial_kv_test.go @@ -4,8 +4,11 @@ import ( "bytes" "context" "io" + "math/big" "testing" + "github.com/shopspring/decimal" + "github.com/streamingfast/dstore" "github.com/streamingfast/substreams/storage/store/marshaller" "github.com/stretchr/testify/require" @@ -60,3 +63,129 @@ func TestPartialKV_Save_Load_Empty_MapNotNil(t *testing.T) { require.NoError(t, err) require.NotNilf(t, kvl.kv, "kvl.kv is nil") } + +func TestBigIntConversion(t *testing.T) { + cases := []struct { + name string + value *big.Int + }{ + { + name: "sunny path", + value: big.NewInt(123), + }, + { + name: "other big int", + value: big.NewInt(2391793721937), + }, + { + name: "other big int", + value: big.NewInt(-312312391793721937), + }, + { + name: "zero big int", + value: big.NewInt(0), + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result := bigIntToBytes(c.value) + require.Equal(t, c.value, bytesToBigInt(result)) + }) + } +} + +func TestInt64Conversion(t *testing.T) { + cases := []struct { + name string + value int64 + }{ + { + name: "sunny path", + value: int64(123), + }, + { + name: "int64 ", + value: int64(239179379723), + }, + { + name: "negative int64 ", + value: int64(-2391932131218), + }, + { + name: "zero int64 ", + value: int64(0), + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result := int64ToBytes(c.value) + require.Equal(t, c.value, valueToInt64(result)) + }) + } +} + +func TestFloat64Conversion(t *testing.T) { + cases := []struct { + name string + value float64 + }{ + { + name: "sunny path", + value: float64(123), + }, + { + name: "float with many decimals", + value: float64(12.328137817391723712983798127398127), + }, + { + name: "negative float", + value: float64(-12.2319739128391823812938192389281938129), + }, + { + name: "zero float", + value: float64(0), + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result := float64ToBytes(c.value) + require.Equal(t, c.value, valueToFloat64(result)) + }) + } +} + +func TestBigDecimal(t *testing.T) { + cases := []struct { + name string + value decimal.Decimal + }{ + { + name: "sunny path", + value: decimal.New(123, 0), + }, + { + name: "big big decimal", + value: decimal.New(123, 29301), + }, + { + name: "negative big decimal", + value: decimal.New(-123, 29301), + }, + { + name: "zero big decimal", + value: decimal.New(0, 29301), + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + result := bigDecimalToBytes(c.value) + newBigDecimal, err := valueToBigDecimal(result) + require.NoError(t, err) + require.Equal(t, c.value, newBigDecimal) + }) + } +}