FATE Horizontal Federated Learning: Colon Cancer Image Classification (Part 2: Federation)

In the previous post we got the full colon cancer image classification pipeline running locally; now we port it to FATE to perform federated learning.

Data Preprocessing

Following the approach of the handwritten digit recognition task, preprocess the three shares of training data and create the configuration files. The processed directory layout is shown in the figure below.

(Figure: directory layout after preprocessing)

  • code/: files starting with bind are used for data binding; colon_conf is the conf file and colon_dsl is the dsl file
  • test/: 500 test images
  • val/: 500 validation images
  • train_pX/: the X-th share of training data, 3,000 images each

    • images/: the image folder, holding all images
    • config.yaml: configuration for the image folder (number of channels, format, etc.)
    • filenames: all file names under images/ (without extensions), one per line
    • targets: all file names under images/ (without extensions) plus their labels, comma-separated, one file name and class per line (a generation sketch follows this list)
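For reference, the filenames and targets files can be generated straight from the images/ folder. Below is a minimal sketch; build_index_files and label_for are hypothetical helpers, with label_for standing in for however the label of each image is determined:

import os

def build_index_files(part_dir, label_for):
    """Write the filenames and targets files expected by the loader.

    part_dir:  a train_pX/ directory containing an images/ subfolder
    label_for: callable mapping a file stem to its integer class label
    """
    stems = sorted(
        os.path.splitext(name)[0]
        for name in os.listdir(os.path.join(part_dir, "images"))
    )
    # one stem per line
    with open(os.path.join(part_dir, "filenames"), "w") as f:
        f.write("\n".join(stems) + "\n")
    # one "stem,label" pair per line
    with open(os.path.join(part_dir, "targets"), "w") as f:
        for stem in stems:
            f.write(f"{stem},{label_for(stem)}\n")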

Modifying the Source Code

When analyzing the source code earlier, we saw that homo_nn's model configuration is quite hard-coded and inflexible, so we modify the source to:

  1. modify the data loading;
  2. use a pretrained VGG16 model;
  3. use the GPU;
  4. implement model evaluation.

Modifying the Data Loading

VGG16 expects input images of size $224 \times 224 \times 3$, while the colon cancer dataset images are $768 \times 768 \times 3$, so the data loading must be adapted first.

Modify the __getitem__ method of the VisionDataSet class in FATE/python/federatedml/nn/backend/pytorch/data.py:

def __getitem__(self, index):
    img = Image.open(self.images[index]).convert(self._PIL_mode)
    # VGG16 expects 224x224 inputs: downscale to 256, then center-crop to 224
    if img.size[0] > 224:
        resize_transform = torchvision.transforms.Compose(
            [
                torchvision.transforms.Resize(256),
                torchvision.transforms.CenterCrop(224),
            ]
        )
        img = resize_transform(img)
    if self.targets_is_image:
        target = Image.open(self.targets[index])
    else:
        target = self.targets[index]
    return self.transforms(img, target)
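To sanity-check this resize logic outside of FATE, a quick standalone snippet (the image path is illustrative; any 768x768 image from the dataset works):

from PIL import Image
import torchvision

img = Image.open("train_p1/images/sample.jpg").convert("RGB")  # hypothetical path
resize_transform = torchvision.transforms.Compose([
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(224),
])
print(resize_transform(img).size)  # expected: (224, 224)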

Modifying the Model

We need to modify the __init__ method of the FedLightModule class in the _torch.py file under the homo_nn component:

# --------- before ----------
# self.model = nn.Sequential(*layers)
# --------- after ----------
# This is highly task-specific: out_features=2 targets this binary
# classification task. For more flexible parameterization, read the value
# from a config file (see the sketch below).
# Note: requires "from torchvision import models" at the top of the file.
LOGGER.info("define vgg16")
self.model = models.vgg16(pretrained=True)
self.model.classifier[6] = nn.Linear(in_features=4096, out_features=2, bias=True)
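As the comment suggests, out_features could be read from a configuration file instead of being hard-coded. A minimal sketch, assuming a hypothetical model_conf.yaml that contains a num_classes entry:

import yaml
from torch import nn
from torchvision import models

# model_conf.yaml (hypothetical) contains a single line: num_classes: 2
with open("model_conf.yaml") as f:
    conf = yaml.safe_load(f)

model = models.vgg16(pretrained=True)
model.classifier[6] = nn.Linear(in_features=4096,
                                out_features=conf["num_classes"], bias=True)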

Using the GPU

Modify the training_step, validation_step, do_convergence_check, and encrypt methods of the FedLightModule class in the _torch.py file under the homo_nn component. The changes are shown below.

Why do_convergence_check and encrypt need changes is not spelled out anywhere; roughly, tensors have to be moved off the GPU before model aggregation and encryption (see the note after the code). What is certain is that with these changes GPU training works.

def validation_step(self, batch, batch_idx):
    x, y = batch
    # -------------- add start ------------------
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # LOGGER.info(f'device:{device}')
    x, y = x.to(device), y.to(device)
    # -------------- add end ------------------
    y_hat = self.forward(x)
    loss = self.loss_fn(y_hat, y)

    if y_hat.shape[1] > 1:
        accuracy = (y_hat.argmax(dim=1) == y).sum().float() / float(y.size(0))
    else:
        y_prob = y_hat[:, 0] > 0.5
        accuracy = (y == y_prob).sum().float() / y.size(0)
    return {"val_loss": loss, "val_accuracy": accuracy}

def training_step(self, batch, batch_idx):
    x, y = batch
    # -------------- add start ------------------
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    x, y = x.to(device), y.to(device)
    self.model = self.model.to(device)
    # -------------- add end ------------------
    y_hat = self.model(x)
    # LOGGER.info('y_hat: {}'.format(y_hat.detach().numpy()))
    loss = self.loss_fn(y_hat, y)
    self.log("train_loss", loss)
    return loss

def do_convergence_check(self, weight, loss):
    # loss_value = loss.detach().numpy().tolist()
    loss_value = loss.detach().cpu().numpy().tolist()

    self.loss_summary.append(loss_value)

    # send loss to server
    self.send_loss(loss_value, weight)

    # recv convergence status
    status = self.recv_loss()
    return status

def encrypt(self, tensor: torch.Tensor, weight):
    return self.random_padding_cipher.encrypt(
        # torch.clone(tensor).detach().mul_(weight)
        torch.clone(tensor).detach().mul_(weight).cpu()
    ).numpy()
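The likely reason do_convergence_check and encrypt must change is that PyTorch cannot convert a CUDA tensor directly to a NumPy array, and both methods end in a .numpy() call, so the tensor has to be copied back to the host first. A minimal illustration:

import torch

t = torch.ones(3, device='cuda')
# t.numpy()      # raises TypeError: can't convert cuda:0 device type tensor to numpy
t.cpu().numpy()  # works: the tensor is copied to host memory first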

Model Evaluation

Below is a simple model evaluation script. Create evaluation.py under the homo_nn folder; the code and comments follow.

# Import the required libraries; note that the FedLightModule class under
# homo_nn must be imported.
# If a library is missing, pip-install it inside the fate-flow container.
from federatedml.nn.homo_nn._torch import FedLightModule
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import datasets, models, transforms
from tqdm import tqdm
import os
from torch import nn
import torch
from PIL import Image
import numpy as np
import base64
# import cv2
from io import BytesIO

# dataset

class My_Dataset(Dataset):
    def __init__(self, image_path, target):
        super().__init__()
        self.image_path = image_path
        self.target = target

    def __len__(self):
        return len(self.image_path)

    def __getitem__(self, index):
        path = self.image_path[index]
        img = Image.open(path).convert('RGB')
        resize_transform = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
        ])

        toTensor_transform = transforms.Compose([
            transforms.ToTensor()
        ])
        label = self.target[index]
        if img.size[0] > 224:
            img = resize_transform(img)
        img = toTensor_transform(img)
        return img, label


# test
def test_model(test_loader, model, criterion, device):
    model.eval()

    true_labels = []
    pred_labels = []
    scores = []

    size = len(test_loader.dataset)
    num_batches = len(test_loader)
    print(f'size:{size}; num_batches:{num_batches}')
    losses, correct = 0, 0
    # log_loss = 0
    ################################# validation #################################
    with torch.no_grad():
        for batch, (x, y) in enumerate(tqdm(test_loader)):
            device = torch.device(device)
            x, y = x.to(device), y.to(device)
            pred = model(x)
            loss = criterion(pred, y.long().squeeze())
            scores += pred.tolist()
            y_pred, y_true = torch.argmax(pred, axis=1), y.long().squeeze()
            correct += (y_pred == y_true).type(torch.float).sum().item()
            loss = np.round(loss.item(), 5)
            true_labels += y_true.detach().cpu().tolist()
            pred_labels += y_pred.detach().cpu().tolist()
            losses += loss
    correct /= size
    losses /= num_batches
    print(f'losses:{losses}\n')
    metrics = f"Test: Accuracy: {(100*correct):>0.2f}%, Avg loss: {losses:>5f} \n"
    return np.array(true_labels), np.array(pred_labels), np.array(scores), metrics



'''
model_path: model path
res_path: path of the csv file that stores the prediction results
metric_path: path of the file that stores the model evaluation metrics
typ: 'test' or 'predict'; 'test' evaluates the model and outputs its metrics,
     'predict' simply runs prediction on data with unknown labels
test_path: test dataset path; by default organized the same way as the training data
'''
def evaluation(model_path, res_path, metric_path, typ, test_path):
    model = FedLightModule.load_from_checkpoint(model_path)
    test_images_over = []
    test_labels_over = []
    if typ == 'test':
        with open(os.path.join(test_path, 'targets'), 'r') as f:
            line = f.readline()
            while line:
                filename = line.split(',')[0] + '.jpg'
                target = int(line.split(',')[-1].replace('\n', ''))
                test_images_over.append(os.path.join(test_path, 'images', filename))
                test_labels_over.append(target)
                line = f.readline()
    elif typ == 'predict':
        with open(os.path.join(test_path, 'filenames'), 'r') as f:
            line = f.readline()
            while line:
                filename = line.replace('\n', '') + '.jpg'
                target = 0
                test_images_over.append(os.path.join(test_path, 'images', filename))
                test_labels_over.append(target)
                line = f.readline()
    test_dataset = My_Dataset(test_images_over, test_labels_over)
    test_loader = DataLoader(test_dataset, batch_size=32)
    loss_fn = nn.CrossEntropyLoss()
    # device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    device = 'cpu'
    true_labels, pred_labels, scores, metrics = test_model(test_loader, model, loss_fn, device)

    l = len(true_labels)
    if typ == 'test':
        with open(metric_path, 'w') as f:
            f.write(f'{metrics}\n')
        with open(res_path, 'w') as f:
            f.write('true_label,pred_label,score\n')
            for i in range(l):
                true_label, pred_label, score = true_labels[i], pred_labels[i], scores[i]
                f.write(f'{true_label},{pred_label},{score}\n')
    elif typ == 'predict':
        with open(res_path, 'w') as f:
            f.write('pred_label,score\n')
            for i in range(l):
                true_label, pred_label, score = true_labels[i], pred_labels[i], scores[i]
                f.write(f'{pred_label},{score}\n')


if __name__ == '__main__':
    res_path = "./123.csv"
    metric_path = "./123.txt"
    typ = "test"
    path = "/data/projects/fate/examples/gwork/colon/test"
    model_path = ''  # fill in the checkpoint path (see "Model Evaluation" below)
    evaluation(model_path, res_path, metric_path, typ, path)

That completes the code changes. Next we run the training.

Federated Learning

Modify the CONF and DSL configurations.

colon_conf.json

{
    "dsl_version": 2,
    "initiator": {
        "role": "guest",
        "party_id": 9999
    },
    "role": {
        "arbiter": [
            10000
        ],
        "host": [
            9998, 9997
        ],
        "guest": [
            9999
        ]
    },
    "component_parameters": {
        "common": {
            "homo_nn_0": {
                "api_version": 2,
                "encode_label": true,
                "max_iter": 2,
                "batch_size": 32,
                "optimizer": {
                    "lr": 0.000001,
                    "optimizer": "Adam"
                },
                "loss": "CrossEntropyLoss",
                "metrics": [
                    "accuracy"
                ],
                "nn_define": [
                ],
                "config_type": "pytorch"
            }
        },
        "role": {
            "host": {
                "0": {
                    "reader_0": {
                        "table": {
                            "name": "colon_images_0",
                            "namespace": "experiment"
                        }
                    }
                },
                "1": {
                    "reader_0": {
                        "table": {
                            "name": "colon_images_1",
                            "namespace": "experiment"
                        }
                    }
                }
            },
            "guest": {
                "0": {
                    "reader_0": {
                        "table": {
                            "name": "colon_images_2",
                            "namespace": "experiment"
                        }
                    }
                }
            }
        }
    }
}

colon_dsl.json
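The job DSL wires the reader into the homo_nn component. A minimal DSL consistent with the components referenced in the conf above (reader_0 feeding homo_nn_0, in the standard DSL v2 layout) would look roughly like this sketch:

{
    "components": {
        "reader_0": {
            "module": "Reader",
            "output": {
                "data": ["data"]
            }
        },
        "homo_nn_0": {
            "module": "HomoNN",
            "input": {
                "data": {
                    "train_data": ["reader_0.data"]
                }
            },
            "output": {
                "data": ["data"],
                "model": ["model"]
            }
        }
    }
}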

First, the data has already been processed into a format FATE can read.

Then distribute the three shares of data to three different machines in the cluster: fate9999, fate9998, and fate9997.

Enter the fate-client container on each of the three machines and use the flow table bind -c command to bind the data folder to a table, as sketched below.
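A bind conf might look like the following sketch, assuming the PATH engine that FATE's homo_nn image examples use; the name and namespace must match the reader configuration in colon_conf.json, and the path points at the local data share on each machine (the path below is illustrative):

{
    "namespace": "experiment",
    "name": "colon_images_0",
    "engine": "PATH",
    "address": {
        "path": "/data/projects/fate/examples/gwork/colon/train_p1"
    }
}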

On the initiator fate9999's client container, enter the code folder and run flow job submit -c colon_conf.json -d colon_dsl.json to start the job.

Viewing the Job on FATE-BOARD

The log output of the homo_nn component looks like this:

(Figure: homo_nn component log output)

Model Evaluation

After the job finishes, a checkpoint is saved inside the fate-flow container.

Taking fate9999 as an example, the save path is:

/data/projects/fate/fateflow/jobs/202210131350059277200/guest/9999/homo_cv_0/202210131350059277200_homo_cv_0/0/task_executor/7f86f6064aff11edbd540242c0a70064/model.ckpt

The path may differ on machines with a different role (guest, host) or party_id.

Copy this path into model_path in the evaluation.py script from earlier, run python evaluation.py, and check the output:

Accuracy: 99.20%, Avg loss: 0.022641

Experimental Results

Combined with the baseline results, we get the following table.

| Data used | Training type | Evaluation |
| --- | --- | --- |
| All training data (Train_1+Train_2+Train_3) | Local (GPU) | Accuracy: 99.60%, Avg loss: 0.011775 |
| Train_1 | Local (GPU) | Accuracy: 73.20%, Avg loss: 0.641481 |
| Train_2 | Local (GPU) | Accuracy: 71.40%, Avg loss: 0.649055 |
| Train_3 | Local (GPU) | Accuracy: 56.80%, Avg loss: 0.654335 |
| All training data (Train_1+Train_2+Train_3) | Federated (GPU) | Accuracy: 99.20%, Avg loss: 0.022641 |

After adding some extra logging around reads and writes and analyzing it, we get the following breakdown of training time:

(Figure: training time breakdown)

Experimental Analysis

  1. Federated learning outperforms the models trained separately on individual shares, demonstrating the effectiveness of federated learning.

  2. Experiments show that training on CPU takes about 15 minutes per epoch, while on GPU each epoch takes only about 29s; GPU compute efficiency far exceeds the CPU's.

  3. Analysis of the logs shows that computation takes about 60s, while encrypting, decrypting, and transmitting the model parameters takes about 600s, a ratio of roughly 1:10. We can therefore conclude that the time bottleneck of colon cancer federated learning is not local model training but the encryption, decryption, and transmission of the model parameters.

Addendum

Modifying the homo_nn source code breaks homo_nn's original functionality, so this approach is not recommended; developing a new component to do this is the better option.
