1. spark 1.x 升级到spark 2.x
对于普通的spark来说,变动不大 :
举一个最简单的实例:
spark1.x
public static JavaRDD<String> workJob(JavaRDD<String> spark1Rdd) { JavaPairRDD<String, Integer> testRdd = spark1Rdd .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterable<Tuple2<String, Integer>> call(String str) throws Exception { ArrayList<Tuple2<String, Integer>> list = new ArrayList<>(); return list; } }); return spark1Rdd; }
spark2.x
public static JavaRDD<String> workJob(JavaRDD<String> spark2Rdd) { JavaPairRDD<String, Integer> testRdd2 = spark2Rdd .flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { @Override public Iterator<Tuple2<String, Integer>> call(String str) throws Exception { ArrayList<Tuple2<String, Integer>> list = new ArrayList<>(); return list.iterator(); } }); return spark2Rdd; }
需要说明的是:
上面的返回的rdd就直接用输入的 RDD显然是不合理的! 只是为了用最简洁的方式介绍代码的转换而已!
可以看到 : 区别主要在于1. spark 1.x中的Iterable对象 变成了 spark2.x中的Iterator对象2. 相应的,对于返回值为list的RDD, spark2.x中要返回list.iterator();
还是很简单的吧
问题在于 : 如果你有几个spark程序要运行在不同的环境下,(有的现场用1.x,有的现场用2.x)
你需要同时维护两种不同版本的spark,是不是耗时又耗力呢?
这个时候就需要考虑到 spark版本的兼容性,使你的程序能成功的运行在各种集群环境下
2. spark版本的兼容
写一个简单的工具类如下 :
import java.util.Iterator;public class MyIterator<T> implements Iterator, Iterable { private Iterator myIterable; public MyIterator(Iterable iterable) { myIterable = iterable.iterator(); } @Override public boolean hasNext() { return myIterable.hasNext(); } @Override public Object next() { return myIterable.next(); } @Override public void remove() { myIterable.remove(); } @Override public Iterator iterator() { return myIterable; } }
只需要进行如上设计就可以实现版本的兼容了
那么应该如何应用呢?
JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() { @Override public MyIterator<String> call(String s) throws Exception { String[] split = s.split("\\s+"); MyIterator myIterator = new MyIterator(Arrays.asList(split)); return myIterator; } });
如上