From 2cacc6dcb532ff97d478f6a3cf047ef128a6df78 Mon Sep 17 00:00:00 2001 From: michelh Date: Tue, 29 Dec 2015 08:09:27 +0100 Subject: [PATCH 1/2] Memory leak fix --- .../Kafka.Client/Common/ClientIdAndBroker.cs | 79 +- .../Server/AbstractFetcherThread.cs | 903 +++++------ src/Kafka/Kafka.Client/Utils/ZkUtils.cs | 1328 ++++++++--------- 3 files changed, 1192 insertions(+), 1118 deletions(-) diff --git a/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs b/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs index c2d670b..c6d26c3 100644 --- a/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs +++ b/src/Kafka/Kafka.Client/Common/ClientIdAndBroker.cs @@ -1,24 +1,57 @@ -namespace Kafka.Client.Common -{ - /// - /// Convenience case class since (clientId, brokerInfo) pairs are used to create - /// SyncProducer Request Stats and SimpleConsumer Request and Response Stats. - /// - public class ClientIdAndBroker - { - public string ClientId { get; private set; } - - public string BrokerInfo { get; private set; } - - public ClientIdAndBroker(string clientId, string brokerInfo) - { - this.ClientId = clientId; - this.BrokerInfo = brokerInfo; - } - - public override string ToString() - { - return string.Format("{0}-{1}", this.ClientId, this.BrokerInfo); - } - } +namespace Kafka.Client.Common +{ + /// + /// Convenience case class since (clientId, brokerInfo) pairs are used to create + /// SyncProducer Request Stats and SimpleConsumer Request and Response Stats. + /// + public class ClientIdAndBroker + { + public string ClientId { get; private set; } + + public string BrokerInfo { get; private set; } + + public ClientIdAndBroker(string clientId, string brokerInfo) + { + this.ClientId = clientId; + this.BrokerInfo = brokerInfo; + } + + public override string ToString() + { + return string.Format("{0}-{1}", this.ClientId, this.BrokerInfo); + } + + protected bool Equals(ClientIdAndBroker other) + { + return this.ClientId == other.ClientId && this.BrokerInfo == other.BrokerInfo; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != this.GetType()) + { + return false; + } + + return this.Equals((ClientIdAndBroker)obj); + } + + public override int GetHashCode() + { + unchecked + { + return ((this.ClientId != null ? this.ClientId.GetHashCode() : 0) * 397) ^ (this.BrokerInfo != null ? this.BrokerInfo.GetHashCode() : 0); + } + } + } } \ No newline at end of file diff --git a/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs b/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs index b215733..90a4a66 100644 --- a/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs +++ b/src/Kafka/Kafka.Client/Server/AbstractFetcherThread.cs @@ -1,432 +1,473 @@ -namespace Kafka.Client.Server -{ - using System; - using System.Collections.Generic; - using System.Linq; - - using Kafka.Client.Api; - using Kafka.Client.Clusters; - using Kafka.Client.Common; - using Kafka.Client.Common.Imported; - using Kafka.Client.Consumers; - using Kafka.Client.Extensions; - using Kafka.Client.Messages; - using Kafka.Client.Utils; - - using Spring.Threading.Locks; - - internal abstract class AbstractFetcherThread : ShutdownableThread - { - private string clientId; - - private Broker sourceBroker; - - private int socketTimeout; - - private int socketBufferSize; - - private int fetchSize; - - private int fetcherBrokerId; - - private int maxWait; - - private int minBytes; - - private readonly IDictionary partitionMap = new Dictionary(); - - private readonly ReentrantLock partitionMapLock; - - private readonly ICondition partitionMapCond; - - protected readonly SimpleConsumer simpleConsumer; - - private readonly string brokerInfo; - - private readonly ClientIdAndBroker metricId; - - public FetcherStats FetcherStats { get; private set; } - - public FetcherLagStats FetcherLagStats { get; private set; } - - private readonly FetchRequestBuilder fetchRequestBuilder; - - internal AbstractFetcherThread( - string name, - string clientId, - Broker sourceBroker, - int socketTimeout, - int socketBufferSize, - int fetchSize, - int fetcherBrokerId = -1, - int maxWait = 0, - int minBytes = 1, - bool isInterruptible = true) - : base(name, isInterruptible) - { - this.clientId = clientId; - this.sourceBroker = sourceBroker; - this.socketTimeout = socketTimeout; - this.socketBufferSize = socketBufferSize; - this.fetchSize = fetchSize; - this.fetcherBrokerId = fetcherBrokerId; - this.maxWait = maxWait; - this.minBytes = minBytes; - - this.partitionMapLock = new ReentrantLock(); - this.partitionMapCond = this.partitionMapLock.NewCondition(); - this.simpleConsumer = new SimpleConsumer( - sourceBroker.Host, sourceBroker.Port, socketTimeout, socketBufferSize, clientId); - this.brokerInfo = string.Format("host_{0}-port_{1}", sourceBroker.Host, sourceBroker.Port); - - this.metricId = new ClientIdAndBroker(clientId, this.brokerInfo); - - this.FetcherStats = new FetcherStats(this.metricId); - this.FetcherLagStats = new FetcherLagStats(this.metricId); - this.fetchRequestBuilder = - new FetchRequestBuilder().ClientId(clientId) - .ReplicaId(fetcherBrokerId) - .MaxWait(maxWait) - .MinBytes(minBytes); - } - - /// - /// process fetched Data - /// - /// - /// - /// - public abstract void ProcessPartitionData( - TopicAndPartition topicAndPartition, long fetchOffset, FetchResponsePartitionData partitionData); - - /// - /// handle a partition whose offset is out of range and return a new fetch offset - /// - /// - /// - public abstract long HandleOffsetOutOfRange(TopicAndPartition topicAndPartition); - - /// - /// deal with partitions with errors, potentially due to leadership changes - /// - /// - public abstract void HandlePartitionsWithErrors(IEnumerable partitions); - - public override void Shutdown() - { - base.Shutdown(); - this.simpleConsumer.Close(); - } - - public override void DoWork() - { - this.partitionMapLock.Lock(); - try - { - if (this.partitionMap.Count == 0) - { - this.partitionMapCond.Await(TimeSpan.FromMilliseconds(200)); - } - - foreach (var topicAndOffset in this.partitionMap) - { - var topicAndPartition = topicAndOffset.Key; - var offset = topicAndOffset.Value; - this.fetchRequestBuilder.AddFetch(topicAndPartition.Topic, topicAndPartition.Partiton, offset, this.fetchSize); - } - } - finally - { - this.partitionMapLock.Unlock(); - } - - var fetchRequest = this.fetchRequestBuilder.Build(); - if (fetchRequest.RequestInfo.Count > 0) - { - this.ProcessFetchRequest(fetchRequest); - } - } - - public void ProcessFetchRequest(FetchRequest fetchRequest) - { - var partitionsWithError = new HashSet(); - FetchResponse response = null; - try - { - Logger.DebugFormat("issuing to broker {0} of fetch request {1}", this.sourceBroker.Id, fetchRequest); - response = this.simpleConsumer.Fetch(fetchRequest); - } - catch (Exception e) - { - if (isRunning.Get()) - { - Logger.Error("Error in fetch " + fetchRequest, e); - this.partitionMapLock.Lock(); - try - { - foreach (var key in this.partitionMap.Keys) - { - partitionsWithError.Add(key); - } - } - finally - { - this.partitionMapLock.Unlock(); - } - } - } - - this.FetcherStats.RequestRate.Mark(); - - if (response != null) - { - // process fetched Data - this.partitionMapLock.Lock(); - try - { - foreach (var topicAndData in response.Data) - { - var topicAndPartition = topicAndData.Key; - var partitionData = topicAndData.Value; - var topic = topicAndPartition.Topic; - var partitionId = topicAndPartition.Partiton; - long currentOffset; - if (this.partitionMap.TryGetValue(topicAndPartition, out currentOffset) - && fetchRequest.RequestInfo[topicAndPartition].Offset == currentOffset) - { - // we append to the log if the current offset is defined and it is the same as the offset requested during fetch - switch (partitionData.Error) - { - case ErrorMapping.NoError: - try - { - var messages = (ByteBufferMessageSet)partitionData.Messages; - var validBytes = messages.ValidBytes; - var messageAndOffset = - messages.ShallowIterator().ToEnumerable().LastOrDefault(); - var newOffset = messageAndOffset != null - ? messageAndOffset.NextOffset - : currentOffset; - - this.partitionMap[topicAndPartition] = newOffset; - this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw - - newOffset; - this.FetcherStats.ByteRate.Mark(validBytes); - - // Once we hand off the partition Data to the subclass, we can't mess with it any more in this thread - this.ProcessPartitionData(topicAndPartition, currentOffset, partitionData); - } - catch (InvalidMessageException ime) - { - // we log the error and continue. This ensures two things - // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag - // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and - // should get fixed in the subsequent fetches - Logger.ErrorFormat( - "Found invalid messages during fetch for partiton [{0},{1}] offset {2} error {3}", - topic, - partitionId, - currentOffset, - ime.Message); - } - catch (Exception e) - { - throw new KafkaException( - string.Format( - "error processing Data for partition [{0},{1}] offset {2}", - topic, - partitionId, - currentOffset), - e); - } - - break; - case ErrorMapping.OffsetOutOfRangeCode: - try - { - var newOffset = this.HandleOffsetOutOfRange(topicAndPartition); - this.partitionMap[topicAndPartition] = newOffset; - Logger.ErrorFormat( - "Current offset {0} for partiton [{1},{2}] out of range; reste offset to {3}", - currentOffset, - topic, - partitionId, - newOffset); - } - catch (Exception e) - { - Logger.Error( - string.Format( - "Error getting offset for partiton [{0},{1}] to broker {2}", - topic, - partitionId, - sourceBroker.Id), - e); - partitionsWithError.Add(topicAndPartition); - } - - break; - default: - if (isRunning.Get()) - { - Logger.ErrorFormat( - "Error for partition [{0},{1}] to broker {2}:{3}", - topic, - partitionId, - this.sourceBroker.Id, - ErrorMapping.ExceptionFor(partitionData.Error).GetType().Name); - partitionsWithError.Add(topicAndPartition); - } - - break; - } - } - } - } - finally - { - this.partitionMapLock.Unlock(); - } - } - - if (partitionsWithError.Count > 0) - { - Logger.DebugFormat("handling partitions with error for {0}", string.Join(",", partitionsWithError)); - this.HandlePartitionsWithErrors(partitionsWithError); - } - } - - public void AddPartitions(IDictionary partitionAndOffsets) - { - this.partitionMapLock.LockInterruptibly(); - try - { - foreach (var topicAndOffset in partitionAndOffsets) - { - var topicAndPartition = topicAndOffset.Key; - var offset = topicAndOffset.Value; - - // If the partitionMap already has the topic/partition, then do not update the map with the old offset - if (!this.partitionMap.ContainsKey(topicAndPartition)) - { - this.partitionMap[topicAndPartition] = PartitionTopicInfo.IsOffsetInvalid(offset) - ? this.HandleOffsetOutOfRange(topicAndPartition) - : offset; - } - - this.partitionMapCond.SignalAll(); - } - } - finally - { - this.partitionMapLock.Unlock(); - } - } - - public void RemovePartitions(ISet topicAndPartitions) - { - this.partitionMapLock.LockInterruptibly(); - try - { - foreach (var tp in topicAndPartitions) - { - this.partitionMap.Remove(tp); - } - } - finally - { - this.partitionMapLock.Unlock(); - } - } - - public int PartitionCount() - { - this.partitionMapLock.LockInterruptibly(); - try - { - return this.partitionMap.Count; - } - finally - { - this.partitionMapLock.Unlock(); - } - } - } - - internal class FetcherLagMetrics - { - private readonly AtomicLong lagVal = new AtomicLong(-1); - - public FetcherLagMetrics(ClientIdBrokerTopicPartition metricId) - { - MetersFactory.NewGauge(metricId + "-ConsumerLag", () => this.lagVal.Get()); - } - - internal long Lag - { - get - { - return this.lagVal.Get(); - } - - set - { - this.lagVal.Set(value); - } - } - } - - internal class FetcherLagStats - { - private readonly ClientIdAndBroker metricId; - - private readonly Func valueFactory; - - public Pool Stats { get; private set; } - - public FetcherLagStats(ClientIdAndBroker metricId) - { - this.metricId = metricId; - this.valueFactory = k => new FetcherLagMetrics(k); - this.Stats = new Pool(this.valueFactory); - } - - internal FetcherLagMetrics GetFetcherLagStats(string topic, int partitionId) - { - return this.Stats.GetAndMaybePut( - new ClientIdBrokerTopicPartition(this.metricId.ClientId, this.metricId.BrokerInfo, topic, partitionId)); - } - } - - internal class FetcherStats - { - public FetcherStats(ClientIdAndBroker metricId) - { - this.RequestRate = MetersFactory.NewMeter(metricId + "-RequestsPerSec", "requests", TimeSpan.FromSeconds(1)); - this.ByteRate = MetersFactory.NewMeter(metricId + "-BytesPerSec", "bytes", TimeSpan.FromSeconds(1)); - } - - internal IMeter RequestRate { get; private set; } - - internal IMeter ByteRate { get; private set; } - } - - internal class ClientIdBrokerTopicPartition - { - public string ClientId { get; private set; } - - public string BrokerInfo { get; private set; } - - public string Topic { get; private set; } - - public int PartitonId { get; private set; } - - public ClientIdBrokerTopicPartition(string clientId, string brokerInfo, string topic, int partitonId) - { - this.ClientId = clientId; - this.BrokerInfo = brokerInfo; - this.Topic = topic; - this.PartitonId = partitonId; - } - } +namespace Kafka.Client.Server +{ + using System; + using System.Collections.Generic; + using System.Linq; + + using Kafka.Client.Api; + using Kafka.Client.Clusters; + using Kafka.Client.Common; + using Kafka.Client.Common.Imported; + using Kafka.Client.Consumers; + using Kafka.Client.Extensions; + using Kafka.Client.Messages; + using Kafka.Client.Utils; + + using Spring.Threading.Locks; + + internal abstract class AbstractFetcherThread : ShutdownableThread + { + private string clientId; + + private Broker sourceBroker; + + private int socketTimeout; + + private int socketBufferSize; + + private int fetchSize; + + private int fetcherBrokerId; + + private int maxWait; + + private int minBytes; + + private readonly IDictionary partitionMap = new Dictionary(); + + private readonly ReentrantLock partitionMapLock; + + private readonly ICondition partitionMapCond; + + protected readonly SimpleConsumer simpleConsumer; + + private readonly string brokerInfo; + + private readonly ClientIdAndBroker metricId; + + public FetcherStats FetcherStats { get; private set; } + + public FetcherLagStats FetcherLagStats { get; private set; } + + private readonly FetchRequestBuilder fetchRequestBuilder; + + internal AbstractFetcherThread( + string name, + string clientId, + Broker sourceBroker, + int socketTimeout, + int socketBufferSize, + int fetchSize, + int fetcherBrokerId = -1, + int maxWait = 0, + int minBytes = 1, + bool isInterruptible = true) + : base(name, isInterruptible) + { + this.clientId = clientId; + this.sourceBroker = sourceBroker; + this.socketTimeout = socketTimeout; + this.socketBufferSize = socketBufferSize; + this.fetchSize = fetchSize; + this.fetcherBrokerId = fetcherBrokerId; + this.maxWait = maxWait; + this.minBytes = minBytes; + + this.partitionMapLock = new ReentrantLock(); + this.partitionMapCond = this.partitionMapLock.NewCondition(); + this.simpleConsumer = new SimpleConsumer( + sourceBroker.Host, sourceBroker.Port, socketTimeout, socketBufferSize, clientId); + this.brokerInfo = string.Format("host_{0}-port_{1}", sourceBroker.Host, sourceBroker.Port); + + this.metricId = new ClientIdAndBroker(clientId, this.brokerInfo); + + this.FetcherStats = new FetcherStats(this.metricId); + this.FetcherLagStats = new FetcherLagStats(this.metricId); + this.fetchRequestBuilder = + new FetchRequestBuilder().ClientId(clientId) + .ReplicaId(fetcherBrokerId) + .MaxWait(maxWait) + .MinBytes(minBytes); + } + + /// + /// process fetched Data + /// + /// + /// + /// + public abstract void ProcessPartitionData( + TopicAndPartition topicAndPartition, long fetchOffset, FetchResponsePartitionData partitionData); + + /// + /// handle a partition whose offset is out of range and return a new fetch offset + /// + /// + /// + public abstract long HandleOffsetOutOfRange(TopicAndPartition topicAndPartition); + + /// + /// deal with partitions with errors, potentially due to leadership changes + /// + /// + public abstract void HandlePartitionsWithErrors(IEnumerable partitions); + + public override void Shutdown() + { + base.Shutdown(); + this.simpleConsumer.Close(); + } + + public override void DoWork() + { + this.partitionMapLock.Lock(); + try + { + if (this.partitionMap.Count == 0) + { + this.partitionMapCond.Await(TimeSpan.FromMilliseconds(200)); + } + + foreach (var topicAndOffset in this.partitionMap) + { + var topicAndPartition = topicAndOffset.Key; + var offset = topicAndOffset.Value; + this.fetchRequestBuilder.AddFetch(topicAndPartition.Topic, topicAndPartition.Partiton, offset, this.fetchSize); + } + } + finally + { + this.partitionMapLock.Unlock(); + } + + var fetchRequest = this.fetchRequestBuilder.Build(); + if (fetchRequest.RequestInfo.Count > 0) + { + this.ProcessFetchRequest(fetchRequest); + } + } + + public void ProcessFetchRequest(FetchRequest fetchRequest) + { + var partitionsWithError = new HashSet(); + FetchResponse response = null; + try + { + Logger.DebugFormat("issuing to broker {0} of fetch request {1}", this.sourceBroker.Id, fetchRequest); + response = this.simpleConsumer.Fetch(fetchRequest); + } + catch (Exception e) + { + if (isRunning.Get()) + { + Logger.Error("Error in fetch " + fetchRequest, e); + this.partitionMapLock.Lock(); + try + { + foreach (var key in this.partitionMap.Keys) + { + partitionsWithError.Add(key); + } + } + finally + { + this.partitionMapLock.Unlock(); + } + } + } + + this.FetcherStats.RequestRate.Mark(); + + if (response != null) + { + // process fetched Data + this.partitionMapLock.Lock(); + try + { + foreach (var topicAndData in response.Data) + { + var topicAndPartition = topicAndData.Key; + var partitionData = topicAndData.Value; + var topic = topicAndPartition.Topic; + var partitionId = topicAndPartition.Partiton; + long currentOffset; + if (this.partitionMap.TryGetValue(topicAndPartition, out currentOffset) + && fetchRequest.RequestInfo[topicAndPartition].Offset == currentOffset) + { + // we append to the log if the current offset is defined and it is the same as the offset requested during fetch + switch (partitionData.Error) + { + case ErrorMapping.NoError: + try + { + var messages = (ByteBufferMessageSet)partitionData.Messages; + var validBytes = messages.ValidBytes; + var messageAndOffset = + messages.ShallowIterator().ToEnumerable().LastOrDefault(); + var newOffset = messageAndOffset != null + ? messageAndOffset.NextOffset + : currentOffset; + + this.partitionMap[topicAndPartition] = newOffset; + this.FetcherLagStats.GetFetcherLagStats(topic, partitionId).Lag = partitionData.Hw + - newOffset; + this.FetcherStats.ByteRate.Mark(validBytes); + + // Once we hand off the partition Data to the subclass, we can't mess with it any more in this thread + this.ProcessPartitionData(topicAndPartition, currentOffset, partitionData); + } + catch (InvalidMessageException ime) + { + // we log the error and continue. This ensures two things + // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag + // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and + // should get fixed in the subsequent fetches + Logger.ErrorFormat( + "Found invalid messages during fetch for partiton [{0},{1}] offset {2} error {3}", + topic, + partitionId, + currentOffset, + ime.Message); + } + catch (Exception e) + { + throw new KafkaException( + string.Format( + "error processing Data for partition [{0},{1}] offset {2}", + topic, + partitionId, + currentOffset), + e); + } + + break; + case ErrorMapping.OffsetOutOfRangeCode: + try + { + var newOffset = this.HandleOffsetOutOfRange(topicAndPartition); + this.partitionMap[topicAndPartition] = newOffset; + Logger.ErrorFormat( + "Current offset {0} for partiton [{1},{2}] out of range; reste offset to {3}", + currentOffset, + topic, + partitionId, + newOffset); + } + catch (Exception e) + { + Logger.Error( + string.Format( + "Error getting offset for partiton [{0},{1}] to broker {2}", + topic, + partitionId, + sourceBroker.Id), + e); + partitionsWithError.Add(topicAndPartition); + } + + break; + default: + if (isRunning.Get()) + { + Logger.ErrorFormat( + "Error for partition [{0},{1}] to broker {2}:{3}", + topic, + partitionId, + this.sourceBroker.Id, + ErrorMapping.ExceptionFor(partitionData.Error).GetType().Name); + partitionsWithError.Add(topicAndPartition); + } + + break; + } + } + } + } + finally + { + this.partitionMapLock.Unlock(); + } + } + + if (partitionsWithError.Count > 0) + { + Logger.DebugFormat("handling partitions with error for {0}", string.Join(",", partitionsWithError)); + this.HandlePartitionsWithErrors(partitionsWithError); + } + } + + public void AddPartitions(IDictionary partitionAndOffsets) + { + this.partitionMapLock.LockInterruptibly(); + try + { + foreach (var topicAndOffset in partitionAndOffsets) + { + var topicAndPartition = topicAndOffset.Key; + var offset = topicAndOffset.Value; + + // If the partitionMap already has the topic/partition, then do not update the map with the old offset + if (!this.partitionMap.ContainsKey(topicAndPartition)) + { + this.partitionMap[topicAndPartition] = PartitionTopicInfo.IsOffsetInvalid(offset) + ? this.HandleOffsetOutOfRange(topicAndPartition) + : offset; + } + + this.partitionMapCond.SignalAll(); + } + } + finally + { + this.partitionMapLock.Unlock(); + } + } + + public void RemovePartitions(ISet topicAndPartitions) + { + this.partitionMapLock.LockInterruptibly(); + try + { + foreach (var tp in topicAndPartitions) + { + this.partitionMap.Remove(tp); + } + } + finally + { + this.partitionMapLock.Unlock(); + } + } + + public int PartitionCount() + { + this.partitionMapLock.LockInterruptibly(); + try + { + return this.partitionMap.Count; + } + finally + { + this.partitionMapLock.Unlock(); + } + } + } + + internal class FetcherLagMetrics + { + private readonly AtomicLong lagVal = new AtomicLong(-1); + + public FetcherLagMetrics(ClientIdBrokerTopicPartition metricId) + { + MetersFactory.NewGauge(metricId + "-ConsumerLag", () => this.lagVal.Get()); + } + + internal long Lag + { + get + { + return this.lagVal.Get(); + } + + set + { + this.lagVal.Set(value); + } + } + } + + internal class FetcherLagStats + { + private readonly ClientIdAndBroker metricId; + + private readonly Func valueFactory; + + public Pool Stats { get; private set; } + + public FetcherLagStats(ClientIdAndBroker metricId) + { + this.metricId = metricId; + this.valueFactory = k => new FetcherLagMetrics(k); + this.Stats = new Pool(this.valueFactory); + } + + internal FetcherLagMetrics GetFetcherLagStats(string topic, int partitionId) + { + return this.Stats.GetAndMaybePut( + new ClientIdBrokerTopicPartition(this.metricId.ClientId, this.metricId.BrokerInfo, topic, partitionId)); + } + } + + internal class FetcherStats + { + public FetcherStats(ClientIdAndBroker metricId) + { + this.RequestRate = MetersFactory.NewMeter(metricId + "-RequestsPerSec", "requests", TimeSpan.FromSeconds(1)); + this.ByteRate = MetersFactory.NewMeter(metricId + "-BytesPerSec", "bytes", TimeSpan.FromSeconds(1)); + } + + internal IMeter RequestRate { get; private set; } + + internal IMeter ByteRate { get; private set; } + } + + internal class ClientIdBrokerTopicPartition + { + public string ClientId { get; private set; } + + public string BrokerInfo { get; private set; } + + public string Topic { get; private set; } + + public int PartitonId { get; private set; } + + public ClientIdBrokerTopicPartition(string clientId, string brokerInfo, string topic, int partitonId) + { + this.ClientId = clientId; + this.BrokerInfo = brokerInfo; + this.Topic = topic; + this.PartitonId = partitonId; + } + + public override string ToString() + { + return string.Format("{0}-{1}-{2}-{3}", this.ClientId, this.BrokerInfo, this.Topic, this.PartitonId); + } + + protected bool Equals(ClientIdBrokerTopicPartition other) + { + return this.ClientId == other.ClientId && this.BrokerInfo == other.BrokerInfo && this.Topic == other.Topic && this.PartitonId == other.PartitonId; + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != this.GetType()) + { + return false; + } + + return this.Equals((ClientIdBrokerTopicPartition)obj); + } + + public override int GetHashCode() + { + unchecked + { + return ((this.ClientId != null ? this.ClientId.GetHashCode() : 0) * 397) ^ + (this.BrokerInfo != null ? this.BrokerInfo.GetHashCode() * 164 : 0) ^ + (this.Topic != null ? this.Topic.GetHashCode() * 62 : 0) ^ + (this.PartitonId.GetHashCode()); + } + } + } } \ No newline at end of file diff --git a/src/Kafka/Kafka.Client/Utils/ZkUtils.cs b/src/Kafka/Kafka.Client/Utils/ZkUtils.cs index ac040d3..b2da5aa 100644 --- a/src/Kafka/Kafka.Client/Utils/ZkUtils.cs +++ b/src/Kafka/Kafka.Client/Utils/ZkUtils.cs @@ -1,665 +1,665 @@ -namespace Kafka.Client.Utils -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Reflection; - using System.Text; - using System.Threading; - - using Kafka.Client.Clusters; - using Kafka.Client.Consumers; - using Kafka.Client.Extensions; - using Kafka.Client.ZKClient; - using Kafka.Client.ZKClient.Exceptions; - using Kafka.Client.ZKClient.Serialize; - - using Spring.Threading.Locks; - - using log4net; - - using Newtonsoft.Json; - using Newtonsoft.Json.Linq; - - using Org.Apache.Zookeeper.Data; - - public static class ZkUtils - { - private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - public const string ConsumersPath = "/consumers"; - - public const string BrokerIdsPath = "/brokers/ids"; - - public const string BrokerTopicsPath = "/brokers/topics"; - - public const string TopicConfigPath = "/config/topics"; - - public const string TopicConfigChangesPath = "/config/changes"; - - public const string ControllerPath = "/controller"; - - public const string ControllerEpochPath = "/controller_epoch"; - - public const string ReassignPartitionsPath = "/admin/reassign_partitions"; - - public const string DeleteTopicsPath = "/admin/delete_topics"; - - public const string PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"; - - public static string GetTopicPath(string topic) - { - return BrokerTopicsPath + "/" + topic; - } - - public static string GetTopicPartitionsPath(string topic) - { - return GetTopicPath(topic) + "/partitions"; - } - - public static string GetTopicConfigPath(string topic) - { - return TopicConfigPath + "/" + topic; - } - - public static string GetDeleteTopicPath(string topic) - { - return DeleteTopicsPath + "/" + topic; - } - - public static string GetTopicPartitionPath(string topic, int partitionId) - { - return GetTopicPartitionsPath(topic) + "/" + partitionId; - } - - public static string GetTopicPartitionLeaderAndIsrPath(string topic, int partitionId) - { - return GetTopicPartitionPath(topic, partitionId) + "/" + "state"; - } - - public static List GetSortedBrokerList(ZkClient zkClient) - { - return GetChildren(zkClient, BrokerIdsPath).Select(int.Parse).OrderBy(x => x).ToList(); - } - - public static List GetAllBrokersInCluster(ZkClient zkClient) - { - var brokerIds = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath).OrderBy(x => x).ToList(); - return - brokerIds.Select(int.Parse) - .Select(id => GetBrokerInfo(zkClient, id)) - .Where(x => x != null) - .ToList(); - } - - public static int? GetLeaderForPartition(ZkClient zkClient, string topic, int partition) - { - var leaderAndIsrOpt = ReadDataMaybeNull(zkClient, GetTopicPartitionLeaderAndIsrPath(topic, partition)).Item1; - if (leaderAndIsrOpt != null) - { - return JObject.Parse(leaderAndIsrOpt).SelectToken("leader").Value(); - } - - return null; - } - - public static string GetConsumerPartitionOwnerPath(string group, string topic, int partition) - { - var topicDirs = new ZKGroupTopicDirs(group, topic); - return topicDirs.ConsumerOwnerDir + "/" + partition; - } - - /// - /// Get JSON partition to replica map from zookeeper. - /// - /// - /// - public static string ReplicaAssignmentZkData(Dictionary> map) - { - return JObject.FromObject(new { version = 1, partitions = map }).ToString(); - } - - - /// - /// make sure a persistent path exists in ZK. Create the path if not exist. - /// - /// - /// - public static void MakeSurePersistentPathExists(ZkClient client, string path) - { - if (!client.Exists(path)) - { - client.CreatePersistent(path, true); // won't throw NoNodeException or NodeExistsException - } - } - - /// - /// create the parent path - /// - /// - /// - private static void CreateParentPath(ZkClient client, string path) - { - var parentDir = path.Substring(0, path.LastIndexOf('/')); - if (parentDir.Length != 0) - { - client.CreatePersistent(parentDir, true); - } - } - - /// - /// Create an ephemeral node with the given path and data. Create parents if necessary. - /// - /// - /// - /// - private static void CreateEphemeralPath(ZkClient client, string path, string data) - { - try - { - client.CreateEphemeral(path, data); - } - catch (ZkNoNodeException) - { - CreateParentPath(client, path); - client.CreateEphemeral(path, data); - } - } - - /// - /// Create an ephemeral node with the given path and data. - /// Throw NodeExistException if node already exists. - /// - /// - /// - /// - public static void CreateEphemeralPathExpectConflict(ZkClient client, string path, string data) - { - try - { - CreateEphemeralPath(client, path, data); - } - catch (ZkNodeExistsException) - { - // this can happen when there is connection loss; make sure the Data is what we intend to write - string storedData = null; - try - { - storedData = ReadData(client, path).Item1; - } - catch (ZkNoNodeException) - { - // the node disappeared; treat as if node existed and let caller handles this - } - - if (storedData == null || storedData != data) - { - Logger.InfoFormat("Conflict in {0} Data: {1}, stored Data: {2}", path, data, storedData); - throw; - } - else - { - // otherwise, the creation succeeded, return normally - Logger.InfoFormat("{0} exists with value {1} during connection loss", path, data); - } - } - } - - /// - /// Create an ephemeral node with the given path and data. - /// Throw NodeExistsException if node already exists. - /// Handles the following ZK session timeout b_u_g - /// - /// https://issues.apache.org/jira/browse/ZOOKEEPER-1740 - /// - /// Upon receiving a NodeExistsException, read the data from the conflicted path and - /// trigger the checker function comparing the read data and the expected data, - /// If the checker function returns true then the above b_u_g might be encountered, back off and retry; - /// otherwise re-throw the exception - /// - /// - /// - /// - /// - /// - /// - public static void CreateEphemeralPathExpectConflictHandleZKBug( - ZkClient zkClient, - string path, - string data, - object expectedCallerData, - Func checker, - int backoffTime) - { - while (true) - { - try - { - CreateEphemeralPathExpectConflict(zkClient, path, data); - return; - } - catch (ZkNodeExistsException) - { - // An ephemeral node may still exist even after its corresponding session has expired - // due to a Zookeeper ug, in this case we need to retry writing until the previous node is deleted - // and hence the write succeeds without ZkNodeExistsException - var writtenData = ReadDataMaybeNull(zkClient, path).Item1; - if (writtenData != null) - { - if (checker(writtenData, expectedCallerData)) - { - Logger.InfoFormat( - "I wrote this conflicted ephemeral node [{0}] at {1} a while back in a different session, " - + "hence I will backoff for this node to be deleted by Zookeeper and retry", - data, - path); - - Thread.Sleep(backoffTime); - } - else - { - throw; - } - } - else - { - // the node disappeared; retry creating the ephemeral node immediately - } - } - } - } - - /// - /// Create an persistent node with the given path and data. Create parents if necessary. - /// - public static void CreatePersistentPath(ZkClient client, string path, string data) - { - try - { - client.CreatePersistent(path, data); - } - catch (ZkNoNodeException e) - { - CreateParentPath(client, path); - client.CreatePersistent(path, data); - } - } - - /// - /// Update the value of a persistent node with the given path and data. - /// create parrent directory if necessary. Never throw NodeExistException. - /// Return the updated path zkVersion - /// - /// - /// - /// - public static void UpdatePersistentPath(ZkClient client, string path, string data) - { - try - { - client.WriteData(path, data); - } - catch (ZkNoNodeException) - { - CreateParentPath(client, path); - try - { - client.CreatePersistent(path, data); - } - catch (ZkNodeExistsException) - { - client.WriteData(path, data); - } - } - } - - public static bool DeletePath(ZkClient client, string path) - { - try - { - return client.Delete(path); - } - catch (ZkNoNodeException) - { - // this can happen during a connection loss event, return normally - Logger.InfoFormat("{0} deleted during connection loss; This is ok. ", path); - return false; - } - } - - public static void MaybeDeletePath(string zkUrl, string dir) - { - try - { - var zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, new ZkStringSerializer()); - zk.DeleteRecursive(dir); - zk.Dispose(); - } - catch - { - // swallow - } - } - - - public static Tuple ReadData(ZkClient client, string path) - { - var stat = new Stat(); - var dataString = client.ReadData(path, stat); - return Tuple.Create(dataString, stat); - } - - public static Tuple ReadDataMaybeNull(ZkClient client, string path) - { - var stat = new Stat(); - try - { - var obj = client.ReadData(path, stat); - return Tuple.Create(obj, stat); - } - catch (ZkNoNodeException) - { - return Tuple.Create((string)null, stat); - } - } - - public static List GetChildren(ZkClient zkClient, string path) - { - return zkClient.GetChildren(path); - } - - public static IList GetChildrenParentMayNotExist(ZkClient client, string path) - { - try - { - return client.GetChildren(path); - } - catch (ZkNoNodeException) - { - return null; - } - } - - public static bool PathExists(ZkClient client, string path) - { - return client.Exists(path); - } - - public static Cluster GetCluster(ZkClient zkClient) - { - var cluster = new Cluster(); - var nodes = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath); - foreach (var node in nodes) - { - var brokerZkString = ReadData(zkClient, BrokerIdsPath + "/" + node).Item1; - cluster.Add(Broker.CreateBroker(int.Parse(node), brokerZkString)); - } - - return cluster; - } - - public static IDictionary>> GetPartitionAssignmentForTopics( - ZkClient zkClient, IList topics) - { - IDictionary>> ret = new Dictionary>>(); - foreach (var topic in topics) - { - var jsonPartitionMap = ReadDataMaybeNull(zkClient, GetTopicPath(topic)).Item1; - IDictionary> partitionMap = new Dictionary>(); - if (jsonPartitionMap != null) - { - var m = JObject.Parse(jsonPartitionMap); - var replicaMap = (IDictionary)m.Get("partitions"); - if (replicaMap != null) - { - partitionMap = replicaMap.ToDictionary( - kvp => int.Parse(kvp.Key), kvp => kvp.Value.Values().ToList()); - } - } - - Logger.DebugFormat("Partition map for /brokers/topics/{0} is {1}", topic, JObject.FromObject(partitionMap).ToString(Formatting.None)); - ret[topic] = partitionMap; - } - - return ret; - } - - public static IDictionary> GetConsumersPerTopic(ZkClient zkClient, string group) - { - var dirs = new ZKGroupDirs(group); - var consumers = GetChildrenParentMayNotExist(zkClient, dirs.ConsumerRegistryDir); - var consumerPerTopicMap = new Dictionary>(); - foreach (var consumer in consumers) - { - var topicCount = TopicCount.ConstructTopicCount(group, consumer, zkClient); - foreach (var topicAndConsumer in topicCount.GetConsumerThreadIdsPerTopic()) - { - var topic = topicAndConsumer.Key; - var consumerThreadIdSet = topicAndConsumer.Value; - foreach (var consumerThreadId in consumerThreadIdSet) - { - var curConsumers = consumerPerTopicMap.Get(topic); - if (curConsumers != null) - { - curConsumers.Add(consumerThreadId); - } - else - { - consumerPerTopicMap[topic] = new List { consumerThreadId }; - } - } - } - } - - consumerPerTopicMap = consumerPerTopicMap.ToDictionary(x => x.Key, x => x.Value.OrderBy(y => y).ToList()); - - return consumerPerTopicMap; - } - - /// - /// This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker - /// or throws an exception if the broker dies before the query to zookeeper finishes - /// - /// The zookeeper client connection - /// The broker id - /// An optional Broker object encapsulating the broker metadata - public static Broker GetBrokerInfo(ZkClient zkClient, int brokerId) - { - var brokerInfo = ReadDataMaybeNull(zkClient, BrokerIdsPath + "/" + brokerId); - if (brokerInfo != null) - { - return Broker.CreateBroker(brokerId, brokerInfo.Item1); - } - else - { - return null; - } - } - } - - internal class LeaderExistsOrChangedListener : IZkDataListener - { - private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - - private string topic; - - private int partition; - - private ReentrantLock leaderLock; - - private ICondition leaderExistsOrChanged; - - private int? oldLeaderOpt; - - private ZkClient zkClient; - - public LeaderExistsOrChangedListener(string topic, int partition, ReentrantLock leaderLock, ICondition leaderExistsOrChanged, int? oldLeaderOpt, ZkClient zkClient) - { - this.topic = topic; - this.partition = partition; - this.leaderLock = leaderLock; - this.leaderExistsOrChanged = leaderExistsOrChanged; - this.oldLeaderOpt = oldLeaderOpt; - this.zkClient = zkClient; - } - - public void HandleDataChange(string dataPath, object data) - { - var dataPathSplited = dataPath.Split('/'); - var t = dataPathSplited[dataPathSplited.Length - 4]; - var p = int.Parse(dataPathSplited[dataPathSplited.Length - 2]); - this.leaderLock.Lock(); - - try - { - if (t == this.topic && p == this.partition) - { - if (this.oldLeaderOpt.HasValue == false) - { - Logger.DebugFormat( - "In leader existence listener on partition [{0}, {1}], leader has been created", - topic, - partition); - this.leaderExistsOrChanged.Signal(); - } - else - { - var newLeaderOpt = ZkUtils.GetLeaderForPartition(this.zkClient, t, p); - if (newLeaderOpt.HasValue && newLeaderOpt.Value != this.oldLeaderOpt.Value) - { - Logger.DebugFormat("In leader change listener on partition [{0}, {1}], leader has been moved from {2} to {3}", topic, partition, oldLeaderOpt.Value, newLeaderOpt.Value); - this.leaderExistsOrChanged.Signal(); - } - } - } - } - finally - { - this.leaderLock.Unlock(); - } - - } - - public void HandleDataDeleted(string dataPath) - { - leaderLock.Lock(); - try - { - leaderExistsOrChanged.Signal(); - } - finally - { - leaderLock.Unlock(); - } - } - } - - public class ZkStringSerializer : IZkSerializer - { - public byte[] Serialize(object data) - { - return Encoding.UTF8.GetBytes(data.ToString()); - } - - public object Deserialize(byte[] bytes) - { - if (bytes == null) - { - return null; - } - - return Encoding.UTF8.GetString(bytes); - } - } - - public class ZKGroupDirs - { - public string Group { get; set; } - - public ZKGroupDirs(string @group) - { - this.Group = @group; - } - - public string ConsumerDir - { - get - { - return ZkUtils.ConsumersPath; - } - } - - public string ConsumerGroupDir - { - get - { - return this.ConsumerDir + "/" + this.Group; - } - } - - public string ConsumerRegistryDir - { - get - { - return this.ConsumerDir + "/ids"; - } - } - } - - public class ZKGroupTopicDirs : ZKGroupDirs - { - public string Topic { get; private set; } - - public ZKGroupTopicDirs(string @group, string topic) - : base(@group) - { - this.Topic = topic; - } - - public string ConsumerOffsetDir - { - get - { - return this.ConsumerGroupDir + "/offsets/" + this.Topic; - } - } - - public string ConsumerOwnerDir - { - get - { - return this.ConsumerGroupDir + "/owners/" + this.Topic; - } - } - } - - public class ZkConfig - { - public const int DefaultSessionTimeout = 6000; - - public const int DefaultConnectionTimeout = 6000; - - public const int DefaultSyncTime = 2000; - - public ZkConfig() - : this(null, DefaultSessionTimeout, DefaultConnectionTimeout, DefaultSyncTime) - { - } - - public ZkConfig(string zkconnect, int zksessionTimeoutMs, int zkconnectionTimeoutMs, int zksyncTimeMs) - { - this.ZkConnect = zkconnect; - this.ZkConnectionTimeoutMs = zkconnectionTimeoutMs; - this.ZkSessionTimeoutMs = zksessionTimeoutMs; - this.ZkSyncTimeMs = zksyncTimeMs; - } - - public string ZkConnect { get; set; } - - public int ZkSessionTimeoutMs { get; set; } - - public int ZkConnectionTimeoutMs { get; set; } - - public int ZkSyncTimeMs { get; set; } - } +namespace Kafka.Client.Utils +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Reflection; + using System.Text; + using System.Threading; + + using Kafka.Client.Clusters; + using Kafka.Client.Consumers; + using Kafka.Client.Extensions; + using Kafka.Client.ZKClient; + using Kafka.Client.ZKClient.Exceptions; + using Kafka.Client.ZKClient.Serialize; + + using Spring.Threading.Locks; + + using log4net; + + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + + using Org.Apache.Zookeeper.Data; + + public static class ZkUtils + { + private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + public const string ConsumersPath = "/consumers"; + + public const string BrokerIdsPath = "/brokers/ids"; + + public const string BrokerTopicsPath = "/brokers/topics"; + + public const string TopicConfigPath = "/config/topics"; + + public const string TopicConfigChangesPath = "/config/changes"; + + public const string ControllerPath = "/controller"; + + public const string ControllerEpochPath = "/controller_epoch"; + + public const string ReassignPartitionsPath = "/admin/reassign_partitions"; + + public const string DeleteTopicsPath = "/admin/delete_topics"; + + public const string PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"; + + public static string GetTopicPath(string topic) + { + return BrokerTopicsPath + "/" + topic; + } + + public static string GetTopicPartitionsPath(string topic) + { + return GetTopicPath(topic) + "/partitions"; + } + + public static string GetTopicConfigPath(string topic) + { + return TopicConfigPath + "/" + topic; + } + + public static string GetDeleteTopicPath(string topic) + { + return DeleteTopicsPath + "/" + topic; + } + + public static string GetTopicPartitionPath(string topic, int partitionId) + { + return GetTopicPartitionsPath(topic) + "/" + partitionId; + } + + public static string GetTopicPartitionLeaderAndIsrPath(string topic, int partitionId) + { + return GetTopicPartitionPath(topic, partitionId) + "/" + "state"; + } + + public static List GetSortedBrokerList(ZkClient zkClient) + { + return GetChildren(zkClient, BrokerIdsPath).Select(int.Parse).OrderBy(x => x).ToList(); + } + + public static List GetAllBrokersInCluster(ZkClient zkClient) + { + var brokerIds = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath).OrderBy(x => x).ToList(); + return + brokerIds.Select(int.Parse) + .Select(id => GetBrokerInfo(zkClient, id)) + .Where(x => x != null) + .ToList(); + } + + public static int? GetLeaderForPartition(ZkClient zkClient, string topic, int partition) + { + var leaderAndIsrOpt = ReadDataMaybeNull(zkClient, GetTopicPartitionLeaderAndIsrPath(topic, partition)).Item1; + if (leaderAndIsrOpt != null) + { + return JObject.Parse(leaderAndIsrOpt).SelectToken("leader").Value(); + } + + return null; + } + + public static string GetConsumerPartitionOwnerPath(string group, string topic, int partition) + { + var topicDirs = new ZKGroupTopicDirs(group, topic); + return topicDirs.ConsumerOwnerDir + "/" + partition; + } + + /// + /// Get JSON partition to replica map from zookeeper. + /// + /// + /// + public static string ReplicaAssignmentZkData(Dictionary> map) + { + return JObject.FromObject(new { version = 1, partitions = map }).ToString(); + } + + + /// + /// make sure a persistent path exists in ZK. Create the path if not exist. + /// + /// + /// + public static void MakeSurePersistentPathExists(ZkClient client, string path) + { + if (!client.Exists(path)) + { + client.CreatePersistent(path, true); // won't throw NoNodeException or NodeExistsException + } + } + + /// + /// create the parent path + /// + /// + /// + private static void CreateParentPath(ZkClient client, string path) + { + var parentDir = path.Substring(0, path.LastIndexOf('/')); + if (parentDir.Length != 0) + { + client.CreatePersistent(parentDir, true); + } + } + + /// + /// Create an ephemeral node with the given path and data. Create parents if necessary. + /// + /// + /// + /// + private static void CreateEphemeralPath(ZkClient client, string path, string data) + { + try + { + client.CreateEphemeral(path, data); + } + catch (ZkNoNodeException) + { + CreateParentPath(client, path); + client.CreateEphemeral(path, data); + } + } + + /// + /// Create an ephemeral node with the given path and data. + /// Throw NodeExistException if node already exists. + /// + /// + /// + /// + public static void CreateEphemeralPathExpectConflict(ZkClient client, string path, string data) + { + try + { + CreateEphemeralPath(client, path, data); + } + catch (ZkNodeExistsException) + { + // this can happen when there is connection loss; make sure the Data is what we intend to write + string storedData = null; + try + { + storedData = ReadData(client, path).Item1; + } + catch (ZkNoNodeException) + { + // the node disappeared; treat as if node existed and let caller handles this + } + + if (storedData == null || storedData != data) + { + Logger.InfoFormat("Conflict in {0} Data: {1}, stored Data: {2}", path, data, storedData); + throw; + } + else + { + // otherwise, the creation succeeded, return normally + Logger.InfoFormat("{0} exists with value {1} during connection loss", path, data); + } + } + } + + /// + /// Create an ephemeral node with the given path and data. + /// Throw NodeExistsException if node already exists. + /// Handles the following ZK session timeout b_u_g + /// + /// https://issues.apache.org/jira/browse/ZOOKEEPER-1740 + /// + /// Upon receiving a NodeExistsException, read the data from the conflicted path and + /// trigger the checker function comparing the read data and the expected data, + /// If the checker function returns true then the above b_u_g might be encountered, back off and retry; + /// otherwise re-throw the exception + /// + /// + /// + /// + /// + /// + /// + public static void CreateEphemeralPathExpectConflictHandleZKBug( + ZkClient zkClient, + string path, + string data, + object expectedCallerData, + Func checker, + int backoffTime) + { + while (true) + { + try + { + CreateEphemeralPathExpectConflict(zkClient, path, data); + return; + } + catch (ZkNodeExistsException) + { + // An ephemeral node may still exist even after its corresponding session has expired + // due to a Zookeeper ug, in this case we need to retry writing until the previous node is deleted + // and hence the write succeeds without ZkNodeExistsException + var writtenData = ReadDataMaybeNull(zkClient, path).Item1; + if (writtenData != null) + { + if (checker(writtenData, expectedCallerData)) + { + Logger.InfoFormat( + "I wrote this conflicted ephemeral node [{0}] at {1} a while back in a different session, " + + "hence I will backoff for this node to be deleted by Zookeeper and retry", + data, + path); + + Thread.Sleep(backoffTime); + } + else + { + throw; + } + } + else + { + // the node disappeared; retry creating the ephemeral node immediately + } + } + } + } + + /// + /// Create an persistent node with the given path and data. Create parents if necessary. + /// + public static void CreatePersistentPath(ZkClient client, string path, string data) + { + try + { + client.CreatePersistent(path, data); + } + catch (ZkNoNodeException e) + { + CreateParentPath(client, path); + client.CreatePersistent(path, data); + } + } + + /// + /// Update the value of a persistent node with the given path and data. + /// create parrent directory if necessary. Never throw NodeExistException. + /// Return the updated path zkVersion + /// + /// + /// + /// + public static void UpdatePersistentPath(ZkClient client, string path, string data) + { + try + { + client.WriteData(path, data); + } + catch (ZkNoNodeException) + { + CreateParentPath(client, path); + try + { + client.CreatePersistent(path, data); + } + catch (ZkNodeExistsException) + { + client.WriteData(path, data); + } + } + } + + public static bool DeletePath(ZkClient client, string path) + { + try + { + return client.Delete(path); + } + catch (ZkNoNodeException) + { + // this can happen during a connection loss event, return normally + Logger.InfoFormat("{0} deleted during connection loss; This is ok. ", path); + return false; + } + } + + public static void MaybeDeletePath(string zkUrl, string dir) + { + try + { + var zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, new ZkStringSerializer()); + zk.DeleteRecursive(dir); + zk.Dispose(); + } + catch + { + // swallow + } + } + + + public static Tuple ReadData(ZkClient client, string path) + { + var stat = new Stat(); + var dataString = client.ReadData(path, stat); + return Tuple.Create(dataString, stat); + } + + public static Tuple ReadDataMaybeNull(ZkClient client, string path) + { + var stat = new Stat(); + try + { + var obj = client.ReadData(path, stat); + return Tuple.Create(obj, stat); + } + catch (ZkNoNodeException) + { + return Tuple.Create((string)null, stat); + } + } + + public static List GetChildren(ZkClient zkClient, string path) + { + return zkClient.GetChildren(path); + } + + public static IList GetChildrenParentMayNotExist(ZkClient client, string path) + { + try + { + return client.GetChildren(path); + } + catch (ZkNoNodeException) + { + return null; + } + } + + public static bool PathExists(ZkClient client, string path) + { + return client.Exists(path); + } + + public static Cluster GetCluster(ZkClient zkClient) + { + var cluster = new Cluster(); + var nodes = GetChildrenParentMayNotExist(zkClient, BrokerIdsPath); + foreach (var node in nodes) + { + var brokerZkString = ReadData(zkClient, BrokerIdsPath + "/" + node).Item1; + cluster.Add(Broker.CreateBroker(int.Parse(node), brokerZkString)); + } + + return cluster; + } + + public static IDictionary>> GetPartitionAssignmentForTopics( + ZkClient zkClient, IList topics) + { + IDictionary>> ret = new Dictionary>>(); + foreach (var topic in topics) + { + var jsonPartitionMap = ReadDataMaybeNull(zkClient, GetTopicPath(topic)).Item1; + IDictionary> partitionMap = new Dictionary>(); + if (jsonPartitionMap != null) + { + var m = JObject.Parse(jsonPartitionMap); + var replicaMap = (IDictionary)m.Get("partitions"); + if (replicaMap != null) + { + partitionMap = replicaMap.ToDictionary( + kvp => int.Parse(kvp.Key), kvp => kvp.Value.Values().ToList()); + } + } + + Logger.DebugFormat("Partition map for /brokers/topics/{0} is {1}", topic, JObject.FromObject(partitionMap).ToString(Formatting.None)); + ret[topic] = partitionMap; + } + + return ret; + } + + public static IDictionary> GetConsumersPerTopic(ZkClient zkClient, string group) + { + var dirs = new ZKGroupDirs(group); + var consumers = GetChildrenParentMayNotExist(zkClient, dirs.ConsumerRegistryDir); + var consumerPerTopicMap = new Dictionary>(); + foreach (var consumer in consumers) + { + var topicCount = TopicCount.ConstructTopicCount(group, consumer, zkClient); + foreach (var topicAndConsumer in topicCount.GetConsumerThreadIdsPerTopic()) + { + var topic = topicAndConsumer.Key; + var consumerThreadIdSet = topicAndConsumer.Value; + foreach (var consumerThreadId in consumerThreadIdSet) + { + var curConsumers = consumerPerTopicMap.Get(topic); + if (curConsumers != null) + { + curConsumers.Add(consumerThreadId); + } + else + { + consumerPerTopicMap[topic] = new List { consumerThreadId }; + } + } + } + } + + consumerPerTopicMap = consumerPerTopicMap.ToDictionary(x => x.Key, x => x.Value.OrderBy(y => y).ToList()); + + return consumerPerTopicMap; + } + + /// + /// This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker + /// or throws an exception if the broker dies before the query to zookeeper finishes + /// + /// The zookeeper client connection + /// The broker id + /// An optional Broker object encapsulating the broker metadata + public static Broker GetBrokerInfo(ZkClient zkClient, int brokerId) + { + var brokerInfo = ReadDataMaybeNull(zkClient, BrokerIdsPath + "/" + brokerId); + if (brokerInfo != null) + { + return Broker.CreateBroker(brokerId, brokerInfo.Item1); + } + else + { + return null; + } + } + } + + internal class LeaderExistsOrChangedListener : IZkDataListener + { + private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + private string topic; + + private int partition; + + private ReentrantLock leaderLock; + + private ICondition leaderExistsOrChanged; + + private int? oldLeaderOpt; + + private ZkClient zkClient; + + public LeaderExistsOrChangedListener(string topic, int partition, ReentrantLock leaderLock, ICondition leaderExistsOrChanged, int? oldLeaderOpt, ZkClient zkClient) + { + this.topic = topic; + this.partition = partition; + this.leaderLock = leaderLock; + this.leaderExistsOrChanged = leaderExistsOrChanged; + this.oldLeaderOpt = oldLeaderOpt; + this.zkClient = zkClient; + } + + public void HandleDataChange(string dataPath, object data) + { + var dataPathSplited = dataPath.Split('/'); + var t = dataPathSplited[dataPathSplited.Length - 4]; + var p = int.Parse(dataPathSplited[dataPathSplited.Length - 2]); + this.leaderLock.Lock(); + + try + { + if (t == this.topic && p == this.partition) + { + if (this.oldLeaderOpt.HasValue == false) + { + Logger.DebugFormat( + "In leader existence listener on partition [{0}, {1}], leader has been created", + topic, + partition); + this.leaderExistsOrChanged.Signal(); + } + else + { + var newLeaderOpt = ZkUtils.GetLeaderForPartition(this.zkClient, t, p); + if (newLeaderOpt.HasValue && newLeaderOpt.Value != this.oldLeaderOpt.Value) + { + Logger.DebugFormat("In leader change listener on partition [{0}, {1}], leader has been moved from {2} to {3}", topic, partition, oldLeaderOpt.Value, newLeaderOpt.Value); + this.leaderExistsOrChanged.Signal(); + } + } + } + } + finally + { + this.leaderLock.Unlock(); + } + + } + + public void HandleDataDeleted(string dataPath) + { + leaderLock.Lock(); + try + { + leaderExistsOrChanged.Signal(); + } + finally + { + leaderLock.Unlock(); + } + } + } + + public class ZkStringSerializer : IZkSerializer + { + public byte[] Serialize(object data) + { + return Encoding.UTF8.GetBytes(data.ToString()); + } + + public object Deserialize(byte[] bytes) + { + if (bytes == null) + { + return null; + } + + return Encoding.UTF8.GetString(bytes); + } + } + + public class ZKGroupDirs + { + public string Group { get; set; } + + public ZKGroupDirs(string @group) + { + this.Group = @group; + } + + public string ConsumerDir + { + get + { + return ZkUtils.ConsumersPath; + } + } + + public string ConsumerGroupDir + { + get + { + return this.ConsumerDir + "/" + this.Group; + } + } + + public string ConsumerRegistryDir + { + get + { + return this.ConsumerDir + "/ids"; + } + } + } + + public class ZKGroupTopicDirs : ZKGroupDirs + { + public string Topic { get; private set; } + + public ZKGroupTopicDirs(string @group, string topic) + : base(@group) + { + this.Topic = topic; + } + + public string ConsumerOffsetDir + { + get + { + return this.ConsumerGroupDir + "/offsets/" + this.Topic; + } + } + + public string ConsumerOwnerDir + { + get + { + return this.ConsumerGroupDir + "/owners/" + this.Topic; + } + } + } + + public class ZkConfig + { + public const int DefaultSessionTimeout = 6000; + + public const int DefaultConnectionTimeout = 6000; + + public const int DefaultSyncTime = 2000; + + public ZkConfig() + : this(null, DefaultSessionTimeout, DefaultConnectionTimeout, DefaultSyncTime) + { + } + + public ZkConfig(string zkconnect, int zksessionTimeoutMs, int zkconnectionTimeoutMs, int zksyncTimeMs) + { + this.ZkConnect = zkconnect; + this.ZkConnectionTimeoutMs = zkconnectionTimeoutMs; + this.ZkSessionTimeoutMs = zksessionTimeoutMs; + this.ZkSyncTimeMs = zksyncTimeMs; + } + + public string ZkConnect { get; set; } + + public int ZkSessionTimeoutMs { get; set; } + + public int ZkConnectionTimeoutMs { get; set; } + + public int ZkSyncTimeMs { get; set; } + } } \ No newline at end of file From 2fdff7996a4c928c3cc1b44f85bd30ecb116f916 Mon Sep 17 00:00:00 2001 From: michelh Date: Tue, 29 Dec 2015 08:10:31 +0100 Subject: [PATCH 2/2] Multiple consumer groups bug fix --- src/Kafka/Kafka.Client/Utils/ZkUtils.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Kafka/Kafka.Client/Utils/ZkUtils.cs b/src/Kafka/Kafka.Client/Utils/ZkUtils.cs index b2da5aa..0b34b5c 100644 --- a/src/Kafka/Kafka.Client/Utils/ZkUtils.cs +++ b/src/Kafka/Kafka.Client/Utils/ZkUtils.cs @@ -601,7 +601,7 @@ public string ConsumerRegistryDir { get { - return this.ConsumerDir + "/ids"; + return this.ConsumerGroupDir + "/ids"; } } }