Saltar a contenido

Musiteca

Prerequisitos

Fichero de datos metal-musics-dataset.csv (descargar aquí) convenientemente montado en el contenedor ksqldb-server

Info

Este ejemplo muestra los fundamentos sobre conectores y comandos ksql en Kafka. En particular, se crea un conector source de la clase FilePulse. Esto es posible porque el correspondiente plugin ha sido instalado previamente en el contenedor ksqldb-server. Para ver la lista de plugins instalados se puede invocar la siguiente orden (ver comandos útiles) en el terminal de comandos:

docker exec -it ksqldb-server curl -s localhost:8083/connector-plugins|jq '.[].class'

Para más información relativa al uso de curl con API REST ver este enlace

API REST: Transformaciones embebidas en la carga

Si se dispusiera de un cluster Kafka completo sería posible gestionar los conectores invocando el  API Rest del conector. A continuación se muestra la orden suponiendo que el contenedor exporta y escucha en el puerto 8083 de la máquina local

-- Crear conector para cargar datos desde csv en un repo denominado Datasets  
-- Incluye parsing cabeceras, conversión de tipo y adición del filename  
-- Crea topic connect-file-pulse  
--  
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{  
 "name": "connect-file-pulse-csv",  
 "config": {  
 "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",  
 "filters": "ParseDelimitedRow,ReleaseToInt,FileName",  
 "filters.ParseDelimitedRow.extractColumnName": "headers",  
 "filters.ParseDelimitedRow.trimColumn": "true",  
 "filters.ParseDelimitedRow.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",  
 "filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",  
 "filters.ReleaseToInt.field": "$value.release",  
 "filters.ReleaseToInt.value": "{{ converts($value.release, '\''INTEGER'\'') }}",  
 "filters.ReleaseToInt.overwrite": "true",  
 "filters.FileName.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",  
 "filters.FileName.field": "$.filename",  
 "filters.FileName.value": "$metadata.name",  
 "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",  
 "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",  
 "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",  
 "file.filter.regex.pattern":".*\\.csv$",  
 "fs.scan.directory.path":"/Datasets/",  
 "fs.scan.interval.ms":"10000",  
 "internal.kafka.reporter.bootstrap.servers": "broker:29092",  
 "internal.kafka.reporter.topic": "connect-file-pulse-status",  
 "offset.strategy": "name",  
 "skip.headers": "1",  
 "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",  
 "topic": "connect-file-pulse",  
 "tasks.max": 1  
 }  
}'

KSQL: Base de Datos Kafka

En la infraestructura que se suministra la manipulación de los conectores se realiza invocando órdenes ksql.  En particular, la orden análoga a la llamada REST anterior es la siguiente:

CREATE SOURCE CONNECTOR `connect-file-pulse-csv` WITH(
    "connector.class" = 'io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector',
    "task.reader.class" = 'io.streamthoughts.kafka.connect.filepulse.reader.LocalRowFileInputReader',
    "fs.cleanup.policy.class" = 'io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy',
    "fs.cleanup.policy.triggered.on" = 'COMMITTED',
    "fs.listing.class" = 'io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing',
-- El siguiente parámetro deberá corresponder con el directorio 
-- donde se almacene el fichero CSV con los datos para el ejemplo
    "fs.listing.directory.path" = '/home/appuser/FilePulse/Metal/',
    "fs.listing.filters" = 'io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter',
    "fs.listing.interval.ms" = '10000',
    "file.filter.regex.pattern" = '.*\\.csv$',
    "offset.policy.class" = 'io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy',
    "offset.attributes.string" = 'name',
    "skip.headers" = '1',
    "topic" = 'connect-file-pulse-csv',
    "tasks.reader.class" = 'io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader',
    "tasks.file.status.storage.class" = 'io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore',
    "tasks.file.status.storage.bootstrap.servers" = 'kafka:9092',
    "tasks.file.status.storage.topic" = 'connect-file-pulse-status',
    "filters" = 'ParseCSVLine,FileName,ReleaseToInt',
    "filters.ParseCSVLine.extract.column.name" = 'headers',
    "filters.ParseCSVLine.trim.column" = 'true',
    "filters.ParseCSVLine.separator" = ';',
    "filters.ParseCSVLine.type" = 'io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter',
    "filters.FileName.type" = 'io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter',
    "filters.FileName.field" = '$.filename',
    "filters.FileName.value" = '$metadata.name',
    "filters.ReleaseToInt.type" = 'io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter',
    "filters.ReleaseToInt.field" = 'release',
    "filters.ReleaseToInt.to" = 'INTEGER',
    "filters.ReleaseToInt.default" = '0',
    "internal.kafka.reporter.bootstrap.servers" = 'kafka:9092',
    "internal.kafka.reporter.topic" = 'connect-file-pulse-status',
    "tasks.max" = '1');

Para comprender qué está ocurriendo es preciso monitorizar el log de los contenedores. En particular, del servidor ksqldb invocando la orden docker logs -f ksqldb-server
Si todo ha ido correctamente, en este momento ya se tiene lo necesario para capturar datos y almacenarlos en Kafka. Las órdenes siguientes permitirán el procesamiento, en este caso vía ksql, como si Kafka fuera una base de datos
En primer lugar procede conectar al servidor ksqldb desde el (contenedor) cliente. Las órdenes son las siguientes:
## Comprobaciones via cliente ksql  
## Invocar cliente (en contenedor), listar tópico y ver extracto de datos  
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088  
list topics;  
print `connect-file-pulse-csv` from beginning limit 5;

En este punto pasamos a preparar la estructuras que soportarán la interpretación y el procesamiento de los datos. En concreto, se muestran dos alternativas según que se especifiquen metadatos de forma implícita o explícita:
-- Definir stream raw  
-- En este caso los tipos se obtienen directamente del parser Kafka  
CREATE STREAM `RAW_FILE_PULSE` WITH (KAFKA_TOPIC='connect-file-pulse-base-csv', value_format='AVRO');

-- Definir typed stream  
-- Los tipos se definen explícitamente (siempre más recomendable)  
CREATE STREAM `raw_songs` (  
 title STRING,  
 album STRING,  
 duration STRING,  
 artist STRING,  
 type STRING,  
 release INTEGER,  
 filename VARCHAR)  
WITH (kafka_topic='connect-file-pulse-csv', value_format='AVRO');

-- Comprobaciones. Acceso a metadatos  
describe extended `RAW_FILE_PULSE`;  
describe extended `raw_songs`;

Por último, el ejemplo concluye mostrando el streaming de los datos en Kafka:
-- Acceso a datos  
SET 'auto.offset.reset'='earliest';  
select * from `raw_songs` emit changes limit 10;