Saltar a contenido

JDBC_Sink1

Warning

En este apartado se supone una base de datos kafka en un contenedor accesible para el ksqldb-server

Prerequisitos

  • Los datos ya están cargados en el cluster Kafka (tutorial Musiteca)
  • Las tabla songs ha sido creada en la base de datos kafka (ver ddlKafka)

Este tutorial va mostrar el proceso según el cual datos almacenados en Kafka se van a llevar a una base de datos, en este caso, una base de datos MariaDB. Suponiendo que los datos ya están cargados en Kafka se abordan una serie de preparaciones previas al volcado en la base de datos

Procesado de los datos en Kafka mediante ksql

Streams

-- Comando análogo al visto en el tutorial [Musiteca](<./Musiteca.md>)
CREATE STREAM `raw_songs` (
  title STRING,
  album STRING,
  duration STRING,
  artist STRING,
  type STRING,
  release INTEGER,
  filename VARCHAR)
WITH (kafka_topic='connect-file-pulse-csv', value_format='AVRO');

-- Stream efecto "llave de paso"
-- Los datos se llevarán a la base de datos cuando se inserte en este stream
CREATE STREAM `songs` (
  title VARCHAR,
  album VARCHAR,
  duration VARCHAR,
  artist VARCHAR,
  type VARCHAR,
  release INTEGER,
  filename VARCHAR)
WITH (KAFKA_TOPIC = 'songs', PARTITIONS=1, VALUE_FORMAT='AVRO');

Conector JDBCSink

OJO

  • pk.mode y pk.fields
  • insert.mode
  • white list: no se indica => envía todo
-- Crear conector desde ksql
-- Se supone creada la tabla 'songs' en la base de datos

CREATE SINK CONNECTOR `sink-jdbc-songs` WITH(
    "connector.class" = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    "connection.url" = 'jdbc:mysql://mariadb:3306/kafka?serverTimezone=Europe/Madrid',
    "topics" = 'songs',
    "key.converter" = 'io.confluent.connect.avro.AvroConverter',
    "key.converter.schema.registry.url" = 'http://schema-registry:8081',
    "key.converter.schemas.enable" = 'false',
    "value.converter" = 'io.confluent.connect.avro.AvroConverter',
    "value.converter.schema.registry.url" = 'http://schema-registry:8081',
    "value.converter.schemas.enable" = 'false',
    "connection.user" = 'kafka',
    "connection.password" = 'kafka',
    "auto.create" = 'false',
    "auto.evolve" = 'false',
    "insert.mode" = 'insert',
    "pk.mode" = 'none',
    "pk.fields" = 'none',
    "table.name.format" = 'songs',
    "delete.enabled" = 'false',
    "tasks.max" = '1');

Insertar datos

-- Abrir llave
INSERT INTO `songs`
  SELECT *
  FROM `raw_songs` EMIT CHANGES;

-- Q1: Nº de discos de cada artista
SELECT artist, count(*) as N FROM `songs`
GROUP BY artist
EMIT CHANGES;

-- Q2: Nº de canciones en los discos de cada artista  
SELECT artist, album, count(*) as N FROM songs
GROUP BY artist, album
EMIT CHANGES;

Consultas ksql

-- Q1: Nº de discos de cada artista. OJO 'EMIT CHANGES'
SELECT artist, count(*) as N FROM `songs`
GROUP BY artist
EMIT CHANGES;

-- Q2: Nº de canciones en los discos de cada artista. OJO 'EMIT CHANGES'
SELECT artist, album, count(*) as N FROM `songs`
GROUP BY artist, album
EMIT CHANGES;