当前位置:编程学习 > C/C++ >>

分治和hash-从海量数据大文件中查出某时间段内登陆超过阈值的ip地址

一个很大的文件,例如10G,仅包含ip地址和访问时间二列,格式如下:


 

 127.0.0.1   2013-07-22 14:00 
127.0.0.1   2013-07-22 14:02 
127.0.0.1   2013-07-22 14:03 
127.0.0.3   2013-07-22 14:03 
127.0.0.1   2013-07-22 14:04 
127.0.0.1   2013-07-22 14:05 
127.0.0.1   2013-07-22 14:06 
127.0.0.1   2013-07-22 14:07 
127.0.0.1   2013-07-22 14:08 
127.0.0.1   2013-07-22 14:09 
127.0.0.1   2013-07-22 14:10 
127.0.0.1   2013-07-22 14:11 
127.0.0.1   2013-07-22 14:12 
127.0.0.1   2013-07-22 14:13 
127.0.0.4   2013-07-22 14:13 
127.0.0.1   2013-07-22 14:15 
127.0.0.1   2013-07-22 14:16 
127.0.0.4   2013-07-22 14:17 
... ... 

127.0.0.1   2013-07-22 14:00
127.0.0.1   2013-07-22 14:02
127.0.0.1   2013-07-22 14:03
127.0.0.3   2013-07-22 14:03
127.0.0.1   2013-07-22 14:04
127.0.0.1   2013-07-22 14:05
127.0.0.1   2013-07-22 14:06
127.0.0.1   2013-07-22 14:07
127.0.0.1   2013-07-22 14:08
127.0.0.1   2013-07-22 14:09
127.0.0.1   2013-07-22 14:10
127.0.0.1   2013-07-22 14:11
127.0.0.1   2013-07-22 14:12
127.0.0.1   2013-07-22 14:13
127.0.0.4   2013-07-22 14:13
127.0.0.1   2013-07-22 14:15
127.0.0.1   2013-07-22 14:16
127.0.0.4   2013-07-22 14:17
... ...

从文件里查出在5分钟内连续登陆10次以上的ip地址集合并输出。这类问题是一个很常见的应用,通常都是从大的log日志文件中找出有攻击嫌疑的ip。

这类应用因为要处理分析的文件非常大,显然不能将整个文件全部读入内存,然后进行下一步工作。常见的比较成熟的解决方案有:分治+Hash,Bloom filter,2-Bitmap等。 这里就使用第一种方式来解决。
下面是分治与hash的代码


 

 public class DuplicateIP { 
    private String delimiter = " "; 
    private String FILE_PRE = "ip_"; 
     
    private int MAGIC = 10,BATCH_MAGIC = 500; 
    private String root = "/DuplicateIP/"; 
     
    private String filename = ""; 
     
    public DuplicateIP(final String filename) { 
        this.filename = filename; 
    } 
     
    /**
     * 将大文件拆分成较小的文件,进行预处理
     * @throws IOException
     */ 
    private void preProcess() throws IOException { 
        //Path newfile = FileSystems.getDefault().getPath(filename);  
        BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));          
        // 用5M的缓冲读取文本文件  
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024); 
         
        //假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断  
        //如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时,  
        //当来访的ip地址足够多的情况下,内存开销吃不消  
//      List<Entity> entities = new ArrayList<Entity>();  
         
        //存放ip的hashcode->accessTimes集合  
        Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>(); 
        String line = ""; 
        int count = 0; 
        while((line = reader.readLine()) != null){ 
            String split[] = line.split(delimiter); 
            if(split != null && split.length >= 2){ 
                //根据ip的hashcode这样拆分文件,拆分后的文件大小在1G上下波动  
                //极端情况是整个文件的ip地址全都相同,只有一个,那么拆分后还是只有一个文件  
                int serial = split[0].trim().hashCode() % MAGIC; 
 
                String splitFilename = FILE_PRE + serial; 
                List<String> lines = hashcodeMap.get(splitFilename); 
                if(lines == null){ 
                    lines = new ArrayList<String>(); 
                     
                    hashcodeMap.put(splitFilename, lines); 
                } 
                lines.add(line); 
            } 
             
            count ++; 
            if(count > 0 && count % BATCH_MAGIC == 0){ 
                for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){   
                    //System.out.println(entry.getKey()+"--->"+entry.getValue());  
                    DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8"));    
                }   
                //一次操作500之后清空,重新执行  
                hashcodeMap.clear(); 
            } 
        } 
         
        reader.close(); 
        fis.close(); 
    } 
     
    private boolean process() throws IOException{ 
        Path target = Paths.get(root); 
         
        //ip -> List<Date>  
        Map<String,List<Date>> resMap = new HashMap<String,List<Date>>(); 
        this.recurseFile(target,resMap); 
         
        for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){ 
            System.out.println(entry.getKey()); 
            for(Date date : entry.getValue()){ 
                System.out.println(date); 
            }            
        } 
         
        return true; 
    } 
     
    /**
     * 递归执行,将5分钟内访问超过阈值的ip找出来
     * 
     * @param parent
     * @return
     * @throws IOException 
     */ 
    private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{ 
        //Path target = Paths.get(dir);  
        if(!Files.exists(parent) || !Files.isDirectory(parent)){ 
            return; 
        } 
         
        Iterator<Path> targets = parent.iterator(); 
        for(;targets.hasNext();){ 
            Path path = targets.next(); 
            if(Files.isDirectory(parent)){ 
                //如果还是目录,递归  
                recurseFile(path.toAbsolutePath(),resMap); 
            }else { 
                //将一个文件中所有的行读上来  
                List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8")); 
                judgeAndcollection(lines,resMap); 
            }            
        } 
    } 
     
    /**
     * 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip
     * 并放入resMap
     * 
     * @param lines
     * @param resMap
     */ 
    private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) { 
        if(lines != null){ 
            //ip->List<String>accessTimes  
            Map<String,List<String>> judgeMap = new HashMap<String,List<String>>(); 
            for(String line : lines){ 
                line = line.trim(); 
                int space = line.indexOf(delimiter); 
                String ip = line.substring(0, space); 
                 
                List<String> accessTimes = judgeMap.get(ip); 
                if(accessTimes == null){ 
                    accessTimes = new ArrayList<String>();                     
                } 
                accessTimes.add(line.substring(space + 1).trim()); 
                judgeMap.put(ip, accessTimes); 
            } 
             
            if(judgeMap.size() == 0){ 
                return; 
            } 
             
            for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){ 
                List<String> acessTimes = entry.getValue(); 
                //相同ip,先判断整体大于10个  
                if(acessTimes != null && acessTimes.size() >= MAGIC){ 
                    //开始判断在List集合中,5分钟内访问超过MAGIC=10  
                    List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); 
                    if(attackTimes != null){ 
                        resMap.put(entry.getKey(), attackTimes); 
                    } 
                } 
            } 
        } 
    } 
     
    /**
     * @param args
     */ 
    public static void main(String[] args) { 
        String filename = "/DuplicateIP/log.txt"; 
        DuplicateIP dip = new DuplicateIP(filename); 
        try { 
            dip.preProcess(); 
            dip.process(); 
        }
补充:软件开发 , C++ ,
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,