Skip to content

Commit

Permalink
example: Implement experimental BookKeeper benchmark; example/ycsb: A…
Browse files Browse the repository at this point in the history
…dd CSV exporter
  • Loading branch information
fruhland committed Dec 3, 2024
1 parent 3f0cd5a commit e680e49
Show file tree
Hide file tree
Showing 39 changed files with 587 additions and 141 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,4 +413,5 @@ With 16 to 19 μs, IPoIB's latency results in that range are more than 3 times a
## Publications

- *hadroNIO: Accelerating Java NIO via UCX*, Fabian Ruhland, Filip Krakowski, Michael Schöttner; appeared in: Proceedings of the IEEE International Symposium on Parallel and Distributed Computing ([ISPDC](https://ispdc2021.utcluj.ro/), [IEEE Xplore](https://ieeexplore.ieee.org/document/9521601)), Cluj-Napoca, Romania, 2021.
- *Accelerating netty-based applications through transparent InfiniBand support*, Fabian Ruhland, Filip Krakowski, Michael Schöttner ([arXiv](https://arxiv.org/abs/2209.14048)), 2022.
- *Accelerating netty-based applications through transparent InfiniBand support*, Fabian Ruhland, Filip Krakowski, Michael Schöttner ([arXiv](https://arxiv.org/abs/2209.14048)), 2022.
- *Transparent network acceleration for big data computing in Java*, Fabian Ruhland, Filip Krakowski, Michael Schöttner; appeared in: Proceedings of the IEEE 22nd International Conference on Trust, Security and Privacy in Computing and Communications (TrustCom) ([TrustCom](https://hpcn.exeter.ac.uk/trustcom2023/), [IEEE Xplore](https://ieeexplore.ieee.org/document/10538981)), Exeter, United Kingdom, 2023.
23 changes: 12 additions & 11 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,34 @@ ext {
projectVersion = '0.3.3' + (Boolean.valueOf(System.getProperty('release')) ? '' : '-SNAPSHOT')

// Gradle plugins
gradleVersion = '8.1.1'
buildConfigPluginVersion = '4.0.2'
gradleVersion = '8.6'
buildConfigPluginVersion = '5.3.5'
gitVersionPluginVersion = '3.0.0'
shadowPluginVersion = '8.1.1'
protobufPluginVersion = '0.9.2'
protobufPluginVersion = '0.9.4'

// Core dependencies
agronaVersion = '1.18.1'
linuxEpollVersion = '1.0.1'
slf4jVersion = '2.0.7'

// Binding dependencies
jucxVersion = '1.14'
jucxVersion = '1.14.1'
infinileapVersion = '0.2.0-SNAPSHOT'

// Example dependencies
log4jVersion = '2.20.0'
picocliVersion = '4.7.3'
kryoVersion = '5.5.0'
nettyVersion = '4.1.92.Final'
grpcVersion = '1.54.1'
protobufVersion = '3.22.3'
log4jVersion = '2.22.1'
picocliVersion = '4.7.5'
kryoVersion = '5.6.0'
nettyVersion = '4.1.107.Final'
grpcVersion = '1.61.1'
protobufVersion = '3.25.3'
tomcatVersion = '6.0.53'
ycsbVersion = '0.17.0'
bookkeeperVersion = '4.16.4'

// Test dependencies
junitJupiterVersion = '5.9.2'
junitJupiterVersion = '5.10.2'
}

subprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ enum PollMethod {
private static final int DEFAULT_FLUSH_INTERVAL_SIZE = 1024;

private static final int DEFAULT_BUSY_POLL_TIMEOUT_NANOS = 20000;
private static final String DEFAULT_POLL_METHOD = "DYNAMIC";
private static final String DEFAULT_POLL_METHOD = "BUSY_POLLING";

private final int sendBufferLength;
private final int receiveBufferLength;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class HadronioSelector extends AbstractSelector {

@Override
protected void implCloseSelector() throws IOException {
LOGGER.info("Closing selector");
if (DebugConfig.DEBUG) LOGGER.debug("Closing selector");
selectorClosed = true;

synchronized (this) {
Expand Down Expand Up @@ -96,7 +96,7 @@ protected SelectionKey register(final AbstractSelectableChannel channel, final i
final var key = new HadronioSelectionKey(channel, this);
key.interestOps(interestOps);
key.attach(attachment);
LOGGER.info("Registering channel with selection key [{}]", key);
if (DebugConfig.DEBUG) LOGGER.debug("Registering channel with selection key [{}]", key);

synchronized (keys) {
synchronized (wakeupLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import de.hhu.bsinfo.hadronio.binding.UcxWorker;
import java.util.Stack;

import de.hhu.bsinfo.hadronio.generated.DebugConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -60,7 +61,7 @@ public synchronized ServerSocketChannel bind(final SocketAddress socketAddress,

try {
listener.bind(localAddress, connectionRequest -> {
LOGGER.info("Received connection request");
if (DebugConfig.DEBUG) LOGGER.debug("Received connection request");

if (backlog <= 0 || pendingRequests.size() < backlog) {
pendingRequests.push(connectionRequest);
Expand Down Expand Up @@ -125,7 +126,7 @@ public synchronized SocketChannel accept() throws IOException {
listener.getWorker().progress();
}

LOGGER.info("Accepting connection request");
if (DebugConfig.DEBUG) LOGGER.debug("Accepting connection request");
final var endpoint = listener.accept(pendingRequests.pop());
final var socket = new HadronioSocketChannel(provider(), endpoint);

Expand All @@ -144,14 +145,14 @@ public SocketAddress getLocalAddress() {

@Override
protected void implCloseSelectableChannel() throws IOException {
LOGGER.info("Closing server socket channel bound to [{}]", getLocalAddress());
if (DebugConfig.DEBUG) LOGGER.debug("Closing server socket channel bound to [{}]", getLocalAddress());
channelClosed = true;
listener.close();
}

@Override
protected void implConfigureBlocking(boolean blocking) {
LOGGER.info("Server socket channel is now configured to be [{}]", blocking ? "BLOCKING" : "NON-BLOCKING");
if (DebugConfig.DEBUG) LOGGER.debug("Server socket channel is now configured to be [{}]", blocking ? "BLOCKING" : "NON-BLOCKING");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public SocketChannel shutdownInput() throws IOException {
throw new ClosedChannelException();
}

LOGGER.info("Closing connection for input");
if (DebugConfig.DEBUG) LOGGER.debug("Closing connection for input");
inputClosed = true;
return this;
}
Expand All @@ -134,7 +134,7 @@ public SocketChannel shutdownOutput() throws IOException {
throw new ClosedChannelException();
}

LOGGER.info("Closing connection for output");
if (DebugConfig.DEBUG) LOGGER.debug("Closing connection for output");
outputClosed = true;
return this;
}
Expand Down Expand Up @@ -173,7 +173,7 @@ public synchronized boolean connect(final SocketAddress remoteAddress) throws IO
}

connectionPending = true;
LOGGER.info("Connecting to [{}]", remoteAddress);
if (DebugConfig.DEBUG) LOGGER.debug("Connecting to [{}]", remoteAddress);
endpoint.connect((InetSocketAddress) remoteAddress);
establishConnection();

Expand Down Expand Up @@ -321,7 +321,7 @@ public SocketAddress getLocalAddress() throws IOException {

@Override
protected void implCloseSelectableChannel() throws IOException {
LOGGER.info("Closing socket channel");
if (DebugConfig.DEBUG) LOGGER.debug("Closing socket channel");
channelClosed = true;
inputClosed = true;
outputClosed = true;
Expand All @@ -331,7 +331,7 @@ protected void implCloseSelectableChannel() throws IOException {

@Override
protected void implConfigureBlocking(final boolean blocking) {
LOGGER.info("Socket channel is now configured to be [{}]", blocking ? "BLOCKING" : "NON-BLOCKING");
if (DebugConfig.DEBUG) LOGGER.debug("Socket channel is now configured to be [{}]", blocking ? "BLOCKING" : "NON-BLOCKING");
}

@Override
Expand Down Expand Up @@ -401,7 +401,7 @@ public void onConnection(final boolean success, long localTag, long remoteTag) {

endpoint.setSendCallback(new SendCallback(sendBuffer));
endpoint.setReceiveCallback(new ReceiveCallback(this, readableMessages, isFlushing, configuration.getFlushIntervalSize()));
LOGGER.info("SocketChannel connected successfully (connection: [{} -> {}], localTag: [0x{}], remoteTag: [0x{}])", endpoint.getLocalAddress(), endpoint.getRemoteAddress(), Long.toHexString(localTag), Long.toHexString(remoteTag));
if (DebugConfig.DEBUG) LOGGER.debug("SocketChannel connected successfully (connection: [{} -> {}], localTag: [0x{}], remoteTag: [0x{}])", endpoint.getLocalAddress(), endpoint.getRemoteAddress(), Long.toHexString(localTag), Long.toHexString(remoteTag));

if (isBlocking()) {
connected = true;
Expand Down Expand Up @@ -462,7 +462,7 @@ void establishConnection() {
endpoint.setSendCallback(connectionCallback);
endpoint.setReceiveCallback(connectionCallback);

LOGGER.info("Exchanging tags to establish connection");
if (DebugConfig.DEBUG) LOGGER.debug("Exchanging tags to establish connection");
endpoint.sendStream(sendBuffer.addressOffset(), 2 * Long.BYTES, true, true);
endpoint.receiveStream(receiveBuffer.addressOffset(), 2 * Long.BYTES, true, false);
}
Expand Down
1 change: 1 addition & 0 deletions example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies {
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
implementation "org.apache.tomcat:annotations-api:${tomcatVersion}"
implementation "site.ycsb:core:${ycsbVersion}"
implementation "org.apache.bookkeeper:bookkeeper-server:${bookkeeperVersion}"

testImplementation "org.junit.jupiter:junit-jupiter-api:${junitJupiterVersion}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitJupiterVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ readonly BENCHMARK_NAME=${11}
wait() {
local seconds=$1

for (( k=0; k<$seconds; k++ )) do
for (( k=0; k<seconds; k++ )) do
printf "."
sleep 1s
done
Expand Down
16 changes: 9 additions & 7 deletions example/src/dist/bin/kvs/benchmark_client_inc_connections.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,27 @@ readonly STEPPING=${6}
readonly BENCHMARK_NAME=${7}
readonly RESULT_FILE=${8}
readonly WORKLOAD=${9}
readonly OPERATION_COUNT=${10}
readonly LOAD=${11}

wait() {
local seconds=$1

for (( i=0; i<$seconds; i++ )) do
for (( k=0; k<seconds; k++ )) do
printf "."
sleep 1s
done

printf "\n"
}

port=3000

for (( i=MIN_THREADS; i<=MAX_THREADS; i=(i+STEPPING)/STEPPING*STEPPING )); do
sed -i "/^operationcount=/c\operationcount=$((OPERATION_COUNT * i))" "${WORKLOAD}"
for j in {0..4}; do
wait 30
port=$((port + 1))
./bin/hadronio grpc kvs -b -r "${REMOTE_ADDRESS}:${port}" -a "${BIND_ADDRESS}" -w "${WORKLOAD}" -p LOAD
./bin/hadronio grpc kvs -b -r "${REMOTE_ADDRESS}:${port}" -a "${BIND_ADDRESS}" -w "${WORKLOAD}" -p RUN -m "${RECORD_SIZE}" -t "${i}" -o "${RESULT_FILE}" -n "${BENCHMARK_NAME}" -i "${j}"
wait 10
if [ "${LOAD}" == "true" ]; then
./bin/hadronio grpc kvs -b -r "${REMOTE_ADDRESS}" -a "${BIND_ADDRESS}" -w "${WORKLOAD}" -p LOAD
fi
./bin/hadronio grpc kvs -b -r "${REMOTE_ADDRESS}" -a "${BIND_ADDRESS}" -w "${WORKLOAD}" -p RUN -m "${RECORD_SIZE}" -t "${i}" -o "${RESULT_FILE}" -n "${BENCHMARK_NAME}" -i "${j}" -l
done
done
9 changes: 4 additions & 5 deletions example/src/dist/bin/kvs/benchmark_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

readonly BIND_ADDRESS=${1}
readonly ITERATIONS=${2}
readonly CONNECTIONS=${3}
readonly START_ITERATION=${4}

port=3000

for (( i=0; i<ITERATIONS; i++ )); do
for (( i=START_ITERATION; i<ITERATIONS; i++ )); do
for j in {0..4}; do
port=$((port + 1))
./bin/hadronio grpc kvs -s -a "${BIND_ADDRESS}:${port}"
./bin/hadronio grpc kvs -s -a "${BIND_ADDRESS}" -c $((CONNECTIONS * i))
done
done
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ readonly STEPPING=${8}
wait() {
local seconds=$1

for (( k=0; k<$seconds; k++ )) do
for (( k=0; k<seconds; k++ )) do
printf "."
sleep 1s
done
Expand Down
2 changes: 1 addition & 1 deletion example/src/dist/bin/netty/benchmark_client_inc_size.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ readonly CONNECTIONS=${7}
wait() {
local seconds=$1

for (( i=0; i<$seconds; i++ )) do
for (( k=0; k<seconds; k++ )) do
printf "."
sleep 1s
done
Expand Down
14 changes: 7 additions & 7 deletions example/src/dist/workloads/sample-workload
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,25 @@
workload=site.ycsb.workloads.CoreWorkload

# The number of records in the dataset at the start of the workload.
recordcount=10000
recordcount=1000

# The number of operations to perform in the workload.
operationcount=2500000
operationcount=1000000

# The size of each field.
fieldlength=1024
fieldlength=16

# The number of fields in a record.
fieldcount=16
fieldcount=1

# Should reads read all fields (true) or just one (false).
readallfields=true

# What proportion of operations should be reads.
readproportion=0.95
readproportion=1

# What proportion of operations should be updates.
updateproportion=0.05
updateproportion=0

# What proportion of operations should be scans.
scanproportion=0
Expand All @@ -30,4 +30,4 @@ insertproportion=0

# What distribution should be used to select the records to operate
# on – uniform, zipfian, hotspot, sequential, exponential or latest
requestdistribution=zipfian
requestdistribution=zipfian
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import de.hhu.bsinfo.hadronio.example.blocking.Blocking;
import de.hhu.bsinfo.hadronio.example.grpc.Grpc;
import de.hhu.bsinfo.hadronio.example.netty.Netty;
import de.hhu.bsinfo.hadronio.example.bookkeeper.Bookkeeper;
import de.hhu.bsinfo.hadronio.util.InetSocketAddressConverter;
import picocli.CommandLine;

Expand All @@ -11,7 +12,7 @@
@CommandLine.Command(
name = "hadronio",
description = "Test applications for hadroNIO",
subcommands = { Blocking.class, Netty.class, Grpc.class }
subcommands = { Blocking.class, Netty.class, Grpc.class, Bookkeeper.class }
)
public class Application implements Runnable {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package de.hhu.bsinfo.hadronio.example.bookkeeper;

import de.hhu.bsinfo.hadronio.example.bookkeeper.benchmark.Benchmark;
import picocli.CommandLine;

@CommandLine.Command(
name = "bookkeeper",
description = "Example applications using bookkeeper",
subcommands = { Benchmark.class }
)
public class Bookkeeper implements Runnable {

@Override
public void run() {
CommandLine.usage(this, System.err);
}
}
Loading

0 comments on commit e680e49

Please sign in to comment.