From f3e8d0e9198506fe80dddfb4cfd0491be3f19050 Mon Sep 17 00:00:00 2001 From: jabadji Date: Wed, 6 Apr 2022 17:37:07 +0200 Subject: [PATCH 1/2] feat(avro): add simple avro writer --- src/io/writer/mod.rs | 1 + src/io/writer/writer_doc_avro.rs | 218 +++++++++++++++++++++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 src/io/writer/writer_doc_avro.rs diff --git a/src/io/writer/mod.rs b/src/io/writer/mod.rs index 785c4d2c..cb6a1804 100644 --- a/src/io/writer/mod.rs +++ b/src/io/writer/mod.rs @@ -13,6 +13,7 @@ mod metawriter; mod textwriter; pub mod writer; mod writer_doc; +mod writer_doc_avro; mod writertrait; use metawriter::MetaWriter; use textwriter::TextWriter; diff --git a/src/io/writer/writer_doc_avro.rs b/src/io/writer/writer_doc_avro.rs new file mode 100644 index 00000000..e3c5533b --- /dev/null +++ b/src/io/writer/writer_doc_avro.rs @@ -0,0 +1,218 @@ +//! Avro version of [writer_doc::DocWriter]. + +use std::io::Write; + +use avro_rs::{Codec, Schema, Writer}; +use serde::Serialize; +use structopt::lazy_static::lazy_static; + +use crate::{error::Error, pipelines::oscardoc::types::Document}; + +use super::WriterTrait; + +lazy_static! { + static ref SCHEMA: Schema = { + + // schema of Identification struct + let identification_schema = r#" + {"name":"identification", "type":"record", "fields": [ + {"name": "label", "type":"string"}, + {"name": "prob", "type":"float"} + ]} +"#; + // schema of Metadata struct + let metadata_schema = r#" +{ + "type":"record", + "name":"metadata_record", + "fields":[ + {"name":"identification", "type":"identification"}, + {"name":"annotation", "type":["null", {"type": "array", "items":"string"}]}, + {"name": "sentence_identifications", "type":"array", "items":[ + "null", + "identification" + ]} + ] +} +"#; + +// let warc_metadata = r#" +// { +// "type": "record", +// "name": "warc_record", +// "fields": [ +// {"name": "warc-refers-to", "type": "string"}, +// {"name": "warc-date", "type": "string"}, +// {"name": "warc-block-digest", "type": "string"}, +// {"name": "warc-type", "type": "string"}, +// {"name": "warc-identified-content-language", "type": "string"}, +// {"name": "content-length", "type": "long"}, +// {"name": "warc-target-uri", "type": "string"}, +// {"name": "warc-record-id", "type": "string"}, +// {"name": "content-type", "type": "string"} +// ] +// } +// "#; +let warc_metadata = r#" +{ + "type": "map", + "values": "string", + "name": "warc_record", + "default": {} +} +"#; + +let document_schema = r#" +{ + "type":"record", + "name":"document", + "fields": [ + {"name": "content", "type": "string"}, + {"name":"warc_headers", "type": "warc_record"}, + {"name":"metadata", "type": "metadata_record"} + ] +} + +"#; + +// let corpus_schema = r#"{ +// "name":"corpus", +// "type": "array", +// "items":"document" +// }"#; + + // schema of ShardResult struct + Schema::parse_list(&[ + identification_schema, + metadata_schema, + warc_metadata, + document_schema, + // corpus_schema, + ]) + .unwrap().last().unwrap() + .clone() + }; +} +struct DocWriterAvro<'a, T> +where + T: Write, +{ + schema: &'a Schema, + writer: Writer<'a, T>, +} + +impl<'a, T> DocWriterAvro<'a, T> +where + T: Write, +{ + /// Create a new avro writer from shema, writer and a specified codec. + fn new(schema: &'a Schema, writer: T, codec: Codec) -> Self { + let avro_writer = avro_rs::Writer::with_codec(schema, writer, codec); + Self { + schema, + writer: avro_writer, + } + } + + pub fn append_ser(&mut self, val: &S) -> Result { + self.writer.append_ser(val).map_err(|e| e.into()) + } + + pub fn flush(&mut self) -> Result { + self.writer.flush().map_err(|e| e.into()) + } + + pub fn schema(&self) -> &Schema { + self.writer.schema() + } +} + +impl<'a, T> WriterTrait for DocWriterAvro<'a, T> +where + T: Write, +{ + type Item = Document; + + fn new( + dst: &std::path::Path, + lang: &'static str, + max_file_size: Option, + ) -> Result + where + Self: Sized, + { + todo!() + } + + fn write(&mut self, vals: Vec) -> Result<(), crate::error::Error> { + todo!() + } + + fn write_single(&mut self, val: &Self::Item) -> Result<(), crate::error::Error> { + todo!() + } + + fn close_meta(&mut self) -> Result<(), crate::error::Error> { + todo!() + } +} + +#[cfg(test)] +mod test { + use std::{collections::HashMap, io::Cursor}; + + use avro_rs::Codec; + use warc::{EmptyBody, Record, WarcHeader}; + + use crate::{ + identifiers::Identification, + io::writer::WriterTrait, + lang::Lang, + pipelines::oscardoc::types::{Document, Metadata}, + }; + + use super::{DocWriterAvro, SCHEMA}; + + #[test] + fn test_simple() { + // create io buf, get schema + let mut buf = vec![]; + let schema = &SCHEMA; + + // create writer + let mut aw = DocWriterAvro::new(schema, &mut buf, Codec::Null); + + // input docs + let mut documents = vec![]; + + for i in 0..10i32 { + //forge document + let mut content = "foo\nbar\nbaz\nquux".to_string(); + content.push_str(&i.to_string()); + let mut headers = HashMap::new(); + headers.insert(WarcHeader::ContentType, "conversion".as_bytes().to_owned()); + let default_id = Identification::new(Lang::En, 1.0); + let metadata = Metadata::new(&default_id, &vec![Some(default_id.clone()); 3]); + let d = Document::new(content, headers, metadata); + documents.push(d); + } + + // write docs + for doc in &documents { + aw.append_ser(&doc).unwrap(); + } + aw.flush().unwrap(); + + // get from reader + let mut c = Cursor::new(&mut buf); + let r = avro_rs::Reader::new(&mut c).unwrap(); + let mut from_avro = vec![]; + for record in r { + let deserialized: Document = avro_rs::from_value(&record.unwrap()).unwrap(); + from_avro.push(deserialized); + } + + //check equality + assert_eq!(documents, from_avro); + } +} From 14fb3f7b6ef540b12f3278ff0aff7e0aa07278ce Mon Sep 17 00:00:00 2001 From: jabadji Date: Thu, 7 Apr 2022 14:32:54 +0200 Subject: [PATCH 2/2] feat(avro): integrate writer. DO NOT MERGE! --- src/cli.rs | 6 +++++ src/io/langfiles.rs | 28 +++++++++++++++++++- src/io/mod.rs | 1 + src/io/writer/mod.rs | 1 + src/io/writer/writer_doc_avro.rs | 41 ++++++++++++++++++++++++++---- src/main.rs | 2 +- src/pipelines/oscardoc/pipeline.rs | 18 +++++++++---- 7 files changed, 85 insertions(+), 12 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index af7ded8e..d19aecb2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -170,4 +170,10 @@ pub struct Pipeline { help = "Optional path to blocklist." )] pub blocklist: Option, + #[structopt( + long = "format", + help = "corpus output format. ('avro' or 'jsonl')", + default_value = "jsonl" + )] + pub format: String, } diff --git a/src/io/langfiles.rs b/src/io/langfiles.rs index d19f767b..1c0c9548 100644 --- a/src/io/langfiles.rs +++ b/src/io/langfiles.rs @@ -7,6 +7,8 @@ Each language (provided by [crate::lang::LANG]) is given a [self::Writer] wrappe !*/ use std::{ collections::HashMap, + fs::File, + io::Write, path::Path, str::FromStr, sync::{Arc, Mutex}, @@ -16,7 +18,7 @@ use crate::io::writer::Writer; use crate::lang::LANG; use crate::{error, lang::Lang}; -use super::writer::{WriterDoc, WriterTrait}; +use super::writer::{DocWriterAvro, WriterDoc, WriterTrait}; /// Holds references to [Writer]. pub struct LangFiles { writers: HashMap<&'static str, Arc>>, @@ -26,6 +28,30 @@ pub struct LangFilesDoc { writers: HashMap>>, } +pub struct LangFilesAvro<'a> { + writers: HashMap>>>, +} + +impl<'a> LangFilesAvro<'a> { + pub fn new(dst: &Path) -> Result { + let mut writers = HashMap::with_capacity(LANG.len()); + let mut w; + for lang in LANG.iter() { + let mut dst = dst.to_path_buf(); + dst.push(lang); + dst.set_extension("avro"); + w = DocWriterAvro::from_file(&dst)?; + let lang = Lang::from_str(lang)?; + writers.insert(lang, Arc::new(Mutex::new(w))); + } + + Ok(Self { writers }) + } + + pub fn writers(&'a self) -> &HashMap>>> { + &self.writers + } +} impl LangFiles { /// Create a new LangFiles. `part_size_bytes` sets an indication of the maximum size /// by part. diff --git a/src/io/mod.rs b/src/io/mod.rs index d3f654ed..55e06990 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -9,5 +9,6 @@ mod langfiles; pub mod reader; pub mod writer; pub use langfiles::LangFiles; +pub use langfiles::LangFilesAvro; pub use langfiles::LangFilesDoc; pub use writer::Writer; diff --git a/src/io/writer/mod.rs b/src/io/writer/mod.rs index cb6a1804..eb8cb5d2 100644 --- a/src/io/writer/mod.rs +++ b/src/io/writer/mod.rs @@ -19,6 +19,7 @@ use metawriter::MetaWriter; use textwriter::TextWriter; pub use writer::Writer; pub use writer_doc::WriterDoc; +pub(crate) use writer_doc_avro::DocWriterAvro; pub use writertrait::WriterTrait; // pub enum WriterKind { diff --git a/src/io/writer/writer_doc_avro.rs b/src/io/writer/writer_doc_avro.rs index e3c5533b..d870c9a8 100644 --- a/src/io/writer/writer_doc_avro.rs +++ b/src/io/writer/writer_doc_avro.rs @@ -1,8 +1,9 @@ //! Avro version of [writer_doc::DocWriter]. -use std::io::Write; +use std::{fmt::Debug, fs::File, io::Write, path::Path}; use avro_rs::{Codec, Schema, Writer}; +use log::{debug, error}; use serde::Serialize; use structopt::lazy_static::lazy_static; @@ -93,7 +94,7 @@ let document_schema = r#" .clone() }; } -struct DocWriterAvro<'a, T> +pub struct DocWriterAvro<'a, T> where T: Write, { @@ -114,7 +115,16 @@ where } } - pub fn append_ser(&mut self, val: &S) -> Result { + pub fn extend_ser(&mut self, vals: I) -> Result + where + I: IntoIterator, + { + self.writer.extend_ser(vals).map_err(|e| e.into()) + } + pub fn append_ser(&mut self, val: &S) -> Result + where + S: Serialize, + { self.writer.append_ser(val).map_err(|e| e.into()) } @@ -127,6 +137,17 @@ where } } +impl<'a> DocWriterAvro<'a, File> { + pub fn from_file(path: &Path) -> Result { + if path.exists() { + error!("{:?} already exists!", path); + Err(std::io::Error::new(std::io::ErrorKind::AlreadyExists, format!("{path:?}")).into()) + } else { + let fh = File::create(path)?; + Ok(DocWriterAvro::new(&SCHEMA, fh, Codec::Snappy)) + } + } +} impl<'a, T> WriterTrait for DocWriterAvro<'a, T> where T: Write, @@ -145,7 +166,8 @@ where } fn write(&mut self, vals: Vec) -> Result<(), crate::error::Error> { - todo!() + self.extend_ser(&vals)?; + Ok(()) } fn write_single(&mut self, val: &Self::Item) -> Result<(), crate::error::Error> { @@ -191,8 +213,16 @@ mod test { content.push_str(&i.to_string()); let mut headers = HashMap::new(); headers.insert(WarcHeader::ContentType, "conversion".as_bytes().to_owned()); + headers.insert( + WarcHeader::Unknown("warc-identified-language".to_string()), + "fr".as_bytes().to_owned(), + ); let default_id = Identification::new(Lang::En, 1.0); - let metadata = Metadata::new(&default_id, &vec![Some(default_id.clone()); 3]); + let mut metadata = Metadata::new( + &default_id, + &vec![Some(default_id.clone()), Some(default_id.clone()), None], + ); + metadata.set_annotation("adult".to_string()); let d = Document::new(content, headers, metadata); documents.push(d); } @@ -212,6 +242,7 @@ mod test { from_avro.push(deserialized); } + println!("{from_avro:#?}"); //check equality assert_eq!(documents, from_avro); } diff --git a/src/main.rs b/src/main.rs index a363d713..ece8eaa4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -62,7 +62,7 @@ async fn main() -> Result<(), error::Error> { cli::Ungoliant::Pipeline(p) => { let mut schema_filepath = p.dst.clone(); // let p = pipeline::OscarMetadata::new(p.src, p.dst, p.lid_path); - let p = pipelines::OscarDoc::new(p.src, p.dst, p.lid_path, p.blocklist); + let p = pipelines::OscarDoc::new(p.src, p.dst, p.lid_path, p.blocklist, p.format); p.run()?; schema_filepath.push("metadata_schema.json"); diff --git a/src/pipelines/oscardoc/pipeline.rs b/src/pipelines/oscardoc/pipeline.rs index ec9b0800..3d12e4f3 100644 --- a/src/pipelines/oscardoc/pipeline.rs +++ b/src/pipelines/oscardoc/pipeline.rs @@ -41,7 +41,7 @@ use ut1_blocklist::Blocklist; use warc::BufferedBody; use warc::{Record, WarcHeader}; -use crate::io::LangFilesDoc; +use crate::io::{LangFilesAvro, LangFilesDoc}; const DOC_THRESHOLD: f32 = 0.6f32; pub struct OscarDoc { @@ -52,7 +52,13 @@ pub struct OscarDoc { } impl OscarDoc { - pub fn new(src: PathBuf, dst: PathBuf, lid_path: PathBuf, blocklist: Option) -> Self { + pub fn new( + src: PathBuf, + dst: PathBuf, + lid_path: PathBuf, + blocklist: Option, + format: String, + ) -> Self { if blocklist.is_none() { warn!("No blocklist folder specified! No adult content tagging will be done."); } @@ -308,7 +314,7 @@ impl OscarDoc { /// concurrently write documets fn write_documents<'a>( - langfiles: &LangFilesDoc, + langfiles: &'a LangFilesAvro<'a>, avrowriters: &'a RebuildWriters<'a, File>, shard_id: usize, documents: HashMap>, @@ -333,10 +339,11 @@ impl OscarDoc { let sr = ShardResult::new(shard_id as i64, locations, metadata_cloned); // write docs and rebuild files - writer_lock.write(docs)?; + writer_lock.extend_ser(docs)?; avrowriter_lock.append_ser(sr)?; //TODO: not sure that we need the flush + writer_lock.flush()?; avrowriter_lock.flush()?; Ok(()) @@ -385,7 +392,8 @@ impl Pipeline<()> for OscarDoc { // ourselves. let results = results.enumerate().par_bridge(); - let langfiles = LangFilesDoc::new(&self.dst, None)?; + // let langfiles = LangFilesDoc::new(&self.dst, None)?; + let langfiles = LangFilesAvro::new(&self.dst)?; let mut dst_rebuild = self.dst.clone(); dst_rebuild.push("rebuild");