-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathpygn_server.py
executable file
·246 lines (209 loc) · 7.64 KB
/
pygn_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#!/usr/bin/env python
#
# pygn_server
#
# driver for Emacs mode pygn-mode.el
#
# notes
#
# requires chess library
#
# documentation of the server protocol is at doc/server.md
#
# bugs
#
# todo
#
###
### version
###
__version__ = '0.6.3'
###
### imports
###
import sys
import argparse
import signal
import io
import re
import atexit
import shlex
import chess.pgn
import chess.svg
import chess.engine
###
### file-scoped variables
###
ENGINES = {}
###
### subroutines
###
def instantiate_engine(engine_path):
    """
    Return the UCI engine registered for engine_path, starting it if needed.

    Engines are cached in the file-scoped ENGINES dict keyed by engine_path,
    so a later call with the same path gets the same engine object back
    instead of a second chess.engine.SimpleEngine.popen_uci() call.
    """
    # Fast path: this path has already been started.
    if engine_path in ENGINES:
        return ENGINES[engine_path]

    # First request for this path: start the engine and remember it.
    engine = chess.engine.SimpleEngine.popen_uci(engine_path)
    ENGINES[engine_path] = engine
    return engine
def cleanup():
    """
    Ask every cached UCI engine to quit and empty the engine cache.

    This runs both when listen() sees end-of-file and as an atexit hook, so
    it must be safe to call more than once.  The cache is emptied before the
    engines are told to quit: a second call then finds nothing to do, and no
    stale engine object stays registered in ENGINES.
    """
    engines = list(ENGINES.values())
    ENGINES.clear()
    for engine in engines:
        try:
            engine.quit()
        except Exception:
            # Best effort during shutdown: a failure to quit one engine must
            # not prevent the remaining engines from being asked to quit.
            # KeyboardInterrupt / SystemExit are deliberately not swallowed.
            pass
def pgn_to_board_callback(_game,board,last_move,last_fen,args):
    """
    Render board in the format selected by args.board_format[0].

    'svg'  -> ':board-svg ' followed by the chess.svg.board() markup with
              every line break replaced by a single space.
    'text' -> ':board-text ' followed by board.unicode(borders=True) redrawn
              with box-drawing characters and ASCII piece letters, with every
              newline written as a backslash followed by 'n'.

    Any other value is reported on stderr and None is returned.  last_fen is
    accepted for signature compatibility with the other callbacks and is not
    used here.
    """
    board_format = args.board_format[0]

    if board_format == 'svg':
        markup = chess.svg.board(
            board=board,
            lastmove=last_move,
            size=args.pixels[0],
            flipped=args.flipped,
        )
        # The response must fit on one line.
        single_line = re.sub(r'\r?\n', r' ', markup)
        return f':board-svg {single_line}'

    if board_format == 'text':
        # (pattern, replacement, flags), applied strictly in this order:
        # later patterns match text produced by earlier replacements.
        grid_rewrites = (
            (r'·', ' ', 0),
            (r'-----------------', '├───┼───┼───┼───┼───┼───┼───┼───┤', 0),
            (r'\A ├───┼───┼───┼───┼───┼───┼───┼───┤', ' ┌───┬───┬───┬───┬───┬───┬───┬───┐', 0),
            (r'├───┼───┼───┼───┼───┼───┼───┼───┤\n a', '└───┴───┴───┴───┴───┴───┴───┴───┘\n a', 0),
            (r'a b c d e f g h', ' a b c d e f g h', 0),
            (r'\|', ' │ ', 0),
            (r'^(\d) ', '\\1', re.MULTILINE),
        )
        drawing = board.unicode(borders=True)
        for pattern, replacement, flags in grid_rewrites:
            drawing = re.sub(pattern, replacement, drawing, flags=flags)

        # Swap the chess glyphs (and the empty-square glyph) for ASCII.
        glyph_to_ascii = str.maketrans('♖♘♗♕♔♙♜♞♝♛♚♟⭘','RNBQKPrnbqkp ')
        drawing = drawing.translate(glyph_to_ascii)

        # Keep the response on one line: newline -> backslash + 'n'.
        drawing = re.sub(r'\n', '\\\\n', drawing)
        return f':board-text {drawing}'

    print(f'Bad pgn-mode -board_format value: {board_format}', file=sys.stderr)
    return None
def pgn_to_fen_callback(_game, board, _last_move, _last_fen, _args):
    """
    Respond with ':fen ' followed by whatever board.fen() returns.

    The remaining parameters exist only so that every callback shares one
    call signature; they are not used.
    """
    return ':fen {}'.format(board.fen())
def pgn_to_score_callback(_game, board, _last_move, _last_fen, args):
    """
    Respond with ':score ' followed by the engine's score for board.

    The engine is looked up (and started on first use) from args.engine[0];
    the analysis is limited to a depth of args.depth[0].  The "score" entry
    of the analysis result is formatted into the response as-is.
    """
    uci_engine = instantiate_engine(args.engine[0])
    depth_limit = chess.engine.Limit(depth=args.depth[0])
    analysis = uci_engine.analyse(board, depth_limit)
    return ':score {}'.format(analysis["score"])
def pgn_to_mainline_callback(game, _board, _last_move, _last_fen, _args):
    """
    Respond with ':san ' followed by the game's bare mainline movetext.

    The game is exported on a single line with headers, variations and
    comments all switched off.  The final whitespace-separated token of the
    export is then removed (presumably the game-result marker -- confirm
    against the chess.pgn exporter output).
    """
    exporter_options = {
        'columns': None,
        'headers': False,
        'variations': False,
        'comments': False,
    }
    movetext = game.accept(chess.pgn.StringExporter(**exporter_options))
    # Drop the last token together with the whitespace in front of it.
    trimmed = re.sub(r'\s+\S+\Z', '', movetext)
    return f':san {trimmed}'
# TODO: should all responses be in s-expression (sexp) form?
def pgn_to_last_move_info_callback(_game, _board, last_move, last_fen, _args):
    """
    Respond with the last mainline move and the FEN it was played from.

    The response has the form

        :last-move-info (:fen "FEN" :move-uci "UCI")

    where FEN is shlex.quote(last_fen) and UCI is
    shlex.quote(last_move.uci()).

    listen() passes last_move=False when the PGN contains no moves.  In that
    case there is nothing to report: a message is written to stderr and None
    is returned (so no response is sent) instead of failing on
    last_move.uci().
    """
    # Compare by identity rather than truthiness so that only the explicit
    # "no move" markers are rejected, never a real move object that happens
    # to evaluate as false.
    if last_move is None or last_move is False:
        print('Bad request: :pgn-to-last-move-info needs a PGN with at least one move',
              file=sys.stderr)
        return None

    quoted_fen = shlex.quote(last_fen)
    quoted_move = shlex.quote(last_move.uci())
    return f':last-move-info (:fen "{quoted_fen}" :move-uci "{quoted_move}")'
def listen():
    """
    Listen for messages on stdin and send response data on stdout.

    Requests are read one line at a time.  A request line has the shape

        :version VERSION :COMMAND [OPTIONS] -- :PAYLOAD-TYPE PAYLOAD

    VERSION must equal this server's __version__, :COMMAND must be a key of
    CALLBACKS, OPTIONS are split with shlex and parsed by the parser from
    generate_argparser(), :PAYLOAD-TYPE must be ':pgn', and PAYLOAD is PGN
    text in which each line break is written as a backslash followed by 'n'.

    The PGN mainline is replayed and the selected callback is called with
    (game, board, last_move, last_fen, args), where board is the position
    after the final move, last_move is the final move (False when there are
    no moves) and last_fen is the FEN of the position before that move.  A
    truthy callback result is printed as ':version VERSION RESPONSE'.

    A request that cannot be handled is reported on stderr and skipped.
    The loop ends at end-of-file on stdin, after calling cleanup().
    """
    argparser = generate_argparser()
    # The request grammar never changes, so compile it once, not per request.
    request_re = re.compile(r'\A:version\s+(\S+)\s+(:\S+)(.*?)\s+--\s+(:\S+)\s+(\S.*)\n')

    while True:
        input_str = sys.stdin.readline()

        # TODO: test readline and empty-line handling on Windows
        # Handle terminating characters and garbage.
        if not input_str:
            # eof
            cleanup()
            break
        if input_str == '\n':
            continue

        # Parse request.
        match = request_re.search(input_str)
        if not match:
            print(f'Bad pgn-mode server request. Could not parse: {input_str}', file=sys.stderr)
            continue
        (req_version,
         req_command,
         req_options,
         req_payload_type,
         req_payload) = match.groups()

        if req_version != __version__:
            print(f'Bad request: version mismatch: {req_version}', file=sys.stderr)
            continue

        # Command code for handling input.
        if req_command not in CALLBACKS:
            print(f'Bad request command (unknown): {req_command}', file=sys.stderr)
            continue

        # Options to modify operation of the command.
        try:
            args = argparser.parse_args(shlex.split(req_options))
        except (Exception, SystemExit):
            # argparse reports bad options by exiting (SystemExit), which
            # must not take the server down; shlex.split raises ValueError
            # on unbalanced quotes.  KeyboardInterrupt is left to propagate.
            print(f'Bad request options: {req_options}', file=sys.stderr)
            continue

        # :payload-type is for future extensibility, currently always :pgn
        if req_payload_type != ':pgn':
            print(f'Bad request :payload-type (unknown): {req_payload_type}', file=sys.stderr)
            continue

        # Build game board.
        # Turn the escaped line breaks back into real ones and terminate the
        # game text with a blank line.
        pgn = req_payload.replace('\\n', '\n') + '\n\n'
        game = chess.pgn.read_game(io.StringIO(pgn))
        if game is None:
            # read_game found no game in the payload; without this guard the
            # game.board() call below would raise and end the server.
            print(f'Bad request payload. Could not read a game from: {req_payload}', file=sys.stderr)
            continue
        board = game.board()
        last_move = False
        last_fen = board.fen()
        for move in game.mainline_moves():
            last_move = move
            # Record the position *before* the move is played.
            last_fen = board.fen()
            board.push(move)

        # Compute response.
        response = CALLBACKS[req_command](game, board, last_move, last_fen, args)

        # Send response to client.
        if response:
            # Flush so the client sees the reply immediately even when stdout
            # is a block-buffered pipe.
            print(f':version {__version__} {response}', flush=True)
###
### argument processing
###
def generate_argparser():
    """
    Build the argparse parser for the options part of a request.

    Every option is accepted with one or with two leading dashes.  The
    value-taking options use nargs=1, so their parsed values (and their
    defaults) are one-element lists:

        pixels        int, default [400]
        board_format  str, default ['svg']
        engine        str, default ['stockfish']
        depth         int, default [10]

    flipped is a plain boolean switch that defaults to False.
    """
    # (option strings, keyword settings) for the options that take a value.
    value_options = (
        (('-pixels', '--pixels'),
         {'metavar': 'PIXELS',
          'type': int,
          'default': [400],
          'help': 'set pixel-per-side for the SVG board output. Default is 400.'}),
        (('-board_format', '--board_format'),
         {'type': str,
          'default': ['svg'],
          'help': 'format for board output. Default is "svg".'}),
        (('-engine', '--engine'),
         {'type': str,
          'default': ['stockfish'],
          'help': 'set path to UCI engine for analysis. Default is "stockfish".'}),
        (('-depth', '--depth'),
         {'type': int,
          'default': [10],
          'help': 'set depth for depth-limited to UCI evaluations. Default is 10.'}),
    )

    parser = argparse.ArgumentParser()
    for option_strings, settings in value_options:
        parser.add_argument(*option_strings, nargs=1, **settings)
    parser.add_argument('-flipped', '--flipped',
                        action='store_true',
                        help='display board flipped (Black perspective).')
    return parser
###
### main
###
if __name__ == '__main__':
    # Restore the default SIGPIPE disposition where the platform has one.
    try:
        signal.signal(signal.SIGPIPE, signal.SIG_DFL)
    except (AttributeError, ValueError, OSError):
        # signal.SIGPIPE does not exist on every platform (AttributeError),
        # and signal.signal() itself can refuse the request (ValueError /
        # OSError).  The server works without it, so carry on.
        pass

    # Answer a version query without starting the server loop.
    if len(sys.argv) > 1 and sys.argv[1] in ('-version', '--version'):
        print(__version__)
        sys.exit(0)

    # Request command -> handler.  listen() looks commands up here.
    CALLBACKS = {
        ':pgn-to-fen': pgn_to_fen_callback,
        ':pgn-to-board': pgn_to_board_callback,
        ':pgn-to-score': pgn_to_score_callback,
        ':pgn-to-mainline': pgn_to_mainline_callback,
        ':pgn-to-last-move-info': pgn_to_last_move_info_callback,
    }

    # Make sure engines are asked to quit however the process exits.
    atexit.register(cleanup)

    # Flush so the banner reaches the client even when stdout is a
    # block-buffered pipe.
    print('Server started.', flush=True)
    listen()
#
# Emacs
#
# Local Variables:
# coding: utf-8
# End:
#
# LocalWords:
#