Google Pub/Sub

This guide demonstrates how to plug Mixpanel into an event collection pipeline hosted in Google Cloud. Once set up, your events will route to Mixpanel and be available in real-time for analytics. This approach is serverless and open-source, and takes ~5 minutes to set up.

Note: if you are on AWS, this approach is very similar using Kinesis and AWS Lambda.

Step 1: Create a Cloud Pub/Sub Topic

(You can skip this step if you already have a Pub/Sub topic with events flowing through it.)

Create a new Pub/Sub Topic. All events that will ultimately route to Mixpanel will flow through this Pub/Sub topic.

Step 2a: Add a Cloud Function Trigger to your Pub/Sub Topic

In this step, we set up a Cloud Function to trigger whenever events are pushed to your Pub/Sub topic. Google's documentation goes into full detail on how this trigger works.

From your newly created topic, click +Trigger Cloud Function. Give the Cloud Function a name and Save.

Step 2b: Write the Cloud Function

Switch the runtime to Python3.9 and change the entrypoint from hello_pubsub to main. Paste the code below for main.py and requirements.txt.

import base64
from datetime import date
import gzip
import json
import random
import time

import requests


PROJECT_ID = ""  # mixpanel.com/project/<YOUR_PROJECT_ID>
USER = ""  # Service Account user
PASS = ""  # Service Account password


def main(event, context):
    # Assumes the message consists of lines of JSON, each in Mixpanel's format
    msg = base64.b64decode(event["data"]).decode("utf-8")
    events = [json.loads(line) for line in msg.split("\n") if len(line) > 0]

    # Adjust the timestammps of each of the sample events so that
    # they appear recent. Note: this is just for this demo, do not
    # do this in production.
    for e in events:
        e["properties"]["time"] = int(date.today().strftime("%s"))

    # Send to Mixpanel
    tries = 0
    payload = gzip.compress(json.dumps(events).encode("utf-8"))
    while True:
        resp = requests.post(
            "https://api.mixpanel.com/import",
            params={"strict": "1", "project_id": PROJECT_ID},
            auth=(USER, PASS),
            headers={"Content-Type": "application/json", "Content-Encoding": "gzip"},
            data=payload,
        )
        if resp.status_code == 429 or resp.status_code >= 500:
            # Retryable errors, use exponential backoff
            tries += 1
            time.sleep(min(2 ** tries, 60) + random.randint(1, 5))
            continue
        else:
            break

    if resp.status_code != 200:
        # Log failures. In production, write to a dead-letter queue
        print({"message": "Import failed", "error": resp.json(), "severity": "WARN"})
    else:
        # Log success for debugging. Remove in production.
        print(
            {
                "message": "Import succeeded",
                "count": resp.json()["num_records_imported"],
                "severity": "INFO",
            }
        )
requests

🚧

Authentication

Don't forget to fill out the PROJECT_ID, USER, PASS variables with your project_id and service account credentials.

This code does a very simple passthrough of events from the incoming Pubsub message into Mixpanel's Import API. You can use this function to transform events from your Pub/Sub topic into Mixpanel's format before sending them to our Import API.

Step 3: Test with sample events

Now messages published to our Pubsub topic will trigger an invocation of the function and route events to Mixpanel. Let's give it a try by manually sending a message via the PubSub UI.

On the topic page, navigate to Messages -> Publish message.

Then paste in the following events as the message body.

{"event": "test_event", "properties": {"distinct_id": "Sunrostern", "$insert_id": "28096095", "title": "Creator Giveaway for Publishing Notes", "url": "https://www.viewert.com", "score": "1", "time": 1628315585}}
{"event": "test_event", "properties": {"distinct_id": "feross", "$insert_id": "28059483", "title": "`at` Method for Relative Indexing", "url": "https://v8.dev/features/at-method", "score": "1", "time": 1628074042}}
{"event": "test_event", "properties": {"distinct_id": "prostoalex", "$insert_id": "28069645", "title": "Home Classrooms Became a Necessity During Covid. Now They\u2019re a Selling Point", "url": "https://www.wsj.com/articles/home-classrooms-covid-real-estate-11628100036", "score": "1", "time": 1628135271}}
{"event": "test_event", "properties": {"distinct_id": "bingewave", "$insert_id": "28063639", "title": "Building a Live Streaming Movie App and Live TV Website \u2013 ReactJS", "url": "https://medium.com/bingewave/building-a-live-streaming-movie-app-live-tv-website-part-1-d0857aaac8ea", "score": "1", "time": 1628097439}}
{"event": "test_event", "properties": {"distinct_id": "jashkenas", "$insert_id": "28063632", "title": "Classic Research in Data Visualization", "url": "https://observablehq.com/@tophtucker/classic-research-in-data-visualization", "score": "1", "time": 1628097398}}
{"event": "test_event", "properties": {"distinct_id": "ivandiblasi68", "$insert_id": "28091778", "title": "Il sistema sanitario irlandese colpito da un ransomware", "url": "https://www.kaspersky.it/blog/irish-health-service-ransomware/24680/", "score": "1", "time": 1628278340}}
{"event": "test_event", "properties": {"distinct_id": "akdav", "$insert_id": "28091776", "title": "DeFi and KYC", "url": "https://link.medium.com/bsn7z9Jmvib", "score": "1", "time": 1628278312}}
{"event": "test_event", "properties": {"distinct_id": "webscraping99", "$insert_id": "28085411", "title": "LinkedIn Profile Detail Scrapers \u2013 Ahmad Software Technologies", "url": "https://ahmadsoftwaretechnologies3.mypixieset.com/linkedin-profile-detail-scrapers/", "score": "1", "time": 1628246831}}
{"event": "test_event", "properties": {"distinct_id": "Dorimoody", "$insert_id": "28064346", "title": "Our Children and Our Citations: Each One, Both Together", "url": "https://www.plough.com/en/topics/life/work/our-children-and-our-citations-each-one-both-together", "score": "1", "time": 1628101041}}
{"event": "test_event", "properties": {"distinct_id": "feross", "$insert_id": "28064342", "title": "The SEC Has Its Eye on Crypto", "url": "https://www.bloomberg.com/opinion/articles/2021-08-04/the-sec-has-its-eye-on-crypto", "score": "1", "time": 1628101019}}

Once you Publish, the function will trigger and pass the above payload to the Cloud Function. Within a minute, you should see an Import succeeded log line in the Cloud Function logs.

You can then navigate to Live View to see the events in Mixpanel.

Step 4: Connecting your production pipeline

At this point, you can route events from your production Pub/Sub topic through a Cloud Function in a similar manner as described above. Simplify modify the Cloud Function to transform events from your internal event format into the format expected by Mixpanel. This is also a good point to strip any PII.

Once connected, this will result in a steady stream of events being sent to Mixpanel. Happy streaming!

Error Handling

In rare instances, Mixpanel's /import API may return a 429 or 5XX error. These can be safely retried. We recommend an exponential backoff with jitter strategy, as written in the sample code above. PubSub can be configured to perform exponential backoff if the function itself times out.

If the payload is malformed, our API might return a 400 error. In this case, the item cannot be ingested and this error should not be retried. Enqueue these messages onto a dead-letter-queue for inspection later on. This might happen while initially testing the connector, but should be rare in production, assuming the shape of data flowing through the pipeline remains consistent.

Google has a great reference on best practices for handling both retryable and non-retryable errors.


Did this page help you?