ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Apache Airflow2.0.2 遇到的问题及解决方案

2021-05-23 11:00:10  阅读:1076  来源: 互联网

标签:airflow postgresql postgres Airflow2.0 解决方案 celery pg scheduler Apache


Airflow2.0.2 问题及解决方案

a.测试运行期间发现 airflow scheduler 由于 mysql deadlock 而挂掉的问题

_mysql_exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

看了下若干的issue,感觉mysql对airflow scheduler的支持不够好,且考虑到后面要部署多个scheduler的情况,决定改用postgres 作为metadata backend

使用docker启动一个pg实例:

docker run -d \\
--name some-postgres \\
-e POSTGRES_PASSWORD=mysecretpassword \\
-e PGDATA=/var/lib/postgresql/data/pgdata \\
-v /opt/postgres/data:/var/lib/postgresql/data \\
-p 5432:5432 \\
postgres

使用pg的话,需要将mysql中的数据同步到pg中,google找到了 pgloader这个软件,使用docker可以直接用,示例命令如下:

docker run --rm --name pgloader dimitri/pgloader:ccl.latest pgloader pgloader mysql://root:password@localhost:3306/airflow postgresql://pgloader:password@localhost:5432/airflow

然后修改airflow.cfg中的两个关于db的参数 并安装 pg 相关的package

result_backend = db+postgresql://postgres:mysecretpassword@localhost/airflow_uat 
sql_alchemy_conn = postgresql+psycopg2://postgres:mysecretpassword@localhost/airflow_uat

pip install psycopg2-binary

再次启动,发现Scheduler 和 Worker之间没有通信:

具体体现在 pool 里面有很多 queue的task,然后在 celery flower中没看到一个任务被处理

通过查找 airflow worker 节点的日志, 发现了这个问题:

sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "task_id_sequence" does not exist

再通过Google进行搜索,发现了这个issue及comment https://github.com/celery/celery/issues/3213#issuecomment-308238718,评论中说 是Celery的版本不同导致的,需要删除掉 数据库中与 celery,需要删除celery_taskmeta celery_tasksetmeta的表,再重启 airflow celery worker即可.

这时再重启各个节点,trigger一个dag,发现scheduler已经可以正常发消息给worker了,任务正常的执行了.

然而,问题还没结束,在Home页面 明晃晃的显示了 The scheduler does not appear to be running. ...

这个是什么问题呢? Scheduler明明在工作,任务都能顺利执行

这时候,我们使用 airflow_weub_url/health 查看一下scheduler的状态,可以看到 有一个latest_scheduler_heartbeat字段,这个数据是从哪里来的呢?

我们通过查找源码,可以通过 https://github.com/apache/airflow/blob/2.0.2/airflow/www/views.py#L421 这个入口

看到是从SchedulerJob中来的,继续跟踪进去,发现是继承了其父类base_job中的方法

https://github.com/apache/airflow/blob/2.0.2/airflow/jobs/base_job.py#L109

从base_job中的代码中,可以看到起对应到数据库中的 job 表,对job_type筛选成SchedulerJob,发现出现了一些超过当前时间的数据,删除掉这些数据,然后再观察 Airflow Home Page,发现问题已经解决。

b.scheduler 时区问题

airflow有两个参数可以对时区进行设置一个是default_timezone,另一个是default_ui_timezone,两者都设置为Asia/Shanghai,发现在Airflow Web UI 上已经显示了北京时间,但是对scheduler并不起作用,需要对源码稍作修改才能使得scheduler安装 utc + 8 的时区来调度任务,参考这篇博文(Airflow 1.10 中文时区解决(排坑))进行修改即可。只要改 timezone.py 及 sqlalchemy.py 这两个文件的代码即可。后期有空可以向社区提一个pr,免得每次都要去改源码解决问题。

c.airflow psycopg2.OperationalError: FATAL: sorry, too many clients already

这个问题是 设置的 worker 的并发过大,导致需要的查pg的连接过多导致的。可以进行以下的调整:

在airflow.cfg 文件中   将 sql_alchemy_pool_size 调小 , 如 sql_alchemy_pool_size = 50   这样会让airflow 的调度性能降低,调度变慢

或者在 pg 的  postgresql.conf 文件中把 max_connections 调大,如  max_connections = 1000,这样可以使得 airflow 调度的能力加强,但是pg的资源消耗变得更多

-- 可以通过下面的命令查看 postgresql.conf 文件的路径
SHOW config_file;

d. 在webui 上手动的clear历史的任务没用

查看日志,发现 INFO - Run scheduled___ of  some_dag_id has timed-out.  的信息

通过日志查找github issue,可以查到 https://github.com/apache/airflow/issues/14265 这个issue,2.0.2版本的dagrun_timeout参数的作用和1.10.x版本的不一样了,导致出现了这个问题。根据comment的建议,可以把dagrun_timeout这个参数设置的大一些或者去除这个参数,用execution_timeout这个参数检测每个 task的timeout代替。

欢迎加入Apache Airflow 技术交流群,共同玩转Airflow.

标签:airflow,postgresql,postgres,Airflow2.0,解决方案,celery,pg,scheduler,Apache
来源: https://blog.csdn.net/Young2018/article/details/117189604

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有