Tuesday, December 10, 2019

Using Bulk Operations in Azure Cosmos DB with the .NET SDK


     Azure Cosmos DB SQL API provides a way to retrieve documents from the database by using a SQL-like language. This language is great for retrieving documents, but it does not support DELETE, UPDATE, or INSERT statements; all you can use is the SELECT statement. You are out of luck if you need to do any bulk DELETE, UPDATE, or INSERT operations. How do you update a batch of documents, then?

     There used to be a separate tool/SDK provided by the Cosmos DB team for bulk operations in Cosmos DB. Microsoft has since announced that we can do bulk operations with the .NET SDK version 3.4.0 or later. That means we no longer need a separate SDK for bulk operations. In this post, I will show you how to update a batch of documents by using this new feature of the SDK.
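To follow along, your project needs the Microsoft.Azure.Cosmos package at version 3.4.0 or later (the version shown here is just the minimum mentioned above; a newer version works too). The project-file reference looks like this:

```xml
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.4.0" />
```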

     I uploaded StackOverflow data into my Cosmos DB database, and I will use the Posts container. Here is an example of a document from the Posts container. I want to update the Tags property of hundreds of records in the following example.
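The code below only relies on the PostId and Tags properties, so a post document might look roughly like this (a hypothetical sketch; the other property names and values are illustrative, not taken from the actual container):

```json
{
  "id": "4",
  "PostId": "4",
  "Title": "How to convert a decimal to a double?",
  "Tags": "<c#><floating-point>",
  "Score": 543
}
```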


     In the following code, I run a SELECT statement that returns 378 JSON documents.

static async Task<List<StackOverflowPost>> LoadItemsToUpdate()
{
   var cosmosClient = new CosmosClient(connectionString);
   Container container = cosmosClient.GetContainer("Stackoverflow", "Posts");
   var cmd = "SELECT * FROM Posts o WHERE o.PostId < 1000";
   var query = new QueryDefinition(cmd);
   var queryResultSetIterator = container.GetItemQueryIterator<StackOverflowPost>(query);
   var posts = new List<StackOverflowPost>();
   try
   {
     while (queryResultSetIterator.HasMoreResults)
     {
       FeedResponse<StackOverflowPost> currentResultSet = await queryResultSetIterator.ReadNextAsync();
       foreach (StackOverflowPost current in currentResultSet)
       {
         posts.Add(current);
       }
     }
     Console.WriteLine("Number of Posts : " + posts.Count);
     return posts;
   }
   catch (Exception ex)
   {
     Console.WriteLine(ex.Message);
     return null;
   }
}

     To show you the difference between a regular update and a bulk update, I have a regular update function here. This function takes each of the 378 JSON documents and updates them separately.

static async Task<bool> RegularUpdate(List<StackOverflowPost> posts)
{
  try
  {
    var cosmosClient = new CosmosClient(connectionString);
    Container container = cosmosClient.GetContainer("Stackoverflow", "Posts");
    posts.ForEach(t => t.Tags = "Update tags");
    Console.WriteLine("Started : " + DateTime.Now.ToLongTimeString());
    double total = 0;
    foreach (var itemToUpdate in posts)
    {
      // Each call is awaited individually, so documents go to the service one at a time.
      var response = await container.UpsertItemAsync(itemToUpdate, new PartitionKey(itemToUpdate.PostId));
      total += response.RequestCharge;
    }
    Console.WriteLine("Ended : " + DateTime.Now.ToLongTimeString());
    Console.WriteLine("Used R/U : " + total);
    return true;
  }
  catch (Exception ex)
  {
    Console.WriteLine(ex.Message);
    return false;
  }
}

     When I run this function, here is the output I see for timings and Request Units. It took 11 seconds to update 378 JSON documents using the regular update with no bulk functionality.



     Next, let's update the same documents with a bulk update. To enable bulk execution, I set the AllowBulkExecution property of CosmosClientOptions and created the cosmosClient object with that option. By doing that, I notified the SDK that I want to do some bulk operations in this function. The second difference from the regular update function is that I created a list of tasks, added each update task to the list, and then ran all the tasks together by using the Task.WhenAll() function.

static async Task<bool> BulkUpdate(List<StackOverflowPost> posts)
{
  try
  {
    var options = new CosmosClientOptions() { AllowBulkExecution = true };
    var cosmosClient = new CosmosClient(connectionString, options);
    Container container = cosmosClient.GetContainer("Stackoverflow", "Posts");
    posts.ForEach(t => t.Tags = "Update tags");
    var concurrentTasks = new List<Task<ItemResponse<StackOverflowPost>>>();
    foreach (var itemToUpdate in posts)
    {
      // Tasks are collected without awaiting, so the SDK can group them into batches.
      concurrentTasks.Add(container.UpsertItemAsync(itemToUpdate, new PartitionKey(itemToUpdate.PostId)));
    }
    Console.WriteLine("Started : " + DateTime.Now.ToLongTimeString());
    await Task.WhenAll(concurrentTasks);
    Console.WriteLine("Ended : " + DateTime.Now.ToLongTimeString());
    double total = 0;
    foreach (var task in concurrentTasks)
    {
      total += task.Result.RequestCharge;
    }
    Console.WriteLine("Used R/U : " + total);
    return true;
  }
  catch (Exception ex)
  {
    Console.WriteLine(ex.Message);
    return false;
  }
}

When I run this function, I see the following output.


     As you can see, the bulk function completed faster than the regular update function: it took 6 seconds to update the same 378 JSON documents with bulk execution enabled. The code hints at what happens behind the scenes. With a regular update, the SDK makes a separate service call for each document. With bulk execution enabled, the SDK groups the updates into batches and sends each batch together.

     There are a few things to watch for when you use bulk execution in your code. First, you might need to adjust your provisioned Request Units, since bulk operations will consume them quickly. Just like any other operation, you should provide the PartitionKey whenever you can in bulk executions. And if your documents are already in stream form, use the CreateItemStreamAsync function to avoid serialization overhead.
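As a rough sketch of the streaming variant (the helper name and the payloads parameter are hypothetical, and the container is assumed to have been created with AllowBulkExecution = true), CreateItemStreamAsync takes an already-serialized stream plus a partition key, so the SDK skips its own serialization step:

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

static async Task BulkInsertStreams(
    Container container,
    IEnumerable<(string PartitionKeyValue, Stream Payload)> payloads)
{
    var tasks = new List<Task<ResponseMessage>>();
    foreach (var (pk, stream) in payloads)
    {
        // The raw JSON stream is sent as-is; no serialization happens in the SDK.
        tasks.Add(container.CreateItemStreamAsync(stream, new PartitionKey(pk)));
    }
    await Task.WhenAll(tasks);
}
```

Note that the stream API returns ResponseMessage rather than a typed ItemResponse, so you would check each response's StatusCode yourself instead of relying on exceptions.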

5 comments:

  1. This comment has been removed by a blog administrator.

  2. I don't think your example is valid - the second sample executed faster because you issued all the requests at the same time and awaited all of them. In the first case, you awaited each one separately. If you disable AllowBulkExecution in the second sample, you will still get 6 seconds, I bet.

     1. I agree, it's not valid at all; I completely agree with your comments.

  3. This comment has been removed by the author.

  4. Does bulk insert have any impact regarding RUs?