Pyspark.sql.DataFrameWriter.saveAsTable()
Mula-mula, kita akan melihat cara menulis PySpark DataFrame sedia ada ke dalam jadual menggunakan fungsi write.saveAsTable(). Ia memerlukan nama jadual dan parameter pilihan lain seperti mod, partionBy, dsb., untuk menulis DataFrame pada jadual. Ia disimpan sebagai fail parket.
Sintaks:
dataframe_obj.write.saveAsTable(path/Table_name,mod,partitionBy,…)
- Table_name ialah nama jadual yang dibuat daripada dataframe_obj.
- Kita boleh menambah/menimpa data jadual menggunakan parameter mod.
- PartitionBy mengambil satu/berbilang lajur untuk mencipta sekatan berdasarkan nilai dalam lajur yang disediakan ini.
Contoh 1:
Cipta PySpark DataFrame dengan 5 baris dan 4 lajur. Tulis Dataframe ini pada jadual bernama 'Agri_Table1'.
import pyspark
daripada pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# data pertanian dengan 5 baris dan 5 lajur
agri =[{ 'Jenis_Tanah' : 'Hitam' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 2500 , 'Status_tanah' : 'Kering' ,
'Negara' : 'USA' },
{ 'Jenis_Tanah' : 'Hitam' , 'Ketersediaan_pengairan' : 'Ya' , 'Ekar' : 3500 , 'Status_tanah' : 'Basah' ,
'Negara' : 'India' },
{ 'Jenis_Tanah' : 'Merah' , 'Ketersediaan_pengairan' : 'Ya' , 'Ekar' : 210 , 'Status_tanah' : 'Kering' ,
'Negara' : 'UK' },
{ 'Jenis_Tanah' : 'Lain-lain' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 1000 , 'Status_tanah' : 'Basah' ,
'Negara' : 'USA' },
{ 'Jenis_Tanah' : 'Pasir' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 500 , 'Status_tanah' : 'Kering' ,
'Negara' : 'India' }]
# buat kerangka data daripada data di atas
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Tulis DataFrame di atas pada jadual.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Pengeluaran:
Kita dapat melihat bahawa satu fail parket dicipta dengan Data PySpark sebelumnya.
Contoh 2:
Pertimbangkan DataFrame sebelumnya dan tulis 'Agri_Table2' pada jadual dengan membahagikan rekod berdasarkan nilai dalam lajur 'Negara'.
# Tulis DataFrame di atas ke jadual dengan parameter partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Negara' ])
Pengeluaran:
Terdapat tiga nilai unik dalam lajur 'Negara' - 'India', 'UK' dan 'AS'. Jadi, tiga partition dicipta. Setiap partition memegang fail parket.
Pyspark.sql.DataFrameReader.table()
Mari muatkan jadual ke dalam PySpark DataFrame menggunakan fungsi spark.read.table(). Ia hanya memerlukan satu parameter iaitu nama laluan/jadual. Ia secara langsung memuatkan jadual ke dalam PySpark DataFrame dan semua fungsi SQL yang digunakan pada PySpark DataFrame juga boleh digunakan pada DataFrame yang dimuatkan ini.
Sintaks:
spark_app.read.table(path/'Table_name')Dalam senario ini, kami menggunakan jadual sebelumnya yang dibuat daripada PySpark DataFrame. Pastikan anda perlu melaksanakan coretan kod senario sebelumnya dalam persekitaran anda.
Contoh:
Muatkan jadual “Agri_Table1” ke dalam DataFrame bernama “loaded_data”.
loaded_data = linuxhint_spark_app.read.table( 'Agri_Table1' )loaded_data.show()
Pengeluaran:
Kita dapat melihat bahawa jadual dimuatkan ke dalam PySpark DataFrame.
Melaksanakan Pertanyaan SQL
Sekarang, kami melaksanakan beberapa pertanyaan SQL pada DataFrame yang dimuatkan menggunakan fungsi spark.sql() .
# Gunakan arahan SELECT untuk memaparkan semua lajur daripada jadual di atas.linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1' ).tunjukkan()
# WHERE Fasal
linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1 WHERE Soil_status='Dry' ' ).tunjukkan()
linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1 WHERE Acres > 2000 ' ).tunjukkan()
Pengeluaran:
- Pertanyaan pertama memaparkan semua lajur dan rekod daripada DataFrame.
- Pertanyaan kedua memaparkan rekod berdasarkan lajur 'Soil_status'. Terdapat hanya tiga rekod dengan elemen 'Kering'.
- Pertanyaan terakhir mengembalikan dua rekod dengan 'Ekar' yang lebih besar daripada 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Menggunakan fungsi insertInto(), kita boleh menambahkan DataFrame ke dalam jadual sedia ada. Kita boleh menggunakan fungsi ini bersama-sama dengan selectExpr() untuk mentakrifkan nama lajur dan kemudian memasukkannya ke dalam jadual. Fungsi ini juga mengambil tableName sebagai parameter.
Sintaks:
DataFrame_obj.write.insertInto('Table_name')Dalam senario ini, kami menggunakan jadual sebelumnya yang dibuat daripada PySpark DataFrame. Pastikan anda perlu melaksanakan coretan kod senario sebelumnya dalam persekitaran anda.
Contoh:
Buat DataFrame baharu dengan dua rekod dan masukkannya ke dalam jadual 'Agri_Table1'.
import pysparkdaripada pyspark.sql import SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# data pertanian dengan 2 baris
agri =[{ 'Jenis_Tanah' : 'Pasir' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 2500 , 'Status_tanah' : 'Kering' ,
'Negara' : 'USA' },
{ 'Jenis_Tanah' : 'Pasir' , 'Ketersediaan_pengairan' : 'Tidak' , 'Ekar' : 1200 , 'Status_tanah' : 'Basah' ,
'Negara' : 'Jepun' }]
# buat kerangka data daripada data di atas
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Ekar' , 'Negara' , 'Ketersediaan_pengairan' , 'Jenis_Tanah' ,
'Status_tanah' ).write.insertInto( 'Agri_Table1' )
# Paparkan Agri_Table1 yang terakhir
linuxhint_spark_app.sql( 'PILIH * daripada Agri_Table1' ).tunjukkan()
Pengeluaran:
Kini, jumlah bilangan baris yang terdapat dalam DataFrame ialah 7.
Kesimpulan
Anda kini memahami cara menulis PySpark DataFrame pada jadual menggunakan fungsi write.saveAsTable(). Ia memerlukan nama jadual dan parameter pilihan lain. Kemudian, kami memuatkan jadual ini ke dalam PySpark DataFrame menggunakan fungsi spark.read.table(). Ia hanya memerlukan satu parameter iaitu nama laluan/jadual. Jika anda ingin menambahkan DataFrame baharu ke dalam jadual sedia ada, gunakan fungsi insertInto().