Skip to content

Commit

Permalink
sql_cacher: Add support for dumping a given cache
Browse files Browse the repository at this point in the history
  • Loading branch information
liviuchircu committed Nov 20, 2024
1 parent 41aea5f commit 1687155
Showing 1 changed file with 243 additions and 2 deletions.
245 changes: 243 additions & 2 deletions modules/sql_cacher/sql_cacher.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "../../ipc.h"
#include "../../status_report.h"
#include "sql_cacher.h"
#include "../../lib/csv.h"

static int mod_init(void);
static void destroy(void);
Expand All @@ -44,7 +45,15 @@ int pv_parse_name(pv_spec_p sp, const str *in);
int pv_init_param(pv_spec_p sp, int param);
int pv_get_sql_cached_value(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
static int parse_cache_entry(unsigned int type, void *val);
static void optimize_cdb_decode(pv_name_fix_t *pv_name);
static void free_c_entry(cache_entry_t *c);
static int fixup_cache_dump_con(void **param);
static int fixup_cache_dump_fields(void **param);
static int fixup_cache_dump_fields_free(void **param);
static int fixup_cache_dump_ret(void **param);
static int fixup_cache_dump_ret_free(void **param);
static int sql_cache_dump(struct sip_msg *msg, db_handlers_t *dbh,
pv_name_fix_t *cols, pvname_list_t *dst_avps);

static mi_response_t *mi_reload_1(const mi_params_t *params,
struct mi_handler *async_hdl);
Expand All @@ -69,6 +78,17 @@ gen_lock_t *queries_lock;

void *sql_srg = NULL;

/* module functions */
static const cmd_export_t cmds[] = {
{"sql_cache_dump", (cmd_function)sql_cache_dump, {
{CMD_PARAM_STR, fixup_cache_dump_con, 0},
{CMD_PARAM_STR, fixup_cache_dump_fields, fixup_cache_dump_fields_free},
{CMD_PARAM_STR|CMD_PARAM_NO_EXPAND, fixup_cache_dump_ret, fixup_cache_dump_ret_free},
{0, 0, 0}},
ALL_ROUTES},
{0, 0, {{0, 0, 0}}, 0}
};

/* module parameters */
static const param_export_t mod_params[] = {
{"spec_delimiter", STR_PARAM, &spec_delimiter.s},
Expand Down Expand Up @@ -118,7 +138,7 @@ struct module_exports exports = {
DEFAULT_DLFLAGS, /* dlopen flags */
0, /* load function */
&deps, /* OpenSIPS module dependencies */
0, /* exported functions */
cmds, /* exported functions */
0, /* exported async functions */
mod_params, /* exported parameters */
0, /* exported statistics */
Expand Down Expand Up @@ -1499,7 +1519,7 @@ static int cdb_fetch(pv_name_fix_t *pv_name, str *cdb_res, int *entry_rld_vers)
* 2 - error
* 3 - does not match reload version (old value)
*/
static int cdb_val_decode(pv_name_fix_t *pv_name, str *cdb_val, int reload_version,
static int cdb_val_decode(const pv_name_fix_t *pv_name, const str *cdb_val, int reload_version,
str *str_res, int *int_res)
{
int int_val, next_str_off, i, rc;
Expand Down Expand Up @@ -2126,3 +2146,224 @@ static void destroy(void)
lock_destroy(queries_lock);
lock_dealloc(queries_lock);
}

/* make the param con @c_entry available during param fields processing */
static cache_entry_t *c_entry;
static int fixup_cache_dump_con(void **param)
{
db_handlers_t *it;
str s = *(str *)*param;

for (it = db_hdls_list; it; it = it->next) {
if (str_match(&it->c_entry->id, &s)) {
*param = it;
c_entry = it->c_entry;

if (!CACHEDB_CAPABILITY(&it->cdbf, CACHEDB_CAP_ITER_KEYS)) {
LM_ERR("cacheDB id '%.*s' has no support "
"for key iteration\n", s.len, s.s);
return -1;
}

return 0;
}
}
LM_ERR("caching id not found: '%.*s'\n", s.len, s.s);
return E_UNSPEC;
}

/* make the param 2 size available during param 3 processing */
static int ncols;

static int fixup_cache_dump_fields(void **param)
{
pv_name_fix_t *fixed_cols;
csv_record *cols, *col;
int i, len;
char *p;
str s = *(str *)*param;

cols = parse_csv_record(&s);
if (!cols) {
LM_ERR("oom\n");
return -1;
}

len = 0;
for (col = cols; col; col = col->next, ncols++) {
if (col->s.len == 0) {
LM_ERR("empty-string column in cache: '%.*s'\n", s.len, s.s);
return -1;
}
len += col->s.len;
}

fixed_cols = pkg_malloc((ncols + 1) * sizeof *fixed_cols + len);
if (!fixed_cols) {
LM_ERR("oom\n");
return -1;
}
memset(fixed_cols, 0, (ncols + 1) * sizeof *fixed_cols);
p = (char *)fixed_cols + (ncols + 1) * sizeof *fixed_cols;

for (i = 0, col = cols; col; col = col->next, i++) {
memcpy(p, col->s.s, col->s.len);
fixed_cols[i].id.len = -1; /* hack: signifies 'reload-version' */
fixed_cols[i].col.s = p;
fixed_cols[i].col.len = col->s.len;
fixed_cols[i].c_entry = c_entry;
p += col->s.len;
}
free_csv_record(cols);
c_entry = NULL;

*param = (void *)fixed_cols;
return 0;
}
static int fixup_cache_dump_fields_free(void **param)
{
pkg_free(*param);
*param = NULL;
return 0;
}
static int fixup_cache_dump_ret(void **param)
{
int i;
pvname_list_t *avp_list, *avp;
str s = *(str *)*param;

avp_list = parse_pvname_list(&s, PVT_AVP);
if (!avp_list) {
LM_ERR("failed to parse AVP list: %s\n", s.s);
return E_UNSPEC;
}

for (i = 0, avp = avp_list; avp; i++, avp = avp->next)
;

if (i != ncols) {
LM_ERR("number of columns (%d) differs from number of AVPs (%d)\n",
ncols, i);
return E_UNSPEC;
}

ncols = 0;
*param = (void *)avp_list;
return 0;
}

static int fixup_cache_dump_ret_free(void **param)
{
pvname_list_t *l = (pvname_list_t *)*param, *next;

while (l) {
next = l->next;
pkg_free(l);
l = next;
}

*param = NULL;
return 0;
}

static const pv_name_fix_t *_selected_cols;
static pvname_list_t *_out_avps;
static int _rld_ver;
static struct sip_msg *_sip_msg;
static int decode_kv2avps(const str *key, const str *value)
{
static pv_value_t val_null = {str_init("<null>"), 0, PV_VAL_STR};
const pv_name_fix_t *col;
pvname_list_t *avp;
pv_value_t val;
str str_res;
int rc, int_res;

LM_DBG("called for key %.*s, val: %.*s\n", key->len, key->s, value->len, value->s);

/* skip internal keys (not part of the SQL table dataset) */
if (key->s[_selected_cols->c_entry->id.len] == '_')
return -1;

/* each column offset is pre-computed; fill in the AVPs ASAP! */
for (col = _selected_cols, avp = _out_avps; col->c_entry;
col++, avp = avp->next) {

str_res = STR_NULL;
int_res = 0;
rc = cdb_val_decode(col, value, _rld_ver, &str_res, &int_res);
switch (rc) {
case 0:
if (is_str_column(col)) {
val.rs = str_res;
val.flags = PV_VAL_STR;
} else {
val.ri = int_res;
val.flags = PV_VAL_INT|PV_TYPE_INT;
}
break;

case 1:
val = val_null;
break;

default:
LM_ERR("failed to decode key: '%.*s', val: '%.*s' (%d)\n",
key->len, key->s, value->len, value->s, rc);
return -1;
}

if (avp->sname.setf(_sip_msg, &avp->sname.pvp, 0, &val) != 0) {
LM_ERR("failed to set AVP\n");
return -1;
}
}

return 0;
}

static int sql_cache_dump(struct sip_msg *msg, db_handlers_t *dbh,
pv_name_fix_t *cols, pvname_list_t *dst_avps)
{
cache_entry_t *cache = dbh->c_entry;
int i, n, ver;

LM_DBG("dumping data from cache: %.*s\n", cache->id.len, cache->id.s);

lock_start_read(cache->ref_lock);

ver = get_rld_vers_from_cache(cache, dbh);
if (ver < 0) {
lock_stop_read(cache->ref_lock);
LM_ERR("failed to get reload version\n");
return -1;
}

if (cols[0].id.len != ver) {
for (i = 0; cols[i].c_entry; i++) {
optimize_cdb_decode(&cols[i]);
LM_DBG("optimized fields for col '%.*s': %d/%d/%d\n",
cols[i].col.len, cols[i].col.s, cols[i].col_offset,
cols[i].col_nr, cols[i].last_str);
}

cols[0].id.len = ver;
}

_selected_cols = cols;
_out_avps = dst_avps;
_rld_ver = ver;
_sip_msg = msg;

n = dbh->cdbf.iter_keys(dbh->cdbcon, decode_kv2avps);
if (n < 0) {
lock_stop_read(cache->ref_lock);
LM_ERR("failed to fully iterate through cache '%.*s'\n",
cache->id.len, cache->id.s);
return -1;
}

lock_stop_read(cache->ref_lock);

return n == 0 ? -2 : n;
}

0 comments on commit 1687155

Please sign in to comment.