PysparkのRDDで分散処理した話
ApacheSparkのpythonSDKのpysparkを使用して分散処理しようとしたお話ですね。
RDDの分散処理の基本の流れは、配列をRDD化して、mapで処理をして、collectで結果を返すという流れのようです。
サンプルコードを紹介しますね
サンプルコード紹介
以下では次を前提とする
from pyspark import SparkContext
from pyspark.sql import SQLContext
import timeit
import time
sc = SparkContext()
sqlContext = SQLContext(sc)
RDDを作る
parallelize関数はリストやタプルからRDDを作成できます。
number_list = list(range(1, 31))
rdd = sc.parallelize(number_list)
通常のmapで処理
def sample_def(num):
time.sleep(1.0)
return num * num
start_t = timeit.default_timer()
number_list = list(range(1, 31))
print(list(map(sample_def, number_list))[-1])
end_t = timeit.default_timer()
print('Time elapsed: {:.5} seconds'.format(end_t-start_t))
[リザルト]
900
Time elapsed: 30.075 seconds
time.sleep(1.0)
を使用することで疑似的に1秒かかる処理を作成しています。
繰り返しのループなので30秒かかることが分かりますね。
rddのmapで分散処理
def sample_def(num):
time.sleep(1.0)
return num * num
start_t = timeit.default_timer()
number_list = list(range(1, 31))
rdd = sc.parallelize(number_list)
print(rdd.map(sample_def).collect()[-1])
end_t = timeit.default_timer()
print('Time elapsed: {:.5} seconds'.format(end_t-start_t))
[リザルト]
900
Time elapsed: 9.1878 seconds
こっちは30秒かかる処理が9秒でできていますね、分散できてることが分かりますね。