Skip to content

Commit

Permalink
Updated
Browse files Browse the repository at this point in the history
  • Loading branch information
stewartboyd119 committed Sep 22, 2024
1 parent 14f6589 commit 837a16e
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 25 deletions.
13 changes: 9 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
if err != nil {
return nil, err
}
writerKey := getWriterKey(topicConfig)
c.mu.RLock()
w, exist := c.writers[topicConfig.ClientID]
w, exist := c.writers[writerKey]
if exist && !w.isClosed {
c.mu.RUnlock()
return w, nil
Expand All @@ -123,7 +124,7 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts

c.mu.Lock()
defer c.mu.Unlock()
w, exist = c.writers[topicConfig.ClientID]
w, exist = c.writers[writerKey]
if exist && !w.isClosed {
return w, nil
}
Expand Down Expand Up @@ -151,8 +152,8 @@ func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts
return nil, err
}

c.writers[topicConfig.ClientID] = writer
return c.writers[topicConfig.ClientID], nil
c.writers[writerKey] = writer
return c.writers[writerKey], nil
}

// Close terminates all cached readers and writers gracefully.
Expand Down Expand Up @@ -218,3 +219,7 @@ func getTracer(tp trace.TracerProvider) trace.Tracer {
}
return tp.Tracer(instrumentationName, trace.WithInstrumentationVersion("v1.0.0"))
}

func getWriterKey(cfg ProducerTopicConfig) string {
return cfg.ClientID + "-" + cfg.Topic
}
29 changes: 17 additions & 12 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestClient_Writer(t *testing.T) {
name: "get from cache",
fields: fields{
writers: map[string]*KWriter{
"test-id": &KWriter{},
"test-id-topic": &KWriter{},
},
},
args: args{
Expand Down Expand Up @@ -1020,9 +1020,7 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "",
Formatter: "",
SchemaID: 0,
Topic: "yyy",
Transaction: true,
},
},
Expand All @@ -1031,7 +1029,7 @@ func Test_makeConfig_Producer(t *testing.T) {
"enable.idempotence": true,
"request.required.acks": -1,
"max.in.flight.requests.per.connection": 1,
"client.id": "clientid",
"client.id": "clientid-yyy",
"linger.ms": 0,
},
},
Expand All @@ -1043,12 +1041,13 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "zzz",
DeliveryTimeoutMs: ptr(100),
},
},
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080,https://localhost:8081",
"client.id": "clientid",
"client.id": "clientid-zzz",
"delivery.timeout.ms": 100,
"enable.idempotence": true,
"linger.ms": 0,
Expand All @@ -1062,6 +1061,7 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "zzz",
DeliveryTimeoutMs: ptr(100),
AdditionalProps: map[string]any{
"stewarts.random.property.not.included.in.topicconfig": 123,
Expand All @@ -1071,7 +1071,7 @@ func Test_makeConfig_Producer(t *testing.T) {
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080",
"enable.idempotence": true,
"client.id": "clientid",
"client.id": "clientid-zzz",
"delivery.timeout.ms": 100,
"stewarts.random.property.not.included.in.topicconfig": 123,
"linger.ms": 0,
Expand All @@ -1087,6 +1087,7 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "abc",
DeliveryTimeoutMs: ptr(100),
EnableIdempotence: ptr(false),
RequestRequiredAcks: ptr("all"),
Expand All @@ -1098,7 +1099,7 @@ func Test_makeConfig_Producer(t *testing.T) {
},
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080",
"client.id": "clientid",
"client.id": "clientid-abc",
"enable.idempotence": false,
"delivery.timeout.ms": 100,
"auto.commit.interval.ms": 20,
Expand All @@ -1120,12 +1121,13 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "xxx",
SaslUsername: ptr(""),
},
},
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080",
"client.id": "clientid",
"client.id": "clientid-xxx",
"enable.idempotence": true,
"linger.ms": 0,
},
Expand All @@ -1140,12 +1142,13 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "xxx",
SaslUsername: ptr("usernameOverride"),
},
},
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080",
"client.id": "clientid",
"client.id": "clientid-xxx",
"enable.idempotence": true,
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.password": "password",
Expand All @@ -1164,12 +1167,13 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "xxx",
SaslPassword: ptr("passwordOverride"),
},
},
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080",
"client.id": "clientid",
"client.id": "clientid-xxx",
"enable.idempotence": true,
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.password": "passwordOverride",
Expand All @@ -1188,13 +1192,14 @@ func Test_makeConfig_Producer(t *testing.T) {
},
topicConfig: ProducerTopicConfig{
ClientID: "clientid",
Topic: "xxx",
SaslUsername: ptr("usernameOverride"),
SaslPassword: ptr("passwordOverride"),
},
},
want: kafka.ConfigMap{
"bootstrap.servers": "http://localhost:8080",
"client.id": "clientid",
"client.id": "clientid-xxx",
"enable.idempotence": true,
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.password": "passwordOverride",
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func makeConsumerConfig(conf Config, topicConfig ConsumerTopicConfig, prefix str
func makeProducerConfig(conf Config, topicConfig ProducerTopicConfig) kafka.ConfigMap {
configMap := kafka.ConfigMap{}

configMap[clientID] = topicConfig.ClientID
configMap[clientID] = getWriterKey(topicConfig)

if topicConfig.RequestRequiredAcks != nil {
configMap[requestRequiredAcks] = *topicConfig.RequestRequiredAcks
Expand Down
7 changes: 0 additions & 7 deletions test/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2060,13 +2060,6 @@ func TestWork_ShutdownCausesRunExit(t *testing.T) {
require.NoError(t, err)
}

func Test_FailedAuthentication(t *testing.T) {
// not sure if we can even do this with the local broker
// See if can require username/password for dynamically provisioned topic
t.Fail()

}

// $ go test -run=XXX -bench=BenchmarkWork_Run_CircuitBreaker_BusyLoopBreaker -cpuprofile profile_cpu.out
// $ go tool pprof --web profile_cpu.out
// $ go tool pprof -http=":8000" test.test ./profile_cpu.out
Expand Down
2 changes: 1 addition & 1 deletion work.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor
if topicConfig.DeadLetterTopicConfig != nil {
cfg := *topicConfig.DeadLetterTopicConfig
if cfg.ClientID == "" {
cfg.ClientID = topicConfig.ClientID + "-auto-deadletter-publisher"
cfg.ClientID = topicConfig.ClientID
}
options = append(options, WithDeadLetterTopic(cfg))
}
Expand Down

0 comments on commit 837a16e

Please sign in to comment.