Skip to content

Commit

Permalink
Merge pull request #2 from karhig/master
Browse files Browse the repository at this point in the history
Blob support, PORT environment variable and dockerfile optimizations.
  • Loading branch information
masumsoft authored Sep 19, 2017
2 parents 26f2b54 + 6de6abf commit c1491df
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 29 deletions.
6 changes: 3 additions & 3 deletions Dockerfile.export
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM node:8.5.0-alpine

ADD export.js export.js
ADD index.js index.js
ADD package.json package.json

RUN npm install

ADD index.js index.js
ADD export.js export.js

VOLUME /data

ENTRYPOINT node export.js
6 changes: 3 additions & 3 deletions Dockerfile.import
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM node:8.5.0-alpine

ADD import.js import.js
ADD index.js index.js
ADD package.json package.json

RUN npm install

ADD index.js index.js
ADD import.js import.js

VOLUME /data

ENTRYPOINT node import.js
5 changes: 3 additions & 2 deletions export.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var fs = require('fs');
var jsonStream = require('JSONStream');

var HOST = process.env.HOST || '127.0.0.1';
var PORT = process.env.PORT || 9042;
var KEYSPACE = process.env.KEYSPACE;

if (!KEYSPACE) {
Expand All @@ -20,8 +21,8 @@ if (USER && PASSWORD) {
authProvider = new cassandra.auth.PlainTextAuthProvider(USER, PASSWORD);
}

var systemClient = new cassandra.Client({contactPoints: [HOST], authProvider: authProvider});
var client = new cassandra.Client({ contactPoints: [HOST], keyspace: KEYSPACE, authProvider: authProvider});
var systemClient = new cassandra.Client({contactPoints: [HOST], authProvider: authProvider, protocolOptions: {port: [PORT]}});
var client = new cassandra.Client({ contactPoints: [HOST], keyspace: KEYSPACE, authProvider: authProvider, protocolOptions: {port: [PORT]}});

function processTableExport(table) {
console.log('==================================================');
Expand Down
35 changes: 14 additions & 21 deletions import.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var fs = require('fs');
var jsonStream = require('JSONStream');

var HOST = process.env.HOST || '127.0.0.1';
var PORT = process.env.PORT || 9042;
var KEYSPACE = process.env.KEYSPACE;

if (!KEYSPACE) {
Expand All @@ -21,8 +22,8 @@ if (USER && PASSWORD) {
authProvider = new cassandra.auth.PlainTextAuthProvider(USER, PASSWORD);
}

var systemClient = new cassandra.Client({contactPoints: [HOST], authProvider: authProvider});
var client = new cassandra.Client({ contactPoints: [HOST], keyspace: KEYSPACE, authProvider: authProvider});
var systemClient = new cassandra.Client({contactPoints: [HOST], authProvider: authProvider, protocolOptions: {port: [PORT]}});
var client = new cassandra.Client({ contactPoints: [HOST], keyspace: KEYSPACE, authProvider: authProvider, protocolOptions: {port: [PORT]}});

function buildTableQueryForDataRow(tableInfo, row) {
var queries = [];
Expand All @@ -47,7 +48,12 @@ function buildTableQueryForDataRow(tableInfo, row) {
}
params = _.map(params, function(param){
if (_.isPlainObject(param)) {
return _.omitBy(param, function(item) {return item === null});
if (param.type === 'Buffer') {
return Buffer.from(param);
}
else {
return _.omitBy(param, function(item) {return item === null});
}
}
return param;
});
Expand All @@ -72,25 +78,19 @@ function processTableImport(table) {
console.log('Creating read stream from: ' + table + '.json');
var jsonfile = fs.createReadStream('data/' + table + '.json', {encoding: 'utf8'});
var readStream = jsonfile.pipe(jsonStream.parse('*'));
var queries = [];
var chunkBatch = [];
var queryPromises = [];
var processed = 0;
readStream.on('data', function(row){
var query = buildTableQueryForDataRow(tableInfo, row);
queries.push(query);
queryPromises.push(client.execute(query.query, query.params, { prepare: true}));
processed++;

if (queries.length === 10) {
chunkBatch.push(client.batch(queries, { prepare: true, logged: false }));
queries = [];
}

if (processed%1000 === 0) {
console.log('Streaming ' + processed + ' rows to table: ' + table);
jsonfile.pause();
Promise.all(chunkBatch)
Promise.all(queryPromises)
.then(function (){
chunkBatch = chunkBatch.slice(100);
queryPromises = [];
jsonfile.resume();
})
.catch(function (err){
Expand All @@ -106,14 +106,7 @@ function processTableImport(table) {
var startTime = Date.now();
jsonfile.on('end', function () {
console.log('Streaming ' + processed + ' rows to table: ' + table);
if (queries.length > 1) {
chunkBatch.push(client.batch(queries, { prepare: true, logged: false }));
}
else if (queries.length === 1) {
chunkBatch.push(client.execute(queries[0].query, queries[0].params, { prepare: true }));
}

Promise.all(chunkBatch)
Promise.all(queryPromises)
.then(function (){
var timeTaken = (Date.now() - startTime) / 1000;
var throughput = processed / timeTaken;
Expand Down

0 comments on commit c1491df

Please sign in to comment.