Adding extra queue to check metrics are independent #4869

Open · wants to merge 4 commits into main
@@ -107,9 +107,13 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase {
private String rootPath;

public static final String QUEUE1 = "METRICSQ1";
public static final String QUEUE2 = "METRICSQ2";
public static final String QUEUE1_METRIC_LABEL = MetricsUtil.formatString(QUEUE1);
public static final String QUEUE2_METRIC_LABEL = MetricsUtil.formatString(QUEUE2);
public static final String QUEUE1_SERVICE = "Q1";
public static final String QUEUE2_SERVICE = "Q2";
public static final int QUEUE1_SIZE = 6;
public static final int QUEUE2_SIZE = 6;
Contributor review comment: Should we use a different size for each queue?

// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
@@ -203,6 +207,14 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf)
cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, "6");
cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);

// Queue 2 with zero compactors
cfg.setProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE2_SERVICE + ".planner",
RatioBasedCompactionPlanner.class.getName());
cfg.setProperty(
Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE2_SERVICE + ".planner.opts.groups",
"[{'group':'" + QUEUE2 + "'}]");
cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE2, 0);
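// Note: like QUEUE1 above, QUEUE2 starts with zero compactors so that jobs
// queue up but never run; compactors for both groups are started later in
// testMultipleQueueMetricsIndependence once the queued metrics have been verified.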

// This test waits for dead compactors to be absent in zookeeper. The following setting will
// make entries in ZK related to dead compactor processes expire sooner.
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10");
@@ -423,6 +435,196 @@ public void testQueueMetrics() throws Exception {
}
}

@Test
public void testMultipleQueueMetricsIndependence() throws Exception {
String table2 = getUniqueNames(2)[1];
long highestFileCountQueue1 = 0L;
long highestFileCountQueue2 = 0L;
ServerContext context = getCluster().getServerContext();
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
String dir1 = getDir("/testBulkFile-Queue1-");
String dir2 = getDir("/testBulkFile-Queue2-");
FileSystem fs = getCluster().getFileSystem();
fs.mkdirs(new Path(dir1));
fs.mkdirs(new Path(dir2));

Map<String,String> props =
Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
"table.compaction.dispatcher.opts.service", QUEUE2_SERVICE);
NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props)
.withInitialTabletAvailability(TabletAvailability.HOSTED);
c.tableOperations().create(table2, ntc);

// Create splits so there are two groupings of tablets with similar file counts for both
// queues.
String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
addSplits(c, tableName, splitString);
addSplits(c, table2, splitString);

// Add files to both directories (simulating two different queues)
for (int i = 0; i < 100; i++) {
writeData(dir1 + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
writeData(dir2 + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
}
c.tableOperations().importDirectory(dir1).to(tableName).load();
c.tableOperations().importDirectory(dir2).to(table2).load();
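// Each table now holds 100 imported files covering disjoint 100-row ranges,
// distributed across the tablets created by the splits above, so both queues
// start from similarly shaped tablets.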

// Set up compaction configurations for two different queues
IteratorSetting iterSettingQueue1 = new IteratorSetting(100, CompactionIT.TestFilter.class);
iterSettingQueue1.addOption("expectedQ", QUEUE1);
iterSettingQueue1.addOption("modulus", 3 + "");

IteratorSetting iterSettingQueue2 = new IteratorSetting(100, CompactionIT.TestFilter.class);
iterSettingQueue2.addOption("expectedQ", QUEUE2);
iterSettingQueue2.addOption("modulus", 5 + "");
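// TestFilter is reused from CompactionIT; as used there, "expectedQ" lets the
// iterator verify which queue the compaction runs in, and "modulus" controls
// which entries survive the compaction. Different moduli per queue make the
// two queues' work easy to tell apart.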

CompactionConfig configQ1 =
new CompactionConfig().setIterators(List.of(iterSettingQueue1)).setWait(false);
CompactionConfig configQ2 =
new CompactionConfig().setIterators(List.of(iterSettingQueue2)).setWait(false);

c.tableOperations().compact(tableName, configQ1);
c.tableOperations().compact(table2, configQ2);

// Get the highest file count among each queue's tablets. QUEUE1 serves
// tableName and QUEUE2 serves table2, so read each table's metadata.
try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) {
for (TabletMetadata tablet : tm) {
long fileCount = tablet.getFiles().size();
log.info("Number of files in tablet {}: {}", tablet.getExtent(), fileCount);
highestFileCountQueue1 = Math.max(highestFileCountQueue1, fileCount);
}
}
try (TabletsMetadata tm =
context.getAmple().readTablets().forTable(context.getTableId(table2)).build()) {
for (TabletMetadata tablet : tm) {
long fileCount = tablet.getFiles().size();
log.info("Number of files in tablet {}: {}", tablet.getExtent(), fileCount);
highestFileCountQueue2 = Math.max(highestFileCountQueue2, fileCount);
}
}

verifyData(c, tableName, 0, 100 * 100 - 1, false);
verifyData(c, table2, 0, 100 * 100 - 1, false);
}

// Fetch and verify metrics for both queues
boolean sawMetricsQ1 = false;
boolean sawMetricsQ2 = false;

// Loop until positive queued-job metrics have been observed for both queues.
while (!sawMetricsQ1 || !sawMetricsQ2) {
while (!queueMetrics.isEmpty()) {
var qm = queueMetrics.take();
if (qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
&& qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
if (Integer.parseInt(qm.getValue()) > 0) {
sawMetricsQ1 = true;
}
} else if (qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
&& qm.getTags().containsValue(QUEUE2_METRIC_LABEL)) {
if (Integer.parseInt(qm.getValue()) > 0) {
sawMetricsQ2 = true;
}
}
}
// If metrics are not found in the queue, sleep until the next poll.
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}

// Verify the queues report independent metrics: track each queue's lowest job
// priority, rejected-job count, and queue length separately.
long lowestPriorityQ1 = Short.MIN_VALUE;
long lowestPriorityQ2 = Short.MIN_VALUE;
long rejectedCountQ1 = 0L;
long rejectedCountQ2 = 0L;
int queue1Size = 0;
int queue2Size = 0;
boolean sawQs = false;

while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
// Handle QUEUE1 metrics
if (metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.getName())) {
rejectedCountQ1 = Long.parseLong(metric.getValue());
} else if (metric.getName()
.contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName())) {
lowestPriorityQ1 = Math.max(lowestPriorityQ1, Long.parseLong(metric.getValue()));
} else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName())) {
queue1Size = Integer.parseInt(metric.getValue());
}
} else if (metric.getTags().containsValue(QUEUE2_METRIC_LABEL)) {
if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.getName())) {
rejectedCountQ2 = Long.parseLong(metric.getValue());
} else if (metric.getName()
.contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName())) {
lowestPriorityQ2 = Math.max(lowestPriorityQ2, Long.parseLong(metric.getValue()));
} else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH.getName())) {
queue2Size = Integer.parseInt(metric.getValue());
}
} else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) {
sawQs = true;
} else {
log.debug("{}", metric);
}
}

// Confirm metrics were generated and, in some cases, validate their contents.
assertTrue(rejectedCountQ1 > 0L);
assertTrue(rejectedCountQ2 > 0L);
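// Rejections are expected for both queues: each priority queue appears to be
// capped at six entries (see QUEUE1_SIZE/QUEUE2_SIZE) while more tablets than
// that are eligible for compaction.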

// Priority is the tablet's file count plus its number of compactions, so the
// lowest-priority job in each queue should score at least one higher than a
// priority computed from the highest file count alone.
short highestFileCountPrioQ1 = CompactionJobPrioritizer.createPriority(
getCluster().getServerContext().getTableId(tableName), CompactionKind.USER,
(int) highestFileCountQueue1, 0);
assertTrue(lowestPriorityQ1 > highestFileCountPrioQ1,
lowestPriorityQ1 + " " + highestFileCountQueue1 + " " + highestFileCountPrioQ1);

short highestFileCountPrioQ2 =
CompactionJobPrioritizer.createPriority(getCluster().getServerContext().getTableId(table2),
CompactionKind.USER, (int) highestFileCountQueue2, 0);
assertTrue(lowestPriorityQ2 > highestFileCountPrioQ2,
lowestPriorityQ2 + " " + highestFileCountQueue2 + " " + highestFileCountPrioQ2);

// Multiple Queues have been created
assertTrue(sawQs);

assertEquals(QUEUE1_SIZE, queue1Size);
assertEquals(QUEUE2_SIZE, queue2Size);
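// Both lengths match MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE
// ("6") from configureMiniCluster; if that property is manager-wide, as it
// appears to be, it also bears on the review question above about giving each
// queue a different size.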

// Start Compactors for both QUEUE1 and QUEUE2
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 1);
getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE2, 1);
getCluster().getClusterControl().start(ServerType.COMPACTOR);
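// With one compactor now running per resource group, each queue should drain
// independently; the loop below polls until both report zero queued jobs (or
// the total queue count drops to zero).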

boolean emptyQueue1 = false;
boolean emptyQueue2 = false;

UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());

// Continue checking until both queues are empty
while (!emptyQueue1 || !emptyQueue2) {
while (!queueMetrics.isEmpty()) {
var metric = queueMetrics.take();
// Check metrics for QUEUE1
if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
&& metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
if (Integer.parseInt(metric.getValue()) == 0) {
emptyQueue1 = true;
}
} else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
&& metric.getTags().containsValue(QUEUE2_METRIC_LABEL)) {
if (Integer.parseInt(metric.getValue()) == 0) {
emptyQueue2 = true;
}
}

// Check if the total number of queues is zero, and ensure no metrics for the queues remain
if (metric.getName().equals(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) {
if (Integer.parseInt(metric.getValue()) == 0) {
emptyQueue1 = true;
emptyQueue2 = true;
}
}
}
UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
}

}

/**
* Test that the compaction queue is cleared when compactions no longer need to happen.
*/