尝试查询 mssql 数据库时出现气流 Fernet_Key 问题

我对 Airflow 很陌生。我已多次通读文档,阅读了大量 S/O 问题和许多在线随机文章,但尚未解决此问题。我有一种感觉,我做错了一些非常简单的事情。我有适用于 Windows 的 Docker,我拉取了puckel/docker-airflow映像并运行了一个暴露端口的容器,这样我就可以从我的主机访问 UI。我有另一个容器在运行mcr.microsoft.com/mssql/server,我在其中恢复了 WideWorldImporters 示例数据库。从 Airflow UI,我已经能够成功地创建到这个数据库的连接,甚至可以从数据分析部分查询它。

http://img3.mukewang.com/619371540001c91c22710227.jpg

http://img3.mukewang.com/6193715c0001f80b11210495.jpg

因此,虽然这有效,但我的 dag 在第二个任务中失败了sqlData。这是代码:


from airflow.models import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.python_operator import PythonOperator

from airflow.operators.mssql_operator import MsSqlOperator

from datetime import timedelta, datetime


copyData = DAG(

    dag_id='copyData',

    schedule_interval='@once',

    start_date=datetime(2019,1,1)

)



printHelloBash = BashOperator(

    task_id = "print_hello_Bash",

    bash_command = 'echo "Lets copy some data"',

    dag = copyData

)


mssqlConnection = "WWI"

sqlData = MsSqlOperator(sql="select top 100 InvoiceDate, TotalDryItems from sales.invoices",

                       task_id="select_some_data",

                       mssql_conn_id=mssqlConnection,

                       database="WideWorldImporters",

                       dag = copyData,

                       depends_on_past=True

          )


queryDataSuccess = BashOperator(

    task_id = "confirm_data_queried",

    bash_command = 'echo "We queried data!"',

    dag = copyData

)

最初的错误是:


*[2019-02-22 16:13:09,176] {{logging_mixin.py:95}} INFO - [2019-02-22 16:13:09,176] {{base_hook.py:83}} INFO - Using connection to: 172.17.0.3  

[2019-02-22 16:13:09,186] {{models.py:1760}} ERROR - Could not create Fernet object: Incorrect padding  

Traceback (most recent call last):  

  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 171, in get_fernet

    _fernet = Fernet(fernet_key.encode('utf-8'))  


我注意到这与密码学有关,我继续运行pip install cryptographyand pip install airflow[crytpo],两者都返回完全相同的结果,通知我要求已经得到满足。最后,我发现我只需要生成一个 fernet_key。我的airflow.cfg 文件中的默认键是fernet_key = $FERNET_KEY.


慕神8447489
浏览 178回答 1
1回答

慕容3067478

我终于让我的 DAG 工作了。他建议我尝试使用 docker-compose,它也在 puckle/docker-airflow github repo 中列出。不过,我最终使用了 docker-compose-LocalExecutor.yml 文件而不是 Celery Executor。我还需要进行一些小的故障排除和更多的配置。首先,我使用了包含示例数据库的现有 MSSQL 容器,并使用docker commit mssql_container_name. 我这样做的唯一原因是为了节省必须恢复备份样本数据库的时间;如果需要,您可以随时将备份复制到容器中并在以后恢复它们。然后我将我的新图像添加到现有的 docker-compose-LocalExecutor.yml 文件中,如下所示:version: '2.1'services:    postgres:        image: postgres:9.6        environment:            - POSTGRES_USER=airflow            - POSTGRES_PASSWORD=airflow            - POSTGRES_DB=airflow    mssql:        image: dw:latest        ports:            - "1433:1433"    webserver:        image: puckel/docker-airflow:1.10.2        restart: always        depends_on:            - postgres            - mssql        environment:            - LOAD_EX=n            - EXECUTOR=Local        #volumes:            #- ./dags:/usr/local/airflow/dags            # Uncomment to include custom plugins            # - ./plugins:/usr/local/airflow/plugins        ports:            - "8080:8080"        command: webserver        healthcheck:            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]            interval: 30s            timeout: 30s            retries: 3请注意,dw是我命名的基于 mssql 容器的新映像。接下来,我将文件重命名为docker-compose.yml以便我可以轻松运行docker-compose up(不确定是否有直接指向不同 YAML 文件的命令)。一切都启动并运行后,我导航到 Airflow UI 并配置了我的连接。注意:由于您使用的是 docker-compose,因此您不需要知道其他容器的 IP 地址,因为它们使用了我在 此处发现的 DNS 服务发现。然后为了测试连接,我转到数据分析进行临时查询,但连接不存在。这是因为 puckle/docker-airflow 图像没有pymssql安装。所以只需 bash 进入容器docker exec -it airflow_webserver_container bash并安装它pip install pymssql --user。退出容器并使用docker-compose restart. 一分钟后,一切正常。我的连接出现在 Ad hoc Query 中,我可以成功选择数据。最后,我打开了我的 DAG,调度程序把它捡起来,一切都成功了!花了数周的谷歌搜索后,超级放心。感谢@y2k-shubham 的帮助,并对@Tomasz 给予了极大的感谢,在他在 r/datascience subreddit 上发表了关于 Airflow 的精彩而详尽的帖子后,我实际上联系了他。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python