后端基于Python的DFA过滤算法 敏感词过滤
后端基于 Python 的 DFA 过滤算法：一、就整个敏感词过滤而言，需要考虑性能，因此需要先将敏感词构造成字典结构并保存为 JSON 文件。
################################
# 初始化敏感词字典
################################
class initSensitiveWords(object):
    """Build a character trie of sensitive words and serialize it as JSON.

    Every trie node is a dict keyed by a single character. A node at which a
    word ends additionally carries the key ``self.delimit`` mapped to ``0``.
    The source word list is a text file whose entries are separated by commas.
    """

    def __init__(self):
        # Root node of the trie; populated by construct_dict().
        self.sensitiveWord_dict = {}
        # Marker key meaning "a word ends at this node".
        self.delimit = '\x00'

    def construct_dict(self, sensitiveWord):
        """Insert one word into the trie.

        The word is lowercased and stripped of surrounding whitespace before
        insertion; if nothing is left, the call is a no-op.

        Args:
            sensitiveWord: The word to add.
        """
        chars = sensitiveWord.lower().strip()
        if not chars:
            return
        node = self.sensitiveWord_dict
        # Walk down the trie, creating missing child nodes on the way.
        for char in chars:
            node = node.setdefault(char, {})
        # Flag the node reached by the last character as a word end.
        node[self.delimit] = 0

    def write_as_json(self, text_path, json_path='sensitiveWords.json'):
        """Read a comma-separated word list, build the trie and dump it as JSON.

        Args:
            text_path: Path of the UTF-8 text file holding the words.
            json_path: Path of the JSON file to write.

        Raises:
            OSError: If either file cannot be opened.
        """
        import json

        # Read the input completely before touching the output, so a failed
        # read never leaves a truncated JSON file behind.
        with open(text_path, 'r', encoding='utf8') as src:
            words = src.read().split(',')
        for word in words:
            self.construct_dict(str(word).strip())
        with open(json_path, 'w', encoding='utf8') as dst:
            dst.write(json.dumps(self.sensitiveWord_dict))
# Build the trie from the comma-separated word list and dump it to a JSON file.
# NOTE(review): the input path below has no file extension and looks garbled -- confirm the real file name.
text_path = '敏感词1易做图'
json_path = 'sensitiveWords1.json'
obj = initSensitiveWords()
obj.write_as_json(text_path, json_path)
###################################
#Python实现敏感词过滤DFA算法
###################################
import os
# Location of the serialized trie: sensitiveWords.json in the current working directory.
# NOTE(review): the generation script in this file writes 'sensitiveWords1.json' -- confirm which file name is intended.
path = os.path.join(os.getcwd(), 'sensitiveWords.json')
class DFAFilter(object):
    """Mask sensitive words in text using a character trie loaded from JSON.

    The trie is a nested dict keyed by single characters; a node that ends a
    word contains the key ``self.delimit``.
    """

    def __init__(self, path):
        """Load the trie stored in the JSON file at ``path``."""
        self.keyword_chains = {}
        # Marker key meaning "a word ends at this node".
        self.delimit = '\x00'
        self.path = path
        self._parse()

    def _parse(self):
        """Load the trie from ``self.path`` into ``self.keyword_chains``."""
        import json

        with open(self.path, encoding='utf-8') as f:
            self.keyword_chains = json.load(f)

    def filter(self, message, repl="*"):
        """Return ``message`` with every sensitive word masked.

        Matching is case-insensitive and stops at the shortest word found at
        each position. Each character of a matched word is replaced by
        ``repl``; all other characters are passed through unchanged.

        Args:
            message: The text to filter.
            repl: Replacement emitted once per masked character.

        Returns:
            The filtered text.
        """
        lowered = message.lower()
        # str.lower() may expand a character into several; only when the
        # lengths agree do indices of the lowered text line up with the
        # original, allowing the original casing to be emitted. Otherwise
        # fall back to emitting the lowered text.
        source = message if len(lowered) == len(message) else lowered
        chains = self.keyword_chains
        delimit = self.delimit
        length = len(lowered)
        pieces = []
        start = 0
        while start < length:
            node = chains
            matched = 0
            # Scan by index instead of slicing, which would copy the tail of
            # the message at every start position.
            for pos in range(start, length):
                char = lowered[pos]
                if char not in node:
                    break
                child = node[char]
                if delimit in child:
                    matched = pos - start + 1
                    break
                node = child
            if matched:
                pieces.append(repl * matched)
                start += matched
            else:
                # No complete word starts here (mismatch, or the message ended
                # mid-word): keep the character and move on by one.
                pieces.append(source[start])
                start += 1
        return ''.join(pieces)
算法实现之后，进行了如下性能测试：
import time
import random
from utils.msgFilter import msgFilter
################################################
####################测试素材生成##################
# Load the pool of sample messages, one per line of dirty.txt.
with open('dirty.txt', 'r', encoding='utf8') as f:
    dirty = f.read().split('\n')
# Write 10000 randomly chosen sample messages to new.txt, one per line.
with open('new.txt', 'w', encoding='utf8') as f:
    for i in range(10000):
        msg = random.choice(dirty)
        f.write(msg + '\n')
#################################################
#################################################
###################过滤性能测试####################
# Read the generated messages back, one per line of new.txt.
with open('new.txt', 'r', encoding='utf8') as f:
    dirty = f.read().split('\n')
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short intervals; the wall clock can jump while the loop is running.
start = time.perf_counter()
for msg in dirty:
    msgFilter.filter(msg)
stop = time.perf_counter()
# Elapsed time for filtering all messages, in seconds.
print(stop - start)
##################################################
输出（过滤耗时，单位：秒）：0.5772008895874023
/**
 * Mask every sensitive word in `content` using a character trie.
 *
 * The trie is a nested object keyed by single characters; a node that ends a
 * word contains the key '\x00'. Matching is case-sensitive and stops at the
 * shortest word found at each position.
 *
 * @param {Object} keyword_chains Root node of the trie.
 * @param {string} content Text to filter.
 * @param {string} [repl='*'] Replacement emitted once per masked character.
 * @returns {string} The filtered text.
 */
function check(keyword_chains, content, repl = '*') {
    const delimit = '\x00';
    const ret = [];
    let start = 0;
    while (start < content.length) {
        let level = keyword_chains;
        let matched = 0;
        // Scan by index; no per-position substring copy is needed.
        for (let i = start; i < content.length; i++) {
            const char = content[i];
            if (!(char in level)) {
                break;
            }
            if (delimit in level[char]) {
                matched = i - start + 1;
                break;
            }
            level = level[char];
        }
        if (matched > 0) {
            ret.push(repl.repeat(matched));
            start += matched;
        } else {
            // No complete word starts here. This also covers a partial match
            // that runs into the end of the content, so the character is kept
            // instead of being dropped.
            ret.push(content[start]);
            start += 1;
        }
    }
    return ret.join('');
}
jsDFA