-
Notifications
You must be signed in to change notification settings - Fork 6
/
buffer.py
252 lines (208 loc) · 9.97 KB
/
buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Andy Stewart
#
# Author: Andy Stewart <[email protected]>
# Maintainer: Andy Stewart <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from PyQt6.QtCore import QTimer
from core.utils import *
from core.webengine import BrowserBuffer
import json
import os
import subprocess
import threading
class AppBuffer(BrowserBuffer):
    """Terminal buffer rendered inside a browser widget.

    On construction the buffer:

    * starts a local static-file HTTP server (in a background thread) that
      serves the files next to this module,
    * launches ``node server.js <port> <directory> <command>`` as a
      background process,
    * loads ``index.html`` through the HTTP server, fills in its ``%{...}``
      placeholders and displays the result in the buffer widget,
    * polls the page every 250 ms to keep the buffer title and the Emacs
      default directory up to date, and closes the buffer once the
      background process has exited.
    """

    def __init__(self, buffer_id, url, arguments):
        """Create the terminal buffer.

        Args:
            buffer_id: Buffer identifier, forwarded to ``BrowserBuffer``.
            url: Forwarded to ``BrowserBuffer`` and used for the initial title.
            arguments: JSON string with the keys ``"command"`` and
                ``"directory"``.
        """
        BrowserBuffer.__init__(self, buffer_id, url, arguments, False)

        # Two separately requested free ports: ``self.port`` is passed to
        # server.js, the other one is used by the local HTTP server.
        self.port = get_free_port()
        self.http_url = "http://127.0.0.1:{0}".format(get_free_port())
        self.url = url

        (self.terminal_font_size, self.terminal_font_family) = get_emacs_vars(
            ["eaf-terminal-font-size", "eaf-terminal-font-family"])

        arguments_dict = json.loads(arguments)
        self.command = arguments_dict["command"]
        self.start_directory = arguments_dict["directory"].rstrip('/')
        self.current_directory = self.start_directory
        self.executing_command = ""

        self.index_file = "{0}/index.html".format(self.http_url)
        self.server_js = os.path.join(os.path.dirname(__file__), "server.js")

        self.buffer_widget.titleChanged.connect(self.change_title)
        self.update_title(url)

        # The HTTP server runs ``serve_forever()`` and is never shut down
        # explicitly, so the thread is a daemon: otherwise it would keep the
        # interpreter alive at exit.
        http_thread = threading.Thread(
            target=self.run_http_server, args=(), daemon=True)
        http_thread.start()

        self.search_term = ""

        # Start server process.
        args = ["node", self.server_js, str(self.port), self.start_directory, self.command]
        self.background_process = subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False)

        # The background process (server.js) might take time to start.
        # If we open the terminal page before it is up and running, we get a
        # "connection refused" error when connecting to the websocket port
        # and the user has to reload the page to get a terminal.
        # Waiting up to one second here seems to solve this timing problem;
        # hitting the timeout is the normal case for a long-running server.
        try:
            self.background_process.communicate(timeout=1)
        except subprocess.TimeoutExpired:
            print("Terminal: timed out when communicating with server.js.")

        self.open_terminal_page()

        QTimer.singleShot(250, self.focus_widget)

        self.build_all_methods(self)

        # Poll the page state every 250 ms. The slot is connected before the
        # timer is started.
        self.timer = QTimer()
        self.timer.timeout.connect(self.checking_status)
        self.timer.start(250)

    def run_http_server(self):
        """Serve the files next to this module on the port of ``self.http_url``.

        Blocks in ``serve_forever()``; intended to run in its own thread.
        """
        from http.server import SimpleHTTPRequestHandler
        from socketserver import TCPServer

        class Handler(SimpleHTTPRequestHandler):
            def translate_path(self, path):
                # The handler's ``directory`` argument was only added in
                # Python 3.7, so it is not used here. Instead the path that
                # the base class resolves relative to the current working
                # directory is re-rooted at this module's directory.
                path = super().translate_path(path)
                return os.path.dirname(__file__) + path[len(os.getcwd()):]

        http_port = int(self.http_url.split(":")[-1])
        with TCPServer(("127.0.0.1", http_port), Handler) as h:
            h.serve_forever()

    @PostGui()
    def open_terminal_page(self):
        """Fetch ``index.html``, substitute its placeholders and show it."""
        theme = "dark" if self.dark_mode_is_enabled() else "light"

        from urllib import request
        with request.urlopen(self.index_file) as f:
            html = f.read().decode("utf-8")

        # Substitutions are applied one after another, in this order.
        for placeholder, value in (
                ("%{port}", str(self.port)),
                ("%{http_url}", self.http_url),
                ("%{theme}", theme),
                ("%{terminal_font_size}", str(self.terminal_font_size)),
                ("%{current_directory}", self.current_directory),
                ("%{terminal_font_family}", self.terminal_font_family),
                ("%{theme_background_color}", self.theme_background_color),
                ("%{theme_foreground_color}", self.theme_foreground_color)):
            html = html.replace(placeholder, value)

        self.buffer_widget.setHtml(html)

    @interactive
    def update_theme(self):
        """Re-apply the current theme (mode and colors) to the terminal page."""
        super().update_theme()

        theme_file = "dark_theme.js" if self.theme_mode == "dark" else "light_theme.js"
        self.buffer_widget.eval_js_file(os.path.join(os.path.dirname(__file__), theme_file))
        self.buffer_widget.eval_js("theme.background = '{}'; theme.foreground = '{}'; term.setOption('theme', theme);".format(
            self.theme_background_color, self.theme_foreground_color))

    def checking_status(self):
        """Timer slot: sync title/directory with the page and detect process exit.

        Reads the JS values ``title`` (used as the current directory) and
        ``executing_command`` from the page. While a command is reported, the
        buffer title shows it (truncated to 30 characters); otherwise the
        title shows the directory. A directory change is reported to Emacs
        via ``eaf--change-default-directory``. When the background process
        has exited, the buffer is destroyed.
        """
        changed_directory = str(self.buffer_widget.execute_js("title"))
        # Slicing truncates to at most 30 characters and is a no-op for
        # shorter strings.
        changed_executing_command = str(self.buffer_widget.execute_js("executing_command"))[:30]

        directory_changed = changed_directory != self.current_directory
        command_finished = changed_executing_command == "" and self.executing_command != ""

        if changed_executing_command != self.executing_command and changed_executing_command != "":
            self.change_title(changed_executing_command)
            self.executing_command = changed_executing_command
        elif command_finished or directory_changed:
            self.update_title(changed_directory)
            if directory_changed:
                eval_in_emacs('eaf--change-default-directory', [self.buffer_id, changed_directory])
                self.current_directory = changed_directory
            self.executing_command = ""

        # poll() returns None while the process is still running.
        if self.background_process.poll() is not None:
            self.destroy_buffer()

    def destroy_buffer(self):
        """Stop the status timer and close the buffer."""
        # Stop polling first so the timer cannot fire again while the buffer
        # is being closed.
        self.timer.stop()
        self.close_buffer()

    @interactive
    def copy_text(self):
        """Copy the terminal selection to the clipboard, if there is one."""
        text = self.buffer_widget.execute_js("get_selection();")
        if text == "":
            message_to_emacs("Nothing selected")
        else:
            self.set_clipboard_text(text)
            message_to_emacs("Copy text")

    @interactive
    def yank_text(self):
        """Paste the clipboard text into the terminal (sent base64-encoded)."""
        text = self.get_clipboard_text()
        from core.utils import string_to_base64
        self.buffer_widget.eval_js_function("paste", string_to_base64(text))

    @interactive
    def scroll_other_buffer(self, scroll_direction, scroll_type):
        """Scroll by line or by page.

        Args:
            scroll_direction: ``"up"`` scrolls up; any other value scrolls down.
            scroll_type: ``"page"`` scrolls by page; any other value by line.
        """
        if scroll_type == "page":
            if scroll_direction == "up":
                self.scroll_up_page()
            else:
                self.scroll_down_page()
        else:
            if scroll_direction == "up":
                self.scroll_up()
            else:
                self.scroll_down()

    @interactive
    def scroll_up(self):
        """Call the page's ``scroll_line`` with ``1``."""
        self.buffer_widget.eval_js_function("scroll_line", 1)

    @interactive
    def scroll_down(self):
        """Call the page's ``scroll_line`` with ``-1``."""
        self.buffer_widget.eval_js_function("scroll_line", -1)

    @interactive
    def scroll_up_page(self):
        """Call ``scroll_line`` with the row count reported by Emacs."""
        row_number = get_emacs_func_result("eaf-terminal-get-row-number", [])
        self.buffer_widget.eval_js_function("scroll_line", row_number)

    @interactive
    def scroll_down_page(self):
        """Call ``scroll_line`` with the negated row count reported by Emacs."""
        row_number = get_emacs_func_result("eaf-terminal-get-row-number", [])
        self.buffer_widget.eval_js_function("scroll_line", -row_number)

    @interactive
    def scroll_to_begin(self):
        """Call the page's ``scroll_to_begin`` function."""
        self.buffer_widget.eval_js_function("scroll_to_begin")

    @interactive
    def scroll_to_bottom(self):
        """Call the page's ``scroll_to_bottom`` function."""
        self.buffer_widget.eval_js_function("scroll_to_bottom")

    def select_all(self):
        """Call the page's ``select_all`` function."""
        self.buffer_widget.eval_js_function("select_all")

    def clear_selection(self):
        """Call the page's ``clear_selection`` function."""
        self.buffer_widget.eval_js_function("clear_selection")

    def clear(self):
        """Call the page's ``clear`` function."""
        self.buffer_widget.eval_js_function("clear")

    def _search_text(self, text, is_backward = False):
        """Remember ``text`` as the search term and run the page's search.

        Args:
            text: Text to search for; stored in ``self.search_term``.
            is_backward: Selects which of the two page search functions runs.
        """
        if self.search_term != text:
            self.search_term = text

        # NOTE(review): a backward search calls "find_next" and a forward
        # search calls "find_prev". The mapping is kept exactly as found;
        # confirm against the page's JavaScript that it is intentional.
        if is_backward:
            self.buffer_widget.eval_js_function("find_next", text)
        else:
            self.buffer_widget.eval_js_function("find_prev", text)

    @interactive
    def search_text_forward(self):
        """Search forward, prompting for a term when none is stored yet."""
        if self.search_term == "":
            self.send_input_message("Forward Search Text: ", "search_text_forward")
        else:
            self._search_text(self.search_term)

    @interactive
    def search_text_backward(self):
        """Search backward, prompting for a term when none is stored yet."""
        if self.search_term == "":
            self.send_input_message("Backward Search Text: ", "search_text_backward")
        else:
            self._search_text(self.search_term, True)

    @interactive
    def action_quit(self):
        """Reset the stored search term by searching for the empty string."""
        if self.search_term != "":
            self._search_text("")

    def handle_input_response(self, callback_tag, result_content):
        """Run the search requested by a previous input prompt.

        Args:
            callback_tag: ``"search_text_forward"`` or
                ``"search_text_backward"``; other tags are ignored.
            result_content: The entered text; converted with ``str``.
        """
        if callback_tag == "search_text_forward":
            self._search_text(str(result_content))
        elif callback_tag == "search_text_backward":
            self._search_text(str(result_content), True)

    def dark_mode_is_enabled(self):
        ''' Return bool of whether dark mode is enabled.'''
        return get_app_dark_mode("eaf-terminal-dark-mode")

    def update_title(self, url):
        """Set the title to ``Term [<last two non-empty path components>]``.

        Args:
            url: Path-like string, split on ``os.path.sep``.
        """
        components = [part for part in url.split(os.path.sep) if part != '']
        self.change_title("Term [{}]".format(os.path.sep.join(components[-2:])))