Skip to content

Commit

Permalink
[Issue-168]: Update artifact versions to 0.4.0 and related fixes (#173)
Browse files Browse the repository at this point in the history
* Updated Pravega, Flink and Hadoop connector version to 0.4.0
* Updated samples version to 0.4.0
* Fixed minor issues from Hadoop samples

Signed-off-by: Raúl Gracia <raul.gracia@emc.com>
  • Loading branch information
RaulGracia authored and vijikarthi committed Feb 8, 2019
1 parent 9deadbc commit 6f4edb0
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 35 deletions.
8 changes: 4 additions & 4 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
### dependencies
pravegaVersion=0.4.0-50.da91e55-SNAPSHOT
flinkConnectorVersion=0.4.0-96.54ccab2-SNAPSHOT
pravegaVersion=0.4.0
flinkConnectorVersion=0.4.0

### Pravega-samples output library
samplesVersion=0.4.0-SNAPSHOT
samplesVersion=0.4.0

### Flink-connector examples
flinkVersion=1.6.0
Expand All @@ -21,4 +21,4 @@ flinkVersion=1.6.0
hadoopVersion=2.8.1
scalaVersion=2.11.8
sparkVersion=2.2.0
hadoopConnectorVersion=0.4.0-21.28e4b47-SNAPSHOT
hadoopConnectorVersion=0.4.0
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public RecordWriter<String, V> getRecordWriter(TaskAttemptContext context) throw
new IOException("The Pravega controller URI must be configured (" + URI_STRING + ")"));
final String deserializerClassName = Optional.ofNullable(conf.get(DESERIALIZER)).orElseThrow(() ->
new IOException("The event deserializer must be configured (" + DESERIALIZER + ")"));
final int segments = Integer.parseInt(Optional.of(conf.get(STREAM_SEGMENTS)).orElse("3"));
final int segments = Integer.parseInt(conf.get(STREAM_SEGMENTS, "3"));

StreamManager streamManager = StreamManager.create(controllerURI);
streamManager.createScope(scopeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ public int run(String[] args) throws Exception {
LOG.info("starting");
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
getConf().setStrings("pravega.uri", args[2]);
getConf().setStrings("pravega.scope", args[3]);
getConf().setStrings("pravega.stream", args[4]);
getConf().setStrings("pravega.output.stream.prefix", args[5]);
getConf().setStrings("pravega.deserializer", TextSerializer.class.getName());
getConf().setStrings("input.pravega.uri", args[2]);
getConf().setStrings("input.pravega.scope", args[3]);
getConf().setStrings("input.pravega.stream", args[4]);
getConf().setStrings("input.pravega.output.stream.prefix", args[5]);
getConf().setStrings("input.pravega.deserializer", TextSerializer.class.getName());
Job job = Job.getInstance(getConf());

boolean useSimplePartitioner = getUseSimplePartitioner(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public int run(String[] args) throws Exception {
LOG.info("starting");
Path inputDir = new Path(args[0]);
Path outputDir = new Path(args[1]);
getConf().setStrings("pravega.uri", args[2]);
getConf().setStrings("pravega.scope", args[3]);
getConf().setStrings("pravega.stream", args[4]);
getConf().setStrings("pravega.deserializer", TextSerializer.class.getName());
getConf().setStrings("input.pravega.uri", args[2]);
getConf().setStrings("input.pravega.scope", args[3]);
getConf().setStrings("input.pravega.stream", args[4]);
getConf().setStrings("input.pravega.deserializer", TextSerializer.class.getName());

getConf().setInt(MRJobConfig.NUM_MAPS, 1);
Job job = Job.getInstance(getConf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public int run(String[] args) throws Exception {

conf.setStrings("pravega.uri", args[1]);
conf.setStrings("pravega.scope", args[2]);
conf.setStrings("pravega.out.stream", args[3]);
conf.setStrings("pravega.stream", args[3]);
conf.setStrings("pravega.deserializer", TextSerializer.class.getName());

Job job = Job.getInstance(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,22 @@ public static void main(String[] args) throws Exception {

String uri = otherArgs[1], scope = otherArgs[2], stream = otherArgs[3], out = otherArgs[4];

conf.setStrings("pravega.uri", uri);
conf.setStrings("pravega.scope", scope);
conf.setStrings("pravega.stream", stream);
conf.setStrings("pravega.deserializer", TextSerializer.class.getName());
conf.setStrings("input.pravega.uri", uri);
conf.setStrings("input.pravega.scope", scope);
conf.setStrings("input.pravega.stream", stream);
conf.setStrings("input.pravega.deserializer", TextSerializer.class.getName());

if (otherArgs.length >= 6) {
conf.setStrings("pravega.startpositions", otherArgs[5]);
conf.setStrings("input.pravega.startpositions", otherArgs[5]);
}

String endPos = "";
if (otherArgs.length >= 7) {
endPos = otherArgs[6];
} else {
endPos = PravegaInputFormat.fetchLatestPositionsJson(uri, scope, stream);
endPos = PravegaInputFormat.fetchLatestPosition(uri, scope, stream);
}
conf.setStrings("pravega.endpositions", endPos);
conf.setStrings("input.pravega.endpositions", endPos);

Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ public int run(String[] args) throws Exception {

Configuration conf = getConf();

conf.setStrings("pravega.uri", args[1]);
conf.setStrings("pravega.scope", args[2]);
conf.setStrings("pravega.stream", args[3]);
conf.setStrings("pravega.deserializer", TextSerializer.class.getName());
conf.setStrings("input.pravega.uri", args[1]);
conf.setStrings("input.pravega.scope", args[2]);
conf.setStrings("input.pravega.stream", args[3]);
conf.setStrings("input.pravega.deserializer", TextSerializer.class.getName());

Job job = Job.getInstance(conf, "word mean");
job.setJarByClass(WordMean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ public int run(String[] args) throws Exception {
setConf(new Configuration());
Configuration conf = getConf();

conf.setStrings("pravega.uri", args[1]);
conf.setStrings("pravega.scope", args[2]);
conf.setStrings("pravega.stream", args[3]);
conf.setStrings("pravega.deserializer", TextSerializer.class.getName());
conf.setStrings("input.pravega.uri", args[1]);
conf.setStrings("input.pravega.scope", args[2]);
conf.setStrings("input.pravega.stream", args[3]);
conf.setStrings("input.pravega.deserializer", TextSerializer.class.getName());

Job job = Job.getInstance(conf, "word median");
job.setJarByClass(WordMedian.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package io.pravega.example.spark.wordcount;

import io.pravega.connectors.hadoop.EventKey;
import io.pravega.connectors.hadoop.PravegaConfig;
import io.pravega.connectors.hadoop.PravegaInputFormat;
import io.pravega.example.hadoop.wordcount.TextSerializer;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -40,10 +41,10 @@ public static void main(String[] args) throws Exception {
System.exit(2);
}

conf.setStrings(PravegaInputFormat.URI_STRING, remainingArgs[0]);
conf.setStrings(PravegaInputFormat.SCOPE_NAME, remainingArgs[1]);
conf.setStrings(PravegaInputFormat.STREAM_NAME, remainingArgs[2]);
conf.setStrings(PravegaInputFormat.DESERIALIZER, TextSerializer.class.getName());
conf.setStrings(PravegaConfig.INPUT_URI_STRING, remainingArgs[0]);
conf.setStrings(PravegaConfig.INPUT_SCOPE_NAME, remainingArgs[1]);
conf.setStrings(PravegaConfig.INPUT_STREAM_NAME, remainingArgs[2]);
conf.setStrings(PravegaConfig.INPUT_DESERIALIZER, TextSerializer.class.getName());

JavaSparkContext sc = new JavaSparkContext(new SparkConf());

Expand Down
2 changes: 1 addition & 1 deletion scenarios/pravega-flink-connector-sql-samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ The above command loads the taxi dataset records to Pravega and prepares the env

2) Popular Destination

> bin/tableapi-samples --runApp PopularDestination
> bin/tableapi-samples --runApp PopularDestinationQuery
The above command uses SQL to find the most popular destination (drop-off location) from the available trip records.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public static void main(String ... args) {
handler = new MaxTravellersPerDestination(args);
} else {
log.error(usage.toString());
return;
}
handler.handleRequest();
}
Expand Down

0 comments on commit 6f4edb0

Please sign in to comment.