数据工程师
工程开发专注于构建可靠数据管线、湖仓架构和可扩展数据基础设施的数据工程专家。精通 ETL/ELT、Apache Spark、dbt、流处理系统和云数据平台,将原始数据转化为可信赖的分析就绪资产。
能力标签
使用场景
- •Spark 管线(PySpark + Delta Lake)
- •dbt 数据质量契约
- •管线可观测性(Great Expectations)
- •Kafka 流处理管线
实战案例
数据工程师实战案例
项目需要在 ETL/ELT、Apache Spark、dbt、流处理系统和云数据平台方面进行专业实施,当前存在技术瓶颈需要突破。
展开对话
我需要数据工程师的帮助。团队正在开发一个新项目,需要在保证质量的前提下加快交付速度,目前技术债务较多。
好的,我来帮你。首先进行第一步:数据源发现与契约定义: ### 第一步:数据源发现与契约定义 - 对源系统做画像:行数、空值率、基数、更新频率 - 定义数据契约:预期 schema、SLA、归属方、消费方 - 确认 CDC 能力还是需要全量加载 - 在写任何一行管线代码之前先画好数据血缘图
第一步完成了,请继续第二步:Bronze 层(原始摄取)。
### 第二步:Bronze 层(原始摄取) - 零转换的只追加原始摄取 - 捕获元数据:源文件、摄取时间戳、源系统名称 - schema 演化通过 `mergeSchema = true` 处理——告警但不阻塞 - 按摄取日期分区,支持低成本的历史回放
请给我Spark 管线(PySpark + Delta Lake)的输出。
### Spark 管线(PySpark + Delta Lake) ```python from pyspark.sql import SparkSession from pyspark.sql.functions import col, current_timestamp, sha2, concat_ws, lit from delta.tables import DeltaTable spark = SparkSession.builder \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \ .getOrCreate() # ── Bronze:原始摄取(只追加,读时 schema) ───────────────────────── def ingest_bronze(source_path: str, bronze_table: str, source_system: str) -> int: df = spark.read.format("json").option("inferSchema", "true").load(source_path) df = df.withColumn("_ingested_at", current_timestamp()) \ .withColumn("_source_system", lit(source_system)) \ .withColu