Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SEDONA-431] Add RS_PixelAsPoints #1125

Merged
merged 21 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@
import org.geotools.coverage.grid.GridCoordinates2D;
import org.geotools.coverage.grid.GridCoverage2D;
import org.geotools.geometry.DirectPosition2D;
import org.geotools.referencing.operation.transform.AffineTransform2D;
import org.locationtech.jts.geom.*;
import org.opengis.coverage.PointOutsideCoverageException;
import org.opengis.geometry.DirectPosition;
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.operation.TransformException;

import java.awt.geom.Point2D;
import java.awt.image.Raster;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class PixelFunctions
{
Expand Down Expand Up @@ -91,6 +92,43 @@ public static Geometry getPixelAsPoint(GridCoverage2D raster, int colX, int rowY
return GEOMETRY_FACTORY.createPoint(pointCoord);
}

public static List<PixelRecord> getPixelAsPoints(GridCoverage2D rasterGeom, int band) throws TransformException, FactoryException {
RasterUtils.ensureBand(rasterGeom, band);

int width = RasterAccessors.getWidth(rasterGeom);
int height = RasterAccessors.getHeight(rasterGeom);

Raster r = RasterUtils.getRaster(rasterGeom.getRenderedImage());
double[] pixels = r.getSamples(0, 0, width, height, band - 1, (double[]) null);

AffineTransform2D gridToCRS = RasterUtils.getGDALAffineTransform(rasterGeom);
double cellSizeX = gridToCRS.getScaleX();
double cellSizeY = gridToCRS.getScaleY();
double shearX = gridToCRS.getShearX();
double shearY = gridToCRS.getShearY();

int srid = RasterAccessors.srid(rasterGeom);
GeometryFactory geometryFactory = srid != 0 ? new GeometryFactory(new PrecisionModel(), srid) : GEOMETRY_FACTORY;

Point2D upperLeft = RasterUtils.getWorldCornerCoordinates(rasterGeom, 1, 1);
List<PixelRecord> pointRecords = new ArrayList<>();

for (int y = 1; y <= height; y++) {
jiayuasu marked this conversation as resolved.
Show resolved Hide resolved
for (int x = 1; x <= width; x++) {
double pixelValue = pixels[(y - 1) * width + (x - 1)];

double worldX = upperLeft.getX() + (x - 1) * cellSizeX + (y - 1) * shearX;
double worldY = upperLeft.getY() + (y - 1) * cellSizeY + (x - 1) * shearY;

Coordinate pointCoord = new Coordinate(worldX, worldY);
Geometry pointGeom = geometryFactory.createPoint(pointCoord);
pointRecords.add(new PixelRecord(pointGeom, pixelValue, x, y));
}
}

return pointRecords;
}

public static List<Double> values(GridCoverage2D rasterGeom, int[] xCoordinates, int[] yCoordinates, int band) throws TransformException {
RasterUtils.ensureBand(rasterGeom, band); // Check for invalid band index
int numBands = rasterGeom.getNumSampleDimensions();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.apache.sedona.common.raster;

import org.locationtech.jts.geom.Geometry;

public class PixelRecord {
public final Geometry geom;
public final double value;
public final int colX, rowY;

public PixelRecord(Geometry geom, double value, int colX, int rowY) {
this.geom = geom;
this.value = value;
this.colX = colX;
this.rowY = rowY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,93 @@ public void testPixelAsPointFromRasterFile() throws IOException, TransformExcept
assertEquals(expectedY, coordinate.getY(), 0.2d);
}

@Test
public void testPixelAsPointSkewedRaster() throws FactoryException, TransformException {
// Testing with skewed raster
GridCoverage2D emptyRaster = RasterConstructors.makeEmptyRaster(1, 12, 13, 240, -193, 2, 1.5, 3, 2, 0);
String actual = Functions.asWKT(PixelFunctions.getPixelAsPoint(emptyRaster, 3, 3));
String expected = "POINT (250 -186)";
assertEquals(expected, actual);
}

@Test
public void testPixelAsPointsOutputSize() throws FactoryException, TransformException {
GridCoverage2D raster = RasterConstructors.makeEmptyRaster(1, 5, 10, 123, -230, 8);
List<PixelRecord> points = PixelFunctions.getPixelAsPoints(raster, 1);
assertEquals(50, points.size());
}

@Test
public void testPixelAsPointsValues() throws FactoryException, TransformException {
GridCoverage2D emptyRaster = RasterConstructors.makeEmptyRaster(1, 5, 10, 123, -230, 8);
List<PixelRecord> points = PixelFunctions.getPixelAsPoints(emptyRaster, 1);

PixelRecord point1 = points.get(0);
Geometry geom1 = (Geometry) point1.geom;
assertEquals(123, geom1.getCoordinate().x, FP_TOLERANCE);
assertEquals(-230, geom1.getCoordinate().y, FP_TOLERANCE);
assertEquals(0.0, point1.value, FP_TOLERANCE);

PixelRecord point2 = points.get(22);
Geometry geom2 = (Geometry) point2.geom;
assertEquals(139, geom2.getCoordinate().x, FP_TOLERANCE);
assertEquals(-262, geom2.getCoordinate().y, FP_TOLERANCE);
assertEquals(0.0, point2.value, FP_TOLERANCE);
}

@Test
public void testPixelAsPointsCustomSRIDPlanar() throws FactoryException, TransformException {
int srid = 3857;
GridCoverage2D emptyRaster = RasterConstructors.makeEmptyRaster(1, 5, 5, -123, 54, 5, 5, 0, 0, srid);
List<PixelRecord> points = PixelFunctions.getPixelAsPoints(emptyRaster, 1);
PixelRecord point1 = points.get(0);
Geometry geom1 = (Geometry) point1.geom;
assertEquals(-123, geom1.getCoordinate().x, FP_TOLERANCE);
assertEquals(54, geom1.getCoordinate().y, FP_TOLERANCE);
assertEquals(srid, geom1.getSRID());
}

@Test
public void testPixelAsPointsSRIDSpherical() throws FactoryException, TransformException {
int srid = 4326;
GridCoverage2D emptyRaster = RasterConstructors.makeEmptyRaster(1, 5, 10, -123, 54, 5, -10, 0, 0, srid);
List<PixelRecord> points = PixelFunctions.getPixelAsPoints(emptyRaster, 1);

PixelRecord point1 = points.get(11);
Geometry geom1 = (Geometry) point1.geom;
assertEquals(-118, geom1.getCoordinate().x, FP_TOLERANCE);
assertEquals(34, geom1.getCoordinate().y, FP_TOLERANCE);
assertEquals(srid, geom1.getSRID());
}

@Test
public void testPixelAsPointsFromRasterFile() throws IOException, TransformException, FactoryException {
GridCoverage2D raster = rasterFromGeoTiff(resourceFolder + "raster/test1.tiff");
List<PixelRecord> points = PixelFunctions.getPixelAsPoints(raster, 1);
PixelRecord firstPoint = points.get(0);
Geometry firstGeom = (Geometry) firstPoint.geom;

double expectedX = -1.3095818E7;
double expectedY = 4021262.75;
double val = 0.0;

assertEquals(expectedX, firstGeom.getCoordinate().x, FP_TOLERANCE);
assertEquals(expectedY, firstGeom.getCoordinate().y, FP_TOLERANCE);
assertEquals(val, firstPoint.value, FP_TOLERANCE);
}

@Test
public void testPixelAsPointsSkewedRaster() throws FactoryException, TransformException {
// Testing with skewed raster
GridCoverage2D emptyRaster = RasterConstructors.makeEmptyRaster(1, 12, 13, 240, -193, 2, 1.5, 3, 2, 0);
List<PixelRecord> points = PixelFunctions.getPixelAsPoints(emptyRaster, 1);

PixelRecord point1 = points.get(26);
Geometry geom1 = (Geometry) point1.geom;
String expected = "POINT (250 -186)";
assertEquals(expected, geom1.toString());
}

@Test
public void values() throws TransformException {
// The function 'value' is implemented using 'values'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class RasterTestBase {

protected static final String resourceFolder = System.getProperty("user.dir") + "/../spark/common/src/test/resources/";

protected static final double FP_TOLERANCE = 1E-4;

GridCoverage2D oneBandRaster;
GridCoverage2D multiBandRaster;
byte[] geoTiff;
Expand Down
48 changes: 48 additions & 0 deletions docs/api/sql/Raster-operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,54 @@ Output:
IndexOutOfBoundsException: Specified pixel coordinates (6, 2) do not lie in the raster
```

### RS_PixelAsPoints
Introduction: Returns a list of the pixel's upper-left corner point geometry, the pixel value and its raster X and Y coordinates for each pixel in the raster at the specified band.

Format: `RS_PixelAsPoints(raster: Raster, band: Integer)`

Since: `v1.5.1`

Spark SQL Example:
```sql
SELECT ST_AsText(RS_PixelAsPoints(raster, 1)) from rasters
```

Output:
```
[[POINT (-13065223 4021262.75),148.0,0,0], [POINT (-13065150 4021262.75),123.0,0,1], [POINT (-13065078 4021262.75),99.0,1,0], [POINT (-13065006 4021262.75),140.0,1,1]]
```


Spark SQL example for extracting Point, value, raster x and y coordinates:

```scala
val pointDf = sedona.read...
val rasterDf = sedona.read.format("binaryFile").load("/some/path/*.tiff")
var df = sedona.read.format("binaryFile").load("/some/path/*.tiff")
df = df.selectExpr("RS_FromGeoTiff(content) as raster")

df.selectExpr(
"explode(RS_PixelAsPoints(raster, 1)) as exploded"
).selectExpr(
"exploded.geom as geom",
"exploded.value as value",
"exploded.x as x",
"exploded.y as y"
).show(3)
```

Output:

```
+--------------------------------------+-----+---+---+
|geom |value|x |y |
+--------------------------------------+-----+---+---+
|POINT (-13095818 4021262.75) |0.0 |1 |1 |
|POINT (-13095745.67138728 4021262.75) |0.0 |2 |1 |
|POINT (-13095673.342774557 4021262.75)|0.0 |3 |1 |
+--------------------------------------+-----+---+---+
```

### RS_PixelAsPolygon

Introduction: Returns a polygon geometry that bounds the specified pixel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ object Catalog {
function[RS_Rotation](),
function[RS_GeoTransform](),
function[RS_PixelAsPoint](),
function[RS_PixelAsPoints](),
function[RS_PixelAsPolygon](),
function[RS_PixelAsCentroid](),
function[RS_Count](),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@
*/
package org.apache.spark.sql.sedona_sql.expressions.raster

import org.apache.sedona.common.raster.PixelFunctions
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.sedona.common.raster.{PixelFunctions}
import org.apache.sedona.sql.utils.GeometrySerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.sedona_sql.UDT.{GeometryUDT, RasterUDT}
import org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
import org.apache.spark.sql.sedona_sql.expressions.InferredExpression
import org.apache.spark.sql.sedona_sql.expressions.raster.implicits.RasterInputExpressionEnhancer
import org.apache.spark.sql.types.{AbstractDataType, ArrayType, DataType, DoubleType, IntegerType, StructType}

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

case class RS_Value(inputExpressions: Seq[Expression]) extends InferredExpression(
inferrableFunction2(PixelFunctions.value), inferrableFunction3(PixelFunctions.value), inferrableFunction4(PixelFunctions.value)
Expand All @@ -37,6 +46,45 @@ case class RS_PixelAsPoint(inputExpressions: Seq[Expression]) extends InferredEx
}
}

case class RS_PixelAsPoints(inputExpressions: Seq[Expression])
extends Expression with CodegenFallback with ExpectsInputTypes {

override def nullable: Boolean = true

override def dataType: DataType = ArrayType(new StructType()
.add("geom", GeometryUDT)
.add("value", DoubleType)
.add("x", IntegerType)
.add("y", IntegerType))

override def eval(input: InternalRow): Any = {
val rasterGeom = inputExpressions(0).toRaster(input)
val band = inputExpressions(1).eval(input).asInstanceOf[Int]

if (rasterGeom == null) {
null
} else {
val pixelRecords = PixelFunctions.getPixelAsPoints(rasterGeom, band)
val rows = pixelRecords.map { pixelRecord =>
val serializedGeom = GeometrySerializer.serialize(pixelRecord.geom)
val rowArray = Array[Any](serializedGeom, pixelRecord.value, pixelRecord.colX, pixelRecord.rowY)
InternalRow.fromSeq(rowArray)
}
new GenericArrayData(rows.toArray)
}
}

override def children: Seq[Expression] = inputExpressions

protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): RS_PixelAsPoints = {
copy(inputExpressions = newChildren)
}

override def inputTypes: Seq[AbstractDataType] = Seq(RasterUDT, IntegerType)
}



case class RS_PixelAsPolygon(inputExpressions: Seq[Expression]) extends InferredExpression(PixelFunctions.getPixelAsPolygon _) {
protected def withNewChildrenInternal(newChilren: IndexedSeq[Expression]) = {
copy(inputExpressions = newChilren)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package org.apache.sedona.sql

import org.apache.sedona.common.raster.MapAlgebra
import org.apache.sedona.common.utils.RasterUtils
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{collect_list, expr}
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.functions.{col, collect_list, expr}
import org.geotools.coverage.grid.GridCoverage2D
import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.locationtech.jts.geom.{Coordinate, Geometry}
import org.locationtech.jts.geom.{Coordinate, Geometry, Point}
import org.scalatest.{BeforeAndAfter, GivenWhenThen}

import java.awt.image.DataBuffer
Expand Down Expand Up @@ -931,6 +931,34 @@ class rasteralgebraTest extends TestBaseScala with BeforeAndAfter with GivenWhen
assertEquals(expectedY, actualCoordinates.y, 1e-5)
}

it("Passed RS_PixelAsPoints with empty raster") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test with Spark explode function and make sure the output of this function can work seamlessly with explode.

val widthInPixel = 5
val heightInPixel = 10
val upperLeftX = 123.19
val upperLeftY = -12
val cellSize = 4
val numBands = 2
val result = sparkSession.sql(s"SELECT RS_PixelAsPoints(RS_MakeEmptyRaster($numBands, $widthInPixel, $heightInPixel, $upperLeftX, $upperLeftY, $cellSize), 1)").first().getList(0);
val expected = "[POINT (127.19000244140625 -12),0.0,2,1]"
assertEquals(expected, result.get(1).toString)
}

it("Passed RS_PixelAsPoints with raster") {
var df = sparkSession.read.format("binaryFile").load(resourceFolder + "raster/test1.tiff")
df = df.selectExpr("RS_FromGeoTiff(content) as raster")

var result = df.selectExpr(
"explode(RS_PixelAsPoints(raster, 1)) as exploded"
).selectExpr(
"exploded.geom as geom",
"exploded.value as value",
"exploded.x as x",
"exploded.y as y"
)

assert(result.count() == 264704)
}

it("Passed RS_PixelAsPolygon with raster") {
val widthInPixel = 5
val heightInPixel = 10
Expand Down
Loading