Skip to content

Commit

Permalink
[irods#2146] Fix streaming detach mode when redirected.
Browse files Browse the repository at this point in the history
  • Loading branch information
JustinKyleJames committed Jan 10, 2024
1 parent 8f3467d commit a78d050
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 19 deletions.
38 changes: 36 additions & 2 deletions packaging/resource_suite_s3_nocache.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ def setUp(self):
if hasattr(self, 's3sse'):
self.s3_context += ';S3_SERVER_ENCRYPT=' + str(self.s3sse)

self.s3_context += ';ARCHIVE_NAMING_POLICY=' + self.archive_naming_policy

self.admin.assert_icommand("iadmin modresc demoResc name origResc", 'STDOUT_SINGLELINE', 'rename', input='yes\n')

self.admin.assert_icommand("iadmin mkresc demoResc s3 " + hostname + ":/" + self.s3bucketname + "/demoResc " + self.s3_context, 'STDOUT_SINGLELINE', 's3')
Expand Down Expand Up @@ -2821,3 +2819,39 @@ def test_iput_large_file_with_checksum_issue_2124(self):
self.user0.assert_icommand("irm -f {file1}".format(**locals()), 'EMPTY')
s3plugin_lib.remove_if_exists(file1)
s3plugin_lib.remove_if_exists(file1_get)

class Test_S3_NoCache_Decoupled_Base(Test_S3_NoCache_Base):
    """Additional cacheless S3 tests that check the decoupled physical-path layout.

    Inherits the fixtures of Test_S3_NoCache_Base (s3bucketname, s3_context,
    admin/user sessions) and adds tests that inspect the stored physical path.
    """

    def test_decoupled_redirect_issue_2146(self):
        """Round-trip a file through an S3 resource on HOSTNAME_3 and verify its physical path.

        The resource is created on test.settings.HOSTNAME_3 -- presumably a host
        other than the one the client connects to, so the request is redirected
        (TODO confirm against the test topology).  After iput/iget the retrieved
        file must match the original byte-for-byte and `ils -L` must show a path
        of the form /<bucket>/<digits>/<file name>.
        """

        file1 = "f1"
        file1_size = 2*1024*1024
        retrieved_file = "f1.get"
        resource_host = test.settings.HOSTNAME_3
        resource_name = "s3_resc_on_host3"

        # create an S3 resource
        self.admin.assert_icommand(f'iadmin mkresc {resource_name} s3 {resource_host}:/{self.s3bucketname}/{resource_name} {self.s3_context}', 'STDOUT_SINGLELINE', 's3')

        try:
            # create the file
            lib.make_arbitrary_file(file1, file1_size)

            # put the file
            self.user1.assert_icommand(f'iput -R {resource_name} {file1}')  # iput

            # get and verify the file contents
            self.user1.assert_icommand(f'iget {file1} {retrieved_file}')

            self.assertTrue(filecmp.cmp(file1, retrieved_file))  # confirm retrieved is correct

            # verify in the physical path that decoupled mode was used
            print(self.user1.run_icommand(['ils', '-l', file1])[0])  # diagnostic output only
            self.user1.assert_icommand(f'ils -L {file1}', 'STDOUT_SINGLELINE', f'/{self.s3bucketname}/[0-9]+/{file1}', use_regex=True)

        finally:
            # cleanup
            try:
                # These assertions can fail (for example when the iput above
                # never succeeded); the inner finally guarantees the local
                # files are still removed in that case.
                self.user1.assert_icommand(f'irm -f {file1}')  # irm
                self.admin.assert_icommand(f'iadmin rmresc {resource_name}')
            finally:
                s3plugin_lib.remove_if_exists(file1)
                s3plugin_lib.remove_if_exists(retrieved_file)
3 changes: 2 additions & 1 deletion packaging/test_irods_resource_plugin_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .resource_suite_s3_nocache import Test_S3_NoCache_Large_File_Tests_Base
from .resource_suite_s3_nocache import Test_S3_NoCache_Glacier_Base
from .resource_suite_s3_nocache import Test_S3_NoCache_MPU_Disabled_Base
from .resource_suite_s3_nocache import Test_S3_NoCache_Decoupled_Base
from .resource_suite_s3_cache import Test_S3_Cache_Base
from .resource_suite_s3_cache import Test_S3_Cache_Glacier_Base

Expand Down Expand Up @@ -125,7 +126,7 @@ def __init__(self, *args, **kwargs):
self.s3EnableMPU=1
super(Test_S3_NoCache_SSE, self).__init__(*args, **kwargs)

class Test_S3_NoCache_Decoupled(Test_S3_NoCache_Base, unittest.TestCase):
class Test_S3_NoCache_Decoupled(Test_S3_NoCache_Decoupled_Base, unittest.TestCase):
def __init__(self, *args, **kwargs):
"""Set up the test."""
self.keypairfile='/projects/irods/vsphere-testing/externals/amazon_web_services-CI.keypair'
Expand Down
75 changes: 59 additions & 16 deletions s3/s3_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include <assert.h>
#include <curl/curl.h>
#include <fmt/format.h>
#include <filesystem>

extern size_t g_retry_count;
extern size_t g_retry_wait;
Expand Down Expand Up @@ -404,25 +405,59 @@ namespace irods_s3 {
}
}

if (index > 0) {

std::string obj_id = boost::lexical_cast<std::string>(L1desc[index].dataObjInfo->dataId);
std::reverse(obj_id.begin(), obj_id.end());
// On redirect there is no entry in L1desc[]. The following steps explain the behavior in
// this case.
//
// 1. s3_notify_operation() gets called on the server the client is connected to.
// 2. In s3_notify_operation(), this method gets called with an L1desc[] entry so that index > 0.
// The L1desc[] entry is updated along with the object->physical_path(), but only if
// openType == CREATE_TYPE. This part ensures the database gets updated with the proper physical path.
// 3. On the redirected server, s3_file_create_operation() gets called, which also calls
// this method. In that case there is no L1desc[] entry, but object->physical_path()
// needs to be updated so the file is written to the correct location in S3. Do a
// GenQuery to get the object_id and use this to set the object->physical_path().

if (index > 0) {
// There is a corresponding L1desc[] entry. Look up the object_id in it. Reverse it
// for the key. Write the physical_path to the L1desc[] entry as well as object->physical_path().

std::string obj_id = boost::lexical_cast<std::string>(L1desc[index].dataObjInfo->dataId);
std::reverse(obj_id.begin(), obj_id.end());

// make S3 key name
std::ostringstream s3_key_name;
s3_key_name << "/" << bucket_name << "/" << obj_id << "/" << object_name;

// update physical path
rodsLog(developer_messages_log_level, "%s:%d (%s) [[%lu]] updating physical_path to %s\n",
__FILE__, __LINE__, __FUNCTION__, thread_id, s3_key_name.str().c_str());
object->physical_path(s3_key_name.str());
strncpy(L1desc[index].dataObjInfo->filePath, s3_key_name.str().c_str(), MAX_NAME_LEN);
L1desc[index].dataObjInfo->filePath[MAX_NAME_LEN - 1] = '\0';
}

}
}
// Update physical path but only on first creation otherwise the policy that was in effect
// at the time the object was first created wins.
if (L1desc[index].openType == CREATE_TYPE) {
rodsLog(developer_messages_log_level, "%s:%d (%s) [[%lu]] updating physical_path to %s\n",
__FILE__, __LINE__, __FUNCTION__, thread_id, s3_key_name.str().c_str());
object->physical_path(s3_key_name);
strncpy(L1desc[index].dataObjInfo->filePath, s3_key_name.c_str(), MAX_NAME_LEN);
L1desc[index].dataObjInfo->filePath[MAX_NAME_LEN - 1] = '\0';
}
}
else {
// There is no L1desc[] entry. Look up the object_id via GenQuery. Reverse it
// for the key. Write the physical_path to object->physical_path().

auto path{std::filesystem::path(object->logical_path())};
std::string query_string = fmt::format("SELECT DATA_ID WHERE DATA_NAME = '{}' AND COLL_NAME = '{}'",
path.filename().c_str(),
path.parent_path().c_str());
for (const auto& row : irods::query<rsComm_t>{_ctx.comm(), query_string}) {
std::string object_id = row[0];
std::reverse(object_id.begin(), object_id.end());
const auto s3_key_name = fmt::format("/{}/{}/{}", bucket_name, object_id, object_name);
rodsLog(developer_messages_log_level, "%s:%d (%s) [[%lu]] updating physical_path to %s\n",
__FILE__, __LINE__, __FUNCTION__, thread_id, s3_key_name.str().c_str());
object->physical_path(s3_key_name);
break; // data_id is the same for all replicas so we are done
}
}
}
}

std::ios_base::openmode translate_open_mode_posix_to_stream(int oflag, const std::string& call_from) noexcept
{
Expand Down Expand Up @@ -2194,7 +2229,15 @@ namespace irods_s3 {

/// Notify operation for the S3 resource plugin.
///
/// When the resource is in cacheless mode, refreshes the physical path used for
/// decoupled naming before returning. For any other mode this does nothing.
///
/// @param _ctx  Plugin context; its property map decides whether cacheless mode
///              is active and it is forwarded to the path-update helper.
/// @param str   Not used by this implementation.
/// @return      Always SUCCESS().
irods::error s3_notify_operation( irods::plugin_context& _ctx,
                                  const std::string* str ) {
    if (is_cacheless_mode(_ctx.prop_map())) {
        // Must update the physical_path in the L1desc[] table for decoupled naming.
        // In the case of a redirect, this runs on the original connected server and this
        // is the server that updates the database. In update_physical_path_for_decoupled_naming,
        // the update will only happen if it is a create. Anything else uses whatever was
        // previously in the database.
        update_physical_path_for_decoupled_naming(_ctx);
    }
    return SUCCESS();
} // s3_notify_operation

}

0 comments on commit a78d050

Please sign in to comment.