feat: Version 1.0.0
marcbachmann committed Dec 3, 2020 · commit 3488c01 (0 parents)
Showing 12 changed files with 980 additions and 0 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -0,0 +1 @@
node_modules
1 change: 1 addition & 0 deletions .nvmrc
@@ -0,0 +1 @@
15
13 changes: 13 additions & 0 deletions .vscode/tasks.json
@@ -0,0 +1,13 @@
{
  // See https://go.microsoft.com/fwlink/?LinkId=733558
  // for the documentation about the tasks.json format
  "version": "2.0.0",
  "tasks": [
    {
      "label": "Run Script Using Node",
      "type": "process",
      "command": "node",
      "args": ["${file}"]
    }
  ]
}
6 changes: 6 additions & 0 deletions Dockerfile
@@ -0,0 +1,6 @@
FROM livingdocs/node:15
ADD package*.json /app/
ENV NODE_ENV=production
RUN npm ci
ADD ./ /app
CMD ["node", "index.js"]
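
A local build sketch (the tag mirrors the image referenced in the README; adjust as needed):

```bash
docker build -t livingdocs/loki-log-export:1.0.0 .
```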
37 changes: 37 additions & 0 deletions README.md
@@ -0,0 +1,37 @@
# Loki log export to S3 buckets

```bash
export AWS_REGION=eu-central-1
export AWS_BUCKET=your-log-bucket
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export LOKI_HOST=http://localhost:3100

export EXTRACTORS='[{
  "prefix": "some-service/",
  "query": "{service=\"some-service\"}",
  "transform": "json"
},
{
  "prefix": "another-service/",
  "query": "{service=\"another-service\"}",
  "transform": "json"
}]'

docker run --name loki-log-export \
  -e AWS_REGION -e AWS_BUCKET -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \
  -e LOKI_HOST -e EXTRACTORS \
  livingdocs/loki-log-export:1.0.0
```

The exporter runs every hour at five past and uploads gzipped log files to S3 (one object per hour, matching the key format produced by `exporter.js`):
```
some-service/2020-12-01/00.log.gz
some-service/2020-12-01/01.log.gz
...
some-service/2020-12-01/23.log.gz
some-service/2020-12-02/00.log.gz
another-service/2020-12-01/00.log.gz
another-service/2020-12-01/01.log.gz
...
another-service/2020-12-01/23.log.gz
another-service/2020-12-02/00.log.gz
```
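
`index.js` also accepts a `--once` flag that runs each extractor a single time instead of scheduling the hourly cron job. A sketch of a one-off run, assuming the same environment variables as above:

```bash
docker run --rm \
  -e AWS_REGION -e AWS_BUCKET -e AWS_ACCESS_KEY_ID -e AWS_SECRET_ACCESS_KEY \
  -e LOKI_HOST -e EXTRACTORS \
  livingdocs/loki-log-export:1.0.0 node index.js --once
```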
155 changes: 155 additions & 0 deletions exporter.js
@@ -0,0 +1,155 @@
const assert = require('assert')
const {promises: {pipeline}, Transform} = require('stream')
const CronJob = require('cron').CronJob
const zlib = require('zlib')
const getLogs = require('./get-logs')
const S3WriteStream = require('s3-streams').WriteStream
const S3 = require('aws-sdk').S3

module.exports = function createExporter (opts) {
  const pino = require('pino')({base: null})

  const {awsBucket, awsRegion, awsAccessKeyId, awsSecretAccessKey, lokiHost} = opts
  assert(awsBucket, `The parameter 'opts.awsBucket' is required.`)
  assert(awsRegion, `The parameter 'opts.awsRegion' is required.`)
  assert(awsAccessKeyId, `The parameter 'opts.awsAccessKeyId' is required.`)
  assert(awsSecretAccessKey, `The parameter 'opts.awsSecretAccessKey' is required.`)
  assert(lokiHost, `The parameter 'opts.lokiHost' is required.`)

  const s3 = new S3({
    region: awsRegion,
    accessKeyId: awsAccessKeyId,
    secretAccessKey: awsSecretAccessKey
  })

  const createS3WriteStream = (key) => new S3WriteStream(s3, {Bucket: awsBucket, Key: key})

  // Maps an hour of `date` to its S3 object key and the Loki query window
  function toDateKey (prefix, date) {
    return function (hour) {
      const y = date.getFullYear()
      const m = `${date.getMonth() + 1}`.padStart(2, '0')
      const d = `${date.getDate()}`.padStart(2, '0')
      const h = `${hour}`.padStart(2, '0')
      const start = new Date(date)
      start.setHours(hour, 0, 0, 0)

      const end = new Date(date)
      end.setHours(hour + 1, 0, 0, 0)

      return {
        key: `${y}-${m}-${d}/${h}.log.gz`.replace(/^\/?/, prefix || ''),
        start,
        end
      }
    }
  }

  const hoursOfDay = Object.freeze([
    0, 1, 2, 3, 4, 5, 6, 7, 8,
    9, 10, 11, 12, 13, 14, 15, 16,
    17, 18, 19, 20, 21, 22, 23
  ])

  // Returns an array of objects with the shape
  // {start: Date, end: Date, key: 'prefix/2020-11-01/00.log.gz'}
  // for every hour of today and the preloaded past days
  // that has no corresponding object in the bucket yet
  async function getHoursToProcess (prefix) {
    const days = []
    const today = new Date()
    days.push(hoursOfDay.slice(0, today.getHours()).map(toDateKey(prefix, today)))

    // Also check the previous day (increase the bound to backfill more days)
    for (let i = 1; i < 2; i++) {
      const pastDay = new Date()
      pastDay.setDate(today.getDate() - i)
      pastDay.setHours(0, 0, 0, 0)
      days.push(hoursOfDay.map(toDateKey(prefix, pastDay)))
    }

    const keys = []
    for (const day of days) {
      if (!day.length) continue
      const {Contents} = await s3.listObjectsV2({
        Bucket: awsBucket,
        Prefix: day[0].key.replace(/..\.log\.gz$/, '')
      }).promise()
      keys.push(...Contents.map(({Key}) => Key))
    }

    const existing = new Set(keys)
    const toProcess = []
    for (const day of days) {
      for (const hour of day) {
        if (!existing.has(hour.key)) toProcess.push(hour)
      }
    }

    return toProcess
  }

  async function start (extractor) {
    assert(extractor.query, `The parameter 'extractor.query' is required.`)
    assert(extractor.transform, `The parameter 'extractor.transform' is required.`)
    if (extractor.transform === 'json') {
      extractor.transform = jsonTransform
    } else if (typeof extractor.transform === 'string') {
      // Any other string is evaluated as a user-provided transform function
      extractor.transform = (new Function(`return ${extractor.transform}`))()
    }

    const hours = await getHoursToProcess(extractor.prefix)
    for (const hour of hours) {
      const now = Date.now()
      pino.info(`Processing logs for ${hour.key}`)
      await pipeline(
        getLogs({
          log: pino,
          baseURL: lokiHost,
          query: extractor.query,
          start: hour.start,
          end: hour.end
        }),
        logsToText(extractor),
        zlib.createGzip(),
        createS3WriteStream(hour.key)
      )
      pino.info(`Persisted logs for ${hour.key}. Took ${Date.now() - now}ms.`)
    }
  }

  function startCron (extractor) {
    const job = new CronJob('05 * * * *', () => start(extractor))
    return job.start()
  }

  function jsonTransform ({value}) {
    // Strip invalid `\x..` escapes (e.g. from varnish user agents) that break JSON
    value = value.replace(/\\x[a-f0-9]{2}/g, '')
    // Throws on invalid JSON so logsToText logs and skips the line
    JSON.parse(value)
    return value
  }

  function logsToText ({transform}) {
    return new Transform({
      objectMode: true,
      transform (lines, _, done) {
        let str = ''
        for (const line of lines) {
          try {
            const log = transform(line)
            if (log) str += `${log}\n`
          } catch (err) {
            pino.info({err}, `Parsing of line failed: ${line.value}`)
          }
        }
        done(null, str)
      }
    })
  }

  return {
    log: pino,
    start,
    startCron
  }
}
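
For reference, a custom `transform` can be supplied in `EXTRACTORS` as a stringified function; `start` evaluates it with `new Function` as shown above. A minimal sketch (the extractor values are made up for illustration):

```js
// Hypothetical extractor: keep only lines containing "error"
const extractor = {
  prefix: 'some-service/',
  query: '{service="some-service"}',
  transform: '(line) => line.value.includes("error") && line.value'
}

// This mirrors what start() does with a non-"json" string transform
const fn = (new Function(`return ${extractor.transform}`))()
console.log(fn({value: 'an error occurred'})) // 'an error occurred'
console.log(fn({value: 'all good'}))          // false -> logsToText skips the line
```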
53 changes: 53 additions & 0 deletions get-logs.js
@@ -0,0 +1,53 @@
const mergeStreams = require('./merge-logs')
const delay = require('util').promisify(setTimeout)

module.exports = function follow (opts) {
  const loki = require('axios').create({baseURL: opts.baseURL || 'http://localhost:3100'})
  const afterMs = opts.start.getTime ? opts.start.getTime() : Date.parse(opts.start)
  const endMs = opts.end.getTime ? opts.end.getTime() : Date.parse(opts.end)
  const query = opts.query || '{service="bluewin-test/varnish"}'

  return {
    [Symbol.asyncIterator] () {
      // Loki expects nanosecond timestamps
      let after = `${afterMs}000000`
      const end = `${endMs}000000`
      return {
        async next () {
          let tries = 3
          while (tries--) {
            try {
              opts.log.debug(`Fetch after ${after}, end ${end}`)
              const res = await loki({
                url: '/loki/api/v1/query_range',
                params: {
                  limit: 5000,
                  direction: 'forward',
                  query,
                  start: after,
                  end
                }
              })

              if (res.data.status !== 'success') {
                throw new Error(`Invalid Loki Result: ${JSON.stringify(res.data)}`)
              }

              const logs = mergeStreams(res.data.data.result)

              // Drop the first entry if the previous page already returned it
              if (logs && logs[0] && logs[0].ts === after) logs.shift()
              if (!logs.length) return {done: true}

              after = logs[logs.length - 1].ts

              return {done: false, value: logs}
            } catch (err) {
              opts.log.error({err}, 'Failed to fetch logs from loki')
            }
            await delay(1000)
          }
          throw new Error(`Failed to fetch logs after 3 retries.`)
        }
      }
    }
  }
}
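
A minimal consumption sketch (the query, dates, and Loki host are illustrative); `exporter.js` feeds the same iterable straight into its stream pipeline:

```js
const getLogs = require('./get-logs')
const log = require('pino')({base: null})

async function main () {
  // Iterate one hour of logs from a Loki instance on localhost:3100
  const hour = getLogs({
    log,
    baseURL: 'http://localhost:3100',
    query: '{service="some-service"}',
    start: new Date('2020-12-01T00:00:00Z'),
    end: new Date('2020-12-01T01:00:00Z')
  })

  // Each iteration yields a batch of up to 5000 merged {ts, value, stream} entries
  for await (const logs of hour) {
    for (const entry of logs) process.stdout.write(`${entry.value}\n`)
  }
}

main().catch((err) => log.error({err}))
```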
32 changes: 32 additions & 0 deletions index.js
@@ -0,0 +1,32 @@
const extractors = JSON.parse(process.env.EXTRACTORS || '[]')
const exporter = require('./exporter')({
  awsBucket: process.env.AWS_BUCKET,
  awsRegion: process.env.AWS_REGION,
  awsAccessKeyId: process.env.AWS_ACCESS_KEY_ID,
  awsSecretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  lokiHost: process.env.LOKI_HOST || 'http://localhost:3100'
})

for (const extractor of extractors) {
  exporter.log.warn(`Processor started for ${extractor.prefix}: ${extractor.query}`)
  if (process.argv.includes('--once')) {
    exporter.start(extractor)
  } else {
    exporter.startCron(extractor)
  }
}

const prexit = require('prexit')
prexit.signals.push('uncaughtException', 'unhandledRejection')
prexit.logExceptions = false

prexit(async (signal, error) => {
  const uptime = Math.round(process.uptime() * 100) / 100
  if ([0, 'SIGTERM', 'SIGINT'].includes(signal)) {
    if (signal === 0) exporter.log.warn(`Shutting down after running for ${uptime}s`)
    else exporter.log.warn(`Signal ${signal} received. Shutting down after running for ${uptime}s`)
  } else {
    const err = signal instanceof Error ? signal : error
    exporter.log.fatal({err}, `Processing error. Shutting down after running for ${uptime}s`)
  }
})
38 changes: 38 additions & 0 deletions merge-logs.js
@@ -0,0 +1,38 @@
module.exports = mergeSort

// Merges multiple loki streams into one single array of logs
// transforms:
// [
// {stream: {container_id: '123'}, values: [["1606521574140524000", "first"]]}
// {stream: {container_id: '321'}, values: [["1606521574140524001", "second"]]}
// ]
// to a sorted array by ts:
// [
// {ts: "1606521574140524000", value: "first", stream: {container_id: '123'}}
// {ts: "1606521574140524001", value: "second", stream: {container_id: '321'}}
// ]
function mergeSort (streams, transform = (entry) => entry[1]) {
  if (!streams.length) return []

  const result = []

  let hasEntries = true
  while (hasEntries) {
    // Find the stream whose next entry has the lowest timestamp
    let lowest = 0
    for (let i = 0; i < streams.length; i++) {
      if (streams[i].values[0]?.[0] < (streams[lowest].values[0]?.[0] || Infinity)) lowest = i
    }

    if (!streams[lowest]) throw new Error(`Fatal error: ${JSON.stringify(streams)}`)

    const elem = streams[lowest].values.shift()
    if (!elem) hasEntries = false
    else result.push({ts: elem[0], value: transform(elem, streams[lowest].stream), stream: streams[lowest].stream})
  }

  return result
}

// const data = require('./log.json')
// const result = mergeSort(data, ([ts, str], stream) => `${ts} - ${str}`)
// console.log(result[0], result[1], result[result.length - 1])
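
A small runnable sketch of the merge (the sample streams are made up for illustration; note that `mergeSort` consumes its input via `shift`):

```js
const mergeSort = require('./merge-logs')

const streams = [
  {stream: {container_id: '123'}, values: [['1606521574140524000', 'first'], ['1606521574140524002', 'third']]},
  {stream: {container_id: '321'}, values: [['1606521574140524001', 'second']]}
]

console.log(mergeSort(streams).map((entry) => entry.value))
// => ['first', 'second', 'third']
```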