KAFKA-12271 Immutable container classes to support new MetadataCache (#10018)

Three new classes are added to support the upcoming changes to MetadataCache
required for handling Raft metadata records.

Reviewers: Jason Gustafson <[email protected]>

Co-authored-by: David Arthur <[email protected]>
a0x8o and mumrah committed Feb 3, 2021
1 parent 230c81d commit 4b511a7
Showing 22 changed files with 929 additions and 54 deletions.
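
The only class from the new org.apache.kafka.metadata package visible in this excerpt is BrokerState, which replaces the mutable kafka.server.BrokerState holder in the files below. A minimal sketch of what such an immutable state enum could look like, inferred from the call sites in this diff (BrokerState.RUNNING, state.value(), AtomicReference[BrokerState]); the byte values are assumptions that mirror the legacy kafka.server state codes, since the class body itself is not part of this excerpt:

// Sketch only: an immutable broker-state enum as suggested by the call sites
// in this diff. The byte codes are assumptions mirroring the legacy states.
public enum BrokerState {
    NOT_RUNNING((byte) 0),
    STARTING((byte) 1),
    RECOVERY((byte) 2),
    RUNNING((byte) 3),
    PENDING_CONTROLLED_SHUTDOWN((byte) 6),
    SHUTTING_DOWN((byte) 7);

    private final byte value;

    BrokerState(byte value) {
        this.value = value;
    }

    // Numeric code exposed to metrics, e.g. the BrokerState gauge below.
    public byte value() {
        return value;
    }
}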
1 change: 1 addition & 0 deletions build.gradle
@@ -749,6 +749,7 @@ project(':core') {

dependencies {
compile project(':clients')
+ compile project(':metadata')
compile project(':raft')
compile libs.jacksonDatabind
compile libs.jacksonModuleScala
1 change: 1 addition & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
@@ -49,6 +49,7 @@
<allow pkg="kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.timeline" />

<subpackage name="cache">
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -469,6 +469,7 @@
<allow class="javax.servlet.http.HttpServletResponse" />
<allow class="javax.ws.rs.core.Response" />
<allow pkg="com.fasterxml.jackson.core.type" />
<allow pkg="org.apache.kafka.metadata" />
</subpackage>
</subpackage>

connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.rest.basic.auth.extension;

import org.apache.kafka.common.utils.AppInfoParser;
+ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.slf4j.Logger;
@@ -26,6 +27,7 @@
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.Map;
+ import java.util.function.Supplier;

/**
* Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link
@@ -62,14 +64,38 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension {

private static final Logger log = LoggerFactory.getLogger(BasicAuthSecurityRestExtension.class);

+ private static final Supplier<Configuration> CONFIGURATION = initializeConfiguration(Configuration::getConfiguration);

// Capture the JVM's global JAAS configuration as soon as possible, as it may be altered later
// by connectors, converters, other REST extensions, etc.
- private static final Configuration CONFIGURATION = Configuration.getConfiguration();
+ static Supplier<Configuration> initializeConfiguration(Supplier<Configuration> configurationSupplier) {
+     try {
+         Configuration configuration = configurationSupplier.get();
+         return () -> configuration;
+     } catch (Exception e) {
+         // We have to be careful not to throw anything here as this static block gets executed during plugin scanning and any exceptions will
+         // cause the worker to fail during startup, even if it's not configured to use the basic auth extension.
+         return () -> {
+             throw new ConnectException("Failed to retrieve JAAS configuration", e);
+         };
+     }
+ }

+ private final Supplier<Configuration> configuration;

+ public BasicAuthSecurityRestExtension() {
+     this(CONFIGURATION);
+ }

+ // For testing
+ BasicAuthSecurityRestExtension(Supplier<Configuration> configuration) {
+     this.configuration = configuration;
+ }

@Override
public void register(ConnectRestExtensionContext restPluginContext) {
log.trace("Registering JAAS basic auth filter");
- restPluginContext.configurable().register(new JaasBasicAuthFilter(CONFIGURATION));
+ restPluginContext.configurable().register(new JaasBasicAuthFilter(configuration.get()));
log.trace("Finished registering JAAS basic auth filter");
}

@@ -80,7 +106,8 @@ public void close() throws IOException {

@Override
public void configure(Map<String, ?> configs) {

+ // If we failed to retrieve a JAAS configuration during startup, throw that exception now
+ configuration.get();
}

@Override
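
The change above is the behavioral heart of this file: Configuration.getConfiguration() is no longer invoked directly in a static initializer, where a bad JAAS configuration would throw during plugin scanning and fail the whole worker; initializeConfiguration instead captures either the value or the exception and defers any failure to the point of use (register or configure). A standalone sketch of the same deferral idiom, with illustrative names that are not from the commit:

import java.util.function.Supplier;

public class LazyFailure {
    // Wrap an eager lookup so that any failure is deferred to first use.
    static <T> Supplier<T> deferFailure(Supplier<T> eager) {
        try {
            T value = eager.get();
            return () -> value;              // success: cache and hand back the value
        } catch (Exception e) {
            return () -> {                   // failure: re-throw only when consumed
                throw new IllegalStateException("Initialization failed", e);
            };
        }
    }

    public static void main(String[] args) {
        Supplier<String> ok = deferFailure(() -> "config");
        System.out.println(ok.get());        // prints "config"

        Supplier<String> bad = deferFailure(() -> {
            throw new SecurityException("bad JAAS config");
        });
        // No exception at wrap time; only when the value is actually needed:
        try {
            bad.get();
        } catch (IllegalStateException e) {
            System.out.println("Deferred: " + e.getCause().getMessage());
        }
    }
}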
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java
@@ -17,6 +17,7 @@

package org.apache.kafka.connect.rest.basic.auth.extension;

+ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.easymock.Capture;
import org.easymock.EasyMock;
@@ -27,7 +28,15 @@
import javax.security.auth.login.Configuration;
import javax.ws.rs.core.Configurable;

+ import java.io.IOException;
+ import java.util.Collections;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.function.Supplier;

+ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+ import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;

public class BasicAuthSecurityRestExtensionTest {

@@ -63,4 +72,44 @@ public void testJaasConfigurationNotOverwritten() {
assertNotEquals(overwrittenConfiguration, jaasFilter.getValue().configuration,
"Overwritten JAAS configuration should not be used by basic auth REST extension");
}

+ @Test
+ public void testBadJaasConfigInitialization() {
+     SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad"));
+     Supplier<Configuration> configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
+         throw jaasConfigurationException;
+     });
+
+     ConnectException thrownException = assertThrows(ConnectException.class, configuration::get);
+     assertEquals(jaasConfigurationException, thrownException.getCause());
+ }
+
+ @Test
+ public void testGoodJaasConfigInitialization() {
+     AtomicBoolean configurationInitializerEvaluated = new AtomicBoolean(false);
+     Configuration mockConfiguration = EasyMock.mock(Configuration.class);
+     Supplier<Configuration> configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
+         configurationInitializerEvaluated.set(true);
+         return mockConfiguration;
+     });
+
+     assertTrue(configurationInitializerEvaluated.get());
+     assertEquals(mockConfiguration, configuration.get());
+ }
+
+ @Test
+ public void testBadJaasConfigExtensionSetup() {
+     SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad"));
+     Supplier<Configuration> configuration = () -> {
+         throw jaasConfigurationException;
+     };
+
+     BasicAuthSecurityRestExtension extension = new BasicAuthSecurityRestExtension(configuration);
+
+     Exception thrownException = assertThrows(Exception.class, () -> extension.configure(Collections.emptyMap()));
+     assertEquals(jaasConfigurationException, thrownException);
+
+     thrownException = assertThrows(Exception.class, () -> extension.register(EasyMock.mock(ConnectRestExtensionContext.class)));
+     assertEquals(jaasConfigurationException, thrownException);
+ }
}
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -16,11 +16,9 @@
*/
package org.apache.kafka.connect.util.clusters;

- import kafka.server.BrokerState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
- import kafka.server.RunningAsBroker;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
@@ -47,6 +45,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
+ import org.apache.kafka.metadata.BrokerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -241,13 +240,13 @@ public String zKConnectString() {
}

/**
- * Get the brokers that have a {@link RunningAsBroker} state.
+ * Get the brokers that have a {@link BrokerState#RUNNING} state.
*
* @return the list of {@link KafkaServer} instances that are running;
* never null but possibly empty
*/
public Set<KafkaServer> runningBrokers() {
- return brokersInState(state -> state.currentState() == RunningAsBroker.state());
+ return brokersInState(state -> state == BrokerState.RUNNING);
}

/**
@@ -264,7 +263,7 @@ public Set<KafkaServer> brokersInState(Predicate<BrokerState> desiredState) {

protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
try {
- return desiredState.test(server.brokerState());
+ return desiredState.test(server.brokerState().get());
} catch (Throwable e) {
// Broker failed to respond.
return false;
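
Because the new BrokerState is an immutable enum value, the Predicate<BrokerState> above reduces to a plain equality check instead of unwrapping a mutable holder via currentState(). A hypothetical, self-contained version of the same filtering shape (names are illustrative, not Kafka's):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class BrokerFilter {
    enum BrokerState { NOT_RUNNING, STARTING, RUNNING }

    static final class Broker {
        final String id;
        final BrokerState state;
        Broker(String id, BrokerState state) { this.id = id; this.state = state; }
    }

    // The predicate sees the immutable enum value directly.
    static Set<Broker> brokersInState(Set<Broker> brokers, Predicate<BrokerState> desired) {
        return brokers.stream().filter(b -> desired.test(b.state)).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<Broker> brokers = new HashSet<>(Arrays.asList(
                new Broker("0", BrokerState.RUNNING),
                new Broker("1", BrokerState.STARTING)));
        System.out.println(brokersInState(brokers, s -> s == BrokerState.RUNNING).size()); // 1
    }
}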
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -20,16 +20,17 @@ package kafka.log
import java.io._
import java.nio.file.Files
import java.util.concurrent._
- import java.util.concurrent.atomic.AtomicInteger
+ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import kafka.metrics.KafkaMetricsGroup
import kafka.server.checkpoints.OffsetCheckpointFile
- import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
+ import kafka.server._
import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException}
+ import org.apache.kafka.metadata.BrokerState

import scala.jdk.CollectionConverters._
import scala.collection._
@@ -60,7 +61,7 @@ class LogManager(logDirs: Seq[File],
val retentionCheckMs: Long,
val maxPidExpirationMs: Int,
scheduler: Scheduler,
- val brokerState: BrokerState,
+ val brokerState: AtomicReference[BrokerState],
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel,
time: Time) extends Logging with KafkaMetricsGroup {
@@ -326,7 +327,7 @@
} else {
// log recovery itself is being performed by `Log` class during initialization
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
- brokerState.newState(RecoveringFromUncleanShutdown)
+ brokerState.set(BrokerState.RECOVERY)
}

var recoveryPoints = Map[TopicPartition, Long]()
@@ -1182,7 +1183,7 @@ object LogManager {
def apply(config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkClient: KafkaZkClient,
- brokerState: BrokerState,
+ brokerState: AtomicReference[BrokerState],
kafkaScheduler: KafkaScheduler,
time: Time,
brokerTopicStats: BrokerTopicStats,
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.io.{File, IOException}
import java.net.{InetAddress, SocketTimeoutException}
import java.util.concurrent._
- import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

import kafka.api.{KAFKA_0_9_0, KAFKA_2_2_IV0, KAFKA_2_4_IV1}
import kafka.cluster.Broker
@@ -46,6 +46,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
+ import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.zookeeper.client.ZKClientConfig

@@ -102,7 +103,7 @@ class KafkaServer(
var kafkaYammerMetrics: KafkaYammerMetrics = null
var metrics: Metrics = null

- val brokerState: BrokerState = new BrokerState
+ val brokerState = new AtomicReference[BrokerState](BrokerState.NOT_RUNNING)

var dataPlaneRequestProcessor: KafkaApis = null
var controlPlaneRequestProcessor: KafkaApis = null
@@ -166,7 +167,7 @@

private[kafka] def featureChangeListener = _featureChangeListener

newGauge("BrokerState", () => brokerState.currentState)
newGauge("BrokerState", () => brokerState.get().value())
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)

@@ -193,7 +194,7 @@

val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
- brokerState.newState(Starting)
+ brokerState.set(BrokerState.STARTING)

/* setup zookeeper */
initZkClient(time)
@@ -380,7 +381,7 @@

socketServer.startProcessingRequests(authorizerFutures)

- brokerState.newState(RunningAsBroker)
+ brokerState.set(BrokerState.RUNNING)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
@@ -632,7 +633,7 @@
// the shutdown.
info("Starting controlled shutdown")

- brokerState.newState(PendingControlledShutdown)
+ brokerState.set(BrokerState.PENDING_CONTROLLED_SHUTDOWN)

val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)

@@ -657,7 +658,7 @@
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this)
- brokerState.newState(BrokerShuttingDown)
+ brokerState.set(BrokerState.SHUTTING_DOWN)

if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
@@ -726,7 +727,7 @@
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()

- brokerState.newState(NotRunning)
+ brokerState.set(BrokerState.NOT_RUNNING)

startupComplete.set(false)
isShuttingDown.set(false)
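
Across KafkaServer, the mutable kafka.server.BrokerState with its newState() transitions is replaced by an AtomicReference[BrokerState] over the immutable enum: writers publish transitions with set(), and readers such as the BrokerState gauge take a lock-free snapshot with get(). A minimal standalone sketch of the idiom (a toy holder, not Kafka's actual classes):

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class BrokerStateHolder {
    enum BrokerState { NOT_RUNNING, STARTING, RUNNING, SHUTTING_DOWN }

    // Shared, lock-free holder; writers replace the value, readers snapshot it.
    private final AtomicReference<BrokerState> brokerState =
            new AtomicReference<>(BrokerState.NOT_RUNNING);

    void startup() {
        brokerState.set(BrokerState.STARTING);
        // ... start components ...
        brokerState.set(BrokerState.RUNNING);
    }

    // A metrics gauge just reads the current value; no synchronization needed.
    Supplier<String> stateGauge() {
        return () -> brokerState.get().name();
    }

    public static void main(String[] args) {
        BrokerStateHolder holder = new BrokerStateHolder();
        System.out.println(holder.stateGauge().get()); // NOT_RUNNING
        holder.startup();
        System.out.println(holder.stateGauge().get()); // RUNNING
    }
}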