Syncing to a Webhook

This guide helps you modify a cohort export to Google Cloud Storage so that users entering or exiting the cohort are incrementally synced to a webhook URL.

📘

Prerequisites

This guide assumes you have already set up, or are in the process of setting up, a Google Cloud Function to export cohorts to GCS.

Setting up a Webhook endpoint

A webhook is simply a remote HTTP endpoint that this script can POST data to when a new user matches or leaves the cohort.

To receive users to a webhook from this script, set up an endpoint on your web server exactly like you would for any other page that receives POST requests. We recommend you choose an endpoint that doesn't handle any other requests. Then specify this endpoint in the following script.

Here's an example of the JSON this script will send:

[
   {
      "$distinct_id":"13b20239a29335",
      "$properties":{
         "$region":"California",
         "$email":"[email protected]",
         "$last_name":"Bovik",
         "$created":"2012-11-20T15:26:16",
         "$country_code":"US",
         "$first_name":"Harry",
         "Referring Domain":"news.ycombinator.com",
         "$city":"Los Angeles",
         "Last Seen":"2012-11-20T15:26:17",
         "Referring URL":"http://news.ycombinator.com/",
         "$last_seen":"2012-11-20T15:26:19"
      }
   },
   {
      "$distinct_id":"13a00df8730412",
      "$properties":{
         "$region":"California",
         "$email":"[email protected]",
         "$last_name":"Lytics",
         "$created":"2012-11-20T15:25:38",
         "$country_code":"US",
         "$first_name":"Anna",
         "Referring Domain":"www.quora.com",
         "$city":"Mountain View",
         "Last Seen":"2012-11-20T15:25:39",
         "Referring URL":"http://www.quora.com/What-...",
         "$last_seen":"2012-11-20T15:25:42"
      }
   }
]

Updating the Google Cloud Function

Replace main.py in your Google Cloud Function with the code below and add webhook.py. There are two new values in this code that need to be specified.

  • ADD_WEBHOOK_URLS is a list of webhook urls to send updates to when users enter the cohort
  • REMOVE_WEBHOOK_URLS is a list of webhook urls to send updates to when users leave the cohort
from datetime import date
import json
import random
import time
import requests as pyrequests


from google.cloud import storage
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media import requests, common
from gcs import GCSObjectStreamUpload
from webhook import Webhookupload

# ---- constants to fill in before deploying ----
PROJECT_ID = 0  # mixpanel.com/project/<YOUR_PROJECT_ID>
USER = ""  # Service Account user
PASS = ""  # Service Account password
COHORT_ID = 0 # cohort id from the Mixpanel cohorts page
BUCKET = "" # Storage bucket to export cohorts to (used as staging even when only webhooks are wanted)
EU = False # set True if your project is in the Mixpanel EU datacenter
# webhook urls to receive updates when users enter the cohort
ADD_WEBHOOK_URLS = []
# webhook urls to receive updates when users leave the cohort
REMOVE_WEBHOOK_URLS = []



# user profile properties to include in the export
OUTPUT_PROPERTIES = ["$email", "$last_seen", "$region", "$created", "$country_code", "$first_name", "$city"]
# Engage API base URL; the EU cluster lives on its own hostname
api_endpoint = "https://mixpanel.com/api/2.0/engage" if EU is False else "https://eu.mixpanel.com/api/2.0/engage"

def main(event, context):
    """Cloud Function entry point: export the cohort to GCS and notify webhooks.

    Fetches the cohort's current membership from Mixpanel, diffs it against
    the previous export stored in GCS, and writes:
      * the full membership to cohorts/<id>/all_users.json
      * newly added users to cohorts/<id>/users_added.json and each URL in
        ADD_WEBHOOK_URLS
      * removed users to cohorts/<id>/users_removed.json and each URL in
        REMOVE_WEBHOOK_URLS

    Args:
        event: Cloud Functions trigger payload (unused).
        context: Cloud Functions event metadata (unused).
    """
    print(
        json.dumps(
            {
                "message": "cohort export started",
                "cohort_id": COHORT_ID,
                "severity": "INFO"
            }
        )
    )
    # setup
    client = storage.Client()

    add_user_notif_receivers = []
    remove_user_notif_receivers = []

    blob_name = f'cohorts/{COHORT_ID}/all_users.json'
    all_users_out = GCSObjectStreamUpload(client=client, bucket_name=BUCKET, blob_name=blob_name)
    add_blob_name = f'cohorts/{COHORT_ID}/users_added.json'
    add_user_notif_receivers.append(GCSObjectStreamUpload(client=client, bucket_name=BUCKET, blob_name=add_blob_name))
    remove_blob_name = f'cohorts/{COHORT_ID}/users_removed.json'
    remove_user_notif_receivers.append(GCSObjectStreamUpload(client=client, bucket_name=BUCKET, blob_name=remove_blob_name))

    add_user_notif_receivers.extend(Webhookupload(url=url) for url in ADD_WEBHOOK_URLS)
    remove_user_notif_receivers.extend(Webhookupload(url=url) for url in REMOVE_WEBHOOK_URLS)

    # previous export; empty on the first run (blob does not exist yet)
    old_users = read_cohort(client, BUCKET, blob_name)
    data = get_engage_data()
    # writes current members and prunes matched ids from old_users, so what
    # remains in old_users afterwards is exactly the set of users who left
    process_diff_and_write(all_users_out, add_user_notif_receivers, data, old_users)
    all_users_out.close()
    for out in add_user_notif_receivers:
        out.close()

    write_remove_users(old_users, remove_user_notif_receivers)
    for out in remove_user_notif_receivers:
        out.close()

    print(
        json.dumps(
            {
                "message": "cohort export completed",
                "cohort_id": COHORT_ID,
                "severity": "INFO"
            }
        )
    )


def read_cohort(gcs, bucket_name, blob_name):
     """Load the previously exported cohort from GCS.

     Args:
         gcs: google.cloud.storage client.
         bucket_name: bucket holding the prior export.
         blob_name: blob of newline-delimited JSON user records.

     Returns:
         Dict mapping $distinct_id -> full user record; empty dict when the
         blob does not exist (e.g. on the very first export).
     """
     bucket = gcs.get_bucket(bucket_name)
     blob = bucket.get_blob(blob_name)
     users = {}
     if blob is None:
          return users
     # context manager guarantees the download stream is closed even if a
     # line fails to parse (the original leaked the handle on error)
     with blob.open("rb") as stream:
          for line in stream:
               record = json.loads(line)
               users[record["$distinct_id"]] = record
     return users

def process_diff_and_write(all_out, add_user_notif_recievers, data, old_users):
     """Stream the current cohort, writing every user and flagging new ones.

     Every record is written to all_out. Records whose $distinct_id is absent
     from old_users are new members and are also written to every receiver in
     add_user_notif_recievers. Matched ids are pruned from old_users in
     place, so afterwards old_users holds exactly the users who left.

     Args:
         all_out: sink exposing write_record(dict), receives all users.
         add_user_notif_recievers: list of sinks notified of new users.
         data: streaming HTTP response of newline-delimited JSON records.
         old_users: dict of $distinct_id -> record from the previous export;
             mutated in place.
     """
     for raw_line in data.iter_lines():
          if not raw_line:  # iter_lines may yield keep-alive empty lines
               continue
          row = json.loads(raw_line.decode('utf-8'))
          all_out.write_record(row)
          # pop() both tests membership and prunes old_users in one lookup
          # (records are dicts, never None, so None reliably means "absent")
          if old_users.pop(row["$distinct_id"], None) is None:
               for out in add_user_notif_recievers:
                    out.write_record(row)

def write_remove_users(removed_from_cohort, remove_user_notif_recievers):
     """Send every user who left the cohort to each removal receiver.

     Args:
         removed_from_cohort: dict of $distinct_id -> user record for users
             present in the previous export but absent from the current one.
         remove_user_notif_recievers: list of sinks with write_record(dict).
     """
     # only the records are needed, so iterate values() instead of items()
     for record in removed_from_cohort.values():
          for out in remove_user_notif_recievers:
               out.write_record(record)


def get_engage_data():
     """Request the cohort's members from the Mixpanel Engage API.

     Asks Mixpanel to stage the export behind a download URL
     (use_direct_upload) so large cohorts can be streamed, then opens a
     streaming GET against that URL.

     Returns:
         A streaming requests.Response over newline-delimited JSON user
         records, or None when the API call failed.
     """
     params = {
          "project_id": PROJECT_ID,
     }
     form_data = {
          "filter_by_cohort": json.dumps({"id": COHORT_ID}),
          "include_all_users": False,
          "use_direct_upload": True,
          "output_properties": json.dumps(OUTPUT_PROPERTIES)
     }

     data = make_call(api_endpoint, params, form_data)
     if "results" not in data:
          print(
               json.dumps(
                    {
                         "message": "mixpanel call failed",
                         "response": data
                    }
               )
          )
          return None
     data_url = data["results"]["url"]
     # stream=True lets the caller iterate the body line-by-line without
     # loading the whole export into memory
     return pyrequests.get(data_url, stream=True)

def make_call(endpoint, params, form_data):
     """POST to the Mixpanel API, retrying on rate limits and server errors.

     Retries indefinitely on HTTP 429 and 5xx with exponential backoff
     capped at 60 seconds plus 1-5 seconds of jitter.

     Args:
         endpoint: full API URL.
         params: query-string parameters.
         form_data: form-encoded request body.

     Returns:
         The decoded JSON response body.
     """
     tries = 0  # bug fix: was never initialized, so any retry raised NameError
     while True:
          resp = pyrequests.post(
               endpoint,
               params=params,
               headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json"
               },
               auth=(USER, PASS),
               data=form_data,
          )
          if resp.status_code == 429 or resp.status_code >= 500:
               time.sleep(min(2 ** tries, 60) + random.randint(1, 5))
               tries += 1
               continue
          return resp.json()
from datetime import date
import json
import random
import time
import requests

class Webhookupload(object):
     """Buffers user records and POSTs them to a webhook URL in batches.

     Records accumulate via write_record() and are sent as a JSON array in
     the form field "users" once batch_size is reached; close() flushes any
     remainder.
     """

     def __init__(self, url: str, batch_size: int = 50):
          self.url = url                 # webhook endpoint to POST batches to
          self._batch_size = batch_size  # records per POST
          self._records = []             # pending, not-yet-sent records

     def close(self):
          """Flush any buffered records; call once after the final write_record()."""
          self.flush()

     def flush(self):
          """POST buffered records, retrying on 429/5xx with capped backoff."""
          if not self._records:
               return  # bug fix: close() on an empty buffer used to POST "[]"
          json_data = json.dumps(self._records).encode("utf8")
          # bug fix: clear the buffer here; the original never cleared it in
          # flush(), so close() after a batched flush re-sent the last batch
          self._records = []
          tries = 0  # bug fix: was never initialized, so a retry raised NameError
          while True:
               resp = requests.post(
                    self.url,
                    data={"users": json_data},
               )
               if resp.status_code == 429 or resp.status_code >= 500:
                    time.sleep(min(2 ** tries, 60) + random.randint(1, 5))
                    tries += 1
                    continue
               return

     def write_record(self, record):
          """Buffer one record, flushing when the batch is full."""
          self._records.append(record)
          if len(self._records) >= self._batch_size:
               self.flush()

View the results

For demonstration purposes we created the following simple webhook implementation using Django and deployed it to Heroku.

def add_users(request):
    """Webhook endpoint: create a User row for each record in the POSTed batch.

    Expects a form field "users" containing a JSON array of user records.
    """
    if request.POST:
        for entry in json.loads(request.POST["users"]):
            new_user = User(distinct_id = entry["$distinct_id"], created_at = datetime.datetime.now())
            new_user.save()
    return HttpResponse("success")

def remove_users(request):
    """Webhook endpoint: delete the User row for each record in the POSTed batch.

    Records whose distinct_id is unknown are silently skipped.
    """
    if request.POST:
        for entry in json.loads(request.POST["users"]):
            try:
                User.objects.get(distinct_id = entry["$distinct_id"]).delete()
            except User.DoesNotExist:
                # user never existed locally; nothing to remove
                pass
    return HttpResponse("success")

Below is the current list of users in the cohort.

Cloud Function configuration


Did this page help you?