开发者中心

高级用例

定制片段

QuarkIoEAPI可以自由定制数据结构。QuarkIoE事件语言也是同样的情况。 每个输出流可以通过自定义片段扩展。 通过在流中用键值对列表设置片段字段,可以添加片段。键是完全的JsonPath。


  {
    key1, value1,
    key2, value2,
    key3, value3
  } as fragments
                    

例子 1:


  insert into CreateMeasurement
    select
    "12345" as source,
    "c8y_TemperatureMeasurement" as type,
    current_timestamp().toDate() as time,
    {
    "c8y_TemperatureMeasurement.T1.value", 1,
    "c8y_TemperatureMeasurement.T1.unit", "C",
    "c8y_TemperatureMeasurement.T2.value", 2,
    "c8y_TemperatureMeasurement.T2.unit", "C",
    "c8y_TemperatureMeasurement.T3.value", 3,
    "c8y_TemperatureMeasurement.T3.unit", "C",
    "c8y_TemperatureMeasurement.T4.value", 4,
    "c8y_TemperatureMeasurement.T4.unit", "C",
    "c8y_TemperatureMeasurement.T5.value", 5,
    "c8y_TemperatureMeasurement.T5.unit", "C"
  } as fragments
  from EventCreated;
                  

这将产生以下的json结构:


          {
            "type": "c8y_TemperatureMeasurement",
            "time": "...",
            "source": {
            "id": "12345"
          },
            "c8y_TemperatureMeasurement": {
            "T1": {
            "value": 1,
            "unit": "C"
          },
            "T2": {
            "value": 1,
            "unit": "C"
          },
            "T3": {
            "value": 1,
            "unit": "C"
          },
            "T4": {
            "value": 1,
            "unit": "C"
          },
            "T5": {
            "value": 1,
            "unit": "C"
          },
      }
  }
               

例子 2:


  insert into CreateManagedObject
    select
    "MyCustomDevice" as name,
    "customDevice" as type,
    {
    "c8y_IsDevice", {},
    "c8y_SupportedOperations", {"c8y_Restart", "c8y_Command"},
    "c8y_Hardware.serialNumber", "mySerialNumber",
    "c8y_Hardware.model", "myDeviceModel",
    "com_cumulocity_model_Agent", {},
    "c8y_RequiredAvailability.responseInterval", 30
  } as fragments
  from EventCreated e;
               

这将产生以下的json结构:


{
  "name": "MyCustomDevice",
  "type": "customDevice",
  "c8y_IsDevice": {},
  "c8y_RequiredAvailability": {
  "responseInterval": 30
},
"c8y_SupportedOperations": [
"c8y_Restart",
"c8y_Command"
],
"com_cumulocity_model_Agent": {},
"c8y_Hardware": {
"model": "myDeviceModel",
"serialNumber": "mySerialNumber"
}
}
              

高级触发器

事件到达不是流中触发语句的唯一可能。 下面的章节将涵盖其他方法来触发语句和组合触发器。

模式匹配

模式匹配使你能够按其他触发器的组合或顺序触发。 如果你有这样的一个触发器


  from EventCreated e;
              

该功能与下面使用模式匹配的触发器相同


  from pattern [every e=EventCreated];
              

也可以在模式匹配中添加过滤


  from pattern [every e=EventCreated(event.type = "myEventType")];
              

可以通过加入流触发


  from EventCreated e unidirectional, AlarmCreated.std:lastevent() a
  where e.event.source = a.alarm.source;
              

这将在每一个EventCreated (通过关键字unidirectional定义) 触发并且如果来自同一个设备将添加最新的AlarmCreated。

注意: 不会添加设备最新的AlarmCreated,而是添加来自同一设备的最新的AlarmCreated整体

也可以通过顺序触发


  from pattern[every (e=EventCreated -> a=AlarmCreated(alarm.source = e.event.source))];
              

每对 EventCreated 随后 AlarmCreated 时触发。 在EventCreated到达时开始,同一设备的AlarmCreated时触发。 之后就要等待下一个EventCreated。

定时器

除了使用流触发语句,也可以使用定时器触发。 可以设定一个特定的时间间隔触发


  from pattern [timer:interval(5 minutes)];
                

或者作为一个cron任务


        // timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
        // minutes: 0-59
        // hours: 0-23
        // daysOfMonth: 1-31
        // month: 1-12
        // daysOfWeek:     0 (Sunday) - 6 (Saturday)
        // seconds: 0-59

        from pattern [timer:at(*, *, *, *, *)]; // trigger every minute
        from pattern [timer:at(*, *, *, *, *, *)]; // trigger every second
        from pattern [timer:at(*/10, *, *, *)]; // trigger every 10 minutes
        from pattern [timer:at(0, 1, *, *, [1,3,5])]; // trigger at 1am every monday, wednesday and friday
        from pattern [timer:at(0, */2, (1-7), *, *)]; // trigger every 2 hours on every day in the first week of every month
               

也可以组合定时器模式匹配和其他模式匹配。 例如,您可以检查在某个特定的时间内是否有一个事件在另一个事件之后


  from pattern [every e=EventCreated -> (timer:interval(10 minutes) and not a=AlarmCreated)];
               

产生EventCreated并且10分钟内没有AlarmCreated将触发。

输出

输出让你不必计算流中的每个事件而是直接控制需要的事件。如果每10秒产生一个测量值并且你想用它计算,可能不必计算所有的测量值而只是一个子集。


    // will output the last measurement arrived every 1 minute
    from MeasurementCreated e
    where e.measurement.type = "c8y_TemperatureMeasurement"
    output last every 1 minutes;

    // will output the first of every 20 measurements arriving
    from MeasurementCreated e
    where e.measurement.type = "c8y_TemperatureMeasurement"
    output first every 20 events;

    // will output all 20 measurements after the 20th arrived
    from MeasurementCreated e
    where e.measurement.type = "c8y_TemperatureMeasurement"
    output every 20 events;
                

如果需要考虑所有的测量,例如你想计算测量的和但不想每个新测量值到来时都更新。


    select
    sum(getNumber(e, "myCustomMeasurement.mySeries.value")),
    last(*)
    from MeasurementCreated e
    where e.measurement.type = "myCustomMeasurement"
    output last every 50 events;
                

语句每50个测量值输出总和 (所有测量值的,因为不止50) 和最新的测量值。

事件窗口

事件窗口用于在一个流中批量处理多个事件以备进一步分析。 主要有两种方法来创建窗口:

  1. 规定时间长度的窗口

     
         select
           avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
           last(*)
           from MeasurementCreated.win:time(1 hours) e
           where e.measurement.type = "myCustomMeasurement";
                        
    
        select
           avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
           last(*)
           from MeasurementCreated.win:time(1 hours) e
           where e.measurement.type = "myCustomMeasurement"
           output last every 1 hours;
                          

两个语句的区别是,第一个在每个MeasurementCreated触发并输出最后一个小时的平均值。 第二个语句每小时只触发一次并只输出最后的平均值 (收到最后的MeasurementCreated计算)。

  1. 规定事件数量的窗口

     
         select
           avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
           last(*)
           from MeasurementCreated.win:length(100) e
           where e.measurement.type = "myCustomMeasurement";
                            
     
         select
           avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
           last(*)
           from MeasurementCreated.win:length(100) e
           where e.measurement.type = "myCustomMeasurement"
           output last every 100 events;
                               

窗口也可以全局声明


  create window MeasurementCreated.win:length(20) as MyMeasurementWindow;

  select
  avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
  last(*)
  from MyMeasurementWindow e
  where e.measurement.type = "myCustomMeasurement";
                      

声明一个窗口也给你清除窗口的可能


  on AlarmCreated delete from MyMeasurementWindow
                      

创建自定义流

创建复杂模块不可能在单一语句完成。 而QuarkIoE已经提供了一些预定义流,可以创建额外的流以控制事件流。 不需要定义一个流。如果使用未知的流名称,将带着输入自动创建和定义。


  insert into MyEvent
    select
    e.event as e
    from EventCreated e;

    select e.type from MyEvent e;
                        

如果现在尝试添加


  insert into MyEvent
    select
    e as e
    from AlarmCreated e;
                        

你将无法部署语句,因为流MyEvent已经通过变量 e 声明。 此语句试图设置类型AlarmCreated的值为 e。

也可以显式创建新流


  create schema MyEvent(
    e Event
    );
                        

一般语法是


    create schema StreamName(
      var1Name var1Type,
      var2Name var2Type,
      var3Name var3Type
      );
                        

可以使用每一个基本Java数据类型,导入的数据类型 Java 库, QuarkIoE数据类型 (如 Event, Measurement, ManagedObject, ...) 和其他流。


  create schema TwoMyEvents(
    firstEvent MyEvent,
    secondEvent MyEvent
    );
                        

注意: 流名称是唯一的,一旦声明 (不论隐式或显式) 流就在你所有模块可以

创建自己的函数

如果想做比合计和平均值之类更复杂的计算,你可以创建自己的辅助函数和表达式。 可以使用JavaScript脚本语言编写函数。也可以使用importClass导入Java类到表达式。

例子:

提升给定的严重级别 (使用 JavaScript):


      create expression CumulocitySeverities js:increaseSeverity(severity) [
        importClass (com.cumulocity.model.event.CumulocitySeverities);
        if(severity == CumulocitySeverities.WARNING) {
        CumulocitySeverities.MINOR;
      } else if(severity == CumulocitySeverities.MINOR) {
      CumulocitySeverities.MAJOR;
    } else if(severity == CumulocitySeverities.MAJOR) {
    CumulocitySeverities.CRITICAL;
  } else {
  severity
}
];
                  

计算两个地理坐标之间的距离:


  create expression distance(lat1, lon1, lat2, lon2) [
    var R = 6371000;
    var toRad = function(arg) {
    return arg * Math.PI / 180;
  };
  var lat1Rad = toRad(lat1);
  var lat2Rad = toRad(lat2);
  var deltaLatRad = toRad(lat2-lat1);
  var deltaLonRad = toRad(lon2-lon1);

  var a = Math.sin(deltaLatRad/2) * Math.sin(deltaLatRad/2) +
  Math.cos(lat1Rad) * Math.cos(lat2Rad) * Math.sin(deltaLonRad/2) * Math.sin(deltaLonRad/2);

  var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));

  var d = R * c;
  d;
  ];
                

变量

你可以在模块中定义变量


  create variable String myEmailText = "Hello World";
    create variable List supportedOperationsList = cast({"c8y_Restart", "c8y_Relay"}, java.util.List);
                

还可以在运行时动态更改变量值


  create variable String latestEventType;

    on EventCreated e set latestEventType = e.event.type;
                

上下文

上下文可以对基于定义的值事件处理和排序。 如果想创建测量值的计算,你通常希望这是做所有有此测量值设备的计算,更重要的是每个设备单独计算。

举个例子


  select
    avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
    last(*)
    from MeasurementCreated.win:length(100) e
    where e.measurement.type = "myCustomMeasurement";
                  

对单个设备运行很好。但只要有两个设备计算就会在两个设备上进行,因两个设备都有MeasurementCreated。 语句不知道如何通过设备来区分测量值。 创建一个上下文让语句可以找到区分传入事件的信息。


  create context DeviceAwareContext
    partition by measurement.source.value from MeasurementCreated;
                  

这个上下文定义在流MeasurementCreated中声明,上下文键(通过它区分事件) 是 measurement.source.value ,这是设备ID。

现在我们可以在语句中添加上下文


  context DeviceAwareContext
    select
    avg(getNumber(e, "myCustomMeasurement.mySeries.value")),
    last(*)
    from MeasurementCreated.win:length(100) e
    where e.measurement.type = "myCustomMeasurement";
                  

现在平均值是每个设备分别计算。

上下文只能应用于有一个在上下文中声明的输入的语句。 如果你有多个语句,需要上下文感知,并有不同的输入,你需要在上下文中配置每个输入,以及在哪里找到上下文键。


  create context DeviceAwareContext
    partition by
    measurement.source.value from MeasurementCreated,
    alarm.source.value from AlarmCreated,
    event.source.value from EventCreated,
    operation.deviceId.value from OperationCreated;
                    

你还可以创建多个值的上下文键


  create context DeviceAwareContext
    partition by measurement.source.value and measurement.type from MeasurementCreated;
                    

这种上下文不仅会为每个设备创建一个自己的分区,也将为每个测量类型创建一个自己的分区。