PySpark Pandas_Udf()

Pyspark Pandas Udf



Mengubah PySpark DataFrame boleh dilakukan menggunakan fungsi pandas_udf(). Ia adalah fungsi yang ditentukan pengguna yang digunakan pada PySpark DataFrame dengan anak panah. Kita boleh melakukan operasi bervektor menggunakan pandas_udf(). Ia boleh dilaksanakan dengan melepasi fungsi ini sebagai penghias. Mari selami panduan ini untuk mengetahui sintaks, parameter dan contoh yang berbeza.

Topik Kandungan:

Jika anda ingin mengetahui tentang PySpark DataFrame dan pemasangan modul, lakukan ini artikel .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () tersedia dalam modul sql.functions dalam PySpark yang boleh diimport menggunakan kata kunci 'dari'. Ia digunakan untuk melaksanakan operasi vektor pada PySpark DataFrame kami. Fungsi ini dilaksanakan seperti penghias dengan melepasi tiga parameter. Selepas itu, kami boleh mencipta fungsi yang ditentukan pengguna yang mengembalikan data dalam format vektor (seperti kami menggunakan siri/NumPy untuk ini) menggunakan anak panah. Dalam fungsi ini, kami dapat mengembalikan hasilnya.



Struktur & Sintaks:



Pertama, mari kita lihat struktur dan sintaks fungsi ini:

@pandas_udf(datatype)
def function_name(operasi) -> convert_format:
penyata pulangan

Di sini, nama_fungsi ialah nama fungsi kami yang ditentukan. Jenis data menentukan jenis data yang dikembalikan oleh fungsi ini. Kami boleh mengembalikan hasil menggunakan kata kunci 'kembali'. Semua operasi dilakukan di dalam fungsi dengan tugasan anak panah.





Pandas_udf (Fungsi dan ReturnType)

  1. Parameter pertama ialah fungsi yang ditentukan pengguna yang dihantar kepadanya.
  2. Parameter kedua digunakan untuk menentukan jenis data pulangan daripada fungsi.

Data:

Dalam keseluruhan panduan ini, kami hanya menggunakan satu PySpark DataFrame untuk demonstrasi. Semua fungsi yang ditentukan pengguna yang kami tentukan digunakan pada PySpark DataFrame ini. Pastikan anda mencipta DataFrame ini dalam persekitaran anda terlebih dahulu selepas pemasangan PySpark.



import pyspark

daripada pyspark.sql import SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Petunjuk Linux' ).getOrCreate()

daripada pyspark.sql.functions import pandas_udf

daripada pyspark.sql.types import *

import panda sebagai panda

# butiran sayur

sayur =[{ 'jenis' : 'sayur' , 'nama' : 'tomato' , 'locate_country' : 'USA' , 'kuantiti' : 800 },

{ 'jenis' : 'buah' , 'nama' : 'pisang' , 'locate_country' : 'CINA' , 'kuantiti' : dua puluh },

{ 'jenis' : 'sayur' , 'nama' : 'tomato' , 'locate_country' : 'USA' , 'kuantiti' : 800 },

{ 'jenis' : 'sayur' , 'nama' : 'Mangga' , 'locate_country' : 'JEPUN' , 'kuantiti' : 0 },

{ 'jenis' : 'buah' , 'nama' : 'lemon' , 'locate_country' : 'INDIA' , 'kuantiti' : 1700 },

{ 'jenis' : 'sayur' , 'nama' : 'tomato' , 'locate_country' : 'USA' , 'kuantiti' : 1200 },

{ 'jenis' : 'sayur' , 'nama' : 'Mangga' , 'locate_country' : 'JEPUN' , 'kuantiti' : 0 },

{ 'jenis' : 'buah' , 'nama' : 'lemon' , 'locate_country' : 'INDIA' , 'kuantiti' : 0 }

]

# cipta kerangka data pasaran daripada data di atas

market_df = linuxhint_spark_app.createDataFrame(sayur)

market_df.show()

Pengeluaran:

Di sini, kami mencipta DataFrame ini dengan 4 lajur dan 8 baris. Sekarang, kami menggunakan pandas_udf() untuk mencipta fungsi yang ditentukan pengguna dan menggunakannya pada lajur ini.

Pandas_udf() dengan Jenis Data Berbeza

Dalam senario ini, kami mencipta beberapa fungsi yang ditentukan pengguna dengan pandas_udf() dan menerapkannya pada lajur dan memaparkan keputusan menggunakan kaedah select(). Dalam setiap kes, kami menggunakan panda.Series semasa kami melakukan operasi vektor. Ini menganggap nilai lajur sebagai tatasusunan satu dimensi dan operasi digunakan pada lajur. Dalam penghias itu sendiri, kami menentukan jenis pemulangan fungsi.

Contoh 1: Pandas_udf() dengan Jenis Rentetan

Di sini, kami mencipta dua fungsi yang ditentukan pengguna dengan jenis pulangan rentetan untuk menukar nilai lajur jenis rentetan kepada huruf besar dan huruf kecil. Akhir sekali, kami menggunakan fungsi ini pada lajur 'jenis' dan 'locate_country'.

# Tukar lajur jenis kepada huruf besar dengan pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

kembali i.str.upper()

# Tukar lajur locate_country kepada huruf kecil dengan pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

kembali i.str.lower()

# Paparkan lajur menggunakan select()

market_df.select( 'jenis' ,type_upper_case( 'jenis' ), 'locate_country' ,
negara_huruf_kecil( 'locate_country' )).tunjukkan()

Pengeluaran:

Penjelasan:

Fungsi StringType() tersedia dalam modul pyspark.sql.types. Kami telah mengimport modul ini semasa mencipta PySpark DataFrame.

  1. Pertama, UDF (fungsi yang ditentukan pengguna) mengembalikan rentetan dalam huruf besar menggunakan fungsi str.upper(). str.upper() tersedia dalam Struktur Data Siri (kerana kami menukar kepada siri dengan anak panah di dalam fungsi) yang menukar rentetan yang diberikan kepada huruf besar. Akhir sekali, fungsi ini digunakan pada lajur 'jenis' yang dinyatakan dalam kaedah pilih(). Sebelum ini, semua rentetan dalam lajur jenis adalah dalam huruf kecil. Kini, mereka ditukar kepada huruf besar.
  2. Kedua, UDF mengembalikan rentetan dalam huruf besar menggunakan fungsi str.lower(). str.lower() tersedia dalam Struktur Data Siri yang menukar rentetan yang diberikan kepada huruf kecil. Akhir sekali, fungsi ini digunakan pada lajur 'jenis' yang dinyatakan dalam kaedah pilih(). Sebelum ini, semua rentetan dalam lajur jenis adalah dalam huruf besar. Kini, ia ditukar kepada huruf kecil.

Contoh 2: Pandas_udf() dengan Jenis Integer

Mari buat UDF yang menukar lajur integer PySpark DataFrame kepada siri Pandas dan tambah 100 pada setiap nilai. Hantar lajur 'kuantiti' ke fungsi ini di dalam kaedah pilih().

# Tambah 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

kembalikan i+ 100

# Hantar lajur kuantiti ke fungsi dan paparan di atas.

market_df.select( 'kuantiti' , add_100( 'kuantiti' )).tunjukkan()

Pengeluaran:

Penjelasan:

Di dalam UDF, kami mengulangi semua nilai dan menukarnya kepada Siri. Selepas itu, kami menambah 100 pada setiap nilai dalam Siri. Akhir sekali, kami menghantar lajur 'kuantiti' ke fungsi ini dan kami dapat melihat bahawa 100 ditambah kepada semua nilai.

Pandas_udf() dengan Jenis Data Berbeza Menggunakan Groupby() & Agg()

Mari lihat contoh untuk menghantar UDF ke lajur agregat. Di sini, nilai lajur dikumpulkan dahulu menggunakan fungsi groupby() dan pengagregatan dilakukan menggunakan fungsi agg(). Kami lulus UDF kami di dalam fungsi agregat ini.

Sintaks:

pyspark_dataframe_object.groupby( 'kumpulan_lajur' ).agg(UDF
(pyspark_dataframe_object[ 'kolum' ]))

Di sini, nilai dalam lajur pengumpulan dikumpulkan dahulu. Kemudian, pengagregatan dilakukan pada setiap data terkumpul berkenaan dengan UDF kami.

Contoh 1: Pandas_udf() dengan Min Agregat()

Di sini, kami mencipta fungsi yang ditentukan pengguna dengan apungan jenis pulangan. Di dalam fungsi, kami mengira purata menggunakan fungsi min(). UDF ini dihantar ke lajur 'kuantiti' untuk mendapatkan kuantiti purata bagi setiap jenis.

# pulangkan min/purata

@pandas_udf( 'terapung' )

def average_function(i: panda.Series) -> float:

kembalikan i.mean()

# Hantar lajur kuantiti kepada fungsi dengan mengumpulkan lajur jenis.

market_df.groupby( 'jenis' ).agg(fungsi_purata(market_df[ 'kuantiti' ])).tunjukkan()

Pengeluaran:

Kami mengumpulkan berdasarkan elemen dalam lajur 'jenis'. Dua kumpulan dibentuk - 'buah' dan 'sayuran'. Bagi setiap kumpulan, min dikira dan dikembalikan.

Contoh 2: Pandas_udf() dengan Agregat Maks() dan Min()

Di sini, kami mencipta dua fungsi yang ditentukan pengguna dengan jenis pulangan integer (int). UDF pertama mengembalikan nilai minimum dan UDF kedua mengembalikan nilai maksimum.

# pandas_udf yang mengembalikan nilai minimum

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

kembali i.min()

# panda_udf yang mengembalikan nilai maksimum

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

pulangkan i.max()

# Hantar lajur kuantiti ke min_ pandas_udf dengan mengumpulkan locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'kuantiti' ])).tunjukkan()

# Hantar lajur kuantiti kepada max_ pandas_udf dengan mengumpulkan locate_country.

market_df.groupby( 'locate_country' ).agg(maks_(pasaran_df[ 'kuantiti' ])).tunjukkan()

Pengeluaran:

Untuk mengembalikan nilai minimum dan maksimum, kami menggunakan fungsi min() dan max() dalam jenis pulangan UDF. Sekarang, kami mengumpulkan data dalam lajur 'locate_country'. Empat kumpulan dibentuk (“CHINA”, “INDIA”, “JEPUN”, “USA”). Untuk setiap kumpulan, kami mengembalikan kuantiti maksimum. Begitu juga, kami memulangkan kuantiti minimum.

Kesimpulan

Pada asasnya, pandas_udf () digunakan untuk melaksanakan operasi bervektor pada PySpark DataFrame kami. Kami telah melihat cara mencipta pandas_udf() dan menggunakannya pada PySpark DataFrame. Untuk pemahaman yang lebih baik, kami membincangkan contoh yang berbeza dengan mempertimbangkan semua jenis data (rentetan, apungan dan integer). Anda boleh menggunakan pandas_udf() dengan groupby() melalui fungsi agg().