Priekšnoteikums
Jums ir jāinstalē nepieciešamā pitona bibliotēka, lai lasītu datus no Kafka. Šajā apmācībā Python3 tiek izmantots, lai rakstītu patērētāja un ražotāja skriptu. Ja jūsu Linux operētājsistēmā pip pakotne vēl nav instalēta, pirms Python bibliotēkas instalēšanas jums jāinstalē pip. python3-kafka tiek izmantots šajā apmācībā, lai nolasītu datus no Kafka. Lai instalētu bibliotēku, izpildiet šo komandu.
$ pip instalējiet python3-kafkaVienkāršu teksta datu lasīšana no Kafka
No ražotāja par konkrētu tēmu var nosūtīt dažāda veida datus, kurus patērētājs var izlasīt. Šajā apmācības daļā ir parādīts, kā no Kafka var nosūtīt un saņemt vienkāršus teksta datus, izmantojot ražotāju un patērētāju.
Izveidojiet failu ar nosaukumu ražotājs1.py ar šādu pitona skriptu. KafkaProducents modulis tiek importēts no Kafka bibliotēkas. Brokeru sarakstam jānosaka ražotāja objekta inicializācijas laikā, lai izveidotu savienojumu ar Kafka serveri. Kafkas noklusējuma osta ir9092". bootstrap_servers arguments tiek izmantots, lai definētu resursdatora nosaukumu ar portu. "First_Topic'ir iestatīts kā tēmas nosaukums, ar kuru īsziņa tiks nosūtīta no producenta. Tālāk vienkārša īsziņaSveiks no Kafkas'tiek nosūtīts, izmantojot nosūtīt () metode KafkaRažotājs uz tēmu, "First_Topic".
ražotājs1.py:
# Importējiet KafkaProducer no Kafka bibliotēkasno kafka importa KafkaProducer
# Definējiet serveri ar portu
bootstrap_servers = ['localhost: 9092']
# Definējiet tēmas nosaukumu, kur ziņojums tiks publicēts
topicName = 'First_Topic'
# Inicializēt ražotāja mainīgo
producents = KafkaProducer (sāknēšanas_serveri = sāknēšanas_serveri)
# Publicēt tekstu definētajā tēmā
ražotājs.sūtīt (topicName, b'Hello from kafka… ')
# Drukāt ziņojumu
drukāt ("Nosūtīts ziņojums")
Izveidojiet failu ar nosaukumu patērētājs1.py ar šādu pitona skriptu. KafkaPatērētājs modulis tiek importēts no Kafka bibliotēkas, lai nolasītu datus no Kafka. sys modulis tiek izmantots šeit, lai pārtrauktu skriptu. Patērētāja skriptā tiek izmantots viens un tas pats ražotāja resursdatora nosaukums un porta numurs, lai nolasītu datus no Kafka. Patērētāja un ražotāja tēmas nosaukumam jābūt tādam pašam kāPirmā_tēma". Pēc tam patērētāja objekts tiek inicializēts ar trim argumentiem. Tēmas nosaukums, grupas ID un servera informācija. priekš cilpa šeit tiek izmantota, lai lasītu Kafka producenta nosūtīto tekstu.
patērētājs1.py:
# Importējiet KafkaConsumer no Kafka bibliotēkasno kafka importa KafkaConsumer
# Importēt sys moduli
importa sys
# Definējiet serveri ar portu
bootstrap_servers = ['localhost: 9092']
# Definējiet tēmas nosaukumu, no kurienes tiks saņemts ziņojums
topicName = 'First_Topic'
# Inicializējiet patērētāja mainīgo
patērētājs = KafkaConsumer (topicName, group_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Lasiet un drukājiet patērētāja ziņojumu
msg patērētājam:
drukāt ("Tēmas nosaukums =% s, Ziņojums =% s"% (ziņ.tēma, ziņ.vērtība))
# Pārtraukt skriptu
sys.Izeja()
Izeja:
Izpildiet šo komandu no viena termināļa, lai izpildītu producenta skriptu.
$ python3 producents1.pyPēc ziņojuma nosūtīšanas parādīsies šāda izeja.
Palaidiet šo komandu no cita termināļa, lai izpildītu patērētāja skriptu.
$ python3 patērētājs1.pyIzvade parāda tēmas nosaukumu un no producenta nosūtīto īsziņu.
Lasot JSON formatētos datus no Kafka
JSON formatētos datus Kafka ražotājs var nosūtīt, un Kafka patērētājs tos var nolasīt, izmantojot Džonss pitona modulis. Kā JSON datus var sērijveidot un noņemt no sērijas pirms datu nosūtīšanas un saņemšanas, izmantojot moduli python-kafka, ir parādīts šajā apmācības daļā.
Izveidojiet pitona skriptu ar nosaukumu ražotājs2.py ar šādu skriptu. Cits modulis ar nosaukumu JSON tiek importēts ar KafkaProducents modulis šeit. value_serializer arguments tiek izmantots ar sāknēšanas_serveri arguments šeit, lai inicializētu Kafka ražotāja objektu. Šis arguments norāda, ka JSON dati tiks kodēti, izmantojotutf-8'rakstzīmju kopa nosūtīšanas laikā. Pēc tam JSON formatētie dati tiek nosūtīti uz nosaukto tēmu JSONtopic.
ražotājs2.py:
# Importējiet KafkaProducer no Kafka bibliotēkasno kafka importa KafkaProducer
# Importējiet JSON moduli, lai datus serializētu
importa json
# Inicializējiet ražotāja mainīgo un iestatiet parametru JSON kodējumam
producents = KafkaProducer (sāknēšanas_serveri =
['localhost: 9092'], value_serializer = lambda v: json.izgāztuves (v).kodēt ('utf-8'))
# Nosūtiet datus JSON formātā
ražotājs.sūtīt ('JSONtopic', 'name': 'fahmida', 'email': '[email protected]')
# Drukāt ziņojumu
drukāt ("Ziņojums nosūtīts JSONtopic")
Izveidojiet pitona skriptu ar nosaukumu patērētājs2.py ar šādu skriptu. KafkaPatērētājs, sys un JSON moduļi tiek importēti šajā skriptā. KafkaPatērētājs modulis tiek izmantots, lai nolasītu JSON formatētus datus no Kafka. JSON moduli izmanto, lai atšifrētu kodētos JSON datu sūtījumus no Kafka ražotāja. Sys modulis tiek izmantots, lai izbeigtu skriptu. value_deserializer arguments tiek izmantots ar sāknēšanas_serveri lai noteiktu, kā tiks atšifrēti JSON dati. Nākamais, priekš cilpa tiek izmantota, lai drukātu visus no Kafka iegūtos patērētāju ierakstus un JSON datus.
patērētājs2.py:
# Importējiet KafkaConsumer no Kafka bibliotēkasno kafka importa KafkaConsumer
# Importēt sys moduli
importa sys
# Importējiet Json moduli, lai datus sērijveidotu
importa json
# Inicializējiet patērētāja mainīgo un iestatiet rekvizītu JSON atšifrēšanai
patērētājs = KafkaConsumer ('JSONtopic', bootstrap_servers = ['localhost: 9092'],
value_deserializer = lambda m: json.slodzes (m.atšifrēt ('utf-8')))
# Lasiet kafka datus
ziņojumam patērētājam:
drukāt ("Patērētāju ieraksti: \ n")
izdrukāt (ziņojumu)
drukāt ("\ nLasīšana no JSON datiem \ n")
drukāt ("Nosaukums:", ziņojums [6] ['vārds'])
drukāt ("E-pasts:", ziņojums [6] ['e-pasts'])
# Pārtraukt skriptu
sys.Izeja()
Izeja:
Izpildiet šo komandu no viena termināļa, lai izpildītu producenta skriptu.
$ python3 producents2.pyPēc JSON datu nosūtīšanas skripts izdrukās šādu ziņojumu.
Palaidiet šo komandu no cita termināļa, lai izpildītu patērētāja skriptu.
$ python3 patērētājs2.pyPēc skripta palaišanas parādīsies šāda izeja.
Secinājums:
Datus no Kafka var nosūtīt un saņemt dažādos formātos, izmantojot pitonu. Datus var arī saglabāt datu bāzē un iegūt no datu bāzes, izmantojot Kafka un Python. Es mājās, šī apmācība palīdzēs pitona lietotājam sākt strādāt ar Kafku.