Friday, July 26, 2019

Azure CosmosDB Change Feed in Action


     Change Feed feature exposes Cosmos DB logs to outside of the database just like this kitchen window. You see what is happening when. I use this analogy in my CosmosDB talks and I get great feedback from the audience.

     It can be a challenge to learn new technologies. You know you will miss something If the presenter starts his demo with the words "It's very easy.....Look you type init, pick this option and that's it....." I always want to say. "Yeah, It's easy for you. I have no idea what you just did." I wrote about the Change Feed function earlier and I realized I made the same mistake. I showed you how to set it up, but I did not talk about how your custom code is going to reach what is changed in your Cosmos Database. I am going to fix that today.

     Here what we are going to do today. Let's say we work for one of those Home Security companies. We install a security system that communicates to CosmosDB when security sensors trigger an alert. We offer sensors to detect open doors/windows, broken glasses, flood, fire and gas leak. Each sensor connects to the main unit/controller. When there is an alert, main unit sends information to our CosmosDB database. I want to use Change Feed functionality to detect the new transactions and depending on the alert type, I want to do something like call police, fire department, notify user.

     First, I setup my database. To keep things simple, I have two containers. One is going to hold all information about the Customers and their system. Other one is going to hold all the transactions coming from the IOT unit. leases container created when I setup the Change Feed. It tracks what's changed in selected container.

      I inserted the following JSON document in the Customers container. I am going to use this Customer for the demo. This customer has 9 sensors installed to his/her house. All sensors connect to main device. 

{
    "CustomerId": 1,
    "Name": "Hasan Savran",
    "CellPhone": "123-456-7890",
    "HomePhone": "123-456-7890",
    "Zipcode": "44000",
    "DeviceId": "AG-10001",
    "Sensors": [
        {
            "SensorId": 1,
            "SensorsType": "Magnetic",
            "Location": "Main Door"
        },
        {
            "SensorId": 2,
            "SensorsType": "GlassBreak",
            "Location": "Window"
        },
        {
            "SensorId": 3,
            "SensorsType": "GlassBreak",
            "Location": "Window"
        },
        {
            "SensorId": 4,
            "SensorsType": "GlassBreak",
            "Location": "Window"
        },
        {
            "SensorId": 5,
            "SensorsType": "GlassBreak",
            "Location": "Slide Door"
        },
        {
            "SensorId": 6,
            "SensorsType": "H2O",
            "Location": "Kitchen"
        },
        {
            "SensorId": 7,
            "SensorsType": "CO2",
            "Location": "Laundry"
        },
        {
            "SensorId": 8,
            "SensorsType": "Fire",
            "Location": "1stFloor"
        },
        {
            "SensorId": 9,
            "SensorsType": "H2O",
            "Location": "Basement"
        }
    ],
    "id": "8a631e15-646d-42db-8834-8ae4f950946b",
    "_rid": "ebEYAP4OXpsBAAAAAAAAAA==",
    "_self": "dbs/ebEYAA==/colls/ebEYAP4OXps=/docs/ebEYAP4OXpsBAAAAAAAAAA==/",
    "_etag": "\"3700274f-0000-0100-0000-5d389aeb0000\"",
    "_attachments": "attachments/",
    "_ts": 1563990763
}

     I have one more container named Signals. This container accepts all the transactions from all customers. There will be no updates. IOT Devices will send a simple JSON document to this container. Here is a sample Signal document. I used the deviceid as my partition key. In this way, all alerts from the same device will be in the same partition. When we receive an alert from Customer, Change Feed will run an Azure Function and the function will check the ReceivedCode and do something about it.

{
    "DeviceId": "AG-10001",
    "SensorId": 1,
    "ReceivedOn": "2019-07-20T23:07:00",
    "ReceivedCode": "Open",
    "id": "022f32ef-586b-4400-ab6e-1ffdc8e066eb",
    "_rid": "ebEYALzmqzgBAAAAAAAAAA==",
    "_self": "dbs/ebEYAA==/colls/ebEYALzmqzg=/docs/ebEYALzmqzgBAAAAAAAAAA==/",
    "_etag": "\"37005b4f-0000-0100-0000-5d389b9e0000\"",
    "_attachments": "attachments/",
    "_ts": 1563990942
}

     Next, I configured the Change Feed feature for this database. I followed the same steps of my older post to configure it. Following screenshot shows you my settings for the CosmosDB trigger for my Azure function. You need to select which database and container should be watched also in which container should contain the changed data information. leases container is the default name and you can change it if you like. Also, parameter name will be used in your custom code in Azure Function.


     If you click on the Function name, you will see the code which will run whenever database receives a new alert from Customer in my case. There will be a default code here like this. Usually, this is the point that presenter (including me) will update or insert something to the database and come back to this page and you will see that the following code runs and shows you how many documents are changed and the first document's id. 

#r "Microsoft.Azure.DocumentDB.Core"
using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;

public static void Run(IReadOnlyList<Document> input, ILogger log)
{        
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
    }
}

     This is great but I want to do more than that. How am I going to access to changed data? What should I do if there is more than one change or insert? In my case, I need to access to SensorCode attribute so I can do something about this alert. To answer these questions, you need to know more about the Azure Functions. If you can see the number of modified documents by this code, that means you are done with Change Feed functionality. First, we need some kind of loop so if the code can process multiple changes. To do that, I will use a simple foreach loop.

public static void Run(IReadOnlyList<Document> input, ILogger log)
{        
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
        foreach(Document document in input)
        {            
            
        }
    }
}

     Now, I am ready to find a way to check the data. The problem is, we are in Azure Function and if you want to convert the input into a Signal object, you really need to know how Azure Functions works. Easiest way to your data is GetPropertyValue function of IReadOnlyList. This will give us the value of SensorCode.


public static void Run(IReadOnlyList<Document> input, ILogger log)
{        
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
        foreach(Document document in input)
        {            
            var currentCode = document.GetPropertyValue<string>("SensorCode");
        }
    }
}

     This works but if you need to check many properties, you may want to convert the document into an object so all attributes can be available without using GetPropertyValue. You can convert document into a dynamic object. Then all properties should be available like the following code.

public class Signal
{
        public string id { get; set; }
        public string DeviceId { get; set; }
        public int SensorId { get; set; }
        public string ReceivedCode { get; set; }
        public DateTime FiredOn { get; set; }
}

public static void Run(IReadOnlyList<Document> input, ILogger log)
{        
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
        foreach(Document document in input)
        {
            Signal item = (dynamic)document;
            log.LogInformation($"Sensor Code: {item.SensorCode}");
        }
    }
}

     Now, I can check what sensor sends so I can do something about it. I will add a Case statement and check the sensorcode value and call a function.

public class Signal
{
        public string id { get; set; }
        public string DeviceId { get; set; }
        public int SensorId { get; set; }
        public string ReceivedCode { get; set; }
        public DateTime FiredOn { get; set; }
}

public static void Run(IReadOnlyList<Document> input, ILogger log)
{        
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
        foreach(Document document in input)
        {
            Signal item = (dynamic)document;
            switch (item.ReceivedCode){
                case "Open":                
                log.LogInformation("Door & Window opened. Notify customer by cell");
                break;
                case "BrokenGlass":                
                log.LogInformation("A window is broken. Call local police and notify customer");                
                break;
                case "GasAlarm":
                log.LogInformation("Co2 or Natural Gas Leak. Call local fire Department");                
                break;
                case "FloodAlarm":
                log.LogInformation("Water is detected. Notify Customer by cell");
                break;
                case "FireAlarm":
                log.LogInformation("Smoke is detected. Call local fire Department.");
                break;
                default:
                // Notify Technical Service
                break;
            }
        }
    }
}

     To test this, you can go back to database and create new documents in Signal container. I will go one step further and show you what it will take to trigger this from Cosmos DB SDK 3.0. I created a web page that simulates the sensor alerts.


     When I click on any of those buttons, Front-end generates the object and sends it to SignalBridgeAsync function in my web server. We server runs the following code to call CosmosDB and insert this data into the container.

public async Task<bool> SignalBridgeAsync(Signal signal)
{
   try
   {
    signal.id = Guid.NewGuid().ToString();
    CosmosClient cosmosClient = new CosmosClient(cstring);                
    var response = await cosmosClient.GetDatabase(_dbname).GetContainer(_container).CreateItemAsync<Signal>(signal);
    return true;  
   }
   catch
   {
    return false;
   }
}
   
      That's it, I try to show you what it takes to make Change Feed run from start to end. Here is a screenshot of Azure Functions after I click on those buttons couple of times.



No comments:

Post a Comment