Day 5: Feed old events
I have a full archive of all Segment events in our Postgres warehouse that I now want to feed into the new system. Let’s try to make that happen.
I have exported the Postgres pages and identifies tables to CSV files of 500k rows per file.
-- file 1
SELECT * FROM prezly_prod_frontend.pages WHERE date_part('year', timestamp) = 2019 ORDER BY timestamp LIMIT 500000;
-- file 2
SELECT * FROM prezly_prod_frontend.pages WHERE date_part('year', timestamp) = 2019 ORDER BY timestamp LIMIT 500000 OFFSET 500000;
The database runs on Amazon Aurora (the PostgreSQL-compatible variant), so exporting the files went pretty smoothly.
Now I have created two scripts to get that data into our new setup by:
- reading the CSV file and converting it to JSON (using the csv-parse library)
- converting the flat rows (one column per property) into the event structure we need
- importing the records in batches of 25 using the DynamoDB batchWrite call
Here is the code, which I copied (and modified) from a Stack Overflow answer:
// src/tasks/import_page_from_file.js
// copied from https://stackoverflow.com/questions/32678325/how-can-i-import-bulk-data-from-a-csv-file-into-dynamodb
const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');

const CSV_FILENAME = process.argv[2];
const DYNAMODB_TABLENAME = 'sma-page-event-dev';

const dynamoDBFactory = require('../dynamodb.factory');
const dynamoDb = dynamoDBFactory();

const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
    columns: true,
    delimiter: ','
}, function(err, data) {
    if (err) {
        console.log(err);
        return;
    }

    // split the parsed rows into chunks of 25, the maximum batchWrite accepts per request
    const split_arrays = [];
    const size = 25;
    while (data.length > 0) {
        split_arrays.push(data.splice(0, size));
    }

    let data_imported = false;
    let chunk_no = 1;

    async.each(split_arrays, function(item_data, callback) {
        const params = {
            RequestItems: {}
        };
        params.RequestItems[DYNAMODB_TABLENAME] = [];

        item_data.forEach(item => {
            // map the flat CSV columns back onto the Segment "page" event structure
            const transformed_item = {
                messageId: item.id,
                source: "import",
                anonymousId: item.anonymous_id,
                context: {
                    ip: item.context_ip,
                    library: {
                        name: item.context_library_name,
                        version: item.context_library_version
                    },
                    locale: item.context_locale,
                    page: {
                        path: item.context_page_path,
                        referrer: item.context_page_referrer,
                        search: item.context_page_search,
                        title: item.context_page_title,
                        url: item.context_page_url,
                    },
                    userAgent: item.context_user_agent,
                },
                integrations: { All: true },
                originalTimestamp: item.original_timestamp,
                properties: {
                    path: item.path,
                    referrer: item.referrer,
                    search: item.search,
                    title: item.title,
                    url: item.url
                },
                receivedAt: item.received_at,
                sentAt: item.sent_at,
                timestamp: item.timestamp,
                type: "page",
                userId: item.user_id
            };

            params.RequestItems[DYNAMODB_TABLENAME].push({
                PutRequest: {
                    Item: { ...transformed_item }
                }
            });
        });

        dynamoDb.batchWrite(params, function(err, res) {
            console.log('done going next');
            if (err == null) {
                console.log('Success chunk #' + chunk_no);
                data_imported = true;
            } else {
                console.log(err);
                console.log('Fail chunk #' + chunk_no);
                data_imported = false;
            }
            chunk_no++;
            callback();
        });
    }, function() {
        // runs after every chunk has been written
        console.log('all data imported....');
    });
});

rs.pipe(parser);
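Both scripts require ../dynamodb.factory, which isn’t shown in this post. As a rough sketch, assuming it does nothing more than hand back an AWS SDK v2 DocumentClient (and point it at a local endpoint when running under serverless-offline), it could look like this:

// src/dynamodb.factory.js -- minimal sketch, not the actual module from the repo.
// Assumption: it only returns a DocumentClient, optionally targeting local DynamoDB.
const AWS = require('aws-sdk');

module.exports = () => {
    if (process.env.IS_OFFLINE) {
        // serverless-offline sets IS_OFFLINE; the local endpoint below is an assumption
        return new AWS.DynamoDB.DocumentClient({
            region: 'localhost',
            endpoint: 'http://localhost:8000'
        });
    }
    return new AWS.DynamoDB.DocumentClient();
};

The important part is that batchWrite is the DocumentClient variant, so the items can be pushed as plain JavaScript objects instead of DynamoDB attribute maps.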
And another one for the identify events:
// src/tasks/import_identify_from_file.js
// copied from https://stackoverflow.com/questions/32678325/how-can-i-import-bulk-data-from-a-csv-file-into-dynamodb
const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');

const CSV_FILENAME = process.argv[2];
const DYNAMODB_TABLENAME = 'sma-identify-event-dev';

const dynamoDBFactory = require('../dynamodb.factory');
const dynamoDb = dynamoDBFactory();

const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
    columns: true,
    delimiter: ','
}, function(err, data) {
    if (err) {
        console.log(err);
        return;
    }

    // split the parsed rows into chunks of 25, the maximum batchWrite accepts per request
    const split_arrays = [];
    const size = 25;
    while (data.length > 0) {
        split_arrays.push(data.splice(0, size));
    }

    let data_imported = false;
    let chunk_no = 1;

    async.each(split_arrays, function(item_data, callback) {
        const params = {
            RequestItems: {}
        };
        params.RequestItems[DYNAMODB_TABLENAME] = [];

        item_data.forEach(item => {
            // map the flat CSV columns back onto the Segment "identify" event structure
            const transformed_item = {
                messageId: item.id,
                source: "import",
                anonymousId: item.anonymous_id,
                context: {
                    ip: item.context_ip,
                    library: {
                        name: item.context_library_name,
                        version: item.context_library_version
                    },
                    locale: item.context_locale,
                    page: {
                        path: item.context_page_path,
                        referrer: item.context_page_referrer,
                        search: item.context_page_search,
                        title: item.context_page_title,
                        url: item.context_page_url,
                    },
                    userAgent: item.context_user_agent,
                },
                integrations: { All: true },
                originalTimestamp: item.original_timestamp,
                receivedAt: item.received_at,
                sentAt: item.sent_at,
                timestamp: item.timestamp,
                type: "identify",
                userId: item.user_id
            };

            params.RequestItems[DYNAMODB_TABLENAME].push({
                PutRequest: {
                    Item: { ...transformed_item }
                }
            });
        });

        dynamoDb.batchWrite(params, function(err, res) {
            console.log('done going next');
            if (err == null) {
                console.log('Success chunk #' + chunk_no);
                data_imported = true;
            } else {
                console.log(err);
                console.log('Fail chunk #' + chunk_no);
                data_imported = false;
            }
            chunk_no++;
            callback();
        });
    }, function() {
        // runs after every chunk has been written
        console.log('all data imported....');
    });
});

rs.pipe(parser);
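One caveat with batchWrite: when a table throttles, it doesn’t error out, it hands the throttled items back in UnprocessedItems, and the scripts above silently drop them. A small retry wrapper, sketched below as a hypothetical helper (it’s not in the repo), could be used in place of the plain dynamoDb.batchWrite call:

// Hypothetical helper, not part of the repo: resubmit whatever batchWrite
// reports back as UnprocessedItems, with a small delay between attempts.
function batchWriteWithRetry(dynamoDb, params, retriesLeft, callback) {
    dynamoDb.batchWrite(params, function(err, res) {
        if (err) {
            return callback(err);
        }
        const unprocessed = res.UnprocessedItems || {};
        if (Object.keys(unprocessed).length === 0 || retriesLeft === 0) {
            // done, or out of retries -- hand the last response back either way
            return callback(null, res);
        }
        setTimeout(function() {
            batchWriteWithRetry(dynamoDb, { RequestItems: unprocessed }, retriesLeft - 1, callback);
        }, 500);
    });
}

With the tables on On-Demand billing this barely matters for a one-hour backfill, but for larger imports it keeps chunks from quietly going missing.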
This dataset goes all the way back to 2016 and has millions of records. Because batchWrite uploads this data very efficiently and triggers the processing functions at a high rate, we have to make some changes to DynamoDB and switch the tables to On-Demand pricing.
Update the Resources part in serverless.yml:
resources:
  Resources:
    SegmentIdentifiesDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tableIdentify}

    SegmentPageDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tablePage}

    SegmentAttributionDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: anonymousId
            AttributeType: S
          - AttributeName: eventId
            AttributeType: S
        KeySchema:
          - AttributeName: anonymousId
            KeyType: HASH
          - AttributeName: eventId
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tableAttribution}

    SegmentUserMappingDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
          - AttributeName: anonymousId
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
          - AttributeName: anonymousId
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tableUserMapping}
A quick sls deploy will do the job. I did tear down the entire infrastructure a few times while testing this.
Now let’s feed some events:
node --max-old-space-size=4000 ./src/tasks/import_identify_from_file.js data/identifies.csv
node --max-old-space-size=4000 ./src/tasks/import_page_from_file.js data/2019_part_1.csv
node --max-old-space-size=4000 ./src/tasks/import_page_from_file.js data/2019_part_2.csv
node --max-old-space-size=4000 ./src/tasks/import_page_from_file.js data/2019_part_3.csv
It took about an hour to get all the events in there, but now all history from 2018, 2019 and 2020 should be in the attribution tables.
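To sanity-check the import I could count what actually landed in DynamoDB. A quick throwaway script (hypothetical, reusing the same factory) that pages through a Scan with Select: 'COUNT':

// src/tasks/count_items.js -- hypothetical sanity check, not part of the repo
const dynamoDBFactory = require('../dynamodb.factory');
const dynamoDb = dynamoDBFactory();

function countItems(tableName, startKey, total, done) {
    dynamoDb.scan({
        TableName: tableName,
        Select: 'COUNT',
        ExclusiveStartKey: startKey
    }, function(err, res) {
        if (err) return done(err);
        const count = total + res.Count;
        // keep paging until DynamoDB stops returning a LastEvaluatedKey
        if (res.LastEvaluatedKey) {
            return countItems(tableName, res.LastEvaluatedKey, count, done);
        }
        done(null, count);
    });
}

countItems('sma-page-event-dev', undefined, 0, function(err, count) {
    if (err) return console.log(err);
    console.log('items in sma-page-event-dev: ' + count);
});

The total should line up with the number of rows in the exported CSV files.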
With a larger dataset it might be more effective to go from SQL straight to DynamoDB, or to use one of the AWS migration services. In my case it was a great chance to see how it all performs under some stress and to clean up the console.log calls throughout the code :-)
Tomorrow I will explore how I can feed some events back to Segment using analytics.js.