基于深度学习的恶意软件分类器
2022-6-4 17:59:42 Author: mp.weixin.qq.com(查看原文) 阅读量:16 收藏


本文为看雪论坛优秀文章
看雪论坛作者ID:1900


前言

1、实验内容

参考论文:IMCFN: Image-based Malware Classification using Fine-tuned Convolutional Neural Network Architecture(https://www.sciencedirect.com/science/article/pii/S1389128619304736)构建的恶意软件分类器,本文所用的数据集为在基于深度学习的恶意软件分类器(一)【https://bbs.pediy.com/thread-271558.htm文中介绍过的微软公开数据集,与论文中所用数据集并不相同。
下图为分类器的构建过程,首先通过applyColorMap将灰度图转为彩图作为输入数据,接着用这些来微调(fine-tuning)加载了预训练模型的VGG16模型来完成分类器的训练。此外,还通过数据增强(Data Augmentation, DA)来缓解数据不足的问题,增加训练集样本多样性。

2、实验环境

  • Python版本:3.6.13

  • Pytorch版本:1.8.1

  • CUDA版本:11.4

数据处理

下图显示了将一张恶意软件可视化为灰度图后,resize成(224*224)大小后在应用applyColor转换为彩图的结果,从转换结果可以看出转换的彩图依然具有清晰的图像纹理。
                           
在深度学习的任务中,都会希望样本数量可以足够多,且类型丰富。这样的数据集训练出来的模型具有更好的泛化能力,分类器的性能将会更高。但是,实际训练过程中,很有可能无法得到满意的样本数量。
因此,需要通过数据增强来丰富样本数。数据增强的方法有很多,本文选用了其中3种,分别是垂直翻转,水平翻转和随机旋转一定角度(-45度到45度),下面的图图分别对应三种数据增强方法后生成的图片。
通过数据增强,此时拥有了类别更丰富,数量更多的训练样本来构建分类器,此时的数据类代码如下:
import osfrom torch.utils.data import Datasetimport globimport pandas as pdimport torchfrom torchvision import transformsimport cv2from torchvision.transforms import InterpolationModefrom PIL import Image  class MalwareDataset(Dataset):    def __init__(self, file_path, is_train):        self.is_train = is_train        self.file_path = glob.glob(os.path.join(file_path, "*.png"))        self.len = len(self.file_path)         self.transforms_data = transforms.Compose([transforms.ToTensor()])         # 判断是否是获取训练数据集标志        if is_train:            train_label_path = os.path.join(file_path, "..", "trainLabels.csv")            df = pd.read_csv(train_label_path)            self.y_data = get_train_label(self.file_path, df)            self.y_data = torch.Tensor(self.y_data)             # 随机选择三种数据增强方法中的一种            transforms_choice = transforms.RandomChoice([transforms.RandomRotation(degrees=45,                                                                                   interpolation=InterpolationMode.NEAREST,                                                                                   expand=True),                                                         transforms.RandomHorizontalFlip(p=0.4),                                                         transforms.RandomVerticalFlip(p=0.4)])            # 数据增强以后在缩放到(224, 224)后转成tensor            self.transforms_data = transforms.Compose([transforms_choice,                                                       transforms.Resize((224, 224), interpolation=InterpolationMode.NEAREST),                                                       transforms.ToTensor()])     def __getitem__(self, index):        image = cv2.imread(self.file_path[index])        image = cv2.resize(image, (224, 224))        image = cv2.applyColorMap(image, cv2.COLORMAP_RAINBOW)        image = Image.fromarray(image)        image = self.transforms_data(image)         if self.is_train:            return image, self.y_data[index]        else:            file_name = get_file_name(self.file_path[index])            return image, file_name     def __len__(self):        return self.len  # 根据文件路径得出不带后缀的文件名def get_file_name(file_path):    file_name_begin = file_path.rfind("/") + 1    file_name_end = file_path.rfind(".")    return file_path[file_name_begin:file_name_end]  # 从trainLabels.csv中获得文件名对应的类别def get_train_label(file_path, df):    train_label = []    for fp in file_path:        file_name = get_file_name(fp)        train_label.append(df[df["Id"] == file_name]["Class"].astype(int).values[0] - 1)    return train_label


模型

1、卷积神经网络

卷积神经网络(Convolutional Neural NetWorks, CNN)是一类包含卷积计算且具有深度结构的前馈神经网络,具有良好的表征学习能力。卷积层和池化层是卷积神经网络的关键内容,这两个层都是通过滑动窗口实现的。
下图是一个卷积层的实例,此时输入的图像是一个5*5大小的矩阵,卷积核的大小为3 * 3,此时滑动窗口的大小就是3 * 3,滑动窗口的步长为1,该滑动窗口会向下或向右移动,对应的窗口中的数值与卷积核的数值进行卷积运算(对应相乘后相加)作为输出结果。
以下图为例,第一个输出结果就是输入图像深色部分核卷积核中的每一个对应数字相乘得到的结果,也就是0 * 0 + 1 * 1 + 2 * 2 + 5 * 1 + 6 * 2 + 7 * 0 + 0 * 2 + 1 * 1 + 0 * 0 =23。
第一个结果计算完成后,滑动窗口会向右移动一个步幅,第二个输出结果的计算就是1 * 0 + 2 * 1 + 3 * 2 + 6 * 1 + 7 * 2+ 8 * 0 +1 * 2 + 2 * 1 + 3 * 0 = 32。以此类推,最终输入图像经过运算以后会得到最终的输出结果。
 
卷积层的输出结果会在经过池化层处理,如下图所示,池化层分别平均池化层和最大池化层两种。
和卷积层一样,池化层也是通过滑动窗口来实现的,图中对应的滑动窗口大小为2*2,每次也是移动一个步长。池化层的计算很简单,最大池化层就是对窗口中的数值取最大值作为输出,平均池化层就是对窗口中的数值求平均数后最为输出。
通过池化层可以减少输入数据,从而减少特征的尺寸以便更快地计算,同时池化层可以保留最重要的信息。
通常,一个卷积神经网络架构具有一组或多组卷积和池化层,最后通过一组全连接层输出结果。在卷积层和池化层的支持下,CNN可以进行局部的特征学习,专注于输入数据的局部部分,抽取出图像中的有效特征。全连接层就会接收到这些高级别的特征作为输入,因此能够准确地处理复杂数据,在图像识别和分类方面有良好的性能。

2、VGG16

下图是VGG16的网络架构,该网络共有5个Block,包含13个卷积层和5个池化层来抽取图像特征,最终通过全连接层来输出分类结果。
最后的一层全连接层为任务的类别数,这里的1000是ImageNet数据集的类别数,需要修改为本文的分类任务数。

3、微调模型

CNN的卷积层和池化层提取的特征一般都是比较通用的特征,一个图像分类任务训练得到的参数可以迁移到其他任务中去使用。而后面的全连接层是用来输出分类结果,这些层的参数往往因为任务的不同有较大差别。
因此,可以将模型在其他任务中训练得到的参数迁移到本任务中,通过冻结前面的一些卷积层,仅调整后面的全连接层的参数来微调模型构建分类器。此时,需要训练的参数量就会减少,模型训练速度会有所加快。
本文的分类器是通过使用VGG16模型在ImageNet数据集预训练得到的参数,进行微调得到的,模型代码依然如下:
import torch.nn as nnfrom torch.hub import load_state_dict_from_url __all__ = [    'VGG', 'vgg16', 'vgg16_bn']  model_urls = {    'vgg16': 'https://download.pytorch.org/models/vgg16-397923af.pth',    'vgg16_bn': 'https://download.pytorch.org/models/vgg16_bn-6c64b313.pth',}  class VGG(nn.Module):    def __init__(self, features, num_classes=1000, init_weights=True):        super(VGG, self).__init__()        self.features = features        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))        self.classifier1 = nn.Sequential(            nn.Linear(512 * 7 * 7, 4096),            nn.ReLU(True),            nn.Dropout(),            nn.Linear(4096, 4096),            nn.ReLU(True),            nn.Dropout(),            nn.Linear(4096, num_classes),        )        if init_weights:            self._initialize_weights()     def forward(self, x):        x = self.features(x)        x = self.avgpool(x)        x = x.view(x.size(0), -1)        x = self.classifier1(x)        return x     def _initialize_weights(self):        for m in self.modules():            if isinstance(m, nn.Conv2d):                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')                if m.bias is not None:                    nn.init.constant_(m.bias, 0)            elif isinstance(m, nn.BatchNorm2d):                nn.init.constant_(m.weight, 1)                nn.init.constant_(m.bias, 0)            elif isinstance(m, nn.Linear):                nn.init.normal_(m.weight, 0, 0.01)                nn.init.constant_(m.bias, 0)  def make_layers(cfg, batch_norm=False):    layers = []    in_channels = 3    for v in cfg:        if v == 'M':            layers += [nn.MaxPool2d(kernel_size=2, stride=2)]        else:            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)            if batch_norm:                layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]            else:                layers += [conv2d, nn.ReLU(inplace=True)]            in_channels = v    return nn.Sequential(*layers)  cfgs = {    'D': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],}  def _vgg(arch, cfg, batch_norm, pretrained, progress, **kwargs):    if pretrained:        kwargs['init_weights'] = False    model = VGG(make_layers(cfgs[cfg], batch_norm=batch_norm), **kwargs)    if pretrained:        state_dict = load_state_dict_from_url(model_urls[arch],                                              progress=progress)        model.load_state_dict(state_dict, strict=False)    return model  def vgg16(pretrained=False, progress=True, **kwargs):    return _vgg('vgg16', 'D', False, pretrained, progress, **kwargs)  def vgg16_bn(pretrained=False, progress=True, **kwargs):    return _vgg('vgg16_bn', 'D', True, pretrained, progress, **kwargs)

需要微调的层是Block5和之后的全连接层,因此,需要将前面的Block1到Block4全部冻结,以下代码分别是冻结相应的层以及打印需要训练的参数的代码:
# 冻结层def set_parameter_requires_grad(model):    count = 0    for param in model.parameters():        param.requires_grad = False        if param.size()[0] == 512:            count += 1        if count == 6:            break  # 获取需要训练的参数def train_param_number(model):    train_num = sum(param.numel() for param in model.parameters() if param.requires_grad)    print("train_num:%d" % train_num)

通过调用这两个函数,就可以看到在冻结后需要训练的参数量有所减少
model = vgg16(num_classes=9)train_param_number(model)set_parameter_requires_grad(model)train_param_number(model.classifier1)    ......train_num:134297417train_num:119582729


参数

由于通过数据增强丰富了训练集样本,此时有了数量更多的训练集样本,因此需要更多的epoch才能让模型拟合,此时相关的参数设定如下:
import os  class Configure:    # 设置数据集的路径    base_path = ""    train_gray_path = os.path.join(base_path, "train_gray_images")    test_gray_path = os.path.join(base_path, "test_gray_images")    submit_path = os.path.join(base_path, "submit.csv")     is_train = True     # 用来设置是训练模型还是测试模型    batch_size = 8    num_workers = 2    epochs = 40    lr = 1e-3    decay = 0.0005    momentum = 0.9    model_path = "IMCFN.pth"    num_classes = 9


分类结果

对分类器进行训练和测试的代码如下:
import osimport sysfrom MalwareDataset import MalwareDatasetfrom torch.utils.data import DataLoaderimport torchimport torch.nn.functional as Fimport pandas as pdfrom Configure import Configurefrom VGG import vgg16  # 冻结层def set_parameter_requires_grad(model):    count = 0    for param in model.parameters():        param.requires_grad = False        if param.size()[0] == 512:            count += 1        if count == 6:            break  # 获取需要训练的参数def train_param_number(model):    train_num = sum(param.numel() for param in model.parameters() if param.requires_grad)    print("train_num:%d" % train_num)  def load_model(model_path):    if not os.path.exists(model_path):        print("模型路径错误,模型加载失败")        sys.exit(0)    else:        return torch.load(model_path)  def save_model(target_model, model_path):    if os.path.exists(model_path):        os.remove(model_path)    torch.save(target_model, model_path)  def train(epoch):    for batch_idx, data in enumerate(train_loader, 0):        optimizer.zero_grad()                                   # 梯度清0         inputs, labels = data        inputs, labels = inputs.to(device), labels.to(device)         y_pred = modeler(inputs)                                # 前向传播        loss = F.cross_entropy(y_pred, labels.long())           # 计算损失         if batch_idx % 100 == 99:            print("epoch=%d, loss=%f" % (epoch, loss.item()))         loss.backward()                                         # 反向传播        optimizer.step()                                        # 梯度更新  def test():    df = pd.read_csv(conf.submit_path)    with torch.no_grad():        for inputs, file_name in test_loader:            inputs = inputs.to(device)            outputs = modeler(inputs)            predicted = F.softmax(outputs.data, dim=1)            data_len = len(inputs)            for i in range(data_len):                dict_res = {"Id": file_name[i], "Prediction1": 0, "Prediction2": 0,                            "Prediction3": 0, "Prediction4": 0, "Prediction5": 0, "Prediction6": 0,                            "Prediction7": 0, "Prediction8": 0, "Prediction9": 0}                for j in range(9):                    dict_res["Prediction" + str(j + 1)] = predicted[i][j].item()                df = df.append(dict_res, ignore_index=True)    df.to_csv(conf.submit_path, index=0)  if __name__ == '__main__':    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"    os.environ["CUDA_VISIBLE_DEVICES"] = "5"    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")     conf = Configure()     test_dataset = MalwareDataset(conf.test_gray_path, False)    test_loader = DataLoader(test_dataset, batch_size=conf.batch_size,                             shuffle=False, num_workers=conf.num_workers)     # 根据是否训练还选择是否加载保存的模型    if conf.is_train:        train_dataset = MalwareDataset(conf.train_gray_path, True)        train_loader = DataLoader(train_dataset, batch_size=conf.batch_size,                                  shuffle=True, num_workers=conf.num_workers)        modeler = vgg16(pretrained=True, num_classes=conf.num_classes)    else:        print("=====================开始加载模型================")        modeler = load_model(conf.model_path)        print("=====================模型加载完成================")    # train_param_number(modeler)    set_parameter_requires_grad(modeler)    # train_param_number(modeler)    modeler.to(device)     if conf.is_train:        optimizer = torch.optim.SGD(modeler.parameters(), lr=conf.lr,                                    weight_decay=conf.decay, momentum=conf.momentum)        print("=====================开始训练模型================")        for i in range(conf.epochs):            train(i)        print("=====================模型训练完成================")        save_model(modeler, conf.model_path)    print("=====================开始测试模型================")    test()    print("=====================模型测试完成================")

下面图是模型在不同epoch情况下的损失,由图可以看出,当epoch等于40时,损失最小,分类器的性能最好。

看雪ID:1900

https://bbs.pediy.com/user-home-835440.htm

*本文由看雪论坛 1900 原创,转载请注明来自看雪社区

# 往期推荐

1.Android APP漏洞之战——调试与反调试详解

2.Fuzzm: 针对WebAssembly内存错误的模糊测试

3.2022腾讯游戏安全初赛一题解析

4.一文读懂PE文件签名并手工验证签名有效性

5.一道pwn题目2e4zu的深入分析

6.CNVD-2018-01084 漏洞复现报告(service.cgi 远程命令执行漏洞)

球分享

球点赞

球在看

点击“阅读原文”,了解更多!


文章来源: http://mp.weixin.qq.com/s?__biz=MjM5NTc2MDYxMw==&mid=2458451191&idx=1&sn=8a86e191c520e2622466a733add748b7&chksm=b18fcc7d86f8456b165ddf6bda8192fbd7f465d3ef23e51fdec7d1085c7fe0e4ddf99baf2c77#rd
如有侵权请联系:admin#unsafe.sh