您现在的位置是:首页 > 编程 > 

威胁狩猎第一步

2025-07-23 07:37:45
威胁狩猎第一步 一、前言尽管自动化安全工具以及第一层和第二层安全运营中心 (SOC) 分析师应该能够处理大约 80% 的威胁,但我们仍需对余下的20%保持高度的警惕。这部分未被充分覆盖的威胁中,往往隐藏着更为复杂且可能带来重大损失的潜在风险。网络威胁狩猎为企业安全带来人为元素,补充了自动化系统的不足。凭借丰富的人为经验能够在威胁可能导致严重问题之前发现、记录、监控并消除威胁。二、简单的异常登

威胁狩猎第一步

一、前言

尽管自动化安全工具以及第一层和第二层安全运营中心 (SOC) 分析师应该能够处理大约 80% 的威胁,但我们仍需对余下的20%保持高度的警惕。这部分未被充分覆盖的威胁中,往往隐藏着更为复杂且可能带来重大损失的潜在风险。

网络威胁狩猎为企业安全带来人为元素,补充了自动化系统的不足。凭借丰富的人为经验能够在威胁可能导致严重问题之前发现、记录、监控并消除威胁。

二、简单的异常登陆检测

假设我们要构建一个最常见的异常检测场景:从海量的SSH连接日志中筛选出异常连接。

那么,如何界定SSH连接的异常性呢?我们可以从多个维度入手,比如时间维度(例如深夜的非正常工作时间段)、机器属性(如机器归属人、归属部门)以及历史记录等。这里,我们选择通过历史记录进行筛选,来做一个简单的实践。具体来说,我们可以统计今天的SSH连接记录,出那些在过去一个月中从未出现过的连接,作为初步的异常检测。

虽然这个思路看似简单,但在实际操作中,却需要使用大数据处理组件来进行。毕竟,一个月的SSH连接数据量庞大,而且每条SSH日志除了包含IP等基本信息外,还包含其他丰富的附加信息,整体数据量大概10G。

我使用mac M 6G机器上对10GB的(模拟生成的)进行简单去重。

代码语言:javascript代码运行次数:0运行复制
import time
from collecti import Counter


with open("",encoding="gbk") as f:
    content = f.read()
    start_time = ()


    word_list = content.split(" ")
    word_counts = Counter(word_list)
    end_time = ()
    execution_time = end_time - start_time
    print(f"Execution time: {execution_time} seconds")
    print(word_counts)

使用python进行处理,内存直接爆炸了,就更不用说处理时间方面了(脚本都没跑完)

那如何解决呢?

三、站在巨人的肩膀上spark

剖析一下为何此次操作会以失败告终:原因在于,我试图一次性将10GB的庞大数据文件全部加载到内存中,随后使用Python进行split、Counter等处理操作,这无疑导致内存使用量急剧飙升,最终因内存耗尽而引发程序崩溃。

为了优化这一流程,我们应当采取分段读取的策略,即每次仅读取文件的一部分数据,并对其进行相应的计算处理。同时,我们还可以利用多线程技术来充分压榨CPU的性能,从而提升运算效率(当然,这一切都需要在严格的内存管理之下进行)。

事实上,上述这些繁琐的步骤已经有人为我们提前做好了:Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模的数据进行快速分析查询。

比如我们生成的一个亿连接日志

代码语言:javascript代码运行次数:0运行复制
import random


with open("","w") as f:
    for _i in range(1,10000000):
        host_ip = "10.0.{}.{}".format(str(random.randint(0, 255)),str(random.randint(0, 255)))
        dst_ip = "10.0.{}.{}".format(str(random.randint(0, 255)), str(random.randint(0, 255)))
        # print(f"log_service,174770440,aegis-log-login,22,{host_ip},{dst_ip},xxxxx-xxxxx-xxx-xxxx-xxxxxxx,SSH,root,inet-xxxxx-xxxx-xxxx-xxx-xxxxxxx")
        f.write(f"log_service,174770440,aegis-log-login,22,{host_ip},{dst_ip},xxxxx-xxxxx-xxx-xxxx-xxxxxxx,SSH,root,inet-xxxxx-xxxx-xxxx-xxx-xxxxxxx\n")
代码语言:javascript代码运行次数:0运行复制
cat  >> 
cat  >> 
cat  >> 
cat  >> 
cat  >> 
cat  >> 
cat  >> 
cat  >> 
cat  >> 
cat  >> 

比如我们的 login日志文件是(csv)格式,将dst_ip > host_ip作为一条记录

注意:这里还需要使用hadoop,因为涉及到文件的共享处理的问题,而spark又比较好支持haddop。

代码语言:javascript代码运行次数:0运行复制
String inputPath = "hdfs://127.0.0.1:9000/";


JavaPairRDD<String, Integer> counts = textFile.
        flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] columns = s.split(",\\s*");
                if (columns.length < 4) {
                    // 如果列数不足4,则跳过这一行
                    return null;
                }
                String joinedKey =  columns[5]+ ">" + columns[4];
                return Arrays.asList(joinedKey).iterator();
            }
        })
        .mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });

统计dst_ip > host_ip的次数,并且进行排序

代码语言:javascript代码运行次数:0运行复制
//先将key和value倒过来,再按照key排序
JavaPairRDD<Integer, String> sorts = counts
        //key和value颠倒,生成新的map
        .mapToPair(tuple2 -> new Tuple2<>(tuple2._2(), tuple2._1()))
        //按照key倒排序
        .sortByKey(false);

最终保存结果

代码语言:javascript代码运行次数:0运行复制
//分区合并成一个,再导出为一个txt保存在hdfs
javaSparkContext.parallelize(all).coalesce(1).saveAsTextFile(outputPath);

最后搭建spark docker镜像里面,这里我们将内存限制了4G。

代码语言:javascript代码运行次数:0运行复制
version: '.8'


services:
  spark:
    image: docker.io/bitnami/spark:.5
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHETICATIO_EABLED=no
      - SPARK_RPC_ECRYPTIO_EABLED=no
      - SPARK_LOCAL_STORAGE_ECRYPTIO_EABLED=no
      - SPARK_SSL_EABLED=no
      - SPARK_USER=spark
    ports:
      - '8080:8080'
      - '7077:7077'
  spark-worker:
    image: docker.io/bitnami/spark:.5
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHETICATIO_EABLED=no
      - SPARK_RPC_ECRYPTIO_EABLED=no
      - SPARK_LOCAL_STORAGE_ECRYPTIO_EABLED=no
      - SPARK_SSL_EABLED=no
      - SPARK_USER=spark

1.7GB,一个亿的数据量,spark在一个4G内存,一个核心CPU的容器中101秒完成从hadoop读取文件,并且对文件进行解析、计算。

最后一个简单的脚本,即可筛选出的想要的登陆数据

代码语言:javascript代码运行次数:0运行复制
test_list = [
    "10.0.156.206>10.0.217.22",
    "10.0.156.206>10.0.217.224"
]


content = ""
with open("part-00000") as f:
    content = f.read()


for _test in test_list:
    if _test in content:
        print(_test + " normal")
    else:
        print(_test + " abnormal")

检测的结果并不是认为异常的,还需要结合多维度的日志才能得出最终结论。

四、总结

这种规模庞大的离线数据处理场景其实屡见不鲜,上述例子只是其中最为基础的一个。同样地,诸如进程的异常派生、网络的异常连接等复杂问题,也都可以运用类似的思路来解决。(在处理这些场景时,采用Apache Spark作为解决方案,是一个不错的选择。)

:lufeisec

腾讯技术创作特训营S11#重启人生

#感谢您对电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格的认可,转载请说明来源于"电脑配置推荐网 - 最新i3 i5 i7组装电脑配置单推荐报价格

本文地址:http://www.dnpztj.cn/biancheng/1191845.html

相关标签:无
上传时间: 2025-07-22 23:32:02
留言与评论(共有 15 条评论)
本站网友 南阳不孕不育
12分钟前 发表
xxxxx-xxxxx-xxx-xxxx-xxxxxxx
本站网友 上海奉贤区酒店
15分钟前 发表
        host_ip = "10.0.{}.{}".format(str(random.randint(0
本站网友 初中英语不好怎么办
12分钟前 发表
 Integer>() {             @Override             public Integer call(Integer a
本站网友 沿海赛洛城租房
26分钟前 发表
即每次仅读取文件的一部分数据
本站网友 丰胸穴位
6分钟前 发表
它使用内存中缓存和优化的查询执行方式
本站网友 去颊脂垫手术
8分钟前 发表
{dst_ip}
本站网友 狐臭治疗
12分钟前 发表
这部分未被充分覆盖的威胁中
本站网友 坪洲岛
23分钟前 发表
网络的异常连接等复杂问题
本站网友 爱因斯坦的超级问题
21分钟前 发表
 255))
本站网友 无锡江大附中
7分钟前 发表
它使用内存中缓存和优化的查询执行方式
本站网友 人民币定期存款利率
2分钟前 发表
这无疑导致内存使用量急剧飙升
本站网友 新生儿喝什么奶粉好
15分钟前 发表
出那些在过去一个月中从未出现过的连接
本站网友 上海独栋办公楼
24分钟前 发表
还包含其他丰富的附加信息
本站网友 荣盛塞纳河谷
4分钟前 发表
往往隐藏着更为复杂且可能带来重大损失的潜在风险