Skip to content

Commit

Permalink
Use LokiJS for tapes (#8)
Browse files Browse the repository at this point in the history
* Add LokiJS base recorder

* Recorder with migrator tests

* Run migrator when possible

* Fix issue

* Fix end call bug
  • Loading branch information
yknx4 authored Mar 5, 2019
1 parent 16017ef commit fb86583
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 28 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@
},
"license": "MIT",
"dependencies": {
"@types/lokijs": "^1.5.2",
"@types/mkdirp": "^0.5.2",
"content-type": "^1.0.4",
"debug": "^4.1.1",
"fast-json-stable-stringify": "^2.0.0",
"incoming-message-hash": "^3.2.2",
"invariant": "^2.2.4",
"lodash.map": "^4.6.0",
"lokijs": "^1.5.6",
"mkdirp": "^0.5.1",
"restify-errors": "^6.1.1",
"xxhashjs": "^0.2.2"
Expand Down
155 changes: 155 additions & 0 deletions src/Recorder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { YakTimeOpts, ensureRecordingIsAllowed, ensureIsValidStatusCode } from './util'
import Loki from 'lokijs'
import { getDB } from './db'
import Debug from 'debug'
import { IncomingMessage, ServerResponse, IncomingHttpHeaders } from 'http'
import { URL } from 'url'
import { h64 } from 'xxhashjs'
import { parse } from 'querystring'
import { buffer } from './buffer'
import { proxy } from './proxy'
import * as curl from './curl'
import { isMatch } from 'lodash'

const debug = Debug('yaktime:recorder')

type Unpacked<T> = T extends (infer U)[] ? U : T extends (...args: any[]) => infer U ? U : T extends Promise<infer U> ? U : T

type SerializedRequest = ReturnType<Recorder['serializeRequest']>
type SerializedResponse = Unpacked<ReturnType<Recorder['serializeResponse']>>
interface FullSerializedRequest extends SerializedRequest {
response: SerializedResponse
}

export class Recorder {
opts: YakTimeOpts
host: string
db: Promise<Loki>
constructor (opts: YakTimeOpts, host: string) {
this.opts = opts
this.host = host
this.db = getDB(opts)
}

serializeRequest (req: IncomingMessage, body: any[]) {
const fullUrl = new URL(req.url as string, this.host)
const { method = '', httpVersion, headers, trailers } = req

const bodyBuffer = Buffer.from(body)
const bodyEncoded = bodyBuffer.toString('base64')
const bodyHash = h64(bodyBuffer, 0).toString(16)

return {
host: fullUrl.host,
path: fullUrl.pathname,
body: bodyEncoded,
bodyHash,
method,
httpVersion,
headers,
trailers,
query: parse(fullUrl.searchParams.toString())
}
}

async serializeResponse (res: IncomingMessage) {
const statusCode = res.statusCode || 200
const headers = res.headers
const body = Buffer.from(await buffer<Buffer>(res)).toString('base64')
const trailers = res.trailers

return {
statusCode,
headers,
body,
trailers
}
}

async respond (storedRes: SerializedResponse, res: ServerResponse) {
res.statusCode = storedRes.statusCode
res.writeHead(storedRes.statusCode, storedRes.headers)
res.addTrailers(storedRes.trailers || {})
console.log(storedRes.body)
res.end(Buffer.from(storedRes.body, 'base64'))
}

async record (req: IncomingMessage, body: Buffer[], host: string, opts: YakTimeOpts) {
ensureRecordingIsAllowed(req, opts)
debug('proxy', req.url)
const pres = await proxy(req, body, host)
debug(curl.response(pres))
ensureIsValidStatusCode(pres, opts)
debug('record', req.url)
const request = this.serializeRequest(req, body)
const response = await this.serializeResponse(pres)
return this.save(request, response)
}

async save (request: SerializedRequest, response: SerializedResponse) {
const db = await this.db
const tapes = db.addCollection<FullSerializedRequest>('tapes', { disableMeta: true })
return tapes.add({ ...request, response })
}

async read (req: IncomingMessage, body: Buffer[]) {
const serializedRequest = this.serializeRequest(req, body)
return this.load(serializedRequest)
}

async load (request: SerializedRequest): Promise<FullSerializedRequest | null> {
const { ignoredQueryFields = [], ignoredHeaders = [] } = this.opts.hasherOptions || {}
const db = await this.db
const tapes = db.addCollection<FullSerializedRequest>('tapes', { disableMeta: true })

const { query: _query, headers: _headers } = request

const query = {
..._query
}
const headers = {
..._headers
}

ignoredQueryFields.forEach(q => delete query[q])
ignoredHeaders.forEach(h => delete headers[h])

const lokiQuery = {
...request,
query,
headers
}

delete query.body

return tapes.where(obj => isMatch(obj, lokiQuery))[0]
}
}

export class DbMigrator {
data: Buffer[] = []
headers: IncomingHttpHeaders = {}
statusCode = 200
setHeader (name: string, value: string) {
this.headers[name] = value
}
write (input: Buffer | string) {
this.data.push(Buffer.isBuffer(input) ? input : Buffer.from(input))
}

end (data?: any) {
if (data != null) {
this.write(data)
}
debug('finished migration')
}

toSerializedResponse (): SerializedResponse {
return {
statusCode: this.statusCode,
headers: this.headers,
body: Buffer.concat(this.data).toString('base64'),
trailers: {}
}
}
}
15 changes: 15 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import Loki from 'lokijs'
import { YakTimeOpts } from './util'
import path from 'path'
import Debug from 'debug'

const debug = Debug('yaktime:db')
let db: Loki
export async function getDB (opts: YakTimeOpts) {
if (db == null) {
const dbPath = path.join(opts.dirname, 'tapes.json')
debug(`Opening db on ${dbPath}`)
db = new Loki(dbPath, { autoload: true, autosave: process.env.NODE_ENV !== 'test' })
}
return db
}
2 changes: 1 addition & 1 deletion src/proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const debug = Debug('yaktime:proxy')

const mods = { 'http:': http, 'https:': https }

interface YakTimeIncomingMessage extends http.IncomingMessage {
export interface YakTimeIncomingMessage extends http.IncomingMessage {
req: http.ClientRequest
}

Expand Down
2 changes: 1 addition & 1 deletion src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ModuleNotFoundError } from './errors'
import { proxy } from './proxy'
import * as curl from './curl'

const debug = Debug('yaktime:record')
const debug = Debug('yaktime:legacy-record')

/**
* Write `data` to `filename`.
Expand Down
36 changes: 32 additions & 4 deletions src/tapeMigrator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import * as path from 'path'
import { createServer, TestServer } from './test/helpers/server'
import { createTmpdir, Dir } from './test/helpers/tmpdir'
import { AddressInfo } from 'net'
import { tapename, RequestHasher } from './util'
import { tapename, RequestHasher, YakTimeOpts } from './util'
import { requestHasher } from './requestHasher'
import { tapeMigrator } from './tapeMigrator'
import { fileTapeMigrator, dbTapeMigrator } from './tapeMigrator'
import { yaktime } from './yaktime'
import { notifyNotUsedTapes } from './tracker'
import { Recorder } from './Recorder'

const incMessH = require('incoming-message-hash')
const messageHash: RequestHasher = incMessH.sync
Expand Down Expand Up @@ -57,19 +58,46 @@ describe('record', () => {
test('copies the file with the new name', done => {
expect.hasAssertions()

req.once('response', async function() {
req.once('response', async function () {
const serverReq = proxyServer.requests[0]
const newHash = requestHasher({})
const oldFileName = path.join(tmpdir.dirname, tapename(messageHash, serverReq))
const newFileName = path.join(tmpdir.dirname, tapename(newHash, serverReq))
expect(fs.existsSync(oldFileName)).toEqual(true)
expect(fs.existsSync(newFileName)).toEqual(false)
await tapeMigrator(newHash, { dirname: tmpdir.dirname })(serverReq)
await fileTapeMigrator(newHash, { dirname: tmpdir.dirname })(serverReq)
expect(fs.existsSync(oldFileName)).toEqual(true)
expect(fs.existsSync(newFileName)).toEqual(true)
done()
})

req.end()
})

test('add existing request to the db', done => {
expect.hasAssertions()

req.once('response', async function () {
const migrator = new Recorder({ dirname: tmpdir.dirname, useDb: true } as YakTimeOpts, `http://localhost:${serverInfo.port}`)
const serverReq = proxyServer.requests[0]
const oldFileName = path.join(tmpdir.dirname, tapename(messageHash, serverReq))
expect(fs.existsSync(oldFileName)).toEqual(true)
expect(await migrator.read(serverReq, [])).toBeUndefined()
await dbTapeMigrator(`http://localhost:${serverInfo.port}`, { dirname: tmpdir.dirname, useDb: true })(serverReq)
expect(fs.existsSync(oldFileName)).toEqual(true)
expect(await migrator.read(serverReq, [])).toEqual(
expect.objectContaining({
method: 'GET',
path: '/',
response: expect.objectContaining({
statusCode: 201,
body: 'T0s='
})
})
)
done()
})

req.end()
})
})
30 changes: 26 additions & 4 deletions src/tapeMigrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ import Debug from 'debug'
import { IncomingMessage } from 'http'
import { existsSync } from 'fs'
import { requestHasher } from './requestHasher'
import { Recorder, DbMigrator } from './Recorder'

const debug = Debug('yaktime:tape-migrator')

const incMessH = require('incoming-message-hash')
const oldHasher: RequestHasher = incMessH.sync

type tapeMigratorOptions = 'dirname' | 'oldHash'
export const tapeMigrator = (newHasher: RequestHasher, opts: Pick<YakTimeOpts, tapeMigratorOptions>) =>
export const fileTapeMigrator = (newHasher: RequestHasher, opts: Pick<YakTimeOpts, tapeMigratorOptions>) =>
async function tapeMigrator (req: IncomingMessage, body: Buffer[] = []) {
const oldFileName = path.join(opts.dirname, tapename(opts.oldHash || oldHasher, req, body))
const newFileName = path.join(opts.dirname, tapename(newHasher, req, body))
const oldExists = existsSync(oldFileName)

if (oldExists) {
debug('migrating')
debug('migrating to file')
debug('old filename', oldFileName)
const newExists = existsSync(newFileName)
if (newExists) {
Expand All @@ -31,8 +32,28 @@ export const tapeMigrator = (newHasher: RequestHasher, opts: Pick<YakTimeOpts, t
}
}

type migrateIfRequiredOptions = 'hash' | 'migrate' | 'hasherOptions'
export const dbTapeMigrator = (host: string, opts: YakTimeOpts) =>
async function tapeMigrator (req: IncomingMessage, body: Buffer[] = []) {
const recorder = new Recorder(opts, host)
const oldFileName = path.join(opts.dirname, tapename(opts.oldHash || oldHasher, req, body))
const oldExists = existsSync(oldFileName)

if (oldExists) {
debug('migrating to db')
debug('filename', oldFileName)

const migrator = new DbMigrator()
require(oldFileName)(null, migrator)
const request = recorder.serializeRequest(req, body)
const response = migrator.toSerializedResponse()
debug('saving to db')
await recorder.save(request, response)
}
}

type migrateIfRequiredOptions = 'hash' | 'migrate' | 'hasherOptions' | 'useDb'
export async function migrateIfRequired (
host: string,
opts: Pick<YakTimeOpts, migrateIfRequiredOptions | tapeMigratorOptions>,
req: IncomingMessage,
body: Buffer[] = []
Expand All @@ -41,5 +62,6 @@ export async function migrateIfRequired (
return
}
const newHasher = requestHasher(opts.hasherOptions)
await tapeMigrator(newHasher, opts)(req, body)
await dbTapeMigrator(host, opts)(req, body)
await fileTapeMigrator(newHasher, opts)(req, body)
}
5 changes: 5 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ export interface YakTimeOpts {
* this are only used when `opts.hash` is null
*/
hasherOptions?: RequestHasherOptions
/**
* Whether to use a built-in database instead of JS files.
* To avoid issues with hashers
*/
useDb?: boolean
}

export interface YakTimeServer {
Expand Down
Loading

0 comments on commit fb86583

Please sign in to comment.