Auto-Injecting Kafka Consumers in Practice

Background

On the backend, Kafka topics and canal-based real-time data capture have already been set up through generic configuration, so canal can push binlog messages to Kafka. As the downstream consumer of these binlog messages, we now need to automatically create Kafka consumers and dynamically inject them into the container, so they can listen for messages, run the data-consumption logic, and finally write the updates into the Solr index.

Factors to consider

1. How do we discover newly added topic configurations?
2. How should we create a kafka-consumer and put it to work?
3. How does each consumer handle the update logic once it receives a message?
4. How do we estimate the resource cost of each of the steps above?

Viable approaches

There are two ways to discover newly added topic configurations:

1. Use a watcher to listen for zk node events and receive node changes in real time (see the sketch after this list).
   Pros: consumers are created and start listening in real time.
   Cons: a Kafka cluster is shared by many parties, so there is a lot of noise, and picking out just the events we care about is a problem; this approach only fits a dedicated Kafka cluster.
2. Periodically scan the topic configuration table and diff it against the currently running topics; the diff is treated as the set of newly added topics.
   Pros: simple to implement and quickly yields the relevant information.
   Cons: slightly higher resource usage than the first approach.
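
A minimal sketch of the watcher approach using Apache Curator's PathChildrenCache (the zk address, the /config/topics path, and the idea that each child node is a topic are all hypothetical; we did not take this route):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TopicWatcherSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // Hypothetical zk path under which each child node represents a configured topic
        PathChildrenCache cache = new PathChildrenCache(client, "/config/topics", true);
        cache.getListenable().addListener((c, event) -> {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                String topic = event.getData().getPath().substring("/config/topics/".length());
                // This is where a consumer for the new topic would be created and started
                System.out.println("new topic: " + topic);
            }
        });
        cache.start();
    }
}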

Creating the kafka-consumer involves two factors:

1. The number of consumers per topic should ideally equal the topic's configured partition count (see the sketch after this list).
2. The size of each consumer's internal processing thread pool should be tied to the data volume and update frequency.
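
A minimal sketch of deriving the consumer count from the partition count (the short-lived probe consumer is an illustrative assumption; the factory below instead fixes the thread count via configuration):

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSizingSketch {

    /** Returns how many consumer threads to start for a topic: one per partition. */
    public static int consumerCountFor(String topic, Properties consumerProps) {
        // A short-lived probe consumer used only to read topic metadata
        try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(consumerProps)) {
            return probe.partitionsFor(topic).size();
        }
    }
}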

Each consumer depends on a handler object that performs the index update: it hands every message it receives to this handler. One problem has to be solved here: each consumer is an independent thread, so the handler cannot be wired in ahead of time via dependency injection, since the thread object may not even exist yet. There are two ways to obtain the handler inside an independent thread:

1. Parameter passing: inject the handler into the enclosing container-managed object and hand it over when the consumer thread is created.
2. Create the handler at container startup, register it in the container, and look it up by bean name from inside the thread object (see the sketch after this list).

For the sake of decoupling, we went with the second option.
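
The SpringBeanUtil helper used in the code below is not shown in this post; a minimal sketch of how such a lookup utility is typically built (the ApplicationContextAware implementation is an assumption):

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Holds the ApplicationContext so that plain (non-container-managed) threads
 * can look up beans by name.
 */
@Component
public class SpringBeanUtil implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
        return context.getBean(name, requiredType);
    }
}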

Resource estimation involves two factors:

1. The JVM's configured heap size and the machine's CPU core count.
2. The size of a single record, the memory footprint of each consumer thread, and the update frequency.

From these two we can estimate the maximum concurrent consumption capacity, and from that configure the processing thread pool size, the buffer queue length, the rejection policy, and so on (a sketch follows).
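
The ThreadPoolUtil.createThreadPool used below is likewise not shown; a minimal sketch of a pool built from such an estimate (the queue length of 1000 and CallerRunsPolicy are illustrative assumptions):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSketch {

    public static ThreadPoolExecutor createThreadPool(int numThreads) {
        return new ThreadPoolExecutor(
                numThreads, numThreads,                     // fixed-size pool, sized from the estimate
                60L, TimeUnit.SECONDS,                      // idle keep-alive (moot while core == max)
                new ArrayBlockingQueue<>(1000),             // bounded buffer queue, length from the estimate
                new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy: push back on the poll loop
    }
}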

Hands-on code

With the flow and the considerations above settled, we turn to the code design and implementation, split across three classes:
1. MQConsumerFactory - scans the config table for topics and creates the consumer thread objects;
2. KafkaConsumerThread - receives messages and hands them to the processing thread objects;
3. KafkaStreamProcesser - performs data processing and index updates;

package com.guahao.cloud.cloudsearchplat.biz.realtime.consumer;

import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.collect.Sets;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.google.common.collect.Lists;
import com.guahao.cloud.cloudsearchplat.biz.dal.dao.SQLRealtimeDAO;
import com.guahao.cloud.cloudsearchplat.biz.util.SQLUtils;
import com.guahao.cloud.cloudsearchplat.biz.util.ThreadPoolUtil;

/**
 * Factory that builds message-queue consumers.
 * 
 * Reads the topics configured in the database, diffs them against the topics
 * currently running, and asynchronously creates consumers for the new ones.
 * 
 * @author lican
 *
 * @date 2019-06-03
 */
@Component
public class MQConsumerFactory {

    private static final String GROUP = "cloud_search_plat_group";

    @Value("${kafka.connect.host}")
    private String kafkaHost;
    /**
     * Number of consumer processing threads; kept slightly below the DB connection pool size
     */
    @Value("${kafka.consumer.thread.num}")
    private Integer consumerThreadNum;

    /**
     * Topics currently running
     */
    private Set<String> runningTopics = Sets.newConcurrentHashSet();

    private static final Logger LOG = LoggerFactory.getLogger(MQConsumerFactory.class);

    /**
     * Periodically looks up the configured topic info, extracts the newly added
     * topics, and creates consumers to handle their real-time messages.
     * 
     */
    @Scheduled(cron = "${index.update.cron}")
    private void updateTopics() {
        // 1. Look up all configured topic info
        List<SQLRealtimeDAO> realtimeSqls = SQLUtils.getMapper().selectRealtimeSqlAll();
        if (CollectionUtils.isEmpty(realtimeSqls)) {
            return;
        }
        // 2. Extract the newly added topics
        List<SQLRealtimeDAO> diffRealtimes = getDiffTopics(realtimeSqls);
        if (CollectionUtils.isEmpty(diffRealtimes)) {
            return;
        }
        // 3. Create consumers to handle the real-time messages
        for (SQLRealtimeDAO realtime : diffRealtimes) {
            if (!validate(realtime)) {
                LOG.error("RealtimeJob validate error, realtime-param:{}", realtime);
                // Remove from the running set so a corrected config is picked up next round
                runningTopics.remove(realtime.getTopic());
                continue;
            }
            ThreadPoolUtil.getRealtimeThreadPool()
                    .execute(new KafkaConsumerThread(realtime, consumerThreadNum, createConsumerConfig()));
            LOG.info("topic:{}, consumer created, running topics:{}", realtime.getTopic(), runningTopics);
        }

    }

    /**
     * Validates a real-time config entity.
     * 
     * Checked fields: primary key ID, interface name, topic, real-time SQL.
     * 
     * @param realtime
     * @return
     */
    private boolean validate(SQLRealtimeDAO realtime) {
        return StringUtils.isNotEmpty(realtime.getBizId()) && StringUtils.isNotEmpty(realtime.getCat())
                && StringUtils.isNotEmpty(realtime.getTopic()) && StringUtils.isNotEmpty(realtime.getRealtimeSql());
    }

    /**
     * Extracts the newly added topics.
     * 
     * @return
     */
    private List<SQLRealtimeDAO> getDiffTopics(List<SQLRealtimeDAO> realtimeSqls) {
        List<SQLRealtimeDAO> diffRealtimeSqls = Lists.newArrayList();
        for (SQLRealtimeDAO realtime : realtimeSqls) {
            String topic = realtime.getTopic();
            if (runningTopics.contains(topic) || StringUtils.isBlank(topic)) {
                continue;
            }
            runningTopics.add(topic);
            diffRealtimeSqls.add(realtime);
        }
        return diffRealtimeSqls;
    }

    /**
     * Builds the Kafka consumer configuration.
     * 
     * @return
     */
    private Properties createConsumerConfig() {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return prop;
    }

    /**
     * After a consumer shuts down, removes its topic from the running set so the
     * next polling round can pick it up again.
     * 
     * @param topic
     */
    public void removeTopic(String topic) {
        // runningTopics is a concurrent set, so a plain remove is atomic
        runningTopics.remove(topic);
    }
}
package com.guahao.cloud.cloudsearchplat.biz.realtime.consumer;

import java.util.Properties;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.guahao.cloud.cloudsearchplat.biz.dal.dao.SQLRealtimeDAO;
import com.guahao.cloud.cloudsearchplat.biz.util.SpringBeanUtil;
import com.guahao.cloud.cloudsearchplat.biz.util.ThreadPoolUtil;

/**
 * Kafka consumer thread.
 * 
 * @author lican
 * @date 2019-06-03
 *
 */
public class KafkaConsumerThread implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerThread.class);
    /**
     * The Kafka consumer instance
     */
    private final KafkaConsumer<String, String> consumer;

    /**
     * Kafka topic entity
     */
    private SQLRealtimeDAO realtime;

    /**
     * Processing thread pool
     */
    private ThreadPoolExecutor executorPool;

    MQConsumerFactory mqFactory;

    /**
     * Initializes the consumer thread.
     * 
     * @param realtime   real-time topic info
     * @param numThreads number of data-processing threads
     * @param properties consumer config
     */
    public KafkaConsumerThread(SQLRealtimeDAO realtime, int numThreads, Properties properties) {
        this.consumer = new KafkaConsumer<>(properties);
        this.realtime = realtime;
        this.executorPool = ThreadPoolUtil.createThreadPool(numThreads);
        this.mqFactory = SpringBeanUtil.getBean("MQConsumerFactory", MQConsumerFactory.class);
        consumer.subscribe(Lists.newArrayList(realtime.getTopic()));
    }

    @Override
    public void run() {
        try {
            while (true) {
                try {
                    // Poll for data; blocks for up to one second when nothing is available
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        // Hand the message off for asynchronous processing
                        this.executorPool.submit(new KafkaStreamProcesser(record, realtime));
                    }
                    // Acknowledge the batch: commitSync is a synchronous (blocking) commit.
                    // Committing once per poll rather than once per record; note that offsets
                    // are committed before the async processing finishes (at-most-once).
                    if (!records.isEmpty()) {
                        consumer.commitSync();
                    }
                } catch (Exception e) {
                    LOG.error("Message poll failed, topic:{}", realtime, e);
                    continue;
                }
            }
        } catch (Exception e) {
            LOG.error("Kafka connection lost, topic:{}", realtime, e);
        } finally {
            shutdown();
        }
    }

    private void shutdown() {
        // 1. Close the connection to Kafka
        if (this.consumer != null) {
            this.consumer.close();
        }
        mqFactory.removeTopic(realtime.getTopic());
        // 2. Shut down the thread pool; this waits for in-flight tasks to finish
        if (this.executorPool != null) {
            // 2.1 Initiate shutdown
            this.executorPool.shutdown();

            // 2.2 Wait up to five seconds for termination
            try {
                if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) {
                    LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
                }
            } catch (InterruptedException e) {
                LOG.error("Interrupted during shutdown, exiting uncleanly!!", e);
            }
        }

    }

}
package com.guahao.cloud.cloudsearchplat.biz.realtime.consumer;

import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.guahao.cloud.cloudsearchplat.biz.dal.dao.SQLRealtimeDAO;
import com.guahao.cloud.cloudsearchplat.biz.job.IncrSQLIndexJob;
import com.guahao.cloud.cloudsearchplat.biz.model.constant.SQLConfigConstant;
import com.guahao.cloud.cloudsearchplat.biz.model.index.IndexResult;
import com.guahao.cloud.cloudsearchplat.biz.model.index.SQLIndexParam;
import com.guahao.cloud.cloudsearchplat.biz.util.LogTemplateUtils;
import com.guahao.cloud.cloudsearchplat.biz.util.SQLUtils;
import com.guahao.cloud.cloudsearchplat.biz.util.SpringBeanUtil;

/**
 * Kafka data stream processor.
 * 
 * @author lican
 * @date 2019-06-03
 */
public class KafkaStreamProcesser implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamProcesser.class);

    private ConsumerRecord<String, String> record;

    private SQLRealtimeDAO realtime;

    IncrSQLIndexJob incrJob;

    public KafkaStreamProcesser(ConsumerRecord<String, String> record, SQLRealtimeDAO realtime) {
        this.record = record;
        this.realtime = realtime;
        this.incrJob = SpringBeanUtil.getBean("incrSQLIndexJob", IncrSQLIndexJob.class);
    }

    @Override
    public void run() {
        String message = record.value();
        if (StringUtils.isEmpty(message)) {
            return;
        }
        String id = getIdValue(message, realtime);
        updateById(id, realtime.getCat(), record);
    }

    /**
     * Updates the index by ID.
     * 
     * @param id
     * @return
     */
    private IndexResult updateById(String id, String cat, ConsumerRecord<String, String> record) {
        String sqlFormat = realtime.getRealtimeSql();
        String realtimeSql = sqlFormat.replace(SQLConfigConstant.DAILY_ID_FILTER, id);
        List<Map<String, Object>> rows = Lists.newArrayList();
        try {
            rows = SQLUtils.getMapper().commonSelect(realtimeSql);
        } catch (Exception e) {
            LOG.error("[RealtimeSqlError]get primaryId error, message:{},realtimeSql:{}", JSON.toJSONString(record),
                    realtimeSql, e);
            return IndexResult.getDefaultResult();
        }
        if (CollectionUtils.isEmpty(rows)) {
            return IndexResult.getSuccessResult();
        }
        List<String> primaryIds = Lists.newArrayList();
        for (Map<String, Object> row : rows) {
            if (row.containsKey("id")) {
                primaryIds.add(SQLUtils.getStringId(row.get("id")));
            }
        }
        SQLIndexParam param = new SQLIndexParam();
        param.setIds(primaryIds);
        param.setCat(cat);
        IndexResult result = IndexResult.getDefaultResult();
        try {
            result = incrJob.process(param);
        } catch (Exception e) {
            LOG.error("[updateIndexError]realtime index error, message:{},param:{}", JSON.toJSONString(record), param,
                    e);
            return IndexResult.getDefaultResult();
        }
        LOG.info(LogTemplateUtils.getLogTemplate(realtime.getCat(), JSON.toJSONString(record), result,
                System.currentTimeMillis() - getToKafkaTime(record.value()), "realtime", JSON.toJSONString(param)));
        return result;

    }

    /**
     * Gets the value of the message's primary-key ID.
     * 
     * @param message
     * @param realtime
     * @return
     */
    private String getIdValue(String message, SQLRealtimeDAO realtime) {
        List<String> ids = getBizIds(realtime);
        Map<String, Object> dataMap = getRowMap(message);
        for (String id : ids) {
            if (dataMap.containsKey(id)) {
                return SQLUtils.getSQLStringId(dataMap.get(id));
            }
        }
        return StringUtils.EMPTY;
    }

    /**
     * Gets the timestamp at which the message reached Kafka.
     * 
     * @param message
     * @return
     */
    private Long getToKafkaTime(String message) {
        Object toKafkaTime = getMapByJson(message).get("toKafkaTime");
        if (null == toKafkaTime) {
            return System.currentTimeMillis();
        }
        return (Long) toKafkaTime;
    }

    /**
     * Gets the data row from the message.
     * 
     * @param message
     * @return
     */
    private Map<String, Object> getRowMap(String message) {
        Object data = getMapByJson(message).get("data");
        if (null == data) {
            return Maps.newHashMap();
        }
        return getMapByJson(data.toString());
    }

    /**
     * Parses a real-time message into a Map.
     * 
     * @param message
     * @return
     */
    private Map<String, Object> getMapByJson(String message) {
        try {
            return JSON.parseObject(message, Map.class);
        } catch (Exception e) {
            LOG.error("Parse message map error,message:{}", message, e);
            return Maps.newHashMap();
        }
    }

    /**
     * Gets the IDs monitored via the real-time config table.
     * 
     * Format: DB.Table.Id,DB.Table.Id, ...
     * 
     * @param realtime
     * @return
     */
    private List<String> getBizIds(SQLRealtimeDAO realtime) {
        List<String> ids = Lists.newArrayList();
        String[] bizIdStrs = realtime.getBizId().split(",");
        for (String bizId : bizIdStrs) {
            String[] idInfo = bizId.split("\\.");
            if (idInfo.length < 3) {
                continue;
            }
            String id = idInfo[2];
            ids.add(id);
        }
        return ids;
    }
}
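
For reference, a hypothetical message shape that getRowMap and getToKafkaTime above would parse successfully (the field names come from the parsing code; the actual payload produced on the canal side may differ):

{
  "toKafkaTime": 1559534400000,
  "data": { "id": 42, "name": "demo" }
}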
