Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this example include both the tree model and the table model? It is recommended to write about both

Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,57 @@

import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
import org.apache.iotdb.db.protocol.mqtt.TableMessage;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import io.netty.buffer.ByteBuf;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.external.commons.lang3.NotImplementedException;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* The Customized JSON payload formatter. one json format supported: { "time":1586076045523,
* "deviceID":"car_1", "deviceType":"new energy vehicle", "point":"velocity", "value":80.0 }
*/
public class CustomizedJsonPayloadFormatter implements PayloadFormatter {
private static final String JSON_KEY_TIME = "time";
private static final String JSON_KEY_DEVICEID = "deviceID";
private static final String JSON_KEY_DEVICETYPE = "deviceType";
private static final String JSON_KEY_POINT = "point";
private static final String JSON_KEY_VALUE = "value";
private static final Gson GSON = new GsonBuilder().create();

@Override
public List<Message> format(String topic, ByteBuf payload) {
// Suppose the payload is a json format
if (payload == null) {
return Collections.emptyList();
return new ArrayList<>();
}
Comment on lines 57 to 59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain this change


// parse data from the json and generate Messages and put them into List<Message> ret
List<Message> ret = new ArrayList<>();
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 2; i++) {
long ts = i;
TreeMessage message = new TreeMessage();
message.setDevice("d" + i);
message.setTimestamp(ts);
message.setMeasurements(Arrays.asList("s1", "s2"));
message.setValues(Arrays.asList("4.0" + i, "5.0" + i));
ret.add(message);
String txt = payload.toString(StandardCharsets.UTF_8);
JsonElement jsonElement = GSON.fromJson(txt, JsonElement.class);
if (jsonElement.isJsonObject()) {
JsonObject jsonObject = jsonElement.getAsJsonObject();
return formatTableRow(topic, jsonObject);
} else if (jsonElement.isJsonArray()) {
JsonArray jsonArray = jsonElement.getAsJsonArray();
List<Message> messages = new ArrayList<>();
for (JsonElement element : jsonArray) {
JsonObject jsonObject = element.getAsJsonObject();
messages.addAll(formatTableRow(topic, jsonObject));
}
return messages;
}
return ret;
throw new JsonParseException("payload is invalidate");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May provide the txt or the jsonElement.

}

@Override
Expand All @@ -61,14 +80,109 @@ public List<Message> format(ByteBuf payload) {
throw new NotImplementedException();
}

private List<Message> formatTableRow(String topic, JsonObject jsonObject) {
TableMessage message = new TableMessage();
String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/"));
String table = "test_table";
Comment on lines +85 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May parse the table name from the topic?


// Parsing Database Name
message.setDatabase((database));

// Parsing Table Name
message.setTable(table);

// Parsing Tags
List<String> tagKeys = new ArrayList<>();
tagKeys.add(JSON_KEY_DEVICEID);
List<Object> tagValues = new ArrayList<>();
tagValues.add(
new Binary[] {
new Binary(
(jsonObject.get(JSON_KEY_DEVICEID).getAsString()).getBytes(StandardCharsets.UTF_8))
});
message.setTagKeys(tagKeys);
message.setTagValues(tagValues);

// Parsing Attributes
List<String> attributeKeys = new ArrayList<>();
List<Object> attributeValues = new ArrayList<>();
attributeKeys.add(JSON_KEY_DEVICETYPE);
attributeValues.add(
new Binary[] {
new Binary(
(jsonObject.get(JSON_KEY_DEVICETYPE).getAsString()).getBytes(StandardCharsets.UTF_8))
});
message.setAttributeKeys(attributeKeys);
message.setAttributeValues(attributeValues);

// Parsing Fields
List<String> fields = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
List<Object> values = new ArrayList<>();
fields.add(JSON_KEY_POINT);
dataTypes.add(TSDataType.STRING);
values.add(
new Binary[] {
new Binary(
(jsonObject.get(JSON_KEY_POINT).getAsString()).getBytes(StandardCharsets.UTF_8))
});
fields.add(JSON_KEY_VALUE);
Pair<TSDataType, Object> typeAndValue =
analyticValue(jsonObject.get(JSON_KEY_VALUE).getAsString());
values.add(typeAndValue.getRight());
dataTypes.add(typeAndValue.getLeft());

message.setFields(fields);
message.setDataTypes(dataTypes);
message.setValues(values);

// Parsing timestamp
message.setTimestamp(jsonObject.get(JSON_KEY_TIME).getAsLong());
return Lists.newArrayList(message);
}

private Pair<TSDataType, Object> analyticValue(String value) {
if (value.startsWith("\"") && value.endsWith("\"")) {
// String
return new Pair<>(
TSDataType.TEXT,
new Binary[] {
new Binary(value.substring(1, value.length() - 1).getBytes(StandardCharsets.UTF_8))
});
} else if (value.equalsIgnoreCase("t")
|| value.equalsIgnoreCase("true")
|| value.equalsIgnoreCase("f")
|| value.equalsIgnoreCase("false")) {
// boolean
return new Pair<>(
TSDataType.BOOLEAN,
new boolean[] {value.equalsIgnoreCase("t") || value.equalsIgnoreCase("true")});
} else if (value.endsWith("f")) {
// float
return new Pair<>(
TSDataType.FLOAT, new float[] {Float.parseFloat(value.substring(0, value.length() - 1))});
} else if (value.endsWith("i32")) {
// int
return new Pair<>(
TSDataType.INT32, new int[] {Integer.parseInt(value.substring(0, value.length() - 3))});
} else if (value.endsWith("u") || value.endsWith("i")) {
// long
return new Pair<>(
TSDataType.INT64, new long[] {Long.parseLong(value.substring(0, value.length() - 1))});
} else {
// double
return new Pair<>(TSDataType.DOUBLE, new double[] {Double.parseDouble(value)});
}
}

@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-common.properties as the following string:
return "CustomizedJson";
return "CustomizedJson2Table";
}

@Override
public String getType() {
return PayloadFormatter.TREE_TYPE;
return PayloadFormatter.TABLE_TYPE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.mqtt.server;

import org.apache.iotdb.db.protocol.mqtt.Message;
import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
import org.apache.iotdb.db.protocol.mqtt.TableMessage;

import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.tsfile.enums.TSDataType;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CustomizedLinePayloadFormatter implements PayloadFormatter {

@Override
public List<Message> format(String topic, ByteBuf payload) {
// Suppose the payload is a line format
if (payload == null) {
return null;
}
Comment on lines +40 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to keep consistency between examples.


String line = payload.toString(StandardCharsets.UTF_8);
// parse data from the line and generate Messages and put them into List<Meesage> ret
List<Message> ret = new ArrayList<>();
// this is just an example, so we just generate some Messages directly
for (int i = 0; i < 3; i++) {
Comment on lines +44 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line should still be used, otherwise, this examples means nothing.

long ts = i;
TableMessage message = new TableMessage();

// Parsing Database Name
message.setDatabase("db" + i);

// Parsing Table Names
message.setTable("t" + i);

// Parsing Tags
List<String> tagKeys = new ArrayList<>();
tagKeys.add("tag1" + i);
tagKeys.add("tag2" + i);
List<Object> tagValues = new ArrayList<>();
tagValues.add("t_value1" + i);
tagValues.add("t_value2" + i);
message.setTagKeys(tagKeys);
message.setTagValues(tagValues);

// Parsing Attributes
List<String> attributeKeys = new ArrayList<>();
List<Object> attributeValues = new ArrayList<>();
attributeKeys.add("attr1" + i);
attributeKeys.add("attr2" + i);
attributeValues.add("a_value1" + i);
attributeValues.add("a_value2" + i);
message.setAttributeKeys(attributeKeys);
message.setAttributeValues(attributeValues);

// Parsing Fields
List<String> fields = Arrays.asList("field1" + i, "field2" + i);
List<TSDataType> dataTypes = Arrays.asList(TSDataType.FLOAT, TSDataType.FLOAT);
List<Object> values = Arrays.asList("4.0" + i, "5.0" + i);
message.setFields(fields);
message.setDataTypes(dataTypes);
message.setValues(values);

//// Parsing timestamp
message.setTimestamp(ts);
ret.add(message);
}
return ret;
}

@Override
@Deprecated
public List<Message> format(ByteBuf payload) {
throw new NotImplementedException();
}

@Override
public String getName() {
// set the value of mqtt_payload_formatter in iotdb-system.properties as the following string:
return "CustomizedLine";
}

@Override
public String getType() {
return PayloadFormatter.TABLE_TYPE;
}
}
42 changes: 40 additions & 2 deletions example/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ public static void main(String[] args) throws Exception {
connection.connect();
// the config mqttPayloadFormatter must be tree-json
// jsonPayloadFormatter(connection);

// the config mqttPayloadFormatter must be table-line
linePayloadFormatter(connection);
// linePayloadFormatter(connection);

// test customized json formatter of mqtt payload to insert as table row
customizedJsonPayloadFormatter2Table(connection);
connection.disconnect();
}

Expand All @@ -58,7 +62,10 @@ private static void jsonPayloadFormatter(BlockingConnection connection) throws E
+ "\"values\":[%f]\n"
+ "}",
System.currentTimeMillis(), random.nextDouble());
sb.append(payload).append(",");
sb.append(payload);
if (i < 9) {
sb.append(",");
}

// publish a json object
Thread.sleep(1);
Expand Down Expand Up @@ -109,4 +116,35 @@ private static void linePayloadFormatter(BlockingConnection connection) throws E
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
Thread.sleep(10);
}

/**
* The Customized JSON payload formatter. one json format supported: { "time":1586076045523,
* "deviceID":"car_1", "deviceType":"新能源车", "point":"速度", "value":80.0 }
*/
private static void customizedJsonPayloadFormatter2Table(BlockingConnection connection)
throws Exception {
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10; i++) {
String payload =
String.format(
"{\n"
+ "\"time\":%d,\n"
+ "\"deviceID\":\"car_1\",\n"
+ "\"deviceType\":\"新能源车\",\n"
+ "\"point\":\"速度\",\n"
+ "\"value\":%.2f\n"
+ "}",
System.currentTimeMillis(), random.nextFloat());
sb.append(payload).append(",");

// publish a json object
Thread.sleep(1);
connection.publish(DATABASE + "/myTopic", payload.getBytes(), QoS.AT_LEAST_ONCE, false);
}
// publish a json array
sb.insert(0, "[");
sb.replace(sb.lastIndexOf(","), sb.length(), "]");
connection.publish(DATABASE + "/myTopic", sb.toString().getBytes(), QoS.AT_LEAST_ONCE, false);
}
}
Loading