PostgreSQL
 sql >> Teknologi Basis Data >  >> RDS >> PostgreSQL

Kunci utama dengan Apache Spark

Skala :

Jika yang Anda butuhkan hanyalah nomor unik, Anda dapat menggunakan zipWithUniqueId dan buat ulang DataFrame. Pertama, beberapa impor dan data dummy:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Ekstrak skema untuk penggunaan lebih lanjut:

val schema = df.schema

Tambahkan bidang id:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Buat DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

Hal yang sama di Python :

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Jika Anda lebih suka nomor berurutan, Anda dapat mengganti zipWithUniqueId dengan zipWithIndex tapi harganya sedikit lebih mahal.

Langsung dengan DataFrame API :

(Scala universal, Python, Java, R dengan sintaks yang hampir sama)

Sebelumnya saya melewatkan monotonicallyIncreasingId fungsi yang seharusnya berfungsi dengan baik selama Anda tidak memerlukan angka berurutan:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Meskipun berguna monotonicallyIncreasingId adalah non-deterministik. Tidak hanya id yang mungkin berbeda dari eksekusi ke eksekusi tetapi tanpa trik tambahan tidak dapat digunakan untuk mengidentifikasi baris ketika operasi selanjutnya berisi filter.

Catatan :

Dimungkinkan juga untuk menggunakan rowNumber fungsi jendela:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Sayangnya:

Jendela PERINGATAN:Tidak Ada Partisi yang Ditentukan untuk operasi Jendela! Memindahkan semua data ke satu partisi, ini dapat menyebabkan penurunan kinerja yang serius.

Jadi, kecuali Anda memiliki cara alami untuk mempartisi data Anda dan memastikan keunikan tidak terlalu berguna saat ini.



  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. Ikhtisar PostgreSQL Query Caching &Load Balancing

  2. Bagaimana Anda menggabungkan dua tabel pada bidang kunci asing menggunakan Django ORM?

  3. Bagaimana menghubungkan ke database PostgreSQL jarak jauh melalui SSL dengan Python

  4. Java SQL ERROR:Nama_Tabel Relasi tidak ada

  5. Array agregat Postgresql