Day 3: Cleanup — Identify Visitor Source

I was reading up on using Lambda, Node and DynamoDB and found some samples with much cleaner/easier code.

So first I will clean up some of the code from Day 1 and Day 2.

Goodbye Express, Serverless-HTTP

The next part is blatantly stolen from the post below. If you want to know why, please read it. All credits to Van Huynh.

Build a RESTful API using AWS Lambda, API Gateway, DynamoDB and the Serverless Framework

Create a file called src/utils/request.util.js for our request utility.

For now, the request utility will contain a single function that will accept some parser function and return a new function that uses it to parse some text we pass to it.

// file: src/utils/request.util.js

const parseWith = (parser) => (text) => {
  if (!parser) {
    throw new Error('parser is required');
  }

  if (!text) {
    throw new Error('text is required');
  }

  return parser(text);
};

module.exports = {
  parseWith
};
src/utils/request.util.js

Create another file called src/utils/response.util.js for our response utility.

// file: src/utils/response.util.js

const withStatusCode = (statusCode, formatter = null) => {
  if (100 > statusCode || statusCode > 599) {
    throw new Error('status code out of range');
  }

  const hasFormatter = typeof formatter === 'function';
  // send whatever was passed in through if a formatter is not provided
  const format = hasFormatter ? formatter : _ => _;

  // return a function that will take some data and formats a response with a status code
  return (data = null) => {
    const response = {
      statusCode: statusCode
    };

    // only send a body if there is data
    if (data) {
      response.body = format(data);
    }

    return response;
  }
};

module.exports = {
  withStatusCode
};
src/utils/response.util.js

The withStatusCode response utility function is similar to the request utility, but instead of parsing text, it formats data into text. It also contains additional checks to make sure our status codes are within the range of allowed status codes.
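
To see how the two utilities compose, here is a quick sketch (not from the original post) of how they can be used together:

// a minimal sketch combining both utils
const { parseWith } = require('./src/utils/request.util');
const { withStatusCode } = require('./src/utils/response.util');

const parseJson = parseWith(JSON.parse);        // (text) => object
const ok = withStatusCode(200, JSON.stringify); // (data) => { statusCode, body }

const body = parseJson('{"type":"page"}');      // { type: 'page' }
const response = ok({ received: body.type });   // { statusCode: 200, body: '{"received":"page"}' }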

Then we’re creating a factory, as we keep instantiating the DynamoDB DocumentClient with the same options:

const { DocumentClient } = require('aws-sdk/clients/dynamodb');

module.exports = () => {
  return new DocumentClient( {
      // ensures empty values (userId = null) are converted
      // more @ https://stackoverflow.com/questions/37479586/nodejs-with-dynamodb-throws-error-attributevalue-may-not-contain-an-empty-strin
      convertEmptyValues: true
  });
};
dynamodb.factory.js

Using those new utils, our index.js looks like this:

const dynamoDBFactory = require('./src/dynamodb.factory');
const dynamoDb = dynamoDBFactory();

const { withStatusCode } = require('./src/utils/response.util');
const { parseWith } = require('./src/utils/request.util');

const parseJson = parseWith(JSON.parse);
const ok = withStatusCode(200);
const problem = withStatusCode(400);

const IDENTIFY_TABLE = process.env.IDENTIFY_TABLE;
const PAGE_TABLE = process.env.PAGE_TABLE;

exports.handler = async (event) => {
    const request_body = parseJson(event.body);
    const { type, messageId } = request_body;

    if (type !== 'page' && type !== 'identify') {
        return ok('not a page or identify event');
    }

    const params = {
        TableName: ( type === 'identify' ? IDENTIFY_TABLE : PAGE_TABLE),
        Item: {
            messageId,
            ...request_body
        },
    };

    try {
        await dynamoDb.put(params).promise();
    } catch (e) {
        console.log('Could not store event', e);
        return problem(e.message);
    }

    return ok();
};
index.js

That’s right. No more express, body-parser or serverless-http. So remove them from the code and make sure to uninstall them:

npm remove body-parser express serverless-http --save

More cleanup

I had some inconsistencies in serverless.yml: I was using track instead of page for table names. Fix that in serverless.yml:

service: solving-marketing-attribution

custom:
  # sma = Solving Marketing Attribution
  tableIdentify: 'sma-identify-${self:provider.stage}'
  tablePage: 'sma-event-page-${self:provider.stage}'

provider:
  name: aws
  runtime: nodejs10.x
  stage: dev
  region: eu-west-1
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
        - dynamodb:ListStreams
      Resource:
        - { "Fn::GetAtt": ["SegmentIdentifyDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentPageDynamoDBTable", "Arn" ] }
  environment:
    IDENTIFY_TABLE: ${self:custom.tableIdentify}
    PAGE_TABLE: ${self:custom.tablePage}

functions:
  hello:
    handler: index.handler
    events:
      - http: 'POST /events'

resources:
  Resources:
    SegmentIdentifyDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:custom.tableIdentify}
    SegmentPageDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:custom.tablePage}

A quick sls deploy gets everything deployed. We did rename a table, so the page events table will be empty 🤨


Alright. Ready to do some new stuff. 😎

So we want to do two things today:

  1. Store the identify() calls together with anonymousId so we can map visitor sessions (pages, tracks, …) to real users
  2. Run page() calls through some external library and detect what type of traffic it is (search, paid, social,…)

Streaming DB inserts

Because I have my own dataset of historic Segment events, I might import the old archive later. For that reason, I have decided to use DynamoDB Streams to launch a second Lambda function after a record is inserted. This way I can safely batch-import old page events and know they will be handled by the same processing logic.
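
Just to make that concrete: a minimal sketch of what such a batch import could look like later on (the script name, the archive file and its format are assumptions, not part of the project yet):

// import_archive.js: hypothetical one-off import script
const dynamoDBFactory = require('./src/dynamodb.factory');
const archive = require('./archive/pages.json'); // assumed: an array of raw page() events

const dynamoDb = dynamoDBFactory();
const PAGE_TABLE = process.env.PAGE_TABLE;

// batchWrite accepts at most 25 items per request, so split the archive into chunks
const chunk = (items, size) => items.reduce((out, item, i) => {
    if (i % size === 0) out.push([]);
    out[out.length - 1].push(item);
    return out;
}, []);

(async () => {
    for (const batch of chunk(archive, 25)) {
        await dynamoDb.batchWrite({
            RequestItems: {
                [PAGE_TABLE]: batch.map((event) => ({
                    PutRequest: { Item: { messageId: event.messageId, ...event } }
                }))
            }
        }).promise();
    }
})();

Every item written this way goes through the same DynamoDB stream, so historic events end up being handled by exactly the same Lambda as live traffic.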

Let’s register two new functions and set up the streaming link in serverless.yml

service: segment-attribution

plugins:
  - serverless-offline

custom:
  tableIdentify: 'segment-event-identify-${self:provider.stage}'
  tablePage: 'segment-event-page-${self:provider.stage}'
  tableAttribution: 'segment-event-attribution-${self:provider.stage}'
  tableUserMapping: 'segment-event-user-map-${self:provider.stage}'

provider:
  name: aws
  runtime: nodejs10.x
  stage: dev
  region: eu-west-1
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
        - dynamodb:ListStreams
      Resource:
        - { "Fn::GetAtt": ["SegmentIdentifiesDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentPageDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentAttributionDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentUserMappingDynamoDBTable", "Arn" ] }
  environment:
    IDENTIFY_TABLE: ${self:custom.tableIdentify}
    PAGE_TABLE: ${self:custom.tablePage}
    ATTRIBUTION_TABLE: ${self:custom.tableAttribution}
    USER_MAP_TABLE: ${self:custom.tableUserMapping}

functions:
  - ${file(src/handlers/api/api.serverless.yml)}
  - ${file(src/handlers/process.serverless.yml)}
  - ${file(src/handlers/segment.serverless.yml)}

resources:
  Resources:
    SegmentIdentifiesDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:custom.tableIdentify}
    SegmentPageDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:custom.tablePage}
    SegmentAttributionDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: anonymousId
            AttributeType: S
          - AttributeName: eventId
            AttributeType: S
        KeySchema:
          - AttributeName: anonymousId
            KeyType: HASH
          - AttributeName: eventId
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:custom.tableAttribution}
    SegmentUserMappingDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
          - AttributeName: anonymousId
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
          - AttributeName: anonymousId
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        TableName: ${self:custom.tableUserMapping}
serverless.yml
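
The three included handler files are not shown in full in this post. As a rough sketch (function names, handler paths and options here are assumptions), the stream wiring in src/handlers/process.serverless.yml could look something like this; batchSize: 1 matches the handlers, which only look at event.Records[0]:

processIdentify:
  handler: processIdentify.handler
  events:
    - stream:
        type: dynamodb
        batchSize: 1
        startingPosition: LATEST
        arn:
          Fn::GetAtt: [SegmentIdentifiesDynamoDBTable, StreamArn]
processPage:
  handler: processPage.handler
  events:
    - stream:
        type: dynamodb
        batchSize: 1
        startingPosition: LATEST
        arn:
          Fn::GetAtt: [SegmentPageDynamoDBTable, StreamArn]
src/handlers/process.serverless.yml (sketch)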

Hit sls deploy

In some cases, I had to erase my deployment with sls remove (or even sls remove -f) because some changes to the tables (setting up the streams) could not be completed.

Don’t forget to export BASE_DOMAIN after that to point to the new deployment.
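
Something like this, with your own API Gateway id filled in:

export BASE_DOMAIN=https://<your-api-id>.execute-api.eu-west-1.amazonaws.com/dev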

Processing Identify calls

Every time an identify event is stored (Day 2), another Lambda function is automatically called. Let’s add the code (in /processIdentify.js) to store a mapping (userId to anonymousId).

const { unmarshallNewImageEvent } = require('./src/utils/dynamo_stream.util');
const dynamoDBFactory = require('./src/dynamodb.factory');
const { UserToAnonymousModel } = require('./src/models/UserToAnonymous');

const dynamoDb = dynamoDBFactory();
const model = new UserToAnonymousModel(dynamoDb);

module.exports.handler = async (event) => {
    const eventData = event.Records[0];

    // only handle dynamo INSERTs (REMOVE records have no NewImage to unmarshall)
    if (eventData.eventName !== 'INSERT') {
        return;
    }

    const eventAddedUnmarshalled = unmarshallNewImageEvent(eventData);
    const { type: eventType, userId, anonymousId } = eventAddedUnmarshalled;

    // only handle events with type identify
    if (eventType !== 'identify') {
        return;
    }

    try {
        await model.storeMap(userId, anonymousId);
    } catch (e) {
        console.log(e.message)
    }

    return eventData.eventName;
};
processIdentify.js

Two new files come with that too. The first one is a model to keep our Lambda handler clean and DRY.

//src/models/UserToAnonymous.js
const USER_MAP_TABLE = process.env.USER_MAP_TABLE;

class UserToAnonymousModel {
    get _baseParams() {
        return {
            TableName: USER_MAP_TABLE
        };
    }

    constructor(documentClient) {
        this._documentClient = documentClient;
    }

    /**
     *
     * Stores a (userId, anonymousId) mapping.
     * With userId as hash key and anonymousId as range key,
     * storing the same pair twice simply overwrites the existing item.
     *
     * @param userId
     * @param anonymousId
     * @returns {Promise<void>}
     */
    async storeMap(userId, anonymousId) {
        const params = this._createParamObject({
            Item: {
                userId,
                anonymousId,
            }
        });

        await this._documentClient.put(params).promise();

        return;
    }

    async getAnonymousIdsForUser(userId) {

        const params = this._createParamObject({
            KeyConditionExpression: "#userId = :userId",
            ExpressionAttributeNames:{
                "#userId": "userId"
            },
            ExpressionAttributeValues: {
                ":userId": userId
            }
        });

        const allItems = await this._documentClient.query(params).promise();

        return allItems.Items.map(({ anonymousId }) => anonymousId);
    }

    _createParamObject(additionalArgs = {}) {
        return Object.assign({}, this._baseParams, additionalArgs);
    }
}

exports.UserToAnonymousModel = UserToAnonymousModel;
src/models/UserToAnonymous.js

The second file deals with the fact that DynamoDB stream events come in a special (marshalled) format. As we need the original event, we need to unmarshall it. Read about it on Stack Overflow.

// /src/utils/dynamo_stream.util.js
const AWS = require('aws-sdk');

const unmarshallNewImageEvent = (eventData) => {
    if (!eventData.dynamodb || !eventData.dynamodb.NewImage) {
        throw new Error('Not a dynamo event with a NewImage');
    }

    // if you want the raw event we need to unmarshall it
    // more @ https://stackoverflow.com/questions/44535445/unmarshall-dynamodb-json
    return AWS.DynamoDB.Converter.unmarshall(eventData.dynamodb.NewImage);
};


module.exports = {
    unmarshallNewImageEvent
};
src/utils/dynamo_stream.util.js
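
To make the marshalled format concrete, a NewImage as delivered by the stream carries an explicit type for every attribute, and the converter strips that back out (values below are made up):

const AWS = require('aws-sdk');

// marshalled NewImage straight from the stream record
const newImage = {
    messageId: { S: 'msg-1' },
    type: { S: 'identify' },
    userId: { S: 'user-1' },
    anonymousId: { S: 'anon-1' }
};

AWS.DynamoDB.Converter.unmarshall(newImage);
// => { messageId: 'msg-1', type: 'identify', userId: 'user-1', anonymousId: 'anon-1' }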

After that, deploy only that function:

sls deploy function --function processIdentify

Now try to hit your segment endpoint with a new identify event

http POST $BASE_DOMAIN/events < events/identify.json
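
The contents of events/identify.json are not included in this post; any Segment-style identify payload works as long as it has the fields our handlers read. A made-up minimal example:

{
    "type": "identify",
    "messageId": "identify-test-0001",
    "userId": "user-1",
    "anonymousId": "anon-1",
    "timestamp": "2019-07-01T10:00:00.000Z",
    "traits": { "email": "test@example.com" }
}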

In a new tab, monitor the output:

sls logs --function=processIdentify

After that, you will see that a new record was written to the user mapping table. Mine (with the example event) looked like this:

First record in DynamoDB

Processing Page calls

Next up is processing page calls in a very similar way. We already have the DB table and its stream created.

As mentioned in the plan, I wanted to use a library to parse the HTTP referrer and current URL and classify sessions into some kind of grouping (paid, social, referral, search, …).

After a quick check of those libraries, I found segment/inbound the easiest one to use. But when using it I bumped into some issues:

  • Not maintained
  • Does not detect referrer: https://www.google.com as search
  • Outdated social networks and search engines
  • Tests were broken
  • Problem with a sublibrary that was accessing an external URL.

I found a better fork (better maintained), but then I had to fork that again to apply some of my own fixes. Too bad I didn’t know about Monkey Referrer then :-( For now we’ll use the fork I created.

npm install --save https://github.com/digitalbase/inbound

I need two new utils to keep the function clean:

The first one wraps the inbound library in a promise: it parses the HTTP referrer and/or current URL and returns something useful.

// src/utils/referrer_detection.util.js

const inbound = require('prezly-inbound');

module.exports = async(href, referrer) => {
    console.log('going extraction', href, referrer);

    return new Promise((resolve, reject) => {
        inbound.referrer.parse(href, referrer, (error, data) => {
            if (error) {
                reject(error);
            } else {
                // internal traffic is not useful for attribution
                if (data.referrer.type === 'internal') {
                    return resolve(null);
                }

                resolve(data);
            }
        });
    });
};
src/utils/referrer_detection.util.js

Another one extracts the important properties from a page() event.

// src/utils/event_extraction.util.js

module.exports = (request_body) => {
    const type = request_body.type;

    // Only handle `page` and `track` events
    // (check the type first, because other events have no context.page)
    if (type !== 'page' && type !== 'track') {
        return false;
    }

    const referrer = request_body.context.page.referrer;
    const href = request_body.context.page.url;
    const anonymousId = request_body.anonymousId;
    const messageId = request_body.messageId;
    const userId = request_body.userId;
    const timestamp = request_body.timestamp;

    return {
        referrer,
        messageId,
        href,
        anonymousId,
        timestamp,
        userId
    }
};
src/utils/event_extraction.util.js

This makes the processPage file (also automatically called on DB inserts) look like this:

// /processPage.js

const extractor = require('./src/utils/event_extraction.util');
const referrer_detection = require('./src/utils/referrer_detection.util');
const { unmarshallNewImageEvent } = require('./src/utils/dynamo_stream.util');

const dynamoDBFactory = require('./src/dynamodb.factory');
const { SourceAttributionModel } = require('./src/models/SourceAttribution');

const dynamoDb = dynamoDBFactory();
const model = new SourceAttributionModel(dynamoDb);

module.exports.handler = async (event) => {
    const eventData = event.Records[0];

    if (eventData.eventName !== 'INSERT') {
        return;
    }

    const eventAddedUnmarshalled = unmarshallNewImageEvent(eventData);
    const extractedData = extractor(eventAddedUnmarshalled);

    // the extractor returns false for events that are not page/track
    if (!extractedData) {
        return;
    }

    const { referrer, href } = extractedData;

    const extraction = await referrer_detection(href, referrer);
    if (!extraction) {
        console.log('Skipping useless extraction', extractedData);
        return;
    }

    try {
        await model.store(eventAddedUnmarshalled, extraction);
    } catch (e) {
        console.log(e.message)
    }

    return eventData.eventName;
};
processPage.js

In short this will:

Get the page() payload -> run it through inbound lib -> store that together with url and HTTP referrer in a new table optimised for reads by anonymousId.
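
Roughly, a stored attribution item could look like this (values are made up, and everything beyond referrer.type depends on what the inbound fork returns):

// roughly one item in the attribution table
{
    anonymousId: 'anon-1',
    eventId: '2019-07-01T10:05:00.000Z-page-test-0001', // `${timestamp}-${messageId}`
    messageId: 'page-test-0001',
    timestamp: '2019-07-01T10:05:00.000Z',
    userId: 'user-1',
    url: 'https://www.example.com/pricing',
    referrerUrl: 'https://www.google.com/',
    referrer: { type: 'search' }
}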

One more file (the model)

// /src/models/SourceAttribution
const ATTRIBUTION_TABLE = process.env.ATTRIBUTION_TABLE;

const extractor = require('../utils/event_extraction.util');

class SourceAttributionModel {
    get _baseParams() {
        return {
            TableName: ATTRIBUTION_TABLE
        };
    }

    constructor(documentClient) {
        this._documentClient = documentClient;
    }

    async store(eventData, extraction) {
        const extractedData = extractor(eventData);
        const {referrer, href, anonymousId, userId, timestamp, messageId } = extractedData;

        const eventId = `${timestamp}-${messageId}`; // used as the range (sort) key so events sort chronologically per anonymousId
        const params = this._createParamObject({
            Item: {
                anonymousId,
                eventId,
                messageId,
                timestamp,
                userId,
                url: href,
                referrerUrl: referrer,
                ...extraction
            }
        });

        return await this._documentClient.put(params).promise();
    }

    async getForAnonymousId(anonymousId) {

        const params = this._createParamObject({
            KeyConditionExpression: "#anonymousId = :anonymousId",
            ExpressionAttributeNames:{
                "#anonymousId": "anonymousId"
            },
            ExpressionAttributeValues: {
                ":anonymousId": anonymousId
            }
        });
        const allItems = await this._documentClient.query(params).promise();

        return allItems.Items;
    }

    async getForAnonymousIds(anonymousIds) {
        const promises = anonymousIds.map(async anonymousId => {
            return this.getForAnonymousId(anonymousId);
        });

        const results = await Promise.all(promises);

        return results.filter((el) => {
            console.log(el);
            return el.length !== 0;
        });
    }

    _createParamObject(additionalArgs = {}) {
        return Object.assign({}, this._baseParams, additionalArgs);
    }
}

exports.SourceAttributionModel = SourceAttributionModel;
src/models/SourceAttribution

Give it a quick deploy

sls deploy function --function processPage

And after feeding the test event (make sure to clear the page table first) you should see some events.

http POST $BASE_DOMAIN/events < events/page.json
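
Again, events/page.json is not included here; a made-up minimal example with the fields the extraction util reads:

{
    "type": "page",
    "messageId": "page-test-0001",
    "anonymousId": "anon-1",
    "userId": "user-1",
    "timestamp": "2019-07-01T10:05:00.000Z",
    "context": {
        "page": {
            "url": "https://www.example.com/pricing",
            "referrer": "https://www.google.com/"
        }
    }
}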

So this call feeds the webhook (created in Day 2) a new page() event just like Segment would. After that insert, DynamoDB automatically calls another Lambda function (processPage) that runs the HTTP referrer and current URL through the inbound library and stores that payload (together with other important properties) in sma-event-attribution-dev.

Feed it some more events (copy them from segment) and have a look in the DynamoDB interface. Here is what I saw with the test event:

There you go. Some great progress today. Tomorrow we’ll start by moving this to production and then creating an API to generate a stream per user or anonymousId.