diff --git a/client.go b/client.go index d2908ae..bb0bbcc 100644 --- a/client.go +++ b/client.go @@ -113,8 +113,9 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts if err != nil { return nil, err } + writerKey := getWriterKey(topicConfig) c.mu.RLock() - w, exist := c.writers[topicConfig.ClientID] + w, exist := c.writers[writerKey] if exist && !w.isClosed { c.mu.RUnlock() return w, nil @@ -123,7 +124,7 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts c.mu.Lock() defer c.mu.Unlock() - w, exist = c.writers[topicConfig.ClientID] + w, exist = c.writers[writerKey] if exist && !w.isClosed { return w, nil } @@ -151,8 +152,8 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts return nil, err } - c.writers[topicConfig.ClientID] = writer - return c.writers[topicConfig.ClientID], nil + c.writers[writerKey] = writer + return c.writers[writerKey], nil } // Close terminates all cached readers and writers gracefully. @@ -218,3 +219,7 @@ func getTracer(tp trace.TracerProvider) trace.Tracer { } return tp.Tracer(instrumentationName, trace.WithInstrumentationVersion("v1.0.0")) } + +func getWriterKey(cfg ProducerTopicConfig) string { + return cfg.ClientID + "-" + cfg.Topic +} diff --git a/client_test.go b/client_test.go index ce6c0af..144e083 100644 --- a/client_test.go +++ b/client_test.go @@ -410,7 +410,7 @@ func TestClient_Writer(t *testing.T) { name: "get from cache", fields: fields{ writers: map[string]*KWriter{ - "test-id": &KWriter{}, + "test-id-topic": &KWriter{}, }, }, args: args{ @@ -1020,9 +1020,7 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", - Topic: "", - Formatter: "", - SchemaID: 0, + Topic: "yyy", Transaction: true, }, }, @@ -1031,7 +1029,7 @@ func Test_makeConfig_Producer(t *testing.T) { "enable.idempotence": true, "request.required.acks": -1, "max.in.flight.requests.per.connection": 1, - "client.id": "clientid", + "client.id": "clientid-yyy", "linger.ms": 0, }, }, @@ -1043,12 +1041,13 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "zzz", DeliveryTimeoutMs: ptr(100), }, }, want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080,https://localhost:8081", - "client.id": "clientid", + "client.id": "clientid-zzz", "delivery.timeout.ms": 100, "enable.idempotence": true, "linger.ms": 0, @@ -1062,6 +1061,7 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "zzz", DeliveryTimeoutMs: ptr(100), AdditionalProps: map[string]any{ "stewarts.random.property.not.included.in.topicconfig": 123, @@ -1071,7 +1071,7 @@ func Test_makeConfig_Producer(t *testing.T) { want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080", "enable.idempotence": true, - "client.id": "clientid", + "client.id": "clientid-zzz", "delivery.timeout.ms": 100, "stewarts.random.property.not.included.in.topicconfig": 123, "linger.ms": 0, @@ -1087,6 +1087,7 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "abc", DeliveryTimeoutMs: ptr(100), EnableIdempotence: ptr(false), RequestRequiredAcks: ptr("all"), @@ -1098,7 +1099,7 @@ func Test_makeConfig_Producer(t *testing.T) { }, want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080", - "client.id": "clientid", + "client.id": "clientid-abc", "enable.idempotence": false, "delivery.timeout.ms": 100, "auto.commit.interval.ms": 20, @@ -1120,12 +1121,13 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "xxx", SaslUsername: ptr(""), }, }, want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080", - "client.id": "clientid", + "client.id": "clientid-xxx", "enable.idempotence": true, "linger.ms": 0, }, @@ -1140,12 +1142,13 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "xxx", SaslUsername: ptr("usernameOverride"), }, }, want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080", - "client.id": "clientid", + "client.id": "clientid-xxx", "enable.idempotence": true, "sasl.mechanism": "SCRAM-SHA-256", "sasl.password": "password", @@ -1164,12 +1167,13 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "xxx", SaslPassword: ptr("passwordOverride"), }, }, want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080", - "client.id": "clientid", + "client.id": "clientid-xxx", "enable.idempotence": true, "sasl.mechanism": "SCRAM-SHA-256", "sasl.password": "passwordOverride", @@ -1188,13 +1192,14 @@ func Test_makeConfig_Producer(t *testing.T) { }, topicConfig: ProducerTopicConfig{ ClientID: "clientid", + Topic: "xxx", SaslUsername: ptr("usernameOverride"), SaslPassword: ptr("passwordOverride"), }, }, want: kafka.ConfigMap{ "bootstrap.servers": "http://localhost:8080", - "client.id": "clientid", + "client.id": "clientid-xxx", "enable.idempotence": true, "sasl.mechanism": "SCRAM-SHA-256", "sasl.password": "passwordOverride", diff --git a/config.go b/config.go index 7c21684..dace81a 100644 --- a/config.go +++ b/config.go @@ -365,7 +365,7 @@ func makeConsumerConfig(conf Config, topicConfig ConsumerTopicConfig, prefix str func makeProducerConfig(conf Config, topicConfig ProducerTopicConfig) kafka.ConfigMap { configMap := kafka.ConfigMap{} - configMap[clientID] = topicConfig.ClientID + configMap[clientID] = getWriterKey(topicConfig) if topicConfig.RequestRequiredAcks != nil { configMap[requestRequiredAcks] = *topicConfig.RequestRequiredAcks diff --git a/test/worker_test.go b/test/worker_test.go index 08d4758..d1334c9 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -2060,13 +2060,6 @@ func TestWork_ShutdownCausesRunExit(t *testing.T) { require.NoError(t, err) } -func Test_FailedAuthentication(t *testing.T) { - // not sure if we can even do this with the local broker - // See if can require username/password for dynamically provisioned topic - t.Fail() - -} - // $ go test -run=XXX -bench=BenchmarkWork_Run_CircuitBreaker_BusyLoopBreaker -cpuprofile profile_cpu.out // $ go tool pprof --web profile_cpu.out // $ go tool pprof -http=":8000" test.test ./profile_cpu.out diff --git a/work.go b/work.go index 7bfdce9..b676bc9 100644 --- a/work.go +++ b/work.go @@ -672,7 +672,7 @@ func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor if topicConfig.DeadLetterTopicConfig != nil { cfg := *topicConfig.DeadLetterTopicConfig if cfg.ClientID == "" { - cfg.ClientID = topicConfig.ClientID + "-auto-deadletter-publisher" + cfg.ClientID = topicConfig.ClientID } options = append(options, WithDeadLetterTopic(cfg)) }