PysparkのRDDで分散処理した話

ApacheSparkのpythonSDKのpysparkを使用して分散処理しようとしたお話ですね。

RDDの分散処理の基本の流れは、配列をRDD化して、mapで処理をして、collectで結果を返すという流れのようです。

サンプルコードを紹介しますね

サンプルコード紹介

以下では次を前提とする

from pyspark import SparkContext
from pyspark.sql import SQLContext
import timeit
import time

sc = SparkContext()
sqlContext = SQLContext(sc)

RDDを作る

parallelize関数はリストやタプルからRDDを作成できます。

number_list = list(range(1, 31))
rdd = sc.parallelize(number_list)

通常のmapで処理

def sample_def(num):
    time.sleep(1.0)
    return num * num

start_t = timeit.default_timer()

number_list = list(range(1, 31))
print(list(map(sample_def, number_list))[-1])

end_t = timeit.default_timer()
print('Time elapsed: {:.5} seconds'.format(end_t-start_t))

[リザルト]

900
Time elapsed: 30.075 seconds

time.sleep(1.0)を使用することで疑似的に1秒かかる処理を作成しています。 繰り返しのループなので30秒かかることが分かりますね。

rddのmapで分散処理

def sample_def(num):
    time.sleep(1.0)
    return num * num

start_t = timeit.default_timer()

number_list = list(range(1, 31))
rdd = sc.parallelize(number_list)
print(rdd.map(sample_def).collect()[-1])

end_t = timeit.default_timer()
print('Time elapsed: {:.5} seconds'.format(end_t-start_t))

[リザルト]

900
Time elapsed: 9.1878 seconds

こっちは30秒かかる処理が9秒でできていますね、分散できてることが分かりますね。

Nakano
Nakano
Back-end engineer

AWS,Rails,UE4,vue.js,hugo,その他なんでもやりたい

関連項目