spark可视化要解决的问题
针对普通客户端浏览和分析大数据困难的问题, 结合 Spark 和 LOD 技术, 以热图为例提出一种面向大数据可视化技术框架 首先利用 Spark 平台分层并以瓦片为单位并行计算, 然后将结果分布式存储在 HDFS 上, 最后通过web 服务器应用Ajax技术结合地理信息提供各种时空分析服务文中重点解决了数据点位置和地图之间的映射, 以及由于并行计算导致的热图瓦片之间边缘偏差这2个问题实验结果表明,该方法将数据交互操作与数据绘制和计算任务分离, 为浏览器端大数据可视化提供了一个新的思路
目前大数据可视化面临的主要问题包括:
1) 数据复杂散乱 经常发生数据缺失、数据值不对、结构化程度不高
2) 迭代式分析成本高 在初次查询后如果发现结果不对, 改变查询条件重新查询代价高
3) 构建复杂工作流困难 从多数据源取得包含各种不同特征的原始数据,然后执行机器学习算法或者复杂查询, 探索过程漫长
4) 受到原有技术限制, 对小规模数据分析很难直接扩展到大数据分析
5) 数据点的规模超过普通显示器可能提供的有效像素点
Hadoop和Spark先后成为大数据分析工业界的研究热点,前者是一个能够对大量数据提供分布式处理的软件框架和文件系统(hadoopdistrib-utedfilesystem,HDFS);后者是一个通用大数据计算平台,可以解决大数据计算中的批处理、 交互查询及流式计算等核心问题Zeppelin可以作为Spark的解释器,进一步提供基于 Web 页面的数据分析和可视化协作可以输出表格、柱状图、折线图、饼状图、点图等,但是无法提供更为复杂的交互分析手段
相关工作
面向 web 的轻量级数据可视化工具主要是一些JavaScript库,利用canvas或者svg画散点,svg不能支持十亿以上的节点,使用 canvas 画布绘图的heatmapjs 在面对大数据量时也无能为力
热图是一种常用的基本数据可视化技术,通常用颜色编码数值大小,并以矩阵或方格形式整齐排列,在二维平面或者地图上呈现数据空间分布,被广泛应用在许多领域近年来,许多研究者成功地将热图应用在眼动数据可视分析上, 有效地概括并表达用户视觉注意力的累计分布
LOD针对数据可视化绘制速度慢、效率低等问题,孙敏等提出基于格网划分的LOD(levelsofdetail)分层方法, 实现对大数据集 DEM 数据的实时漫游
并行计算大数据热图
经纬度换算
并行计算
在 Spark 平台上实现热图的绘制,首先将经纬度坐标转换为对应不同瓦片上的像素坐标每个基站的辐射范围可近似认为相同, 即每个基站(收集数据的基站坐标)的初始影响力近似相同,因此可采用影响力叠加法将数据点绘制到画布上,然后做径向渐变,叠加出每个位置的影响大小,得到初始灰度图,如图2a所示然后将每一个像素点着色,根据每个像素的灰度值大小,以及调色板将灰度值映射成相对应的颜色 图 2b 是一个透明的PNG 格式, 调色板如图 2c 所示 本文中出现的热图均采用图 2c 调色板
image_1bv9vh4oqfatb5a12s4ui1sbe9png-1859kB
将计算出的热图结果存储在HDFS上,并与经纬度以及层级建立索引关系方便以后读取,拼接后的热图绘制效果如图 3 所示
image_1bv9vj0vb16um1pt91qrs17bh1jam9png-128kB
瓦片边缘问题
image_1bv9vno7315gi1l1r1gog1cdg1tdnmpng-346kB
边缘热点可能处于2片或者4片瓦片之间,因此需要通过2次或者4次重复计算通过本文提出的重叠计算方法可以解决热图分片计算的边缘问题。
实验
image_1bv9vr8311fe817tt11bb22qlr113png-2842kB
总结
本文提出的大数据热图可视化方法能够有效地解决前端绘制计算量大的问题,通过在Spark平台上以瓦片为单位分层次并行计算热图, 将生成的热图存储在HDFS上,然后通过web服务器提供浏览器交互服务, 用户可以通过在地图上拖动鼠标或放大/缩小等操作选择感兴趣区域,再分析不同时间点用户行为差异或渐变过程 通过解决热图数据点和地图映射关系问题以及瓦片热图之间的边缘问题,提供大数据热图绘方法, 以满足用户交互、协同和共享等多方面需求该方法可以拓展到其他常用可视化方法,如ScatterPlot, Bar Chart,平行坐标等但绘制过程是基于Spark计算后得到的离线数据,在实时性上还不能得到保证, 在下一步工作中, 我们将着手利用 Spark Streaming 库来解决这一问题
spark和hadoop的区别:诞生的先后顺序、计算不同、平台不同。
诞生的先后顺序,hadoop属于第一代开源大数据处理平台,而spark属于第二代。属于下一代的spark肯定在综合评价上要优于第一代的hadoop。
计算不同spark和hadoop在分布式计算的底层思路上,其实是极为相似的,即mapreduce分布式运算模型:将运算分成两个阶段,阶段1-map,负责从上游拉取数据后各自运算,然后将运算结果shuffle给下游的reduce,reduce再各自对通过shuffle读取来的数据进行聚合运算spark和hadoop在分布式计算的具体实现上,又有区别;hadoop中的mapreduce运算框架,一个运算job,进行一次map-reduce的过程;而spark的一个job中,可以将多个map-reduce过程级联进行。
平台不同spark和hadoop区别是,spark是一个运算平台,而hadoop是一个复合平台(包含运算引擎,还包含分布式文件存储系统,还包含分布式运算的资源调度系统),所以,spark跟hadoop来比较的话,主要是比运算这一块大数据技术发展到目前这个阶段,hadoop主要是它的运算部分日渐式微,而spark目前如日中天,相关技术需求量大,offer好拿。
两台服务器手动部署大数据平台
##### 初始服务器数量
- 2台centos7
##### 建议配置
- 32G(RAM)
- 24cpu
- 10t(SATA)
### 1环境
- 系统centos7
- jdk:180_171(64位)
- zookeeper:348
- spark-210-bin-hadoop26
- kafka_210-01021
- hadoop-270
- hbase-126
- elasticsearch-630
### 2系统准备
对应的安装包文件:
elasticsearch-630targz
hadoop-270targz
hbase-126-bintargz
jdk-8u171-linux-x64targz
kafka_210-01021tgz
mysql-5723-1el7x86_64rpm-bundletar
spark210hadoop26tgzgz
zookeeper-348targz
一、 配置好hosts
```
两台设备的host
ip1 hello1
ip2 hello2
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
二、机器之间做好免密
1 在hello1服务器中,cd /root/
2 ssh-keygen -trsa (全部按回车,走默认配置)
3 cat ~/ssh/id_rsapub >> ~/ssh/authorized_keys
4 chmod 600 ~/ssh/authorized_keys
5 scp ~/ssh/authorized_keys root@hello2:~/ssh/
到此处时可以实现hello1机器上通过root账户登录到hello2中,但从hello2中无法通过免密码登录到hello1服务器。
6 在hello2服务器中,cd /root/
7 ssh-keygen -trsa (全部按回车,走默认配置)
8 cat ~/ssh/id_rsapub >> ~/ssh/authorized_keys
9 scp ~/ssh/authorized_keys root@hello1:~/ssh/
到此处时可以实现hello1机器与hello2机器之间免密码互通
三、建立一个用户操作elasticsearch用户,后期所有安装软件放在该目录下(当前使用root账户安装)
1添加用户:
useradd -m -s /bin/bash es
2为该用户设置密码:
password es
四、安装JDK
如果系统自带openjdk,先将其卸载掉!
1创建jdk安装路径(hello1、hello2都执行)
执行: mkdir /usr/java
2解压缩jdk到安装目录
执行: tar -zxvf jdk-8u171-linux-x64targz -C /usr/java/
3添加环境变量
vi /etc/profile,添加以下语句
export JAVA_HOME=/usr/java/jdk180_171
export CLASSPATH=:$JAVA_HOME/lib/dtjar:$JAVA_HOME/lib/toolsjar
export PATH=$PATH:$JAVA_HOME/bin
执行:source /etc/profile
4复制安装包和数据目录到hello2
scp -r /usr/java/jdk180_171 hello2:/usr/java/
scp /etc/profile hello2:/etc/
登录到hello2上,进入/home/es目录
执行: source /etc/profile
5、验证:
两台服务器上分别执行: java -version,查看输出的版本是否与安装的版本一致。
五、安装mysql
1如果centos系统中自带mariadb,先卸载mariadb。
2解压mysql安装包程序
执行:tar -xvf mysql-5723-1el7x86_64rpm-bundletar
3依次安装里面rpm包组建
rpm -ivh mysql-community-common-5723-1el7x86_64rpm
rpm -ivh mysql-community-libs-5723-1el7x86_64rpm
rpm -ivh mysql-community-client-5723-1el7x86_64rpm
rpm -ivh mysql-community-server-5723-1el7x86_64rpm
rpm -ivh mysql-community-devel-5723-1el7x86_64rpm
4启动MySQL
执行: systemctl start mysqld
5登录mysql服务器
这种方式安装好后,会再mycnf文件中自动生成一个密码,
执行:cat /var/log/mysqldlog | grep password, 出现如下记录:
2017-09-15T01:58:11863301Z 1 [Note] A temporary password is generated for root@localhost: m-NdrSG4ipuO
其中“m-NdrSG4ipuO”为mysql root账户的初始密码。
登录:
执行: mysql -uroot -p
输入密码: m-NdrSG4ipuO,即可进入mysql服务器。
后续可自行修改root密码,创建新账户等操作。
六、安装zookeeper
1解压zookeeper安装包到指定目录(/home/es)
tar -zxvf zookeeper-348targz -C /home/es
2创建程序软连接
cd /home/es/
ln -s zookeeper-348 zookeeper
3添加执行路径环境
vi /etc/profile
添加
export ZOOKEEPER_HOME=/home/es/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
执行
source /etc/profile
4修改配置文件
cd /home/es/zookeeper
cp conf/zoo_samplecfg conf/zoocfg
在/home/data下创建对应的zookeeper数据存储目录
mkdir /home/data/zookeeper
mkdir /home/data/zookeeper/data
mkdir /home/data/zookeeper/log
修改配置文件:conf/zoocfg,添加以下语句
dataDir=/home/data/zookeeper/data
dataLogDir=/home/data/zookeeper/log
server1=hello1:2888:3888
server2=hello2:2888:3888
5创建server表示符文件
touch /home/data/zookeeper/data/myid
echo echo 1>/home/data/zookeeper/data/myid
6复制安装包和数据目录到hello2
scp -r /home/es/zookeeper-348 es@hello2:/home/es
scp -r /home/data/zookeeper es@hello2:/home/data
scp /etc/profile es@hello2:/etc
登录到hello2上
cd /home/es
ln -s zookeeper-348 zookeeper
echo echo 2>/home/data/zookeeper/data/myid
执行
source /etc/profile
7两台机器上分别执行
zkServersh start
8验证
jps | grep QuorumPeerMain,查看是否有该进程
zkServersh status,查看服务状态
六、安装kafka
1解压kafka安装包到指定目录(/home/es)
tar -zxvf kafka_210-01021tgz -C /home/es
2创建程序软连接
cd /home/es/
ln -s kafka_210-01021 kafka
3修改配置文件
备份:
cp config/serverproperties config/serverpropertiesbak
创建kafka日志目录:
mkdir /home/data/kafka
mkdir /home/data/kafka/kafka-logs
修改:config/serverproperties,具体对应字段如下:
brokerid=0
deletetopicenable=true
numnetworkthreads=10
numiothreads=32
socketsendbufferbytes=102400
socketreceivebufferbytes=102400
socketrequestmaxbytes=104857600
logdirs=/home/data/kafka/kafka-logs
numpartitions=1
numrecoverythreadsperdatadir=1
logretentionhours=168
logsegmentbytes=1073741824
logretentioncheckintervalms=300000
zookeeperconnect=hello1:2181,hello2:2181
zookeeperconnectiontimeoutms=6000
6复制安装包和数据目录到hello2
scp -r /home/es/kafka_210-01021 es@hello2:/home/es
scp -r /home/data/kafka es@hello2:/home/data
修改hello2中的配置
登录到hello2上,cd /home/es/kafka,修改config/serverproperties中brokerid值为2
7启动kafka
在两台机器的/home/es/kafka中,创建一个日志存放目录:mkdir start_log,执行以下命令:
nohup bin/kafka-server-startsh config/serverproperties > start_log/kafka_start_log 2>&1 &
8验证运行情况
jps | grep Kafka,查看进程
通过kafka命令查看topic。
七、安装hadoop
1解压hadoop安装包到指定目录(/home/es)
tar -zxvf hadoop-270targz -C /home/es
2创建程序软连接
cd /home/es/
ln -s hadoop-270 hadoop
3创建数据存放目录
mkdir /home/data/hadoop
mkdir /home/data/hadoop/tmp
mkdir /home/data/hadoop/dfs
mkdir /home/data/hadoop/dfs/data
mkdir /home/data/hadoop/dfs/name
4修改配置文件
修改/home/es/hadoop/etc/hadoop/core-sitexml
<configuration>
<property>
<name>fsdefaultFS</name>
<value>hdfs://hello1:9000</value>
</property>
<property>
<name>hadooptmpdir</name>
<value>file:/home/data/hadoop/tmp</value>
</property>
<property>
<name>iofilebuffersize</name>
<value>131702</value>
</property>
</configuration>
修改/home/es/hadoop/etc/hadoop/hdfs-sitexml
<configuration>
<property>
<name>dfsnamenodenamedir</name>
<value>file:/home/data/hadoop/dfs/name</value>
</property>
<property>
<name>dfsdatanodedatadir</name>
<value>file:/home/data/hadoop/dfs/data</value>
</property>
<property>
<name>dfsreplication</name>
<value>2</value>
</property>
<property>
<name>dfsnamenodesecondaryhttp-address</name>
<value>hello1:9001</value>
</property>
<property>
<name>dfswebhdfsenabled</name>
<value>true</value>
</property>
</configuration>
修改/home/es/hadoop/etc/hadoop/mapred-sitexml
<configuration>
<property>
<name>mapreduceframeworkname</name>
<value>yarn</value>
</property>
<property>
<name>mapreducejobhistoryaddress</name>
<value>hello1:10020</value>
</property>
<property>
<name>mapreducejobhistorywebappaddress</name>
<value>hello1:19888</value>
</property>
</configuration>
修改/home/es/hadoop/etc/hadoop/yarn-sitexml
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<name>yarnnodemanageraux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarnnodemanagerauxservicesmapreduceshuffleclass</name>
<value>orgapachehadoopmapredShuffleHandler</value>
</property>
<property>
<name>yarnresourcemanageraddress</name>
<value>hello1:8032</value>
</property>
<property>
<name>yarnresourcemanagerscheduleraddress</name>
<value>hello1:8030</value>
</property>
<property>
<name>yarnresourcemanagerresource-trackeraddress</name>
<value>hello1:8031</value>
</property>
<property>
<name>yarnresourcemanageradminaddress</name>
<value>hello1:8033</value>
</property>
<property>
<name>yarnresourcemanagerwebappaddress</name>
<value>hello1:8088</value>
</property>
<property>
<name>yarnnodemanagerresourcememory-mb</name>
<value>768</value>
</property>
</configuration>
配置/home/es/hadoop/etc/hadoop目录下hadoop-envsh、yarn-envsh的JAVA_HOME(不设置的话,启动不了)
export JAVA_HOME=/usr/java/jdk180_171
配置/home/es/hadoop/etc/hadoop目录下的slaves,删除默认的localhost,增加2个从节点,
hello1
hello2
5、将配置好的Hadoop复制到各个节点对应位置上,通过scp传送
scp -r /home/es/hadoop-270 hello2:/home/es/
scp -r /home/data/hadoop hello2:/home/data/
登录到hello2上,进入/home/es目录
执行: ln -s hadoop-270 hadoop
6、格式化nameNode及启动hadoop
在主服务器启动hadoop,从节点会自动启动,进入/home/es/hadoop目录
初始化,输入命令,bin/hdfs namenode -format
全部启动sbin/start-allsh,也可以分开sbin/start-dfssh、sbin/start-yarnsh
输入命令,jps,可以看到相关信息
7、验证hadoop运行情况
浏览器打开http://hello1:8088/
浏览器打开http://hello1:50070/
8、添加hadoop环境变量到/etc/profile
export HADOOP_HOME=/home/es/hadoop export PATH=$PATH:$HADOOP_HOME/sbin
export PATH=$PATH:$HADOOP_HOME/bin
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djavalibrarypath=$HADOOP_HOME/lib/native"
执行: source /etc/profile
八、安装Hbase
1解压hbase安装包到指定目录(/home/es)
tar -zxvf hbase-126-bintargz -C /home/es
2创建程序软连接
cd /home/es/
ln -s hbase-126 hbase
3添加hbase环境变量到/etc/profile
export HBASE_HOME=/home/es/hbase
export PATH=$HBASE_HOME/bin:$PATH
执行:source /etc/profile
4修改HBASE配置文件
vi /home/es/hbase/conf/hbase-envsh
增加: export JAVA_HOME=/usr/java/jdk180_171
修改: export HBASE_MANAGES_ZK=false
vi /home/es/hbase/conf/hbase-sitexml
修改类容:
<configuration>
<property>
<name>hbaserootdir</name> <!-- hbase存放数据目录 -->
<value>hdfs://hello1:9000/hbase/hbase_db</value>
<!-- 端口要和Hadoop的fsdefaultFS端口一致-->
</property>
<property>
<name>hbaseclusterdistributed</name> <!-- 是否分布式部署 -->
<value>true</value>
</property>
<property>
<name>hbasezookeeperquorum</name> <!-- list of zookooper -->
<value>hello1,hello2</value>
</property>
<property><!--zookooper配置、日志等的存储位置 -->
<name>hbasezookeeperpropertydataDir</name>
<value>/home/es/hbase/zookeeper</value>
</property>
</configuration>
配置regionservers,vi /home/es/hbase/conf/regionservers
去掉默认的localhost,加入hello1、hello2
5、将配置好的hbase复制到各个节点对应位置上,通过scp传送
scp -r /home/es/hbase-126 hello2:/home/es/
scp /etc/profile hello2:/etc/
登录到hello2上,进入/home/es目录
执行: ln -s hbase-126 hbase
source /etc/profile
6、hbase的启动
hello1中执行: start-hbasesh
7、验证hbase运行情况
输入jps命令查看进程是否启动成功,若 hello1上出现HMaster、HRegionServer、HQuormPeer,hello2上出现HRegionServer、HQuorumPeer,就是启动成功了。
输入hbase shell 命令 进入hbase命令模式,输入status命令,查看运行状态。
在浏览器中输入http://hello1:16010就可以在界面上看到hbase的配置
注意事项:
正常安装后,创建普通不带压缩表可以正常读写,当使用snappy进行压缩创建表时,该表无法再regionServer中启动!
解决方法:
1在hbase-sitexml文件中添加一下属性
<property>
<name>hbaseregionservercodecs</name>
<value>snappy</value>
</property>
2每台机器中将hadoop_nativezip解压缩到hbase安装目录的lib下,执行 unzip hadoop_nativezip $HBASE_HOME/lib/
3在$HBASE_HOME/conf/hbase-envsh 中添加:export HBASE_LIBRARY_PATH=/home/es/hbase/lib/native
4重启Hbase服务即可
九、Spark安装
1解压hbase安装包到指定目录(/home/es)
tar -zxvf spark210hadoop26tgzgz -C /home/es
2创建程序软连接
cd /home/es/
ln -s spark210hadoop26 spark
3修改配置文件
mv /home/es/spark/conf/spark-envshtemplate /home/es/spark/conf/spark-envsh
vi /home/es/spark/conf/spark-envsh
修改对应配置:
export JAVA_HOME=/usr/java/jdk180_171
export SPARK_MASTER_IP=hello1
export SPARK_MASTER_PORT=7077
export SPARK_LOCAL_IP=hello1
修改slaves文件
mv /home/es/spark/conf/slavestemplate /home/es/spark/conf/slaves
vi /home/es/spark/conf/slaves
将localhost修改成:
hello1
hello2
5、将配置好的hbase复制到各个节点对应位置上,通过scp传送
scp -r /home/es/spark210hadoop26 hello2:/home/es/
登录到hello2上,进入/home/es目录
执行: ln -s spark210hadoop26 spark
在hello2中修改/home/es/spark/conf/spark-envsh
export JAVA_HOME=/usr/java/jdk180_171
export SPARK_MASTER_IP=hello1
export SPARK_MASTER_PORT=7077
export SPARK_LOCAL_IP=hello2
6、启动spark
cd /home/es/spark
执行: sbin/start-allsh
7、检测执行结果
jps | grep Worker,看是否有相应的进程。
十、安装elasticsearch
由于elasticsearch,用root账户无法启动,故该组件用es账户安装
1、切换到es账户: su es
2、解压hbase安装包到指定目录(/home/es)
tar -zxvf elasticsearch-630targz -C /home/es/
创建程序软连接
cd /home/es/
ln -s elasticsearch-630 elasticsearch
3、修改配置文件
vi /home/es/elasticsearch/config/elasticsearchyml
# 集群的名字
clustername: crrc-health
# 节点名字
nodename: node-1
# 数据存储目录(多个路径用逗号分隔)
pathdata: /home/data1/elasticsearch/data
# 日志目录
pathlogs: /home/data1/elasticsearch/logs
#本机的ip地址
networkhost: hello1
#设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点
discoveryzenpingunicasthosts: ["hello1", "hello2"]
# 设置节点间交互的tcp端口(集群),(默认9300)
transporttcpport: 9300
# 监听端口(默认)
httpport: 9200
# 增加参数,使head插件可以访问es
httpcorsenabled: true
httpcorsallow-origin: ""
4、创建elasticsearch数据和存储目录
mkdir /home/data1/elasticsearch
mkdir /home/data1/elasticsearch/data
mkdir /home/data1/elasticsearch/logs
5、修改linux系统的默认硬限制参数
切换至root用户: su root
vim /etc/security/limitsconf
添加:
es soft nofile 65536
es hard nofile 65536
退出es登录,重新用es账户登录,使用命令:ulimit -Hn查看硬限制参数。
vi /etc/sysctlconf
添加:
vmmax_map_count=655360
执行:
sysctl -p
6、将配置好的elasticsearch复制到各个节点对应位置上,通过scp传送
scp -r /home/es/elasticsearch-630 hello2:/home/es/
scp -r /home/data1/elasticsearch hello2:/home/data1/
登录到hello2上,进入/home/es目录
执行: ln -s elasticsearch-630 elasticsearch-630
在hello2中修改/home/es/elasticsearch/config/elasticsearchyml
修改: networkhost: hello2
7、启动elasticsearch
使用es账户
执行:
/home/es/elasticsearch/bin/elasticsearch -d
8、验证
控制台中输入:curl http://hello1:9200
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
:Spark是一种安全的、经正式定义的编程语言,被设计用来支持一些安全或商业集成为关键因素的应用软件的设计。其通过运行用户定义的main函数,在集群上执行各种并发操作和计算Spark提供的最主要的抽象,Spark的正式和明确的定义使得多种静态分析技术在Spark源代码的应用中成为可能。
使用IDE新建Scala 或 Java 工程,确保项目结构符合 Maven 推荐的项目结构。
以IDEA为例:
从静态数据源(Parquet,Json,CVS,JDBC,Hive,RDDs)读取数据,运行分析
再 resource 目录构建一个 Json 数据源 datajson :
新建 Static Data Spark Demoscala :
以上,我们拟对数据进行展示和基本的筛选工作(age > 10)
开启调试,可以看到 log 中Spark执行了 3 个 Job ,并已经正确输出了预期的结果。
接下来就可以根据需求进行更复杂的数据处理操作
从Kafka、Flume、S3/HDFS、kinesis、Twitter等数据源读取数据进行实时分析
例:从 Kafka 读取流数据,进行实时处理。
由于读取Kafka流式数据,我们需要模拟kafka流。
参考Kafka文档
核心文件 KafkaApplicationjava
applicationyml
以上,我们向Kafka服务器的 topic 为 saprk 上不断发送数据以模拟数据流。
现在,启动程序开始模拟数据流
复用上例中的目录结构,也可以新建一个 sbt 项目。
新建文件 StreamDataSparkDemoscala
以上,我们从Kafaka服务器读取一个 topic 为 spark 的流,然后进行展示。
运行程序,输出如下:
取出数据之后,就可以用于实时分析了。
假设topic spark 为新注册的用户信息,我们可以统计新用户的每实时注册量,以及阶段内新注册用户性别比例。
在 StreamDataSparkDemoscala 中修改
<未完待续>
0条评论