PySpark Read.Parquet()

Pyspark Read Parquet



Dalam PySpark, fungsi write.parquet() menulis DataFrame ke fail parket dan read.parquet() membaca fail parket ke PySpark DataFrame atau mana-mana DataSource lain. Untuk memproses lajur dalam Apache Spark dengan cepat dan cekap, kami perlu memampatkan data. Pemampatan data menjimatkan memori kami dan semua lajur ditukar kepada tahap rata. Ini bermakna storan aras lajur rata wujud. Fail yang menyimpan ini dikenali sebagai fail PARQUET.

Dalam panduan ini, kami akan memberi tumpuan terutamanya pada membaca/memuatkan fail parket ke dalam PySpark DataFrame/SQL menggunakan fungsi read.parquet() yang tersedia dalam kelas pyspark.sql.DataFrameReader.

Topik Kandungan:







Dapatkan Fail Parket



Baca Fail Parket ke PySpark DataFrame



Baca Fail Parket ke SQL PySpark





Pyspark.sql.DataFrameReader.parket()

Fungsi ini digunakan untuk membaca fail parket dan memuatkannya ke dalam PySpark DataFrame. Ia mengambil nama laluan/fail bagi fail parket. Kita hanya boleh menggunakan fungsi read.parquet() kerana ini adalah fungsi generik.

Sintaks:



Mari lihat sintaks read.parquet():

spark_app.read.parket(nama_fail.parket/path)

Mula-mula, pasang modul PySpark menggunakan arahan pip:

pip pasang pyspark

Dapatkan Fail Parket

Untuk membaca fail parket, anda memerlukan data di mana fail parket dijana daripada data tersebut. Dalam bahagian ini, kita akan melihat cara menjana fail parket daripada PySpark DataFrame.

Mari buat PySpark DataFrame dengan 5 rekod dan tulis ini pada fail parket 'industri_parket'.

import pyspark

daripada pyspark.sql import SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# cipta kerangka data yang menyimpan butiran Industri

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Pertanian' , Luas = 'USA' ,
Penilaian= 'Panas' ,Jumlah_pekerja= 100 ),

Baris(Jenis= 'Pertanian' , Luas = 'India' ,Penilaian= 'Panas' ,Jumlah_pekerja= 200 ),

Baris(Jenis= 'Pembangunan' , Luas = 'USA' ,Penilaian= 'Suam' ,Jumlah_pekerja= 100 ),

Baris(Jenis= 'Pendidikan' ,Kawasan= 'USA' ,Penilaian= 'Sejuk' ,Jumlah_pekerja= 400 ),

Baris(Jenis= 'Pendidikan' , Luas = 'USA' ,Penilaian= 'Suam' ,Jumlah_pekerja= dua puluh )

])

# DataFrame Sebenar

industry_df.show()

# Tulis industri_df pada fail parket

industry_df.coalesce( 1 ).tulis.parket( 'parket_industri' )

Pengeluaran:

Ini ialah DataFrame yang memegang 5 rekod.

Fail parket dicipta untuk DataFrame sebelumnya. Di sini, nama fail kami dengan sambungan ialah 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parket'. Kami menggunakan fail ini dalam keseluruhan tutorial.

Baca Fail Parket ke PySpark DataFrame

Kami mempunyai fail parket. Mari baca fail ini menggunakan fungsi read.parquet() dan muatkannya ke dalam PySpark DataFrame.

import pyspark

daripada pyspark.sql import SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# Baca fail parket ke dalam dataframe_from_parket objek.

dataframe_from_parquet=linuxhint_spark_app.read.parket( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parket' )

# Paparkan dataframe_from_parket-DataFrame

dataframe_from_parquet.show()

Pengeluaran:

Kami memaparkan DataFrame menggunakan kaedah show() yang dicipta daripada fail parket.

Pertanyaan SQL dengan Fail Parket

Selepas memuatkan ke dalam DataFrame, anda boleh membuat jadual SQL dan memaparkan data yang terdapat dalam DataFrame. Kita perlu mencipta PANDANGAN SEMENTARA dan menggunakan arahan SQL untuk mengembalikan rekod daripada DataFrame yang dicipta daripada fail parket.

Contoh 1:

Buat paparan sementara bernama 'Sektor' dan gunakan arahan SELECT untuk memaparkan rekod dalam DataFrame. Anda boleh merujuk kepada ini tutorial yang menerangkan cara membuat VIEW dalam Spark – SQL.

import pyspark

daripada pyspark.sql import SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# Baca fail parket ke dalam dataframe_from_parket objek.

dataframe_from_parquet=linuxhint_spark_app.read.parket( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parket' )

# Cipta Paparan daripada fail parket di atas bernama - 'Sektor'

dataframe_from_parquet.createOrReplaceTempView( 'Sektor' )

# Pertanyaan untuk memaparkan semua rekod daripada Sektor

linuxhint_spark_app.sql( 'pilih * daripada Sektor' ).tunjukkan()

Pengeluaran:

Contoh 2:

Menggunakan VIEW sebelumnya, tulis pertanyaan SQL:

  1. Untuk memaparkan semua rekod daripada Sektor milik 'India'.
  2. Untuk memaparkan semua rekod daripada Sektor dengan pekerja yang melebihi 100.
# Pertanyaan untuk memaparkan semua rekod daripada Sektor milik 'India'.

linuxhint_spark_app.sql( 'pilih * daripada Sektor di mana Kawasan='India'' ).tunjukkan()

# Pertanyaan untuk memaparkan semua rekod daripada Sektor dengan pekerja melebihi 100 orang

linuxhint_spark_app.sql( 'pilih * daripada Sektor di mana Jumlah_pekerja>100' ).tunjukkan()

Pengeluaran:

Terdapat hanya satu rekod dengan kawasan iaitu 'India' dan dua rekod dengan pekerja yang melebihi 100 orang.

Baca Fail Parket ke SQL PySpark

Pertama, kita perlu mencipta VIEW menggunakan arahan CREATE. Menggunakan kata kunci 'laluan' dalam pertanyaan SQL, kita boleh membaca fail parket ke Spark SQL. Selepas laluan, kita perlu menentukan nama fail/lokasi fail.

Sintaks:

spark_app.sql( 'CIPTA TEMPORARY VIEW view_name MENGGUNAKAN PILIHAN parket (laluan ' nama_fail.parket ')' )

Contoh 1:

Buat paparan sementara bernama 'Sector2' dan baca fail parket ke dalamnya. Menggunakan fungsi sql(), tulis pertanyaan pilih untuk memaparkan semua rekod yang terdapat dalam paparan.

import pyspark

daripada pyspark.sql import SparkSession, Row

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

# Baca fail parket ke dalam Spark- SQL

linuxhint_spark_app.sql( 'BUAT PANDANGAN SEMENTARA Sektor2 MENGGUNAKAN PILIHAN parket (laluan ' bahagian-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parket ')' )

# Pertanyaan untuk memaparkan semua rekod daripada Sektor2

linuxhint_spark_app.sql( 'pilih * daripada Sektor2' ).tunjukkan()

Pengeluaran:

Contoh 2:

Gunakan VIEW sebelumnya dan tulis pertanyaan untuk memaparkan semua rekod dengan penarafan 'Panas' atau 'Sejuk'.

# Pertanyaan untuk memaparkan semua rekod dari Sektor2 dengan Penilaian- Panas atau Sejuk.

linuxhint_spark_app.sql( 'pilih * dari Sektor2 di mana Rating='Panas' ATAU Rating='Sejuk'' ).tunjukkan()

Pengeluaran:

Terdapat tiga rekod dengan penarafan 'Panas' atau 'Sejuk'.

Kesimpulan

Dalam PySpark, fungsi write.parquet() menulis DataFrame ke fail parket. Fungsi read.parquet() membaca fail parket ke PySpark DataFrame atau mana-mana DataSource lain. Kami belajar cara membaca fail parket ke dalam PySpark DataFrame dan ke dalam jadual PySpark. Sebagai sebahagian daripada tutorial ini, kami juga membincangkan cara mencipta jadual daripada PySpark DataFrame dan menapis data menggunakan klausa WHERE.