Apache Kafka

Kā lasīt datus no Kafka ar Python

Kā lasīt datus no Kafka ar Python
Kafka ir atvērtā koda izplatīta ziņojumapmaiņas sistēma, lai ziņojumu nosūtītu sadalītās un dažādās tēmās. Datu straumēšanu reāllaikā var ieviest, izmantojot Kafka, lai saņemtu datus starp lietojumprogrammām. Tam ir trīs galvenās daļas. Tās ir ražotājs, patērētājs un tēmas. Ražotājs tiek izmantots, lai nosūtītu ziņojumu uz noteiktu tēmu, un katrs ziņojums ir pievienots ar atslēgu. Patērētājs tiek izmantots, lai lasītu ziņojumu par konkrētu tēmu no nodalījumu kopas. Dati, kas saņemti no ražotāja un saglabāti nodalījumos, pamatojoties uz noteiktu tēmu. Python pastāv daudzas bibliotēkas, lai radītu ražotāju un patērētāju, lai izveidotu ziņojumapmaiņas sistēmu, izmantojot Kafka. Kā apmācības laikā tiek parādīti Kafka dati, izmantojot pitonu, ir parādīts šajā apmācībā.

Priekšnoteikums

Jums ir jāinstalē nepieciešamā pitona bibliotēka, lai lasītu datus no Kafka. Šajā apmācībā Python3 tiek izmantots, lai rakstītu patērētāja un ražotāja skriptu. Ja jūsu Linux operētājsistēmā pip pakotne vēl nav instalēta, pirms Python bibliotēkas instalēšanas jums jāinstalē pip. python3-kafka tiek izmantots šajā apmācībā, lai nolasītu datus no Kafka. Lai instalētu bibliotēku, izpildiet šo komandu.

$ pip instalējiet python3-kafka

Vienkāršu teksta datu lasīšana no Kafka

No ražotāja par konkrētu tēmu var nosūtīt dažāda veida datus, kurus patērētājs var izlasīt. Šajā apmācības daļā ir parādīts, kā no Kafka var nosūtīt un saņemt vienkāršus teksta datus, izmantojot ražotāju un patērētāju.

Izveidojiet failu ar nosaukumu ražotājs1.py ar šādu pitona skriptu. KafkaProducents modulis tiek importēts no Kafka bibliotēkas. Brokeru sarakstam jānosaka ražotāja objekta inicializācijas laikā, lai izveidotu savienojumu ar Kafka serveri. Kafkas noklusējuma osta ir9092". bootstrap_servers arguments tiek izmantots, lai definētu resursdatora nosaukumu ar portu. "First_Topic'ir iestatīts kā tēmas nosaukums, ar kuru īsziņa tiks nosūtīta no producenta. Tālāk vienkārša īsziņaSveiks no Kafkas'tiek nosūtīts, izmantojot nosūtīt () metode KafkaRažotājs uz tēmu, "First_Topic".

ražotājs1.py:

# Importējiet KafkaProducer no Kafka bibliotēkas
no kafka importa KafkaProducer
# Definējiet serveri ar portu
bootstrap_servers = ['localhost: 9092']
# Definējiet tēmas nosaukumu, kur ziņojums tiks publicēts
topicName = 'First_Topic'
# Inicializēt ražotāja mainīgo
producents = KafkaProducer (sāknēšanas_serveri = sāknēšanas_serveri)
# Publicēt tekstu definētajā tēmā
ražotājs.sūtīt (topicName, b'Hello from kafka… ')
# Drukāt ziņojumu
drukāt ("Nosūtīts ziņojums")

Izveidojiet failu ar nosaukumu patērētājs1.py ar šādu pitona skriptu. KafkaPatērētājs modulis tiek importēts no Kafka bibliotēkas, lai nolasītu datus no Kafka. sys modulis tiek izmantots šeit, lai pārtrauktu skriptu. Patērētāja skriptā tiek izmantots viens un tas pats ražotāja resursdatora nosaukums un porta numurs, lai nolasītu datus no Kafka. Patērētāja un ražotāja tēmas nosaukumam jābūt tādam pašam kāPirmā_tēma".  Pēc tam patērētāja objekts tiek inicializēts ar trim argumentiem. Tēmas nosaukums, grupas ID un servera informācija. priekš cilpa šeit tiek izmantota, lai lasītu Kafka producenta nosūtīto tekstu.

patērētājs1.py:

# Importējiet KafkaConsumer no Kafka bibliotēkas
no kafka importa KafkaConsumer
# Importēt sys moduli
importa sys
# Definējiet serveri ar portu
bootstrap_servers = ['localhost: 9092']
# Definējiet tēmas nosaukumu, no kurienes tiks saņemts ziņojums
topicName = 'First_Topic'
# Inicializējiet patērētāja mainīgo
patērētājs = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Lasiet un drukājiet patērētāja ziņojumu
msg patērētājam:
drukāt ("Tēmas nosaukums =% s, Ziņojums =% s"% (ziņ.tēma, ziņ.vērtība))
# Pārtraukt skriptu
sys.Izeja()

Izeja:

Izpildiet šo komandu no viena termināļa, lai izpildītu producenta skriptu.

$ python3 producents1.py

Pēc ziņojuma nosūtīšanas parādīsies šāda izeja.

Palaidiet šo komandu no cita termināļa, lai izpildītu patērētāja skriptu.

$ python3 patērētājs1.py

Izvade parāda tēmas nosaukumu un no producenta nosūtīto īsziņu.

Lasot JSON formatētos datus no Kafka

JSON formatētos datus Kafka ražotājs var nosūtīt, un Kafka patērētājs tos var nolasīt, izmantojot Džonss pitona modulis. Kā JSON datus var sērijveidot un noņemt no sērijas pirms datu nosūtīšanas un saņemšanas, izmantojot moduli python-kafka, ir parādīts šajā apmācības daļā.

Izveidojiet pitona skriptu ar nosaukumu ražotājs2.py ar šādu skriptu. Cits modulis ar nosaukumu JSON tiek importēts ar KafkaProducents modulis šeit. value_serializer arguments tiek izmantots ar sāknēšanas_serveri arguments šeit, lai inicializētu Kafka ražotāja objektu. Šis arguments norāda, ka JSON dati tiks kodēti, izmantojotutf-8'rakstzīmju kopa nosūtīšanas laikā. Pēc tam JSON formatētie dati tiek nosūtīti uz nosaukto tēmu JSONtopic.

ražotājs2.py:

# Importējiet KafkaProducer no Kafka bibliotēkas
no kafka importa KafkaProducer
# Importējiet JSON moduli, lai datus serializētu
importa json
# Inicializējiet ražotāja mainīgo un iestatiet parametru JSON kodējumam
producents = KafkaProducer (sāknēšanas_serveri =
['localhost: 9092'], value_serializer = lambda v: json.izgāztuves (v).kodēt ('utf-8'))
# Nosūtiet datus JSON formātā
ražotājs.sūtīt ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
 
# Drukāt ziņojumu
drukāt ("Ziņojums nosūtīts JSONtopic")

Izveidojiet pitona skriptu ar nosaukumu patērētājs2.py ar šādu skriptu. KafkaPatērētājs, sys un JSON moduļi tiek importēti šajā skriptā. KafkaPatērētājs modulis tiek izmantots, lai nolasītu JSON formatētus datus no Kafka. JSON moduli izmanto, lai atšifrētu kodētos JSON datu sūtījumus no Kafka ražotāja. Sys modulis tiek izmantots, lai izbeigtu skriptu. value_deserializer arguments tiek izmantots ar sāknēšanas_serveri lai noteiktu, kā tiks atšifrēti JSON dati. Nākamais, priekš cilpa tiek izmantota, lai drukātu visus no Kafka iegūtos patērētāju ierakstus un JSON datus.

patērētājs2.py:

# Importējiet KafkaConsumer no Kafka bibliotēkas
no kafka importa KafkaConsumer
# Importēt sys moduli
importa sys
# Importējiet Json moduli, lai datus sērijveidotu
importa json
# Inicializējiet patērētāja mainīgo un iestatiet rekvizītu JSON atšifrēšanai
patērētājs = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.slodzes (m.atšifrēt ('utf-8')))
# Lasiet kafka datus
ziņojumam patērētājam:
drukāt ("Patērētāju ieraksti: \ n")
izdrukāt (ziņojumu)
drukāt ("\ nLasīšana no JSON datiem \ n")
drukāt ("Nosaukums:", ziņojums [6] ['vārds'])
drukāt ("E-pasts:", ziņojums [6] ['e-pasts'])
# Pārtraukt skriptu
sys.Izeja()

Izeja:

Izpildiet šo komandu no viena termināļa, lai izpildītu producenta skriptu.

$ python3 producents2.py

Pēc JSON datu nosūtīšanas skripts izdrukās šādu ziņojumu.

Palaidiet šo komandu no cita termināļa, lai izpildītu patērētāja skriptu.

$ python3 patērētājs2.py

Pēc skripta palaišanas parādīsies šāda izeja.

Secinājums:

Datus no Kafka var nosūtīt un saņemt dažādos formātos, izmantojot pitonu. Datus var arī saglabāt datu bāzē un iegūt no datu bāzes, izmantojot Kafka un Python. Es mājās, šī apmācība palīdzēs pitona lietotājam sākt strādāt ar Kafku.

Top 5 spēļu tveršanas kartes
Mēs visi esam redzējuši un mīlējuši straumēšanas spēles pakalpojumā YouTube. PewDiePie, Jakesepticye un Markiplier ir tikai daži no labākajiem spēlētā...
Kā izstrādāt spēli Linux
Pirms desmit gadiem maz Linux lietotāju varētu paredzēt, ka viņu iecienītā operētājsistēma kādu dienu būs populāra spēļu platforma komerciālām videosp...
Komerciālo spēļu dzinēju atvērtā koda porti
Bezmaksas, atvērtā koda un starpplatformu spēļu dzinēju atpūtu var izmantot, lai spēlētu vecos, kā arī dažus no diezgan nesenajiem spēļu nosaukumiem. ...