JDBC_Sink1
Warning
En este apartado se supone una base de datos kafka en un contenedor accesible para el ksqldb-server
Prerequisitos¶
- Los datos ya están cargados en el cluster Kafka (tutorial Musiteca)
- Las tabla songs ha sido creada en la base de datos kafka (ver ddlKafka)
Este tutorial va mostrar el proceso según el cual datos almacenados en Kafka se van a llevar a una base de datos, en este caso, una base de datos MariaDB. Suponiendo que los datos ya están cargados en Kafka se abordan una serie de preparaciones previas al volcado en la base de datos
Procesado de los datos en Kafka mediante ksql¶
Streams¶
-- Comando análogo al visto en el tutorial [Musiteca](<./Musiteca.md>)
CREATE STREAM `raw_songs` (
title STRING,
album STRING,
duration STRING,
artist STRING,
type STRING,
release INTEGER,
filename VARCHAR)
WITH (kafka_topic='connect-file-pulse-csv', value_format='AVRO');
-- Stream efecto "llave de paso"
-- Los datos se llevarán a la base de datos cuando se inserte en este stream
CREATE STREAM `songs` (
title VARCHAR,
album VARCHAR,
duration VARCHAR,
artist VARCHAR,
type VARCHAR,
release INTEGER,
filename VARCHAR)
WITH (KAFKA_TOPIC = 'songs', PARTITIONS=1, VALUE_FORMAT='AVRO');
Conector JDBCSink¶
OJO
- pk.mode y pk.fields
- insert.mode
- white list: no se indica => envÃa todo
-- Crear conector desde ksql
-- Se supone creada la tabla 'songs' en la base de datos
CREATE SINK CONNECTOR `sink-jdbc-songs` WITH(
"connector.class" = 'io.confluent.connect.jdbc.JdbcSinkConnector',
"connection.url" = 'jdbc:mysql://mariadb:3306/kafka?serverTimezone=Europe/Madrid',
"topics" = 'songs',
"key.converter" = 'io.confluent.connect.avro.AvroConverter',
"key.converter.schema.registry.url" = 'http://schema-registry:8081',
"key.converter.schemas.enable" = 'false',
"value.converter" = 'io.confluent.connect.avro.AvroConverter',
"value.converter.schema.registry.url" = 'http://schema-registry:8081',
"value.converter.schemas.enable" = 'false',
"connection.user" = 'kafka',
"connection.password" = 'kafka',
"auto.create" = 'false',
"auto.evolve" = 'false',
"insert.mode" = 'insert',
"pk.mode" = 'none',
"pk.fields" = 'none',
"table.name.format" = 'songs',
"delete.enabled" = 'false',
"tasks.max" = '1');
Insertar datos¶
-- Abrir llave
INSERT INTO `songs`
SELECT *
FROM `raw_songs` EMIT CHANGES;
-- Q1: Nº de discos de cada artista
SELECT artist, count(*) as N FROM `songs`
GROUP BY artist
EMIT CHANGES;
-- Q2: Nº de canciones en los discos de cada artista
SELECT artist, album, count(*) as N FROM songs
GROUP BY artist, album
EMIT CHANGES;
Consultas ksql¶
-- Q1: Nº de discos de cada artista. OJO 'EMIT CHANGES'
SELECT artist, count(*) as N FROM `songs`
GROUP BY artist
EMIT CHANGES;
-- Q2: Nº de canciones en los discos de cada artista. OJO 'EMIT CHANGES'
SELECT artist, album, count(*) as N FROM `songs`
GROUP BY artist, album
EMIT CHANGES;