Flink: writing Kafka data into ClickHouse

The motivation is that Elasticsearch's aggregation capability is fairly weak: with 5 million records and a two-level aggregation, queries took more than 10 seconds, which is far too slow. ClickHouse is an OLAP database built precisely for aggregation and analytics, and its performance really is high, so during the feasibility study I used Flink to handle the data synchronization.

These days Flink also serves well as a data integration tool.

This post syncs Kafka to ClickHouse. I tested the job several times, and it syncs close to 10 million rows per minute. That is mostly because reading from Kafka is fast: I have also synced MySQL to ClickHouse, and that is genuinely slow, since MySQL reads at around 2,000 rows per second, so 10 million rows took about an hour to finish.

The key technique is JDBC with prepared-statement batch inserts, which greatly improves insert performance.
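Stripped of the Flink plumbing, the core pattern is plain JDBC batching. Here is a minimal sketch (the method name is a placeholder; connection setup and error handling are omitted):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

// Buffer rows with addBatch() and flush them in fixed-size chunks,
// committing once per chunk.
static void batchInsert(Connection connection, List<DBLogUsage> rows) throws Exception {
    connection.setAutoCommit(false);
    try (PreparedStatement ps = connection.prepareStatement(
            "insert into click_log_usage4(date_n, hid) values(?, ?)")) {
        int pending = 0;
        for (DBLogUsage row : rows) {
            ps.setLong(1, row.getDateN());
            ps.setString(2, row.getHid());
            ps.addBatch();                // buffered client-side, nothing sent yet
            if (++pending == 2000) {      // flush every 2000 rows
                ps.executeBatch();
                ps.clearBatch();
                connection.commit();
                pending = 0;
            }
        }
        if (pending > 0) {                // flush the trailing partial batch
            ps.executeBatch();
            connection.commit();
        }
    }
}

The complete Flink job: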

// Scaffolding added for completeness: the class name is hypothetical, and the
// ClickHouse driver package depends on the driver version (older drivers use
// ru.yandex.clickhouse.ClickHouseDataSource instead).
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class KafkaToClickHouse {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaToClickHouse.class);

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String bootStrap = args[0];
        String groupId = args[1];
        String topic = args[2];
        String startKafkaOffsetTimestamp = args[3];
        String windowOffsetTime = args[4];

        // Kafka source: read string values only, starting from the offset that
        // corresponds to the given timestamp
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrap)
                .setGroupId(groupId)
                .setTopics(topic)
                //.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';")
                .setProperty("max.poll.records", "500")
                .setProperty("retry.backoff.ms", "1000")
                .setProperty("retries", "5")
                .setProperty("reconnect.backoff.max.ms", "10000")
                .setStartingOffsets(OffsetsInitializer.timestamp(Long.parseLong(startKafkaOffsetTimestamp)))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> dataStream = env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "kafka source");

        // Parse each Kafka record; malformed messages are logged and dropped
        DataStream<DBLogUsage> dataStreamForDb = dataStream.flatMap(new FlatMapFunction<String, DBLogUsage>() {
            @Override
            public void flatMap(String s, Collector<DBLogUsage> collector) throws Exception {
                DBLogUsage dbLogUsage;
                try {
                    dbLogUsage = JacksonUtil.toJavaObject(s, new TypeReference<Message<DBLogUsage>>() {
                    }).getMsg();
                } catch (Exception e) {
                    LOG.error("failed to parse message: {}", s, e);
                    return;
                }
                collector.collect(dbLogUsage);
            }
        });

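        // keyBy(sn) partitions records by sn, so all rows with the same sn
        // go to the same sink subtask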
        dataStreamForDb
                .keyBy(DBLogUsage::getSn)
                .addSink(new RichSinkFunction<DBLogUsage>() {

                    Connection connection;

                    PreparedStatement preparedStatement;

                    Long count = 0L;   // rows buffered since the last flush

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);

                        try {
                            ClickHouseDataSource dataSource = new ClickHouseDataSource("jdbc:clickhouse://xxxx:8123/default", null);
                            connection = dataSource.getConnection("default", "xxxx");
                            connection.setAutoCommit(false);
                            try {
                                // Create the target table on first run; if it
                                // already exists this is a no-op
                                Statement statement = connection.createStatement();
                                statement.execute("create table if not exists click_log_usage4(  \n" +
                                        "    date_n UInt64,  \n" +
                                        "    hid String  \n" +
                                        ")  \n" +
                                        "ENGINE = MergeTree()  \n" +
                                        "ORDER BY (date_n,hid)");
                            } catch (Exception e) {
                                LOG.info("create table failed", e);
                            }
                            preparedStatement = connection.prepareStatement("insert into click_log_usage4(date_n,hid) values(?,?)");

                        } catch (Exception e) {
                            LOG.error("connect error", e);
                            throw e;
                        }

                    }

                    @Override
                    public void invoke(DBLogUsage value, Context context) throws Exception {
                        super.invoke(value, context);

                        preparedStatement.setLong(1, value.getDateN());
                        preparedStatement.setString(2, value.getHid());
                        preparedStatement.addBatch();

                        count++;
                        if (count == 2000) {
                            // Flush every 2000 rows: execute the batch, commit,
                            // and reset the counter (autocommit is already off)
                            preparedStatement.executeBatch();
                            preparedStatement.clearBatch();
                            connection.commit();
                            count = 0L;
                            LOG.info("save batch infos");
                        }
                    }

                    @Override
                    public void close() throws Exception {
                        // Flush the final partial batch so trailing rows are
                        // not lost, then release the JDBC resources
                        if (count > 0) {
                            preparedStatement.executeBatch();
                            connection.commit();
                        }
                        preparedStatement.close();
                        connection.close();
                        super.close();
                    }
                });

        env.execute("sync data to clickhouse");
    }
}
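The DBLogUsage, Message, and JacksonUtil types used above are the author's own helpers and are not shown in the post. Below is a minimal sketch of what they plausibly look like, inferred from the getters the job calls; the field names are assumptions, not the author's actual code:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical shapes inferred from the job code above.
class DBLogUsage {
    private Long dateN;   // written to the date_n column
    private String hid;   // written to the hid column
    private String sn;    // used as the keyBy key

    public Long getDateN() { return dateN; }
    public String getHid() { return hid; }
    public String getSn() { return sn; }
}

class Message<T> {
    private T msg;        // payload carried in each Kafka record

    public T getMsg() { return msg; }
}

// Presumably a thin wrapper around Jackson's ObjectMapper.
class JacksonUtil {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static <T> T toJavaObject(String json, TypeReference<T> type) throws Exception {
        return MAPPER.readValue(json, type);
    }
}

The job takes five positional arguments, so a submission would look roughly like (jar and class names are placeholders): flink run -c KafkaToClickHouse job.jar <bootstrapServers> <groupId> <topic> <startOffsetTimestampMs> <windowOffsetTime>. Note that the fifth argument, windowOffsetTime, is parsed but never used in the code shown.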
