Cara Membaca dan Menulis Data Jadual dalam PySpark

Cara Membaca Dan Menulis Data Jadual Dalam Pyspark



Pemprosesan data dalam PySpark lebih pantas jika data dimuatkan dalam bentuk jadual. Dengan ini, menggunakan Ungkapan SQL, pemprosesan akan menjadi cepat. Jadi, menukar PySpark DataFrame/RDD ke dalam jadual sebelum menghantarnya untuk diproses adalah pendekatan yang lebih baik. Hari ini, kita akan melihat cara membaca data jadual ke dalam PySpark DataFrame, menulis PySpark DataFrame pada jadual dan memasukkan DataFrame baharu ke jadual sedia ada menggunakan fungsi terbina dalam. Mari pergi!

Pyspark.sql.DataFrameWriter.saveAsTable()

Mula-mula, kita akan melihat cara menulis PySpark DataFrame sedia ada ke dalam jadual menggunakan fungsi write.saveAsTable(). Ia memerlukan nama jadual dan parameter pilihan lain seperti mod, partionBy, dsb., untuk menulis DataFrame pada jadual. Ia disimpan sebagai fail parket.

Sintaks:







dataframe_obj.write.saveAsTable(path/Table_name,mod,partitionBy,…)
  1. Table_name ialah nama jadual yang dibuat daripada dataframe_obj.
  2. Kita boleh menambah/menimpa data jadual menggunakan parameter mod.
  3. PartitionBy mengambil satu/berbilang lajur untuk mencipta sekatan berdasarkan nilai dalam lajur yang disediakan ini.

Contoh 1:

Cipta PySpark DataFrame dengan 5 baris dan 4 lajur. Tulis Dataframe ini pada jadual bernama 'Agri_Table1'.



import pyspark

daripada pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# data pertanian dengan 5 baris dan 5 lajur

agri =[{ 'Jenis_Tanah' : 'Hitam' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 2500 , 'Status_tanah' : 'Kering' ,
'Negara' : 'USA' },

{ 'Jenis_Tanah' : 'Hitam' , 'Ketersediaan_pengairan' : 'Ya' , 'Ekar' : 3500 , 'Status_tanah' : 'Basah' ,
'Negara' : 'India' },

{ 'Jenis_Tanah' : 'Merah' , 'Ketersediaan_pengairan' : 'Ya' , 'Ekar' : 210 , 'Status_tanah' : 'Kering' ,
'Negara' : 'UK' },

{ 'Jenis_Tanah' : 'Lain-lain' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 1000 , 'Status_tanah' : 'Basah' ,
'Negara' : 'USA' },

{ 'Jenis_Tanah' : 'Pasir' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 500 , 'Status_tanah' : 'Kering' ,
'Negara' : 'India' }]



# buat kerangka data daripada data di atas

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Tulis DataFrame di atas pada jadual.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Pengeluaran:







Kita dapat melihat bahawa satu fail parket dicipta dengan Data PySpark sebelumnya.



Contoh 2:

Pertimbangkan DataFrame sebelumnya dan tulis 'Agri_Table2' pada jadual dengan membahagikan rekod berdasarkan nilai dalam lajur 'Negara'.

# Tulis DataFrame di atas ke jadual dengan parameter partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Negara' ])

Pengeluaran:

Terdapat tiga nilai unik dalam lajur 'Negara' - 'India', 'UK' dan 'AS'. Jadi, tiga partition dicipta. Setiap partition memegang fail parket.

Pyspark.sql.DataFrameReader.table()

Mari muatkan jadual ke dalam PySpark DataFrame menggunakan fungsi spark.read.table(). Ia hanya memerlukan satu parameter iaitu nama laluan/jadual. Ia secara langsung memuatkan jadual ke dalam PySpark DataFrame dan semua fungsi SQL yang digunakan pada PySpark DataFrame juga boleh digunakan pada DataFrame yang dimuatkan ini.

Sintaks:

spark_app.read.table(path/'Table_name')

Dalam senario ini, kami menggunakan jadual sebelumnya yang dibuat daripada PySpark DataFrame. Pastikan anda perlu melaksanakan coretan kod senario sebelumnya dalam persekitaran anda.

Contoh:

Muatkan jadual “Agri_Table1” ke dalam DataFrame bernama “loaded_data”.

loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

loaded_data.show()

Pengeluaran:

Kita dapat melihat bahawa jadual dimuatkan ke dalam PySpark DataFrame.

Melaksanakan Pertanyaan SQL

Sekarang, kami melaksanakan beberapa pertanyaan SQL pada DataFrame yang dimuatkan menggunakan fungsi spark.sql() .

# Gunakan arahan SELECT untuk memaparkan semua lajur daripada jadual di atas.

linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1' ).tunjukkan()

# WHERE Fasal

linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1 WHERE Soil_status='Dry' ' ).tunjukkan()

linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1 WHERE Acres > 2000 ' ).tunjukkan()

Pengeluaran:

  1. Pertanyaan pertama memaparkan semua lajur dan rekod daripada DataFrame.
  2. Pertanyaan kedua memaparkan rekod berdasarkan lajur 'Soil_status'. Terdapat hanya tiga rekod dengan elemen 'Kering'.
  3. Pertanyaan terakhir mengembalikan dua rekod dengan 'Ekar' yang lebih besar daripada 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Menggunakan fungsi insertInto(), kita boleh menambahkan DataFrame ke dalam jadual sedia ada. Kita boleh menggunakan fungsi ini bersama-sama dengan selectExpr() untuk mentakrifkan nama lajur dan kemudian memasukkannya ke dalam jadual. Fungsi ini juga mengambil tableName sebagai parameter.

Sintaks:

DataFrame_obj.write.insertInto('Table_name')

Dalam senario ini, kami menggunakan jadual sebelumnya yang dibuat daripada PySpark DataFrame. Pastikan anda perlu melaksanakan coretan kod senario sebelumnya dalam persekitaran anda.

Contoh:

Buat DataFrame baharu dengan dua rekod dan masukkannya ke dalam jadual 'Agri_Table1'.

import pyspark

daripada pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# data pertanian dengan 2 baris

agri =[{ 'Jenis_Tanah' : 'Pasir' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 2500 , 'Status_tanah' : 'Kering' ,
'Negara' : 'USA' },

{ 'Jenis_Tanah' : 'Pasir' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 1200 , 'Status_tanah' : 'Basah' ,
'Negara' : 'Jepun' }]

# buat kerangka data daripada data di atas

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Ekar' , 'Negara' , 'Ketersediaan_pengairan' , 'Jenis_Tanah' ,
'Status_tanah' ).write.insertInto( 'Agri_Table1' )

# Paparkan Agri_Table1 yang terakhir

linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1' ).tunjukkan()

Pengeluaran:

Kini, jumlah bilangan baris yang terdapat dalam DataFrame ialah 7.

Kesimpulan

Anda kini memahami cara menulis PySpark DataFrame pada jadual menggunakan fungsi write.saveAsTable(). Ia memerlukan nama jadual dan parameter pilihan lain. Kemudian, kami memuatkan jadual ini ke dalam PySpark DataFrame menggunakan fungsi spark.read.table(). Ia hanya memerlukan satu parameter iaitu nama laluan/jadual. Jika anda ingin menambahkan DataFrame baharu ke dalam jadual sedia ada, gunakan fungsi insertInto().