//! unbounded_queue.zig: intrusive unbounded MPSC queue (forked from oven-sh/bun)
const std = @import("std");
const os = std.os;
const mem = std.mem;
const meta = std.meta;
const atomic = std.atomic;
const builtin = std.builtin;
const testing = std.testing;
const assert = std.debug.assert;
const mpsc = @This();
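/// Assumed cache line size per target architecture. x86_64 and aarch64 use
/// 128 because adjacent-line prefetchers can pull cache lines in pairs.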
pub const cache_line_length = switch (@import("builtin").target.cpu.arch) {
.x86_64, .aarch64, .powerpc64 => 128,
.arm, .mips, .mips64, .riscv64 => 32,
.s390x => 256,
else => 64,
};
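/// An intrusive, unbounded, multi-producer single-consumer (MPSC) queue.
/// `T` must provide an optional pointer field (selected via `next_field`)
/// that the queue uses as its intrusive link; the queue never allocates,
/// it only chains caller-owned nodes.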
pub fn UnboundedQueue(comptime T: type, comptime next_field: meta.FieldEnum(T)) type {
const next_name = meta.fieldInfo(T, next_field).name;
return struct {
const Self = @This();
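/// A linked span of nodes removed by a single popBatch call; walk it with
/// iterator(), which yields `count` nodes starting at `front`.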
pub const Batch = struct {
pub const Iterator = struct {
batch: Self.Batch,
pub fn next(self: *Self.Batch.Iterator) ?*T {
if (self.batch.count == 0) return null;
const front = self.batch.front orelse unreachable;
self.batch.front = @field(front, next_name);
self.batch.count -= 1;
return front;
}
};
front: ?*T = null,
last: ?*T = null,
count: usize = 0,
pub fn iterator(self: Self.Batch) Self.Batch.Iterator {
return .{ .batch = self };
}
};
const next = next_name;
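// Producers contend on `back` while the consumer walks from the `front`
// stub node, so each is aligned to its own padded region to limit false
// sharing; the stub's link field points at the real head of the queue.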
pub const queue_padding_length = cache_line_length / 2;
back: ?*T align(queue_padding_length) = null,
count: usize = 0,
front: T align(queue_padding_length) = init: {
var stub: T = undefined;
@field(stub, next) = null;
break :init stub;
},
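/// Enqueue one node (safe from multiple producer threads). The node is
/// swapped in as the new back, then linked from the previous back (or from
/// the front stub when the queue was empty).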
pub fn push(self: *Self, src: *T) void {
assert(@atomicRmw(usize, &self.count, .Add, 1, .Release) >= 0);
@field(src, next) = null;
const old_back = @atomicRmw(?*T, &self.back, .Xchg, src, .AcqRel) orelse &self.front;
@field(old_back, next) = src;
}
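/// Enqueue a pre-linked chain of `count` nodes from `first` to `last` with
/// one swap; the chain must already be connected through the next field.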
pub fn pushBatch(self: *Self, first: *T, last: *T, count: usize) void {
assert(@atomicRmw(usize, &self.count, .Add, count, .Release) >= 0);
@field(last, next) = null;
const old_back = @atomicRmw(?*T, &self.back, .Xchg, last, .AcqRel) orelse &self.front;
@field(old_back, next) = first;
}
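/// Dequeue one node; must only be called from the single consumer thread.
/// Returns null when the queue is empty, or when a producer has swapped
/// `back` but not yet published its link (the `first != last` case). If the
/// race is detected on the final node instead, the consumer spins until the
/// producer's link store becomes visible.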
pub fn pop(self: *Self) ?*T {
const first = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return null;
if (@atomicLoad(?*T, &@field(first, next), .Acquire)) |next_item| {
@atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
return first;
}
const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
if (first != last) return null;
@atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
return first;
}
var next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
while (next_item == null) : (atomic.spinLoopHint()) {
next_item = @atomicLoad(?*T, &@field(first, next), .Acquire);
}
@atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
assert(@atomicRmw(usize, &self.count, .Sub, 1, .Monotonic) >= 1);
return first;
}
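/// Dequeue every reachable node in one pass; single consumer only. Nodes
/// behind a link a producer has not yet published stay queued for a later
/// call.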
pub fn popBatch(self: *Self) Self.Batch {
var batch: Self.Batch = .{};
var front = @atomicLoad(?*T, &@field(self.front, next), .Acquire) orelse return batch;
batch.front = front;
var next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
while (next_item) |next_node| : (next_item = @atomicLoad(?*T, &@field(next_node, next), .Acquire)) {
batch.count += 1;
batch.last = front;
front = next_node;
}
const last = @atomicLoad(?*T, &self.back, .Acquire) orelse &self.front;
if (front != last) {
@atomicStore(?*T, &@field(self.front, next), front, .Release);
assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
return batch;
}
@atomicStore(?*T, &@field(self.front, next), null, .Monotonic);
if (@cmpxchgStrong(?*T, &self.back, last, &self.front, .AcqRel, .Acquire) == null) {
batch.count += 1;
batch.last = front;
assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
return batch;
}
next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
while (next_item == null) : (atomic.spinLoopHint()) {
next_item = @atomicLoad(?*T, &@field(front, next), .Acquire);
}
batch.count += 1;
@atomicStore(?*T, &@field(self.front, next), next_item, .Monotonic);
batch.last = front;
assert(@atomicRmw(usize, &self.count, .Sub, batch.count, .Monotonic) >= batch.count);
return batch;
}
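/// Approximate number of queued nodes; only a snapshot under concurrency.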
pub fn peek(self: *Self) usize {
const count = @atomicLoad(usize, &self.count, .Acquire);
assert(count >= 0);
return count;
}
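/// True when the count snapshot is zero; may be stale under concurrency.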
pub fn isEmpty(self: *Self) bool {
return self.peek() == 0;
}
};
}
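// A minimal usage sketch (not part of the original file), assuming a Zig
// toolchain whose std matches the atomic-ordering spelling used above
// (`.Acquire`, `.Monotonic`, ...). `Node` is a hypothetical payload type
// that embeds its own intrusive `next` link, so no allocation is needed.
test "UnboundedQueue: push, peek, popBatch" {
    const Node = struct {
        value: u32 = 0,
        next: ?*@This() = null,
    };
    var queue: UnboundedQueue(Node, .next) = .{};
    var nodes = [_]Node{ .{ .value = 1 }, .{ .value = 2 }, .{ .value = 3 } };
    // Producers would normally push from other threads; a single thread
    // suffices here to exercise the API.
    queue.push(&nodes[0]);
    queue.push(&nodes[1]);
    queue.push(&nodes[2]);
    try testing.expectEqual(@as(usize, 3), queue.peek());
    // Drain everything in one batch and fold the values.
    var batch = queue.popBatch();
    var it = batch.iterator();
    var sum: u32 = 0;
    while (it.next()) |node| sum += node.value;
    try testing.expectEqual(@as(u32, 6), sum);
    try testing.expect(queue.isEmpty());
}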