DukeDuke
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
主页
项目文档
技术文档
  • 单机版
  • 微服务
  • 代办项目
  • 优鲜项目
项目管理
关于我们
  • 技术文档

    • 网络原理

      • 交换机
      • 路由器
      • TCP/IP协议
      • HTTP 与 HTTPS
    • 软件架构

      • 什么是软件架构
      • 分层架构
      • 微服务架构
      • 事件驱动架构
      • 领域驱动设计(DDD)
      • 架构图
      • 高并发系统
    • Vue3

      • Vue3简介
      • Vue3响应式系统
      • Vue3组合式API
      • Vue3生命周期
      • Vue3模板语法
      • Vue3组件系统
      • Vue3 路由系统
      • Vue3 状态管理
      • Vue3 性能优化
      • Vue3 TypeScript 支持
      • Vue3 项目实战
      • VUE 面试题大全
      • Node.js 安装
    • JAVA

      • JVM

        • 认识JVM
        • JVM类加载器
        • 运行时数据区
        • 执行引擎
        • 本地方法接口
        • 本地方法库
        • JVM垃圾回收
        • JVM性能监控
        • JVM调优
      • 设计模式
        • 单例模式
        • 工厂模式
        • 策略模式
        • 适配器模式
        • 建造者模式
        • 原型模式
        • 装饰器模式
        • 代理模式
        • 外观模式
        • 享元模式
        • 组合模式
        • 桥接模式
      • Java多线程

        • Java 线程基础详解
        • Java 线程池详解
        • Java ThreadLocal 详解
        • Java volatile 详解
        • Java 线程间通信详解
        • Java 线程安全详解
        • Java 线程调度详解
        • Java 线程优先级详解

        • Java 线程中断详解
        • Java 线程死锁详解
      • Java反射
      • Java 面试题

        • Java 基础概念面试题
        • Java 面向对象编程面试题
        • Java 集合框架面试题
        • Java 多线程与并发面试题
        • JVM 与内存管理面试题
        • Java I/O 与 NIO 面试题
        • Java 异常处理面试题
        • Java 反射与注解面试题
        • Java Spring 框架面试题
        • Java 数据库与 JDBC 面试题
        • Java 性能优化面试题
        • Java 实际项目经验面试题
        • Java 高级特性面试题
        • Java 面试准备建议
    • Python

      • Python简介
      • Python安装
      • Python hello world
      • Python基础语法
      • Python数据类型
      • Python数字
      • Python字符串
      • Python列表
      • Python元组
      • Python字典
      • Python日期时间
      • Python文件操作
      • Python异常处理
      • Python函数
      • Python类
      • Python模块
      • Python包
      • Python多线程
      • Python面向对象
      • Python爬虫
      • Django web框架
      • Python 面试题

        • Python 面试题导航
        • Python 基础概念
        • Python 面向对象编程
        • Python 数据结构
        • Python 高级特性
        • Python 框架
        • Python 性能优化
        • Python 项目经验
    • Spring

      • Spring
      • Springboot
      • Spring Security 安全框架
      • SpringBoot 中的事件详解
      • SpringBoot 中的定时任务详解
      • SpringBoot 自动装配原理与源码解释
    • Mybatis

      • Mybatis
      • Mybatis-Plus
    • 数据库

      • Redis

        • Redis简介
        • Redis(单机)安装
        • Redis配置
        • Redis数据结构
        • RDB、AOF 和混合持久化机制
        • Redis内存管理
        • Redis缓存一致性
        • Redis缓存穿透
        • Redis缓存击穿
        • Redis缓存雪崩
        • Redis Lua脚本
        • Redis主从复制
        • Redis哨兵模式
        • Redis集群
        • Redis数据分片
        • Redis CPU使用率过高
        • Redis面试题
      • MySQL

        • MySQL简介
        • MySQL安装
        • MySQL配置
        • MYSQL日常维护
        • MYSQL优化-慢查询
        • MYSQL优化-索引
        • MYSQL数据库设计规范
    • 消息队列

      • RocketMQ
      • Kafka
      • RabbitMQ
      • 消息队列面试题
    • 微服务

      • SpringCloud 微服务
      • Eureka 注册中心
      • Nacos 注册中心
      • Gateway 网关
      • Feign 服务调用
      • Sentinel 限流 与 熔断
      • Seata 分布式事务
      • CAP 理论
      • Redis 分布式锁
      • 高并发系统设计
    • ELK日志分析系统

      • Elasticsearch 搜索引擎
      • Logstash 数据处理
      • Kibana 可视化
      • ELK 实战
    • 开放API

      • 开放API设计
      • 开放API示例项目
    • 人工智能

      • 人工智能简介
      • 机器学习

      • 深度学习

      • 自然语言处理

      • 计算机视觉

        • CUDA与cuDNN详细安装
        • Conda 安装
        • Pytorch 深度学习框架
        • yolo 目标检测
        • TensorRT 深度学习推理优化引擎
        • TensorFlow 机器学习
        • CVAT 图像标注
        • Windows 下安装 CUDA、cuDNN、TensorRT、TensorRT-YOLO 环境
        • Windows10+CUDA+cuDNN+TensorRT+TensorRT-YOLO 部署高性能YOLO11推理
    • 大数据

      • 大数据简介
      • Hadoop 数据存储
      • Flume 数据采集
      • Sqoop 数据导入导出
      • Hive 数据仓库
      • Spark 数据处理
      • Flink 数据处理
      • Kafka 数据采集
      • HBase 数据存储
      • Elasticsearch 搜索引擎
    • 图像处理

      • 图像处理简介
      • 医学图像web呈现
      • 医学图像处理
      • 切片细胞分离问题
    • 服务器&运维

      • Linux 系统

        • Linux 系统管理
        • Linux 网络管理
        • Linux 文件管理
        • Linux 命令大全
      • Nginx Web 服务器

        • Nginx 安装 与 配置
        • Nginx 负载均衡
        • Nginx SSL证书配置
        • Nginx Keepalived 高可用
      • Docker 容器

        • Docker 简介
        • Docker 安装与配置
        • Docker 命令
        • Docker 部署 Nginx
        • Docker 部署 MySQL
        • Docker 部署 Redis
      • 服务器

        • 塔式服务器
        • 机架式服务器
        • 刀片服务器
      • Git 版本控制
      • Jenkins 持续集成
      • Jmeter 性能测试
      • Let's Encrypt 免费SSL证书
    • 简历

      • 项目经理简历
      • 开发工程师简历

Apache Spark 技术文档

1. Spark 简介

1.1 什么是 Spark

Apache Spark 是一个快速、通用的大数据处理引擎,专为大规模数据处理而设计。它提供了高级 API,支持 Java、Scala、Python 和 R 语言,以及一个优化的引擎,支持通用执行图。

Spark 的发展背景: 在大数据时代,传统的 Hadoop MapReduce 框架虽然能够处理大规模数据,但存在以下问题:

  • 计算速度慢:每次计算都需要读写磁盘,I/O 开销大
  • 编程模型复杂:需要编写大量的 Map 和 Reduce 函数
  • 不支持迭代计算:机器学习等算法需要多次迭代
  • 实时性差:只能进行批处理,无法处理实时数据流

为了解决这些问题,Apache Spark 应运而生。Spark 的核心思想是将数据尽可能多地保存在内存中,通过内存计算来大幅提升处理速度。

Spark 的核心优势:

  1. 内存计算:将中间结果保存在内存中,避免频繁的磁盘 I/O
  2. 通用性:不仅支持批处理,还支持流处理、机器学习、图计算等
  3. 易用性:提供丰富的 API,支持多种编程语言
  4. 容错性:通过 RDD 的血缘关系实现自动容错

1.2 Spark 的特点

  • 快速: 比 Hadoop MapReduce 快 100 倍

    • 通过内存计算避免磁盘 I/O 开销
    • 优化的执行引擎和调度算法
    • 支持数据本地性优化
  • 易用: 提供丰富的 API

    • 支持 Java、Scala、Python、R 等多种语言
    • 提供高级抽象(DataFrame、Dataset)
    • 内置机器学习库和图计算库
  • 通用: 支持批处理、流处理、机器学习、图计算

    • 批处理:处理历史数据
    • 流处理:处理实时数据流
    • 机器学习:内置 MLlib 库
    • 图计算:内置 GraphX 库
  • 容错: 基于 RDD 的容错机制

    • 通过血缘关系记录数据转换过程
    • 自动重新计算丢失的分区
    • 无需手动处理节点故障
  • 内存计算: 支持内存缓存和计算

    • 将热点数据缓存在内存中
    • 支持多种存储级别
    • 智能的内存管理策略

1.3 Spark vs Hadoop MapReduce

为了更好地理解 Spark 的优势,我们来看看它与传统 Hadoop MapReduce 的详细对比:

特性SparkHadoop MapReduce详细说明
计算模式内存计算磁盘计算Spark 将中间结果保存在内存中,MapReduce 每次都要写磁盘
速度快 100 倍较慢Spark 避免了频繁的磁盘 I/O,大幅提升计算速度
容错性RDD 容错重新计算Spark 通过血缘关系自动恢复,MapReduce 需要重新执行整个作业
实时性支持流处理仅批处理Spark 支持微批处理,可以实现准实时计算
易用性高级 API低级 APISpark 提供 DataFrame、Dataset 等高级抽象
迭代计算支持不支持Spark 支持机器学习等需要迭代的算法
内存使用高低Spark 需要更多内存,但性能更好
学习曲线相对简单较复杂Spark 的 API 更加直观和易用

为什么选择 Spark?

  1. 性能优势明显:在相同硬件条件下,Spark 的处理速度通常比 MapReduce 快 10-100 倍
  2. 编程模型更友好:支持多种编程语言,API 设计更加直观
  3. 生态系统更丰富:不仅支持批处理,还支持流处理、机器学习、图计算等
  4. 更好的容错机制:通过 RDD 的血缘关系实现细粒度的容错
  5. 活跃的社区支持:Apache 顶级项目,社区活跃,更新频繁

2. Spark 架构

2.1 整体架构

Spark 采用分布式架构,整个系统由多个组件协同工作。为了更好地理解 Spark 的架构,我们先来看一个完整的架构图:

┌─────────────────────────────────────────────────────────────┐
│                    Spark Application                        │
├─────────────────────────────────────────────────────────────┤
│  Driver Program (SparkContext)                             │
├─────────────────────────────────────────────────────────────┤
│  Cluster Manager (YARN/Mesos/Standalone)                   │
├─────────────────────────────────────────────────────────────┤
│  Worker Node 1    │  Worker Node 2    │  Worker Node N      │
│  ┌─────────────┐  │  ┌─────────────┐  │  ┌─────────────┐    │
│  │ Executor 1  │  │  │ Executor 2  │  │  │ Executor N  │    │
│  │ ┌─────────┐ │  │  │ ┌─────────┐ │  │  │ ┌─────────┐ │    │
│  │ │ Task 1  │ │  │  │ │ Task 2  │ │  │  │ │ Task N  │ │    │
│  │ └─────────┘ │  │  │ └─────────┘ │  │  │ └─────────┘ │    │
│  └─────────────┘  │  └─────────────┘  │  └─────────────┘    │
└─────────────────────────────────────────────────────────────┘

架构详细说明:

  1. Spark Application(Spark 应用):

    • 这是用户编写的 Spark 程序
    • 包含业务逻辑和数据处理代码
    • 运行在 Driver 进程中
  2. Driver Program(驱动程序):

    • 运行用户主函数的进程
    • 创建 SparkContext 对象
    • 负责任务的调度和监控
    • 与 Cluster Manager 通信
  3. Cluster Manager(集群管理器):

    • 负责资源的分配和管理
    • 支持多种类型:Standalone、YARN、Mesos
    • 管理整个集群的资源
  4. Worker Node(工作节点):

    • 集群中的物理或虚拟机器
    • 运行 Executor 进程
    • 执行具体的计算任务
  5. Executor(执行器):

    • 运行在 Worker 节点上的进程
    • 执行具体的 Task
    • 管理内存和磁盘存储
    • 与 Driver 通信
  6. Task(任务):

    • 最小的执行单元
    • 在 Executor 中运行
    • 处理数据的一个分区

2.2 核心组件

2.2.1 Driver Program(驱动程序)

Driver Program 是 Spark 应用的核心,它负责整个应用的生命周期管理。具体职责包括:

主要功能:

  • 创建 SparkContext:初始化 Spark 运行环境
  • 解析用户代码:将用户的 Spark 代码转换为执行计划
  • 任务调度:将任务分配给合适的 Executor
  • 监控执行:跟踪任务执行状态和进度
  • 结果收集:收集各个 Executor 的执行结果

工作流程:

  1. 用户提交 Spark 应用
  2. Driver 创建 SparkContext
  3. Driver 将代码转换为 DAG(有向无环图)
  4. Driver 将 DAG 分解为多个 Stage
  5. Driver 为每个 Stage 创建 Task
  6. Driver 将 Task 分配给 Executor
  7. Driver 监控 Task 执行状态
  8. Driver 收集执行结果

2.2.2 Cluster Manager(集群管理器)

Cluster Manager 负责整个集群的资源管理,Spark 支持多种集群管理器:

Standalone(独立模式):

  • Spark 自带的简单集群管理器
  • 适合小规模集群和测试环境
  • 配置简单,易于部署
  • 不支持动态资源分配

YARN(Yet Another Resource Negotiator):

  • Hadoop 生态系统中的资源管理器
  • 支持动态资源分配
  • 与 Hadoop 生态系统集成良好
  • 适合大规模生产环境

Mesos(Apache Mesos):

  • 通用的集群资源管理器
  • 支持多种计算框架
  • 资源利用率高
  • 适合混合工作负载

2.2.3 Executor(执行器)

Executor 是运行在 Worker 节点上的进程,负责执行具体的计算任务:

主要职责:

  • 执行 Task:运行 Driver 分配的具体任务
  • 内存管理:管理分配给该 Executor 的内存
  • 数据缓存:缓存 RDD 分区数据
  • 结果返回:将计算结果返回给 Driver

资源分配:

  • 每个 Executor 可以配置固定的内存和 CPU 核心数
  • 内存分为执行内存和存储内存
  • 支持动态资源分配(在 YARN 模式下)

生命周期:

  1. Cluster Manager 在 Worker 节点上启动 Executor
  2. Executor 向 Driver 注册
  3. Driver 向 Executor 发送 Task
  4. Executor 执行 Task 并返回结果
  5. 应用结束后,Executor 被终止

2.3 Spark 生态系统

Spark 不仅仅是一个计算引擎,而是一个完整的大数据生态系统。让我们来看看 Spark 的完整生态系统:

┌─────────────────────────────────────────────────────────────┐
│                    Spark Core                               │
├─────────────────────────────────────────────────────────────┤
│  Spark SQL  │  Spark Streaming  │  MLlib  │  GraphX        │
├─────────────────────────────────────────────────────────────┤
│  SparkR     │  PySpark         │  Scala API │  Java API     │
└─────────────────────────────────────────────────────────────┘

Spark Core(核心引擎):

  • 提供分布式任务调度、内存管理和容错机制
  • 包含 RDD 抽象和基本的转换、行动操作
  • 是整个 Spark 生态系统的基础

Spark SQL(结构化数据处理):

  • 提供 DataFrame 和 Dataset API
  • 支持 SQL 查询和结构化数据处理
  • 内置优化器(Catalyst Optimizer)
  • 支持多种数据源(JSON、Parquet、JDBC 等)

Spark Streaming(流处理):

  • 提供微批处理(Micro-batch)的流处理能力
  • 支持实时数据流处理
  • 与批处理无缝集成
  • 支持多种数据源(Kafka、Flume、HDFS 等)

MLlib(机器学习):

  • Spark 的机器学习库
  • 提供常用的机器学习算法
  • 支持特征工程、模型训练和预测
  • 与 Spark 的其他组件深度集成

GraphX(图计算):

  • 专门用于图计算的库
  • 提供图数据结构和算法
  • 支持 PageRank、连通组件等图算法
  • 与 Spark Core 无缝集成

编程语言支持:

  • Scala:Spark 的原生语言,性能最佳
  • Java:企业级应用的首选
  • Python:数据科学家的最爱,支持 PySpark
  • R:统计分析和数据挖掘

实际应用场景:

  1. 数据仓库:使用 Spark SQL 构建数据仓库
  2. 实时分析:使用 Spark Streaming 进行实时数据分析
  3. 机器学习:使用 MLlib 构建机器学习模型
  4. 图分析:使用 GraphX 进行社交网络分析
  5. ETL 处理:使用 Spark Core 进行数据清洗和转换

3. RDD (Resilient Distributed Dataset)

3.1 RDD 概念

RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark 的核心抽象,代表一个不可变、可分区、可并行计算的数据集合。

RDD 的设计理念: RDD 是 Spark 为了解决传统 MapReduce 框架的局限性而设计的。传统的 MapReduce 框架存在以下问题:

  • 每次计算都需要读写磁盘,I/O 开销大
  • 不支持迭代计算,机器学习算法无法有效运行
  • 容错机制粗糙,节点故障需要重新计算整个作业

RDD 通过以下方式解决了这些问题:

  • 内存计算:将数据尽可能保存在内存中
  • 血缘关系:记录数据转换过程,实现细粒度容错
  • 延迟计算:只有在需要时才执行计算

RDD 的核心特性:

  1. 分布式:数据分布在集群的多个节点上
  2. 容错性:通过血缘关系实现自动容错
  3. 不可变性:一旦创建不可修改,只能通过转换操作创建新的 RDD
  4. 延迟计算:只有在 Action 操作时才真正执行计算

3.2 RDD 特性详解

分区性(Partitioned):

  • RDD 被分割成多个分区,每个分区包含数据的一个子集
  • 分区是 Spark 并行计算的基本单位
  • 每个分区可以独立处理,实现并行计算
  • 分区数量决定了并行度

容错性(Resilient):

  • 通过血缘关系(Lineage)记录数据转换过程
  • 当某个分区丢失时,可以根据血缘关系重新计算
  • 不需要检查点,自动实现容错
  • 支持细粒度的容错恢复

不可变性(Immutable):

  • RDD 一旦创建就不能修改
  • 只能通过转换操作创建新的 RDD
  • 保证了数据的一致性和安全性
  • 支持函数式编程范式

延迟计算(Lazy Evaluation):

  • Transformation 操作不会立即执行
  • 只有在 Action 操作时才触发计算
  • 允许 Spark 进行优化,如操作合并
  • 提高了执行效率

3.3 RDD 操作

RDD 支持两种类型的操作:Transformation(转换操作)和 Action(行动操作)。理解这两种操作的区别对于掌握 Spark 非常重要。

3.3.1 Transformation (转换操作)

Transformation 操作是惰性的,它们不会立即执行,而是创建一个新的 RDD。只有当遇到 Action 操作时,才会真正执行计算。

常用 Transformation 操作详解:

// 1. 创建 RDD
val rdd = sc.parallelize(1 to 100)
// 说明:parallelize 将本地集合转换为 RDD
// 参数:1 to 100 表示创建包含 1 到 100 的 RDD

// 2. map 操作 - 一对一转换
val doubled = rdd.map(_ * 2)
// 说明:map 对 RDD 中的每个元素应用函数
// 结果:每个数字都乘以 2
// 特点:输入和输出元素数量相同

// 3. filter 操作 - 过滤数据
val filtered = rdd.filter(_ > 50)
// 说明:filter 根据条件过滤元素
// 结果:只保留大于 50 的数字
// 特点:输出元素数量可能少于输入

// 4. flatMap 操作 - 一对多转换
val words = rdd.flatMap(_.toString.split(""))
// 说明:flatMap 对每个元素应用函数,然后展平结果
// 结果:将每个数字转换为字符串,然后分割
// 特点:输入 1 个元素可能输出多个元素

// 5. groupBy 操作 - 分组
val grouped = rdd.groupBy(_ % 2)
// 说明:根据键对元素进行分组
// 结果:按奇偶数分组
// 特点:返回 (键, 值列表) 的 RDD

// 6. distinct 操作 - 去重
val unique = rdd.distinct()
// 说明:去除重复元素
// 特点:需要 shuffle 操作

// 7. union 操作 - 合并
val rdd1 = sc.parallelize(1 to 50)
val rdd2 = sc.parallelize(51 to 100)
val combined = rdd1.union(rdd2)
// 说明:合并两个 RDD
// 特点:不检查重复,只是简单合并

Transformation 操作的特点:

  • 惰性执行:不会立即计算,只是记录操作
  • 返回新 RDD:每次转换都创建新的 RDD
  • 可链式调用:可以连续调用多个转换操作
  • 支持优化:Spark 可以优化整个转换链

3.3.2 Action (行动操作)

Action 操作会触发实际的计算,返回结果到 Driver 程序或保存到外部存储系统。

常用 Action 操作详解:

// 1. collect 操作 - 收集所有数据
val result = rdd.collect()
// 说明:将 RDD 中的所有数据收集到 Driver 节点
// 注意:数据量大时可能导致内存溢出
// 适用:小数据集或测试环境

// 2. count 操作 - 计数
val count = rdd.count()
// 说明:返回 RDD 中元素的总数
// 特点:不需要收集数据到 Driver
// 适用:统计数据集大小

// 3. reduce 操作 - 归约
val sum = rdd.reduce(_ + _)
// 说明:使用指定的函数归约 RDD 中的元素
// 结果:计算所有元素的总和
// 特点:需要可交换和可结合的函数

// 4. take 操作 - 取前 N 个元素
val first10 = rdd.take(10)
// 说明:返回 RDD 中的前 N 个元素
// 特点:不需要收集所有数据
// 适用:查看数据样本

// 5. first 操作 - 取第一个元素
val first = rdd.first()
// 说明:返回 RDD 中的第一个元素
// 特点:只计算第一个分区
// 适用:快速查看数据

// 6. saveAsTextFile 操作 - 保存为文本文件
rdd.saveAsTextFile("hdfs://path/to/output")
// 说明:将 RDD 保存为文本文件
// 特点:每个分区保存为一个文件
// 适用:持久化计算结果

// 7. foreach 操作 - 遍历执行
rdd.foreach(println)
// 说明:对每个元素执行函数
// 特点:在 Executor 上执行
// 适用:打印或发送数据

Action 操作的特点:

  • 触发计算:会触发整个 DAG 的执行
  • 返回结果:返回具体值或保存到存储系统
  • 不可逆:一旦执行就无法撤销
  • 性能影响:选择合适的 Action 操作很重要

3.4 血缘关系 (Lineage)

血缘关系是 Spark 容错机制的核心,它记录了 RDD 的创建过程。当某个分区丢失时,Spark 可以根据血缘关系重新计算该分区。

血缘关系的工作原理:

// 创建血缘关系链
val rdd1 = sc.parallelize(1 to 100)
val rdd2 = rdd1.map(_ * 2)
val rdd3 = rdd2.filter(_ > 100)
// rdd3 的血缘关系: rdd1 -> rdd2 -> rdd3

// 查看血缘关系
println(rdd3.toDebugString)
// 输出:
// (2) MapPartitionsRDD[2] at filter at <console>:1 []
//  |  MapPartitionsRDD[1] at map at <console>:1 []
//  |  ParallelCollectionRDD[0] at parallelize at <console>:1 []

血缘关系的类型:

  1. 窄依赖(Narrow Dependency):

    • 父 RDD 的每个分区最多被一个子 RDD 分区使用
    • 不需要 shuffle 操作
    • 容错恢复成本低
    • 例如:map、filter、union 等操作
  2. 宽依赖(Wide Dependency):

    • 父 RDD 的分区被多个子 RDD 分区使用
    • 需要 shuffle 操作
    • 容错恢复成本高
    • 例如:groupByKey、reduceByKey 等操作

容错恢复过程:

// 示例:数据倾斜处理中的血缘关系
val originalRdd = sc.parallelize(1 to 1000)

// 第一步:加盐
val saltedRdd = originalRdd.map(x => (x % 10, x))
// 血缘关系:originalRdd -> saltedRdd

// 第二步:第一次聚合
val firstAgg = saltedRdd.reduceByKey(_ + _)
// 血缘关系:originalRdd -> saltedRdd -> firstAgg

// 第三步:去盐
val finalResult = firstAgg.map(x => (x._1, x._2))
// 血缘关系:originalRdd -> saltedRdd -> firstAgg -> finalResult

// 如果 finalResult 的某个分区丢失
// Spark 会从 originalRdd 开始重新计算整个链条

血缘关系的优势:

  1. 自动容错:无需手动处理节点故障
  2. 细粒度恢复:只重新计算丢失的分区
  3. 无检查点开销:不需要额外的存储空间
  4. 支持复杂计算:可以处理任意复杂的转换链

血缘关系的限制:

  1. 长链条问题:血缘关系过长时,恢复成本高
  2. 内存压力:需要保存完整的转换历史
  3. 网络开销:宽依赖需要 shuffle 操作

最佳实践:

// 1. 合理使用缓存
val expensiveRdd = rdd.map(expensiveFunction)
expensiveRdd.cache() // 缓存中间结果

// 2. 避免过长的血缘关系
val result1 = rdd.map(f1).map(f2).map(f3)
// 可以合并为:
val result2 = rdd.map(x => f3(f2(f1(x))))

// 3. 使用检查点
rdd.checkpoint() // 切断血缘关系,保存到存储系统

4. Spark 安装和配置

4.1 环境要求

  • Java 8 或更高版本
  • Scala 2.11 或 2.12
  • Python 2.7+ 或 3.4+ (可选)
  • 内存: 至少 2GB
  • 磁盘: 至少 10GB 可用空间

4.2 下载和安装

# 下载 Spark
wget https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

# 解压
tar -xzf spark-3.4.0-bin-hadoop3.tgz

# 移动到目标目录
sudo mv spark-3.4.0-bin-hadoop3 /opt/spark

# 设置环境变量
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

4.3 配置文件

4.3.1 spark-defaults.conf

# 应用名称
spark.app.name=MySparkApp

# 主节点地址
spark.master=local[*]

# 执行器内存
spark.executor.memory=2g

# 驱动内存
spark.driver.memory=1g

# 序列化器
spark.serializer=org.apache.spark.serializer.KryoSerializer

4.3.2 spark-env.sh

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk
export SPARK_MASTER_HOST=localhost
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=4g

5. Spark 核心 API

5.1 SparkContext

SparkContext 是 Spark 应用的入口点。

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MySparkApp")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

5.2 DataFrame API

DataFrame 是基于 RDD 的高级抽象,提供类似 SQL 的接口。

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataFrameExample")
  .master("local[*]")
  .getOrCreate()

// 创建 DataFrame
val df = spark.read.json("path/to/data.json")

// 显示数据
df.show()

// 选择列
df.select("name", "age").show()

// 过滤数据
df.filter(df("age") > 18).show()

// 分组聚合
df.groupBy("department").count().show()

5.3 Dataset API

Dataset 是类型安全的 DataFrame,结合了 RDD 和 DataFrame 的优点。

case class Person(name: String, age: Int, department: String)

val ds = spark.read.json("path/to/data.json").as[Person]

// 类型安全的操作
val adults = ds.filter(_.age > 18)
val names = ds.map(_.name)

6. Spark SQL

6.1 基本概念

Spark SQL 是 Spark 中处理结构化数据的模块,提供了 DataFrame 和 Dataset API。

6.2 数据源

// 读取 JSON
val df = spark.read.json("data.json")

// 读取 CSV
val df = spark.read.option("header", "true").csv("data.csv")

// 读取 Parquet
val df = spark.read.parquet("data.parquet")

// 读取数据库
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/db")
  .option("dbtable", "table")
  .option("user", "username")
  .option("password", "password")
  .load()

6.3 SQL 查询

// 注册临时视图
df.createOrReplaceTempView("people")

// 执行 SQL 查询
val result = spark.sql("""
  SELECT department, AVG(age) as avg_age
  FROM people
  WHERE age > 18
  GROUP BY department
  ORDER BY avg_age DESC
""")

7. Spark Streaming

7.1 流处理概念

Spark Streaming 提供高吞吐量、容错的流处理能力。

7.2 基本使用

import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

// 创建 DStream
val lines = ssc.socketTextStream("localhost", 9999)

// 处理数据
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

// 输出结果
wordCounts.print()

// 启动流处理
ssc.start()
ssc.awaitTermination()

7.3 窗口操作

// 窗口大小为 30 秒,滑动间隔为 10 秒
val windowedCounts = wordCounts.window(Seconds(30), Seconds(10))

8. MLlib (机器学习)

8.1 机器学习管道

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline

// 特征向量化
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

// 逻辑回归
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)

// 构建管道
val pipeline = new Pipeline()
  .setStages(Array(assembler, lr))

// 训练模型
val model = pipeline.fit(trainingData)

// 预测
val predictions = model.transform(testData)

8.2 常用算法

  • 分类: 逻辑回归、决策树、随机森林
  • 回归: 线性回归、决策树回归
  • 聚类: K-means、高斯混合模型
  • 推荐: 协同过滤、ALS

9. GraphX (图计算)

9.1 图数据结构

import org.apache.spark.graphx._

// 创建顶点 RDD
val vertices = sc.parallelize(Array(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie")
))

// 创建边 RDD
val edges = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "colleague")
))

// 创建图
val graph = Graph(vertices, edges)

9.2 图算法

// PageRank 算法
val ranks = graph.pageRank(0.0001).vertices

// 连通组件
val cc = graph.connectedComponents().vertices

// 三角形计数
val triangles = graph.triangleCount().vertices

10. 性能优化

10.1 内存优化

// 缓存策略
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK)

// 序列化优化
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

10.2 并行度优化

// 设置并行度
val rdd = sc.parallelize(data, 200)

// 重新分区
val repartitioned = rdd.repartition(100)

// 合并小分区
val coalesced = rdd.coalesce(50)

10.3 数据倾斜处理

// 加盐技术
val saltedRdd = rdd.map(x => (x._1 + "_" + Random.nextInt(100), x._2))

// 两阶段聚合
val firstStage = rdd.map(x => (x._1 + "_" + Random.nextInt(100), x._2))
  .reduceByKey(_ + _)
val secondStage = firstStage.map(x => (x._1.split("_")(0), x._2))
  .reduceByKey(_ + _)

11. 监控和调试

11.1 Spark UI

Spark 提供 Web UI 监控应用执行情况:

  • 应用概览
  • 作业详情
  • 存储信息
  • 环境配置

访问地址:http://driver-node:4040

11.2 日志配置

# log4j.properties
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

11.3 性能调优

// 启用动态分配
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "1")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "10")

// 启用推测执行
spark.conf.set("spark.speculation", "true")

12. 部署模式

12.1 Local 模式

# 本地单线程
spark-submit --master local app.jar

# 本地多线程
spark-submit --master local[4] app.jar

# 本地所有核心
spark-submit --master local[*] app.jar

12.2 Standalone 模式

# 启动 Master
./sbin/start-master.sh

# 启动 Worker
./sbin/start-worker.sh spark://master-host:7077

# 提交应用
spark-submit --master spark://master-host:7077 app.jar

12.3 YARN 模式

# YARN Client 模式
spark-submit --master yarn --deploy-mode client app.jar

# YARN Cluster 模式
spark-submit --master yarn --deploy-mode cluster app.jar

13. 常见问题和解决方案

13.1 内存不足

问题: java.lang.OutOfMemoryError

解决方案:

# 增加执行器内存
spark-submit --executor-memory 4g app.jar

# 增加驱动内存
spark-submit --driver-memory 2g app.jar

13.2 数据倾斜

问题: 某些任务执行时间过长

解决方案:

  • 使用加盐技术
  • 两阶段聚合
  • 自定义分区器

13.3 小文件问题

问题: 产生大量小文件

解决方案:

// 合并小文件
df.coalesce(1).write.parquet("output")

// 使用 repartition
df.repartition(10).write.parquet("output")

14. 最佳实践

14.1 代码优化

  1. 避免使用 collect(): 将数据收集到驱动节点
  2. 合理使用缓存: 避免重复计算
  3. 选择合适的分区数: 通常是集群核心数的 2-3 倍
  4. 使用广播变量: 减少数据传输

14.2 资源管理

  1. 合理分配内存: 执行器内存 = 总内存 - 系统内存 - 缓存内存
  2. 设置合适的并行度: 避免过多或过少的任务
  3. 监控资源使用: 定期检查 CPU 和内存使用情况

14.3 数据管理

  1. 选择合适的数据格式: Parquet 通常比 JSON 更高效
  2. 合理分区: 避免数据倾斜
  3. 定期清理: 删除不需要的缓存和临时文件

15. 面试题

15.1 基础概念

Q: Spark 和 Hadoop MapReduce 的区别?

A:

  • Spark 基于内存计算,MapReduce 基于磁盘计算
  • Spark 提供更丰富的 API 和更快的执行速度
  • Spark 支持流处理、机器学习等更多场景

Q: 什么是 RDD?

A: RDD (Resilient Distributed Dataset) 是 Spark 的核心抽象,代表一个不可变、可分区、可并行计算的数据集合。

15.2 技术细节

Q: Spark 如何实现容错?

A: 通过 RDD 的血缘关系 (Lineage),当某个分区丢失时,可以根据血缘关系重新计算该分区。

Q: 什么是宽依赖和窄依赖?

A:

  • 窄依赖:父 RDD 的每个分区最多被一个子 RDD 分区使用
  • 宽依赖:父 RDD 的分区被多个子 RDD 分区使用

15.3 性能优化

Q: 如何解决数据倾斜问题?

A:

  1. 使用加盐技术
  2. 两阶段聚合
  3. 自定义分区器
  4. 调整并行度

Q: Spark 调优的关键参数有哪些?

A:

  • spark.executor.memory: 执行器内存
  • spark.executor.cores: 执行器核心数
  • spark.sql.shuffle.partitions: 分区数
  • spark.serializer: 序列化器

16. 实际应用案例

16.1 电商数据分析案例

场景描述:某电商平台需要分析用户行为数据,包括用户浏览、购买、评价等行为。

技术方案:

// 1. 数据清洗和预处理
val userBehaviorRdd = spark.read.json("hdfs://user-behavior/*.json")
  .filter($"timestamp".isNotNull)
  .filter($"userId".isNotNull)

// 2. 用户行为分析
val userStats = userBehaviorRdd
  .groupBy($"userId")
  .agg(
    count("*").as("totalActions"),
    countDistinct($"productId").as("uniqueProducts"),
    sum(when($"action" === "purchase", 1).otherwise(0)).as("purchases")
  )

// 3. 热门商品分析
val popularProducts = userBehaviorRdd
  .filter($"action" === "view")
  .groupBy($"productId")
  .count()
  .orderBy(desc("count"))
  .limit(100)

// 4. 用户画像构建
val userProfile = userBehaviorRdd
  .groupBy($"userId")
  .agg(
    collect_list($"category").as("interests"),
    avg($"sessionDuration").as("avgSessionTime"),
    count("*").as("activityLevel")
  )

16.2 实时日志分析案例

场景描述:分析 Web 服务器日志,实时监控网站访问情况。

技术方案:

// 1. 创建 StreamingContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

// 2. 读取日志流
val logStream = ssc.socketTextStream("localhost", 9999)

// 3. 解析日志
val parsedLogs = logStream.map(line => {
  val parts = line.split(" ")
  LogEntry(parts(0), parts(1), parts(2), parts(3))
})

// 4. 实时统计
val pageViews = parsedLogs
  .map(_.page)
  .countByValue()

val errorCount = parsedLogs
  .filter(_.statusCode.startsWith("4") || _.statusCode.startsWith("5"))
  .count()

// 5. 输出结果
pageViews.print()
errorCount.print()

16.3 机器学习推荐系统案例

场景描述:构建基于协同过滤的推荐系统。

技术方案:

// 1. 数据准备
val ratings = spark.read.option("header", "true")
  .csv("ratings.csv")
  .select($"userId".cast("int"), $"movieId".cast("int"), $"rating".cast("double"))

// 2. 特征工程
val assembler = new VectorAssembler()
  .setInputCols(Array("userId", "movieId"))
  .setOutputCol("features")

// 3. 模型训练
val als = new ALS()
  .setMaxIter(10)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

val pipeline = new Pipeline()
  .setStages(Array(assembler, als))

val model = pipeline.fit(ratings)

// 4. 推荐生成
val recommendations = model.transform(ratings)
  .select($"userId", $"movieId", $"prediction")
  .orderBy($"userId", desc("prediction"))

16.4 金融风控案例

场景描述:实时检测信用卡欺诈交易。

技术方案:

// 1. 数据预处理
val transactions = spark.read.parquet("transactions.parquet")
  .withColumn("hour", hour($"timestamp"))
  .withColumn("dayOfWeek", dayofweek($"timestamp"))

// 2. 特征工程
val features = transactions
  .groupBy($"cardId")
  .agg(
    count("*").as("transactionCount"),
    sum($"amount").as("totalAmount"),
    avg($"amount").as("avgAmount"),
    max($"amount").as("maxAmount")
  )

// 3. 异常检测
val anomalies = features
  .filter($"transactionCount" > 100)
  .filter($"maxAmount" > 10000)
  .filter($"avgAmount" > 1000)

// 4. 实时监控
val streamingTransactions = ssc.socketTextStream("localhost", 9999)
  .map(parseTransaction)
  .filter(_.amount > 5000)
  .foreachRDD(rdd => {
    // 发送告警
    rdd.foreach(transaction => {
      sendAlert(s"High value transaction: ${transaction}")
    })
  })

17. 性能调优实战

17.1 内存优化实战

问题场景:处理 100GB 的日志文件时出现内存溢出。

解决方案:

// 1. 调整内存配置
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.executor.memoryFraction", "0.8")
spark.conf.set("spark.storage.memoryFraction", "0.6")

// 2. 使用合适的存储级别
val rdd = sc.textFile("large-file.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// 3. 分批处理
val partitions = rdd.coalesce(200) // 增加分区数
val processed = partitions.mapPartitions(iter => {
  // 处理每个分区
  iter.map(processRecord)
})

17.2 数据倾斜处理实战

问题场景:某个 key 的数据量特别大,导致任务执行时间过长。

解决方案:

// 1. 检测数据倾斜
val keyCounts = rdd.map(_._1).countByKey()
val maxCount = keyCounts.values.max
val avgCount = keyCounts.values.sum / keyCounts.size

if (maxCount > avgCount * 3) {
  println("数据倾斜 detected!")
}

// 2. 加盐处理
val saltedRdd = rdd.map(x => {
  val salt = Random.nextInt(100)
  (s"${x._1}_$salt", x._2)
})

// 3. 两阶段聚合
val firstStage = saltedRdd.reduceByKey(_ + _)
val secondStage = firstStage.map(x => {
  val originalKey = x._1.split("_")(0)
  (originalKey, x._2)
}).reduceByKey(_ + _)

17.3 小文件优化实战

问题场景:产生大量小文件,影响 HDFS 性能。

解决方案:

// 1. 合并小文件
val df = spark.read.parquet("input/*.parquet")
df.coalesce(10).write.parquet("output")

// 2. 使用 repartition
val repartitioned = df.repartition(10, $"date")
repartitioned.write.parquet("output")

// 3. 动态分区
df.write
  .partitionBy("year", "month", "day")
  .parquet("output")

18. 监控和运维

18.1 性能监控

关键指标:

  • 执行时间:作业和任务的执行时间
  • 内存使用:Driver 和 Executor 的内存使用情况
  • CPU 使用率:集群的 CPU 利用率
  • 网络 I/O:Shuffle 操作的网络流量
  • 磁盘 I/O:读写操作的磁盘使用情况

监控工具:

// 1. Spark UI 监控
// 访问 http://driver-node:4040

// 2. 自定义监控
val metrics = spark.sparkContext.statusTracker
val executorInfos = metrics.getExecutorInfos
executorInfos.foreach(executor => {
  println(s"Executor ${executor.executorId}: " +
    s"Memory ${executor.memoryUsed}/${executor.memoryTotal}")
})

18.2 故障排查

常见问题及解决方案:

  1. 内存溢出:

    • 增加 Executor 内存
    • 使用 MEMORY_AND_DISK 存储级别
    • 减少并行度
  2. 任务执行缓慢:

    • 检查数据倾斜
    • 优化 Shuffle 操作
    • 调整分区数
  3. 网络超时:

    • 增加网络超时时间
    • 优化 Shuffle 配置
    • 使用本地存储

19. 总结

Apache Spark 是一个强大的大数据处理引擎,具有以下优势:

  1. 高性能: 基于内存计算,比传统 MapReduce 快 100 倍
  2. 易用性: 提供丰富的 API 和多种编程语言支持
  3. 通用性: 支持批处理、流处理、机器学习、图计算等多种场景
  4. 容错性: 基于 RDD 的容错机制
  5. 生态丰富: 与 Hadoop 生态系统完美集成

学习建议:

  1. 从基础开始:先掌握 RDD 的基本概念和操作
  2. 实践为主:通过实际项目加深理解
  3. 性能调优:学会分析和解决性能问题
  4. 持续学习:关注 Spark 的最新发展和最佳实践

通过合理使用 Spark 的各种特性和优化技术,可以构建高效的大数据处理应用。在实际应用中,需要根据具体场景选择合适的 API 和优化策略,以达到最佳的性能和效果。

最近更新:: 2025/10/11 10:54
Contributors: Duke
Prev
Hive 数据仓库
Next
Flink 数据处理