最近半年,我开始投入从事信息安全合规的相关工作,实践中发现一个很让人有点尴尬的现象,经常一些最新的监管动态或行业信息是靠朋友圈、微信群聊、刷微博、甚至其他同事转发才了解到的。而作为合规团队的一员,理应掌握第一手消息,并对最新变化进行影响分析,最终形成有效的工作指导。
因此,为了让自己能够更方便地获取情报输入,我决定自己动手,跟踪各主要监管方的消息发布渠道,并将最新消息发送到我们的办公软件上。
主要跟踪的是网信办、工信部、通管局、tc260、病毒检测中心、国家信息技术安全研究中心等单位的官方网站和官方微博账号。
一开始琢磨的时候,想着还挺复杂,一个页面抓下来以后,要如何分析,怎么把所需要的信息提取出来。但真的上手以后,发现事情并没有那么复杂,大部分的站点都会把消息以JSON的形式吐到本地,我们只需要操作JSON就好了。
所以,第一个任务就是找到对应的JSON,我们可以利用Chrome浏览器的「网络」组件进行分析。
以某单位的「时讯要闻」为例,通过在Chrome浏览器里可以观察到,本地js请求了服务端的新闻JSON列表,然后再加载到本地前端程序的。
这次实现用的是Python,没用Go。因为我觉得用Go去反序列化,还得先定义数据结构,有点麻烦。。。。
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import requests
def query(url):
r = requests.get(url, headers=headers)
result = json.loads(r.text)
title = result['list'][0]['list'][0]['title']
pubtime = str(result['list'][0]['list'][0]['viewDate'])
if result['list'][0]['type'] == 79:
link = xwzx_prefix + str(result['list'][0]['list'][0]['id'])
else:
link = aqdt_prefix + str(result['list'][0]['list'][0]['id'])
bbs_name = result['list'][0]['list'][0]['sourceFrom']
data = {
"title":title,
"pubtime":pubtime,
"link":link,
"bbs_name":bbs_name
}
return data
在用什么方法获得最新消息这块,有一个小坑。
例如某国字头单位,一开始,我想着是每30分钟抓取一次最新的消息,如果当前时间减去发布时间小于30分钟,那程序就认为这是最新消息,并发送到我们的办公软件上。
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import time
nowtime = time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.localtime())
def send_message(url):
result = query(url)
time_now_ux = int(time.time())
time_pub_ux = int(result['pubtime'])
dev = time_now_ux - time_pub_ux
if dev <= 1800:
send(news)
print(news)
else:
print(nowtime, "--*-- No news updated --*--")
可实操的时候发现好几次,网站更新了,程序却没响应。我一度以为自己写的代码逻辑有问题,反复调试了2天我才闹明白,不是我的问题,而是他们的发布时间有问题。
在这里就不得不吐槽我们一些单位发新闻,完全没有时间观念啊。比如新闻上显示发布时间是14点37分,可却是16:20发布的,程序在16:30检查的时候,发现最新的新闻是2个小时以前的,不在最近30分钟内,就给忽略了 😮💨。
所以,最后,我使用了更麻烦的内容比较的方法,具体逻辑如下:
读取新闻列表JSON
将第1条消息存到一个本地文本文件
指定时间后再次读取新闻列表,并与本地文本文件进行比较
如果最新消息与本地文件不一致,则认为新闻列表已更新
调用消息推送函数,推送最新消息
将最新消息写入本地文本文件
# !/usr/bin/env python
# -*- coding: utf-8 -*-
if __name__ == '__main__':
news = latest_news_list()
content = []
with open(log_file) as f1:
raw_content = f1.readlines()
for rc in raw_content:
content.append(rc.rstrip())
for n in news:
if n in content:
print(nowtime, "----- No news updated -----")
else:
send()
write_file(n)
将代码上传服务器后,用crontab来定制定时任务。
如果需要每隔30分钟跑一次程序,crontab的时间配置为:/30 * * * * [command]
配置可以在这里:https://tool.lu/crontab/ 验证。
另外,需要注意的是,如果服务器是用pyenv来指定Python版本的话,在添加crontab任务的时候,要使用Python命令的绝对路径。如下所示。
*/30 * * * * /root/.pyenv/versions/3.9.9/bin/python /data/crawler/news.py >> /data/crawler/news.log 2>&1
最后,你就拥有了一个自动的新闻播报员。 😆
作者:韭不黄,十余年来一直混迹于信息安全行业,扛过设备,卖过服务,做过审计,查过黑客,反过欺诈,岁月神偷带走了我的苹果肌,留下了一肚子的瘫软,貌似我将要成为一个油腻的中年男人,不过依然对世界充满好奇,依然是一颗嫩绿的韭菜~