diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 000000000..55c57104e --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,144 @@ +on: + pull_request: + push: + branches: main + +name: Continuous integration +env: + latest_version: "1.78.0" + +jobs: + test: + name: Test + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - windows-latest + - ubuntu-latest + - macos-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.latest_version }} + - uses: Swatinem/rust-cache@v2 + + - name: Install NASM for aws-lc-rs on Windows + if: runner.os == 'Windows' + uses: ilammy/setup-nasm@v1 + + - name: Install ninja-build tool for aws-lc-fips-sys on Windows + if: runner.os == 'Windows' + uses: seanmiddleditch/gha-setup-ninja@v5 + + - name: Install golang for aws-lc-fips-sys on macos + if: runner.os == 'MacOS' + uses: actions/setup-go@v5 + with: + go-version: "1.22.2" + + - run: cargo test --all-features --workspace --lib --tests --profile "ci" + + # Check step to ensure that all targets are valid as the test step doesn't run them. + check: + name: Check + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: + - windows-latest + - ubuntu-latest + - macos-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.latest_version }} + + - name: Install NASM for aws-lc-rs on Windows + if: runner.os == 'Windows' + uses: ilammy/setup-nasm@v1 + + - name: Install ninja-build tool for aws-lc-fips-sys on Windows + if: runner.os == 'Windows' + uses: seanmiddleditch/gha-setup-ninja@v5 + + - name: Install golang for aws-lc-fips-sys on macos + if: runner.os == 'MacOS' + uses: actions/setup-go@v5 + with: + go-version: "1.22.2" + + - uses: Swatinem/rust-cache@v2 + - run: cargo check --all-features --all-targets --workspace --lib --tests --profile "ci" + + docs: + name: Documentation + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.latest_version }} + - uses: Swatinem/rust-cache@v2 + - name: Build Documentation + run: cargo doc --lib --no-deps --all-features --workspace + env: + RUSTDOCFLAGS: --cfg docsrs -Dwarnings + + fmt: + name: Rustfmt + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.latest_version }} + components: rustfmt + - uses: Swatinem/rust-cache@v2 + - run: cargo fmt --all -- --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ env.latest_version }} + components: clippy + - uses: Swatinem/rust-cache@v2 + - run: cargo clippy --all-features --all-targets --workspace -- -D warnings + + code_coverage: + name: Code Coverage + runs-on: ubuntu-latest + container: + image: xd009642/tarpaulin:develop-nightly + options: --security-opt seccomp=unconfined + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Install Clang + # Required for rocksdb + run: apt-get update && apt-get install -y llvm llvm-dev clang + + - name: Set libclang path + run: echo "LIBCLANG_PATH=$(llvm-config --libdir)" >> $GITHUB_ENV + + - name: Generate code coverage + run: | + cargo tarpaulin + + - name: Upload to codecov.io + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + fail_ci_if_error: true diff --git a/.gitignore b/.gitignore index 8d40cd886..486fd5a19 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,7 @@ !/demos/**/Cargo.lock **/.DS_Store -*.iml \ No newline at end of file +*.iml + +# Code coverage files +*.profraw \ No newline at end of file diff --git a/.tarpaulin.toml b/.tarpaulin.toml new file mode 100644 index 000000000..904e5d5f2 --- /dev/null +++ b/.tarpaulin.toml @@ -0,0 +1,46 @@ +[coverage] +exclude = [ + "example-util", + "console", + "console-views", + "demand-lane", + "demand-map-lane", + "value-lane", + "map-lane", + "command-lane", + "value-store", + "map-store", + "supply-lane", + "value-lane-persistence", + "map-lane-persistence", + "value-store-persistence", + "map-store-persistence", + "event-downlink", + "value-downlink", + "map-downlink", + "local-downlink", + "http-lane", + "transit", + "transit-model", + "tutorial-app", + "tutorial-app-model", + "tutorial-app-generator", + "join_map", + "join_value", + "aggregations", + "time_series", + "swimos_form_derive", + "swimos_agent_derive", + "macro_utilities", + "example_client_2_2", + "example_server_2_2", + "example_client_2_3", + "example_server_2_3" +] +workspace = true +avoid-cfg-tarpaulin = true +profile = "ci" +ignore-tests = true +out = ["Xml"] +timeout = "600s" +all-features = true diff --git a/Cargo.toml b/Cargo.toml index 3f188fdb1..fc2b171e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,8 @@ members = [ "example_apps/join_value", "example_apps/aggregations", "example_apps/time_series", + "example_apps/devguide/2_2/*", + "example_apps/devguide/2_3/*", ] exclude = [ @@ -95,11 +97,11 @@ flate2 = "1.0.22" bitflags = "2.5" rocksdb = "0.22" integer-encoding = "4.0.0" -rustls = "0.20" +rustls = "0.23.10" webpki = "0.22" -webpki-roots = "0.22" -tokio-rustls = "0.23" -rustls-pemfile = "1.0.0" +webpki-roots = "0.26.3" +tokio-rustls = "0.26" +rustls-pemfile = "2.1.2" trust-dns-resolver = "0.23.2" clap = "4.1" crossbeam-queue = { version = "0.3" } @@ -119,7 +121,7 @@ regex = "1.3.6" fnv = "1.0.7" cursive = { default-features = false, version = "0.20" } duration-str = "0.11.2" -quick-xml = "0.32.0" +quick-xml = "0.34.0" csv = "1.2" serde-xml-rs = "0.6" axum = "0.6.20" diff --git a/azure-pipelines.yml b/azure-pipelines.yml deleted file mode 100644 index aaf77d5da..000000000 --- a/azure-pipelines.yml +++ /dev/null @@ -1,24 +0,0 @@ -trigger: [ "main" ] -pr: [ "main" ] - -variables: - RUSTFLAGS: -Dwarnings - RUST_VERSION: 1.78.0 - -stages: - - stage: Lint - jobs: - - template: ci/azure-lint.yml - parameters: - name: lint_crates - rust: $(RUST_VERSION) - displayName: All crates - - stage: Test - dependsOn: Lint - jobs: - - template: ci/azure-test-stable.yml - parameters: - name: test_crates - rust: $(RUST_VERSION) - displayName: All crates - cross: eq(variables['Build.SourceBranch'], 'refs/heads/main') diff --git a/ci/azure-code-coverage.yml b/ci/azure-code-coverage.yml deleted file mode 100644 index cd611ad21..000000000 --- a/ci/azure-code-coverage.yml +++ /dev/null @@ -1,55 +0,0 @@ -trigger: [ "main" ] -pr: [ "main" ] - -variables: - RUSTFLAGS: -Dwarnings - -jobs: - - job: Code_coverage - displayName: Code coverage - - pool: - vmImage: ubuntu-22.04 - - steps: - - template: azure-install-rust.yml - parameters: - rust_version: 1.78.0 - - template: azure-install-sccache.yml - - - script: cargo install cargo-tarpaulin - displayName: Install Tarpaulin - workingDirectory: $(Build.SourcesDirectory) - - - script: cargo tarpaulin --ignore-tests -o xml -t 600 --exclude-files demos form_derive agent_derive --avoid-cfg-tarpaulin --profile "ci" - displayName: Generate code coverage - workingDirectory: $(Build.SourcesDirectory) - - - bash: | - curl -Os https://uploader.codecov.io/latest/linux/codecov - curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM - curl -Os https://uploader.codecov.io/latest/linux/codecov.SHA256SUM.sig - displayName: Download the Codecov uploader and its keys - workingDirectory: $(Build.SourcesDirectory) - - - bash: | - echo $PUBLIC_KEY | base64 --decode | gpg --import - gpg --verify codecov.SHA256SUM.sig codecov.SHA256SUM - displayName: Verify the signature of the SHA256SUM - workingDirectory: $(Build.SourcesDirectory) - env: - PUBLIC_KEY: $(CODECOV_PUBLIC_KEY) - - - bash: | - shasum -a 256 -c codecov.SHA256SUM - displayName: Verify the integrity of the Codecov uploader - workingDirectory: $(Build.SourcesDirectory) - - - bash: | - set -ex - chmod +x codecov - ./codecov -t $TOKEN - displayName: Upload code coverage - workingDirectory: $(Build.SourcesDirectory) - env: - TOKEN: $(CODECOV_TOKEN) diff --git a/ci/azure-install-rust.yml b/ci/azure-install-rust.yml deleted file mode 100644 index 13e72a55f..000000000 --- a/ci/azure-install-rust.yml +++ /dev/null @@ -1,34 +0,0 @@ -steps: - # Linux and macOS. - - script: | - set -e - - curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain none - export PATH=$PATH:$HOME/.cargo/bin - rustup toolchain install $RUSTUP_TOOLCHAIN - rustup default $RUSTUP_TOOLCHAIN - echo "##vso[task.setvariable variable=PATH;]$PATH:$HOME/.cargo/bin" - env: - RUSTUP_TOOLCHAIN: ${{parameters.rust_version}} - displayName: "Install rust (*nix)" - condition: not(eq(variables['Agent.OS'], 'Windows_NT')) - - # Windows. - - script: | - curl -sSf -o rustup-init.exe https://win.rustup.rs - rustup-init.exe -y --default-toolchain %RUSTUP_TOOLCHAIN% --default-host x86_64-pc-windows-msvc - set PATH=%PATH%;%USERPROFILE%\.cargo\bin - echo "##vso[task.setvariable variable=PATH;]%PATH%;%USERPROFILE%\.cargo\bin" - env: - RUSTUP_TOOLCHAIN: ${{parameters.rust_version}} - displayName: "Install rust (windows)" - condition: eq(variables['Agent.OS'], 'Windows_NT') - - # All platforms. - - script: | - rustup component add rust-src - rustup component list --installed - rustup toolchain list - rustc -Vv - cargo -V - displayName: Query rust and cargo versions diff --git a/ci/azure-install-sccache.yml b/ci/azure-install-sccache.yml deleted file mode 100644 index b36c562bb..000000000 --- a/ci/azure-install-sccache.yml +++ /dev/null @@ -1,29 +0,0 @@ -# This template installs sccache (Shared Compilation Cache) -# More info: https://github.com/mozilla/sccache - -steps: - - bash: | - set -ex - curl -L https://github.com/mozilla/sccache/releases/download/0.2.10/sccache-0.2.10-x86_64-unknown-linux-musl.tar.gz | tar xzf - - sccache=`pwd`/sccache-0.2.10-x86_64-unknown-linux-musl/sccache - echo "##vso[task.setvariable variable=RUSTC_WRAPPER;]$sccache" - displayName: Install sccache - Linux X64 - condition: and(eq(variables['Agent.OS'], 'Linux'), eq(variables['Agent.OSArchitecture'], 'X64')) - - bash: | - set -ex - brew install openssl@1.1 - curl -L https://github.com/mozilla/sccache/releases/download/0.2.10/sccache-0.2.10-x86_64-apple-darwin.tar.gz | tar xzf - - sccache=`pwd`/sccache-0.2.10-x86_64-apple-darwin/sccache - echo "##vso[task.setvariable variable=RUSTC_WRAPPER;]$sccache" - displayName: Install sccache - Darwin - condition: eq( variables['Agent.OS'], 'Darwin' ) - - bash: | - set -ex - SCCACHE_ERROR_LOG=`pwd`/sccache.log RUST_LOG=debug $RUSTC_WRAPPER --start-server - $RUSTC_WRAPPER -s - cat sccache.log - displayName: "start sccache" - condition: not(eq( variables['Agent.OS'], 'Windows_NT' )) - env: - SCCACHE_AZURE_CONNECTION_STRING: $(SCCACHE_AZURE_CONNECTION_STRING) - SCCACHE_AZURE_BLOB_CONTAINER: $(SCCACHE_AZURE_BLOB_CONTAINER) \ No newline at end of file diff --git a/ci/azure-lint.yml b/ci/azure-lint.yml deleted file mode 100644 index 2863a7ee3..000000000 --- a/ci/azure-lint.yml +++ /dev/null @@ -1,39 +0,0 @@ -jobs: - - job: ${{ parameters.name }} - displayName: ${{ parameters.displayName }} - pool: - vmImage: ubuntu-22.04 - - steps: - # Install rust - - template: azure-install-rust.yml - parameters: - rust_version: ${{ parameters.rust }} - - # Install clippy - - script: | - rustup component add clippy - cargo clippy --version - displayName: Install Clippy - - # Install rustfmt - - script: | - rustup component add rustfmt - cargo fmt --version - displayName: Install rustfmt - - # Run clippy - - script: | - cargo clippy --all-features --workspace --all-targets --profile "ci" -- -D warnings - displayName: Run Clippy - - # Run rustfmt - - script: | - cargo fmt -- --check - displayName: Run rustfmt - - # Run doc - - script: | - RUSTDOCFLAGS="--cfg docsrs -Dwarnings" - cargo doc --lib --no-deps --all-features --workspace --profile "ci" - displayName: Run doc \ No newline at end of file diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml deleted file mode 100644 index 579da3e79..000000000 --- a/ci/azure-test-stable.yml +++ /dev/null @@ -1,26 +0,0 @@ -jobs: - - job: ${{ parameters.name }} - displayName: ${{ parameters.displayName }} - strategy: - matrix: - Linux: - vmImage: ubuntu-22.04 - - ${{ if parameters.cross }}: - MacOS: - vmImage: macOS-12 - Windows: - vmImage: windows-2019 - pool: - vmImage: $(vmImage) - - steps: - - template: azure-install-rust.yml - parameters: - rust_version: ${{ parameters.rust }} - - template: azure-install-sccache.yml - - script: cargo test --all-features --workspace --lib --tests --profile "ci" - env: - RUST_BACKTRACE: 1 - displayName: cargo test --lib --tests - workingDirectory: $(Build.SourcesDirectory) \ No newline at end of file diff --git a/example_apps/devguide/2_2/example_client/Cargo.toml b/example_apps/devguide/2_2/example_client/Cargo.toml new file mode 100644 index 000000000..9183041a5 --- /dev/null +++ b/example_apps/devguide/2_2/example_client/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "example_client_2_2" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { workspace = true, features = ["full"] } +swimos_client = { path = "../../../../client/swimos_client" } +swimos_form = { path = "../../../../api/swimos_form" } \ No newline at end of file diff --git a/example_apps/devguide/2_2/example_client/src/main.rs b/example_apps/devguide/2_2/example_client/src/main.rs new file mode 100644 index 000000000..2a8531c2f --- /dev/null +++ b/example_apps/devguide/2_2/example_client/src/main.rs @@ -0,0 +1,54 @@ +use swimos_client::{BasicValueDownlinkLifecycle, DownlinkConfig, RemotePath, SwimClientBuilder}; + +#[tokio::main] +async fn main() { + // Build a Swim Client using the default configuration. + // The `build` method returns a `SwimClient` instance and its internal + // runtime future that is spawned below. + let (client, task) = SwimClientBuilder::default().build().await; + let _client_task = tokio::spawn(task); + let handle = client.handle(); + + // Build a path the downlink. + let state_path = RemotePath::new( + // The host address + "ws://0.0.0.0:8080", + // You can provide any agent URI that matches the pattern + // "/example/:id" + "/example/1", + // This is the URI of the ValueLane in our ExampleAgent + "state", + ); + + let lifecycle = BasicValueDownlinkLifecycle::::default() + // Register an event handler that is invoked when the downlink connects to the agent. + .on_linked_blocking(|| println!("Downlink linked")) + // Register an event handler that is invoked when the downlink synchronises its state. + // with the agent. + .on_synced_blocking(|value| println!("Downlink synced with: {value:?}")) + // Register an event handler that is invoked when the downlink receives an event. + .on_event_blocking(|value| println!("Downlink event: {value:?}")); + + // Build our downlink. + // + // This operation may fail if there is a connection issue. + let state_downlink = handle + .value_downlink::(state_path) + .lifecycle(lifecycle) + .downlink_config(DownlinkConfig::default()) + .open() + .await + .expect("Failed to open downlink"); + + for i in 0..10 { + // Update the lane's state. + state_downlink + .set(i) + .await + .expect("Failed to set downlink state"); + } + + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for ctrl-c."); +} diff --git a/example_apps/devguide/2_2/example_server/Cargo.toml b/example_apps/devguide/2_2/example_server/Cargo.toml new file mode 100644 index 000000000..0ede66706 --- /dev/null +++ b/example_apps/devguide/2_2/example_server/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "example_server_2_2" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { workspace = true, features = ["full"] } +swimos = { path = "../../../../swimos", features = ["server"] } +swimos_form = { path = "../../../../api/swimos_form" } \ No newline at end of file diff --git a/example_apps/devguide/2_2/example_server/src/main.rs b/example_apps/devguide/2_2/example_server/src/main.rs new file mode 100644 index 000000000..15e0f2128 --- /dev/null +++ b/example_apps/devguide/2_2/example_server/src/main.rs @@ -0,0 +1,104 @@ +use swimos::{ + agent::{ + agent_lifecycle::HandlerContext, agent_model::AgentModel, event_handler::EventHandler, + lanes::ValueLane, lifecycle, projections, AgentLaneModel, + }, + route::RoutePattern, + server::{Server, ServerBuilder, ServerHandle}, +}; + +use std::{error::Error, time::Duration}; + +#[derive(AgentLaneModel)] +#[projections] +pub struct ExampleAgent { + state: ValueLane, +} + +#[derive(Clone)] +pub struct ExampleLifecycle; + +#[lifecycle(ExampleAgent)] +impl ExampleLifecycle { + // Handler invoked when the agent starts. + #[on_start] + pub fn on_start( + &self, + context: HandlerContext, + ) -> impl EventHandler { + context.effect(|| println!("Starting agent.")) + } + + // Handler invoked when the agent is about to stop. + #[on_stop] + pub fn on_stop( + &self, + context: HandlerContext, + ) -> impl EventHandler { + context.effect(|| println!("Stopping agent.")) + } + + // Handler invoked after the state of 'lane' has changed. + #[on_event(state)] + pub fn on_event( + &self, + context: HandlerContext, + value: &i32, + ) -> impl EventHandler { + let n = *value; + // EventHandler::effect accepts a FnOnce() + // which runs a side effect. + context.effect(move || { + println!("Setting value to: {}", n); + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create a dynamic route for our agents. + let route = RoutePattern::parse_str("/example/:id")?; + // Create an agent model which contains the factory for creating the agent as well + // as the lifecycle which will be run. + let agent = AgentModel::new(ExampleAgent::default, ExampleLifecycle.into_lifecycle()); + + // Create a server builder. + let server = ServerBuilder::with_plane_name("Plane") + // Bind to port 8080 + .set_bind_addr("127.0.0.1:8080".parse().unwrap()) + // For this guide, ensure agents timeout fairly quickly. + // An agent will timeout after they have received no new updates + // for this configured period of time. + .update_config(|config| { + config.agent_runtime.inactive_timeout = Duration::from_secs(20); + }) + // Register the agent against the route. + .add_route(route, agent) + .build() + // Building the server may fail if many routes are registered and some + // are ambiguous. + .await?; + + // Run the server. A tuple of the server's runtime + // future and a handle to the runtime is returned. + let (task, handle) = server.run(); + // Watch for ctrl+c signals + let shutdown = manage_handle(handle); + + // Join on the server and ctrl+c futures. + let (_, result) = tokio::join!(shutdown, task); + + result?; + println!("Server stopped successfully."); + Ok(()) +} + +// Utility function for awaiting a stop signal in the terminal. +async fn manage_handle(mut handle: ServerHandle) { + tokio::signal::ctrl_c() + .await + .expect("Failed to register interrupt handler."); + + println!("Stopping server."); + handle.stop(); +} diff --git a/example_apps/devguide/2_3/example_client/Cargo.toml b/example_apps/devguide/2_3/example_client/Cargo.toml new file mode 100644 index 000000000..1fa19be12 --- /dev/null +++ b/example_apps/devguide/2_3/example_client/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "example_client_2_3" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { workspace = true, features = ["full"] } +swimos_client = { path = "../../../../client/swimos_client" } +swimos_form = { path = "../../../../api/swimos_form" } \ No newline at end of file diff --git a/example_apps/devguide/2_3/example_client/src/main.rs b/example_apps/devguide/2_3/example_client/src/main.rs new file mode 100644 index 000000000..adda2cfe5 --- /dev/null +++ b/example_apps/devguide/2_3/example_client/src/main.rs @@ -0,0 +1,73 @@ +use swimos_client::{BasicValueDownlinkLifecycle, DownlinkConfig, RemotePath, SwimClientBuilder}; +use swimos_form::Form; + +#[derive(Debug, Form, Copy, Clone)] +pub enum Operation { + Add(i32), + Sub(i32), +} + +#[tokio::main] +async fn main() { + // Build a Swim Client using the default configuration. + // The `build` method returns a `SwimClient` instance and its internal + // runtime future that is spawned below. + let (client, task) = SwimClientBuilder::default().build().await; + let _client_task = tokio::spawn(task); + let handle = client.handle(); + + // Build a path the downlink. + let state_path = RemotePath::new( + // The host address + "ws://0.0.0.0:8080", + // You can provide any agent URI that matches the pattern + // "/example/:id" + "/example/1", + // This is the URI of the ValueLane in our ExampleAgent + "state", + ); + + let lifecycle = BasicValueDownlinkLifecycle::::default() + // Register an event handler that is invoked when the downlink connects to the agent. + .on_linked_blocking(|| println!("Downlink linked")) + // Register an event handler that is invoked when the downlink synchronises its state. + // with the agent. + .on_synced_blocking(|value| println!("Downlink synced with: {value:?}")) + // Register an event handler that is invoked when the downlink receives an event. + .on_event_blocking(|value| println!("Downlink event: {value:?}")); + + // Build our downlink. + // + // This operation may fail if there is a connection issue. + let _state_downlink = handle + .value_downlink::(state_path) + .lifecycle(lifecycle) + .downlink_config(DownlinkConfig::default()) + .open() + .await + .expect("Failed to open downlink"); + + let exec_path = RemotePath::new( + // The host address + "ws://0.0.0.0:8080", + // You can provide any agent URI that matches the pattern + // "/example/:id" + "/example/1", + // This is the URI of the ValueLane in our ExampleAgent + "exec", + ); + + let exec_downlink = handle + .value_downlink::(exec_path) + .downlink_config(DownlinkConfig::default()) + .open() + .await + .expect("Failed to open exec downlink"); + + exec_downlink.set(Operation::Add(1000)).await.unwrap(); + exec_downlink.set(Operation::Sub(13)).await.unwrap(); + + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for ctrl-c."); +} diff --git a/example_apps/devguide/2_3/example_server/Cargo.toml b/example_apps/devguide/2_3/example_server/Cargo.toml new file mode 100644 index 000000000..c3274305d --- /dev/null +++ b/example_apps/devguide/2_3/example_server/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "example_server_2_3" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { workspace = true, features = ["full"] } +swimos = { path = "../../../../swimos", features = ["server"] } +swimos_form = { path = "../../../../api/swimos_form" } \ No newline at end of file diff --git a/example_apps/devguide/2_3/example_server/src/main.rs b/example_apps/devguide/2_3/example_server/src/main.rs new file mode 100644 index 000000000..2e0f65363 --- /dev/null +++ b/example_apps/devguide/2_3/example_server/src/main.rs @@ -0,0 +1,138 @@ +use swimos::{ + agent::{ + agent_lifecycle::HandlerContext, agent_model::AgentModel, event_handler::EventHandler, + lanes::ValueLane, lifecycle, projections, AgentLaneModel, + }, + route::RoutePattern, + server::{Server, ServerBuilder, ServerHandle}, +}; + +use std::{error::Error, time::Duration}; +use swimos::agent::event_handler::HandlerActionExt; +use swimos::agent::lanes::CommandLane; +use swimos_form::Form; + +// Note how as this is a custom type we need to derive `Form` for it. +// For most types, simply adding the derive attribute will suffice. +#[derive(Debug, Form, Copy, Clone)] +pub enum Operation { + Add(i32), + Sub(i32), +} + +#[derive(AgentLaneModel)] +#[projections] +pub struct ExampleAgent { + state: ValueLane, + exec: CommandLane, +} + +#[derive(Clone)] +pub struct ExampleLifecycle; + +#[lifecycle(ExampleAgent)] +impl ExampleLifecycle { + // Handler invoked when the agent starts. + #[on_start] + pub fn on_start( + &self, + context: HandlerContext, + ) -> impl EventHandler { + context.effect(|| println!("Starting agent.")) + } + + // Handler invoked when the agent is about to stop. + #[on_stop] + pub fn on_stop( + &self, + context: HandlerContext, + ) -> impl EventHandler { + context.effect(|| println!("Stopping agent.")) + } + + // Handler invoked after the state of 'lane' has changed. + #[on_event(state)] + pub fn on_event( + &self, + context: HandlerContext, + value: &i32, + ) -> impl EventHandler { + let n = *value; + // EventHandler::effect accepts a FnOnce() + // which runs a side effect. + context.effect(move || { + println!("Setting value to: {}", n); + }) + } + + #[on_command(exec)] + pub fn on_command( + &self, + context: HandlerContext, + // Notice a reference to the deserialized command envelope is provided. + operation: &Operation, + ) -> impl EventHandler { + let operation = *operation; + context + // Get the current state of our `state` lane. + .get_value(ExampleAgent::STATE) + .and_then(move |state| { + // Calculate the new state. + let new_state = match operation { + Operation::Add(val) => state + val, + Operation::Sub(val) => state - val, + }; + // Return a event handler which updates the state of the `state` lane. + context.set_value(ExampleAgent::STATE, new_state) + }) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create a dynamic route for our agents. + let route = RoutePattern::parse_str("/example/:id")?; + // Create an agent model which contains the factory for creating the agent as well + // as the lifecycle which will be run. + let agent = AgentModel::new(ExampleAgent::default, ExampleLifecycle.into_lifecycle()); + + // Create a server builder. + let server = ServerBuilder::with_plane_name("Plane") + // Bind to port 8080 + .set_bind_addr("127.0.0.1:8080".parse().unwrap()) + // For this guide, ensure agents timeout fairly quickly. + // An agent will timeout after they have received no new updates + // for this configured period of time. + .update_config(|config| { + config.agent_runtime.inactive_timeout = Duration::from_secs(20); + }) + // Register the agent against the route. + .add_route(route, agent) + .build() + // Building the server may fail if many routes are registered and some + // are ambiguous. + .await?; + + // Run the server. A tuple of the server's runtime + // future and a handle to the runtime is returned. + let (task, handle) = server.run(); + // Watch for ctrl+c signals + let shutdown = manage_handle(handle); + + // Join on the server and ctrl+c futures. + let (_, result) = tokio::join!(shutdown, task); + + result?; + println!("Server stopped successfully."); + Ok(()) +} + +// Utility function for awaiting a stop signal in the terminal. +async fn manage_handle(mut handle: ServerHandle) { + tokio::signal::ctrl_c() + .await + .expect("Failed to register interrupt handler."); + + println!("Stopping server."); + handle.stop(); +} diff --git a/runtime/swimos_remote/src/tls/config/mod.rs b/runtime/swimos_remote/src/tls/config/mod.rs index 25ad69905..8d4c27419 100644 --- a/runtime/swimos_remote/src/tls/config/mod.rs +++ b/runtime/swimos_remote/src/tls/config/mod.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use rustls::crypto::CryptoProvider; +use std::sync::Arc; + /// Supported certificate formats for TLS connections. pub enum CertFormat { Pem, @@ -60,9 +63,11 @@ impl PrivateKey { Self::new(CertFormat::Pem, body) } } -/// Combined TLS configuration (both server and client)/ +/// Combined TLS configuration (both server and client). pub struct TlsConfig { + /// Configuration parameters for a TLS client. pub client: ClientConfig, + /// Configuration parameters for a TLS server. pub server: ServerConfig, } @@ -74,17 +79,26 @@ impl TlsConfig { /// Configuration parameters for a TLS server. pub struct ServerConfig { + /// A chain of TLS certificates (starting with the server certificate and ending with the CA). pub chain: CertChain, + /// An unvalidated private key for a server. pub key: PrivateKey, + /// Whether to enable a [`rustls::KeyLog`] implementation that opens a file whose name is given by the + /// `SSLKEYLOGFILE` environment variable, and writes keys into it. While this may be enabled, + /// if `SSLKEYLOGFILE` is not set, it will do nothing. pub enable_log_file: bool, + /// Process-wide [`CryptoProvider`] that must already have been installed as the default + /// provider. + pub provider: Arc, } impl ServerConfig { - pub fn new(chain: CertChain, key: PrivateKey) -> Self { + pub fn new(chain: CertChain, key: PrivateKey, provider: Arc) -> Self { ServerConfig { chain, key, enable_log_file: false, + provider, } } } diff --git a/runtime/swimos_remote/src/tls/errors.rs b/runtime/swimos_remote/src/tls/errors.rs index fe570b146..64c0dab22 100644 --- a/runtime/swimos_remote/src/tls/errors.rs +++ b/runtime/swimos_remote/src/tls/errors.rs @@ -25,7 +25,7 @@ pub enum TlsError { InvalidPrivateKey, /// Certificate validation failed. #[error("Invalid certificate: {0}")] - BadCertificate(#[from] webpki::Error), + BadCertificate(#[from] rustls::Error), /// The provided host name was invalid. #[error("Invalid DNS host name.")] BadHostName, diff --git a/runtime/swimos_remote/src/tls/net/client.rs b/runtime/swimos_remote/src/tls/net/client.rs index 421359145..2aad01aac 100644 --- a/runtime/swimos_remote/src/tls/net/client.rs +++ b/runtime/swimos_remote/src/tls/net/client.rs @@ -15,7 +15,8 @@ use std::{net::SocketAddr, sync::Arc}; use futures::{future::BoxFuture, FutureExt}; -use rustls::{OwnedTrustAnchor, RootCertStore, ServerName}; +use rustls::pki_types::ServerName; +use rustls::RootCertStore; use crate::dns::{BoxDnsResolver, DnsResolver, Resolver}; use crate::net::{ClientConnections, ConnectionError, ConnectionResult, Scheme}; @@ -49,25 +50,16 @@ impl RustlsClientNetworking { } = config; let mut root_store = RootCertStore::empty(); if use_webpki_roots { - root_store.add_server_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.0.iter().map( - |ta| { - OwnedTrustAnchor::from_subject_spki_name_constraints( - ta.subject, - ta.spki, - ta.name_constraints, - ) - }, - )); + root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()) } for cert in custom_roots { for c in super::load_cert_file(cert)? { - root_store.add(&c)?; + root_store.add(c)?; } } let config = rustls::ClientConfig::builder() - .with_safe_defaults() .with_root_certificates(root_store) .with_no_client_auth(); @@ -93,10 +85,10 @@ impl ClientConnections for RustlsClientNetworking { .boxed(), Scheme::Wss => { let domain = if let Some(host_name) = host { - ServerName::try_from(host_name) + ServerName::try_from(host_name.to_string()) .map_err(|err| ConnectionError::BadParameter(err.to_string())) } else { - Ok(ServerName::IpAddress(addr.ip())) + Ok(ServerName::IpAddress(addr.ip().into())) }; async move { let stream = TcpStream::connect(addr).await?; diff --git a/runtime/swimos_remote/src/tls/net/mod.rs b/runtime/swimos_remote/src/tls/net/mod.rs index 206e12aca..6dc44089c 100644 --- a/runtime/swimos_remote/src/tls/net/mod.rs +++ b/runtime/swimos_remote/src/tls/net/mod.rs @@ -26,6 +26,7 @@ pub use client::RustlsClientNetworking; use futures::future::Either; use futures::TryFutureExt; use futures::{future::BoxFuture, FutureExt}; +use rustls::pki_types::CertificateDer; pub use server::{RustlsListener, RustlsServerNetworking}; use crate::tls::{ @@ -36,16 +37,17 @@ use crate::tls::{ use self::server::MaybeRustTlsListener; -fn load_cert_file(file: CertificateFile) -> Result, TlsError> { +fn load_cert_file(file: CertificateFile) -> Result>, TlsError> { let CertificateFile { format, body } = file; - let certs = match format { + match format { CertFormat::Pem => { let mut body_ref = body.as_ref(); - rustls_pemfile::certs(&mut body_ref).map_err(TlsError::InvalidPem)? + rustls_pemfile::certs(&mut body_ref) + .map(|r| r.map_err(TlsError::InvalidPem)) + .collect() } - CertFormat::Der => vec![body], - }; - Ok(certs.into_iter().map(rustls::Certificate).collect()) + CertFormat::Der => Ok(vec![CertificateDer::from(body)]), + } } /// Combined implementation of [`ClientConnections`] and [`ServerConnections`] that wraps diff --git a/runtime/swimos_remote/src/tls/net/server.rs b/runtime/swimos_remote/src/tls/net/server.rs index d137bcfa9..985f074e7 100644 --- a/runtime/swimos_remote/src/tls/net/server.rs +++ b/runtime/swimos_remote/src/tls/net/server.rs @@ -22,6 +22,7 @@ use futures::{ stream::{unfold, BoxStream, FuturesUnordered}, Future, FutureExt, Stream, StreamExt, TryStreamExt, }; +use rustls::pki_types::PrivateKeyDer; use rustls::KeyLogFile; use rustls_pemfile::Item; use tokio::net::{TcpListener, TcpStream}; @@ -73,6 +74,7 @@ impl TryFrom for RustlsServerNetworking { chain: CertChain(certs), key, enable_log_file, + provider, } = config; let mut chain = vec![]; @@ -85,19 +87,19 @@ impl TryFrom for RustlsServerNetworking { CertFormat::Pem => { let mut body_ref = body.as_ref(); match rustls_pemfile::read_one(&mut body_ref).map_err(TlsError::InvalidPem)? { - Some(Item::ECKey(body) | Item::PKCS8Key(body) | Item::RSAKey(body)) => { - rustls::PrivateKey(body) - } - _ => { - return Err(TlsError::InvalidPrivateKey); - } + Some(Item::Sec1Key(body)) => PrivateKeyDer::from(body), + Some(Item::Pkcs8Key(body)) => PrivateKeyDer::from(body), + Some(Item::Pkcs1Key(body)) => PrivateKeyDer::from(body), + _ => return Err(TlsError::InvalidPrivateKey), } } - CertFormat::Der => rustls::PrivateKey(body), + CertFormat::Der => { + PrivateKeyDer::try_from(body).map_err(|_| TlsError::InvalidPrivateKey)? + } }; - let mut config = rustls::ServerConfig::builder() - .with_safe_defaults() + let mut config = rustls::ServerConfig::builder_with_provider(provider) + .with_safe_default_protocol_versions()? .with_no_client_auth() .with_single_cert(chain, server_key) .expect("Invalid certs or private key."); diff --git a/runtime/swimos_remote/src/tls/net/tests.rs b/runtime/swimos_remote/src/tls/net/tests.rs index d183a3dc9..2cf4d202e 100644 --- a/runtime/swimos_remote/src/tls/net/tests.rs +++ b/runtime/swimos_remote/src/tls/net/tests.rs @@ -46,11 +46,18 @@ fn make_server_config() -> ServerConfig { CertificateFile::der(ca_cert), ]); + let provider = rustls::crypto::aws_lc_rs::default_provider(); + provider + .clone() + .install_default() + .expect("Crypto Provider has already been initialised elsewhere."); + let key = PrivateKey::der(server_key); ServerConfig { chain, key, enable_log_file: false, + provider: Arc::new(provider), } } diff --git a/swimos/src/agent.rs b/swimos/src/agent.rs index 0e88e363c..ecf42df3f 100644 --- a/swimos/src/agent.rs +++ b/swimos/src/agent.rs @@ -142,13 +142,16 @@ pub use swimos_agent_derive::AgentLaneModel; /// 3. [Map Lanes](`lanes::MapLane`) /// 4. [Join-Value Lanes](`lanes::JoinValueLane`) /// 5. [Join-Map Lanes](`lanes::JoinMapLane`) -/// 6. [HTTP Lanes](`lanes::HttpLane`) (or [Simple HTTP Lanes](`lanes::SimpleHttpLane`)) +/// 6. [Demand Lanes](`lanes::DemandLane`) +/// 7. [Demand-Map Lanes](`lanes::DemandMapLane`) +/// 8. [Supply Lanes](`lanes::SupplyLane`) +/// 9. [HTTP Lanes](`lanes::HttpLane`) (or [Simple HTTP Lanes](`lanes::SimpleHttpLane`)) /// -/// For [Value Lanes](`lanes::ValueLane`) and [Map Lanes](`lanes::CommandLane`), the type parameter -/// must implement the [`swimos_form::Form`] trait (used for serialization and deserialization). For -/// [Map Lanes](`lanes::MapLane`), [Join-Value Lanes](`lanes::JoinValueLane`) and [Join-Map Lanes](`lanes::JoinMapLane`), -/// both parameters must implement [`swimos_form::Form`] and additionally, the key type `K` must additionally -/// satisfy `K: Hash + Eq + Ord + Clone`. +/// For [Value Lanes](`lanes::ValueLane`), [Command Lanes](`lanes::CommandLane`), [Demand Lanes](`lanes::DemandLane`) and +/// [Supply Lanes](`lanes::SupplyLane`), the type parameter must implement the [`swimos_form::Form`] trait (used for serialization +/// and deserialization). For [Map Lanes](`lanes::MapLane`), [Demand-Map Lanes](`lanes::MapLane`), +/// [Join-Value Lanes](`lanes::JoinValueLane`) and [Join-Map Lanes](`lanes::JoinMapLane`), both parameters must implement +/// [`swimos_form::Form`] and additionally, the key type `K` must additionally satisfy `K: Hash + Eq + Ord + Clone`. /// /// Additionally, for [Join-Map Lanes](`lanes::JoinMapLane`), the link key type `L` must satisfy`L: Hash + Eq + Clone`. /// @@ -163,19 +166,34 @@ pub use swimos_agent_derive::AgentLaneModel; /// codec that is selected for the lane (using the appropriate type parameter). By default, this is the /// [Default Codec](`lanes::http::DefaultCodec`). This codec always requires that type parameters implement /// [`swimos_form::Form`] and, if the `json` feature is active, that they are Serde serializable. +/// [`crate::agent::lanes::http::DefaultCodec`]. +/// +/// The supported store types are: +/// +/// 1. [`crate::agent::stores::ValueStore`] +/// 2. [`crate::agent::stores::MapStore`] +/// +/// These have exactly the same restrictions on their type parameters as the corresponding lane types. /// /// As an example, the following is a valid agent type defining items of each supported kind: /// /// ```no_run /// use swimos::agent::AgentLaneModel; -/// use swimos::agent::lanes::{ValueLane, CommandLane, MapLane, JoinValueLane, JoinMapLane, SimpleHttpLane}; +/// use swimos::agent::lanes::{ +/// ValueLane, CommandLane, DemandLane, +/// DemandMapLane, MapLane, JoinValueLane, +/// JoinMapLane, SimpleHttpLane, SupplyLane +/// }; /// use swimos::agent::stores::{ValueStore, MapStore}; /// /// #[derive(AgentLaneModel)] /// struct ExampleAgent { /// value_lane: ValueLane, /// command_lane: CommandLane, +/// demand_lane: DemandLane, +/// supply_lane: SupplyLane, /// map_lane: MapLane, +/// demand_map_lane: DemandMapLane, /// value_store: ValueStore, /// map_store: MapStore, /// join_value: JoinValueLane,