Stream events directly to GCS
This takes a fairly different approach to how we emit our logs to GCS. Previously we received events in one container and wrote them out to the local filesystem as individual files, and a sidecar would periodically enumerate those files, concatenate them, and send them up to GCS in a single upload.

When we shifted to Cloud Run, this approach became problematic because the filesystem is backed by memory: under heavy load the event handler could see significant memory pressure between rotations, and because the events sit on the in-memory filesystem and are then concatenated for the upload, they end up in memory twice.

By collapsing the two processes into one and uploading directly, we can open a new object writer, trickle events to it as they come in, and periodically flush the active writers. In the worst case the client is dumb and buffers the data once before uploading; in the best case it initiates an upload of unknown size and streams events as they arrive, which would dramatically reduce our memory pressure to roughly O(active events).
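For illustration, a minimal sketch of that flow using the same gocloud.dev/blob API the recorder builds on might look like the following. The object key and event payloads are made up, and the in-memory `mem://` driver stands in for `gs://` so the sketch runs without GCS credentials; the real recorder registers the GCS driver via the gcsblob import.

```go
package main

import (
	"context"
	"log"

	"gocloud.dev/blob"
	// The recorder registers the gs:// scheme via gcsblob; the in-memory
	// driver is used here so this sketch runs without GCS credentials.
	_ "gocloud.dev/blob/memblob"
)

func main() {
	ctx := context.Background()

	// Open a bucket by URL ("gs://my-bucket" in production, "mem://" here).
	bucket, err := blob.OpenBucket(ctx, "mem://")
	if err != nil {
		log.Fatalf("failed to open bucket: %v", err)
	}
	defer bucket.Close()

	// Create a writer for one object (key is hypothetical) and trickle
	// events to it as lines. The data is committed when the writer closes.
	w, err := bucket.NewWriter(ctx, "dev.example.event/1726600000000000000", nil)
	if err != nil {
		log.Fatalf("failed to create writer: %v", err)
	}
	for _, event := range []string{`{"id":"1"}`, `{"id":"2"}`} {
		if _, err := w.Write([]byte(event + "\n")); err != nil {
			log.Fatalf("failed to write event: %v", err)
		}
	}
	if err := w.Close(); err != nil {
		log.Fatalf("failed to flush writer: %v", err)
	}

	// Read the object back to confirm both events landed in a single upload.
	data, err := bucket.ReadAll(ctx, "dev.example.event/1726600000000000000")
	if err != nil {
		log.Fatalf("failed to read object: %v", err)
	}
	log.Printf("uploaded:\n%s", data)
}
```

The important property is that events are appended to an open writer as they arrive and only committed to the bucket when the writer is closed, so nothing needs to be staged on a local filesystem first.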

Signed-off-by: Matt Moore <[email protected]>
mattmoor committed Sep 18, 2024
1 parent 652a950 commit 751aef9
Showing 8 changed files with 87 additions and 523 deletions.
3 changes: 2 additions & 1 deletion modules/cloudevent-recorder/README.md
@@ -77,6 +77,7 @@ module "foo-emits-events" {
}
```
<!-- BEGIN_TF_DOCS -->

## Requirements

No requirements.
@@ -136,7 +137,7 @@ No requirements.
| <a name="input_cloud_storage_config_max_duration"></a> [cloud\_storage\_config\_max\_duration](#input\_cloud\_storage\_config\_max\_duration) | The maximum duration that can elapse before a new Cloud Storage file is created. Min 1 minute, max 10 minutes, default 5 minutes. | `number` | `300` | no |
| <a name="input_deletion_protection"></a> [deletion\_protection](#input\_deletion\_protection) | Whether to enable deletion protection on data resources. | `bool` | `true` | no |
| <a name="input_enable_profiler"></a> [enable\_profiler](#input\_enable\_profiler) | Enable cloud profiler. | `bool` | `false` | no |
| <a name="input_flush_interval"></a> [flush\_interval](#input\_flush\_interval) | Flush interval for logrotate, as a duration string. | `string` | `""` | no |
| <a name="input_flush_interval"></a> [flush\_interval](#input\_flush\_interval) | Flush interval for logrotate, as a duration string. | `string` | `"3m"` | no |
| <a name="input_ignore_unknown_values"></a> [ignore\_unknown\_values](#input\_ignore\_unknown\_values) | Whether to ignore unknown values in the data, when transferring data to BigQuery. | `bool` | `false` | no |
| <a name="input_limits"></a> [limits](#input\_limits) | Resource limits for the regional go service. | <pre>object({<br> cpu = string<br> memory = string<br> })</pre> | `null` | no |
| <a name="input_location"></a> [location](#input\_location) | The location to create the BigQuery dataset in, and in which to run the data transfer jobs from GCS. | `string` | `"US"` | no |
36 changes: 0 additions & 36 deletions modules/cloudevent-recorder/cmd/logrotate/main.go

This file was deleted.

87 changes: 77 additions & 10 deletions modules/cloudevent-recorder/cmd/recorder/main.go
@@ -10,21 +10,29 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/sethvargo/go-envconfig"
"gocloud.dev/blob"

"github.com/chainguard-dev/clog"
_ "github.com/chainguard-dev/clog/gcp/init"
"github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics"
mce "github.com/chainguard-dev/terraform-infra-common/pkg/httpmetrics/cloudevents"
"github.com/chainguard-dev/terraform-infra-common/pkg/profiler"

// Add gcsblob support that we need to support gs:// prefixes
_ "gocloud.dev/blob/gcsblob"
)

var env = envconfig.MustProcess(context.Background(), &struct {
Port int `env:"PORT, default=8080"`
LogPath string `env:"LOG_PATH, required"`
Port int `env:"PORT, default=8080"`
FlushInterval time.Duration `env:"FLUSH_INTERVAL, default=3m"`
Bucket string `env:"BUCKET, required"`
}{})

func main() {
@@ -40,20 +48,79 @@ func main() {
if err != nil {
clog.Fatalf("failed to create event client, %v", err)
}

bucket, err := blob.OpenBucket(ctx, env.Bucket)
if err != nil {
clog.Fatalf("failed to open bucket, %v", err)
}
defer bucket.Close()

var m sync.Mutex
writers := make(map[string]*blob.Writer, 10)

// Periodically flush the writers to commit the data to the bucket.
go func() {
done := false
for {
writersToDrain := func() map[string]*blob.Writer {
m.Lock()
defer m.Unlock()
// Swap the writers map so we can safely iterate and close the writers.
writersToDrain := writers
writers = make(map[string]*blob.Writer, 10)
return writersToDrain
}()

for t, w := range writersToDrain {
clog.Infof("Flushing writer[%s]", t)
if err := w.Close(); err != nil {
clog.Errorf("failed to close writer[%s]: %v", t, err)
}
}

if done {
clog.InfoContextf(ctx, "Exiting flush loop")
return
}
select {
case <-time.After(env.FlushInterval):
case <-ctx.Done():
clog.InfoContext(ctx, "Flushing one more time")
done = true
}
}
}()

// Listen for events and as they come in write them to the appropriate
// writer based on event type.
if err := c.StartReceiver(ctx, func(_ context.Context, event cloudevents.Event) error {
dir := filepath.Join(env.LogPath, event.Type())
if err := os.MkdirAll(dir, 0755); err != nil {
writer, err := func() (*blob.Writer, error) {
m.Lock()
defer m.Unlock()

w, ok := writers[event.Type()]
if !ok {
w, err = bucket.NewWriter(ctx, filepath.Join(event.Type(), strconv.FormatInt(time.Now().UnixNano(), 10)), nil)
if err != nil {
clog.Errorf("failed to create writer: %v", err)
return nil, err
}
}
writers[event.Type()] = w
return w, nil
}()
if err != nil {
clog.Errorf("failed to create writer: %v", err)
return err
}

filename := filepath.Join(dir, event.ID())
if err := os.WriteFile(filename, event.Data(), 0600); err != nil {
clog.Warnf("failed to write file %s; %v", filename, err)
if err := os.RemoveAll(filename); err != nil {
clog.Warnf("failed to remove failed write file: %s; %v", filename, err)
}
// Write the event data as a line to the writer.
line := string(event.Data())
if _, err := writer.Write([]byte(line + "\n")); err != nil {
clog.Errorf("failed to write event data: %v", err)
return err
}

return nil
}); err != nil {
clog.Fatalf("failed to start event receiver, %v", err)
43 changes: 7 additions & 36 deletions modules/cloudevent-recorder/recorder.tf
@@ -20,25 +20,15 @@ resource "google_storage_bucket_iam_binding" "recorder-writes-to-gcs-buckets" {
members = ["serviceAccount:${google_service_account.recorder.email}"]
}

locals {
lenv = [{
name = "LOG_PATH"
value = "/logs"
}]

logrotate_env = var.flush_interval == "" ? local.lenv : concat(local.lenv, [{
name = "FLUSH_INTERVAL"
value = var.flush_interval
}])
}

module "this" {
count = var.method == "trigger" ? 1 : 0
source = "../regional-go-service"
project_id = var.project_id
name = var.name
regions = var.regions

deletion_protection = var.deletion_protection

service_account = google_service_account.recorder.email
containers = {
"recorder" = {
@@ -48,37 +38,18 @@
}
ports = [{ container_port = 8080 }]
env = [{
name = "LOG_PATH"
value = "/logs"
}]
volume_mounts = [{
name = "logs"
mount_path = "/logs"
name = "FLUSH_INTERVAL"
value = var.flush_interval
}]
resources = {
limits = var.limits
}
}
"logrotate" = {
source = {
working_dir = path.module
importpath = "./cmd/logrotate"
}
env = local.logrotate_env
regional-env = [{
name = "BUCKET"
value = { for k, v in google_storage_bucket.recorder : k => v.url }
}]
volume_mounts = [{
name = "logs"
mount_path = "/logs"
}]
resources = {
limits = var.limits
}
}
}
volumes = [{
name = "logs"
empty_dir = {}
}]

scaling = var.scaling

2 changes: 1 addition & 1 deletion modules/cloudevent-recorder/variables.tf
@@ -141,5 +141,5 @@ variable "split_triggers" {
variable "flush_interval" {
description = "Flush interval for logrotate, as a duration string."
type = string
default = ""
default = "3m"
}