Lai pabeigtu šo nodarbību, datorā jābūt aktīvai Kafka instalēšanai. Izlasiet Apache Kafka instalēšana Ubuntu, lai uzzinātu, kā to izdarīt.
Python klienta instalēšana Apache Kafka
Pirms mēs varam sākt strādāt ar Apache Kafka programmā Python, mums jāinstalē Apache Kafka Python klients. To var izdarīt, izmantojot pip (Python paketes indekss). Lai to panāktu, ir komanda:
pip3 instalējiet kafka-pythonTā būs ātra uzstādīšana terminālī:
Python Kafka klienta instalēšana, izmantojot PIP
Tagad, kad mums ir aktīva Apache Kafka instalācija un mēs esam instalējuši arī Python Kafka klientu, mēs esam gatavi sākt kodēšanu.
Producenta izgatavošana
Vispirms Kafka jāpublicē ziņojumi ir ražotāja lietojumprogramma, kas var nosūtīt ziņojumus uz Kafka tēmām.
Ņemiet vērā, ka Kafka ražotāji ir asinhroni ziņojumu ražotāji. Tas nozīmē, ka darbības, kas veiktas, kamēr ziņojums tiek publicēts Kafka Topic nodalījumā, netiek bloķētas. Lai viss būtu vienkārši, šai nodarbībai mēs uzrakstīsim vienkāršu JSON izdevēju.
Lai sāktu, izveidojiet Kafka Producer instanci:
no kafka importa KafkaProducerimporta json
importa pprints
producents = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.izgāztuves (v).kodēt ('utf-8'))
Atribūts bootstrap_servers informē par Kafka servera resursdatoru un portu. Atribūts value_serializer ir paredzēts tikai sastapto JSON vērtību JSON serializācijai.
Lai spēlētos ar Kafka Producer, mēģināsim izdrukāt metriku, kas saistīta ar Producer un Kafka kopu:
metrika = producents.metrika ()nospiedums.nospiedums (metrika)
Tagad mēs redzēsim:
Kafka Mterics
Tagad beidzot mēģināsim nosūtīt ziņojumu Kafka rindai. Labs piemērs būs vienkāršs JSON objekts:
ražotājs.sūtīt ('linuxhint', 'topic': 'kafka')The linuxhint ir tēmas nodalījums, uz kuru tiks nosūtīts JSON objekts. Palaidot skriptu, jūs nesaņemsit nevienu rezultātu, jo ziņojums tiek vienkārši nosūtīts uz tēmas nodalījumu. Ir pienācis laiks uzrakstīt patērētāju, lai mēs varētu pārbaudīt savu lietojumprogrammu.
Patērētāja veidošana
Tagad mēs esam gatavi izveidot jaunu savienojumu kā patērētāja lietojumprogramma un saņemt ziņojumus no Kafka tēmas. Sāciet ar jaunas instances izveidošanu patērētājam:
no kafka importa KafkaConsumerno kafka importa TopicPartition
print ('Savienojuma izveidošana.')
patērētājs = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
Tagad šim savienojumam piešķiriet tēmu un arī iespējamo nobīdes vērtību.
drukāt ('Tēmas piešķiršana.')patērētājs.piešķirt ([TopicPartition ('linuxhint', 2)])
Visbeidzot, mēs esam gatavi drukāt mssage:
print ('Tiek saņemta ziņa.')ziņojumam patērētājam:
drukāt ("OFFSET:" + str (ziņojums [0]) + "\ t MSG:" + str (message))
Tādējādi mēs iegūsim visu publicēto ziņojumu sarakstu Kafka patērētāju tēmu nodalījumā. Šīs programmas rezultāts būs:
Kafka patērētājs
Lai ātri uzzinātu, šeit ir pilns producenta skripts:
no kafka importa KafkaProducerimporta json
importa pprints
producents = KafkaProducer (
bootstrap_servers = 'localhost: 9092',
value_serializer = lambda v: json.izgāztuves (v).kodēt ('utf-8'))
ražotājs.sūtīt ('linuxhint', 'topic': 'kafka')
# metrika = producents.metrika ()
# pprints.nospiedums (metrika)
Un šeit ir visa patērētāja programma, kuru mēs izmantojām:
no kafka importa KafkaConsumerno kafka importa TopicPartition
print ('Savienojuma izveidošana.')
patērētājs = KafkaConsumer (bootstrap_servers = 'localhost: 9092')
drukāt ('Tēmas piešķiršana.')
patērētājs.piešķirt ([TopicPartition ('linuxhint', 2)])
print ('Tiek saņemta ziņa.')
ziņojumam patērētājam:
drukāt ("OFFSET:" + str (ziņojums [0]) + "\ t MSG:" + str (message))
Secinājums
Šajā nodarbībā mēs apskatījām, kā mēs varam instalēt Apache Kafka un sākt to izmantot mūsu Python programmās. Mēs parādījām, cik viegli ir veikt vienkāršus uzdevumus, kas saistīti ar Kafka Python, ar demonstrēto Kafka klientu Python.