-- Traffic
-- Streams: arrivals, departures, tickets and passengers
----------------------
-- STREAM arrivals
-- Declared over Kafka topic 's3arrivals'; record values are Avro-encoded.
-- Identifiers are back-quoted, so their lower-case spelling is preserved.
----------------------
DROP STREAM IF EXISTS `arrivals`;

CREATE STREAM `arrivals` (
    `aid`         VARCHAR,
    `station`     VARCHAR,
    `ats`         TIMESTAMP,
    `passengers`  INTEGER,
    `dock_number` INTEGER,
    `wagon_count` INTEGER,
    `direction`   VARCHAR
)
WITH (
    KAFKA_TOPIC='s3arrivals',
    VALUE_FORMAT='AVRO'
);
-- Staging stream: same column layout as `arrivals`, backed by the
-- Kafka topic 'stg_arrivals' with Avro-encoded values.
-- NOTE(review): an INSERT INTO query in this script writes to this stream;
-- on a re-run the DROP may be rejected until that query is terminated --
-- confirm against the ksqlDB version in use.
DROP STREAM IF EXISTS `stg_arrivals`;

CREATE STREAM `stg_arrivals` (
    `aid`         VARCHAR,
    `station`     VARCHAR,
    `ats`         TIMESTAMP,
    `passengers`  INTEGER,
    `dock_number` INTEGER,
    `wagon_count` INTEGER,
    `direction`   VARCHAR
)
WITH (
    KAFKA_TOPIC='stg_arrivals',
    VALUE_FORMAT='AVRO'
);
----------------------
-- Pass-through gate with null filtering:
-- forwards a row from `arrivals` to `stg_arrivals` only when
-- every one of its columns is non-null.
-- The columns are listed explicitly (no SELECT *).
----------------------
INSERT INTO `stg_arrivals`
SELECT
    `aid`,
    `station`,
    `ats`,
    `passengers`,
    `dock_number`,
    `wagon_count`,
    `direction`
FROM `arrivals`
WHERE `aid` IS NOT NULL
  AND `station` IS NOT NULL
  AND `ats` IS NOT NULL
  AND `passengers` IS NOT NULL
  AND `dock_number` IS NOT NULL
  AND `wagon_count` IS NOT NULL
  AND `direction` IS NOT NULL
;
------------------------
-- STREAM departures
-- Declared over Kafka topic 's3departures'; record values are Avro-encoded.
-- Analogous insert (same null-filtering pattern as for arrivals).
------------------------
DROP STREAM IF EXISTS `departures`;

CREATE STREAM `departures` (
    `did`         VARCHAR,
    `station`     VARCHAR,
    `dts`         TIMESTAMP,
    `passengers`  INTEGER,
    `dock_number` INTEGER,
    `wagon_count` INTEGER,
    `direction`   VARCHAR
)
WITH (
    KAFKA_TOPIC='s3departures',
    VALUE_FORMAT='AVRO'
);
---------------------
-- STREAM tickets
-- Declared over Kafka topic 's3tickets'; record values are Avro-encoded.
-- Analogous insert (same null-filtering pattern as for arrivals).
---------------------
DROP STREAM IF EXISTS `tickets`;

CREATE STREAM `tickets` (
    `tid`         VARCHAR,
    `train_id`    VARCHAR,
    `station`     VARCHAR,
    `tts`         TIMESTAMP,
    `validations` INTEGER
)
WITH (
    KAFKA_TOPIC='s3tickets',
    VALUE_FORMAT='AVRO'
);
------------------------
-- STREAM passengers
-- Declared over Kafka topic 's3passengers'; record values are Avro-encoded.
-- Analogous insert (same null-filtering pattern as for arrivals).
------------------------
DROP STREAM IF EXISTS `passengers`;

CREATE STREAM `passengers` (
    `pid`                 VARCHAR,
    `train_id`            VARCHAR,
    `station`             VARCHAR,
    `pts`                 TIMESTAMP,
    `waiting_time`        DOUBLE,
    `ticket_check_time`   DOUBLE,
    `reduced_flow_factor` DOUBLE
)
WITH (
    KAFKA_TOPIC='s3passengers',
    VALUE_FORMAT='AVRO'
);