Day 6: Feeding source attribution data back to Segment.com
8 min read


So in the first five days we created and deployed some Lambda functions that:

  1. store incoming Segment events in a DynamoDB table (webhook)
  2. store the mapping between anonymousId and userId in a DynamoDB table (webhook)
  3. process the URL and HTTP referrer and detect where the traffic comes from, using this GitHub library
  4. allow us to query that data by userId or anonymousId (for debugging purposes)

Additionally, I imported a three-year archive of page() and identify() calls so we don’t have to start from scratch.

Next up: Feeding this data back to segment.com.

feeding information back to segment

Triggering the right Segment events

Eventually we want to trigger these events in real time, but I’ll start by providing an endpoint to trigger them manually, so we can check in Segment, Mixpanel and Customer.io whether everything is coming in.

In the first version of the code below I used Segment’s analytics-node SDK (https://github.com/segmentio/analytics-node). It worked fine when run from the command line (node script.js), but in the asynchronous Lambda setup some calls were never sent. Then I found this bug, so I switched to the HTTP Tracking API, specifically its batch endpoint, to send my events.

For now we’ll start with two attribution models: first touch and last touch. To let the different reporting tools use that information in their reports, we will write it as user traits using identify().
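The trackUser() code below takes the first and last entries of attributionSessions as first and last touch, which is only correct if the sessions come back ordered by time. A minimal sketch that makes that ordering explicit, assuming each session has a timestamp field that Date can parse (as the session records used later do); pickFirstAndLastTouch is a hypothetical helper, not part of this project:

```javascript
// Hypothetical helper: choose first- and last-touch sessions by timestamp.
// Assumes each session has a `timestamp` that Date can parse (e.g. ISO-8601).
function pickFirstAndLastTouch(sessions) {
  if (!Array.isArray(sessions) || sessions.length === 0) {
    return { first: null, last: null };
  }
  // Sort a copy, oldest first, so the caller's array is not mutated.
  const sorted = [...sessions].sort(
    (a, b) => new Date(a.timestamp) - new Date(b.timestamp)
  );
  return { first: sorted[0], last: sorted[sorted.length - 1] };
}

const sessions = [
  { timestamp: '2020-04-08T14:39:01.906Z', campaign: { source: 'feedburner' } },
  { timestamp: '2019-01-02T09:00:00.000Z', campaign: { source: 'google' } },
  { timestamp: '2020-01-15T12:00:00.000Z', campaign: { source: 'twitter' } },
];

const { first, last } = pickFirstAndLastTouch(sessions);
console.log(first.campaign.source); // google
console.log(last.campaign.source); // feedburner
```

Sorting a copy leaves the original array untouched, so it can still be replayed in its stored order for the track events.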

The information coming out of the visitor source detection (see Day 3: Identify User Source) looks like this:

{
  "referrer": {
     "type": "email",
     "client": "gmail",
     "from": "https://mail.google.com/_/mail-static/_/js/main/m_i,t/rt=h/ver=am293eyFlXI.en./sv=1/am=!v8Czf-oeNMn1FOzaNKsLQrJy-oNN3RSSYMAZTBUxCzwgQcXtLnTEHCkGr437GpFE2Dliuw/d=1",
     "link": "http://blog.intercom.io/churn-retention-and-reengaging-customers/?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed%3A+contrast%2Fblog+%28The+Intercom+Blog%29"
  },
  "campaign": {
    "source": "feedburner",
    "medium": "feed",
    "campaign": "Feed: contrast/blog (The Intercom Blog)"
  }
}

I’m not sure whether Segment supports nested properties; I tried it in the past and didn’t get it to work. A quick Google search taught me that it’s better to flatten the properties. As we’re sending these properties for both first and last touch, the end result will look something like this:

{
  "type": "identify",
  "userId": "[user_id]",
  "traits": {
    "source_first_referrer_client": "gmail",
    "source_first_referrer_type": "email",
    "source_first_campaign_source": "feedburner",
    "source_first_campaign_medium": "feed",
    "source_first_campaign_name": "Feed: contrast/blog (The Intercom Blog)",
    "source_first_url": "http://blog.intercom.io/churn-retention-and-reengaging-customers/?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed%3A+contrast%2Fblog+%28The+Intercom+Blog%29",
    "source_first_referrer": "https://mail.google.com/_/mail-static/_/js/main/m_i,t/rt=h/ver=am293eyFlXI.en./sv=1/am=!v8Czf-oeNMn1FOzaNKsLQrJy-oNN3RSSYMAZTBUxCzwgQcXtLnTEHCkGr437GpFE2Dliuw/d=1",
    "source_last_referrer_client": "gmail",
    "source_last_referrer_type": "email",
    "source_last_campaign_source": "feedburner",
    "source_last_campaign_medium": "feed",
    "source_last_campaign_name": "Feed: contrast/blog (The Intercom Blog)",
    "source_last_url": "http://blog.intercom.io/churn-retention-and-reengaging-customers/?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed%3A+contrast%2Fblog+%28The+Intercom+Blog%29",
    "source_last_referrer": "https://mail.google.com/_/mail-static/_/js/main/m_i,t/rt=h/ver=am293eyFlXI.en./sv=1/am=!v8Czf-oeNMn1FOzaNKsLQrJy-oNN3RSSYMAZTBUxCzwgQcXtLnTEHCkGr437GpFE2Dliuw/d=1"
  },
  "receivedAt": "2020-04-30T07:47:15.054Z",
  "sentAt": null,
  "timestamp": "2020-04-08T14:39:01.906Z"
}
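Rather than writing each flattened key by hand, the nesting can also be flattened generically. A minimal sketch, assuming every nested key should simply be joined with an underscore under a prefix such as source_first; flattenWithPrefix is a hypothetical helper. Unlike extractPropertiesFromEvent() below, it keeps the original key names (campaign.campaign becomes source_first_campaign_campaign rather than source_first_campaign_name), so an explicit mapping is still the better fit when you want specific trait names:

```javascript
// Hypothetical helper: flatten nested plain objects into underscore-joined keys.
// Arrays, Dates and other non-plain values are copied as-is; undefined becomes null.
function isPlainObject(value) {
  return Object.prototype.toString.call(value) === '[object Object]';
}

function flattenWithPrefix(obj, prefix = '', out = {}) {
  for (const [key, value] of Object.entries(obj)) {
    const name = prefix ? `${prefix}_${key}` : key;
    if (isPlainObject(value)) {
      flattenWithPrefix(value, name, out); // recurse, extending the prefix
    } else {
      out[name] = value === undefined ? null : value;
    }
  }
  return out;
}

// Shape taken from the visitor source detection output above (shortened).
const detected = {
  referrer: { type: 'email', client: 'gmail' },
  campaign: {
    source: 'feedburner',
    medium: 'feed',
    campaign: 'Feed: contrast/blog (The Intercom Blog)',
  },
};

const flat = flattenWithPrefix(detected, 'source_first');
// flat.source_first_referrer_client === 'gmail'
// flat.source_first_campaign_campaign === 'Feed: contrast/blog (The Intercom Blog)'
```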

This is the file that does most of the heavy lifting. I’m not too proud of this file, but I told you I’m not the best JS programmer. It works though 😊

// src/SegmentTracking.js
const https = require('https');

const dynamoDBFactory = require('./dynamodb.factory');
const {UserToAnonymousModel} = require('./models/UserToAnonymous');
const {SourceAttributionModel} = require('./models/SourceAttribution');

const SOURCE_IDENTIFIED_EVENT_NAME = process.env.ANALYTICS_SOURCE_IDENTIFICATION_EVENT;

class SegmentTracking {
    constructor() {
        this._dynamoDb = dynamoDBFactory();
        this._model_usermap = new UserToAnonymousModel(this._dynamoDb);
        this._model_source = new SourceAttributionModel(this._dynamoDb);
    }

    extractPropertiesFromEvent(event, prefix = '') {
        const properties = {};

        if (prefix !== '') {
            prefix = prefix + '_';
        }

        properties[prefix+'url'] = event.url;
        properties[prefix+'referrer'] = event.referrerUrl;

        if (event.referrer) {
            properties[prefix+'referrer_type'] = event.referrer.type || null;
            properties[prefix+'referrer_network'] = event.referrer.network || null;
            properties[prefix+'referrer_engine'] = event.referrer.engine || null;
            properties[prefix+'referrer_host'] = event.referrer.host || null;
            properties[prefix+'referrer_client'] = event.referrer.client || null;
        }

        if (event.campaign) {
            properties[prefix+'campaign_name'] = event.campaign.campaign || null;
            properties[prefix+'campaign_source'] = event.campaign.source || null;
            properties[prefix+'campaign_medium'] = event.campaign.medium || null;
            properties[prefix+'campaign_term'] = event.campaign.term || null;
        }


        return properties;
    }

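    async trackUser(user_id, include_track_events = false) {
        const anonymousIds = await this._model_usermap.getAnonymousIdsForUser(user_id);
        const attributionSessions = await this._model_source.getForAnonymousIds(anonymousIds);
        let data = [];

        // Only send first/last touch traits when there is at least one session;
        // otherwise `first` and `last` would be undefined and extractPropertiesFromEvent() would throw.
        if (attributionSessions.length > 0) {
            // Assumes the sessions come back ordered from oldest to newest.
            const first = attributionSessions[0];
            const last = attributionSessions[attributionSessions.length - 1];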
            const user_properties = {
                type: "identify",
                userId: user_id,
                active: false,
                ip: null,
                context: {
                    active:false
                },
                traits: {
                    lastSyncedSma: new Date(),
                    ...this.extractPropertiesFromEvent(first, 'source_first'),
                    ...this.extractPropertiesFromEvent(last, 'source_last'),
                }
            };

            //console.log(user_properties);
            data.push(user_properties);
        }

        if (include_track_events === true) {

            attributionSessions.forEach((session) => {
                const event_timestamp = new Date(session.timestamp);
                //console.log(session.timestamp);
                const tracking_properties = {
                    type: "track",
                    //anonymousId: "c333e5eb-4eb2-4a9d-a312-5e3fff3e2620",
                    userId: user_id,
                    context: {
                        active:false
                    },
                    integrations: { All: true },
                    event: SOURCE_IDENTIFIED_EVENT_NAME,
                    timestamp: event_timestamp.toISOString(),
                    properties: this.extractPropertiesFromEvent(session)
                };

                data.push(tracking_properties);
                //console.log(tracking_properties);
            });
        }

        return this.callSegment(data);
    }

    async callSegment(events) {
        return new Promise((resolve, reject) => {
            const data = JSON.stringify({ batch: events });

            // https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#headers
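            const auth_username = process.env.ANALYTICS_WRITE_KEY + ":";
            const auth_buffer = Buffer.from(auth_username);
            // HTTP Basic auth: the scheme name "Basic", a space, then the Base64 credentials (no colon after "Basic")
            const auth = 'Basic ' + auth_buffer.toString('base64');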
            const options = {
                hostname: 'api.segment.io',
                port: 443,
                path: '/v1/batch',
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                    // Byte length, not string length: non-ASCII characters take several bytes in UTF-8
                    'Content-Length': Buffer.byteLength(data),
                    'Authorization': auth
                }
            };

            const req = https.request(options,
                (res) => {
                    let body = '';
                    res.on('data', (chunk) => (body += chunk.toString()));
                    res.on('error', reject);
                    res.on('end', () => {
                        if (res.statusCode >= 200 && res.statusCode <= 299) {

                            resolve({statusCode: res.statusCode, headers: res.headers, body: body});
                        } else {
                            reject(new Error('Request failed. status: ' + res.statusCode + ', body: ' + body));
                        }
                    });
                });

            req.on('error', reject);
            req.write(data);
            req.end();
        });
    }

    async trackAnonymous(anonymous_id) {
        const attributionSessions = await this._model_source.getForAnonymousId(anonymous_id);
        let data = [];

        attributionSessions.forEach((session) => {
            //console.log(session.timestamp);
            const tracking_properties = {
                type: "track",
                anonymousId: anonymous_id,
                event: SOURCE_IDENTIFIED_EVENT_NAME,
                timestamp: session.timestamp,
                properties: this.extractPropertiesFromEvent(session)
            };

            data.push(tracking_properties);

            //console.log(tracking_properties);
        });

        return this.callSegment(data);
    }
}


exports.SegmentTracking = SegmentTracking;
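Two details of callSegment() are easy to get wrong: the Authorization header uses HTTP Basic auth (the scheme name Basic, a space, then the Base64 encoding of the write key followed by a colon), and Content-Length must count bytes, not JavaScript string characters. A minimal sketch that pulls the request construction into a pure function so it can be unit-tested without calling Segment; buildBatchRequest is a hypothetical name and 'my-write-key' a placeholder:

```javascript
// Hypothetical helper: build the headers and body for a POST to Segment's /v1/batch.
// Pure function, so it can be tested without network access.
function buildBatchRequest(writeKey, events) {
  const body = JSON.stringify({ batch: events });
  // Basic auth: base64("<writeKey>:"), i.e. write key as username, empty password.
  const auth = 'Basic ' + Buffer.from(writeKey + ':').toString('base64');
  return {
    body,
    headers: {
      'Content-Type': 'application/json',
      // Byte length, not string length: 'é' is 1 character but 2 bytes in UTF-8.
      'Content-Length': Buffer.byteLength(body),
      Authorization: auth,
    },
  };
}

const built = buildBatchRequest('my-write-key', [
  {
    type: 'track',
    userId: 'u1',
    event: 'Source Identified',
    properties: { campaign_name: 'Caf\u00e9' },
  },
]);
console.log(built.headers.Authorization); // Basic bXktd3JpdGUta2V5Og==
```

In callSegment(), the returned headers could replace the hand-built options.headers, and body could be passed to req.write().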

I also refactored the codebase a bit to use dotenv in combination with Serverless. Here are some of the changes:

npm install serverless-dotenv-plugin --save

Create a .env file:

STAGE=dev
REGION=eu-west-1
SERVICE_NAME=sma

IDENTIFY_TABLE=sma-dev-identify-event
PAGE_TABLE=sma-dev-page-event
ATTRIBUTION_TABLE=sma-dev-source-attribution
USER_MAP_TABLE=sma-dev-user-anonymous-map

ANALYTICS_WRITE_KEY=[YOUR_ANALYTICS_WRITE_KEY]
ANALYTICS_SOURCE_IDENTIFICATION_EVENT="Source Identified"
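With serverless-dotenv-plugin these values end up as Lambda environment variables, so a missing one only shows up later, for example as the string "undefined:" being Base64-encoded into the Authorization header. A minimal sketch of failing fast when the module loads instead; readConfig and REQUIRED_ENV are hypothetical names, and the list simply mirrors the .env file above:

```javascript
// Hypothetical helper: fail fast when required environment variables are missing.
// The names mirror the .env file above; adjust the list to your own setup.
const REQUIRED_ENV = [
  'IDENTIFY_TABLE',
  'PAGE_TABLE',
  'ATTRIBUTION_TABLE',
  'USER_MAP_TABLE',
  'ANALYTICS_WRITE_KEY',
  'ANALYTICS_SOURCE_IDENTIFICATION_EVENT',
];

function readConfig(env = process.env, required = REQUIRED_ENV) {
  // Treat unset and empty-string values the same way.
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error('Missing environment variables: ' + missing.join(', '));
  }
  // Copy just the required keys (written without Object.fromEntries,
  // which the nodejs10.x runtime used here does not provide).
  const config = {};
  required.forEach((name) => {
    config[name] = env[name];
  });
  return config;
}

// Usage, e.g. at module load in SegmentTracking.js:
// const config = readConfig();
// const auth_username = config.ANALYTICS_WRITE_KEY + ':';
```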

And update the serverless.yml file like this:

service: ${env:SERVICE_NAME}

package:
  exclude:
    - data/**
    - events/**
    - .idea

plugins:
  - serverless-offline
  - serverless-dotenv-plugin

provider:
  name: aws
  runtime: nodejs10.x
  stage: ${env:STAGE}
  region: ${env:REGION}
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
        - dynamodb:ListStreams
        - lambda:InvokeFunction
      Resource:
        - { "Fn::GetAtt": ["SegmentIdentifiesDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentPageDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentAttributionDynamoDBTable", "Arn" ] }
        - { "Fn::GetAtt": ["SegmentUserMappingDynamoDBTable", "Arn" ] }

functions:
  - ${file(src/handlers/api/api.serverless.yml)}
#  - ${file(src/handlers/debug/debug.serverless.yml)}
  - ${file(src/handlers/process/process.serverless.yml)}
  - ${file(src/handlers/segment/segment.serverless.yml)}
#  - ${file(src/handlers/customerio/customerio.serverless.yml)}

resources:
  Resources:
    SegmentIdentifiesDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${env:IDENTIFY_TABLE}
    SegmentPageDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        StreamSpecification:
          StreamViewType: NEW_IMAGE
        AttributeDefinitions:
          - AttributeName: messageId
            AttributeType: S
        KeySchema:
          - AttributeName: messageId
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${env:PAGE_TABLE}
    SegmentAttributionDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: anonymousId
            AttributeType: S
          - AttributeName: eventId
            AttributeType: S
        KeySchema:
          - AttributeName: anonymousId
            KeyType: HASH
          - AttributeName: eventId
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
        TableName: ${env:ATTRIBUTION_TABLE}
    SegmentUserMappingDynamoDBTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: userId
            AttributeType: S
          - AttributeName: anonymousId
            AttributeType: S
        KeySchema:
          - AttributeName: userId
            KeyType: HASH
          - AttributeName: anonymousId
            KeyType: RANGE
        BillingMode: PAY_PER_REQUEST
        TableName: ${env:USER_MAP_TABLE}

And another serverless.yml file for the segment handlers:

# src/handlers/segment/segment.serverless.yml
storeEvent:
  handler: src/handlers/segment/storeEvent.handler
  events:
    - http: 'POST /events'
identifyUser:
  handler: src/handlers/segment/identifyUser.handler
  events:
    - http: 'GET /segment/identify/user/{id}'
trackAnonymous:
  handler: src/handlers/segment/trackAnonymous.handler
  events:
    - http: 'GET /segment/track/anonymous/{id}'
trackUser:
  handler: src/handlers/segment/trackUser.handler
  events:
    - http: 'GET /segment/track/user/{id}'

Notice how the handlers now live in separate files. To make the Segment part work, create the following handlers (note that these are four files in one code block):

// /src/handlers/segment/identifyUser.js
const { withStatusCode } = require('../../utils/response.util');
const ok = withStatusCode(200, JSON.stringify);

const {SegmentTracking} = require('../../SegmentTracking');
const SegmentTracker = new SegmentTracking();

exports.handler = async (event) => {
    const { id } = event.pathParameters;

    await SegmentTracker.trackUser(id, false);

    return ok('Track Completed');
};

//src/handlers/segment/storeEvent.js
const dynamoDBFactory = require('../../dynamodb.factory');
const dynamoDb = dynamoDBFactory();

const { withStatusCode } = require('../../utils/response.util');
const { parseWith } = require('../../utils/request.util');

const parseJson = parseWith(JSON.parse);
const ok = withStatusCode(200);
const problem = withStatusCode(400);

const IDENTIFY_TABLE = process.env.IDENTIFY_TABLE;
const PAGE_TABLE = process.env.PAGE_TABLE;

exports.handler = async (event) => {
    const request_body = parseJson(event.body);
    const { type, messageId } = request_body;

    if (type !== 'page' && type !== 'identify') {
        return ok('not a page');
    }

    const params = {
        TableName: ( type === 'identify' ? IDENTIFY_TABLE : PAGE_TABLE),
        Item: {
            messageId,
            source: "webhook",
            ...request_body
        },
    };

    try {
        await dynamoDb.put(params).promise();
    } catch (e) {
        console.log('Could not store event', e);
        return problem(e.message);
    }

    return ok();
};

// /src/handlers/segment/trackAnonymous.js
const { withStatusCode } = require('../../utils/response.util');
const ok = withStatusCode(200, JSON.stringify);

const {SegmentTracking} = require('../../SegmentTracking');
const SegmentTracker = new SegmentTracking();

exports.handler = async (event) => {
    const { id } = event.pathParameters;

    // trackAnonymous() takes only the anonymous id
    await SegmentTracker.trackAnonymous(id);

    return ok('Track Completed');
};

// /src/handlers/segment/trackUser.js
const { withStatusCode } = require('../../utils/response.util');
const ok = withStatusCode(200, JSON.stringify);

const {SegmentTracking} = require('../../SegmentTracking');
const SegmentTracker = new SegmentTracking();

exports.handler = async (event) => {
    const { id } = event.pathParameters;

    await SegmentTracker.trackUser(id, true);

    return ok('Track Completed');
};
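As written, the identifyUser, trackAnonymous and trackUser handlers let a rejected callSegment() promise escape, and with a Lambda proxy integration API Gateway then typically answers with a generic 502 error. A minimal sketch of a wrapper that returns a readable response instead; withErrorHandling is a hypothetical helper, and it builds the API Gateway proxy response object ({ statusCode, body }) directly because the project’s response.util code isn’t shown here:

```javascript
// Hypothetical wrapper: turn an async "act on the path id" function into a
// Lambda handler that always returns an API Gateway proxy response.
function withErrorHandling(action) {
  return async (event) => {
    const id = event && event.pathParameters && event.pathParameters.id;
    if (!id) {
      return { statusCode: 400, body: JSON.stringify({ error: 'missing id' }) };
    }
    try {
      await action(id);
      return { statusCode: 200, body: JSON.stringify('Track Completed') };
    } catch (err) {
      // Log for CloudWatch, then surface a readable error to the caller.
      console.error('Segment call failed', err);
      const message = err instanceof Error ? err.message : String(err);
      return { statusCode: 502, body: JSON.stringify({ error: message }) };
    }
  };
}

// Usage in trackUser.js, reusing the SegmentTracker instance created there:
// exports.handler = withErrorHandling((id) => SegmentTracker.trackUser(id, true));
```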

A quick sls deploy will result in a few new endpoints:

Serverless: DOTENV: Loading environment variables from .env:
Serverless:   - STAGE
Serverless:   - REGION
Serverless:   - SERVICE_NAME
Serverless:   - IDENTIFY_TABLE
Serverless:   - PAGE_TABLE
Serverless:   - ATTRIBUTION_TABLE
Serverless:   - USER_MAP_TABLE
Serverless:   - ANALYTICS_WRITE_KEY
Serverless:   - ANALYTICS_SOURCE_IDENTIFICATION_EVENT
Service Information
service: sma
stage: prod
region: eu-west-1
stack: sma-prod
resources: 62
api keys:
  None
endpoints:
  GET - $ENDPOINT_PROD/api/anonymous/{id}
  GET - $ENDPOINT_PROD/api/user/{id}
  POST - $ENDPOINT_PROD/events
  GET - $ENDPOINT_PROD/segment/identify/user/{id}
  GET - $ENDPOINT_PROD/segment/track/anonymous/{id}
  GET - $ENDPOINT_PROD/segment/track/user/{id}
functions:
  getAnonymous: sma-prod-getAnonymous
  getUser: sma-prod-getUser
  processPage: sma-prod-processPage
  processIdentify: sma-prod-processIdentify
  storeEvent: sma-prod-storeEvent
  identifyUser: sma-prod-identifyUser
  trackAnonymous: sma-prod-trackAnonymous
  trackUser: sma-prod-trackUser
layers:
  None

Let me explain the endpoints:

  • POST $ENDPOINT_PROD/events → receives every event payload from Segment (webhook) and stores the page() and identify() events in DynamoDB
  • GET $ENDPOINT_PROD/segment/identify/user/{id} → fires an identify() call with the source_first_* and source_last_* traits
  • GET $ENDPOINT_PROD/segment/track/anonymous/{id} → fires track('Source Identified') calls with the flattened visitor detection properties for an anonymous visitor
  • GET $ENDPOINT_PROD/segment/track/user/{id} → fires the identify() call plus track('Source Identified') calls with the flattened visitor detection properties for an identified user

Here is a quick command to set ENDPOINT_PROD so you can run curl/HTTPie commands against it; rerun it after you deploy, or after you tear down and redeploy the serverless application:

export ENDPOINT_PROD=$(sls info --verbose --stage=prod | grep ServiceEndpoint | sed s/ServiceEndpoint\:\ //g | awk '{print $1}')

Note that I renamed the variable from BASE_DOMAIN to ENDPOINT_PROD.

Testing User Properties (with attribution data)

Let’s trigger the identify() call for a user with the new attribution data:

http GET $ENDPOINT_PROD/segment/identify/user/[YOUR_USER_ID]

In the Segment debugger you’ll see the identify() calls coming in:

identify() payload in segment

If all is well, you will see that information appear in every destination you have connected; in our case Customer.io and Mixpanel:

Profile in customer.io

Source Identified Events

For other reports (Funnels, Flow Diagram, …) it is useful to emit an event every time a visitor’s source is detected.

Let’s trigger those calls for the user with the new attribution data:

http GET $ENDPOINT_PROD/segment/track/user/[YOUR_USER_ID]

Watch the Segment debugger for the new events coming in (the event name is set by ANALYTICS_SOURCE_IDENTIFICATION_EVENT in .env):

Source Identified tracking events

This works because I imported all the old events. Every event carries its historic timestamp, so it is recorded as something that happened in the past rather than now. More information is in Segment’s Importing Historic Events documentation.
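Replaying a long history for one user can put many track events into a single /v1/batch request, and Segment’s HTTP API documentation lists size limits for it (at the time of writing 500 KB per batch request and 32 KB per call; check the current docs). A minimal sketch of splitting the events into several batches under a byte budget, assuming you then call callSegment() once per batch; chunkByBytes is a hypothetical helper and the limit is a parameter rather than a hard-coded value:

```javascript
// Hypothetical helper: split events into batches so that each serialized
// {"batch":[...]} body stays within maxBytes (counted in UTF-8 bytes).
function chunkByBytes(events, maxBytes) {
  const overhead = Buffer.byteLength(JSON.stringify({ batch: [] })); // '{"batch":[]}'
  const batches = [];
  let current = [];
  let size = overhead;

  for (const event of events) {
    const eventBytes = Buffer.byteLength(JSON.stringify(event));
    if (overhead + eventBytes > maxBytes) {
      throw new Error('A single event is larger than maxBytes');
    }
    const separator = current.length > 0 ? 1 : 0; // comma between array items
    if (size + separator + eventBytes > maxBytes) {
      batches.push(current); // current batch is full, start a new one
      current = [event];
      size = overhead + eventBytes;
    } else {
      current.push(event);
      size += separator + eventBytes;
    }
  }
  if (current.length > 0) {
    batches.push(current);
  }
  return batches;
}

console.log(chunkByBytes([{ a: 1 }, { a: 2 }, { a: 3 }], 27).length); // 2

// Usage, e.g. at the end of trackUser(), with a margin below the documented limit:
// for (const batch of chunkByBytes(data, 450 * 1000)) { await this.callSegment(batch); }
```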

This makes a profile in Mixpanel look like this. Notice both the event information on the left and the user traits on the right.

Profile in mixpanel

That’s it. Tomorrow we’ll explore how to fire these track and identify calls for the visits of the past months, and how to report on this newly available information.