/* Air quality monitoring case study Authors: Juan Boubeta-Puig , Loli BurgueƱo and Antonio Vallecillo Date: 26th June, 2017 */ create schema AirMeasurement (stationTs long, stationId integer, pm2_5 double, pm10 double, o3 double, no2 double, so2 double, co double) /* NO2 PATTERNS */ @Name('NO2_Avg') @Priority(3) insert into NO2_Avg select current_timestamp() as timestamp, a1.stationId as stationId, avg(a1.no2) as value from pattern [every a1 = AirMeasurement(no2 <> -1)].win:time(3600000 milliseconds) group by a1.stationId @Name('NO2_Good') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 1 as levelNumber, 'NO2_Good' as levelName from pattern [every a1 = NO2_Avg (a1.value >= 0 and a1.value < 54)] Name('NO2_Moderate') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 2 as levelNumber, 'NO2_Moderate' as levelName from pattern [every a1 = NO2_Avg (a1.value >= 54 and a1.value < 101)] @Name('NO2_UnhealthyForSensitiveGroups') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 3 as levelNumber, 'NO2_UnhealthyForSensitiveGroups' as levelName from pattern [every a1 = NO2_Avg (a1.value >= 101 and a1.value < 361)] @Name('NO2_Unhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 4 as levelNumber, 'NO2_Unhealthy' as levelName from pattern [every a1 = NO2_Avg (a1.value >= 361 and a1.value < 650)] @Name('NO2_VeryUnhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 5 as levelNumber, 'NO2_VeryUnhealthy' as levelName from pattern [every a1 = NO2_Avg (a1.value >= 650 and a1.value < 1250)] @Name('NO2_Hazardous') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 6 as levelNumber, 'NO2_Hazardous' as levelName from pattern [every a1 = NO2_Avg (a1.value >= 1250 and a1.value <= 2049)] /* SO2 PATTERNS */ @Name('SO2_Avg') @Priority(3) insert into SO2_Avg select current_timestamp() as timestamp, a1.stationId as stationId, avg(a1.so2) as value from pattern [every a1 = AirMeasurement(so2 <> -1)].win:time(3600000 milliseconds) group by a1.stationId @Name('SO2_Good') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 1 as levelNumber, 'SO2_Good' as levelName from pattern [every a1 = SO2_Avg (a1.value >= 0 and a1.value < 36)] @Name('SO2_Moderate') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 2 as levelNumber, 'SO2_Moderate' as levelName from pattern [every a1 = SO2_Avg (a1.value >= 36 and a1.value < 76)] @Name('SO2_UnhealthyForSensitiveGroups') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 3 as levelNumber, 'SO2_UnhealthyForSensitiveGroups' as levelName from pattern [every a1 = SO2_Avg (a1.value >= 76 and a1.value < 186)] @Name('SO2_Unhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 4 as levelNumber, 'SO2_Unhealthy' as levelName from pattern [every a1 = SO2_Avg (a1.value >= 186 and a1.value < 305)] @Name('SO2_VeryUnhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 5 as levelNumber, 'SO2_VeryUnhealthy' as levelName from pattern [every a1 = SO2_Avg (a1.value >= 305 and a1.value < 605)] @Name('SO2_Hazardous') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 6 as levelNumber, 'SO2_Hazardous' as levelName from pattern [every a1 = SO2_Avg (a1.value >= 605 and a1.value <= 1004)] /* CO PATTERNS */ @Name('CO_Avg') @Priority(3) insert into CO_Avg select current_timestamp() as timestamp, a1.stationId as stationId, avg(a1.co) as value from pattern [every a1 = AirMeasurement(co <> -1)].win:time(28800000 milliseconds) group by a1.stationId @Name('CO_Good') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 1 as levelNumber, 'CO_Good' as levelName from pattern [every a1 = CO_Avg (a1.value >= 0 and a1.value < 4.5)] @Name('CO_Moderate') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 2 as levelNumber, 'CO_Moderate' as levelName from pattern [every a1 = CO_Avg (a1.value >= 4.5 and a1.value < 9.5)] @Name('CO_UnhealthyForSensitiveGroups') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 3 as levelNumber, 'CO_UnhealthyForSensitiveGroups' as levelName from pattern [every a1 = CO_Avg (a1.value >= 9.5 and a1.value < 12.5)] @Name('CO_Unhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 4 as levelNumber, 'CO_Unhealthy' as levelName from pattern [every a1 = CO_Avg (a1.value >= 12.5 and a1.value < 15.5)] @Name('CO_VeryUnhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 5 as levelNumber, 'CO_VeryUnhealthy' as levelName from pattern [every a1 = CO_Avg (a1.value >= 15.5 and a1.value < 30.5)] @Name('CO_Hazardous') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 6 as levelNumber, 'CO_Hazardous' as levelName from pattern [every a1 = CO_Avg (a1.value >= 30.5 and a1.value <= 50.4)] /* O3 PATTERNS */ @Name('O3_Avg') @Priority(3) insert into O3_Avg select current_timestamp() as timestamp, a1.stationId as stationId, avg(a1.o3) as value from pattern [every a1 = AirMeasurement(o3 <> -1)].win:time(28800000 milliseconds) group by a1.stationId @Name('O3_Good') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 1 as levelNumber, 'O3_Good' as levelName from pattern [every a1 = O3_Avg (a1.value >= 0 and a1.value < 0.055)] @Name('O3_Moderate') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 2 as levelNumber, 'O3_Moderate' as levelName from pattern [every a1 = O3_Avg (a1.value >= 0.055 and a1.value < 0.071)] @Name('O3_UnhealthyForSensitiveGroups') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 3 as levelNumber, 'O3_UnhealthyForSensitiveGroups' as levelName from pattern [every a1 = O3_Avg (a1.value >= 0.071 and a1.value < 0.086)] @Name('O3_Unhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 4 as levelNumber, 'O3_Unhealthy' as levelName from pattern [every a1 = O3_Avg (a1.value >= 0.086 and a1.value < 0.106)] @Name('O3_VeryUnhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 5 as levelNumber, 'O3_VeryUnhealthy' as levelName from pattern [every a1 = O3_Avg (a1.value >= 0.106 and a1.value < 0.200)] @Name('O3_Hazardous') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 6 as levelNumber, 'O3_Hazardous' as levelName from pattern [every a1 = O3_Avg (a1.value >= 0.200)] /* PM2_5 PATTERNS */ @Name('PM2_5_Avg') @Priority(3) insert into PM2_5_Avg select current_timestamp() as timestamp, a1.stationId as stationId, avg(a1.pm2_5) as value from pattern [every a1 = AirMeasurement(pm2_5 <> -1)].win:time(86400000 milliseconds) group by a1.stationId @Name('PM2_5_Good') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 1 as levelNumber, 'PM2_5_Good' as levelName from pattern [every a1 = PM2_5_Avg (a1.value >= 0 and a1.value < 12.1)] @Name('PM2_5_Moderate') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 2 as levelNumber, 'PM2_5_Moderate' as levelName from pattern [every a1 = PM2_5_Avg (a1.value >= 12.1 and a1.value < 35.5)] @Name('PM2_5_UnhealthyForSensitiveGroups') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 3 as levelNumber, 'PM2_5_UnhealthyForSensitiveGroups' as levelName from pattern [every a1 = PM2_5_Avg (a1.value >= 35.5 and a1.value < 55.5)] @Name('PM2_5_Unhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 4 as levelNumber, 'PM2_5_Unhealthy' as levelName from pattern [every a1 = PM2_5_Avg (a1.value >= 55.5 and a1.value < 150.5)] @Name('PM2_5_VeryUnhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 5 as levelNumber, 'PM2_5_VeryUnhealthy' as levelName from pattern [every a1 = PM2_5_Avg (a1.value >= 150.5 and a1.value < 250.5)] @Name('PM2_5_Hazardous') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 6 as levelNumber, 'PM2_5_Hazardous' as levelName from pattern [every a1 = PM2_5_Avg (a1.value >= 250.5 and a1.value <= 500.4)] /* PM10 PATTERNS */ @Name('PM10_Avg') @Priority(3) insert into PM10_Avg select current_timestamp() as timestamp, a1.stationId as stationId, avg(a1.pm10) as value from pattern [every a1 = AirMeasurement(pm10 <> -1)].win:time(86400000 milliseconds) group by a1.stationId @Name('PM10_Good') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 1 as levelNumber, 'PM10_Good' as levelName from pattern [every a1 = PM10_Avg (a1.value >= 0 and a1.value < 55)] @Name('PM10_Moderate') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 2 as levelNumber, 'PM10_Moderate' as levelName from pattern [every a1 = PM10_Avg (a1.value >= 55 and a1.value < 155)] @Name('PM10_UnhealthyForSensitiveGroups') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 3 as levelNumber, 'PM10_UnhealthyForSensitiveGroups' as levelName from pattern [every a1 = PM10_Avg (a1.value >= 155 and a1.value < 255)] @Name('PM10_Unhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 4 as levelNumber, 'PM10_Unhealthy' as levelName from pattern [every a1 = PM10_Avg (a1.value >= 255 and a1.value < 355)] @Name('PM10_VeryUnhealthy') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 5 as levelNumber, 'PM10_VeryUnhealthy' as levelName from pattern [every a1 = PM10_Avg (a1.value >= 355 and a1.value < 425)] @Name('PM10_Hazardous') @Priority(2) insert into PollutantLevel select a1.timestamp as timestamp, a1.stationId as stationId, 6 as levelNumber, 'PM10_Hazardous' as levelName from pattern [every a1 = PM10_Avg (a1.value >= 425 and a1.value <= 604)] /* AIR QUALITY LEVEL PATTERN */ @Name('AirQualityLevel') @Priority(1) insert into AirQualityLevel select current_timestamp() as timestamp, a1.stationId as stationId, max(a1.levelNumber) as level from pattern [every a1 = PollutantLevel].win:time_batch(600000 milliseconds) group by a1.stationId having max(a1.levelNumber) is not null