-
Notifications
You must be signed in to change notification settings - Fork 0
/
PS5_problem4.cpp
166 lines (121 loc) · 5.71 KB
/
PS5_problem4.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#include <iostream>
// Assumes that the input and output arrays are created
// outside of the function. Note that the output lock array is assumed to be
// initialized to 1 (this allows for a mutex)
#include <thread>
#include <stdlib.h>
#define MAX_VALUE 1023
#define NUM_THREADS 1//100
#define INPUT_ARRAY_ELEMENTS (1024*1024*512) // 512 million entries
using namespace std;
struct thread_data{
int input_array_size;
int *input_array;
int *output_array;
int *output_lock_array;
int thread_num;
pthread_cond_t cv;
pthread_mutex_t mutex;
//this gets passed around through functions, so must be a pointer!
int *flag;
};
static struct thread_data thread_data_array[NUM_THREADS];
void P(int* output_lock_array_ptr, int thread_num){
cout << "P("
<< thread_num
<< "): data @ thread_data_array[" << *output_lock_array_ptr
<< "] flag=" << thread_data_array[*output_lock_array_ptr].flag
<< endl;
//lock this thread in preparation for exclusive access
pthread_mutex_lock(&thread_data_array[*output_lock_array_ptr].mutex);
cout << "P(" << thread_num << ") locked." << endl;
//note that while loop is only entered if this part of the array is unavailable
//check that this coordinate in the output array is not currently being accessed
while (*(thread_data_array[*output_lock_array_ptr].flag) == 0) {
/* When the current thread executes this
* pthread_cond_wait() statement, the
* current thread will be blocked on s->cv
* and (atomically) unlocks s->mutex !!!
* Unlocking s->mutex will let other thread
* in to test s->flag.... */
pthread_cond_wait(&(thread_data_array[*output_lock_array_ptr].cv),
&thread_data_array[*output_lock_array_ptr].mutex);
//wait until the lock has been released by some other thread's V() call
//cout << (*output_lock_array_ptr) << " waiting..." << endl;
}
/* This will cause all other threads
* that executes a P() call to wait
* in the (above) while-loop !!! */
*(thread_data_array[*output_lock_array_ptr].flag) = 0;
cout << "P(" << thread_num << ") flag set to 0." << endl;
//release lock in preparation for access
pthread_mutex_unlock(&thread_data_array[*output_lock_array_ptr].mutex);
cout << "P(" << thread_num << ") unlocked." << endl;
}
void V(int* output_lock_array_ptr) {
cout << "V(" << *output_lock_array_ptr << ")" << endl;
//lock this thread in preparation for exclusive access
pthread_mutex_lock(&thread_data_array[*output_lock_array_ptr].mutex);
/* This call may restart some thread that
* was blocked on s->cv (in the P() call)
* if there was not thread blocked on
* cv, this operation does absolutely
* nothing... */
pthread_cond_signal(&thread_data_array[*output_lock_array_ptr].cv);
/* Update semaphore state to Up */
*(thread_data_array[*output_lock_array_ptr].flag) = 1;
//release lock in preparation for access
pthread_mutex_unlock(&thread_data_array[*output_lock_array_ptr].mutex);
}
void count_all(int input_array_size, int *input_array, int *output_array, int *output_lock_array){
int counter;
for(counter = 0; counter < input_array_size; counter++) {
assert(input_array[counter] <= MAX_VALUE);
assert(input_array[counter] >= 0);
//retrieve an exclusive lock to this cord in output_array
//waiting on other threads, if necessary
cout << "processing entry #" << counter << ", with value=" << input_array[counter] << endl;
int thread_data_pos = output_lock_array[counter];
cout << "Thread Data:"
<< "Thread #" << thread_data_array[thread_data_pos].thread_num
<< endl;
//change flag state
P(&output_lock_array[counter], thread_data_array[thread_data_pos].thread_num);
//thread-safe work
output_array[input_array[counter]]++;
//release exclusive lock
V(&output_lock_array[counter]);
}
}
void *function_starter(void *thread_args) {
thread_data *temp_data = static_cast<struct thread_data*>(thread_args);
count_all(temp_data->input_array_size, temp_data->input_array, temp_data->output_array, temp_data->output_lock_array);
pthread_exit(nullptr);
}
int main(int argc, char *argv[]){
int counter;
pthread_t threads[NUM_THREADS];
int *input_array = static_cast<int*>(malloc(INPUT_ARRAY_ELEMENTS*sizeof(int)));
// this would be a good place to read a file into input_array
int *output_array = static_cast<int*>(malloc((MAX_VALUE + 1) * sizeof(int)));
int *output_lock_array = static_cast<int*>(malloc((MAX_VALUE + 1) * sizeof(int)));
for(counter = 0; counter <= MAX_VALUE; counter++){
output_array[counter] = 0;
output_lock_array[counter] = 1;
}
for(counter = 0; counter < NUM_THREADS; counter++){
thread_data_array[counter].input_array_size = INPUT_ARRAY_ELEMENTS / NUM_THREADS;
thread_data_array[counter].input_array = &input_array[INPUT_ARRAY_ELEMENTS / NUM_THREADS * counter];
thread_data_array[counter].output_array = output_array;
thread_data_array[counter].output_lock_array = output_lock_array;
thread_data_array[counter].thread_num = counter;
thread_data_array[counter].flag = new int(1); //initialize to all access.
pthread_mutex_unlock(&thread_data_array[counter].mutex);
pthread_create(&threads[counter], nullptr, function_starter, static_cast<void*>(&(thread_data_array[counter])));
}
for(counter = 0; counter < NUM_THREADS; counter++){
void *dummy;
pthread_join(threads[counter], &dummy);
}
cout << "All Processes Successfully Completed!" << endl;
}