Skala :
Jika yang Anda butuhkan hanyalah nomor unik, Anda dapat menggunakan zipWithUniqueId
dan buat ulang DataFrame. Pertama, beberapa impor dan data dummy:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Ekstrak skema untuk penggunaan lebih lanjut:
val schema = df.schema
Tambahkan bidang id:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Buat DataFrame:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
Hal yang sama di Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Jika Anda lebih suka nomor berurutan, Anda dapat mengganti zipWithUniqueId
dengan zipWithIndex
tapi harganya sedikit lebih mahal.
Langsung dengan DataFrame
API :
(Scala universal, Python, Java, R dengan sintaks yang hampir sama)
Sebelumnya saya melewatkan monotonicallyIncreasingId
fungsi yang seharusnya berfungsi dengan baik selama Anda tidak memerlukan angka berurutan:
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Meskipun berguna monotonicallyIncreasingId
adalah non-deterministik. Tidak hanya id yang mungkin berbeda dari eksekusi ke eksekusi tetapi tanpa trik tambahan tidak dapat digunakan untuk mengidentifikasi baris ketika operasi selanjutnya berisi filter.
Catatan :
Dimungkinkan juga untuk menggunakan rowNumber
fungsi jendela:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Sayangnya:
Jendela PERINGATAN:Tidak Ada Partisi yang Ditentukan untuk operasi Jendela! Memindahkan semua data ke satu partisi, ini dapat menyebabkan penurunan kinerja yang serius.
Jadi, kecuali Anda memiliki cara alami untuk mempartisi data Anda dan memastikan keunikan tidak terlalu berguna saat ini.