diff --git a/api/state.go b/api/state.go index 99fc21d3..7a399a01 100644 --- a/api/state.go +++ b/api/state.go @@ -19,6 +19,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "errors" "fmt" ) @@ -66,23 +67,8 @@ type EntryBundle struct { Entries [][]byte } -// MarshalText implements encoding/TextMarshaller and writes out an EntryBundle -// instance as sequences of big-endian uint16 length-prefixed log entries, -// as specified by the tlog-tiles spec. -// TODO(#41): this _may_ need to be changed to support CT -func (t EntryBundle) MarshalText() ([]byte, error) { - r := &bytes.Buffer{} - sizeBs := make([]byte, 2) - for _, n := range t.Entries { - binary.BigEndian.PutUint16(sizeBs, uint16(len(n))) - if _, err := r.Write(sizeBs); err != nil { - return nil, err - } - if _, err := r.Write(n); err != nil { - return nil, err - } - } - return r.Bytes(), nil +func (t *EntryBundle) MarshalText() ([]byte, error) { + return nil, errors.New("unimplemented") } // UnmarshalText implements encoding/TextUnmarshaler and reads EntryBundles diff --git a/api/state_test.go b/api/state_test.go index cd25d954..7c3f5c47 100644 --- a/api/state_test.go +++ b/api/state_test.go @@ -16,11 +16,13 @@ package api_test import ( + "bytes" "crypto/rand" "fmt" "testing" "github.com/google/go-cmp/cmp" + tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" ) @@ -80,27 +82,26 @@ func TestLeafBundle_MarshalTileRoundtrip(t *testing.T) { }, } { t.Run(fmt.Sprintf("tile size %d", test.size), func(t *testing.T) { - tile := api.EntryBundle{Entries: make([][]byte, 0, test.size)} + bundleRaw := &bytes.Buffer{} + want := make([][]byte, test.size) for i := 0; i < test.size; i++ { // Fill in the leaf index - tile.Entries = append(tile.Entries, make([]byte, i*100)) - if _, err := rand.Read(tile.Entries[i]); err != nil { + want[i] = make([]byte, i*100) + if _, err := rand.Read(want[i]); err != nil { t.Error(err) } - } - - raw, err := tile.MarshalText() - if err != nil { - t.Fatalf("MarshalText() = %v", err) + _, _ = bundleRaw.Write(tessera.NewEntry(want[i]).MarshalBundleData(uint64(i))) } tile2 := api.EntryBundle{} - if err := tile2.UnmarshalText(raw); err != nil { + if err := tile2.UnmarshalText(bundleRaw.Bytes()); err != nil { t.Fatalf("UnmarshalText() = %v", err) } - if diff := cmp.Diff(tile, tile2); len(diff) != 0 { - t.Fatalf("Got tile with diff: %s", diff) + for i := 0; i < test.size; i++ { + if got, want := tile2.Entries[i], want[i]; !bytes.Equal(got, want) { + t.Errorf("%d: want %x, got %x", i, got, want) + } } }) } diff --git a/ct_only.go b/ct_only.go new file mode 100644 index 00000000..d6b60ac0 --- /dev/null +++ b/ct_only.go @@ -0,0 +1,58 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tessera + +import ( + "context" + + "github.com/transparency-dev/trillian-tessera/ctonly" +) + +// Storage described the expected functions from Tessera storage implementations. +type Storage interface { + // Add should duably assign an index to the provided Entry, and return it. + // + // Implementations MUST call MarshalBundleData method on the entry before persisting/integrating it. + Add(context.Context, *Entry) (uint64, error) +} + +// NewCertificateTransparencySequencedWriter returns a function which knows how to add a CT-specific entry type to the log. +// +// This entry point MUST ONLY be used for CT logs participating in the CT ecosystem. +// It should not be used as the basis for any other/new transparency application as this protocol: +// a) embodies some techniques which are not considered to be best practice (it does this to retain backawards-compatibility with RFC6962) +// b) is not compatible with the https://c2sp.org/tlog-tiles API which we _very strongly_ encourage you to use instead. +// +// Returns the assigned index in the log, or an error. +func NewCertificateTransparencySequencedWriter(s Storage) func(context.Context, *ctonly.Entry) (uint64, error) { + return func(ctx context.Context, e *ctonly.Entry) (uint64, error) { + return s.Add(ctx, convertCTEntry(e)) + } +} + +// convertCTEntry returns an Entry struct which will do the right thing for CT Static API logs. +// +// This MUST NOT be used for any other purpose. +func convertCTEntry(e *ctonly.Entry) *Entry { + r := &Entry{} + r.internal.Identity = e.Identity() + r.marshalForBundle = func(idx uint64) []byte { + r.internal.LeafHash = e.MerkleLeafHash(idx) + r.internal.Data = e.LeafData(idx) + return r.internal.Data + } + + return r +} diff --git a/ctonly/ct.go b/ctonly/ct.go new file mode 100644 index 00000000..0e3666e0 --- /dev/null +++ b/ctonly/ct.go @@ -0,0 +1,169 @@ +// Copyright 2024 The Tessera authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// +// Original source: https://github.com/FiloSottile/sunlight/blob/main/tile.go +// +// # Copyright 2023 The Sunlight Authors +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +// Package ctonly has support for CT Tiles API. +// +// This code should not be reused outside of CT. +// Most of this code came from Filipo's Sunlight implementation of https://c2sp.org/ct-static-api. +package ctonly + +import ( + "crypto/sha256" + "errors" + + "github.com/transparency-dev/merkle/rfc6962" + "golang.org/x/crypto/cryptobyte" +) + +// Entry represents a CT log entry. +type Entry struct { + Timestamp uint64 + IsPrecert bool + Certificate []byte + Precertificate []byte + PrecertSigningCert []byte + IssuerKeyHash []byte +} + +// LeafData returns the data which should be added to an entry bundle for this entry. +// +// Note that this will include data which IS NOT directly committed to by the entry's +// MerkleLeafHash. +func (c Entry) LeafData(idx uint64) []byte { + b := cryptobyte.NewBuilder([]byte{}) + b.AddUint64(uint64(c.Timestamp)) + if !c.IsPrecert { + b.AddUint16(0 /* entry_type = x509_entry */) + b.AddUint24LengthPrefixed(func(b *cryptobyte.Builder) { + b.AddBytes(c.Certificate) + }) + } else { + b.AddUint16(1 /* entry_type = precert_entry */) + b.AddBytes(c.IssuerKeyHash[:]) + b.AddUint24LengthPrefixed(func(b *cryptobyte.Builder) { + b.AddBytes(c.Certificate) + }) + } + addExtensions(b, idx) + if c.IsPrecert { + b.AddUint24LengthPrefixed(func(b *cryptobyte.Builder) { + b.AddBytes(c.Precertificate) + }) + b.AddUint24LengthPrefixed(func(b *cryptobyte.Builder) { + b.AddBytes(c.PrecertSigningCert) + }) + } + return b.BytesOrPanic() +} + +// MerkleLeafHash returns the RFC6962 leaf hash for this entry. +// +// Note that we embed an SCT extension which captures the index of the entry in the log according to +// the mechanism specified in https://c2sp.org/ct-static-api. +func (c Entry) MerkleLeafHash(leafIndex uint64) []byte { + b := &cryptobyte.Builder{} + b.AddUint8(0 /* version = v1 */) + b.AddUint8(0 /* leaf_type = timestamped_entry */) + b.AddUint64(uint64(c.Timestamp)) + if !c.IsPrecert { + b.AddUint16(0 /* entry_type = x509_entry */) + b.AddUint24LengthPrefixed(func(b *cryptobyte.Builder) { + b.AddBytes(c.Certificate) + }) + } else { + b.AddUint16(1 /* entry_type = precert_entry */) + b.AddBytes(c.IssuerKeyHash[:]) + b.AddUint24LengthPrefixed(func(b *cryptobyte.Builder) { + b.AddBytes(c.Certificate) + }) + } + addExtensions(b, leafIndex) + return rfc6962.DefaultHasher.HashLeaf(b.BytesOrPanic()) +} + +func (c Entry) Identity() []byte { + var r [sha256.Size]byte + if c.IsPrecert { + r = sha256.Sum256(c.Precertificate) + } else { + r = sha256.Sum256(c.Certificate) + } + return r[:] +} + +func addExtensions(b *cryptobyte.Builder, leafIndex uint64) { + b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) { + ext, err := extensions{LeafIndex: leafIndex}.Marshal() + if err != nil { + b.SetError(err) + return + } + b.AddBytes(ext) + }) +} + +// extensions is the CTExtensions field of SignedCertificateTimestamp and +// TimestampedEntry, according to c2sp.org/static-ct-api. +type extensions struct { + LeafIndex uint64 +} + +func (c extensions) Marshal() ([]byte, error) { + // enum { + // leaf_index(0), (255) + // } ExtensionType; + // + // struct { + // ExtensionType extension_type; + // opaque extension_data<0..2^16-1>; + // } Extension; + // + // Extension CTExtensions<0..2^16-1>; + // + // uint8 uint40[5]; + // uint40 LeafIndex; + + b := &cryptobyte.Builder{} + b.AddUint8(0 /* extension_type = leaf_index */) + b.AddUint16LengthPrefixed(func(b *cryptobyte.Builder) { + if c.LeafIndex >= 1<<40 { + b.SetError(errors.New("leaf_index out of range")) + return + } + addUint40(b, uint64(c.LeafIndex)) + }) + return b.Bytes() +} + +// addUint40 appends a big-endian, 40-bit value to the byte string. +func addUint40(b *cryptobyte.Builder, v uint64) { + b.AddBytes([]byte{byte(v >> 32), byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)}) +} diff --git a/entry.go b/entry.go index 1f7706ad..6709c219 100644 --- a/entry.go +++ b/entry.go @@ -16,9 +16,8 @@ package tessera import ( - "bytes" "crypto/sha256" - "encoding/gob" + "encoding/binary" "github.com/transparency-dev/merkle/rfc6962" ) @@ -33,7 +32,11 @@ type Entry struct { Data []byte Identity []byte LeafHash []byte + Index *uint64 } + + // marshalForBundle knows how to convert this entry's Data into a marshalled bundle entry. + marshalForBundle func(index uint64) []byte } // Data returns the raw entry bytes which will form the entry in the log. @@ -46,12 +49,25 @@ func (e Entry) Identity() []byte { return e.internal.Identity } // Note that in almost all cases, this should be the RFC6962 definition of a leaf hash. func (e Entry) LeafHash() []byte { return e.internal.LeafHash } +// Index returns the index assigned to the entry in the log, or nil if no index has been assigned. +func (e Entry) Index() *uint64 { return e.internal.Index } + +// MarshalBundleData returns this entry's data in a format ready to be appended to an EntryBundle. +// +// Note that MarshalBundleData _may_ be called multiple times, potentially with different values for index +// (e.g. if there's a failure in the storage when trying to persist the assignment), so index should not +// be considered final until the storage Add method has returned successfully with the durably assigned index. +func (e *Entry) MarshalBundleData(index uint64) []byte { + e.internal.Index = &index + return e.marshalForBundle(index) +} + // NewEntry creates a new Entry object with leaf data. -func NewEntry(data []byte, opts ...EntryOpt) Entry { - e := Entry{} +func NewEntry(data []byte, opts ...EntryOpt) *Entry { + e := &Entry{} e.internal.Data = data for _, opt := range opts { - opt(&e) + opt(e) } if e.internal.Identity == nil { h := sha256.Sum256(e.internal.Data) @@ -59,23 +75,18 @@ func NewEntry(data []byte, opts ...EntryOpt) Entry { } if e.internal.LeafHash == nil { e.internal.LeafHash = rfc6962.DefaultHasher.HashLeaf(e.internal.Data) - } - return e -} - -func (e *Entry) MarshalBinary() ([]byte, error) { - b := &bytes.Buffer{} - enc := gob.NewEncoder(b) - if err := enc.Encode(e.internal); err != nil { - return nil, err + if e.marshalForBundle == nil { + // By default we will marshal ourselves into a bundle using the mechanism described + // by https://c2sp.org/tlog-tiles: + e.marshalForBundle = func(_ uint64) []byte { + r := make([]byte, 0, 2+len(e.internal.Data)) + r = binary.BigEndian.AppendUint16(r, uint16(len(e.internal.Data))) + r = append(r, e.internal.Data...) + return r + } } - return b.Bytes(), nil -} - -func (e *Entry) UnmarshalBinary(buf []byte) error { - dec := gob.NewDecoder(bytes.NewReader(buf)) - return dec.Decode(&e.internal) + return e } // EntryOpt is the signature of options for creating new Entry instances. diff --git a/entry_test.go b/entry_test.go index 45d8a282..6880893c 100644 --- a/entry_test.go +++ b/entry_test.go @@ -15,25 +15,24 @@ package tessera import ( - "reflect" + "bytes" + "fmt" "testing" ) -func TestEntryMarshalRoundTrip(t *testing.T) { - e := NewEntry([]byte("this is data"), WithIdentity([]byte("I am who I am"))) - e.internal.LeafHash = []byte("lettuce") +func TestEntryMarshalBundleDelegates(t *testing.T) { + const wantIdx = uint64(143) + wantBundle := []byte(fmt.Sprintf("Yes %d", wantIdx)) - raw, err := e.MarshalBinary() - if err != nil { - t.Fatalf("MarshalBinary: %v", err) + e := NewEntry([]byte("this is data")) + e.marshalForBundle = func(gotIdx uint64) []byte { + if gotIdx != wantIdx { + t.Fatalf("Got idx %d, want %d", gotIdx, wantIdx) + } + return wantBundle } - e2 := Entry{} - if err := (&e2).UnmarshalBinary(raw); err != nil { - t.Fatalf("UnmarshalBinary: %v", err) - } - - if !reflect.DeepEqual(e, e2) { - t.Fatalf("got %+v, want %+v", e2, e) + if got, want := e.MarshalBundleData(wantIdx), wantBundle; !bytes.Equal(got, want) { + t.Fatalf("Got %q, want %q", got, want) } } diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 5ac6735f..522132cb 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -42,6 +42,7 @@ import ( "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" gcs "cloud.google.com/go/storage" + "github.com/google/go-cmp/cmp" tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" @@ -78,9 +79,8 @@ type objStore interface { // sequencer describes a type which knows how to sequence entries. type sequencer interface { - // assignEntries should durably allocate contiguous index numbers to the provided entries, - // and return the lowest assigned index. - assignEntries(ctx context.Context, entries []tessera.Entry) (uint64, error) + // assignEntries should durably allocate contiguous index numbers to the provided entries. + assignEntries(ctx context.Context, entries []*tessera.Entry) error // consumeEntries should call the provided function with up to limit previously sequenced entries. // If the call to consumeFunc returns no error, the entries should be considered to have been consumed. // If any entries were successfully consumed, the implementation should also return true; this @@ -89,7 +89,7 @@ type sequencer interface { } // consumeFunc is the signature of a function which can consume entries from the sequencer. -type consumeFunc func(ctx context.Context, from uint64, entries []tessera.Entry) error +type consumeFunc func(ctx context.Context, from uint64, entries []storage.SequencedEntry) error // Config holds GCP project and resource configuration for a storage instance. type Config struct { @@ -155,7 +155,7 @@ func New(ctx context.Context, cfg Config, opts ...func(*tessera.StorageOptions)) } // Add is the entrypoint for adding entries to a sequencing log. -func (s *Storage) Add(ctx context.Context, e tessera.Entry) (uint64, error) { +func (s *Storage) Add(ctx context.Context, e *tessera.Entry) (uint64, error) { return s.queue.Add(ctx, e)() } @@ -216,10 +216,10 @@ func (s *Storage) getTiles(ctx context.Context, tileIDs []storage.TileID, logSiz } -// getEntryBundle returns the entry bundle at the location implied by the given index and treeSize. +// getEntryBundle returns the serialised entry bundle at the location implied by the given index and treeSize. // // Returns a wrapped os.ErrNotExist if the bundle does not exist. -func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) (*api.EntryBundle, error) { +func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64) ([]byte, error) { objName := layout.EntriesPath(bundleIndex, logSize) data, _, err := s.objStore.getObject(ctx, objName) if err != nil { @@ -231,24 +231,16 @@ func (s *Storage) getEntryBundle(ctx context.Context, bundleIndex uint64, logSiz return nil, err } - r := &api.EntryBundle{} - if err := r.UnmarshalText(data); err != nil { - return nil, fmt.Errorf("failed to unmarshal %q: %v", objName, err) - } - return r, nil + return data, nil } -// setEntryBundle idempotently stores the entry bundle at the location implied by the bundleIndex and treeSize. -func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundle *api.EntryBundle) error { +// setEntryBundle idempotently stores the serialised entry bundle at the location implied by the bundleIndex and treeSize. +func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSize uint64, bundleRaw []byte) error { objName := layout.EntriesPath(bundleIndex, logSize) - data, err := bundle.MarshalText() - if err != nil { - return err - } // Note that setObject does an idempotent interpretation of DoesNotExist - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. - if err := s.objStore.setObject(ctx, objName, data, &gcs.Conditions{DoesNotExist: true}); err != nil { + if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}); err != nil { return fmt.Errorf("setObject(%q): %v", objName, err) } @@ -256,7 +248,7 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSiz } // integrate incorporates the provided entries into the log starting at fromSeq. -func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []tessera.Entry) error { +func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) error { tb := storage.NewTreeBuilder(func(ctx context.Context, tileIDs []storage.TileID, treeSize uint64) ([]*api.HashTile, error) { n, err := s.getTiles(ctx, tileIDs, treeSize) if err != nil { @@ -317,26 +309,29 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries []tesse // updateEntryBundles adds the entries being integrated into the entry bundles. // // The right-most bundle will be grown, if it's partial, and/or new bundles will be created as required. -func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries []tessera.Entry) error { +func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entries []storage.SequencedEntry) error { numAdded := uint64(0) bundleIndex, entriesInBundle := fromSeq/entryBundleSize, fromSeq%entryBundleSize - bundle := &api.EntryBundle{} + bundleWriter := &bytes.Buffer{} if entriesInBundle > 0 { // If the latest bundle is partial, we need to read the data it contains in for our newer, larger, bundle. part, err := s.getEntryBundle(ctx, uint64(bundleIndex), uint64(entriesInBundle)) if err != nil { return err } - bundle = part + + if _, err := bundleWriter.Write(part); err != nil { + return fmt.Errorf("bundleWriter: %v", err) + } } seqErr := errgroup.Group{} // goSetEntryBundle is a function which uses seqErr to spin off a go-routine to write out an entry bundle. // It's used in the for loop below. - goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundle api.EntryBundle) { + goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundleRaw []byte) { seqErr.Go(func() error { - if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, &bundle); err != nil { + if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundleRaw); err != nil { return err } return nil @@ -345,18 +340,21 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // Add new entries to the bundle for _, e := range entries { - bundle.Entries = append(bundle.Entries, e.Data()) + if _, err := bundleWriter.Write(e.BundleData); err != nil { + return fmt.Errorf("Write: %v", err) + } entriesInBundle++ fromSeq++ numAdded++ if entriesInBundle == entryBundleSize { // This bundle is full, so we need to write it out... klog.V(1).Infof("Bundle idx %d is full", bundleIndex) - goSetEntryBundle(ctx, bundleIndex, fromSeq, *bundle) + goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) // ... and prepare the next entry bundle for any remaining entries in the batch bundleIndex++ entriesInBundle = 0 - bundle = &api.EntryBundle{} + // Don't use Reset/Truncate here - the backing []bytes is still being used by goSetEntryBundle above. + bundleWriter = &bytes.Buffer{} klog.V(1).Infof("Starting bundle idx %d", bundleIndex) } } @@ -364,7 +362,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie // this needs writing out too. if entriesInBundle > 0 { klog.V(1).Infof("Writing partial bundle idx %d.%d", bundleIndex, entriesInBundle) - goSetEntryBundle(ctx, bundleIndex, fromSeq, *bundle) + goSetEntryBundle(ctx, bundleIndex, fromSeq, bundleWriter.Bytes()) } return seqErr.Wait() } @@ -444,16 +442,7 @@ func (s *spannerSequencer) initDB(ctx context.Context) error { // Entries are allocated contiguous indices, in the order in which they appear in the entries parameter. // This is achieved by storing the passed-in entries in the Seq table in Spanner, keyed by the // index assigned to the first entry in the batch. -func (s *spannerSequencer) assignEntries(ctx context.Context, entries []tessera.Entry) (uint64, error) { - // Flatted the entries into a single slice of bytes which we can store in the Seq.v column. - b := &bytes.Buffer{} - e := gob.NewEncoder(b) - if err := e.Encode(entries); err != nil { - return 0, fmt.Errorf("failed to serialise batch: %v", err) - } - data := b.Bytes() - num := len(entries) - +func (s *spannerSequencer) assignEntries(ctx context.Context, entries []*tessera.Entry) error { var next int64 // Unfortunately, Spanner doesn't support uint64 so we'll have to cast around a bit. _, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { @@ -466,8 +455,27 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries []tessera. if err := row.Columns(&id, &next); err != nil { return fmt.Errorf("failed to parse id column: %v", err) } - next := uint64(next) // Shadow next with a uint64 version of the same value to save on casts. + + sequencedEntries := make([]storage.SequencedEntry, len(entries)) + // Assign provisional sequence numbers to entries. + // We need to do this here in order to support serialisations which include the log position. + for i, e := range entries { + sequencedEntries[i] = storage.SequencedEntry{ + BundleData: e.MarshalBundleData(next + uint64(i)), + LeafHash: e.LeafHash(), + } + } + + // Flatten the entries into a single slice of bytes which we can store in the Seq.v column. + b := &bytes.Buffer{} + e := gob.NewEncoder(b) + if err := e.Encode(sequencedEntries); err != nil { + return fmt.Errorf("failed to serialise batch: %v", err) + } + data := b.Bytes() + num := len(entries) + // TODO(al): think about whether aligning bundles to tile boundaries would be a good idea or not. m := []*spanner.Mutation{ // Insert our newly sequenced batch of entries into Seq, @@ -483,10 +491,10 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries []tessera. }) if err != nil { - return 0, fmt.Errorf("failed to flush batch: %v", err) + return fmt.Errorf("failed to flush batch: %v", err) } - return uint64(next), nil + return nil } // consumeEntries calls f with previously sequenced entries. @@ -517,7 +525,7 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c defer rows.Stop() seqsConsumed := []int64{} - entries := make([]tessera.Entry, 0, limit) + entries := make([]storage.SequencedEntry, 0, limit) orderCheck := fromSeq for { row, err := rows.Next() @@ -536,7 +544,7 @@ func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f c } g := gob.NewDecoder(bytes.NewReader(vGob)) - b := []tessera.Entry{} + b := []storage.SequencedEntry{} if err := g.Decode(&b); err != nil { return fmt.Errorf("failed to deserialise v: %v", err) } @@ -653,6 +661,7 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, return fmt.Errorf("failed to fetch existing content for %q (@%d): %v", objName, existingGen, err) } if !bytes.Equal(existing, data) { + klog.Errorf("Resource %q non-idempotent write:\n%s", objName, cmp.Diff(existing, data)) return fmt.Errorf("precondition failed: resource content for %q differs from data to-be-written", objName) } diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 81082c7c..fa56e230 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -70,18 +70,19 @@ func TestSpannerSequencerAssignEntries(t *testing.T) { want := uint64(0) for chunks := 0; chunks < 10; chunks++ { - entries := []tessera.Entry{} + entries := []*tessera.Entry{} for i := 0; i < 10+chunks; i++ { entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("item %d/%d", chunks, i)))) } - got, err := seq.assignEntries(ctx, entries) - if err != nil { + if err := seq.assignEntries(ctx, entries); err != nil { t.Fatalf("assignEntries: %v", err) } - if got != want { - t.Errorf("Chunk %d got seq %d, want %d", chunks, got, want) + for i, e := range entries { + if got := *e.Index(); got != want { + t.Errorf("Chunk %d entry %d got seq %d, want %d", chunks, i, got, want) + } + want++ } - want += uint64(len(entries)) } } @@ -96,24 +97,31 @@ func TestSpannerSequencerRoundTrip(t *testing.T) { } seq := 0 + wantEntries := []storage.SequencedEntry{} for chunks := 0; chunks < 10; chunks++ { - entries := []tessera.Entry{} + entries := []*tessera.Entry{} for i := 0; i < 10+chunks; i++ { - entries = append(entries, tessera.NewEntry([]byte(fmt.Sprintf("item %d", seq)))) + e := tessera.NewEntry([]byte(fmt.Sprintf("item %d", seq))) + entries = append(entries, e) + wantEntries = append(wantEntries, storage.SequencedEntry{ + BundleData: e.MarshalBundleData(uint64(seq)), + LeafHash: e.LeafHash(), + }) seq++ } - if _, err := s.assignEntries(ctx, entries); err != nil { + if err := s.assignEntries(ctx, entries); err != nil { t.Fatalf("assignEntries: %v", err) } } seenIdx := uint64(0) - f := func(_ context.Context, fromSeq uint64, entries []tessera.Entry) error { + f := func(_ context.Context, fromSeq uint64, entries []storage.SequencedEntry) error { if fromSeq != seenIdx { return fmt.Errorf("f called with fromSeq %d, want %d", fromSeq, seenIdx) } for i, e := range entries { - if !reflect.DeepEqual(e, tessera.NewEntry([]byte(fmt.Sprintf("item %d", i)))) { + + if got, want := e, wantEntries[i]; !reflect.DeepEqual(got, want) { return fmt.Errorf("entry %d+%d != %d", fromSeq, i, seenIdx) } seenIdx++ @@ -185,13 +193,16 @@ func TestTileRoundtrip(t *testing.T) { } } -func makeBundle(t *testing.T, size uint64) *api.EntryBundle { +func makeBundle(t *testing.T, size uint64) []byte { t.Helper() - r := &api.EntryBundle{Entries: make([][]byte, size)} + r := &bytes.Buffer{} for i := uint64(0); i < size; i++ { - r.Entries[i] = []byte(fmt.Sprintf("%d", i)) + e := tessera.NewEntry([]byte(fmt.Sprintf("%d", i))) + if _, err := r.Write(e.MarshalBundleData(i)); err != nil { + t.Fatalf("MarshalBundleEntry: %v", err) + } } - return r + return r.Bytes() } func TestBundleRoundtrip(t *testing.T) { diff --git a/storage/integrate.go b/storage/integrate.go index 861e2bc7..b7efee9f 100644 --- a/storage/integrate.go +++ b/storage/integrate.go @@ -23,7 +23,6 @@ import ( "github.com/transparency-dev/merkle/compact" "github.com/transparency-dev/merkle/rfc6962" - tessera "github.com/transparency-dev/trillian-tessera" "github.com/transparency-dev/trillian-tessera/api" "github.com/transparency-dev/trillian-tessera/api/layout" "golang.org/x/exp/maps" @@ -87,7 +86,15 @@ func (t *TreeBuilder) newRange(ctx context.Context, treeSize uint64) (*compact.R return t.rf.NewRange(0, treeSize, hashes) } -func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries []tessera.Entry) (newSize uint64, rootHash []byte, tiles map[TileID]*api.HashTile, err error) { +// SequencedEntry represents a log entry which has already been sequenced. +type SequencedEntry struct { + // BundleData is the entry's data serialised into the correct format for appending to an entry bundle. + BundleData []byte + // LeafHash is the entry's Merkle leaf hash. + LeafHash []byte +} + +func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries []SequencedEntry) (newSize uint64, rootHash []byte, tiles map[TileID]*api.HashTile, err error) { baseRange, err := t.newRange(ctx, fromSize) if err != nil { return 0, nil, nil, fmt.Errorf("failed to create range covering existing log: %w", err) @@ -111,7 +118,7 @@ func (t *TreeBuilder) Integrate(ctx context.Context, fromSize uint64, entries [] visitor := tc.Visitor(ctx) for _, e := range entries { // Update range and set nodes - if err := newRange.Append(e.LeafHash(), visitor); err != nil { + if err := newRange.Append(e.LeafHash, visitor); err != nil { return 0, nil, nil, fmt.Errorf("newRange.Append(): %v", err) } diff --git a/storage/integrate_test.go b/storage/integrate_test.go index 8c628591..6304626d 100644 --- a/storage/integrate_test.go +++ b/storage/integrate_test.go @@ -131,10 +131,14 @@ func TestIntegrate(t *testing.T) { seq := uint64(0) for chunk := 0; chunk < numChunks; chunk++ { oldSeq := seq - c := make([]tessera.Entry, chunkSize) + c := make([]SequencedEntry, chunkSize) for i := range c { leaf := []byte{byte(seq)} - c[i] = tessera.NewEntry(leaf) + entry := tessera.NewEntry(leaf) + c[i] = SequencedEntry{ + BundleData: entry.MarshalBundleData(seq), + LeafHash: entry.LeafHash(), + } if err := cr.Append(rfc6962.DefaultHasher.HashLeaf(leaf), nil); err != nil { t.Fatalf("compact Append: %v", err) } diff --git a/storage/queue.go b/storage/queue.go index 1e90c0bb..905fc4a9 100644 --- a/storage/queue.go +++ b/storage/queue.go @@ -17,6 +17,7 @@ package storage import ( "context" + "errors" "sync" "time" @@ -40,15 +41,18 @@ type Queue struct { flush FlushFunc inFlightMu sync.Mutex - inFlight map[string]*entry + inFlight map[string]*queueItem } -// IndexFunc is a function which returns an assigned log index, or an error. -type IndexFunc func() (idx uint64, err error) +// Future is a function which returns an assigned log index, or an error. +type Future func() (idx uint64, err error) // FlushFunc is the signature of a function which will receive the slice of queued entries. -// It should return the index assigned to the first entry in the provided slice. -type FlushFunc func(ctx context.Context, entries []tessera.Entry) (index uint64, err error) +// Normally, this function would be provided by storage implementations. It's important to note +// that the implementation MUST call each entry's MarshalBundleData function before attempting +// to integrate it into the tree. +// See the comment on Entry.MarshalBundleData for further info. +type FlushFunc func(ctx context.Context, entries []*tessera.Entry) error // NewQueue creates a new queue with the specified maximum age and size. // @@ -58,7 +62,7 @@ type FlushFunc func(ctx context.Context, entries []tessera.Entry) (index uint64, func NewQueue(ctx context.Context, maxAge time.Duration, maxSize uint, f FlushFunc) *Queue { q := &Queue{ flush: f, - inFlight: make(map[string]*entry, maxSize), + inFlight: make(map[string]*queueItem, maxSize), } // The underlying queue implementation blocks additions during a flush. @@ -66,11 +70,11 @@ func NewQueue(ctx context.Context, maxAge time.Duration, maxSize uint, f FlushFu // decouple the queue flush and storage write by handling the latter in // a worker goroutine. // This same worker thread will also handle the callbacks to f. - work := make(chan []*entry, 1) + work := make(chan []*queueItem, 1) toWork := func(items []interface{}) { - entries := make([]*entry, len(items)) + entries := make([]*queueItem, len(items)) for i, t := range items { - entries[i] = t.(*entry) + entries[i] = t.(*queueItem) } work <- entries @@ -98,7 +102,7 @@ func NewQueue(ctx context.Context, maxAge time.Duration, maxSize uint, f FlushFu // squashDupes keeps track of all in-flight requests, enabling dupe squashing for entries currently in the queue. // Returns an entry struct, and a bool which is true if the provided entry is a dupe and should NOT be added to the queue. -func (q *Queue) squashDupes(e tessera.Entry) (*entry, bool) { +func (q *Queue) squashDupes(e *tessera.Entry) (*queueItem, bool) { q.inFlightMu.Lock() defer q.inFlightMu.Unlock() @@ -112,55 +116,58 @@ func (q *Queue) squashDupes(e tessera.Entry) (*entry, bool) { } // Add places e into the queue, and returns a func which may be called to retrieve the assigned index. -func (q *Queue) Add(ctx context.Context, e tessera.Entry) IndexFunc { +func (q *Queue) Add(ctx context.Context, e *tessera.Entry) Future { entry, isDupe := q.squashDupes(e) if isDupe { // This entry is already in the queue, so no need to add it again. - return entry.index + return entry.f } if err := q.buf.Push(entry); err != nil { - entry.assign(0, err) + entry.notify(err) } - return entry.index + return entry.f } // doFlush handles the queue flush, and sending notifications of assigned log indices. -func (q *Queue) doFlush(ctx context.Context, entries []*entry) { - entriesData := make([]tessera.Entry, 0, len(entries)) +func (q *Queue) doFlush(ctx context.Context, entries []*queueItem) { + entriesData := make([]*tessera.Entry, 0, len(entries)) for _, e := range entries { - entriesData = append(entriesData, e.data) + entriesData = append(entriesData, e.entry) } - s, err := q.flush(ctx, entriesData) + err := q.flush(ctx, entriesData) // Send assigned indices to all the waiting Add() requests, including dupes. q.inFlightMu.Lock() defer q.inFlightMu.Unlock() - for i, e := range entries { - e.assign(s+uint64(i), err) - k := string(e.data.Identity()) + for _, e := range entries { + if e.entry.Index() == nil { + panic(errors.New("Logic error: flush complete, but entry was not assigned an index - did storage fail to call entry.MarshalBundleData?")) + } + e.notify(err) + k := string(e.entry.Identity()) delete(q.inFlight, k) } } -// entry represents an in-flight entry in the queue. +// queueItem represents an in-flight queueItem in the queue. // -// The index field acts as a Future for the entry's assigned index/error, and will +// The f field acts as a future for the queueItem's assigned index/error, and will // hang until assign is called. -type entry struct { - data tessera.Entry - c chan IndexFunc - index IndexFunc +type queueItem struct { + entry *tessera.Entry + c chan Future + f Future } // newEntry creates a new entry for the provided data. -func newEntry(data tessera.Entry) *entry { - e := &entry{ - data: data, - c: make(chan IndexFunc, 1), +func newEntry(data *tessera.Entry) *queueItem { + e := &queueItem{ + entry: data, + c: make(chan Future, 1), } - e.index = sync.OnceValues(func() (uint64, error) { + e.f = sync.OnceValues(func() (uint64, error) { return (<-e.c)() }) return e @@ -170,9 +177,12 @@ func newEntry(data tessera.Entry) *entry { // // This func must only be called once, and will cause any current or future callers of index() // to be given the values provided here. -func (e *entry) assign(idx uint64, err error) { +func (e *queueItem) notify(err error) { e.c <- func() (uint64, error) { - return idx, err + if err != nil { + return 0, err + } + return *e.entry.Index(), nil } close(e.c) } diff --git a/storage/queue_test.go b/storage/queue_test.go index 217c13f2..b58ca598 100644 --- a/storage/queue_test.go +++ b/storage/queue_test.go @@ -19,7 +19,6 @@ import ( "fmt" "reflect" "sync" - "sync/atomic" "testing" "time" @@ -54,31 +53,32 @@ func TestQueue(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() assignMu := sync.Mutex{} - assignedItems := make([]tessera.Entry, test.numItems) + assignedItems := make([]*tessera.Entry, test.numItems) assignedIndex := uint64(0) // flushFunc mimics sequencing storage - it takes entries, assigns them to - // positions in assignedItems and returns the first assigned positition for each batch. - flushFunc := func(_ context.Context, entries []tessera.Entry) (uint64, error) { + // positions in assignedItems. + flushFunc := func(_ context.Context, entries []*tessera.Entry) error { assignMu.Lock() defer assignMu.Unlock() - s := assignedIndex for _, e := range entries { + _ = e.MarshalBundleData(assignedIndex) assignedItems[assignedIndex] = e assignedIndex++ } - return s, nil + return nil } // Create the Queue q := storage.NewQueue(ctx, test.maxWait, uint(test.maxEntries), flushFunc) // Now submit a bunch of entries - adds := make([]storage.IndexFunc, test.numItems) - wantEntries := make([]tessera.Entry, test.numItems) + adds := make([]storage.Future, test.numItems) + wantEntries := make([]*tessera.Entry, test.numItems) for i := uint64(0); i < test.numItems; i++ { - wantEntries[i] = tessera.NewEntry([]byte(fmt.Sprintf("item %d", i))) + d := []byte(fmt.Sprintf("item %d", i)) + wantEntries[i] = tessera.NewEntry(d, tessera.WithIdentity(d)) adds[i] = q.Add(ctx, wantEntries[i]) } @@ -88,8 +88,8 @@ func TestQueue(t *testing.T) { t.Errorf("Add: %v", err) return } - if got, want := assignedItems[N], wantEntries[i]; !reflect.DeepEqual(got, want) { - t.Errorf("Got item@%d %q, want %q", N, got, want) + if got, want := assignedItems[N].Data(), wantEntries[i].Data(); !reflect.DeepEqual(got, want) { + t.Errorf("Got item@%d %v, want %v", N, got, want) } } }) @@ -98,16 +98,18 @@ func TestQueue(t *testing.T) { func TestDedup(t *testing.T) { ctx := context.Background() - idx := atomic.Uint64{} + idx := uint64(0) - q := storage.NewQueue(ctx, time.Second, 10 /*maxSize*/, func(ctx context.Context, entries []tessera.Entry) (index uint64, err error) { - r := idx.Load() - idx.Add(1) - return r, nil + q := storage.NewQueue(ctx, time.Second, 10 /*maxSize*/, func(ctx context.Context, entries []*tessera.Entry) error { + for _, e := range entries { + _ = e.MarshalBundleData(idx) + idx++ + } + return nil }) numEntries := 10 - adds := []storage.IndexFunc{} + adds := []storage.Future{} for i := 0; i < numEntries; i++ { adds = append(adds, q.Add(ctx, tessera.NewEntry([]byte("Have I seen this before?")))) }