威胁狩猎第一步
威胁狩猎第一步
一、前言
尽管自动化安全工具以及第一层和第二层安全运营中心 (SOC) 分析师应该能够处理大约 80% 的威胁,但我们仍需对余下的20%保持高度的警惕。这部分未被充分覆盖的威胁中,往往隐藏着更为复杂且可能带来重大损失的潜在风险。
网络威胁狩猎为企业安全带来人为元素,补充了自动化系统的不足。凭借丰富的人为经验能够在威胁可能导致严重问题之前发现、记录、监控并消除威胁。
二、简单的异常登陆检测
假设我们要构建一个最常见的异常检测场景:从海量的SSH连接日志中筛选出异常连接。
那么,如何界定SSH连接的异常性呢?我们可以从多个维度入手,比如时间维度(例如深夜的非正常工作时间段)、机器属性(如机器归属人、归属部门)以及历史记录等。这里,我们选择通过历史记录进行筛选,来做一个简单的实践。具体来说,我们可以统计今天的SSH连接记录,出那些在过去一个月中从未出现过的连接,作为初步的异常检测。
虽然这个思路看似简单,但在实际操作中,却需要使用大数据处理组件来进行。毕竟,一个月的SSH连接数据量庞大,而且每条SSH日志除了包含IP等基本信息外,还包含其他丰富的附加信息,整体数据量大概10G。
我使用mac M 6G机器上对10GB的(模拟生成的)进行简单去重。
代码语言:javascript代码运行次数:0运行复制import time
from collecti import Counter
with open("",encoding="gbk") as f:
content = f.read()
start_time = ()
word_list = content.split(" ")
word_counts = Counter(word_list)
end_time = ()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")
print(word_counts)
使用python进行处理,内存直接爆炸了,就更不用说处理时间方面了(脚本都没跑完)
那如何解决呢?
三、站在巨人的肩膀上spark
剖析一下为何此次操作会以失败告终:原因在于,我试图一次性将10GB的庞大数据文件全部加载到内存中,随后使用Python进行split、Counter等处理操作,这无疑导致内存使用量急剧飙升,最终因内存耗尽而引发程序崩溃。
为了优化这一流程,我们应当采取分段读取的策略,即每次仅读取文件的一部分数据,并对其进行相应的计算处理。同时,我们还可以利用多线程技术来充分压榨CPU的性能,从而提升运算效率(当然,这一切都需要在严格的内存管理之下进行)。
事实上,上述这些繁琐的步骤已经有人为我们提前做好了:Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模的数据进行快速分析查询。
比如我们生成的一个亿连接日志
代码语言:javascript代码运行次数:0运行复制import random
with open("","w") as f:
for _i in range(1,10000000):
host_ip = "10.0.{}.{}".format(str(random.randint(0, 255)),str(random.randint(0, 255)))
dst_ip = "10.0.{}.{}".format(str(random.randint(0, 255)), str(random.randint(0, 255)))
# print(f"log_service,174770440,aegis-log-login,22,{host_ip},{dst_ip},xxxxx-xxxxx-xxx-xxxx-xxxxxxx,SSH,root,inet-xxxxx-xxxx-xxxx-xxx-xxxxxxx")
f.write(f"log_service,174770440,aegis-log-login,22,{host_ip},{dst_ip},xxxxx-xxxxx-xxx-xxxx-xxxxxxx,SSH,root,inet-xxxxx-xxxx-xxxx-xxx-xxxxxxx\n")
代码语言:javascript代码运行次数:0运行复制cat >>
cat >>
cat >>
cat >>
cat >>
cat >>
cat >>
cat >>
cat >>
cat >>
比如我们的 login日志文件是(csv)格式,将dst_ip > host_ip作为一条记录
注意:这里还需要使用hadoop,因为涉及到文件的共享处理的问题,而spark又比较好支持haddop。
代码语言:javascript代码运行次数:0运行复制String inputPath = "hdfs://127.0.0.1:9000/";
JavaPairRDD<String, Integer> counts = textFile.
flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] columns = s.split(",\\s*");
if (columns.length < 4) {
// 如果列数不足4,则跳过这一行
return null;
}
String joinedKey = columns[5]+ ">" + columns[4];
return Arrays.asList(joinedKey).iterator();
}
})
.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<>(word, 1);
}
})
.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
});
统计dst_ip > host_ip的次数,并且进行排序
代码语言:javascript代码运行次数:0运行复制//先将key和value倒过来,再按照key排序
JavaPairRDD<Integer, String> sorts = counts
//key和value颠倒,生成新的map
.mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
//按照key倒排序
.sortByKey(false);
最终保存结果
代码语言:javascript代码运行次数:0运行复制//分区合并成一个,再导出为一个txt保存在hdfs
javaSparkContext.parallelize(all).coalesce(1).saveAsTextFile(outputPath);
最后搭建spark docker镜像里面,这里我们将内存限制了4G。
代码语言:javascript代码运行次数:0运行复制version: '.8'
services:
spark:
image: docker.io/bitnami/spark:.5
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHETICATIO_EABLED=no
- SPARK_RPC_ECRYPTIO_EABLED=no
- SPARK_LOCAL_STORAGE_ECRYPTIO_EABLED=no
- SPARK_SSL_EABLED=no
- SPARK_USER=spark
ports:
- '8080:8080'
- '7077:7077'
spark-worker:
image: docker.io/bitnami/spark:.5
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=4G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHETICATIO_EABLED=no
- SPARK_RPC_ECRYPTIO_EABLED=no
- SPARK_LOCAL_STORAGE_ECRYPTIO_EABLED=no
- SPARK_SSL_EABLED=no
- SPARK_USER=spark
1.7GB,一个亿的数据量,spark在一个4G内存,一个核心CPU的容器中101秒完成从hadoop读取文件,并且对文件进行解析、计算。
最后一个简单的脚本,即可筛选出的想要的登陆数据
代码语言:javascript代码运行次数:0运行复制test_list = [
"10.0.156.206>10.0.217.22",
"10.0.156.206>10.0.217.224"
]
content = ""
with open("part-00000") as f:
content = f.read()
for _test in test_list:
if _test in content:
print(_test + " normal")
else:
print(_test + " abnormal")
检测的结果并不是认为异常的,还需要结合多维度的日志才能得出最终结论。
四、总结
这种规模庞大的离线数据处理场景其实屡见不鲜,上述例子只是其中最为基础的一个。同样地,诸如进程的异常派生、网络的异常连接等复杂问题,也都可以运用类似的思路来解决。(在处理这些场景时,采用Apache Spark作为解决方案,是一个不错的选择。)
:lufeisec
腾讯技术创作特训营S11#重启人生
#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格
推荐阅读
留言与评论(共有 15 条评论) |
本站网友 南阳不孕不育 | 12分钟前 发表 |
xxxxx-xxxxx-xxx-xxxx-xxxxxxx | |
本站网友 上海奉贤区酒店 | 15分钟前 发表 |
host_ip = "10.0.{}.{}".format(str(random.randint(0 | |
本站网友 初中英语不好怎么办 | 12分钟前 发表 |
Integer>() { @Override public Integer call(Integer a | |
本站网友 沿海赛洛城租房 | 26分钟前 发表 |
即每次仅读取文件的一部分数据 | |
本站网友 丰胸穴位 | 6分钟前 发表 |
它使用内存中缓存和优化的查询执行方式 | |
本站网友 去颊脂垫手术 | 8分钟前 发表 |
{dst_ip} | |
本站网友 狐臭治疗 | 12分钟前 发表 |
这部分未被充分覆盖的威胁中 | |
本站网友 坪洲岛 | 23分钟前 发表 |
网络的异常连接等复杂问题 | |
本站网友 爱因斯坦的超级问题 | 21分钟前 发表 |
255)) | |
本站网友 无锡江大附中 | 7分钟前 发表 |
它使用内存中缓存和优化的查询执行方式 | |
本站网友 人民币定期存款利率 | 2分钟前 发表 |
这无疑导致内存使用量急剧飙升 | |
本站网友 新生儿喝什么奶粉好 | 15分钟前 发表 |
出那些在过去一个月中从未出现过的连接 | |
本站网友 上海独栋办公楼 | 24分钟前 发表 |
还包含其他丰富的附加信息 | |
本站网友 荣盛塞纳河谷 | 4分钟前 发表 |
往往隐藏着更为复杂且可能带来重大损失的潜在风险 |