Skip to content

Commit

Permalink
Hyparquet table provider
Browse files Browse the repository at this point in the history
  • Loading branch information
platypii committed Jun 6, 2024
1 parent cdc43b9 commit 5916823
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ package-lock.json
node_modules
*.tgz
.vscode
*.parquet
7 changes: 7 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
The MIT License (MIT)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"eslint-plugin-import": "2.29.1",
"eslint-plugin-jsdoc": "48.2.7",
"hightable": "0.2.1",
"hyparquet": "0.9.8",
"react": "18.3.1",
"react-dom": "18.3.1",
"rollup": "4.18.0",
Expand Down
4 changes: 2 additions & 2 deletions public/bundle.min.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion public/bundle.min.js.map

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion public/styles.css
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ main {
margin: 0;
}
.nav a {
color: #ddd;
color: #fff;
cursor: pointer;
display: block;
opacity: 0.85;
padding: 10px 12px;
text-decoration: none;
user-select: none;
Expand Down
27 changes: 10 additions & 17 deletions src/File.tsx
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import HighTable, { DataFrame } from 'hightable'
import React, { useState } from 'react'
import React, { useEffect, useState } from 'react'
import type { FileContent } from './files.js'
import Layout, { Spinner } from './Layout.js'
import { parquetDataFrame } from './tableProvider.js'

/**
* File viewer page
*/
export default function File() {
const [error, setError] = useState<Error>()
const [dataframe, setDataframe] = useState<DataFrame>()

// File path from url
const path = location.pathname.split('/')
Expand All @@ -17,21 +19,12 @@ export default function File() {
const [loading, setLoading] = useState(false)
const [content, setContent] = useState<FileContent<void>>()

const header = ['ID', 'Name', 'Age', 'UUID', 'JSON']
const dataframe: DataFrame = {
header,
numRows: 10000,
async rows(start: number, end: number) {
const arr = []
for (let i = start; i < end; i++) {
const uuid = Math.random().toString(16).substring(2)
const row = [i + 1, 'Name' + i, 20 + i, uuid]
const object = Object.fromEntries(header.map((key, index) => [key, row[index]]))
arr.push([...row, object])
}
return arr
},
}
useEffect(() => {
parquetDataFrame('/api/store/get?key=' + prefix)
.then(setDataframe)
.catch(setError)
.finally(() => setLoading(false))
}, [])

return (
<Layout error={error} title={prefix}>
Expand All @@ -44,7 +37,7 @@ export default function File() {
</div>
</nav>

<HighTable data={dataframe} />
{dataframe && <HighTable data={dataframe} />}

{loading && <Spinner className='center' />}
</Layout>
Expand Down
96 changes: 79 additions & 17 deletions src/serve.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
*/

import { exec } from 'child_process'
import { createReadStream } from 'fs'
import fs from 'fs/promises'
import http from 'http'
import path from 'path'
import url from 'url'
import zlib from 'zlib'

const serveDirectory = 'public'
import { pipe, readStreamToReadableStream } from './streamConverters.js'

/** @type {Object<string, string>} */
const mimeTypes = {
Expand All @@ -19,8 +19,9 @@ const mimeTypes = {
'.json': 'application/json',
'.map': 'application/json',
'.ico': 'image/x-icon',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.parquet': 'application/x-parquet',
'.png': 'image/png',
'.svg': 'image/svg+xml',
'.txt': 'text/plain',
'.ttf': 'font/ttf',
Expand All @@ -34,7 +35,8 @@ const mimeTypes = {

/**
* Route an http request
* @typedef {{ status: number, content: string | Buffer, contentType?: string }} ServeResult
* @typedef {Object} ReadableStream
* @typedef {{ status: number, content: string | Buffer | ReadableStream, contentLength?: number, contentType?: string }} ServeResult
* @param {http.IncomingMessage} req
* @returns {Awaitable<ServeResult>}
*/
Expand All @@ -48,15 +50,24 @@ function handleRequest(req) {
return { status: 301, content: '/files/' }
} else if (pathname.startsWith('/files/')) {
// serve index.html
return handleStatic('/index.html')
return handleStatic('/public/index.html')
} else if (pathname.startsWith('/public/')) {
// serve static files
return handleStatic(pathname.slice(7))
return handleStatic(pathname)
} else if (pathname === '/api/store/list') {
// serve file list
const prefix = parsedUrl.query.prefix || ''
if (Array.isArray(prefix)) return { status: 400, content: 'bad request' }
return handleListing(prefix)
} else if (pathname === '/api/store/get') {
// serve file content
const key = parsedUrl.query.key || ''
if (Array.isArray(key)) return { status: 400, content: 'bad request' }
if (req.method === 'HEAD') {
return handleHead(key)
}
const range = req.method === 'HEAD' ? '0-0' : req.headers.range
return handleStatic(key, range)
} else {
return { status: 404, content: 'not found' }
}
Expand All @@ -65,26 +76,67 @@ function handleRequest(req) {
/**
* Serve static file from the serve directory
* @param {string} pathname
* @param {string} [range]
* @returns {Promise<ServeResult>}
*/
async function handleStatic(pathname) {
const filePath = path.join(process.cwd(), serveDirectory, pathname)
async function handleStatic(pathname, range) {
const filePath = path.join(process.cwd(), pathname)
const stats = await fs.stat(filePath).catch(() => undefined)
if (!stats || !stats.isFile()) {
return { status: 404, content: 'not found' }
}

// detect content type
const extname = path.extname(filePath)
if (!mimeTypes[extname]) {
console.error(`serving unknown mimetype ${extname}`)
}
if (!mimeTypes[extname]) console.error(`serving unknown mimetype ${extname}`)
const contentType = mimeTypes[extname] || 'application/octet-stream'

// ranged requests
if (range) {
const [unit, ranges] = range.split('=')
if (unit === 'bytes') {
const [start, end] = ranges.split('-').map(Number)

// convert fs.ReadStream to web stream
const fsStream = createReadStream(filePath, { start, end })
const content = readStreamToReadableStream(fsStream)
const contentLength = end - start + 1

return {
status: 206,
content,
contentLength,
contentType,
}
}
}

const content = await fs.readFile(filePath)
return { status: 200, content, contentType }
}

/**
* Serve head request
* @param {string} pathname
* @returns {Promise<ServeResult>}
*/
async function handleHead(pathname) {
const filePath = path.join(process.cwd(), pathname)
const stats = await fs.stat(filePath).catch(() => undefined)
if (!stats || !stats.isFile()) {
console.error(`file not found ${filePath}`)
return { status: 404, content: 'not found' }
}
const contentLength = stats.size

// detect content type
const extname = path.extname(filePath)
if (!mimeTypes[extname]) console.error(`serving unknown mimetype ${extname}`)
const contentType = mimeTypes[extname] || 'application/octet-stream'

return { status: 200, content: '', contentLength, contentType }
}

/**
* List files from local storage
*
Expand Down Expand Up @@ -140,12 +192,15 @@ export function serve(port = 2048) {
} catch (err) {
console.error('error handling request', err)
}
let { status, content, contentType } = result
let { status, content } = result

// write http header
/** @type {http.OutgoingHttpHeaders} */
const headers = { 'Connection': 'keep-alive' }
if (contentType) headers['Content-Type'] = contentType
if (result.contentLength !== undefined) {
headers['Content-Length'] = result.contentLength
}
if (result.contentType) headers['Content-Type'] = result.contentType
if (status === 301 && typeof content === 'string') {
// handle redirect
headers['Location'] = content
Expand All @@ -158,13 +213,20 @@ export function serve(port = 2048) {
content = gzipped
}
res.writeHead(status, headers)

// write http response
res.end(content)
if (content instanceof Buffer || typeof content === 'string') {
res.end(content)
} else if (content instanceof ReadableStream) {
pipe(content, res)
}

// log request
const endTime = new Date()
const ms = endTime.getTime() - startTime.getTime()
const line = `${endTime.toISOString()} ${status} ${req.method} ${req.url} ${content.length} ${ms}ms`
// @ts-expect-error contentLength will exist if content is ReadableStream
const length = result.contentLength || content.length || 0
const line = `${endTime.toISOString()} ${status} ${req.method} ${req.url} ${length} ${ms}ms`
if (status < 400) {
console.log(line)
} else {
Expand All @@ -180,11 +242,11 @@ export function serve(port = 2048) {
/**
* If the request accepts gzip, compress the content, else undefined
* @param {http.IncomingMessage} req
* @param {string | Buffer} content
* @param {string | Buffer | ReadableStream} content
* @returns {Buffer | undefined}
*/
function gzip(req, content) {
if (!content) return undefined
if (!(content instanceof Buffer) || !(typeof content === 'string')) return undefined
const acceptEncoding = req.headers['accept-encoding']
if (acceptEncoding?.includes('gzip')) {
return zlib.gzipSync(content)
Expand Down
53 changes: 53 additions & 0 deletions src/streamConverters.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Pipe a web ReadableStream to a node Writable.
* @typedef {import('stream').Writable} Writable
* @param {ReadableStream} input
* @param {Writable} output
* @returns {Promise<void>}
*/
export async function pipe(input, output) {
// TODO: typescript hates for-await? should just be:
// for await (const chunk of input) {}
const reader = input.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
output.write(value)
}
output.end()
}

/**
* Convert a node fs ReadStream to a web ReadableStream.
* @typedef {import('fs').ReadStream} ReadStream
* @param {ReadStream} fsStream
* @returns {ReadableStream}
*/
export function readStreamToReadableStream(fsStream) {
return new ReadableStream({
start(/** @type {ReadableStreamDefaultController} */ controller) {
fsStream.on('data', (chunk) => controller.enqueue(chunk))
fsStream.on('end', () => controller.close())
fsStream.on('error', (error) => controller.error(error))
},
cancel() {
fsStream.destroy()
},
})
}

/**
* Convert a web ReadableStream to ArrayBuffer.
* @param {ReadableStream} input
* @returns {Promise<ArrayBuffer>}
*/
export async function readableStreamToArrayBuffer(input) {
const reader = input.getReader()
const /** @type {Uint8Array[]} */ chunks = []
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(value)
}
return new Blob(chunks).arrayBuffer()
}
Loading

0 comments on commit 5916823

Please sign in to comment.