气流:如何确保 DAG 每 5 分钟运行一次?

我正在探索Apache Airflow。我正在使用一种在 MySQL 中插入记录的方法。

我已经安排DAG在每 5 分钟后运行一次,但它似乎没有发生,因为 MYSQL 时间戳告诉 MySQL 任务在 5 分钟内被执行多次。

http://img3.mukewang.com/60d18ed40001d33303350266.jpg

如您所见,它正在几分钟内插入记录。下面是我的代码:


import datetime as dt


from airflow import DAG

from airflow.hooks.mysql_hook import MySqlHook

from airflow.operators.bash_operator import BashOperator

from airflow.operators.python_operator import PythonOperator


def fetch_data_mysql():

    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')

    sql = 'SELECT * from random_table'

    sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')"

    print('INSERT MYSQL RESULT')

    # results = mysql_hook.get_records(sql)

    # results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',))

    mysql_hook.run(sql, autocommit=True)


def print_world():

    print('world')

    return 'WORLD IN SEPTEMBER'



default_args = {

    'owner': 'me',

    'start_date': dt.datetime(2018, 9, 11),

    'retries': 1,

    'retry_delay': dt.timedelta(minutes=2),

}


with DAG('airflow_tutorial_v01',

         default_args=default_args,

         schedule_interval='0/5 * * * *',

         ) as dag:

    print_hello = BashOperator(task_id='print_hello',

                               bash_command='echo "hello"')

    sleep = BashOperator(task_id='sleep',

                         bash_command='sleep 5')

    print_world = PythonOperator(task_id='print_world',

                                 python_callable=print_world)

    mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)


print_hello >> sleep >> print_world >> mysql_task

我正在使用v1.10.0.


日志链接在这里:- https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0


潇潇雨雨
浏览 201回答 2
2回答

MYYA

你在回填。如果您检查日志,其执行日期为2018-09-20 00:15:00+00:00、2018-09-20 00:20:00+00:00、2018-09-20 00:25:00+00:00等。将以下内容添加到您的default_args:'catchup_by_default': False你default_args应该看起来像:default_args = {    'owner': 'me',    'start_date': dt.datetime(2018, 9, 11),    'retries': 1,    'retry_delay': dt.timedelta(minutes=2),    'catchup_by_default': False,}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python