diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 8413181..f99de6f 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -32,7 +32,15 @@ jobs: run: | go mod download + - name: Run Kafka KRaft Broker + uses: spicyparrot/kafka-kraft-action@v1.1.0 + with: + kafka-version: "3.7.0" + kafka-topics: "example,1" + - name: Test + env: + KAFKA_BOOTSTRAP_SERVER: ${{ env.kafka_runner_address }}:9092 run: make cover - name: Upload coverage reports to Codecov diff --git a/Makefile b/Makefile index 76e8511..f64a146 100644 --- a/Makefile +++ b/Makefile @@ -1,20 +1,17 @@ # Directories containing independent Go modules. MODULE_DIRS = . -.PHONY: test-no-setup -test-no-setup: - ./coverage.sh .PHONY: setup-test setup-test: - docker compose -p $$RANDOM -f ./example/docker-compose.yaml up -d + docker compose -p $$RANDOM -f ./example/compose.yaml up -d .PHONY: test-local -test-local: setup-test test-no-setup +test-local: setup-test cover .PHONY: cover cover: - ./coverage.sh + export GO_TAGS=--tags=integration; ./coverage.sh --tags=integration .PHONY: example-producer example-producer: diff --git a/README.md b/README.md index 59bbf14..e36c098 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![License](https://img.shields.io/github/license/zillow/zkafka)](https://github.com/zillow/zkafka/blob/main/LICENSE) [![GitHub Actions](https://github.com/zillow/zkafka/actions/workflows/go.yml/badge.svg)](https://github.com/zillow/zkafka/actions/workflows/go.yml) [![Codecov](https://codecov.io/gh/zillow/zkafka/branch/main/graph/badge.svg?token=STRT8T67YP)](https://codecov.io/gh/zillow/zkafka) - +[![Go Report Card](https://goreportcard.com/badge/github.com/zillow/zkafka)](https://goreportcard.com/report/github.com/zillow/zkafka) ## Install @@ -93,7 +93,7 @@ client because that value is explicitly set to true after reading of the Additio "KafkaTopicConfig": { "Topic": "KafkaTopicName", "BootstrapServers": [ - "localhost:9093" + "localhost:9092" ], // translates to librdkafka value "bootstrap.servers" // specify ad hoc configuration values which don't have a strongly typed version in the TopicConfig struct. diff --git a/commitmgr_test.go b/commitmgr_test.go index 93e3bd3..8c4ed21 100644 --- a/commitmgr_test.go +++ b/commitmgr_test.go @@ -197,8 +197,8 @@ func Test_commitMgr_PerPartitionDataStructuresBuiltUpConcurrentlyCorrectly(t *te require.Equal(t, int64(0), mgr.inWorkCount, "expected inWorkCount to be empty") } -// Test_commitMgr_mutex_ShouldReturnReferenceToSameMutexForSamePartition tests for a race condition (based on bugfound) -// where two goroutines calling this method received distinct mutexes (which isn't correct for syncronization purposes). +// Test_commitMgr_mutex_ShouldReturnReferenceToSameMutexForSamePartition tests for a race condition (based on bug found) +// where two goroutines calling this method received distinct mutexes (which isn't correct for synchronization purposes). // Try a large amount of times to access the mutex for a particular partition. Always should return same pointer func Test_commitMgr_mutex_ShouldReturnReferenceToSameMutexForSamePartition(t *testing.T) { defer recoverThenFail(t) diff --git a/coverage.sh b/coverage.sh index 28beca1..03ec218 100755 --- a/coverage.sh +++ b/coverage.sh @@ -1,4 +1,8 @@ #!/usr/bin/env bash +set -x + +go_tags=$GO_TAGS +go_tags="${go_tags:---tags=unit}" # golang packages that will be used for either testing or will be assessed for coverage pck1=github.com/zillow/zkafka @@ -22,11 +26,11 @@ function quit() { } # change to example directory for execution (because it uses hardcoded filepaths, and the testable # examples don't work when executed outside of that directory -go test -c -coverpkg=$pck1 -covermode=atomic -o "$root_res" $pck1 +go test $go_tags -c -coverpkg=$pck1 -covermode=atomic -o "$root_res" $pck1 # convert binary to go formatted go tool test2json -t "$root_res" -test.v -test.coverprofile "$root_out" -go test -c -coverpkg=$pck1 -covermode=atomic -o "$source_res" $pck2 +go test $go_tags -c -coverpkg=$pck1 -covermode=atomic -o "$source_res" $pck2 go tool test2json -t "$source_res" -test.v -test.coverprofile "$source_out" # delete aggregate file diff --git a/example/compose.yaml b/example/compose.yaml new file mode 100644 index 0000000..bd28a32 --- /dev/null +++ b/example/compose.yaml @@ -0,0 +1,24 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:latest + container_name: zkafka-zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "22181:2181" + kafka: + image: confluentinc/cp-kafka:latest + container_name: zkafka-broker + depends_on: + - zookeeper + ports: + - "29092:29092" + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 diff --git a/example/consumer/consumer.go b/example/consumer/consumer.go index 66503b8..bb94796 100644 --- a/example/consumer/consumer.go +++ b/example/consumer/consumer.go @@ -12,7 +12,7 @@ import ( func main() { // configure broker connectivity options with zkafka.Config cfg := zkafka.Config{ - BootstrapServers: []string{"localhost:9093"}, + BootstrapServers: []string{"localhost:9092"}, } // configure consumer options with zkafka.ConsumerTopicConfig. See zkafka for full option values diff --git a/example/docker-compose.yaml b/example/docker-compose.yaml deleted file mode 100644 index 04d2e00..0000000 --- a/example/docker-compose.yaml +++ /dev/null @@ -1,10 +0,0 @@ -version: '2.1' - -services: - kafka3: - image: stewartboyd1988/docker-kafka-kraft - container_name: kafka3 - ports: - - "29092:29092" - - "9092:9092" - - "9093:9093" diff --git a/example/producer/producer.go b/example/producer/producer.go index 79b6a91..13c0cdf 100644 --- a/example/producer/producer.go +++ b/example/producer/producer.go @@ -14,7 +14,7 @@ import ( func main() { ctx := context.Background() writer, err := zkafka.NewClient(zkafka.Config{ - BootstrapServers: []string{"localhost:9093"}, + BootstrapServers: []string{"localhost:9092"}, }).Writer(ctx, zkafka.ProducerTopicConfig{ ClientID: "example", Topic: "two-multi-partition", diff --git a/example/worker/worker.go b/example/worker/worker.go index a41d314..0c3125b 100644 --- a/example/worker/worker.go +++ b/example/worker/worker.go @@ -17,7 +17,7 @@ import ( func main() { ctx := context.Background() client := zkafka.NewClient(zkafka.Config{ - BootstrapServers: []string{"localhost:9093"}, + BootstrapServers: []string{"localhost:9092"}, }, zkafka.LoggerOption(stdLogger{}), ) diff --git a/go.mod b/go.mod index d5dce24..af79f0d 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,15 @@ module github.com/zillow/zkafka go 1.22 require ( - github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 + github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 github.com/golang/mock v1.6.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 - github.com/pkg/errors v0.9.1 github.com/sony/gobreaker v1.0.0 github.com/stretchr/testify v1.9.0 github.com/zillow/zfmt v1.0.1 - go.opentelemetry.io/otel v1.27.0 - go.opentelemetry.io/otel/trace v1.27.0 + go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/trace v1.28.0 golang.org/x/sync v0.7.0 ) @@ -24,7 +23,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/heetch/avro v0.4.5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 165e2ff..436b46a 100644 --- a/go.sum +++ b/go.sum @@ -8,36 +8,40 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= -github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= -github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8= github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/actgardner/gogen-avro/v10 v10.2.1 h1:z3pOGblRjAJCYpkIJ8CmbMJdksi4rAhaygw0dyXZ930= github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= -github.com/aws/aws-sdk-go-v2 v1.17.6 h1:Y773UK7OBqhzi5VDXMi1zVGsoj+CVHs2eaC2bDsLwi0= -github.com/aws/aws-sdk-go-v2 v1.17.6/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= -github.com/aws/aws-sdk-go-v2/config v1.18.16 h1:4r7gsCu8Ekwl5iJGE/GmspA2UifqySCCkyyyPFeWs3w= -github.com/aws/aws-sdk-go-v2/config v1.18.16/go.mod h1:XjM6lVbq7UgELp9NjXBrb1DQY/ownlWsvDhEQksemJc= -github.com/aws/aws-sdk-go-v2/credentials v1.13.16 h1:GgToSxaENX/1zXIGNFfiVk4hxryYJ5Vt4Mh8XLAL7Lc= -github.com/aws/aws-sdk-go-v2/credentials v1.13.16/go.mod h1:KP7aFJhfwPFgx9aoVYL2nYHjya5WBD98CWaadpgmnpY= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.24 h1:5qyqXASrX2zy5cTnoHHa4N2c3Lc94GH7gjnBP3GwKdU= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.24/go.mod h1:neYVaeKr5eT7BzwULuG2YbLhzWZ22lpjKdCybR7AXrQ= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.30 h1:y+8n9AGDjikyXoMBTRaHHHSaFEB8267ykmvyPodJfys= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.30/go.mod h1:LUBAO3zNXQjoONBKn/kR1y0Q4cj/D02Ts0uHYjcCQLM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.24 h1:r+Kv+SEJquhAZXaJ7G4u44cIwXV3f8K+N482NNAzJZA= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.24/go.mod h1:gAuCezX/gob6BSMbItsSlMb6WZGV7K2+fWOvk8xBSto= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.31 h1:hf+Vhp5WtTdcSdE+yEcUz8L73sAzN0R+0jQv+Z51/mI= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.31/go.mod h1:5zUjguZfG5qjhG9/wqmuyHRyUftl2B5Cp6NNxNC6kRA= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.24 h1:c5qGfdbCHav6viBwiyDns3OXqhqAbGjfIB4uVu2ayhk= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.24/go.mod h1:HMA4FZG6fyib+NDo5bpIxX1EhYjrAOveZJY2YR0xrNE= -github.com/aws/aws-sdk-go-v2/service/sso v1.12.5 h1:bdKIX6SVF3nc3xJFw6Nf0igzS6Ff/louGq8Z6VP/3Hs= -github.com/aws/aws-sdk-go-v2/service/sso v1.12.5/go.mod h1:vuWiaDB30M/QTC+lI3Wj6S/zb7tpUK2MSYgy3Guh2L0= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.5 h1:xLPZMyuZ4GuqRCIec/zWuIhRFPXh2UOJdLXBSi64ZWQ= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.5/go.mod h1:QjxpHmCwAg0ESGtPQnLIVp7SedTOBMYy+Slr3IfMKeI= -github.com/aws/aws-sdk-go-v2/service/sts v1.18.6 h1:rIFn5J3yDoeuKCE9sESXqM5POTAhOP1du3bv/qTL+tE= -github.com/aws/aws-sdk-go-v2/service/sts v1.18.6/go.mod h1:48WJ9l3dwP0GSHWGc5sFGGlCkuA82Mc2xnw+T6Q8aDw= -github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= -github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.10 h1:PS+65jThT0T/snC5WjyfHHyUgG+eBoupSDV+f838cro= +github.com/aws/aws-sdk-go-v2/config v1.27.10/go.mod h1:BePM7Vo4OBpHreKRUMuDXX+/+JWP38FLkzl5m27/Jjs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10 h1:qDZ3EA2lv1KangvQB6y258OssCHD0xvaGiEDkG4X/10= +github.com/aws/aws-sdk-go-v2/credentials v1.17.10/go.mod h1:6t3sucOaYDwDssHQa0ojH1RpmVmF5/jArkye1b2FKMI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4 h1:WzFol5Cd+yDxPAdnzTA5LmpHYSWinhmSj4rQChV0ee8= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.4/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/buger/goterm v1.0.4 h1:Z9YvGmOih81P0FbVtEYTFF6YsSgxSUKEhf/f9bTMXbY= @@ -46,18 +50,20 @@ github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqy github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/compose-spec/compose-go/v2 v2.0.0-rc.2 h1:eJ01FpliL/02KvsaPyH1bSLbM1S70yWQUojHVRbyvy4= -github.com/compose-spec/compose-go/v2 v2.0.0-rc.2/go.mod h1:IVsvFyGVhw4FASzUtlWNVaAOhYmakXAFY9IlZ7LAuD8= -github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 h1:NbOku86JJlsRJPJKE0snNsz6D1Qr4j5VR/lticrLZrY= -github.com/confluentinc/confluent-kafka-go/v2 v2.4.0/go.mod h1:E1dEQy50ZLfqs7T9luxz0rLxaeFZJZE92XvApJOr/Rk= -github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= -github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= -github.com/containerd/containerd v1.7.12 h1:+KQsnv4VnzyxWcfO9mlxxELaoztsDEjOuCMPAuPqgU0= -github.com/containerd/containerd v1.7.12/go.mod h1:/5OMpE1p0ylxtEUGY8kuCYkDRzJm9NO1TFMWjUpdevk= -github.com/containerd/continuity v0.4.2 h1:v3y/4Yz5jwnvqPKJJ+7Wf93fyWoCB3F5EclWG023MDM= -github.com/containerd/continuity v0.4.2/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= +github.com/compose-spec/compose-go/v2 v2.1.0 h1:qdW2qISQlCQG8v1O2TChcdxgAWTUGgUX/CPSO+ES9+E= +github.com/compose-spec/compose-go/v2 v2.1.0/go.mod h1:bEPizBkIojlQ20pi2vNluBa58tevvj0Y18oUSHPyfdc= +github.com/confluentinc/confluent-kafka-go/v2 v2.5.0 h1:PM18lA9g6u6Qcz06DpXmGRlxXTvWlHqnlAkQi1chPUo= +github.com/confluentinc/confluent-kafka-go/v2 v2.5.0/go.mod h1:Hyo+IIQ/tmsfkOcRP8T6VlSeOW3T33v0Me8Xvq4u90Y= +github.com/containerd/console v1.0.4 h1:F2g4+oChYvBTsASRTz8NP6iIAi97J3TtSAsLbIFn4ro= +github.com/containerd/console v1.0.4/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk= +github.com/containerd/containerd v1.7.15 h1:afEHXdil9iAm03BmhjzKyXnnEBtjaLJefdU7DV0IFes= +github.com/containerd/containerd v1.7.15/go.mod h1:ISzRRTMF8EXNpJlTzyr2XMhN+j9K302C21/+cr3kUnY= +github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8= +github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/containerd/ttrpc v1.2.3 h1:4jlhbXIGvijRtNC8F/5CpuJZ7yKOBFGFOOXg1bkISz0= +github.com/containerd/ttrpc v1.2.3/go.mod h1:ieWsXucbb8Mj9PH0rXCw1i8IunRbbAiDkpXkbfflWBM= github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9FqcttATPO/4= github.com/containerd/typeurl/v2 v2.1.1/go.mod h1:IDp2JFvbwZ31H8dQbEIY7sDl2L3o3HZj1hsSQlywkQ0= github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= @@ -65,18 +71,18 @@ github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHf github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= -github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/buildx v0.12.0-rc2.0.20231219140829-617f538cb315 h1:UZxx9xBADdf/9UmSdEUi+pdJoPKpgcf9QUAY5gEIYmY= -github.com/docker/buildx v0.12.0-rc2.0.20231219140829-617f538cb315/go.mod h1:X8ZHhuW6ncwtoJ36TlU+gyaROTcBkTE01VHYmTStQCE= -github.com/docker/cli v25.0.1+incompatible h1:mFpqnrS6Hsm3v1k7Wa/BO23oz0k121MTbTO1lpcGSkU= -github.com/docker/cli v25.0.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= -github.com/docker/compose/v2 v2.24.3 h1:BVc1oDV7aQgksH64pDKTvcI95G36uJ+Mz9DGGBBoZeQ= -github.com/docker/compose/v2 v2.24.3/go.mod h1:D8Nv9+juzD7xiMyyHJ7G2J/MOYiGBmb9SvdIW5+2zKo= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/buildx v0.14.0 h1:FxqcfE7xgeEC4oQlKLpuvfobRDVDXrHE3jByM+mdyqk= +github.com/docker/buildx v0.14.0/go.mod h1:Vy/2lC9QsJvo33+7KKkN/GDE5WxnVqW0/dpcN7ZqPJY= +github.com/docker/cli v26.1.0+incompatible h1:+nwRy8Ocd8cYNQ60mozDDICICD8aoFGtlPXifX/UQ3Y= +github.com/docker/cli v26.1.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/compose/v2 v2.27.0 h1:FKyClQdErCxUZULC2zo6Jn5ve+epFPe/Y0HaxjmUzNg= +github.com/docker/compose/v2 v2.27.0/go.mod h1:uaqwmY6haO8wXWHk+LAsqqDapX6boH4izRKqj/E7+Bo= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v25.0.3+incompatible h1:D5fy/lYmY7bvZa0XTZ5/UJPljor41F+vdyJG5luQLfQ= -github.com/docker/docker v25.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v26.1.0+incompatible h1:W1G9MPNbskA6VZWL7b3ZljTh0pXI68FpINx0GKaOdaM= +github.com/docker/docker v26.1.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.8.0 h1:YQFtbBQb4VrpoPxhFuzEBPQ9E16qz5SpHLS+uswaCp8= github.com/docker/docker-credential-helpers v0.8.0/go.mod h1:UGFXcuoQ5TxPiB54nHOZ32AWRqQdECoh/Mg0AlEYb40= github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c h1:lzqkGL9b3znc+ZUgi7FlLnqjQhcXxkNM/quxIjBVMD0= @@ -87,8 +93,10 @@ github.com/docker/go-metrics v0.0.1 h1:AgB/0SvBxihN0X8OR4SjsblXkbMvalQ8cjmtKQ2rQ github.com/docker/go-metrics v0.0.1/go.mod h1:cG1hvH2utMXtqgqqYE9plW6lDxS3/5ayHzueweSI3Vw= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/emicklei/go-restful/v3 v3.10.1 h1:rc42Y5YTp7Am7CS630D7JmhRjq4UlEUuEKfrDac4bSQ= -github.com/emicklei/go-restful/v3 v3.10.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203 h1:XBBHcIb256gUJtLmY22n99HaZTz+r2Z51xUPi01m3wg= +github.com/eiannone/keyboard v0.0.0-20220611211555-0d226195f203/go.mod h1:E1jcSv8FaEny+OP/5k9UxZVw9YFWGj7eI4KR/iOBqCg= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= @@ -104,12 +112,12 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= -github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= -github.com/go-openapi/jsonreference v0.20.0 h1:MYlu0sBgChmCfJxxUKZ8g1cPWFOB37YSZqewK7OKeyA= -github.com/go-openapi/jsonreference v0.20.0/go.mod h1:Ag74Ico3lPc+zR+qjn4XBUmXymS4zJbYVCZmcgkasdo= -github.com/go-openapi/swag v0.19.14 h1:gm3vOOXfiuw5i9p5N9xJvfjvuofpyvLA9Wr6QfK5Fng= -github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= +github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= +github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= +github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= +github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= +github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= +github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/googleapis v1.4.1 h1:1Yx4Myt7BxzvUr5ldGSbwYiZG6t9wGBZ+8/fX3Wvtq0= @@ -122,8 +130,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/gnostic v0.5.7-v3refs h1:FhTMOKj2VhjpouxvWJAV1TL304uMlb9zcDqkl6cEI54= -github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -134,6 +142,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= @@ -162,8 +172,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -172,12 +182,12 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= -github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= -github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk= @@ -194,8 +204,10 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/moby/buildkit v0.13.0-beta1.0.20231219135447-957cb50df991 h1:r80LLQ91uOLxU1ElAvrB1o8oBsph51lPzVnr7t2b200= -github.com/moby/buildkit v0.13.0-beta1.0.20231219135447-957cb50df991/go.mod h1:6MddWPSL5jxy+W8eMMHWDOfZzzRRKWXPZqajw72YHBc= +github.com/moby/buildkit v0.13.1 h1:L8afOFhPq2RPJJSr/VyzbufwID7jquZVB7oFHbPRcPE= +github.com/moby/buildkit v0.13.1/go.mod h1:aNmNQKLBFYAOFuzQjR3VA27/FijlvtBD1pjNwTSN37k= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= @@ -222,6 +234,8 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= @@ -234,22 +248,24 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/prometheus/client_golang v1.16.0 h1:yk/hx9hDbrGHovbci4BY+pRMfSuuat626eFsHb7tmT8= -github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= -github.com/prometheus/client_model v0.4.0 h1:5lQXD3cAg1OXBf4Wq03gTrXHeaV0TQvGfUooCfx1yqY= -github.com/prometheus/client_model v0.4.0/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= -github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= -github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= +github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= +github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/common v0.44.0 h1:+5BrQJwiBB9xsMygAB3TNvpQKOwlkc25LbISbrdOOfY= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= +github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= +github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE= github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs= -github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002 h1:ka9QPuQg2u4LGipiZGsgkg3rJCo4iIUCy75FddM0GRQ= -github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= +github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b h1:h+3JX2VoWTFuyQEo87pStk/a99dzIO1mM9KxIyLPGTU= +github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= github.com/shibumi/go-pathspec v1.3.0 h1:QUyMZhFo0Md5B8zV8x2tesohbb5kfbpTi9rBnKh5dkI= github.com/shibumi/go-pathspec v1.3.0/go.mod h1:Xutfslp817l2I1cZvgcfeMQJG5QnU2lh5tVaaMCl3jE= github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4= @@ -258,6 +274,8 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 h1:JIAuq3EEf9cgbU6AtGPK4CTG3Zf6CKMNqf0MHTggAUA= +github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966/go.mod h1:sUM3LWHvSMaG192sy56D9F7CNvL7jUJVXoqM1QKLnog= github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0= @@ -268,10 +286,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/testcontainers/testcontainers-go v0.29.1 h1:z8kxdFlovA2y97RWx98v/TQ+tR+SXZm6p35M+xB92zk= -github.com/testcontainers/testcontainers-go v0.29.1/go.mod h1:SnKnKQav8UcgtKqjp/AD8bE1MqZm+3TDb/B8crE3XnI= -github.com/testcontainers/testcontainers-go/modules/compose v0.29.1 h1:47ipPM+s+ltCDOP3Sa1j95AkNb+z+WGiHLDbLU8ixuc= -github.com/testcontainers/testcontainers-go/modules/compose v0.29.1/go.mod h1:Sqh+Ef2ESdbJQjTJl57UOkEHkOc7gXvQLg1b5xh6f1Y= +github.com/testcontainers/testcontainers-go v0.31.0 h1:W0VwIhcEVhRflwL9as3dhY6jXjVCA27AkmbnZ+UTh3U= +github.com/testcontainers/testcontainers-go v0.31.0/go.mod h1:D2lAoA0zUFiSY+eAflqK5mcUx/A5hrrORaEQrd0SefI= +github.com/testcontainers/testcontainers-go/modules/compose v0.31.0 h1:H74o3HisnApIDQx7sWibGzOl/Oo0By8DjyVeUf3qd6I= +github.com/testcontainers/testcontainers-go/modules/compose v0.31.0/go.mod h1:z1JAsvL2/pNFy40yJX0VX9Yk+hzOCIO5DydxBJHBbCY= github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c= github.com/theupdateframework/notary v0.7.0/go.mod h1:c9DRxcmhHmVLDay4/2fUYdISnHqbFDGRSlXPO0AhYWw= github.com/tilt-dev/fsnotify v1.4.8-0.20220602155310-fff9c274a375 h1:QB54BJwA6x8QU9nHY3xJSZR2kX9bgpZekRKGkLTmEXA= @@ -280,8 +298,8 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= -github.com/tonistiigi/fsutil v0.0.0-20230825212630-f09800878302 h1:ZT8ibgassurSISJ1Pj26NsM3vY2jxFZn63Nd/TpHmRw= -github.com/tonistiigi/fsutil v0.0.0-20230825212630-f09800878302/go.mod h1:9kMVqMyQ/Sx2df5LtnGG+nbrmiZzCS7V6gjW3oGHsvI= +github.com/tonistiigi/fsutil v0.0.0-20240301111122-7525a1af2bb5 h1:oZS8KCqAg62sxJkEq/Ppzqrb6EooqzWtL8Oaex7bc5c= +github.com/tonistiigi/fsutil v0.0.0-20240301111122-7525a1af2bb5/go.mod h1:vbbYqJlnswsbJqWUcJN8fKtBhnEgldDrcagTgnBVKKM= github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea h1:SXhTLE6pb6eld/v/cCndK0AMpt1wiVFb/YYmqB3/QG0= github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea/go.mod h1:WPnis/6cRcDZSUvVmezrxJPkiO87ThFYsoUiMwWNDJk= github.com/tonistiigi/vt100 v0.0.0-20230623042737-f9a4f7ef6531 h1:Y/M5lygoNPKwVNLMPXgVfsRT40CSFKXCxuU8LoHySjs= @@ -297,56 +315,54 @@ github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFi github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zillow/zfmt v1.0.1 h1:JLN5WaxoqqoEPUpVWer83uhXhDPAA2nZkfQqgKnWp+w= github.com/zillow/zfmt v1.0.1/go.mod h1:0PpKh4rWh+5Ghr2bbuN5UvEcqEz6PkHfE0Idgjyxy7Y= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 h1:RsQi0qJ2imFfCvZabqzM9cNXBG8k6gXMv1A0cXRmH6A= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0/go.mod h1:vsh3ySueQCiKPxFLvjWC4Z135gIa34TQ/NSqkDTZYUM= -go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.45.0 h1:2ea0IkZBsWH+HA2GkD+7+hRw2u97jzdFyRtXuO14a1s= -go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.45.0/go.mod h1:4m3RnBBb+7dB9d21y510oO1pdB1V4J6smNf14WXcBFQ= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 h1:x8Z78aZx8cOF0+Kkazoc7lwUNMGy0LrzEMxTm4BbTxg= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0/go.mod h1:62CPTSry9QZtOaSsE3tOzhx6LzDhHnXJ6xHeMNNiM6Q= -go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= -go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0/go.mod h1:Mjt1i1INqiaoZOMGR1RIUJN+i3ChKoFRqzrRQhlkbs0= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1 h1:gbhw/u49SS3gkPWiYweQNJGm/uJN5GkI/FrosxSHT7A= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1/go.mod h1:GnOaBaFQ2we3b9AGWJpsBa7v1S5RlQzlC3O7dRMxZhM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0/go.mod h1:UVAO61+umUsHLtYb8KXXRoHtxUkdOPkYidzW3gipRLQ= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.42.0 h1:wNMDy/LVGLj2h3p6zg4d0gypKfWKSWI14E1C4smOgl8= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.42.0/go.mod h1:YfbDdXAAkemWJK3H/DshvlrxqFB2rtW4rY6ky/3x/H0= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 h1:digkEZCJWobwBqMwC0cwCq8/wkkRy/OowZg5OArWZrM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0/go.mod h1:/OpE/y70qVkndM0TrxT4KBoN3RsFZP0QaofcfYrj76I= go.opentelemetry.io/otel/exporters/prometheus v0.42.0 h1:jwV9iQdvp38fxXi8ZC+lNpxjK16MRcZlpDYvbuO1FiA= go.opentelemetry.io/otel/exporters/prometheus v0.42.0/go.mod h1:f3bYiqNqhoPxkvI2LrXqQVC546K7BuRDL/kKuxkujhA= -go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= -go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= -go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= -go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= -go.opentelemetry.io/otel/sdk/metric v1.19.0 h1:EJoTO5qysMsYCa+w4UghwFV/ptQgqSL/8Ni+hx+8i1k= -go.opentelemetry.io/otel/sdk/metric v1.19.0/go.mod h1:XjG0jQyFJrv2PbMvwND7LwCEhsJzCzV5210euduKcKY= -go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= -go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= +go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= +go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o= +golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= -golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= -golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= +golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= @@ -356,22 +372,20 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= -golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q= +golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ= -golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -387,6 +401,8 @@ google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -396,24 +412,24 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.26.7 h1:Lf4iEBEJb5OFNmawtBfSZV/UNi9riSJ0t1qdhyZqI40= -k8s.io/api v0.26.7/go.mod h1:Vk9bMadzA49UHPmHB//lX7VRCQSXGoVwfLd3Sc1SSXI= -k8s.io/apimachinery v0.26.7 h1:590jSBwaSHCAFCqltaEogY/zybFlhGsnLteLpuF2wig= -k8s.io/apimachinery v0.26.7/go.mod h1:qYzLkrQ9lhrZRh0jNKo2cfvf/R1/kQONnSiyB7NUJU0= -k8s.io/apiserver v0.26.7 h1:NX/zBZZn4R+Cq6shwyn8Pn8REd0yJJ16dbtv9WkEVEU= -k8s.io/apiserver v0.26.7/go.mod h1:r0wDRWHI7VL/KlQLTkJJBVGZ3KeNfv+VetlyRtr86xs= -k8s.io/client-go v0.26.7 h1:hyU9aKHlwVOykgyxzGYkrDSLCc4+mimZVyUJjPyUn1E= -k8s.io/client-go v0.26.7/go.mod h1:okYjy0jtq6sdeztALDvCh24tg4opOQS1XNvsJlERDAo= -k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= -k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= -k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E= -k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4= -k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 h1:kmDqav+P+/5e1i9tFfHq1qcF3sOrDp+YEkVDAHu7Jwk= -k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= -sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= -sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= -sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= +k8s.io/api v0.29.2 h1:hBC7B9+MU+ptchxEqTNW2DkUosJpp1P+Wn6YncZ474A= +k8s.io/api v0.29.2/go.mod h1:sdIaaKuU7P44aoyyLlikSLayT6Vb7bvJNCX105xZXY0= +k8s.io/apimachinery v0.29.2 h1:EWGpfJ856oj11C52NRCHuU7rFDwxev48z+6DSlGNsV8= +k8s.io/apimachinery v0.29.2/go.mod h1:6HVkd1FwxIagpYrHSwJlQqZI3G9LfYWRPAkUvLnXTKU= +k8s.io/apiserver v0.29.2 h1:+Z9S0dSNr+CjnVXQePG8TcBWHr3Q7BmAr7NraHvsMiQ= +k8s.io/apiserver v0.29.2/go.mod h1:B0LieKVoyU7ykQvPFm7XSdIHaCHSzCzQWPFa5bqbeMQ= +k8s.io/client-go v0.29.2 h1:FEg85el1TeZp+/vYJM7hkDlSTFZ+c5nnK44DJ4FyoRg= +k8s.io/client-go v0.29.2/go.mod h1:knlvFZE58VpqbQpJNbCbctTVXcd35mMyAAwBdpt4jrA= +k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= +k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= +k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= +k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= tags.cncf.io/container-device-interface v0.6.2 h1:dThE6dtp/93ZDGhqaED2Pu374SOeUkBfuvkLuiTdwzg= diff --git a/message.go b/message.go index fdaee5f..6604aa0 100644 --- a/message.go +++ b/message.go @@ -2,6 +2,7 @@ package zkafka import ( "context" + "errors" "fmt" "os" "sync" @@ -9,7 +10,6 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/google/uuid" - "github.com/pkg/errors" "github.com/zillow/zfmt" ) diff --git a/reader.go b/reader.go index 6a24c08..6750dc3 100644 --- a/reader.go +++ b/reader.go @@ -5,11 +5,12 @@ package zkafka import ( "context" + "errors" + "fmt" "sync" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/pkg/errors" ) //// go:generate mockgen -destination=./mocks/mock_metrics.go -source=reader.go @@ -98,9 +99,9 @@ func (r *KReader) Read(ctx context.Context) (*Message, error) { r.logger.Debugw(ctx, "Retryable error occurred", "topics", r.topicConfig.topics(), "error", v) return nil, nil } - return nil, errors.Wrap(err, "failed to read kafka message") + return nil, fmt.Errorf("failed to read kafka message: %w", err) } - return nil, errors.Wrap(err, "failed to read kafka message") + return nil, fmt.Errorf("failed to read kafka message: %w", err) } if kmsg == nil { return nil, nil @@ -118,7 +119,7 @@ func (r *KReader) Close() error { r.isClosed = true err := r.consumer.Close() if err != nil { - return errors.Wrap(err, "failed to close kafka reader") + return fmt.Errorf("failed to close kafka reader: %w", err) } return nil } @@ -127,7 +128,7 @@ func (r *KReader) Close() error { func (r *KReader) Assignments(_ context.Context) ([]Assignment, error) { assignments, err := r.consumer.Assignment() if err != nil { - return nil, errors.Wrap(err, "failed to get assignments") + return nil, fmt.Errorf("failed to get assignments: %w", err) } topicPartitions := make([]Assignment, 0, len(assignments)) for _, tp := range assignments { @@ -202,8 +203,8 @@ func (r *KReader) mapMessage(_ context.Context, msg kafka.Message) *Message { } // getRebalanceCb returns a callback which can be used during rebalances. -// It previously attempted to do one final, explict commit of stored offsets. -// This was unncessary per the mantainer of librdkafka (https://github.com/confluentinc/librdkafka/issues/1829#issuecomment-393427324) +// It previously attempted to do one final, explicit commit of stored offsets. +// This was unnecessary per the maintainer of librdkafka (https://github.com/confluentinc/librdkafka/issues/1829#issuecomment-393427324) // since when using auto.offset.commit=true (which this library does) the offsets are commit at configured intervals, during close and finally during rebalance. // // We do however, want to attempt to let current work complete before allowing a rebalance (so we check the in progress heap) for up to 10 seconds. diff --git a/test/integration_test.go b/test/integration_test.go index b03375a..53d1c94 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1081,9 +1081,9 @@ func Test_WorkDelay_GuaranteesProcessingDelayedAtLeastSpecifiedDelayDurationFrom // // This test creates N messages on the topic and then starts processing // 1. It asserts that the time since the message was written is at least that of the delay. -// This is a weak assertion sicne the messages are written before the work consumer group is started. Other tests do a better job confirming this behavior +// This is a weak assertion since the messages are written before the work consumer group is started. Other tests do a better job confirming this behavior // 2. It also asserts that the time between the first and last message is very short. -// This is expected in a backlog situation, since the worker will delay once, and with monotonically increasing timestamps won't have to dely again +// This is expected in a backlog situation, since the worker will delay once, and with monotonically increasing timestamps won't have to delay again func Test_WorkDelay_DoesntHaveDurationStackEffect(t *testing.T) { ctx := context.Background() @@ -1204,7 +1204,7 @@ func createTopic(t *testing.T, bootstrapServer, topic string, partitions int) { func getBootstrap() string { bootstrapServer, ok := os.LookupEnv("KAFKA_BOOTSTRAP_SERVER") if !ok { - bootstrapServer = "localhost:9093" // local development + bootstrapServer = "localhost:9092" // local development } return bootstrapServer } diff --git a/test/work_test.go b/test/worker_test.go similarity index 92% rename from test/work_test.go rename to test/worker_test.go index 9282473..7000947 100644 --- a/test/work_test.go +++ b/test/worker_test.go @@ -18,6 +18,7 @@ import ( "github.com/zillow/zfmt" "github.com/zillow/zkafka" zkafka_mocks "github.com/zillow/zkafka/mocks" + "golang.org/x/sync/errgroup" "github.com/golang/mock/gomock" ) @@ -35,6 +36,7 @@ func TestWork_Run_FailsWithLogsWhenFailedToGetReader(t *testing.T) { defer ctrl.Finish() l := zkafka_mocks.NewMockLogger(ctrl) + l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() l.EXPECT().Warnw(gomock.Any(), "Kafka worker read message failed", "error", gomock.Any(), "topics", gomock.Any()).MinTimes(1) l.EXPECT().Warnw(gomock.Any(), "Kafka topic processing circuit open", "topics", gomock.Any()).AnyTimes() @@ -51,9 +53,10 @@ func TestWork_Run_FailsWithLogsWhenFailedToGetReader(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return fanoutCount.Load() >= 1 @@ -65,6 +68,8 @@ func TestWork_Run_FailsWithLogsWhenFailedToGetReader(t *testing.T) { pollPause: time.Millisecond, maxWait: 10 * time.Second, }) + cancel() + require.NoError(t, grp.Wait()) } func TestWork_Run_FailsWithLogsWhenGotNilReader(t *testing.T) { @@ -76,6 +81,7 @@ func TestWork_Run_FailsWithLogsWhenGotNilReader(t *testing.T) { l := zkafka_mocks.NewMockLogger(ctrl) l.EXPECT().Warnw(gomock.Any(), "Kafka worker read message failed", "error", gomock.Any(), "topics", gomock.Any()).Times(1) + l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() kcp := zkafka_mocks.NewMockClientProvider(ctrl) kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Times(1).Return(nil, nil) @@ -89,7 +95,8 @@ func TestWork_Run_FailsWithLogsWhenGotNilReader(t *testing.T) { cancel() }})) - w.Run(ctx, nil) + err := w.Run(ctx, nil) + require.NoError(t, err) } func TestWork_Run_FailsWithLogsForReadError(t *testing.T) { @@ -100,8 +107,8 @@ func TestWork_Run_FailsWithLogsForReadError(t *testing.T) { defer ctrl.Finish() l := zkafka_mocks.NewMockLogger(ctrl) - l.EXPECT().Warnw(gomock.Any(), "Kafka worker read message failed", "error", gomock.Any(), "topics", gomock.Any()).MinTimes(1) + l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).Times(1).Return(nil, errors.New("error occurred during read")) @@ -117,7 +124,8 @@ func TestWork_Run_FailsWithLogsForReadError(t *testing.T) { cancel() }})) - w.Run(ctx, nil) + err := w.Run(ctx, nil) + require.NoError(t, err) } func TestWork_Run_CircuitBreakerOpensOnReadError(t *testing.T) { @@ -144,7 +152,7 @@ func TestWork_Run_CircuitBreakerOpensOnReadError(t *testing.T) { zkafka.CircuitBreakAfter(1), // Circuit breaks after 1 error. zkafka.CircuitBreakFor(50*time.Millisecond), zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) { - l.Warnw(ctx, "Fanout callback called") + l.Warnw(ctx, "Fan out callback called") cnt.Add(1) }})) @@ -152,19 +160,21 @@ func TestWork_Run_CircuitBreakerOpensOnReadError(t *testing.T) { defer cancel() start := time.Now() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return cnt.Load() >= 10 }, pollOpts{ exit: cancel, timeoutExit: func() { - require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %s", 10) + require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %d", 10) }, }) - require.GreaterOrEqual(t, time.Since(start), 200*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be in open state (stoppage) for half the messages (and half open for the other half, 1 message through).") + require.GreaterOrEqual(t, time.Since(start), 150*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be in open state (stoppage) for half the messages (and half open for the other half, 1 message through). (10/2-1)*50ms = 200ms. Subtract 50ms for fuzz") + require.NoError(t, grp.Wait()) } func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) { @@ -205,20 +215,22 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) { defer cancel() start := time.Now() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return cnt.Load() >= 10 }, pollOpts{ exit: cancel, timeoutExit: func() { - require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %s", 10) + require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %d", 10) }, }) require.GreaterOrEqual(t, time.Since(start), 400*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be executed for each of the n -2 failed messages (first one results in error and trips the circuit breaker. Second message read prior to trip") + require.NoError(t, grp.Wait()) } func TestWork_Run_DoNotSkipCircuitBreak(t *testing.T) { @@ -262,9 +274,10 @@ func TestWork_Run_DoNotSkipCircuitBreak(t *testing.T) { defer cancel() start := time.Now() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return cnt.Load() > 10 @@ -272,10 +285,11 @@ func TestWork_Run_DoNotSkipCircuitBreak(t *testing.T) { exit: cancel, pollPause: time.Microsecond * 100, timeoutExit: func() { - require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %s", 10) + require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %d", 10) }, }) require.GreaterOrEqual(t, time.Since(start), 450*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be executed for each of the n -1 failed messages (first one results in error and trips the circuit breaker") + require.NoError(t, grp.Wait()) } func TestWork_Run_DoSkipCircuitBreak(t *testing.T) { @@ -319,20 +333,22 @@ func TestWork_Run_DoSkipCircuitBreak(t *testing.T) { defer cancel() start := time.Now() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return cnt.Load() >= 10 }, pollOpts{ exit: cancel, timeoutExit: func() { - require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %s", 10) + require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %d", 10) }, }) - require.LessOrEqual(t, time.Since(start), 50*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be skipped for each of the 10 failed messages. The expected time to process 10 messages is on the order of micro/nanoseconds, but we'll conservatievely be happy with being less than a single circuit break cycle") + require.LessOrEqual(t, time.Since(start), 50*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be skipped for each of the 10 failed messages. The expected time to process 10 messages is on the order of micro/nanoseconds, but we'll conservatively be happy with being less than a single circuit break cycle") + require.NoError(t, grp.Wait()) } func TestWork_Run_CircuitBreaksOnProcessPanicInsideProcessorGoRoutine(t *testing.T) { @@ -373,9 +389,10 @@ func TestWork_Run_CircuitBreaksOnProcessPanicInsideProcessorGoRoutine(t *testing defer cancel() start := time.Now() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { ok := cnt.Load() >= 10 @@ -385,11 +402,12 @@ func TestWork_Run_CircuitBreaksOnProcessPanicInsideProcessorGoRoutine(t *testing return ok }, pollOpts{ timeoutExit: func() { - require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %s", 10) + require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %d", 10) }, }) require.GreaterOrEqual(t, time.Since(start), 400*time.Millisecond, "Every circuit breaker stoppage is 50ms, and we expect it to be executed for each of the n failed messages with the exception of the first and second message (first trips, and second is read before the trip)") + require.NoError(t, grp.Wait()) } func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) { @@ -407,6 +425,7 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) { l.EXPECT().Warnw(gomock.Any(), "Outside context canceled", "error", gomock.Any(), "kmsg", gomock.Any()).AnyTimes() l.EXPECT().Warnw(gomock.Any(), "Kafka topic processing circuit open", "topics", gomock.Any()).Times(0) l.EXPECT().Debugw(gomock.Any(), "Kafka topic message received", "offset", gomock.Any(), "partition", gomock.Any(), "topic", gomock.Any(), "groupID", gomock.Any()).AnyTimes() + l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() r := zkafka_mocks.NewMockReader(ctrl) r.EXPECT().Read(gomock.Any()).MinTimes(4).Return(nil, errors.New("error occurred on read")) @@ -429,9 +448,10 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { ok := cnt.Load() >= int64(processingCount) @@ -441,9 +461,10 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) { return ok }, pollOpts{ timeoutExit: func() { - require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %s", processingCount) + require.Failf(t, "Polling condition not met prior to test timeout", "Processing count %d", processingCount) }, }) + require.NoError(t, grp.Wait()) } func TestWork_Run_SpedUpIsFaster(t *testing.T) { @@ -485,7 +506,7 @@ func TestWork_Run_SpedUpIsFaster(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) defer cancel() - workerSlow.Run(ctx, nil) + require.NoError(t, workerSlow.Run(ctx, nil)) }() // use te speedup option so more go routines process the read messages. @@ -501,7 +522,7 @@ func TestWork_Run_SpedUpIsFaster(t *testing.T) { defer cancel() // wait for the cancel to occur via timeout - workerSpedUp.Run(ctx, nil) + require.NoError(t, workerSpedUp.Run(ctx, nil)) }() // by putting a delay in the work.do method we minimize the comparative overhead in creating additional goroutines @@ -532,6 +553,7 @@ func TestKafkaWork_ProcessorReturnsErrorIsLoggedAsWarning(t *testing.T) { l.EXPECT().Warnw(gomock.Any(), "Kafka topic single message processing failed", "error", gomock.Any(), "kmsg", gomock.Any()).MinTimes(1) l.EXPECT().Warnw(gomock.Any(), "Outside context canceled", "kmsg", gomock.Any(), "error", gomock.Any()).AnyTimes() l.EXPECT().Debugw(gomock.Any(), "Kafka topic message received", "offset", gomock.Any(), "partition", gomock.Any(), "topic", gomock.Any(), "groupID", gomock.Any()).AnyTimes() + l.EXPECT().Debugw(gomock.Any(), gomock.Any()).AnyTimes() msg := zkafka.GetFakeMessage("key", "val", &zfmt.JSONFormatter{}, NoopOnDone) mockReader := zkafka_mocks.NewMockReader(ctrl) @@ -555,9 +577,10 @@ func TestKafkaWork_ProcessorReturnsErrorIsLoggedAsWarning(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - work.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return work.Run(ctx, nil) + }) for { if count.Load() >= 1 { cancel() @@ -565,6 +588,7 @@ func TestKafkaWork_ProcessorReturnsErrorIsLoggedAsWarning(t *testing.T) { } time.Sleep(time.Microsecond * 100) } + require.NoError(t, grp.Wait()) } // TestKafkaWork_ProcessorTimeoutCausesContextCancellation demonstrates that ProcessTimeoutMillis will @@ -605,9 +629,10 @@ func TestKafkaWork_ProcessorTimeoutCausesContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - work.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return work.Run(ctx, nil) + }) for { if count.Load() >= 1 { cancel() @@ -615,6 +640,8 @@ func TestKafkaWork_ProcessorTimeoutCausesContextCancellation(t *testing.T) { } time.Sleep(time.Microsecond * 100) } + + require.NoError(t, grp.Wait()) } func TestWork_WithDeadLetterTopic_NoMessagesWrittenToDLTSinceNoErrorsOccurred(t *testing.T) { @@ -661,13 +688,13 @@ func TestWork_WithDeadLetterTopic_NoMessagesWrittenToDLTSinceNoErrorsOccurred(t }), ) - workCompleted := atomic.Bool{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w1.Run(ctx, nil) - workCompleted.Store(true) - }() + + grp := errgroup.Group{} + grp.Go(func() error { + return w1.Run(ctx, nil) + }) pollWait(func() bool { stop := cnt.Load() == 2 @@ -682,14 +709,7 @@ func TestWork_WithDeadLetterTopic_NoMessagesWrittenToDLTSinceNoErrorsOccurred(t maxWait: 10 * time.Second, }) - pollWait(func() bool { - return workCompleted.Load() - }, pollOpts{ - timeoutExit: func() { - require.Fail(t, "Timed out during poll waiting for work exit") - }, - maxWait: 10 * time.Second, - }) + require.NoError(t, grp.Wait()) } func TestWork_WithDeadLetterTopic_MessagesWrittenToDLTSinceErrorOccurred(t *testing.T) { @@ -740,9 +760,10 @@ func TestWork_WithDeadLetterTopic_MessagesWrittenToDLTSinceErrorOccurred(t *test ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w1.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w1.Run(ctx, nil) + }) pollWait(func() bool { return len(processor.ProcessedMessages()) == 2 @@ -753,9 +774,11 @@ func TestWork_WithDeadLetterTopic_MessagesWrittenToDLTSinceErrorOccurred(t *test pollPause: time.Millisecond, maxWait: 10 * time.Second, }) + cancel() + require.NoError(t, grp.Wait()) } -// TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing even if get topic writer (for DLT) returns error processing still continues. +// TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing shows that even if get topic writer (for DLT) returns error processing still continues. // This test configures a single virtual partition to process the reader. If processing halted on account of DLT write error, // the test wouldn't get through all 10 messages func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *testing.T) { @@ -801,9 +824,10 @@ func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *test ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w1.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w1.Run(ctx, nil) + }) // the previous poll doesn't fully guarantee that the piece of code that pollWait(func() bool { @@ -815,6 +839,8 @@ func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *test pollPause: time.Millisecond, maxWait: 10 * time.Second, }) + cancel() + require.NoError(t, grp.Wait()) } // TestWork_WithDeadLetterTopic_FailedToWriteToDLTDoesntPauseProcessing even if callback can't write to DLT, processing still continues. @@ -867,9 +893,10 @@ func TestWork_WithDeadLetterTopic_FailedToWriteToDLTDoesntPauseProcessing(t *tes ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w1.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w1.Run(ctx, nil) + }) // the previous poll doesn't fully guarantee that the piece of code that pollWait(func() bool { @@ -881,6 +908,8 @@ func TestWork_WithDeadLetterTopic_FailedToWriteToDLTDoesntPauseProcessing(t *tes pollPause: time.Millisecond, maxWait: 10 * time.Second, }) + cancel() + require.NoError(t, grp.Wait()) } func TestWork_DisableDLTWrite(t *testing.T) { @@ -934,9 +963,10 @@ func TestWork_DisableDLTWrite(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w1.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w1.Run(ctx, nil) + }) pollWait(func() bool { return len(processor.ProcessedMessages()) == 2 @@ -947,7 +977,8 @@ func TestWork_DisableDLTWrite(t *testing.T) { pollPause: time.Millisecond, maxWait: 10 * time.Second, }) - + cancel() + require.NoError(t, grp.Wait()) } // TestWork_Run_OnDoneCallbackCalledOnProcessorError asserts that our callback @@ -971,7 +1002,7 @@ func TestWork_Run_OnDoneCallbackCalledOnProcessorError(t *testing.T) { kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) - sig := make(chan struct{}, 1) + errCount := atomic.Int64{} processingError := errors.New("failed processing") p := &fakeProcessor{ @@ -982,7 +1013,7 @@ func TestWork_Run_OnDoneCallbackCalledOnProcessorError(t *testing.T) { var errReceived error errorCallback := func(ctx context.Context, _ *zkafka.Message, e error) { errReceived = e - sig <- struct{}{} + errCount.Add(1) } w := kwf.Create( @@ -994,11 +1025,21 @@ func TestWork_Run_OnDoneCallbackCalledOnProcessorError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { w.Run(ctx, nil) }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) // wait until channel from error callback is written to - <-sig + pollWait(func() bool { + return errCount.Load() >= 1 + }, pollOpts{ + exit: cancel, + timeoutExit: cancel, + }) require.ErrorIs(t, errReceived, processingError, "Expected processing error to be passed to callback") + cancel() + require.NoError(t, grp.Wait()) } func TestWork_Run_WritesMetrics(t *testing.T) { @@ -1023,7 +1064,7 @@ func TestWork_Run_WritesMetrics(t *testing.T) { lh := NewFakeLifecycleHooks(&lhMtx, &lhState) kwf := zkafka.NewWorkFactory(kcp, zkafka.WithWorkLifecycleHooks(lh)) - sig := make(chan struct{}, 1) + onDoneCount := atomic.Int64{} p := fakeProcessor{ process: func(ctx context.Context, message *zkafka.Message) error { @@ -1034,15 +1075,23 @@ func TestWork_Run_WritesMetrics(t *testing.T) { w := kwf.Create( zkafka.ConsumerTopicConfig{Topic: topicName, GroupID: "xxx"}, &p, - zkafka.WithOnDone(func(ctx context.Context, _ *zkafka.Message, e error) { sig <- struct{}{} }), + zkafka.WithOnDone(func(ctx context.Context, _ *zkafka.Message, e error) { onDoneCount.Add(1) }), ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { w.Run(ctx, nil) }() - // wait until channel from error callback is written to - <-sig + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) + pollWait(func() bool { + return onDoneCount.Load() >= 1 + }, pollOpts{ + exit: cancel, + timeoutExit: cancel, + }) + require.NoError(t, grp.Wait()) } func TestWork_LifecycleHooksCalledForEachItem_Reader(t *testing.T) { @@ -1085,9 +1134,10 @@ func TestWork_LifecycleHooksCalledForEachItem_Reader(t *testing.T) { atomic.AddInt32(&numProcessedItems, 1) })) - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return int(atomic.LoadInt32(&numProcessedItems)) == numMsgs @@ -1109,6 +1159,7 @@ func TestWork_LifecycleHooksCalledForEachItem_Reader(t *testing.T) { require.Equal(t, lhState.postProMeta[0].Topic, topicName) require.Equal(t, lhState.postProMeta[0].GroupID, "xxx") require.Equal(t, lhState.postProMeta[0].VirtualPartitionIndex, 0) + require.NoError(t, grp.Wait()) } func TestWork_LifecycleHooksPostReadCanUpdateContext(t *testing.T) { @@ -1157,9 +1208,10 @@ func TestWork_LifecycleHooksPostReadCanUpdateContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return int(atomic.LoadInt32(&numProcessedItems)) == numMsgs @@ -1168,6 +1220,7 @@ func TestWork_LifecycleHooksPostReadCanUpdateContext(t *testing.T) { }) require.Equal(t, capturedContext.Value("stewy"), "hello", "Expect context passed to process to include data injected at post read step") + require.NoError(t, grp.Wait()) } func TestWork_LifecycleHooksPostReadErrorDoesntHaltProcessing(t *testing.T) { @@ -1214,15 +1267,17 @@ func TestWork_LifecycleHooksPostReadErrorDoesntHaltProcessing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return int(atomic.LoadInt32(&numProcessedItems)) == numMsgs }, pollOpts{ exit: cancel, }) + require.NoError(t, grp.Wait()) } func TestWork_LifecycleHooksCalledForEachItem(t *testing.T) { @@ -1263,9 +1318,10 @@ func TestWork_LifecycleHooksCalledForEachItem(t *testing.T) { atomic.AddInt32(&numProcessedItems, 1) })) - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return int(atomic.LoadInt32(&numProcessedItems)) == numMsgs @@ -1276,6 +1332,7 @@ func TestWork_LifecycleHooksCalledForEachItem(t *testing.T) { require.Equal(t, numMsgs, lhState.numCalls["pre-processing"]) require.Equal(t, numMsgs, lhState.numCalls["post-processing"]) require.Equal(t, 0, lhState.numCalls["post-ack"]) + require.NoError(t, grp.Wait()) } type FakeLifecycleState struct { @@ -1347,7 +1404,8 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen kcp := zkafka_mocks.NewMockClientProvider(ctrl) kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Return(r, nil).AnyTimes() - l := zkafka.NoopLogger{} + //l := zkafka.NoopLogger{} + l := stdLogger{includeDebug: true} kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l)) fanoutCount := atomic.Int64{} @@ -1371,16 +1429,26 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen defer cancel() start := time.Now() - go func() { w.Run(ctx, nil) }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return fanoutCount.Load() >= 100 }, pollOpts{ - exit: cancel, - timeoutExit: cancel, + exit: cancel, + timeoutExit: func() { + require.Failf(t, "Timed out during poll", "Fanout Count %d", fanoutCount.Load()) + }, + maxWait: 10 * time.Second, }) require.LessOrEqual(t, processorCount.Load(), int64(2), "circuit breaker should prevent processor from being called after circuit break opens, since circuit breaker won't close again until after test completes. At most two messages are read prior to circuit breaker opening") require.LessOrEqual(t, time.Since(start), time.Second, "without busy loop breaker we expect fanout to called rapidly. Circuit break is open for 10 seconds. So asserting that fanout was called 100 times in a second is a rough assertion that busy loop breaker is not in effect. Typically these 100 calls should be on the order of micro or nanoseconds. But with resource contention in the pipeline we're more conservative with timing based assertions") + t.Log("begin") + cancel() + t.Log("nend") + require.NoError(t, grp.Wait()) } func TestWork_CircuitBreaker_WaitsForCircuitToOpen(t *testing.T) { @@ -1416,9 +1484,11 @@ func TestWork_CircuitBreaker_WaitsForCircuitToOpen(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() start := time.Now() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) + loopCount := int64(5) for { if processCount.Load() == loopCount { @@ -1428,6 +1498,7 @@ func TestWork_CircuitBreaker_WaitsForCircuitToOpen(t *testing.T) { time.Sleep(time.Microsecond * 100) } require.GreaterOrEqual(t, circuitBreakDuration*time.Duration(loopCount), time.Since(start), "Total time should be greater than circuit break duration * loop count") + require.NoError(t, grp.Wait()) } // TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen this test protects against a bug that was demonstrated in another worker library which implements similar behavior. @@ -1476,9 +1547,10 @@ func TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) start := time.Now() for { @@ -1491,6 +1563,7 @@ func TestWork_DontDeadlockWhenCircuitBreakerIsInHalfOpen(t *testing.T) { time.Sleep(time.Microsecond) require.GreaterOrEqual(t, 10*time.Second, time.Since(start), "Process timeout likely not being respected. Likely entered a deadlock due to circuit breaker") } + require.NoError(t, grp.Wait()) } // Test_Bugfix_WorkPoolCanBeRestartedAfterShutdown this test is in response to a bug @@ -1534,12 +1607,10 @@ func Test_Bugfix_WorkPoolCanBeRestartedAfterShutdown(t *testing.T) { t.Log("Starting first work.Run") ctx, cancel := context.WithCancel(ctx) defer cancel() - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - w.Run(context.Background(), ctx.Done()) - wg.Done() - }() + wg := errgroup.Group{} + wg.Go(func() error { + return w.Run(context.Background(), ctx.Done()) + }) // wait for at least 1 message to be processed and then cancel the context (which will stop worker) // and break for loop @@ -1551,7 +1622,7 @@ func Test_Bugfix_WorkPoolCanBeRestartedAfterShutdown(t *testing.T) { time.Sleep(time.Millisecond) } // wait until worker fully completes and returns - wg.Wait() + require.NoError(t, wg.Wait()) t.Log("Completed first work.Run") // take a count of how many messages were processed. Because of concurrent processing it might be more than 1 @@ -1560,9 +1631,10 @@ func Test_Bugfix_WorkPoolCanBeRestartedAfterShutdown(t *testing.T) { // Start the worker again (make sure you don't pass in the canceled context). ctx2, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w.Run(context.Background(), ctx2.Done()) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(context.Background(), ctx2.Done()) + }) t.Log("Started polling for second work.Run") @@ -1634,9 +1706,10 @@ func Test_MsgOrderingIsMaintainedPerKeyWithAnyNumberOfVirtualPartitions(t *testi ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) pollWait(func() bool { return len(processor.ProcessedMessages()) == msgCount @@ -1669,6 +1742,7 @@ func Test_MsgOrderingIsMaintainedPerKeyWithAnyNumberOfVirtualPartitions(t *testi require.IsIncreasingf(t, vals0, "messages for key 0 are not sorted %v", vals0) require.IsIncreasingf(t, vals1, "messages for key 1 are not sorted") require.IsIncreasingf(t, vals2, "messages for key 2 are not sorted") + require.NoError(t, grp.Wait()) } func TestWork_LifecycleHookReaderPanicIsHandledAndMessagingProceeds(t *testing.T) { @@ -1721,9 +1795,10 @@ func TestWork_LifecycleHookReaderPanicIsHandledAndMessagingProceeds(t *testing.T }), ) - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) for { m.Lock() @@ -1737,6 +1812,7 @@ func TestWork_LifecycleHookReaderPanicIsHandledAndMessagingProceeds(t *testing.T } require.Len(t, processedMsgs, numMsgs) + require.NoError(t, grp.Wait()) } testPanic(zkafka.LifecycleHooks{ @@ -1826,9 +1902,11 @@ func BenchmarkWork_Run_CircuitBreaker_BusyLoopBreaker(b *testing.B) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) + require.NoError(b, grp.Wait()) } // $ go test -run=XXX -bench=BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker -cpuprofile profile_cpu_disable.out @@ -1865,9 +1943,11 @@ func BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker(b *testing.B) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - go func() { - w.Run(ctx, nil) - }() + grp := errgroup.Group{} + grp.Go(func() error { + return w.Run(ctx, nil) + }) + require.NoError(b, grp.Wait()) } func recoverThenFail(t *testing.T) { diff --git a/work.go b/work.go index d9c2150..5456d3d 100644 --- a/work.go +++ b/work.go @@ -2,6 +2,7 @@ package zkafka import ( "context" + "errors" "fmt" "hash/fnv" "strconv" @@ -9,7 +10,6 @@ import ( "time" "github.com/google/uuid" - "github.com/pkg/errors" "github.com/sony/gobreaker" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -122,10 +122,12 @@ func (w *Work) Run(ctx context.Context, shutdown <-chan struct{}) error { g := errgroup.Group{} g.Go(func() error { w.execReader(ctx, shutdown) + w.logger.Debugw(ctx, "exiting reader loop") return nil }) g.Go(func() error { w.execProcessors(ctx, shutdown) + w.logger.Debugw(ctx, "exiting processors loop") return nil }) return g.Wait() @@ -193,7 +195,12 @@ func (w *Work) fanOut(ctx context.Context, shutdown <-chan struct{}) { w.logger.Warnw(ctx, "Kafka topic processing circuit open", "topics", w.topicConfig.topics()) - w.blb.wait() + blocker, cleanup := w.blb.wait() + select { + case <-blocker: + case <-ctx.Done(): + cleanup() + } return } msg, err := w.readMessage(ctx, shutdown) @@ -229,10 +236,14 @@ func (w *Work) fanOut(ctx context.Context, shutdown <-chan struct{}) { } select { case w.messageBuffer <- struct{}{}: - w.virtualPartitions[index] <- workUnit{ + select { + case w.virtualPartitions[index] <- workUnit{ ctx: ctx, msg: msg, successFunc: successFunc, + }: + case <-ctx.Done(): + w.removeInWork(msg) } case <-shutdown: w.removeInWork(msg) @@ -422,7 +433,7 @@ func (w *Work) processSingle(ctx context.Context, msg *Message, partitionIndex i w.logger.Warnw(ctx, "Outside context canceled", "kmsg", msg, "error", x) return nil } - return errors.Wrap(ctxCancel.Err(), "processSingle execution canceled") + return fmt.Errorf("processSingle execution canceled: %w", ctxCancel.Err()) } }() @@ -698,9 +709,11 @@ type busyLoopBreaker struct { maxPause time.Duration } -func (b *busyLoopBreaker) wait() { +func (b *busyLoopBreaker) wait() (<-chan struct{}, func()) { if b.disabled { - return + closedCh := make(chan struct{}) + close(closedCh) + return closedCh, func() {} } c := make(chan struct{}) b.mtx.Lock() @@ -708,10 +721,11 @@ func (b *busyLoopBreaker) wait() { b.mtx.Unlock() timer := time.AfterFunc(b.maxPause, b.release) - // if wait is released externally, we'll want to release this timer's resources - defer timer.Stop() - <-c + return c, func() { + // if wait is released externally, we'll want to release this timer's resources + timer.Stop() + } } func (b *busyLoopBreaker) release() { @@ -733,7 +747,7 @@ func selectPartitionIndex(key string, isKeyNil bool, partitionCount int) (int, e h := fnv.New32a() _, err := h.Write([]byte(key)) if err != nil { - return 0, errors.Wrap(err, "failed to create partition index from seed string") + return 0, fmt.Errorf("failed to create partition index from seed string: %w", err) } index := int(h.Sum32()) return index % partitionCount, nil diff --git a/work_test.go b/work_test.go index b9267ba..37f4483 100644 --- a/work_test.go +++ b/work_test.go @@ -489,7 +489,8 @@ func Test_busyLoopBreaker_waitRespectsMaxPause(t *testing.T) { maxPause: time.Microsecond, } // if this doesn't respect maxPause it would pause here indefinitely - blb.wait() + blocker, _ := blb.wait() + <-blocker } // Test_busyLoopBreaker_waitRespectsRelease asserts that calling release() cancels that wait occuring at the wait() site @@ -504,7 +505,8 @@ func Test_busyLoopBreaker_waitRespectsRelease(t *testing.T) { // This signal can be used versus a timeout to assert blbFinishedWait := make(chan struct{}) go func() { - blb.wait() + blocker, _ := blb.wait() + <-blocker blbFinishedWait <- struct{}{} }() diff --git a/writer.go b/writer.go index 2224742..8e33e12 100644 --- a/writer.go +++ b/writer.go @@ -4,11 +4,12 @@ package zkafka import ( "context" + "errors" + "fmt" "sync" "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" - "github.com/pkg/errors" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.25.0" @@ -120,7 +121,7 @@ func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) - return Response{}, errors.Wrap(err, "error writing message") + return Response{}, fmt.Errorf("error writing message: %w", err) } // wait on callback channel for kafka broker to ack written message e := <-deliveryChan @@ -139,7 +140,7 @@ func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts if m.TopicPartition.Error != nil { w.logger.Debugw(ctx, "Delivery failed", "error", m.TopicPartition.Error) - return Response{}, errors.Wrap(m.TopicPartition.Error, "failed to produce kafka message") + return Response{}, fmt.Errorf("failed to produce kafka message: %w", m.TopicPartition.Error) } return Response{Partition: m.TopicPartition.Partition, Offset: int64(m.TopicPartition.Offset)}, nil } @@ -195,7 +196,7 @@ func (w *KWriter) write(ctx context.Context, msg keyValuePair, opts ...WriteOpti } value, err := w.fmtter.Marshall(msg.value) if err != nil { - return Response{}, errors.Wrap(err, "failed to marshall producer message") + return Response{}, fmt.Errorf("failed to marshall producer message: %w", err) } return w.WriteRaw(ctx, msg.key, value, opts...)