在我于[Vance]担任Devops工程师期间,我们大约运行了80个ETL管道在AWS Glue上,但随着工作负载的增加,成本也随之上升,达到了每月高达10,000美元的惊人成本。这样的成本是不可持续的,所以我们必须采取措施。经过对我们管道的仔细分析,我们发现AWS Glue的无服务器特性导致了空闲时间和不必要的计算成本,这对我们来说非常昂贵。
为了解决这个问题,我将我们的ETL工作负载迁移至Apache Airflow,并在运行于ECS的EC2实例上运行,并使用Terraform进行编排操作。结果是?节省了96%的成本,费用降至每月仅400美元,性能不受影响。
虽然Airflow是一个很好的Glue替代方案,但关于如何使用Terraform正确设置Airflow配合Celery执行器的文档很少——特别是在降低成本方面。这篇博客将带你了解我们是如何做到的,我们遇到的挑战,以及你也可以这样做来降低你的AWS Glue成本。
这确实是一场混乱,感谢我的经理Rishabh Lakhotia陪我经历这一切混乱,谢谢您,先生,您确实是一位大神。
简介大家好,我叫阿卡什辛格,是一名来自班加罗尔的三年级的工程系学生和开源贡献者。
以下是我的LinkedIn链接,我的GitHub链接 和我的Twitter链接
我在网上用的昵称是SkySingh04。
星号(*)
痛的三个方面从 AWS Glue 迁移到 Apache Airflow ,需要设置三个主要组件:
- Webserver – 管理 DAG(有向无环图,Directed Acyclic Graph)和监控作业执行的用户界面。
- Scheduler – 触发 DAG 的运行并进行调度。
- Workers – 执行 DAG 中的任务。
使用 Terraform ,我们配置了 ECS 以同时运行这三个组件,并使它们互相通信。
一旦Airflow运行起来,下一步就是迁移我们的ETL工作流。我们将Glue作业转换成Airflow DAGs,然后彻底删除Glue作业,标志着我们降低AWS成本的最后一项措施。
点击图片查看
您可以使用下面的Dockerfile,并将其推送到ECR,然后并在接下来的配置中引用它。
FROM apache/airflow:latest-python3.9
USER root
RUN apt-get update && \
apt-get install -y --no-install-recommends \
git \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir -p /opt/airflow/dags /opt/airflow/logs && \
chown -R airflow:root /opt/airflow && \
chmod -R 755 /opt/airflow/logs
USER airflow
RUN pip install --no-cache-dir \
apache-airflow-providers-github \
apache-airflow-providers-amazon \
apache-airflow-providers-mysql \
apache-airflow-providers-mongo \
apache-airflow[celery,redis] \
pandas
COPY --chown=airflow:root dags/* /opt/airflow/dags/
ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs \
AIRFLOW__LOGGING__WORKER_LOG_SERVER_PORT=8793 \
AIRFLOW__LOGGING__LOGGING_LEVEL=INFO \
AIRFLOW__LOGGING__LOG_FORMAT='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s' \
AIRFLOW__LOGGING__SIMPLE_LOG_FORMAT='%(asctime)s %(levelname)s - %(message)s' \
AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_TARGET=file \
AIRFLOW__LOGGING__TASK_LOG_READER=task \
AIRFLOW__LOGGING__DAG_FILE_PROCESSOR_LOG_TARGET=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log \
AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
ENV AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
RUN mkdir -p /opt/airflow/logs/scheduler \
/opt/airflow/logs/web \
/opt/airflow/logs/worker \
/opt/airflow/logs/dag_processor_manager \
/opt/airflow/logs/task_logs
USER root
RUN chown -R airflow:root /opt/airflow && \
chmod -R 755 /opt/airflow
USER airflow
全屏模式 退出全屏
这个Dockerfile将用于我们所有的三个部分,并且很好地设置了日志。DAGs直接打包进Docker镜像,稍后再细说这一点。构建镜像、打标签,然后推送到ECR,然后继续下一步骤。
气流Web服务可以编写一个 Terraform 脚本来在 AWS 上使用 ECS(Elastic Container Service)的 EC2 启动类型设置 Apache Airflow。
- CloudWatch 日志服务
-
创建了一个日志组(
/ecs/airflow
),设置保留期为3天。- 安全组
- 允许ALB的80端口HTTP和443端口HTTPS的入站流量。
-
允许任意出站流量。
- 使用ACM和Route 53的TLS/SSL:
- 为 airflow.internal.example.com 提供一个使用 DNS 验证的 ACM(AWS Certificate Manager)证书。
-
配置 Route 53 DNS 记录,将 Airflow 的 URL 解析到 ALB。
- 应用负载均衡器 (ALB) :
- 创建一个用于Airflow web服务器的内部ALB,支持IPv4和IPv6(
dualstack
)。 - 配置一个HTTP监听器(端口80),将流量重定向流量到HTTPS(端口443)。
-
设置一个HTTPS监听器(端口443)来将请求转发到ECS目标组。
- Airflow Web服务器的ECS任务定义:
- 定义了一个运行在 基于 EC2 的 ECS 集群 上的 Airflow webserver 的 ECS 任务定义。
- 使用存储在 AWS ECR 中的 Docker 镜像 (
aws_ecr_repository.airflow.repository_url:latest
),更自然。 - 分配 2GB 内存(
2048MB
)。 - 将 容器端口 8080 映射到主机以供 Web 访问。
-
定义了一个 健康检查页面 (
http://localhost:8080/health
)。- Airflow 的 ECS 服务:
- 创建名为 "airflow-webserver" 的 ECS 服务,期望运行 1 个任务实例。
- 将 ECS 服务与 ALB 目标组 关联,实现与 ALB 目标组的负载均衡。
- 开启
execute-command
功能,允许通过 AWS SSM 进行调试。 -
使用 容量提供者策略 管理 ECS 资源的使用。
- DNS 配置 :
- 配置一个 Route 53 A 记录(
airflow.internal.example.com
),让它指向 ALB。
Terraform脚本在任务定义中包含有几个环境变量:
- 数据库连接配置 (
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
): 注:此配置项用于指定数据库连接信息。
- 指定 PostgreSQL 数据库连接字符串,用于 Airflow 的元数据存储。
-
使用 AWS KMS 加密的密文来 安全地保存数据库密码。
- 用户管理模块:
_AIRFLOW_WWW_USER_CREATE
: 确保创建默认的Airflow网页用户。_AIRFLOW_WWW_USER_USERNAME
: 设置用户名(默认为airflow
)。-
_AIRFLOW_WWW_USER_PASSWORD
: 通过AWS KMS安全地存储密码。- 安全与网页配置:
AIRFLOW__WEBSERVER__EXPOSE_CONFIG
: 启用通过 Web UI 暴露 Airflow 配置。-
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK
: 启用内置调度器的健康检查。- 数据库迁移与初始化:
_AIRFLOW_DB_MIGRATE
:确保 Airflow 在启动时执行必要的数据库迁移。
现在运行 terraform plan
和 terraform apply
,你应该能看到系统创建了很多资源。如果一切顺利,你应该能在你指定的网址上看到 Airflow 的界面。
The Airflow Scheduler 负责协调 DAG 的执行,并确保计划任务按时运行。可以编写一个 Terraform 脚本来将 调度器 作为一个 ECS 服务来部署,配置 CloudWatch 日志,并启用 自动扩展功能 来有效管理资源。
虽然这大部分和web服务器的类似,简单来说,我们需要添加的是:
- CloudWatch 中的调度器日志记录 (
/ecs/airflow-scheduler/
)。 - 通过 StatsD 指标监视性能 (
airflow-metrics
命名空间)。 - 在具有自动扩展功能的 ECS 集群中运行,确保资源分配的效率。
- 使用 CloudWatch 代理监控,帮助分析任务执行时间。
- 通过受限安全组防护,阻止不必要的流量。
接下来,运行 terraform plan
然后 terraform apply
,Airflow Scheduler 就会顺利搭建!🚀
Airflow 工作器服务部署为在 EC2实例上的ECS服务,并根据内存使用情况进行 自动扩展。它运行 Celery 工作器,这些工作器负责执行来自 DAG 的任务,并需要 Redis,我们接下来会设置 Redis。
这里有几个要点:
- 使用 CeleryExecutor ,意味着任务会在工人之间分发。
- 日志发送到 CloudWatch 用于监控。
- 工人数会根据 内存利用率 动态调整,范围在0到5之间。
- 每个工人都作为一个容器在ECS内运行,并通过一个目标为60%内存利用率的自动缩放策略进行管理。
- 设置了
DUMB_INIT_SETSID=0
以正确处理 Celery 关闭时的信号传递。
这一整个设置让我哭了,因为在ECS中调试自动扩展、日志管理和任务执行简直就是个噩梦。更糟糕的是,Redis还没有上线,所以痛苦还远没有结束。
Redis 和 RDS这里我们讨论两种数据库技术,Redis 和 RDS。
设置 Redis 并没有那么难,你可以使用以下的 Terraform 文件内容。
资源 "aws_elasticache_subnet_group" "airflow" {
name = "airflow-redis-subnet-group"
subnet_ids = aws_subnet.airflow[*].id
tags = 合并(
{
name = "airflow-redis-subnet-group"
},
local.common_tags
)
}
资源 "aws_security_group" "airflow_redis" {
名称前缀 = "airflow-redis"
vpc_id = data.aws_vpc.this.id
tags = 合并(
{
Name = "airflow-redis"
},
local.common_tags
)
}
资源 "aws_security_group_rule" "airflow_redis_inbound" {
类型 = "入站规则"
from_port = 6379
to_port = 6379
协议 = "tcp"
cidr_blocks = [data.aws_vpc.this.cidr_block]
security_group_id = aws_security_group.airflow_redis.id
描述 = "允许从内部网络访问 Redis"
}
资源 "aws_elasticache_cluster" "airflow" {
cluster_id = "airflow"
engine = "redis"
node_type = "cache.t4g.small"
缓存节点数量 = 1
参数组名称 = "default.redis5.0"
engine_version = "5.0.6"
port = 6379
subnet_group_name = aws_elasticache_subnet_group.airflow.name
安全组ID = [aws_security_group.airflow_redis.id]
tags = 合并(
{
name = "airflow-redis-server"
},
local.common_tags
)
}
资源 "aws_security_group_rule" "airflow_redis_outbound" {
类型 = "出站规则"
from_port = 0
to_port = 0
协议 = "-1"
cidr_blocks = ["0.0.0.0/0"]
security_group_id = aws_security_group.airflow_redis.id
描述 = "允许从任何地方访问"
}
全屏显示 退出全屏
同样,我们也为Airflow设置RDS。
# 安全组
resource "aws_security_group" "airflow_rds" {
lifecycle {
create_before_destroy = true
}
name_prefix = "airflow-rds-default-"
description = "允许 TLS 入站流量和所有出站流量"
vpc_id = data.aws_vpc.this.id
tags = {
Name = "airflow-rds-default"
}
}
resource "aws_security_group_rule" "airflow_rds_inbound" {
type = "ingress"
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = [data.aws_vpc.this.cidr_block]
security_group_id = aws_security_group.airflow_rds.id
description = "允许内部网络访问所有流量"
}
resource "aws_db_subnet_group" "airflow" {
name = "postgres-airflow"
subnet_ids = aws_subnet.airflow[*].id
}
resource "aws_db_instance" "airflow" {
db_name = "any db name"
apply_immediately = true
allocated_storage = "100"
storage_type = "gp3"
engine = "postgres"
engine_version = "17.2"
auto_minor_version_upgrade = true
instance_class = "db.t4g.micro"
username = "airflow"
password = data.aws_kms_secrets.airflow.plaintext["db_password"]
multi_az = false
publicly_accessible = false
deletion_protection = false
skip_final_snapshot = true
identifier = "airflow"
vpc_security_group_ids = [aws_security_group.airflow_rds.id]
db_subnet_group_name = aws_db_subnet_group.airflow.name
}
点击全屏进入,需要时退出全屏
你可以也用 terraform 创建所有这些资源吧!
使这一切成为可能的环境变量设置为了让Airflow在ECS中与CeleryExecutor一起正常工作,需要设置几个环境变量来处理日志、任务执行、数据库连接、使用Redis作为消息代理以及外部集成的需求。这些环境变量定义在这些Terraform locals中,并传递给Airflow容器。
这表示一个占位符
或
此处省略部分内容 (chǔcǐ shěnglüè bù biǎn nèiróng)
1️⃣ Airflow 核心配置
-
实例名:
"AIRFLOW__WEBSERVER__INSTANCE_NAME" = "airflow-webserver"
-
帮助识别web服务器实例。
-
执行器:
"AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor"
- 使用 CeleryExecutor 将任务分发到多个工作进程中,而不是在一个实例中顺序执行它们。
- 接下来是关于数据库连接的配置。
-
数据库连接:
-
"AIRFLOW__CORE__SQL_ALCHEMY_CONN"
-
连接至 PostgreSQL,使用存储在 AWS KMS 密钥管理服务中的凭证。
-
示例加载:
"AIRFLOW__CORE__LOAD_EXAMPLES" = "True"
这个配置项决定是否加载示例 DAG。
……此处省略若干字……
2️⃣ 日志记录配置(AWS CloudWatch & S3)
-
日志级别:
"AIRFLOW__LOGGING__LOGGING_LEVEL" = "DEBUG"
- 这会启用详细的日志记录以帮助调试。
-
远程日志记录到CloudWatch:
"AIRFLOW__LOGGING__REMOTE_LOGGING" = "True"
"AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID" = "aws_conn"
"AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER" = "s3://abc"
-
这样一来,即使容器重启,也可以访问这些日志,它们存储在 S3 和 CloudWatch 中。
-
- *
3️⃣ 芹菜 (Celery) 及 Redis 配置(消息队列和任务结果的存储)
-
Redis 消息队列:
"AIRFLOW__CELERY__BROKER_URL" = "redis://${aws_elasticache_cluster.airflow.cache_nodes[0].address}:6379/0"
- Celery 使用 Redis 用于任务队列(尚未设置,另一个待解决的问题)。
-
任务结果存储(使用 PostgreSQL):
"AIRFLOW__CELERY__RESULT_BACKEND" = "db+postgresql://airflow:${data.aws_kms_secrets.airflow.plaintext["db_password"]}@${aws_db_instance.airflow.endpoint}/airflow"
- 任务执行结果存储在 PostgreSQL 中,确保结果持久保存。
-
Celery 传输选项:
"AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT" = "1800"
(即30分钟)-
确保任务不会过早被视为失败。
-
- *
4️⃣ SMTP (用于 DAG 失败的邮件警报和通知)
-
SMTP 配置:
-
"AIRFLOW__SMTP__SMTP_HOST" = "d"
-
"AIRFLOW__SMTP__SMTP_MAIL_FROM" = "abc@email.com"
-
"AIRFLOW__SMTP__SMTP_PORT" = "587"
-
"AIRFLOW__SMTP__SMTP_SSL" = "True"
用于通过 邮件 发送失败通知。
-
- *
5 AWS相关配置
-
地区设定:
-
"AWS_DEFAULT_REGION" = local.region
-
确保Terraform和Airflow组件在正确的AWS区域中运行。
-
Fluent Bit日志记录以提高可观测性:
-
使用 Fluent Bit (
aws-for-fluent-bit:stable
) 收集日志。 -
- *
6️⃣ 外部集成,(GitHub 和 AWS密钥管理服务)
-
GitHub 连接(Airflow 提供器):
-
"AIRFLOW__PROVIDERS__GITHUB__GITHUB_CONN_ID" = "github_default"
(Airflow 连接标识) -
"AIRFLOW__PROVIDERS__GITHUB__ACCESS_TOKEN" = data.aws_kms_secrets.airflow.plaintext["github_token"]
(GitHub 访问令牌) -
使 Airflow DAG 能够与 GitHub API 进行交互。
-
- *
那些难受的事 😭
- 为Celery配置Redis 这因为网络和IAM角色问题而变得非常痛苦。
- 在处理权限的同时调试S3和CloudWatch的日志存储让人头疼。
- 管理AWS Secrets Manager和KMS解密凭据增加了复杂性。
- 根据Redis队列深度和CPU/内存使用情况自动扩展工单需要精细调整。
现在Airflow的基础架构已经基本上搭建完成了(除了Redis的麻烦 😭),是时候移动我们的DAGs了。我们不再动态挂载它们,而是将它们直接集成到Docker镜像中。这确保了每个运行Airflow调度器或工作者的容器都预装了DAGs,而无需依赖外部存储。
1️⃣ 我们是如何将DAGs打包进Docker镜像中的
在我们编写的Dockerfile中,我们通过将DAGs复制到容器中的 /dags
目录来添加DAGs。
2️⃣为什么选这种方法?
- ✅ 无需使用外部 DAG 存储(如 S3、EFS 或 Git 同步)。
- ✅ 确保版本的一致性 — DAG 是 Docker 构建过程中 的一部分,因此每次部署都会包含一个 已知 的 DAG 版本。
- ✅ 简化部署流程 — 无需在运行时复制 DAG。
3️⃣ 构建及推送Docker镜像
添加DAGs之后,我们构建及推送Docker镜像:
docker build -t airflow-custom:latest .
这将构建一个名为 airflow-custom:latest 的 Docker 镜像。
docker tag airflow-custom:latest <AWS_ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/airflow:latest
然后,我们将此镜像标记为指向 AWS ECR 的特定路径。
docker push <AWS_ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/airflow:latest
最后,我们推送镜像到指定的 AWS ECR 存储库。
请将 <AWS_ACCOUNT_ID> 和 <AWS_REGION> 替换为实际的 AWS 帐户 ID 和区域。
切换到全屏 切换回正常模式
4️⃣ 更新 ECS 以使用新镜像
由于我们将 DAGs 集成到镜像中,我们只需更新 ECS 以获取最新镜像,它们就会随之存在。(DAGs:有向无环图)
aws ecs update-service \
--cluster airflow-cluster \
--service airflow-scheduler \
--强制新部署 # 强制部署新版本
进入全屏, 退出全屏
这会触发调度器的滚动式重启,从而确保新的DAGs(有向无环图)被加载。
痛苦与接下来的一步 😭
- DAG 调试: 如果 DAG 有语法错误,ECS 会在循环中不断重启调度器 直到修复为止。
- 热加载? 编译 DAG 需要在每次 DAG 更新时都进行 重新部署——暂时可以接受,但我们可能会在之后加入 挂载卷功能 或 Git 同步 功能。
-
编译前测试 DAG: 为了避免部署出错,我们应在将 DAG 添加到镜像前进行 本地测试。
-
- *
大家注意了!最后冲刺:DAGs 来了!🏠
DAGs 现在是在容器 内部了 ,这意味着无需运行时复制,没有 DAG 丢失的问题,还 少了一件需要操心的事 ——直到下一个问题出现。🔥
现在我躺在血肉和尸体堆上休息 ⚰️这花了两天残酷的时间,但我们慢慢地关闭了每一个Glue作业,一个接一个,就像狙击手逐个击落目标。每次关机都伴随着期待的心情——Airflow能否顺利接手,还是会陷入另一个调试噩梦中呢?
随着每次转换,DAGs启动后,我们监控任务执行,并祈祷Celery不会出问题。仔细检查了日志,调整了重试,喝掉了无数杯咖啡。
到了战争的最后阶段,在这场战争结束时,最后,数字本身就说明了一切:
🚀 Glue作业费用减少了96%。
🔥 Airflow运行正常,任务在ECS集群上高效运行。
💀 Redis勉强撑过了难关,差点让我们抓狂了。
这不仅是一次迁移,更是一次部署;这是一场意志的对决,令人意想不到的是,我们最终赢了。现在,当一切尘埃落定,我喘口气,稍微休息一下——不是因为这场仗结束了,而是因为下一个战场马上就要来了。