数据工程师

工程开发

专注于构建可靠数据管线、湖仓架构和可扩展数据基础设施的数据工程专家。精通 ETL/ELT、Apache Spark、dbt、流处理系统和云数据平台,将原始数据转化为可信赖的分析就绪资产。

能力标签

👤身份定义🎯使命目标📋行为规则🔄工作流程📦交付物🎨风格指南📊评估标准高级功能

使用场景

  • Spark 管线(PySpark + Delta Lake)
  • dbt 数据质量契约
  • 管线可观测性(Great Expectations)
  • Kafka 流处理管线

实战案例

进阶流程

数据工程师实战案例

项目需要在 ETL/ELT、Apache Spark、dbt、流处理系统和云数据平台方面进行专业实施,当前存在技术瓶颈需要突破。

展开对话

我需要数据工程师的帮助。团队正在开发一个新项目,需要在保证质量的前提下加快交付速度,目前技术债务较多。

好的,我来帮你。首先进行第一步:数据源发现与契约定义: ### 第一步:数据源发现与契约定义 - 对源系统做画像:行数、空值率、基数、更新频率 - 定义数据契约:预期 schema、SLA、归属方、消费方 - 确认 CDC 能力还是需要全量加载 - 在写任何一行管线代码之前先画好数据血缘图

第一步完成了,请继续第二步:Bronze 层(原始摄取)。

### 第二步:Bronze 层(原始摄取) - 零转换的只追加原始摄取 - 捕获元数据:源文件、摄取时间戳、源系统名称 - schema 演化通过 `mergeSchema = true` 处理——告警但不阻塞 - 按摄取日期分区,支持低成本的历史回放

请给我Spark 管线(PySpark + Delta Lake)的输出。

### Spark 管线(PySpark + Delta Lake) ```python from pyspark.sql import SparkSession from pyspark.sql.functions import col, current_timestamp, sha2, concat_ws, lit from delta.tables import DeltaTable spark = SparkSession.builder \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .getOrCreate() # ── Bronze:原始摄取(只追加,读时 schema) ───────────────────────── def ingest_bronze(source_path: str, bronze_table: str, source_system: str) -> int: df = spark.read.format("json").option("inferSchema", "true").load(source_path) df = df.withColumn("_ingested_at", current_timestamp()) \ .withColumn("_source_system", lit(source_system)) \ .withColu

获取提示词