-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathstorage.c
2830 lines (2362 loc) · 74.2 KB
/
storage.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
*******************************************************************************
*
* STORAGE INTERACTION
*
* This module is responsible for interaction with the storage of AQO data.
* It does not provide information protection from concurrent updates.
*
*******************************************************************************
*
* Copyright (c) 2016-2022, Postgres Professional
*
* IDENTIFICATION
* aqo/storage.c
*
*/
#include "postgres.h"
#include <unistd.h>
#include "funcapi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "aqo.h"
#include "aqo_shared.h"
#include "machine_learning.h"
#include "preprocessing.h"
#include "storage.h"
/* AQO storage file names */
#define PGAQO_STAT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_statistics.stat"
#define PGAQO_TEXT_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_query_texts.stat"
#define PGAQO_DATA_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_data.stat"
#define PGAQO_QUERIES_FILE PGSTAT_STAT_PERMANENT_DIRECTORY "/pgaqo_queries.stat"
#define AQO_DATA_COLUMNS (7)
#define FormVectorSz(v_name) (form_vector((v_name), (v_name ## _size)))
typedef enum {
QUERYID = 0, EXEC_TIME_AQO, EXEC_TIME, PLAN_TIME_AQO, PLAN_TIME,
EST_ERROR_AQO, EST_ERROR, NEXECS_AQO, NEXECS, TOTAL_NCOLS
} aqo_stat_cols;
typedef enum {
QT_QUERYID = 0, QT_QUERY_STRING, QT_TOTAL_NCOLS
} aqo_qtexts_cols;
typedef enum {
AD_FS = 0, AD_FSS, AD_NFEATURES, AD_FEATURES, AD_TARGETS, AD_RELIABILITY,
AD_OIDS, AD_TOTAL_NCOLS
} aqo_data_cols;
typedef enum {
AQ_QUERYID = 0, AQ_FS, AQ_LEARN_AQO, AQ_USE_AQO, AQ_AUTO_TUNING, AQ_SMART_TIMEOUT, AQ_COUNT_INCREASE_TIMEOUT,
AQ_TOTAL_NCOLS
} aqo_queries_cols;
typedef void* (*form_record_t) (void *ctx, size_t *size);
typedef bool (*deform_record_t) (void *data, size_t size);
int querytext_max_size = 1000;
int dsm_size_max = 100; /* in MB */
HTAB *stat_htab = NULL;
HTAB *queries_htab = NULL;
HTAB *qtexts_htab = NULL;
static dsa_area *qtext_dsa = NULL;
HTAB *data_htab = NULL;
static dsa_area *data_dsa = NULL;
static HTAB *deactivated_queries = NULL;
/* Used to check data file consistency */
static const uint32 PGAQO_FILE_HEADER = 123467589;
static const uint32 PGAQO_PG_MAJOR_VERSION = PG_VERSION_NUM / 100;
/*
* Used for internal aqo_queries_store() calls.
* No NULL arguments expected in this case.
*/
AqoQueriesNullArgs aqo_queries_nulls = { false, false, false, false };
static ArrayType *form_matrix(double *matrix, int nrows, int ncols);
static void dsa_init(void);
static int data_store(const char *filename, form_record_t callback,
long nrecs, void *ctx);
static void data_load(const char *filename, deform_record_t callback, void *ctx);
static size_t _compute_data_dsa(const DataEntry *entry);
static bool _aqo_stat_remove(uint64 queryid);
static bool _aqo_queries_remove(uint64 queryid);
static bool _aqo_qtexts_remove(uint64 queryid);
static bool _aqo_data_remove(data_key *key);
static bool neirest_neighbor(double **matrix, int old_rows, double *neighbor, int cols);
static double fs_distance(double *a, double *b, int len);
PG_FUNCTION_INFO_V1(aqo_query_stat);
PG_FUNCTION_INFO_V1(aqo_query_texts);
PG_FUNCTION_INFO_V1(aqo_data);
PG_FUNCTION_INFO_V1(aqo_queries);
PG_FUNCTION_INFO_V1(aqo_enable_query);
PG_FUNCTION_INFO_V1(aqo_disable_query);
PG_FUNCTION_INFO_V1(aqo_queries_update);
PG_FUNCTION_INFO_V1(aqo_reset);
PG_FUNCTION_INFO_V1(aqo_cleanup);
PG_FUNCTION_INFO_V1(aqo_drop_class);
PG_FUNCTION_INFO_V1(aqo_cardinality_error);
PG_FUNCTION_INFO_V1(aqo_execution_time);
PG_FUNCTION_INFO_V1(aqo_query_texts_update);
PG_FUNCTION_INFO_V1(aqo_query_stat_update);
PG_FUNCTION_INFO_V1(aqo_data_update);
bool
load_fss_ext(uint64 fs, int fss, OkNNrdata *data, List **reloids)
{
return load_aqo_data(fs, fss, data, reloids, false, NULL);
}
bool
update_fss_ext(uint64 fs, int fss, OkNNrdata *data, List *reloids)
{
/*
* 'reloids' explictly passed to aqo_data_store().
* So AqoDataArgs fields 'nrels' & 'oids' are
* set to 0 and NULL repectively.
*/
AqoDataArgs data_arg =
{data->rows, data->cols, 0, data->matrix,
data->targets, data->rfactors, NULL};
return aqo_data_store(fs, fss, &data_arg, reloids);
}
/*
* Forms ArrayType object for storage from simple C-array matrix.
*/
ArrayType *
form_matrix(double *matrix, int nrows, int ncols)
{
Datum *elems;
ArrayType *array;
int dims[2] = {nrows, ncols};
int lbs[2];
int i,
j;
lbs[0] = lbs[1] = 1;
elems = palloc(sizeof(*elems) * nrows * ncols);
for (i = 0; i < nrows; ++i)
for (j = 0; j < ncols; ++j)
{
elems[i * ncols + j] = Float8GetDatum(matrix[i * ncols + j]);
Assert(!isnan(matrix[i * ncols + j]));
}
array = construct_md_array(elems, NULL, 2, dims, lbs,
FLOAT8OID, 8, FLOAT8PASSBYVAL, 'd');
return array;
}
/*
* Forms ArrayType object for storage from simple C-array vector.
*/
static ArrayType *
form_vector(double *vector, int nrows)
{
Datum *elems;
ArrayType *array;
int dims[1];
int lbs[1];
int i;
dims[0] = nrows;
lbs[0] = 1;
elems = palloc(sizeof(*elems) * nrows);
for (i = 0; i < nrows; ++i)
elems[i] = Float8GetDatum(vector[i]);
array = construct_md_array(elems, NULL, 1, dims, lbs,
FLOAT8OID, 8, FLOAT8PASSBYVAL, 'd');
return array;
}
/* Creates a storage for hashes of deactivated queries */
void
init_deactivated_queries_storage(void)
{
HASHCTL hash_ctl;
/* Create the hashtable proper */
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(uint64);
hash_ctl.entrysize = sizeof(uint64);
deactivated_queries = hash_create("AQO deactivated queries",
128, /* start small and extend */
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
}
/* Checks whether the query with given hash is deactivated */
bool
query_is_deactivated(uint64 queryid)
{
bool found;
(void) hash_search(deactivated_queries, &queryid, HASH_FIND, &found);
return found;
}
/* Adds given query hash into the set of hashes of deactivated queries */
void
add_deactivated_query(uint64 queryid)
{
(void) hash_search(deactivated_queries, &queryid, HASH_ENTER, NULL);
}
static void
reset_deactivated_queries(void)
{
HASH_SEQ_STATUS hash_seq;
uint64 *queryid;
hash_seq_init(&hash_seq, deactivated_queries);
while ((queryid = hash_seq_search(&hash_seq)) != NULL)
{
if (!hash_search(deactivated_queries, queryid, HASH_REMOVE, NULL))
elog(PANIC, "[AQO] hash table corrupted");
}
}
/*
* Update AQO statistics.
*
* Add a record (or update an existed) to stat storage for the query class.
* Returns a copy of stat entry, allocated in current memory context. Caller is
* in charge to free this struct after usage.
* If stat hash table is full, return NULL and log this fact.
*/
StatEntry *
aqo_stat_store(uint64 queryid, bool use_aqo, AqoStatArgs *stat_arg,
bool append_mode)
{
StatEntry *entry;
bool found;
int pos;
bool tblOverflow;
HASHACTION action;
Assert(stat_htab);
LWLockAcquire(&aqo_state->stat_lock, LW_EXCLUSIVE);
tblOverflow = hash_get_num_entries(stat_htab) < fs_max_items ? false : true;
action = tblOverflow ? HASH_FIND : HASH_ENTER;
entry = (StatEntry *) hash_search(stat_htab, &queryid, action, &found);
/* Initialize entry on first usage */
if (!found)
{
uint64 qid;
if (action == HASH_FIND)
{
/*
* Hash table is full. To avoid possible problems - don't try to add
* more, just exit
*/
LWLockRelease(&aqo_state->stat_lock);
ereport(LOG,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("[AQO] Stat storage is full. No more feature spaces can be added."),
errhint("Increase value of aqo.fs_max_items on restart of the instance")));
return NULL;
}
qid = entry->queryid;
memset(entry, 0, sizeof(StatEntry));
entry->queryid = qid;
}
if (!append_mode)
{
size_t sz;
if (found)
{
memset(entry, 0, sizeof(StatEntry));
entry->queryid = queryid;
}
sz = stat_arg->cur_stat_slot_aqo * sizeof(entry->est_error_aqo[0]);
memcpy(entry->plan_time_aqo, stat_arg->plan_time_aqo, sz);
memcpy(entry->exec_time_aqo, stat_arg->exec_time_aqo, sz);
memcpy(entry->est_error_aqo, stat_arg->est_error_aqo, sz);
entry->execs_with_aqo = stat_arg->execs_with_aqo;
entry->cur_stat_slot_aqo = stat_arg->cur_stat_slot_aqo;
sz = stat_arg->cur_stat_slot * sizeof(entry->est_error[0]);
memcpy(entry->plan_time, stat_arg->plan_time, sz);
memcpy(entry->exec_time, stat_arg->exec_time, sz);
memcpy(entry->est_error, stat_arg->est_error, sz);
entry->execs_without_aqo = stat_arg->execs_without_aqo;
entry->cur_stat_slot = stat_arg->cur_stat_slot;
aqo_state->stat_changed = true;
LWLockRelease(&aqo_state->stat_lock);
return entry;
}
/* Update the entry data */
if (use_aqo)
{
Assert(entry->cur_stat_slot_aqo >= 0);
pos = entry->cur_stat_slot_aqo;
if (entry->cur_stat_slot_aqo < STAT_SAMPLE_SIZE - 1)
entry->cur_stat_slot_aqo++;
else
{
size_t sz = (STAT_SAMPLE_SIZE - 1) * sizeof(entry->est_error_aqo[0]);
Assert(entry->cur_stat_slot_aqo = STAT_SAMPLE_SIZE - 1);
memmove(entry->plan_time_aqo, &entry->plan_time_aqo[1], sz);
memmove(entry->exec_time_aqo, &entry->exec_time_aqo[1], sz);
memmove(entry->est_error_aqo, &entry->est_error_aqo[1], sz);
}
entry->execs_with_aqo++;
entry->plan_time_aqo[pos] = *stat_arg->plan_time_aqo;
entry->exec_time_aqo[pos] = *stat_arg->exec_time_aqo;
entry->est_error_aqo[pos] = *stat_arg->est_error_aqo;
}
else
{
Assert(entry->cur_stat_slot >= 0);
pos = entry->cur_stat_slot;
if (entry->cur_stat_slot < STAT_SAMPLE_SIZE - 1)
entry->cur_stat_slot++;
else
{
size_t sz = (STAT_SAMPLE_SIZE - 1) * sizeof(entry->est_error[0]);
Assert(entry->cur_stat_slot = STAT_SAMPLE_SIZE - 1);
memmove(entry->plan_time, &entry->plan_time[1], sz);
memmove(entry->exec_time, &entry->exec_time[1], sz);
memmove(entry->est_error, &entry->est_error[1], sz);
}
entry->execs_without_aqo++;
entry->plan_time[pos] = *stat_arg->plan_time;
entry->exec_time[pos] = *stat_arg->exec_time;
entry->est_error[pos] = *stat_arg->est_error;
}
entry = memcpy(palloc(sizeof(StatEntry)), entry, sizeof(StatEntry));
aqo_state->stat_changed = true;
LWLockRelease(&aqo_state->stat_lock);
return entry;
}
/*
* Returns AQO statistics on controlled query classes.
*/
Datum
aqo_query_stat(PG_FUNCTION_ARGS)
{
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupDesc;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
Tuplestorestate *tupstore;
Datum values[TOTAL_NCOLS + 1];
bool nulls[TOTAL_NCOLS + 1];
HASH_SEQ_STATUS hash_seq;
StatEntry *entry;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not allowed in this context")));
/* Switch into long-lived context to construct returned data structures */
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupDesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
Assert(tupDesc->natts == TOTAL_NCOLS);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupDesc;
MemoryContextSwitchTo(oldcontext);
memset(nulls, 0, TOTAL_NCOLS + 1);
LWLockAcquire(&aqo_state->stat_lock, LW_SHARED);
hash_seq_init(&hash_seq, stat_htab);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
memset(nulls, 0, TOTAL_NCOLS + 1);
values[QUERYID] = Int64GetDatum(entry->queryid);
values[NEXECS] = Int64GetDatum(entry->execs_without_aqo);
values[NEXECS_AQO] = Int64GetDatum(entry->execs_with_aqo);
values[EXEC_TIME_AQO] = PointerGetDatum(form_vector(entry->exec_time_aqo, entry->cur_stat_slot_aqo));
values[EXEC_TIME] = PointerGetDatum(form_vector(entry->exec_time, entry->cur_stat_slot));
values[PLAN_TIME_AQO] = PointerGetDatum(form_vector(entry->plan_time_aqo, entry->cur_stat_slot_aqo));
values[PLAN_TIME] = PointerGetDatum(form_vector(entry->plan_time, entry->cur_stat_slot));
values[EST_ERROR_AQO] = PointerGetDatum(form_vector(entry->est_error_aqo, entry->cur_stat_slot_aqo));
values[EST_ERROR] = PointerGetDatum(form_vector(entry->est_error, entry->cur_stat_slot));
tuplestore_putvalues(tupstore, tupDesc, values, nulls);
}
LWLockRelease(&aqo_state->stat_lock);
return (Datum) 0;
}
static long
aqo_stat_reset(void)
{
HASH_SEQ_STATUS hash_seq;
StatEntry *entry;
long num_remove = 0;
long num_entries;
LWLockAcquire(&aqo_state->stat_lock, LW_EXCLUSIVE);
num_entries = hash_get_num_entries(stat_htab);
hash_seq_init(&hash_seq, stat_htab);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
if (!hash_search(stat_htab, &entry->queryid, HASH_REMOVE, NULL))
elog(PANIC, "[AQO] hash table corrupted");
num_remove++;
}
aqo_state->stat_changed = true;
LWLockRelease(&aqo_state->stat_lock);
if (num_remove != num_entries)
elog(ERROR, "[AQO] Stat memory storage is corrupted or parallel access without a lock was detected.");
aqo_stat_flush();
return num_remove;
}
static void *
_form_stat_record_cb(void *ctx, size_t *size)
{
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
StatEntry *entry;
*size = sizeof(StatEntry);
entry = hash_seq_search(hash_seq);
if (entry == NULL)
return NULL;
return memcpy(palloc(*size), entry, *size);
}
/* Implement data flushing according to pgss_shmem_shutdown() */
void
aqo_stat_flush(void)
{
HASH_SEQ_STATUS hash_seq;
int ret;
long entries;
/* Use exclusive lock to prevent concurrent flushing in different backends. */
LWLockAcquire(&aqo_state->stat_lock, LW_EXCLUSIVE);
if (!aqo_state->stat_changed)
/* Hash table wasn't changed, meaningless to store it in permanent storage */
goto end;
entries = hash_get_num_entries(stat_htab);
hash_seq_init(&hash_seq, stat_htab);
ret = data_store(PGAQO_STAT_FILE, _form_stat_record_cb, entries,
(void *) &hash_seq);
if (ret != 0)
hash_seq_term(&hash_seq);
else
/* Hash table and disk storage are now consistent */
aqo_state->stat_changed = false;
end:
LWLockRelease(&aqo_state->stat_lock);
}
static void *
_form_qtext_record_cb(void *ctx, size_t *size)
{
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
QueryTextEntry *entry;
void *data;
char *query_string;
char *ptr;
entry = hash_seq_search(hash_seq);
if (entry == NULL)
return NULL;
Assert(DsaPointerIsValid(entry->qtext_dp));
query_string = dsa_get_address(qtext_dsa, entry->qtext_dp);
Assert(query_string != NULL);
*size = sizeof(entry->queryid) + strlen(query_string) + 1;
ptr = data = palloc(*size);
Assert(ptr != NULL);
memcpy(ptr, &entry->queryid, sizeof(entry->queryid));
ptr += sizeof(entry->queryid);
memcpy(ptr, query_string, strlen(query_string) + 1);
return data;
}
void
aqo_qtexts_flush(void)
{
HASH_SEQ_STATUS hash_seq;
int ret;
long entries;
dsa_init();
LWLockAcquire(&aqo_state->qtexts_lock, LW_EXCLUSIVE);
if (!aqo_state->qtexts_changed)
/* XXX: mull over forced mode. */
goto end;
entries = hash_get_num_entries(qtexts_htab);
hash_seq_init(&hash_seq, qtexts_htab);
ret = data_store(PGAQO_TEXT_FILE, _form_qtext_record_cb, entries,
(void *) &hash_seq);
if (ret != 0)
hash_seq_term(&hash_seq);
else
/* Hash table and disk storage are now consistent */
aqo_state->qtexts_changed = false;
end:
LWLockRelease(&aqo_state->qtexts_lock);
}
/*
* Getting a hash table iterator, return a newly allocated memory chunk and its
* size for subsequent writing into storage.
*/
static void *
_form_data_record_cb(void *ctx, size_t *size)
{
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
DataEntry *entry;
char *data;
char *ptr,
*dsa_ptr;
size_t sz;
entry = hash_seq_search(hash_seq);
if (entry == NULL)
return NULL;
/* Size of data is DataEntry (without DSA pointer) plus size of DSA chunk */
sz = offsetof(DataEntry, data_dp) + _compute_data_dsa(entry);
ptr = data = palloc(sz);
/* Put the data into the chunk */
/* Plane copy of all bytes of hash table entry */
memcpy(ptr, entry, offsetof(DataEntry, data_dp));
ptr += offsetof(DataEntry, data_dp);
Assert(DsaPointerIsValid(entry->data_dp));
dsa_ptr = (char *) dsa_get_address(data_dsa, entry->data_dp);
Assert((sz - (ptr - data)) == _compute_data_dsa(entry));
memcpy(ptr, dsa_ptr, sz - (ptr - data));
*size = sz;
return data;
}
void
aqo_data_flush(void)
{
HASH_SEQ_STATUS hash_seq;
int ret;
long entries;
dsa_init();
LWLockAcquire(&aqo_state->data_lock, LW_EXCLUSIVE);
if (!aqo_state->data_changed)
/* XXX: mull over forced mode. */
goto end;
entries = hash_get_num_entries(data_htab);
hash_seq_init(&hash_seq, data_htab);
ret = data_store(PGAQO_DATA_FILE, _form_data_record_cb, entries,
(void *) &hash_seq);
if (ret != 0)
/*
* Something happened and storing procedure hasn't finished walking
* along all records of the hash table.
*/
hash_seq_term(&hash_seq);
else
/* Hash table and disk storage are now consistent */
aqo_state->data_changed = false;
end:
LWLockRelease(&aqo_state->data_lock);
}
static void *
_form_queries_record_cb(void *ctx, size_t *size)
{
HASH_SEQ_STATUS *hash_seq = (HASH_SEQ_STATUS *) ctx;
QueriesEntry *entry;
*size = sizeof(QueriesEntry);
entry = hash_seq_search(hash_seq);
if (entry == NULL)
return NULL;
return memcpy(palloc(*size), entry, *size);
}
void
aqo_queries_flush(void)
{
HASH_SEQ_STATUS hash_seq;
int ret;
long entries;
LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE);
if (!aqo_state->queries_changed)
goto end;
entries = hash_get_num_entries(queries_htab);
hash_seq_init(&hash_seq, queries_htab);
ret = data_store(PGAQO_QUERIES_FILE, _form_queries_record_cb, entries,
(void *) &hash_seq);
if (ret != 0)
hash_seq_term(&hash_seq);
else
/* Hash table and disk storage are now consistent */
aqo_state->queries_changed = false;
end:
LWLockRelease(&aqo_state->queries_lock);
}
static int
data_store(const char *filename, form_record_t callback,
long nrecs, void *ctx)
{
FILE *file;
size_t size;
uint32 counter = 0;
void *data;
char *tmpfile;
tmpfile = psprintf("%s.tmp", filename);
file = AllocateFile(tmpfile, PG_BINARY_W);
if (file == NULL)
goto error;
if (fwrite(&PGAQO_FILE_HEADER, sizeof(uint32), 1, file) != 1 ||
fwrite(&PGAQO_PG_MAJOR_VERSION, sizeof(uint32), 1, file) != 1 ||
fwrite(&nrecs, sizeof(long), 1, file) != 1)
goto error;
while ((data = callback(ctx, &size)) != NULL)
{
/* TODO: Add CRC code ? */
if (fwrite(&size, sizeof(size), 1, file) != 1 ||
fwrite(data, size, 1, file) != 1)
goto error;
counter++;
}
Assert(counter == nrecs);
if (FreeFile(file))
{
file = NULL;
goto error;
}
/* Parallel (re)writing into a file haven't happen. */
(void) durable_rename(tmpfile, filename, PANIC);
elog(LOG, "[AQO] %d records stored in file %s.", counter, filename);
return 0;
error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not write AQO file \"%s\": %m", tmpfile)));
if (file)
FreeFile(file);
unlink(tmpfile);
pfree(tmpfile);
return -1;
}
static bool
_deform_stat_record_cb(void *data, size_t size)
{
bool found;
StatEntry *entry;
uint64 queryid;
Assert(LWLockHeldByMeInMode(&aqo_state->stat_lock, LW_EXCLUSIVE));
Assert(size == sizeof(StatEntry));
queryid = ((StatEntry *) data)->queryid;
entry = (StatEntry *) hash_search(stat_htab, &queryid, HASH_ENTER, &found);
Assert(!found && entry);
memcpy(entry, data, sizeof(StatEntry));
return true;
}
void
aqo_stat_load(void)
{
Assert(!LWLockHeldByMe(&aqo_state->stat_lock));
LWLockAcquire(&aqo_state->stat_lock, LW_EXCLUSIVE);
/* Load on postmaster sturtup. So no any concurrent actions possible here. */
Assert(hash_get_num_entries(stat_htab) == 0);
data_load(PGAQO_STAT_FILE, _deform_stat_record_cb, NULL);
LWLockRelease(&aqo_state->stat_lock);
}
static bool
_check_dsa_validity(dsa_pointer ptr)
{
if (DsaPointerIsValid(ptr))
return true;
elog(LOG, "[AQO] DSA Pointer isn't valid. Is the memory limit exceeded?");
return false;
}
static bool
_deform_qtexts_record_cb(void *data, size_t size)
{
bool found;
QueryTextEntry *entry;
uint64 queryid = *(uint64 *) data;
char *query_string = (char *) data + sizeof(queryid);
size_t len = size - sizeof(queryid);
char *strptr;
Assert(LWLockHeldByMeInMode(&aqo_state->qtexts_lock, LW_EXCLUSIVE));
Assert(strlen(query_string) + 1 == len);
entry = (QueryTextEntry *) hash_search(qtexts_htab, &queryid,
HASH_ENTER, &found);
Assert(!found);
entry->qtext_dp = dsa_allocate(qtext_dsa, len);
if (!_check_dsa_validity(entry->qtext_dp))
{
/*
* DSA stuck into problems. Rollback changes. Return false in belief
* that caller recognize it and don't try to call us more.
*/
(void) hash_search(qtexts_htab, &queryid, HASH_REMOVE, NULL);
return false;
}
strptr = (char *) dsa_get_address(qtext_dsa, entry->qtext_dp);
strlcpy(strptr, query_string, len);
return true;
}
void
aqo_qtexts_load(void)
{
uint64 queryid = 0;
bool found;
Assert(!LWLockHeldByMe(&aqo_state->qtexts_lock));
Assert(qtext_dsa != NULL);
LWLockAcquire(&aqo_state->qtexts_lock, LW_EXCLUSIVE);
if (hash_get_num_entries(qtexts_htab) != 0)
{
/* Someone have done it concurrently. */
elog(LOG, "[AQO] Another backend have loaded query texts concurrently.");
LWLockRelease(&aqo_state->qtexts_lock);
return;
}
data_load(PGAQO_TEXT_FILE, _deform_qtexts_record_cb, NULL);
/* Check existence of default feature space */
(void) hash_search(qtexts_htab, &queryid, HASH_FIND, &found);
aqo_state->qtexts_changed = false; /* mem data consistent with disk */
LWLockRelease(&aqo_state->qtexts_lock);
if (!found)
{
if (!aqo_qtext_store(0, "COMMON feature space (do not delete!)"))
elog(PANIC, "[AQO] DSA Initialization was unsuccessful");
}
}
/*
* Getting a data chunk from a caller, add a record into the 'ML data'
* shmem hash table. Allocate and fill DSA chunk for variadic part of the data.
*/
static bool
_deform_data_record_cb(void *data, size_t size)
{
bool found;
DataEntry *fentry = (DataEntry *) data; /*Depends on a platform? */
DataEntry *entry;
size_t sz;
char *ptr = (char *) data,
*dsa_ptr;
Assert(ptr != NULL);
Assert(LWLockHeldByMeInMode(&aqo_state->data_lock, LW_EXCLUSIVE));
entry = (DataEntry *) hash_search(data_htab, &fentry->key,
HASH_ENTER, &found);
Assert(!found);
/* Copy fixed-size part of entry byte-by-byte even with caves */
memcpy(entry, fentry, offsetof(DataEntry, data_dp));
ptr += offsetof(DataEntry, data_dp);
sz = _compute_data_dsa(entry);
Assert(sz + offsetof(DataEntry, data_dp) == size);
entry->data_dp = dsa_allocate(data_dsa, sz);
if (!_check_dsa_validity(entry->data_dp))
{
/*
* DSA stuck into problems. Rollback changes. Return false in belief
* that caller recognize it and don't try to call us more.
*/
(void) hash_search(data_htab, &fentry->key, HASH_REMOVE, NULL);
return false;
}
dsa_ptr = (char *) dsa_get_address(data_dsa, entry->data_dp);
Assert(dsa_ptr != NULL);
memcpy(dsa_ptr, ptr, sz);
return true;
}
void
aqo_data_load(void)
{
Assert(!LWLockHeldByMe(&aqo_state->data_lock));
Assert(data_dsa != NULL);
LWLockAcquire(&aqo_state->data_lock, LW_EXCLUSIVE);
if (hash_get_num_entries(data_htab) != 0)
{
/* Someone have done it concurrently. */
elog(LOG, "[AQO] Another backend have loaded query data concurrently.");
LWLockRelease(&aqo_state->data_lock);
return;
}
data_load(PGAQO_DATA_FILE, _deform_data_record_cb, NULL);
aqo_state->data_changed = false; /* mem data is consistent with disk */
LWLockRelease(&aqo_state->data_lock);
}
static bool
_deform_queries_record_cb(void *data, size_t size)
{
bool found;
QueriesEntry *entry;
uint64 queryid;
Assert(LWLockHeldByMeInMode(&aqo_state->queries_lock, LW_EXCLUSIVE));
Assert(size == sizeof(QueriesEntry));
queryid = ((QueriesEntry *) data)->queryid;
entry = (QueriesEntry *) hash_search(queries_htab, &queryid, HASH_ENTER, &found);
Assert(!found);
memcpy(entry, data, sizeof(QueriesEntry));
return true;
}
void
aqo_queries_load(void)
{
bool found;
uint64 queryid = 0;
Assert(!LWLockHeldByMe(&aqo_state->queries_lock));
LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE);
/* Load on postmaster startup. So no any concurrent actions possible here. */
Assert(hash_get_num_entries(queries_htab) == 0);
data_load(PGAQO_QUERIES_FILE, _deform_queries_record_cb, NULL);
/* Check existence of default feature space */
(void) hash_search(queries_htab, &queryid, HASH_FIND, &found);
LWLockRelease(&aqo_state->queries_lock);
if (!found)
{
if (!aqo_queries_store(0, 0, 0, 0, 0, &aqo_queries_nulls))
elog(PANIC, "[AQO] aqo_queries initialization was unsuccessful");
}
}
static void
data_load(const char *filename, deform_record_t callback, void *ctx)
{
FILE *file;
long i;
uint32 header;
int32 pgver;
long num;
file = AllocateFile(filename, PG_BINARY_R);
if (file == NULL)
{
if (errno != ENOENT)
goto read_error;
return;
}
if (fread(&header, sizeof(uint32), 1, file) != 1 ||
fread(&pgver, sizeof(uint32), 1, file) != 1 ||
fread(&num, sizeof(long), 1, file) != 1)
goto read_error;
if (header != PGAQO_FILE_HEADER || pgver != PGAQO_PG_MAJOR_VERSION)
goto data_error;
for (i = 0; i < num; i++)
{
void *data;
size_t size;
bool res;
if (fread(&size, sizeof(size), 1, file) != 1)
goto read_error;
data = palloc(size);
if (fread(data, size, 1, file) != 1)
goto read_error;
res = callback(data, size);
if (!res)
{
/* Error detected. Do not try to read tails of the storage. */
elog(LOG, "[AQO] Because of an error skip %ld storage records.",
num - i);
break;
}
}
FreeFile(file);
elog(LOG, "[AQO] %ld records loaded from file %s.", num, filename);
return;
read_error:
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", filename)));
goto fail;
data_error:
ereport(LOG,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("ignoring invalid data in file \"%s\"", filename)));
fail:
if (file)
FreeFile(file);
unlink(filename);
}
static void
on_shmem_shutdown(int code, Datum arg)
{