百万级高可用调度系统实战

内容纲要

1. 索引调度系统介绍

索引调度系统是基于airflow 开发的定时调度系统,对airflow进行封装,简化任务的配置及管理,调度系统的分布式功能使用celery。搜索的定时任务接近400个,分钟级的任务接近300个,预计后期会达到日调用接近100w次。因此调度系统如何保证高可用及后期的水平扩展至关重要。本文简要介绍下调度系统高可用的实现。

1.1 当前cron表达式的缺点

当前的定时任务配置在Linux Crontab 定时任务,主要存在以下缺点:

  1. 配置容易出错,不好管理
  2. 日志查看困难
  3. 水平扩展较难实现
  4. 大索引的分布式调度很难实现
  5. 任务失败告警定制比较复杂
  6. 增量任务的失败补偿机制缺失
  7. 。。。

1.2 调度系统主要解决的问题

调度系统能解决的问题就是同当前配置的cron表达式进行对比,能够解决的问题如下:

问题项
任务级别的失败重试,执行超时机制,单任务最大并发限制
支持手动触发任务
任务关闭
关闭一段时间后任务开启的执行逻辑
多台机器上任务的并行调度
增量任务的失败补偿机制
一个大任务里的多个子任务的间隔串行执行
支持灵活的任务分片规则
任务报警机制更加灵活
执行log页面化查看,执行结果可视化图表化

1.3 调度系统的架构

1.3.1 系统组件图

image

1.3.2 系统结构图

image

2.调度系统的高可用

2.1 调度器(master)高可用

索引调度系统的核心使用的是开源的airflow, airflow的调度器(master)节点是中心化的节点,即一个系统中只能有一个master节点。这显然不能满足高可用的需求。因此,我们结合zk对airflow的jobs.py进行改造,做到去中心化。

具体实现如下:

  1. 在调度器启动时,注册机器名及心跳时间到zk节点上,如下图所示:
    image

  2. 修改airflow 的jobs.py中的create_dag_run方法

    1. 调度器调度任务时先从zk上获取可用的master节点(‘可用’ 即 当前时间 - 心跳时间 < 30)
    2. 把可用的master节点列表放到一个数据中,如:master_nodes = ['lx38', '1x47'];
    3. 对master_nodes列表进行排序,保证每个master看到的列表的完全相等
    4. master调度任务时;依据任务名计算hash值; task_hash_code = hash(dag.dag_id)
    5. 计算任务是否可用被当前master调度器调度 index = tash_hash_code % len(master_codes)
    6. 如果os.uname()[1] == master_nodes[index] 则进行调度,否则则直接返回,不进行任务的调度

具体代码实现如下:

@provide_session
def create_dag_run(self, dag, session=None):

    self.zk_client.heart_beat()
    hash_code = hash(dag.dag_id)
    master_node = self.zk_client.get_master_node()
    index = hash_code % len(master_node)
    if len(master_node) > index:
        print(index, dag.dag_id, master_node[index])
        if master_node[index] != host_info.host_name():
            return
    if dag.schedule_interval and conf.getboolean('scheduler', 'USE_JOB_SCHEDULE'):
        active_runs = DagRun.find(
            dag_id=dag.dag_id,
            state=State.RUNNING,
            external_trigger=False,
            session=session
        )

2.2 worker高可用

worker节点的高可用依据队列进行实现。如 我们在lx48 启动worker节点时指定消息队列名为bds, 在lx40上启动worker节点时指定消息队列名也是bds。那么lx48和lx40会消费bds队列中的任务,并且保证不会被重复消费。如果lx48机器挂掉或者不可用,那么由1x40消费所有bds队列中的任务,这样不会影响业务。

2.3 消息队列高可用

airflow的消息队列比较灵活,目前常用的是rabitmq及redis。airflow使用celery作为分布式任务调度,celery对于消息队列的支持使用的是AMQP协议,即任何中间件值用实现标准的AMQP协议,那么都可以作为airflow的消息队列进行使用。

消息队列我们使用的是redis。redis的集群模式使用的哨兵模式,一个master节点,两个slave节点。滨安机房一套,兴义机房一套。完全独立,实现双机房双活,由于redis是dba进行维护,对于消息队列的高可用不做高多的介绍。

2.4 数据库高可用

数据库我们使用的是mysql, 主要存放任务执行数据。数据库使用的是主从模式,通过VIP提供统一出口,由keepalived 保持主从之间的存活。由dba进行维护,不做详细的介绍

2.5 zk高可用

zk主要用来存放worker的心跳数据,master心跳数据,dag 任务元数据,master高可用节点信息。zk我们团队进行维护,搭建了滨安一套,兴义一套。主要是做双机房双活是使用。每个机房zk三个物理节点,在高可用上支持挂掉一个节点,后期为了更高的稳定性,准备新增zk物理节点为5个。

2.6 告警

告警主要有两类:分系统告警及业务告警。告警形式主要有短信告警,钉钉告警,邮件告警,后期也支持微信告警

系统告警:

告警项
磁盘io
cpu
网络
jvm
中间件:redis,zk, mysql 监控

业务告警

告警项
任务失败重试邮件告警
任务失败短信告警
同一个任务名称累计超过5个在running状态则短信告警
一个任务执行时长超过配置的任务间隔时长则钉钉告警
一个任务执行时长超过配置的任务间隔10倍则短信告警

2.7 master及worker自动拉起

自动拉起顾名思义就是当master节点或者worker节点挂掉或者假死时不需要人工干预,能够自动重启,并且不影响调度功能。

  1. 我们的调度系统目前能够做到在master节点或者worker节点挂掉时能在10s能自动拉起;
  2. 在master节点或者worker节点假死时能够在35s内自动重启

2.7.1 master自动拉起实现:

  1. 心跳逻辑实现

修改airflow 的jobs.py的创建任务的逻辑 create_dag_run,这个方法被调度每隔2s调用一次,检测是否有符合条件的任务,进行调度。因此我们可以在这个方法中做调度器的心跳实现。具体代码如下:

@provide_session
def create_dag_run(self, dag, session=None):
    """
    This method checks whether a new DagRun needs to be created
    for a DAG based on scheduling interval.
    Returns DagRun if one is scheduled. Otherwise returns None.
    """
    # 做任务心跳逻辑;每隔2s往zk节点更新信息
    self.zk_client.heart_beat()
    ...

heart_beat()的代码如下:

def heart_beat(self):
    date_str = str(datetime.now())
    try:
        # 心跳内容存放当前时间
        self.zk.set(self.path, bytes(date_str))
    except Exception as e:
        self.logger.error(e)
    finally:
        # 写一份到文件
        file_util.save_to_file(common_config.scheduler_heart_file, date_str.split('.')[0])

心跳在zk的存储如下图:
image

  1. master假死或down掉自动拉起

    在调度系统启动一个进程,每隔15s检测一次master节点的心跳时间及master进程是否存在
    如果 当前时间-心跳时间 > 35; 则kill master进程;然后重启
    如果master进程不存在,则重启
  2. 第二点中的进程检测在索引调度系统中,如何保证索引调度系统不down掉及假死

    在linux中配置每隔1分钟的cron表达式
    */1 * * * * cd /home/qa/airflow/wyscheduler/alarm && ./heat_beat_cron.py scheduler >> /tmp/zhangxjairflow_worker.log 2>&1 &

2.7.2 worker节点自动拉起

心跳逻辑实现

在索引调度系统给每个worker节点配置一个心跳任务;master每隔1分钟调度心跳任务在每个worker节点执行,worker节点执行成功更新心跳时间,心跳时间的存储在zk节点中。

worker节点的心跳任务如下图:
image

点击节点名,可以看到具体的心跳任务,如下图:
image

zk节点的心跳存储如下图:
image

  1. worker假死或down掉自动拉起

    在调度系统启动一个进程,每隔15s检测一次worker节点的心跳时间及worker进程是否存在
    如果 当前时间-心跳时间 > 75; 则kill worker进程;然后重启
    如果worker进程不存在,则重启
  2. 第二点中的进程检测在索引调度系统中,如何保证索引调度系统不down掉及假死

    在linux中配置每隔1分钟的cron表达式
    */1 * * * * cd /home/qa/airflow/wyscheduler/alarm && ./heat_beat_cron.py worker >> /tmp/zhangxjairflow_worker.log 2>&1 &

    2.8 任务失败重试

调度系统中的每个任务在失败时,可以进行重试,重试的次数支持自定义;

重试时支持延时多少秒进行进行重试,延时时间支持自定义。

2.9 僵尸任务处理

当worker节点被kill -9 或者物理机突然断电或者挂掉时,可能出现正在执行的任务变成僵尸任务一直存在。airflow 对于正在执行的任务,每隔10s更新一次任务的心跳时间(对应mysql 是更新job表中的latest_heartbeat)。

我们在调度系统启动一个每隔15s的定时任务,
定时检查running状态任务的latest_heartbeat,如果当前时间 - latest_heartbeat > 30;
则清除dag_run及task_instance中的记录,调度器会重新调度此任务执行。

3. 未来展望

未来我主要从以下几个方面对系统进行改进:

  1. 对airflow的使用的celery进行改造,使worker消费任务时能够根据机器的负载,内存,磁盘,网络等进行动态消费,避免均匀消费时对机器的压力,使用的机器资源的利用最大化。
  2. 对airflow的调度器进行改造,使得对秒级任务的支持更加友好。

发表评论

邮箱地址不会被公开。 必填项已用*标注