JDBC_Sink2
Warning
En este apartado se supone una base de datos kafka en un contenedor accesible para el ksqldb-server
Prerequisitos¶
- Los datos ya están cargados en el cluster Kafka (continuación del tutorial JDBCSink1)
- Las tablas albsong1 y albsong2 ya han sido creadas en la base de datos kafka (ver ddlKafka)
Claúsula AS_VALUE¶
-- Mensajes Kafka: pares clave/valor
-- Etiquetas k1 y k2 para evitar el duplicado de nombres
CREATE TABLE `AlbSong1`
WITH (KAFKA_TOPIC='albsong1', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
artist as k1,
album as k2,
AS_VALUE(artist) AS artist,
AS_VALUE(album) AS album,
count(*) as N
FROM `raw_songs`
WHERE title IS NOT NULL
GROUP BY artist, album;
-- Resultados
print 'albsong1' from beginning limit 2;
select * from `AlbSong1`;
select artist, album, n from `AlbSong1`;
Resultado del print¶
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
------------------------------------------------------------------------------------------
rowtime: 2023/03/21 10:51:48.082 Z,
key: {"K1": "Soundgarden", "K2": "Superunknown"},
value: {"ARTIST": "Soundgarden", "ALBUM": "Superunknown", "N": 1}, partition: 0
rowtime: 2023/03/21 10:51:48.082 Z,
key: {"K1": "Nirvana", "K2": "Nevermind"},
value: {"ARTIST": "Nirvana", "ALBUM": "Nevermind", "N": 3}, partition: 0
Definir conector JDBCSink¶
CREATE SINK CONNECTOR `sink-jdbc-albsong1` WITH(
"connector.class" = 'io.confluent.connect.jdbc.JdbcSinkConnector',
"connection.url" = 'jdbc:mysql://mysql:3306/kafka',
"topics" = 'albsong1',
"table.name.format" = 'albsong1',
"key.converter" = 'io.confluent.connect.avro.AvroConverter',
"key.converter.schema.registry.url" = 'http://schema-registry:8081',
"key.converter.schemas.enable" = 'true',
"value.converter" = 'io.confluent.connect.avro.AvroConverter',
"value.converter.schema.registry.url" = 'http://schema-registry:8081',
"value.converter.schemas.enable" = 'true',
"connection.user" = 'kafka',
"connection.password" = 'kafka',
"auto.create" = 'false',
"pk.mode" = 'record_value',
"pk.fields" = 'ARTIST,ALBUM',
"insert.mode" = 'upsert',
"delete.enabled" = 'false',
"tasks.max" = '1');
Datos Estructurados¶
La consulta a continuación muestra el uso de la palabra reservada STRUCT para componer una columna k compuesta por los campos artist y album. El resultado de la consulta muestra el nº de canciones en los discos de cada artista
SELECT struct(artist := artist, album := album) as k,
count(*) as N
FROM `raw_songs`
WHERE title IS NOT NULL AND album IS NOT NULL
GROUP BY struct(artist := artist, album := album)
emit changes;
A continuación, la misma consulta anterior es utilizada para componer una tabla
CREATE TABLE `AlbSong2`
WITH (KAFKA_TOPIC='albsong2', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
struct(artist := artist, album := album) as k,
count(*) as N
FROM `raw_songs`
WHERE title IS NOT NULL AND album IS NOT NULL
GROUP BY struct(artist := artist, album := album);
La estructura de los mensajes en el tópico que corresponde a la tabla es conforme a lo siguiente:
print 'albsong2' from beginning limit 2;
-------------------------------------------------------------------------------------------------------
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/02/24 13:01:58.648 Z, key: {"K1": "Soundgarden", "K2": "Superunknown"}, value: {"N": 1}
rowtime: 2023/02/24 13:01:58.648 Z, key: {"K1": "Nirvana", "K2": "Nevermind"}, value: {"N": 3}
Se observa que el campo clave del mensaje almacena los valores del struct con las etiquetas K1 y K2 en tanto que en el campo valor únicamente aparece el agregado. La organización interna de los mensajes es exactamente la misma si la consulta se expresa sin struct:
CREATE TABLE `AlbSong3`
WITH (KAFKA_TOPIC='albsong3', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
artist,
album,
count(*) as N
FROM `raw_songs`
WHERE title IS NOT NULL AND album IS NOT NULL
GROUP BY artist, album;
Sin embargo, en el primer caso la tabla tiene únicamente dos columnas mientras que en el último tiene tres:
ksql> select * from `AlbSong2`;
+-----------------------------------------+---+
| K | N |
+-----------------------------------------+---+
|{ARTIST=Nirvana, ALBUM=Nevermind} | 3 |
|{ARTIST=Pearl Jam, ALBUM=Ten} | 4 |
|{ARTIST=Soundgarden, ALBUM=Superunknown} | 1 |
|{ARTIST=Tool, ALBUM=Aenima} | 1 |
|{ARTIST=Tool, ALBUM=Lateralus} | 1 |
+-----------------------------------------+---+
ksql> select * from `AlbSong3`;
+-------------+--------------+---+
| ARTIST | ALBUM | N |
+-------------+--------------+---+
| Nirvana | Nevermind | 3 |
| Pearl Jam | Ten | 4 |
| Soundgarden | Superunknown | 1 |
| Tool | Aenima | 1 |
| Tool | Lateralus | 1 |
+-------------+--------------+---+
Conector JDBCSink¶
OJO
- pk.mode: la del mensaje
- pk.fields: no hay que especificar nada, coincide con la de la tabla
- insert.mode: upsert (update + insert)
- white list: el agregado
-- Crear conector desde ksql
-- Suponiendo creada la tabla en la base de datos
CREATE SINK CONNECTOR `sink-jdbc-albsong2` WITH(
"connector.class" = 'io.confluent.connect.jdbc.JdbcSinkConnector',
"connection.url" = 'jdbc:mysql://mysql:3306/kafka',
"topics" = 'albsong2',
"table.name.format" = 'albsong2',
"key.converter" = 'io.confluent.connect.avro.AvroConverter',
"key.converter.schema.registry.url" = 'http://schema-registry:8081',
"key.converter.schemas.enable" = 'true',
"value.converter" = 'io.confluent.connect.avro.AvroConverter',
"value.converter.schema.registry.url" = 'http://schema-registry:8081',
"value.converter.schemas.enable" = 'true',
"connection.user" = 'kafka',
"connection.password" = 'kafka',
"auto.create" = 'false',
"pk.mode" = 'record_key',
"pk.fields" = '',
"fields.withelist" = 'N',
"insert.mode" = 'upsert',
"delete.enabled" = 'true',
"tasks.max" = '1');
Para entender la relevancia del valor upsert en insert.mode, además de consultar la documentación relativa a la configuración del conector, se recomienda añadir al directorio donde lee el conector connect-file-pulse-csv definido en Musiteca un archivo con el contenido siguiente:
title;album;duration;release;artist;type
Spoonman;Superunknown;04:06;1994;Soundgarden;Rock
El resultado es que el conector captura la línea del fichero y los datos fluyen internamente como mensajes en los tópicos correspondientes hasta que albsong2 (el tópico que se envía a la base de datos) contiene lo siguiente:
ksql> print 'albsong2' from beginning;
-------------------------------------------------------------------------------------------------------
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2025/02/19 13:51:08.572 Z, key: {"ARTIST": "Soundgarden", "ALBUM": "Superunknown"}, value: {"N": 1}
rowtime: 2025/02/19 13:51:08.572 Z, key: {"ARTIST": "Nirvana", "ALBUM": "Nevermind"}, value: {"N": 3}
rowtime: 2025/02/19 13:51:08.572 Z, key: {"ARTIST": "Pearl Jam", "ALBUM": "Ten"}, value: {"N": 4}
rowtime: 2025/02/19 13:51:08.572 Z, key: {"ARTIST": "Tool", "ALBUM": "Aenima"}, value: {"N": 1}
rowtime: 2025/02/19 13:51:08.572 Z, key: {"ARTIST": "Tool", "ALBUM": "Lateralus"}, value: {"N": 1}
rowtime: 2025/02/19 15:09:08.696 Z, key: {"ARTIST": "Soundgarden", "ALBUM": "Superunknown"}, value: {"N": 2}
Es decir, los valores del campo clave en los mensajes primero y último coinciden de modo que un insert en la base de datos no funcionaría correctamente
Claúsula PARTITION BY¶
Las claves estructuradas son especialmente interesantes cuando se utilizan para controlar el particionado de los datos lo cual es determinante en el rendimiento global del sistema. A continuación se muestra la definición de un stream análogo a los precedentes pero añadiendo una clave de particionado
CREATE STREAM part_songs
WITH (kafka_topic='psongs', key_format='AVRO', value_format='AVRO')
AS SELECT struct(artist := artist, album := album) as k,
title, duration, type, release
FROM `raw_songs`
WHERE title IS NOT NULL AND album IS NOT NULL
PARTITION BY struct(artist := artist, album := album);
emit changes;
print 'psongs' from beginning limit 1;