Skip to content

Commit

Permalink
Merge pull request ClickHouse#63636 from CurtizJ/azure-refactoring
Browse files Browse the repository at this point in the history
Refactoring near azure blob storage
  • Loading branch information
CurtizJ authored Jul 5, 2024
2 parents 6a40628 + 7f2bfc5 commit 7fbe5a7
Show file tree
Hide file tree
Showing 21 changed files with 723 additions and 756 deletions.
122 changes: 67 additions & 55 deletions src/Backups/BackupIO_AzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,48 +29,49 @@ namespace ErrorCodes
}

BackupReaderAzureBlobStorage::BackupReaderAzureBlobStorage(
const StorageAzureConfiguration & configuration_,
const AzureBlobStorage::ConnectionParams & connection_params_,
const String & blob_path_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, getLogger("BackupReaderAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURL().toString(), false, false}
, configuration(configuration_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false}
, connection_params(connection_params_)
, blob_path(blob_path_)
{
auto client_ptr = configuration.createClient(/* is_readonly */false, /* attempt_to_create_container */true);
client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});
auto client_ptr = AzureBlobStorage::getContainerClient(connection_params, /*readonly=*/ false);
auto settings_ptr = AzureBlobStorage::getRequestSettingsForBackup(context_->getSettingsRef(), allow_azure_native_copy);

object_storage = std::make_unique<AzureObjectStorage>("BackupReaderAzureBlobStorage",
std::move(client_ptr),
configuration.createSettings(context_),
configuration_.container,
configuration.getConnectionURL().toString());
object_storage = std::make_unique<AzureObjectStorage>(
"BackupReaderAzureBlobStorage",
std::move(client_ptr),
std::move(settings_ptr),
connection_params.getContainer(),
connection_params.getConnectionURL());

client = object_storage->getAzureBlobStorageClient();
auto settings_copy = *object_storage->getSettings();
settings_copy.use_native_copy = allow_azure_native_copy;
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
settings = object_storage->getSettings();
}

BackupReaderAzureBlobStorage::~BackupReaderAzureBlobStorage() = default;

bool BackupReaderAzureBlobStorage::fileExists(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
return object_storage->exists(StoredObject(key));
}

UInt64 BackupReaderAzureBlobStorage::getFileSize(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
ObjectMetadata object_metadata = object_storage->getObjectMetadata(key);
return object_metadata.size_bytes;
}

std::unique_ptr<SeekableReadBuffer> BackupReaderAzureBlobStorage::readFile(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client, key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
Expand All @@ -85,23 +86,23 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,
&& destination_data_source_description.is_encrypted == encrypted_in_backup)
{
LOG_TRACE(log, "Copying {} from AzureBlobStorage to disk {}", path_in_backup, destination_disk->getName());
auto write_blob_function = [&](const Strings & blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
auto write_blob_function = [&](const Strings & dst_blob_path, WriteMode mode, const std::optional<ObjectAttributes> &) -> size_t
{
/// Object storage always uses mode `Rewrite` because it simulates append using metadata and different files.
if (blob_path.size() != 2 || mode != WriteMode::Rewrite)
if (dst_blob_path.size() != 2 || mode != WriteMode::Rewrite)
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Blob writing function called with unexpected blob_path.size={} or mode={}",
blob_path.size(), mode);
dst_blob_path.size(), mode);

copyAzureBlobStorageFile(
client,
destination_disk->getObjectStorage()->getAzureBlobStorageClient(),
configuration.container,
fs::path(configuration.blob_path) / path_in_backup,
connection_params.getContainer(),
fs::path(blob_path) / path_in_backup,
0,
file_size,
/* dest_container */ blob_path[1],
/* dest_path */ blob_path[0],
/* dest_container */ dst_blob_path[1],
/* dest_path */ dst_blob_path[0],
settings,
read_settings,
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupRDAzure"));
Expand All @@ -119,28 +120,33 @@ void BackupReaderAzureBlobStorage::copyFileToDisk(const String & path_in_backup,


BackupWriterAzureBlobStorage::BackupWriterAzureBlobStorage(
const StorageAzureConfiguration & configuration_,
const AzureBlobStorage::ConnectionParams & connection_params_,
const String & blob_path_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
const ContextPtr & context_,
bool attempt_to_create_container)
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterAzureBlobStorage"))
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, configuration_.getConnectionURL().toString(), false, false}
, configuration(configuration_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::Azure, MetadataStorageType::None, connection_params_.getConnectionURL(), false, false}
, connection_params(connection_params_)
, blob_path(blob_path_)
{
auto client_ptr = configuration.createClient(/* is_readonly */false, attempt_to_create_container);
client_ptr->SetClickhouseOptions(Azure::Storage::Blobs::ClickhouseClientOptions{.IsClientForDisk=true});

object_storage = std::make_unique<AzureObjectStorage>("BackupWriterAzureBlobStorage",
std::move(client_ptr),
configuration.createSettings(context_),
configuration.container,
configuration_.getConnectionURL().toString());
if (!attempt_to_create_container)
connection_params.endpoint.container_already_exists = true;

auto client_ptr = AzureBlobStorage::getContainerClient(connection_params, /*readonly=*/ false);
auto settings_ptr = AzureBlobStorage::getRequestSettingsForBackup(context_->getSettingsRef(), allow_azure_native_copy);

object_storage = std::make_unique<AzureObjectStorage>(
"BackupWriterAzureBlobStorage",
std::move(client_ptr),
std::move(settings_ptr),
connection_params.getContainer(),
connection_params.getConnectionURL());

client = object_storage->getAzureBlobStorageClient();
auto settings_copy = *object_storage->getSettings();
settings_copy.use_native_copy = allow_azure_native_copy;
settings = std::make_unique<const AzureObjectStorageSettings>(settings_copy);
settings = object_storage->getSettings();
}

void BackupWriterAzureBlobStorage::copyFileFromDisk(
Expand All @@ -159,18 +165,18 @@ void BackupWriterAzureBlobStorage::copyFileFromDisk(
{
/// getBlobPath() can return more than 3 elements if the file is stored as multiple objects in AzureBlobStorage container.
/// In this case we can't use the native copy.
if (auto blob_path = src_disk->getBlobPath(src_path); blob_path.size() == 2)
if (auto src_blob_path = src_disk->getBlobPath(src_path); src_blob_path.size() == 2)
{
LOG_TRACE(log, "Copying file {} from disk {} to AzureBlobStorag", src_path, src_disk->getName());
copyAzureBlobStorageFile(
src_disk->getObjectStorage()->getAzureBlobStorageClient(),
client,
/* src_container */ blob_path[1],
/* src_path */ blob_path[0],
/* src_container */ src_blob_path[1],
/* src_path */ src_blob_path[0],
start_pos,
length,
configuration.container,
fs::path(configuration.blob_path) / path_in_backup,
connection_params.getContainer(),
fs::path(blob_path) / path_in_backup,
settings,
read_settings,
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
Expand All @@ -188,11 +194,11 @@ void BackupWriterAzureBlobStorage::copyFile(const String & destination, const St
copyAzureBlobStorageFile(
client,
client,
configuration.container,
fs::path(configuration.blob_path)/ source,
connection_params.getContainer(),
fs::path(blob_path)/ source,
0,
size,
/* dest_container */ configuration.container,
/* dest_container */ connection_params.getContainer(),
/* dest_path */ destination,
settings,
read_settings,
Expand All @@ -206,22 +212,28 @@ void BackupWriterAzureBlobStorage::copyDataToFile(
UInt64 length)
{
copyDataToAzureBlobStorageFile(
create_read_buffer, start_pos, length, client, configuration.container,
fs::path(configuration.blob_path) / path_in_backup, settings,
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(), "BackupWRAzure"));
create_read_buffer,
start_pos,
length,
client,
connection_params.getContainer(),
fs::path(blob_path) / path_in_backup,
settings,
threadPoolCallbackRunnerUnsafe<void>(getBackupsIOThreadPool().get(),
"BackupWRAzure"));
}

BackupWriterAzureBlobStorage::~BackupWriterAzureBlobStorage() = default;

bool BackupWriterAzureBlobStorage::fileExists(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
return object_storage->exists(StoredObject(key));
}

UInt64 BackupWriterAzureBlobStorage::getFileSize(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
RelativePathsWithMetadata children;
object_storage->listObjects(key,children,/*max_keys*/0);
if (children.empty())
Expand All @@ -231,15 +243,15 @@ UInt64 BackupWriterAzureBlobStorage::getFileSize(const String & file_name)

std::unique_ptr<ReadBuffer> BackupWriterAzureBlobStorage::readFile(const String & file_name, size_t /*expected_file_size*/)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client, key, read_settings, settings->max_single_read_retries,
settings->max_single_download_retries);
}

std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client,
key,
Expand All @@ -251,7 +263,7 @@ std::unique_ptr<WriteBuffer> BackupWriterAzureBlobStorage::writeFile(const Strin

void BackupWriterAzureBlobStorage::removeFile(const String & file_name)
{
String key = fs::path(configuration.blob_path) / file_name;
String key = fs::path(blob_path) / file_name;
StoredObject object(key);
object_storage->removeObjectIfExists(object);
}
Expand All @@ -260,7 +272,7 @@ void BackupWriterAzureBlobStorage::removeFiles(const Strings & file_names)
{
StoredObjects objects;
for (const auto & file_name : file_names)
objects.emplace_back(fs::path(configuration.blob_path) / file_name);
objects.emplace_back(fs::path(blob_path) / file_name);

object_storage->removeObjectsIfExist(objects);

Expand All @@ -270,7 +282,7 @@ void BackupWriterAzureBlobStorage::removeFilesBatch(const Strings & file_names)
{
StoredObjects objects;
for (const auto & file_name : file_names)
objects.emplace_back(fs::path(configuration.blob_path) / file_name);
objects.emplace_back(fs::path(blob_path) / file_name);

object_storage->removeObjectsIfExist(objects);
}
Expand Down
20 changes: 11 additions & 9 deletions src/Backups/BackupIO_AzureBlobStorage.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#pragma once

#include "config.h"

#if USE_AZURE_BLOB_STORAGE
#include <Backups/BackupIO_Default.h>
#include <Disks/DiskType.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/Azure/Configuration.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>


namespace DB
Expand All @@ -17,7 +15,8 @@ class BackupReaderAzureBlobStorage : public BackupReaderDefault
{
public:
BackupReaderAzureBlobStorage(
const StorageAzureConfiguration & configuration_,
const AzureBlobStorage::ConnectionParams & connection_params_,
const String & blob_path_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
Expand All @@ -40,16 +39,18 @@ class BackupReaderAzureBlobStorage : public BackupReaderDefault
private:
const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureConfiguration configuration;
AzureBlobStorage::ConnectionParams connection_params;
String blob_path;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<const AzureObjectStorageSettings> settings;
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings;
};

class BackupWriterAzureBlobStorage : public BackupWriterDefault
{
public:
BackupWriterAzureBlobStorage(
const StorageAzureConfiguration & configuration_,
const AzureBlobStorage::ConnectionParams & connection_params_,
const String & blob_path_,
bool allow_azure_native_copy,
const ReadSettings & read_settings_,
const WriteSettings & write_settings_,
Expand Down Expand Up @@ -87,9 +88,10 @@ class BackupWriterAzureBlobStorage : public BackupWriterDefault

const DataSourceDescription data_source_description;
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> client;
StorageAzureConfiguration configuration;
AzureBlobStorage::ConnectionParams connection_params;
String blob_path;
std::unique_ptr<AzureObjectStorage> object_storage;
std::shared_ptr<const AzureObjectStorageSettings> settings;
std::shared_ptr<const AzureBlobStorage::RequestSettings> settings;
};

}
Expand Down
Loading

0 comments on commit 7fbe5a7

Please sign in to comment.