flink 实时去重日活统计

部署安装

docker network create flink-network

docker run -itd –name=jobmanager –publish 8091:8081 –network flink-network –env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink jobmanager

docker run -itd –name=taskmanger –network flink-network –env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink taskmanager

FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"

docker run \
    -itd \
    --name=jobmanager \
    --network flink-network \
    --publish 8081:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:1.18.1-scala_2.12 jobmanager
docker run \
    -itd \
    --name=taskmanager \
    --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:1.18.1-scala_2.12 taskmanager

flink 使用场景

大数据场景下,从一个数据源到另一个数据源
• 数据清洗,有字段为null,数据错误
• 数据多表关联聚合为宽表
• 数据聚合,日活,月活
• 数据过滤,在无休止的数据流中找到需要的数据或者事件

流批一体: 数据既可以当流一直计算,也可以当成批计算一批数据

连接器: 提供了高效的kafka连接器,jdbc,es等连接器,开箱即用,用来做数据读取输入,和数据输出

// kafka 连接器,消费kafka数据,转成 dataStream
        SimpleStringSchema simpleStringSchema = new org.apache.flink.api.common.serialization.SimpleStringSchema();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(bootStrap).setGroupId(groupId).setTopics(topic)
                //.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';")
                .setProperty("max.poll.records", "500").setProperty("retry.backoff.ms", "1000").setProperty("retries", "5").setProperty("reconnect.backoff.max.ms", "10000")
                .setStartingOffsets(OffsetsInitializer.timestamp(Long.parseLong(startKafkaOffsetTimestamp)))
                .setValueOnlyDeserializer(simpleStringSchema).build();

        DataStreamSource<String> dataStream = env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "kafka source");

分区:根据key将数据分成独立的流(每个key一个流),某个流的数据,可以进行调用,类似kafka 的分区,
.keyBy(DBLogUsage::getId)
窗口: 按照时间来将数据流划分成一个一个的桶,然后交给后面的窗口函数进行计算
.window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
水印: 划分窗口时用的时间,有些时间是根据业务后面添加的,要主动告诉窗口当前数据流里面的每一条数据的水印是啥,才能划分到不同的窗口

.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(Event event) {
                        return event.timestamp;
                    }
                })

窗口函数:数据被拆成不同的窗口后,进行计算的逻辑

// 实现聚合操作的
.process(new ProcessWindowFunction<Event, Object, String, TimeWindow>() {
 @Override
                    public void process(String s, ProcessWindowFunction<Event, Object, String, TimeWindow>.Context context, Iterable<Event> iterable, Collector<Object> collector) throws Exception {
                        long sum = 0;
                        for (Event e : iterable) {
                            sum++;
                        }
                        collector.collect(new Tuple2<>(s, sum));
                    }
                })

数据统计的发展

在阶段1:每天启动定时任务,从mysql中分批读取数据,在java 代码中来进行数据统计
• 业务复杂,每次定时任务要自己写很多聚合逻辑,还要自己处理mysql连接异常,读取异常
• 没有有效的可靠性机制,定时任务失败没有重试,如果失败,只能后续补数据.
• 依赖sql 优化,因为是基于mysql 做查询,如果查某一天的日活数据,因为超时查询失败就会一直失败

在阶段2: 引入了flink 框架,实现了读取数据库里面的数据,然后分批进行数据流运算,启用了一个时间为一天的窗口

在阶段3: 日活数据能实时看到,比如现在9点能看到 0点到9点当前的日活结果

根据上面的flink 代码如何实现第二阶段批处理和第三阶段的实时流

其实第二阶段很好实现就是使用窗口函数.window(TumblingEventTimeWindows.of(Time.days(1))) 设置一个滚动一天的窗口,等一天后执行窗口函数process等进行聚合,但是问题来了,这种批处理要等一天,不能实时看到一天内的日活.数据存在flink里面一天过去后才进行处理

如何能够使用flink 且能看到一天内的实时的日活呢
其实代码使用的不变,流程变了一点
如下步骤:

  1. 先根据时间进行keyBy,分流,将比如0729的日活数据分流
    2.在process 里面使用MapState进行userId 去重,然后使用ValueState 对当前日活数据进行加一操作,
  2. 在第二步输出的内容不再是日活最终数据了,而是日活valuseState 的值, 比如 六条日活数据,输出1,2,3,4,5,6
  3. 在第三步输出的dataSteam流中(1,2,3,4,5,6) 再起一个一秒的窗口,获取流里面最大的值6,然后进行sink 输出到数据库等中

这样就可以做到去重及准实时获取日活数据


评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注