Skip to content

Commit 8d0aea2

Browse files
committed
[#1040] misc changes in runtime names, ad missing fields in schema
1 parent 8351fa0 commit 8d0aea2

File tree

5 files changed

+35
-22
lines changed

5 files changed

+35
-22
lines changed

streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/latlngtojtspoint/LatLngToJtsPointProcessor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
4747
private static final String LAT_KEY = "latitude-key";
4848
private static final String LNG_KEY = "longitude-key";
4949
private static final String EPSG_KEY = "epsg-key";
50-
private static final String WKT_RUNTIME = "geomWKT";
50+
private static final String GEOMETRY_RUNTIME = "geometry";
5151
private String latitudeMapper;
5252
private String longitudeMapper;
5353
private String epsgMapper;
@@ -76,7 +76,7 @@ public DataProcessorDescription declareModel() {
7676
.outputStrategy(
7777
OutputStrategies.append(
7878
PrimitivePropertyBuilder
79-
.create(Datatypes.String, WKT_RUNTIME)
79+
.create(Datatypes.String, GEOMETRY_RUNTIME)
8080
.domainProperty("http://www.opengis.net/ont/geosparql#Geometry")
8181
.build()
8282
)
@@ -101,7 +101,11 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
101101
Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);
102102

103103
if (!geom.isEmpty()) {
104-
event.addField(WKT_RUNTIME, geom.toString());
104+
// if activated the stream fails at all
105+
//event.removeFieldBySelector(latitudeMapper);
106+
//event.removeFieldBySelector(longitudeMapper);
107+
event.addField(GEOMETRY_RUNTIME, geom.toString());
108+
105109
LOG.debug("Created Geometry: " + geom.toString());
106110
collector.collect(event);
107111
} else {

streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/reprojection/ReprojectionProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
4949
public static final String GEOM_KEY = "geom-key";
5050
public static final String SOURCE_EPSG_KEY = "source-epsg-key";
5151
public static final String TARGET_EPSG_KEY = "target-epsg-key";
52-
public static final String GEOM_RUNTIME = "geomWKT";
52+
public static final String GEOMETRY_RUNTIME = "geometry";
5353
public static final String EPSG_RUNTIME = "epsg";
5454
private String geometryMapper;
5555
private String sourceEpsgMapper;
@@ -134,7 +134,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
134134

135135
if (!reprojected.isEmpty()) {
136136
event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
137-
event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
137+
event.updateFieldBySelector("s0::" + GEOMETRY_RUNTIME, reprojected.toText());
138138

139139
collector.collect(event);
140140
} else {

streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/jts/processor/trajectory/TrajectoryFromPointsProcessor.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
5151
private static final String SUBPOINTS_KEY = "subpoints-key";
5252
private static final String DESCRIPTION_KEY = "description-key";
5353
private static final String TRAJECTORY_KEY = "trajectory-key";
54-
private static final String TRAJECTORY_RUNTIME = "trajectoryWKT";
55-
private static final String DESCRIPTION_RUNTIME = "trajectoryDescription";
54+
private static final String TRAJECTORY_GEOMETRY_RUNTIME = "trajectory-geometry";
55+
private static final String TRAJECTORY_EPSG_RUNTIME = "trajectory-epsg";
56+
private static final String DESCRIPTION_RUNTIME = "trajectory-description";
5657
private String pointMapper;
5758
private String epsgMapper;
5859
private String mValueMapper;
@@ -96,8 +97,12 @@ public DataProcessorDescription declareModel() {
9697
SO.TEXT),
9798
EpProperties.stringEp(
9899
Labels.withId(TRAJECTORY_KEY),
99-
TRAJECTORY_RUNTIME,
100-
"http://www.opengis.net/ont/geosparql#Geometry")
100+
TRAJECTORY_GEOMETRY_RUNTIME,
101+
"http://www.opengis.net/ont/geosparql#Geometry"),
102+
EpProperties.integerEp(
103+
Labels.withId(EPSG_KEY),
104+
TRAJECTORY_EPSG_RUNTIME,
105+
"http://data.ign.fr/def/ignf#CartesianCS")
101106
)
102107
)
103108
.build();
@@ -131,7 +136,8 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
131136
LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
132137
// adds to stream
133138
event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
134-
event.addField(TRAJECTORY_RUNTIME, geom.toString());
139+
event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
140+
event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
135141
collector.collect(event);
136142
}
137143

streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/distancecalculator/haversine/HaversineDistanceCalculatorProcessor.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,30 @@
2424
import org.apache.streampipes.model.runtime.Event;
2525
import org.apache.streampipes.model.schema.PropertyScope;
2626
import org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
27+
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
2728
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
2829
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
29-
import org.apache.streampipes.sdk.helpers.EpProperties;
3030
import org.apache.streampipes.sdk.helpers.EpRequirements;
3131
import org.apache.streampipes.sdk.helpers.Labels;
3232
import org.apache.streampipes.sdk.helpers.Locales;
3333
import org.apache.streampipes.sdk.helpers.OutputStrategies;
3434
import org.apache.streampipes.sdk.utils.Assets;
35+
import org.apache.streampipes.sdk.utils.Datatypes;
3536
import org.apache.streampipes.vocabulary.Geo;
3637
import org.apache.streampipes.vocabulary.SO;
3738
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
3839
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
3940
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
4041
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
4142

43+
import java.net.URI;
44+
4245
public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcessor {
4346
private static final String LAT_1_KEY = "lat1";
4447
private static final String LONG_1_KEY = "long1";
4548
private static final String LAT_2_KEY = "lat2";
4649
private static final String LONG_2_KEY = "long2";
47-
private static final String CALCULATED_DISTANCE_KEY = "calculatedDistance";
50+
private static final String DISTANCE_RUNTIME_NAME = "distance";
4851
String lat1FieldMapper;
4952
String long1FieldMapper;
5053
String lat2FieldMapper;
@@ -69,12 +72,12 @@ public DataProcessorDescription declareModel() {
6972
Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
7073
.build()
7174
)
72-
.outputStrategy(OutputStrategies
73-
.append(EpProperties.numberEp(
74-
Labels.withId(CALCULATED_DISTANCE_KEY),
75-
"distance",
76-
SO.NUMBER))
77-
)
75+
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
76+
.create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
77+
.domainProperty(SO.NUMBER)
78+
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
79+
.build())
80+
)
7881
.build();
7982
}
8083

@@ -99,7 +102,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
99102

100103
double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);
101104

102-
event.addField("distance", resultDist);
105+
event.addField(DISTANCE_RUNTIME_NAME, resultDist);
103106

104107
collector.collect(event);
105108
}

streampipes-extensions/streampipes-processors-geo-jvm/src/main/java/org/apache/streampipes/processors/geo/jvm/latlong/processor/speedcalculator/SpeedCalculatorProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
4949
private static final String LATITUDE_KEY = "latitude-key";
5050
private static final String LONGITUDE_KEY = "longitude-key";
5151
private static final String COUNT_WINDOW_KEY = "count-window-key";
52-
private static final String SPEED_KEY = "speed-key";
52+
private static final String SPEED_RUNTIME_NAME = "speed";
5353
private String latitudeFieldMapper;
5454
private String longitudeFieldMapper;
5555
private String timestampFieldMapper;
@@ -75,7 +75,7 @@ public DataProcessorDescription declareModel() {
7575
.requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
7676
.outputStrategy(
7777
OutputStrategies.append(PrimitivePropertyBuilder
78-
.create(Datatypes.Float, SPEED_KEY)
78+
.create(Datatypes.Float, SPEED_RUNTIME_NAME)
7979
.domainProperty(SO.NUMBER)
8080
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
8181
.build())
@@ -98,7 +98,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
9898
if (this.buffer.isFull()) {
9999
Event firstEvent = (Event) buffer.get();
100100
double speed = calculateSpeed(firstEvent, event);
101-
event.addField(SPEED_KEY, speed);
101+
event.addField(SPEED_RUNTIME_NAME, speed);
102102
collector.collect(event);
103103
}
104104
this.buffer.add(event);

0 commit comments

Comments
 (0)