diff --git a/.gitignore b/.gitignore index 4d594143..a42b10a6 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ C/include Cpp/**/include Cpp/**/share Cpp/**/lib +examples/Python/include # Created by https://www.toptal.com/developers/gitignore/api/intellij,gradle,eclipse,maven,visualstudiocode # Edit at https://www.toptal.com/developers/gitignore?templates=intellij,gradle,eclipse,maven,visualstudiocode diff --git a/examples/Python/src/WebServer/README.md b/examples/Python/src/WebServer/README.md new file mode 100644 index 00000000..446545ba --- /dev/null +++ b/examples/Python/src/WebServer/README.md @@ -0,0 +1,113 @@ +# Web Server + +This example shows how to create an HTTP web server backend in Lingua Franca python target. + +## Application + +In this example, we will build a distributed logging service with two replicated databases, each database with an HTTP web server that handles add log and get log requests from frontend. The HTTP web server backend is `logging.lf`, and the frontend is `logging.html`. Valid requests are the following three kinds: + +- Add log: adds a log to the distributed database. The add log request is broadcast to all database replicas. +- Get log: get all historical logs from a single database. This returns without waiting for consistency, so the logs could be out of order and inconsistent with each other. +- Get log consistent: get consistent historical logs from a single database. This request will respond slower but with consistency, meaning requests to different replicas will return identical logs if the requests have the same timestamp. + +## HTTP Server + +Building an HTTP server in Lingua Franca python target is a nontrivial task for several reasons: + +- The HTTP server in python is a blocking operation that prevents a reaction from finishing. +- Typical python web frameworks use a decorator style and require you to return the response in the handler function, but to utilize the full potential of Lingua Franca, we often need to implement logic in different reactions. + +To tackle the issues above, we can: + +- Start the HTTP server in a separate thread, so it doesn't block the execution of reactions. Handlers act as external triggers to the Lingua Franca program. +- The `WebServer` reactor has a state `events` that is a dictionary of `event_id`->[asyncio event](https://docs.python.org/3/library/asyncio.html) +- The handler will add an event to the state `events` when a request comes in, trigger an action in Lingua Franca, and complete the request when the event is unblocked. + +## Minimal +1 Example + +First, let's build a minimal web server that adds one to the number in the request. The backend is in `minimal.lf`, and frontend is `minimal.html`. + +The handler is as follows: + +```python +@self.app.post("/addone") +async def addone(request: Request): + event = asyncio.Event() + request_id = str(uuid.uuid4()) + self.events[request_id] = event + num = int((await request.json())["data"]) + addone_action.schedule(0, [request_id, num]) + await event.wait() + num = self.events[request_id] + del self.events[request_id] + return {"status": "success", "num": num} +``` + +`self` here refers to the `WebServer` reactor in which the handler is defined. The `self.app` is an instance of `FastAPI` application instance, defined as a state of the `WebServer` reactor. Import statements are in the `preamble` and not shown here for simplicity. This handler function will be triggered to generate a response to an HTTP `POST` request at the `/addone` path. + +And the reaction to the action is + +```python +reaction(addone_action){= + request_id, num = addone_action.value + event = self.events[request_id] + self.events[request_id] = num + 1 + event.set() +=} +``` + +When a request is processed by a handler, a response is generated in the following steps: + +1. Create a python async io event and add it to `self.events`. +2. Trigger a physical action in Lingua Franca to process the request. +3. Block the handler until the event is unblocked by another reaction. +4. When the action has been processed, another reaction unblocks the asyncio event. +5. The handler can now continue to execute and respond to the web request. + +## Minimal +1 Example with WebServer Library + +We can also build the +1 example with the prebuilt `WebServer` library at `../lib/WebServer.lf` that modularizes the web server. You only have to implement the following code to accomplish the same functionality, as demonstrated in `minimal_with_lib.lf`: + +```python +target Python { + coordination: decentralized +} + +import WebServer from "../lib/WebServer.lf" + +reactor Handler { + input request + output response + + reaction(request) -> response {= + request_id, req_data = request.value + num = int(req_data["data"]) + num += 1 + resp = {"status": "success", "num": num} + response.set([request_id, resp]) + =} +} + +federated reactor { + server = new WebServer(path="/addone") + handler = new Handler() + server.request -> handler.request + handler.response ~> server.response +} + +``` + +Note that the `request_id` has to be sent to and from the `Handler` reactor so that the `WebServer` knows which request to respond to. Also, notice that the response is connected with a physical connection `~>`, this is because these connections carry no timing semantics -- they simply carry the data to be sent back to the frontend as a response and need to be executed as soon as possible. This also prevents an STP violation from being triggered. + +## Distributed Logging + +![logging](logging.svg) + +To implement our distributed logging application, we need to respond to three distinct operations, but the reusable `WebServer` reactor has only one API path. We can solve this by introducing a new `Router` reactor and [composing reactors](https://www.lf-lang.org/docs/writing-reactors/composing-reactors), as shown in the diagram above. Each HTTP request body now carries an additional `operation` field that allows the router to route the request to different reactions through connections. + +Now we can implement a distributed logging system by instantiating several `WebServer` reactors on different network ports and adding two `Database` reactors for each `WebServer`. + +* One `Database` reactor has an STA offset of 0 and is connected by physical connections. This will prioritize availability, generating a quick response that is not (necessarily) consistent. +* Another `Database` reactor has an STA offset of 3s (this can be changed) and is connected by logical connections. This will guarantee that the logs in this `Database` reactor will be consistent as long as out-of-order messages arrive within 3s. + +Note that this is implemented with banks and multiports. When sending logs, we want the `WebServer` to send logs to all `Database` reactors, so the `newlog` connection is implemented with broadcasts; but when getting logs, we want to know the log state of the single corresponding `Database` reactor, hence there is no broadcast here. In the last line, `db.sendlogs, dbc.sendlogs ~> interleaved(server.response)` uses interleaved connections because each `WebServer` corresponds to two `Database`, one consistent and one not, and we need to avoid having a `WebServer` connecting to two inconsistent databases and another connecting to two consistent databases. \ No newline at end of file diff --git a/examples/Python/src/WebServer/logging.html b/examples/Python/src/WebServer/logging.html new file mode 100644 index 00000000..2617992e --- /dev/null +++ b/examples/Python/src/WebServer/logging.html @@ -0,0 +1,137 @@ + + + + + + Send and Get Logs + + + +

Send and Get Logs

+ + +

+ +
+ + +

+ + + + +

Stored Logs:

+
+ + + + diff --git a/examples/Python/src/WebServer/logging.lf b/examples/Python/src/WebServer/logging.lf new file mode 100644 index 00000000..638fec7f --- /dev/null +++ b/examples/Python/src/WebServer/logging.lf @@ -0,0 +1,79 @@ +target Python { + coordination: decentralized +} + +import WebServer from "../lib/WebServer.lf" + +reactor Router { + input request + output newlog + output getlog + output getlog_consistent + + reaction(request) -> newlog, getlog, getlog_consistent {= + # print(f"Router received request: {request.value}") + request_id, req_data = request.value + if req_data["operation"] == "newlog" and "log" in req_data.keys(): + newlog.set([request_id, req_data["log"]]) + elif req_data["operation"] == "getlog": + getlog.set(request_id) + elif req_data["operation"] == "getlog_consistent": + getlog_consistent.set(request_id) + else: + print("Invalid Request") + return + =} +} + +reactor WebServerRouter(bank_index=0, STA=0) { + output newlog + output getlog + output getlog_consistent + webserver = new WebServer(port = {= 5000+self.bank_index =}, path="/") + router = new Router() + webserver.request -> router.request + router.newlog -> newlog + router.getlog -> getlog + router.getlog_consistent -> getlog_consistent + input[2] response + + reaction(response) -> webserver.response {= + for port in response: + if port.is_present: + webserver.response.set(port.value) + =} +} + +reactor Database(bank_index=0, portwidth=2, STA = 0 s) { + state logs = [] + input[portwidth] addlog + input getlog + output sendlogs + + reaction(startup) {= + self.logs = [] + =} + + reaction(addlog) -> sendlogs {= + for i, port in enumerate(addlog): + if port.is_present: + request_id, log_message = port.value + self.logs.append(log_message) + =} + + reaction(getlog) -> sendlogs {= + sendlogs.set([getlog.value, {"status": "success", "logs": self.logs}]) + =} +} + +federated reactor(ReplicaCount=2) { + server = new[ReplicaCount] WebServerRouter() + db = new[ReplicaCount] Database(portwidth=ReplicaCount) + (server.newlog)+ ~> db.addlog + server.getlog ~> db.getlog + + dbc = new[ReplicaCount] Database(portwidth=ReplicaCount, STA = 3 s) + (server.newlog)+ -> dbc.addlog + server.getlog_consistent -> dbc.getlog + db.sendlogs, dbc.sendlogs ~> interleaved(server.response) +} diff --git a/examples/Python/src/WebServer/logging.svg b/examples/Python/src/WebServer/logging.svg new file mode 100644 index 00000000..bb661b6b --- /dev/null +++ b/examples/Python/src/WebServer/logging.svg @@ -0,0 +1 @@ +loggingWebServerRouterWebServerresponserequestRouterrequestnewloggetloggetlog_consistent2responsenewloggetloggetlog_consistentDatabase2addloggetlogsendlogsDatabase2addloggetlogsendlogs$$$$ \ No newline at end of file diff --git a/examples/Python/src/WebServer/minimal.html b/examples/Python/src/WebServer/minimal.html new file mode 100644 index 00000000..908bad09 --- /dev/null +++ b/examples/Python/src/WebServer/minimal.html @@ -0,0 +1,44 @@ + + + + + + API Request + + + +

Send Request to API

+
+ + +
+ + +

Result:

+

+  
+
diff --git a/examples/Python/src/WebServer/minimal.lf b/examples/Python/src/WebServer/minimal.lf
new file mode 100644
index 00000000..9b838e3c
--- /dev/null
+++ b/examples/Python/src/WebServer/minimal.lf
@@ -0,0 +1,58 @@
+target Python {
+  keepalive: true
+}
+
+preamble {=
+  from fastapi import FastAPI, Request, HTTPException
+  from fastapi.middleware.cors import CORSMiddleware
+  import threading
+  import uvicorn
+  import asyncio
+  import uuid
+=}
+
+reactor WebServer(bank_index=0, STA=0) {
+  state app
+  state events
+  physical action addone_action
+
+  reaction(startup) -> addone_action {=
+    self.events = {}
+    self.app = FastAPI()
+    self.app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    @self.app.post("/addone")
+    async def addone(request: Request):
+        event = asyncio.Event()
+        request_id = str(uuid.uuid4())
+        self.events[request_id] = event
+        num = int((await request.json())["data"])
+        addone_action.schedule(0, [request_id, num])
+        await event.wait()
+        num = self.events[request_id]
+        del self.events[request_id]
+        return {"status": "success", "num": num}
+
+    def run_fastapi_app():
+        print(f"[WebServer{self.bank_index}] FastAPI server starting")
+        uvicorn.run(self.app, host="127.0.0.1", port=5000+self.bank_index, log_level="warning")
+    fastapi_thread = threading.Thread(target=run_fastapi_app)
+    fastapi_thread.start()
+  =}
+
+  reaction(addone_action) {=
+    request_id, num = addone_action.value
+    event = self.events[request_id]
+    self.events[request_id] = num + 1
+    event.set()
+  =}
+}
+
+main reactor {
+  server = new WebServer()
+}
diff --git a/examples/Python/src/WebServer/minimal_with_lib.lf b/examples/Python/src/WebServer/minimal_with_lib.lf
new file mode 100644
index 00000000..f5da0f75
--- /dev/null
+++ b/examples/Python/src/WebServer/minimal_with_lib.lf
@@ -0,0 +1,25 @@
+target Python {
+  keepalive: true
+}
+
+import WebServer from "../lib/WebServer.lf"
+
+reactor Handler {
+  input request
+  output response
+
+  reaction(request) -> response {=
+    request_id, req_data = request.value
+    num = int(req_data["data"])
+    num += 1
+    resp = {"status": "success", "num": num}
+    response.set([request_id, resp])
+  =}
+}
+
+main reactor {
+  server = new WebServer(path="/addone")
+  handler = new Handler()
+  server.request -> handler.request
+  handler.response ~> server.response
+}
diff --git a/examples/Python/src/lib/WebServer.lf b/examples/Python/src/lib/WebServer.lf
new file mode 100644
index 00000000..79e962be
--- /dev/null
+++ b/examples/Python/src/lib/WebServer.lf
@@ -0,0 +1,88 @@
+/**
+ * @file
+ * @author Shulu Li
+ * @brief Reactor for handling HTTP requests in Python.
+ */
+target Python {
+  keepalive: true
+}
+
+/**
+ * @brief A reactor that starts a FastAPI server to handle HTTP requests.
+ *
+ * The `port` parameter is the port number on which the server listens for requests.
+ *
+ * The `path` parameter is the path at which the server listens for HTTP POST requests.
+ *
+ * The `request` output is a list of two values: the request ID and the request data. If the request
+ * header indicates that the body is a JSON object, the request data is parsed into a python
+ * dictionary. Otherwise, the request body is forwarded as-is.
+ *
+ * The `response` input is a list of two values: the request ID and the response data. Request ID is
+ * required to respond to the correct request. Use a physical connection to connect the `response`
+ * input to avoid STP violations.
+ *
+ * To use this reactor, you must install the fastapi and uvicorn libraries for Python. You can do
+ * this with `pip install fastapi uvicorn`.
+ */
+reactor WebServer(port=5000, path="/") {
+  state app
+  state events
+  physical action phy_action
+  output request
+  input response
+
+  reaction(startup) -> phy_action {=
+    from fastapi import FastAPI, Request, HTTPException
+    from fastapi.middleware.cors import CORSMiddleware
+    import threading
+    import uvicorn
+    import asyncio
+    import uuid
+    self.events = {}
+    assert isinstance(self.path, str), "The path is not a string"
+    assert self.path.startswith("/"), "The path must start with a /"
+    self.app = FastAPI()
+    self.app.add_middleware(
+        CORSMiddleware,
+        allow_origins=["*"],
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    @self.app.post(self.path)
+    async def addone(request: Request):
+        event = asyncio.Event()
+        request_id = str(uuid.uuid4())
+        self.events[request_id] = event
+        if request.headers.get("content-type") == "application/json":
+            req_data = await request.json()
+        else:
+            req_data = await request.body()
+        phy_action.schedule(0, [request_id, req_data])
+        await event.wait()
+        resp_data = self.events[request_id]
+        del self.events[request_id]
+        return resp_data
+
+    def run_fastapi_app():
+        print(f"FastAPI server starting...")
+        uvicorn.run(self.app, host="127.0.0.1", port=self.port, log_level="warning")
+    fastapi_thread = threading.Thread(target=run_fastapi_app)
+    fastapi_thread.start()
+  =}
+
+  reaction(phy_action) -> request {=
+    request.set(phy_action.value)
+  =}
+
+  reaction(response) {=
+    request_id, resp_data = response.value
+    if request_id not in self.events:
+        print("Invalid Request ID")
+        return
+    event = self.events[request_id]
+    self.events[request_id] = resp_data
+    event.set()
+  =}
+}