
zhuziying99's column: Learning Spark Notes 13 - Working per Partition, Piping to External Programs, Numeric RDD Operations


Working on a Per-Partition Basis


Working with data on a per-partition basis lets us avoid redoing setup work for each data item. Operations such as opening a database connection or creating a random-number generator are setup steps we would like to avoid repeating for every element. Spark has per-partition versions of map and foreach that help reduce the cost of these operations by letting the code run only once per partition of an RDD.
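As a minimal sketch of the pattern (openConnection() and conn.send() are hypothetical stand-ins for whatever setup work you need), foreachPartition() pays the setup cost once per partition instead of once per element:

# Minimal sketch: the body runs once per partition, so the connection
# is opened once per partition rather than once per element.
# openConnection() and conn.send() are hypothetical stand-ins.
def sendPartition(records):
    conn = openConnection()      # setup paid once per partition
    for record in records:
        conn.send(record)
    conn.close()                 # teardown paid once per partition

rdd.foreachPartition(sendPartition)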


Let's return to our earlier call sign example, where there is an online database of ham radio call signs that we can query for a list of contacts. By using partition-based operations, we can share a connection pool to the database to avoid setting up many connections, and we can reuse our JSON parser. In the examples below we use the mapPartitions() function, which gives us an iterator over the elements in each partition of the input RDD and expects us to return an iterator of our results.


Example 6-10. Shared connection pool in Python
import urllib3
import json

def processCallSigns(signs):
    """Lookup call signs using a connection pool"""
    # Create a connection pool
    http = urllib3.PoolManager()
    # the URL associated with each call sign record
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # create the requests (non-blocking)
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # fetch the results
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # remove any empty results and return
    return filter(lambda x: x[1] is not None, result)

def fetchCallSigns(input):
    """Fetch call signs"""
    return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))

contactsContactList = fetchCallSigns(validSigns)




Example 6-11. Shared connection pool and JSON parser in Scala
val contactsContactLists = validSigns.distinct().mapPartitions{
  signs =>
  val mapper = createMapper()
  val client = new HttpClient()
  client.start()
  // create http request
  signs.map {sign =>
    createExchangeForSign(sign)
  // fetch responses
  }.map{ case (sign, exchange) =>
    (sign, readExchangeCallLog(mapper, exchange))
  }.filter(x => x._2 != null) // Remove empty CallLogs
}


Example 6-12. Shared connection pool and JSON parser in Java
// Use mapPartitions to reuse setup work.
JavaPairRDD<String, CallLog[]> contactsContactLists =
  validCallSigns.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
      public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
        // List for our results.
        ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
        ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
        ObjectMapper mapper = createMapper();
        HttpClient client = new HttpClient();
        try {
          client.start();
          while (input.hasNext()) {
            requests.add(createRequestForSign(input.next(), client));
          }
          for (Tuple2<String, ContentExchange> signExchange : requests) {
            callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
          }
        } catch (Exception e) {
        }
        return callsignLogs;
      }});
System.out.println(StringUtils.join(contactsContactLists.collect(), ","));


In addition to avoiding setup work, we can sometimes use mapPartitions() to avoid excessive object creation. Sometimes we need to make an object for aggregating the result that is of a different type than the elements themselves.


Example 6-13. Average without mapPartitions() in Python
def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

def basicAvg(nums):
    """Compute the average"""
    sumCount = nums.map(lambda num: (num, 1)).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])

Example 6-14. Average with mapPartitions() in Python
def partitionCtr(nums):
    """Compute sumCounter for partition"""
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

def fastAvg(nums):
    """Compute the avg"""
    sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])
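As a quick usage sketch (assuming an existing SparkContext sc), both versions produce the same result, but fastAvg builds only one sumCount list per partition rather than one (num, 1) tuple per element:

# Quick check on a small RDD (assumes a SparkContext sc)
nums = sc.parallelize([1, 2, 3, 4])
print fastAvg(nums)   # 2.5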


Piping to External Programs


With three language bindings to choose from, you may have all the options you need for writing Spark applications. However, if none of Scala, Java, or Python does what you need, Spark provides a general mechanism to pipe data to programs in other languages, such as R scripts.


Spark provides a pipe() method on RDDs. It lets us write parts of a job in any language we want, as long as that language can read from and write to the Unix standard streams. With pipe(), you can write a transformation of an RDD that reads each element from standard input as a string, manipulates that string however you like, and then writes the result(s) as strings to standard output. The interface and programming model are restrictive and limited, but sometimes they are just what you need, for example to use a native-code function inside a map or filter operation.
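As a minimal sketch of this model (assuming a SparkContext sc), any program that reads lines on stdin and writes lines on stdout will do; here we pipe an RDD through the standard Unix tr utility:

# Minimal sketch: each element is sent to the external program's stdin as a
# line, and each line the program prints becomes an element of the result RDD.
words = sc.parallelize(["coffee", "kona"])
upper = words.pipe("tr a-z A-Z")
print upper.collect()   # ['COFFEE', 'KONA']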


Most likely, you will want to pipe an RDD's contents through an external program or script because you already have software built and tested that you would like to reuse with Spark. A lot of data scientists have code in R, so we will use pipe() to interact with an R program.


In the example below, we use an R library to compute the distance to all of our contacts. Each element in the RDD is written out by our program with a newline as a separator, and every line the program outputs is a string element in the resulting RDD. To make it easy for our R program to parse the input, we reformat our data as mylat,mylon,theirlat,theirlon with commas as field separators.


Example 6-15. R distance program
#!/usr/bin/env Rscript
library("Imap")
f <- file("stdin")
open(f)
while(length(line <- readLines(f,n=1)) > 0) {
  # process line
  contents <- Map(as.numeric, strsplit(line, ","))
  mydist <- gdist(contents[[1]][1], contents[[1]][2],
                  contents[[1]][3], contents[[1]][4],
                  units="m", a=6378137.0, b=6356752.3142, verbose = FALSE)
  write(mydist, stdout())
}


If we place this code in an executable file at ./src/R/finddistance.R, using it looks like this:


    $ ./src/R/finddistance.R
    37.75889318222431,-122.42683635321838,37.7614213,-122.4240097
    349.2602
    coffee
    NA


So far, so good: we now have a way to transform every line from stdin into output on stdout (for input the script cannot parse, like coffee, it emits NA). Now we need to make finddistance.R available on each of our worker nodes and to actually transform our RDD with our shell script.


Example 6-16. Driver program using pipe() to call finddistance.R in Python
from pyspark import SparkFiles

# Compute the distance of each call using an external R program
distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"
sc.addFile(distScript)

def hasDistInfo(call):
    """Verify that a call has the fields required to compute the distance"""
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    return all(map(lambda f: call[f], requiredFields))

def formatCall(call):
    """Format a call so that it can be parsed by our R program"""
    return "{0},{1},{2},{3}".format(
        call["mylat"], call["mylong"],
        call["contactlat"], call["contactlong"])

pipeInputs = contactsContactList.values().flatMap(
    lambda calls: map(formatCall, filter(hasDistInfo, calls)))
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
print distances.collect()


Example 6-17. Driver program using pipe() to call finddistance.R in Scala
// Compute the distance of each call using an external R program
// adds our script to a list of files for each node to download with this job
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
  s"${y.contactlat},${y.contactlong},${y.mylat},${y.mylong}")).pipe(Seq(
  SparkFiles.get(distScriptName)))
println(distances.collect().toList)




Example 6-18. Driver program using pipe() to call finddistance.R in Java
// Compute the distance of each call using an external R program
// adds our script to a list of files for each node to download with this job
String distScript = "./src/R/finddistance.R";
String distScriptName = "finddistance.R";
sc.addFile(distScript);
JavaRDD<String> pipeInputs = contactsContactLists.values()
  .map(new VerifyCallLogs()).flatMap(
    new FlatMapFunction<CallLog[], String>() {
      public Iterable<String> call(CallLog[] calls) {
        ArrayList<String> latLons = new ArrayList<String>();
        for (CallLog call: calls) {
          latLons.add(call.mylat + "," + call.mylong +
                      "," + call.contactlat + "," + call.contactlong);
        }
        return latLons;
      }
    });
JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
System.out.println(StringUtils.join(distances.collect(), ","));


With SparkContext.addFile(path), we can build up a list of files for each worker node to download with a Spark job. These files can come from the driver's local filesystem or from an HTTP, HTTPS, or FTP URI. When an action is run in the job, the files are automatically downloaded by each node. They can then be found on the worker nodes in SparkFiles.getRootDirectory, or located with SparkFiles.get(filename). Of course, this is only one way to make sure that pipe() can find the script on each worker node; you could use another remote-copying tool to place the script file in a known location on each node.
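As a small sketch of the lookup side (assuming the driver has already called sc.addFile("./src/R/finddistance.R")):

# On a worker, both of these locate the downloaded copy of the file.
from pyspark import SparkFiles
path = SparkFiles.get("finddistance.R")    # absolute path to the downloaded copy
root = SparkFiles.getRootDirectory()       # directory holding all added files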


Once the script is available, the pipe() method on the RDD makes it easy to pipe the elements of the RDD through the script. Both invocations below also pass "," to the script as a command-line argument, the first as a sequence of command tokens and the second as a single command string:
  rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
  rdd.pipe(SparkFiles.get("finddistance.R") + " ,")


Numeric RDD Operations


Spark provides several descriptive statistics operations on RDDs of numeric data. They are implemented with a streaming algorithm that builds up the model one element at a time. The descriptive statistics are all computed in a single pass over the data and returned as a StatsCounter object by a call to stats(). The StatsCounter object provides the following methods:


    count() Number of elements in the RDD
    mean() Average of the elements
    sum() Total
    max() Maximum value
    min() Minimum value
    variance() Variance of the elements
    sampleVariance() Variance of the elements, computed for a sample
    stdev() Standard deviation
    sampleStdev() Sample standard deviation


If you want to compute only one of these statistics, you can also call the corresponding method directly on the RDD, for example rdd.mean() or rdd.sum().
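As a small sketch (assuming a SparkContext sc), both routes look like this:

# One pass over the data builds the full StatsCounter;
# individual statistics can also be requested directly from the RDD.
nums = sc.parallelize([1.0, 2.0, 3.0, 4.0])
stats = nums.stats()
print stats.mean(), stats.stdev()   # 2.5 1.118...
print nums.mean(), nums.sum()       # 2.5 10.0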


In Examples 6-19 through 6-21, we use the summary statistics to remove some outliers from our data. Since we go over the same RDD twice (once to compute the summary statistics and once to remove the outliers), we may wish to cache the RDD. Going back to our call log example, we can remove the contact points from our call log that are too far away.


Example 6-19. Removing outliers in Python
import math

# Convert our RDD of strings to numeric data so we can compute stats and
# remove the outliers.
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = stats.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
    lambda x: math.fabs(x - mean) < 3 * stddev)
print reasonableDistances.collect()


Example 6-20. Removing outliers in Scala
// Now we can go ahead and remove outliers since those may have misreported locations
// first we need to take our RDD of strings and turn it into doubles.
val distanceDoubles = distances.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev)
println(reasonableDistances.collect().toList)


Example 6-21. Removing outliers in Java
// First we need to convert our RDD of String to a DoubleRDD so we can
// access the stats function
JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() {
  public double call(String value) {
    return Double.parseDouble(value);
  }});
final StatCounter stats = distanceDoubles.stats();
final Double stddev = stats.stdev();
final Double mean = stats.mean();
JavaDoubleRDD reasonableDistances =
  distanceDoubles.filter(new Function<Double, Boolean>() {
    public Boolean call(Double x) {
      return (Math.abs(x-mean) < 3 * stddev);}});
System.out.println(StringUtils.join(reasonableDistances.collect(), ","));