Dataframe.rddで分散されたりされなかったりする話
導入
pysparkで分散処理をした際にdataframe.rddで分散処理されなかったりしたのでまとめてみました。
事前準備
事前パッケージとして以下のコードは全てこれらのコードを実行した後のものとします。
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
import timeit
import time
import pandas as pd
import numpy as np
sc = SparkContext.getOrCreate()
spark = SQLContext(sc)
途中でtest.csv
を読み込みますが、それらは以下のようなファイルです。
[test.csv]
age,name,weight
35,Ken,
30,Bob,80
29,Meg,45
分散処理のサンプルコード
file_name = 'test.csv'
df=spark.read.csv(file_name,header=True)
df.show()
def method(row):
time.sleep(1.0)
output = row.asDict()
output["test"] = "a"
return output
rdd = df.rdd
start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()
print('実行時間: {:.5} 秒'.format(end_t-start_t))
old_schema = df.schema
test = [StructField("test", StringType(), True)]
new_schema = StructType(old_schema.fields + test)
new_df = spark.createDataFrame(arr, new_schema)
new_df.toPandas()
今回の問題のコードですね。
spark.read.csv
でcsvを読み込んで、dataframeを作成しています。- その後dataFrameを.rddでrddに変換します。
- 並列処理で1秒かかる処理を作っています。
- コアが4つで並列処理するので、大体1秒弱で処理が完了していたら、並列処理完了です。
- rdd.map(method).collect()で実行しています。
- old_schema以降はカラムを追加して整形してるだけなので直接並列演算とは関係ないです。
[リザルト]
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
実行時間: 3.0664 秒
age name weight test
0 35 Ken None a
1 30 Bob 80 a
2 29 Meg 45 a
3秒って並列処理できてないですね^q^
対処方法1:listにしてparallelize
rdd = df.rdd
をrdd = sc.parallelize(df.collect())
に変更する
listをparallelizeしたら、並列演算はできるので、これで並列演算できます!
file_name = 'test.csv'
df=spark.read.csv(file_name,header=True)
df.show()
def method(row):
time.sleep(1.0)
output = row.asDict()
output["test"] = "a"
return output
## 変更点
rdd = sc.parallelize(df.collect())
start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()
print('実行時間: {:.5} 秒'.format(end_t-start_t))
old_schema = df.schema
test = [StructField("test", StringType(), True)]
new_schema = StructType(old_schema.fields + test)
new_df = spark.createDataFrame(arr, new_schema)
new_df.toPandas()
[リザルト]
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| null|
| 30| Bob| 80|
| 29| Meg| 45|
+---+----+------+
実行時間: 1.0609 秒
age name weight test
0 35 Ken None a
1 30 Bob 80 a
2 29 Meg 45 a
実行時間は1.0609 秒なので無事並列処理できてますね。 つまり.rddはクソ!!!!
.rddでも並列処理されるパターンはある
pdf = pd.DataFrame(np.random.rand(4, 2))
df = spark.createDataFrame(pdf)
df.show()
rdd = df.rdd
def method(row):
time.sleep(1.0)
output = row.asDict()
output["2"] = output["0"] + output["1"]
return output
start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()
new_df = spark.createDataFrame(arr)
new_df.show()
print('実行時間: {:.5} 秒'.format(end_t-start_t))
pandasでdataFrameを作成してそこから、sparkDataFrameを作成したのち、.rddしても 並列処理は実行されないのではと思っていましたが…
[リザルト]
+-------------------+-------------------+
| 0| 1|
+-------------------+-------------------+
| 0.2966984031832388|0.10033876323341062|
| 0.5333539421851482| 0.706730265715305|
|0.29422229134444267| 0.5088745403255253|
| 0.2534767444062944|0.22028180418108123|
+-------------------+-------------------+
+-------------------+-------------------+-------------------+
| 0| 1| 2|
+-------------------+-------------------+-------------------+
| 0.2966984031832388|0.10033876323341062|0.39703716641664943|
| 0.5333539421851482| 0.706730265715305| 1.2400842079004533|
|0.29422229134444267| 0.5088745403255253| 0.8030968316699679|
| 0.2534767444062944|0.22028180418108123|0.47375854858737565|
+-------------------+-------------------+-------------------+
実行時間: 1.1411 秒
うーんガッツリ並列処理されてますね()
対処方法2:pandasにして.rdd
pdf = pd.read_csv('test.csv')
df = spark.createDataFrame(pdf)
df.show()
def method(row):
time.sleep(1.0)
output = row.asDict()
output["test"] = "a"
return output
rdd = df.rdd
start_t = timeit.default_timer()
arr = rdd.map(method).collect()
end_t = timeit.default_timer()
print('実行時間: {:.5} 秒'.format(end_t-start_t))
old_schema = df.schema
test = [StructField("test", StringType(), True)]
new_schema = StructType(old_schema.fields + test)
new_df = spark.createDataFrame(arr, new_schema)
new_df.show()
pd.read_csv('test.csv')
を使用してpandasを作成してから
pandasをspark.createDataFrame(pdf) でdataframeに変換して、.rddします
[リザルト]
+---+----+------+
|age|name|weight|
+---+----+------+
| 35| Ken| NaN|
| 30| Bob| 80.0|
| 29| Meg| 45.0|
+---+----+------+
実行時間: 1.1156 秒
+---+----+------+----+
|age|name|weight|test|
+---+----+------+----+
| 35| Ken| NaN| a|
| 30| Bob| 80.0| a|
| 29| Meg| 45.0| a|
+---+----+------+----+
無事1秒なので分散処理されてますね…
まとめ
df=spark.read.csv
とdf.rdd
の相性が最悪
やる際は、listにしてparallelizかpandasにして.rddにすることで、分散はされるが、分散されたりされなかったりは非常に怖い^q^