项目背景

随着业务量的日益增长,业务库用户跑男相关日志数据越来越大,为了减轻业务库压力,对数据按实时与历史的方式进行了分离。实时库保留近一个月数据,历史数据转移到了冷数据库,按年表、月表进行数据存储。业务库压力降低了,但用户跑男相关日志数据的查询时只能按分库分表结构查贸易,职能部门反馈查询很慢且不好操作,比如不能跨年查,历史数据查询经常超时等等,为了解决这些问题,我们开始尝试数据存储方式变更。

评估与预期

  • 数据量:初步估算用户相关日志数据3亿+条(200GB+),跑男相关日志数据3亿+条(90GB+);
  • 预估随着业务量增长,每年数据很大可能超150GB+;
  • 预期数据检索速度:200毫秒左右;

以下是各数据库对比:

MysqlHadoopMongodbElasticSearch
容量扩展海量较大较大
查询灵活性非常好,支持sql非常好,支持sql较好,但是单集合数据量超过10亿条,即使简单条件查询性能也不理想,不如Elasticsearch倒排索引快较好,关联查询较弱,但是可以全文检索,DSL语言可以处理过滤、匹配、聚合、排序等操作
写入速度中等较快较快
各数据库对比

什么是Elasticsearch?

Elasticsearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据,包括文本、数字、地理空间、结构化和非结构化数据。

基本概念

  • Index(索引)

动词,相当于mysql中的insert;名词,相当于mysql的Database;

  • Type(类型)7.0废弃

在index(索引)中,可以定义一个或多个类型,类似于mysql中的table,同一种类型的数据放在一起;

  • Document(文档)

保存在某个索引(Index)下、某种类型(Type)的一个数据(Document),文档是json格式的数据,Document相当于mysql中某个table数据;

  • 倒排索引

分词:将整句拆分为单词

为什么选择Elasticsearch

  1. 相较于其它几种数据库,ES技术栈的经验相对丰富;
  2. ES是一种容纳较大规模数据并且交互性好的数据库,可以在容量和交互性上达到一个非常不错的平衡;
  3. Elasticsearch 很快。由于 Elasticsearch 是在 Lucene 基础上构建而成的,所以在全文本搜索方面表现十分出色。Elasticsearch 同时还是一个近实时的搜索平台,这意味着从文档索引操作到文档变为可搜索状态之间的延时很短,一般只有一秒。因此,Elasticsearch 非常适用于对时间有严苛要求的用例,例如安全分析和基础设施监测。
  4. Elasticsearch 包含一系列广泛的功能(支持全文检索、倒排索引,DSL语言可以处理过滤、匹配、排序、聚合等各种操作)。除了速度、可扩展性和弹性等优势以外,Elasticsearch 还有大量强大的内置功能(例如数据汇总和索引生命周期管理),可以方便用户更加高效地存储和搜索数据。
  5. Elasticsearch 具有分布式的本质特征,使得它可以扩展至数百台(甚至数千台)服务器,并处理 PB 量级的数据。Elasticsearch 中存储的文档分布在不同的容器中,这些容器称为分片,可以进行复制以提供数据冗余副本,副本机制能够保证数据的可靠性,即使有节点宕机也可以保证数据不丢失。
    1. 集群中,若主节点挂掉了,或者是挂了其中一个,在设置了master.nod参数时,如果主节点挂掉,其他的节点会被重新选举为主节点,继续接力。
    2. 集群中,所有节点都挂掉了,解决问题开机后,ES会自动的寻找备份节点/文件恢复数据,在找到备份之后,ES会在不影响系统的情况下恢复数据,同时继续接力

导入数据过程

在单元测试过程中发现,使用Elasticsearch.Net的 Bulk api进行数据写入,大概每秒插入3000条左右,7+亿条数据使用这种方式导入到Es中,耗费时间太长。而用REST API的_bulk来批量插入,速度会非常快,可以达到5 w+条每秒。但是需要先定义一定格式的Json文件,然后再用curl命令去执行Elasticsearch的_bulk来批量插入。这就要求把数据写进Json文件,然后再通过批处理,执行文件插入数据。另外在生成Json格式文件不能过大,文件过大会导致一些意想不到的情况,所以建议生成的文件一个10M左右,然后分别去执行这些小文件就可以了。

数据转移流程

Json格式数据文件格式展示

bat格式批处理文件

核心代码

转移过程核心代理很简单,如下:

if (!Directory.Exists(@"D:\EsData\UserOperateLog"))  
    Directory.CreateDirectory(@"D:\EsData\UserOperateLog");  
using (FileStream fs = new FileStream($"D:\\EsData\\UserOperateLog\\{tbName}_{pageIndex}.json", FileMode.Create))  
{  
    using (StreamWriter sw = new StreamWriter(fs, new UTF8Encoding(false)))  
    {  
        StringBuilder sb = new StringBuilder();    
        foreach (var baseItem in baseDataList)  
        {  
            string esAction = isCreate ? "index" : "update";    
            baseItem.CityName = GetCityName(baseItem.UserCityID ?? 0);  
            baseItem.UserTypeText = Enum.GetName(typeof(UserTypeEnum), baseItem.UserType ?? 0);    
            sb.Append($"{{\"{esAction}\": {{ \"_index\":\"user_operatelog_v1\", \"_type\":\"operatelog\", \"_id\": \"{baseItem.Id}\"}}");  
            sb.Append("}");  
            sb.Append(Environment.NewLine);  
            sb.Append(SerializeData(baseItem));  
            sb.Append(Environment.NewLine);  
            sw.Write(sb.ToString());  
            sb.Length = 0;  
        }  
    }  
}  
  
batStrBild.Append($"curl -u {esUserName}:{esPassword} -s -H \"Content-Type: application/json\" -XPOST {esUrl.Trim('/')}/user_operatelog_v1/operatelog/_bulk?pretty --data-binary ");  
batStrBild.Append(@"@D:\EsData\UserOperateLog\" + tbName + "_" + pageIndex + ".json" + Environment.NewLine); 

查询结果

从正式数据导入结果来看,Elasticsearch的搜索速度能够满足查询要求。

核对数据的过程中,在3亿+数据时,根据城市CityID和添加时间addTime在一年范围的数据,每次千条数据不到200毫秒就完成查询。

首次查询会慢(将磁盘文件里的数据自动缓存到 filesystem cache 里面),但是之后就会很快,而且同一查询条件查询越多,查询就越快(已经缓存到filesystem cache里面)。7亿条数据,大概占用500G空间左右,相比业务库存储的数据,要大1.8倍左右,但搜索速度比较满意。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注