Skip to content

Commit

Permalink
Merge pull request #46 from akvelon/java-mongodb
Browse files Browse the repository at this point in the history
Add Java MongoDB read/write pipelines
  • Loading branch information
Abacn authored Feb 21, 2024
2 parents 5dd615a + 5ff680b commit 5d0585b
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 0 deletions.
35 changes: 35 additions & 0 deletions Java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,41 @@
<artifactId>beam-sdks-java-io-jdbc</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-mongodb -->
<!-- Beam MongoDB connector; supplies the MongoDbIO transforms used by the mongodb.* pipelines. -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed elsewhere in this pom - confirm. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-mongodb</artifactId>
</dependency>

<!-- MongoDB Java driver, pinned to an explicit version and declared with compile scope. -->
<!-- NOTE(review): the exclusions below presumably keep conflicting or legacy transitive -->
<!-- artifacts off the classpath; confirm each one is still required. -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.11</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>jdk8</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
<exclusion>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-lite</artifactId>
</exclusion>
<exclusion>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.cloud.sql/mysql-socket-factory-connector-j-8 -->
<dependency>
<groupId>com.google.cloud.sql</groupId>
Expand Down
91 changes: 91 additions & 0 deletions Java/src/main/java/mongodb/ReadMongoDB.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mongodb;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadMongoDB {

private static final Logger LOG =
LoggerFactory.getLogger(ReadMongoDB.class);

private static final String ID_COLUMN = "id";
private static final String NAME_COLUMN = "name";

/**
* Pipeline options for read from MongoDB.
*/
public interface ReadMongoDbOptions extends PipelineOptions {
@Description("The MongoDB connection string following the URI format")
@Default.String("mongodb://localhost:27017")
String getUri();

void setUri(String uri);

@Description("The MongoDB database name")
@Validation.Required
String getDbName();

void setDbName(String dbName);

@Description("The MongoDB collection name")
@Validation.Required
String getCollection();

void setCollection(String collection);
}

public static void main(String[] args) {
ReadMongoDbOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(ReadMongoDbOptions.class);

Pipeline p = Pipeline.create(options);

p.apply(
"Read from MongoDB",
MongoDbIO.read()
.withUri(options.getUri())
.withDatabase(options.getDbName())
.withCollection(options.getCollection()))
.apply(
"Log Data",
ParDo.of(
new DoFn<Document, Document>() {
@DoFn.ProcessElement
public void processElement(ProcessContext c) {
LOG.info(
"Id = {}, Name = {}",
c.element().get(ID_COLUMN),
c.element().get(NAME_COLUMN));
c.output(c.element());
}
}));

p.run();
}
}
85 changes: 85 additions & 0 deletions Java/src/main/java/mongodb/WriteMongoDB.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mongodb;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Create;
import org.bson.Document;

import java.util.Arrays;
import java.util.List;

public class WriteMongoDB {

private static final String ID_COLUMN = "id";
private static final String NAME_COLUMN = "name";

/**
* Pipeline options for write to MongoDB.
*/
public interface WriteMongoDbOptions extends PipelineOptions {
@Description("The MongoDB connection string following the URI format")
@Default.String("mongodb://localhost:27017")
String getUri();

void setUri(String uri);

@Description("The MongoDB database name")
@Validation.Required
String getDbName();

void setDbName(String dbName);

@Description("The MongoDB collection name")
@Validation.Required
String getCollection();

void setCollection(String collection);
}

public static void main(String[] args) {
WriteMongoDbOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation().as(WriteMongoDbOptions.class);

Pipeline p = Pipeline.create(options);

List<Document> rows = Arrays.asList(
new Document(ID_COLUMN, 1).append(NAME_COLUMN, "Charles"),
new Document(ID_COLUMN, 2).append(NAME_COLUMN, "Alice"),
new Document(ID_COLUMN, 3).append(NAME_COLUMN, "Bob"),
new Document(ID_COLUMN, 4).append(NAME_COLUMN, "Amanda"),
new Document(ID_COLUMN, 5).append(NAME_COLUMN, "Alex"),
new Document(ID_COLUMN, 6).append(NAME_COLUMN, "Eliza")
);

p.apply("Create", Create.of(rows))
.apply(
"Write to MongoDB",
MongoDbIO.write()
.withUri(options.getUri())
.withDatabase(options.getDbName())
.withCollection(options.getCollection()));

p.run();
}
}

0 comments on commit 5d0585b

Please sign in to comment.