Azure Stream Analytics Project: On-demand real-time analytics

Assignment Description

This assignment is part of a project for the course "Big Data Management Systems" taught by Prof. Chatziantoniou in the Department of Management Science and Technology (AUEB). The aim of the project is to familiarize students with big data management systems such as Hadoop, Redis, MongoDB and Neo4j.

In this assignment, Azure Stream Analytics is used to process a data stream of ATM transactions and answer stream queries. The stream has the following schema: (ATMCode, CardNumber, Type, Amount).

Queries: Output & PowerBI

Query 1

Show the total 'Amount' of 'Type = 0' transactions at 'ATM Code = 21' of the last 10 minutes. Repeat as new events keep flowing in (use a sliding window).

SELECT
    SUM(CAST([BDSMastersInput].[Amount] AS BIGINT)) AS TotalAmount,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
WHERE CAST([BDSMastersInput].[Type] AS BIGINT) = 0 AND
    CAST([BDSMastersInput].[ATMCode] AS BIGINT) = 21
GROUP BY SlidingWindow(minute, 10)
Output:
[
    {
        "totalamount": 376,
        "time": "2017-05-21T10:24:23.9850000Z"
    },
    {
        "totalamount": 392,
        "time": "2017-05-21T10:24:30.0030000Z"
    },
    {
        "totalamount": 382,
        "time": "2017-05-21T10:24:57.7490000Z"
    },
    {
        "totalamount": 422,
        "time": "2017-05-21T10:26:04.0340000Z"
    },
    {
        "totalamount": 398,
        "time": "2017-05-21T10:26:05.7910000Z"
    }
]
PowerBI:

[Image: Power BI visualization of the Query 1 output]
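
The sliding-window behaviour of Query 1 can be reproduced offline with a short Python sketch. It is an illustration only, not part of the Azure job: the function name, the tuple layout and the sample events are hypothetical. It emits a total each time a matching event enters the window, whereas Azure Stream Analytics also emits a result when an event leaves the window.

```python
from collections import deque

WINDOW_SECONDS = 10 * 60  # mirrors SlidingWindow(minute, 10)


def sliding_totals(events, atm_code=21, tx_type=0):
    """Yield (timestamp, total) each time a matching event enters the window.

    `events` is an iterable of (timestamp_seconds, atm_code, type, amount)
    tuples sorted by timestamp. The tuple layout is an assumption.
    """
    window = deque()  # (timestamp, amount) pairs currently inside the window
    total = 0
    for ts, atm, typ, amount in events:
        if atm != atm_code or typ != tx_type:
            continue  # same filter as the WHERE clause
        window.append((ts, amount))
        total += amount
        # The window covers (ts - 10 min, ts], so older events are evicted.
        while window[0][0] <= ts - WINDOW_SECONDS:
            _, old_amount = window.popleft()
            total -= old_amount
        yield ts, total


# Hypothetical sample events, not taken from the real stream.
sample = [
    (0, 21, 0, 50),
    (120, 21, 1, 99),  # wrong type, ignored
    (300, 21, 0, 30),
    (540, 12, 0, 70),  # wrong ATM, ignored
    (660, 21, 0, 20),  # the event at t=0 has left the window by now
]
result = list(sliding_totals(sample))
print(result)  # [(0, 50), (300, 80), (660, 50)]
```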

Query 2

Show the total 'Amount' of 'Type = 1' transactions at 'ATM Code = 21' of the last hour. Repeat once every hour (use a tumbling window).

SELECT
    SUM(CAST([BDSMastersInput].[Amount] AS BIGINT)) AS TotalAmount,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
WHERE CAST([BDSMastersInput].[Type] AS BIGINT) = 1 AND
    CAST([BDSMastersInput].[ATMCode] AS BIGINT) = 21
GROUP BY TumblingWindow(hour, 1)
Output (TumblingWindow(minute, 3)):
[
    {
        "totalamount": 128,
        "time": "2017-05-21T12:51:00.0000000Z"
    },
    {
        "totalamount": 132,
        "time": "2017-05-21T12:54:00.0000000Z"
    },
    {
        "totalamount": 110,
        "time": "2017-05-21T12:57:00.0000000Z"
    },
    {
        "totalamount": 139,
        "time": "2017-05-21T13:00:00.0000000Z"
    }
]
PowerBI (TumblingWindow(minute, 3)):

[Image: Power BI visualization of the Query 2 output]
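
A tumbling window assigns every event to exactly one fixed, non-overlapping bucket. The sketch below shows that bucketing in plain Python, assuming integer timestamps in seconds and windows that include their end boundary and exclude their start; the function name and sample events are hypothetical.

```python
from collections import defaultdict


def tumbling_totals(events, size_seconds, atm_code=21, tx_type=1):
    """Return {window_end: total} for fixed, non-overlapping windows.

    `events` holds (timestamp_seconds, atm_code, type, amount) tuples;
    this layout is an assumption made for the sketch.
    """
    totals = defaultdict(int)
    for ts, atm, typ, amount in events:
        if atm != atm_code or typ != tx_type:
            continue  # same filter as the WHERE clause
        # Ceiling division: an event exactly on a boundary belongs to the
        # window that ends at that boundary.
        window_end = -(-ts // size_seconds) * size_seconds
        totals[window_end] += amount
    return dict(sorted(totals.items()))


# Hypothetical sample events, bucketed into one-hour windows.
sample = [
    (10, 21, 1, 40),
    (1800, 21, 1, 60),
    (3600, 21, 1, 5),    # on the boundary, still counted in the first hour
    (3601, 21, 1, 7),    # first event of the second hour
    (4000, 21, 0, 100),  # wrong type, ignored
]
result = tumbling_totals(sample, size_seconds=3600)
print(result)  # {3600: 105, 7200: 7}
```

Calling the same function with `size_seconds=180` corresponds to the three-minute window that was used to produce the sample output above.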

Query 3

Show the total 'Amount' of 'Type = 1' transactions at 'ATM Code = 21' of the last hour. Repeat once every 30 minutes (use a hopping window).

SELECT
    SUM(CAST([BDSMastersInput].[Amount] AS BIGINT)) AS TotalAmount,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
WHERE CAST([BDSMastersInput].[Type] AS BIGINT) = 1 AND 
    CAST([BDSMastersInput].[ATMCode] AS BIGINT) = 21
GROUP BY HoppingWindow(minute, 60, 30)
Output (HoppingWindow(minute, 2, 1)):
[
    {
        "totalamount": 67,
        "time": "2017-05-21T13:12:00.0000000Z"
    },
    {
        "totalamount": 121,
        "time": "2017-05-21T13:13:00.0000000Z"
    },
    {
        "totalamount": 121,
        "time": "2017-05-21T13:14:00.0000000Z"
    }
]
PowerBI (HoppingWindow(minute, 2, 1)):

[Image: Power BI visualization of the Query 3 output]
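
With a hopping window the buckets overlap, so with a 60-minute size and a 30-minute hop every event is counted in two consecutive windows. The following Python sketch illustrates this under the same assumptions as a tumbling window (integer timestamps in seconds, end boundary inclusive); the function name and sample events are hypothetical.

```python
def hopping_totals(events, size_seconds, hop_seconds, atm_code=21, tx_type=1):
    """Return {window_end: total} for overlapping windows.

    Windows end at multiples of `hop_seconds` and cover
    (end - size_seconds, end]. `events` holds
    (timestamp_seconds, atm_code, type, amount) tuples (assumed layout).
    """
    totals = {}
    for ts, atm, typ, amount in events:
        if atm != atm_code or typ != tx_type:
            continue  # same filter as the WHERE clause
        # First window end at or after the event (ceiling division).
        end = -(-ts // hop_seconds) * hop_seconds
        # Add the amount to every window that still contains the event.
        while end - size_seconds < ts:
            totals[end] = totals.get(end, 0) + amount
            end += hop_seconds
    return dict(sorted(totals.items()))


# Hypothetical sample events for HoppingWindow(minute, 60, 30).
sample = [
    (600, 21, 1, 10),
    (2400, 21, 1, 20),
    (3600, 21, 1, 5),
]
result = hopping_totals(sample, size_seconds=3600, hop_seconds=1800)
print(result)  # {1800: 10, 3600: 35, 5400: 25}
```

Setting `hop_seconds` equal to `size_seconds` reduces the sketch to a tumbling window, since each event then falls into a single bucket.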

Query 4

Show the total 'Amount' of 'Type = 1' transactions per 'ATM Code' of the last hour (use a sliding window).

SELECT
    CAST([BDSMastersInput].[ATMCode] AS BIGINT) AS AtmCode,
    SUM(CAST([BDSMastersInput].[Amount] AS BIGINT)) AS TotalAmount,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
WHERE CAST([BDSMastersInput].[Type] AS BIGINT) = 1
GROUP BY CAST([BDSMastersInput].[ATMCode] AS BIGINT), SlidingWindow(hour, 1)
Output:
[
    {
        "atmcode": 19,
        "totalamount": 76,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "atmcode": 15,
        "totalamount": 143,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "atmcode": 12,
        "totalamount": 65,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "atmcode": 18,
        "totalamount": 235,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "atmcode": 16,
        "totalamount": 21,
        "time": "2017-05-21T12:01:01.0010000Z"
    }
]
PowerBI:

[Image: Power BI visualization of the Query 4 output]

Query 5

Show the total 'Amount' of 'Type = 1' transactions per 'Area Code' of the last hour. Repeat once every hour (use a tumbling window).

SELECT
    CAST([atmRef].[area_code] AS BIGINT) AS AreaCode,
    SUM(CAST([BDSMastersInput].[Amount] AS BIGINT)) AS TotalAmount,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
INNER JOIN [atmRef]
    ON CAST([atmRef].[atm_code] AS BIGINT) = CAST([BDSMastersInput].[ATMCode] AS BIGINT)
WHERE CAST([BDSMastersInput].[Type] AS BIGINT) = 1
GROUP BY CAST([atmRef].[area_code] AS BIGINT), TumblingWindow(hour, 1)
Output:
[
    {
        "areacode": 2,
        "totalamount": 76,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "areacode": 4,
        "totalamount": 235,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "areacode": 9,
        "totalamount": 65,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "areacode": 11,
        "totalamount": 112,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "areacode": 5,
        "totalamount": 143,
        "time": "2017-05-21T13:00:00.0000000Z"
    }
]
PowerBI (TumblingWindow(minute, 1)):

[Images: two Power BI visualizations of the Query 5 output]
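
The join against reference data in Query 5 amounts to a lookup from ATM code to area code before grouping. The sketch below mimics this in Python with a dictionary standing in for [atmRef]; the mapping, the function name and the sample events are all hypothetical, and the one-hour bucketing assumes integer timestamps in seconds.

```python
from collections import defaultdict

# Hypothetical reference data: atm_code -> area_code (stands in for [atmRef]).
ATM_AREA = {21: 4, 12: 9, 15: 5}


def totals_per_area(events, size_seconds=3600):
    """Return {(window_end, area_code): total} for Type = 1 transactions.

    `events` holds (timestamp_seconds, atm_code, type, amount) tuples
    (assumed layout).
    """
    totals = defaultdict(int)
    for ts, atm, typ, amount in events:
        # An INNER JOIN drops events whose ATM is missing from the reference.
        if typ != 1 or atm not in ATM_AREA:
            continue
        window_end = -(-ts // size_seconds) * size_seconds  # tumbling bucket
        totals[(window_end, ATM_AREA[atm])] += amount
    return dict(totals)


# Hypothetical sample events.
sample = [
    (100, 21, 1, 30),
    (200, 12, 1, 20),
    (300, 21, 1, 10),
    (400, 99, 1, 500),  # unknown ATM, dropped by the join
    (500, 15, 0, 40),   # wrong type, ignored
    (3700, 15, 1, 8),
]
result = totals_per_area(sample)
print(result)  # {(3600, 4): 40, (3600, 9): 20, (7200, 5): 8}
```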

Query 6

Show the total 'Amount' per ATM’s 'City' and Customer’s 'Gender' of the last hour. Repeat once every hour (use a tumbling window).

SELECT
    [areaRef].[area_city] AS City,
    [customerRef].[gender] AS Gender,
    SUM(CAST([BDSMastersInput].[Amount] AS BIGINT)) AS TotalAmount,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
INNER JOIN [customerRef]
    ON CAST([customerRef].[card_number] AS BIGINT) = CAST([BDSMastersInput].[CardNumber] AS BIGINT)
INNER JOIN [atmRef]
    ON CAST([atmRef].[atm_code] AS BIGINT) = CAST([BDSMastersInput].[ATMCode] AS BIGINT)
INNER JOIN [areaRef]
    ON CAST([areaRef].[area_code] AS BIGINT) = CAST([atmRef].[area_code] AS BIGINT)
GROUP BY [areaRef].[area_city], [customerRef].[gender], TumblingWindow(hour, 1)
Output (HoppingWindow(minute, 2, 1)):
[
    {
        "city": "Springfield",
        "gender": "Male",
        "totalamount": 297,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Baltimore",
        "gender": "Male",
        "totalamount": 19,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Omaha",
        "gender": "Male",
        "totalamount": 245,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Tacoma",
        "gender": "Female",
        "totalamount": 21,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Memphis",
        "gender": "Female",
        "totalamount": 60,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Schaumburg",
        "gender": "Female",
        "totalamount": 254,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Memphis",
        "gender": "Male",
        "totalamount": 290,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Canton",
        "gender": "Male",
        "totalamount": 77,
        "time": "2017-05-21T13:00:00.0000000Z"
    },
    {
        "city": "Baltimore",
        "gender": "Female",
        "totalamount": 46,
        "time": "2017-05-21T13:00:00.0000000Z"
    }
]
PowerBI (HoppingWindow(minute, 2, 1)):

[Images: four Power BI visualizations of the Query 6 output]

Query 7

Alert (SELECT '1') if a Customer has performed two transactions of 'Type = 1' in a window of an hour (use a sliding window).

SELECT
    [customerRef].[first_name] AS Name,
    [customerRef].[last_name] AS Surname,
    CAST([BDSMastersInput].[CardNumber] AS BIGINT) AS CardNo,
    COUNT (*) AS Transactions,
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
INNER JOIN [customerRef]
    ON CAST([customerRef].[card_number] AS BIGINT) = CAST([BDSMastersInput].[CardNumber] AS BIGINT)
WHERE CAST([BDSMastersInput].[Type] AS BIGINT) = 1
GROUP BY [customerRef].[first_name], [customerRef].[last_name], CAST([BDSMastersInput].[CardNumber] AS BIGINT), SlidingWindow(hour, 1)
HAVING Transactions = 2
Output:
[
    {
        "name": "Angela",
        "surname": "Moreno",
        "cardno": 3534633361736454,
        "transactions": 2,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "name": "Gerald",
        "surname": "Young",
        "cardno": 50384191807294800,
        "transactions": 2,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "name": "Richard",
        "surname": "Russell",
        "cardno": 5200253312538103,
        "transactions": 2,
        "time": "2017-05-21T12:01:01.0010000Z"
    },
    {
        "name": "Bruce",
        "surname": "Morrison",
        "cardno": 5602246755688900,
        "transactions": 2,
        "time": "2017-05-21T12:01:01.0010000Z"
    }
]
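
The alert condition of Query 7 can be checked offline by keeping, per card, the timestamps of the Type = 1 transactions seen in the last hour. The Python sketch below is an approximation under stated assumptions: it only evaluates the count when a new event arrives (Azure Stream Analytics also re-evaluates a sliding window when events leave it), and the function name, tuple layout and card numbers are hypothetical.

```python
from collections import defaultdict, deque


def double_transaction_alerts(events, window_seconds=3600):
    """Return (timestamp, card_number) pairs for cards with exactly two
    Type = 1 transactions inside the last `window_seconds`.

    `events` holds (timestamp_seconds, card_number, type) tuples sorted by
    timestamp (assumed layout).
    """
    recent = defaultdict(deque)  # card_number -> timestamps inside the window
    alerts = []
    for ts, card, typ in events:
        if typ != 1:
            continue  # same filter as the WHERE clause
        stamps = recent[card]
        stamps.append(ts)
        # The window covers (ts - window_seconds, ts].
        while stamps[0] <= ts - window_seconds:
            stamps.popleft()
        if len(stamps) == 2:  # mirrors the HAVING condition
            alerts.append((ts, card))
    return alerts


# Hypothetical sample events with made-up card numbers.
sample = [
    (0, 1111, 1),
    (600, 2222, 1),
    (1200, 1111, 1),  # second transaction for 1111 within the hour
    (1500, 1111, 0),  # wrong type, ignored
    (1800, 1111, 1),  # third transaction, count is 3, no alert
    (4000, 2222, 1),  # second transaction for 2222 within the hour
    (9000, 1111, 1),  # earlier events expired, count is 1
]
result = double_transaction_alerts(sample)
print(result)  # [(1200, 1111), (4000, 2222)]
```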

Query 8

Alert (SELECT '1') if the 'Area Code' of the ATM of the transaction is not the same as the 'Area Code' of the 'Card Number' (Customer’s Area Code) - (use a sliding window).

SELECT
    CAST([atmRef].[area_code] AS BIGINT) AS AtmAreaCode,
    CAST([customerRef].[area_code] AS BIGINT) AS CustomerAreaCode,
    COUNT (*),
    System.Timestamp AS Time
INTO
    [BDSMastersOutput]
FROM
    BDSMastersInput
INNER JOIN [customerRef]
    ON CAST([customerRef].[card_number] AS BIGINT) = CAST([BDSMastersInput].[CardNumber] AS BIGINT)
INNER JOIN [atmRef]
    ON CAST([atmRef].[atm_code] AS BIGINT) = CAST([BDSMastersInput].[ATMCode] AS BIGINT)
WHERE CAST([atmRef].[area_code] AS BIGINT) != CAST([customerRef].[area_code] AS BIGINT)
GROUP BY CAST([atmRef].[area_code] AS BIGINT), CAST([customerRef].[area_code] AS BIGINT), SlidingWindow(hour, 1)
Output:
[
    {
        "atmareacode": 10,
        "customerareacode": 1,
        "count": 1,
        "time": "1970-01-01T12:01:01.0010000Z"
    },
    {
        "atmareacode": 4,
        "customerareacode": 2,
        "count": 11,
        "time": "1970-01-01T12:01:01.0010000Z"
    },
    {
        "atmareacode": 3,
        "customerareacode": 4,
        "count": 1,
        "time": "1970-01-01T12:01:01.0010000Z"
    },
    {
        "atmareacode": 9,
        "customerareacode": 10,
        "count": 3,
        "time": "1970-01-01T12:01:01.0010000Z"
    },
    {
        "atmareacode": 10,
        "customerareacode": 6,
        "count": 1,
        "time": "1970-01-01T12:01:01.0010000Z"
    }
]
PowerBI:

[Images: three Power BI visualizations of the Query 8 output]
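
The core check of Query 8 is a comparison of two lookups: the area of the ATM and the area of the customer who owns the card. The Python sketch below shows that check per transaction and leaves out the windowed count that the query adds on top. Both dictionaries stand in for the [atmRef] and [customerRef] reference inputs; their contents, the function name and the sample events are hypothetical.

```python
# Hypothetical reference data standing in for [atmRef] and [customerRef].
ATM_AREA = {21: 4, 12: 9}            # atm_code -> area_code
CUSTOMER_AREA = {1111: 4, 2222: 2}   # card_number -> area_code


def area_mismatches(events):
    """Return (timestamp, atm_area, customer_area) for every transaction
    made at an ATM outside the customer's own area.

    `events` holds (timestamp_seconds, atm_code, card_number) tuples
    (assumed layout).
    """
    mismatches = []
    for ts, atm, card in events:
        atm_area = ATM_AREA.get(atm)
        customer_area = CUSTOMER_AREA.get(card)
        # Both INNER JOINs must match, otherwise the event is dropped.
        if atm_area is None or customer_area is None:
            continue
        if atm_area != customer_area:
            mismatches.append((ts, atm_area, customer_area))
    return mismatches


# Hypothetical sample events.
sample = [
    (10, 21, 1111),  # areas match, no alert
    (20, 21, 2222),  # ATM in area 4, customer in area 2
    (30, 12, 1111),  # ATM in area 9, customer in area 4
    (40, 99, 1111),  # unknown ATM, dropped by the join
]
result = area_mismatches(sample)
print(result)  # [(20, 4, 2), (30, 9, 4)]
```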