恶意URL检测:基于TF-IDF特征向量进行机器学习的微型WAF
数据源: Github开源数据集https://github.com/faizann24/Using-machine-learning-to-detect-malicious-URLs
https://github.com/exp-db/AI-Driven-WAF
https://github.com/Echo-Ws/UrlDetect
摘要
随着互联网科技的不断发展,工信部近期指出,过去十年我国已建成全球规模最大的网络基础设施。不断完善的网络基础设施能够给我们带来很多便捷的应用,同时也带来了很多新式网络安全风险。由于传统的黑名单式的过滤规则依赖人工修改,随着网络环境的发展增删管理成本将迅速增高。因此本文尝试通过一些常见的机器学习方法来构建一个不依赖硬编码规则的微型防火墙,对恶意的URL进行检测并拦截。使用的模型主要有逻辑斯蒂回归、SVM支持向量机、朴素贝叶斯等。同时由于目标数据的文本形式,还需要首先筛选每个URL中的关键词,生成TF-IDF特征向量。最后本文对由不同方法产生的模型进行了测试和结果分析,测试结果中发现,利用逻辑斯蒂回归方案构建的恶意URL检测系统,在测试集上表现出了较好的分类性能,在正确率、召回率、精确率、F1值上表现出了较好的综合性能,同时不会产生过拟合现象。最后本文使用flask框架装载此模型,构建了一个restfulAPI,演示了该微型防火墙的可能工作模式。
**关键词:**URL;恶意检测;SVM支持向量机;逻辑回归;朴素贝叶斯;TF-IDF;
研究背景
OWASP统计的2021全球Web安全问题Top 10中,权限控制失效和注入式攻击分别占到第一位和第三位。权限控制包含用户cookie、jwt、序列化数据的篡改,以及CORS跨域的错误配置导致未经授权的API恶意访问;注入式攻击包含sql、ldap、ognl、xml注入等。而这些漏洞的利用基本上都依靠URL或者报文的其他部分当作载体,并且明显有相当一部分共通的特征,故而有可能使用机器学习进行一定的预防。著名网络安全解决方案服务商卡巴斯基指出:在机器学习的支持下,AI 网络安全将在不久的将来成为一种强大的工具。与其他行业一样,人工交互在安全方面长期以来是必不可少且不可替代的。尽管目前网络安全严重依赖于人工操作,但我们逐渐看到,在处理特定任务时,科技已经比人类效率更高。
数据准备
获取URL请求数据集
本文的URL请求数据均采集于GitHub的开源数据集,其中包含带有标记的正常请求数据1,265,974条,恶意请求数据44,530条
URL解码统一格式
收集的请求数据中既有URL编码一次或多次的请求,也有原始的请求,故需要对URL进行循环解码统一格式。
数据集去重
收集的请求数据中存在常见的恶意请求,例如sql注入万能密码:foo.com?foo=0’ or 1=1#以及常见XSS攻击:foo.com?foo=等,很大概率会在各数据集中大批量重合,而且重合程度各不相同。故需要对数据进行去重,防止训练时影响参数的权重而导致模型效果欠佳。
替换/去除不合规字符
由于种种原因,收集的数据中还存在不合规的字符,比如中文的单双引号、全角数字、非utf-8编码的字符等。本文对可识别、可替换的不合规字符进行了替换操作,而对由于编码等问题导致的无意义字符进行了数据去除操作。
模型训练
本文的模型训练用到了scikit-learn(sklearn)==0.19.2以及numpy、scipy三个python库
将URL字符串转换为TF-IDF特征矩阵
TF-IDF是一种统计方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度。字词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它在语料库中出现的频率成反比下降。
使用sklearn.feature_extraction.text的TfidfVectorizer函数可以对目标文本生成TF-IDF特征向量,定义矢量化函数为TF-IDF矢量函数实例:self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)。
其中tokenizer=self.get_ngrams为自定义的生成特征值的方法,本文通过对URL进行分组生成特征,每组由2-5个字符组成,步长为1来取特征向量组成数组,提交给TF-IDF矢量函数。
应用不同模型进行训练
应用逻辑斯蒂回归模型进行训练
使用sklearn.linear_model的LogisticRegression函数能够对数据进行线性拟合,然后使用激励函数逻辑斯蒂函数将其处理到0至1的区间,步步迭代,从而分类正常请求和恶意请求。
应用svm支持向量机模型进行训练
使用sklearn.svm的LinearSVC函数能够自动对数据寻找不同集合之间的最大超平面,从而分类正常请求和恶意请求。
应用朴素贝叶斯模型进行训练
使用sklearn.naive_bayes的MultinomialNB函数能够计算各特征的概率,对其使用最大似然估计方法,估计参数的值。
准确度评测
使用sklearn.metrics的classification_report函数,能够提供包含精确率、召回率、F1值等的完整报告。如下为三种模型的预测结果报告。
逻辑斯蒂回归预测结果
定义向量中单个元素包含两个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 0.99 | 0.94 | 0.96 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
定义向量中单个元素包含三个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 1.00 | 0.94 | 0.97 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
模型的准确度:0.9978969923021982
定义向量中单个元素包含四个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 1.00 | 0.93 | 0.96 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
模型的准确度:0.9975520868307155
svm支持向量机预测结果
定义向量中单个元素包含两个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 0.99 | 0.97 | 0.97 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
模型的准确度:0.998525758028972
定义向量中单个元素包含三个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 1.00 | 0.98 | 0.99 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
模型的准确度:0.9993468161867495
定义向量中单个元素包含四个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 1.00 | 0.98 | 0.99 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
定义向量中单个元素包含五个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 1.00 | 1.00 | 1.00 | 316542 |
恶意请求 | 1.00 | 0.97 | 0.98 | 11084 |
avg / total | 1.00 | 1.00 | 1.00 | 327626 |
朴素贝叶斯预测结果
定义向量中单个元素包含两个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 0.99 | 1.00 | 1.00 | 316542 |
恶意请求 | 0.90 | 0.81 | 0.85 | 11084 |
avg / total | 0.99 | 0.99 | 0.99 | 327626 |
模型的准确度:0.9904586327092477
定义向量中单个元素包含三个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 0.99 | 0.99 | 0.99 | 316542 |
恶意请求 | 0.82 | 0.85 | 0.84 | 11084 |
avg / total | 0.99 | 0.99 | 0.99 | 327626 |
模型的准确度:0.9887066350045479
定义向量中单个元素包含四个字符
正常请求数目: 1265974 恶意请求数目: 44530
| precision | recall | f1-score | support |
---|
正常请求 | 0.99 | 0.99 | 0.99 | 316542 |
恶意请求 | 0.78 | 0.81 | 0.80 | 11084 |
avg / total | 0.99 | 0.99 | 0.99 | 327626 |
模型的准确度:0.9858619279300178
评估总结
通过对比发现,使用svm支持向量机方法,并且向量中单个元素包含三个字符时训练的模型,在精确率、召回率、F1值上均表现出了较好的性能。
但是后期,在进行模型的基本测试时,本文发现使用svm支持向量机方法训练的模型将一些明显正常的url预测为恶意url,推测可能是过拟合或者数据集不够全面导致
svm支持向量机模型均认为以下url为恶意url(两个正常url,字符型sql注入绕过和sql联合注入)
而逻辑斯蒂回归模型则能够准确区分两者,例如(三个正常url,字符型sql注入绕过和数字型sql注入绕过以及sql联合注入)
总的来说,逻辑斯蒂回归在不会发生过拟合影响基本判断的情况下,其准确率、召回率等并不低于svm支持向量机模型过多,是最理想的模型
模型应用
完善脚本接口
由于sklearn的线程问题,无法直接将模型放入flask的路由函数中,故采用flask调用系统shell命令来执行脚本的方法进行替代。同时考虑到URL中可能出现空格以及其他命令行关键字,采用对传入的URL进行base64编码的方法解决该问题。
使用flask编写restfulAPI对脚本进行调用
flask负责通过接收指定路由上的POST数据得到原始URL值,然后对其base64编码,传递给命令行进行脚本调用,获得执行结果后将其传递回前端。
项目完整代码
使用的python库
scikit-learn==0.19.2
numpy
Scipy
逻辑斯蒂回归模型
import os
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
import urllib
import pickle
import html
GRAM_BIT_NUM = 3
class lgs_net(object):
def __init__(self):
good_query_list = self.get_query_list('goodqueries.txt')
bad_query_list = self.get_query_list('badqueries.txt')
print('正常请求数目: ', len(good_query_list), '恶意请求数目: ', len(bad_query_list))
good_y = [0 for i in range(0, len(good_query_list))]
bad_y = [1 for i in range(0, len(bad_query_list))]
queries = bad_query_list + good_query_list
y = bad_y + good_y
self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
X = self.vectorizer.fit_transform(queries)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
self.lgs = LogisticRegression()
self.lgs.fit(X_train, y_train)
y_predict = self.lgs.predict(X_test)
print('模型的准确度:{}'.format(self.lgs.score(X_test, y_test)))
print(classification_report(y_test, y_predict, target_names=['正常请求', '恶意请求']))
def predict(self, new_queries):
new_queries = [urllib.parse.unquote(url) for url in new_queries]
X_predict = self.vectorizer.transform(new_queries)
res = self.lgs.predict(X_predict)
res_list = []
for q, r in zip(new_queries, res):
tmp = '正常请求' if r == 0 else '恶意请求'
q_entity = html.escape(q)
res_list.append({'url': q_entity, 'res': tmp})
return res_list
def get_query_list(self, filename):
directory = str(os.getcwd())
filepath = directory + "/" + filename
data = open(filepath, 'r', encoding='utf-8').readlines()
query_list = []
for d in data:
d = str(urllib.parse.unquote(d))
query_list.append(d)
return list(set(query_list))
def get_ngrams(self, query):
tmp_query = str(query)
ngrams = []
for i in range(0, len(tmp_query) - GRAM_BIT_NUM):
ngrams.append(tmp_query[i:i + GRAM_BIT_NUM])
return ngrams
if __name__ == '__main__':
model = lgs_net()
with open('lgs.pkl', 'wb') as output:
pickle.dump(model, output)
with open('lgs.pkl', 'rb') as f:
model = pickle.load(f)
model.predict(['www.foo.com/id=1<script>alert(1)</script>', 'www.foo.com/name=admin\' or 1=1', 'abc.com/admin.php',
'"><svg onload=confirm(1)>', 'test/q=<a href="javascript:confirm(1)>', 'q=../etc/passwd'])
svm支持向量机模型
import os
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC
from sklearn.metrics import classification_report
import urllib
import pickle
import html
GRAM_BIT_NUM = 3
class svm_net(object):
def __init__(self):
good_query_list = self.get_query_list('goodqueries.txt')
bad_query_list = self.get_query_list('badqueries.txt')
print('正常请求数目: ', len(good_query_list), '恶意请求数目: ', len(bad_query_list))
good_y = [0 for i in range(0, len(good_query_list))]
bad_y = [1 for i in range(0, len(bad_query_list))]
queries = bad_query_list + good_query_list
y = bad_y + good_y
self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
X = self.vectorizer.fit_transform(queries)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
self.svm = LinearSVC()
self.svm.fit(X_train, y_train)
y_predict = self.svm.predict(X_test)
print('模型的准确度:{}'.format(self.svm.score(X_test, y_test)))
print(classification_report(y_test, y_predict, target_names=['正常请求', '恶意请求']))
def predict(self, new_queries):
new_queries = [urllib.parse.unquote(url) for url in new_queries]
X_predict = self.vectorizer.transform(new_queries)
res = self.svm.predict(X_predict)
res_list = []
for q, r in zip(new_queries, res):
tmp = '正常请求' if r == 0 else '恶意请求'
q_entity = html.escape(q)
res_list.append({'url': q_entity, 'res': tmp})
return res_list
def get_query_list(self, filename):
directory = str(os.getcwd())
filepath = directory + "/" + filename
data = open(filepath, 'r', encoding='utf-8').readlines()
query_list = []
for d in data:
d = str(urllib.parse.unquote(d))
query_list.append(d)
return list(set(query_list))
def get_ngrams(self, query):
tmp_query = str(query)
ngrams = []
for i in range(0, len(tmp_query) - GRAM_BIT_NUM):
ngrams.append(tmp_query[i:i + GRAM_BIT_NUM])
return ngrams
if __name__ == '__main__':
model = svm_net()
with open('svm.pkl', 'wb') as output:
pickle.dump(model, output)
with open('svm.pkl', 'rb') as f:
model = pickle.load(f)
model.predict(['www.foo.com/id=1<script>alert(1)</script>', 'www.foo.com/name=admin\' or 1=1', 'abc.com/admin.php',
'"><svg onload=confirm(1)>', 'test/q=<a href="javascript:confirm(1)>', 'q=../etc/passwd'])
朴素贝叶斯模型
import os
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report
import urllib
import pickle
import html
GRAM_BIT_NUM = 3
class bys_net(object):
def __init__(self):
good_query_list = self.get_query_list('goodqueries.txt')
bad_query_list = self.get_query_list('badqueries.txt')
print('正常请求数目: ', len(good_query_list), '恶意请求数目: ', len(bad_query_list))
good_y = [0 for i in range(0, len(good_query_list))]
bad_y = [1 for i in range(0, len(bad_query_list))]
queries = bad_query_list + good_query_list
y = bad_y + good_y
self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
X = self.vectorizer.fit_transform(queries)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
self.mnb = MultinomialNB()
self.mnb.fit(X_train, y_train)
y_predict = self.mnb.predict(X_test)
print('模型的准确度:{}'.format(self.mnb.score(X_test, y_test)))
print(classification_report(y_test, y_predict, target_names=['正常请求', '恶意请求']))
def predict(self, new_queries):
new_queries = [urllib.parse.unquote(url) for url in new_queries]
X_predict = self.vectorizer.transform(new_queries)
res = self.mnb.predict(X_predict)
res_list = []
for q, r in zip(new_queries, res):
tmp = '正常请求' if r == 0 else '恶意请求'
q_entity = html.escape(q)
res_list.append({'url': q_entity, 'res': tmp})
return res_list
def get_query_list(self, filename):
directory = str(os.getcwd())
filepath = directory + "/" + filename
data = open(filepath, 'r', encoding='utf-8').readlines()
query_list = []
for d in data:
d = str(urllib.parse.unquote(d))
query_list.append(d)
return list(set(query_list))
def get_ngrams(self, query):
tmp_query = str(query)
ngrams = []
for i in range(0, len(tmp_query) - GRAM_BIT_NUM):
ngrams.append(tmp_query[i:i + GRAM_BIT_NUM])
return ngrams
if __name__ == '__main__':
model = bys_net()
with open('bys.pkl', 'wb') as output:
pickle.dump(model, output)
with open('bys.pkl', 'rb') as f:
model = pickle.load(f)
model.predict(['www.foo.com/id=1<script>alert(1)</script>', 'www.foo.com/name=admin\' or 1=1', 'abc.com/admin.php',
'"><svg onload=confirm(1)>', 'test/q=<a href="javascript:confirm(1)>', 'q=../etc/passwd'])
flask装载svm支持向量机模型的微型WAF
该flask服务由flask入口app.py、训练/预测脚本url_detect.py、模型文件svm.pkl以及数据集goodqueries.txt、badqueries.txt五部分组成
import base64
import os
from flask import Flask, request
app = Flask(__name__)
@app.route('/', methods=['POST'])
def url_detect():
if request.method == 'POST':
try:
url = request.form.get('url')
url_b64 = str(base64.b64encode(bytes(url, encoding='utf-8')))[2:-1]
result = os.popen('python3 url_detect.py ' + url_b64)
return result.read()
except:
return "500 Internal Server Error"
if __name__ == '__main__':
app.run(host="0.0.0.0")
url_detect.py
import os
import sys
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
import urllib
import pickle
import html
import base64
import warnings
warnings.filterwarnings('ignore')
GRAM_BIT_NUM = 3
class lgs_net(object):
def __init__(self):
good_query_list = self.get_query_list('goodqueries.txt')
bad_query_list = self.get_query_list('badqueries.txt')
print('正常请求数目: ', len(good_query_list), '恶意请求数目: ', len(bad_query_list))
good_y = [0 for i in range(0, len(good_query_list))]
bad_y = [1 for i in range(0, len(bad_query_list))]
queries = bad_query_list + good_query_list
y = bad_y + good_y
self.vectorizer = TfidfVectorizer(tokenizer=self.get_ngrams)
X = self.vectorizer.fit_transform(queries)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
self.lgs = LinearSVC()
self.lgs.fit(X_train, y_train)
y_predict = self.lgs.predict(X_test)
print('模型的准确度:{}'.format(self.lgs.score(X_test, y_test)))
print(classification_report(y_test, y_predict, target_names=['正常请求', '恶意请求']))
def get_query_list(self, filename):
directory = str(os.getcwd())
filepath = directory + "/" + filename
data = open(filepath, 'r', encoding='utf-8').readlines()
query_list = []
for d in data:
d = str(urllib.parse.unquote(d))
query_list.append(d)
return list(set(query_list))
def get_ngrams(self, query):
tmp_query = str(query)
ngrams = []
for i in range(0, len(tmp_query) - GRAM_BIT_NUM):
ngrams.append(tmp_query[i:i + GRAM_BIT_NUM])
return ngrams
def predict(self, new_queries):
new_queries = [urllib.parse.unquote(str(url)) for url in new_queries]
X_predict = self.vectorizer.transform(new_queries)
res = self.lgs.predict(X_predict)
res_list = []
for q, r in zip(new_queries, res):
res = 'good query' if r == 0 else 'bad query'
q_entity = html.escape(q)
res_list.append({'url': str(q_entity)[7:-6], 'res': res})
print(res_list)
return res_list
if __name__ == '__main__':
url = sys.argv[1]
url = base64.b64decode(bytes(str(url), encoding='utf-8'))
with open('lgs.pkl', 'rb') as input:
model = pickle.load(input)
model.predict([url])