Skip to content

Commit

Permalink
Rename and push down the Exists logic
Browse files Browse the repository at this point in the history
  • Loading branch information
phbnf committed Aug 21, 2024
1 parent 4f849ef commit 6b3ebe3
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 48 deletions.
2 changes: 1 addition & 1 deletion personalities/sctfe/ct_server_gcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func newGCPStorage(ctx context.Context, vCfg *sctfe.ValidatedLogConfig, signer n
return nil, fmt.Errorf("Failed to initialize GCP Tessera storage: %v", err)
}

issuerStorage, err := gcpMap.NewGCSStorage(ctx, cfg.ProjectId, cfg.Bucket, "fingerprints/", "application/pkix-cert")
issuerStorage, err := gcpMap.NewIssuerStorage(ctx, cfg.ProjectId, cfg.Bucket, "fingerprints/", "application/pkix-cert")
if err != nil {
return nil, fmt.Errorf("Failed to initialize GCP issuer storage: %v", err)
}
Expand Down
16 changes: 3 additions & 13 deletions personalities/sctfe/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type KV struct {

type IssuerStorage interface {
Exists(ctx context.Context, key []byte) (bool, error)
AddMultiple(ctx context.Context, kv []KV) error
AddIssuers(ctx context.Context, kv []KV) error
}

// CTStorage implements Storage.
Expand Down Expand Up @@ -71,20 +71,10 @@ func (cts *CTStorage) AddIssuerChain(ctx context.Context, chain []*x509.Certific
for _, c := range chain {
id := sha256.Sum256(c.Raw)
key := []byte(hex.EncodeToString(id[:]))
// We first try and see if this issuer cert has already been stored since reads
// are cheaper than writes.
// TODO(phboneff): monitor usage, eventually write directly depending on usage patterns
ok, err := cts.issuers.Exists(ctx, key)
if err != nil {
return fmt.Errorf("error checking if issuer %q exists: %s", string(key), err)
}
if !ok {
kvs = append(kvs, KV{K: key, V: c.Raw})
}
kvs = append(kvs, KV{K: key, V: c.Raw})
}
if err := cts.issuers.AddMultiple(ctx, kvs); err != nil {
if err := cts.issuers.AddIssuers(ctx, kvs); err != nil {
return fmt.Errorf("error storing intermediates: %v", err)

}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package gcp implements SCTFE storage systems for issuers and deduplication.
// Package gcp implements SCTFE storage systems for issuers.
//
// The interfaces are defined in sctfe/storage.go
package gcp
Expand All @@ -30,17 +30,17 @@ import (
"k8s.io/klog/v2"
)

// GCSStorage is a key value store backed by GCS on GCP.
type GCSStorage struct {
// IssuersStorage is a key value store backed by GCS on GCP to store issuer chains.
type IssuersStorage struct {
bucket *gcs.BucketHandle
prefix string
contentType string
}

// NewGCSStorage creates a new GCSStorage.
// NewIssuerStorage creates a new GCSStorage.
//
// The specified bucket must exist or an error will be returned.
func NewGCSStorage(ctx context.Context, projectID string, bucket string, prefix string, contentType string) (*GCSStorage, error) {
func NewIssuerStorage(ctx context.Context, projectID string, bucket string, prefix string, contentType string) (*IssuersStorage, error) {
c, err := gcs.NewClient(ctx, gcs.WithJSONReads())
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
Expand All @@ -59,7 +59,7 @@ func NewGCSStorage(ctx context.Context, projectID string, bucket string, prefix
break
}
}
r := &GCSStorage{
r := &IssuersStorage{
bucket: c.Bucket(bucket),
prefix: prefix,
contentType: contentType,
Expand All @@ -69,12 +69,12 @@ func NewGCSStorage(ctx context.Context, projectID string, bucket string, prefix
}

// keyToObjName converts bytes to a GCS object name.
func (s *GCSStorage) keyToObjName(key []byte) string {
func (s *IssuersStorage) keyToObjName(key []byte) string {
return path.Join(s.prefix, string(key))
}

// Exists checks whether an object is stored under key.
func (s *GCSStorage) Exists(ctx context.Context, key []byte) (bool, error) {
func (s *IssuersStorage) Exists(ctx context.Context, key []byte) (bool, error) {
objName := s.keyToObjName(key)
obj := s.bucket.Object(objName)
_, err := obj.Attrs(ctx)
Expand All @@ -88,40 +88,45 @@ func (s *GCSStorage) Exists(ctx context.Context, key []byte) (bool, error) {
return true, nil
}

// Add stores the provided data under key.
// AddIssuers stores all Issuers values under Key
//
// If there is already an object under that key, it does not override it, and returns.
// TODO(phboneff): consider reading the object to make sure it's identical
func (s *GCSStorage) Add(ctx context.Context, key []byte, data []byte) error {
objName := s.keyToObjName(key)
obj := s.bucket.Object(objName)

// Don't overwrite if it already exists
w := obj.If(gcs.Conditions{DoesNotExist: true}).NewWriter(ctx)
w.ObjectAttrs.ContentType = s.contentType

if _, err := w.Write(data); err != nil {
return fmt.Errorf("failed to write object %q to bucket %q: %w", objName, s.bucket.BucketName(), err)
func (s *IssuersStorage) AddIssuers(ctx context.Context, kv []sctfe.KV) error {
// We first try and see if this issuer cert has already been stored since reads
// are cheaper than writes.
// TODO(phboneff): monitor usage, eventually write directly depending on usage patterns
toStore := []sctfe.KV{}
for _, kv := range kv {
ok, err := s.Exists(ctx, kv.K)
if err != nil {
return fmt.Errorf("error checking if issuer %q exists: %s", string(kv.K), err)
}
if !ok {
toStore = append(toStore, kv)
}
}
// TODO(phboneff): add parallel writes
for _, kv := range toStore {
objName := s.keyToObjName(kv.K)
obj := s.bucket.Object(objName)

// Don't overwrite if it already exists
w := obj.If(gcs.Conditions{DoesNotExist: true}).NewWriter(ctx)
w.ObjectAttrs.ContentType = s.contentType

if err := w.Close(); err != nil {
// If we run into a precondition failure error, it means that the object already exists.
if ee, ok := err.(*googleapi.Error); ok && ee.Code == http.StatusPreconditionFailed {
klog.V(2).Infof("Add: object %q already exists in bucket %q, continuing", objName, s.bucket.BucketName())
return nil
if _, err := w.Write(kv.V); err != nil {
return fmt.Errorf("failed to write object %q to bucket %q: %w", objName, s.bucket.BucketName(), err)
}

return fmt.Errorf("failed to close write on %q: %v", objName, err)
}
return nil
}
if err := w.Close(); err != nil {
// If we run into a precondition failure error, it means that the object already exists.
if ee, ok := err.(*googleapi.Error); ok && ee.Code == http.StatusPreconditionFailed {
klog.V(2).Infof("Add: object %q already exists in bucket %q, continuing", objName, s.bucket.BucketName())
return nil
}

func (s *GCSStorage) AddMultiple(ctx context.Context, kv []sctfe.KV) error {
// TODO(phboneff): add parallel writes
for _, kv := range kv {
err := s.Add(ctx, kv.K, kv.V)
if err != nil {
return fmt.Errorf("error storing value under key %q: %v", string(kv.K), err)
return fmt.Errorf("failed to close write on %q: %v", objName, err)
}
}
return nil
Expand Down

0 comments on commit 6b3ebe3

Please sign in to comment.