diff --git a/.github/workflows/code-checks.yaml b/.github/workflows/code-checks.yaml index c379eb07c..d7a097558 100644 --- a/.github/workflows/code-checks.yaml +++ b/.github/workflows/code-checks.yaml @@ -4,7 +4,7 @@ on: push jobs: code-checks: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest steps: - name: checkout uses: actions/checkout@v4 diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 22c38dd1b..867860c83 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -23,9 +23,9 @@ jobs: to_test: - "mnist-keras numpyhelper" - "mnist-pytorch numpyhelper" - python_version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python_version: ["3.9", "3.10", "3.11", "3.12"] os: - - ubuntu-22.04 + - ubuntu-24.04 runs-on: ${{ matrix.os }} steps: - name: checkout diff --git a/Dockerfile b/Dockerfile index b651dbea4..169fc5097 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,54 +1,61 @@ -# Base image -ARG BASE_IMG=python:3.10-slim -FROM $BASE_IMG +# Stage 1: Builder +ARG BASE_IMG=python:3.12-slim +FROM $BASE_IMG as builder ARG GRPC_HEALTH_PROBE_VERSION="" - -# Requirements (use MNIST Keras as default) ARG REQUIREMENTS="" +WORKDIR /build + +# Install build dependencies +RUN apt-get update && apt-get upgrade -y && apt-get install -y --no-install-recommends python3-dev gcc wget \ + && rm -rf /var/lib/apt/lists/* + # Add FEDn and default configs -COPY . /app -COPY config/settings-client.yaml.template /app/config/settings-client.yaml -COPY config/settings-combiner.yaml.template /app/config/settings-combiner.yaml -COPY config/settings-hooks.yaml.template /app/config/settings-hooks.yaml -COPY config/settings-reducer.yaml.template /app/config/settings-reducer.yaml -COPY $REQUIREMENTS /app/config/requirements.txt +COPY . /build +COPY $REQUIREMENTS /build/requirements.txt -# Install developer tools (needed for psutil) -RUN apt-get update && apt-get install -y python3-dev gcc +# Install dependencies +RUN python -m venv /venv \ + && /venv/bin/pip install --upgrade pip \ + && /venv/bin/pip install --no-cache-dir 'setuptools>=65' \ + && /venv/bin/pip install --no-cache-dir . \ + && if [[ ! -z "$REQUIREMENTS" ]]; then \ + /venv/bin/pip install --no-cache-dir -r /build/requirements.txt; \ + fi \ + && rm -rf /build/requirements.txt -# Install grpc health probe checker + +# Install grpc health probe RUN if [ ! -z "$GRPC_HEALTH_PROBE_VERSION" ]; then \ - apt-get install -y wget && \ - wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ - chmod +x /bin/grpc_health_probe && \ - apt-get remove -y wget && apt autoremove -y; \ - else \ - echo "No grpc_health_probe version specified, skipping installation"; \ + wget -qO /build/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ + chmod +x /build/grpc_health_probe; \ fi -# Setup working directory +# Stage 2: Runtime +FROM $BASE_IMG + WORKDIR /app -# Create FEDn app directory -SHELL ["/bin/bash", "-c"] -RUN mkdir -p /app \ - && mkdir -p /app/client \ - && mkdir -p /app/certs \ - && mkdir -p /app/client/package \ - && mkdir -p /app/certs \ - # - # Install FEDn and requirements - && python -m venv /venv \ - && /venv/bin/pip install --upgrade pip \ - && /venv/bin/pip install --no-cache-dir 'setuptools>=65' \ - && /venv/bin/pip install --no-cache-dir -e . \ - && if [[ ! -z "$REQUIREMENTS" ]]; then \ - /venv/bin/pip install --no-cache-dir -r /app/config/requirements.txt; \ - fi \ - # - # Clean up - && rm -r /app/config/requirements.txt +# Copy application and venv from the builder stage +COPY --from=builder /venv /venv +COPY --from=builder /build /app + +# Use a non-root user +RUN set -ex \ + # Create a non-root user + && addgroup --system --gid 1001 appgroup \ + && adduser --system --uid 1001 --gid 1001 --no-create-home appuser \ + # Creare application specific tmp directory, set ENV TMPDIR to /app/tmp + && mkdir -p /app/tmp \ + && chown -R appuser:appgroup /venv /app \ + # Upgrade the package index and install security upgrades + && apt-get update \ + && apt-get upgrade -y \ + && apt-get autoremove -y \ + && apt-get clean -y \ + && rm -rf /var/lib/apt/lists/* +USER appuser + +ENTRYPOINT [ "/venv/bin/fedn" ] -ENTRYPOINT [ "/venv/bin/fedn" ] \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index f020d9a0c..598aad0cb 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -58,21 +58,23 @@ services: - USER=test - PROJECT=project - FLASK_DEBUG=1 - - STATESTORE_CONFIG=/app/config/settings-reducer.yaml - - MODELSTORAGE_CONFIG=/app/config/settings-reducer.yaml + - STATESTORE_CONFIG=/app/config/settings-reducer.yaml.template + - MODELSTORAGE_CONFIG=/app/config/settings-reducer.yaml.template + - FEDN_COMPUTE_PACKAGE_DIR=/app + - TMPDIR=/app/tmp build: context: . args: - BASE_IMG: ${BASE_IMG:-python:3.10-slim} + BASE_IMG: ${BASE_IMG:-python:3.12-slim} working_dir: /app volumes: - ${HOST_REPO_DIR:-.}/fedn:/app/fedn depends_on: - minio - mongo - entrypoint: [ "sh", "-c" ] command: - - "/venv/bin/pip install --no-cache-dir -e . && /venv/bin/fedn controller start" + - controller + - start ports: - 8092:8092 @@ -81,24 +83,27 @@ services: environment: - PYTHONUNBUFFERED=0 - GET_HOSTS_FROM=dns - - STATESTORE_CONFIG=/app/config/settings-combiner.yaml - - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml + - STATESTORE_CONFIG=/app/config/settings-combiner.yaml.template + - MODELSTORAGE_CONFIG=/app/config/settings-combiner.yaml.template - HOOK_SERVICE_HOST=hook:12081 + - TMPDIR=/app/tmp build: context: . args: - BASE_IMG: ${BASE_IMG:-python:3.10-slim} - GRPC_HEALTH_PROBE_VERSION: v0.4.24 + BASE_IMG: ${BASE_IMG:-python:3.12-slim} + GRPC_HEALTH_PROBE_VERSION: v0.4.35 working_dir: /app volumes: - ${HOST_REPO_DIR:-.}/fedn:/app/fedn - entrypoint: [ "sh", "-c" ] command: - - "/venv/bin/pip install --no-cache-dir -e . && /venv/bin/fedn combiner start --init config/settings-combiner.yaml" + - combiner + - start + - --init + - config/settings-combiner.yaml.template ports: - 12080:12080 healthcheck: - test: [ "CMD", "/bin/grpc_health_probe", "-addr=localhost:12080" ] + test: [ "CMD", "/app/grpc_health_probe", "-addr=localhost:12080" ] interval: 20s timeout: 10s retries: 5 @@ -110,11 +115,12 @@ services: container_name: hook environment: - GET_HOSTS_FROM=dns + - TMPDIR=/app/tmp build: context: . args: - BASE_IMG: ${BASE_IMG:-python:3.10-slim} - GRPC_HEALTH_PROBE_VERSION: v0.4.24 + BASE_IMG: ${BASE_IMG:-python:3.12-slim} + GRPC_HEALTH_PROBE_VERSION: v0.4.35 working_dir: /app volumes: - ${HOST_REPO_DIR:-.}/fedn:/app/fedn @@ -141,9 +147,11 @@ services: working_dir: /app volumes: - ${HOST_REPO_DIR:-.}/fedn:/app/fedn - entrypoint: [ "sh", "-c" ] command: - - "/venv/bin/pip install --no-cache-dir -e . && /venv/bin/fedn client start --api-url http://api-server:8092" + - client + - start + - --api-url + - http://api-server:8092 deploy: replicas: 0 depends_on: diff --git a/docs/conf.py b/docs/conf.py index c48e378f4..f451f8b74 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,7 +11,7 @@ author = "Scaleout Systems AB" # The full version, including alpha/beta/rc tags -release = "0.19.0" +release = "0.20.0" # Add any Sphinx extension module names here, as strings extensions = [ diff --git a/docs/quickstart.rst b/docs/quickstart.rst index e7641f0d4..0a309bb32 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -9,7 +9,7 @@ Getting started with FEDn **Prerequisites** -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A FEDn Studio account `__ diff --git a/examples/FedSimSiam/README.rst b/examples/FedSimSiam/README.rst index 5831fd3ea..47fa93c6b 100644 --- a/examples/FedSimSiam/README.rst +++ b/examples/FedSimSiam/README.rst @@ -16,7 +16,7 @@ To run the example, follow the steps below. For a more detailed explanation, fol Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Creating the compute package and seed model diff --git a/examples/FedSimSiam/client/python_env.yaml b/examples/FedSimSiam/client/python_env.yaml index d728b82be..45f23ad30 100644 --- a/examples/FedSimSiam/client/python_env.yaml +++ b/examples/FedSimSiam/client/python_env.yaml @@ -9,7 +9,6 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - fedn diff --git a/examples/flower-client/client/python_env.yaml b/examples/flower-client/client/python_env.yaml index a82e7e50d..06b00186c 100644 --- a/examples/flower-client/client/python_env.yaml +++ b/examples/flower-client/client/python_env.yaml @@ -10,8 +10,7 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - fire==0.3.1 - flwr-datasets[vision]==0.1.0 \ No newline at end of file diff --git a/examples/huggingface/README.rst b/examples/huggingface/README.rst index eaaad3254..68bceb685 100644 --- a/examples/huggingface/README.rst +++ b/examples/huggingface/README.rst @@ -27,7 +27,7 @@ To run the example, follow the steps below. For a more detailed explanation, fol Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Creating the compute package and seed model diff --git a/examples/huggingface/client/python_env.yaml b/examples/huggingface/client/python_env.yaml index 87ee6f32d..6cc2925b4 100644 --- a/examples/huggingface/client/python_env.yaml +++ b/examples/huggingface/client/python_env.yaml @@ -9,9 +9,8 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - transformers - datasets - fedn diff --git a/examples/mnist-keras/README.rst b/examples/mnist-keras/README.rst index aaf13c21d..741813362 100644 --- a/examples/mnist-keras/README.rst +++ b/examples/mnist-keras/README.rst @@ -8,7 +8,7 @@ This is a TF/Keras version of the PyTorch Quickstart Tutorial. For a step-by-ste Prerequisites ------------------------------------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ Creating the compute package and seed model ------------------------------------------- diff --git a/examples/mnist-pytorch-DPSGD/README.rst b/examples/mnist-pytorch-DPSGD/README.rst index 88220584a..42125eade 100644 --- a/examples/mnist-pytorch-DPSGD/README.rst +++ b/examples/mnist-pytorch-DPSGD/README.rst @@ -2,21 +2,30 @@ FEDn Project: Federated Differential Privacy MNIST (Opacus + PyTorch) ---------------------------------------------------------------------- This example FEDn Project demonstrates how Differential Privacy can be integrated to enhance the confidentiality of client data. -We have expanded our baseline MNIST-PyTorch example by incorporating the Opacus framework, which is specifically designed for PyTorch models. +We have expanded our baseline MNIST-PyTorch example by incorporating the Opacus framework, which is specifically designed for PyTorch models. To learn more about differential privacy, read our `blogpost `__ about it. Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ -Edit Differential Privacy budget + +Edit Client-Specific Differential Privacy Parameters -------------------------- -- The **Differential Privacy budget** (`FINAL_EPSILON`, `DELTA`) is configured in the `compute` package at `client/train.py` (lines 35 and 39). -- If `HARDLIMIT` (line 40) is set to `True`, the `FINAL_EPSILON` will not exceed its specified limit. -- If `HARDLIMIT` is set to `False`, the expected `FINAL_EPSILON` will be around its specified value given the server runs `GLOBAL_ROUNDS` variable (line 36). +The **Differential Privacy budget** (``epsilon``, ``delta``), along with other settings, is configurable in the ``client_settings.yaml`` file: + +- **epochs**: Number of local epochs per round. +- **epsilon**: Total epsilon budget to spend, determined by the ``global_rounds`` set on the server side. +- **delta**: Total delta budget to spend. +- **max_grad_norm**: Clipping threshold for gradients. +- **global_rounds**: Number of rounds the server will run. +- **hardlimit**: + + - If ``hardlimit`` is set to ``True``, the ``epsilon`` budget will not exceed its specified limit, even if it means skipping updates for some rounds. + - If ``hardlimit`` is set to ``False``, the expected ``epsilon`` will be approximately equal to its specified value, assuming the server completes the specified ``global_rounds`` of updates. Creating the compute package and seed model ------------------------------------------- diff --git a/examples/mnist-pytorch-DPSGD/client/fedn.yaml b/examples/mnist-pytorch-DPSGD/client/fedn.yaml index 30873488b..ce9af3ea9 100644 --- a/examples/mnist-pytorch-DPSGD/client/fedn.yaml +++ b/examples/mnist-pytorch-DPSGD/client/fedn.yaml @@ -1,4 +1,6 @@ -python_env: python_env.yaml +# Remove the python_env tag below to handle the environment manually +python_env: python_env.yaml + entry_points: build: command: python model.py diff --git a/examples/mnist-pytorch-DPSGD/client/python_env.yaml b/examples/mnist-pytorch-DPSGD/client/python_env.yaml index 13d586102..526022145 100644 --- a/examples/mnist-pytorch-DPSGD/client/python_env.yaml +++ b/examples/mnist-pytorch-DPSGD/client/python_env.yaml @@ -10,7 +10,6 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") - opacus diff --git a/examples/mnist-pytorch-DPSGD/client/train.py b/examples/mnist-pytorch-DPSGD/client/train.py index 1210b7ad5..d4527ec1f 100644 --- a/examples/mnist-pytorch-DPSGD/client/train.py +++ b/examples/mnist-pytorch-DPSGD/client/train.py @@ -1,19 +1,22 @@ import os import sys +import numpy as np import torch +import yaml +from data import load_data from model import load_parameters, save_parameters +from opacus import PrivacyEngine +from opacus.utils.batch_memory_manager import BatchMemoryManager -from data import load_data from fedn.utils.helpers.helpers import save_metadata -from opacus import PrivacyEngine -from torch.utils.data import Dataset +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.abspath(dir_path)) + -import numpy as np -from opacus.utils.batch_memory_manager import BatchMemoryManager # Define a custom Dataset class -class CustomDataset(Dataset): +class CustomDataset(torch.utils.data.Dataset): def __init__(self, x_data, y_data): self.x_data = x_data self.y_data = y_data @@ -27,20 +30,16 @@ def __getitem__(self, idx): return x_data, y_data -dir_path = os.path.dirname(os.path.realpath(__file__)) -sys.path.append(os.path.abspath(dir_path)) - -MAX_GRAD_NORM = 1.2 -FINAL_EPSILON = 8.0 -GLOBAL_ROUNDS = 4 -EPOCHS = 5 -EPSILON = FINAL_EPSILON/GLOBAL_ROUNDS +MAX_PHYSICAL_BATCH_SIZE = 32 +EPOCHS = 1 +EPSILON = 1000.0 DELTA = 1e-5 -HARDLIMIT = False +MAX_GRAD_NORM = 1.2 +GLOBAL_ROUNDS = 10 +HARDLIMIT = True -MAX_PHYSICAL_BATCH_SIZE = 32 -def train(in_model_path, out_model_path, data_path=None, batch_size=32, epochs=1, lr=0.01): +def train(in_model_path, out_model_path, data_path=None, batch_size=32, lr=0.01): """Complete a model update. Load model paramters from in_model_path (managed by the FEDn client), @@ -60,68 +59,68 @@ def train(in_model_path, out_model_path, data_path=None, batch_size=32, epochs=1 :param lr: The learning rate to use. :type lr: float """ + with open("../../client_settings.yaml", "r") as fh: + try: + settings = yaml.safe_load(fh) + EPSILON = float(settings["epsilon"]) + DELTA = float(settings["delta"]) + MAX_GRAD_NORM = float(settings["max_grad_norm"]) + GLOBAL_ROUNDS = int(settings["global_rounds"]) + HARDLIMIT = bool(settings["hardlimit"]) + global MAX_PHYSICAL_BATCH_SIZE + MAX_PHYSICAL_BATCH_SIZE = int(settings["max_physical_batch_size"]) + except yaml.YAMLError as exc: + print(exc) + # Load data - print("data_path: ", data_path) x_train, y_train = load_data(data_path) - trainset = CustomDataset(x_train, y_train) - batch_size = 32 - train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, - shuffle=True, num_workers=2) # Load parmeters and initialize model model = load_parameters(in_model_path) - device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - model = model.to(device) + # Train + optimizer = torch.optim.SGD(model.parameters(), lr=lr) + privacy_engine = PrivacyEngine() - # Load epsilon - if os.path.isfile("epsilon.npy"): - - tot_epsilon = np.load("epsilon.npy") - print("load consumed epsilon: ", tot_epsilon) + if os.path.isfile("privacy_accountant.state"): + privacy_engine.accountant = torch.load("privacy_accountant.state") - else: + trainset = CustomDataset(x_train, y_train) + train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, num_workers=2) - print("initiate tot_epsilon") - tot_epsilon = 0. + try: + epsilon_spent = privacy_engine.get_epsilon(DELTA) + except ValueError: + epsilon_spent = 0 + print("epsilon before training: ", epsilon_spent) - # Train - optimizer = torch.optim.SGD(model.parameters(), lr=lr) - privacy_engine = PrivacyEngine() + round_epsilon = np.sqrt((epsilon_spent / EPSILON * np.sqrt(GLOBAL_ROUNDS)) ** 2 + 1) * EPSILON / np.sqrt(GLOBAL_ROUNDS) + print("target epsilon: ", round_epsilon) model, optimizer, train_loader = privacy_engine.make_private_with_epsilon( module=model, optimizer=optimizer, data_loader=train_loader, epochs=EPOCHS, - target_epsilon=EPSILON, + target_epsilon=round_epsilon, target_delta=DELTA, max_grad_norm=MAX_GRAD_NORM, ) - print(f"Using sigma={optimizer.noise_multiplier} and C={MAX_GRAD_NORM}") - - - - for epoch in range(EPOCHS): - train_dp(model, train_loader, optimizer, epoch + 1, device, privacy_engine) - - d_epsilon = privacy_engine.get_epsilon(DELTA) - print("epsilon spent: ", d_epsilon) - tot_epsilon = np.sqrt(tot_epsilon**2 + d_epsilon**2) - print("saving tot_epsilon: ", tot_epsilon) - np.save("epsilon.npy", tot_epsilon) - - if HARDLIMIT and tot_epsilon >= FINAL_EPSILON: - print("DP Budget Exceeded: The differential privacy budget has been exhausted, no model updates will be applied to preserve privacy guarantees.") + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + train_dp(model, train_loader, optimizer, EPOCHS, device, privacy_engine) + try: + print("epsilon after training: ", privacy_engine.get_epsilon(DELTA)) + except ValueError: + print("cant calculate epsilon") - else: + if HARDLIMIT and privacy_engine.get_epsilon(DELTA) < EPSILON: # Metadata needed for aggregation server side metadata = { # num_examples are mandatory "num_examples": len(x_train), "batch_size": batch_size, - "epochs": epochs, + "epochs": EPOCHS, "lr": lr, } @@ -130,27 +129,17 @@ def train(in_model_path, out_model_path, data_path=None, batch_size=32, epochs=1 # Save model update (mandatory) save_parameters(model, out_model_path) + else: + print("Epsilon too high, not saving model") -def accuracy(preds, labels): - return (preds == labels).mean() - - - + # Save privacy accountant + torch.save(privacy_engine.accountant, "privacy_accountant.state") def train_dp(model, train_loader, optimizer, epoch, device, privacy_engine): model.train() criterion = torch.nn.NLLLoss() # nn.CrossEntropyLoss() - - losses = [] - top1_acc = [] - - with BatchMemoryManager( - data_loader=train_loader, - max_physical_batch_size=MAX_PHYSICAL_BATCH_SIZE, - optimizer=optimizer - ) as memory_safe_data_loader: - + with BatchMemoryManager(data_loader=train_loader, max_physical_batch_size=MAX_PHYSICAL_BATCH_SIZE, optimizer=optimizer) as memory_safe_data_loader: for i, (images, target) in enumerate(memory_safe_data_loader): optimizer.zero_grad() images = images.to(device) @@ -159,27 +148,9 @@ def train_dp(model, train_loader, optimizer, epoch, device, privacy_engine): # compute output output = model(images) loss = criterion(output, target) - - preds = np.argmax(output.detach().cpu().numpy(), axis=1) - labels = target.detach().cpu().numpy() - - # measure accuracy and record loss - acc = accuracy(preds, labels) - - losses.append(loss.item()) - top1_acc.append(acc) - loss.backward() optimizer.step() - if (i + 1) % 200 == 0: - epsilon = privacy_engine.get_epsilon(DELTA) - print( - f"\tTrain Epoch: {epoch} \t" - f"Loss: {np.mean(losses):.6f} " - f"Acc@1: {np.mean(top1_acc) * 100:.6f} " - f"(ε = {epsilon:.2f}, δ = {DELTA})" - ) if __name__ == "__main__": train(sys.argv[1], sys.argv[2]) diff --git a/examples/mnist-pytorch-DPSGD/client_settings.yaml b/examples/mnist-pytorch-DPSGD/client_settings.yaml new file mode 100644 index 000000000..cb5e7cc95 --- /dev/null +++ b/examples/mnist-pytorch-DPSGD/client_settings.yaml @@ -0,0 +1,8 @@ +# Constants +max_physical_batch_size: 32 +epochs: 1 +epsilon: 100.0 +delta: 1e-5 +max_grad_norm: 1.2 +global_rounds: 10 +hardlimit: true diff --git a/examples/mnist-pytorch/README.rst b/examples/mnist-pytorch/README.rst index 990b902b2..1c0afc5d0 100644 --- a/examples/mnist-pytorch/README.rst +++ b/examples/mnist-pytorch/README.rst @@ -9,7 +9,7 @@ The example is intented as a minimalistic quickstart to learn how to use FEDn. Prerequisites ------------- -- `Python >=3.8, <=3.12 `__ +- `Python >=3.9, <=3.12 `__ - `A project in FEDn Studio `__ Creating the compute package and seed model diff --git a/examples/mnist-pytorch/client/python_env.yaml b/examples/mnist-pytorch/client/python_env.yaml index 272b196ea..7a35ff7a2 100644 --- a/examples/mnist-pytorch/client/python_env.yaml +++ b/examples/mnist-pytorch/client/python_env.yaml @@ -10,6 +10,5 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64" and python_version >= "3.9") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux" and python_version >= "3.9") - - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64" and python_version >= "3.9") - - numpy==1.24.4; python_version == "3.8" + - numpy==2.0.2; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") + - numpy==1.26.4; (sys_platform == "darwin" and platform_machine == "x86_64") diff --git a/examples/monai-2D-mednist/README.rst b/examples/monai-2D-mednist/README.rst index f61820682..00eca5321 100644 --- a/examples/monai-2D-mednist/README.rst +++ b/examples/monai-2D-mednist/README.rst @@ -16,7 +16,7 @@ Prerequisites Using FEDn Studio: -- `Python 3.8, 3.9, 3.10 or 3.11 `__ +- `Python 3.9, 3.10 or 3.11 `__ - `A FEDn Studio account `__ diff --git a/examples/monai-2D-mednist/client/python_env.yaml b/examples/monai-2D-mednist/client/python_env.yaml index 389b3a42a..546f1ffbe 100644 --- a/examples/monai-2D-mednist/client/python_env.yaml +++ b/examples/monai-2D-mednist/client/python_env.yaml @@ -10,7 +10,6 @@ dependencies: - torch==2.2.2; sys_platform == "darwin" and platform_machine == "x86_64" - torchvision==0.19.1; (sys_platform == "darwin" and platform_machine == "arm64") or (sys_platform == "win32" or sys_platform == "win64" or sys_platform == "linux") - torchvision==0.17.2; sys_platform == "darwin" and platform_machine == "x86_64" - - numpy==1.26.4; python_version >= "3.9" - - numpy==1.24.4; python_version == "3.8" + - numpy==1.26.4 - monai-weekly[pillow, tqdm] - scikit-learn diff --git a/examples/monai-2D-mednist/client/validate.py b/examples/monai-2D-mednist/client/validate.py index ff4eb9263..f4b8cfd33 100644 --- a/examples/monai-2D-mednist/client/validate.py +++ b/examples/monai-2D-mednist/client/validate.py @@ -55,7 +55,7 @@ def validate(in_model_path, out_json_path, data_path=None, client_settings_path= image_list = clients["client " + str(split_index)]["validation"] - val_ds = MedNISTDataset(data_path=data_path+"/MedNIST/", transforms=val_transforms, image_files=image_list) + val_ds = MedNISTDataset(data_path=data_path + "/MedNIST/", transforms=val_transforms, image_files=image_list) val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers) @@ -86,8 +86,9 @@ def validate(in_model_path, out_json_path, data_path=None, client_settings_path= # JSON schema report.update({"test_accuracy": accuracy_score(y_true, y_pred), "test_f1_score": f1_score(y_true, y_pred, average="macro")}) - for r in report: - print(r, ": ", report[r]) + + for key, value in report.items(): + print(f"{key}: {value}") # Save JSON save_metrics(report, out_json_path) diff --git a/fedn/cli/__init__.py b/fedn/cli/__init__.py index 7028dbfa6..be680eb23 100644 --- a/fedn/cli/__init__.py +++ b/fedn/cli/__init__.py @@ -11,3 +11,4 @@ from .status_cmd import status_cmd # noqa: F401 from .validation_cmd import validation_cmd # noqa: F401 from .controller_cmd import controller_cmd # noqa: F401 +from .login_cmd import login_cmd # noqa: F401 diff --git a/fedn/cli/client_cmd.py b/fedn/cli/client_cmd.py index 666bc6545..7c9ffc1e7 100644 --- a/fedn/cli/client_cmd.py +++ b/fedn/cli/client_cmd.py @@ -60,12 +60,42 @@ def list_clients(ctx, protocol: str, host: str, port: str, token: str = None, n_ if _token: headers["Authorization"] = _token - click.echo(f"\nListing clients: {url}\n") - click.echo(f"Headers: {headers}") try: response = requests.get(url, headers=headers) - print_response(response, "clients") + print_response(response, "clients", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="Client ID") +@client_cmd.command("get") +@click.pass_context +def get_client(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: client with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="clients") + headers = {} + + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "client", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/combiner_cmd.py b/fedn/cli/combiner_cmd.py index 3e7753e80..0a6403587 100644 --- a/fedn/cli/combiner_cmd.py +++ b/fedn/cli/combiner_cmd.py @@ -88,11 +88,39 @@ def list_combiners(ctx, protocol: str, host: str, port: str, token: str = None, if _token: headers["Authorization"] = _token - click.echo(f"\nListing combiners: {url}\n") - click.echo(f"Headers: {headers}") + try: + response = requests.get(url, headers=headers) + print_response(response, "combiners", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="Combiner ID") +@combiner_cmd.command("get") +@click.pass_context +def get_combiner(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: combiner with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="combiners") + headers = {} + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" try: response = requests.get(url, headers=headers) - print_response(response, "combiners") + print_response(response, "combiner", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/login_cmd.py b/fedn/cli/login_cmd.py new file mode 100644 index 000000000..d2ce8ac90 --- /dev/null +++ b/fedn/cli/login_cmd.py @@ -0,0 +1,61 @@ +import os +from getpass import getpass + +import click +import requests +import yaml + +from .main import main + +# Replace this with the platform's actual login endpoint +home_dir = os.path.expanduser("~") + + +@main.group("studio") +@click.pass_context +def login_cmd(ctx): + """:param ctx:""" + pass + + +@login_cmd.command("login") +@click.option("-p", "--protocol", required=False, default="https", help="Communication protocol") +@click.option("-H", "--host", required=False, default="fedn.scaleoutsystems.com", help="Hostname of controller (api)") +@click.pass_context +def login_cmd(ctx, protocol: str, host: str): + """Logging into FEDn Studio""" + # Step 1: Display welcome message + click.secho("Welcome to Scaleout FEDn!", fg="green") + + url = f"{protocol}://{host}/api/token/" + + # Step 3: Prompt for username and password + username = input("Please enter your username: ") + password = getpass("Please enter your password: ") + + # Call the authentication API + try: + response = requests.post(url, json={"username": username, "password": password}, headers={"Content-Type": "application/json"}) + response.raise_for_status() # Raise an error for HTTP codes 4xx/5xx + except requests.exceptions.RequestException as e: + click.secho("Error connecting to the platform. Please try again.", fg="red") + click.secho(str(e), fg="red") + return + + # Handle the response + if response.status_code == 200: + data = response.json() + if data.get("access"): + click.secho("Login successful!", fg="green") + context_path = os.path.join(home_dir, ".fedn") + if not os.path.exists(context_path): + os.makedirs(context_path) + try: + with open(f"{context_path}/context.yaml", "w") as yaml_file: + yaml.dump(data, yaml_file, default_flow_style=False) # Add access and refresh tokens to context yaml file + except Exception as e: + print(f"Error: Failed to write to YAML file. Details: {e}") + else: + click.secho("Login failed. Please check your credentials.", fg="red") + else: + click.secho(f"Unexpected error: {response.text}", fg="red") diff --git a/fedn/cli/model_cmd.py b/fedn/cli/model_cmd.py index 80a8f795e..2e522e5a1 100644 --- a/fedn/cli/model_cmd.py +++ b/fedn/cli/model_cmd.py @@ -17,10 +17,11 @@ def model_cmd(ctx): @click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") @click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") @click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-session_id", "--session_id", required=False, help="models in session with given session id") @click.option("--n_max", required=False, help="Number of items to list") @model_cmd.command("list") @click.pass_context -def list_models(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): +def list_models(ctx, protocol: str, host: str, port: str, token: str = None, session_id: str = None, n_max: int = None): """Return: ------ - count: number of models @@ -28,6 +29,8 @@ def list_models(ctx, protocol: str, host: str, port: str, token: str = None, n_m """ url = get_api_url(protocol=protocol, host=host, port=port, endpoint="models") + + headers = {} if n_max: @@ -38,11 +41,47 @@ def list_models(ctx, protocol: str, host: str, port: str, token: str = None, n_m if _token: headers["Authorization"] = _token - click.echo(f"\nListing models: {url}\n") - click.echo(f"Headers: {headers}") + if session_id: + url = f"{url}?session_id={session_id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "models", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="Model ID") +@model_cmd.command("get") +@click.pass_context +def get_model(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: model with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="models") + + + headers = {} + + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + try: response = requests.get(url, headers=headers) - print_response(response, "models") + print_response(response, "model", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/package_cmd.py b/fedn/cli/package_cmd.py index 3c78d9944..b8a130f68 100644 --- a/fedn/cli/package_cmd.py +++ b/fedn/cli/package_cmd.py @@ -66,11 +66,42 @@ def list_packages(ctx, protocol: str, host: str, port: str, token: str = None, n if _token: headers["Authorization"] = _token - click.echo(f"\nListing packages: {url}\n") - click.echo(f"Headers: {headers}") try: response = requests.get(url, headers=headers) - print_response(response, "packages") + print_response(response, "packages", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="Package ID") +@package_cmd.command("get") +@click.pass_context +def get_package(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: package with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="packages") + headers = {} + + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "package", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/round_cmd.py b/fedn/cli/round_cmd.py index ac42f43ef..2f889fef3 100644 --- a/fedn/cli/round_cmd.py +++ b/fedn/cli/round_cmd.py @@ -16,11 +16,12 @@ def round_cmd(ctx): @click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") @click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") @click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-session_id", "--session_id", required=False, help="Rounds in session with given session id") @click.option("-t", "--token", required=False, help="Authentication token") @click.option("--n_max", required=False, help="Number of items to list") @round_cmd.command("list") @click.pass_context -def list_rounds(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): +def list_rounds(ctx, protocol: str, host: str, port: str, token: str = None, session_id: str = None, n_max: int = None): """Return: ------ - count: number of rounds @@ -28,6 +29,7 @@ def list_rounds(ctx, protocol: str, host: str, port: str, token: str = None, n_m """ url = get_api_url(protocol=protocol, host=host, port=port, endpoint="rounds") + headers = {} if n_max: @@ -38,11 +40,48 @@ def list_rounds(ctx, protocol: str, host: str, port: str, token: str = None, n_m if _token: headers["Authorization"] = _token - click.echo(f"\nListing rounds: {url}\n") - click.echo(f"Headers: {headers}") + if session_id: + url = f"{url}?round_config.session_id={session_id}" + try: response = requests.get(url, headers=headers) - print_response(response, "rounds") + print_response(response, "rounds", None) + + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-id", "--id", required=True, help="Round ID") +@click.option("-t", "--token", required=False, help="Authentication token") +@round_cmd.command("get") +@click.pass_context +def get_round(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: round with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="rounds") + + headers = {} + + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "round", id) + except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/session_cmd.py b/fedn/cli/session_cmd.py index 65db98c69..a0f1e64c3 100644 --- a/fedn/cli/session_cmd.py +++ b/fedn/cli/session_cmd.py @@ -38,11 +38,41 @@ def list_sessions(ctx, protocol: str, host: str, port: str, token: str = None, n if _token: headers["Authorization"] = _token - click.echo(f"\nListing sessions: {url}\n") - click.echo(f"Headers: {headers}") try: response = requests.get(url, headers=headers) - print_response(response, "sessions") + print_response(response, "sessions", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="Session ID") +@session_cmd.command("get") +@click.pass_context +def get_session(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: session with given session id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="sessions") + headers = {} + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "session", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/shared.py b/fedn/cli/shared.py index d32f4ff43..21fa2b072 100644 --- a/fedn/cli/shared.py +++ b/fedn/cli/shared.py @@ -64,7 +64,7 @@ def get_client_package_dir(path: str) -> str: # Print response from api (list of entities) -def print_response(response, entity_name: str): +def print_response(response, entity_name: str, so): """Prints the api response to the cli. :param response: type: array @@ -72,18 +72,28 @@ def print_response(response, entity_name: str): :param entity_name: type: string description: name of entity + :param so: + type: boolean + desriptions: single output format return: None """ if response.status_code == 200: json_data = response.json() - count, result = json_data.values() - click.echo(f"Found {count} {entity_name}") - click.echo("\n---------------------------------\n") - for obj in result: - click.echo("{") - for k, v in obj.items(): + if so: + click.echo(f"Found {entity_name}") + click.echo("\n---------------------------------\n") + for k, v in json_data.items(): click.echo(f"\t{k}: {v}") - click.echo("}") + else: + count, result = json_data.values() + click.echo(f"Found {count} {entity_name}") + click.echo("\n---------------------------------\n") + for obj in result: + print(obj.get("session_id")) + click.echo("{") + for k, v in obj.items(): + click.echo(f"\t{k}: {v}") + click.echo("}") elif response.status_code == 500: json_data = response.json() click.echo(f'Error: {json_data["message"]}') diff --git a/fedn/cli/status_cmd.py b/fedn/cli/status_cmd.py index c879ca1ef..9b751f65b 100644 --- a/fedn/cli/status_cmd.py +++ b/fedn/cli/status_cmd.py @@ -16,10 +16,11 @@ def status_cmd(ctx): @click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") @click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") @click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-session_id", "--session_id", required=False, help="statuses with given session id") @click.option("--n_max", required=False, help="Number of items to list") @status_cmd.command("list") @click.pass_context -def list_statuses(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): +def list_statuses(ctx, protocol: str, host: str, port: str, token: str = None, session_id: str = None, n_max: int = None): """Return: ------ - count: number of statuses @@ -37,11 +38,45 @@ def list_statuses(ctx, protocol: str, host: str, port: str, token: str = None, n if _token: headers["Authorization"] = _token - click.echo(f"\nListing statuses: {url}\n") - click.echo(f"Headers: {headers}") + if session_id: + url = f"{url}?sessionId={session_id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "statuses", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="Status ID") +@status_cmd.command("get") +@click.pass_context +def get_status(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: status with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="statuses") + headers = {} + + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + try: response = requests.get(url, headers=headers) - print_response(response, "statuses") + print_response(response, "status", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/cli/validation_cmd.py b/fedn/cli/validation_cmd.py index 4bf4e63fa..b7417af5e 100644 --- a/fedn/cli/validation_cmd.py +++ b/fedn/cli/validation_cmd.py @@ -17,10 +17,11 @@ def validation_cmd(ctx): @click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") @click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") @click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-session_id", "--session_id", required=False, help="validations in session with given session id") @click.option("--n_max", required=False, help="Number of items to list") @validation_cmd.command("list") @click.pass_context -def list_validations(ctx, protocol: str, host: str, port: str, token: str = None, n_max: int = None): +def list_validations(ctx, protocol: str, host: str, port: str, token: str = None, session_id: str = None, n_max: int = None): """Return: ------ - count: number of validations @@ -38,11 +39,44 @@ def list_validations(ctx, protocol: str, host: str, port: str, token: str = None if _token: headers["Authorization"] = _token - click.echo(f"\nListing validations: {url}\n") - click.echo(f"Headers: {headers}") + if session_id: + url = f"{url}?sessionId={session_id}" + + + try: + response = requests.get(url, headers=headers) + print_response(response, "validations", None) + except requests.exceptions.ConnectionError: + click.echo(f"Error: Could not connect to {url}") + + +@click.option("-p", "--protocol", required=False, default=CONTROLLER_DEFAULTS["protocol"], help="Communication protocol of controller (api)") +@click.option("-H", "--host", required=False, default=CONTROLLER_DEFAULTS["host"], help="Hostname of controller (api)") +@click.option("-P", "--port", required=False, default=CONTROLLER_DEFAULTS["port"], help="Port of controller (api)") +@click.option("-t", "--token", required=False, help="Authentication token") +@click.option("-id", "--id", required=True, help="validation ID") +@validation_cmd.command("get") +@click.pass_context +def get_validation(ctx, protocol: str, host: str, port: str, token: str = None, id: str = None): + """Return: + ------ + - result: validation with given id + + """ + url = get_api_url(protocol=protocol, host=host, port=port, endpoint="validations") + headers = {} + + _token = get_token(token) + + if _token: + headers["Authorization"] = _token + + if id: + url = f"{url}{id}" + try: response = requests.get(url, headers=headers) - print_response(response, "validations") + print_response(response, "validation", id) except requests.exceptions.ConnectionError: click.echo(f"Error: Could not connect to {url}") diff --git a/fedn/network/api/client.py b/fedn/network/api/client.py index affb1c387..a10d42c71 100644 --- a/fedn/network/api/client.py +++ b/fedn/network/api/client.py @@ -329,8 +329,18 @@ def set_active_model(self, path): :return: A dict with success or failure message. :rtype: dict """ + if path.endswith(".npz"): + helper = "numpyhelper" + elif path.endswith(".bin"): + helper = "binaryhelper" + + if helper: + response = requests.put(self._get_url_api_v1("helpers/active"), json={"helper": helper}, verify=self.verify, headers=self.headers) + with open(path, "rb") as file: - response = requests.post(self._get_url("set_initial_model"), files={"file": file}, verify=self.verify, headers=self.headers) + response = requests.post( + self._get_url("set_initial_model"), files={"file": file}, data={"helper": helper}, verify=self.verify, headers=self.headers + ) return response.json() # --- Packages --- # @@ -606,27 +616,49 @@ def start_session( :return: A dict with success or failure message and session config. :rtype: dict """ + if model_id is None: + response = requests.get(self._get_url_api_v1("models/active"), verify=self.verify, headers=self.headers) + if response.status_code == 200: + model_id = response.json() + else: + return response.json() + response = requests.post( - self._get_url("start_session"), + self._get_url_api_v1("sessions/"), json={ "session_id": id, - "aggregator": aggregator, - "aggregator_kwargs": aggregator_kwargs, - "model_id": model_id, - "round_timeout": round_timeout, - "rounds": rounds, - "round_buffer_size": round_buffer_size, - "delete_models": delete_models, - "validate": validate, - "helper": helper, - "min_clients": min_clients, - "requested_clients": requested_clients, - "server_functions": None if server_functions is None else inspect.getsource(server_functions), + "session_config": { + "aggregator": aggregator, + "aggregator_kwargs": aggregator_kwargs, + "round_timeout": round_timeout, + "buffer_size": round_buffer_size, + "model_id": model_id, + "delete_models_storage": delete_models, + "clients_required": min_clients, + "requested_clients": requested_clients, + "validate": validate, + "helper_type": helper, + "server_functions": None if server_functions is None else inspect.getsource(server_functions), + }, }, verify=self.verify, headers=self.headers, ) + if response.status_code == 201: + if id is None: + id = response.json()["session_id"] + response = requests.post( + self._get_url_api_v1("sessions/start"), + json={ + "session_id": id, + "rounds": rounds, + "round_timeout": round_timeout, + }, + verify=self.verify, + headers=self.headers, + ) + _json = response.json() return _json diff --git a/fedn/network/api/interface.py b/fedn/network/api/interface.py index 1d947b0be..976e08a10 100644 --- a/fedn/network/api/interface.py +++ b/fedn/network/api/interface.py @@ -277,7 +277,7 @@ def get_compute_package(self): :rtype: :class:`flask.Response` """ result = self.statestore.get_compute_package() - if result is None: + if result is None or "file_name" not in result: return ( jsonify({"success": False, "message": "No compute package found."}), 404, diff --git a/fedn/network/api/v1/session_routes.py b/fedn/network/api/v1/session_routes.py index a045e60bf..9158b47df 100644 --- a/fedn/network/api/v1/session_routes.py +++ b/fedn/network/api/v1/session_routes.py @@ -393,7 +393,7 @@ def start_session(): min_clients = session_config["clients_required"] if control.state() == ReducerState.monitoring: - return jsonify({"message": "A session is already running."}) + return jsonify({"message": "A session is already running!"}), 400 if not rounds or not isinstance(rounds, int): rounds = session_config["rounds"] @@ -410,6 +410,7 @@ def start_session(): except Exception: return jsonify({"message": "An unexpected error occurred"}), 500 + @bp.route("/", methods=["PATCH"]) @jwt_auth_required(role="admin") def patch_session(id: str): diff --git a/fedn/network/clients/client.py b/fedn/network/clients/client.py index 92731ccf3..d935ac588 100644 --- a/fedn/network/clients/client.py +++ b/fedn/network/clients/client.py @@ -330,7 +330,7 @@ def get_model_from_combiner(self, id, timeout=20): time_start = time.time() request = fedn.ModelRequest(id=id) request.sender.name = self.name - request.sender.role = fedn.WORKER + request.sender.role = fedn.CLIENT try: for part in self.modelStub.Download(request, metadata=self.metadata): @@ -388,7 +388,7 @@ def _listen_to_task_stream(self): """ r = fedn.ClientAvailableMessage() r.sender.name = self.name - r.sender.role = fedn.WORKER + r.sender.role = fedn.CLIENT r.sender.client_id = self.id # Add client to metadata self._add_grpc_metadata("client", self.name) @@ -406,7 +406,7 @@ def _listen_to_task_stream(self): # Process training request self.send_status( "Received model update request.", - log_level=fedn.Status.AUDIT, + log_level=fedn.LogLevel.AUDIT, type=fedn.StatusType.MODEL_UPDATE_REQUEST, request=request, sesssion_id=request.session_id, @@ -628,7 +628,7 @@ def process_request(self): update = fedn.ModelUpdate() update.sender.name = self.name update.sender.client_id = self.id - update.sender.role = fedn.WORKER + update.sender.role = fedn.CLIENT update.receiver.name = request.sender.name update.receiver.role = request.sender.role update.model_id = request.model_id @@ -641,7 +641,7 @@ def process_request(self): _ = self.combinerStub.SendModelUpdate(update, metadata=self.metadata) self.send_status( "Model update completed.", - log_level=fedn.Status.AUDIT, + log_level=fedn.LogLevel.AUDIT, type=fedn.StatusType.MODEL_UPDATE, request=update, sesssion_id=request.session_id, @@ -655,7 +655,7 @@ def process_request(self): logger.debug(e) else: self.send_status( - "Client {} failed to complete model update.", log_level=fedn.Status.WARNING, request=request, sesssion_id=request.session_id + "Client {} failed to complete model update.", log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id ) self.state = ClientState.idle @@ -669,7 +669,7 @@ def process_request(self): # Send validation validation = fedn.ModelValidation() validation.sender.name = self.name - validation.sender.role = fedn.WORKER + validation.sender.role = fedn.CLIENT validation.receiver.name = request.sender.name validation.receiver.role = request.sender.role validation.model_id = str(request.model_id) @@ -683,7 +683,11 @@ def process_request(self): status_type = fedn.StatusType.MODEL_VALIDATION self.send_status( - "Model validation completed.", log_level=fedn.Status.AUDIT, type=status_type, request=validation, sesssion_id=request.session_id + "Model validation completed.", + log_level=fedn.LogLevel.AUDIT, + type=status_type, + request=validation, + sesssion_id=request.session_id, ) except grpc.RpcError as e: status_code = e.code() @@ -695,7 +699,7 @@ def process_request(self): else: self.send_status( "Client {} failed to complete model validation.".format(self.name), - log_level=fedn.Status.WARNING, + log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id, ) @@ -720,7 +724,7 @@ def process_request(self): _ = self._process_prediction_request(request.model_id, request.session_id, presigned_url) prediction = fedn.ModelPrediction() prediction.sender.name = self.name - prediction.sender.role = fedn.WORKER + prediction.sender.role = fedn.CLIENT prediction.receiver.name = request.sender.name prediction.receiver.name = request.sender.name prediction.receiver.role = request.sender.role @@ -736,7 +740,7 @@ def process_request(self): _ = self.combinerStub.SendModelPrediction(prediction, metadata=self.metadata) status_type = fedn.StatusType.MODEL_PREDICTION self.send_status( - "Model prediction completed.", log_level=fedn.Status.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id + "Model prediction completed.", log_level=fedn.LogLevel.AUDIT, type=status_type, request=prediction, sesssion_id=request.session_id ) except grpc.RpcError as e: status_code = e.code() @@ -758,7 +762,7 @@ def _send_heartbeat(self, update_frequency=2.0): :rtype: None """ while True: - heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.WORKER, client_id=self.id)) + heartbeat = fedn.Heartbeat(sender=fedn.Client(name=self.name, role=fedn.CLIENT, client_id=self.id)) try: self.connectorStub.SendHeartbeat(heartbeat, metadata=self.metadata) if self._missed_heartbeat > 0: @@ -789,13 +793,13 @@ def _send_heartbeat(self, update_frequency=2.0): logger.info("SendStatus: Client disconnected.") return - def send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None): + def send_status(self, msg, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None): """Send status message. :param msg: The message to send. :type msg: str :param log_level: The log level of the message. - :type log_level: fedn.Status.INFO, fedn.Status.WARNING, fedn.Status.ERROR + :type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR :param type: The type of the message. :type type: str :param request: The request message. @@ -808,7 +812,7 @@ def send_status(self, msg, log_level=fedn.Status.INFO, type=None, request=None, status = fedn.Status() status.timestamp.GetCurrentTime() status.sender.name = self.name - status.sender.role = fedn.WORKER + status.sender.role = fedn.CLIENT status.log_level = log_level status.status = str(msg) status.session_id = sesssion_id diff --git a/fedn/network/clients/client_v2.py b/fedn/network/clients/client_v2.py index 7898c152f..f0a50243e 100644 --- a/fedn/network/clients/client_v2.py +++ b/fedn/network/clients/client_v2.py @@ -8,10 +8,9 @@ from fedn.common.config import FEDN_CUSTOM_URL_PREFIX from fedn.common.log_config import logger -from fedn.network.clients.fedn_client import (ConnectToApiResult, FednClient, - GrpcConnectionOptions) +from fedn.network.clients.fedn_client import ConnectToApiResult, FednClient, GrpcConnectionOptions from fedn.network.combiner.modelservice import get_tmp_path -from fedn.utils.helpers.helpers import get_helper +from fedn.utils.helpers.helpers import get_helper, save_metadata def get_url(api_url: str, api_port: int) -> str: @@ -135,8 +134,8 @@ def set_helper(self, response: GrpcConnectionOptions = None): # Priority: helper_type from constructor > helper_type from response > default helper_type self.helper = get_helper(helper_type_to_use) - def on_train(self, in_model): - out_model, meta = self._process_training_request(in_model) + def on_train(self, in_model, client_settings): + out_model, meta = self._process_training_request(in_model, client_settings) return out_model, meta def on_validation(self, in_model): @@ -151,7 +150,7 @@ def on_backward(self, in_gradients, client_id): meta = self._process_backward_request(in_gradients, client_id) return meta - def _process_training_request(self, in_model: BytesIO) -> Tuple[BytesIO, dict]: + def _process_training_request(self, in_model: BytesIO, client_settings: dict) -> Tuple[BytesIO, dict]: """Process a training (model update) request. :param in_model: The model to be updated. @@ -166,6 +165,8 @@ def _process_training_request(self, in_model: BytesIO) -> Tuple[BytesIO, dict]: with open(inpath, "wb") as fh: fh.write(in_model.getbuffer()) + save_metadata(metadata=client_settings, filename=inpath) + outpath = self.helper.get_tmp_path() tic = time.time() diff --git a/fedn/network/clients/fedn_client.py b/fedn/network/clients/fedn_client.py index 37c3f3e64..01b44f027 100644 --- a/fedn/network/clients/fedn_client.py +++ b/fedn/network/clients/fedn_client.py @@ -115,11 +115,9 @@ def connect_to_api(self, url: str, token: str, json: dict) -> Tuple[ConnectToApi elif response.status_code == 404: logger.warning("Connect to FEDn Api - Incorrect URL") return ConnectToApiResult.IncorrectUrl, "Incorrect URL" - except Exception: - pass - - logger.warning("Connect to FEDn Api - Unknown error occurred") - return ConnectToApiResult.UnknownError, "Unknown error occurred" + except Exception as e: + logger.warning(f"Connect to FEDn Api - Error occurred: {str(e)}") + return ConnectToApiResult.UnknownError, str(e) def download_compute_package(self, url: str, token: str, name: str = None) -> bool: """Download compute package from controller @@ -235,11 +233,18 @@ def update_local_model(self, request): logger.error("No train callback set") return - self.send_status(f"\t Starting processing of training request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name) + self.send_status( + f"\t Starting processing of training request for model_id {model_id}", + sesssion_id=request.session_id, + sender_name=self.name, + log_level=fedn.LogLevel.INFO, + type=fedn.StatusType.MODEL_UPDATE, + ) logger.info(f"Running train callback with model ID: {model_id}") + client_settings = json.loads(request.data).get("client_settings", {}) tic = time.time() - out_model, meta = self.train_callback(in_model) + out_model, meta = self.train_callback(in_model, client_settings) meta["processing_time"] = time.time() - tic tic = time.time() @@ -255,7 +260,7 @@ def update_local_model(self, request): self.send_status( "Model update completed.", - log_level=fedn.Status.AUDIT, + log_level=fedn.LogLevel.AUDIT, type=fedn.StatusType.MODEL_UPDATE, request=update, sesssion_id=request.session_id, @@ -265,7 +270,13 @@ def update_local_model(self, request): def validate_global_model(self, request): model_id = request.model_id - self.send_status(f"Processing validate request for model_id {model_id}", sesssion_id=request.session_id, sender_name=self.name) + self.send_status( + f"Processing validate request for model_id {model_id}", + sesssion_id=request.session_id, + sender_name=self.name, + log_level=fedn.LogLevel.INFO, + type=fedn.StatusType.MODEL_VALIDATION, + ) in_model = self.get_model_from_combiner(id=model_id, client_id=self.client_id) @@ -289,7 +300,7 @@ def validate_global_model(self, request): if result: self.send_status( "Model validation completed.", - log_level=fedn.Status.AUDIT, + log_level=fedn.LogLevel.AUDIT, type=fedn.StatusType.MODEL_VALIDATION, request=validation, sesssion_id=request.session_id, @@ -298,7 +309,7 @@ def validate_global_model(self, request): else: self.send_status( "Client {} failed to complete model validation.".format(self.name), - log_level=fedn.Status.WARNING, + log_level=fedn.LogLevel.WARNING, request=request, sesssion_id=request.session_id, sender_name=self.name, @@ -467,7 +478,7 @@ def get_model_from_combiner(self, id: str, client_id: str, timeout: int = 20) -> def send_model_to_combiner(self, model: BytesIO, id: str): return self.grpc_handler.send_model_to_combiner(model, id) - def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None): + def send_status(self, msg: str, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None): return self.grpc_handler.send_status(msg, log_level, type, request, sesssion_id, sender_name) def send_model_update(self, update: fedn.ModelUpdate) -> bool: diff --git a/fedn/network/clients/grpc_handler.py b/fedn/network/clients/grpc_handler.py index f6e7b983e..5572e7ad8 100644 --- a/fedn/network/clients/grpc_handler.py +++ b/fedn/network/clients/grpc_handler.py @@ -119,7 +119,7 @@ def heartbeat(self, client_name: str, client_id: str): :return: Response from the combiner. :rtype: fedn.Response """ - heartbeat = fedn.Heartbeat(sender=fedn.Client(name=client_name, role=fedn.WORKER, client_id=client_id)) + heartbeat = fedn.Heartbeat(sender=fedn.Client(name=client_name, role=fedn.CLIENT, client_id=client_id)) try: logger.info("Sending heartbeat to combiner") @@ -156,7 +156,7 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call """ r = fedn.ClientAvailableMessage() r.sender.name = client_name - r.sender.role = fedn.WORKER + r.sender.role = fedn.CLIENT r.sender.client_id = client_id try: @@ -164,9 +164,9 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call for request in self.combinerStub.TaskStream(r, metadata=self.metadata): if request.sender.role == fedn.COMBINER: self.send_status( - "Received model update request.", - log_level=fedn.Status.AUDIT, - type=fedn.StatusType.MODEL_UPDATE_REQUEST, + "Received request from combiner.", + log_level=fedn.LogLevel.AUDIT, + type=request.type, request=request, sesssion_id=request.session_id, sender_name=client_name, @@ -183,13 +183,13 @@ def listen_to_task_stream(self, client_name: str, client_id: str, callback: Call logger.error(f"GRPC (TaskStream): An error occurred: {e}") self._handle_unknown_error(e, "TaskStream", lambda: self.listen_to_task_stream(client_name, client_id, callback)) - def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None): + def send_status(self, msg: str, log_level=fedn.LogLevel.INFO, type=None, request=None, sesssion_id: str = None, sender_name: str = None): """Send status message. :param msg: The message to send. :type msg: str :param log_level: The log level of the message. - :type log_level: fedn.Status.INFO, fedn.Status.WARNING, fedn.Status.ERROR + :type log_level: fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR :param type: The type of the message. :type type: str :param request: The request message. @@ -198,7 +198,7 @@ def send_status(self, msg: str, log_level=fedn.Status.INFO, type=None, request=N status = fedn.Status() status.timestamp.GetCurrentTime() status.sender.name = sender_name - status.sender.role = fedn.WORKER + status.sender.role = fedn.CLIENT status.log_level = log_level status.status = str(msg) status.session_id = sesssion_id @@ -231,7 +231,7 @@ def get_model_from_combiner(self, id: str, client_id: str, timeout: int = 20) -> time_start = time.time() request = fedn.ModelRequest(id=id) request.sender.client_id = client_id - request.sender.role = fedn.WORKER + request.sender.role = fedn.CLIENT try: logger.info("Downloading model from combiner.") @@ -298,7 +298,7 @@ def create_update_message( ): update = fedn.ModelUpdate() update.sender.name = sender_name - update.sender.role = fedn.WORKER + update.sender.role = fedn.CLIENT update.sender.client_id = self.metadata[0][1] update.receiver.name = receiver_name update.receiver.role = receiver_role @@ -321,7 +321,7 @@ def create_validation_message( ): validation = fedn.ModelValidation() validation.sender.name = sender_name - validation.sender.role = fedn.WORKER + validation.sender.role = fedn.CLIENT validation.receiver.name = receiver_name validation.receiver.role = receiver_role validation.model_id = model_id @@ -344,7 +344,7 @@ def create_prediction_message( ): prediction = fedn.ModelPrediction() prediction.sender.name = sender_name - prediction.sender.role = fedn.WORKER + prediction.sender.role = fedn.CLIENT prediction.receiver.name = receiver_name prediction.receiver.role = receiver_role prediction.model_id = model_id diff --git a/fedn/network/combiner/combiner.py b/fedn/network/combiner/combiner.py index 9d4750c20..2e911e841 100644 --- a/fedn/network/combiner/combiner.py +++ b/fedn/network/combiner/combiner.py @@ -47,7 +47,7 @@ def role_to_proto_role(role): if role == Role.COMBINER: return fedn.COMBINER if role == Role.WORKER: - return fedn.WORKER + return fedn.CLIENT if role == Role.REDUCER: return fedn.REDUCER if role == Role.OTHER: @@ -304,7 +304,7 @@ def _send_request_type(self, request_type, session_id, model_id=None, config=Non request.sender.name = self.id request.sender.role = fedn.COMBINER request.receiver.client_id = client - request.receiver.role = fedn.WORKER + request.receiver.role = fedn.CLIENT # Set the request data, not used in validation if request_type == fedn.StatusType.MODEL_PREDICTION: presigned_url = self.repository.presigned_put_url(self.repository.prediction_bucket, f"{client}/{session_id}") @@ -643,7 +643,7 @@ def ListActiveClients(self, request: fedn.ListClientsRequest, context): logger.info("grpc.Combiner.ListActiveClients: Number active clients: {}".format(nr_active_clients)) for client in active_clients: - clients.client.append(fedn.Client(name=client, role=fedn.WORKER)) + clients.client.append(fedn.Client(name=client, role=fedn.CLIENT)) return clients def AcceptingClients(self, request: fedn.ConnectionRequest, context): @@ -715,9 +715,8 @@ def TaskStream(self, response, context): metadata = dict(metadata) logger.info("grpc.Combiner.TaskStream: Client connected: {}\n".format(metadata["client"])) - status = fedn.Status(status="Client {} connecting to TaskStream.".format(client.name)) + status = fedn.Status(status="Client {} connecting to TaskStream.".format(client.name), log_level=fedn.LogLevel.INFO, type=fedn.StatusType.NETWORK) logger.info("Client {} connecting to TaskStream.".format(client.name)) - status.log_level = fedn.Status.INFO status.timestamp.GetCurrentTime() self.__whoami(status.sender, self) @@ -771,8 +770,10 @@ def TaskStream(self, response, context): logger.error("Error in ModelUpdateRequestStream: {}".format(e)) logger.warning("Client {} disconnected from TaskStream".format(client.name)) status = fedn.Status(status="Client {} disconnected from TaskStream.".format(client.name)) - status.log_level = fedn.Status.INFO + status.log_level = fedn.LogLevel.INFO + status.type = fedn.StatusType.NETWORK status.timestamp.GetCurrentTime() + self.__whoami(status.sender, self) self._send_status(status) def SendModelUpdate(self, request, context): diff --git a/fedn/network/combiner/interfaces.py b/fedn/network/combiner/interfaces.py index 426e7d9f6..32dbdb0f7 100644 --- a/fedn/network/combiner/interfaces.py +++ b/fedn/network/combiner/interfaces.py @@ -263,7 +263,7 @@ def get_model(self, id, timeout=10): request = fedn.ModelRequest(id=id) request.sender.name = self.name - request.sender.role = fedn.WORKER + request.sender.role = fedn.CLIENT parts = modelservice.Download(request) for part in parts: diff --git a/fedn/network/controller/controlbase.py b/fedn/network/controller/controlbase.py index 297efd426..397a117bb 100644 --- a/fedn/network/controller/controlbase.py +++ b/fedn/network/controller/controlbase.py @@ -261,8 +261,8 @@ def commit(self, model_id, model=None, session_id=None): """ helper = self.get_helper() if model is not None: - logger.info("Saving model file temporarily to disk...") outfile_name = helper.save(model) + logger.info("Saving model file temporarily to {}".format(outfile_name)) logger.info("CONTROL: Uploading model to Minio...") model_id = self.model_repository.set_model(outfile_name, is_file=True) diff --git a/fedn/network/grpc/fedn.proto b/fedn/network/grpc/fedn.proto index ecedac65a..262f0a39a 100644 --- a/fedn/network/grpc/fedn.proto +++ b/fedn/network/grpc/fedn.proto @@ -22,18 +22,18 @@ enum StatusType { BACKWARD = 9; } +enum LogLevel { + NONE = 0; + INFO = 1; + DEBUG = 2; + WARNING = 3; + ERROR = 4; + AUDIT = 5; + } + message Status { Client sender = 1; string status = 2; - - enum LogLevel { - INFO = 0; - DEBUG = 1; - WARNING = 2; - ERROR = 3; - AUDIT = 4; - } - LogLevel log_level = 3; string data = 4; string correlation_id = 5; @@ -165,10 +165,10 @@ message ClientList { } enum Role { - WORKER = 0; - COMBINER = 1; - REDUCER = 2; - OTHER = 3; + OTHER = 0; + CLIENT = 1; + COMBINER = 2; + REDUCER = 3; } message Client { diff --git a/fedn/utils/dist.py b/fedn/utils/dist.py index e5fa7192b..82812dfc3 100644 --- a/fedn/utils/dist.py +++ b/fedn/utils/dist.py @@ -3,7 +3,7 @@ import fedn -def get_version(pacakge): +def get_version(package): # Dynamically get the version of the package try: version = importlib.metadata.version("fedn") diff --git a/fedn/utils/process.py b/fedn/utils/process.py index c2574a760..b08af99b6 100644 --- a/fedn/utils/process.py +++ b/fedn/utils/process.py @@ -101,9 +101,6 @@ def _exec_cmd( env = env if extra_env is None else {**os.environ, **extra_env} - # In Python < 3.8, `subprocess.Popen` doesn't accept a command containing path-like - # objects (e.g. `["ls", pathlib.Path("abc")]`) on Windows. To avoid this issue, - # stringify all elements in `cmd`. Note `str(pathlib.Path("abc"))` returns 'abc'. if isinstance(cmd, list): cmd = list(map(str, cmd)) diff --git a/pyproject.toml b/pyproject.toml index beb511290..d679541cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "fedn" -version = "0.19.0" +version = "0.20.0" description = "Scaleout Federated Learning" authors = [{ name = "Scaleout Systems AB", email = "contact@scaleoutsystems.com" }] readme = "README.rst" @@ -20,30 +20,29 @@ keywords = [ ] classifiers = [ "Natural Language :: English", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", ] -requires-python = '>=3.8,<3.13' +requires-python = '>=3.9,<3.13' dependencies = [ "requests", "urllib3>=1.26.4", "gunicorn>=20.0.4", "minio", - "grpcio>=1.60,<1.67", - "grpcio-tools>=1.60,<1.67", + "grpcio>=1.60,<1.69", + "grpcio-tools>=1.60,<1.69", "numpy>=1.21.6", - "protobuf>=5.0.0,<5.29.0", + "protobuf>=5.0.0,<5.30.0", "pymongo", - "Flask==3.0.3", + "Flask==3.1.0", "pyjwt", "pyopenssl", "psutil", "click==8.1.7", - "grpcio-health-checking>=1.60,<1.67", + "grpcio-health-checking>=1.60,<1.69", "pyyaml", "plotly", "virtualenv",