Skip to content

Commit

Permalink
backend: Add locking in MongoDB KV backend (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelkuhn committed Feb 26, 2024
1 parent 5b7ec78 commit d1e1bca
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 12 deletions.
54 changes: 46 additions & 8 deletions backend/kv/mongodb.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ struct JMongoDBData

gchar* host;
gchar* database;

/// \todo Use a mongoc_client_pool_t instead?
// We need to be careful to not keep the mutex locked across function boundaries since the backend is loaded client-side.
// Otherwise, if a client creates two iterators without iterating them, it could enter backend_get_* twice and deadlock.
GMutex mutex[1];
};

typedef struct JMongoDBData JMongoDBData;
Expand Down Expand Up @@ -99,6 +104,8 @@ backend_batch_start(gpointer backend_data, gchar const* namespace, JSemantics* s
mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_UNACKNOWLEDGED);
}

g_mutex_lock(bd->mutex);

/// \todo cache
m_collection = mongoc_client_get_collection(bd->connection, bd->database, namespace);
m_database = mongoc_client_get_database(bd->connection, bd->database);
Expand All @@ -110,6 +117,8 @@ backend_batch_start(gpointer backend_data, gchar const* namespace, JSemantics* s

bulk_op = mongoc_collection_create_bulk_operation_with_opts(m_collection, opts);

g_mutex_unlock(bd->mutex);

bson_destroy(opts);

mongoc_collection_destroy(m_collection);
Expand All @@ -135,8 +144,7 @@ backend_batch_execute(gpointer backend_data, gpointer backend_batch)
gboolean ret = TRUE;

JMongoDBBatch* batch = backend_batch;

(void)backend_data;
JMongoDBData* bd = backend_data;

g_return_val_if_fail(backend_batch != NULL, FALSE);

Expand All @@ -146,8 +154,12 @@ backend_batch_execute(gpointer backend_data, gpointer backend_batch)
{
bson_t reply[1];

g_mutex_lock(bd->mutex);

ret = mongoc_bulk_operation_execute(batch->bulk_op, reply, NULL);
bson_destroy(reply);

g_mutex_unlock(bd->mutex);
}

/*
Expand All @@ -172,11 +184,10 @@ static gboolean
backend_put(gpointer backend_data, gpointer backend_batch, gchar const* key, gconstpointer value, guint32 len)
{
JMongoDBBatch* batch = backend_batch;
JMongoDBData* bd = backend_data;
bson_t document[1];
bson_t selector[1];

(void)backend_data;

g_return_val_if_fail(backend_batch != NULL, FALSE);
g_return_val_if_fail(key != NULL, FALSE);
g_return_val_if_fail(value != NULL, FALSE);
Expand All @@ -188,11 +199,15 @@ backend_put(gpointer backend_data, gpointer backend_batch, gchar const* key, gco
bson_init(selector);
bson_append_utf8(selector, "key", -1, key, -1);

g_mutex_lock(bd->mutex);

/// \todo use insert when possible
//mongoc_bulk_operation_insert(batch->bulk_op, document);
mongoc_bulk_operation_replace_one(batch->bulk_op, selector, document, TRUE);
batch->is_empty = FALSE;

g_mutex_unlock(bd->mutex);

/*
if (!ret)
{
Expand All @@ -214,19 +229,22 @@ static gboolean
backend_delete(gpointer backend_data, gpointer backend_batch, gchar const* key)
{
JMongoDBBatch* batch = backend_batch;
JMongoDBData* bd = backend_data;
bson_t document[1];

(void)backend_data;

g_return_val_if_fail(backend_batch != NULL, FALSE);
g_return_val_if_fail(key != NULL, FALSE);

bson_init(document);
bson_append_utf8(document, "key", -1, key, -1);

g_mutex_lock(bd->mutex);

mongoc_bulk_operation_remove(batch->bulk_op, document);
batch->is_empty = FALSE;

g_mutex_unlock(bd->mutex);

bson_destroy(document);

return TRUE;
Expand Down Expand Up @@ -257,6 +275,8 @@ backend_get(gpointer backend_data, gpointer backend_batch, gchar const* key, gpo
bson_init(opts);
bson_append_int32(opts, "limit", -1, 1);

g_mutex_lock(bd->mutex);

m_collection = mongoc_client_get_collection(bd->connection, bd->database, batch->namespace);
cursor = mongoc_collection_find_with_opts(m_collection, document, opts, NULL);

Expand All @@ -283,6 +303,8 @@ backend_get(gpointer backend_data, gpointer backend_batch, gchar const* key, gpo
}
}

g_mutex_unlock(bd->mutex);

bson_destroy(opts);
bson_destroy(document);

Expand All @@ -307,9 +329,13 @@ backend_get_all(gpointer backend_data, gchar const* namespace, gpointer* backend

bson_init(document);

g_mutex_lock(bd->mutex);

m_collection = mongoc_client_get_collection(bd->connection, bd->database, namespace);
cursor = mongoc_collection_find_with_opts(m_collection, document, NULL, NULL);

g_mutex_unlock(bd->mutex);

if (cursor != NULL)
{
ret = TRUE;
Expand Down Expand Up @@ -345,9 +371,13 @@ backend_get_by_prefix(gpointer backend_data, gchar const* namespace, gchar const
bson_init(document);
bson_append_regex(document, "key", -1, regex_prefix, NULL);

g_mutex_lock(bd->mutex);

m_collection = mongoc_client_get_collection(bd->connection, bd->database, namespace);
cursor = mongoc_collection_find_with_opts(m_collection, document, NULL, NULL);

g_mutex_unlock(bd->mutex);

if (cursor != NULL)
{
ret = TRUE;
Expand All @@ -364,18 +394,20 @@ backend_get_by_prefix(gpointer backend_data, gchar const* namespace, gchar const
static gboolean
backend_iterate(gpointer backend_data, gpointer backend_iterator, gchar const** key, gconstpointer* value, guint32* len)
{
JMongoDBData* bd = backend_data;

bson_t const* result;
bson_iter_t iter;
mongoc_cursor_t* cursor = backend_iterator;

gboolean ret = FALSE;

(void)backend_data;

g_return_val_if_fail(backend_iterator != NULL, FALSE);
g_return_val_if_fail(value != NULL, FALSE);
g_return_val_if_fail(len != NULL, FALSE);

g_mutex_lock(bd->mutex);

/// \todo
if (mongoc_cursor_next(cursor, &result))
{
Expand Down Expand Up @@ -408,6 +440,8 @@ backend_iterate(gpointer backend_data, gpointer backend_iterator, gchar const**
mongoc_cursor_destroy(cursor);
}

g_mutex_unlock(bd->mutex);

return ret;
}

Expand All @@ -431,6 +465,8 @@ backend_init(gchar const* path, gpointer* backend_data)
bd->host = g_strdup(split[0]);
bd->database = g_strdup(split[1]);

g_mutex_init(bd->mutex);

g_return_val_if_fail(bd->host != NULL, FALSE);
g_return_val_if_fail(bd->database != NULL, FALSE);

Expand Down Expand Up @@ -471,6 +507,8 @@ backend_fini(gpointer backend_data)
g_free(bd->database);
g_free(bd->host);

g_mutex_clear(bd->mutex);

g_free(bd);

mongoc_cleanup();
Expand Down
6 changes: 3 additions & 3 deletions doc/dependencies.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Alternatively, Docker can be used.
### MongoDB

```console
$ podman run --publish 27017:27017 mongo
$ podman run --publish 27017:27017 docker.io/library/mongo
```

### MariaDB
Expand All @@ -112,7 +112,7 @@ $ podman run --publish 3306:3306 \
--env MARIADB_DATABASE=julea_db \
--env MARIADB_USER=julea_user \
--env MARIADB_PASSWORD=julea_pw \
mariadb
docker.io/library/mariadb
```

### MySQL
Expand All @@ -123,5 +123,5 @@ $ podman run --publish 3306:3306 \
--env MYSQL_DATABASE=julea_db \
--env MYSQL_USER=julea_user \
--env MYSQL_PASSWORD=julea_pw \
mysql
docker.io/library/mysql
```
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ if fuse_dep.found()
])

executable('julea-fuse', julea_fuse_srcs,
dependencies: [glib_dep, libbson_dep, julea_dep, julea_client_deps['object'], julea_client_deps['kv'], fuse_dep],
dependencies: common_deps + [julea_dep, julea_client_deps['object'], julea_client_deps['kv'], fuse_dep],
include_directories: julea_incs,
install: true,
)
Expand Down

0 comments on commit d1e1bca

Please sign in to comment.