forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader_permit.hh
330 lines (257 loc) · 9.89 KB
/
reader_permit.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/*
* Copyright (C) 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <seastar/util/optimized_optional.hh>
#include "seastarx.hh"
#include "db/timeout_clock.hh"
#include "schema/schema_fwd.hh"
#include "tracing/trace_state.hh"
// Forward declaration: this header only names query::max_result_size in
// function declarations (reader_permit::max_result_size() and
// reader_permit::set_max_result_size()), so no definition is required here.
namespace query {
struct max_result_size;
}
// Forward declaration of seastar::file. Within this header it is only used in
// the declaration of make_tracked_file().
namespace seastar {
class file;
} // namespace seastar
/// An amount of reader resources: a unit count plus a number of bytes.
///
/// Plain value type. Both fields are signed, and the arithmetic operators work
/// member-wise without any clamping, so intermediate values may be negative.
struct reader_resources {
    int count = 0;
    ssize_t memory = 0;

    reader_resources() = default;

    reader_resources(int count, ssize_t memory)
        : count(count)
        , memory(memory) {
    }

    /// Builds an amount consisting of `memory` bytes and a zero count.
    static reader_resources with_memory(ssize_t memory) {
        return reader_resources(0, memory);
    }

    /// Adds `other` to this amount, field by field.
    reader_resources& operator+=(const reader_resources& other) {
        count += other.count;
        memory += other.memory;
        return *this;
    }

    /// Subtracts `other` from this amount, field by field.
    reader_resources& operator-=(const reader_resources& other) {
        count -= other.count;
        memory -= other.memory;
        return *this;
    }

    /// Returns the member-wise sum of the two amounts.
    reader_resources operator+(const reader_resources& other) const {
        reader_resources sum(*this);
        sum += other;
        return sum;
    }

    /// Returns the member-wise difference of the two amounts.
    reader_resources operator-(const reader_resources& other) const {
        reader_resources diff(*this);
        diff -= other;
        return diff;
    }

    /// True when at least one of the fields is strictly positive.
    bool non_zero() const {
        return !(count <= 0 && memory <= 0);
    }
};
/// Two resource amounts are equal when both their counts and their memory
/// sizes match.
inline bool operator==(const reader_resources& a, const reader_resources& b) {
    if (a.count != b.count) {
        return false;
    }
    return a.memory == b.memory;
}
// Forward declaration: this header befriends the semaphore and passes or
// returns it by reference only, so the full definition is not needed here.
class reader_concurrency_semaphore;
/// A permit for a specific read.
///
/// Used to track the read's resource consumption. Use `consume_memory()` to
/// register memory usage, which returns a `resource_units` RAII object that
/// should be held onto while the respective resources are in use.
///
/// A permit object holds nothing but a `shared_ptr<impl>`: copies share the
/// same `impl`, and two permits compare equal exactly when they refer to the
/// same `impl`. All members that are only declared here are defined outside
/// this header.
class reader_permit {
    friend class reader_concurrency_semaphore;
    friend class tracking_allocator_base;

public:
    class resource_units;
    class need_cpu_guard;
    class awaits_guard;

    /// The states a permit can report through `get_state()`.
    enum class state {
        waiting_for_admission,
        waiting_for_memory,
        waiting_for_execution,
        active,
        active_need_cpu,
        active_await,
        inactive,
        evicted,
    };

    class impl;

private:
    shared_ptr<impl> _impl;

private:
    // Construction is private: only friends (the semaphore, the tracking
    // allocator base and optimized_optional) and nested classes can create a
    // permit from scratch. The default constructor leaves `_impl` null.
    reader_permit() = default;
    reader_permit(shared_ptr<impl>);
    explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, std::string_view op_name,
            reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);
    explicit reader_permit(reader_concurrency_semaphore& semaphore, schema_ptr schema, sstring&& op_name,
            reader_resources base_resources, db::timeout_clock::time_point timeout, tracing::trace_state_ptr trace_ptr);

    // Direct access to the shared implementation object.
    reader_permit::impl& operator*() { return *_impl; }
    reader_permit::impl* operator->() { return _impl.get(); }

    // Paired markers driven by need_cpu_guard and awaits_guard respectively.
    void mark_need_cpu() noexcept;
    void mark_not_need_cpu() noexcept;

    void mark_awaits() noexcept;
    void mark_not_awaits() noexcept;

    // True when this permit refers to an impl (i.e. `_impl` is not null).
    operator bool() const { return bool(_impl); }

    friend class optimized_optional<reader_permit>;

    // Raw accounting entry points, called by tracking_allocator_base with
    // memory-only amounts.
    void consume(reader_resources res);
    void signal(reader_resources res);

public:
    ~reader_permit();

    reader_permit(const reader_permit&) = default;
    reader_permit(reader_permit&&) = default;

    reader_permit& operator=(const reader_permit&) = default;
    reader_permit& operator=(reader_permit&&) = default;

    /// Identity comparison: true when both permits share the same impl.
    bool operator==(const reader_permit& o) const {
        return _impl == o._impl;
    }

    reader_concurrency_semaphore& semaphore();

    const schema_ptr& get_schema() const;
    std::string_view get_op_name() const;
    state get_state() const;

    bool needs_readmission() const;

    // Call only when needs_readmission() = true.
    future<> wait_readmission();

    /// Registers memory usage and returns the units that represent it; see
    /// the class comment for how the returned object is meant to be held.
    resource_units consume_memory(size_t memory = 0);

    resource_units consume_resources(reader_resources res);

    future<resource_units> request_memory(size_t memory);

    reader_resources consumed_resources() const;

    reader_resources base_resources() const;

    void release_base_resources() noexcept;

    sstring description() const;

    db::timeout_clock::time_point timeout() const noexcept;

    void set_timeout(db::timeout_clock::time_point timeout) noexcept;

    const tracing::trace_state_ptr& trace_state() const noexcept;

    void set_trace_state(tracing::trace_state_ptr trace_ptr) noexcept;

    // If the read was aborted, throw the exception the read was aborted with.
    // Otherwise no-op.
    void check_abort();

    query::max_result_size max_result_size() const;
    void set_max_result_size(query::max_result_size);

    void on_start_sstable_read() noexcept;
    void on_finish_sstable_read() noexcept;

    /// The address of the shared impl, as an integer. Permits that compare
    /// equal yield the same id. Const-qualified because it only reads `_impl`,
    /// so it is also callable on const permits.
    uintptr_t id() const { return reinterpret_cast<uintptr_t>(_impl.get()); }
};
/// Optional permit. `reader_permit` befriends `optimized_optional`, giving it
/// access to the private default constructor and `operator bool`.
/// NOTE(review): presumably a permit with a null impl is used as the
/// disengaged state -- confirm against seastar's optimized_optional.
using reader_permit_opt = optimized_optional<reader_permit>;
/// An amount of resources associated with a permit.
///
/// Pairs a permit with a `reader_resources` amount. Instances are move-only
/// (copying is deleted). The constructors taking a permit are private and thus
/// reserved for the friends `reader_permit` and `reader_concurrency_semaphore`.
/// Every member without an inline body is defined outside this header.
class reader_permit::resource_units {
reader_permit _permit;
reader_resources _resources;
friend class reader_permit;
friend class reader_concurrency_semaphore;
private:
// Tag type selecting the three-argument constructor below.
// NOTE(review): the name suggests `res` has already been accounted for on the
// permit and must not be consumed again -- confirm against the definition.
class already_consumed_tag {};
resource_units(reader_permit permit, reader_resources res, already_consumed_tag);
resource_units(reader_permit permit, reader_resources res);
public:
resource_units(const resource_units&) = delete;
resource_units(resource_units&&) noexcept;
~resource_units();
resource_units& operator=(const resource_units&) = delete;
resource_units& operator=(resource_units&&) noexcept;
// Takes another units object by rvalue reference.
// NOTE(review): presumably merges its amount into this one -- confirm.
void add(resource_units&& o);
void reset_to(reader_resources res);
void reset_to_zero() noexcept;
// Returns a copy of the permit stored in this object.
reader_permit permit() const { return _permit; }
// Returns the amount of resources currently recorded in this object.
reader_resources resources() const { return _resources; }
};
/// RAII marker flagging a permit as needing CPU.
///
/// Conceptually, a permit needs CPU while at least one reader associated with
/// it has a foreground operation in progress that was initiated by its
/// consumer, e.g. a pending `fill_buffer()` call. Create the guard when such
/// an operation begins and keep it alive for as long as the reader needs CPU.
class reader_permit::need_cpu_guard {
    reader_permit_opt _permit;

public:
    // Movable, but neither copyable nor assignable.
    need_cpu_guard(const need_cpu_guard&) = delete;
    need_cpu_guard& operator=(const need_cpu_guard&) = delete;
    need_cpu_guard& operator=(need_cpu_guard&&) = delete;
    need_cpu_guard(need_cpu_guard&&) noexcept = default;

    /// Takes the permit and marks it as needing CPU.
    explicit need_cpu_guard(reader_permit permit) noexcept
        : _permit(std::move(permit)) {
        _permit->mark_need_cpu();
    }

    /// Clears the mark again, unless this guard holds no permit
    /// (presumably because it was moved from).
    ~need_cpu_guard() {
        if (!_permit) {
            return;
        }
        _permit->mark_not_need_cpu();
    }
};
/// RAII marker flagging a permit as awaiting I/O or an operation running on a
/// remote shard.
///
/// Conceptually, a permit is awaiting while at least one reader associated
/// with it is waiting on I/O or on a remote shard as part of a foreground
/// operation initiated by its consumer, e.g. an sstable reader waiting on a
/// disk read inside its `fill_buffer()` call. Create the guard when the wait
/// begins and keep it alive until the awaited event completes.
class reader_permit::awaits_guard {
    reader_permit_opt _permit;

public:
    // Movable, but neither copyable nor assignable.
    awaits_guard(const awaits_guard&) = delete;
    awaits_guard& operator=(const awaits_guard&) = delete;
    awaits_guard& operator=(awaits_guard&&) = delete;
    awaits_guard(awaits_guard&&) noexcept = default;

    /// Takes the permit and marks it as awaiting.
    explicit awaits_guard(reader_permit permit) noexcept
        : _permit(std::move(permit)) {
        _permit->mark_awaits();
    }

    /// Clears the mark again, unless this guard holds no permit
    /// (presumably because it was moved from).
    ~awaits_guard() {
        if (!_permit) {
            return;
        }
        _permit->mark_not_awaits();
    }
};
/// Re-wraps `buf` in a new temporary_buffer that also carries `units`.
///
/// The result is built from the same data pointer and size as `buf`; its
/// deleter is created by `make_object_deleter()` from the deleter released
/// from `buf` and from `units`.
template <typename Char>
temporary_buffer<Char> make_tracked_temporary_buffer(temporary_buffer<Char> buf, reader_permit::resource_units units) {
    // Read the pointer and the size before releasing the deleter, so the
    // result does not depend on argument evaluation order.
    Char* const data = buf.get_write();
    const auto len = buf.size();
    auto tracked_deleter = make_object_deleter(buf.release(), std::move(units));
    return temporary_buffer<Char>(data, len, std::move(tracked_deleter));
}
/// Allocates a fresh buffer of `size` bytes and registers the same number of
/// bytes on `permit` via `consume_memory()`.
///
/// The resulting resource units are handed to the returned buffer's deleter
/// together with the deleter of the freshly allocated buffer.
inline temporary_buffer<char> make_new_tracked_temporary_buffer(size_t size, reader_permit& permit) {
    temporary_buffer<char> buf(size);
    // Capture the pointer and the size up front; the deleter is released below.
    char* const data = buf.get_write();
    const auto len = buf.size();
    return temporary_buffer<char>(data, len, make_object_deleter(buf.release(), permit.consume_memory(size)));
}
// Declared here, defined elsewhere.
// NOTE(review): by analogy with the tracked buffer helpers above, this
// presumably returns a wrapper around `f` whose memory usage is accounted on
// permit `p` -- confirm against the definition.
file make_tracked_file(file f, reader_permit p);
class tracking_allocator_base {
reader_permit _permit;
protected:
tracking_allocator_base(reader_permit permit) noexcept : _permit(std::move(permit)) { }
void consume(size_t memory) {
_permit.consume(reader_resources::with_memory(memory));
}
void signal(size_t memory) {
_permit.signal(reader_resources::with_memory(memory));
}
};
template <typename T>
class tracking_allocator : public tracking_allocator_base {
public:
using value_type = T;
using propagate_on_container_move_assignment = std::true_type;
using is_always_equal = std::false_type;
private:
std::allocator<T> _alloc;
public:
tracking_allocator(reader_permit permit) noexcept : tracking_allocator_base(std::move(permit)) { }
T* allocate(size_t n) {
auto p = _alloc.allocate(n);
try {
consume(n * sizeof(T));
} catch (...) {
_alloc.deallocate(p, n);
throw;
}
return p;
}
void deallocate(T* p, size_t n) {
_alloc.deallocate(p, n);
if (n) {
signal(n * sizeof(T));
}
}
template <typename U>
friend bool operator==(const tracking_allocator<U>& a, const tracking_allocator<U>& b);
};
template <typename T>
bool operator==(const tracking_allocator<T>& a, const tracking_allocator<T>& b) {
return a._semaphore == b._semaphore;
}
// fmt formatter specialization for reader_permit::state. It inherits from the
// string_view formatter; format() is only declared here and is defined
// elsewhere.
template <> struct fmt::formatter<reader_permit::state> : fmt::formatter<string_view> {
auto format(reader_permit::state, fmt::format_context& ctx) const -> decltype(ctx.out());
};
// fmt formatter specialization for reader_resources. It inherits from the
// string_view formatter; format() is only declared here and is defined
// elsewhere.
template <> struct fmt::formatter<reader_resources> : fmt::formatter<string_view> {
auto format(const reader_resources&, fmt::format_context& ctx) const -> decltype(ctx.out());
};