-
Notifications
You must be signed in to change notification settings - Fork 0
/
coqtui_coqtop.py
132 lines (116 loc) · 3.62 KB
/
coqtui_coqtop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import re
import select
import subprocess
import xml.etree.ElementTree as ET
import signal
from collections import deque
class EmptyLogger:
    """Logger that accepts messages and discards them."""

    def log(self, message):
        """Ignore ``message``; nothing is written anywhere."""
        return None
class DebugLogger:
    """Logger that echoes every message to standard output."""

    def log(self, message):
        """Print ``message`` followed by a newline.

        The call form ``print(message)`` is used rather than the Python 2
        ``print`` statement: with a single argument it produces the same
        output on Python 2 and is also valid syntax on Python 3.
        """
        print(message)
# XML predefines only five entities (&lt; &gt; &amp; &apos; &quot;).
# coqtop also uses &nbsp;, which is not one of them, so a strict XML parser
# rejects its output. Rewrite that entity before parsing.
# NOTE(review): the entity name had been lost from this comment and from the
# literal below (it showed up as a plain space, making the replace a no-op);
# &nbsp; is restored here -- confirm against real coqtop output.
def fix(cmd):
    """Return ``cmd`` with every ``&nbsp;`` entity replaced by a space.

    ``cmd`` is the raw text received from coqtop; the result is what gets
    handed to the XML parser. Text without the entity is returned unchanged.
    """
    return cmd.replace("&nbsp;", " ")
def ignore_sigint():
    """Make the calling process ignore SIGINT.

    Passed as ``preexec_fn`` when coqtop is spawned, so it runs in the child
    process before coqtop starts.
    """
    disposition = signal.SIG_IGN
    signal.signal(signal.SIGINT, disposition)
class Cmd:
    """Base class for one request, serialized as a ``<call val="TAG">`` element.

    Subclasses override ``build_xml`` to supply the call's payload and
    ``parse_response`` to consume the matching reply.
    """

    def __init__(self, tag):
        """Store ``tag``; it becomes the ``val`` attribute of the call."""
        self.tag = tag

    def to_string(self):
        """Return the call element, payload included, serialized as UTF-8."""
        call = ET.Element('call', val=self.tag)
        self.build_xml(call)
        return ET.tostring(call, encoding='utf-8')

    def build_xml(self, xml):
        """Add the payload to the call element ``xml``.

        The default payload is a single empty ``<unit>`` child.
        """
        xml.append(ET.Element('unit'))

    def parse_response(self, value_xml, all_xml):
        """Handle a reply; the base implementation ignores it.

        ``value_xml`` is the ``<value>`` element answering this command and
        ``all_xml`` is the root element it was found under.
        """
        return None
class QueueCmd:
    """FIFO of commands awaiting a reply, plus a buffer of received text."""

    def __init__(self):
        """Start with no pending commands and an empty receive buffer."""
        # Pending commands, oldest first.
        self.cmds = deque()
        # Text received so far that has not yet parsed as a whole.
        self.response = ''

    def queue(self, cmd):
        """Append ``cmd`` to the end of the pending queue."""
        self.cmds.append(cmd)

    def receive(self, data):
        """Add ``data`` to the buffer and dispatch any complete replies.

        The buffered text is wrapped in a synthetic ``<coqtoproot>`` element
        so that several top-level elements parse as one document. While the
        buffer is not yet well-formed the method returns and waits for more
        data. Once it parses, each ``<value>`` child is handed, in order, to
        the oldest pending command, which is then dequeued; children with any
        other tag are skipped. The buffer is cleared afterwards.
        """
        self.response += data
        document = '<coqtoproot>' + fix(self.response) + '</coqtoproot>'
        try:
            root = ET.fromstring(document)
            for child in root:
                if child.tag != 'value':
                    continue
                self.cmds[0].parse_response(child, root)
                self.cmds.popleft()
            self.response = ''
        except ET.ParseError:
            # Incomplete XML so far: keep the buffer and wait for more data.
            return

    def is_empty(self):
        """Return True when no command is waiting for a reply."""
        return not self.cmds
class CmdAbout(Cmd):
    """The ``About`` call; prints the strings found in the reply."""

    def __init__(self):
        """Create the command with the call tag ``About``."""
        Cmd.__init__(self, 'About')

    def build_xml(self, xml):
        """Add a single empty ``<unit>`` child to the call element ``xml``."""
        unit = ET.Element('unit')
        xml.extend([unit])

    def parse_response(self, value_xml, all_xml):
        """Print the text of every ``coq_info/string`` element as a list.

        ``value_xml`` is the ``<value>`` element of the reply; ``all_xml`` is
        not used.
        """
        # print(...) with one argument prints the same thing as the Python 2
        # print statement and is also valid syntax on Python 3.
        print([node.text for node in value_xml.findall('./coq_info/string')])
class CmdGoal(Cmd):
    """The ``Goal`` call; prints the raw XML of the reply."""

    def __init__(self):
        """Create the command with the call tag ``Goal``."""
        Cmd.__init__(self, 'Goal')

    def build_xml(self, xml):
        """Add a single empty ``<unit>`` child to the call element ``xml``."""
        unit = ET.Element('unit')
        xml.extend([unit])

    def parse_response(self, value_xml, all_xml):
        """Print ``value_xml`` serialized as XML, prefixed by a label.

        ``all_xml`` is not used.
        """
        dumped = ET.tostring(value_xml)
        # With its default (us-ascii) encoding tostring() returns a byte
        # string. On Python 2 that is already ``str``; on Python 3 it must be
        # decoded before it can be concatenated to the label.
        if not isinstance(dumped, str):
            dumped = dumped.decode('ascii')
        # print(...) with one argument prints the same thing as the Python 2
        # print statement and is also valid syntax on Python 3.
        print("parse_response: " + dumped)
class Coqtop:
    """Owns a coqtop child process and the queue of commands sent to it."""

    # Class-level defaults; ``coqtop`` is replaced per instance in __init__.
    coqtop = None
    current_cmd = None
    recv_data = ''

    def __init__(self, logger, args):
        """Spawn coqtop and prepare an empty command queue.

        ``logger`` is any object with a ``log(message)`` method; ``None``
        selects ``EmptyLogger``. ``args`` are extra command-line arguments
        appended after the fixed options. The child's stdin and stdout are
        pipes, and ``ignore_sigint`` runs in the child before coqtop starts.
        """
        base_command = [
            'coqtop',
            '-ideslave',
            '-main-channel',
            'stdfds',
            '-async-proofs',
            'on',
        ]
        # Exceptions raised here should be caught by the caller.
        self.coqtop = subprocess.Popen(
            base_command + list(args),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=None,
            preexec_fn=ignore_sigint,
        )
        self.logger = logger if logger is not None else EmptyLogger()
        self.cmds = QueueCmd()

    def queue_cmd(self, cmd):
        """Send ``cmd`` to coqtop and record it as awaiting a reply."""
        payload = cmd.to_string()
        self.logger.log("---- SEND BEGIN ----\n" + payload + "\n---- SEND END ----\n")
        child_stdin = self.coqtop.stdin
        child_stdin.write(payload)
        child_stdin.flush()
        self.cmds.queue(cmd)

    def process(self):
        """Read at most one chunk of pending output without blocking.

        Does nothing when no command is awaiting a reply or when coqtop's
        stdout has no data ready. Otherwise reads up to 0x4000 bytes, logs
        them and passes them to the command queue. Always returns None.
        """
        if self.cmds.is_empty():
            return None
        fd = self.coqtop.stdout.fileno()
        # A zero timeout makes select() a poll, so this never blocks.
        readable, _, _ = select.select([fd], [], [], 0)
        if fd not in readable:
            return None
        # NOTE(review): a zero-length read (end of file, i.e. coqtop closed
        # its stdout) is forwarded like any other chunk; nothing here detects
        # that the child has exited -- confirm callers cope with that.
        chunk = os.read(fd, 0x4000)
        self.logger.log("---- RECV BEGIN ----\n" + chunk + "\n---- RECV END ----\n")
        self.cmds.receive(chunk)

    def is_idle(self):
        """Return True when no queued command is still awaiting a reply."""
        return self.cmds.is_empty()
if __name__ == "__main__":
    # Manual smoke run: start coqtop with debug logging, send four calls and
    # poll until each of them has received its reply.
    top = Coqtop(DebugLogger(), [])
    for request in (CmdAbout(), CmdGoal(), CmdGoal(), CmdAbout()):
        top.queue_cmd(request)
    while not top.is_idle():
        top.process()