#init unstructured data parser: .pdf / .html / .image / .docx / .pptx
tianyao-0315 committed Apr 18, 2024
1 parent 976df6d commit 711e57f
Showing 8 changed files with 128 additions and 196 deletions.
@@ -39,7 +39,7 @@ class JsonSave extends ConfigurableStop{
       .description("The save path of the json file")
       .defaultValue("")
       .required(true)
-      .example("hdfs://192.168.3.138:8020/work/testJson/test/")
+      .example("/test/test.json")

     descriptor = jsonSavePath :: descriptor
     descriptor
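
For context, the chained calls above come from piflow's property-descriptor builder. A minimal sketch of the full declaration this hunk edits — the import path and the name/displayName calls are assumptions, since only description/defaultValue/required/example appear in the diff:

    import cn.piflow.conf.bean.PropertyDescriptor  // assumed package; not shown in the diff

    val jsonSavePath = new PropertyDescriptor()
      .name("jsonSavePath")          // assumed; the hunk starts below this call
      .displayName("JsonSavePath")   // assumed
      .description("The save path of the json file")
      .defaultValue("")
      .required(true)
      .example("/test/test.json")    // the change: a plain path replaces a node-specific HDFS URL

    descriptor = jsonSavePath :: descriptor

Swapping the hard-coded HDFS URL for a relative path keeps the example environment-neutral.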
@@ -5,7 +5,8 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil, ProcessUtil}
 import cn.piflow.conf.{ConfigurableStop, Port}
 import cn.piflow.util.UnstructuredUtils
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.sql.SparkSession
+import com.alibaba.fastjson2.{JSON, JSONArray}
+import org.apache.spark.sql.{DataFrame, SparkSession}

 import scala.collection.mutable.ArrayBuffer

@@ -48,27 +49,47 @@ class DocxParser extends ConfigurableStop {
     curlCommandParams += "accept: application/json"
     curlCommandParams += "-H"
     curlCommandParams += "Content-Type: multipart/form-data"
+    var fileListSize = 0
     if ("hdfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(localDir)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     if ("nfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(filePath)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     val (output, error): (String, String) = ProcessUtil.executeCommand(curlCommandParams.toSeq)
     if (output.nonEmpty) {
-      println(output)
-      val jsonRDD = spark.sparkContext.parallelize(Seq(output))
-      val df = spark.read.json(jsonRDD)
-      df.show(10)
-      out.write(df)
+      // println(output)
+      import spark.implicits._
+      if (fileListSize > 1) {
+        val array: JSONArray = JSON.parseArray(output)
+        var combinedDF: DataFrame = null
+        array.forEach { o =>
+          val jsonString = o.toString
+          val df = spark.read.json(Seq(jsonString).toDS())
+          if (combinedDF == null) {
+            combinedDF = df
+          } else {
+            combinedDF = combinedDF.union(df)
+          }
+        }
+        combinedDF.show(10)
+        out.write(combinedDF)
+      } else {
+        val df = spark.read.json(Seq(output).toDS())
+        df.show(10)
+        out.write(df)
+      }
     } else {
       println(s"########## Exception: $error")
       throw new Exception(s"########## Exception: $error")
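
The multi-file branch added here treats the service response as a JSON array with one entry per uploaded file and unions the per-file frames. A self-contained sketch of the same pattern, runnable outside the step — the two-file payload is fabricated, and the SAM-style forEach lambda assumes Scala 2.12+:

    import com.alibaba.fastjson2.JSON
    import org.apache.spark.sql.{DataFrame, SparkSession}

    object MergeJsonArraySketch extends App {
      val spark = SparkSession.builder().master("local[*]").appName("merge-json").getOrCreate()
      import spark.implicits._

      // Fabricated stand-in for the API output: an outer array holding one
      // list of elements per uploaded file.
      val output =
        """[
          |  [{"type": "Title", "element_id": "a1", "text": "Doc one"}],
          |  [{"type": "NarrativeText", "element_id": "b2", "text": "Doc two"}]
          |]""".stripMargin

      var combinedDF: DataFrame = null
      JSON.parseArray(output).forEach { o =>
        val df = spark.read.json(Seq(o.toString).toDS())
        // union requires matching schemas; files that infer different columns
        // would need unionByName(allowMissingColumns = true) on Spark 3.1+.
        combinedDF = if (combinedDF == null) df else combinedDF.union(df)
      }
      combinedDF.show(10)
      spark.stop()
    }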
@@ -5,7 +5,8 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil, ProcessUtil}
 import cn.piflow.conf.{ConfigurableStop, Port}
 import cn.piflow.util.UnstructuredUtils
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.sql.SparkSession
+import com.alibaba.fastjson2.{JSON, JSONArray}
+import org.apache.spark.sql.{DataFrame, SparkSession}

 import scala.collection.mutable.ArrayBuffer

@@ -48,27 +49,47 @@ class HtmlParser extends ConfigurableStop {
     curlCommandParams += "accept: application/json"
     curlCommandParams += "-H"
     curlCommandParams += "Content-Type: multipart/form-data"
+    var fileListSize = 0
     if ("hdfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(localDir)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     if ("nfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(filePath)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     val (output, error): (String, String) = ProcessUtil.executeCommand(curlCommandParams.toSeq)
     if (output.nonEmpty) {
-      println(output)
-      val jsonRDD = spark.sparkContext.parallelize(Seq(output))
-      val df = spark.read.json(jsonRDD)
-      df.show(10)
-      out.write(df)
+      // println(output)
+      import spark.implicits._
+      if (fileListSize > 1) {
+        val array: JSONArray = JSON.parseArray(output)
+        var combinedDF: DataFrame = null
+        array.forEach { o =>
+          val jsonString = o.toString
+          val df = spark.read.json(Seq(jsonString).toDS())
+          if (combinedDF == null) {
+            combinedDF = df
+          } else {
+            combinedDF = combinedDF.union(df)
+          }
+        }
+        combinedDF.show(10)
+        out.write(combinedDF)
+      } else {
+        val df = spark.read.json(Seq(output).toDS())
+        df.show(10)
+        out.write(df)
+      }
     } else {
       println(s"########## Exception: $error")
       throw new Exception(s"########## Exception: $error")
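
Each parser shells out to curl instead of using an HTTP client, accumulating the argument list one token at a time. A sketch of the command these steps assemble — the endpoint URL and file paths are placeholders, and only the argument construction is shown (ProcessUtil.executeCommand is the project helper used above):

    import scala.collection.mutable.ArrayBuffer

    val partitionUrl = "http://localhost:8000/general/v0/general"  // placeholder endpoint

    val curlCommandParams = ArrayBuffer("curl", "-X", "POST", partitionUrl)
    curlCommandParams += "-H"
    curlCommandParams += "accept: application/json"
    curlCommandParams += "-H"
    curlCommandParams += "Content-Type: multipart/form-data"
    // One "-F files=@<path>" pair per input file, exactly as the hdfs/nfs branches do.
    Seq("/data/a.html", "/data/b.html").foreach { path =>
      curlCommandParams += "-F"
      curlCommandParams += s"files=@$path"
    }
    // val (output, error) = ProcessUtil.executeCommand(curlCommandParams.toSeq)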
@@ -5,7 +5,8 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil, ProcessUtil}
 import cn.piflow.conf.{ConfigurableStop, Port}
 import cn.piflow.util.UnstructuredUtils
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.sql.SparkSession
+import com.alibaba.fastjson2.{JSON, JSONArray}
+import org.apache.spark.sql.{DataFrame, SparkSession}

 import scala.collection.mutable.ArrayBuffer

@@ -54,27 +55,47 @@ class ImageParser extends ConfigurableStop {
     curlCommandParams += s"strategy=$strategy"
     curlCommandParams += "-F"
     curlCommandParams += "hi_res_model_name=detectron2_lp"
+    var fileListSize = 0
     if ("hdfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(localDir)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     if ("nfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(filePath)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     val (output, error): (String, String) = ProcessUtil.executeCommand(curlCommandParams.toSeq)
     if (output.nonEmpty) {
-      println(output)
-      val jsonRDD = spark.sparkContext.parallelize(Seq(output))
-      val df = spark.read.json(jsonRDD)
-      df.show(10)
-      out.write(df)
+      // println(output)
+      import spark.implicits._
+      if (fileListSize > 1) {
+        val array: JSONArray = JSON.parseArray(output)
+        var combinedDF: DataFrame = null
+        array.forEach { o =>
+          val jsonString = o.toString
+          val df = spark.read.json(Seq(jsonString).toDS())
+          if (combinedDF == null) {
+            combinedDF = df
+          } else {
+            combinedDF = combinedDF.union(df)
+          }
+        }
+        combinedDF.show(10)
+        out.write(combinedDF)
+      } else {
+        val df = spark.read.json(Seq(output).toDS())
+        df.show(10)
+        out.write(df)
+      }
     } else {
       println(s"########## Exception: $error")
       throw new Exception(s"########## Exception: $error")
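
ImageParser is the only step that adds extra form fields: a partition strategy and an explicit layout model. A small sketch of those lines in isolation — the set of valid strategy values is an assumption based on the unstructured-api documentation; only hi_res_model_name=detectron2_lp appears in this diff:

    // Hypothetical value; the step passes strategy through from its configuration.
    val strategy = "hi_res"  // commonly one of "auto", "fast", "hi_res", "ocr_only"
    curlCommandParams += "-F"
    curlCommandParams += s"strategy=$strategy"
    curlCommandParams += "-F"
    curlCommandParams += "hi_res_model_name=detectron2_lp"  // layout model, meaningful with hi_res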
@@ -5,8 +5,8 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil, ProcessUtil}
 import cn.piflow.conf.{ConfigurableStop, Port}
 import cn.piflow.util.UnstructuredUtils
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.sql.{DataFrame, SparkSession, functions}
-import org.apache.spark.sql.types.{ArrayType, MapType, StringType, StructField, StructType}
+import com.alibaba.fastjson2.{JSON, JSONArray}
+import org.apache.spark.sql.{DataFrame, SparkSession}

 import scala.collection.mutable.ArrayBuffer

@@ -77,37 +77,28 @@ class PdfParser extends ConfigurableStop {
     }
     val (output, error): (String, String) = ProcessUtil.executeCommand(curlCommandParams.toSeq)
     if (output.nonEmpty) {
-      println(output)
-
+      // println(output)
+      import spark.implicits._
       if (fileListSize > 1) {
-        val schema1 = ArrayType(StructType(Array(
-          StructField("type", StringType, true),
-          StructField("element_id", StringType, true),
-          StructField("text", StringType, true),
-          StructField("metadata", MapType(StringType, StringType), true)
-        )))
-        val df = Seq(output).toDS().toDF("json").withColumn("data", functions.from_json($"json", schema1))
-        // val df = spark.read.json(Seq(output).toDS())
-        //   .withColumn("data", functions.explode($"value"))
-        //   .select("data.*")
-        df.show(10)
-        out.write(df)
+        val array: JSONArray = JSON.parseArray(output)
+        var combinedDF: DataFrame = null
+        array.forEach { o =>
+          val jsonString = o.toString
+          val df = spark.read.json(Seq(jsonString).toDS())
+          if (combinedDF == null) {
+            combinedDF = df
+          } else {
+            combinedDF = combinedDF.union(df)
+          }
+        }
+        combinedDF.show(10)
+        out.write(combinedDF)
       } else {
-        // val schema2 = new StructType()
-        //   .add("type", StringType)
-        //   .add("element_id", StringType)
-        //   .add("text", StringType)
-        //   .add("metadata", MapType(StringType, StringType))
-        // val df = Seq(output).toDS().toDF("json").withColumn("data", functions.from_json($"json", schema2)).
         val df = spark.read.json(Seq(output).toDS())
         df.show(10)
         out.write(df)
       }
-      // val jsonRDD = spark.sparkContext.parallelize(Seq(output))
-      // val df = spark.read.json(jsonRDD)
-      // df.show(10)
-      // out.write(df)
     } else {
       println(s"########## Exception: $error")
       throw new Exception(s"########## Exception: $error")
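
The deleted PdfParser branch pinned an explicit element schema with from_json; the new code lets Spark infer one per file. If a stable column set matters — it also makes the union above safe — the schema can still be supplied at read time. A sketch, assuming spark, spark.implicits._, and the jsonString from the loop above are in scope:

    import org.apache.spark.sql.types._

    // The element shape the removed code declared: type/element_id/text plus a
    // string-to-string metadata map.
    val elementSchema = StructType(Seq(
      StructField("type", StringType, nullable = true),
      StructField("element_id", StringType, nullable = true),
      StructField("text", StringType, nullable = true),
      StructField("metadata", MapType(StringType, StringType), nullable = true)
    ))

    // Pinning the schema skips inference and keeps columns identical across files.
    val df = spark.read.schema(elementSchema).json(Seq(jsonString).toDS())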
@@ -5,7 +5,8 @@ import cn.piflow.conf.util.{ImageUtil, MapUtil, ProcessUtil}
 import cn.piflow.conf.{ConfigurableStop, Port}
 import cn.piflow.util.UnstructuredUtils
 import cn.piflow.{JobContext, JobInputStream, JobOutputStream, ProcessContext}
-import org.apache.spark.sql.SparkSession
+import com.alibaba.fastjson2.{JSON, JSONArray}
+import org.apache.spark.sql.{DataFrame, SparkSession}

 import scala.collection.mutable.ArrayBuffer

@@ -48,27 +49,47 @@ class PptxParser extends ConfigurableStop {
     curlCommandParams += "accept: application/json"
     curlCommandParams += "-H"
     curlCommandParams += "Content-Type: multipart/form-data"
+    var fileListSize = 0
     if ("hdfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(localDir)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     if ("nfs".equals(fileSource)) {
       val fileList = UnstructuredUtils.getLocalFilePaths(filePath)
+      fileListSize = fileList.size
       fileList.foreach { path =>
         curlCommandParams += "-F"
         curlCommandParams += s"files=@$path"
       }
     }
     val (output, error): (String, String) = ProcessUtil.executeCommand(curlCommandParams.toSeq)
     if (output.nonEmpty) {
-      println(output)
-      val jsonRDD = spark.sparkContext.parallelize(Seq(output))
-      val df = spark.read.json(jsonRDD)
-      df.show(10)
-      out.write(df)
+      // println(output)
+      import spark.implicits._
+      if (fileListSize > 1) {
+        val array: JSONArray = JSON.parseArray(output)
+        var combinedDF: DataFrame = null
+        array.forEach { o =>
+          val jsonString = o.toString
+          val df = spark.read.json(Seq(jsonString).toDS())
+          if (combinedDF == null) {
+            combinedDF = df
+          } else {
+            combinedDF = combinedDF.union(df)
+          }
+        }
+        combinedDF.show(10)
+        out.write(combinedDF)
+      } else {
+        val df = spark.read.json(Seq(output).toDS())
+        df.show(10)
+        out.write(df)
+      }
     } else {
       println(s"########## Exception: $error")
      throw new Exception(s"########## Exception: $error")
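
A closing observation: the null-seeded var plus forEach merge repeated in all five parsers can be written without mutation. A hedged alternative sketch with the same behavior, assuming the step's spark session and implicits are in scope and the response array is non-empty:

    import scala.collection.JavaConverters._  // scala.jdk.CollectionConverters._ on Scala 2.13

    val array = JSON.parseArray(output)
    val combinedDF = array.asScala
      .map(o => spark.read.json(Seq(o.toString).toDS()))
      .reduce(_ union _)  // use reduceOption to guard against an empty array
    combinedDF.show(10)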