forked from guyskk/pybeautifier
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpybeautifier.py
164 lines (138 loc) · 4.35 KB
/
pybeautifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# coding: utf-8
import json
import logging
import os
import socket
import sys
import daemon
try:
from autopep8 import fix_code
def autopep8(x, ignore=None, max_line_length=79):
return fix_code(x, options={
'ignore': ignore,
'max_line_length': max_line_length
})
except:
autopep8 = None
try:
from yapf.yapflib.yapf_api import FormatCode
def yapf(x, style_config=None):
return FormatCode(x, style_config=style_config)[0]
except:
yapf = None
try:
from isort import SortImports
def isort(x, multi_line_output=None, line_length=79):
return SortImports(
file_contents=x,
multi_line_output=multi_line_output,
line_length=line_length,
include_trailing_comma=True,
balanced_wrapping=True,
).output
except:
isort = None
IS_DAEMON = len(sys.argv) > 1 and sys.argv[1] == '-d'
FORMATERS = {
'yapf': yapf,
'autopep8': autopep8,
'isort': isort
}
HOST = os.getenv("BEAUTIFIER_HOST", "127.0.0.1")
PORT = int(os.getenv("BEAUTIFIER_PORT", "36805"))
ADDRESS = (HOST, PORT)
def setup_logging():
format = '[%(levelname)-5s %(asctime)s] %(message)s'
if IS_DAEMON:
logging.basicConfig(filename='/tmp/pybeautifier.log', filemode='w',
format=format, level=logging.INFO)
else:
logging.basicConfig(format=format, level=logging.DEBUG)
logging.info('PID is %d' % os.getpid())
logging.info('Listening tcp://%s:%s' % ADDRESS)
def send(client, error, data):
response = {'error': error, 'data': data}
if error:
logging.error(error)
response = json.dumps(response, ensure_ascii=False)
logging.debug('response size: %s' % len(response))
client.sendall(response.encode('utf-8'))
def handle(client, request):
"""
Handle format request
request struct:
{
'data': 'data_need_format',
'formaters': [
{
'name': 'formater_name',
'config': {} # None or dict
},
... # formaters
]
}
if no formaters, use autopep8 formater and it's default config
"""
formaters = request.get('formaters', None)
if not formaters:
formaters = [{'name': 'autopep8'}]
logging.debug('formaters: ' + json.dumps(formaters, indent=4))
data = request.get('data', None)
if not isinstance(data, str):
return send(client, 'invalid data', None)
max_line_length = None
for formater in formaters:
max_line_length = formater.get('config', {}).get('max_line_length')
if max_line_length:
break
for formater in formaters:
name = formater.get('name', None)
config = formater.get('config', {})
if name not in FORMATERS:
return send(client, 'formater {} not support'.format(name), None)
formater = FORMATERS[name]
if formater is None:
return send(client, 'formater {} not installed'.format(name), None)
if name == 'isort' and max_line_length:
config.setdefault('line_length', max_line_length)
data = formater(data, **config)
return send(client, None, data)
def serve():
setup_logging()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# prevent port can’t be immediately reused
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind(ADDRESS)
server.listen(1)
while True:
client, addr = server.accept()
try:
request = client.recv(1024 * 1024).decode('utf-8')
logging.debug('request size: %s' % len(request))
if request:
handle(client, json.loads(request))
except KeyboardInterrupt:
break
except Exception as ex:
try:
send(client, repr(ex), None)
except:
pass
logging.exception(ex)
finally:
client.close()
except KeyboardInterrupt:
pass
except Exception as ex:
logging.exception(ex)
finally:
server.close()
def main():
if IS_DAEMON:
with daemon.DaemonContext():
serve()
else:
serve()
if __name__ == '__main__':
main()