一次安全项目中的机器学习探索
2023-7-31 22:39:32 Author: mp.weixin.qq.com(查看原文) 阅读量:4 收藏

前言

注:本文来自于蓝星群群友无敌大聪聪的投稿,感谢其无私分享!

这篇文章的分享起源于今年内部做的一个安全项目,目前项目还在进行当中,做的过程中有一些收获和感悟,也有这几天的一些探索。

分享一下其中涉及到的安全管理、安全技术的内容,没有很高大上的东西,大部分都是自己的经验总结和一些很浅的尝试。

可能会帮助到一些遇到类似问题的同学,开阔一些思路,促进一下相关的交流。

请轻拍 🧱

背景

公司的制度中要求定期进行各个系统的账号权限稽核工作,涉及几十个系统的账号、权限。每次由安全发起检查流程,需要系统管理员完成所负责系统的账号和权限检查、确认及清理工作。

从安全管理角度,这项工作在各类外部的安全检查中,等级保护、ISO27001、外部审计等,都是必定会被检查的的项目,相信甲方的安全合规同学应该都深有体会。

从安全防护的角度,账号和权限管理也是很重要的一环,不论是来自外部的攻击,还是内部的违规操作,不当的账号和权限管理都可能引发更大的危害。

系统管理员完成检查工作后,在流程内上传纸质签字单。

需求

从制度流程角度看,这项重要工作目前已经开展了,并且大部分系统管理员还是能按时配合完成,也会在自查过程中回收一些账号、关闭一些转岗的权限。

但是从技术角度看,安全在这个过程中,能做的更多是提出要检查的系统范围、检查的要点,做好提醒工作。通俗地讲,有了管理手段,但是缺了一些技术上的手段,总让人对实际效果产生担忧。

考虑到这项工作的重要性和收益,于是在4月份左右,我们开始尝试把这部分工作做的自动一些,期望能提高账号权限核查工作的效率和质量,也能减轻系统管理员在权限复核时的压力。

实践

需求的实现和系统的开发由资深程序员(同事老A👴)负责,他对业务足够熟悉,这里面很多系统是他曾经开发的,刷脸找业务同事配合也更方便。

数据接入:

① 通过公司的数据计算平台,快速实现相关表的只读申请,不用再单独去每个数据库开一个只读账号,通过计算平台的命名空间进行隔离。

② 数据计算平台上可以完成跨数据库类型、跨数据库、跨表的查询、计算工作。

③ 数据计算平台具备实时性,非定时抽取数据。(不过权限检查工作,并没有太高的实时性要求)。

④ 被检系统的用户、权限、菜单项等表数据的接入,并进行数据表的映射(建模),方便对不同系统使用相同的语句进行查询。

⑤ 接入人力资源主数据、域账号数据,作为人员、账号判断依据。

数据计算平台除了方便实时获取数据外,并且提供了计算函数,方便进行逻辑处理的编写,同时内嵌前端低代码平台,方便进行数据的各种展示和操作,如增删改查、筛选、可视化等等,大大节约了本次需求的开发时间。

检查内容:

最开始接入数据后,只是用来检查离职账号未禁用的情况,但是随着边开发、边使用,老A挖掘出了更多的需求。

目前的检查主题包括:

① 离职账号未禁用主题

② 外包账号检查主题

③ 实习生账号检查主题

④ 供应商账号检查主题

⑤ 公共账号检查主题

⑥ 审计账号检查主题(外审时开通的,审计结束后应关闭)

⑦ 测试、演示账号和检查主题(类似叫demo、test的账号,不宜长期激活)

关于账号未禁用这部分,单独说明一下,虽然大部分系统依赖于域账号登录,离职后域账号会关闭。但是考虑到以下风险的可能性,仍然建议每个系统独立设置一个账号的开关功能,理由如下:

① 方便一键关闭该用户在该系统内的所有访问权限

② 系统中可能存在本地账号,并不受域账号控制

③ 公司正常流程是员工正式离职一个月前,各系统管理员会接到邮件通知,但是有时会出现遗漏

④ 同上,域账号一般离职最后才关闭,而工作交接阶段可能已无必要访问该系统(其实这对双方都是一种保护)

弱口令:

系统迭代的过程中,已经不局限在最初的单一系统的账号定期核查需求,而是依托于账号衍生出了不同风险主题的需求。公司的域账号的密码强度要求、定期强制修改策略非常严格,新开发的现代系统大多都接入了单点登录或者LDAP认证。

然而,老A清楚,公司还有很多年代久远的业务系统,久到比我工作的年头还长,并且有些是外部采购的系统,系统本身的安全控制做得不太好,又不方便接入统一的账号体系。

于是有一天,老A开始了弱口令检查的主题,对账号密码的哈希值进行了字典破解,检查出上百个弱口令账号。同步给系统管理员限期整改密码复杂度策略,强制更新存在的弱口令。

总结:

依托于公司的数据计算平台、前端低代码平台,短期内已经完成了30多个系统的数据接入整理和需求开发。

进行了多个“主题”的检查,发现并处理了诸多类型的问题(问题比预期的要多)。

并且在需求开发的过程中,随时检查发现的问题,随时通知系统管理员进行处理并跟踪汇总。

目前主要接入的以业务系统为主,后续会考虑接入其它类型的系统账号权限数据,尤其是一些高风险的系统,比如GitLab、邮件、堡垒机或其它集权系统(这些系统有些可能并没有数据库或未对用户开放数据库访问,对接到计算平台可能会麻烦一些)。

目前账号检查需求基本收尾,准备在平台上继续实现统计报表、趋势视图、问题跟踪等管理功能的需求。

另外,老A说考虑未来会接入一部分日志数据,这个需求我们还在讨论,个人感觉除了登录日志可能会利于发现“长期不使用”的账号,其它日志接到这个系统里有悖于这个系统设计的初衷。

探索

账号这块儿检查的需求已经完成的差不多了,相对来说只要各个系统的数据能接进来,检查的规则相对简单。

上周,老A和我说,他检查的过程中,发现某个员工不应该出现在这个系统的用户表里。我问他怎么判断的,他说这个员工的岗位和这个系统已经没有联系了,可能多年前负责过这个系统,后来的工作职责已经不再负责这个系统了。(人形态专家规则引擎!👍)

想到之前和他介绍过一个国内互联网公司开源的合规系统,里面有“权限互斥”的检查,检查是否同一个人兼职系统管理员、开发,但是规则相对来说还是比较“单薄”,只能做这个场景和维度的判断,规则也是硬编到代码里,完全依赖于“专家规则”,可能并不能满足我们未来的需求。

于是,我尝试了一下用机器学习来分析潜在的“权限不合理”现象,下面就把这几天的工作思路分享一下,不一定完全可靠,权当探索。

准备工作:

因为是快速测试验证,所以直接在本地进行,只管老A要了一个系统的用户、角色表的数据。

工具:python、numpy、pandas、sklearn、matplotlib

数据:PXX.xlsx(关键信息已脱敏)

目标:通过算法检出权限异常账号

第一步

数据预处理

# 1.读取数据,预览数据# 分别读取数据表,检查是否存在空值import pandas as pdimport numpy as np
print("获取用户部门表:df_user_dept")df_user_dept = pd.read_excel("./data/PXX.xlsx", sheet_name="user_dept").sort_values(by="user_id", ignore_index=True)validate(df_user_dept)print(df_user_dept)
print("获取用户角色表:df_user_role")df_user_role = pd.read_excel("./data/PXX.xlsx", sheet_name="user_role").sort_values(by="user_id", ignore_index=True)validate(df_user_role)print(df_user_role)
print("获取角色名称表:df_role")df_role = pd.read_excel("./data/PXX.xlsx", sheet_name="role").sort_values(by="role_id", ignore_index=True)validate(df_role)print(df_role)

这张表实际计算过程中并不需要,只是为了了解一下表结构

def validate(df):    # 检查空值    if df.isna().values.any():        print("发现空值:", df[df.isna().values == True])    # 检查重复    if df.duplicated().values.any():        print("发现重复值:", df[df.duplicated().values == True])

需要检查一下缺失值、重复数据,实际上我这里导出的数据并不涉及。

# 2.组装数据,形成用户权限矩阵# 建立role_id列user_id_uni_size = df_user_dept["user_id"].unique().sizerole_id_uni_size = df_role["role_id"].unique().sizeprint("建立以role_id作为列名的数据, 行数量:%s 列数量:%s" % (user_id_uni_size, role_id_uni_size))pd_role_id_cols = pd.DataFrame(np.zeros((user_id_uni_size, role_id_uni_size)),                               columns=["role_id_" + str(_) for _ in df_role["role_id"].unique().tolist()])print(pd_role_id_cols)

这里需要拼接一下数据,把64种role_id类型,变成64列全为“0”的数据

# 组装数据,形成每行为一个用户的信息,以及是否有每个role_id的信息,形成用户权限矩阵print("建立用户权限矩阵")df_user_role_matrix = df_user_dept.filter(    items=["user_id", "user_name", "domain_account", "company_code", "department_name", "role_id"])df_user_role_matrix = pd.concat([df_user_role_matrix, pd_role_id_cols], axis=1)print(df_user_role_matrix.head(5).to_string())

拼接到用户表,这里只打印了5行

print("更新用户权限矩阵数据")for rowi in df_user_role_matrix.index:    user_id = df_user_role_matrix.loc[rowi, "user_id"]    for _ in df_user_role[df_user_role["user_id"] == user_id]["role_id"].values:        col_role_id = "role_id_" + str(_)        df_user_role_matrix.loc[rowi, col_role_id] = 1print("更新完成")print(df_user_role_matrix.head(5).to_string())

根据user_role表的信息,把用户存在的role所在的列更新为“1”,一个用户可能存在多个role

得到[309 rows x 64 columns]的权限矩阵

# 3.进行特征编码、标准化print("进行特征编码")# 使用独热编码from sklearn.preprocessing import OneHotEncoder, StandardScaler
def one_hot_encode(df, cols):    enc = OneHotEncoder(handle_unknown='ignore', sparse_output=False)    enc.fit(df[cols].values)    matrix = enc.transform(df[cols].values)    col_names = []    print("%s %s 类别为:" % (cols, enc.categories_))    for col in cols:        for val in df[col].unique():            col_names.append("%s_%s" % (col, val))return pd.DataFrame(data=matrix, columns=col_names, dtype=int)
pd_encode = one_hot_encode(df_user_role_matrix, ["company_code", "department_name"])print(pd_encode.head(5).to_string())

可以看到,company_code、department_name这两个字段代表公司名称、部门名称,内容都是类别型数据,必须转换成数值型才能进行计算。

常用的编码方式有:序号编码,独热编码,二进制编码等。由于这两个字段里的数据,彼此之间不存在大小等顺序(比如温度:高、中、低)的关系,使用序号编码(1,2,3,4,5)在进行欧氏距离计算时会影响到计算结果。

所以这里使用了热独编码。比如company_code['TBQ', 'TCM', 'TFM', 'TGI', 'TWM']有5种唯一的值,独热编码会将其编码成为一个5维的稀疏矩阵,像下面这样。

从行的角度看,只要这5列中有1列为1,则另外4列都为0。

同理,department_name也会进行编码,只不过由于部门这个属性的类别比较多,所以编码后得到的列也会更多。

df_user_role_matrix = pd.concat([df_user_role_matrix, pd_encode], axis=1)print(df_user_role_matrix.head(5).to_string())print("最终得到的数据:%s行 * %s 列" % df_user_role_matrix.shape)

将编码后的稀疏矩阵拼接到原数据后,最终得到的数据:309行 * 175 列

# 标准化# 由于我这里均为0、1的数据,各列数据没有比例差异,不再需要进行特征缩放。

第二步

进行数据训练

x_train = df_user_role_matrix.drop(["user_id", "user_name", "domain_account", "company_code", "department_name"], axis=1)print(x_train)print("x_train:%s行 * %s 列" % x_train.shape)

去掉user_id、user_name等列,最终将[309 rows x 170 columns]的x_train进行训练

# 局部离群因子(Local Outlier Factor),LOF算法简介

LOF算法,是基于密度的离群点检测方法中一个比较有代表性的算法。

基于统计的异常检测算法通常需要假设数据服从特定的概率分布,这个假设往往是不成立的。而聚类的方法通常只能给出 0/1 的判断(即:是不是异常点),不能量化每个数据点的异常程度。

相比较而言,基于密度的LOF算法要更简单、直观。它不需要对数据的分布做太多要求,还能量化每个数据点的异常程度。

摘自:

https://zhuanlan.zhihu.com/p/28178476 

https://zhuanlan.zhihu.com/p/448276009

from sklearn.neighbors import LocalOutlierFactor# 异常比例设置为auto,也可根据预估期望情况设置,比如0.1即10%lof = LocalOutlierFactor(n_neighbors=50, contamination="auto")y_predict = lof.fit_predict(x_train)y_scores = lof.negative_outlier_factor_print("y_predict:%s" % y_predict)print("y_scores:%s" % y_scores)

结果:y_predict值1为正常,-1为异常;y_scores值,与1的比例越大,代表越异常

print("将计算结果拼接到权限矩阵")df_user_role_matrix_target = pd.concat([df_user_role_matrix, pd.DataFrame({"target": y_predict, "score": y_scores})],axis=1)print(df_user_role_matrix_target)
# 根据计算结果匹配出异常用户的信息outlier_num = df_user_role_matrix_target[df_user_role_matrix_target["target"] == -1].shape[0]print("总数据:%s个,异常数据%s个" % (df_user_role_matrix_target.shape[0], outlier_num))print("异常账号为:%s" % df_user_role_matrix_target[df_user_role_matrix_target["target"] == -1]["user_name"].values)print("异常值为:%s" % df_user_role_matrix_target[df_user_role_matrix_target["target"] == -1]["score"].values)

最终检测出16个异常账号,打印用户账号、异常值

第三步

可视化展示

实际到上一步已经完成算法的使用了,但是可能比较抽象,这里用可视化进行一下展示,会更直观一点。

# 查看一下相关性print("皮尔逊相关系数多重共线性:")print(df_user_role_matrix_target.corr()["target"].sort_values(ascending=False).to_string())

可以看到不同特征与最终结果的相关性(貌似每一个都不是特别相关的...比例都不大)

这里有170个特征,虽然对计算没有影响,但是想直接展示在二维、三维空间显然是不可能的...

这里存粹是为了演示直观一点儿的效果,故意将数据降至二维,用于在X、Y轴进行数据点的展示。常见的降维包括PCA主成分分析(线性降维)、TSNE(非线性降维)等,具体区别和适用场景感兴趣的同学可自行查阅。

# 将多维数据降至2维用于后续的可视化展示from sklearn.manifold import TSNE
tsne = TSNE(n_components=2, method="exact")x_train_pc = tsne.fit_transform(x_train)x_train_pc = StandardScaler().fit_transform(x_train_pc)

降至二维后的数据

# 使用matplotlib查看一下可视化效果# 绘制散点图import matplotlib.pyplot as plt
fig = plt.figure()ax = fig.add_subplot()ax.set_title("异常数据可视化")ax.set_xlabel("异常数量: %s 个" % outlier_num)ax.scatter(x_train_pc[:, 0], x_train_pc[:, 1], c=y_predict, s=3)plt.show()

局部放大

异常数据可视化效果,可以理解那些离群的点就是异常。

扩展

找老A确认了几个账号,说应该是IT的同事,确实不应该有这个系统相关的权限,我也去问了一个当事人,他很惊讶地说为什么他有这个系统的权限,然后登录确认了一下。

我也是现学现卖,这只是一次探索,当然不能排除瞎猫碰上死耗子...... 🐭

如果还有可以优化的思路:

① 根据人工确认后的标签,对训练检测的结果进行对比评分,使用GridSearchCV、cross_val_score等进行算法的选择、参数的调优。

② 扩展一下特征,比如获取一下人力主数据中的岗位序列(开发、测试、运维、业务、产品、运营)

③ 跳出单个系统的权限,用所有系统的账号信息,只去分析某个员工是否应该有某个系统的账号(个人感觉这个可能识别的准确率会更高,更容易检测出异常)

④ 用于其它场景的异常分析,比如内部系统的访问日志(如内部系统的API访问,不再是单纯靠固定的阀值规则去设定单个API的访问频率、访问时间等,这里可以思考一下有哪些特征可以被使用)

后语

老A在公司已经工作了23年,以前主要是开发C/S系统的,这次帮我们开发这个需求,除了要学习内部数据计算平台、前端低代码平台的语法,还自学了Python。之前完全没有安全工作的经验,那天他检查弱密码的时候,自己手写代码进行哈希的字典破解。我告诉他,其实我们都是直接用hash-identifier、hashcat工具的......

他没说话,接着写他的代码,他已经习惯了自己解决问题的方式,并且确实解决了问题。到了这个年龄,还能保持学习的态度和对工作的专注,真的瑞思拜!🙏(这不是卷,他貌似也没有卷的必要了...... DDDD)

合作的过程中,也发现他看待安全问题的视角和关注的点,有时反而是我们安全工作久了容易忽略的,并且对业务的理解和熟悉对安全工作是多么的重要,有时候知识跨界并不是不可能。

这次想写点儿什么分享,也是因为被他的工作热情感染了,和他切磋一下,哈哈。平时工作和项目比较忙,再有机会总结可能又要过很久了。在此祝各位安全同行们都能一直保持热情,虽然这两年企业大环境下安全的资源压力比较大,如果工作中遇到什么困难,多鼓励鼓励自己,想想当初为何出发。

🚬 Old soldiers never die, they just fade away...... 

注:更多详情可点击“原文查看”!


查询和订阅最新安全事件,请关注”安全小飞侠“吧!


文章来源: https://mp.weixin.qq.com/s?__biz=MzAwMzAwOTQ5Nw==&mid=2650941826&idx=1&sn=8d042a1b35492d77e3d01ee9df5a3363&chksm=81373734b640be22c9bee22acff7ca737a54e09c5b9ed14f8f9215d77d8878c28385a68ffad4&scene=58&subscene=0#rd
如有侵权请联系:admin#unsafe.sh