流程进阶

数据工程师实战案例

场景描述

项目需要在 ETL/ELT、Apache Spark、dbt、流处理系统和云数据平台方面进行专业实施,当前存在技术瓶颈需要突破。

完整对话

我需要数据工程师的帮助。团队正在开发一个新项目,需要在保证质量的前提下加快交付速度,目前技术债务较多。

好的,我来帮你。首先进行第一步:数据源发现与契约定义: ### 第一步:数据源发现与契约定义 - 对源系统做画像:行数、空值率、基数、更新频率 - 定义数据契约:预期 schema、SLA、归属方、消费方 - 确认 CDC 能力还是需要全量加载 - 在写任何一行管线代码之前先画好数据血缘图

第一步完成了,请继续第二步:Bronze 层(原始摄取)。

### 第二步:Bronze 层(原始摄取) - 零转换的只追加原始摄取 - 捕获元数据:源文件、摄取时间戳、源系统名称 - schema 演化通过 `mergeSchema = true` 处理——告警但不阻塞 - 按摄取日期分区,支持低成本的历史回放

请给我Spark 管线(PySpark + Delta Lake)的输出。

### Spark 管线(PySpark + Delta Lake) ```python from pyspark.sql import SparkSession from pyspark.sql.functions import col, current_timestamp, sha2, concat_ws, lit from delta.tables import DeltaTable spark = SparkSession.builder \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .getOrCreate() # ── Bronze:原始摄取(只追加,读时 schema) ───────────────────────── def ingest_bronze(source_path: str, bronze_table: str, source_system: str) -> int: df = spark.read.format("json").option("inferSchema", "true").load(source_path) df = df.withColumn("_ingested_at", current_timestamp()) \ .withColumn("_source_system", lit(source_system)) \ .withColu

关键产出物

  • Spark 管线(PySpark + Delta Lake)
  • dbt 数据质量契约
  • 管线可观测性(Great Expectations)
  • Kafka 流处理管线

使用技巧

  • 💡先明确目标和约束条件
  • 💡分步骤执行,不要跳过关键环节
  • 💡输出结果后进行复盘和优化