From 50790d4b22dd34a2d8cd2f441b151be3d76c7ae4 Mon Sep 17 00:00:00 2001
From: Waqas Younas
Date: Fri, 3 Jul 2020 00:15:55 +0500
Subject: [PATCH 1/3] Added an example that demonstrates sending event data from CSV files

---
 examples/__init__.py                          |  0
 examples/processing_large_csv/__init__.py     |  0
 .../processing_large_csv/cio_csv_processor.py | 58 +++++++++++++++++++
 examples/processing_large_csv/readme.MD       | 23 ++++++++
 4 files changed, 81 insertions(+)
 create mode 100644 examples/__init__.py
 create mode 100644 examples/processing_large_csv/__init__.py
 create mode 100644 examples/processing_large_csv/cio_csv_processor.py
 create mode 100644 examples/processing_large_csv/readme.MD

diff --git a/examples/__init__.py b/examples/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/processing_large_csv/__init__.py b/examples/processing_large_csv/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/processing_large_csv/cio_csv_processor.py b/examples/processing_large_csv/cio_csv_processor.py
new file mode 100644
index 0000000..a9d2c27
--- /dev/null
+++ b/examples/processing_large_csv/cio_csv_processor.py
@@ -0,0 +1,58 @@
+"""
+Sample code to demonstrate sending event data to CustomerIO from large CSVs. Requires Python 3.
+"""
+import csv
+import time
+import logging
+import os
+from multiprocessing.pool import ThreadPool as Pool
+from customerio import CustomerIO, CustomerIOException
+
+logger = logging.getLogger(__name__)
+
+
+class CIO_CSVProcessor(object):
+    def __init__(self, site_id, api_key, csv_full_file_path):
+        self.csv_full_file_path = csv_full_file_path
+        self.site_id = site_id
+        self.api_key = api_key
+        self.csv_full_file_path = os.path.normpath(csv_full_file_path)
+        self.cio_api_client = CustomerIO(self.site_id, self.api_key, retries=5)
+
+    def _read_csv(self):
+        # csv.reader splits each line into a list of column values
+        with open(self.csv_full_file_path, 'r') as csv_file:
+            for row in csv.reader(csv_file):
+                yield row
+
+    def _send_event_to_cio(self, data):
+        """Assumes the CSV row has the customer id in the first column and the event name in the second column"""
+        try:
+            self.cio_api_client.track(customer_id=data[0], name=data[1])
+        except CustomerIOException as e:
+            # log the exception and move on to the next row
+            logger.exception(str(e))
+
+    def pre_process(self):
+        """Use this function to do some tasks before the CSV is processed"""
+        pass
+
+    def process(self, no_of_processes=4):
+        """
+        First runs some pre-processing, then reads the CSV and sends data from
+        each row to CustomerIO, and finally runs some post-processing.
+        """
+        self.pre_process()
+        rows = self._read_csv()
+        pool = Pool(processes=no_of_processes)
+        for row in rows:
+            # apply_async queues the row for a worker thread without blocking
+            pool.apply_async(self._send_event_to_cio, (row,))
+            secs = 1 / 30  # throttle to roughly 30 requests per second
+            time.sleep(secs)
+        pool.close()
+        pool.join()
+        self._post_process()
+
+    def _post_process(self):
+        """Use this function to do some tasks after the CSV is processed"""
+        pass
diff --git a/examples/processing_large_csv/readme.MD b/examples/processing_large_csv/readme.MD
new file mode 100644
index 0000000..5b81fde
--- /dev/null
+++ b/examples/processing_large_csv/readme.MD
@@ -0,0 +1,23 @@
+# Sending event data to CustomerIO from CSVs
+
+
+Let's say you have a large amount of data in a CSV file, and you need to
+process each row and then send its data to CustomerIO.
+
+There are many ways to do this; this directory gives one example, in the
+file `cio_csv_processor.py`.
+
+To run the code, you will need the following data:
+* Site ID and API key from your CustomerIO workspace
+* Full CSV file path
+
+
+And then run it like this:
+````
+csv_processor = CIO_CSVProcessor(site_id, api_key, full_file_path_in_str)
+csv_processor.process()
+````
+
+Behind the scenes, the CSV will be opened, and data will be sent to CustomerIO. To speed
+up sending, rows are handled in parallel by a thread pool from Python's
+`multiprocessing` module.

From 4b57f7402ca57b46f2896a45d2d6273898d6c5ac Mon Sep 17 00:00:00 2001
From: Waqas Younas
Date: Fri, 3 Jul 2020 17:06:24 +0500
Subject: [PATCH 2/3] Refactored the CSV example and improved its README file

---
 .../processing_large_csv/cio_csv_processor.py | 58 ------------------
 .../processing_large_csv/cio_event_manager.py | 61 +++++++++++++++++++
 examples/processing_large_csv/readme.MD       | 16 ++---
 3 files changed, 70 insertions(+), 65 deletions(-)
 delete mode 100644 examples/processing_large_csv/cio_csv_processor.py
 create mode 100644 examples/processing_large_csv/cio_event_manager.py

diff --git a/examples/processing_large_csv/cio_csv_processor.py b/examples/processing_large_csv/cio_csv_processor.py
deleted file mode 100644
index a9d2c27..0000000
--- a/examples/processing_large_csv/cio_csv_processor.py
+++ /dev/null
@@ -1,58 +0,0 @@
-"""
-Sample code to demonstrate sending event data to CustomerIO from large CSVs. Requires Python 3.
-"""
-import csv
-import time
-import logging
-import os
-from multiprocessing.pool import ThreadPool as Pool
-from customerio import CustomerIO, CustomerIOException
-
-logger = logging.getLogger(__name__)
-
-
-class CIO_CSVProcessor(object):
-    def __init__(self, site_id, api_key, csv_full_file_path):
-        self.csv_full_file_path = csv_full_file_path
-        self.site_id = site_id
-        self.api_key = api_key
-        self.csv_full_file_path = os.path.normpath(csv_full_file_path)
-        self.cio_api_client = CustomerIO(self.site_id, self.api_key, retries=5)
-
-    def _read_csv(self):
-        # csv.reader splits each line into a list of column values
-        with open(self.csv_full_file_path, 'r') as csv_file:
-            for row in csv.reader(csv_file):
-                yield row
-
-    def _send_event_to_cio(self, data):
-        """Assumes the CSV row has the customer id in the first column and the event name in the second column"""
-        try:
-            self.cio_api_client.track(customer_id=data[0], name=data[1])
-        except CustomerIOException as e:
-            # log the exception and move on to the next row
-            logger.exception(str(e))
-
-    def pre_process(self):
-        """Use this function to do some tasks before the CSV is processed"""
-        pass
-
-    def process(self, no_of_processes=4):
-        """
-        First runs some pre-processing, then reads the CSV and sends data from
-        each row to CustomerIO, and finally runs some post-processing.
-        """
-        self.pre_process()
-        rows = self._read_csv()
-        pool = Pool(processes=no_of_processes)
-        for row in rows:
-            # apply_async queues the row for a worker thread without blocking
-            pool.apply_async(self._send_event_to_cio, (row,))
-            secs = 1 / 30  # throttle to roughly 30 requests per second
-            time.sleep(secs)
-        pool.close()
-        pool.join()
-        self._post_process()
-
-    def _post_process(self):
-        """Use this function to do some tasks after the CSV is processed"""
-        pass
diff --git a/examples/processing_large_csv/cio_event_manager.py b/examples/processing_large_csv/cio_event_manager.py
new file mode 100644
index 0000000..cd8f091
--- /dev/null
+++ b/examples/processing_large_csv/cio_event_manager.py
@@ -0,0 +1,61 @@
+"""
+Sample code to demonstrate sending event data to CustomerIO using large CSVs. Requires Python 3.
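+
+Rows are parsed with csv.DictReader, so the CSV file is expected to have a
+header row that includes an 'id' column; each row's 'id' value is used as the
+CustomerIO customer id.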
+""" +import csv +import time +import logging +import os +from multiprocessing.pool import ThreadPool as Pool +from customerio import CustomerIO, CustomerIOException + +logger = logging.getLogger(__name__) + + +class CIOEventManager(object): + def __init__(self, cio_site_id, cio_api_key, csv_full_file_path): + self.csv_full_file_path = csv_full_file_path + self.site_id = cio_site_id + self.api_key = cio_api_key + self.csv_full_file_path = os.path.normpath(csv_full_file_path) + self.cio_api_client = CustomerIO(self.site_id, self.api_key, retries=5) + self.event_name = None + + def _read_csv(self): + with open(self.csv_full_file_path, 'r') as csv_file: + data = csv.DictReader(csv_file) + for line in data: + yield line + + def _send_data_to_cio(self, data): + try: + # Assume each CSV row looks like + # {'age': '23', 'created_at': '', 'location': 'Boston', 'id': 'id: 1', 'unsubscribed': 'false'} + self.cio_api_client.track(customer_id=data['id'], name=self.event_name) + except CustomerIOException as e: + # log the CSV row and exception and move to processing the next row + logger.exception("Failed to send the row data: {row}. Error message={message}".format(row=data, + message=e.message)) + + def pre_process(self): + """Use this function to do some tasks before the CSV is processed""" + pass + + def send_events(self, no_of_processes=4): + """ + First runs some pre-processing. Read the CSV and send data from each row to CustomerIO. Then it finally + runs some post-processing. + """ + self.pre_process() + csv = self._read_csv() + pool = Pool(processes=no_of_processes) + for row in csv: + pool.map(self._send_data_to_cio, (row,)) + rate_limit_in_secs = 0.03 # rate limiting 30 requests per sec (i.e. 1 / 30 = 0.03) + time.sleep(rate_limit_in_secs) + self._post_process() + pool.close() + pool.join() + + def _post_process(self): + """Use this function to do some tasks after the CSV is processed""" + pass diff --git a/examples/processing_large_csv/readme.MD b/examples/processing_large_csv/readme.MD index 5b81fde..bac1911 100644 --- a/examples/processing_large_csv/readme.MD +++ b/examples/processing_large_csv/readme.MD @@ -4,20 +4,22 @@ Let's say you have large amounts of data in CSV. And you need to process each row, and then send data from that row to CustomerIO. -Well, there are many ways one to do this. But we have given an -example in this directory in a this file `cio_csv_processor.py`. +Well, there are many ways one to do this. But we have demonstrated one way to do that +in the file `cio_event_manager.py`. To run the code, you will need the following data: -* Site-ID and API key from your CustomerIO workspace +* Site-ID and API Key from your CustomerIO workspace +* Event name * Full CSV file path -And then run it like this: +And then run the code like this: ```` -csv_processor = CIO_CSVProcessor(site_id, api_key, full_file_path_in_str) -csv_processor.process() +cio_event_manager = CIOEventManager(site_id, api_key, full_file_path_in_str) +cio_event_manager.event_name = 'purchase' +cio_event_manager.send_events() ```` -Behind the scenes, the CSV will be opened, and data will be sent to CustomerIO. To speed +Behind the scenes, the CSV will be read, and data will be sent to CustomerIO. To speed up the process of sending data, parallelism is demonstrated using Python's `Multiprocessing` module. 
From 73eaa834d9ee1a542952bb827725e52ef9358378 Mon Sep 17 00:00:00 2001
From: Waqas Younas
Date: Fri, 3 Jul 2020 20:44:43 +0500
Subject: [PATCH 3/3] Removed some redundant code in the CSV example and improved some comments

---
 examples/processing_large_csv/cio_event_manager.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/examples/processing_large_csv/cio_event_manager.py b/examples/processing_large_csv/cio_event_manager.py
index cd8f091..d770078 100644
--- a/examples/processing_large_csv/cio_event_manager.py
+++ b/examples/processing_large_csv/cio_event_manager.py
@@ -1,5 +1,5 @@
 """
-Sample code to demonstrate sending event data to CustomerIO using large CSVs. Requires Python 3.
+Sample code to demonstrate sending data contained in CSVs to CustomerIO as events. Requires Python 3.
 
 Rows are parsed with csv.DictReader, so the CSV file is expected to have a
 header row that includes an 'id' column; each row's 'id' value is used as the
 CustomerIO customer id.
 """
 import csv
@@ -13,7 +13,6 @@
 class CIOEventManager(object):
     def __init__(self, cio_site_id, cio_api_key, csv_full_file_path):
-        self.csv_full_file_path = csv_full_file_path
         self.site_id = cio_site_id
         self.api_key = cio_api_key
         self.csv_full_file_path = os.path.normpath(csv_full_file_path)
         self.cio_api_client = CustomerIO(self.site_id, self.api_key, retries=5)
@@ -37,7 +36,7 @@ def _send_data_to_cio(self, data):
                 "Failed to send the row data: {row}. Error: {error}".format(row=data, error=e))
 
     def pre_process(self):
-        """Use this function to do some tasks before the CSV is processed"""
+        """Use this method to do some tasks before the CSV is processed"""
         pass
 
     def send_events(self, no_of_processes=4):
@@ -57,5 +56,5 @@ def send_events(self, no_of_processes=4):
         self._post_process()
 
     def _post_process(self):
-        """Use this function to do some tasks after the CSV is processed"""
+        """Use this method to do some tasks after the CSV is processed"""
         pass
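
For reference, a minimal driver for the final version of this example might look like the sketch below. The environment-variable names and the CSV path are illustrative assumptions, not part of the patches:

````
import logging
import os

from examples.processing_large_csv.cio_event_manager import CIOEventManager

# Surface the per-row failures that CIOEventManager logs
logging.basicConfig(level=logging.INFO)

# Hypothetical configuration; substitute your own values
site_id = os.environ["CIO_SITE_ID"]
api_key = os.environ["CIO_API_KEY"]

manager = CIOEventManager(site_id, api_key, "/path/to/events.csv")
manager.event_name = "purchase"  # the event name tracked for every CSV row
manager.send_events(no_of_processes=4)
````

Note that `event_name` defaults to `None`, so it must be set before calling `send_events`; otherwise every `track` call would go out without a valid event name.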