-- Safety
-- Raw Streams
-- ------------------------------------------------------------
-- Stream: raw_failures
-- Declared over the `failures` topic, which is generated by the simulator.
-- ------------------------------------------------------------
DROP STREAM IF EXISTS `raw_failures`;

CREATE STREAM `raw_failures` (
    `id`        VARCHAR,
    `equipment` VARCHAR,
    `station`   VARCHAR,
    `location`  VARCHAR,
    `fts`       VARCHAR,   -- timestamp kept as text at this stage
    `duration`  DOUBLE
) WITH (
    KAFKA_TOPIC  = 'failures',
    VALUE_FORMAT = 'JSON'
);

-- Inspect the registered stream and look at its rows.
DESCRIBE `raw_failures` EXTENDED;
SELECT * FROM `raw_failures`;
-- ------------------------------------------------------------
-- Stream: raw_incidents
-- Declared over the `incidents` topic, which is generated by the simulator.
-- ------------------------------------------------------------
DROP STREAM IF EXISTS `raw_incidents`;

CREATE STREAM `raw_incidents` (
    `id`       VARCHAR,
    `type`     VARCHAR,
    `station`  VARCHAR,
    `location` VARCHAR,
    `its`      VARCHAR,   -- timestamp kept as text at this stage
    `severity` VARCHAR,
    `duration` DOUBLE
) WITH (
    KAFKA_TOPIC  = 'incidents',
    VALUE_FORMAT = 'JSON'
);

-- Inspect the registered stream and look at its rows.
DESCRIBE `raw_incidents` EXTENDED;
SELECT * FROM `raw_incidents`;
-- Staging failures: stream stg_failures, table failures
--
-- Stream stg_failures: partitioned stream for failure monitoring.
-- Reads `raw_failures`, discards rows with a NULL in any used field,
-- parses the textual `fts` timestamp and re-keys every row by the
-- (equipment, station) struct.
DROP STREAM IF EXISTS `stg_failures`;

CREATE STREAM `stg_failures`
WITH (kafka_topic='stg_failures', key_format='AVRO', value_format='AVRO')
AS SELECT
    STRUCT(`equipment` := `equipment`, `station` := `station`) AS fk,
    `location`,
    PARSE_TIMESTAMP(`fts`, 'yyyy-MM-dd''T''HH:mm:ss') AS `fts`,
    `duration`
FROM `raw_failures`
-- Null filtering. The timestamp column of `raw_failures` is `fts`;
-- the stream declares no `ts` column, so the filter must name `fts`.
WHERE `equipment` IS NOT NULL
  AND `station` IS NOT NULL
  AND `location` IS NOT NULL
  AND `fts` IS NOT NULL
  AND `duration` IS NOT NULL
PARTITION BY STRUCT(`equipment` := `equipment`, `station` := `station`);

-- Inspect the backing topic, the stream definition and its rows.
PRINT `stg_failures` FROM BEGINNING;
DESCRIBE `stg_failures` EXTENDED;
SELECT * FROM `stg_failures`;
--
-- Table failures: failure totals per (station, equipment, hour).
-- `nf` is the row count and `df` the summed `duration` of each group;
-- `hh` is the 'HH' rendering of `fts` in the Europe/Madrid time zone.
CREATE TABLE `failures`
WITH (kafka_topic='stfail', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
    fk->`station` AS `station`,
    fk->`equipment` AS `equipment`,
    FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid') AS `hh`,
    COUNT(*) AS `nf`,
    SUM(`duration`) AS `df`
FROM `stg_failures`
GROUP BY
    fk->`station`,
    fk->`equipment`,
    FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid');

-- Inspect the backing topic, the table definition and its rows.
PRINT `stfail` FROM BEGINNING;
DESCRIBE `failures` EXTENDED;
SELECT * FROM `failures`;
-- Table fastho: failure totals per (station, hour).
-- `nf` is the row count and `df` the summed `duration` of each group.
CREATE TABLE `fastho`
WITH (kafka_topic='fastho', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
    fk->`station` AS `station`,
    FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid') AS `hh`,
    COUNT(*) AS `nf`,
    SUM(`duration`) AS `df`
FROM `stg_failures`
-- No WHERE clause: every staged failure is aggregated.
GROUP BY
    fk->`station`,
    FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid');
-- Staging incidents: stream stg_incidents, table incidents
--
-- Stream stg_incidents: partitioned stream for incident monitoring.
-- Reads `raw_incidents`, discards rows with a NULL in any used field,
-- parses the textual `its` timestamp and re-keys every row by the
-- (type, station) struct.
DROP STREAM IF EXISTS `stg_incidents`;

CREATE STREAM `stg_incidents`
WITH (kafka_topic='stg_incidents', key_format='AVRO', value_format='AVRO')
AS SELECT
    STRUCT(`type` := `type`, `station` := `station`) AS ik,
    `location`,
    PARSE_TIMESTAMP(`its`, 'yyyy-MM-dd''T''HH:mm:ss') AS `its`,
    `severity`,
    `duration`
FROM `raw_incidents`
-- Null filtering. The timestamp column of `raw_incidents` is `its`;
-- the stream declares no `ts` column, so the filter must name `its`.
WHERE `type` IS NOT NULL
  AND `station` IS NOT NULL
  AND `location` IS NOT NULL
  AND `its` IS NOT NULL
  AND `severity` IS NOT NULL
  AND `duration` IS NOT NULL
PARTITION BY STRUCT(`type` := `type`, `station` := `station`);

-- Inspect the backing topic, the stream definition and its rows.
PRINT `stg_incidents` FROM BEGINNING;
DESCRIBE `stg_incidents` EXTENDED;
SELECT * FROM `stg_incidents`;
--
-- Table incidents: incident totals per (station, type, hour).
-- `stg_incidents` exposes its struct key as `ik` and its parsed timestamp
-- as `its`, so those are the columns referenced here (it has no `fk` or
-- `fts` column).
-- `nf` is the row count and `df` the summed `duration` of each group.
-- NOTE(review): the `nf`/`df` aliases mirror the `failures` table; confirm
-- whether incident-specific names are wanted.
CREATE TABLE `incidents`
WITH (kafka_topic='stinci', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
    ik->`station` AS `station`,
    ik->`type` AS `type`,
    FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid') AS `hh`,
    COUNT(*) AS `nf`,
    SUM(`duration`) AS `df`
FROM `stg_incidents`
-- No WHERE clause: every staged incident is aggregated.
GROUP BY
    ik->`station`,
    ik->`type`,
    FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid');
-- Table instho: incident totals per (station, hour).
-- This statement reads `stg_incidents`, so it cannot reuse the name and
-- topic `fastho`, which already belong to the failures-per-station table.
-- NOTE(review): the name `instho` was chosen by analogy with `fastho`;
-- confirm the intended table/topic name.
-- `stg_incidents` exposes its struct key as `ik` and its parsed timestamp
-- as `its`, so those are the columns referenced here.
-- `nf` is the row count and `df` the summed `duration` of each group.
CREATE TABLE `instho`
WITH (kafka_topic='instho', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT
    ik->`station` AS `station`,
    FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid') AS `hh`,
    COUNT(*) AS `nf`,
    SUM(`duration`) AS `df`
FROM `stg_incidents`
GROUP BY
    ik->`station`,
    FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid');