Cara Melaksanakan Penstriman Data Masa Nyata dalam Python

Cara Melaksanakan Penstriman Data Masa Nyata Dalam Python



Menguasai pelaksanaan penstriman data masa nyata dalam Python bertindak sebagai kemahiran penting dalam dunia yang melibatkan data hari ini. Panduan ini meneroka langkah teras dan alatan penting untuk menggunakan penstriman data masa nyata dengan keaslian dalam Python. Daripada memilih rangka kerja yang sesuai seperti Apache Kafka atau Apache Pulsar kepada menulis kod Python untuk penggunaan data yang mudah, pemprosesan dan visualisasi yang berkesan, kami akan memperoleh kemahiran yang diperlukan untuk membina saluran data masa nyata yang tangkas dan cekap.

Contoh 1: Pelaksanaan Penstriman Data Masa Nyata dalam Python

Melaksanakan penstriman data masa nyata dalam Python adalah penting dalam zaman dan dunia yang dipacu data hari ini. Dalam contoh terperinci ini, kami akan melalui proses membina sistem penstriman data masa nyata menggunakan Apache Kafka dan Python dalam Google Colab.







Untuk memulakan contoh sebelum kami memulakan pengekodan, membina persekitaran khusus dalam Google Colab adalah penting. Perkara pertama yang perlu kita lakukan ialah memasang perpustakaan yang diperlukan. Kami menggunakan perpustakaan 'kafka-python' untuk penyepaduan Kafka.



! pip pasang kafka-python


Perintah ini memasang perpustakaan 'kafka-python' yang menyediakan fungsi Python dan pengikatan untuk Apache Kafka. Seterusnya, kami mengimport perpustakaan yang diperlukan untuk projek kami. Mengimport perpustakaan yang diperlukan termasuk 'KafkaProducer' dan 'KafkaConsumer' ialah kelas daripada perpustakaan 'kafka-python' yang membolehkan kami berinteraksi dengan broker Kafka. JSON ialah perpustakaan Python untuk berfungsi dengan data JSON yang kami gunakan untuk mensiri dan menyahsiri mesej.



daripada kafka import KafkaProducer, KafkaConsumer
import json


Penciptaan Penerbit Kafka





Ini penting kerana pengeluar Kafka menghantar data ke topik Kafka. Dalam contoh kami, kami mencipta pengeluar untuk menghantar data masa nyata simulasi kepada topik yang dipanggil 'topik masa nyata.'

Kami mencipta contoh 'KafkaProducer' yang menentukan alamat broker Kafka sebagai 'localhost:9092'. Kemudian, kami menggunakan 'value_serializer', fungsi yang menyerikan data sebelum menghantarnya ke Kafka. Dalam kes kami, fungsi lambda mengekod data sebagai JSON yang dikodkan UTF-8. Sekarang, mari kita simulasi beberapa data masa nyata dan hantar ke topik Kafka.



penerbit = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( dalam ) .encode ( 'utf-8' ) )
# Data masa nyata simulasi
data = { 'sensor_id' : 1 , 'suhu' : 25.5 , 'kelembapan' : 60.2 }
# Menghantar data ke topik
pengeluar.menghantar ( 'topik masa nyata' , data )


Dalam baris ini, kami mentakrifkan kamus 'data' yang mewakili data sensor simulasi. Kami kemudian menggunakan kaedah 'hantar' untuk menerbitkan data ini kepada 'topik masa nyata'.

Kemudian, kami ingin mencipta pengguna Kafka dan pengguna Kafka membaca data daripada topik Kafka. Kami mencipta pengguna untuk menggunakan dan memproses mesej dalam 'topik masa nyata'. Kami mencipta contoh 'KafkaConsumer', menyatakan topik yang ingin kami gunakan, cth., (topik masa nyata) dan alamat broker Kafka. Kemudian, 'value_deserializer' ialah fungsi yang menyahsiri data yang diterima daripada Kafka. Dalam kes kami, fungsi lambda menyahkod data sebagai JSON yang dikodkan UTF-8.

pengguna = KafkaConsumer ( 'topik masa nyata' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.nyahkod ( 'utf-8' ) ) )


Kami menggunakan gelung berulang untuk terus menggunakan dan memproses mesej daripada topik.

# Membaca dan memproses data masa nyata
untuk mesej dalam pengguna:
data = mesej.nilai
cetak ( f 'Data Diterima: {data}' )


Kami mendapatkan semula nilai setiap mesej dan data penderia simulasi kami di dalam gelung dan mencetaknya ke konsol. Menjalankan pengeluar dan pengguna Kafka melibatkan menjalankan kod ini dalam Google Colab dan melaksanakan sel kod secara individu. Pengeluar menghantar data simulasi ke topik Kafka, dan pengguna membaca dan mencetak data yang diterima.


Analisis Output semasa Kod Berjalan

Kami akan memerhatikan data masa nyata yang sedang dihasilkan dan digunakan. Format data mungkin berbeza-beza bergantung pada simulasi kami atau sumber data sebenar. Dalam contoh terperinci ini, kami merangkumi keseluruhan proses menyediakan sistem penstriman data masa nyata menggunakan Apache Kafka dan Python dalam Google Colab. Kami akan menerangkan setiap baris kod dan kepentingannya dalam membina sistem ini. Penstriman data masa nyata ialah keupayaan yang berkuasa, dan contoh ini berfungsi sebagai asas untuk aplikasi dunia sebenar yang lebih kompleks.

Contoh 2: Melaksanakan Penstriman Data Masa Nyata dalam Python Menggunakan Data Pasaran Saham

Mari kita lakukan satu lagi contoh unik untuk melaksanakan penstriman data masa nyata dalam Python menggunakan senario yang berbeza; kali ini, kita akan fokus pada data pasaran saham. Kami mencipta sistem penstriman data masa nyata yang menangkap perubahan harga saham dan memprosesnya menggunakan Apache Kafka dan Python dalam Google Colab. Seperti yang ditunjukkan dalam contoh sebelumnya, kita mulakan dengan mengkonfigurasi persekitaran kita dalam Google Colab. Pertama, kami memasang perpustakaan yang diperlukan:

! pip pasang kafka-python yfinance


Di sini, kami menambah perpustakaan 'yfinance' yang membolehkan kami mendapatkan data pasaran saham masa nyata. Seterusnya, kami mengimport perpustakaan yang diperlukan. Kami terus menggunakan kelas 'KafkaProducer' dan 'KafkaConsumer' daripada perpustakaan 'kafka-python' untuk interaksi Kafka. Kami mengimport JSON untuk berfungsi dengan data JSON. Kami juga menggunakan 'yfinance' untuk mendapatkan data pasaran saham masa nyata. Kami juga mengimport pustaka 'masa' untuk menambah kelewatan masa untuk mensimulasikan kemas kini masa nyata.

daripada kafka import KafkaProducer, KafkaConsumer
import json
import yfinance sebagai yf
import masa


Kini, kami mencipta pengeluar Kafka untuk data stok. Pengeluar Kafka kami mendapat data stok masa nyata dan menghantarnya ke topik Kafka bernama 'harga saham'.

penerbit = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( dalam ) .encode ( 'utf-8' ) )

sementara Benar:
stok = yf.Ticker ( 'AAPL' ) # Contoh: Saham Apple Inc
data_stok = stok.sejarah ( tempoh = '1h' )
harga_akhir = data_stok [ 'Tutup' ] .iloc [ - 1 ]
data = { 'simbol' : 'AAPL' , 'harga' : harga terakhir }
pengeluar.menghantar ( 'harga stok' , data )
masa.tidur ( 10 ) # Simulasikan kemas kini masa nyata setiap 10 saat


Kami mencipta contoh 'KafkaProducer' dengan alamat broker Kafka dalam kod ini. Di dalam gelung, kami menggunakan 'yfinance' untuk mendapatkan harga saham terkini untuk Apple Inc. ('AAPL'). Kemudian, kami mengeluarkan harga penutup terakhir dan menghantarnya ke topik 'harga saham'. Akhirnya, kami memperkenalkan kelewatan masa untuk mensimulasikan kemas kini masa nyata setiap 10 saat.

Mari cipta pengguna Kafka untuk membaca dan memproses data harga saham daripada topik 'harga saham'.

pengguna = KafkaConsumer ( 'harga stok' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.nyahkod ( 'utf-8' ) ) )

untuk mesej dalam pengguna:
stok_data = mesej.nilai
cetak ( f 'Data Stok Diterima: {stock_data['symbol']} - Harga: {stock_data['price']}' )


Kod ini serupa dengan persediaan pengguna contoh sebelumnya. Ia terus membaca dan memproses mesej daripada topik 'harga saham' dan mencetak simbol dan harga saham ke konsol. Kami melaksanakan sel kod secara berurutan, cth., satu demi satu dalam Google Colab untuk menjalankan pengeluar dan pengguna. Pengeluar mendapat dan menghantar kemas kini harga saham masa nyata sementara pengguna membaca dan memaparkan data ini.

! pip pasang kafka-python yfinance
daripada kafka import KafkaProducer, KafkaConsumer
import json
import yfinance sebagai yf
import masa
penerbit = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( dalam ) .encode ( 'utf-8' ) )

sementara Benar:
stok = yf.Ticker ( 'AAPL' ) # Saham Apple Inc
data_stok = stok.sejarah ( tempoh = '1h' )
harga_akhir = data_stok [ 'Tutup' ] .iloc [ - 1 ]

data = { 'simbol' : 'AAPL' , 'harga' : harga terakhir }

pengeluar.menghantar ( 'harga stok' , data )

masa.tidur ( 10 ) # Simulasikan kemas kini masa nyata setiap 10 saat
pengguna = KafkaConsumer ( 'harga stok' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.nyahkod ( 'utf-8' ) ) )

untuk mesej dalam pengguna:
stok_data = mesej.nilai
cetak ( f 'Data Stok Diterima: {stock_data['symbol']} - Harga: {stock_data['price']}' )


Dalam analisis output selepas kod dijalankan, kami akan melihat kemas kini harga saham masa nyata untuk Apple Inc. dihasilkan dan digunakan.

Kesimpulan

Dalam contoh unik ini, kami menunjukkan pelaksanaan penstriman data masa nyata dalam Python menggunakan Apache Kafka dan perpustakaan 'yfinance' untuk menangkap dan memproses data pasaran saham. Kami menerangkan dengan teliti setiap baris kod. Penstriman data masa nyata boleh digunakan pada pelbagai bidang untuk membina aplikasi dunia sebenar dalam kewangan, IoT dan banyak lagi.