add an example #8

Open · wants to merge 35 commits into main

Commits (35)
d58bdf3  add an example (ray-ruisun, May 24, 2023)
2f875a3  Update basic_cnn.py (ray-ruisun, May 24, 2023)
de70807  update dgc algorithm and add a metric (ray-ruisun, May 25, 2023)
4da548a  Update dgc to latest version (ray-ruisun, May 26, 2023)
2f3f97e  Update example.py (ray-ruisun, May 26, 2023)
a395927  Update example.py (ray-ruisun, May 26, 2023)
2e57da6  Refactor to work with SDK (ignas-gustainis, May 29, 2023)
4629e67  test dgc (ray-ruisun, May 29, 2023)
74395b3  Update example.py (ray-ruisun, May 29, 2023)
f3cccee  Temporary change (ignas-gustainis, May 29, 2023)
e3d7d4b  Set manual seed for torch (ignas-gustainis, May 29, 2023)
5ff6ddd  Update example.py (ray-ruisun, May 29, 2023)
018f179  Update basic_cnn.py (ray-ruisun, May 29, 2023)
aae2835  Update data_preprocessing.py (ray-ruisun, May 29, 2023)
13b47c6  Update example.py (ray-ruisun, May 29, 2023)
457789d  Update example.py (ray-ruisun, May 29, 2023)
c2ebb5b  Update example.py (ray-ruisun, May 30, 2023)
27df3ce  Use Sparse-Dense Connection. (limberc, May 30, 2023)
6003c06  Update. (limberc, May 30, 2023)
3d4e645  Update example.py (ray-ruisun, Jun 5, 2023)
9be7708  Update example.py (ray-ruisun, Jun 6, 2023)
6874712  add compare.py (Jun 22, 2023)
6e0684c  bug fix (Jun 28, 2023)
9a435b6  add evaluate (Jun 28, 2023)
daa9774  delete break (Jun 28, 2023)
c1d7d78  accumulate grad add (Jun 29, 2023)
cc67cfc  Update. (limberc, Jun 30, 2023)
8d603d5  Refactor the compare.py (limberc, Jun 30, 2023)
b71f646  Update dgc into Optimizer format to perform momentum adjustment and g… (limberc, Jun 30, 2023)
54ff076  Update. (limberc, Jun 30, 2023)
910e4e9  Update. (limberc, Jun 30, 2023)
018d876  Add dgc_function. (limberc, Jun 30, 2023)
9479651  Add `test_dgc_sgd_save_all_grad` (limberc, Jun 30, 2023)
6b8063c  Add `test_dgc_sgd_save_all_grad` (limberc, Jun 30, 2023)
957b7a5  Update. (limberc, Jun 30, 2023)
11 changes: 11 additions & 0 deletions examples/credit_card_fraud_detection/Dockerfile
@@ -0,0 +1,11 @@
FROM python:3.11.3

WORKDIR /app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 5000/tcp
CMD [ "python", "./example.py" ]
Empty file.
15 changes: 15 additions & 0 deletions examples/credit_card_fraud_detection/build_and_upload.sh
@@ -0,0 +1,15 @@
#!/bin/bash
IMAGE_TAG="flock_model"
OUTPUT_FILE=`mktemp`
echo "Building the model image."
docker build -t $IMAGE_TAG .

echo "Saving the docker image to a file and compressing it. It may take a while.."
#time (docker save $IMAGE_TAG | xz -T 0 > $OUTPUT_FILE)
docker save $IMAGE_TAG -o $OUTPUT_FILE

echo "Uploading the compressed image to IPFS.."
json=`curl -F "file=@$OUTPUT_FILE" 127.0.0.1:5001/api/v0/add`
hash=`echo $json | grep -o '"Hash":"[^"]*' | grep -o '[^"]*$'`
rm $OUTPUT_FILE
echo "Model definition IPFS hash: $hash"
149 changes: 149 additions & 0 deletions examples/credit_card_fraud_detection/compare.py
@@ -0,0 +1,149 @@
import copy
import json
import random

import numpy as np
import torch
import torch.utils.data
import torch.utils.data.distributed
from loguru import logger
from pandas import DataFrame

from data_preprocessing import get_loader
from flock_sdk import FlockSDK
from models.basic_cnn import CreditFraudNetMLP

flock = FlockSDK()


def dgc(grads, sparsity: float = 0.9):
    """
    Deep Gradient Compression (DGC) style sparsification for a set of gradients.
    Args:
        grads: list of gradient tensors to compress
        sparsity: fraction of gradient elements to zero out, a float in [0, 1).
    """
    # Flatten the gradients into a single 1-D tensor
    flat_grads = torch.cat([grad.view(-1) for grad in grads])

    # Threshold at the k-th smallest magnitude so that only the top
    # (1 - sparsity) fraction of entries survives the mask below
    abs_grads = flat_grads.abs()
    k = max(1, int(sparsity * flat_grads.numel()))
    threshold = abs_grads.topk(k, largest=False).values.max()

# Create a mask for the elements to keep
mask = abs_grads.gt(threshold).float()

# Apply the mask to the original gradients
compressed_grads = []
start = 0
for grad in grads:
end = start + grad.numel()
compressed_grad = grad * mask[start:end].view_as(grad)
compressed_grads.append(compressed_grad)
start = end
sparse_tensors = [compressed_grad.to_sparse() for compressed_grad in compressed_grads]

return sparse_tensors


def compare_models(model1, model2):
    for (name1, param1), (name2, param2) in zip(model1.named_parameters(), model2.named_parameters()):
        if name1 == name2:
            print(f'Parameter: {name1}')
            # Sum of absolute differences; a plain sum can cancel to ~0 even when weights differ
            print(f'Difference: {torch.sum(torch.abs(param1.data - param2.data))}')


def get_model():
    # Re-seed before every construction so all three models start from identical weights
    seed = 0
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    return CreditFraudNetMLP(num_features=features)


batch_size = 128
epochs = 100
lr = 0.0003
device = torch.device('cpu')
features = 29


def process_dataset(dataset: list[dict], transform=None):
logger.debug('Processing dataset')
dataset_df = DataFrame.from_records(dataset)
return get_loader(
dataset_df, batch_size=batch_size, shuffle=True, drop_last=False
)


with open('test_dataset_new.json', 'r') as f:
dataset = json.loads(f.read())

data_loader = process_dataset(dataset)

train_model = get_model()
eval_model_1 = get_model()
eval_model_2 = get_model()

# compare_models(train_model, eval_model_1)


train_model.train()
train_model.to(device)
eval_model_1.to(device)
eval_model_2.to(device)

optimizer = torch.optim.SGD(train_model.parameters(), lr=lr)
criterion = torch.nn.BCELoss()

uncompressed_grads_accumulate = None
compressed_grads_accumulate = None

for epoch in range(epochs):
logger.debug(f'Epoch {epoch}')
train_loss = 0.0
train_correct = 0
train_total = 0
    for batch_idx, (inputs, targets) in enumerate(data_loader):
        # Drop the Time column
        inputs, targets = inputs.to(device)[:, 1:], targets.to(device)
        optimizer.zero_grad()  # without this, p.grad accumulates across batches
        outputs = train_model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        uncompressed_grads = [p.grad.clone() for p in train_model.parameters()]
        optimizer.step()
train_loss += loss.item() * inputs.size(0)
predicted = torch.round(outputs).squeeze()
train_total += targets.size(0)
train_correct += (predicted == targets.squeeze()).sum().item()
if uncompressed_grads_accumulate is None:
uncompressed_grads_accumulate = copy.deepcopy(uncompressed_grads)
else:
for grads_accumulate, uncompressed_grad in zip(uncompressed_grads_accumulate, uncompressed_grads):
grads_accumulate += uncompressed_grad

compressed_grads = dgc(uncompressed_grads)
if compressed_grads_accumulate is None:
compressed_grads_accumulate = copy.deepcopy(compressed_grads)
else:
for grads_accumulate, compressed_grad in zip(compressed_grads_accumulate, compressed_grads):
grads_accumulate += compressed_grad

logger.info(
f'Training Epoch: {epoch}, Acc: {round(100.0 * train_correct / train_total, 2)}, Loss: {round(train_loss / train_total, 4)}'
)

# Apply the accumulated raw gradients to one eval model and the accumulated
# compressed gradients to the other, mimicking a single aggregated update
for p, grad in zip(eval_model_1.parameters(), uncompressed_grads_accumulate):
    p.data -= lr * grad

for p, compressed_grad in zip(eval_model_2.parameters(), compressed_grads_accumulate):
    p.data -= lr * compressed_grad

compare_models(train_model, eval_model_1)
print('==============================================')
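
A quick way to sanity-check the sparsifier above: with sparsity=0.9, roughly 10% of gradient entries should survive. The following standalone snippet is a hedged sketch, not part of the PR; it assumes the dgc() function above has been pasted into the same session.

import torch

# Assumes dgc() from compare.py above is already in scope.
torch.manual_seed(0)
grads = [torch.randn(10, 10), torch.randn(100)]
sparse = dgc(grads, sparsity=0.9)

total = sum(g.numel() for g in grads)
kept = sum((s.to_dense() != 0).sum().item() for s in sparse)
print(f'kept {kept}/{total} entries')  # expect roughly 10%, i.e. ~20 of 200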
@@ -0,0 +1 @@
from .dgc import DGC_SGD, dgc_func
87 changes: 87 additions & 0 deletions examples/credit_card_fraud_detection/compresser/dgc.py
@@ -0,0 +1,87 @@
import torch
from torch.optim import SGD


def dgc_func(grads, sparsity: float = 0.95):
    """
    Deep Gradient Compression (DGC) style sparsification for a given set of gradients.
    Args:
        grads: list of gradient tensors to compress
        sparsity: fraction of gradient elements to zero out, a float in [0, 1).
    """
    # Flatten the gradients into a single 1-D tensor
    flat_grads = torch.cat([grad.view(-1) for grad in grads])

    # Threshold at the k-th smallest magnitude so that only the top
    # (1 - sparsity) fraction of entries survives the mask below
    abs_grads = flat_grads.abs()
    k = max(1, int(sparsity * flat_grads.numel()))
    threshold = abs_grads.topk(k, largest=False).values.max()

# Create a mask for the elements to keep
mask = abs_grads.gt(threshold).float()

# Apply the mask to the original gradients
compressed_grads = []
start = 0
for grad in grads:
end = start + grad.numel()
compressed_grad = grad * mask[start:end].view_as(grad)
compressed_grads.append(compressed_grad)
start = end
sparse_tensors = [compressed_grad.to_sparse() for compressed_grad in compressed_grads]

return sparse_tensors


class DGC_SGD(SGD):
def __init__(self, params, lr, momentum=0, dampening=0,
weight_decay=0, nesterov=False, sparsity=0.01, gradient_clip=0.01,
store_compressed_grad=False):
super().__init__(params, lr, momentum, dampening, weight_decay, nesterov)
self.sparsity = sparsity
self.gradient_clip = gradient_clip
self.store_compressed_grad = store_compressed_grad
self.compressed_grads = []

@torch.no_grad()
def step(self, closure=None):
"""Performs a single optimization step."""
self.compressed_grads = []
for group in self.param_groups:
weight_decay = group['weight_decay']
momentum = group['momentum']
dampening = group['dampening']
nesterov = group['nesterov']

for p in group['params']:
if p.grad is None:
continue
d_p = p.grad
d_p.data.clamp_(-self.gradient_clip, self.gradient_clip)

                # Gradient sparsification: keep entries whose magnitude exceeds
                # sparsity * max|grad| (a cheap threshold heuristic, not exact top-k)
                grad_abs = d_p.data.abs()
                mask = grad_abs.gt(self.sparsity * grad_abs.max())
                d_p.data.mul_(mask)
if self.store_compressed_grad:
self.compressed_grads.append(d_p.clone())

# Momentum Correction
if p in self.state and 'momentum_buffer' in self.state[p]:
buf = self.state[p]['momentum_buffer']
buf.mul_(mask)

if weight_decay != 0:
d_p.add_(p, alpha=weight_decay)

                if momentum != 0:
                    # Bug fix: key the buffer check by parameter, not by the whole state dict
                    if 'momentum_buffer' not in self.state[p]:
                        buf = self.state[p]['momentum_buffer'] = torch.clone(d_p).detach()
else:
buf = self.state[p]['momentum_buffer']
buf.mul_(momentum).add_(d_p, alpha=1 - dampening)
if nesterov:
d_p = d_p.add(buf, alpha=momentum)
else:
d_p = buf
p.add_(d_p, alpha=-group['lr'])
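
For reviewers who want to exercise DGC_SGD directly, here is a minimal usage sketch (not part of the PR). It assumes the example directory is on the import path so the compresser package resolves; the tiny linear model and batch are illustrative placeholders.

import torch

from compresser import DGC_SGD

# Placeholder model and batch purely for illustration
model = torch.nn.Linear(29, 1)
optimizer = DGC_SGD(model.parameters(), lr=0.0003, momentum=0.9,
                    sparsity=0.01, gradient_clip=0.01,
                    store_compressed_grad=True)
criterion = torch.nn.BCEWithLogitsLoss()

x = torch.randn(8, 29)
y = torch.randint(0, 2, (8, 1)).float()

optimizer.zero_grad()
loss = criterion(model(x), y)
loss.backward()
optimizer.step()  # clips, sparsifies, corrects momentum, then updates

print(len(optimizer.compressed_grads))  # one masked grad per parameter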
71 changes: 71 additions & 0 deletions examples/credit_card_fraud_detection/data_preprocessing.py
@@ -0,0 +1,71 @@
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset


def load_dataset(data_file_path):
return pd.read_csv(data_file_path)


def split_dataset(data_file_path, num_clients=50, test_rate=0.2):
    df = pd.read_csv(data_file_path)

    # Draw per-client shares from a symmetric Dirichlet, then slice the
    # dataframe into contiguous chunks of those (rounded-down) sizes
    proportions = np.random.dirichlet(np.ones(num_clients), size=1)[0]

    split_dfs = []
    start = 0
    for proportion in proportions:
        size = int(proportion * len(df))
        split_dfs.append(df.iloc[start : start + size])
        start += size

    return split_dfs

    # Unreachable train/test variant kept for reference (test_rate applies only here):
    # train_tests = []
    # for split_df in split_dfs:
    #     train, test = train_test_split(split_df, test_size=test_rate)
    #     train_tests.append((train, test))
    #
    # return train_tests

def get_loader(dataset_df, batch_size=128, shuffle=True, drop_last=False):
X_df = dataset_df.iloc[:, :-1]
y_df = dataset_df.iloc[:, -1]

X_tensor = torch.tensor(X_df.values, dtype=torch.float32)
y_tensor = torch.tensor(y_df.values, dtype=torch.float32)

y_tensor = y_tensor.unsqueeze(1)
dataset_in_dataset = TensorDataset(X_tensor, y_tensor)
return DataLoader(
dataset_in_dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
)

# def get_loader(dataset_df, batch_size=128, shuffle=True, drop_last=False):
# X_df = dataset_df.iloc[:, :-1]
# y_df = dataset_df.iloc[:, -1]

# X_tensor = torch.tensor(X_df.values, dtype=torch.float32)
# y_tensor = torch.tensor(y_df.values, dtype=torch.float32)

# X_tensor = X_tensor.unsqueeze(1)
# X_tensor = X_tensor.transpose(
# 1, 2
# ) # Now X_tensor has shape [batch_size, num_features, seq_length]
# y_tensor = y_tensor.unsqueeze(1)
# dataset_in_dataset = TensorDataset(X_tensor, y_tensor)
# return DataLoader(
# dataset_in_dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last
# )


def save_dataset(datasets, dataset_dir="data"):
    if not os.path.exists(dataset_dir):
        os.makedirs(dataset_dir)

    for i, dataset in enumerate(datasets):
        # Use dataset_dir rather than a hardcoded "data/" prefix
        dataset.to_json(f"{dataset_dir}/client_{i}.json", orient="records")
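
To tie the helpers above together, a hedged end-to-end sketch (not part of the PR): 'creditcard.csv' is an assumed local copy of the Kaggle credit card fraud dataset with the label in the last column.

from data_preprocessing import split_dataset, save_dataset, get_loader

client_dfs = split_dataset('creditcard.csv', num_clients=50)  # assumed path
save_dataset(client_dfs, dataset_dir='data')

loader = get_loader(client_dfs[0], batch_size=128)
X, y = next(iter(loader))  # may fail if a Dirichlet share rounds down to 0 rows
print(X.shape, y.shape)    # (batch, num_features) and (batch, 1)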