Commit

Merge pull request #60 from jpolchlo/refactor/process-osm
Import implementation from OSMesa project
jpolchlo authored Mar 22, 2019
2 parents 1d5596c + 12e2da2 commit cd4b6e7
Showing 122 changed files with 3,625 additions and 3,754 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -18,3 +18,6 @@ target

derby.log
metastore_db/*
bench/target/
idea.sbt
mainRunner/
211 changes: 165 additions & 46 deletions README.md
@@ -1,64 +1,183 @@
# VectorPipe #

[![Build Status](https://travis-ci.org/geotrellis/vectorpipe.svg?branch=master)](https://travis-ci.org/geotrellis/vectorpipe)
[![Bintray](https://img.shields.io/bintray/v/azavea/maven/vectorpipe.svg)](https://bintray.com/azavea/maven/vectorpipe)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/447170921bc94b3fb494bb2b965c2235)](https://www.codacy.com/app/fosskers/vectorpipe?utm_source=github.com&utm_medium=referral&utm_content=geotrellis/vectorpipe&utm_campaign=Badge_Grade)

VectorPipe (VP) is a library for working with OpenStreetMap (OSM) vector
data, powered by [GeoTrellis](http://geotrellis.io) and
[Apache Spark](http://spark.apache.org/).

## Overview ##
OSM provides a wealth of data with broad coverage and a deep history. That
richness comes at the price of sheer volume, which can make harnessing the
power of OSM difficult. VectorPipe helps by making OSM processing possible in
Apache Spark, leveraging large computing clusters to churn through, say, an
OSM full-history file.

For those cases where an application needs to process incoming changes, VP
also provides streaming Spark `DataSource`s for changesets, OsmChange files,
and Augmented diffs generated by Overpass.

For ease of use, the output of VP imports is a Spark DataFrame containing
columns of JTS `Geometry` objects, enabled by the user-defined types provided
by [GeoMesa](https://github.com/locationtech/geomesa). That package also
provides functions for manipulating those geometries via Spark SQL directives.

The final important contribution is a set of functions for exporting
geometries to vector tiles. This leans on the `geotrellis-vectortile`
package.

## Getting Started ##

The fastest way to get started with VectorPipe is to invoke `spark-shell` and
load the package jars from the Bintray repository:
```bash
spark-shell --packages com.azavea:vectorpipe_2.11:1.0.0 --repositories http://dl.bintray.com/azavea/maven
```

This will download the required components and set up a REPL with VectorPipe
available, at which point you may issue
```scala
// Make JTS types available to Spark
import org.locationtech.geomesa.spark.jts._
spark.withJTS

import vectorpipe._
```
and begin using the package.

#### A Note on Cluster Computing ####

Your local machine is probably insufficient for dealing with very large OSM
files. We recommend Amazon's Elastic MapReduce (EMR) service to provision
substantial clusters of computing resources. You'll want to supply Spark,
Hive, and Hadoop to your cluster, with Spark version 2.3; creating a cluster
with an EMR release between 5.13 and 5.19 should suffice. From there, `ssh`
into the master node and run `spark-shell` as above for an interactive
environment, or use `spark-submit` for batch jobs. (You may also submit Steps
to the EMR cluster using `spark-submit`.)

### Importing Data ###

Batch analysis can be performed in a few different ways. Perhaps the fastest
way is to procure an OSM PBF file from a source such as
[Geofabrik](https://download.geofabrik.de/index.html), which supplies various
extracts of OSM, including a full planet's worth of data.

VectorPipe does not provide the means to read these OSM PBF files directly,
however, so a conversion to a more convenient file format is needed. We
suggest using [`osm2orc`](https://github.com/mojodna/osm2orc) to convert your
source file to the ORC format, which Spark can read natively:
```scala
val df = spark.read.orc(path)
```
The resulting `DataFrame` can be processed with VectorPipe.
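
As a quick sanity check on the loaded frame, something like the following can
be run (the `type` column and its values come from the `osm2orc` output
schema; treat that as an assumption to verify against your own file):
```scala
// Inspect the schema produced by osm2orc.
df.printSchema()

// Tally rows by OSM element type; osm2orc emits a `type` column whose values
// are "node", "way", and "relation".
df.groupBy("type").count().show()
```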

It is also possible to read from a cache of
[OsmChange](https://wiki.openstreetmap.org/wiki/OsmChange) files directly
rather than convert the PBF file:
```scala
import vectorpipe.sources.Source
val df = spark.read
  .format(Source.Changes)
  .options(Map[String, String](
    Source.BaseURI -> "https://download.geofabrik.de/europe/isle-of-man-updates/",
    Source.StartSequence -> "2080",
    Source.EndSequence -> "2174",
    Source.BatchSize -> "1"))
  .load
  .persist // recommended to avoid rereading
```
(Note that the available start and end sequence numbers shift over time for
Geofabrik; navigate to the base URI to determine current values, otherwise
timeouts may occur.) This approach may emit errors, but it should complete.
It is much slower and touchier than using ORC files, but it stands as an
option.

It is also possible to build a DataFrame from a stream of changesets in a
similar manner to the above. Changesets carry additional metadata regarding
the author of the changes, but none of the geometric information. These
tables can be joined on `changeset`, as sketched below.
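
A minimal sketch of such a read (assuming a `Source.Changesets` format
analogous to `Source.Changes` above, and the standard OSM changeset
replication endpoint; the sequence numbers are placeholders):
```scala
import vectorpipe.sources.Source

// Placeholder sequence range: determine currently valid values from the
// replication endpoint before running.
val changesets = spark.read
  .format(Source.Changesets)
  .options(Map[String, String](
    Source.BaseURI -> "https://planet.osm.org/replication/changesets/",
    Source.StartSequence -> "3000000",
    Source.EndSequence -> "3000010"))
  .load
  .persist

// The resulting frame can then be joined to the change/geometry frames on
// the changeset id, per the note above.
```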

In either case, a useful place to start is to convert the incoming dataframe
into a more usable format. We recommend calling
```scala
val geoms = OSM.toGeometry(df)
```
which will produce a frame of "top-level" entities: nodes that don't
participate in a way, ways that don't participate in a relation, and a subset
of the relations from the OSM data. The resulting dataframe represents these
entities as JTS geometries in the `geom` column.
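
It can be helpful to inspect the result before going further; column names
other than `geom` and `tags` may vary, so check the printed schema:
```scala
// Inspect the derived geometry frame.
geoms.printSchema()
geoms.select('geom, 'tags).show(5, truncate = false)
```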

The `toGeometry` function keeps elements that fit one of the following
descriptions:
- points from tagged nodes (including tags that really ought to be dropped—e.g. `source=*`);
- polygons derived from ways with tags that cause them to be considered as areas;
- lines from ways lacking area tags;
- multipolygons from multipolygon or boundary relations; and
- multilinestrings from route relations.
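
To see how an import breaks down across these categories, one rough approach
is to tally by geometry class using `st_geometryType` from the GeoMesa
SparkSQL function set (assumed to be registered by `spark.withJTS` above):
```scala
geoms.createOrReplaceTempView("geoms")

// Count the derived geometries by their JTS geometry class.
spark.sql("""
  SELECT st_geometryType(geom) AS geom_type, count(*) AS n
  FROM geoms
  GROUP BY st_geometryType(geom)
""").show()
```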

It is also possible to filter the results based on information in the tags.
For instance, all buildings can be found as
```scala
import vectorpipe.functions.osm._
val buildings = geoms.filter(isBuilding('tags))
```

Again, the JTS user-defined types allow geometries to be manipulated and
analyzed directly. See
[the GeoMesa SparkSQL function documentation](https://www.geomesa.org/documentation/user/spark/sparksql_functions.html)
for a list of functions that operate on geometries.
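
For example, a centroid per building footprint can be computed directly in
SQL (a minimal sketch, assuming `spark.withJTS` was called as above so the
`st_*` functions are registered):
```scala
// Compute a centroid for each building footprint via SparkSQL.
buildings.createOrReplaceTempView("buildings")

val buildingCentroids = spark.sql(
  "SELECT tags, st_centroid(geom) AS centroid FROM buildings")
buildingCentroids.show(5, truncate = false)
```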

## The `internal` package ##

While most users will rely solely on the features exposed by the `OSM`
object, finer-grained control of the output (if, say, one does not need
relations) is available through the `vectorpipe.internal` package.

There is a significant caveat here: two different schemas appear when working
with imported OSM dataframes, differing in the type of a sub-field of the
`members` list. This can cause errors of the form
```
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Byte
```
when using the `internal` package methods.

These type problems can be fixed by calling
`vectorpipe.functions.osm.ensureCompressedMembers` on the input OSM data
frame before passing it to any relation-generating functions, such as
`reconstructRelationGeometries`. Top-level functions in the `OSM` object
handle this conversion for you. Note that this only affects the data frames
carrying the initially imported OSM data.
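
A minimal sketch of that preparation step (assuming `ensureCompressedMembers`
takes and returns a DataFrame, as described above):
```scala
import vectorpipe.functions.osm.ensureCompressedMembers

// Normalize the `members` sub-field types before calling internal,
// relation-aware functions such as reconstructRelationGeometries.
val compatible = ensureCompressedMembers(df)
```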

## Local Development ##

If you are intending to contribute to VectorPipe, you may need to work with a
development version. If that is the case, instead of loading from Bintray,
you will need to build a fat jar using
```bash
./sbt assembly
```
and then launch `spark-shell` against the resulting jar:
```bash
spark-shell --jars target/scala_2.11/vectorpipe.jar
```

### IntelliJ IDEA

When developing with IntelliJ IDEA, the sbt plugin will see Spark
dependencies as provided, which prevents them from being indexed properly and
results in errors and warnings within the IDE. To fix this, create `idea.sbt`
at the root of the project:

```scala
import Dependencies._

lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  libraryDependencies ++= Seq(
    sparkSql % Compile
  )
)
```
9 changes: 1 addition & 8 deletions bench/src/main/scala/vectorpipe/Bench.scala
@@ -2,7 +2,7 @@ package vectorpipe

import java.util.concurrent.TimeUnit

import geotrellis.vector.{ Extent, MultiLine, Point, Line }
import geotrellis.vector.{Extent, Line, Point}
import org.openjdk.jmh.annotations._

// --- //
@@ -21,11 +21,4 @@ class LineBench {
List.range(4, -100, -2).map(n => Point(n, 1)) ++ List(Point(-3,4), Point(-1,4), Point(2,4), Point(4,4))
)
}

// @Benchmark
// def java: MultiLine = Clip.toNearestPointJava(extent, line)

@Benchmark
def tailrec: MultiLine = Clip.toNearestPoint(extent, line)

}