Skip to content

Commit

Permalink
lib/object, server: Check existence for read/write operations (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelkuhn committed Oct 20, 2021
1 parent e0b8b19 commit 60f5852
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 34 deletions.
65 changes: 53 additions & 12 deletions lib/object/jdistributed-object.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,12 @@ j_distributed_object_read_background_operation(gpointer data)

reply_operation_count = j_message_get_count(reply);

if (reply_operation_count == 0)
{
background_data->ret = FALSE;
break;
}

for (guint i = 0; i < reply_operation_count && j_list_iterator_next(it); i++)
{
JDistributedObjectReadBuffer* buffer = j_list_iterator_get(it);
Expand Down Expand Up @@ -384,9 +390,7 @@ j_distributed_object_read_background_operation(gpointer data)

j_list_unref(background_data->read.buffers);

g_slice_free(JDistributedObjectBackgroundData, background_data);

return NULL;
return data;
}

/**
Expand Down Expand Up @@ -422,14 +426,21 @@ j_distributed_object_write_background_operation(gpointer data)
reply = j_message_new_reply(background_data->message);
j_message_receive(reply, object_connection);

it = j_list_iterator_new(background_data->write.bytes_written);

while (j_list_iterator_next(it))
if (j_message_get_count(reply) > 0)
{
guint64* bytes_written = j_list_iterator_get(it);
it = j_list_iterator_new(background_data->write.bytes_written);

nbytes = j_message_get_8(reply);
j_helper_atomic_add(bytes_written, nbytes);
while (j_list_iterator_next(it))
{
guint64* bytes_written = j_list_iterator_get(it);

nbytes = j_message_get_8(reply);
j_helper_atomic_add(bytes_written, nbytes);
}
}
else
{
background_data->ret = FALSE;
}
}

Expand All @@ -439,9 +450,7 @@ j_distributed_object_write_background_operation(gpointer data)

j_list_unref(background_data->write.bytes_written);

g_slice_free(JDistributedObjectBackgroundData, background_data);

return NULL;
return data;
}

/**
Expand Down Expand Up @@ -912,11 +921,27 @@ j_distributed_object_read_exec(JList* operations, JSemantics* semantics)
data->operations = NULL;
data->semantics = semantics;
data->read.buffers = br_lists[i];
data->ret = TRUE;

background_data[i] = data;
}

j_helper_execute_parallel(j_distributed_object_read_background_operation, background_data, server_count);

for (guint i = 0; i < server_count; i++)
{
JDistributedObjectBackgroundData* data;

if (background_data[i] == NULL)
{
continue;
}

data = background_data[i];
ret = data->ret && ret;

g_slice_free(JDistributedObjectBackgroundData, data);
}
}
else
{
Expand Down Expand Up @@ -1087,11 +1112,27 @@ j_distributed_object_write_exec(JList* operations, JSemantics* semantics)
data->operations = NULL;
data->semantics = semantics;
data->write.bytes_written = bw_lists[i];
data->ret = TRUE;

background_data[i] = data;
}

j_helper_execute_parallel(j_distributed_object_write_background_operation, background_data, server_count);

for (guint i = 0; i < server_count; i++)
{
JDistributedObjectBackgroundData* data;

if (background_data[i] == NULL)
{
continue;
}

data = background_data[i];
ret = data->ret && ret;

g_slice_free(JDistributedObjectBackgroundData, data);
}
}
else
{
Expand Down
31 changes: 22 additions & 9 deletions lib/object/jobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,12 @@ j_object_read_exec(JList* operations, JSemantics* semantics)

reply_operation_count = j_message_get_count(reply);

if (reply_operation_count == 0)
{
ret = FALSE;
break;
}

for (guint i = 0; i < reply_operation_count && j_list_iterator_next(it); i++)
{
JObjectOperation* operation = j_list_iterator_get(it);
Expand Down Expand Up @@ -703,18 +709,25 @@ j_object_write_exec(JList* operations, JSemantics* semantics)
reply = j_message_new_reply(message);
j_message_receive(reply, object_connection);

it = j_list_iterator_new(operations);

while (j_list_iterator_next(it))
if (j_message_get_count(reply) > 0)
{
JObjectOperation* operation = j_list_iterator_get(it);
guint64* bytes_written = operation->write.bytes_written;
it = j_list_iterator_new(operations);

nbytes = j_message_get_8(reply);
j_helper_atomic_add(bytes_written, nbytes);
}
while (j_list_iterator_next(it))
{
JObjectOperation* operation = j_list_iterator_get(it);
guint64* bytes_written = operation->write.bytes_written;

nbytes = j_message_get_8(reply);
j_helper_atomic_add(bytes_written, nbytes);
}

j_list_iterator_free(it);
j_list_iterator_free(it);
}
else
{
ret = FALSE;
}
}

j_connection_pool_push(J_BACKEND_TYPE_OBJECT, object->index, object_connection);
Expand Down
40 changes: 27 additions & 13 deletions server/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
{
JMessage* reply;
gpointer object;
gboolean ret;

namespace = j_message_get_string(message);
path = j_message_get_string(message);

reply = j_message_new_reply(message);

/// \todo return value
j_backend_object_open(jd_object_backend, namespace, path, &object);
ret = j_backend_object_open(jd_object_backend, namespace, path, &object);

for (i = 0; i < operation_count; i++)
{
Expand All @@ -211,6 +211,11 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
length = j_message_get_8(message);
offset = j_message_get_8(message);

if (G_UNLIKELY(!ret))
{
break;
}

if (length > memory_chunk_size)
{
/// \todo return proper error
Expand Down Expand Up @@ -247,7 +252,10 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
j_statistics_add(statistics, J_STATISTICS_BYTES_SENT, bytes_read);
}

j_backend_object_close(jd_object_backend, object);
if (ret)
{
j_backend_object_close(jd_object_backend, object);
}

j_message_send(reply, connection);
j_message_unref(reply);
Expand All @@ -259,6 +267,7 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
{
g_autoptr(JMessage) reply = NULL;
gpointer object;
gboolean ret;

if (safety == J_SEMANTICS_SAFETY_NETWORK || safety == J_SEMANTICS_SAFETY_STORAGE)
{
Expand All @@ -268,8 +277,7 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
namespace = j_message_get_string(message);
path = j_message_get_string(message);

/// \todo return value
j_backend_object_open(jd_object_backend, namespace, path, &object);
ret = j_backend_object_open(jd_object_backend, namespace, path, &object);

for (i = 0; i < operation_count; i++)
{
Expand All @@ -282,7 +290,7 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
length = j_message_get_8(message);
offset = j_message_get_8(message);

if (length > memory_chunk_size)
if (length > memory_chunk_size && reply != NULL && G_LIKELY(ret))
{
/// \todo return proper error
j_message_add_operation(reply, sizeof(guint64));
Expand All @@ -298,13 +306,16 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
g_input_stream_read_all(input, buf, length, NULL, NULL, NULL);
j_statistics_add(statistics, J_STATISTICS_BYTES_RECEIVED, length);

j_backend_object_write(jd_object_backend, object, buf, length, offset, &bytes_written);
j_statistics_add(statistics, J_STATISTICS_BYTES_WRITTEN, bytes_written);

if (reply != NULL)
if (G_LIKELY(ret))
{
j_message_add_operation(reply, sizeof(guint64));
j_message_append_8(reply, &bytes_written);
j_backend_object_write(jd_object_backend, object, buf, length, offset, &bytes_written);
j_statistics_add(statistics, J_STATISTICS_BYTES_WRITTEN, bytes_written);

if (reply != NULL)
{
j_message_add_operation(reply, sizeof(guint64));
j_message_append_8(reply, &bytes_written);
}
}

j_memory_chunk_reset(memory_chunk);
Expand All @@ -316,7 +327,10 @@ jd_handle_message(JMessage* message, GSocketConnection* connection, JMemoryChunk
j_statistics_add(statistics, J_STATISTICS_SYNC, 1);
}

j_backend_object_close(jd_object_backend, object);
if (ret)
{
j_backend_object_close(jd_object_backend, object);
}

if (reply != NULL)
{
Expand Down

0 comments on commit 60f5852

Please sign in to comment.