-
Notifications
You must be signed in to change notification settings - Fork 4
/
rtdealer.pas
99 lines (85 loc) · 2.4 KB
/
rtdealer.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
{
ROUTER-to-DEALER example
@author cpicanco <[email protected]>
}
program rtdealer;
{$mode objfpc}{$H+}{$COPERATORS ON}
uses
{$IFDEF UNIX}cthreads, cmem{$ENDIF}, SysUtils,
zmq, zmq.helpers;
// Worker thread body.
//
// Creates its own ZeroMQ context and a DEALER socket, connects to the broker
// on tcp://localhost:5671 and then loops: announce readiness ('Hi Boss'),
// receive a workload, and sleep for a random interval. The loop ends when
// the workload is the string 'Fired!', at which point the number of
// completed tasks is printed.
//
// args   - on Win32 the pointer value itself is cast to an integer and handed
//          to s_set_id; on other targets it is not used.
//          NOTE(review): this means the caller should pass a distinct value
//          per thread (not a shared address) for identities to differ.
// Result - always 0.
function worker_task(args : Pointer):PtrInt;
var
  context, worker: Pointer;
  workload : string;
  total: Integer;
begin
  context := zmq_ctx_new;
  worker := zmq_socket(context, ZMQ_DEALER);
  {$IFDEF Win32}
  // Fixed: the cast was spelled "PrtInt", which does not exist.
  s_set_id(worker, PtrInt(args));
  {$ELSE}
  s_set_id(worker); // Set a printable identity.
  {$ENDIF}
  zmq_connect(worker, 'tcp://localhost:5671');

  total := 0;
  repeat // WriteLn(ThreadID);
    // Tell the broker we're ready for work.
    // A DEALER does not add the empty delimiter frame itself, so send it
    // explicitly ahead of the payload.
    s_sendmore(worker, '');
    s_send(worker, 'Hi Boss');

    // Get workload from broker, until finished
    s_recv(worker);              // Envelope delimiter (discarded)
    workload := s_recv(worker);
    if workload = 'Fired!' then
    begin
      WriteLn(Format('Completed: %d tasks', [total]));
      break;
    end;
    total += 1;

    // Do some random work
    Sleep(randof(500) + 1);
  until False;

  zmq_close(worker);
  zmq_ctx_destroy(context);
  Result := PtrInt(0);
end;
// While this example runs in a single process, that is only to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.
//
// Main program: binds a ROUTER socket on tcp://*:5671, starts NBR_WORKERS
// worker threads, answers every worker request with 'Work harder' for five
// seconds and with 'Fired!' afterwards, then waits for the workers to exit
// before tearing down the broker.
const NBR_WORKERS = 10;
var
  context, broker: Pointer;
  worker_nbr, workers_fired: Integer;
  identity : string;
  // GetTickCount64 returns a 64-bit value; a 32-bit variable would truncate
  // the deadline and break the comparison once the tick counter exceeds 2^32.
  end_time: QWord;
  worker_threads: array[0..NBR_WORKERS - 1] of TThreadID;
begin
  context := zmq_ctx_new;
  broker := zmq_socket(context, ZMQ_ROUTER);
  zmq_bind(broker, 'tcp://*:5671');
  Randomize;

  // Pass the worker index by value (encoded in the pointer) rather than the
  // address of the loop variable, which is shared by every thread and keeps
  // changing while the threads start up.
  for worker_nbr := 0 to NBR_WORKERS - 1 do
    worker_threads[worker_nbr] :=
      BeginThread(@worker_task, Pointer(PtrInt(worker_nbr)));

  // Run for five seconds and then tell workers to end
  end_time := GetTickCount64 + 5000;
  workers_fired := 0;
  repeat
    // Next message gives us least recently used worker
    identity := s_recv(broker);
    s_sendmore(broker, identity);
    s_recv(broker); // Envelope delimiter
    s_recv(broker); // Response from worker
    s_sendmore(broker, '');

    // Encourage workers until it's time to fire them
    if (GetTickCount64 < end_time) then
      s_send(broker, 'Work harder')
    else
    begin
      s_send(broker, 'Fired!');
      workers_fired += 1;
      if workers_fired = NBR_WORKERS then break;
    end;
  until False;

  // Wait for every worker to finish so their final "Completed" lines are
  // printed before the process exits (a timeout of 0 means wait forever).
  for worker_nbr := 0 to NBR_WORKERS - 1 do
    WaitForThreadTerminate(worker_threads[worker_nbr], 0);

  zmq_close(broker);
  zmq_ctx_destroy(context);
end.