猿问

Google Cloud Composer DAG 没有被触发

我计划从今天 2020/08/11 开始在东部标准时间 (NY) 周二至周六凌晨 04:00 运行 DAG。编写代码并部署后,我预计 DAG 会被触发。我刷新了我的 Airflow UI 页面几次,但它仍然没有触发。我正在使用带有 python 3 的 Airflow 版本 v1.10.9-composer。


这是我的 DAG 代码:


"""

This DAG executes a retrieval job

"""


# Required packages to execute DAG


from __future__ import print_function

import pendulum

from airflow.models import DAG

from airflow.models import Variable

from datetime import datetime, timedelta

from airflow.contrib.operators.ssh_operator import SSHOperator

from airflow.operators.dummy_operator import DummyOperator

from airflow.utils.trigger_rule import TriggerRule



local_tz = pendulum.timezone("America/New_York")


# DAG parameters


default_args = {

    'owner': 'Me',

    'depends_on_past': False,

    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),

    'dagrun_timeout': None,

    'email': Variable.get('email'),

    'email_on_failure': True,

    'email_on_retry': False,

    'provide_context': True,

    'retries': None,

    'retry_delay': timedelta(minutes=5)

}


# create DAG object with Name and default_args

with DAG(

        'retrieve_files',

        schedule_interval='0 4 * * 2-6',

        description='Retrieves files from sftp',

        max_active_runs=1,

        catchup=True,

        default_args=default_args

) as dag:

    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class

    start_dummy = DummyOperator(

        task_id='start',

        dag=dag

    )


    end_dummy = DummyOperator(

        task_id='end',

        trigger_rule=TriggerRule.NONE_FAILED,

        dag=dag

    )


    retrieve_file = SSHOperator(

        ssh_conn_id="my_conn",

        task_id='retrieve_file',

        command='/usr/bin/python3  /path_to_file/getFile.py',

        dag=dag)



    dag.doc_md = __doc__


    retrieve_file.doc_md = """\

    #### Task Documentation

    Connects to sftp and retrieves files.

    """


    start_dummy >> retrieve_file >> end_dummy


慕雪6442864
浏览 120回答 1
1回答

郎朗坤

参考官方文档:调度程序会在开始日期之后的一个 schedule_interval 运行您的作业。如果您的 start_date 是 2020-01-01 并且 schedule_interval 是@daily,则第一次运行将在 2020-01-02 上创建,即在您的开始日期过后。为了在每天的特定时间(包括今天)运行 DAG,start_date需要将时间设置为过去的时间,并且schedule_interval需要具有所需的时间格式cron。正确设置昨天的日期时间非常重要,否则触发器将不起作用。在这种情况下,我们应该将 设置start_date为上一周的星期二,即:(2020, 8, 4)。由于每周运行一次,因此从开始日期起应该有 1 周的间隔。让我们看一下以下示例,它展示了如何在美国东部标准时间周二至周六凌晨 04:00 运行作业:from datetime import datetime, timedeltafrom airflow import modelsimport pendulumfrom airflow.operators import bash_operatorlocal_tz = pendulum.timezone("America/New_York")default_dag_args = {    'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),    'retries': 0,}with models.DAG(        'Test',        default_args=default_dag_args,        schedule_interval='00 04 * * 2-6') as dag:       # DAG code    print_dag_run_conf = bash_operator.BashOperator(        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')我建议您查看start_date 文档的处理方式。
随时随地看视频慕课网APP

相关分类

Python
我要回答