Skip to content

Commit

Permalink
address comment and checktyle fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sahusanket committed Jul 23, 2024
1 parent 53a1eaa commit 1f69734
Show file tree
Hide file tree
Showing 30 changed files with 242 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ public class DynamicPartitionerWithAvroTest extends MapReduceRunnerTestBase {

@Test
public void testMultiWriter() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, true, true);
runDynamicPartitionerMr(ORDERED_RECORDS, true, true);
}

@Test
public void testSingleWriter() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, false, true);
runDynamicPartitionerMr(ORDERED_RECORDS, false, true);
}

@Test
Expand All @@ -95,24 +95,24 @@ public void testSingleWriterWithUnorderedData() throws Exception {
createRecord("john", 84125));
// the input data is not ordered by output partition and its limiting to a single writer,
// so we expect this job to fail
runDynamicPartitionerMR(records, false, false);
runDynamicPartitionerMr(records, false, false);
}

@Test
public void testPartitionAppend() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, true, true,
runDynamicPartitionerMr(ORDERED_RECORDS, true, true,
DynamicPartitioner.PartitionWriteOption.CREATE_OR_APPEND, true);
}

@Test
public void testPartitionAppendWhenNotConfigured() throws Exception {
// partition will exist beforehand, but the append option is not configured; hence the job is expected to fail
runDynamicPartitionerMR(ORDERED_RECORDS, true, true, DynamicPartitioner.PartitionWriteOption.CREATE, false);
runDynamicPartitionerMr(ORDERED_RECORDS, true, true, DynamicPartitioner.PartitionWriteOption.CREATE, false);
}

@Test
public void testPartitionOverwrite() throws Exception {
runDynamicPartitionerMR(ORDERED_RECORDS, true, true,
runDynamicPartitionerMr(ORDERED_RECORDS, true, true,
DynamicPartitioner.PartitionWriteOption.CREATE_OR_OVERWRITE, true);
}

Expand All @@ -124,20 +124,19 @@ private void writeFile(PartitionedFileSet pfs, PartitionKey key) throws IOExcept
partitionOutput.addPartition();
}

private void runDynamicPartitionerMR(final List<? extends GenericRecord> records,
private void runDynamicPartitionerMr(final List<? extends GenericRecord> records,
boolean allowConcurrentWriters,
boolean expectedStatus) throws Exception {
runDynamicPartitionerMR(records, allowConcurrentWriters, false, null, expectedStatus);
runDynamicPartitionerMr(records, allowConcurrentWriters, false, null, expectedStatus);
}

private void runDynamicPartitionerMR(final List<? extends GenericRecord> records,
private void runDynamicPartitionerMr(final List<? extends GenericRecord> records,
boolean allowConcurrentWriters,
final boolean precreatePartitions,
@Nullable final DynamicPartitioner.PartitionWriteOption partitionWriteOption,
boolean expectedStatus) throws Exception {
ApplicationWithPrograms app = deployApp(AppWithMapReduceUsingAvroDynamicPartitioner.class);

final long now = System.currentTimeMillis();
ApplicationWithPrograms app = deployApp(AppWithMapReduceUsingAvroDynamicPartitioner.class);

Check warning on line 139 in cdap-app-fabric-tests/src/test/java/io/cdap/cdap/internal/app/runtime/batch/DynamicPartitionerWithAvroTest.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'app' declaration and its first usage is 7, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).
final Multimap<PartitionKey, GenericRecord> keyToRecordsMap = groupByPartitionKey(records, now);

// write values to the input kvTable
Expand Down Expand Up @@ -172,6 +171,9 @@ public void apply() throws IOException {
// run the partition writer m/r with this output partition time
Map<String, String> arguments = new HashMap<>();
arguments.put(OUTPUT_PARTITION_KEY, Long.toString(now));
// The test case and the output committer was written in hadoop 2, which followed older file output committer
// algorithm ( version 1 ). After upgrading to Hadoop 3, the default algorithm is version 2. So setting it manually
// to previous algorithm version to comply with with test.
arguments.put("system.mapreduce.mapreduce.fileoutputcommitter.algorithm.version", "1");
arguments.put(allowConcurrencyKey, Boolean.toString(allowConcurrentWriters));
if (partitionWriteOption != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public class MapReduceProgramRunner extends AbstractProgramRunnerWithPlugin {
private final RemoteClientFactory remoteClientFactory;
private final AppStateStoreProvider appStateStoreProvider;

/**
* Constructor for a program runner that launches a Map reduce program.
*/
@Inject
public MapReduceProgramRunner(Injector injector, CConfiguration cConf, Configuration hConf,
NamespacePathLocator locationFactory,
Expand Down Expand Up @@ -247,6 +250,12 @@ private File getPluginArchive(ProgramOptions options) {
return new File(options.getArguments().getOption(ProgramOptionConstants.PLUGIN_ARCHIVE));
}

/**
* A method to apply any custom map reduce configuration passed in by user. This will override the default config by
* hadoop or map reduce.
* If a runtime argument is passed by the user prefixed by {@MAPREDUCE_CUSTOM_CONFIG_PREFIX} , then we add it to
* hConf. Which later will be taken as hadoop / map reduce configuration.
*/
private Configuration setCustomMapReduceConfig(Configuration hConf, Arguments options) {
Map<String, String> systemArgs = options.asMap();
for (String name : systemArgs.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testSendForPaginatedListResponder() throws IOException {
}).when(chunkResponder).sendChunk(Mockito.any(ByteBuffer.class));

JsonPaginatedListResponder.respond(
new Gson(), responder, "applications", (jsonListResponder)-> {
new Gson(), responder, "applications", (jsonListResponder) -> {
jsonListResponder.send("application");
return "nextToken";
});
Expand All @@ -70,7 +70,7 @@ public void testMultipleSendForPaginatedListResponder() throws IOException {
}).when(chunkResponder).sendChunk(Mockito.any(ByteBuffer.class));

JsonPaginatedListResponder.respond(
new Gson(), responder, "applications", (jsonListResponder)-> {
new Gson(), responder, "applications", (jsonListResponder) -> {
jsonListResponder.send("application0");
jsonListResponder.send("application1");
return "nextToken";
Expand All @@ -96,7 +96,7 @@ public void testSendForWholeListResponder() throws IOException {
}).when(chunkResponder).sendChunk(Mockito.any(ByteBuffer.class));

JsonWholeListResponder.respond(
new Gson(), responder, (jsonListResponder)-> {
new Gson(), responder, (jsonListResponder) -> {
jsonListResponder.send("application");
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.security.auth.context.AuthenticationTestContext;
import io.cdap.http.NettyHttpService;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.InMemoryDiscoveryService;
import org.hamcrest.CoreMatchers;
Expand All @@ -48,11 +52,6 @@
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

/**
* Tests for {@link RemoteAppStateStore}
*/
Expand Down Expand Up @@ -110,7 +109,7 @@ private static void setUpMockBehaviorForApplicationLifeCycleService() throws App
.getState(Mockito.argThat(new AppNameAppStateKeyMatcher(NOT_FOUND_APP)));
Mockito.doThrow(new ApplicationNotFoundException(new ApplicationId(NAMESPACE, NOT_FOUND_APP)))
.when(applicationLifecycleService)
.deleteState(Mockito.argThat(new AppNameAppStateKeyMatcher( NOT_FOUND_APP)));
.deleteState(Mockito.argThat(new AppNameAppStateKeyMatcher(NOT_FOUND_APP)));

//Throw RuntimeException whenever error app is being used
Mockito.doThrow(new RuntimeException("test")).when(applicationLifecycleService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.utils.Tasks;
import io.cdap.cdap.internal.AppFabricTestHelper;
import io.cdap.cdap.security.server.LDAPLoginModule;
import io.cdap.cdap.security.server.LdapLoginModule;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -66,7 +66,7 @@ public void startStopServer() throws Exception {
}

@Test
public void testSSL() throws IOException {
public void testSsl() throws IOException {
CConfiguration cConf = CConfiguration.create();
cConf.setBoolean(Constants.Security.SSL.INTERNAL_ENABLED, true);
SConfiguration sConf = SConfiguration.create();
Expand All @@ -85,7 +85,7 @@ public void testSSL() throws IOException {
Assert.assertTrue(URIScheme.HTTPS.isMatch(discoverable));
InetSocketAddress addr = discoverable.getSocketAddress();
// Since the server uses a self signed certificate we need a client that trusts all certificates
SSLSocket socket = (SSLSocket) LDAPLoginModule.TrustAllSSLSocketFactory.getDefault()
SSLSocket socket = (SSLSocket) LdapLoginModule.TrustAllSslSocketFactory.getDefault()
.createSocket(addr.getHostName(), addr.getPort());
socket.setSoTimeout(5000); // in millis
// Would throw exception if the server does not support ssl.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.twill.api.ClassAcceptor;

/**
* Exclude hadoop classes
* Exclude hadoop classes.
*/
public class HadoopClassExcluder extends ClassAcceptor {

Expand All @@ -44,8 +44,8 @@ public boolean accept(String className, URL classUrl, URL classPathUrl) {
// Hadoop from the runtime environemnt
// Removing classes related to `com.fasterxml.jackson.core:jackson-databind`
// and `com.fasterxml.jackson.core:jackson-core`
if (className.startsWith("com.fasterxml.jackson.databind.") ||
className.startsWith("com.fasterxml.jackson.core.")) {
if (className.startsWith("com.fasterxml.jackson.databind.")
|| className.startsWith("com.fasterxml.jackson.core.")) {
return false;
}

Expand Down
31 changes: 15 additions & 16 deletions cdap-common/src/test/java/io/cdap/cdap/io/SchemaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class Node {

/**
* Test parent.
*
* @param <T> Parameter
*/
public class Parent<T> {
Expand All @@ -66,6 +67,7 @@ public class Parent<T> {

/**
* Test child.
*
* @param <T> Paramter.
*/
public class Child<T> extends Parent<Map<String, T>> {
Expand Down Expand Up @@ -158,8 +160,8 @@ public static final class Node4 {
public static final class Node5 {
private static final Schema SCHEMA = Schema.recordOf(
Node5.class.getName(),
Schema.Field.of("x", Schema.nullableOf(Node4.SCHEMA)));
private Node4 x;
Schema.Field.of("x4", Schema.nullableOf(Node4.SCHEMA)));
private Node4 x4;
}

/**
Expand All @@ -168,10 +170,10 @@ public static final class Node5 {
public static final class Node6 {
private static final Schema SCHEMA = Schema.recordOf(
Node6.class.getName(),
Schema.Field.of("x", Schema.nullableOf(Node4.SCHEMA)),
Schema.Field.of("y", Schema.nullableOf(Node5.SCHEMA)));
private Node4 x;
private Node5 y;
Schema.Field.of("x4", Schema.nullableOf(Node4.SCHEMA)),
Schema.Field.of("y5", Schema.nullableOf(Node5.SCHEMA)));
private Node4 x4;
private Node5 y5;
}

@Test
Expand All @@ -189,19 +191,16 @@ public void testAvroRecordSchema() throws Exception {
org.apache.avro.Schema avroStringSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.STRING);
org.apache.avro.Schema avroIntSchema = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.INT);


org.apache.avro.Schema schema = org.apache.avro.Schema.createRecord("UserInfo", "Describes user information",
"org.example.schema", false);

List<org.apache.avro.Schema.Field> fields = new ArrayList<>();

org.apache.avro.Schema.Field field = new org.apache.avro.Schema.Field("username", avroStringSchema,
"Field represents username", "unknown");
fields.add(field);

field = new org.apache.avro.Schema.Field("age", avroIntSchema, "Field represents age of user",-1);
field = new org.apache.avro.Schema.Field("age", avroIntSchema, "Field represents age of user", -1);
fields.add(field);

org.apache.avro.Schema schema = org.apache.avro.Schema.createRecord("UserInfo", "Describes user information",
"org.example.schema", false);
schema.setFields(fields);
Schema parsedSchema = Schema.parseJson(schema.toString());
Assert.assertTrue("UserInfo".equals(parsedSchema.getRecordName()));
Expand Down Expand Up @@ -256,7 +255,7 @@ public void testFieldIgnoreCase() {
}

@Test
public void testParseFlatSQL() throws IOException {
public void testParseFlatSql() throws IOException {
// simple, non-nested types
String schemaStr = "bool_field boolean, "
+ "int_field int not null, "
Expand Down Expand Up @@ -291,7 +290,7 @@ public void testParseFlatSQL() throws IOException {
}

@Test
public void testNestedSQL() throws IOException {
public void testNestedSql() throws IOException {
Schema expected = Schema.recordOf(
"rec",
Schema.Field.of(
Expand Down Expand Up @@ -335,7 +334,7 @@ public void testNestedSQL() throws IOException {
}

@Test
public void testParseSQLWithWhitespace() throws IOException {
public void testParseSqlWithWhitespace() throws IOException {
String schemaStr = "map_field map< string , int > not null,\n"
+ "arr_field array< record< x:int , y:double >\t> not null";
Schema expectedSchema = Schema.recordOf(
Expand All @@ -353,7 +352,7 @@ public void testParseSQLWithWhitespace() throws IOException {
}

@Test
public void testInvalidSQL() {
public void testInvalidSql() {
verifyThrowsException("int x");
verifyThrowsException("x map<int, int");
verifyThrowsException("x array<string");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ protected RemoteDatasetFramework createFramework(AuthenticationContext authentic
cConf.set("system.dataset.remote.retry.policy.base.delay.ms", "0");
cConf.set("system.dataset.remote.retry.policy.max.retries", "2");
RemoteClientFactory mockedFactory = Mockito.spy(remoteClientFactory);
Map<URL, Integer> failedURIs = new HashMap<>();
Map<URL, Integer> failedUris = new HashMap<>();
Mockito.doAnswer(i -> {
RemoteClient realClient = (RemoteClient) i.callRealMethod();
RemoteClient mocked = Mockito.spy(realClient);
Mockito.doAnswer(i2 -> {
HttpRequest request = i2.getArgument(0, HttpRequest.class);
//Fail the first GET with ServiceUnavailableException, second GET with IOException, allow third.
if (request.getMethod() == HttpMethod.GET && !failedURIs.containsKey(request.getURL())) {
failedURIs.put(request.getURL(), 1);
if (request.getMethod() == HttpMethod.GET && !failedUris.containsKey(request.getURL())) {
failedUris.put(request.getURL(), 1);
throw new ServiceUnavailableException("service");
} else if (request.getMethod() == HttpMethod.GET && failedURIs.get(request.getURL()) == 1) {
failedURIs.put(request.getURL(), 2);
} else if (request.getMethod() == HttpMethod.GET && failedUris.get(request.getURL()) == 1) {
failedUris.put(request.getURL(), 2);
throw new IOException();
}
failedURIs.clear();
failedUris.clear();
return i2.callRealMethod();
}).when(mocked).execute(Mockito.any());
return mocked;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@
import io.cdap.cdap.security.guice.CoreSecurityRuntimeModule;
import io.cdap.cdap.security.guice.ExternalAuthenticationModule;
import java.net.InetSocketAddress;
import java.security.SecureRandom;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;

import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.apache.commons.net.DefaultSocketFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.twill.discovery.DiscoveryService;
Expand All @@ -58,12 +54,12 @@ protected String getProtocol() {
}

@Override
protected DefaultHttpClient getHTTPClient() {
protected DefaultHttpClient getHttpClient() {
return new DefaultHttpClient();
}

@Override
protected AsyncHttpClient getAsyncHTTPClient() throws Exception {
protected AsyncHttpClient getAsyncHttpClient() throws Exception {
AsyncHttpClientConfig.Builder configBuilder = new AsyncHttpClientConfig.Builder();

final AsyncHttpClient asyncHttpClient = new AsyncHttpClient(
Expand Down Expand Up @@ -93,18 +89,18 @@ private HttpRouterService(String hostname, DiscoveryService discoveryService) {
@Override
protected void startUp() {
CConfiguration cConf = CConfiguration.create();
SConfiguration sConfiguration = SConfiguration.create();
Injector injector = Guice.createInjector(new CoreSecurityRuntimeModule().getInMemoryModules(),
new ExternalAuthenticationModule(),
new InMemoryDiscoveryModule(),
new AppFabricTestModule(cConf));
DiscoveryServiceClient discoveryServiceClient = injector
.getInstance(DiscoveryServiceClient.class);
UserIdentityExtractor userIdentityExtractor = injector
.getInstance(UserIdentityExtractor.class);
cConf.set(Constants.Router.ADDRESS, hostname);
cConf.setInt(Constants.Router.ROUTER_PORT, 0);
cConf.setInt(Constants.Router.CONNECTION_TIMEOUT_SECS, CONNECTION_IDLE_TIMEOUT_SECS);
SConfiguration sConfiguration = SConfiguration.create();
DiscoveryServiceClient discoveryServiceClient = injector
.getInstance(DiscoveryServiceClient.class);
UserIdentityExtractor userIdentityExtractor = injector
.getInstance(UserIdentityExtractor.class);
router =
new NettyRouter(cConf, sConfiguration, InetAddresses.forString(hostname),
new RouterServiceLookup(cConf, (DiscoveryServiceClient) discoveryService,
Expand Down
Loading

0 comments on commit 1f69734

Please sign in to comment.