Saltar a contenido

Safety

TIMESTAMP

Scalar Functions: Date/Time

PARSE_TIMESTAMP
FORMAT_TIMESTAMP

Raw Streams

--------------------------------
-- STREAM raw_failures
-- Topic generated by the simulator.
--------------------------------
-- Raw failure events, read as JSON from the `failures` topic.
-- `fts` is kept as text here; stg_failures parses it into a TIMESTAMP.
DROP STREAM IF EXISTS `raw_failures`;

CREATE STREAM `raw_failures` (
    `id`        VARCHAR,
    `equipment` VARCHAR,
    `station`   VARCHAR,
    `location`  VARCHAR,
    `fts`       VARCHAR,
    `duration`  DOUBLE
) WITH (
    VALUE_FORMAT = 'JSON',
    KAFKA_TOPIC  = 'failures'
);

-- Inspect the registered schema, then the stream contents.
DESCRIBE `raw_failures` EXTENDED;
SELECT * FROM `raw_failures`;

--------------------------------
-- STREAM raw_incidents
-- Topic generated by the simulator.
--------------------------------
-- Raw incident events, read as JSON from the `incidents` topic.
-- `its` is kept as text here; stg_incidents parses it into a TIMESTAMP.
DROP STREAM IF EXISTS `raw_incidents`;

CREATE STREAM `raw_incidents` (
    `id`       VARCHAR,
    `type`     VARCHAR,
    `station`  VARCHAR,
    `location` VARCHAR,
    `its`      VARCHAR,
    `severity` VARCHAR,
    `duration` DOUBLE
) WITH (
    VALUE_FORMAT = 'JSON',
    KAFKA_TOPIC  = 'incidents'
);

-- Inspect the registered schema, then the stream contents.
DESCRIBE `raw_incidents` EXTENDED;
SELECT * FROM `raw_incidents`;

Staging failures: stream stg_failures, tables failures and fastho

--
-- Failure-monitoring stream, re-keyed (PARTITION BY) on the struct
-- (equipment, station), with `fts` converted from text to TIMESTAMP.
DROP STREAM IF EXISTS `stg_failures`;
CREATE STREAM `stg_failures`
WITH (kafka_topic='stg_failures', key_format='AVRO', value_format='AVRO')
AS SELECT STRUCT(`equipment` := `equipment`, `station` := `station`) as fk,
          `location`,
          -- NOTE(review): no timezone argument is passed, so the text is interpreted
          -- in ksqlDB's default zone (UTC per the docs), while the downstream tables
          -- format hours in Europe/Madrid. Confirm the simulator emits UTC.
          PARSE_TIMESTAMP(`fts`, 'yyyy-MM-dd''T''HH:mm:ss') as `fts`,
          `duration`
FROM `raw_failures`
-- Drop events with any missing field. The raw timestamp column is `fts`;
-- raw_failures has no `ts` column.
WHERE `equipment` IS NOT NULL AND `station` IS NOT NULL AND
      `location` IS NOT NULL AND `fts` IS NOT NULL AND `duration` IS NOT NULL
PARTITION BY STRUCT(`equipment` := `equipment`, `station` := `station`);

print `stg_failures` from beginning;
DESCRIBE `stg_failures` EXTENDED;
SELECT * FROM `stg_failures`;

--
-- Total failures per (station, equipment, hour).
-- 'HH' keeps only the hour of day (00-23, Europe/Madrid), so events from the
-- same hour on different days fall into the same group.
-- Dropped first so the script can be re-run, as is done for the streams.
DROP TABLE IF EXISTS `failures`;
CREATE TABLE `failures`
WITH (kafka_topic='stfail', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT fk->`station` as `station`,
          fk->`equipment` as `equipment`,
          FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid') as `hh`,
          -- nf: number of failure events in the group; df: sum of their `duration`.
          count(*) as `nf`, sum(`duration`) as `df`
   FROM `stg_failures`
   GROUP BY fk->`station`, fk->`equipment`, FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid');

print `stfail` from beginning;
DESCRIBE `failures` EXTENDED;
SELECT * FROM `failures`;

-- Total failures per (station, hour of day), aggregated across all equipment.
-- Dropped first so the script can be re-run, as is done for the streams.
DROP TABLE IF EXISTS `fastho`;
CREATE TABLE `fastho`
WITH (kafka_topic='fastho', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT fk->`station` as `station`,
          FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid') as `hh`,
          count(*) as `nf`, sum(`duration`) as `df`
   FROM `stg_failures`
-- No WHERE clause: stg_failures already filters out rows with NULL fields.
   GROUP BY fk->`station`, FORMAT_TIMESTAMP(`fts`, 'HH', 'Europe/Madrid');

Staging incidents: stream stg_incidents, table incidents (plus a per-station hourly table)

--
-- Incident-monitoring stream, re-keyed (PARTITION BY) on the struct
-- (type, station), with `its` converted from text to TIMESTAMP.
DROP STREAM IF EXISTS `stg_incidents`;
CREATE STREAM `stg_incidents`
WITH (kafka_topic='stg_incidents', key_format='AVRO', value_format='AVRO')
AS SELECT STRUCT(`type` := `type`, `station` := `station`) as ik,
          `location`,
          -- NOTE(review): no timezone argument is passed, so the text is interpreted
          -- in ksqlDB's default zone (UTC per the docs). Confirm the simulator emits UTC.
          PARSE_TIMESTAMP(`its`, 'yyyy-MM-dd''T''HH:mm:ss') as `its`,
          `severity`,
          `duration`
FROM `raw_incidents`
-- Drop events with any missing field. The raw timestamp column is `its`;
-- raw_incidents has no `ts` column.
WHERE `type` IS NOT NULL AND `station` IS NOT NULL AND
      `location` IS NOT NULL AND `its` IS NOT NULL AND
      `severity` IS NOT NULL AND `duration` IS NOT NULL
PARTITION BY STRUCT(`type` := `type`, `station` := `station`);

print `stg_incidents` from beginning;
DESCRIBE `stg_incidents` EXTENDED;
SELECT * FROM `stg_incidents`;

--
-- Total incidents per (station, type, hour of day in Europe/Madrid).
-- stg_incidents exposes its key struct as `ik` and its timestamp as `its`
-- (not `fk`/`fts`, which belong to stg_failures).
-- Dropped first so the script can be re-run, as is done for the streams.
DROP TABLE IF EXISTS `incidents`;
CREATE TABLE `incidents`
WITH (kafka_topic='stinci', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT ik->`station` as `station`,
          ik->`type` as `type`,
          FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid') as `hh`,
          -- Column names mirror the failures table: nf = event count, df = summed `duration`.
          count(*) as `nf`, sum(`duration`) as `df`
   FROM `stg_incidents`
-- No WHERE clause: stg_incidents already filters out rows with NULL fields.
   GROUP BY ik->`station`, ik->`type`, FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid');

-- Total incidents per (station, hour of day), aggregated across all incident types.
-- Named `iastho` rather than `fastho`: `fastho` (table and topic) already holds
-- the per-station failure totals, so reusing that name fails with "already exists".
DROP TABLE IF EXISTS `iastho`;
CREATE TABLE `iastho`
WITH (kafka_topic='iastho', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO')
AS SELECT ik->`station` as `station`,
          FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid') as `hh`,
          count(*) as `nf`, sum(`duration`) as `df`
   FROM `stg_incidents`
   GROUP BY ik->`station`, FORMAT_TIMESTAMP(`its`, 'HH', 'Europe/Madrid');