猿问

带有a的气流Python运算符。返回类型

我的 DAG 中有一个 python 运算符。python 可调用函数返回一个 bool 值。但是,当我运行 DAG 时,出现以下错误。


类型错误:“bool”对象不可调用


我修改了函数以不返回任何内容,但又一次出现以下错误


错误 - 'NoneType' 对象不可调用


下面是我的狗


def check_poke(threshold,sleep_interval):

flag=snowflake_poke(1000,10).poke()

#print(flag)

return flag


dependency = PythonOperator(

task_id='poke_check',

#python_callable=check_poke(129600,600),

provide_context=True,

python_callable=check_poke(129600,600),

dag=dag)


end = BatchEndOperator(

queue=QUEUE,

dag=dag)


start.set_downstream(dependency)

dependency.set_downstream(end)

无法弄清楚我错过了什么。有人可以帮我解决这个问题吗……对气流相当陌生。


我在 dag 中编辑了 python 运算符,如下所示


dependency = PythonOperator(

task_id='poke_check',

provide_context=True,

python_callable=check_poke(129600,600),

dag=dag)

但是现在,我得到了一个不同的错误。


Traceback (most recent call last):

File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run

    result = task_copy.execute(context=context)

File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute

    return_value = self.python_callable(*self.op_args, **self.op_kwargs)

TypeError: () takes no arguments (25 given)

[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY

[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)


蝴蝶刀刀
浏览 170回答 3
3回答

慕标5832272

参数名称给出了它。您正在传递调用的结果而不是可调用的。python_callable=check_poke(129600,600)第二个错误指出 callable 是用 25 个参数调用的。所以是lambda:行不通的。以下方法可行,但忽略 25 个参数确实值得怀疑。python_callable=lambda *args, **kwargs: check_poke(129600,600)

达令说

代码需要一个可调用的,而不是结果(正如已经指出的那样)。您可以使用functools.Partial来填写参数:from functools import partialdef check_poke(threshold,sleep_interval):   flag=snowflake_poke(1000,10).poke()   return flagfunc = partial(check_poke, 129600, 600)dependency = PythonOperator(    task_id='poke_check',    provide_context=True,    python_callable=func,    dag=dag)

白猪掌柜的

同意@Dan D.的问题;但令人困惑的是为什么他的解决方案不起作用(它在python shell 中肯定有效)看看这是否会给您带来任何运气(它只是@Dan D.解决方案的详细变体)from typing import Callable# your original check_poke functiondef check_poke(arg_1: int, arg_2: int) -> bool:&nbsp; &nbsp; # do something&nbsp; &nbsp; # somehow returns a bool&nbsp; &nbsp; return arg_1 < arg_2# a function that returns a callable, that in turn invokes check_poke# with the supplied paramsdef check_poke_wrapper_creator(arg_1: int, arg_2: int) -> Callable[[], bool]:&nbsp; &nbsp; def check_poke_wrapper() -> bool:&nbsp; &nbsp; &nbsp; &nbsp; return check_poke(arg_1=arg_1, arg_2=arg_2)&nbsp; &nbsp; return check_poke_wrapper..# usagepython_callable=check_poke_wrapper_creator(129600, 600)
随时随地看视频慕课网APP

相关分类

Python
我要回答