Google Cloud Storage

This guide demonstrates how to set up a serverless ingest pipeline from a Google Cloud Storage bucket into Mixpanel. Once this is set up, you can simply upload files containing events into the designated GCS bucket and the events will be ingested into Mixpanel, either as a one-time backfill or on a recurring basis. Setup should take ~10 minutes.

📘

Prerequisites

This guide assumes you are running in Google Cloud Platform and have the IAM access necessary for Cloud Functions to read from Cloud Storage.

Step 1: Create a GCS Bucket

Create a dedicated Cloud Storage bucket for this integration. We recommend including mixpanel-import in the bucket name to make its purpose explicit and to avoid accidental data sharing.

You can create the bucket in any region, though we recommend us-central1 for the highest throughput.
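If you prefer the command line, the bucket can also be created with gsutil; the bucket name below is just a placeholder, so substitute your own:

gsutil mb -l us-central1 gs://<YOUR-COMPANY>-mixpanel-import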

Step 2a: Setup the Cloud Function

Create a new Cloud Function.

  • Set the trigger to Cloud Storage, the Event Type to Finalize/Create, and the bucket name to the bucket created in Step 1. This means that any object uploaded to this bucket will trigger an invocation of this Cloud Function.
  • Set Memory to 1GiB, Timeout to 300 seconds, and Max Instances to 20 (a scripted gcloud equivalent is sketched below the configuration screenshot).
Cloud Function configuration
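If you prefer to script this step, a roughly equivalent first-generation gcloud deploy command is sketched below; the function name is a placeholder, and it assumes the main.py and requirements.txt from Step 2b are in the current directory:

gcloud functions deploy gcs-mixpanel-import \
  --runtime python39 \
  --entry-point main \
  --trigger-event google.storage.object.finalize \
  --trigger-resource <YOUR-BUCKET> \
  --memory 1024MB \
  --timeout 300s \
  --max-instances 20 \
  --source .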

Step 2b: Write the Cloud Function

Switch the runtime to Python 3.9 and change the entry point from hello_gcs to main. Paste the code below into main.py, and add the two dependencies listed at the end to requirements.txt.

from datetime import date
import gzip
import json
import random
import time

from google.cloud import storage
import requests

PROJECT_ID = ""  # mixpanel.com/project/<YOUR_PROJECT_ID>
USER = ""  # Service Account user
PASS = ""  # Service Account password

# Flush a batch once these limits are hit
EVENTS_PER_BATCH = 2000
BYTES_PER_BATCH = 2 * 1024 * 1024


# Adjust event timestamps of test data so that it appears recent in Mixpanel.
# NOTE: THIS IS JUST FOR DEMO PURPOSES.
# REPLACE THIS WITH YOUR OWN TRANSFORMS IN PRODUCTION.
def transform_to_event(line):
    """Convert a line of the file to a json string in Mixpanel's format."""
    event = json.loads(line)
    event["properties"]["time"] = int(date.today().strftime("%s"))
    return json.dumps(event)


def flush(batch):
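    """Send one gzipped batch to Mixpanel's /import API, retrying 429s and 5xx."""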
    payload = gzip.compress("\n".join(batch).encode("utf-8"))
    tries = 0
    while True:
        resp = requests.post(
            "https://api.mixpanel.com/import",
            params={"strict": "1", "project_id": PROJECT_ID},
            headers={
                "Content-Type": "application/x-ndjson",
                "Content-Encoding": "gzip",
            },
            auth=(USER, PASS),
            data=payload,
        )
        if resp.status_code == 429 or resp.status_code >= 500:
            time.sleep(min(2 ** tries, 60) + random.randint(1, 5))
            tries += 1
            continue
        return resp


def main(request, context):
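    # `request` is the GCS event payload (a dict with "bucket", "name", etc.);
    # `context` carries event metadata and is unused here.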
    gcs = storage.Client()
    bucket = gcs.get_bucket(request["bucket"])
    blob = bucket.get_blob(request["name"])
    
    compressed = request["name"].endswith(".gz")

    error = None
    batch, batch_bytes = [], 0

    # As a convenience, transparently decompress files ending in .gz
    # (unless GCS is already decoding them via a gzip contentEncoding).
    f = blob.open("rb")
    if compressed and request.get("contentEncoding") != "gzip":
        f = gzip.open(f)
    
    for line in f:
        # Here we assume each line is valid JSON representing a single Mixpanel event
        # If lines are not in Mixpanel's format, apply any transformations here.
        line = transform_to_event(line)

        batch.append(line)
        batch_bytes += len(line)

        if len(batch) == EVENTS_PER_BATCH or batch_bytes >= BYTES_PER_BATCH:
            resp = flush(batch)
            batch, batch_bytes = [], 0

            if resp.status_code != 200:
                error = resp.json()
                break

    # Flush final batch
    if batch and not error:
        resp = flush(batch)
        if resp.status_code != 200:
            error = resp.json()

    f.close()
    print(
        json.dumps(
            {
                "message": "Import complete",
                "request": request,
                "success": error is None,
                "first_error": error,
                "severity": "INFO",
            }
        )
    )
requirements.txt:

google-cloud-storage
requests

🚧

Authentication

Don't forget to fill in the PROJECT_ID, USER, and PASS variables with your project ID and your service account's username and secret.
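As a quick sanity check on those credentials before deploying, you can send a single throwaway event to the /import API directly; the placeholders below are yours to fill in, and the event content is arbitrary:

curl -u "<SERVICE_ACCOUNT_USER>:<SERVICE_ACCOUNT_PASS>" \
  "https://api.mixpanel.com/import?strict=1&project_id=<YOUR_PROJECT_ID>" \
  -H "Content-Type: application/json" \
  -d '[{"event":"test_event","properties":{"time":1700000000,"distinct_id":"test-user","$insert_id":"credential-check-1"}}]'

A 200 response means the project ID and service account are wired up correctly.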

This code will be triggered whenever a new object is added to your bucket. It streams the newly added object, assuming each line is newline-delimited JSON in Mixpanel's event format. It then forms batches and sends them to our /import API, retrying any retryable errors. It logs a single message on completion; if the import fails due to a validation error, this log includes the first non-retryable error encountered.

Click Deploy to deploy the function. At this point, any file you upload to the bucket will trigger an invocation of the function and an import into Mixpanel. Let's test it out!

Step 3: Test with sample data and watch the logs

Let's test the connection with some sample events. Run the following to copy them to your bucket, which will trigger the import:

gsutil cp gs://mixpanel-sample-data/10-events.json gs://<YOUR-BUCKET>/

Monitor the logs of your Cloud Function; you should see an "Import complete" log line within a minute.

Cloud Function logs

If you navigate to Stackdriver logs, you will see a more detailed log that includes the filename being imported and the first error encountered (if any).

Stackdriver logs. The jsonPayload also contains the request field, which contains metadata about the object that was ingested.

Finally, let's confirm that the events made it into Mixpanel. Head to Live View, pick test_event in the event picker, and you should see the events you just imported.

You can expand any event in Live View to see all its properties.

Step 4: Import more data

We're now ready to import your own data. If your data is not already in Mixpanel's format, this is a good time to write a transformation step to run as part of the Cloud Function. We recommend testing locally as you iterate on your transformation logic, since that is much quicker than redeploying the Cloud Function; the Overview page has sample code and data for this, and a rough local-test sketch is shown below.
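For example, a minimal local-test sketch (not part of the official sample) might import the helpers from main.py and run them over a small file; the file name is a placeholder, and it assumes you have installed the two packages from requirements.txt and filled in your credentials:

# local_test.py -- iterate on transform logic without redeploying the function.
from main import EVENTS_PER_BATCH, flush, transform_to_event

batch = []
with open("sample-events.json") as f:  # placeholder: any small NDJSON file
    for line in f:
        batch.append(transform_to_event(line))
        if len(batch) == EVENTS_PER_BATCH:
            print(flush(batch).json())
            batch = []

if batch:
    print(flush(batch).json())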

Once you're ready and have tested with a few small files, you can upload all the files for your import to your GCS bucket, and the import will kick off. This pipeline can be made recurring by uploading files to the GCS bucket periodically.

We recommend partitioning files in GCS to ~200MB of JSON for optimal performance.
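If you have one large export, GNU coreutils split can chunk it into ~200MB pieces without breaking lines; this is just a sketch with placeholder file names, and the --additional-suffix flag requires GNU split (it is not in the BSD split that ships with macOS):

split -C 200m -d --additional-suffix=.json big-export.json events-part-
gsutil -m cp events-part-*.json gs://<YOUR-BUCKET>/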

