Skip to content

Commit

Permalink
filterx: rework FilterXScope to support floating and message-tied var…
Browse files Browse the repository at this point in the history
…iables

Signed-off-by: Balazs Scheidler <[email protected]>
  • Loading branch information
bazsi committed Apr 8, 2024
1 parent 7175992 commit 953af92
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 55 deletions.
30 changes: 20 additions & 10 deletions lib/filterx/expr-message-ref.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,43 +37,53 @@ _eval(FilterXExpr *s)
FilterXMessageRefExpr *self = (FilterXMessageRefExpr *) s;
FilterXEvalContext *context = filterx_eval_get_context();
LogMessage *msg = context->msgs[0];
FilterXObject *msg_ref;
FilterXVariable *variable;

msg_ref = filterx_scope_lookup_message_ref(context->scope, self->handle);
if (msg_ref)
return msg_ref;
variable = filterx_scope_lookup_variable(context->scope, self->handle);
if (variable)
return filterx_variable_get_value(variable);

gssize value_len;
LogMessageValueType t;
const gchar *value = log_msg_get_value_if_set_with_type(msg, self->handle, &value_len, &t);
if (!value)
return NULL;

msg_ref = filterx_message_value_new_borrowed(value, value_len, t);
filterx_scope_register_message_ref(context->scope, self->handle, msg_ref);
FilterXObject *msg_ref = filterx_message_value_new_borrowed(value, value_len, t);
variable = filterx_scope_register_variable(context->scope, self->handle, FALSE, msg_ref);
return msg_ref;
}

static void
_update_repr(FilterXExpr *s, FilterXObject *new_repr)
{
FilterXMessageRefExpr *self = (FilterXMessageRefExpr *) s;
FilterXEvalContext *context = filterx_eval_get_context();
FilterXScope *scope = filterx_eval_get_scope();
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

filterx_scope_register_message_ref(context->scope, self->handle, new_repr);
g_assert(variable != NULL);
filterx_variable_set_value(variable, new_repr);
}

static gboolean
_assign(FilterXExpr *s, FilterXObject *new_value)
{
FilterXMessageRefExpr *self = (FilterXMessageRefExpr *) s;
FilterXScope *scope = filterx_eval_get_scope();
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

if (!variable)
{
/* NOTE: we pass NULL as initial_value to make sure the new variable
* is considered changed due to the assignment */
variable = filterx_scope_register_variable(scope, self->handle, FALSE, NULL);
}

/* this only clones mutable objects */
new_value = filterx_object_clone(new_value);
filterx_variable_set_value(variable, new_value);


filterx_scope_register_message_ref(scope, self->handle, new_value);
new_value->assigned = TRUE;

filterx_object_unref(new_value);
return TRUE;
Expand Down
9 changes: 1 addition & 8 deletions lib/filterx/filterx-object.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,9 @@ struct _FilterXObject
* modified_in_place -- set to TRUE in case the value in this
* FilterXObject was changed
*
* assigned -- should be stored in the FilterXScope but we
* can reuse a bit here. Signifies if the value was assigned to a
* name-value pair.
*
* shadow -- this object is a shadow of a LogMessage
* name-value pair. Whenever assigned to another name-value pair,
* this needs to be copied.
*
*/
guint thread_index:16, modified_in_place:1, shadow:1, assigned:1;
guint thread_index:16, modified_in_place:1;
FilterXType *type;
};

Expand Down
129 changes: 94 additions & 35 deletions lib/filterx/filterx-scope.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,87 @@
#include "filterx/filterx-scope.h"
#include "scratch-buffers.h"

struct _FilterXVariable
{
NVHandle handle;
/*
* floating -- Indicates that this variable is not tied to the log
* message, it is a floating variable
* assigned -- Indicates that the variable was assigned to a new value
*/
guint32 floating:1,
assigned:1;
FilterXObject *value;
};

FilterXObject *
filterx_variable_get_value(FilterXVariable *v)
{
return filterx_object_ref(v->value);
}

void
filterx_variable_set_value(FilterXVariable *v, FilterXObject *new_value)
{
filterx_object_unref(v->value);
v->value = filterx_object_ref(new_value);
v->assigned = TRUE;
}

static void
_variable_free(FilterXVariable *v)
{
filterx_object_unref(v->value);
}

struct _FilterXScope
{
GAtomicCounter ref_cnt;
GHashTable *value_cache;
GArray *variables;
GHashTable *by_handle;
GPtrArray *weak_refs;
gboolean write_protected;
};

FilterXObject *
filterx_scope_lookup_message_ref(FilterXScope *self, NVHandle handle)
static gboolean
_lookup_variable(FilterXScope *self, NVHandle handle, FilterXVariable **v)
{
FilterXObject *object = NULL;
gpointer _value = NULL;

if (g_hash_table_lookup_extended(self->value_cache, GINT_TO_POINTER(handle), NULL, (gpointer *) &object))
if (g_hash_table_lookup_extended(self->by_handle, GINT_TO_POINTER(handle), NULL, &_value))
{
filterx_object_ref(object);
*v = (FilterXVariable *) _value;
return *v != NULL;
}
return object;
return FALSE;
}

void
filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXObject *value)
FilterXVariable *
filterx_scope_lookup_variable(FilterXScope *self, NVHandle handle)
{
g_assert(self->write_protected == FALSE);
FilterXVariable *v;

value->shadow = TRUE;
g_hash_table_insert(self->value_cache, GINT_TO_POINTER(handle), filterx_object_ref(value));
if (_lookup_variable(self, handle, &v))
return v;
return NULL;
}

FilterXVariable *
filterx_scope_register_variable(FilterXScope *self,
NVHandle handle, gboolean floating,
FilterXObject *initial_value)
{
FilterXVariable v, *v_slot;

v.handle = handle;
v.assigned = FALSE;
v.floating = floating;
v.value = filterx_object_ref(initial_value);
v_slot = &g_array_index(self->variables, FilterXVariable, self->variables->len);
g_array_append_val(self->variables, v);

g_hash_table_insert(self->by_handle, GINT_TO_POINTER(handle), v_slot);
return v_slot;
}

void
Expand All @@ -65,24 +119,30 @@ void
filterx_scope_sync_to_message(FilterXScope *self, LogMessage *msg)
{
GString *buffer = scratch_buffers_alloc();
GHashTableIter iter;
gpointer _key, _value;

g_hash_table_iter_init(&iter, self->value_cache);
while (g_hash_table_iter_next(&iter, &_key, &_value))
for (gint i = 0; i < self->variables->len; i++)
{
NVHandle handle = GPOINTER_TO_INT(_key);
FilterXObject *value = (FilterXObject *) _value;

if (!(value->modified_in_place || value->assigned))
FilterXVariable *v = &g_array_index(self->variables, FilterXVariable, i);

/* we don't need to sync the value if:
*
* 1) this is a floating variable; OR
*
* 2) the value was extracted from the message but was not changed in
* place (for mutable objects), and was not assigned to
*
*/
if (v->floating ||
!(v->assigned || v->value->modified_in_place))
continue;
LogMessageValueType t;
g_string_truncate(buffer, 0);
if (!filterx_object_marshal(value, buffer, &t))
if (!filterx_object_marshal(v->value, buffer, &t))
g_assert_not_reached();
log_msg_set_value_with_type(msg, handle, buffer->str, buffer->len, t);
log_msg_set_value_with_type(msg, v->handle, buffer->str, buffer->len, t);
v->value->modified_in_place = FALSE;
v->assigned = FALSE;
}

}

FilterXScope *
Expand All @@ -91,7 +151,9 @@ filterx_scope_new(void)
FilterXScope *self = g_new0(FilterXScope, 1);

g_atomic_counter_set(&self->ref_cnt, 1);
self->value_cache = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) filterx_object_unref);
self->variables = g_array_sized_new(FALSE, TRUE, sizeof(FilterXVariable), 16);
g_array_set_clear_func(self->variables, (GDestroyNotify) _variable_free);
self->by_handle = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, NULL);
self->weak_refs = g_ptr_array_new_with_free_func((GDestroyNotify) filterx_object_unref);
return self;
}
Expand All @@ -101,18 +163,14 @@ filterx_scope_clone(FilterXScope *other)
{
FilterXScope *self = filterx_scope_new();

GHashTableIter iter;
gpointer _key, _value;

g_hash_table_iter_init(&iter, self->value_cache);
while (g_hash_table_iter_next(&iter, &_key, &_value))
for (gint i = 0; i < other->variables->len; i++)
{
NVHandle handle = GPOINTER_TO_INT(_key);
FilterXObject *value = (FilterXObject *) _value;
FilterXVariable *v = &g_array_index(other->variables, FilterXVariable, i);
g_array_append_val(self->variables, *v);
FilterXVariable *v_clone = &g_array_index(self->variables, FilterXVariable, i);

/* NOTE: clone will not actually clone inmutable objects, in those
* cases we just take a reference */
g_hash_table_insert(self->value_cache, GINT_TO_POINTER(handle), filterx_object_clone(value));
v_clone->value = filterx_object_clone(v->value);
g_hash_table_insert(self->by_handle, GINT_TO_POINTER(v->handle), v_clone);
}

/* NOTE: we don't clone weak references, those only relate to mutable
Expand Down Expand Up @@ -143,7 +201,8 @@ filterx_scope_make_writable(FilterXScope **pself)
static void
_free(FilterXScope *self)
{
g_hash_table_unref(self->value_cache);
g_array_free(self->variables, TRUE);
g_hash_table_unref(self->by_handle);
g_ptr_array_free(self->weak_refs, TRUE);
g_free(self);
}
Expand Down
25 changes: 23 additions & 2 deletions lib/filterx/filterx-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,34 @@
#include "filterx-object.h"
#include "logmsg/logmsg.h"

typedef struct _FilterXVariable FilterXVariable;

FilterXObject *filterx_variable_get_value(FilterXVariable *v);
void filterx_variable_set_value(FilterXVariable *v, FilterXObject *new_value);

/*
* FilterXScope represents variables in a filterx scope.
*
* Variables are either tied to a LogMessage (when we are caching
* demarshalled values in the scope) or are values that are "floating", e.g.
* not yet tied to any values in the underlying LogMessage.
*
* Floating values are "temp" values that are not synced to the LogMessage
* upon the exit from the scope.
*
*/
typedef struct _FilterXScope FilterXScope;

void filterx_scope_sync_to_message(FilterXScope *self, LogMessage *msg);
FilterXObject *filterx_scope_lookup_message_ref(FilterXScope *self, NVHandle handle);
void filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXObject *value);

FilterXVariable *filterx_scope_lookup_variable(FilterXScope *self, NVHandle handle);
FilterXVariable *filterx_scope_register_variable(FilterXScope *self,
NVHandle handle, gboolean floating,
FilterXObject *initial_value);

void filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object);


/* copy on write */
void filterx_scope_write_protect(FilterXScope *self);
FilterXScope *filterx_scope_make_writable(FilterXScope **pself);
Expand Down

0 comments on commit 953af92

Please sign in to comment.