    Example 6-10. Shared connection pool in Python
    def processCallSigns(signs):
        """Look up call-sign records over HTTP using one shared connection pool.

        Meant to be passed to ``RDD.mapPartitions`` so the pool is created once
        per partition rather than once per element.

        :param signs: iterable of call-sign strings (one partition).
        :returns: iterable of ``(sign, record)`` pairs, where ``record`` is the
            decoded JSON body for that sign; pairs whose record is ``None``
            (a JSON ``null`` body) are dropped.
        """
        # Create a connection pool, shared by every request in this partition.
        http = urllib3.PoolManager()

        def lookup(sign):
            # Key the result by the call sign (not the URL), matching the
            # Scala and Java versions of this example.
            url = "http://73s.com/qsos/%s.json" % sign
            # urllib3's request() is synchronous: it waits for the response.
            response = http.request('GET', url)
            return (sign, json.loads(response.data))

        results = map(lookup, signs)
        # remove any empty results and return
        return filter(lambda pair: pair[1] is not None, results)
    def fetchCallSigns(input):
        """Look up every call sign in an RDD, sharing one pool per partition.

        :param input: RDD of call-sign strings.
        :returns: RDD of ``(sign, record)`` pairs produced by processCallSigns.
        """
        # processCallSigns already accepts a partition iterator, so it can be
        # handed to mapPartitions directly without a wrapping lambda.
        return input.mapPartitions(processCallSigns)

    contactsContactList = fetchCallSigns(validSigns)

    Example 6-11. Shared connection pool and JSON parser in Scala
    // Use mapPartitions() so the HTTP client and JSON mapper are created once
    // per partition and shared by every call sign in it.
    val contactsContactLists = validSigns.distinct().mapPartitions { signs =>
      val mapper = createMapper()
      val client = new HttpClient()
      client.start()
      // create http request
      // NOTE(review): createExchangeForSign is defined elsewhere; assumed to send
      // the request through `client` and return a (sign, exchange) pair — confirm.
      signs.map { sign =>
        createExchangeForSign(client, sign)
      // fetch responses
      }.map { case (sign, exchange) =>
        (sign, readExchangeCallLog(mapper, exchange))
      }.filter { case (_, callLogs) => callLogs != null } // Remove empty CallLogs
    }

    Example 6-12. Shared connection pool and JSON parser in Java
    // Use mapPartitions to reuse setup work.
    JavaPairRDD<String, CallLog[]> contactsContactLists =
    ?new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
    ?public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
    ?// List for our results.
    ?ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
    ?ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
    ?ObjectMapper mapper = createMapper();
    ?HttpClient client = new HttpClient();
    ?try {
    ?while (input.hasNext()) {
    ?requests.add(createRequestForSign(input.next(), client));
    ?for (Tuple2<String, ContentExchange> signExchange : requests) {
    ?callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
    ?} catch (Exception e) {
    ?return callsignLogs;
    System.out.println(StringUtils.join(contactsContactLists.collect(), ","));


    Example 6-13. Average without mapPartitions() in Python
    def combineCtrs(c1, c2):
        """Add two ``(sum, count)`` pairs element-wise.

        Used as the reduce function for averaging.

        :returns: a tuple ``(c1[0] + c2[0], c1[1] + c2[1])``.
        """
        return (c1[0] + c2[0], c1[1] + c2[1])
    def basicAvg(nums):
        """Compute the average of an RDD of numbers without mapPartitions().

        Each element becomes a ``(num, 1)`` pair. The pairs are summed with
        combineCtrs, and the total is divided by the count.

        :param nums: RDD of numbers.
        :returns: the mean as a float.
        """
        sumCount = nums.map(lambda num: (num, 1)).reduce(combineCtrs)
        return sumCount[0] / float(sumCount[1])
    Example 6-14. Average with mapPartitions() in Python
    def partitionCtr(nums):
        """Compute the ``[sum, count]`` pair for one partition.

        :param nums: iterable of the numbers in a single partition.
        :returns: a one-element list holding ``[sum, count]``, so the function
            can be passed directly to ``RDD.mapPartitions``.
        """
        total, count = 0, 0
        for num in nums:
            total += num
            count += 1
        return [[total, count]]
    def fastAvg(nums):
        """Compute the average of an RDD of numbers using mapPartitions().

        partitionCtr reduces each partition to a single ``[sum, count]`` pair.
        Only one pair per partition reaches the final reduce with combineCtrs.
        If the RDD has no elements the count is 0 and the division fails.

        :param nums: RDD of numbers.
        :returns: the mean as a float.
        """
        sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
        return sumCount[0] / float(sumCount[1])






    Example 6-15. R distance program
    #!/usr/bin/env Rscript
    # Read "lat1,lon1,lat2,lon2" lines from stdin and write the distance in
    # meters between the two points to stdout, one result per input line.
    library("Imap")  # provides gdist()
    f <- file("stdin")
    # Open the connection once so each readLines() call continues from where
    # the previous one stopped instead of reopening stdin.
    open(f)
    while(length(line <- readLines(f, n = 1)) > 0) {
      # process line
      contents <- as.numeric(strsplit(line, ",")[[1]])
      # gdist() takes longitude before latitude; the input puts latitude first.
      mydist <- gdist(lon.1 = contents[2], lat.1 = contents[1],
                      lon.2 = contents[4], lat.2 = contents[3],
                      units = "m", a = 6378137.0, b = 6356752.3142, verbose = FALSE)
      write(mydist, stdout())
    }
    close(f)


    $ ./src/R/finddistance.R


    Example 6-16. Driver program using pipe() to call finddistance.R in Python
    # Compute the distance of each call using an external R program
    distScript = "./src/R/finddistance.R"
    distScriptName = "finddistance.R"
    # Ship the script to every worker so that SparkFiles.get(distScriptName)
    # can locate it there when pipe() runs (assumes `sc` is the SparkContext).
    sc.addFile(distScript)
    def hasDistInfo(call):
        """Verify that a call has the fields required to compute the distance.

        :param call: mapping of call-log fields.
        :returns: True when every coordinate field is present and neither
            ``None`` nor ``""``. A coordinate of 0 (equator or prime meridian)
            counts as present.
        """
        requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
        # .get() so a missing key means "no info" instead of raising KeyError,
        # and an explicit None/"" test so a legitimate 0 value is accepted.
        return all(call.get(field) not in (None, "") for field in requiredFields)
    def formatCall(call):
        """Format a call so that it can be parsed by our R program.

        :param call: mapping with "mylat", "mylong", "contactlat" and
            "contactlong" keys.
        :returns: the line "mylat,mylong,contactlat,contactlong".
        """
        return "{0},{1},{2},{3}".format(
            call["mylat"], call["mylong"],
            call["contactlat"], call["contactlong"])
    # One "mylat,mylong,contactlat,contactlong" line per call that has all four
    # coordinates; the R script writes one distance per input line.
    pipeInputs = contactsContactList.values().flatMap(
        lambda calls: map(formatCall, filter(hasDistInfo, calls)))
    distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
    print(distances.collect())

    Example 6-17. Driver program using pipe() to call finddistance.R in Scala
    // Compute the distance of each call using an external R program
    val distScript = "./src/R/finddistance.R"
    val distScriptName = "finddistance.R"
    // adds our script to a list of files for each node to download with this job
    sc.addFile(distScript)
    // One "mylat,mylong,contactlat,contactlong" line per call log, the same
    // format the Python and Java versions feed to finddistance.R.
    val pipeInputs = contactsContactLists.values.flatMap(callLogs => callLogs.map(call =>
      s"${call.mylat},${call.mylong},${call.contactlat},${call.contactlong}"))
    // Passing the command as a Seq keeps the script path from being split on whitespace.
    val distances = pipeInputs.pipe(Seq(SparkFiles.get(distScriptName)))
    println(distances.collect().toList)

    Example 6-18. Driver program using pipe() to call finddistance.R in Java
    // Compute the distance of each call using an external R program
    // adds our script to a list of files for each node to download with this job
    String distScript = "./src/R/finddistance.R";
    String distScriptName = "finddistance.R";
    JavaRDD<String> pipeInputs = contactsContactLists.values()
    ?.map(new VerifyCallLogs()).flatMap(
    ?new FlatMapFunction<CallLog[], String>() {
    ?public Iterable<String> call(CallLog[] calls) {
    ?ArrayList<String> latLons = new ArrayList<String>();
    ?for (CallLog call: calls) {
    ?latLons.add(call.mylat + "," + call.mylong +
    ? "," + call.contactlat + "," + call.contactlong);
    ?return latLons;
    JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
    System.out.println(StringUtils.join(distances.collect(), ","));

    使用 SparkContext.addFile(path)，我们可以构建一个文件列表，让每个工作节点在运行 Spark 作业时下载这些文件。这些文件可以来自驱动器程序的本地文件系统（如本例所示）、HDFS 或其他 Hadoop 支持的文件系统，也可以来自 HTTP、HTTPS 或 FTP URI。当作业中执行一个行动操作时，每个节点都会自动下载这些文件。之后，可以在工作节点的 SparkFiles.getRootDirectory 目录中找到这些文件，或者通过 SparkFiles.get(filename) 获取其路径。当然，这只是确保 pipe() 能在每个工作节点上找到脚本的一种方法。您也可以使用其他远程复制工具，把脚本文件放到每个节点上的已知位置。

    // If finddistance.R accepted the field separator as a command-line argument,
    // either of these calls would pass "," to it. The first (Seq) form is
    // preferred: the String form is split on whitespace into arguments, so it
    // breaks if the script path itself contains spaces.
    rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
    rdd.pipe(SparkFiles.get("finddistance.R") + " ,")



    count() Number of elements in the RDD
    mean() Average of the elements
    sum() Total
    max() Maximum value
    min() Minimum value
    variance() Variance of the elements
    sampleVariance() Variance of the elements, computed for a sample
    stdev() Standard deviation
    sampleStdev() Sample standard deviation


    在示例 6-19 到示例 6-21 中，我们将使用汇总统计信息从数据中移除一些异常值。由于我们会两次遍历同一个 RDD（一次计算汇总统计信息，一次移除异常值），因此可能需要缓存该 RDD。回到之前的呼叫日志示例，我们可以把日志中距离过远的联系人移除。

    Example 6-19. Removing outliers in Python
    # Convert our RDD of strings to numeric data so we can compute stats and
    # remove the outliers. Cached because it is traversed twice: once by
    # stats() and once by filter().
    distanceNumerics = distances.map(lambda string: float(string)).cache()
    stats = distanceNumerics.stats()
    stddev = stats.stdev()
    mean = stats.mean()
    # Keep only distances within three standard deviations of the mean.
    reasonableDistances = distanceNumerics.filter(
        lambda x: math.fabs(x - mean) < 3 * stddev)
    print(reasonableDistances.collect())

    Example 6-20. Removing outliers in Scala
    // Now we can go ahead and remove outliers since those may have misreported locations
    // first we need to take our RDD of strings and turn it into doubles.
    // Cached because it is traversed twice: once by stats() and once by filter().
    val distanceDoubles = distances.map(string => string.toDouble).cache()
    val stats = distanceDoubles.stats()
    val stddev = stats.stdev
    val mean = stats.mean
    // Keep only distances within three standard deviations of the mean.
    val reasonableDistances = distanceDoubles.filter(x => math.abs(x - mean) < 3 * stddev)

    Example 6-21. Removing outliers in Java
    // First we need to convert our RDD of String to a DoubleRDD so we can
    // access the stats function
    JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() {
    ?public double call(String value) {
    ?return Double.parseDouble(value);
    final StatCounter stats = distanceDoubles.stats();
    final Double stddev = stats.stdev();
    final Double mean = stats.mean();
    JavaDoubleRDD reasonableDistances =
    ?distanceDoubles.filter(new Function<Double, Boolean>() {
    ?public Boolean call(Double x) {
    ?return (Math.abs(x-mean) < 3 * stddev);}});
    System.out.println(StringUtils.join(reasonableDistance.collect(), ","));cs