airflow已经成为了任务编排系统的事实标准,使用和terraform一样的代码及配置的任务开发方式。
airflow使用python作为开发语言,非常简单易学、容易上手。
完整案例代码已上传github:https://github.com/neatlife/myairflow
获取airflow实例
可以使用docker一键启动
git clone https://github.com/puckel/docker-airflow
cd docker-airflow
docker-compose -f docker-compose-LocalExecutor.yml up -d
访问ip:8080查看效果
可以看到airflow已经可用了
这个dag文件就是用来定义任务和任务之间的先后、依赖关系的。
参考:https://airflow.apache.org/tutorial.html#default-arguments
其中几个比较重要的参数如下:
代码实现如下 vim mydag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"start_date": datetime(2019, 7, 10),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG("mydag", default_args=default_args, schedule_interval=timedelta(1))
上面的”schedule_interval=timedelta(1)”代表一天触发一次这个任务,也可以使用crontab的语法,参考:https://airflow.apache.org/scheduler.html#dag-runs
这个新的dag文件需要使用python执行这个脚本导入,可以使用下面的命令进行导入
docker-compose -f docker-compose-LocalExecutor.yml exec webserver bash
python dags/mydag.py
刷新这个web界面,就可以看到这个新加的mydag任务了
这个airflow的任务加载比较慢,如果显示状态和上面不同需要多等待一会儿
这里定义三个任务,分别echo出1, 2, 3,关系如下
代码实现如下
t1 = BashOperator(task_id="echo1", bash_command="echo 1", dag=dag)
t2 = BashOperator(task_id="echo2", bash_command="echo 2", dag=dag)
t3 = BashOperator(task_id="echo3", bash_command="echo 3", dag=dag)
上面使任务使用airflow执行的bash,airflow还可以执行很多输入,完整列表参考:https://airflow.apache.org/_api/airflow/operators/index.html#submodules
t2.set_upstream(t1)
t3.set_upstream(t1)
首次修改这个dag文件,airflow回自动加载,点击”Refresh”按钮可以手动加载这个配置文件
操作效果如下
可以看到这个echo1, echo2, echo3的依赖关系定义成功了
先启用这个任务,然后点击执行按钮,操作效果如下
也可以只启用这个任务,然后等这个airflow按照设定的时间,自动触发,手动触发可以快速验证
可以看到任务已经开始执行了
这个任务的执行状态用不同的颜色进行表示
可以看到所有的任务都已经成功执行完成了
单击 “Browser” > “Task Instance”可以查看所有被执行的任务列表
单击具体的task id就可以进入到task执行情况详情页面了,单击详情页的”Log”就可以看到任务的日志输出了,操作效果如下:
如果使用了这个airflow的模板功能,可以在任务执行详情页面,查看这个被执行任务的模板渲染结果,操作效果如下:
需要先删除dag定义文件,比如 rm dags/mydag.py,然后在web界面上删除
这个airflow依赖非常的多,因为没到1.0版本,存在一些bug,所以建议使用docker启动,主要的dag任务编排功能使用是ok的,其它功能可以先了解下
airflow可以使用模板功能生成脚本,参考:https://airflow.apache.org/tutorial.html#templating-with-jinja
jinjia模板语法参考:http://jinja.pocoo.org/docs/dev/
airflow默认以utc时区运行,如果需要计算正确的时间,需要把时间进行时区转换,核心代码如下
#将本地时间转换为utc时间,再设置为start_date
tz = pytz.timezone('Asia/Shanghai')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)