如何在 start_date 而不是 execution_date 上触发气流调度?

来自https://airflow.apache.org/scheduler.html

请注意,如果您在一天的 schedule_interval 上运行 DAG,则标记为 2016-01-01 的运行将在 2016-01-01T23:59 之后不久触发。换句话说,作业实例在其涵盖的时间段结束后启动。

这个功能很伤人。

例如,我有每天运行的 etl 作业, schedule_interval 是0 1 * * *,因此它将在 2019-09-22 01:00:00 触发 2019-09-23 01:00:00 作业。但是我的 etl 正在处理 start_date 之前的所有数据,意味着数据范围在 (history, 2019-09-23 00:00:00) 之间,我们不能使用datetime.now(),因为这无法重现。这迫使我将 1 天添加到 execution_date:

etl_end_time = "{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"

但是,当我需要使用 schedule_interval 运行作业时45 2,3,4,5,6 * * *,该2019-09-22 06:45:00作业将2019-09-23 02:45:00在 execution_date (next execution time) 之后的一天运行。我不得不更改 schedule_interval45 2,3,4,5,6,7  * * *并在最后一次运行时放置一个虚拟运算符,而不是增加一天。在这种情况下,您不需要在 execution_date 中添加一天,这意味着您必须定义两个etl_end_time来表示具有不同 schedule_interval 的作业中的相同日期。

所有这些对我来说都很不舒服,是否有任何配置或内置方法可以使 execution_date 等于 start_date ?或者我必须修改气流源代码......


摇曳的蔷薇
浏览 167回答 2
2回答

蝴蝶刀刀

对于计划运行,next_execution_date将返回触发的确切时间。

芜湖不芜

我发现有一个公关 https://github.com/apache/airflow/pull/5787此更改将属性 schedule_interval_edge(包含“开始”或“结束”的字符串)引入 DAG。调度程序使用该值来确定是否应在调度间隔的开始或结束时调度 DAG。在 [scheduler] 部分的 default_airflow.cfg 中还添加了一个同名参数。我已经在这个 pr 中获取了代码。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python