diff --git a/examples/__init__.py b/examples/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/processing_large_csv/__init__.py b/examples/processing_large_csv/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/processing_large_csv/cio_event_manager.py b/examples/processing_large_csv/cio_event_manager.py
new file mode 100644
index 0000000..d770078
--- /dev/null
+++ b/examples/processing_large_csv/cio_event_manager.py
@@ -0,0 +1,60 @@
+"""
+Sample code demonstrating how to send data contained in CSVs to CustomerIO as events. This script uses Python 3.
+"""
+import csv
+import time
+import logging
+import os
+from multiprocessing.pool import ThreadPool as Pool
+from customerio import CustomerIO, CustomerIOException
+
+logger = logging.getLogger(__name__)
+
+
+class CIOEventManager(object):
+    def __init__(self, cio_site_id, cio_api_key, csv_full_file_path):
+        self.site_id = cio_site_id
+        self.api_key = cio_api_key
+        self.csv_full_file_path = os.path.normpath(csv_full_file_path)
+        self.cio_api_client = CustomerIO(self.site_id, self.api_key, retries=5)
+        self.event_name = None
+
+    def _read_csv(self):
+        # newline='' is the recommended way to open a file for the csv module
+        with open(self.csv_full_file_path, 'r', newline='') as csv_file:
+            data = csv.DictReader(csv_file)
+            for line in data:
+                yield line
+
+    def _send_data_to_cio(self, data):
+        try:
+            # Assume each CSV row looks like
+            # {'age': '23', 'created_at': '', 'location': 'Boston', 'id': 'id: 1', 'unsubscribed': 'false'}
+            self.cio_api_client.track(customer_id=data['id'], name=self.event_name)
+        except CustomerIOException as e:
+            # Log the CSV row and the exception, then move on to the next row.
+            # Python 3 exceptions have no .message attribute, so log the exception itself.
+            logger.exception("Failed to send the row data: {row}. Error message={message}".format(row=data,
+                                                                                                  message=e))
+
+    def pre_process(self):
+        """Override this method to run tasks before the CSV is processed"""
+        pass
+
+    def send_events(self, no_of_processes=4):
+        """
+        Run pre-processing, read the CSV and send each row's data to CustomerIO,
+        then run post-processing.
+        """
+        self.pre_process()
+        rows = self._read_csv()  # renamed so the csv module is not shadowed
+        pool = Pool(processes=no_of_processes)
+        rate_limit_in_secs = 1 / 30.0  # rate limiting to ~30 requests per sec (1 / 30 is ~0.033)
+        for row in rows:
+            # apply_async submits the row without waiting for the result, so the
+            # pool can send several rows in parallel; pool.map over a one-item
+            # tuple would block on every row and defeat the thread pool
+            pool.apply_async(self._send_data_to_cio, (row,))
+            time.sleep(rate_limit_in_secs)
+        pool.close()
+        pool.join()
+        self._post_process()
+
+    def _post_process(self):
+        """Override this method to run tasks after the CSV is processed"""
+        pass
diff --git a/examples/processing_large_csv/readme.MD b/examples/processing_large_csv/readme.MD
new file mode 100644
index 0000000..bac1911
--- /dev/null
+++ b/examples/processing_large_csv/readme.MD
@@ -0,0 +1,25 @@
+# Sending event data to CustomerIO from CSVs
+
+
+Let's say you have a large amount of data in a CSV file, and you need to process
+each row and send data from it to CustomerIO.
+
+There are many ways to do this; the file `cio_event_manager.py` demonstrates one of them.
+
+To run the code, you will need the following:
+* The site ID and API key from your CustomerIO workspace
+* An event name
+* The full CSV file path
+
+
+Then run the code like this:
+````
+cio_event_manager = CIOEventManager(site_id, api_key, full_file_path_in_str)
+cio_event_manager.event_name = 'purchase'
+cio_event_manager.send_events()
+````
+
+Behind the scenes, the CSV is read and each row's data is sent to CustomerIO. To speed
+up sending, rows are dispatched in parallel through a thread pool from Python's
+`multiprocessing` module, and the hooks shown below let you customize a run.
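+
+One way to customize a run is to override the `pre_process` and `_post_process`
+hooks, which `send_events` calls before and after the CSV is processed. As a
+rough sketch (the `PurchaseEventManager` subclass and its log messages are only
+illustrations, not part of the example code):
+````
+class PurchaseEventManager(CIOEventManager):
+    def pre_process(self):
+        # runs once, before any rows are sent
+        print('Starting to send purchase events')
+
+    def _post_process(self):
+        # runs once, after the thread pool has drained
+        print('Finished sending purchase events')
+
+
+manager = PurchaseEventManager(site_id, api_key, full_file_path_in_str)
+manager.event_name = 'purchase'
+manager.send_events(no_of_processes=8)  # tune the number of worker threads as needed
+````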