[Issue #636] remove cache schema and table from config file and fix memory leak in cache writer (#728)

1. Now, the cache schema and table names are specified in the layout_version and cache_version values.
2. Close PixelsPhysicalReader in PixelsCacheWriter to release the off-heap buffers allocated for localFs.
3. Revise the pixels cache document.
bianhq authored Sep 22, 2024
1 parent 7f1da2a commit 09a0166
Showing 16 changed files with 285 additions and 234 deletions.
54 changes: 33 additions & 21 deletions pixels-cache/README.md
@@ -8,14 +8,14 @@ Whereas the cache manager on each worker node listens to the update of the cache

## How It Works
The cache plan is stored in etcd and has the following data model (illustrated by the sketch after this list):
1. layout_version -> version: data layout version, incremented each time new data layouts are created.
2. cache_version -> version: caching version, incremented each time caches are updated.
3. cache_location_%{version}_%{node_id} -> files: recording the array of files cached on the specified node (cache manager) under the specified caching version.
1. layout_version -> {schema_name}.{table_name}:{layout_version}: data layout version, updated by the user or program that wants to trigger cache loading or replacement. Only increasing layout versions are accepted.
2. cache_version -> {schema_name}.{table_name}:{layout_version}: cache version, set by the cache coordinator to notify the cache workers for cache loading or replacement when the cache tasks are ready in etcd.
3. cache_location_{layout_version}_{worker_hostname} -> files: recording the array of files cached on the specified node (cache manager) under the specified caching version.
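As a concrete illustration, the keys might look like the following when a table `tpch.lineitem` is cached at layout version 3 (hypothetical values inspected with `etcdctl`; the encoding of the file list is only a sketch and is managed by Pixels internally):
```bash
etcdctl get layout_version
# layout_version
# tpch.lineitem:3
etcdctl get cache_version
# cache_version
# tpch.lineitem:3
etcdctl get --prefix cache_location_3_
# cache_location_3_worker-01
# <list of compact files assigned to worker-01 under layout version 3>
```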

The cache is read and updated as follows:
1. When the `layout optimizer` generates a new layout, it writes the new layout (with a new `layout_version`) into Etcd.
2. The `cache coordinator` constantly checks the value of `layout_version`, once it finds a newer `layout_version`, it allocates a new version of cache plan in Etcd, then updates the `cache_version` to the latest `layout_version`.
3. Once a cache manager finds a new `cache_version`, it begins to read the new cache plan and finds the new column chunks to be cached by itself, and sets itself as `busy` in Etcd.
2. The `cache coordinator` monitors `layout_version`; once it finds a newer `layout_version`, it creates the corresponding cache tasks for each cache worker in Etcd, then updates the `cache_version` to the latest `layout_version`.
3. The `cache workers` monitor `cache_version`. When a cache worker finds a new `cache_version`, it reads its cache task from `cache_location_{layout_version}_{worker_hostname}` and sets itself to `busy` to avoid concurrent cache updating. After that, the cache worker begins to load or replace the cache content (a sketch of this notification flow follows the list).
4. When a query comes, the Presto/Trino Coordinator checks Etcd for the cache plan and thus finds the available caches for its query splits.
5. Each Presto/Trino WorkerNode executes query splits with caching information (whether the column chunks in the query split are cached or not), and calls `PixelsCacheReader` to read the cached column chunks locally (if any).
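The notification path in steps 2 and 3 can be observed from the etcd side. A minimal sketch with hypothetical values (in practice `load-cache.sh` updates `layout_version` and the cache coordinator updates `cache_version`; the operator does not write these keys by hand):
```bash
# In one terminal, watch the key that the cache workers monitor:
etcdctl watch cache_version
# In another terminal, publish a new layout version (normally done via load-cache.sh, see below):
etcdctl put layout_version "tpch.lineitem:4"
# Once the coordinator has created the per-worker cache tasks, the watch above fires with:
# PUT
# cache_version
# tpch.lineitem:4
```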

@@ -44,10 +44,6 @@ index.location=/mnt/ramfs/pixels.index
index.size=1073741824
# the scheme of the storage system to be cached
cache.storage.scheme=hdfs
# which schema to be cached
cache.schema=pixels
# which table to be cached
cache.table=test_105
# set to true if cache.storage.scheme is a locality sensitive storage such as hdfs
cache.absolute.balancer.enabled=true
# set to true to enable pixels-cache
@@ -61,7 +57,7 @@ heartbeat.lease.ttl.seconds=20
heartbeat.period.seconds=10
```
The above values are a good default setting for each node to cache up to 64GB of table `pixels.test_105` stored on `HDFS`.
Change the `cache.schema`, `cache.table`, and `cache.storage.scheme` to cache a different table that is stored in a different storage system.
Change `cache.storage.scheme` to cache the data stored in a different storage system.

### Mount In-memory File System
On each worker node, create and mount an in-memory file system with 65GB capacity:
@@ -72,14 +68,6 @@ sudo mount -t tmpfs -o size=65g tmpfs /mnt/ramfs
The `size` parameter of the mount command should be larger than or equal to the sum of `cache.size` and `index.size` in
`PIXELS_HOME/pixels.properties`, but must be smaller than the available physical memory size.
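For example, with the defaults above (assuming `cache.size` is 64GB, i.e. 68719476736 bytes, and `index.size` is 1073741824 bytes), the sum is 65GB, so the 65GB tmpfs just fits; a quick check on a worker node could be:
```bash
# 68719476736 + 1073741824 bytes = 65GB; size=65g in the mount command is therefore sufficient.
df -h /mnt/ramfs   # capacity and usage of the in-memory file system
free -g            # confirm that 65GB fits into the available physical memory
```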

Set up the cache before starting Pixels:
```bash
./sbin/reset-cache.sh
```
`reset-cache.sh` is only needed for the first time of initializing pixels-cache.
It initializes some states in etcd for the cache.
If you have modified the `etcd` hostname and port in `$PIXELS_HOME/pixels.properties`, change the `ENDPOINTS` property
in `reset-cache.sh` as well.

## Start Pixels (with cache)

@@ -107,19 +95,43 @@ the cache and index files.
Then create a new data layout for the cached table, and update `layout_version` of the cached table in etcd to trigger
cache loading or replacement:
```bash
./sbin/load-cache.sh {layout-version}
./sbin/load-cache.sh {schema_name}.{table_name}:{layout_version}
# e.g., ./sbin/load-cache.sh tpch.lineitem:1
```
`schema_name` and `table_name` specify which table to cache, whereas `layout_version` specifies which layout version of the table to cache.
Note that pixels-cache only caches data in the compact path of the layout, so ensure the table has been compacted under that layout.
See examples of compacting tables [HERE](../docs/TPC-H.md#data-compaction).
Currently, we only cache the full compact files that contain exactly the number of row groups defined by
`numRowGroupInFile` in the `LAYOUT_COMPACT` field of the layout in metadata. The tail compact file
(if it exists), which has fewer row groups than `numRowGroupInFile`, will be ignored in cache loading or replacement.
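For illustration, suppose `numRowGroupInFile` is 32 and the compact path of the layout holds 100 row groups: the three full compact files (96 row groups in total) are eligible for caching, while the tail compact file holding the remaining 4 row groups is ignored.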

If you have modified the `etcd` hostname and port in `$PIXELS_HOME/pixels.properties`, change the `ENDPOINTS` property
in `load-cache.sh` as well.

## Stop Pixels and clear cache
To stop Pixels, run:
```bash
./sbin/stop-pixels.sh
```
on the coordinator node to stop all Pixels daemons in the cluster. Then, run:
on the coordinator node to stop all Pixels daemons in the cluster.

The cache is not lost when Pixels is stopped, and it can be reused the next time Pixels is started.

To clear the cache and free the memory, run:
```bash
sudo -E ./sbin/unpin-cache.sh
```
on each worker node to release the memory pinned for the cache.
After that, you can delete the shared-memory files at `cache.location` and `index.location` on each worker node to
finally release the memory occupied by the cache.
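For example, with the default locations under the in-memory file system (hypothetical file names; check `cache.location` and `index.location` in `$PIXELS_HOME/pixels.properties` for the actual paths):
```bash
sudo rm /mnt/ramfs/pixels.cache /mnt/ramfs/pixels.index
```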
You can also unmount the in-memory file system, but this is optional: it will be unmounted automatically when the operating system is restarted.
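If you do want to unmount it manually, the inverse of the earlier mount command is enough:
```bash
sudo umount /mnt/ramfs
```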

Then, run:
```bash
./sbin/reset-cache.sh
```
on any node in the cluster to reset the states related to pixels-cache in etcd.
If you have modified the `etcd` hostname and port in `$PIXELS_HOME/pixels.properties`, change the `ENDPOINTS` property
in `reset-cache.sh` as well.
@@ -25,6 +25,7 @@

/**
* @author guodong
* @author hank
*/
public class PixelsCacheConfig
{
@@ -67,16 +68,6 @@ public String getStorageScheme()
return configFactory.getProperty("cache.storage.scheme");
}

public String getSchema()
{
return configFactory.getProperty("cache.schema");
}

public String getTable()
{
return configFactory.getProperty("cache.table");
}

public String getHDFSConfigDir()
{
return configFactory.getProperty("hdfs.config.dir");
@@ -31,6 +31,9 @@
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
* pixels cache header
* index:
@@ -510,13 +513,23 @@ public static long getCacheSize(MemoryMappedFile cacheFile)
return cacheFile.getLongVolatile(8);
}

public static int hashcode(byte[] bytes) {
public static int hashcode(byte[] bytes)
{
int var1 = 1;

for(int var3 = 0; var3 < bytes.length; ++var3) {
var1 = 31 * var1 + bytes[var3];
for (byte aByte : bytes)
{
var1 = 31 * var1 + aByte;
}

return var1;
}

public static String getHostnameFromCacheLocationLiteral(String cacheLocationLiteral)
{
String[] splits = requireNonNull(cacheLocationLiteral, "cacheLocationLiteral is null").split("_");
checkArgument(splits.length > Constants.HOSTNAME_INDEX_IN_CACHE_LOCATION_LITERAL,
"invalid cacheLocationLiteral: " + cacheLocationLiteral);
return splits[Constants.HOSTNAME_INDEX_IN_CACHE_LOCATION_LITERAL];
}
}