开发者中心

示例

按小时计算测量值平均值

我们假设输入数据如下:

                
{
    "c8y_TemperatureMeasurement": {
    "T": {
    "value": ...,
    "unit": "C"
      }
    },
    "time":"...",
    "source": {
    "id":"..."
  },
  "type": "c8y_TemperatureMeasurement"
}
               
            

要创建平均值,模块中需要以下部分:

  • 正确分离每个设备的上下文
  • 值为一小时的时间窗口
  • 只返回每小时最后一个平均计算的输出
  • 一切都作为新测量值创建

模块:

                 
      create context HourlyAvgMeasurementDeviceContext
      partition measurement.source.value from MeasurementCreated;

      @Name("Creating_hourly_measurement")
      context HourlyAvgMeasurementDeviceContext
      insert into CreateMeasurement
      select
      m.measurement.source as source,
      current_timestamp().toDate() as time,
      "c8y_AverageTemperatureMeasurement" as type,
      {
      "c8y_AverageTemperatureMeasurement.T.value", avg(cast(getNumber(m, "c8y_TemperatureMeasurement.T.value"), double)),
      "c8y_AverageTemperatureMeasurement.T.unit", getString(m, "c8y_TemperatureMeasurement.T.unit")
    } as fragments
    from MeasurementCreated.win:time(1 hours) m
    where getObject(m, "c8y_TemperatureMeasurement") is not null
    output last every 1 hours;
                
           

如果操作未执行生成报警

设备处理时,操作通常按固定顺序执行

  • PENDING (创建后)
  • EXECUTING (设备收到操作并开始处理)
  • SUCCESSFUL 或 FAILED (取决于执行结果)

一个操作在一定事件内没有 SUCCESSFUL 或 FAILED 通常表示出现了问题 (例如,设备失去连接或设备处理时卡住)。 即使操作未成功处理,设备也应更新操作为FAILED。 对于本例,我们采用10分钟作操作处理持续的时间。 我们将检查以下顺序:

  • OperationCreated
  • OperationUpdated, 10分钟内对同一个操作设置状态为 SUCCESSFUL 或 FAILED

如果第二部分没有出现,我们将创建一个新的报警

                  
     @Name("handle_not_finished_operation")
      insert into CreateAlarm  
      select
      o.operation.deviceId as source,
      CumulocitySeverities.MAJOR as severity,
      CumulocityAlarmStatuses.ACTIVE as status,
      "c8y_OperationNotFinishedAlarm" as type,
      current_timestamp().toDate() as time,
      replaceAllPlaceholders("The device has not finished the operation {id} within 10 minutes", o.operation) as text
      from pattern [
      every o = OperationCreated
      -> (timer:interval(10 minutes)
      and not OperationUpdated(
      operation.id.value = o.operation.id.value
      and (operation.status in (OperationStatus.SUCCESSFUL, OperationStatus.FAILED))
      ))
      ];
                  
              

根据位测量值创建报警

设备通常把报警状态保存在记录器上,但设备本身并不知道记录器的含义。 本例中,我们假定设备在测量值中只发送二进制形式的整个记录器,规则可以分辨位含义并创建相应的报警。

我们为每个位创建三个表达式来处理报警文本,类型和严重级别。


create expression String getFaultRegisterAlarmType(position) [
        switch (position) {
        case 0:
        "c8y_HighTemperatureAlarm";
        break;
        case 1:
        "c8y_ProcessingAlarm";
        break;
        case 2:
        "c8y_DoorOpenAlarm";
        break;
        case 3:
        "c8y_SystemFailureAlarm";
        break;
        default:
        "c8y_FaultRegister" + position + "Alarm";
        break;
      };
      ];

      create expression CumulocitySeverities getFaultRegisterAlarmSeverity(position) [
      importClass(com.cumulocity.model.event.CumulocitySeverities);
      switch (position) {
      case 0:
      CumulocitySeverities.MAJOR;
      break;
      case 1:
      CumulocitySeverities.WARNING;
      break;
      case 2:
      CumulocitySeverities.MINOR;
      break;
      case 3:
      CumulocitySeverities.CRITICAL;
      break;
      default:
      CumulocitySeverities.MAJOR;
      break;
    };
    ];

    create expression String getFaultRegisterAlarmText(position)[
    switch(position) {
    case 0:
    "The machine temperature reached a critical status";
    break;
    case 1:
    "There was an error trying to process data";
    break;
    case 2:
    "Door was opened";
    break;
    case 3:
    "There was a critical system failure";
    break;
    default:
    "An undefined alarm was reported on position " || position || " in the binary fault register";
    break;
  };
  ];
              

为了分析二进制测量值,我们将把它解释为一个字符串值,并通过每个字符循环。 The getActiveBits() 函数完成上面的功能并返回测量值有"1"的位的位置列表。 它不返回 List ,而是返回 List ,Map结构匹配 BitPosition 模式,因此我们当作流处理。 这让我们可以加入流,不仅仅可以通过测量值也可以通过测量值和列表中所有元素组合触发报警。


create schema BitPosition(
  position int
  );

  create schema MeasurementWithBinaryFaultRegister(
  measurement Measurement,
  faultRegister String
  );

  create expression Collection getActiveBits(value) [
  importPackage(java.util);
  var bitOnNumbers = new ArrayList();
  var size = value.length;
  for(var no = 0; no < size; no++) {
  if(value.charAt(no) == "1") {
  bitOnNumbers.add(Collections.singletonMap('position', size - no - 1));
}
}
bitOnNumbers;
];

@Name("extract_fault_register")
insert into MeasurementWithBinaryFaultRegister
select
m.measurement as measurement,
getString(m, "c8y_BinaryFaultRegister.errors.value") as faultRegister
from MeasurementCreated m
where getObject(m, "c8y_BinaryFaultRegister") is not null;

@Name("creating_alarm")
insert into CreateAlarm
select
m.measurement.source as source,
getFaultRegisterAlarmSeverity(bit.position) as severity,
CumulocityAlarmStatuses.ACTIVE as status,
m.measurement.time as time,
getFaultRegisterAlarmType(bit.position) as type,
getFaultRegisterAlarmText(bit.position) as text
from
MeasurementWithBinaryFaultRegister m unidirectional,
MeasurementWithBinaryFaultRegister[getActiveBits(faultRegister)@type(BitPosition)] as bit;
                 

创建如下测量值


{
  "c8y_BinaryFaultRegister": {
  "errors": {
  "value": 10110
}
},
"time":"...",
"source": {
"id":"..."
},
"type": "c8y_BinaryFaultRegister"
}
                  

将触发最后一个语句三次

  • measurement and bit position 1
  • measurement and bit position 2
  • measurement and bit position 4

因此创建三个报警。

消耗测量值

假设我们有一个传感器测量某个物品的当前的填充水平并定期发送的数值到QuarkIoE,我们可以很容易地计算额外的消耗数值。 计算两个测量值之间的绝对差值是有用的,但它只在测量间隔相同才会有明确含义。 因此,我们将把绝对值的差异关联到时间的差异,按每小时计算消耗数值。

我们将比较一个设备的(需要上下文)两个相邻的测量值和时间差。


create schema FillLevelMeasurement(
    measurement Measurement,
    value double
    );

    create schema AdjacentFillLevelMeasurements(
    firstValue double,
    lastValue double,
    firstTime Date,
    lastTime Date,
    source String
    );

    create context ConsumptionMeasurementDeviceContext
    partition measurement.source.value from FillLevelMeasurement;

    create expression double calculateConsumption(firstValue, lastValue, firstTime, lastTime) [
    if (lastTime == firstTime) {
    0;
  } else {
  ((firstValue - lastValue) * 3600000) / (lastTime - firstTime);
}
];

@Name("filter_fill_level_measurements")
insert into FillLevelMeasurement
select
m.measurement as measurement,
cast(getNumber(m, "c8y_WaterTankFillLevel.level.value"), double) as value
from MeasurementCreated m
where getObject(m, "c8y_WaterTankFillLevel") is not null;

@Name("combine_two_latest_measurements")
context ConsumptionMeasurementDeviceContext
insert into AdjacentFillLevelMeasurements
select
first(m.value) as firstValue,
first(m.measurement.time) as firstTime,
last(m.value) as lastValue,
last(m.measurement.time) as lastTime,
context.key1 as source
from FillLevelMeasurement.win:length(2) m;

@Name("create_consumption_measurement")
insert into CreateMeasurement
select
m.lastTime as time,
m.source as source,
"c8y_HourlyWaterConsumption" as type,
{
  "c8y_HourlyWaterConsumption.consumption.value", calculateConsumption(m.firstValue, m.lastValue, m.firstTime.toMillisec(), m.lastTime.toMillisec()),
  "c8y_HourlyWaterConsumption.consumption.unit", "l/h"
} as fragments
from AdjacentFillLevelMeasurements m;