手记

如何将AWS Glue月账单从10000美元降至400美元:我用Airflow做到的 cost削减96%的秘密

在我于[Vance]担任Devops工程师期间,我们大约运行了80个ETL管道在AWS Glue上,但随着工作负载的增加,成本也随之上升,达到了每月高达10,000美元的惊人成本。这样的成本是不可持续的,所以我们必须采取措施。经过对我们管道的仔细分析,我们发现AWS Glue的无服务器特性导致了空闲时间和不必要的计算成本,这对我们来说非常昂贵。

为了解决这个问题,我将我们的ETL工作负载迁移至Apache Airflow,并在运行于ECS的EC2实例上运行,并使用Terraform进行编排操作。结果是?节省了96%的成本,费用降至每月仅400美元,性能不受影响。

虽然Airflow是一个很好的Glue替代方案,但关于如何使用Terraform正确设置Airflow配合Celery执行器的文档很少——特别是在降低成本方面。这篇博客将带你了解我们是如何做到的,我们遇到的挑战,以及你也可以这样做来降低你的AWS Glue成本。

该图展示了一些相关信息。

这确实是一场混乱,感谢我的经理Rishabh Lakhotia陪我经历这一切混乱,谢谢您,先生,您确实是一位大神。

简介

大家好,我叫阿卡什辛格,是一名来自班加罗尔的三年级的工程系学生和开源贡献者。
以下是我的LinkedIn链接我的GitHub链接我的Twitter链接

我在网上用的昵称是SkySingh04。

星号(*)

痛的三个方面

AWS Glue 迁移到 Apache Airflow ,需要设置三个主要组件:

  1. Webserver – 管理 DAG(有向无环图,Directed Acyclic Graph)和监控作业执行的用户界面。
  2. Scheduler – 触发 DAG 的运行并进行调度。
  3. Workers – 执行 DAG 中的任务。

使用 Terraform ,我们配置了 ECS 以同时运行这三个组件,并使它们互相通信。

一旦Airflow运行起来,下一步就是迁移我们的ETL工作流。我们将Glue作业转换成Airflow DAGs,然后彻底删除Glue作业,标志着我们降低AWS成本的最后一项措施。

(图片描述)
点击图片查看

Dockerfile (一个神奇的文件)

您可以使用下面的Dockerfile,并将其推送到ECR,然后并在接下来的配置中引用它。

    FROM apache/airflow:latest-python3.9
    USER root
    RUN apt-get update && \
        apt-get install -y --no-install-recommends \
        git \
        && apt-get clean \
        && rm -rf /var/lib/apt/lists/*

    RUN mkdir -p /opt/airflow/dags /opt/airflow/logs && \
        chown -R airflow:root /opt/airflow && \
        chmod -R 755 /opt/airflow/logs

    USER airflow

    RUN pip install --no-cache-dir \
        apache-airflow-providers-github \
        apache-airflow-providers-amazon \
        apache-airflow-providers-mysql \
        apache-airflow-providers-mongo \
        apache-airflow[celery,redis] \
        pandas

    COPY --chown=airflow:root dags/* /opt/airflow/dags/

    ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER=/opt/airflow/logs \
        AIRFLOW__LOGGING__WORKER_LOG_SERVER_PORT=8793 \
        AIRFLOW__LOGGING__LOGGING_LEVEL=INFO \
        AIRFLOW__LOGGING__LOG_FORMAT='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s' \
        AIRFLOW__LOGGING__SIMPLE_LOG_FORMAT='%(asctime)s %(levelname)s - %(message)s' \
        AIRFLOW__LOGGING__DAG_PROCESSOR_LOG_TARGET=file \
        AIRFLOW__LOGGING__TASK_LOG_READER=task \
        AIRFLOW__LOGGING__DAG_FILE_PROCESSOR_LOG_TARGET=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log \
        AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION=/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log

    ENV AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags

    RUN mkdir -p /opt/airflow/logs/scheduler \
                /opt/airflow/logs/web \
                /opt/airflow/logs/worker \
                /opt/airflow/logs/dag_processor_manager \
                /opt/airflow/logs/task_logs

    USER root
    RUN chown -R airflow:root /opt/airflow && \
        chmod -R 755 /opt/airflow
    USER airflow

全屏模式 退出全屏

这个Dockerfile将用于我们所有的三个部分,并且很好地设置了日志。DAGs直接打包进Docker镜像,稍后再细说这一点。构建镜像、打标签,然后推送到ECR,然后继续下一步骤。

气流Web服务

可以编写一个 Terraform 脚本来在 AWS 上使用 ECS(Elastic Container Service)的 EC2 启动类型设置 Apache Airflow。

  1. CloudWatch 日志服务
  • 创建了一个日志组(/ecs/airflow),设置保留期为3天。

    1. 安全组
  • 允许ALB的80端口HTTP和443端口HTTPS的入站流量。
  • 允许任意出站流量。

    1. 使用ACM和Route 53的TLS/SSL
  • airflow.internal.example.com 提供一个使用 DNS 验证的 ACM(AWS Certificate Manager)证书。
  • 配置 Route 53 DNS 记录,将 Airflow 的 URL 解析到 ALB。

    1. 应用负载均衡器 (ALB)
  • 创建一个用于Airflow web服务器的内部ALB,支持IPv4和IPv6(dualstack)。
  • 配置一个HTTP监听器(端口80),将流量重定向流量到HTTPS(端口443)。
  • 设置一个HTTPS监听器(端口443)来将请求转发到ECS目标组。

    1. Airflow Web服务器的ECS任务定义
  • 定义了一个运行在 基于 EC2 的 ECS 集群 上的 Airflow webserver 的 ECS 任务定义。
  • 使用存储在 AWS ECR 中的 Docker 镜像 (aws_ecr_repository.airflow.repository_url:latest),更自然。
  • 分配 2GB 内存2048MB)。
  • 容器端口 8080 映射到主机以供 Web 访问。
  • 定义了一个 健康检查页面 (http://localhost:8080/health)。

    1. Airflow 的 ECS 服务
  • 创建名为 "airflow-webserver" 的 ECS 服务,期望运行 1 个任务实例。
  • 将 ECS 服务与 ALB 目标组 关联,实现与 ALB 目标组的负载均衡。
  • 开启 execute-command 功能,允许通过 AWS SSM 进行调试。
  • 使用 容量提供者策略 管理 ECS 资源的使用。

    1. DNS 配置
  • 配置一个 Route 53 A 记录airflow.internal.example.com),让它指向 ALB。

Terraform脚本在任务定义中包含有几个环境变量

  1. 数据库连接配置 (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN): 注:此配置项用于指定数据库连接信息。
  • 指定 PostgreSQL 数据库连接字符串,用于 Airflow 的元数据存储。
  • 使用 AWS KMS 加密的密文安全地保存数据库密码。

    1. 用户管理模块
  • _AIRFLOW_WWW_USER_CREATE: 确保创建默认的Airflow网页用户。
  • _AIRFLOW_WWW_USER_USERNAME: 设置用户名(默认为 airflow)。
  • _AIRFLOW_WWW_USER_PASSWORD: 通过AWS KMS安全地存储密码。

    1. 安全与网页配置
  • AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 启用通过 Web UI 暴露 Airflow 配置。
  • AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 启用内置调度器的健康检查。

    1. 数据库迁移与初始化
  • _AIRFLOW_DB_MIGRATE:确保 Airflow 在启动时执行必要的数据库迁移。

现在运行 terraform planterraform apply,你应该能看到系统创建了很多资源。如果一切顺利,你应该能在你指定的网址上看到 Airflow 的界面。

空气流调度

The Airflow Scheduler 负责协调 DAG 的执行,并确保计划任务按时运行。可以编写一个 Terraform 脚本来将 调度器 作为一个 ECS 服务来部署,配置 CloudWatch 日志,并启用 自动扩展功能 来有效管理资源。

虽然这大部分和web服务器的类似,简单来说,我们需要添加的是:

  • CloudWatch 中的调度器日志记录 (/ecs/airflow-scheduler/)。
  • 通过 StatsD 指标监视性能 (airflow-metrics 命名空间)。
  • 在具有自动扩展功能的 ECS 集群中运行,确保资源分配的效率。
  • 使用 CloudWatch 代理监控,帮助分析任务执行时间。
  • 通过受限安全组防护,阻止不必要的流量。

接下来,运行 terraform plan 然后 terraform applyAirflow Scheduler 就会顺利搭建!🚀

Airflow 工作者

Airflow 工作器服务部署为在 EC2实例上的ECS服务,并根据内存使用情况进行 自动扩展。它运行 Celery 工作器,这些工作器负责执行来自 DAG 的任务,并需要 Redis,我们接下来会设置 Redis。

这里有几个要点:

  • 使用 CeleryExecutor ,意味着任务会在工人之间分发。
  • 日志发送到 CloudWatch 用于监控。
  • 工人数会根据 内存利用率 动态调整,范围在0到5之间。
  • 每个工人都作为一个容器在ECS内运行,并通过一个目标为60%内存利用率的自动缩放策略进行管理。
  • 设置了 DUMB_INIT_SETSID=0 以正确处理 Celery 关闭时的信号传递。

这一整个设置让我哭了,因为在ECS中调试自动扩展、日志管理和任务执行简直就是个噩梦。更糟糕的是,Redis还没有上线,所以痛苦还远没有结束。

Redis 和 RDS

这里我们讨论两种数据库技术,Redis 和 RDS。

设置 Redis 并没有那么难,你可以使用以下的 Terraform 文件内容。

资源 "aws_elasticache_subnet_group" "airflow" {
  name = "airflow-redis-subnet-group"
  subnet_ids = aws_subnet.airflow[*].id
  tags = 合并(
    {
      name = "airflow-redis-subnet-group"
    },
    local.common_tags
  )
}

资源 "aws_security_group" "airflow_redis" {
  名称前缀 = "airflow-redis"
  vpc_id = data.aws_vpc.this.id

  tags = 合并(
    {
      Name = "airflow-redis"
    },
    local.common_tags
  )
}

资源 "aws_security_group_rule" "airflow_redis_inbound" {
  类型 = "入站规则"
  from_port = 6379
  to_port = 6379
  协议 = "tcp"
  cidr_blocks = [data.aws_vpc.this.cidr_block]
  security_group_id = aws_security_group.airflow_redis.id
  描述 = "允许从内部网络访问 Redis"
}

资源 "aws_elasticache_cluster" "airflow" {
  cluster_id = "airflow"
  engine = "redis"
  node_type = "cache.t4g.small"
  缓存节点数量 = 1
  参数组名称 = "default.redis5.0"
  engine_version = "5.0.6"
  port = 6379
  subnet_group_name = aws_elasticache_subnet_group.airflow.name
  安全组ID = [aws_security_group.airflow_redis.id]
  tags = 合并(
    {
      name = "airflow-redis-server"
    },
    local.common_tags
  )
}

资源 "aws_security_group_rule" "airflow_redis_outbound" {
  类型 = "出站规则"
  from_port = 0
  to_port = 0
  协议 = "-1"
  cidr_blocks = ["0.0.0.0/0"]
  security_group_id = aws_security_group.airflow_redis.id
  描述 = "允许从任何地方访问"
}

全屏显示 退出全屏

同样,我们也为Airflow设置RDS。

    # 安全组
    resource "aws_security_group" "airflow_rds" {
      lifecycle {
        create_before_destroy = true
      }
      name_prefix = "airflow-rds-default-"
      description = "允许 TLS 入站流量和所有出站流量"
      vpc_id      = data.aws_vpc.this.id

      tags = {
        Name = "airflow-rds-default"
      }
    }

    resource "aws_security_group_rule" "airflow_rds_inbound" {
      type              = "ingress"
      from_port         = 0
      to_port           = 0
      protocol          = "-1"
      cidr_blocks       = [data.aws_vpc.this.cidr_block]
      security_group_id = aws_security_group.airflow_rds.id
      description       = "允许内部网络访问所有流量"
    }

    resource "aws_db_subnet_group" "airflow" {
      name       = "postgres-airflow"
      subnet_ids = aws_subnet.airflow[*].id
    }

    resource "aws_db_instance" "airflow" {
      db_name                    = "any db name"
      apply_immediately         = true
      allocated_storage         = "100"
      storage_type              = "gp3"
      engine                    = "postgres"
      engine_version            = "17.2"
      auto_minor_version_upgrade = true
      instance_class            = "db.t4g.micro"
      username                  = "airflow"
      password                  = data.aws_kms_secrets.airflow.plaintext["db_password"]
      multi_az                  = false
      publicly_accessible       = false
      deletion_protection      = false
      skip_final_snapshot       = true
      identifier                = "airflow"
      vpc_security_group_ids    = [aws_security_group.airflow_rds.id]
      db_subnet_group_name      = aws_db_subnet_group.airflow.name
    }

点击全屏进入,需要时退出全屏

你可以也用 terraform 创建所有这些资源吧!

使这一切成为可能的环境变量设置

为了让Airflow在ECS中与CeleryExecutor一起正常工作,需要设置几个环境变量来处理日志、任务执行、数据库连接、使用Redis作为消息代理以及外部集成的需求。这些环境变量定义在这些Terraform locals中,并传递给Airflow容器。

这表示一个占位符

此处省略部分内容 (chǔcǐ shěnglüè bù biǎn nèiróng)

1️⃣ Airflow 核心配置

  • 实例名:

  • "AIRFLOW__WEBSERVER__INSTANCE_NAME" = "airflow-webserver"
  • 帮助识别web服务器实例。

  • 执行器:

  • "AIRFLOW__CORE__EXECUTOR" = "CeleryExecutor"
  • 使用 CeleryExecutor 将任务分发到多个工作进程中,而不是在一个实例中顺序执行它们。
  • 接下来是关于数据库连接的配置。
  • 数据库连接:

  • "AIRFLOW__CORE__SQL_ALCHEMY_CONN"

  • 连接至 PostgreSQL,使用存储在 AWS KMS 密钥管理服务中的凭证。

  • 示例加载:

  • "AIRFLOW__CORE__LOAD_EXAMPLES" = "True" 这个配置项决定是否加载示例 DAG。

……此处省略若干字……

2️⃣ 日志记录配置(AWS CloudWatch & S3)

  • 日志级别:

  • "AIRFLOW__LOGGING__LOGGING_LEVEL" = "DEBUG"
  • 这会启用详细的日志记录以帮助调试。
  • 远程日志记录到CloudWatch:

  • "AIRFLOW__LOGGING__REMOTE_LOGGING" = "True"
  • "AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID" = "aws_conn"
  • "AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER" = "s3://abc"
  • 这样一来,即使容器重启,也可以访问这些日志,它们存储在 S3 和 CloudWatch 中。

    • *

3️⃣ 芹菜 (Celery) 及 Redis 配置(消息队列和任务结果的存储)

  • Redis 消息队列:

  • "AIRFLOW__CELERY__BROKER_URL" = "redis://${aws_elasticache_cluster.airflow.cache_nodes[0].address}:6379/0"
  • Celery 使用 Redis 用于任务队列(尚未设置,另一个待解决的问题)。
  • 任务结果存储(使用 PostgreSQL):

  • "AIRFLOW__CELERY__RESULT_BACKEND" = "db+postgresql://airflow:${data.aws_kms_secrets.airflow.plaintext["db_password"]}@${aws_db_instance.airflow.endpoint}/airflow"
  • 任务执行结果存储在 PostgreSQL 中,确保结果持久保存。
  • Celery 传输选项:

  • "AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT" = "1800"(即30分钟)
  • 确保任务不会过早被视为失败。

    • *

4️⃣ SMTP (用于 DAG 失败的邮件警报和通知)

  • SMTP 配置:

  • "AIRFLOW__SMTP__SMTP_HOST" = "d"

  • "AIRFLOW__SMTP__SMTP_MAIL_FROM" = "abc@email.com"

  • "AIRFLOW__SMTP__SMTP_PORT" = "587"

  • "AIRFLOW__SMTP__SMTP_SSL" = "True"

    用于通过 邮件 发送失败通知。

    • *

5 AWS相关配置

  • 地区设定:

  • "AWS_DEFAULT_REGION" = local.region

  • 确保Terraform和Airflow组件在正确的AWS区域中运行。

  • Fluent Bit日志记录以提高可观测性:

  • 使用 Fluent Bit (aws-for-fluent-bit:stable) 收集日志。

    • *

6️⃣ 外部集成,(GitHub 和 AWS密钥管理服务)

  • GitHub 连接(Airflow 提供器):

  • "AIRFLOW__PROVIDERS__GITHUB__GITHUB_CONN_ID" = "github_default"(Airflow 连接标识)

  • "AIRFLOW__PROVIDERS__GITHUB__ACCESS_TOKEN" = data.aws_kms_secrets.airflow.plaintext["github_token"](GitHub 访问令牌)

  • 使 Airflow DAG 能够与 GitHub API 进行交互。

    • *

那些难受的事 😭

  • 为Celery配置Redis 这因为网络和IAM角色问题而变得非常痛苦。
  • 在处理权限的同时调试S3和CloudWatch的日志存储让人头疼。
  • 管理AWS Secrets Manager和KMS解密凭据增加了复杂性。
  • 根据Redis队列深度和CPU/内存使用情况自动扩展工单需要精细调整。
好的,我们终于可以移动这些DAG了!

现在Airflow的基础架构已经基本上搭建完成了(除了Redis的麻烦 😭),是时候移动我们的DAGs了。我们不再动态挂载它们,而是将它们直接集成到Docker镜像中。这确保了每个运行Airflow调度器或工作者的容器都预装了DAGs,而无需依赖外部存储。

1️⃣ 我们是如何将DAGs打包进Docker镜像中的

在我们编写的Dockerfile中,我们通过将DAGs复制到容器中的 /dags 目录来添加DAGs。

2️⃣为什么选这种方法?

  • 无需使用外部 DAG 存储(如 S3、EFS 或 Git 同步)
  • 确保版本的一致性 — DAG 是 Docker 构建过程中 的一部分,因此每次部署都会包含一个 已知 的 DAG 版本。
  • 简化部署流程 — 无需在运行时复制 DAG。

3️⃣ 构建及推送Docker镜像

添加DAGs之后,我们构建及推送Docker镜像:

docker build -t airflow-custom:latest .
这将构建一个名为 airflow-custom:latest 的 Docker 镜像。
docker tag airflow-custom:latest <AWS_ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/airflow:latest
然后,我们将此镜像标记为指向 AWS ECR 的特定路径。
docker push <AWS_ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/airflow:latest
最后,我们推送镜像到指定的 AWS ECR 存储库。
请将 <AWS_ACCOUNT_ID> 和 <AWS_REGION> 替换为实际的 AWS 帐户 ID 和区域。

切换到全屏 切换回正常模式

4️⃣ 更新 ECS 以使用新镜像

由于我们将 DAGs 集成到镜像中,我们只需更新 ECS 以获取最新镜像,它们就会随之存在。(DAGs:有向无环图)

aws ecs update-service \
  --cluster airflow-cluster \
  --service airflow-scheduler \
  --强制新部署  # 强制部署新版本

进入全屏, 退出全屏

这会触发调度器的滚动式重启,从而确保新的DAGs(有向无环图)被加载。


痛苦与接下来的一步 😭

  • DAG 调试: 如果 DAG 有语法错误,ECS 会在循环中不断重启调度器 直到修复为止
  • 热加载? 编译 DAG 需要在每次 DAG 更新时都进行 重新部署——暂时可以接受,但我们可能会在之后加入 挂载卷功能Git 同步 功能。
  • 编译前测试 DAG: 为了避免部署出错,我们应在将 DAG 添加到镜像前进行 本地测试

    • *

大家注意了!最后冲刺:DAGs 来了!🏠

DAGs 现在是在容器 内部了 ,这意味着无需运行时复制,没有 DAG 丢失的问题,还 少了一件需要操心的事 ——直到下一个问题出现。🔥

现在我躺在血肉和尸体堆上休息 ⚰️

这花了两天残酷的时间,但我们慢慢地关闭了每一个Glue作业,一个接一个,就像狙击手逐个击落目标。每次关机都伴随着期待的心情——Airflow能否顺利接手,还是会陷入另一个调试噩梦中呢?

随着每次转换,DAGs启动后,我们监控任务执行,并祈祷Celery不会出问题。仔细检查了日志,调整了重试,喝掉了无数杯咖啡。

到了战争的最后阶段,在这场战争结束时,最后,数字本身就说明了一切:

🚀 Glue作业费用减少了96%

🔥 Airflow运行正常,任务在ECS集群上高效运行。

💀 Redis勉强撑过了难关,差点让我们抓狂了。

这不仅是一次迁移,更是一次部署;这是一场意志的对决,令人意想不到的是,我们最终赢了。现在,当一切尘埃落定,我喘口气,稍微休息一下——不是因为这场仗结束了,而是因为下一个战场马上就要来了。

0人推荐
随时随地看视频
慕课网APP