StreamInsight手札(五)——使用EventFlowDebugger调试事件流

StreamInsight提供了EventFlowDebugger以方便用户进行事件流调试。这一工具拥有直观的图形化界面,支持即时调试和日志调试,支持跟踪和回溯。 本文将介绍EventFlowDebugger基本使用方法。 EventFlowDebugger分为在线模式和离线模式两种使用方法。在线模式下,用户连接到StreamInsight服务器,在线进行调试,或者录制某一段时间的事件流供离线模式使用。离线模式下,用户通过加载Trace文件对事件流进行调试。 下面对其主要功能进行介绍。 连接到服务器(在线) 在线模式需要用户连接到一个正在运行的StreamInsight服务器。 StreamInsight服务器有两种,嵌入服务器和远程服务器: 嵌入服务器(Embedded Server):var cepServerEmbedded = Server.Create(); 远程服务器(Remote Server):var cepServerRemote = Server.Connect(“http://servername/StreamInsight”); 如果使用远程服务器,则用户可以直接连接远程服务器的地址。如果程序使用嵌入服务器,则需要在程序中添加如下代码:  ServiceHost _dvHost = null;IManagementService _service = null;IManagementService proxy = server.CreateManagementService();_service = proxy;_dvHost = new ServiceHost(proxy); WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);binding.Security.Message.ClientCredentialType = MessageCredentialType.Windows;binding.MaxReceivedMessageSize = 4655360;_dvHost.AddServiceEndpoint(typeof(IManagementService), binding, “http://localhost:8080”);_dvHost.Open(); 同时需要添加引用Microsoft.ComplexEventProcessing.ManagementService.dll,输入加亮部分地址就可以连接嵌入服务器了。 连接到服务器的几种常见错误 错误1 HTTP could not register URL http://+:8001/. Your process…

0

StreamInsight手札(四)——使用IEnumerable接口创建StreamInsight程序

本文将介绍如何利用IEnumerable接口创建StreamInsight应用程序。 IEnumerable与IObservable最大的区别在于IEnumerable是采用Pull模式获取数据,而IObservable则是用Push模式推送数据。 用户首先通过IEnumerable.GetEnumerator来获得数据源的Enumerator,然后通过调用IEnumerator.MoveNext和IEnumerator.Current来获取数据。StreamInsight的输入输出通过一个数据队列连接。输入适配器获得数据,并调用Enqueue将数据放入队列,输出适配器循环检测队列是否为空,如果不是空队列,则调用Dequeue将数据取出。 在HelloInsight程序基础上,可以通过如下修改来实现IEnumerable接口。 1、获取输入数据源的Enumerator(HelloPointInput.cs): public HelloPointInput(HelloInputConfig config) {     _config = config;       var streamReader = new StreamReader(config.fileName);     strings = new List<string>();                 while (!streamReader.EndOfStream)     {         strings.Add(streamReader.ReadLine());     }     stringEnumerator = strings.GetEnumerator();                 streamReader.Close(); } 2、读取Enumerator中的数据,并放入队列。注意判断队满的情况(HelloPointInput.cs): private void ProduceEvents() {     while (AdapterState != AdapterState.Stopping)     {…

0

StreamInsight手札(三)——使用IObservable接口创建StreamInsight程序

本文将介绍如何利用IObservable接口创建StreamInsight应用程序。 Observer是.net Framework 4中引入的开发模式。IObserver<T>和IObservable<T>接口为基于推送的通知提供通用机制,也称为观察者设计模式。IObservable<T> 接口表示发送通知(提供程序)的类;IObserver<T> 接口表示接收通知(观察器)的类。 在下列应用场景下,适合采用Observable模式: 历史数据查询。对有限长的大数据集进行临时的查询。 Ad hoc查询。对数据进行一系列交互的ad hoc查询。 内嵌于用户程序。StreamInsight内嵌于用户应用程序之中。 使用这种开发模型,开发者只需要: 实现IObservable接口 实现IObserver接口 绑定到查询 下面将用示例代码来说明.。 1、安装Reactive Extension for .Net 4:  2、新建一个工程,注意选择.net framework 4.0。 3、添加如下引用。System.CoreEx和System.Reactive在C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4\目录下。 定义事件Payload: namespace HelloInsightObservable {     public class HelloPayload     {         public int value { get; set; }     } } 5、定义InputObservable类实现Iobservable接口 public class InputObservable…

0

StreamInsight手札(二)——构建HelloInsight程序

本文将详细介绍如何建立一个简单的StreamInsight程序——HelloInsight。这个程序接受来自输入适配器的字符串输入,并通过输出适配器输出。 1、安装StreamInsight和Visual Studio 2010。 2、新建一个新的工程。可以选择.Net Framework 3.5 SP1或.Net Framework 4。 3、添加引用。引用dll的位置在<your driver>\program files\microsoft streamInsight1.1\C:\Program Files\Microsoft StreamInsight 1.1\Bin。 4、在program.cs using添加到引用:  using Microsoft.ComplexEventProcessing;using Microsoft.ComplexEventProcessing.Adapters;using Microsoft.ComplexEventProcessing.Diagnostics;using Microsoft.ComplexEventProcessing.Linq;using Microsoft.ComplexEventProcessing.ManagementService; 5、Main函数创建CEP服务器。DefaultInstance是StreamInsight安装时指定的实例名。 using (Server server = Server.Create(“DefaultInstance”)){    Application application = server.CreateApplication(“HelloInsight”);}  6、创建事件负载(payload)。事件的负载是输入流里面需要CEP服务器处理的数据。事件负载不能是用户定义类型。 namespace HelloInsight{    public class HelloPayload    {        public string str { get; set; }    }}  7、创建输入输出适配器(Adapter)。输入输出适配器的结构基本一致,这里重点介绍输入适配器,输出适配器与之类似。适配器主要包括:配置类、工厂类和输入/输出类。配置类一般处理输入流的参数,比如流文件的名称等。工厂类用来实现输入、输出适配器的基类。输入、输出类处理数据流并生成可以供CEP服务器处理的事件。 HelloInsight这个例子里将输入的字符串作为参数定义配置类:  namespace HelloInsight.InputAdapters{    public…

2

StreamInsight手札(一)——初识

StreamInsight是SQL Server 2008 R2的新模块。它是一个基于.NET的平台,用于复杂事件处理(CEP,Complex Event Processing),即对于来自多数据源的无限事件序列提供近乎零延时的连续处理。StreamInsight是一个临时查询处理引擎,它可以支持应用程序对时间窗口内的事件进行查询处理。它对流数据的处理范围包括简单聚合、多数据源事件相关性、事件模式检测、甚至建立复杂事件序列和分析模型。StreamInsight的程序模型可以让用户通过LINQ定义这些查询,同时支持将查询结果无缝整合到C#的代码中。 传统的数据库针对静态数据操作,所有信息先被实体化存储在数据库中,然后被取出,再进行一系列的计算。而StreamInsight则针对的是事件流的操作,事件流从入口通过适配器进入,经过一系列查询逻辑的处理,最终通过输出适配器分发到各个目标上。事件流数据暂时保存于内存中,减少了数据存储与读取的时间。同时,StreamInsight了引入了一种时间窗口机制,即对事件流按照时间窗口切片统计,随着事件流入不断推移时间窗口,保持窗口中统计最新的数据。 因此,StreamInsight适用场景有如下特点: 输入为数据流尤其是多源无限长数据流 数据处理实时性要求高 高吞吐量 不关心中间结果 数据不需要永久保留 比如,多个收费站车辆通过信息作为数据流输入,统计一段时间内车辆的平均数。 下图是StreamInsight的架构: 由上图可以看出,StreamInsight主要包括三个部分:输入适配器(Input Adapter)、输出适配器(Output Adapter)以及CEP服务器。由于输入输出和CEP服务器都是异步运行的,所以StreamInsight提供了事件流调试器(Event Flow Debugger)方便用户调试事件的处理过程。  参考资料: 下载CU2http://blogs.msdn.com/b/streaminsight/archive/2010/06/28/update-on-the-update.aspx StreamInsight MSDN文档http://msdn.microsoft.com/en-us/library/ee362541(SQL.105).aspx 博客http://blogs.msdn.com/streaminsight/ 论坛http://social.msdn.microsoft.com/Forums/en-US/streaminsight StreamInsight MSDNhttp://msdn.microsoft.com/en-us/ee476990.aspx 样例http://streaminsight.codeplex.com Twitterhttps://twitter.com/streaminsight 软件测试工程师 金晶

1

浅谈SQL Server 2008 R2 中的新组件——StreamInsight

随着信息技术的广泛应用,数据流作为一种新颖的数据结构在日常生活中有着越来越广泛的应用,微软在SQL Server 2008 R2 中推出了分析处理数据流的新组件——StreamInsight。它提供了基于DotNet框架的开发环境,用户能够轻松地使用它来开发出健壮,高效地数据流处理程序。 StreamInsight的本质是复杂事件处理(Complex Event Processing,CEP)的应用程序框架,与传统的数据库查询处理不同,事件处理系统需要同时处理来自多个数据源的海量事件(Event),并且根据用户提供的查询语句以及匹配模式,实时地输出事件分析结果。我们在下表中列出了事件驱动应用和数据库应用的主要区别: 数据库应用 事件驱动的应用(Event Driven) 查询模式 特定的查询请求 连续的查询 响应时间 从几秒至数天 几毫秒或更少 数据流量 数百条记录/秒 >10000 事件/秒 通过使用StreamInsight,用户可以开发出基于CEP的程序来实时处理大量的原始数据,利用数据之间的层次和关联关系,有效的采用相应的规则进行处理,以降低进行事件分析,事件关联及事件解析等操作的代价。StreamInsight同时能够支持对数据流模式匹配、异常检测、趋势分析等操作,使用户能够更好地管理、监控和挖掘数据,最终使用户得到之前无法了解的信息,并能够更快速和更有效的进行操作决策,提高关键绩效指标(KPI)。 在StreamInsight的应用中,其核心为StreamInsight服务器,它主要由输入,输出适配器(Adaptor)以及CEP引擎(CEP Engine)组成。 CEP引擎(CEP Engine):所有的输入数据都将再CEP引擎中进行分析和处理,它根据用户定义的查询逻辑,有效地分析和转换输入的数据,并及时输出结果。 适配器(Adaptor):StreamInsight提供了适配器的框架,开发者能够通过实现不同的接口来开发不同种类的适配器。适配器分为两类,输入适配器(Input Adaptor)是连接外部存储设备如网络服务器,传感器同StreamInsight引擎的接口。而输出适配器则用于处理CEP引擎输出的结果并可以同时触发一系列的操作。 下图给出了StreamInsight应用的整体结构: StreamInsight平台提供了一个功能强大的对象模型,它包含了许多有用的特性使得我们能够开发出灵活和功能强大StreamInsight的程序。对于初次使用StreamInsight的开发者来说,参考网上的一些实例能够取得事半功倍的效果。 在我加入微软之前,也曾进行过一段时间的数据流研究,当时的感受是,由于没有很好的数据流开发框架,我需要对于不同的应用开发不同的数据流程序来分析算法的有效性,而随着StreamInsight的推出,不仅能够为企业用户创造很大的价值,普通研究者也能够使用它更有效地开展研究工作。 StreamInsight的相关组件可以在微软网站上直接下载,如果您需要获得更多的信息,请访问微软StreamInsight的相关网站。 软件开发工程师 李人和

6

SQL Server 2008 R2 StreamInsight简介

作为SQL Server 2008 R2一个主要的部分,StreamInsight提供了复杂事件处理(Complex Event Processing,CEP)功能。它处理的对象是例如证券交易行情、物联网、医疗监护等数量巨大并且需要实时处理的复杂事件。 从体系结构上看,StreamInsight类似于一个基于内存的数据库系统。它通过输入适配器接收事件并保存在内存里面。然后通过LINQ对输入事件进行处理得到结果,用于进一步的处理。StreamInsight的LINQ扩展提供了丰富的功能,包括过滤、分组、汇总、连接等常见的SQL功能,和窗口切分这样比较特殊的功能。 窗口切分是CEP系统和一般DBMS不太一样的一个地方。它用于把一个无限长的输入流转换成为一个一个有限的集合,从而可以方便的进行处理。StreamInsight提供多种窗口切分支持,以适应不同的应用场景需要。例如下面的LINQ查询代码采用了数量为2的计数窗口进行切分,计算后面一个事件相对于前面一个事件的变化值。 var deltas = from e in inputstream              group e by e.DeviceId into eachGroup              from win in eachGroup.CountByStartTimeWindow(2, CountWindowOutputPolicy.PointAlignToWindowEnd)              select new { ValueDelta = win.Delta(e => e.Value), SourceID = eachGroup.Key }; 此外,StreamInsight还提供了事件流程调试器,它对于了解事件引擎的内部处理流程、开发和调试应用都有很大的帮助。 和其他微软产品一样,StreamInsight虽然功能很复杂很强大,但是非常容易使用。你可以按照下面的步骤很快把它提供的例子运行起来,开始你的CEP应用之旅。 下载StreamInsight试用版(大小大约是14MB),安装到你的机器上。安装的时候,实例名栏目输入Default。你不需要安装任何的SQL Server实例; 打开Microsoft StreamInsight程序组,点击StreamInsight Samples,下载例子代码并展开; 启动Visual Studio 2008或者2010,打开StreamInsightProductTeamSamples\Applications\ComposingQueries\ ComposingQueries.sln; 打开成功以后,按F7编译,然后Ctrl+F5就可以运行了。 更多的知识,请参考Microsoft StreamInsight程序组里面提供的文档或者查阅无联网。 参考资料…

0