
Day 5: Feed old events

I have a full archive of all Segment events in our Postgres warehouse that I now want to feed into the new system. Let’s try to make that happen.

I exported the Postgres pages and identifies tables to CSV files of 500k rows each.

-- file 1
SELECT * FROM prezly_prod_frontend.pages WHERE date_part('year', timestamp) = 2019 LIMIT 500000;
-- file 2
SELECT * FROM prezly_prod_frontend.pages WHERE date_part('year', timestamp) = 2019 LIMIT 500000 OFFSET 500000;

The database runs on Amazon Aurora (the PostgreSQL-compatible variant), so exporting the files went pretty smoothly.

Now I created two scripts to get that data into our new setup by:

  1. reading the CSV file and converting it to JSON (using csv-parse)
  2. converting the flat rows (one column per property) into the nested event structure we need
  3. importing the records in batches of 25 using the DynamoDB batchWrite call

Here is the code, which I copied (and modified) from a Stack Overflow answer:

// src/tasks/import_page_from_file.js
// copied from https://stackoverflow.com/questions/32678325/how-can-i-import-bulk-data-from-a-csv-file-into-dynamodb

const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');

const CSV_FILENAME = process.argv[2];
const DYNAMODB_TABLENAME = 'sma-page-event-dev';

const dynamoDBFactory = require('../dynamodb.factory');
const dynamoDb = dynamoDBFactory();

const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
    columns: true,
    delimiter: ','
}, function(err, data) {
    if (err) {
        console.error('Failed to parse CSV:', err);
        return;
    }

    // split the parsed rows into chunks of 25 (the batchWrite limit)
    const split_arrays = [],
        size = 25;

    while (data.length > 0) {
        split_arrays.push(data.splice(0, size));
    }

    let data_imported = false;
    let chunk_no = 1;

    async.each(split_arrays, function(item_data, callback) {
        const params = {
            RequestItems: {}
        };
        params.RequestItems[DYNAMODB_TABLENAME] = [];
        item_data.forEach(item => {

            const transformed_item = {
                messageId: item.id,
                source: "import",
                anonymousId: item.anonymous_id,
                context: {
                    ip: item.context_ip,
                    library: {
                        name: item.context_library_name,
                        version: item.context_library_version
                    },
                    locale: item.context_locale,
                    page: {
                        path: item.context_page_path,
                        referrer: item.context_page_referrer,
                        search: item.context_page_search,
                        title: item.context_page_title,
                        url: item.context_page_url,
                    },
                    userAgent: item.context_user_agent,
                },
                integrations: { All: true },
                originalTimestamp: item.original_timestamp,
                properties: {
                    path: item.path,
                    referrer: item.referrer,
                    search: item.search,
                    title: item.title,
                    url: item.url
                },
                receivedAt: item.received_at,
                sentAt: item.sent_at,
                timestamp: item.timestamp,
                type: "page",
                userId: item.user_id
            };


            params.RequestItems[DYNAMODB_TABLENAME].push({
                PutRequest: {
                    Item: { ...transformed_item }
                }
            });
        });

        dynamoDb.batchWrite(params, function(err, res) {
            console.log('done going next');
            if (err == null) {
                console.log('Success chunk #' + chunk_no);
                data_imported = true;
            } else {
                console.log(err);
                console.log('Fail chunk #' + chunk_no);
                data_imported = false;
            }

            chunk_no++;
            callback();
        });

    }, function() {
        // run after loops
        console.log('all data imported....');
    });

});
rs.pipe(parser);
/src/tasks/import_page_from_file.js
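
The scripts require a small ../dynamodb.factory module that I'm not showing here; it just returns a DocumentClient. A minimal sketch of what it could look like (assuming the aws-sdk v2 DocumentClient; the region here is just an example value):

// src/dynamodb.factory.js
// minimal sketch: hands back a DocumentClient so the import scripts can batchWrite plain JS objects

const AWS = require('aws-sdk');

module.exports = function dynamoDBFactory() {
    // region is an example; credentials come from the environment / AWS profile
    return new AWS.DynamoDB.DocumentClient({
        region: process.env.AWS_REGION || 'eu-west-1',
        convertEmptyValues: true // empty CSV fields become NULL instead of empty strings
    });
};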

And another one for the identify events:

// src/tasks/import_identify_from_file.js
// copied from https://stackoverflow.com/questions/32678325/how-can-i-import-bulk-data-from-a-csv-file-into-dynamodb

const fs = require('fs');
const parse = require('csv-parse');
const async = require('async');

const CSV_FILENAME = process.argv[2];
const DYNAMODB_TABLENAME = 'sma-identify-event-dev';

const dynamoDBFactory = require('../dynamodb.factory');
const dynamoDb = dynamoDBFactory();

const rs = fs.createReadStream(CSV_FILENAME);
const parser = parse({
    columns: true,
    delimiter: ','
}, function(err, data) {
    if (err) {
        console.error('Failed to parse CSV:', err);
        return;
    }

    // split the parsed rows into chunks of 25 (the batchWrite limit)
    const split_arrays = [],
        size = 25;

    while (data.length > 0) {
        split_arrays.push(data.splice(0, size));
    }

    let data_imported = false;
    let chunk_no = 1;

    async.each(split_arrays, function(item_data, callback) {
        const params = {
            RequestItems: {}
        };
        params.RequestItems[DYNAMODB_TABLENAME] = [];
        item_data.forEach(item => {

            const transformed_item = {
                messageId: item.id,
                source: "import",
                anonymousId: item.anonymous_id,
                context: {
                    ip: item.context_ip,
                    library: {
                        name: item.context_library_name,
                        version: item.context_library_version
                    },
                    locale: item.context_locale,
                    page: {
                        path: item.context_page_path,
                        referrer: item.context_page_referrer,
                        search: item.context_page_search,
                        title: item.context_page_title,
                        url: item.context_page_url,
                    },
                    userAgent: item.context_user_agent,
                },
                integrations: { All: true },
                originalTimestamp: item.original_timestamp,
                receivedAt: item.received_at,
                sentAt: item.sent_at,
                timestamp: item.timestamp,
                type: "identify",
                userId: item.user_id
            };

            params.RequestItems[DYNAMODB_TABLENAME].push({
                PutRequest: {
                    Item: { ...transformed_item }
                }
            });

        });

        dynamoDb.batchWrite(params, function(err, res) {
            console.log('done going next');
            if (err == null) {
                console.log('Success chunk #' + chunk_no);
                data_imported = true;
            } else {
                console.log(err);
                console.log('Fail chunk #' + chunk_no);
                data_imported = false;
            }

            chunk_no++;
            callback();
        });

    }, function() {
        // run after loops
        console.log('all data imported....');
    });

});
rs.pipe(parser);
/src/tasks/import_identify_from_file.js
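
One thing neither script handles: batchWrite can return UnprocessedItems when DynamoDB throttles part of a chunk, and those rows are silently dropped. If that turns out to matter, a small retry wrapper along these lines (a hypothetical batchWriteWithRetry helper, not in my code) could replace the direct batchWrite calls:

// hypothetical retry wrapper around DocumentClient.batchWrite
// re-submits UnprocessedItems with a growing delay instead of dropping them

function batchWriteWithRetry(dynamoDb, params, attempt, done) {
    dynamoDb.batchWrite(params, function(err, res) {
        if (err) {
            return done(err);
        }

        const unprocessed = res.UnprocessedItems || {};
        if (Object.keys(unprocessed).length === 0 || attempt >= 5) {
            return done(null, res);
        }

        // wait a bit longer on every attempt before retrying only the leftover items
        setTimeout(function() {
            batchWriteWithRetry(dynamoDb, { RequestItems: unprocessed }, attempt + 1, done);
        }, 200 * attempt);
    });
}

Calling batchWriteWithRetry(dynamoDb, params, 1, callback) inside the async.each body would slot in where the scripts currently call batchWrite directly.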

This dataset goes all the way back to 2016 and has millions of records. Because batchWrite uploads this stuff pretty efficiently and triggers the process functions at a really good pace, we have to make some changes to the DynamoDB tables and switch to On-Demand pricing.

Update the Resources section in serverless.yml:

resources:
  Resources:
    SegmentIdentifiesDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tableIdentify}
    SegmentPageDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tablePage}
    SegmentAttributionDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: anonymousId
            AttributeType: S
          - AttributeName: eventId
            AttributeType: S
        KeySchema:
          - AttributeName: anonymousId
            KeyType: HASH
          - AttributeName: eventId
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tableAttribution}
    SegmentUserMappingDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
          - AttributeName: anonymousId
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
          - AttributeName: anonymousId
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:custom.tableUserMapping}
serverless.yml

A quick sls deploy will do the job. I did remove the entire infrastructure a few times while testing this.

Now let’s feed some events:

node --max-old-space-size=4000 ./src/tasks/import_identify_from_file.js data/identifies.csv
node --max-old-space-size=4000 ./src/tasks/import_page_from_file.js data/2019_part_1.csv
node --max-old-space-size=4000 ./src/tasks/import_page_from_file.js data/2019_part_2.csv
node --max-old-space-size=4000 ./src/tasks/import_page_from_file.js data/2019_part_3.csv

It took about an hour to get all the events in, and now all the history from 2018, 2019 and 2020 should be in the attribution tables.

With a larger dataset it might be more effective to go straight from SQL to DynamoDB or to use one of the AWS migration services. In my case it was a great chance to see how it all performs under some stress and to clean up the console.log calls throughout the code :-)

Tomorrow I will explore how I can feed some events back to Segment using analytics.js.