Windows Azure Storage - 表数据查询及WAD分析最佳实践

在开发Windows Azure应用程序时,经常会出现读取/写入WAS表存储(Table Storage)慢甚至超时的问题,引起这类问题的主要原因是,开发者在对表数据规划和操作时没有充分注意到表存储的后端特性,导致简单的写入/读取操作密集到WAS后端繁忙的服务器节点上。本文就常见的表数据规划、查询及WAD表数据分析等问题加以小结。

1. 了解WAS底层实现结构

WAS存储内部包括如下三层结构,所有的数据访问请求都是经过前端进行逐步处理的。

前端(Front End,FE):负责处理接收外部请求、验证用户身份和分发数据请求。前端根据其缓存的存储分区映射表(Partition Map)将合法请求发送到相应的存储分区(Partition),不能通过验证的请求直接在前端被拒绝。

存储分区层(Partition Layer,即索引分区):含有所有访问数据的检索关键字,存储分区层根据数据请求的关键字进行索引,如果是读操作,那么存储分区层在其缓存块中先查找结果,若存在相应缓存,则直接返回。如果没有缓存或者当前操作为写操作,存储分区层则根据其含有的检索表找到与数据请求对应的底层存储文件系统服务器,执行数据请求。每个Partition Layer都可以访问底层所有的分布式文件存储服务器。

分布式文件系统层(Distributed File System Layer):数据最终存储的位置,每份数据均会存储在3个不同的分布式文件服务器中,实现冗余。

 

2. 简单的Table Storage操作

https://blogs.msdn.com/b/jianwu/archive/2014/08/14/azure-paas-2.aspx常规的WAS编程实践。

如常见的Table Storage操作:

    定义表数据结构,指定Partition Key和Row Key

    public class CustomerEntity : TableEntity
    {
        public CustomerEntity(string lastName, string firstName)
        {
            this.PartitionKey = lastName;
            this.RowKey = firstName;
        }

        public CustomerEntity() { }

        public string Email { get; set; }

        public string PhoneNumber { get; set; }
    }

 

        public void TestTable()
        {
            // Create the table client.
            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

            String tableName = "Customer";

            // Create the CloudTable object that represents the table.
            CloudTable table = tableClient.GetTableReference(tableName);
            table.CreateIfNotExists();

            // Create a new customer entity.
            CustomerEntity customer1 = new CustomerEntity("Harp", "Walter");
            customer1.Email = "Walter@contoso.com";
            customer1.PhoneNumber = "425-555-0101";

            // Create the TableOperation that inserts the customer entity.
            TableOperation insertOperation = TableOperation.Insert(customer1);

            // Execute the insert operation.
            table.Execute(insertOperation);

            // Create the batch operation. 注意,此处,批量添加的数据必须具备同样的Partition Key
            TableBatchOperation batchOperation = new TableBatchOperation(); 
            // Create a customer entity and add it to the table.
            CustomerEntity customer2 = new CustomerEntity("Smith", "Jeff");
            customer2.Email = "Jeff@contoso.com";
            customer2.PhoneNumber = "425-555-0104"; 
            // Create another customer entity and add it to the table.
            CustomerEntity customer3 = new CustomerEntity("Smith", "Ben");
            customer3.Email = "Ben@contoso.com";
            customer3.PhoneNumber = "425-555-0102"; 
            // Add both customer entities to the batch insert operation.
            batchOperation.Insert(customer2);
            batchOperation.Insert(customer3); 
            // Execute the batch operation.
            table.ExecuteBatch(batchOperation);

            // Construct the query operation for all customer entities where PartitionKey="Smith".
            TableQuery<CustomerEntity> query = new TableQuery<CustomerEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, "Harp"));

            // Print the fields for each customer.
            foreach (CustomerEntity entity in table.ExecuteQuery(query))
            {
                Console.WriteLine("{0}, {1}\t{2}\t{3}", entity.PartitionKey, entity.RowKey,
                 entity.Email, entity.PhoneNumber);
            }
        }

提示: 开发者可以使用Global Windows Azure中的WAS帐号或者China Azure中的WAS帐号来完成本实践,具体WAS连接字符串稍有不同,如下:

如开发者使用China
Azure时,WAS连接字符串类似以下样例:

CloudStorageAccount storageAccount = CloudStorageAccount.Parse("BlobEndpoint=https://portalvhds88zyhq6kndn2p.blob.core.chinacloudapi.cn/;AccountName=portalvhds88zyhq6kndn2p;AccountKey=××××+9mUrHUzrLDg==");

如开发者使用Global Windows Azure,WAS连接字符串类似以下样例:

CloudStorageAccount storageAccount = CloudStorageAccount.Parse.("DefaultEndpointsProtocol=https;AccountName=mytest3939393;AccountKey=sZjfJiETVsXC****&&$$$m4yfjdajf*****%%U3fvxVHqGmJhIcaVaX3g=="); 

除此之外,开发者若单独进行WAS实践,可以使用本地模拟器中的WAS帐号进行测试,连接方式为:CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;

 

3. Table Storage表数据规划

对于存储在WAS中的数据,Microsoft Azure根据以下关键字建立索引分区(Partition),增强外部用户查找、读取WAS数据的速度。

  • 文件存储(Blobs):Container Name + Blob Name。
  • 表存储(Tables):Table Name + Partition Key。
  • 队列存储(Queues):Queue Name。

对于同一个索引区下存储的WAS数据(Blob/Message/Entity),WAS提供了更优的存储性能,如https://msdn.microsoft.com/en-us/library/azure/dn249410.aspx

单个Blob的目标吞吐量 单个Queue的目标吞吐量 (消息大小为1 KB) 单个表分区(Table Partition)的目标吞吐量

可达60 MB/秒 或 可达500操作/秒

可达2000消息/秒

可达 2000 实体操作/秒

 

基于以上WAS的存储特性,开发者在开发实践中可以进行如下有针对性的优化设计,实现更优的存储体验。

  • 将需要批量写入/读写的数据放在同一个分区中。(具有相同的PartitionKey)
  • 将不相关的表数据分布在不同的分区中。(采用GUID或者a-z/1-9组成的随机序列为PartitionKey,使得数据分布在多个WAS存储节点上)
  • 针对关联查询,合理利用Partition Key + Row Key(如Partition Key一致,Row Key不同)

 

4. 表数据查询最佳实践

需要注意的是,Table Storage中每次读操作(Transaction)最多只能返回1000个数据实体,但上述2中调用到的ExecuteQuery方法会将符合查询条件的所有数据实体返回给客户端,故:若满足查询条件的数据实体数多于1000个,ExecuteQuery方法内部会自动分多次读取,最后将所有满足条件的数据结果返回给客户端。可以想象,如果满足查询条件的数据实体非常多,客户端需要等待ExecuteQuery方法很长时间才能拿到数据,应用程序的效率就比较低下。在开发过程中,开发者可以通过两种不同的方法来解决这个问题。

(1)查询语句中明确返回的数据实体个数

// Construct the query operation for top 2 customer entities where
PartitionKey="Smith"

TableQuery<CustomerEntity> query2 = new TableQuery<CustomerEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey",QueryComparisons.Equal, "Smith")).Take(2);

 

// Print the fields for each customer

int index = 1;

foreach (CustomerEntity entity in table.ExecuteQuery(query2))

{

    Console.WriteLine("{0}, {1}\t{2}\t{3}", entity.PartitionKey,entity.RowKey,entity.Email,entity.PhoneNumber);

    index++;

    if (index > 2)

        break;

}

上述代码中,查询语句的属性只是规定每次读取操作(transaction)只返回两条记录,但若存储表中存在多条满足条件的数据,客户端若不断调用ExecuteQuery方法,则依然会自动读取多次来满足客户端数据需求,所以添加foreach循环来控制实际需要的数据实体个数。

(2)通过ExecuteQuerySegmented方法和Continuous Token来控制查询数据条数

对于需要对Table Storage数据进行前端分页显示或者上下文关联处理时,开发者应该控制每次查询结果的数量,并且标记当前查询结果在整个Table Storage中的位置(Continuous Token),接下来的查询可以通过相对位置来继续。示例代码如下:

CloudStorageAccount storageAccount = CloudStorageAccount. Parse("DefaultEndpointsProtocol=https;AccountName=[AccountName];AccountKey=[AccountKey]");

// Create the table client

CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

// Create the table if it doesn't exist

CloudTable table = tableClient.GetTableReference("bpeople");

 

TableQuery<CustomerEntity> query = newTableQuery<CustomerEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey",
QueryComparisons.Equal, "Smith")).Take(3);

List<CustomerEntity> entryList = new List<CustomerEntity>();

TableQuerySegment<CustomerEntity> currentSegment = null;

 

//read for the first segment of data entities

currentSegment = table.ExecuteQuerySegmented(query, currentSegment !=null ? currentSegment.ContinuationToken : null);

entryList.AddRange(currentSegment.Results);

 

// Print the fields for first segment of data entities

Console.WriteLine("output from first segment: ");

foreach (CustomerEntity entity in entryList)

{

Console.WriteLine("{0},{1}\t{2}\t{3}", entity.PartitionKey, entity.RowKey,entity.Email,entity.PhoneNumber);

}

//clear the list

entryList.Clear();

 

//read for the second segment of data entities

currentSegment = table.ExecuteQuerySegmented(query, currentSegment !=null ? currentSegment.ContinuationToken : null);

entryList.AddRange(currentSegment.Results);

// Print the fields for second segment of data entities.

Console.WriteLine("output from second segment: ");

foreach (CustomerEntity entity in entryList)

{

    Console.WriteLine("{0}, {1}\t{2}\t{3}", entity.PartitionKey,entity.RowKey,entity.Email, entity.PhoneNumber);

}

 

//modify the query to read above all in one query for comparing

query = new TableQuery<CustomerEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey",QueryComparisons. Equal, "Smith")).Take(6);

// Print the fields for data entities after directly reading

Console.WriteLine("output from reading above in one segment:");

foreach (CustomerEntity entity in table.ExecuteQuery(query))

{

    Console.WriteLine("{0},{1}\t{2}\t{3}", entity.PartitionKey, entity. RowKey,entity.Email,entity.PhoneNumber);

}

Console.Read();

运行结果如图所示。可以看到,分批查询得到的数据和直接查询的数据在顺序上是一致的。

 

 

5. WAD数据分析最佳实践

之前的文章https://blogs.msdn.com/b/jianwu/archive/2014/08/15/azure-paas-4.aspx中实践了如何部署WAD(Windows Azure Diagnostics)到云服务(Cloud Service)中,从而通过WAD来将云服务运行过程中虚机中的实时日志和虚机性能数据自动传输到指定的WAS存储中,其中主要的虚机日志和性能数据都会被写到Table Storage中,如:

目标云服务在运行过程中,有时候会由于代码异常、外部攻击、平台故障等问题,导致同一个Role下面的某些虚机运行状态异常,如CPU较其他虚机高很多,或流量较其他虚机多很多,或内存使用不一致等等。

针对这些问题,一个很有效的办法是:跟踪WAD中监控的虚机性能数据(Performance Counter),从对应的WAD存储表WADPerformanceCountersTable中找到信息。

实现该类型的WAD性能数据分析时,一般会遇到两个主要问题:

1. 如何获取该表数据定义来开始开发客户端分析工具

一个有效的办法时,找到目标存储表,查看任意一条数据,从entity properties中找到每一个数据属性的定义,从而来实现客户端程序的表结构定义。

    public class WadPerformanceCountersTable : TableEntity
    {
        public string PartitionKey { get; set; }

        public string RowKey { get; set; }

        public string Timestamp { get; set; }

        public long EventTickCount { get; set; }

        public string DeploymentId { get; set; }

        public string Role { get; set; }

        public string RoleInstance { get; set; }

        public string CounterName { get; set; }

        public double CounterValue { get; set; }

        public DateTime EventDateTime
        {
            get { return new DateTime(EventTickCount); }
        }
    }

2. 如何通过WAD表数据来分析、实时检查云服务状态

有了此表结构定义之后,结合前文讲述的表数据查询方法,可以分别实现实时分析CPU、Memory、流量统计,如下: 

公共定义:

        public static CloudStorageAccount cloudStorageAccount = null;
        public static CloudTableClient cloudTableClient = null;
        public static CloudTable table = null;
        public static double[] avgCPU = new double[20]; //每一个虚机中一段时间内的CPU平均值
        public static double[] avgReq = new double[20]; //每一个虚机中一段时间内的处理流量平均值
        public static double[] avgMem = new double[20];//每一个虚机中一段时间内的可用Memory平均值

 

       //获取CPU数据,并进行统计分析

        static void getCPU(string instanceName, string deploymentID, int instanceID)
        {
            Console.WriteLine("===== CPU counters for {0} of deployment {1}", instanceName, deploymentID);

            DateTime now = DateTime.UtcNow;
            DateTime MinutesAgo = now.AddMinutes(-8); //此处时间开发者可以酌情调整
            string partitionKeyAgo = string.Format("0{0}", MinutesAgo.Ticks.ToString());

            string query = "PartitionKey gt '" + partitionKeyAgo + "' and RoleInstance eq '" + instanceName + "' and CounterName eq '\\Processor(_Total)\\% Processor Time' and DeploymentId eq '" + deploymentID +"'";
            TableQuery<WadPerformanceCountersTable> rangeQuery = new TableQuery<WadPerformanceCountersTable>().Where(query).Take(5);
            List<WadPerformanceCountersTable> performanceCountersEntitiesList = null;
            try
            {
                performanceCountersEntitiesList = table.ExecuteQuery(rangeQuery).ToList<WadPerformanceCountersTable>();
            }
            catch (DataServiceQueryException dsqe)
            {
            }

            //queue length
            for (int i = 0; i < performanceCountersEntitiesList.Count; i++)
            {
                Console.WriteLine(performanceCountersEntitiesList[i].EventDateTime + "   CPU usage: " + performanceCountersEntitiesList[i].CounterValue);
                avgCPU[instanceID] += performanceCountersEntitiesList[i].CounterValue;
            }

            avgCPU[instanceID] = avgCPU[instanceID] / performanceCountersEntitiesList.Count; //得出每台虚机一段时间内CPU使用率的平均值

        }

 

       //获取Available Memory数据,并进行统计分析

        static void getMem(string instanceName, string deploymentID, int instanceID)
        {
            Console.WriteLine("===== Memory counters for {0} of deployment {1}", instanceName, deploymentID);

            DateTime now = DateTime.UtcNow;
            DateTime MinutesAgo = now.AddMinutes(-11);
            string partitionKeyAgo = string.Format("0{0}", MinutesAgo.Ticks.ToString());

            string query = "PartitionKey gt '" + partitionKeyAgo + "' and RoleInstance eq '" + instanceName + "' and CounterName eq '\\Memory\\Available MBytes' and DeploymentId eq '" + deploymentID + "'";
            TableQuery<WadPerformanceCountersTable> rangeQuery = new TableQuery<WadPerformanceCountersTable>().Where(query).Take(5);
            List<WadPerformanceCountersTable> performanceCountersEntitiesList = null;
            try
            {
                performanceCountersEntitiesList = table.ExecuteQuery(rangeQuery).ToList<WadPerformanceCountersTable>();
            }
            catch (DataServiceQueryException dsqe)
            {
            }

            //queue length
            for (int i = 0; i < performanceCountersEntitiesList.Count; i++)
            {
                Console.WriteLine(performanceCountersEntitiesList[i].EventDateTime + "   Memory Available: " + performanceCountersEntitiesList[i].CounterValue);
                avgMem[instanceID] += performanceCountersEntitiesList[i].CounterValue;
            }

            avgMem[instanceID] = avgMem[instanceID] / performanceCountersEntitiesList.Count;//得出每台虚机一段时间内剩余Memory的平均值

        }

 

        //获取\ASP.NET Applications(__Total__)\Requests/Sec'数据,并进行统计分析

        static void getASPReq(string instanceName, string deploymentID, int instanceID)
        {
            Console.WriteLine("===== ASP requests for {0} of deployment {1}", instanceName, deploymentID);

            DateTime now = DateTime.UtcNow;
            DateTime MinutesAgo = now.AddMinutes(-23);
            string partitionKeyAgo = string.Format("0{0}", MinutesAgo.Ticks.ToString());

            string query = "PartitionKey gt '" + partitionKeyAgo + "' and RoleInstance eq '" + instanceName + "' and CounterName eq '\\ASP.NET Applications(__Total__)\\Requests/Sec' and DeploymentId eq '" + deploymentID + "'";
            TableQuery<WadPerformanceCountersTable> rangeQuery = new TableQuery<WadPerformanceCountersTable>().Where(query).Take(5);
            List<WadPerformanceCountersTable> performanceCountersEntitiesList = null;
            try
            {
                performanceCountersEntitiesList = table.ExecuteQuery(rangeQuery).ToList<WadPerformanceCountersTable>();
            }
            catch (DataServiceQueryException dsqe)
            {
            }

            //queue length
            for (int i = 0; i < performanceCountersEntitiesList.Count; i++)
            {
                Console.WriteLine(performanceCountersEntitiesList[i].EventDateTime + "   ASP requests: " + performanceCountersEntitiesList[i].CounterValue + "/sec") ;
                avgReq[instanceID] += performanceCountersEntitiesList[i].CounterValue;
            }

            avgReq[instanceID] = avgReq[instanceID] / performanceCountersEntitiesList.Count;//得出每台虚机一段时间内处理请求的平均值

        }

 有了上述实现,开发者可以通过以下调用,实现自动化分析和小结:

            string storageName = "yourstorageaccount";
            string storageKey = "yourstoragekey/g==";
            string roleName = "yourolename";
            string deploymentID = "yourdeploymentid";

            cloudStorageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + storageName + ";AccountKey=" + storageKey);
            cloudTableClient = cloudStorageAccount.CreateCloudTableClient();
            table = cloudTableClient.GetTableReference("WADPerformanceCountersTable");
            checkRole(roleName, deploymentID);

 

        public static void checkRole(string roleName, string deploymentID)
        {
            for (int i = 0; i < 20; i++) //本示例中Role最多可能有20个实例,故此处取20
            {
                avgCPU[i] = -0.001;
                avgReq[i] = -0.001;
                avgMem[i] = -0.001;
                getCPU(roleName + "_IN_" + i.ToString(), deploymentID, i);
                getASPReq(roleName + "_IN_" + i.ToString(), deploymentID, i);
                getMem(roleName + "_IN_" + i.ToString(), deploymentID, i);
            }

            Console.WriteLine();
            Console.WriteLine("--------summary-------");
            Console.WriteLine("Timestamp: {0}", DateTime.UtcNow.AddMinutes(-8).ToString());
            Console.WriteLine("Deployment ID : {0}", deploymentID);
            Console.WriteLine("Role Name : {0}", roleName);
            int count = 0;
            double min = 100;
            double max = 1;
            for (int i = 0; i < 20; i++)
            {
                if (avgCPU[i] > 0)
                {
                    Console.WriteLine("Avg CPU of instance {0} is {1}", i, avgCPU[i]);
                    count++;
                    if (avgCPU[i] > max)
                        max = avgCPU[i];
                    if (avgCPU[i] < min)
                        min = avgCPU[i];
                }
                if (avgReq[i] > 0)
                {
                    Console.WriteLine("Avg ASP requests of instance {0} is {1} /sec", i, avgReq[i]);
                }
                if (avgMem[i] > 0)
                {
                    Console.WriteLine("Avg Memory left of instance {0} is {1} MBytes", i, avgMem[i]);
                }
            }
            Console.WriteLine("Total : {0} instances.", count);
            if (max > 30 && (max-min) > 20)
            {
                Console.WriteLine("CPU usage is different and now need investigation.");
            }
            else
            {
                Console.WriteLine("CPU usage is similar and Role is ok.");
            }

        }

运行下来,结果如下:

以上实现的示例代码分享在:https://zcdnstorage.blob.core.windows.net/container1/WADChecking.zip,开发者可以借鉴使用。

 

小结:

  1. WAS通过建立索引分区(partition)来组织存储用户数据;
  2. 开发者需要充分利用索引来规划表数据和进行表操作。
  3. Table Storage表数据查询需要使用一些技巧来保证效率。
  4. WAD数据分析能有效的实时跟踪云服务状态。
  5. 开发人员在table storage实践时,应考虑上述最佳实践。