Skip to content

LogicCore(data stream & event processing)

KumKeeHyun edited this page Nov 18, 2020 · 8 revisions

Table of contents

Stream Processing

image Kafka Consumer is implemented as a consumerGroup in Shopify/sarama. One stream service consumes a single goroutine, but by running multiple services, the data of a topic can be distributed.

Data Enrichment

Consumed from Kafka Schema

{
  "sensor_id": int,
  "node_id": int,
  "values": [
    value1,
    value2,
    ...
  ],
  "timestamp": string // format: YYYY-MM-DD HH-MM-SS
}

After Enrichment Schema

{
  "sensor_id": int,
  "sensor_name": string,
  "values": {
    "key1": value1,
    "key2": value2,
    ...
  },
  "node": {
    "name": string,
    "location": {
      "lat": float64,
      "lon": float64
    },
    "sink_name": stirng
  },
  "timestamp": date
}

Logic

A logic consists of a list of elems.

"elems": [
  {
    "elem": string,
    "Arg" : {
      object(argument)
    }
  }
]

Currently supported Elems are "value", "time" and "email".

Filter

Value

{
  "value": string,
  "range": [
    {"min": float64, "max": float64},
    ...
  ]
}

Time

{
  "range": [
    {"start": string, "end": string}, // format: HH:MM:SS
    ...
  ]
}

Action

Email

{
  "text": string // email address
}

Build Logic

Please check 'toiot/logic-core/logicService/core.go', 'toiot/logic-core/logicService/logic'

Microservice

Maintain Registration Info in Stream Service

It is inefficient to send a request to the Application service to get metadata of sensor data for each consumed in Kafka. Therefore, metadata for sensor data to be consumed must be separately stored in memory(stream service).

When the stream service starts, the service fetches the registration info corresponding to the kafka topic through 'GET /event' request. This request allows the Application service to start tracking the stream service. The result of the request is a list of sink nodes. The joined sub-information includes sensor node info and logic event info. Information registered afterwards is delivered in real time through Restful API. The API includes deleted sink, created and deleted node, created and deleted logic and deleted sensor events. (toiot/logic-core/main.go)

Registration Event in Application Service

The application service must pass event about a specific event to the running stream service. Call Rest API by selecting a stream service that needs to deliver information according to specific registration and deletion events. Stream services that have failed Rest API are managed as a list and retry separately afterwards. If the second request also fails, it is determined that the stream service is terminated and the information is deleted in Application service. (toiot/application/usecase/eventUsecase)