Google Cloud Storage

This guide outlines how to set up a serverless cohort export from Mixpanel to a Google Cloud Storage bucket. Once this is set up, the users in a cohort you created in Mixpanel will be exported to your bucket on a recurring basis. Setup should take ~12 minutes.

📘

Prerequisites

This guide assumes you are running in Google Cloud Platform and have the necessary IAM access for Cloud Functions to write to GCS.

Step 1: Create a Mixpanel service account

Create an Owner or Admin Service Account in the Mixpanel project from which you want to export cohorts.
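
If you want to confirm the credentials work before deploying anything, one option is a quick authenticated call to the Engage API, the same endpoint the Cloud Function below uses. This is an optional sketch; the username, secret, and project ID are placeholders to fill in.

import requests

USER = ""        # Service Account user
PASS = ""        # Service Account password
PROJECT_ID = 0   # mixpanel.com/project/<YOUR_PROJECT_ID>

# A 200 response confirms the service account can query this project.
# For EU-resident projects, use https://eu.mixpanel.com/api/2.0/engage instead.
resp = requests.post(
    "https://mixpanel.com/api/2.0/engage",
    params={"project_id": PROJECT_ID},
    auth=(USER, PASS),
)
print(resp.status_code)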

Step 2: Create a GCS Bucket

Create a dedicated Cloud Storage bucket. We recommend including mixpanel-cohorts-export in the name to make it explicit and avoid any accidental data sharing.

You can create the bucket in any region, though we recommend us-central1 for highest throughput.
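
If you prefer to create the bucket programmatically rather than through the console, a minimal sketch with the google-cloud-storage client (already a dependency of the Cloud Function below) looks like this; the bucket name is a placeholder.

from google.cloud import storage

client = storage.Client()

# Bucket names are globally unique; this one is a placeholder.
bucket = client.create_bucket("my-company-mixpanel-cohorts-export", location="us-central1")
print(f"Created bucket {bucket.name} in {bucket.location}")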

Step 3: Create a Pub/Sub topic (to schedule the cloud function periodically)

Create a dedicated Cloud Pub/Sub topic. We recommend including mixpanel-cohorts-export-schedule in the name to make it explicit and avoid any accidental usage.
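
As with the bucket, you can create the topic in the console or programmatically. A minimal sketch using the google-cloud-pubsub client (an assumption; it is not part of the Cloud Function's dependencies) is shown below; the project and topic names are placeholders.

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

# Placeholders: substitute your GCP project ID and chosen topic name.
topic_path = publisher.topic_path("my-gcp-project", "mixpanel-cohorts-export-schedule")

topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic {topic.name}")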

Step 4a: Create a Cloud Function

Create a new Cloud Function.

  • Set trigger type to Pub/Sub and select the topic created in the previous step
  • Set Memory to 1 GiB, Timeout to 540 seconds, and Maximum instances to 20 (increase the memory if the cohort being exported is large)

Cloud Function configuration

Step 4b: Write the Cloud Function

Switch the runtime to Python 3.9 and change the entry point from hello_gcs to main. Paste the code below into main.py, gcs.py, and requirements.txt.

Set values for the following variables: PROJECT_ID, USER, PASS, BUCKET, COHORT_ID

You can fetch the COHORT_ID using the list cohorts API. The COHORT_ID is also displayed when editing a cohort by clicking the Cohort Name within the Cohorts Report.
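
You can also fetch it with a short script against the list cohorts API, using the same service account credentials. A sketch (US data residency assumed; use the eu.mixpanel.com host for EU projects):

import requests

USER = ""       # Service Account user
PASS = ""       # Service Account password
PROJECT_ID = 0  # mixpanel.com/project/<YOUR_PROJECT_ID>

# Returns a JSON array of cohorts; each entry includes the cohort's id and name.
resp = requests.post(
    "https://mixpanel.com/api/2.0/cohorts/list",
    params={"project_id": PROJECT_ID},
    auth=(USER, PASS),
)
for cohort in resp.json():
    print(cohort["id"], cohort["name"])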

main.py

from datetime import date
import json
import random
import time
import requests as pyrequests


from google.cloud import storage
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from gcs import GCSObjectStreamUpload


# constants to fill in 
PROJECT_ID = 0  # mixpanel.com/project/<YOUR_PROJECT_ID>
USER = ""  # Service Account user
PASS = ""  # Service Account password
COHORT_ID = 0  # cohort ID from the Cohorts report
BUCKET = ""  # GCS bucket to export cohorts to
EU = False  # set to True if your project is in Mixpanel's EU data center


# user properties to export
OUTPUT_PROPERTIES = ["$email", "$last_seen", "$region", "$created", "$country_code", "$first_name", "$city"]
# internal mixpanel variables
api_endpoint = "https://mixpanel.com/api/2.0/engage" if EU is False else "https://eu.mixpanel.com/api/2.0/engage"

def main(event, context):
     print(
          json.dumps(
               {
                    "message": "cohort export started",
                    "cohort_id": COHORT_ID,
                    "severity": "INFO"
               }
          )
     )
     # setup
     client = storage.Client()
     add_user_notif_recievers = []
     remove_user_notif_recievers = []
     blob_name = f'cohorts/{COHORT_ID}/all_users.json'
     all_users_out = GCSObjectStreamUpload(client=client, bucket_name=BUCKET, blob_name=blob_name)
     add_blob_name = f'cohorts/{COHORT_ID}/users_added.json'
     add_user_notif_recievers.append(GCSObjectStreamUpload(client=client, bucket_name=BUCKET, blob_name=add_blob_name))
     remove_blob_name = f'cohorts/{COHORT_ID}/users_removed.json'
     remove_user_notif_recievers.append(GCSObjectStreamUpload(client=client, bucket_name=BUCKET, blob_name=remove_blob_name))

     # get users and process
     old_users = read_cohort(client, BUCKET, blob_name)
     data = get_engage_data()
     if data is None:
          # Mixpanel call failed and was logged; leave the previous export untouched.
          return
     process_diff_and_write(all_users_out, add_user_notif_recievers, data, old_users)
     all_users_out.close()
     for out in add_user_notif_recievers:
          out.close()

     write_remove_users(old_users, remove_user_notif_recievers)
     for out in remove_user_notif_recievers:
          out.close()

     print(
          json.dumps(
               {
                    "message": "cohort export completed",
                    "cohort_id": COHORT_ID,
                    "severity": "INFO"
               }
          )
     )


def read_cohort(gcs, bucket_name, blob_name):
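     # Load the previous all_users.json export (if any) into a dict keyed by $distinct_id.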
     bucket = gcs.get_bucket(bucket_name)
     blob = bucket.get_blob(blob_name)
     ret = {}
     if blob is None:
          return ret
     f = blob.open("rb")
     for line in f:
          record = json.loads(line)
          ret[record["$distinct_id"]] = record
     f.close()
     return ret

def process_diff_and_write(all_out, add_user_notif_recievers, data, old_users):
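     # Stream the Engage results: every user goes to all_users.json; users not present in the
     # previous export also go to users_added.json. Users seen again are dropped from old_users,
     # so whatever remains in old_users afterwards has left the cohort.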
     for line in data.iter_lines():
          if not line:
               continue
          decoded_line = line.decode('utf-8')
          row = json.loads(decoded_line)
          all_out.write_record(row)
          distinct_id = row["$distinct_id"]
          if not distinct_id in old_users:
               for out in add_user_notif_recievers:
                    out.write_record(row)
          else:
               del old_users[distinct_id]

def write_remove_users(removed_from_cohort, remove_user_notif_recievers):
     for key, value in removed_from_cohort.items():
          for out in remove_user_notif_recievers:
               out.write_record(value)


def get_engage_data():
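     # Ask the Engage API for the cohort members with use_direct_upload, then stream the
     # results file from the URL Mixpanel returns.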
     params = {
          "project_id": PROJECT_ID,
     }
     form_data = {
          "filter_by_cohort": json.dumps({"id": COHORT_ID}),
          "include_all_users": False,
          "use_direct_upload": True,
          "output_properties": json.dumps(OUTPUT_PROPERTIES)
     }

     data = make_call(api_endpoint, params, form_data)
     if not "results" in data:
          print(
               json.dumps(
                    {
                         "message": "mixpanel call failed",
                         "response": data
                    }
               )
          )
          return
     data_url = data["results"]["url"]
     data_stream = pyrequests.get(data_url, stream=True)
     return data_stream

def make_call(endpoint, params, form_data):
     tries = 0
     while True:
          resp = pyrequests.post(
               endpoint,
               params=params,
               headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json"
               },
               auth=(USER, PASS),
               data=form_data,
          )
          # Retry with exponential backoff on rate limits and server errors.
          if resp.status_code == 429 or resp.status_code >= 500:
               time.sleep(min(2 ** tries, 60) + random.randint(1, 5))
               tries += 1
               continue
          return resp.json()

requirements.txt

google-cloud
google-cloud-storage
google-resumable-media
requests
six

gcs.py

from datetime import date
import json
import random
import time
import requests as pyrequests

from google.cloud import storage
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common

class GCSObjectStreamUpload(object):
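    """Stream newline-delimited JSON records into a GCS object.

    Records are buffered and sent in fixed-size chunks through the resumable
    upload API, so the full cohort never has to be held in memory.
    """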
    def __init__(
            self,
            client: storage.Client,
            bucket_name: str,
            blob_name: str,
            chunk_size: int=256 * 1024
        ):
        self._client = client
        self._bucket = self._client.bucket(bucket_name)
        self._blob = self._bucket.blob(blob_name)

        self._buffer = b''
        self._buffer_size = 0
        self._chunk_size = chunk_size
        self._read = 0

        self._transport = AuthorizedSession(
            credentials=self._client._credentials
        )
        self._request = None  # type: requests.ResumableUpload
        self.start()

    def start(self):
        url = (
            f'https://www.googleapis.com/upload/storage/v1/b/'
            f'{self._bucket.name}/o?uploadType=resumable'
        )
        self._request = requests.ResumableUpload(
            upload_url=url, chunk_size=self._chunk_size
        )
        self._request.initiate(
            transport=self._transport,
            content_type='application/octet-stream',
            stream=self,
            stream_final=False,
            metadata={'name': self._blob.name},
        )

    def close(self):
        self._request.transmit_next_chunk(self._transport)

    def read(self, chunk_size: int) -> bytes:
        # Hand back up to chunk_size bytes from the front of the buffer;
        # the resumable upload calls this to obtain the next chunk to send.
        to_read = min(chunk_size, self._buffer_size)
        memview = memoryview(self._buffer)
        self._buffer = memview[to_read:].tobytes()
        self._read += to_read
        self._buffer_size -= to_read
        return memview[:to_read].tobytes()

    def tell(self) -> int:
        return self._read

    def write(self, data: bytes) -> int:
        data_len = len(data)
        self._buffer_size += data_len
        self._buffer += data
        del data
        while self._buffer_size >= self._chunk_size:
            try:
                self._request.transmit_next_chunk(self._transport)
            except common.InvalidResponse:
                self._request.recover(self._transport)
        return data_len
    
    def write_record(self, record):
        rec_bytes = json.dumps(record).encode()
        self.write(rec_bytes)
        self.write(b"\n")

Step 5: View and test the results

Test the Cloud Function by deploying it and clicking Test function. Once the function finishes successfully, view the results in the bucket you created above; a snippet for inspecting the exported files follows the list below.

  • Data will be written to the path <bucket_name>/cohorts/<cohort_id>
  • all_users.json lists all users present in the cohort
  • users_added.json lists users added to the cohort since the last run of this Cloud Function
  • users_removed.json lists users who exited the cohort since the last run of this Cloud Function
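
To spot-check the export from a local script, a minimal sketch along these lines reads the newline-delimited JSON back out of the bucket; the bucket name and cohort ID are placeholders.

import json
from google.cloud import storage

BUCKET = "my-company-mixpanel-cohorts-export"  # placeholder
COHORT_ID = 12345                              # placeholder

client = storage.Client()
blob = client.bucket(BUCKET).blob(f"cohorts/{COHORT_ID}/all_users.json")

# Each line is one user profile, matching OUTPUT_PROPERTIES in main.py.
with blob.open("rt") as f:
    for line in f:
        user = json.loads(line)
        print(user["$distinct_id"], user.get("$email"))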

Step 6: Viewing logs

If you navigate to Stackdriver (Cloud Logging) for your Cloud Function, you can view the logs and debug any errors.

Viewing logs

Step 7: Schedule this function to run on an interval

To run this function periodically, create a Cloud Scheduler job in GCP that sends messages on a schedule to the Pub/Sub topic created in Step 3.

📘

Note

A frequency of */30 * * * * will trigger the Cloud Function every 30 minutes. Adjust this schedule as needed to stay within Mixpanel's API rate limits.
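
If you'd rather create the job programmatically, a minimal sketch using the google-cloud-scheduler client (an assumption; it is not part of the Cloud Function's dependencies) might look like the following, with the project, region, and topic names as placeholders.

from google.cloud import scheduler_v1

client = scheduler_v1.CloudSchedulerClient()

# Placeholders: substitute your GCP project, region, and topic.
parent = "projects/my-gcp-project/locations/us-central1"
job = {
    "name": f"{parent}/jobs/mixpanel-cohorts-export-schedule",
    "schedule": "*/30 * * * *",  # every 30 minutes
    "time_zone": "Etc/UTC",
    "pubsub_target": {
        "topic_name": "projects/my-gcp-project/topics/mixpanel-cohorts-export-schedule",
        "data": b"run",  # the Cloud Function ignores the message payload
    },
}

created = client.create_job(parent=parent, job=job)
print(f"Created scheduler job {created.name}")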

Step 8: Pat yourself on the back

You just set up your first Mixpanel serverless cohort export!

