
Commit

*: replace line-by-line text loaders with chunk-by-chunk text loaders
Loaders now yield token sequences of length blockSize
JulienVig committed Oct 23, 2024
1 parent 75a5269 commit 03e5c7d
Showing 24 changed files with 490 additions and 130 deletions.
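
To illustrate the behaviour described in the commit message, here is a minimal sketch (plain TypeScript, independent of the library) of the sliding-window tokenization the new loaders perform: each yielded sequence holds blockSize input tokens plus one label token, and consecutive sequences overlap by exactly one token.

function* slidingWindows(tokens: number[], blockSize: number): Generator<number[]> {
  // each window has blockSize + 1 tokens: blockSize inputs plus the next-token label
  let position = 0;
  while (position + blockSize + 1 <= tokens.length) {
    yield tokens.slice(position, position + blockSize + 1);
    position += blockSize; // advance by blockSize so the label becomes the next window's first input
  }
}

// with blockSize = 4, ten tokens yield [0,1,2,3,4] then [4,5,6,7,8]; the trailing token 9 is dropped
console.log([...slidingWindows([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 4)]);
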
43 changes: 12 additions & 31 deletions cli/src/benchmark_gpt.ts
@@ -2,7 +2,7 @@ import { parse } from "ts-command-line-args";
import * as tf from "@tensorflow/tfjs"
import { AutoTokenizer } from "@xenova/transformers";

import { fetchTasks, models, async_iterator, defaultTasks, processing } from "@epfml/discojs";
import { fetchTasks, models, async_iterator, defaultTasks } from "@epfml/discojs";
import { loadModelFromDisk, loadText } from '@epfml/discojs-node'

import { Server } from "server";
@@ -86,36 +86,17 @@ async function main(args: Required<CLIArguments>): Promise<void> {
// to make sure the dataset is batched and tokenized correctly
task.trainingInformation.batchSize = batchSize
task.trainingInformation.maxSequenceLength = contextLength
const dataset = loadText('../datasets/wikitext/wiki.train.tokens')

const maxLength = task.trainingInformation.maxSequenceLength ?? (tokenizer.model_max_length as number) + 1
// TODO will be easier when preproccessing is redone
const preprocessedDataset = intoTFGenerator(
dataset
.map((line) =>
processing.tokenizeAndLeftPad(line, tokenizer, maxLength),
)
.batch(batchSize)
.map((batch) =>
tf.tidy(() => ({
xs: tf.tensor2d(
batch.map((tokens) => tokens.slice(0, -1)).toArray(),
),
ys: tf.stack(
batch
.map(
(tokens) =>
tf.oneHot(
tokens.slice(1),
tokenizer.model.vocab.length,
) as tf.Tensor2D,
)
.toArray(),
) as tf.Tensor3D,
})),
),
);

const dataset = loadText(
'../datasets/wikitext/wiki.train.tokens',
tokenizer, config.blockSize, batchSize
)
// TODO will be easier when preprocessing is redone
const preprocessedDataset = intoTFGenerator(dataset).map((tokens: number[]) => {
const ys = tf.oneHot(tokens.slice(1), tokenizer.model.vocab.length)
const xs = tf.tensor(tokens.slice(0, config.blockSize), undefined, 'int32')
return {xs, ys}
}).batch(batchSize) as tf.data.Dataset<{ xs: tf.Tensor2D, ys: tf.Tensor3D }>

// Init and train the model
const model = new models.GPT(config)
console.log(`\tmodel type ${modelType} \n\tbatch size ${batchSize} \n\tcontext length ${contextLength}`)
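
For context, a sketch of what the xs/ys mapping above produces for a single yielded sequence, assuming a toy vocabulary of 10 instead of the GPT-2 one:

import * as tf from "@tensorflow/tfjs";

// a yielded sequence of blockSize + 1 = 5 token ids (toy values)
const tokens = [2, 7, 1, 4, 9];
const blockSize = 4;
const vocabSize = 10;

const xs = tf.tensor(tokens.slice(0, blockSize), undefined, "int32"); // shape [4]: the input tokens
const ys = tf.oneHot(tokens.slice(1), vocabSize); // shape [4, 10]: one-hot encoding of each next token
// after .batch(batchSize), xs has shape [batchSize, blockSize] and ys [batchSize, blockSize, vocabSize]

xs.print();
ys.print();
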
42 changes: 28 additions & 14 deletions cli/src/train_gpt.ts
@@ -1,38 +1,52 @@
import * as tf from "@tensorflow/tfjs-node"
import { AutoTokenizer } from "@xenova/transformers";
import { models, processing } from "@epfml/discojs";
import { models } from "@epfml/discojs";
import { loadText } from '@epfml/discojs-node'

async function main(): Promise<void> {
const data = "Lorem ipsum dolor sit amet, consectetur adipis"
const datasetSource = new tf.data.FileDataSource(Buffer.from(data))
const textDataset = new tf.data.TextLineDataset(datasetSource)
function intoTFGenerator<T extends tf.TensorContainer>(
iter: AsyncIterable<T>,
): tf.data.Dataset<T> {
// @ts-expect-error generator
return tf.data.generator(async function* () {
yield* iter;
});
}

async function main(): Promise<void> {

const config: models.GPTConfig = {
modelType: 'gpt-nano',
lr: 0.01,
maxIter: 50,
maxIter: 10,
evaluateEvery:50,
maxEvalBatches: 10,
blockSize: 16,
vocabSize: 50257,
debug: false
}


const batchSize = 8
const tokenizer = await AutoTokenizer.from_pretrained('Xenova/gpt2')
const tokenDataset = textDataset.map((text: string) => {
const tokens = processing.tokenizeAndLeftPad(text, tokenizer, config.blockSize + 1)
const textDataset = loadText(
"../datasets/wikitext/wiki.train.tokens",
tokenizer, config.blockSize, batchSize
)

const tokenDataset = intoTFGenerator(textDataset).map((tokens: number[]) => {
const ys = tf.oneHot(tokens.slice(1), tokenizer.model.vocab.length)
const xs = tf.tensor(tokens.slice(0, config.blockSize), undefined, 'int32')
return {xs, ys}
}).repeat().batch(16) as tf.data.Dataset<{ xs: tf.Tensor2D, ys: tf.Tensor3D }>
}).batch(batchSize) as tf.data.Dataset<{ xs: tf.Tensor2D, ys: tf.Tensor3D }>

const model = new models.GPT(config)

for await (const logs of model.train(tokenDataset, undefined)) {
console.log(logs)
for (let i = 0; i < 6; i++) {
console.log(`Epoch ${i}`)
for await (const logs of model.train(tokenDataset, undefined)) {
console.log(logs)
}
}

const generation = await model.generate("Lorem", tokenizer, { maxNewTokens: 10, doSample: false, topk: 5, temperature:0.1 })
const generation = await model.generate("First", tokenizer, { maxNewTokens: 10, doSample: false, topk: 5, temperature:0.1 })
console.log(generation)
}

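
As a side note, a minimal sketch of what the intoTFGenerator helper above does, using a toy async generator in place of the loaded dataset:

import * as tf from "@tensorflow/tfjs-node";

// toy async iterable standing in for the tokenized dataset
async function* toySequences(): AsyncGenerator<number[]> {
  yield [1, 2, 3];
  yield [4, 5, 6];
}

// wrap the AsyncIterable into a tf.data.Dataset, as intoTFGenerator does
// @ts-expect-error generator
const dataset: tf.data.Dataset<number[]> = tf.data.generator(async function* () {
  yield* toySequences();
});

await dataset.forEachAsync((sequence) => console.log(sequence)); // [1, 2, 3] then [4, 5, 6]
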
78 changes: 75 additions & 3 deletions discojs-node/src/loaders.spec.ts
@@ -2,6 +2,8 @@ import * as fs from "node:fs/promises";
import { withFile } from "tmp-promise";
import { describe, it } from "mocha";
import { expect } from "chai";
import { models } from "@epfml/discojs";
import { AutoTokenizer } from "@xenova/transformers";

import {
loadCSV,
@@ -50,13 +52,83 @@ describe("image directory parser", () => {
});

describe("text parser", () => {

it("parses basic file", async () => {
const tokenizer = await AutoTokenizer.from_pretrained('Xenova/gpt2')
await withFile(async ({ path }) => {
await fs.writeFile(path, ["a", "b", "c"].join("\n"));
const text = ["a", "b", "c"].join("\n")
await fs.writeFile(path, text);
// set block size to 4 to get 1 sequence of 4 tokens + 1 label token
const parsed = loadText(path, tokenizer, 4, 1);
expect(await parsed.size()).to.equal(1);
const next = await parsed[Symbol.asyncIterator]().next()
expect(next.done).to.be.false;

const tokens = next.value as number[]
const expectedTokens = models.tokenize(tokenizer, text, {
padding: false,
truncation: false,
return_tensor: false
})
expect(tokens).to.deep.equal(expectedTokens);
});
});

it("yields the correct block size", async () => {
const tokenizer = await AutoTokenizer.from_pretrained('Xenova/gpt2')
await withFile(async ({ path }) => {
const text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
const expectedTokens = models.tokenize(tokenizer, text, {
padding: false,
truncation: false,
return_tensor: false
})
await fs.writeFile(path, text);

const parsed = loadText(path);
// set block size to 4 to get 1 sequence of 4 tokens + 1 label token
// so we expect 5 tokens per read
const blockSize = 4
const parsed = loadText(path, tokenizer, blockSize, 1);
// expect the number of sequences to be the total number of tokens divided by blockSize
// we use floor because the last incomplete sequence is dropped
expect(await parsed.size()).to.equal(Math.floor(expectedTokens.length / blockSize));

let i = 0
for await (const tokens of parsed) {
// each sequence should have length blockSize + 1 (for the label)
expect(tokens).to.deep.equal(expectedTokens.slice(i, i + blockSize + 1));
// but the window should move by blockSize only
i += blockSize
}
})
});

it("reads multiple chunks", async () => {
const tokenizer = await AutoTokenizer.from_pretrained('Xenova/gpt2')
await withFile(async ({ path }) => {
const text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec sed risus maximus, ultricies ex sed, dictum elit. Curabitur faucibus egestas enim et auctor. Quisque vel dignissim turpis. Curabitur justo tellus, elementum sit amet erat eget, auctor ornare nisi. Nunc tortor odio, ultrices id leo vitae, euismod congue ex. Curabitur arcu leo, sagittis quis felis nec, imperdiet aliquet tellus. Integer a mollis nulla. Quisque pulvinar lectus eget nisi pharetra, non molestie magna ullamcorper. Sed porttitor diam non blandit molestie. Duis tristique arcu ut efficitur efficitur. Fusce et ullamcorper tortor. Pellentesque a accumsan lacus, nec mollis risus. Nunc quis eros a orci ultricies cursus. Maecenas sodales ipsum a magna malesuada efficitur. Maecenas at sapien blandit, egestas nisi eu, mollis elit."
const expectedTokens = models.tokenize(tokenizer, text, {
padding: false,
truncation: false,
return_tensor: false
})
await fs.writeFile(path, text);

expect(await parsed.size()).to.equal(3);
// set block size to 4 to get 1 sequence of 4 tokens + 1 label token
// so we expect 5 tokens per read
const blockSize = 4
const parsed = loadText(path, tokenizer, blockSize, 1, 1); // set the min chunk size allowed to 1 bit
// expect the number of sequences to be the total number of tokens divided by blockSize
// we use floor because the last incomplete sequence is dropped
expect(await parsed.size()).to.equal(Math.floor(expectedTokens.length / blockSize));

let i = 0
for await (const tokens of parsed) {
// each sequence should have length blockSize + 1 (for the label)
expect(tokens).to.deep.equal(expectedTokens.slice(i, i + blockSize + 1));
// but the window should move by blockSize only
i += blockSize
}
});
});
});
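
The size expectations in these tests follow from the windowing arithmetic; a small sketch (pure function, no library dependency) of how many sequences the loader can yield for a given number of tokens:

function expectedSequenceCount(totalTokens: number, blockSize: number): number {
  // mirror the loader's loop: each window needs blockSize + 1 tokens and advances by blockSize
  let count = 0;
  for (let position = 0; position + blockSize + 1 <= totalTokens; position += blockSize) count++;
  return count; // equivalently Math.floor((totalTokens - 1) / blockSize)
}

// e.g. 13 tokens with blockSize 4 give 3 windows; the leftover token is dropped
console.log(expectedSequenceCount(13, 4)); // 3
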
100 changes: 92 additions & 8 deletions discojs-node/src/loaders/text.ts
@@ -1,14 +1,98 @@
import * as fs from "node:fs/promises";
import * as readline from "node:readline/promises";
import createDebug from "debug";
import { createReadStream } from 'node:fs';

import { Dataset, Text } from "@epfml/discojs";
import { PreTrainedTokenizer } from '@xenova/transformers';
import { Dataset, Text, models } from "@epfml/discojs";

export function load(path: string): Dataset<Text> {
const debug = createDebug("discojs-node:loaders:text");

/**
* Returns a Dataset that streams and tokenizes text to yield tokenized sequences
* one at a time. Each sequence has size `blockSize` + 1, where the first `blockSize`
* tokens are the input and the last token is the label. The following sequence
* starts with the last token of the previous sequence (so the previous label is now the
* first input token).
* In other words, the dataset yields sequences of size `blockSize` + 1 but with an overlap
* of 1 token between each sequence.
*
* @param path path to the text file to read
* @param tokenizer the tokenizer to use, should match the model that will be trained
* @param blockSize the context length, the maximum number of tokens of input sequences
* @param batchSize defaults to 1, the number of input sequences (of `blockSize` tokens) in each batch.
* The batch size is only used to configure the chunk size of the file stream such that each chunk is
* big enough to contain at least one batch.
* @param minChunkSize defaults to 16KiB, the minimum size of each chunk read from the file
* @returns a dataset of tokenized input and label sequences
*/
export function load(path: string, tokenizer: PreTrainedTokenizer,
blockSize: number, batchSize: number = 1, minChunkSize = 16384): Dataset<Text> {
return new Dataset(async function* () {
const input = (await fs.open(path)).createReadStream({ encoding: "utf8" });
if (batchSize < 1 || blockSize < 1 || minChunkSize < 1)
throw new Error("batchSize, blockSize and minChunkSize must be positive integers");
// we want each chunk to be bigger than the block size (each chunk corresponds to a block)
// (or even bigger than batch size * block size so that each chunk corresponds to a batch)
const chunkTokenSize = batchSize * (blockSize + 1) // + 1 for the next word label ys
// We read 8 * 8 = 64 bits (8 bytes) per expected token to ensure we have enough tokens
// For reference, the GPT-2 tokenizer encodes 3 to 4 bytes per token on average
const chunkBitSize = Math.max(minChunkSize, chunkTokenSize * 8 * 8);
debug("Setting the chunk size to %o bits", chunkBitSize)
// Create a stream to read the text file chunk by chunk
const stream = createReadStream(path, {
encoding: "utf8",
highWaterMark: chunkBitSize
});

// `readline` is a bit overkill but seems standard
// https://nodejs.org/api/readline.html#example-read-file-stream-line-by-line
yield* readline.createInterface({ input, crlfDelay: Infinity });
// iterate over the chunks
let endOfPreviousChunk = ""
let iteration = 0
for await (const chunk of stream) {
if (typeof chunk !== 'string') throw new Error('Expected file stream to yield string')
debug("Reading chunk of size %o", chunk.length)
// tokenize the whole chunk at once
// Concatenate with potential leftovers from the previous chunk
const tokens = models.tokenize(tokenizer, endOfPreviousChunk + chunk, {
padding: false,
truncation: false,
return_tensor: false,
})
if (tokens.length < blockSize + 1) {
// throw if it happens on the 1st iteration
if (iteration === 0)
throw new Error(`the chunk (${tokens.length} tokens) is too small ` +
`to get a sequence of length blockSize (${blockSize + 1} tokens). ` +
`Either the text file or the chunk size (${chunkBitSize} bits) is too small.`);
// if this isn't the first iteration we simply skip
// as we expect the last chunk to be potentially smaller than the block size
debug("chunk smaller than block size, loading next chunk")
continue
}
debug("batch per chunk: %o", tokens.length / (batchSize * blockSize))
let currentPosition = 0;
// yield one block of tokens at a time
while (currentPosition + blockSize + 1 <= tokens.length) {
yield tokens.slice(currentPosition, currentPosition + blockSize + 1);
currentPosition += blockSize; // don't add + 1 here
}
// keep the last tokens for the next chunk
// if this was the last one the remaining tokens are discarded
if (currentPosition < tokens.length) {
// We actually need to decode the tokens to get the leftover text
// instead of simply keeping the remaining tokens.
// this is because the tokens may be different once prepended to the next chunk
// e.g. if the remaining text is ". A" and the next chunk starts with "nother"
// the tokenization will be different than if we simply concatenate the remaining tokens
endOfPreviousChunk = tokenizer.decode(
tokens.slice(currentPosition),
{ skip_special_tokens: true }
)
debug("End of chunk, remaining text: '%s'", endOfPreviousChunk)
} else {
// Note that the difference between tokenizing and then concatenating
// vs concatenating and then tokenizing can happen if there is no
// remaining text. We consider this difference negligible
endOfPreviousChunk = "";
}
iteration++;
}
});
}
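
A short usage sketch of the loader above (the path and sizes are illustrative only; the signature matches the load function in this file, exported as loadText by discojs-node):

import { AutoTokenizer } from "@xenova/transformers";
import { loadText } from "@epfml/discojs-node";

async function preview(): Promise<void> {
  const tokenizer = await AutoTokenizer.from_pretrained("Xenova/gpt2");
  // blockSize = 32, batchSize = 8; minChunkSize is left to its default
  const dataset = loadText("../datasets/wikitext/wiki.train.tokens", tokenizer, 32, 8);
  for await (const sequence of dataset) {
    console.log(sequence.length); // blockSize + 1 = 33 tokens: 32 inputs and 1 label
    break;
  }
}

preview().catch(console.error);
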
55 changes: 43 additions & 12 deletions discojs-web/src/loaders.spec.ts
@@ -1,5 +1,7 @@
import { AutoTokenizer } from "@xenova/transformers";
import { describe, it, expect } from "vitest";

import { models } from "@epfml/discojs";
import { loadCSV, loadText } from "./loaders/index.js";

async function arrayFromAsync<T>(iter: AsyncIterable<T>): Promise<T[]> {
@@ -22,22 +24,51 @@ describe("csv parser", () => {
});

describe("text parser", () => {
it("loads", async () => {
it("loads a simple sequence", async () => {
const text = ["first", "second", "third"].join("\n")

const tokenizer = await AutoTokenizer.from_pretrained('Xenova/gpt2')
const expectedTokens = models.tokenize(tokenizer, text, {
padding: false,
truncation: false,
return_tensor: false,
})

// jsdom doesn't implement .text on File/Blob
// trick from https://github.com/jsdom/jsdom/issues/2555
const text = await (
await fetch(
// data URL content need to be url-encoded
["data:,first", "second", "third"].join("%0A"),
)
const file = await (
await fetch( "data:," + encodeURIComponent(text))
).blob();
const parsed = loadText(file, tokenizer, 4);

const parsed = loadText(text);
expect(await parsed.size()).to.equal(1); // expect a single sequence
expect((await arrayFromAsync(parsed))[0]).to.deep.equal(expectedTokens);
});

expect(await arrayFromAsync(parsed)).to.have.ordered.members([
"first",
"second",
"third",
]);
it("yields the correct block size", async () => {
const text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed quis faucibus ipsum."

const tokenizer = await AutoTokenizer.from_pretrained('Xenova/gpt2')
const expectedTokens = models.tokenize(tokenizer, text, {
padding: false,
truncation: false,
return_tensor: false
})

const file = await (
await fetch("data:," + encodeURIComponent(text))
).blob();

const blockSize = 4
const parsed = loadText(file, tokenizer, blockSize);
expect(await parsed.size()).to.equal(Math.floor(expectedTokens.length / blockSize));

let i = 0
for await (const tokens of parsed) {
// each sequence should have length blockSize + 1 (for the label)
expect(tokens).to.deep.equal(expectedTokens.slice(i, i + blockSize + 1));
// but the window should move by blockSize only
i += blockSize
}
});
});
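
And a corresponding usage sketch for the web loader, mirroring the calls in the tests above (blockSize = 4 is arbitrary, the Blob content is illustrative):

import { AutoTokenizer } from "@xenova/transformers";
import { loadText } from "./loaders/index.js";

async function preview(): Promise<void> {
  const tokenizer = await AutoTokenizer.from_pretrained("Xenova/gpt2");
  const file = new Blob(["Lorem ipsum dolor sit amet, consectetur adipiscing elit."]);
  const parsed = loadText(file, tokenizer, 4); // same arguments as in the tests above
  for await (const tokens of parsed) {
    console.log(tokens.length); // blockSize + 1 = 5
  }
}

preview().catch(console.error);
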