SparkStreaming 是流式处理框架,是 Spark API 的扩展,支持可扩展、高吞吐、容错的实时数据流处理。
实时数据的来源可以是:Kafka,Flume,Twitter,ZeroMQ 或者 TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
SparkStreaming 与 Storm 的区别
1、Storm 是纯实时的流式处理框架,SparkStreaming 是准实时的处理框架(微批处理)。因为微批处理,SparkStreaming 的吞吐量比 Storm 要高。
2、Storm 的事务机制比 SparkStreaming 的要完善。
3、Storm 支持动态资源调度。(spark1.2 开始和之后也支持)
4、SparkStreaming 擅长复杂的业务处理,Storm 不擅长复杂的业务处理,擅长简单的汇总型计算。
SparkStreaming 初始
1、SparkStreaming 初始理解

注意:
-
receiver task 是 7*24 小时一直在执行,一直接受数据,将一段时间内接收来的数据保存到 batch 中。假设 batchInterval 为 5s,那么会将接收来的数据每隔 5 秒封装到一个 batch 中,batch 没有分布式计算特性,这一个 batch 的数据又被封装到一个 RDD 中,RDD 最终封装到一个 DStream 中。
例如:假设 batchInterval 为 5 秒,每隔 5 秒通过 SparkStreamin 将得到一个 DStream,在第 6 秒的时候计算这 5 秒的数据,假设执行任务的时间是 3 秒,那么第 6~9 秒一边在接收数据,一边在计算任务,9~10 秒只是在接收数据。然后在第 11 秒的时候重复上面的操作。
-
如果 job 执行的时间大于 batchInterval 会有什么样的问题?
如果接受过来的数据设置的级别是仅内存,接收来的数据会越堆积越多,最后可能会导致 OOM(如果设置 StorageLevel 包含 disk, 则内存存放不下的数据会溢写至 disk, 加大延迟 )。
2、SparkStreaming 代码
注意事项:
- 启动 socket server 服务器:nc –lk 9999
- eceiver 模式下接受数据,local 的模拟线程必须大于等于 2,一个线程用来 receiver 用来接受数据,另一个线程用来执行 job。
- Durations 时间设置就是我们能接收的延迟度。这个需要根据集群的资源情况以及任务的执行情况来调节。
- 创建 JavaStreamingContext 有两种方式(SparkConf,SparkContext)
- 所有的代码逻辑完成后要有一个 output operation 类算子。
- JavaStreamingContext.start() Streaming 框架启动后不能再次添加业务逻辑。
- JavaStreamingContext.stop() 无参的 stop 方法将 SparkContext一同关闭,stop(false),不会关闭 SparkContext。
- JavaStreamingContext.stop()停止之后不能再调用 start。
public class SparkStreamingTest { public static void main(String[] args){
SparkConf conf = new SparkConf(); conf.setMaster("local[2]").setAppName("sparkStreaming");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(5)); JavaReceiverInputDStream<String> dStream = streamingContext.socketTextStream("sean01", 8888);
JavaDStream<String> wordDStream = dStream.flatMap( new FlatMapFunction<String, String>() { private static final long serialVersionUID = 5302655187358849615L;
@Override public Iterable<String> call(String s) throws Exception { String[] split = s.split(" "); return Arrays.asList(split); } });
JavaPairDStream<String, Integer> pairDStream = wordDStream.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1285374334768880064L;
@Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<>(word, 1); } });
JavaPairDStream<String, Integer> resultDStream = pairDStream.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 5889114600370649292L;
@Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); resultDStream.print(); streamingContext.start(); streamingContext.awaitTermination(); streamingContext.stop(); } }
|
SparkStreaming 算子操作
关于几种算子的详细API,请见Github:SparkStreamingAPI
1、foreachRDD、print
output operator 类算子,必须对抽取出来的 RDD 执行 action 类算子,代码才能执行。
2、transform
- transformation 类算子
- 可以通过 transform 算子,对Dstream做RDD到RDD的任意操作。
3、updateStateByKey
-
transformation 算子
-
updateStateByKey 作用:
1)为 SparkStreaming 中每一个 Key 维护一份 state 状态,state 类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
2)通过更新函数对该 key 的状态不断更新,对于每个新的 batch 而言,SparkStreaming 会在使用 updateStateByKey 的时候为已经存在的 key 进行 state 的状态更新。
-
使用到 updateStateByKey 要开启 checkpoint 机制和功能。
-
多久会将内存中的数据写入到磁盘一份?
1)如果 batchInterval 设置的时间小于10秒,那么10秒写入磁盘一份。
2)如果 batchInterval 设置的时间大于 10 秒,那么就会 batchInterval 时间间隔写入磁盘一份。
4、窗口操作

假设每隔 5s 1 个 batch,上图中窗口长度为 15s,窗口滑动间隔 10s。

- 优化后的 window 操作要保存状态所以要设置 checkpoint 路径,没有优化的 window 操作可以不设置 checkpoint 路径。
Driver HA(Standalone或Mesos)
因为 SparkStreaming 是 7*24 小时运行,Driver 只是一个简单的进程,有可能挂掉,所以实现 Driver 的 HA 就有必要(如果使用的 Client 模式就无法实现 Driver HA ,这里针对的是 cluster 模式)。
Yarn 平台 cluster 模式提交任务,AM(AplicationMaster)相当于 Driver,如果挂掉会自动启动 AM。这里所说的 DriverHA 针对的是 Spark standalone 和 Mesos 资源调度的情况下。
实现 Driver 的高可用有两个步骤:
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当 Driver挂掉的时候会自动重启 Driver。
第二:代码层面,使用 JavaStreamingContext.getOrCreate(checkpoint 路径,JavaStreamingContextFactory)
Driver 中元数据包括:
1、创建应用程序的配置信息。
2、DStream 的操作逻辑。
3、job 中没有完成的批次数据,也就是 job 的执行进度。
SparkStreaming+Kafka
Receiver 模式
receiver模式原理图

receiver模式理解
在 SparkSteaming 程序运行起来后,Executor 中会有 receiver tasks 接收 kafka 推送过来的数据。数据会被持久化,默认级别为 MEMORY_AND_DISK_SER_2,这个级别也可以修改。receiver task 对接收过来的数据进行存储和备份,这个过程会有节点之间的传输。备份完成后去 Zookeeper 中更新消费偏移量,然后向 Driver 中的 receiver tracker 汇报数据的位置。最后 Driver 根据数据本地化将 task 分发到不同节点上执行。
receiver模式中存在的问题:
当 Driver 进程挂掉后,Driver 下的 Executor 都会被杀掉,当更新完 zookeeper 消费偏移量的时候,Driver 如果挂掉了,就会存在找不到数据的问题,相当于丢失数据。
如何解决这个问题?
开启WAL(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到 HDFS 上一份(我们需要将接收来的数据的持久化级别降级到 MEMORY_AND_DISK),这样就能保证数据的安全性。
不过,因为写 HDFS 比较消耗性能,要在备份完数据之后才能进行更新 zookeeper 以及汇报位置等,这样会增加 job 的执行时间,这样对于任务的执行提高了延迟度。
Receiver可能会造成重复消费
合理假设一个场景,假如当前zookeeper中记录的偏移量是50,本次接收的数据为51~100,当数据备份之后,同时也放到 HDFS 了,此时准备去zookeeper中更新偏移量时,服务器挂掉了,这时zookeeper中的偏移量没有更新还是50。重启之后会去HDFS中检查数据,发现51~100的数据未计算(一般计算的话需要更新完偏移量才计算),这时开始计算这部分数据。紧接着Kafka就会那这zookeeper中的50继续往下读,这样一来就造成了重复消费。这就是Receiver模式只能保证至少消费一次(at-least),但不能保证有且只会消费一次(exactly-once)。
receiver 的并行度设置
receiver 的并行度是由 spark.streaming.blockInterval 来决定的,默认为200ms。
假设 batchInterval 为 5s,那么每隔 blockInterval 就会产生一个 block,这里就对应每批次产生 RDD 的 partition,这样 5 秒产生的这个 Dstream 中的这个 RDD 的 partition 为 25 个,并行度就是25。
如果想提高并行度可以减少 blockInterval 的数值,但是最好不要低于 50ms。
receiver 模式代码
先准备一个MyProducer类用于产生数据:
public class MyProducer extends Thread { private String topic; private Producer<Integer, String> producerForKafka;
public MyProducer(String topic) { this.topic = topic; Properties conf = new Properties(); conf.put("metadata.broker.list", "sean01:9092,sean02:9092,sean03:9092"); conf.put("serializer.class", StringEncoder.class.getName()); conf.put("acks",1); producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf)); }
@Override public void run() { int counter = 0; while (true) { counter++; String value = "seanxia"; KeyedMessage<Integer, String> message = new KeyedMessage<>(topic, value); producerForKafka.send(message); System.out.println(value + " : " + counter + " --------------");
if (0 == counter % 2) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new MyProducer("sk1").start(); new MyProducer("sk2").start(); } }
|
执行以下代码:
public class SparkStreamingOnKafkaReceiver { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("SparkStreamingOnKafkaReceiver"); conf.set("spark.streaming.receiver.writeAheadLog.enable", "true"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10)); jsc.checkpoint("./checkpoint");
Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>();
topicConsumerConcurrency.put("sk1", 1); topicConsumerConcurrency.put("sk2", 1);
JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream( jsc, "sean01:2181,sean02:2181,sean03:2181", "MyFirstConsumerGroup", topicConsumerConcurrency);
JavaDStream<String> words = lines.flatMap( new FlatMapFunction<Tuple2<String, String>, String>() { private static final long serialVersionUID = 1L;
public Iterable<String> call(Tuple2<String, String> tuple) throws Exception { return Arrays.asList(tuple._2.split("\t")); } });
JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } });
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordsCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
|
注意:
Receiver模式:只能保证至少被消费一次(at-least),但是不能保证有且只会消费一次(exactly-once)。
direct模式:可以保证exactly-once,能保证任务失败重读数据,但是不能保证任务中的输出数据有且只有一次。
Direct 模式
direct 模式理解
SparkStreaming+kafka 的 Driect 模式就是将 kafka 看成存数据的一方,不是被动接收数据,而是主动去取数据。
消费者偏移量也不是用 zookeeper 来管理,而是 SparkStreaming 内部对消费者偏移量自动来维护。默认消费偏移量是在内存中,当然如果设置了checkpoint 目录,那么消费偏移量也会保存在 checkpoint 中。当然也可以实现用 zookeeper 来管理。
direct 模式并行度设置
Direct 模式的并行度是由读取的 kafka 中 topic 的 partition 数决定的。
direct 模式代码
public class SparkStreamingOnKafkaDirected { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]") .setAppName("SparkStreamingOnKafkaDirected");
conf.set("spark.streaming.stopGracefullyOnShutdown","true"); JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
jsc.checkpoint("./checkpoint"); Map<String, String> kafkaParameters = new HashMap<String, String>(); kafkaParameters.put("metadata.broker.list", "sean01:9092,sean02:9092,sean03:9092");
HashSet<String> topics = new HashSet<String>(); topics.add("sk1"); topics.add("sk2");
JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream( jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);
JavaDStream<String> words = lines.flatMap( new FlatMapFunction<Tuple2<String,String>, String>() { private static final long serialVersionUID = 1L;
public Iterable<String> call(Tuple2<String,String> tuple) throws Exception { return Arrays.asList(tuple._2.split("\t")); } });
JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } },3);
wordsCount.print(); jsc.start(); jsc.awaitTermination(); jsc.close(); } }
|
Web 监控页面
由于在本地运行,输入:localhost:4040,即可查看 SparkStreaming 在kafka集群中的运行状态

相关配置
预写日志:
用于优化 receiver 模式中,Driver进程挂掉,找不到数据的问题。
spark.streaming.receiver.writeAheadLog.enable #默认false 没有开启
|
blockInterval:
spark.streaming.blockInterval #默认200ms
|
反压机制:
用于解决由于job执行时间大于batchInterval,接收数据内存级别为仅内存时。引起的数据堆积问题
sparkStreaming在1.5版本之后引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力!
spark.streaming.backpressure.enabled 设置为true #默认false
|
数据接收速率:
sparkStreaming在1.5版本之前通过控制接收数据的速率来解决数据堆积问题。
设置静态配置参数:
# Receiver模式 spark.streaming.receiver.maxRate #默认没有设置 # Direct模式 spark.streaming.kafka.maxRatePerPartition #默认没有设置
|
如何优雅的关闭SparkStreaming
spark.streaming.stopGracefullyOnShutdown 设置为true #默认false kill -15 进程号
|