This mainly comes down to Elasticsearch's weak aggregation ability: with about 5 million rows and a two-level aggregation, queries took more than 10 seconds, which is just too slow. ClickHouse is an OLAP database built specifically for aggregate analysis, and its performance really is high, so during the feasibility study I used Flink to handle the data sync.
Flink these days also works well as a general data-integration tool.
This article covers syncing Kafka to ClickHouse. I tested it a few times and it synced close to 10 million rows per minute, mainly because reading from Kafka is fast. I have also synced MySQL to ClickHouse, and that is genuinely slow: MySQL read at roughly 2,000 rows per second, so 10 million rows took about an hour to finish.
The key technique is JDBC with prepared-statement batch inserts, which greatly improves insert performance.
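The job below also references a few helpers that the post does not show: a Message<T> envelope, the DBLogUsage POJO, and a JacksonUtil JSON helper. Their exact shapes are my guesses from how the code uses them; a minimal sketch:

// Hypothetical sketches of the helper types; field names are inferred from how
// the job uses them, not taken from the original post.
public class Message<T> {
    private T msg;                        // the payload carried in each Kafka record
    public T getMsg() { return msg; }
    public void setMsg(T msg) { this.msg = msg; }
}

public class DBLogUsage {
    private long dateN;                   // numeric date, mapped to UInt64 in ClickHouse
    private String hid;                   // host/device id
    private String sn;                    // serial number, used as the keyBy key
    public long getDateN() { return dateN; }
    public String getHid() { return hid; }
    public String getSn() { return sn; }
    // setters omitted for brevity; Jackson needs them (or visible fields) to deserialize
}

// JacksonUtil is presumably a thin wrapper around Jackson's ObjectMapper:
public class JacksonUtil {
    private static final com.fasterxml.jackson.databind.ObjectMapper MAPPER =
            new com.fasterxml.jackson.databind.ObjectMapper();

    public static <T> T toJavaObject(String json,
                                     com.fasterxml.jackson.core.type.TypeReference<T> type) throws Exception {
        return MAPPER.readValue(json, type);
    }
}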
// Imports are not shown in the original post; this set assumes Flink 1.14+ with the
// flink-connector-kafka module, slf4j for logging, and the clickhouse-jdbc driver
// (package com.clickhouse.jdbc in recent versions; older releases used ru.yandex.clickhouse).
// The class name is arbitrary.
import com.clickhouse.jdbc.ClickHouseDataSource;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class SyncKafkaToClickHouse {

    private static final Logger LOG = LoggerFactory.getLogger(SyncKafkaToClickHouse.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String bootStrap = args[0];
        String groupId = args[1];
        String topic = args[2];
        String startKafkaOffsetTimestamp = args[3];   // epoch millis to start consuming from
        String windowOffsetTime = args[4];            // reserved for windowing; unused in this snippet
        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrap)
                .setGroupId(groupId)
                .setTopics(topic)
                //.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_username' password='your_password';")
                .setProperty("max.poll.records", "500")
                .setProperty("retry.backoff.ms", "1000")
                .setProperty("retries", "5")
                .setProperty("reconnect.backoff.max.ms", "10000")
                // start from the offsets whose record timestamps are >= the given epoch millis
                .setStartingOffsets(OffsetsInitializer.timestamp(Long.parseLong(startKafkaOffsetTimestamp)))
                .setValueOnlyDeserializer(simpleStringSchema)
                .build();
        DataStreamSource<String> dataStream =
                env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "kafka source");
        // Parse each Kafka record (a JSON-wrapped Message<DBLogUsage>) and drop records that fail to parse.
        DataStream<DBLogUsage> dataStreamForDb = dataStream.flatMap(new FlatMapFunction<String, DBLogUsage>() {
            @Override
            public void flatMap(String s, Collector<DBLogUsage> collector) throws Exception {
                DBLogUsage dbLogUsage;
                try {
                    dbLogUsage = JacksonUtil.toJavaObject(s, new TypeReference<Message<DBLogUsage>>() {
                    }).getMsg();
                } catch (Exception e) {
                    LOG.error("failed to parse record: {}", s, e);
                    return;
                }
                collector.collect(dbLogUsage);
            }
        });
        // Key by serial number so rows for the same device land in the same sink subtask,
        // then write to ClickHouse through a JDBC batch sink.
        dataStreamForDb
                .keyBy(DBLogUsage::getSn)
                .addSink(new RichSinkFunction<DBLogUsage>() {
                    Connection connection;
                    PreparedStatement preparedStatement;
                    long count = 0L;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        try {
                            ClickHouseDataSource dataSource =
                                    new ClickHouseDataSource("jdbc:clickhouse://xxxx:8123/default", null);
                            connection = dataSource.getConnection("default", "xxxx");
                            connection.setAutoCommit(false);
                            try {
                                // Each sink subtask runs this once, so tolerate an existing table.
                                Statement statement = connection.createStatement();
                                statement.execute("create table if not exists click_log_usage4(\n" +
                                        "  date_n UInt64,\n" +
                                        "  hid String\n" +
                                        ")\n" +
                                        "ENGINE = MergeTree()\n" +
                                        "ORDER BY (date_n, hid)");
                            } catch (Exception e) {
                                LOG.info("create table failed", e);
                            }
                            preparedStatement = connection.prepareStatement(
                                    "insert into click_log_usage4(date_n, hid) values(?, ?)");
                        } catch (Exception e) {
                            LOG.error("connect error", e);
                            throw e;
                        }
                    }
                    @Override
                    public void invoke(DBLogUsage value, Context context) throws Exception {
                        preparedStatement.setLong(1, value.getDateN());
                        preparedStatement.setString(2, value.getHid());
                        preparedStatement.addBatch();
                        count++;
                        // Flush every 2000 rows: one batched insert is far cheaper for
                        // ClickHouse than 2000 single-row inserts.
                        if (count == 2000) {
                            preparedStatement.executeBatch();
                            preparedStatement.clearBatch();
                            connection.commit();
                            count = 0L;
                            LOG.info("saved a batch of 2000 rows");
                        }
                    }
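                    // Not in the original post: a close() override is worth adding so the
                    // final partial batch is flushed and the JDBC resources are released
                    // when the job stops. A minimal sketch:
                    @Override
                    public void close() throws Exception {
                        if (preparedStatement != null) {
                            preparedStatement.executeBatch();   // flush whatever is left in the batch
                            connection.commit();
                            preparedStatement.close();
                        }
                        if (connection != null) {
                            connection.close();
                        }
                        super.close();
                    }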
                });
        env.execute("sync data to clickhouse");
    }
}
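To try it out, package the job into a jar and submit it with the five positional arguments. Something like the following, where the jar name, broker address, group/topic, and timestamp are placeholders for your own setup:

flink run -c SyncKafkaToClickHouse sync-job.jar kafka-broker:9092 my-group my-topic 1700000000000 60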