Dataframe.rddで分散されたりされなかったりする話

導入

pysparkで分散処理をした際にdataframe.rddで分散処理されなかったりしたのでまとめてみました。

事前準備

事前パッケージとして以下のコードは全てこれらのコードを実行した後のものとします。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
import timeit
import time
import pandas as pd
import numpy as np

sc = SparkContext.getOrCreate()
spark = SQLContext(sc)

途中でtest.csvを読み込みますが、それらは以下のようなファイルです。

[test.csv]

age,name,weight
35,Ken,
30,Bob,80
29,Meg,45

分散処理のサンプルコード

file_name = 'test.csv'
df=spark.read.csv(file_name,header=True)
df.show()
def method(row):
    time.sleep(1.0)
    output = row.asDict()
    output["test"] = "a"
    return output

rdd = df.rdd

start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()

print('実行時間: {:.5} 秒'.format(end_t-start_t))

old_schema = df.schema
test = [StructField("test", StringType(), True)]
new_schema = StructType(old_schema.fields + test)
new_df = spark.createDataFrame(arr, new_schema)

new_df.toPandas()

今回の問題のコードですね。

  • spark.read.csvでcsvを読み込んで、dataframeを作成しています。
  • その後dataFrameを.rddでrddに変換します。
  • 並列処理で1秒かかる処理を作っています。
  • コアが4つで並列処理するので、大体1秒弱で処理が完了していたら、並列処理完了です。
  • rdd.map(method).collect()で実行しています。
  • old_schema以降はカラムを追加して整形してるだけなので直接並列演算とは関係ないです。

[リザルト]

+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

実行時間: 3.0664 秒
age	name	weight	test
0	35	Ken	None	a
1	30	Bob	80	a
2	29	Meg	45	a

3秒って並列処理できてないですね^q^

対処方法1:listにしてparallelize

rdd = df.rddrdd = sc.parallelize(df.collect())に変更する listをparallelizeしたら、並列演算はできるので、これで並列演算できます!

file_name = 'test.csv'
df=spark.read.csv(file_name,header=True)
df.show()
def method(row):
    time.sleep(1.0)
    output = row.asDict()
    output["test"] = "a"
    return output

## 変更点
rdd = sc.parallelize(df.collect())

start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()

print('実行時間: {:.5} 秒'.format(end_t-start_t))

old_schema = df.schema
test = [StructField("test", StringType(), True)]
new_schema = StructType(old_schema.fields + test)
new_df = spark.createDataFrame(arr, new_schema)

new_df.toPandas()

[リザルト]

+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|  null|
| 30| Bob|    80|
| 29| Meg|    45|
+---+----+------+

実行時間: 1.0609 秒
age	name	weight	test
0	35	Ken	None	a
1	30	Bob	80	a
2	29	Meg	45	a

実行時間は1.0609 秒なので無事並列処理できてますね。 つまり.rddはクソ!!!!

.rddでも並列処理されるパターンはある

pdf = pd.DataFrame(np.random.rand(4, 2))
df = spark.createDataFrame(pdf)
df.show()

rdd = df.rdd

def method(row):
    time.sleep(1.0)
    output = row.asDict()
    output["2"] = output["0"] + output["1"]
    return output

start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()

new_df = spark.createDataFrame(arr)
new_df.show()
print('実行時間: {:.5} 秒'.format(end_t-start_t))

pandasでdataFrameを作成してそこから、sparkDataFrameを作成したのち、.rddしても 並列処理は実行されないのではと思っていましたが…

[リザルト]

+-------------------+-------------------+
|                  0|                  1|
+-------------------+-------------------+
| 0.2966984031832388|0.10033876323341062|
| 0.5333539421851482|  0.706730265715305|
|0.29422229134444267| 0.5088745403255253|
| 0.2534767444062944|0.22028180418108123|
+-------------------+-------------------+

+-------------------+-------------------+-------------------+
|                  0|                  1|                  2|
+-------------------+-------------------+-------------------+
| 0.2966984031832388|0.10033876323341062|0.39703716641664943|
| 0.5333539421851482|  0.706730265715305| 1.2400842079004533|
|0.29422229134444267| 0.5088745403255253| 0.8030968316699679|
| 0.2534767444062944|0.22028180418108123|0.47375854858737565|
+-------------------+-------------------+-------------------+

実行時間: 1.1411 秒

うーんガッツリ並列処理されてますね()

対処方法2:pandasにして.rdd

pdf = pd.read_csv('test.csv')
df = spark.createDataFrame(pdf)
df.show()
def method(row):
    time.sleep(1.0)
    output = row.asDict()
    output["test"] = "a"
    return output

rdd = df.rdd

start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()

print('実行時間: {:.5} 秒'.format(end_t-start_t))

old_schema = df.schema
test = [StructField("test", StringType(), True)]
new_schema = StructType(old_schema.fields + test)
new_df = spark.createDataFrame(arr, new_schema)

new_df.show()

pd.read_csv('test.csv')を使用してpandasを作成してから pandasをspark.createDataFrame(pdf) でdataframeに変換して、.rddします

[リザルト]

+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken|   NaN|
| 30| Bob|  80.0|
| 29| Meg|  45.0|
+---+----+------+

実行時間: 1.1156 秒
+---+----+------+----+
|age|name|weight|test|
+---+----+------+----+
| 35| Ken|   NaN|   a|
| 30| Bob|  80.0|   a|
| 29| Meg|  45.0|   a|
+---+----+------+----+

無事1秒なので分散処理されてますね…

まとめ

df=spark.read.csvdf.rddの相性が最悪

やる際は、listにしてparallelizかpandasにして.rddにすることで、分散はされるが、分散されたりされなかったりは非常に怖い^q^

Nakano
Nakano
Back-end engineer

AWS,Rails,UE4,vue.js,hugo,その他なんでもやりたい