开发者中心

实时处理

概述

QuarkIoE允许开发人员和高级用户在QuarkIoE内部运行基于高级实时处理语言的实时物联网业务逻辑。本章节将介绍实时处理的基本概念,向你展示怎样开发你自己在QuarkIoE上运行的业务逻辑。

实时处理接口的更多信息可在参考指南中进行查询,在章节QuarkIoE事件语言和"实时语句"中。

QuarkIoE的实时处理是什么?

QuarkIoE包含一个实时引擎,它接收来自设备或其他数据源的所有数据,并对用户在数据上定义的业务逻辑进行立即处理。该用户定义的业务逻辑可以,通知新数据的应用,创建基于接收到数据的新数据(比如,当突破一个传感器的阈值时,就会发生报警),并触发设备上的操作或发送Email。该逻辑会在QuarkIoE事件语言中实现,是为物联网实时数据设计的高级专用域名语言。

CEP 架构

QuarkIoE事件语言是由语句组成,如下面示例所示。它们被分为部署单元,称为模块。模块可以作为QuarkIoE应用程序的一部分进行部署。它们也可以借助QuarkIoE的管理应用程序进行编辑。通过REST API,应用程序开发人员可以开发,例如,针对应用案例的友好用户特定域名业务逻辑向导程序。举例来说,一个家庭自动化应用程序开发人员可以轻松地为温度传感器和热控制器触发提供一个设定阈值的向导程序。

上图还说明了QuarkIoE的另一个特点:专门针对实时处理而发生数据的可能性。标记为"瞬时"的数据不会储存在QuarkIoE的数据库里,而只是有实时引擎处理。如果你愿意的话,这可以允许你节省储存空间和处理成本,比如,实时跟踪某些设备,但不要求数据以高分辨率持续传输。

使用实时处理的好处是什么?

QuarkIoE实时处理功能为你提供以下好处:

  • 对来自远程传感器的事件立刻进行反应。
  • 开发高度互动的物联网应用程序。
  • 直接在QuarkIoE上运行物联网应用案例,而无需软件开发;并将托管和管理留给QuarkIoE。
  • 根据你自己的业务规则,对制造的不同设备进行验证、规范化和衍生数据。
  • 基于事件触发的自动远程控制行为。
  • 使用强大的、面向数据流的业务逻辑,比如时间窗口和连接。
  • 通过只保留长期存储所需的数据,来降低设备的在线跟踪成本。

QuarkIoE的事件语言(CEL)是什么?

QuarkIoE的事件语言在语法上类似于SQL语言。在SQL上,一个语句运行时对应一个逻辑上的固定数据库,产生一个结果,然后终止。在QuarkIoE中,语句是对应输入数据流(输入事件)连续运行的,并持续计算出其输出(输出事件)。

作为示例,以下语句连续地检索高于特定温度的所有新的温度传感器读数:

                      
select *
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100
                      
                  

在这里,创建的测量值是一个数据流,包含在系统中创建的每个测量值的事件。从这些事件中选择一个已经使用 where的子集,类似于SQL。getNumber()是一个从事件读取数值的函数,并给定事件和要读取的属性。在示例中,事件是e、新建的测量值事件和"c8y\ _TemperatureMeasurement.T.value"的属性,传感器传感器的值为摄氏度,(请参见传感器库)

怎样从CEL中创建衍生数据?

存在由系统提供的特殊数据流,用来执行预定义操作(比如,将数据存储到数据库或通过Email发送数据)。一个这样的数据流是一次创建报警,可以用来存储在QuarkIoE上的报警。例如,假设当传感器的温度变得过高时,应当立即产生报警。该行为由以下语句完成。

                      
insert into CreateAlarm
select
 e.measurement.time as time,
 e.measurement.source.value as source,
 "c8y_TemperatureAlert" as type,
 "Temperature too high" as text,
 "ACTIVE" as status,
 "CRITICAL" as severity
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100
                      
                  

从技术上来说,每当温度传感器读取超过100摄氏度时,该读数会被放入"新建报警"输出数据流中,语句就会产生新的"报警已创建"事件。则Select子句中的属性名称必须与"报警已创建"的属性匹配。

怎样控制来自CEL的设备?

QuarkIoE中的远程控制只是用一种形式的衍生数据。也就是说,为了在设备上运行操作,只需创建一个针对设备的新操作。以下示例说明了基于温度读数的继电器切换:

                      
insert into CreateOperation
select
"PENDING" as status,
<<heating ID>> as deviceId,
{
"c8y_Relay.relayState", "CLOSED"
} as fragments
from MeasurementCreated e
where getNumber(e, "c8y_TemperatureMeasurement.T.value") > 100
                      
                  
  • 加热ID是触发加热功能的ID占位符。
  • 片段定义操作的嵌入内容,在这个案例中的"c8y_Relay"就是"CLOSED"

片段部分的语法是由大括号括起来的属性名称和值的列表:{?key1?, ?value1?, ?key2?, ?value2?, ...}。

怎样从CEL中查找数据?

有时,需要从QuarkIoE数据库中查询相关信息,作为正在进行事件处理的一部分。这是由一组查询方法进行支持的。下面这个例子将展示,如何计算每个小时特定客户自动售货机的销售总额。由于客户数据不是购买后触发的销售报表的直接部分,因此必须从QuarkIoE数据库中检索。

                      
create window SalesReport.win:time_batch(1 hour)  
(
    event com.cumulocity.model.event.Event,
    customer com.cumulocity.model.ManagedObject
)

insert into SalesReport
select
    e.event as event,
    findOneManagedObjectParent(e.event.source.value) as customer
from EventCreated as e

insert into CreateMeasurement
select
    "total_cust_trx",
    "customer_trx_counter",
    {
        "total", count(*),
        "customer_id", sales_report.customer.id.value
    }
from SalesReport as sales_report
group by sales_report.customer.id.value
                      
                  

在上述引用中,我们首先创建一个批处理窗口,它保存一小时的数据以进行聚合计算。我们将准备好的数据存储到此窗口中:事件将沿着事件源的上级管理对象传入。这将对应于我们售货应用程序的数据模型:销售报表将把来自自动售卖机的数据源表示为QuarkIoE上的事件。客户则被表示为自动售货机的上级管理对象。

最后,通过"插入创建测量值..."来计算销售报告的聚合,使用类似SQL语法并储存为测量值。但需要注意SQL的区别:在SQL中,你通过数据库中固定的、当前内容,进行结果计算。而在QuarkIoE事件语言中,语句可以无限使用,因此聚合必须接受时间窗口的限制。

怎样在QuarkIoE上实现实时处理?

如前所述,对QuarkIoE中的API请求有两种处理模式:持久的 and 瞬时的。"持久"模式是默认的:它将数据存储在QuarkIoE的数据库中,同时将数据发送到实时引擎。在这两个步骤完成后,QuarkIoE将请求的结果进行反馈。

"瞬时"模式是只将数据发送至实时引擎,并立即进行异步反馈。此模式用于特定数据近实时的有效监控。

举例说明,假设来自车辆的位置更新,应当是监控汽车行驶中每一秒的位置;但是为了进行报告,只在数据库中进行每分钟一次的存储。该操作是用以下的语句来完成的:

                      
insert into CreatedEvent
select * from EventCreated e
where getObject(e, "c8y_LocationUpdate") is not null
output first every 60 seconds

另一个选项是,只对下列事件的每个第60次更新进行输出:

insert into CreatedEvent
select * from EventCreated e
where getObject(e, "c8y_LocationUpdate") is not null
output first every 60 events
                      
                  

概述

QuarkIoE允许开发人员和高级用户运行实时IoT业务流程。 用户可以选择永久存储数据或临时用于生成报告、分析,然后自动删除。 整个过程不断更新。