Traffic

Streams arrivals, departures, tickets y passengers

----------------------
-- STREAM arrivals
----------------------
DROP STREAM IF EXISTS `arrivals`;
CREATE STREAM `arrivals` (
  `aid` VARCHAR,
  `station` VARCHAR,
  `ats` TIMESTAMP,
  `passengers` INTEGER,
  `dock_number` INTEGER,
  `wagon_count` INTEGER,
  `direction` VARCHAR
  )
WITH (KAFKA_TOPIC='s3arrivals', VALUE_FORMAT='AVRO');

DROP STREAM IF EXISTS `stg_arrivals`;
CREATE STREAM `stg_arrivals` (
  `aid` VARCHAR,
  `station` VARCHAR,
  `ats` TIMESTAMP,
  `passengers` INTEGER,
  `dock_number` INTEGER,
  `wagon_count` INTEGER,
  `direction` VARCHAR
  )
WITH (KAFKA_TOPIC='stg_arrivals', VALUE_FORMAT='AVRO');
----------------------
-- Llave de paso con filtrado de nulos
-- Explícitamente se indican los campos
----------------------
INSERT INTO `stg_arrivals`
  SELECT `aid`, `station`, `ats`, `passengers`,
         `dock_number`, `wagon_count`, `direction`
  FROM `arrivals`
  WHERE `aid` IS NOT NULL AND `station` IS NOT NULL AND
        `ats` IS NOT NULL AND `passengers` IS NOT NULL AND
        `dock_number` IS NOT NULL AND `wagon_count` IS NOT NULL AND `direction` IS NOT NULL
;

------------------------
-- STREAM departures
-- Insert análogo
------------------------
DROP STREAM IF EXISTS `departures`;
CREATE STREAM `departures` (
  `did` VARCHAR,
  `station` VARCHAR,
  `dts` TIMESTAMP,
  `passengers` INTEGER,
  `dock_number` INTEGER,
  `wagon_count` INTEGER,
  `direction` VARCHAR
  )
WITH (KAFKA_TOPIC='s3departures', VALUE_FORMAT='AVRO');

---------------------
-- STREAM tickets
-- Insert análogo
---------------------
DROP STREAM IF EXISTS `tickets`;
CREATE STREAM `tickets` (
  `tid` VARCHAR,
  `train_id` VARCHAR,
  `station` VARCHAR,
  `tts` TIMESTAMP,
  `validations` INTEGER
  )
WITH (KAFKA_TOPIC='s3tickets', VALUE_FORMAT='AVRO');

------------------------
-- STREAM passengers
-- Insert análogo
------------------------
DROP STREAM IF EXISTS `passengers`;
CREATE STREAM `passengers` (
  `pid` VARCHAR,
  `train_id` VARCHAR,
  `station` VARCHAR,
  `pts` TIMESTAMP,
  `waiting_time` DOUBLE,
  `ticket_check_time` DOUBLE,
  `reduced_flow_factor` DOUBLE
  )
WITH (KAFKA_TOPIC='s3passengers', VALUE_FORMAT='AVRO');