-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc_server.py
141 lines (119 loc) · 4.21 KB
/
grpc_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Bridge gRPC Server.
This program is free software: you can redistribute it under the terms
of the GNU General Public License, v. 3.0. If a copy of the GNU General
Public License was not distributed with this file, see <https://www.gnu.org/licenses/>.
"""
import os
from datetime import datetime
from concurrent import futures
import grpc
from grpc_interceptor import ServerInterceptor
import bridge_pb2_grpc
from utils import get_logger, get_env_var
from bridge_grpc_service import BridgeService
logger = get_logger("bridge.grpc.server")
class LoggingInterceptor(ServerInterceptor):
"""
gRPC server interceptor for logging requests.
"""
def __init__(self):
"""
Initialize the LoggingInterceptor.
"""
self.logger = logger
self.server_protocol = "HTTP/2.0"
def intercept(self, method, request_or_iterator, context, method_name):
"""
Intercept method called for each incoming RPC.
"""
response = method(request_or_iterator, context)
if context.details():
self.logger.error(
'%s - - [%s] "%s %s" %s -',
context.peer(),
datetime.now().strftime("%B %d, %Y %H:%M:%S"),
method_name,
self.server_protocol,
str(context.code()).split(".")[1],
)
else:
self.logger.info(
'%s - - [%s] "%s %s" %s -',
context.peer(),
datetime.now().strftime("%B %d, %Y %H:%M:%S"),
method_name,
self.server_protocol,
"OK",
)
return response
def serve():
"""
Starts the gRPC server and listens for requests.
"""
mode = get_env_var("MODE", "development")
server_certificate = get_env_var("SSL_CERTIFICATE_FILE")
private_key = get_env_var("SSL_CERTIFICATE_KEY_FILE")
hostname = get_env_var("GRPC_HOST")
secure_port = get_env_var("GRPC_SSL_PORT")
port = get_env_var("GRPC_PORT")
num_cpu_cores = os.cpu_count()
max_workers = 10
logger.info("Starting server in %s mode...", mode)
logger.info("Hostname: %s", hostname)
logger.info("Insecure port: %s", port)
logger.info("Secure port: %s", secure_port)
logger.info("Logical CPU cores available: %s", num_cpu_cores)
logger.info("gRPC server max workers: %s", max_workers)
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=max_workers),
interceptors=[LoggingInterceptor()],
)
bridge_pb2_grpc.add_EntityServiceServicer_to_server(BridgeService(), server)
if mode == "production":
try:
with open(server_certificate, "rb") as f:
server_certificate_data = f.read()
with open(private_key, "rb") as f:
private_key_data = f.read()
server_credentials = grpc.ssl_server_credentials(
((private_key_data, server_certificate_data),)
)
server.add_secure_port(f"{hostname}:{secure_port}", server_credentials)
logger.info(
"TLS is enabled: The server is securely running at %s:%s",
hostname,
secure_port,
)
except FileNotFoundError as e:
logger.critical(
(
"Unable to start server: TLS certificate or key file not found: %s. "
"Please check your configuration."
),
e,
)
raise
except Exception as e:
logger.critical(
(
"Unable to start server: Error loading TLS credentials: %s. ",
"Please check your configuration.",
),
e,
)
raise
else:
server.add_insecure_port(f"{hostname}:{port}")
logger.warning(
"The server is running in insecure mode at %s:%s", hostname, port
)
server.start()
try:
server.wait_for_termination()
except KeyboardInterrupt:
logger.info("Shutting down the server...")
server.stop(0)
logger.info("The server has stopped successfully")
if __name__ == "__main__":
serve()