Python Client
Documentaci贸n: Python Client for Apache Kafka¶
Material de apoyo: UPM Drive¶
La librer铆a Kafka Client provee funciones, clases y utilidades para programar consumidores y productores en diferentes lenguajes de programaci贸n. En particular, este ejemplo muestra una celda de c贸digo python de un cuaderno Jupyter
El c贸digo implementa un productor que env铆a datos en formato JSON almacenados en ficheros localizados en la carpeta cuyo path se suministra como argumento. Genera un topic para los mensajes en cada uno de los archivos
import confluent_kafka
import json
import logging
import time
import os
# Instanciar productor
try:
producer = confluent_kafka.Producer({'bootstrap.servers': 'kafka:29092'})
except Exception as e:
logging.error(f"Error al configurar el productor: {e}")
raise
def send_data(path, chunk):
for f in os.scandir(path):
if f.name.endswith(".json"):
with open(f, 'r') as file:
i = 0
# Extraer el nombre del archivo como topic
topic = f.name.rpartition('.')[0]
messages = json.load(file)
for i, m in enumerate(messages, start=1):
# Imprimir mensaje
print(f'{topic}[{i}]: {m}')
# Enviar mensaje
try:
serialized_data = json.dumps(m).encode('utf-8')
producer.produce(topic=topic, key=str(i), value=serialized_data)
except Exception as e:
logging.error(f"Error serializando mensajes: {e}")
if i % chunk == 0:
print(f'Enviando {chunk} mensajes...')
print("Vaciando buffers intermedios...")
print("===============================", flush=True)
producer.flush()
time.sleep(2)
if i % chunk != 0:
print(f'Enviados {i} mensajes en topic {topic}')
print("Vaciando buffers intermedios...")
print("===============================", flush=True)
producer.flush()
send_data('./json', 100)