Dalam panduan ini, kami akan memberi tumpuan terutamanya pada membaca/memuatkan fail parket ke dalam PySpark DataFrame/SQL menggunakan fungsi read.parquet() yang tersedia dalam kelas pyspark.sql.DataFrameReader.
Topik Kandungan:
Baca Fail Parket ke PySpark DataFrame
Baca Fail Parket ke SQL PySpark
Pyspark.sql.DataFrameReader.parket()
Fungsi ini digunakan untuk membaca fail parket dan memuatkannya ke dalam PySpark DataFrame. Ia mengambil nama laluan/fail bagi fail parket. Kita hanya boleh menggunakan fungsi read.parquet() kerana ini adalah fungsi generik.
Sintaks:
Mari lihat sintaks read.parquet():
spark_app.read.parket(nama_fail.parket/path)Mula-mula, pasang modul PySpark menggunakan arahan pip:
pip pasang pyspark
Dapatkan Fail Parket
Untuk membaca fail parket, anda memerlukan data di mana fail parket dijana daripada data tersebut. Dalam bahagian ini, kita akan melihat cara menjana fail parket daripada PySpark DataFrame.
Mari buat PySpark DataFrame dengan 5 rekod dan tulis ini pada fail parket 'industri_parket'.
import pysparkdaripada pyspark.sql import SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# cipta kerangka data yang menyimpan butiran Industri
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Pertanian' , Luas = 'USA' ,
Penilaian= 'Panas' ,Jumlah_pekerja= 100 ),
Baris(Jenis= 'Pertanian' , Luas = 'India' ,Penilaian= 'Panas' ,Jumlah_pekerja= 200 ),
Baris(Jenis= 'Pembangunan' , Luas = 'USA' ,Penilaian= 'Suam' ,Jumlah_pekerja= 100 ),
Baris(Jenis= 'Pendidikan' ,Kawasan= 'USA' ,Penilaian= 'Sejuk' ,Jumlah_pekerja= 400 ),
Baris(Jenis= 'Pendidikan' , Luas = 'USA' ,Penilaian= 'Suam' ,Jumlah_pekerja= dua puluh )
])
# DataFrame Sebenar
industry_df.show()
# Tulis industri_df pada fail parket
industry_df.coalesce( 1 ).tulis.parket( 'parket_industri' )
Pengeluaran:
Ini ialah DataFrame yang memegang 5 rekod.
Fail parket dicipta untuk DataFrame sebelumnya. Di sini, nama fail kami dengan sambungan ialah 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parket'. Kami menggunakan fail ini dalam keseluruhan tutorial.
Baca Fail Parket ke PySpark DataFrame
Kami mempunyai fail parket. Mari baca fail ini menggunakan fungsi read.parquet() dan muatkannya ke dalam PySpark DataFrame.
import pysparkdaripada pyspark.sql import SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# Baca fail parket ke dalam dataframe_from_parket objek.
dataframe_from_parquet=linuxhint_spark_app.read.parket( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parket' )
# Paparkan dataframe_from_parket-DataFrame
dataframe_from_parquet.show()
Pengeluaran:
Kami memaparkan DataFrame menggunakan kaedah show() yang dicipta daripada fail parket.
Pertanyaan SQL dengan Fail Parket
Selepas memuatkan ke dalam DataFrame, anda boleh membuat jadual SQL dan memaparkan data yang terdapat dalam DataFrame. Kita perlu mencipta PANDANGAN SEMENTARA dan menggunakan arahan SQL untuk mengembalikan rekod daripada DataFrame yang dicipta daripada fail parket.
Contoh 1:
Buat paparan sementara bernama 'Sektor' dan gunakan arahan SELECT untuk memaparkan rekod dalam DataFrame. Anda boleh merujuk kepada ini tutorial yang menerangkan cara membuat VIEW dalam Spark – SQL.
import pysparkdaripada pyspark.sql import SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# Baca fail parket ke dalam dataframe_from_parket objek.
dataframe_from_parquet=linuxhint_spark_app.read.parket( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parket' )
# Cipta Paparan daripada fail parket di atas bernama - 'Sektor'
dataframe_from_parquet.createOrReplaceTempView( 'Sektor' )
# Pertanyaan untuk memaparkan semua rekod daripada Sektor
linuxhint_spark_app.sql( 'pilih * daripada Sektor' ).tunjukkan()
Pengeluaran:
Contoh 2:
Menggunakan VIEW sebelumnya, tulis pertanyaan SQL:
- Untuk memaparkan semua rekod daripada Sektor milik 'India'.
- Untuk memaparkan semua rekod daripada Sektor dengan pekerja yang melebihi 100.
linuxhint_spark_app.sql( 'pilih * daripada Sektor di mana Kawasan='India'' ).tunjukkan()
# Pertanyaan untuk memaparkan semua rekod daripada Sektor dengan pekerja melebihi 100 orang
linuxhint_spark_app.sql( 'pilih * daripada Sektor di mana Jumlah_pekerja>100' ).tunjukkan()
Pengeluaran:
Terdapat hanya satu rekod dengan kawasan iaitu 'India' dan dua rekod dengan pekerja yang melebihi 100 orang.
Baca Fail Parket ke SQL PySpark
Pertama, kita perlu mencipta VIEW menggunakan arahan CREATE. Menggunakan kata kunci 'laluan' dalam pertanyaan SQL, kita boleh membaca fail parket ke Spark SQL. Selepas laluan, kita perlu menentukan nama fail/lokasi fail.
Sintaks:
spark_app.sql( 'CIPTA TEMPORARY VIEW view_name MENGGUNAKAN PILIHAN parket (laluan ' nama_fail.parket ')' )Contoh 1:
Buat paparan sementara bernama 'Sector2' dan baca fail parket ke dalamnya. Menggunakan fungsi sql(), tulis pertanyaan pilih untuk memaparkan semua rekod yang terdapat dalam paparan.
import pysparkdaripada pyspark.sql import SparkSession, Row
linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()
# Baca fail parket ke dalam Spark- SQL
linuxhint_spark_app.sql( 'BUAT PANDANGAN SEMENTARA Sektor2 MENGGUNAKAN PILIHAN parket (laluan ' bahagian-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parket ')' )
# Pertanyaan untuk memaparkan semua rekod daripada Sektor2
linuxhint_spark_app.sql( 'pilih * daripada Sektor2' ).tunjukkan()
Pengeluaran:
Contoh 2:
Gunakan VIEW sebelumnya dan tulis pertanyaan untuk memaparkan semua rekod dengan penarafan 'Panas' atau 'Sejuk'.
# Pertanyaan untuk memaparkan semua rekod dari Sektor2 dengan Penilaian- Panas atau Sejuk.linuxhint_spark_app.sql( 'pilih * dari Sektor2 di mana Rating='Panas' ATAU Rating='Sejuk'' ).tunjukkan()
Pengeluaran:
Terdapat tiga rekod dengan penarafan 'Panas' atau 'Sejuk'.
Kesimpulan
Dalam PySpark, fungsi write.parquet() menulis DataFrame ke fail parket. Fungsi read.parquet() membaca fail parket ke PySpark DataFrame atau mana-mana DataSource lain. Kami belajar cara membaca fail parket ke dalam PySpark DataFrame dan ke dalam jadual PySpark. Sebagai sebahagian daripada tutorial ini, kami juga membincangkan cara mencipta jadual daripada PySpark DataFrame dan menapis data menggunakan klausa WHERE.