In ToplingDB, dcompact(Distributed Compaction) is formally implemented as a SidePlugin. That is to say, to use Distributed Compaction, the user code does not need any modification, only the json/yaml configuration file needs to be changed.
In the implementation of Distributed Compaction:
- Who runs ToplingDB/RocksDB is called Hoster, as Client in the Server/Client model.
- Who runs dcompact_worker is called Worker, as Server in the Server/Client model.
- One worker can serve multiple hosters at the same time, and one hoster can also send its compact tasks to multiple workers for executing.
- When compiling ToplingDB, this module (topling-dcompact) is automatically cloned from github by ToplingDB's Makefile.
User-defined plugins, such as MergeOperator, CompactionFilter, etc. are compiled into libraries, which are loaded through LD_PRELOAD when running dcompact_worker. In this way, the same process can load multiple different dynamic libraries to provide compact services for multiple different DBs. For example, MyTopling and Todis use this method to share the same dcompact_worker process.
The concrete class name of Distributed Compaction is DcompactEtcd, because ToplingDB Distributed Compaction originally used etcd through etcd-cpp-api for the interaction between Hoster and Worker. But etcd had to be abandoned because of bug #78 of etcd-cpp-api. Currently etcd-related code is useless, only DcompactEtcd is reserved as the class name.
Distributed Compaction is configured in the json configuration file:
"CompactionExecutorFactory": {
"dcompact": {
"class": "DcompactEtcd",
"params": {
"allow_fallback_to_local": true,
"hoster_root": "/nvmepool/shared/db1",
"instance_name": "db1",
"nfs_mnt_src": ":/nvmepool/shared",
"nfs_mnt_opt": "nolock,addr=192.168.31.16",
"http_max_retry": 3,
"http_timeout": 1,
"http_workers": [
"http://active-host:8080",
{ "url": "http://worker.for.submit.compact.job",
"base_url": "http://worker.for.probe.and.shutdown.compact.job",
"web_url": "https://web.page.proxy"
},
"//http://commented-host:8080",
"//end_http_workers"
],
"dcompact_min_level": 2
}
}
}
Here, CompactionExecutorFactory
is a C++ interface, which is a namespace in json. In this namespace, an object whose varname is dcompact
and class name is DcompactEtcd
(implements the CompactionExecutorFactory
interface) is defined, and the object is constructed with params
. The params are explained as follows:
property name | type | default value | description |
---|---|---|---|
allow_fallback_to_local |
bool | false | Whether to allow fallback to local compact if Distributed Compaction fails |
hoster_root |
string | empty | The root directory of the db, generally set the same as the path variable in DB::Open. |
instance_name |
string | empty | The db instance name, in a multi-tenant scenario, the CompactWorker node uses instance_name to distinguish different db instances |
job_url_root |
string | empty | 查看分布式 Compact Job 详细信息的 http url, 一般可以与 http_workers 中的 web_url 相同 |
nfs_mnt_src |
string | empty | NFS mount source |
nfs_mnt_opt |
string | empty | NFS mount options |
http_max_retry |
int | 3 | Maximum number of retries |
retry_sleep_time |
int | 1 | In seconds, sleep time between retry, to avoid flood the server |
overall_timeout |
int | 5 | In seconds, the time-out time for a single execution attempt of a single Distributed Compaction task from start to finish |
http_timeout |
int | 3 | In seconds, the timeout time of the http connection. Under normal circumstances, timeout means an error |
http_workers |
array | empty | Multiple (at least one) http urls. Those starting with // will be skipped, equivalent to being commented out//end_http_workers at the end is for the start_workers.sh script and cannot be deleted |
dcompact_min_level |
int | 2 | Distributed Compaction is used only when the Compact Output Level is greater than or equal to this value, and local compact is used when it is smaller than this value |
In the intranet environment, each worker can be simply configured as a string representing the http url of the worker.
In a public network (such as cloud computing) environment, workers are hidden behind reverse proxy or load balancing, and several configuration fields are added for this:
field name | type | description |
---|---|---|
url | string | Submit a compact job |
base_url | string | probe to query compact job or shutdown to specify compact job |
web_url | string | View the status through stat and view the log file in the browser |
export MAX_PARALLEL_COMPACTIONS=16
export NFS_DYNAMIC_MOUNT=0
export NFS_MOUNT_ROOT=/nvme-shared
export WORKER_DB_ROOT=/tmp
rm -rf $WORKER_DB_ROOT/db-instance-name-1
# Only for ToplingZipTable
export ZIP_SERVER_OPTIONS="listening_ports=8090:num_threads=32"
# ToplingZipTable_XXX can be override here by env, which will
# override the values defined in db Hoster side's json config.
# Hoster side's json config will pass to compact worker through
# rpc, then it may be override by env defined here!
export ToplingZipTable_nltBuildThreads=5
export ToplingZipTable_localTempDir=/dev/shm
export ToplingZipTable_warmupLevel=kValue
export DictZipBlobStore_zipThreads=16
rm -f /tmp/Topling-* # clean garbage files in previous run
ulimit -n 100000
./dcompact_worker.exe -D listening_ports=8080
Taking this script as an example, the script starts dcompact_worker on the current node, and several environment variables are described as follows:
environment variable name | explanation |
---|---|
TERMINATION_CHECK_URL | If the Cloud Server is Spot Instance , it will be in terminating state, most cloud vendor has http api for termination check, and return http 404 if it is in normal state, otherwise 200 for notify it is going to be terminated |
WORKER_DB_ROOT | The worker will create a directory for each attempt of each compact job sent by each hoster. The pseudo metadata and operation logs of the db will be saved in this directory. After the compaction successed, the data in this directory will be useless |
DEL_WORKER_TEMP_DB | Referring to WORKER_DB_ROOT , when the corresponding compact is executed, whether to delete the corresponding directory.While after the execution of compact, the data in the corresponding directory is useless and can be deleted, it can still be retained for subsequent tracking |
NFS_DYNAMIC_MOUNT | 0 means that the system has mounted nfs 1 means the system has not mounted nfs, and the dcompact_worker process dynamically mounts the nfs specified in the http request |
NFS_MOUNT_ROOT | This directory contains multiple Hosters' SST directory, and the directory name is the Instance_name of each host. The directory sent by hostter to worker is specified in JSON. The part of the hoster_root in the DCOMPACTETCD JSON definition is replaced by ${nfs_mount_root}/instance_name |
MAX_PARALLEL_COMPACTIONS | The number of compact tasks that the dcompact_worker process can execute at the same time. It must be greater than 0, but should not be set too large |
ADVERTISE_ADDR | This parameter will be returned to the Hoster through the response of the dcompact request, so that it will be displayed in the link url in the dcompact worker web view of the Hoster, to adapt to the reverse proxy, so that we can view the dcompact_worker of the intranet in the public network environment web view |
WEB_DOMAIN | Adaptive height for dcompact worker web view iframe |
MULTI_PROCESS | If the program using ToplingDB and its plug-ins such as CompactionFilter/EventHandler use global variables, it is impossible to execute Compact tasks from multiple DB instances in the same process. At this time, set MULTI_PROCESS to true to run Compact through multiple processes. |
ZIP_SERVER_OPTIONS | ToplingZipTable environment variable. when MULTI_PROCESS is true, it set the http parameters of ZipServer. For example:export ZIP_SERVER_OPTIONS=listening_ports=8090:num_threads=32 |
FEE_URL | url for fee |
LABOUR_ID | fee.labourId, normally the host name of dcompact worker |
CLOUD_PROVIDER | fee.provider |
ENABLE_HTTP_STOP | enable http stop api, disable this feature can avoid http misoperation/attack to stop dcompact_worker, default is false |
Note: For the same hoster instance, the path of the db data on the hoster and the path of accessing the db data of the hoster on the worker are generally different. Because it is difficult to agree on the mount path between the two, the environment variable NFS_MOUNT_ROOT on the worker and the json variables hoster_root
and instance_name
on the hoster work together to complete the mapping relationship of this path.
Distributed Compaction can be deployed in an auto-scaling group (an auto-scaling cluster) and hung behind a reverse proxy that exposes the HTTP service to the outside, so that the HTTP service is essentially a serverless service.
On the public cloud, the Distributed Compaction of MyTopling and Todis is realized through such serverless services.
In this configuration, the automatic mount capability of dcompact_worker needs to be used. The instance_name, nfs_mnt_src, and nfs_mnt_opt of DB are used to realize this requirement. DB sends the compact request containing these information to the reverse proxy through HTTP request, and the reverse proxy then uses the weight Wait for the proxy strategy to select a back-end dcompact_worker node, and forward the request. dcompact_worker mounts the corresponding NFS as needed, and then executes the compact task: reading input from NFS, and writing output back to NFS.
In this way, if the number of DB nodes or traffic increases or decreases, the machine load of the backend dcompact_worker will increase or decrease accordingly, and then the auto-scaling group will automatically increase or decrease nodes to keep the machine load within a reasonable range. Provide enough computing power without wasting resources.
On the public cloud, the intranets of different accounts are isolated from each other. To access NFS through the intranet, it is necessary to connect the intranets of different accounts. Each public cloud has corresponding support for this requirement.