Saltar a contenido

JDBC Sink

Tabla MariaDB

-- MariaDB
--
DROP TABLE `failures`;
CREATE TABLE `failures` (
  `station` varchar(20) CHARACTER SET latin1 COLLATE latin1_spanish_ci NOT NULL,
  `equipment` varchar(20) CHARACTER SET latin1 COLLATE latin1_spanish_ci NOT NULL,
  `hh` varchar(2) NOT NULL,
  `nf` integer NOT NULL,
  `df` double NOT NULL,
  PRIMARY KEY (`station`,`equipment`,`hh`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Conector Sink

-- Kafka
--
CREATE SINK CONNECTOR `sink-jdbc-failures` WITH(
    "connector.class" = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    "connection.url" = 'jdbc:mysql://mariadb:3306/demos?serverTimezone=Europe/Madrid',
    "topics" = 'stfail',
    "table.name.format" = 'failures',
    "key.converter" = 'io.confluent.connect.avro.AvroConverter',
    "key.converter.schema.registry.url" = 'http://schema-registry:8081',
    "key.converter.schemas.enable" = 'true',
    "value.converter" = 'io.confluent.connect.avro.AvroConverter',
    "value.converter.schema.registry.url" = 'http://schema-registry:8081',
    "value.converter.schemas.enable" = 'true',
    "connection.user" = 'santiago',
    "connection.password" = 'SuZi0545',
    "auto.create" = 'false',
    "pk.mode" = 'record_key',
    "pk.fields" = '',
    "fields.withelist" = 'nf,df',
    "insert.mode" = 'upsert',
    "delete.enabled" = 'false',
    "tasks.max" = '1');