SQS Upload

Sam Dozor edited this page Jan 23, 2017 · 3 revisions

SQS Data Ingestion

The mParticle platform can receive large volumes of data sent to an Amazon SQS queue. This page shows how to construct and send those messages using the objects provided in the mParticle Python SDK.

1. Boto 3 Setup

To post messages to an SQS queue, you'll need the AWS SDK for Python (Boto 3). You can read more about Boto here, or jump to the code samples below.

2. AWS Authentication

To support cross-account access, you must set up an IAM role and a user with credentials permitted to assume that role. mParticle will then grant access to the role.

  1. Create an IAM role

  2. Edit or create a new policy for a user to assume the role

  3. Boto can automatically assume the role you created. Configure the profiles as in this example:

    # In ~/.aws/credentials:
    [example-user]
    aws_access_key_id=foo
    aws_secret_access_key=bar
    
    # In ~/.aws/config
    # specify the role you created as the role_arn
    [profile crossaccount-example]
    role_arn=arn:aws:iam:...
    source_profile=example-user
    

    See more about how to use the AssumeRole API with Boto3 here

  4. Send the full ARN of the role to mParticle, and we will provide you with an SQS URL to use.

  5. Once you have sent an initial Batch, notify mParticle so that the sender ID for your SQS role can be authenticated for your mParticle Workspace.
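The two files in step 3 can also be generated programmatically, which helps when provisioning several environments. A minimal sketch using Python's configparser; the role ARN and the in-memory-only handling are illustrative assumptions, and the real files live at ~/.aws/credentials and ~/.aws/config:

```python
import configparser

# Sketch only: builds the two profiles from step 3 in memory.
# Profile names and key values are the placeholders from above;
# the role ARN is illustrative.
credentials = configparser.ConfigParser()
credentials['example-user'] = {
    'aws_access_key_id': 'foo',
    'aws_secret_access_key': 'bar',
}

config = configparser.ConfigParser()
config['profile crossaccount-example'] = {
    'role_arn': 'arn:aws:iam::123456789012:role/example-role',
    'source_profile': 'example-user',
}

# To persist, write each parser to ~/.aws/credentials and
# ~/.aws/config with the parser's write() method.
```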

3. Constructing a Batch

SQS data ingestion allows you to send lists of Batch objects, or singular Batch objects, following the same schema as our HTTP server-to-server API. Use the models provided by the Python SDK to construct your Batch objects before serializing them for transmission to AWS.

Set the api_key property of each Batch to assign it to an mParticle Input, and set the environment property:

batch = Batch()
# THIS IS REQUIRED FOR SQS
batch.api_key = 'REPLACE ME'
batch.environment = 'development'

  • Contact mParticle's implementation team and consult your mParticle data plan to determine which events, user attributes, device or user identities, and other Batch properties to include in your upload.

  • Similar to the mParticle HTTP server-to-server API, you may send a single Batch object or a list of Batch objects in a single SQS message.
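Because api_key and environment are required for SQS ingestion, a small guard before serialization can catch misconfigured batches early. A sketch with a hypothetical validate_batch helper (not part of the SDK):

```python
def validate_batch(batch):
    """Raise ValueError if a batch lacks the fields SQS ingestion requires."""
    if not getattr(batch, 'api_key', None):
        raise ValueError('api_key is required when uploading via SQS')
    if getattr(batch, 'environment', None) not in ('development', 'production'):
        raise ValueError("environment must be 'development' or 'production'")

# Works with the SDK's Batch or any object exposing the same attributes
from types import SimpleNamespace
batch = SimpleNamespace(api_key='REPLACE ME', environment='development')
validate_batch(batch)  # no exception: the batch is ready to serialize
```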

4. Serialize and Transmit to SQS

Serialize the Batch objects into a JSON message

# Include multiple batches per upload 
batches = [batch]

# or just send a single batch
# batches = batch

# Serialize the upload - SQS allows for up to 256KB per message
message = json.dumps(mparticle.ApiClient().sanitize_for_serialization(batches))
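Since SQS rejects messages over 256 KB, it can be worth checking the serialized size before sending. A minimal sketch; the idea of splitting the batch list when it is too large is an assumption, not SDK behavior:

```python
import json

SQS_MAX_BYTES = 256 * 1024  # hard SQS per-message limit

def fits_in_one_message(payload):
    """True if the JSON-serialized payload fits in a single SQS message."""
    return len(json.dumps(payload).encode('utf-8')) <= SQS_MAX_BYTES

# If a list of batches is too large, split it into smaller sublists
# and send each sublist as its own SQS message.
```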

Retrieve a reference to the queue

Using the profile you set up in ~/.aws/config above, grab a reference to the mParticle queue:

import boto3
boto3.setup_default_session(profile_name='crossaccount-example')
sqs = boto3.resource('sqs')
# mParticle to provide the queue name
queue = sqs.get_queue_by_name(QueueName='REPLACE ME')

Upload the message

mParticle requires the following SQS message attributes to be included with each message:

Key               Value       Data Type
---               -----       ---------
message-type      s2sbatch    String
message-format    json/text   String
message-version   2           String

response = queue.send_message(MessageBody=message,
    MessageAttributes={  # these attributes are required
        'message-type':{
            'StringValue':'s2sbatch',
            'DataType':'String'
        },
        'message-format':{
            'StringValue':'json/text',
            'DataType':'String'
        },
        'message-version':{
            'StringValue':'2',
            'DataType':'String'
        }
    }
)
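The attribute dictionary above can also be derived from a plain mapping of the table's rows, so the required values live in one place. A small sketch; the REQUIRED_ATTRS name is my own:

```python
REQUIRED_ATTRS = {
    'message-type': 's2sbatch',
    'message-format': 'json/text',
    'message-version': '2',
}

# Expand into the MessageAttributeValue shape that send_message expects
message_attributes = {
    key: {'StringValue': value, 'DataType': 'String'}
    for key, value in REQUIRED_ATTRS.items()
}
```

The resulting dictionary can then be passed directly as MessageAttributes to queue.send_message.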

Full Code Sample

# follow the AWS SDK Quickstart to set up your environment: https://boto3.readthedocs.io/en/latest/guide/quickstart.html
import boto3, mparticle, json
from mparticle import AppEvent, SessionStartEvent, SessionEndEvent, Batch, ApiClient

batch = Batch()
batch.api_key = 'REPLACE ME'
batch.environment = 'development'

# construct a batch object per user
app_event = AppEvent('Hello World', 'navigation')
batch.events = [SessionStartEvent(), app_event, SessionEndEvent()]
batch.user_attributes = {'eyes':'brown','favorite_sports':['football','tennis']} 

# Include multiple batches per upload 
batches = [batch]

# or just send a single batch
# batches = batch

# Serialize the upload - SQS allows for up to 256KB per message
boto3.setup_default_session(profile_name='crossaccount-example')
message = json.dumps(ApiClient().sanitize_for_serialization(batches))
sqs = boto3.resource('sqs')

# mParticle to provide the queue name
queue = sqs.get_queue_by_name(QueueName='REPLACE ME')
response = queue.send_message(MessageBody=message,
    MessageAttributes={  # these attributes are required
        'message-type':{
            'StringValue':'s2sbatch',
            'DataType':'String'
        },
        'message-format':{
            'StringValue':'json/text',
            'DataType':'String'
        },
        'message-version':{
            'StringValue':'2',
            'DataType':'String'
        }
    }
)