Add ga4mp library

* Remove UniversalAnalytics
This commit is contained in:
JonnyWong16 2023-02-26 15:09:12 -08:00
parent ecb6d8b743
commit 42eeb90532
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
10 changed files with 983 additions and 546 deletions

View file

@ -1,121 +0,0 @@
#!/usr/bin/python
###############################################################################
# Formatting filter for urllib2's HTTPHandler(debuglevel=1) output
# Copyright (c) 2013, Analytics Pros
#
# This project is free software, distributed under the BSD license.
# Analytics Pros offers consulting and integration services if your firm needs
# assistance in strategy, implementation, or auditing existing work.
###############################################################################
import sys, re, os
from io import StringIO
class BufferTranslator(object):
""" Provides a buffer-compatible interface for filtering buffer content.
"""
parsers = []
def __init__(self, output):
self.output = output
self.encoding = getattr(output, 'encoding', None)
def write(self, content):
content = self.translate(content)
self.output.write(content)
@staticmethod
def stripslashes(content):
return content.decode('string_escape')
@staticmethod
def addslashes(content):
return content.encode('string_escape')
def translate(self, line):
for pattern, method in self.parsers:
match = pattern.match(line)
if match:
return method(match)
return line
class LineBufferTranslator(BufferTranslator):
""" Line buffer implementation supports translation of line-format input
even when input is not already line-buffered. Caches input until newlines
occur, and then dispatches translated input to output buffer.
"""
def __init__(self, *a, **kw):
self._linepending = []
super(LineBufferTranslator, self).__init__(*a, **kw)
def write(self, _input):
lines = _input.splitlines(True)
for i in range(0, len(lines)):
last = i
if lines[i].endswith('\n'):
prefix = len(self._linepending) and ''.join(self._linepending) or ''
self.output.write(self.translate(prefix + lines[i]))
del self._linepending[0:]
last = -1
if last >= 0:
self._linepending.append(lines[ last ])
def __del__(self):
if len(self._linepending):
self.output.write(self.translate(''.join(self._linepending)))
class HTTPTranslator(LineBufferTranslator):
""" Translates output from |urllib2| HTTPHandler(debuglevel = 1) into
HTTP-compatible, readible text structures for human analysis.
"""
RE_LINE_PARSER = re.compile(r'^(?:([a-z]+):)\s*(\'?)([^\r\n]*)\2(?:[\r\n]*)$')
RE_LINE_BREAK = re.compile(r'(\r?\n|(?:\\r)?\\n)')
RE_HTTP_METHOD = re.compile(r'^(POST|GET|HEAD|DELETE|PUT|TRACE|OPTIONS)')
RE_PARAMETER_SPACER = re.compile(r'&([a-z0-9]+)=')
@classmethod
def spacer(cls, line):
return cls.RE_PARAMETER_SPACER.sub(r' &\1= ', line)
def translate(self, line):
parsed = self.RE_LINE_PARSER.match(line)
if parsed:
value = parsed.group(3)
stage = parsed.group(1)
if stage == 'send': # query string is rendered here
return '\n# HTTP Request:\n' + self.stripslashes(value)
elif stage == 'reply':
return '\n\n# HTTP Response:\n' + self.stripslashes(value)
elif stage == 'header':
return value + '\n'
else:
return value
return line
def consume(outbuffer = None): # Capture standard output
sys.stdout = HTTPTranslator(outbuffer or sys.stdout)
return sys.stdout
if __name__ == '__main__':
consume(sys.stdout).write(sys.stdin.read())
print('\n')
# vim: set nowrap tabstop=4 shiftwidth=4 softtabstop=0 expandtab textwidth=0 filetype=python foldmethod=indent foldcolumn=4

View file

@ -1,424 +0,0 @@
from future.moves.urllib.request import urlopen, build_opener, install_opener
from future.moves.urllib.request import Request, HTTPSHandler
from future.moves.urllib.error import URLError, HTTPError
from future.moves.urllib.parse import urlencode
import random
import datetime
import time
import uuid
import hashlib
import socket
def generate_uuid(basedata=None):
""" Provides a _random_ UUID with no input, or a UUID4-format MD5 checksum of any input data provided """
if basedata is None:
return str(uuid.uuid4())
elif isinstance(basedata, str):
checksum = hashlib.md5(str(basedata).encode('utf-8')).hexdigest()
return '%8s-%4s-%4s-%4s-%12s' % (
checksum[0:8], checksum[8:12], checksum[12:16], checksum[16:20], checksum[20:32])
class Time(datetime.datetime):
""" Wrappers and convenience methods for processing various time representations """
@classmethod
def from_unix(cls, seconds, milliseconds=0):
""" Produce a full |datetime.datetime| object from a Unix timestamp """
base = list(time.gmtime(seconds))[0:6]
base.append(milliseconds * 1000) # microseconds
return cls(*base)
@classmethod
def to_unix(cls, timestamp):
""" Wrapper over time module to produce Unix epoch time as a float """
if not isinstance(timestamp, datetime.datetime):
raise TypeError('Time.milliseconds expects a datetime object')
base = time.mktime(timestamp.timetuple())
return base
@classmethod
def milliseconds_offset(cls, timestamp, now=None):
""" Offset time (in milliseconds) from a |datetime.datetime| object to now """
if isinstance(timestamp, (int, float)):
base = timestamp
else:
base = cls.to_unix(timestamp)
base = base + (timestamp.microsecond / 1000000)
if now is None:
now = time.time()
return (now - base) * 1000
class HTTPRequest(object):
""" URL Construction and request handling abstraction.
This is not intended to be used outside this module.
Automates mapping of persistent state (i.e. query parameters)
onto transcient datasets for each query.
"""
endpoint = 'https://www.google-analytics.com/collect'
@staticmethod
def debug():
""" Activate debugging on urllib2 """
handler = HTTPSHandler(debuglevel=1)
opener = build_opener(handler)
install_opener(opener)
# Store properties for all requests
def __init__(self, user_agent=None, *args, **opts):
self.user_agent = user_agent or 'Analytics Pros - Universal Analytics (Python)'
@classmethod
def fixUTF8(cls, data): # Ensure proper encoding for UA's servers...
""" Convert all strings to UTF-8 """
for key in data:
if isinstance(data[key], str):
data[key] = data[key].encode('utf-8')
return data
# Apply stored properties to the given dataset & POST to the configured endpoint
def send(self, data):
request = Request(
self.endpoint + '?' + urlencode(self.fixUTF8(data)).encode('utf-8'),
headers={
'User-Agent': self.user_agent
}
)
self.open(request)
def open(self, request):
try:
return urlopen(request)
except HTTPError as e:
return False
except URLError as e:
self.cache_request(request)
return False
def cache_request(self, request):
# TODO: implement a proper caching mechanism here for re-transmitting hits
# record = (Time.now(), request.get_full_url(), request.get_data(), request.headers)
pass
class HTTPPost(HTTPRequest):
# Apply stored properties to the given dataset & POST to the configured endpoint
def send(self, data):
request = Request(
self.endpoint,
data=urlencode(self.fixUTF8(data)).encode('utf-8'),
headers={
'User-Agent': self.user_agent
}
)
self.open(request)
class Tracker(object):
""" Primary tracking interface for Universal Analytics """
params = None
parameter_alias = {}
valid_hittypes = ('pageview', 'event', 'social', 'screenview', 'transaction', 'item', 'exception', 'timing')
@classmethod
def alias(cls, typemap, base, *names):
""" Declare an alternate (humane) name for a measurement protocol parameter """
cls.parameter_alias[base] = (typemap, base)
for i in names:
cls.parameter_alias[i] = (typemap, base)
@classmethod
def coerceParameter(cls, name, value=None):
if isinstance(name, str) and name[0] == '&':
return name[1:], str(value)
elif name in cls.parameter_alias:
typecast, param_name = cls.parameter_alias.get(name)
return param_name, typecast(value)
else:
raise KeyError('Parameter "{0}" is not recognized'.format(name))
def payload(self, data):
for key, value in data.items():
try:
yield self.coerceParameter(key, value)
except KeyError:
continue
option_sequence = {
'pageview': [(str, 'dp')],
'event': [(str, 'ec'), (str, 'ea'), (str, 'el'), (int, 'ev')],
'social': [(str, 'sn'), (str, 'sa'), (str, 'st')],
'timing': [(str, 'utc'), (str, 'utv'), (str, 'utt'), (str, 'utl')]
}
@classmethod
def consume_options(cls, data, hittype, args):
""" Interpret sequential arguments related to known hittypes based on declared structures """
opt_position = 0
data['t'] = hittype # integrate hit type parameter
if hittype in cls.option_sequence:
for expected_type, optname in cls.option_sequence[hittype]:
if opt_position < len(args) and isinstance(args[opt_position], expected_type):
data[optname] = args[opt_position]
opt_position += 1
@classmethod
def hittime(cls, timestamp=None, age=None, milliseconds=None):
""" Returns an integer represeting the milliseconds offset for a given hit (relative to now) """
if isinstance(timestamp, (int, float)):
return int(Time.milliseconds_offset(Time.from_unix(timestamp, milliseconds=milliseconds)))
if isinstance(timestamp, datetime.datetime):
return int(Time.milliseconds_offset(timestamp))
if isinstance(age, (int, float)):
return int(age * 1000) + (milliseconds or 0)
@property
def account(self):
return self.params.get('tid', None)
def __init__(self, account, name=None, client_id=None, hash_client_id=False, user_id=None, user_agent=None,
use_post=True):
if use_post is False:
self.http = HTTPRequest(user_agent=user_agent)
else:
self.http = HTTPPost(user_agent=user_agent)
self.params = {'v': 1, 'tid': account}
if client_id is None:
client_id = generate_uuid()
self.params['cid'] = client_id
self.hash_client_id = hash_client_id
if user_id is not None:
self.params['uid'] = user_id
def set_timestamp(self, data):
""" Interpret time-related options, apply queue-time parameter as needed """
if 'hittime' in data: # an absolute timestamp
data['qt'] = self.hittime(timestamp=data.pop('hittime', None))
if 'hitage' in data: # a relative age (in seconds)
data['qt'] = self.hittime(age=data.pop('hitage', None))
def send(self, hittype, *args, **data):
""" Transmit HTTP requests to Google Analytics using the measurement protocol """
if hittype not in self.valid_hittypes:
raise KeyError('Unsupported Universal Analytics Hit Type: {0}'.format(repr(hittype)))
self.set_timestamp(data)
self.consume_options(data, hittype, args)
for item in args: # process dictionary-object arguments of transcient data
if isinstance(item, dict):
for key, val in self.payload(item):
data[key] = val
for k, v in self.params.items(): # update only absent parameters
if k not in data:
data[k] = v
data = dict(self.payload(data))
if self.hash_client_id:
data['cid'] = generate_uuid(data['cid'])
# Transmit the hit to Google...
self.http.send(data)
# Setting persistent attibutes of the session/hit/etc (inc. custom dimensions/metrics)
def set(self, name, value=None):
if isinstance(name, dict):
for key, value in name.items():
try:
param, value = self.coerceParameter(key, value)
self.params[param] = value
except KeyError:
pass
elif isinstance(name, str):
try:
param, value = self.coerceParameter(name, value)
self.params[param] = value
except KeyError:
pass
def __getitem__(self, name):
param, value = self.coerceParameter(name, None)
return self.params.get(param, None)
def __setitem__(self, name, value):
param, value = self.coerceParameter(name, value)
self.params[param] = value
def __delitem__(self, name):
param, value = self.coerceParameter(name, None)
if param in self.params:
del self.params[param]
def safe_unicode(obj):
""" Safe convertion to the Unicode string version of the object """
try:
return str(obj)
except UnicodeDecodeError:
return obj.decode('utf-8')
# Declaring name mappings for Measurement Protocol parameters
MAX_CUSTOM_DEFINITIONS = 200
MAX_EC_LISTS = 11 # 1-based index
MAX_EC_PRODUCTS = 11 # 1-based index
MAX_EC_PROMOTIONS = 11 # 1-based index
Tracker.alias(int, 'v', 'protocol-version')
Tracker.alias(safe_unicode, 'cid', 'client-id', 'clientId', 'clientid')
Tracker.alias(safe_unicode, 'tid', 'trackingId', 'account')
Tracker.alias(safe_unicode, 'uid', 'user-id', 'userId', 'userid')
Tracker.alias(safe_unicode, 'uip', 'user-ip', 'userIp', 'ipaddr')
Tracker.alias(safe_unicode, 'ua', 'userAgent', 'userAgentOverride', 'user-agent')
Tracker.alias(safe_unicode, 'dp', 'page', 'path')
Tracker.alias(safe_unicode, 'dt', 'title', 'pagetitle', 'pageTitle' 'page-title')
Tracker.alias(safe_unicode, 'dl', 'location')
Tracker.alias(safe_unicode, 'dh', 'hostname')
Tracker.alias(safe_unicode, 'sc', 'sessioncontrol', 'session-control', 'sessionControl')
Tracker.alias(safe_unicode, 'dr', 'referrer', 'referer')
Tracker.alias(int, 'qt', 'queueTime', 'queue-time')
Tracker.alias(safe_unicode, 't', 'hitType', 'hittype')
Tracker.alias(int, 'aip', 'anonymizeIp', 'anonIp', 'anonymize-ip')
Tracker.alias(safe_unicode, 'ds', 'dataSource', 'data-source')
# Campaign attribution
Tracker.alias(safe_unicode, 'cn', 'campaign', 'campaignName', 'campaign-name')
Tracker.alias(safe_unicode, 'cs', 'source', 'campaignSource', 'campaign-source')
Tracker.alias(safe_unicode, 'cm', 'medium', 'campaignMedium', 'campaign-medium')
Tracker.alias(safe_unicode, 'ck', 'keyword', 'campaignKeyword', 'campaign-keyword')
Tracker.alias(safe_unicode, 'cc', 'content', 'campaignContent', 'campaign-content')
Tracker.alias(safe_unicode, 'ci', 'campaignId', 'campaignID', 'campaign-id')
# Technical specs
Tracker.alias(safe_unicode, 'sr', 'screenResolution', 'screen-resolution', 'resolution')
Tracker.alias(safe_unicode, 'vp', 'viewport', 'viewportSize', 'viewport-size')
Tracker.alias(safe_unicode, 'de', 'encoding', 'documentEncoding', 'document-encoding')
Tracker.alias(int, 'sd', 'colors', 'screenColors', 'screen-colors')
Tracker.alias(safe_unicode, 'ul', 'language', 'user-language', 'userLanguage')
# Mobile app
Tracker.alias(safe_unicode, 'an', 'appName', 'app-name', 'app')
Tracker.alias(safe_unicode, 'cd', 'contentDescription', 'screenName', 'screen-name', 'content-description')
Tracker.alias(safe_unicode, 'av', 'appVersion', 'app-version', 'version')
Tracker.alias(safe_unicode, 'aid', 'appID', 'appId', 'application-id', 'app-id', 'applicationId')
Tracker.alias(safe_unicode, 'aiid', 'appInstallerId', 'app-installer-id')
# Ecommerce
Tracker.alias(safe_unicode, 'ta', 'affiliation', 'transactionAffiliation', 'transaction-affiliation')
Tracker.alias(safe_unicode, 'ti', 'transaction', 'transactionId', 'transaction-id')
Tracker.alias(float, 'tr', 'revenue', 'transactionRevenue', 'transaction-revenue')
Tracker.alias(float, 'ts', 'shipping', 'transactionShipping', 'transaction-shipping')
Tracker.alias(float, 'tt', 'tax', 'transactionTax', 'transaction-tax')
Tracker.alias(safe_unicode, 'cu', 'currency', 'transactionCurrency',
'transaction-currency') # Currency code, e.g. USD, EUR
Tracker.alias(safe_unicode, 'in', 'item-name', 'itemName')
Tracker.alias(float, 'ip', 'item-price', 'itemPrice')
Tracker.alias(float, 'iq', 'item-quantity', 'itemQuantity')
Tracker.alias(safe_unicode, 'ic', 'item-code', 'sku', 'itemCode')
Tracker.alias(safe_unicode, 'iv', 'item-variation', 'item-category', 'itemCategory', 'itemVariation')
# Events
Tracker.alias(safe_unicode, 'ec', 'event-category', 'eventCategory', 'category')
Tracker.alias(safe_unicode, 'ea', 'event-action', 'eventAction', 'action')
Tracker.alias(safe_unicode, 'el', 'event-label', 'eventLabel', 'label')
Tracker.alias(int, 'ev', 'event-value', 'eventValue', 'value')
Tracker.alias(int, 'ni', 'noninteractive', 'nonInteractive', 'noninteraction', 'nonInteraction')
# Social
Tracker.alias(safe_unicode, 'sa', 'social-action', 'socialAction')
Tracker.alias(safe_unicode, 'sn', 'social-network', 'socialNetwork')
Tracker.alias(safe_unicode, 'st', 'social-target', 'socialTarget')
# Exceptions
Tracker.alias(safe_unicode, 'exd', 'exception-description', 'exceptionDescription', 'exDescription')
Tracker.alias(int, 'exf', 'exception-fatal', 'exceptionFatal', 'exFatal')
# User Timing
Tracker.alias(safe_unicode, 'utc', 'timingCategory', 'timing-category')
Tracker.alias(safe_unicode, 'utv', 'timingVariable', 'timing-variable')
Tracker.alias(float, 'utt', 'time', 'timingTime', 'timing-time')
Tracker.alias(safe_unicode, 'utl', 'timingLabel', 'timing-label')
Tracker.alias(float, 'dns', 'timingDNS', 'timing-dns')
Tracker.alias(float, 'pdt', 'timingPageLoad', 'timing-page-load')
Tracker.alias(float, 'rrt', 'timingRedirect', 'timing-redirect')
Tracker.alias(safe_unicode, 'tcp', 'timingTCPConnect', 'timing-tcp-connect')
Tracker.alias(safe_unicode, 'srt', 'timingServerResponse', 'timing-server-response')
# Custom dimensions and metrics
for i in range(0, 200):
Tracker.alias(safe_unicode, 'cd{0}'.format(i), 'dimension{0}'.format(i))
Tracker.alias(int, 'cm{0}'.format(i), 'metric{0}'.format(i))
# Content groups
for i in range(0, 5):
Tracker.alias(safe_unicode, 'cg{0}'.format(i), 'contentGroup{0}'.format(i))
# Enhanced Ecommerce
Tracker.alias(str, 'pa') # Product action
Tracker.alias(str, 'tcc') # Coupon code
Tracker.alias(str, 'pal') # Product action list
Tracker.alias(int, 'cos') # Checkout step
Tracker.alias(str, 'col') # Checkout step option
Tracker.alias(str, 'promoa') # Promotion action
for product_index in range(1, MAX_EC_PRODUCTS):
Tracker.alias(str, 'pr{0}id'.format(product_index)) # Product SKU
Tracker.alias(str, 'pr{0}nm'.format(product_index)) # Product name
Tracker.alias(str, 'pr{0}br'.format(product_index)) # Product brand
Tracker.alias(str, 'pr{0}ca'.format(product_index)) # Product category
Tracker.alias(str, 'pr{0}va'.format(product_index)) # Product variant
Tracker.alias(str, 'pr{0}pr'.format(product_index)) # Product price
Tracker.alias(int, 'pr{0}qt'.format(product_index)) # Product quantity
Tracker.alias(str, 'pr{0}cc'.format(product_index)) # Product coupon code
Tracker.alias(int, 'pr{0}ps'.format(product_index)) # Product position
for custom_index in range(MAX_CUSTOM_DEFINITIONS):
Tracker.alias(str, 'pr{0}cd{1}'.format(product_index, custom_index)) # Product custom dimension
Tracker.alias(int, 'pr{0}cm{1}'.format(product_index, custom_index)) # Product custom metric
for list_index in range(1, MAX_EC_LISTS):
Tracker.alias(str, 'il{0}pi{1}id'.format(list_index, product_index)) # Product impression SKU
Tracker.alias(str, 'il{0}pi{1}nm'.format(list_index, product_index)) # Product impression name
Tracker.alias(str, 'il{0}pi{1}br'.format(list_index, product_index)) # Product impression brand
Tracker.alias(str, 'il{0}pi{1}ca'.format(list_index, product_index)) # Product impression category
Tracker.alias(str, 'il{0}pi{1}va'.format(list_index, product_index)) # Product impression variant
Tracker.alias(int, 'il{0}pi{1}ps'.format(list_index, product_index)) # Product impression position
Tracker.alias(int, 'il{0}pi{1}pr'.format(list_index, product_index)) # Product impression price
for custom_index in range(MAX_CUSTOM_DEFINITIONS):
Tracker.alias(str, 'il{0}pi{1}cd{2}'.format(list_index, product_index,
custom_index)) # Product impression custom dimension
Tracker.alias(int, 'il{0}pi{1}cm{2}'.format(list_index, product_index,
custom_index)) # Product impression custom metric
for list_index in range(1, MAX_EC_LISTS):
Tracker.alias(str, 'il{0}nm'.format(list_index)) # Product impression list name
for promotion_index in range(1, MAX_EC_PROMOTIONS):
Tracker.alias(str, 'promo{0}id'.format(promotion_index)) # Promotion ID
Tracker.alias(str, 'promo{0}nm'.format(promotion_index)) # Promotion name
Tracker.alias(str, 'promo{0}cr'.format(promotion_index)) # Promotion creative
Tracker.alias(str, 'promo{0}ps'.format(promotion_index)) # Promotion position
# Shortcut for creating trackers
def create(account, *args, **kwargs):
return Tracker(account, *args, **kwargs)
# vim: set nowrap tabstop=4 shiftwidth=4 softtabstop=0 expandtab textwidth=0 filetype=python foldmethod=indent foldcolumn=4

View file

@ -1 +0,0 @@
from . import Tracker

3
lib/ga4mp/__init__.py Normal file
View file

@ -0,0 +1,3 @@
from ga4mp.ga4mp import GtagMP, FirebaseMP
__all__ = ['GtagMP','FirebaseMP']

44
lib/ga4mp/event.py Normal file
View file

@ -0,0 +1,44 @@
from ga4mp.item import Item
class Event(dict):
def __init__(self, name):
self.set_event_name(name)
def set_event_name(self, name):
if len(name) > 40:
raise ValueError("Event name cannot exceed 40 characters.")
self["name"] = name
def get_event_name(self):
return self.get("name")
def set_event_param(self, name, value):
# Series of checks to comply with GA4 event collection limits: https://support.google.com/analytics/answer/9267744
if len(name) > 40:
raise ValueError("Event parameter name cannot exceed 40 characters.")
if name in ["page_location", "page_referrer", "page_title"] and len(str(value)) > 300:
raise ValueError("Event parameter value for page info cannot exceed 300 characters.")
if name not in ["page_location", "page_referrer", "page_title"] and len(str(value)) > 100:
raise ValueError("Event parameter value cannot exceed 100 characters.")
if "params" not in self.keys():
self["params"] = {}
if len(self["params"]) >= 100:
raise RuntimeError("Event cannot contain more than 100 parameters.")
self["params"][name] = value
def get_event_params(self):
return self.get("params")
def delete_event_param(self, name):
# Since only 25 event parameters are allowed, this will allow the user to delete a parameter if necessary.
self["params"].pop(name, None)
def create_new_item(self, item_id=None, item_name=None):
return Item(item_id=item_id, item_name=item_name)
def add_item_to_event(self, item):
if not isinstance(item, dict):
raise ValueError("'item' must be an instance of a dictionary.")
if "items" not in self["params"].keys():
self.set_event_param("items", [])
self["params"]["items"].append(item)

416
lib/ga4mp/ga4mp.py Normal file
View file

@ -0,0 +1,416 @@
###############################################################################
# Google Analytics 4 Measurement Protocol for Python
# Copyright (c) 2022, Adswerve
#
# This project is free software, distributed under the BSD license.
# Adswerve offers consulting and integration services if your firm needs
# assistance in strategy, implementation, or auditing existing work.
###############################################################################
import json
import logging
import urllib.request
import time
import datetime
import random
from ga4mp.utils import params_dict
from ga4mp.event import Event
from ga4mp.store import BaseStore, DictStore
import os, sys
sys.path.append(
os.path.normpath(os.path.join(os.path.dirname(__file__), ".."))
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class BaseGa4mp(object):
"""
Parent class that provides an interface for sending data to Google Analytics, supporting the GA4 Measurement Protocol.
Parameters
----------
api_secret : string
Generated through the Google Analytics UI. To create a new secret, navigate in the Google Analytics UI to: Admin > Data Streams >
[choose your stream] > Measurement Protocol API Secrets > Create
See Also
--------
* Measurement Protocol (Google Analytics 4): https://developers.google.com/analytics/devguides/collection/protocol/ga4
Examples
--------
# Initialize tracking object for gtag usage
>>> ga = gtagMP(api_secret = "API_SECRET", measurement_id = "MEASUREMENT_ID", client_id="CLIENT_ID")
# Initialize tracking object for Firebase usage
>>> ga = firebaseMP(api_secret = "API_SECRET", firebase_app_id = "FIREBASE_APP_ID", app_instance_id="APP_INSTANCE_ID")
# Build an event
>>> event_type = 'new_custom_event'
>>> event_parameters = {'parameter_key_1': 'parameter_1', 'parameter_key_2': 'parameter_2'}
>>> event = {'name': event_type, 'params': event_parameters }
>>> events = [event]
# Send a custom event to GA4 immediately
>>> ga.send(events)
# Postponed send of a custom event to GA4
>>> ga.send(events, postpone=True)
>>> ga.postponed_send()
"""
def __init__(self, api_secret, store: BaseStore = None):
self._initialization_time = time.time() # used for both session_id and calculating engagement time
self.api_secret = api_secret
self._event_list = []
assert store is None or isinstance(store, BaseStore), "if supplied, store must be an instance of BaseStore"
self.store = store or DictStore()
self._check_store_requirements()
self._base_domain = "https://www.google-analytics.com/mp/collect"
self._validation_domain = "https://www.google-analytics.com/debug/mp/collect"
def _check_store_requirements(self):
# Store must contain "session_id" and "last_interaction_time_msec" in order for tracking to work properly.
if self.store.get_session_parameter("session_id") is None:
self.store.set_session_parameter(name="session_id", value=int(self._initialization_time))
# Note: "last_interaction_time_msec" factors into the required "engagement_time_msec" event parameter.
self.store.set_session_parameter(name="last_interaction_time_msec", value=int(self._initialization_time * 1000))
def create_new_event(self, name):
return Event(name=name)
def send(self, events, validation_hit=False, postpone=False, date=None):
"""
Method to send an http post request to google analytics with the specified events.
Parameters
----------
events : List[Dict]
A list of dictionaries of the events to be sent to Google Analytics. The list of dictionaries should adhere
to the following format:
[{'name': 'level_end',
'params' : {'level_name': 'First',
'success': 'True'}
},
{'name': 'level_up',
'params': {'character': 'John Madden',
'level': 'First'}
}]
validation_hit : bool, optional
Boolean to depict if events should be tested against the Measurement Protocol Validation Server, by default False
postpone : bool, optional
Boolean to depict if provided event list should be postponed, by default False
date : datetime
Python datetime object for sending a historical event at the given date. Date cannot be in the future.
"""
# check for any missing or invalid parameters among automatically collected and recommended event types
self._check_params(events)
self._check_date_not_in_future(date)
self._add_session_id_and_engagement_time(events)
if postpone is True:
# build event list to send later
for event in events:
event["_timestamp_micros"] = self._get_timestamp(time.time())
self._event_list.append(event)
else:
# batch events into sets of 25 events, the maximum allowed.
batched_event_list = [
events[event : event + 25] for event in range(0, len(events), 25)
]
# send http post request
self._http_post(
batched_event_list, validation_hit=validation_hit, date=date
)
def postponed_send(self):
"""
Method to send the events provided to Ga4mp.send(events,postpone=True)
"""
for event in self._event_list:
self._http_post([event], postpone=True)
# clear event_list for future use
self._event_list = []
def append_event_to_params_dict(self, new_name_and_parameters):
"""
Method to append event name and parameters key-value pairing(s) to parameters dictionary.
Parameters
----------
new_name_and_parameters : Dict
A dictionary with one key-value pair representing a new type of event to be sent to Google Analytics.
The dictionary should adhere to the following format:
{'new_name': ['new_param_1', 'new_param_2', 'new_param_3']}
"""
params_dict.update(new_name_and_parameters)
def _http_post(self, batched_event_list, validation_hit=False, postpone=False, date=None):
"""
Method to send http POST request to google-analytics.
Parameters
----------
batched_event_list : List[List[Dict]]
List of List of events. Places initial event payload into a list to send http POST in batches.
validation_hit : bool, optional
Boolean to depict if events should be tested against the Measurement Protocol Validation Server, by default False
postpone : bool, optional
Boolean to depict if provided event list should be postponed, by default False
date : datetime
Python datetime object for sending a historical event at the given date. Date cannot be in the future.
Timestamp micros supports up to 48 hours of backdating.
If date is specified, postpone must be False or an assertion will be thrown.
"""
self._check_date_not_in_future(date)
status_code = None # Default set to know if batch loop does not work and to bound status_code
# set domain
domain = self._base_domain
if validation_hit is True:
domain = self._validation_domain
logger.info(f"Sending POST to: {domain}")
# loop through events in batches of 25
batch_number = 1
for batch in batched_event_list:
# url and request slightly differ by subclass
url = self._build_url(domain=domain)
request = self._build_request(batch=batch)
self._add_user_props_to_hit(request)
# make adjustments for postponed hit
request["events"] = (
{"name": batch["name"], "params": batch["params"]}
if (postpone)
else batch
)
if date is not None:
logger.info(f"Setting event timestamp to: {date}")
assert (
postpone is False
), "Cannot send postponed historical hit, ensure postpone=False"
ts = self._datetime_to_timestamp(date)
ts_micro = self._get_timestamp(ts)
request["timestamp_micros"] = int(ts_micro)
logger.info(f"Timestamp of request is: {request['timestamp_micros']}")
if postpone:
# add timestamp to hit
request["timestamp_micros"] = batch["_timestamp_micros"]
req = urllib.request.Request(url)
req.add_header("Content-Type", "application/json; charset=utf-8")
jsondata = json.dumps(request)
json_data_as_bytes = jsondata.encode("utf-8") # needs to be bytes
req.add_header("Content-Length", len(json_data_as_bytes))
result = urllib.request.urlopen(req, json_data_as_bytes)
status_code = result.status
logger.info(f"Batch Number: {batch_number}")
logger.info(f"Status code: {status_code}")
batch_number += 1
return status_code
def _check_params(self, events):
"""
Method to check whether the provided event payload parameters align with supported parameters.
Parameters
----------
events : List[Dict]
A list of dictionaries of the events to be sent to Google Analytics. The list of dictionaries should adhere
to the following format:
[{'name': 'level_end',
'params' : {'level_name': 'First',
'success': 'True'}
},
{'name': 'level_up',
'params': {'character': 'John Madden',
'level': 'First'}
}]
"""
# check to make sure it's a list of dictionaries with the right keys
assert type(events) == list, "events should be a list"
for event in events:
assert isinstance(event, dict), "each event should be an instance of a dictionary"
assert "name" in event, 'each event should have a "name" key'
assert "params" in event, 'each event should have a "params" key'
# check for any missing or invalid parameters
for e in events:
event_name = e["name"]
event_params = e["params"]
if event_name in params_dict.keys():
for parameter in params_dict[event_name]:
if parameter not in event_params.keys():
logger.warning(
f"WARNING: Event parameters do not match event type.\nFor {event_name} event type, the correct parameter(s) are {params_dict[event_name]}.\nThe parameter '{parameter}' triggered this warning.\nFor a breakdown of currently supported event types and their parameters go here: https://support.google.com/analytics/answer/9267735\n"
)
def _add_session_id_and_engagement_time(self, events):
"""
Method to add the session_id and engagement_time_msec parameter to all events.
"""
for event in events:
current_time_in_milliseconds = int(time.time() * 1000)
event_params = event["params"]
if "session_id" not in event_params.keys():
event_params["session_id"] = self.store.get_session_parameter("session_id")
if "engagement_time_msec" not in event_params.keys():
last_interaction_time = self.store.get_session_parameter("last_interaction_time_msec")
event_params["engagement_time_msec"] = current_time_in_milliseconds - last_interaction_time if current_time_in_milliseconds > last_interaction_time else 0
self.store.set_session_parameter(name="last_interaction_time_msec", value=current_time_in_milliseconds)
def _add_user_props_to_hit(self, hit):
"""
Method is a helper function to add user properties to outgoing hits.
Parameters
----------
hit : dict
"""
for key in self.store.get_all_user_properties():
try:
if key in ["user_id", "non_personalized_ads"]:
hit.update({key: self.store.get_user_property(key)})
else:
if "user_properties" not in hit.keys():
hit.update({"user_properties": {}})
hit["user_properties"].update(
{key: {"value": self.store.get_user_property(key)}}
)
except:
logger.info(f"Failed to add user property to outgoing hit: {key}")
def _get_timestamp(self, timestamp):
"""
Method returns UNIX timestamp in microseconds for postponed hits.
Parameters
----------
None
"""
return int(timestamp * 1e6)
def _datetime_to_timestamp(self, dt):
"""
Private method to convert a datetime object into a timestamp
Parameters
----------
dt : datetime
A datetime object in any format
Returns
-------
timestamp
A UNIX timestamp in milliseconds
"""
return time.mktime(dt.timetuple())
def _check_date_not_in_future(self, date):
"""
Method to check that provided date is not in the future.
Parameters
----------
date : datetime
Python datetime object
"""
if date is None:
pass
else:
assert (
date <= datetime.datetime.now()
), "Provided date cannot be in the future"
def _build_url(self, domain):
raise NotImplementedError("Subclass should be using this function, but it was called through the base class instead.")
def _build_request(self, batch):
raise NotImplementedError("Subclass should be using this function, but it was called through the base class instead.")
class GtagMP(BaseGa4mp):
"""
Subclass for users of gtag. See `Ga4mp` parent class for examples.
Parameters
----------
measurement_id : string
The identifier for a Data Stream. Found in the Google Analytics UI under: Admin > Data Streams > [choose your stream] > Measurement ID (top-right)
client_id : string
A unique identifier for a client, representing a specific browser/device.
"""
def __init__(self, api_secret, measurement_id, client_id,):
super().__init__(api_secret)
self.measurement_id = measurement_id
self.client_id = client_id
def _build_url(self, domain):
return f"{domain}?measurement_id={self.measurement_id}&api_secret={self.api_secret}"
def _build_request(self, batch):
return {"client_id": self.client_id, "events": batch}
def random_client_id(self):
"""
Utility function for generating a new client ID matching the typical format of 10 random digits and the UNIX timestamp in seconds, joined by a period.
"""
return "%0.10d" % random.randint(0,9999999999) + "." + str(int(time.time()))
class FirebaseMP(BaseGa4mp):
"""
Subclass for users of Firebase. See `Ga4mp` parent class for examples.
Parameters
----------
firebase_app_id : string
The identifier for a Firebase app. Found in the Firebase console under: Project Settings > General > Your Apps > App ID.
app_instance_id : string
A unique identifier for a Firebase app instance.
* Android - getAppInstanceId() - https://firebase.google.com/docs/reference/android/com/google/firebase/analytics/FirebaseAnalytics#public-taskstring-getappinstanceid
* Kotlin - getAppInstanceId() - https://firebase.google.com/docs/reference/kotlin/com/google/firebase/analytics/FirebaseAnalytics#getappinstanceid
* Swift - appInstanceID() - https://firebase.google.com/docs/reference/swift/firebaseanalytics/api/reference/Classes/Analytics#appinstanceid
* Objective-C - appInstanceID - https://firebase.google.com/docs/reference/ios/firebaseanalytics/api/reference/Classes/FIRAnalytics#+appinstanceid
* C++ - GetAnalyticsInstanceId() - https://firebase.google.com/docs/reference/cpp/namespace/firebase/analytics#getanalyticsinstanceid
* Unity - GetAnalyticsInstanceIdAsync() - https://firebase.google.com/docs/reference/unity/class/firebase/analytics/firebase-analytics#getanalyticsinstanceidasync
"""
def __init__(self, api_secret, firebase_app_id, app_instance_id):
super().__init__(api_secret)
self.firebase_app_id = firebase_app_id
self.app_instance_id = app_instance_id
def _build_url(self, domain):
return f"{domain}?firebase_app_id={self.firebase_app_id}&api_secret={self.api_secret}"
def _build_request(self, batch):
return {"app_instance_id": self.app_instance_id, "events": batch}

11
lib/ga4mp/item.py Normal file
View file

@ -0,0 +1,11 @@
class Item(dict):
def __init__(self, item_id=None, item_name=None):
if item_id is None and item_name is None:
raise ValueError("At least one of 'item_id' and 'item_name' is required.")
if item_id is not None:
self.set_parameter("item_id", str(item_id))
if item_name is not None:
self.set_parameter("item_name", item_name)
def set_parameter(self, name, value):
self[name] = value

116
lib/ga4mp/store.py Normal file
View file

@ -0,0 +1,116 @@
import json
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class BaseStore(dict):
def __init__(self):
self.update([("user_properties", {}),("session_parameters", {})])
def save(self):
raise NotImplementedError("Subclass should be using this function, but it was called through the base class instead.")
def _check_exists(self, key):
# Helper function to make sure a key exists before trying to work with values within it.
if key not in self.keys():
self[key] = {}
def _set(self, param_type, name, value):
# Helper function to set a single parameter (user or session or other).
self._check_exists(key=param_type)
self[param_type][name] = value
def _get_one(self, param_type, name):
# Helper function to get a single parameter value (user or session).
self._check_exists(key=param_type)
return self[param_type].get(name, None)
def _get_all(self, param_type=None):
# Helper function to get all user or session parameters - or the entire dictionary if not specified.
if param_type is not None:
return self[param_type]
else:
return self
# While redundant, the following make sure the distinction between session and user items is easier for the end user.
def set_user_property(self, name, value):
self._set(param_type="user_properties", name=name, value=value)
def get_user_property(self, name):
return self._get_one(param_type="user_properties", name=name)
def get_all_user_properties(self):
return self._get_all(param_type="user_properties")
def clear_user_properties(self):
self["user_properties"] = {}
def set_session_parameter(self, name, value):
self._set(param_type="session_parameters", name=name, value=value)
def get_session_parameter(self, name):
return self._get_one(param_type="session_parameters", name=name)
def get_all_session_parameters(self):
return self._get_all(param_type="session_parameters")
def clear_session_parameters(self):
self["session_parameters"] = {}
# Similar functions for other items the user wants to store that don't fit the other two categories.
def set_other_parameter(self, name, value):
self._set(param_type="other", name=name, value=value)
def get_other_parameter(self, name):
return self._get_one(param_type="other", name=name)
def get_all_other_parameters(self):
return self._get_all(param_type="other")
def clear_other_parameters(self):
self["other"] = {}
class DictStore(BaseStore):
# Class for working with dictionaries that persist for the life of the class.
def __init__(self, data: dict = None):
super().__init__()
if data:
self.update(data)
def save(self):
# Give the user back what's in the dictionary so they can decide how to save it.
self._get_all()
class FileStore(BaseStore):
# Class for working with dictionaries that get saved to a JSON file.
def __init__(self, data_location: str = None):
super().__init__()
self.data_location = data_location
try:
self._load_file(data_location)
except:
logger.info(f"Failed to find file at location: {data_location}")
def _load_file(self):
# Function to get data from the object's initialized location.
# If the provided or stored data_location exists, read the file and overwrite the object's contents.
if Path(self.data_location).exists():
with open(self.data_location, "r") as json_file:
self = json.load(json_file)
# If the data_location doesn't exist, try to create a new starter JSON file at the location given.
else:
starter_dict = '{"user_properties":{}, "session_parameters":{}}'
starter_json = json.loads(starter_dict)
Path(self.data_location).touch()
with open(self.data_location, "w") as json_file:
json.dumps(starter_json, json_file)
def save(self):
# Function to save the current dictionary to a JSON file at the object's initialized location.
try:
with open(self.data_location, "w") as outfile:
json.dump(self, outfile)
except:
logger.info(f"Failed to save file at location: {self.data_location}")

392
lib/ga4mp/utils.py Normal file
View file

@ -0,0 +1,392 @@
# all automatically collected and recommended event types
params_dict = {
"ad_click": [
"ad_event_id"
],
"ad_exposure": [
"firebase_screen",
"firebase_screen_id",
"firebase_screen_class",
"exposure_time",
],
"ad_impression": [
"ad_event_id"
],
"ad_query": [
"ad_event_id"
],
"ad_reward": [
"ad_unit_id",
"reward_type",
"reward_value"
],
"add_payment_info": [
"coupon",
"currency",
"items",
"payment_type",
"value"
],
"add_shipping_info": [
"coupon",
"currency",
"items",
"shipping_tier",
"value"
],
"add_to_cart": [
"currency",
"items",
"value"
],
"add_to_wishlist": [
"currency",
"items",
"value"
],
"adunit_exposure": [
"firebase_screen",
"firebase_screen_id",
"firebase_screen_class",
"exposure_time",
],
"app_clear_data": [],
"app_exception": [
"fatal",
"timestamp",
"engagement_time_msec"
],
"app_remove": [],
"app_store_refund": [
"product_id",
"value",
"currency",
"quantity"
],
"app_store_subscription_cancel": [
"product_id",
"price",
"value",
"currency",
"cancellation_reason",
],
"app_store_subscription_convert": [
"product_id",
"price",
"value",
"currency",
"quantity",
],
"app_store_subscription_renew": [
"product_id",
"price",
"value",
"currency",
"quantity",
"renewal_count",
],
"app_update": [
"previous_app_version"
],
"begin_checkout": [
"coupon",
"currency",
"items",
"value"
],
"click": [],
"dynamic_link_app_open": [
"source",
"medium",
"campaign",
"link_id",
"accept_time"
],
"dynamic_link_app_update": [
"source",
"medium",
"campaign",
"link_id",
"accept_time",
],
"dynamic_link_first_open": [
"source",
"medium",
"campaign",
"link_id",
"accept_time",
],
"earn_virtual_currency": [
"virtual_currency_name",
"value"
],
"error": [
"firebase_error",
"firebase_error_value"
],
"file_download": [
"file_extension",
"file_name",
"link_classes",
"link_domain",
"link_id",
"link_text",
"link_url",
],
"firebase_campaign": [
"source",
"medium",
"campaign",
"term",
"content",
"gclid",
"aclid",
"cp1",
"anid",
"click_timestamp",
"campaign_info_source",
],
"firebase_in_app_message_action": [
"message_name",
"message_device_time",
"message_id",
],
"firebase_in_app_message_dismiss": [
"message_name",
"message_device_time",
"message_id",
],
"firebase_in_app_message_impression": [
"message_name",
"message_device_time",
"message_id",
],
"first_open": [
"previous_gmp_app_id",
"updated_with_analytics",
"previous_first_open_count",
"system_app",
"system_app_update",
"deferred_analytics_collection",
"reset_analytics_cause",
"engagement_time_msec",
],
"first_visit": [],
"generate_lead": [
"value",
"currency"
],
"in_app_purchase": [
"product_id",
"price",
"value",
"currency",
"quantity",
"subscription",
"free_trial",
"introductory_price",
],
"join_group": [
"group_id"
],
"level_end": [
"level_name",
"success"
],
"level_start": [
"level_name"
],
"level_up": [
"character",
"level"
],
"login": [
"method"
],
"notification_dismiss": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
],
"notification_foreground": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
"message_type",
],
"notification_open": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
],
"notification_receive": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
"message_type",
],
"notification_send": [
"message_name",
"message_time",
"message_device_time",
"message_id",
"topic",
"label",
"message_channel",
],
"os_update": [
"previous_os_version"
],
"page_view": [
"page_location",
"page_referrer"
],
"post_score": [
"level",
"character",
"score"
],
"purchase": [
"affiliation",
"coupon",
"currency",
"items",
"transaction_id",
"shipping",
"tax",
"value",
],
"refund": [
"transaction_id",
"value",
"currency",
"tax",
"shipping",
"items"
],
"remove_from_cart": [
"currency",
"items",
"value"
],
"screen_view": [
"firebase_screen",
"firebase_screen_class",
"firebase_screen_id",
"firebase_previous_screen",
"firebase_previous_class",
"firebase_previous_id",
"engagement_time_msec",
],
"scroll": [],
"search": [
"search_term"
],
"select_content": [
"content_type",
"item_id"
],
"select_item": [
"items",
"item_list_name",
"item_list_id"
],
"select_promotion": [
"items",
"promotion_id",
"promotion_name",
"creative_name",
"creative_slot",
"location_id",
],
"session_start": [],
"share": [
"content_type",
"item_id"
],
"sign_up": [
"method"
],
"view_search_results": [
"search_term"
],
"spend_virtual_currency": [
"item_name",
"virtual_currency_name",
"value"
],
"tutorial_begin": [],
"tutorial_complete": [],
"unlock_achievement": [
"achievement_id"
],
"user_engagement": [
"engagement_time_msec"
],
"video_start": [
"video_current_time",
"video_duration",
"video_percent",
"video_provider",
"video_title",
"video_url",
"visible",
],
"video_progress": [
"video_current_time",
"video_duration",
"video_percent",
"video_provider",
"video_title",
"video_url",
"visible",
],
"video_complete": [
"video_current_time",
"video_duration",
"video_percent",
"video_provider",
"video_title",
"video_url",
"visible",
],
"view_cart": [
"currency",
"items",
"value"
],
"view_item": [
"currency",
"items",
"value"
],
"view_item_list": [
"items",
"item_list_name",
"item_list_id"
],
"view_promotion": [
"items",
"promotion_id",
"promotion_name",
"creative_name",
"creative_slot",
"location_id",
],
}

View file

@ -14,6 +14,7 @@ distro==1.8.0
dnspython==2.2.1
facebook-sdk==3.1.0
future==0.18.2
ga4mp==2.0.4
gntp==1.0.3
html5lib==1.1
httpagentparser==1.9.5