基于大数据和机器学习的web异常参数检测系统Demo实现

Posted by webber on Category: 机器学习 Tags: 机器学习 HMM 隐马尔可夫 web异常参数

基于大数据和机器学习的

web异常参数检测系统Demo实现


原文发表在FreeBuf:http://www.freebuf.com/articles/web/134334.html

一、前言

 如何在网络安全领域利用数据科学解决安全问题一直是一个火热的话题,讨论算法和实现的文章也不少。前段时间看到楚安的文章《数据科学在Web威胁感知中的应用》,其中提到如何用隐马尔可夫模型(HMM)建立web参数模型,检测注入类的web攻击。获益匪浅,随尝试用python实现该算法,并尝试在大数据环境下的部署应用。

二、算法一般过程

 隐马尔可夫模型是一个统计模型,可以利用这个模型解决三类基本问题:

  • 学习问题:给定观察序列,学习出模型参数
  • 评估问题:已知模型参数,评估出观察序列出现在这个模型下的概率
  • 解码问题:已知模型参数和给出的观察序列,求出可能性最大的隐藏状态序列

 这里我们是要解决前两类问题,使用白样本**数据学习出模型和参数基线,计算检测数据在该模型下出现的可能性,如果得分低于基线就可以认为这个参数异常,产出告警。算法可分为训练过程和检测过程,算法本身我这里不在细说(可参见前言中的文章或兜哥的文章),这里重点讲一下参数的抽取和泛化。 训练过程

检测过程

参数的抽取

 对http请求数据进行拆解,提取如下参数,这部分的难点在于如何正确的识别编码方式并解码:

  • GET、POST、Cookie请求参数
  • GET、POST、Cookie参数名本身
  • 请求的URL路径
  • http请求头,如Content_type、Content-Length(对应strust2-045)

参数泛化

 需要将参数值泛化为规律性的观测经验,并取字符的unicode数值作为观察序列,泛化的方法如下:

  • 大小写英文字母泛化为”A”,对应的unicode数值为65
  • 数字泛化为”N”,对应的unicode数值为78
  • 中文或中文字符泛化为“C”,对应的unicode数值为67
  • 特殊字符和其他字符集的编码不作泛化,直接取unicode数值
  • 参数值为空的取0

三、系统架构

 在训练过程中要使用尽可能多的历史数据进行训练,这显然是一个批(batch)计算过程;在检测过程中我们希望能够实时的检测数据,及时的发现攻击,这是一个流(streaming)计算过程。典型的批+流式框架如Cisco的Opensoc使用开源大数据架构,kafka作为消息总线,Storm进行实时计算,Hadoop存储数据和批量计算。但是这样的架构有一个缺点,我们需要维护Storm和MapReduce两套不同的代码。考虑到学习成本,使用Spark作为统一的数据处理引擎,即可以实现批处理,也可以使用spark streaming实现近实时的计算。 系统架构  系统架构如上图,需要在spark上运行三个任务,spark streaming将kafka中的数据实时的存入hdfs;训练算法定期加载批量数据进行模型训练,并将模型参数保存到Hdfs;检测算法加载模型,检测实时数据,并将告警保存到ES。

四、Spark简介

 Apache Spark是一个快速通用的大数据计算框架,由Scala语言实现,同时提供Java、python、R语言的API接口。相比于Hadoop的Mapreduce,Spark可以实现在内存中计算,具有更高的计算速度,并且spark streaming提供流数据计算框架,以类似批处理的方式处理流数据。

RDD

 RDD是Spark中抽象的数据结构类型,是一个弹性分布式数据集,数据在Spark中被表示为RDD。RDD提供丰富的API接口,实现对数据的操作,如map、flatmap、reduce、filter、groupby等等。

DStream

 DStream(离散数据流)是Spark Streaming中的数据结构类型,它是由特定时间间隔内的数据RDD构成,可以实现与RDD的互操作,Dstream也提供与RDD类似的API接口。

DataFrame

 DataFrame是spark中结构化的数据集,类似于数据库的表,可以理解为内存中的分布式表,提供了丰富的类SQL操作接口。

五、数据采集与存储

 获取http请求数据通常有两种方式,第一种从web应用中采集日志,使用logstash从日志文件中提取日志并泛化,写入Kafka(可参见兜哥文章);第二种可以从网络流量中抓包提取http信息。我这里使用第二种,用python结合Tcpflow采集http数据,在数据量不大的情况下可稳定运行。

数据采集

 与Tcpdump以包单位保存数据不同,Tcpflow是以流为单位保存数据内容,分析http数据使用tcpflow会更便捷。Tcpflow在linux下可以监控网卡流量,将tcp流保存到文件中,因此可以用python的pyinotify模块监控流文件,当流文件写入结束后提取http数据,写入Kafka,Python实现的过程如下图。 数据采集过程

核心代码:

子进程,处理数据到kafka
queue = Queue()
cess(target=processKafka,args=(queue,options.kafka,options.topic))
threadKafka.start()
#子线程,开启并监控TCPFLOW
tempDir=tempfile.mkdtemp()
threadPacp=threading.Thread(target=processPcap,args=(tempDir,tcpFlowPath,tcpflow_args))
threadPacp.start()
#主进程,监控文件并生成数据
wm=pyinotify.WatchManager()
wm.add_watch(tempDir,pyinotify.ALL_EVENTS)
eventHandler=MonitorFlow(queue)
notifier=pyinotify.Notifier(wm,eventHandler)
notifier.loop()

数据存储

 开启一个Spark Streaming任务,从kafka消费数据写入Hdfs,Dstream的python API没有好的入库接口,需要将Dstream的RDD转成DataFrame进行保存,保存为json文件。 核心代码:

topic = {in_topic: in_topic_partitions}
#从kafka获取数据生成Dstream
dstream = KafkaUtils.createStream(ssc, zookeeper,app_conf["app_name"], topic)
dstream = dstream.map(lambda record: json.loads(record[1]))
dstream.foreachRDD(lambda rdd: self.save(rdd))
#将RDD转成DataFrame存入Hdfs
def save(self, rdd):
    if rdd.take(1):
        df = sqlcontext.createDataFrame(rdd)
        df.write.json(app_conf["savedir"], mode="append")
    else:
        pass

六、算法实现

抽取器(Extractor)

 抽取器实现原始数据的参数提取和数据泛化,传入一条json格式的http请求数据,可以返回所有参数的id、参数类型、参数名、参数的观察状态序列。 代码示例:

class Extractor(object):
    def __init__(self,data):
        self.parameter={}
        self.data=data
        self.uri = urllib.unquote(data["uri"].encode("utf-8"))
        self.path = decode(get_path(self.uri))
        self.payload = get_payload(self.uri).strip("?")
        self.get_parameter()
#提取post参数
def post(self):
    post_data=urllib.unquote(urllib.unquote(self.data["data"]))
    content_t=self.data["content_type"]
#提取urlencode编码的参数
    def ex_urlencoded(post_data):
        for p in post_data.split("&"):
            p_list = p.split("=")
            p_name = p_list[0]
            if len(p_list) > 1:
                p_value = reduce(operator.add, p_list[1:])
        #取md5作为参数id
                p_id = get_md5(self.data["host"] + self.path + decode(p_name) + self.data["method"])
                p_state = self.get_Ostate(p_value)
                p_type = "post"
                yield (p_id, p_state, p_type, p_name)
#提取json格式的参数
    def ex_json(post_data):
        post_data=json.loads(post_data)
        for p_name,p_value in post_data.items():
            p_id = get_md5(self.data["host"] + self.path + decode(p_name) + self.data["method"])
            p_state=self.get_Ostate(str(p_value))
            p_type="post"
            yield (p_id, p_state, p_type, p_name)

训练器(Trainer)

 训练器完成对参数的训练,传入参数的所有观察序列,返回训练好的模型和profile,HMM模型使用python下的hmmlearn模块,profile取观察序列的最小得分。 核心代码:

class Trainer(object):
    def __init__(self,data):
        self.p_id=data["p_id"]
        self.p_state=data["p_states"]
    def train(self):
        Hstate_num=range(len(self.p_state))
        Ostate_num=range(len(self.p_state))
        Ostate = []
        for (index,value) in enumerate(self.p_state):
            Ostate+=value     #观察状态序列
            Hstate_num[index]=len(set(np.array(value).reshape(1,len(value))[0]))
            Ostate_num[index]=len(value)
        self.Ostate=Ostate
        self.Hstate_num=Hstate_num
        self.n=int(round(np.array(Hstate_num).mean()))#隐藏状态数
        model = GaussianHMM(n_components=self.n, n_iter=1000, init_params="mcs",covariance_type="full")
        model.fit(np.array(Ostate),lengths=Ostate_num)
#计算基线
    def get_profile(self):
        scores=np.array(range(len(self.p_state)),dtype="float64")
        for (index,value) in enumerate(self.p_state):
            scores[index]=self.model.score(value)
        self.profile=float(scores.min())
        self.scores=scores
训练任务
Spark训练任务抽取所有http请求数据的参数,并按照参数ID分组,分别进行训练,将训练模型保存到Hdfs。
核心代码:
    #读取原始数据
    df =sqlcontext.read.json(self.app_conf["data_dir"])
    rdd=df.toJSON()
#过滤出请求数据
    p_rdd=rdd.filter(self.filter).cache()
#抽取数据参数
    p_rdd=p_rdd.flatMap(self.extract).cache()
    p_list=p_rdd.collect()
    p_dict={}
#按照参数ID分组
    for p in p_list:
        if p.keys()[0] not in p_dict.keys():
            p_dict[p.keys()[0]]={}
            p_dict[p.keys()[0]]["p_states"]=[p.values()[0]["p_state"]]
            p_dict[p.keys()[0]]["p_type"]=p.values()[0]["p_type"]
            p_dict[p.keys()[0]]["p_name"] = p.values()[0]["p_name"]
        p_dict[p.keys()[0]]["p_states"].append(p.values()[0]["p_state"])
    for key in p_dict.keys():
        if len(p_dict[key]["p_states"]) <self.app_conf["min_train_num"]:
            p_dict.pop(key)
    models=[]
  #训练参数模型
    for p_id in p_dict.keys():
        data={}
        data["p_id"]=p_id
        data["p_states"]=p_dict[p_id]["p_states"]
        trainer=Trainer(data)
        (m,p)=trainer.get_model()
        model = {}
        model["p_id"] = p_id
        model["p_type"]=p_dict[p_id]["p_type"]
        model["p_name"] = p_dict[p_id]["p_name"]
        model["model"] = pickle.dumps(m)
        model["profile"] = p
        models.append(model)
        logging.info("[+]Trained:%s,num is %s"%(p_id,trained_num))
        trained_num+=1
#保存模型参数到Hdfs,保存为Json文件
    model_df=sqlcontext.createDataFrame(models)
    date=time.strftime("%Y-%m-%d_%H-%M")
    path="hdfs://%s:8020%smodel%s.json"%(self.app_conf["namenode_model"],self.app_conf["model_dir"],date)
    model_df.write.json(path=path)

检测任务

 Spark Streaming检测任务实时获取kafka流数据,抽取出数据的参数,如果参数有训练模型,就计算参数得分,小于基线输出告警到Elasticsearch。 核心代码:

#获取模型参数
    model_data = sqlcontext.read.json(self.app_conf["model_dir"]).collect()
    model_keys=[0]*len(model_data)
    for index,model_d in enumerate(model_data):
        model_keys[index]=model_d["p_id"]
    ssc=StreamingContext(sc,20)
    model_data = ssc._sc.broadcast(model_data)
    model_keys = ssc._sc.broadcast(model_keys)
    zookeeper = self.app_conf["zookeeper"]
    in_topic = self.app_conf["in_topic"]
    in_topic_partitions = self.app_conf["in_topic_partitions"]
    topic = {in_topic: in_topic_partitions}
#获取kafka数据
    dstream = KafkaUtils.createStream(ssc, zookeeper, self.app_conf["app_name"], topic)
#过滤出请求数据
    dstream=dstream.filter(self.filter)
#对每条数据进行检测
    dstream.foreachRDD(
       lambda rdd: rdd.foreachPartition(
           lambda iter:self.detector(iter,model_data,model_keys)
       )
    )
    ssc.start()
    ssc.awaitTermination()
def detector(self, iter,model_data,model_keys):
    es = ES(self.app_conf["elasticsearch"])
    index_name = self.app_conf["index_name"]
    type_name = self.app_conf["type_name"]
    model_data=model_data.value
    model_keys=model_keys.value
    for record in iter:
        record=json.loads(record[1])
        try:
            #抽取数据参数
parameters = Extractor(record).parameter
            for (p_id, p_data) in parameters.items():
                if p_id in model_keys:
                    model_d = model_data[model_keys.index(p_id)]
                    model = pickle.loads(model_d.model)
                    profile = model_d.profile
                    score = model.score(np.array(p_data["p_state"]))
                    if score < profile:
        #小于profile的参数数据输出告警到es
                        alarm = ES.pop_null(record)
                        alarm["alarm_type"] = "HmmParameterAnomaly "
                        alarm["p_id"] = p_id
                        alarm["p_name"] = model_d.p_name
                        alarm["p_type"] = model_d.p_type
                        alarm["p_profile"] = profile
                        alarm["score"] = score
                        es.write_to_es(index_name, type_name, alarm)
        except (UnicodeDecodeError, UnicodeEncodeError):
            logging.info("Error:%s" % str(record))

七、总结

 所有的机器学习算法都大致可分为训练、检测阶段,基于HMM的web参数异常检测是其中的典型代表,本文尝试将机器学习算法在大数据环境下使用,所有用到的代码都会在Github上公开(其实数据抽取部分并不完美,欢迎提出好的建议)。 代码地址:https://github.com/SparkSharly/Sharly


The article is not allowed to repost unless author authorized




You can donate to me if the article makes sense to you

|

Welcome to follow my wechat



Comments



user-avatar

wells fargo full site login

It's a pity you don't have a donate button! I'd certainly donate to this excellent blog! I guess for now i'll settle for book-marking and adding your RSS feed to my Google account. I look forward to fresh updates and will talk about this website with my Facebook group. Talk soon!

2018/03/16
user-avatar

wells fargo log in

Thank you for the good writeup. It in fact was a amusement account it. Look advanced to far added agreeable from you! However, how could we communicate?

2018/03/19
user-avatar

wells fargo online sign in

I will immediately clutch your rss feed as I can't to find your email subscription hyperlink or newsletter service. Do you have any? Kindly allow me recognize so that I may subscribe. Thanks.

2018/03/20
user-avatar

Webber

@wells fargo log in My email: weguo0022@hotmail.com You can contact me with WeChat.Add my account by scanning the two-dimensional code behind the article.

2018/03/31