-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Based on the actual usage scenarios of customers, write customized MQTT message examples. #16674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,38 +21,57 @@ | |
|
|
||
| import org.apache.iotdb.db.protocol.mqtt.Message; | ||
| import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; | ||
| import org.apache.iotdb.db.protocol.mqtt.TreeMessage; | ||
| import org.apache.iotdb.db.protocol.mqtt.TableMessage; | ||
|
|
||
| import com.google.common.collect.Lists; | ||
| import com.google.gson.Gson; | ||
| import com.google.gson.GsonBuilder; | ||
| import com.google.gson.JsonArray; | ||
| import com.google.gson.JsonElement; | ||
| import com.google.gson.JsonObject; | ||
| import com.google.gson.JsonParseException; | ||
| import io.netty.buffer.ByteBuf; | ||
| import org.apache.tsfile.enums.TSDataType; | ||
| import org.apache.tsfile.external.commons.lang3.NotImplementedException; | ||
| import org.apache.tsfile.utils.Binary; | ||
| import org.apache.tsfile.utils.Pair; | ||
|
|
||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * The Customized JSON payload formatter. one json format supported: { "time":1586076045523, | ||
| * "deviceID":"car_1", "deviceType":"new energy vehicle", "point":"velocity", "value":80.0 } | ||
| */ | ||
| public class CustomizedJsonPayloadFormatter implements PayloadFormatter { | ||
| private static final String JSON_KEY_TIME = "time"; | ||
| private static final String JSON_KEY_DEVICEID = "deviceID"; | ||
| private static final String JSON_KEY_DEVICETYPE = "deviceType"; | ||
| private static final String JSON_KEY_POINT = "point"; | ||
| private static final String JSON_KEY_VALUE = "value"; | ||
| private static final Gson GSON = new GsonBuilder().create(); | ||
|
|
||
| @Override | ||
| public List<Message> format(String topic, ByteBuf payload) { | ||
| // Suppose the payload is a json format | ||
| if (payload == null) { | ||
| return Collections.emptyList(); | ||
| return new ArrayList<>(); | ||
| } | ||
|
Comment on lines
57
to
59
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Explain this change |
||
|
|
||
| // parse data from the json and generate Messages and put them into List<Message> ret | ||
| List<Message> ret = new ArrayList<>(); | ||
| // this is just an example, so we just generate some Messages directly | ||
| for (int i = 0; i < 2; i++) { | ||
| long ts = i; | ||
| TreeMessage message = new TreeMessage(); | ||
| message.setDevice("d" + i); | ||
| message.setTimestamp(ts); | ||
| message.setMeasurements(Arrays.asList("s1", "s2")); | ||
| message.setValues(Arrays.asList("4.0" + i, "5.0" + i)); | ||
| ret.add(message); | ||
| String txt = payload.toString(StandardCharsets.UTF_8); | ||
| JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class); | ||
| if (jsonElement.isJsonObject()) { | ||
| JsonObject jsonObject = jsonElement.getAsJsonObject(); | ||
| return formatTableRow(topic, jsonObject); | ||
| } else if (jsonElement.isJsonArray()) { | ||
| JsonArray jsonArray = jsonElement.getAsJsonArray(); | ||
| List<Message> messages = new ArrayList<>(); | ||
| for (JsonElement element : jsonArray) { | ||
| JsonObject jsonObject = element.getAsJsonObject(); | ||
| messages.addAll(formatTableRow(topic, jsonObject)); | ||
| } | ||
| return messages; | ||
| } | ||
| return ret; | ||
| throw new JsonParseException("payload is invalidate"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May provide the txt or the jsonElement. |
||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -61,14 +80,109 @@ public List<Message> format(ByteBuf payload) { | |
| throw new NotImplementedException(); | ||
| } | ||
|
|
||
| private List<Message> formatTableRow(String topic, JsonObject jsonObject) { | ||
| TableMessage message = new TableMessage(); | ||
| String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); | ||
| String table = "test_table"; | ||
|
Comment on lines
+85
to
+86
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May parse the table name from the topic? |
||
|
|
||
| // Parsing Database Name | ||
| message.setDatabase((database)); | ||
|
|
||
| // Parsing Table Name | ||
| message.setTable(table); | ||
|
|
||
| // Parsing Tags | ||
| List<String> tagKeys = new ArrayList<>(); | ||
| tagKeys.add(JSON_KEY_DEVICEID); | ||
| List<Object> tagValues = new ArrayList<>(); | ||
| tagValues.add( | ||
| new Binary[] { | ||
| new Binary( | ||
| (jsonObject.get(JSON_KEY_DEVICEID).getAsString()).getBytes(StandardCharsets.UTF_8)) | ||
| }); | ||
| message.setTagKeys(tagKeys); | ||
| message.setTagValues(tagValues); | ||
|
|
||
| // Parsing Attributes | ||
| List<String> attributeKeys = new ArrayList<>(); | ||
| List<Object> attributeValues = new ArrayList<>(); | ||
| attributeKeys.add(JSON_KEY_DEVICETYPE); | ||
| attributeValues.add( | ||
| new Binary[] { | ||
| new Binary( | ||
| (jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()).getBytes(StandardCharsets.UTF_8)) | ||
| }); | ||
| message.setAttributeKeys(attributeKeys); | ||
| message.setAttributeValues(attributeValues); | ||
|
|
||
| // Parsing Fields | ||
| List<String> fields = new ArrayList<>(); | ||
| List<TSDataType> dataTypes = new ArrayList<>(); | ||
| List<Object> values = new ArrayList<>(); | ||
| fields.add(JSON_KEY_POINT); | ||
| dataTypes.add(TSDataType.STRING); | ||
| values.add( | ||
| new Binary[] { | ||
| new Binary( | ||
| (jsonObject.get(JSON_KEY_POINT).getAsString()).getBytes(StandardCharsets.UTF_8)) | ||
| }); | ||
| fields.add(JSON_KEY_VALUE); | ||
| Pair<TSDataType, Object> typeAndValue = | ||
| analyticValue(jsonObject.get(JSON_KEY_VALUE).getAsString()); | ||
| values.add(typeAndValue.getRight()); | ||
| dataTypes.add(typeAndValue.getLeft()); | ||
|
|
||
| message.setFields(fields); | ||
| message.setDataTypes(dataTypes); | ||
| message.setValues(values); | ||
|
|
||
| // Parsing timestamp | ||
| message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong()); | ||
| return Lists.newArrayList(message); | ||
| } | ||
|
|
||
| private Pair<TSDataType, Object> analyticValue(String value) { | ||
| if (value.startsWith("\"") && value.endsWith("\"")) { | ||
| // String | ||
| return new Pair<>( | ||
| TSDataType.TEXT, | ||
| new Binary[] { | ||
| new Binary(value.substring(1, value.length() - 1).getBytes(StandardCharsets.UTF_8)) | ||
| }); | ||
| } else if (value.equalsIgnoreCase("t") | ||
| || value.equalsIgnoreCase("true") | ||
| || value.equalsIgnoreCase("f") | ||
| || value.equalsIgnoreCase("false")) { | ||
| // boolean | ||
| return new Pair<>( | ||
| TSDataType.BOOLEAN, | ||
| new boolean[] {value.equalsIgnoreCase("t") || value.equalsIgnoreCase("true")}); | ||
| } else if (value.endsWith("f")) { | ||
| // float | ||
| return new Pair<>( | ||
| TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, value.length() - 1))}); | ||
| } else if (value.endsWith("i32")) { | ||
| // int | ||
| return new Pair<>( | ||
| TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, value.length() - 3))}); | ||
| } else if (value.endsWith("u") || value.endsWith("i")) { | ||
| // long | ||
| return new Pair<>( | ||
| TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, value.length() - 1))}); | ||
| } else { | ||
| // double | ||
| return new Pair<>(TSDataType.DOUBLE, new double[] {Double.parseDouble(value)}); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String getName() { | ||
| // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: | ||
| return "CustomizedJson"; | ||
| return "CustomizedJson2Table"; | ||
| } | ||
|
|
||
| @Override | ||
| public String getType() { | ||
| return PayloadFormatter.TREE_TYPE; | ||
| return PayloadFormatter.TABLE_TYPE; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iotdb.mqtt.server; | ||
|
|
||
| import org.apache.iotdb.db.protocol.mqtt.Message; | ||
| import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter; | ||
| import org.apache.iotdb.db.protocol.mqtt.TableMessage; | ||
|
|
||
| import io.netty.buffer.ByteBuf; | ||
| import org.apache.commons.lang3.NotImplementedException; | ||
| import org.apache.tsfile.enums.TSDataType; | ||
|
|
||
| import java.nio.charset.StandardCharsets; | ||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
|
|
||
| public class CustomizedLinePayloadFormatter implements PayloadFormatter { | ||
|
|
||
| @Override | ||
| public List<Message> format(String topic, ByteBuf payload) { | ||
| // Suppose the payload is a line format | ||
| if (payload == null) { | ||
| return null; | ||
| } | ||
|
Comment on lines
+40
to
+42
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to keep consistency between examples. |
||
|
|
||
| String line = payload.toString(StandardCharsets.UTF_8); | ||
| // parse data from the line and generate Messages and put them into List<Meesage> ret | ||
| List<Message> ret = new ArrayList<>(); | ||
| // this is just an example, so we just generate some Messages directly | ||
| for (int i = 0; i < 3; i++) { | ||
|
Comment on lines
+44
to
+48
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The line should still be used, otherwise, this examples means nothing. |
||
| long ts = i; | ||
| TableMessage message = new TableMessage(); | ||
|
|
||
| // Parsing Database Name | ||
| message.setDatabase("db" + i); | ||
|
|
||
| // Parsing Table Names | ||
| message.setTable("t" + i); | ||
|
|
||
| // Parsing Tags | ||
| List<String> tagKeys = new ArrayList<>(); | ||
| tagKeys.add("tag1" + i); | ||
| tagKeys.add("tag2" + i); | ||
| List<Object> tagValues = new ArrayList<>(); | ||
| tagValues.add("t_value1" + i); | ||
| tagValues.add("t_value2" + i); | ||
| message.setTagKeys(tagKeys); | ||
| message.setTagValues(tagValues); | ||
|
|
||
| // Parsing Attributes | ||
| List<String> attributeKeys = new ArrayList<>(); | ||
| List<Object> attributeValues = new ArrayList<>(); | ||
| attributeKeys.add("attr1" + i); | ||
| attributeKeys.add("attr2" + i); | ||
| attributeValues.add("a_value1" + i); | ||
| attributeValues.add("a_value2" + i); | ||
| message.setAttributeKeys(attributeKeys); | ||
| message.setAttributeValues(attributeValues); | ||
|
|
||
| // Parsing Fields | ||
| List<String> fields = Arrays.asList("field1" + i, "field2" + i); | ||
| List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT); | ||
| List<Object> values = Arrays.asList("4.0" + i, "5.0" + i); | ||
| message.setFields(fields); | ||
| message.setDataTypes(dataTypes); | ||
| message.setValues(values); | ||
|
|
||
| //// Parsing timestamp | ||
| message.setTimestamp(ts); | ||
| ret.add(message); | ||
| } | ||
| return ret; | ||
| } | ||
|
|
||
| @Override | ||
| @Deprecated | ||
| public List<Message> format(ByteBuf payload) { | ||
| throw new NotImplementedException(); | ||
| } | ||
|
|
||
| @Override | ||
| public String getName() { | ||
| // set the value of mqtt_payload_formatter in iotdb-system.properties as the following string: | ||
| return "CustomizedLine"; | ||
| } | ||
|
|
||
| @Override | ||
| public String getType() { | ||
| return PayloadFormatter.TABLE_TYPE; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this example include both the tree model and the table model? It is recommended to write about both