Background
Through generic configuration, the backend has already created the Kafka topics and the canal-side real-time data capture, and canal can now push binlog messages to Kafka. As the downstream consumer of these binlog messages, we need to create Kafka consumers automatically, inject them dynamically into the container, listen for messages, run the consumption logic, and finally write the updates into the Solr index.
Factors to consider
1. How do we obtain the configuration of newly added topics?
2. How do we create Kafka consumers and put them into service?
3. How do we handle the update logic after each consumer receives a message?
4. How do we estimate the resource usage of each step above?
Workable approaches
There are two ways to obtain the configuration of newly added topics:
1. Use a watcher to listen for ZooKeeper node events and pick up node changes in real time (see the sketch after this list).
Pros: consumers are created and start listening in real time.
Cons: a Kafka cluster is shared by many parties, so there is a lot of irrelevant noise, and filtering out just the events we care about is a problem; this approach only fits a dedicated Kafka cluster.
2. Periodically scan the topic configuration table, compare it against the topics currently running, and treat the diff as newly added topics.
Pros: simple to implement, and effective information arrives quickly.
Cons: slightly higher resource usage than the previous approach.
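Option 1 was not adopted here, but for reference, here is a minimal sketch of what it could look like with Curator. It assumes a pre-KRaft Kafka cluster that registers each topic as a child of ZooKeeper's /brokers/topics, an already-started CuratorFramework client, and the curator-recipes dependency on the classpath; the class name is illustrative, not part of this project:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;

public class TopicWatcherSketch {

    /**
     * Watches /brokers/topics and reacts to newly created topics.
     */
    public static void watchTopics(CuratorFramework client) throws Exception {
        PathChildrenCache cache = new PathChildrenCache(client, "/brokers/topics", false);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                String topic = ZKPaths.getNodeFromPath(event.getData().getPath());
                // On a shared cluster this fires for every tenant's topics,
                // so we would still need to filter against our own config here
                System.out.println("new topic detected: " + topic);
            }
        });
        cache.start();
    }
}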
Creating the Kafka consumers involves two factors:
1. The number of consumers per topic should ideally equal the topic's partition count: within a consumer group, Kafka assigns each partition to at most one consumer, so any extra consumers would sit idle.
2. The size of each consumer's internal processing thread pool should track the data volume and the update frequency.
Each consumer should delegate to a handler object that performs the index update, passing every received message to it. One problem has to be solved first: each consumer runs as an independent thread, so we cannot inject the handler ahead of time via dependency injection, because the thread object may not even exist yet. There are two ways to obtain the handler inside an independent thread:
1. Parameter passing: inject the handler into the outer container-managed object and hand it over when the consumer thread is created.
2. Create the handler at container startup, keep it in the container, and look it up by bean name from inside the thread object.
For looser coupling, the second approach was chosen here (a sketch of such a lookup helper follows).
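The SpringBeanUtil helper that the code below relies on for this lookup is not shown in this article; here is a minimal sketch of what such a helper typically looks like, assuming it is registered as a Spring component so the ApplicationContext is captured at startup:
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringBeanUtil implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Capture the context once at startup so plain, non-managed threads
        // can look up beans by name later
        context = applicationContext;
    }

    public static <T> T getBean(String name, Class<T> clazz) {
        return context.getBean(name, clazz);
    }
}
Because the static context is set when the container starts, any thread created afterwards can fetch the handler bean by name without holding a reference injected in advance.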
Resource estimation involves two factors:
1. The JVM's configured heap size and the machine's CPU core count.
2. The size of a single message, the memory footprint of each consumer thread, and the update frequency.
From these two sets of numbers we can estimate the maximum concurrent consumption capacity, and from that configure the consumer thread pool size, the buffer queue length, the rejection policy, and so on (see the sizing sketch below).
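ThreadPoolUtil.createThreadPool, which the consumer code below calls, is not shown in this article either. As a back-of-envelope illustration of the estimate: with, say, a 4 GB heap and binlog messages of roughly 2 KB, a 1000-entry buffer queue costs only about 2 MB, so memory is rarely the bound; the CPU core count and the DB connection pool usually are. Below is a sketch of what such a factory method might look like, with the queue length and rejection policy as assumptions rather than measured values:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolUtil {

    /**
     * Creates a fixed-size processing pool; numThreads is kept slightly
     * below the DB connection pool size so consumer threads never starve
     * waiting for a connection.
     */
    public static ThreadPoolExecutor createThreadPool(int numThreads) {
        return new ThreadPoolExecutor(
                numThreads, numThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000),             // bounded buffer queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure instead of dropping messages
    }
}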
Code walkthrough
With the flow and the considerations above settled, we can move on to code design and implementation, split across three classes:
1. MQConsumerFactory: scans the table for topics and creates the consumer thread objects;
2. KafkaConsumerThread: receives messages and hands them to the processing thread objects;
3. KafkaStreamProcesser: processes the data and updates the index.
package com.guahao.cloud.cloudsearchplat.biz.realtime.consumer;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.guahao.cloud.cloudsearchplat.biz.dal.dao.SQLRealtimeDAO;
import com.guahao.cloud.cloudsearchplat.biz.util.SQLUtils;
import com.guahao.cloud.cloudsearchplat.biz.util.ThreadPoolUtil;
/**
* Message queue consumer factory
*
* Reads the topics configured in the database, diffs them against the topics currently running, and asynchronously creates consumers for the newly configured ones
*
* @author lican
*
* @date 2019-06-03
*/
@Component
public class MQConsumerFactory {
private static final String GROUP = "cloud_search_plat_group";
@Value("${kafka.connect.host}")
private String kafkaHost;
/**
* Number of consumer processing threads; kept slightly below the DB connection pool size
*/
@Value("${kafka.consumer.thread.num}")
private Integer consumerThreadNum;
/**
* Topics currently running
*/
private Set<String> runningTopics = Sets.newConcurrentHashSet();
private static final Logger LOG = LoggerFactory.getLogger(MQConsumerFactory.class);
/**
* Periodically looks up the configured topic information, picks out newly added topics, and creates consumers to process the real-time messages
*
*/
@Scheduled(cron = "${index.update.cron}")
private void updateTopics() {
// 1. Look up all configured topic information
List<SQLRealtimeDAO> realtimeSqls = SQLUtils.getMapper().selectRealtimeSqlAll();
if (CollectionUtils.isEmpty(realtimeSqls)) {
return;
}
// 2. Pick out the newly added topics
List<SQLRealtimeDAO> diffRealtimes = getDiffTopics(realtimeSqls);
if (CollectionUtils.isEmpty(diffRealtimes)) {
return;
}
// 3. Create consumers to process the real-time messages
for (SQLRealtimeDAO realtime : diffRealtimes) {
if (!validate(realtime)) {
LOG.error("RealtimeJob validate error, realtime-param:{}", realtime);
continue;
}
LOG.info("topic:{}, create consumer success, running topics :{}", realtime.getTopic(), runningTopics);
ThreadPoolUtil.getRealtimeThreadPool()
.execute(new KafkaConsumerThread(realtime, consumerThreadNum, createConsumerConfig()));
}
}
/**
* Validates the real-time config entity
*
* Checked fields: primary-key ID, interface name, topic, real-time SQL
*
* @param realtime
* @return
*/
private boolean validate(SQLRealtimeDAO realtime) {
return StringUtils.isNotEmpty(realtime.getBizId()) && StringUtils.isNotEmpty(realtime.getCat())
&& StringUtils.isNotEmpty(realtime.getTopic()) && StringUtils.isNotEmpty(realtime.getRealtimeSql());
}
/**
* Picks out the newly added topics
*
* @param realtimeSqls
* @return
*/
private List<SQLRealtimeDAO> getDiffTopics(List<SQLRealtimeDAO> realtimeSqls) {
List<SQLRealtimeDAO> diffRealtimeSqls = Lists.newArrayList();
for (SQLRealtimeDAO realtime : realtimeSqls) {
String topic = realtime.getTopic();
if (runningTopics.contains(topic) || StringUtils.isBlank(topic)) {
continue;
}
runningTopics.add(topic);
diffRealtimeSqls.add(realtime);
}
return diffRealtimeSqls;
}
/**
* Builds the Kafka consumer configuration
*
* @return
*/
private Properties createConsumerConfig() {
Properties prop = new Properties();
prop.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return prop;
}
/**
* After a consumer shuts down, removes its topic from the running set so the next polling round can pick it up again
*
* @param topic
*/
public void removeTopic(String topic) {
// runningTopics is a concurrent set, so a plain remove is atomic and is a
// no-op when the topic is absent; no extra locking or contains check needed
runningTopics.remove(topic);
}
}
package com.guahao.cloud.cloudsearchplat.biz.realtime.consumer;
import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.guahao.cloud.cloudsearchplat.biz.dal.dao.SQLRealtimeDAO;
import com.guahao.cloud.cloudsearchplat.biz.util.SpringBeanUtil;
import com.guahao.cloud.cloudsearchplat.biz.util.ThreadPoolUtil;
/**
* Kafka consumer thread
*
* @author lican
* @date 2019-06-03
*
*/
public class KafkaConsumerThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerThread.class);
/**
* Kafka data consumer
*/
private final KafkaConsumer<String, String> consumer;
/**
* Kafka topic entity
*/
private SQLRealtimeDAO realtime;
/**
* Message-processing thread pool
*/
private ThreadPoolExecutor executorPool;
/**
* Factory reference, used to deregister the topic on shutdown
*/
private MQConsumerFactory mqFactory;
/**
* Initializes the consumer thread
*
* @param realtime real-time topic info
* @param numThreads number of data-processing threads
* @param properties consumer configuration
*/
public KafkaConsumerThread(SQLRealtimeDAO realtime, int numThreads, Properties properties) {
this.consumer = new KafkaConsumer<>(properties);
this.realtime = realtime;
this.executorPool = ThreadPoolUtil.createThreadPool(numThreads);
this.mqFactory = SpringBeanUtil.getBean("MQConsumerFactory", MQConsumerFactory.class);
consumer.subscribe(Lists.newArrayList(realtime.getTopic()));
}
@Override
public void run() {
try {
while (true) {
try {
// Poll for data; poll() blocks for up to the given timeout
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// Hand each message to the processing pool asynchronously
this.executorPool.submit(new KafkaStreamProcesser(record, realtime));
}
// Commit this batch's offsets once; commitSync blocks until the broker
// acknowledges (commitAsync is the non-blocking variant). Because the
// records are processed asynchronously, a crash after this commit can
// lose in-flight messages
consumer.commitSync();
} catch (Exception e) {
LOG.error("本次消息轮询失败,topic:{}", realtime, e);
continue;
}
}
} catch (Exception e) {
LOG.error("Kafka 断开连接,topic:{}", realtime, e);
} finally {
shutdown();
}
}
private void shutdown() {
// 1. Close the connection to Kafka
if (this.consumer != null) {
this.consumer.close();
}
mqFactory.removeTopic(realtime.getTopic());
// 2. Shut down the thread pool, waiting for running tasks to finish
if (this.executorPool != null) {
// 2.1 Shut down the thread pool
this.executorPool.shutdown();
// 2.2 Wait up to five seconds for termination
try {
if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
}
} catch (InterruptedException e) {
LOG.error("Interrupted during shutdown, exiting uncleanly!!", e);
}
}
}
}
package com.guahao.cloud.cloudsearchplat.biz.realtime.consumer;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.guahao.cloud.cloudsearchplat.biz.dal.dao.SQLRealtimeDAO;
import com.guahao.cloud.cloudsearchplat.biz.job.IncrSQLIndexJob;
import com.guahao.cloud.cloudsearchplat.biz.model.constant.SQLConfigConstant;
import com.guahao.cloud.cloudsearchplat.biz.model.index.IndexResult;
import com.guahao.cloud.cloudsearchplat.biz.model.index.SQLIndexParam;
import com.guahao.cloud.cloudsearchplat.biz.util.LogTemplateUtils;
import com.guahao.cloud.cloudsearchplat.biz.util.SQLUtils;
import com.guahao.cloud.cloudsearchplat.biz.util.SpringBeanUtil;
/**
* Kafka data stream processor
*
* @author lican
* @date 2019-06-03
*/
public class KafkaStreamProcesser implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProcesser.class);
private ConsumerRecord<String, String> record;
private SQLRealtimeDAO realtime;
private IncrSQLIndexJob incrJob;
public KafkaStreamProcesser(ConsumerRecord<String, String> record, SQLRealtimeDAO realtime) {
this.record = record;
this.realtime = realtime;
this.incrJob = SpringBeanUtil.getBean("incrSQLIndexJob", IncrSQLIndexJob.class);
}
@Override
public void run() {
String message = record.value();
if (StringUtils.isEmpty(message)) {
return;
}
String id = getIdValue(message, realtime);
if (StringUtils.isEmpty(id)) {
// Without a primary-key value the ID placeholder in the real-time SQL
// cannot be filled in, so skip this message
return;
}
updateById(id, realtime.getCat(), record);
}
/**
* Updates the index by ID
*
* @param id
* @param cat
* @param record
* @return
*/
private IndexResult updateById(String id, String cat, ConsumerRecord<String, String> record) {
String sqlFormat = realtime.getRealtimeSql();
String realtimeSql = sqlFormat.replace(SQLConfigConstant.DAILY_ID_FILTER, id);
List<Map<String, Object>> rows = Lists.newArrayList();
try {
rows = SQLUtils.getMapper().commonSelect(realtimeSql);
} catch (Exception e) {
LOG.error("[RealtimeSqlError]get primaryId error, message:{},realtimeSQlL:{}", JSON.toJSONString(record),
realtimeSql, e);
return IndexResult.getDefaultResult();
}
if (CollectionUtils.isEmpty(rows)) {
return IndexResult.getSuccessResult();
}
List<String> primaryIds = Lists.newArrayList();
for (Map<String, Object> row : rows) {
if (row.containsKey("id")) {
primaryIds.add(SQLUtils.getStringId(row.get("id")));
}
}
SQLIndexParam param = new SQLIndexParam();
param.setIds(primaryIds);
param.setCat(cat);
IndexResult result = IndexResult.getDefaultResult();
try {
result = incrJob.process(param);
} catch (Exception e) {
LOG.error("[updateIndexError]realtime index error, message:{},param:{}", JSON.toJSONString(record), param,
e);
return IndexResult.getDefaultResult();
}
LOG.info(LogTemplateUtils.getLogTemplate(realtime.getCat(), JSON.toJSONString(record), result,
System.currentTimeMillis() - getToKafkaTime(record.value()), "realtime", JSON.toJSONString(param)));
return result;
}
/**
* Gets the value of the message's primary-key ID
*
* @param message
* @param realtime
* @return
*/
private String getIdValue(String message, SQLRealtimeDAO realtime) {
List<String> ids = getBizIds(realtime);
Map<String, Object> dataMap = getRowMap(message);
for (String id : ids) {
if (dataMap.containsKey(id)) {
return SQLUtils.getSQLStringId(dataMap.get(id));
}
}
return StringUtils.EMPTY;
}
/**
* Gets the timestamp at which the message reached Kafka
*
* @param message
* @return
*/
private Long getToKafkaTime(String message) {
Object toKafkaTime = getMapByJson(message).get("toKafkaTime");
if (!(toKafkaTime instanceof Number)) {
return System.currentTimeMillis();
}
// fastjson may deserialize the number as Integer or Long depending on its
// size, so widen via Number instead of casting straight to Long
return ((Number) toKafkaTime).longValue();
}
/**
* Gets the data row from the message
*
* @param message
* @return
*/
private Map<String, Object> getRowMap(String message) {
Object data = getMapByJson(message).get("data");
if (null == data) {
return Maps.newHashMap();
}
return getMapByJson(data.toString());
}
/**
* Parses the real-time message into a Map
*
* @param message
* @return
*/
private Map<String, Object> getMapByJson(String message) {
try {
return JSON.parseObject(message, Map.class);
} catch (Exception e) {
LOG.error("Parse message map error,message:{}", message, e);
return Maps.newHashMap();
}
}
/**
* Gets the monitored primary-key IDs from the real-time config
*
* Format: DB.Table.Id,DB.Table.Id, ...
*
* @param realtime
* @return
*/
private List<String> getBizIds(SQLRealtimeDAO realtime) {
List<String> ids = Lists.newArrayList();
String[] bizIdStrs = realtime.getBizId().split(",");
for (String bizId : bizIdStrs) {
String[] idInfo = bizId.split("\\.");
if (idInfo.length < 3) {
continue;
}
String id = idInfo[2];
ids.add(id);
}
return ids;
}
}