While it is easy to create individual (static) functions that transform one rdd into another, chaining such functions directly is cumbersome to use:
# Hard to read because transformations appear in reverse order of their application
JavaRDD<I> input = ...;
JavaRDD<O> output = fn3.apply(fn2.apply(fn1.apply(input)));
Chaining allows for a more human readable representation:
RddFunction<I, O> compositeTransform = JavaRddFunction.<I>identity()
.andThen(fn1)
.andThen(fn2)
.andThen(fn3);
JavaRDD<O> output = compositeTransform.apply(input);
Unfortunately spark doesn’t have a compose function which would allow more naturally placing the input before the transform:
JavaRDD<Resource> output = input.compose(compositeTransform); # NOT POSSIBLE
Spark itself unfortunately does not intrisically provide this mechanism, but SANSA provides a little framework aimed to easy the creation of such transformation chains.
The interfaces with for chaining are:
- JavaRddFunction
- JavaPairRddFunction
- ToJavaRddFunction
- ToJavaPairRddFunction
Example
The following example shows an operator that groups Resources and merges their models:
JavaRddFunction<Resource, Resource> compositeTransform =
JavaRddFunction.<Resource>identity()
.toPairRdd(JavaRddOfResourcesOps::mapToNamedModels)
.andThen(rdd -> JavaRddOfNamedModelsOps.groupNamedModels(rdd, true, true, 0))
.toRdd(JavaRddOfNamedModelsOps::mapToResources);
JavaRDD input = ...;
JavaRDD<Resource> output = compositeTransform.apply(input);
An further advantage of such transformation functions is that serialization can be tackled inside of the transformer:
public static JavaRddFunction<String, String> myTransform(ParamA inputArgA, ParamB inputArgB) {
return rdd -> {
# Make the arguments serializable; either using lambda serialization or broadcasts
String argAStr = unparse(inputArgA);
Broadcast<ParamB> bc = JavaSparkContext.fromSparkContext(rdd.context()).broadcast(inputArgB);
rdd.mapPartitions(it -> {
ParamA argA = parse(arg);
ParamB argB = bc.value();
return Streams.stream(it).map(createTransformer(argA, argB)).iterator();
});
}
}