1.项目背景
通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,
第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,
还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,
年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论。
当前我们的需求是:统计每天、每月以及每年的每个人的通话次数及时长。
2.项目架构
3.项目实现
系统环境:
表1
系统 | 版本 |
windows | 10 专业版 |
linux | CentOS 6.8 |
开发工具:
表2
工具 | 版本 |
idea | 2017.2.5旗舰版 |
maven | 3.3.9 |
JDK | 1.8+ |
提示:idea2017.2.5必须使用maven3.3.9,不要使用maven3.5,有部分兼容性问题
集群环境:
表3
框架 | 版本 |
hadoop | 2.7.2 |
zookeeper | 3.4.10 |
hbase | 1.3.1 |
flume | 1.7.0 |
kafka | 2.11-0.11.0.0 |
硬件环境:
表4
| hadoop102 | hadoop103 | hadoop104 |
内存 | 4G | 2G | 2G |
CPU | 2核 | 1核 | 1核 |
硬盘 | 50G | 50G | 50G |
3.1 数据生产
此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,
数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到
项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个
环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题。
3.1.1 数据结构
我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,
最后再加上一个flag作为判断第一个电话号码是否为主叫。
姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中。
表5
列名 | 解释 | 举例 |
call1 | 第一个手机号码 | 15369468720 |
call1_name | 第一个手机号码人姓名(非必须) | 李雁 |
call2 | 第二个手机号码 | 19920860202 |
call2_name | 第二个手机号码人姓名(非必须) | 卫艺 |
date_time | 建立通话的时间 | 20171017081520 |
date_time_ts | 建立通话的时间(时间戳形式) |
|
duration | 通话持续时间(秒) | 0600 |
3.2 数据采集/消费(存储)
我们常用的一种模型是:
线上数据 --> flume --> kafka --> flume(根据情景增删该流程) --> HDFS
消费存储模块流程如图2所示:
1)启动zookeeper,kafka集群
[lxl@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties ...... ......
2)创建kafka主题
[lxl@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --topic ct --partitions 3 --replication-factor 2
检查一下是否创建主题成功:
$ /opt/module/kafka/bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
3)启动kafka控制台消费者,等待flume信息的输入
[lxl@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic ct
4)配置flume(flume-kafka.conf)
# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = exec a1.sources.r1.command = tail -F -c +O /opt/module/datas/call.log a1.sources.r1.shell = /bin/bash -c # Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic = ct a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
5)启动flume
[lxl@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f /opt/module/flume/job/flume-2-kafka.conf
hbase(main):001:0> scan 'ct:calllog'
代码:
3.3 数据分析
我们的数据已经完整的采集到了HBase集群中,这次我们需要对采集到的数据进行分析,统计出我们想要的结果。注意,在分析的过程中,我们不一定会采取一个业务指标对应一个mapreduce-job的方式,如果情景允许,我们会采取一个mapreduce分析多个业务指标的方式来进行任务。具体何时采用哪种方式,我们后续会详细探讨。
分析模块流程如图3所示:
业务指标:
a) 用户每天主叫通话个数统计,通话时间统计。
b) 用户每月通话记录统计,通话时间统计。
c) 用户之间亲密关系统计。(通话次数与通话时间体现用户亲密关系)
需求分析
根据需求目标,设计出下述(3.2.2)表结构。我们需要按照时间范围(年月日),结合MapReduce统计出所属时间范围内所有手机号码的通话次数总和以及通话时长总和。
思路:
a) 维度,即某个角度,某个视角,按照时间维度来统计通话,比如我想统计2018年所有月份所有日子的通话记录,那这个维度我们大概可以表述为2018年*月*日
b) 通过Mapper将数据按照不同维度聚合给Reducer
c) 通过Reducer拿到按照各个维度聚合过来的数据,进行汇总,输出
d) 根据业务需求,将Reducer的输出通过Outputformat把数据
数据输入:HBase
数据输出:Mysql
HBase中数据源结构:
表6
标签 | 举例&说明 |
rowkey | hashregion_call1_datetime_call2_flag_duration 01_15837312345_20170527081033_13766889900_1_0180 |
family | f1列族:存放主叫信息 f2列族:存放被叫信息 |
call1 | 第一个手机号码 |
call2 | 第二个手机号码 |
date_time | 通话建立的时间,例如:20171017081520 |
date_time_ts | date_time对应的时间戳形式 |
duration | 通话时长(单位:秒) |
flag | 标记call1是主叫还是被叫(call1的身份与call2的身份互斥) |
a) 已知目标,那么需要结合目标思考已有数据是否能够支撑目标实现;
b) 根据目标数据结构,构建Mysql表结构,建表;
c) 思考代码需要涉及到哪些功能模块,建立不同功能模块对应的包结构。
d) 描述数据,一定是基于某个维度(视角)的,所以构建维度类。比如按照“年”与“手机号码”的组合作为key聚合所有的数据,便可以统计这个手机号码,这一年的相关结果。
e) 自定义OutputFormat用于对接Mysql,使数据输出。
f) 创建相关工具类。
Mysql表结构设计(参考)
我们将分析的结果数据保存到Mysql中,以方便Web端进行查询展示。
1) 表7:db_telecom.tb_contacts
用于存放用户手机号码与联系人姓名。
表7 db_telecom.tb_contacts
列 | 备注 | 类型 |
id | 自增主键 | int(11) NOT NULL |
telephone | 手机号码 | varchar(255) NOT NULL |
name | 联系人姓名 | varchar(255) NOT NULL |
2) 表8:db_telecom.tb_call
用于存放某个时间维度下通话次数与通话时长的总和。
表8 db_telecom.tb_call
列 | 备注 | 类型 |
id_date_contact | 复合主键(联系人维度id,时间维度id) | varchar(255) NOT NULL |
id_date_dimension | 时间维度id | int(11) NOT NULL |
id_contact | 查询人的电话号码 | int(11) NOT NULL |
call_sum | 通话次数总和 | int(11) NOT NULL DEFAULT 0 |
call_duration_sum | 通话时长总和 | int(11) NOT NULL DEFAULT 0 |
3) 表9:db_telecom.tb_dimension_date
用于存放时间维度的相关数据
表9 db_telecom.tb_dimension_date
列 | 备注 | 类型 |
id | 自增主键 | int(11) NOT NULL |
year | 年,当前通话信息所在年 | int(11) NOT NULL |
month | 月,当前通话信息所在月,如果按照年来统计信息,则month为-1。 | int(11) NOT NULL |
day | 日,当前通话信息所在日,如果是按照月来统计信息,则day为-1。 | int(11) NOT NULL |
4) 表10:db_telecom.tb_intimacy
用于存放所有用户用户关系的结果数据。(作业中使用)
表10 db_telecom.tb_intimacy
列 | 备注 | 类型 |
id | 自增主键 | int(11) NOT NULL |
intimacy_rank | 好友亲密度排名 | int(11) NOT NULL |
id_contact1 | 联系人1,当前所查询人 | int(11) NOT NULL |
id_contact2 | 联系人2,与联系人为好友 | int(11) NOT NULL |
call_count | 两联系人通话次数 | int(11) NOT NULL DEFAULT 0 |
call_duration_count | 两联系人通话持续时间 | int(11) NOT NULL DEFAULT 0 |
最终设计:
数据分析流程
3.4 数据展示
数据展示模块流程如图7所示: