spark streaming 基础与调优

spark-streaming的状态更新

updateStateByKey、reduceByKeyAndWindow
必须启用检查点功能,因为有状态的操作是从程序开始时一直进行的

DStream操作

  • 与RDD相同的操作方法
  • transform:用于直接操作DStream内部的RDD,那些DStream没有提供的RDD操作,可以通过transform调用实现。如join:
    1
    2
    3
    val joinedDStream = d.transform(rdd => {
    rdd.join(rdd2)
    })

窗口操作

  • 窗口长度

    窗口跨越的周期次数,每次窗口处理的数据分片数

  • 滑动区间

    从当前窗口到下一个窗口间隔的周期数量,每次划过的窗口数
    如果这两个参数的值都是1,则跟不使用窗口效果一样。每个窗口的周期数据都会合并成一个RDD保存在DStream中。

  • 窗口函数
    用windowLength、slideInterval生成一个带窗口的DStream,也可以使用加窗口参数的方法:countByWindow、reduceByWindow、reduceByKeyAndWindow、countByValueAndWindow.

高级操作

  1. 缓存与持久化
    窗口函数和updateStateByKey会默认自动持久化
    从网络接收数据的输入DStream,默认持久化级别是复制数据到两个节点上,以确保容错能力。

  2. 打包、发布和监控

  • 需要提供spark之外的其他所有第三方jar包。
  • 最好用hdfs或s3的文件系统目录来配置检查点
  • 配置driver程序自动重启

    standalone模式提交用–deploy-mode cluster –supervise,YARN下选择yarn-cluster,Mesos下通过Marathon协助实现。

  • 从1.2开始,支持Write Ahead Logs(WAL),开启之后,所有收到的数据在处理前会写到检查点目录下,可确保driver重启期间数据不丢失。设置spark.streaming.receiver.writeAheadLog.enable为true开启,会降低数据接收的吞吐量,不过可以通过采用并发接收的方式降低影响。如果开启了WAL,可以关闭接受数据时的复制机制:设置输入DStream的存储级别为StorageLevel.MEMORY_AND_DISK_SER
  1. 更新程序代码
    为了不丢失数据,可采用一下两种方法:
  • 新旧程序同时运行。 等新的程序运行之后,在停止旧的程序。这种方式要求数据源支持同时向新、旧两个版本的程序发数据
  • 先停止旧的程序再启动新的程序。
    要保证旧的程序是被优雅的关闭,确保关闭前接收到的数据都处理完了。这种方式只适用于支持缓存数据功能的数据源,如Kafka、Flume。启动新的程序时在读取前一个程序的检查点信息时可能出错,因为检查点中一些对象是序列化存储,新的对象结构可能已经改变了。解决办法是:用新的检查点目录或者删除旧的检查点目录下的所有内容

spark streaming调优方向

  1. 每个批次的处理时间尽可能短

  2. 收到数据后尽可能快的处理

优化方法

减少批处理的时间

  1. 增加数据接收的并发数量,尤其当瓶颈发生在数据接收的时候。

    默认每个InputDStream只会创建一个接收器,我们可以创建多个让它们接收不同的数据分区,以实现并行接收。

比如1个接收2个topic的Kafka InputDStream可以优化为2个InputDStream,各接收一个topic,然后在合并。

1
2
3
4
5
6
val numStreams = 5
val kafkaStreams = (1 to numStreams).map{
i => KafkaUtils.createStream(...)
}
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
  1. 数据处理的并发度

    调整选项spark.default.parallelism

  2. task启动的额外开销

    如果task启动过于频繁(比如每秒50次),额外的开销可能非常高,甚至无法达到实时计算要求。

设置合理批次间隔时间

一般来说短时间间隔会导致更多的额外开销,以及无法完成的风险,所以前期采取相对保守的方法,将间隔设置为5~10秒。然后通过观察运行数据确保系统足够实时,每个间隔的实际计算时间远小于间隔时间,再逐渐缩短间隔时间。