博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
公有云厂商自建威胁情报系统
阅读量:7056 次
发布时间:2019-06-28

本文共 6475 字,大约阅读时间需要 21 分钟。

本文讲的是
公有云厂商自建威胁情报系统
Q:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

各个问题都很尖锐~

业务层面

根据自己的公有云厂商业务特点(提供云主机、云存储、云数据库、云GPU),建立威胁情报体系,那么面临的威胁 主要集中在DDoS攻击和主机入侵。

1、有效的做到事中及时防御,降低用户遭受DDoS攻击总量,减少DDoS用户攻击成本。

2、DDoS攻击溯源,云安全增值服务。

3、内部针对性攻击事件溯源。

技术层面

1、历史追溯:通过聚类关联分析内部外部历史威胁情报数据,获得攻击事件

domain/IP/URL/samples/analysis report

2、及时阻断:能够对DDoS发起点(botnet主控端)、中间过程发起点(反射源/匿名代理TORP2P节点等)、到达点(直接攻击源),造成攻击无法持续。侧重点在前两部分。

3、DDoS溯源,找到攻击源关联的Domain注册信息、Email信息、云主机注册信息等任何通过身份信息注册的账号。

下面是一些攻击证据展示:

Threat Top 1、公有云平台业务系统和云上用户遭受DDoS攻击

Proof 1、没一天停过,在黑洞之前,攻击峰值总和维持在30G左右。

公有云厂商自建威胁情报系统

Proof 2、云上用户占比7成、云基础设施占3成。

公有云厂商自建威胁情报系统

Threat Top 2、云内主机中马

Proof 1、对外DDoS攻击告警

公有云厂商自建威胁情报系统

Proof 2、CDN网络遭受入侵

Proof3、DGA DNS Analytics解析监控,Xshell 感染终端监控。

当然还有很多威胁,我就不在这里赘述。

0x01、如何自建公有云厂商威胁情报系统

首先,需要建立一整套威胁情报生命周期管理:

在保护云上用户的隐私的前提下,有效的建立一套自动化情报收集、情报处理(特征提取工程、关联分析)、情报应用(抗D 、云WAF黑白名单、云基础设施定向攻击排查),以及情报共享(与拥有情报数据资源的厂商交换数据)。

其次,在带宽和服务器资源允许的情况下,使用DPI/DFI技术、蜜罐技术、沙箱技术建立自身的威胁情报收集系统,提供高质量的威胁情报(注意,是带证据的IOC情报)、通过AI技术提高检测速度和降低误报率。

自动化情报收集系统

@1、云数据中心NIDS部署,集中收集日志。

Suricata IDS + ELK

网络拓扑

公有云厂商自建威胁情报系统

主要关注:shellcode监控、mysqlRDPSSH登陆尝试、木马活动、DoS等

@2、DDoS攻击数据源抓取

先说说技术方案选型问题,考虑到性能问题,主要是抓5元组的数据,那么DPI方案不成,xflow流量采样的方式(例如:绿盟NTA解决方案),1:1000抓包,在高速DDoS攻击的时候是无法抓到瞬间脉冲攻击包的。好吧,只有自己开发基于DPDK的数据统计和抓包系统。

公有云厂商自建威胁情报系统

@3、Web威胁数据收集

基础架构是复用DDoS Pcapdump服务器,根据云平台基础设施IP列表,抓取7层数据,存储到redis,然后汇总到数据仓库中。

@4、云平台基础设施服务器部署HIDS系统

有两种选择,一是使用传统的HIDS软件的方式收集,二是使用EDR机器学习的方法分析,我选择了后者。

公有云厂商自建威胁情报系统

公有云厂商自建威胁情报系统

详细描述一下在这一领域的实践之路:

1、训练数据选择

选择800万良性文件和400万恶意文件进行训练,Top 10 恶意家族是:

公有云厂商自建威胁情报系统

2、训练模型选择

(1)    通过https://github.com/erocarrera/pefile.git(VirusTotal和Cuckoo都在使用)访问PE文件(PE头信息、PE导入表、PE导出表、PE节数据、PE版本信息) 哈希所有部分。

Linux系统、MacOSX系统使用https://github.com/lief-project/LIEF 获取 elf文件和MachO文件头相关信息,并且哈希。

(2)    通过对准确率、漏报率、性能、模型大小、查询执行时间的评估来选择模型。(目前使用sklearn库,将来打算移植到ML Toolkit for TensorFlow,听说训练速度可以提升50倍,毕竟GPU越来越吊)

公有云厂商自建威胁情报系统

通过对文件判断的malware分类结果上报数据,提供基于主机的威胁情报源。

@5、公网蜜罐系统

公有云厂商自建威胁情报系统

有关公网蜜罐有以下思考:

(1)需要用高交互蜜罐,直接修改真实服务加入log记录系统,当然log系统尽量做的隐藏些。

蜜罐本身有一定威胁情报产生能力(降低误报(3次以上持续攻击才记录)、生成有价值情报(例如:web蜜罐产生sqlixss等情报数据,而不是只收集access log))

(2)有一套完善的基础服务管理系统,可以快速部署和销毁蜜罐系统,符合大规模部署要求。

情报处理

其实就是建立一套威胁溯源平台,平台提供查询接口,提供一个可视化的关系图图,例如:

公有云厂商自建威胁情报系统

单更重要的情报逻辑处理部分:

这里以DDoS一次攻击事件分析说起:

(1)    首先通过自动化收集系统中收集到的攻击数据:

Data1:{攻击目的IP/攻击时间/攻击类型/攻击峰值/攻击持续时间}

Data2: {攻击目的IP /攻击目的端口/攻击时间/攻击源IP/攻击源端口/地理位置/攻击IP运营商/包大小}  取Top 1000数据

(2)    集合历史攻击数据 ,获取超过2次攻击源IP

(3)    通过端口扫描,回扫。确定IP存活状态和开放端口状态。

(4)    通过外联威胁情报查询IP信誉。确定IP状态排除TOR/匿名代理等

(5)    通过内外情报源查询攻击IP对应malware样本。

(6)    关联蜜罐系统收集样本,获取botnet Server地址。

(7)    对发生攻击的botnet Server发起反制措施。

情报共享

建立提高情报数据共享接口,同时需要对对外提供数据做脱敏处理。

@1、进入到ELK数据,建立威胁情报对外接口

(1)入库:

#!/usr/bin/env python#coding=utf-8import tracebackfrom elasticsearch import Elasticsearchimport sysimport jsonimport datetime,timeimport psycopg2connC = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")def GetNum():         es = Elasticsearch('x.x.x.x')         data = es.search(index=index_day, body={"query": {"match_all": {}}},size=1)         return data["hits"]["total"]def Getidslog():         es = Elasticsearch('x.x.x.x')         tmp_str = datetime.datetime.now().strftime('%Y.%m.%d')         index_day = 'logstash-' + tmp_str         es.indices.put_settings(                                                 {"index": {                                                        "max_result_window": 500000                                                 }})         num = GetNum()         data = es.search(index=index_day, body={"query": {"match_all": {}}}, size=num)         datalen=len(data["hits"]["hits"])         tableName = "%s_%s" % ("idslog", time.strftime("%Y%m%d"))         cur1 = conn1.cursor()         try:              for c in xrange(0,datalen):#                   print c                     if data["hits"]["hits"][c]:                            m_ctime=data["hits"]["hits"][c]["_source"]["@timestamp"]                            m_src_ip=data["hits"]["hits"][c]["_source"]["src_ip"]                            m_category=data["hits"]["hits"][c]["_source"]["alert"]["category"]                            m_signature=data["hits"]["hits"][c]["_source"]["alert"]["signature"]                            sql = "INSERT INTO %s (ctime,src_ip, category,signature) VALUES ('%s','%s','%s','%s')"                            sqlCmd = sql % (tableName, m_ctime, m_src_ip, m_category, m_signature)                            print sqlCmd                            cur1.execute(sqlCmd)                            conn1.commit()         except Exception,e:              traceback.print_exc()def CreateTable():    curC = connC.cursor()    sqlCreate = "create table if not exists %s (                  ctime TEXT,                 src_ip TEXT,                 category TEXT,                 signature TEXT                 )"    tableName = "%s_%s"%("idslog", time.strftime("%Y%m%d"))    sqlCmd = sqlCreate%tableName    curC.execute(sqlCmd)    curC.close()    connC.commit()if __name__ == '__main__':         CreateTable()         Getidslog()

(2)通过django web方式提供API接口。

  url(r'^api/outxxx/id$', outputAPI.as_view()),//获取某个IP对应的威胁情报,(只提供必要的情报,内部数据需要脱敏)url(r'^api/outxxx/IPlist$', outputIPlistAPI.as_view()), //获得所有ip列表class outputAPI(APIView):    def get(self, request, format=None):        m_src_ip = request.GET.get("ip") //安全机制已屏蔽        print m_src_ip        tableName ='idslog_20170814'        conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",                                port="5432")        cur1 = conn1.cursor()        SQL1="select * from %s WHERE src_ip=%s" %(tableName,m_src_ip)        cur1.execute(SQL1)        rows = cur1.fetchall()        list =[]        for row in rows:            m = {"ctime": row[0], "src_ip": row[1],"category":row[2],"signature":row[3]}            list.append(m)        b=json.dumps(list)        return HttpResponse(b)class outputIPlistAPI(APIView):    def get(self, request, format=None):        tableName = 'idslog_20170814'        conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",                                 port="5432")        cur1 = conn1.cursor()        SQL1 = "select DISTINCT(src_ip) from {} ".format(tableName)        cur1.execute(SQL1)        rows = cur1.fetchall()        list = []        for row in rows:            m = {"src_ip": row[0]}            list.append(m)        b = json.dumps(list)        return HttpResponse(b)

0x03、总结

公有云厂商自建威胁情报系统,是一个长期的投入,虽然短期看不出来多少效果,但是针对数据情报的积累到一定量的时候会有质的变化。

原文发布时间为:2017年8月23日
本文作者:bt0sea
本文来自云栖社区合作伙伴嘶吼,了解相关信息可以关注嘶吼网站。

转载地址:http://ojwll.baihongyu.com/

你可能感兴趣的文章
Java EE守护者将推动Jakarta EE的发展
查看>>
Apache Kylin在绿城客户画像系统中的实践
查看>>
通过jsonp获取json数据--实现AJAX跨域请求
查看>>
使用gRPC构建真实世界的微服务
查看>>
建立自组织敏捷团队
查看>>
[linux内核]完整编译内核源码的过程
查看>>
elasticsearch之排序
查看>>
Magento表单使用ajax验证
查看>>
mac上安装使用docker
查看>>
MySQL5.7.11压缩包版安装
查看>>
如何提升代码水平
查看>>
Elixir 分布式 Application 故障转移和接管
查看>>
微软为 Chrome 和 Firefox 发布了 Windows Defender 扩展
查看>>
英特尔基于 LLVM 的 SYCL 开源编译器现已发布
查看>>
java B2B2C 多租户电子商城系统
查看>>
(十二)java springboot b2b2c shop 多用户商城系统源码- SSO单点登录之OAuth2.0 登出流程(3)...
查看>>
换个角度看GAN:另一种损失函数
查看>>
连接mysql报错Table ‘performance_schema.session_variables’ ...
查看>>
Linux基础命令---zipinfo
查看>>
关于移动互联网产品的指标分析初探
查看>>