-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistributions.py
198 lines (168 loc) · 6.51 KB
/
distributions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# copyright 2020 Andrzej Kaczmarczyk (andrzej <dot> kaczmarczyk <at> agh.edu.pl; a <dot> kaczmarczyk <at> tu-berlin.de)
# This file is part of mul-win-just-pub.
# mul-win-just-pub is licensed under the terms of MIT license
# see LICENSE.txt for the text of the license
import random
import math
import csv
class VotesGenerator(object):
    """Root of the vote-generator hierarchy.

    Both methods are no-op placeholders that return ``None``; they only
    establish the method names that concrete generators provide.
    """

    def generate(self, candidates):
        """Placeholder for vote generation; does nothing and returns ``None``."""
        return None

    def description(self):
        """Placeholder for a human-readable label; returns ``None``."""
        return None
class ParameterizedGenerator(VotesGenerator):
    """Vote generator that can additionally report its parameters.

    ``parameters`` is a no-op placeholder here and returns ``None``.
    """

    def parameters(self):
        """Placeholder for the generator's parameter tuple; returns ``None``."""
        return None
class MallowsModel(ParameterizedGenerator):
    """Approval votes built as a noisy copy of a random target committee.

    For every call to ``generate`` one target committee is sampled.  Each
    voter then approves every committee member independently when
    ``random.random() < p`` and every other candidate independently when
    ``random.random() > p``, where ``p`` is the probability given to the
    constructor.
    """

    def __init__(self, inTargetCommitteeProbability, targetCommitteeSize):
        """Store the approval probability and the target committee size."""
        self._inTargetCommProb = inTargetCommitteeProbability
        self._targetCommSize = targetCommitteeSize

    def generate(self, candidates, votersnr):
        """Generate ``votersnr`` approval votes over ``candidates``.

        Args:
            candidates: sequence of hashable candidates (it is passed to
                ``random.sample`` and to ``set``).
            votersnr: number of votes to generate.

        Returns:
            A pair ``(candidates, votes)`` where ``votes`` maps the voter
            index ``0..votersnr-1`` to the list of approved candidates.

        Raises:
            ValueError: raised by ``random.sample`` when the target committee
                size is negative or larger than the number of candidates.
        """
        targetCommittee = random.sample(candidates, self._targetCommSize)
        # The set of non-committee candidates is the same for every voter,
        # so it is built once instead of once per vote.
        outsiders = set(candidates).difference(set(targetCommittee))
        prob = self._inTargetCommProb

        def genSingleVoter():
            # Committee members are drawn first, outsiders second; the order
            # of the random draws matches the per-candidate loops it replaces.
            vote = [c for c in targetCommittee if random.random() < prob]
            vote.extend(c for c in outsiders if random.random() > prob)
            return vote

        votes = {i: genSingleVoter() for i in range(votersnr)}
        return candidates, votes

    def description(self):
        """Return a one-line label containing both parameters."""
        # A single literal is used on purpose: a backslash continuation
        # inside the string would embed the source indentation in the label.
        return "Mallows Model, probability: {}, committee size: {}".format(
            self._inTargetCommProb, self._targetCommSize
        )

    def parameters(self):
        """Return ``(probability, committee size)``."""
        return (self._inTargetCommProb, self._targetCommSize)
class EqualChooseDistribution(ParameterizedGenerator):
    """Approval votes where every candidate is approved independently.

    Each voter approves each candidate when ``random.random()`` falls below
    the approval probability given to the constructor.
    """

    def __init__(self, approvalProbability):
        """Store the per-candidate approval probability."""
        self._approvalProbability = approvalProbability

    def generate(self, candidates, votersnr):
        """Generate ``votersnr`` approval votes over ``candidates``.

        Args:
            candidates: iterable of candidates; it is iterated once per
                voter, so it must be re-iterable.
            votersnr: number of votes to generate.

        Returns:
            A pair ``(candidates, votes)`` where ``votes`` maps the voter
            index ``0..votersnr-1`` to the list of approved candidates, in
            the iteration order of ``candidates``.
        """
        prob = self._approvalProbability
        votes = {
            i: [c for c in candidates if random.random() < prob]
            for i in range(votersnr)
        }
        return candidates, votes

    def description(self):
        """Return a one-line label containing the approval probability."""
        return "Impartial, probability: {}".format(self._approvalProbability)

    def parameters(self):
        """Return the one-element tuple ``(approval probability,)``."""
        # This class only ever sets _approvalProbability; reading
        # _approvalRadius here raised AttributeError.
        return (self._approvalProbability,)
class OneDDistribution(ParameterizedGenerator):
    """Approval votes from uniformly random positions on the interval [0, 1].

    Voters and candidates each get a position drawn with
    ``random.uniform(0, 1)``; a voter approves every candidate whose
    position lies within the approval radius of the voter's own position
    (bounds inclusive).
    """

    def __init__(self, approvalRadius):
        """Store the approval radius."""
        self._approvalRadius = approvalRadius

    def generate(self, candidates, votersnr):
        """Generate ``votersnr`` approval votes.

        Args:
            candidates: sized collection of candidates; only its length is
                used.
            votersnr: number of votes to generate.

        Returns:
            A pair ``(candidates, votes)`` where ``votes`` maps the voter
            index ``0..votersnr-1`` to a list of approved candidate
            *indices* (``0..len(candidates)-1``), not the candidate objects.
        """
        # range replaces the Python 2-only xrange, which is a NameError on
        # Python 3.  Voter positions are drawn before candidate positions.
        votersPositions = [random.uniform(0, 1) for _ in range(votersnr)]
        candidatesPositions = [
            random.uniform(0, 1) for _ in range(len(candidates))
        ]

        radius = self._approvalRadius
        votes = {}
        for i, position in enumerate(votersPositions):
            minApproved = position - radius
            maxApproved = position + radius
            votes[i] = [
                c
                for c, candidatePosition in enumerate(candidatesPositions)
                if minApproved <= candidatePosition <= maxApproved
            ]
        return candidates, votes

    def description(self):
        """Return a one-line label containing the approval radius."""
        return "1D, radius: {}".format(self._approvalRadius)

    def parameters(self):
        """Return the one-element tuple ``(approval radius,)``."""
        return (self._approvalRadius,)
class TwoDDistribution(ParameterizedGenerator):
    """Approval votes from uniformly random positions in the unit square.

    Voters and candidates each get an ``(x, y)`` position with both
    coordinates drawn by ``random.uniform(0, 1)``; a voter approves every
    candidate whose Euclidean distance to the voter is at most the approval
    radius.
    """

    def __init__(self, approvalRadius):
        """Store the approval radius."""
        self._approvalRadius = approvalRadius

    def generate(self, candidates, votersnr):
        """Generate ``votersnr`` approval votes.

        Args:
            candidates: sized collection of candidates; only its length is
                used.
            votersnr: number of votes to generate.

        Returns:
            A pair ``(candidates, votes)`` where ``votes`` maps the voter
            index ``0..votersnr-1`` to a list of approved candidate
            *indices* (``0..len(candidates)-1``), not the candidate objects.
        """
        # range replaces the Python 2-only xrange, which is a NameError on
        # Python 3.  Voter positions are drawn before candidate positions,
        # and x before y within each position.
        votersPositions = [
            (random.uniform(0, 1), random.uniform(0, 1))
            for _ in range(votersnr)
        ]
        candidatesPositions = [
            (random.uniform(0, 1), random.uniform(0, 1))
            for _ in range(len(candidates))
        ]

        radius = self._approvalRadius
        votes = {}
        for i, (vx, vy) in enumerate(votersPositions):
            votes[i] = [
                c
                for c, (cx, cy) in enumerate(candidatesPositions)
                if math.hypot(vx - cx, vy - cy) <= radius
            ]
        return candidates, votes

    def description(self):
        """Return a one-line label containing the approval radius."""
        return "2D, radius: {}".format(self._approvalRadius)

    def parameters(self):
        """Return the one-element tuple ``(approval radius,)``."""
        return (self._approvalRadius,)
class UrnModel(VotesGenerator):
    """Approval votes drawn from an urn that is refilled after every vote.

    The urn starts as a copy of the candidate list.  For each voter a
    ballot size is drawn (one Bernoulli trial per candidate), that many
    distinct candidates are picked from the urn, and afterwards every picked
    candidate is put back into the urn ``numberOfReturned`` extra times.
    """

    def __init__(self, approvalProbability, numberOfReturned):
        """Store the approval probability and the number of returned copies."""
        self._numberOfReturned = numberOfReturned
        self._approvalProbability = approvalProbability

    def generate(self, candidates, votersnr):
        """Generate ``votersnr`` approval votes over ``candidates``.

        Returns a pair ``(candidates, votes)`` where ``votes`` maps the voter
        index ``0..votersnr-1`` to the list of approved candidates in the
        order they were drawn.
        """
        shared_urn = list(candidates[:])
        votes = {}
        for voter in range(votersnr):
            # One random draw per candidate decides how many approvals
            # this voter casts.
            ballot_size = sum(
                1
                for _ in candidates
                if random.random() < self._approvalProbability
            )

            # Draw from a private copy so removals do not touch the urn.
            pool = shared_urn[:]
            ballot = []
            for _ in range(ballot_size):
                pick = random.choice(pool)
                ballot.append(pick)
                # Remove every copy of the pick so it cannot be drawn twice.
                pool = [c for c in pool if not c == pick]

            # Reinforce the urn with copies of everything just approved.
            for approved in ballot:
                shared_urn.extend([approved] * self._numberOfReturned)
            votes[voter] = ballot
        return candidates, votes

    def description(self):
        """Return a one-line label containing both parameters."""
        return "UrnModel, prob:{}, returned: {}".format(
            self._approvalProbability, self._numberOfReturned
        )

    def parameters(self):
        """Return ``(number of returned copies, approval probability)``."""
        return (self._numberOfReturned, self._approvalProbability)
class PabulibElectionBasedDistribution(VotesGenerator):
    """Votes sampled without replacement from a Pabulib election file.

    The file is a ``;``-separated text file split into sections.  Each
    section starts with a marker row (``META``, ``PROJECTS`` or ``VOTES``,
    case-insensitive) followed by a header row and then the data rows.
    """

    def __init__(self, baseElectionPath):
        """Store the path of the election file; the file is not opened here."""
        self._baseElectionPath = baseElectionPath

    def parameters(self):
        """Return ``(None, None)``; this generator has no numeric parameters."""
        return (None, None)

    def generate(self, candidatesCount, votesCount):
        """Sample ``votesCount`` votes from the election file.

        Args:
            candidatesCount: currently ignored; all projects found in the
                file are always returned as candidates.
            votesCount: number of votes to draw, without replacement.

        Returns:
            A pair ``(candidates, votes)``: ``candidates`` is the sorted list
            of integer project ids and ``votes`` maps ``0..votesCount-1`` to
            lists of approved project ids.

        Raises:
            ValueError: if the file holds fewer than ``votesCount`` votes or
                is structurally malformed.
        """
        candidates, allVotes = self._parsePabulibElections()
        if len(allVotes) < votesCount:
            raise ValueError(f"Too small elections to draw {votesCount} votes")
        votes = dict(enumerate(random.sample(allVotes, votesCount)))
        return candidates, votes

    def description(self):
        """Return a one-line label containing the source file path."""
        return f"Pabulib based distribution from: {self._baseElectionPath}"

    def _parsePabulibElections(self):
        """Parse the election file into project ids and votes.

        Projects are numbered ``0, 1, 2, ...`` in order of first appearance
        (keyed by the first column of the ``PROJECTS`` rows).  Each vote is
        the comma-separated ``vote`` column of a ``VOTES`` row translated to
        those numbers.

        Returns:
            ``(sorted project ids, list of votes)``.

        Raises:
            ValueError: if data precedes the first section marker, a section
                marker is not followed by a header row, or the ``VOTES``
                header has no ``vote`` column.
            KeyError: if a vote names a project missing from ``PROJECTS``.
        """
        section_names = ("meta", "projects", "votes")
        cand_name_id = {}
        votes = []
        section = None
        vote_index = None
        with open(self._baseElectionPath, 'r', newline='', encoding="utf-8") as csvfile:
            reader = csv.reader(csvfile, delimiter=';')
            for row in reader:
                if not row:
                    # csv yields [] for blank lines; they carry no data.
                    continue
                marker = str(row[0]).strip().lower()
                if marker in section_names:
                    section = marker
                    header = next(reader, None)
                    if header is None:
                        raise ValueError(
                            f"Section '{section}' has no header row in "
                            f"{self._baseElectionPath}"
                        )
                    if section == "votes":
                        # Looked up once per section rather than per row.
                        vote_index = header.index("vote")
                elif section is None:
                    raise ValueError(
                        f"Data found before any section marker in "
                        f"{self._baseElectionPath}"
                    )
                elif section == "projects":
                    # setdefault keeps ids dense if a project row repeats.
                    cand_name_id.setdefault(row[0].strip(), len(cand_name_id))
                elif section == "votes":
                    # Empty tokens (empty ballot, trailing comma) are skipped.
                    names = (name.strip() for name in row[vote_index].split(","))
                    votes.append([cand_name_id[name] for name in names if name])
                # Rows of the "meta" section are intentionally ignored.
        return sorted(cand_name_id.values()), votes