当前位置:首页 > 洗衣机 > 文章正文

双十一大年夜屏实时计算深度剖析

编辑:[db:作者] 时间:2024-08-25 00:11:49

什么是智能推举?

双十一大年夜屏实时计算深度剖析

定义: 根据用户行为习气所供应的数据, 系统供应策略模型,自动推举符合用户行为的信息。

例举:

比如根据用户对商品的点击数据(韶光周期,点击频次), 推举类似的商品;

根据用户的评价与满意度, 推举得当的品牌;

根据用户的利用习气与点击行为,推举类似的资讯。

小红书推举系统

实时流处理

. Flink处理(新一代大数据处理引擎)

实时数仓

什么是实时数仓

数据仓库(Data Warehouse),可简写为DW或DWH,是一个弘大的数据存储凑集,通过对各种业务数据进行筛选与整合,天生企业的剖析性报告和各种报表,为企业的决策供应支持。
实时仓库是基于Storm/Spark(Streaming)/Flink等实时处理框架,构建的具备实时性特色的数据仓库。

阿里巴巴菜鸟网络实时数仓设计

大数据剖析运用

. IoT数据剖析

什么是IoT

物联网是新一代信息技能,也是未来发展的趋势,英文全称为: Internet of things(IOT),顾名思义, 物联网便是万物相联。
物联网通过智能感知、识别技能与普适打算等通信感知技能,广泛运用于网络的领悟中,也因此被称为继打算机、互联网之后天下信息家当发展的第三次浪潮。

华为Iot数据剖析平台架构:

金融风控

风险是金融机构业务固有特性,与金融机构相伴而生。
金融机构盈利的来源便是承担风险的风险溢价。

金融机构中常见的六种风险:市场风险、信用风险、流动性风险、操作风险、荣誉风险及法律风险。
个中最紧张的是市场风险和信用风险。

线上信贷流程,通过后台大数据系统进行反敲诈和信用评估:

电商行业、

用户在电商的购物网站数据通过实时大数据剖析之后, 通过大屏汇总展示, 比如天猫的双11购物活动,通过大屏, 将全国上亿买家的订单数据可视化,实时性的动态展示,包含总览数据,流式TopN数据,多维区域统计数据等,极大的增强了对海量数据的可读性。

Flink快速入门

Flink概述

Flink是什么

Flink是一个面向数据流处理和批处理的分布式开源打算框架。

无界流VS有界流

任何类型的数据都可以形成流数据,比如用户交互记录, 传感器数据,事宜日志等等。

Apache Flink 善于处理无界和有界数据集。
精确的韶光掌握和有状态的打算,使得 Flink能够运行任何处理无界流的运用。

流数据分为无界流和有界流。

1) 无界流:有定义流的开始,但没有定义流的结束, 会一直地产生数据,无界流采取的是流处理办法。

2) 有界流:有定义流的开始, 也有定义流的结束, 须要在获取所有数据后再进行打算,有界流采取的是批处理办法

编程模型

DataSet 一样平常用来处理有界流数据。

DataStream一样平常用来处理无界流数据。

Flink根本案例

1. 环境搭建配置

POM配置FLINK集成

2. 批处理案例

功能: 通过批处理办法,统计日志文件中的非常数量。

流处理案例

功能: 根据IP统计访问次数

Flink支配配置

1. 安装配置JDK8环境

2. 下载Flink安装包

官方地址安装包

3. 安装配置

解压

tar -xvf flink-1.11.2-bin-scala_2.11.tgz

运行

bin/start-cluster.sh

主节点访问端口:

vi conf/masters:

localhost:8081

4. 访问掌握台

http://10.10.20.132:8081/#/overview

Available Task Slots: 有效任务槽数量

对应配置文件: vi conf/flink-conf.yaml

taskmanager.numberOfTaskSlots: 1

TaskManger与JobManager关系

Flink任务提交

第一种办法: 界面提交

修正代码配置

socket数据源连接,采取主机名称配置

DataStreamSource<String> socketStr = env.socketTextStream("flink1", 9911,

"\n");

工程代码打包

POM文件增加打包插件

把稳,这里不能采取spring-boot-maven-plugin打包插件, 否则flink不能正常识别。

提交任务

上传Jar包

接下来,在flink1节点上, 开启Socket交互端口9911

[root@flink1 flink-1.11.2]# nc -lk 9911

然后提交并实行任务

savepoint path: 容错机制中快照保存的路径。

运行验证

nc发送一些数据, 在TaskManager当中可以查看输出结果。

第二种办法: 命令行提交

在flink掌握台打消原有的Job任务。

上传Jar包

将Jar包上传至flink做事器:

提交任务

采取命令行办法提交任务:

验证结果

发送一些数据并在掌握台验证输出结果

Flink接入体系

Flink Connectors

Flink 连接器包含数据源输入与汇聚输出两部分。
Flink自身内置了一些根本的连接器,数据源输入包含文件、目录、Socket以及 支持从collections 和 iterators 中读取数据;汇聚输出支持把数据写入文件、标准输出(stdout)、标准缺点输出(stderr)和 socket。

官方地址

Flink还可以支持扩展的连接器,能够与第三方系统进行交互。
目前支持以下系统:

Apache Kafka (source/sink)Apache Cassandra (sink)Amazon Kinesis Streams (source/sink)Elasticsearch (sink)Hadoop FileSystem (sink)RabbitMQ (source/sink) Apache NiFi (source/sink)Twitter Streaming API (source)Google PubSub (source/sink)JDBC (sink)

常用的是Kafka、ES、HDFS以及JDBC。

JDBC(读/写)

Flink Connectors JDBC 如何利用?

功能: 将凑集数据写入数据库中

数据表:

自定义写入数据源

功能:读取Socket数据, 采取流办法写入数据库中。

Flink大屏数据实战

双十一大屏数据

总览数据

总发卖量/总发卖金额TopN: 热销商品/商品类目/商品PV/商品UV

区域/分类数据

不同区域发卖排名不同分类发卖排名

Canal同步做事安装

下载安装包安装包后台管理包

2. 解压

解压安装包:

mkdir -p /usr/local/canal

tar -xzvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/

解压管理包:

mkdir -p /usr/local/canal-admin

tar -xvf canal.admin-1.1.4.tar.gz -C /usr/local/canal-admin

初始化管理数据库

导入初始化数据脚本:

mysql -uroot -p654321 < /usr/local/canal-admin/conf/canal_manager.sql

修正MySQL做事同步配置

编辑配置文件:

vi /etc/my.cnf

增加同步配置:

[mysqld]

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=1 # MySQL ID做事标识

重启做事:

service mysqld restart

检讨同步功能是否开启

创建同步用户:

mysql> FLUSH PRIVILEGES;

mysql> CREATE USER canal IDENTIFIED BY 'canal';

授予同步所需权限:

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO

'canal'@'%';

Query OK, 0 rows affected (0.00 sec)

mysql> FLUSH PRIVILEGES;

Query OK, 0 rows affected (0.00 sec)

修正后台管理配置文件

vi /usr/local/canal-admin/conf/application.yml

配置内容:

先启动后台管理做事, 再启动Canal做事, 后台管理做事启动命令

/usr/local/canal-admin/bin/startup.sh

Canal做事配置

vi /usr/local/canal/conf/canal_local.properties

启动Canal做事

/usr/local/canal/bin/startup.sh local

后台管理配置

修正Server管理配置:

修正Instance配置(如果没有, 则新建,载入模板即可):

regex同步配置规则:

常见例子:

1. 所有表:. or .\..

2. canal schema下所有表: canal\..

3. canal下的以canal打头的表:canal\.canal.

4. canal schema下的一张表:canal.test1

5. 多个规则组合利用:canal\..,mysql.test1,mysql.test2 (逗号分隔)

热销商品统计

功能实现流程:

订单数据源的实现flink代码功能实现Flink 与 Spring Boot的集成测试验证,比对SQL:

select goodsId, sum(execPrice execVolume) as totalAmount from t_order

where execTime < 韶光窗口的结束韶光戳 group by goodsId order by totalAmount

desc

数据呈现

kibana做事安装

Kibana是一个针对Elasticsearch的开源剖析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。

到官网下载, Kibana安装包, 与之对应6.8.1版本, 选择Linux 64位版本下载,并进行解压。
Kibana启动不能利用root用户, 利用上面创建的elsearch用户, 进行赋权:

chown -R elsearch:elsearch kibana-6.8.1-linux-x86_64

修正配置文件

vi config/kibana.yml , 修正以下配置:

# 做事端口

server.port: 5601

# 做事地址

server.host: "0.0.0.0"

# elasticsearch做事地址, 填写集群所有节点地址, 之间用逗号分割

elasticsearch.hosts: ["http://10.10.20.28:9200", "http://10.10.20.29:9200",

"http://10.10.20.30:9200"]

启动kibana

./kibana -q

看到以下日志, 代表启动正常

log [01:40:00.143] [info][listening] Server running at http://0.0.0.0:5601

如果涌现启动失落败的情形, 要检讨集群各节点的日志, 确保做事正常运行状态。

订单状态监控统计(CEP)

增加订单支付流水数据源创建对应的表与实体实体: OrderPaymentBO: JoinOrderAddress修正Canal的后台配置, 增加地址数据源的监听行列步队。
核心代码实现:实现订单支付流水数据源的监听处理。
定义CEP处理规则,解析出支付成功的订单。

5. 测试验证

检讨订单状态是未支付 -》 已支付的数据

select from t_order_payment pay where exists (

select 1 from t_order_payment tmp where tmp.orderId = pay.orderId and

tmp.status = 0

) and pay.status = 1

检讨超时的数据: 初始状态为0, 指定时间之内没有已支付的数据。

布隆过滤器

功能: 统计商品在一段韶光内的UV(采取布隆过滤器)

核心代码:

本站所发布的文字与图片素材为非商业目的改编或整理,版权归原作者所有,如侵权或涉及违法,请联系我们删除,如需转载请保留原文地址:http://www.baanla.com/xyj/40267.html

XML地图 | 自定链接

Copyright 2005-20203 www.baidu.com 版权所有 | 琼ICP备2023011765号-4 | 统计代码

声明:本站所有内容均只可用于学习参考,信息与图片素材来源于互联网,如内容侵权与违规,请与本站联系,将在三个工作日内处理,联系邮箱:123456789@qq.com