
This is the seventh post in our series on building a self-learning recommendation system using reinforcement learning. The series consists of eight posts in which we progressively build the system:
- Recommendation system and reinforcement learning primer
- Introduction to multi armed bandit problem
- Self learning recommendation system as a K-armed bandit
- Build the prototype of the self learning recommendation system : Part I
- Build the prototype of the self learning recommendation system : Part II
- Productionising the self learning recommendation system : Part I – Customer Segmentation
- Productionising the self learning recommendation system: Part II – Implementing self learning recommendation ( This Post )
- Evaluating different deployment options for the self learning recommendation systems.
This post builds on the previous one, where we started productionising the application using Python scripts and completed the customer segmentation part. In this post we continue from where we left off and build the self-learning system. Let us get going.
Creation of States
Let us take a quick recap of the project structure and what we covered in the last post.

In the last post we were in the early part of our main driver file rlRecoMain.py. We explored the rfmMaker class in the file rfmProcess.py in the processes directory. We will now explore the selfLearnProcess.py file in the same directory.
Open a new file, name it selfLearnProcess.py, and insert the following code:
import pandas as pd
from numpy.random import normal as GaussianDistribution
from collections import OrderedDict
from collections import Counter
import operator
from random import sample
import numpy as np
from pymongo import MongoClient
client = MongoClient(port=27017)
db = client.rlRecomendation
class rlLearn:
    def __init__(self, custDetails, conf):
        # Get the date as a separate column
        custDetails['Date'] = custDetails['Parse_date'].apply(lambda x: x.strftime("%d"))
        # Converting date to float for easy comparison
        custDetails['Date'] = custDetails['Date'].astype('float64')
        # Get the period of month column
        custDetails['monthPeriod'] = custDetails['Date'].apply(lambda x: int(x > conf['monthPer']))
        # Aggregate the custDetails to get a distribution of rewards
        rewardFull = custDetails.groupby(['Segment', 'Month', 'monthPeriod', 'Day', conf['product_id']])[
            conf['prod_qnty']].agg('sum').reset_index()
        # Get these data frames for all methods
        self.custDetails = custDetails
        self.conf = conf
        self.rewardFull = rewardFull
        # Defining some dictionaries for storing the values
        self.countDic = {}  # Dictionary to store the count of products
        self.polDic = {}  # Dictionary to store the value distribution
        self.rewDic = {}  # Dictionary to store the reward distribution
        self.recoCountdic = {}  # Dictionary to store the recommendation counts

    # Method to find unique values of each of the variables
    def uniqeVars(self):
        # Finding unique value for each of the variables
        segments = list(self.rewardFull.Segment.unique())
        months = list(self.rewardFull.Month.unique())
        monthPeriod = list(self.rewardFull.monthPeriod.unique())
        days = list(self.rewardFull.Day.unique())
        return segments, months, monthPeriod, days

    # Method to consolidate all products
    def prodConsolidator(self):
        # Get all the unique values of the variables
        segments, months, monthPeriod, days = self.uniqeVars()
        # Creating the consolidated dictionary
        for seg in segments:
            for mon in months:
                for period in monthPeriod:
                    for day in days:
                        # Get the subset of the data
                        subset1 = self.rewardFull[(self.rewardFull['Segment'] == seg) & (self.rewardFull['Month'] == mon) & (
                                self.rewardFull['monthPeriod'] == period) & (self.rewardFull['Day'] == day)]
                        # Initializing a temporary dictionary for storing in MongoDB
                        tempDic = {}
                        # Check if the subset is valid
                        if len(subset1) > 0:
                            # Iterate through each of the subset and get the products and their quantities
                            stateId = str(seg) + '_' + mon + '_' + str(period) + '_' + day
                            # Define a dictionary for the state ID
                            self.countDic[stateId] = {}
                            tempDic[stateId] = {}
                            for i in range(len(subset1.StockCode)):
                                # Store in the Count dictionary
                                self.countDic[stateId][subset1.iloc[i]['StockCode']] = int(subset1.iloc[i]['Quantity'])
                                tempDic[stateId][subset1.iloc[i]['StockCode']] = int(subset1.iloc[i]['Quantity'])
                            # Dumping each record into MongoDB
                            db.rlQuantdic.insert_one(tempDic)
        # Consolidate the rewards and value functions based on the quantities
        for key in self.countDic.keys():
            # Creating two temporary dictionaries for loading in MongoDB
            tempDicpol = {}
            tempDicrew = {}
            # First get the dictionary of products for a state
            prodCounts = self.countDic[key]
            self.polDic[key] = {}
            self.rewDic[key] = {}
            # Initializing temporary dictionaries also
            tempDicpol[key] = {}
            tempDicrew[key] = {}
            # Update the policy values
            for pkey in prodCounts.keys():
                # Creating the value dictionary using a Gaussian process
                self.polDic[key][pkey] = GaussianDistribution(loc=prodCounts[pkey], scale=1, size=1)[0].round(2)
                tempDicpol[key][pkey] = self.polDic[key][pkey]
                # Creating a reward dictionary using a Gaussian process
                self.rewDic[key][pkey] = GaussianDistribution(loc=prodCounts[pkey], scale=1, size=1)[0].round(2)
                tempDicrew[key][pkey] = self.rewDic[key][pkey]
            # Dumping each of these in MongoDB
            db.rlRewarddic.insert_one(tempDicrew)
            db.rlValuedic.insert_one(tempDicpol)
        print('[INFO] Dumped the quantity dictionary, policy and rewards in MongoDB')
As usual we start with the import of the libraries we want in lines 1-7. In this implementation we make a small deviation from the prototype which we developed in the previous post. During the prototyping phase we predominantly relied on dictionaries to store data. Here, however, we will be storing data in MongoDB. Those of you who are not fully conversant with MongoDB can refer to some good tutorials on MongoDB like the one here. I will also be explaining the key features as and when required. In line 8, we import the MongoClient which is required for connections with the database. We then define the client using the default port number (27017) in line 9 and name the database where we will store the recommendations in line 10. The name of the database we have selected is rlRecomendation. You are free to choose any name of your choice.
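Before moving on, it helps to verify that the MongoDB setup is working. The short snippet below is a quick sanity check, not part of the application code; it assumes MongoDB is running locally on the default port and that the collections have already been created by the process we build in this post.
from pymongo import MongoClient

client = MongoClient(port=27017)
db = client.rlRecomendation
# List the collections created by the self-learning process
print(db.list_collection_names())
# Fetch one stored state document, if any exist yet
print(db.rlQuantdic.find_one())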
Let us now explore the rlLearn class. The constructor of the class, which starts from line 15, takes the custDetails data frame and the configuration file as inputs. You would already be familiar with lines 17-23 from our prototyping phase, where we extract information to create states and then consolidate the data frame to get the quantities for each state. In lines 30-33, we create dictionaries to store the relevant information: the count of products, the value distribution, the reward distribution and the number of times each product is recommended.
The main method within the rlLearn class is the prodConsolidator() method in lines 45-95. We have seen the details of this method in the prototyping phase. Just to recap, in this method we iterate through each of the components of our states and then store the quantities of each product under that state in the dictionaries. However, there is a subtle difference from what we did during the prototyping phase: here we insert each state and its associated products into the MongoDB database we created, as shown in lines 70, 93 and 94. We create a temporary dictionary in line 57 to dump each state into MongoDB. We also store the data in the dictionaries, as we did during the prototyping phase, so that the other methods in this class have access to it. The final outcome of this method is the creation of the count dictionary, value dictionary and reward dictionary from our data, and the update of this data in MongoDB.
This takes us to the end of the rlLearn class.
We now go back to the driver file rlRecoMain.py and explore the next important class, rlRecomend.
The rlRecomend class has the methods required for recommending products. This class has many methods, so we will go through them one by one. We have seen all these methods during the prototyping phase and will therefore not go into detailed explanations here; for those, you can refer to the previous post.
Now, in selfLearnProcess.py, start adding the code pertaining to the rlRecomend class.
class rlRecomend:
    def __init__(self, custDetails, conf):
        # Get the date as a separate column
        custDetails['Date'] = custDetails['Parse_date'].apply(lambda x: x.strftime("%d"))
        # Converting date to float for easy comparison
        custDetails['Date'] = custDetails['Date'].astype('float64')
        # Get the period of month column
        custDetails['monthPeriod'] = custDetails['Date'].apply(lambda x: int(x > conf['monthPer']))
        # Aggregate the custDetails to get a distribution of rewards
        rewardFull = custDetails.groupby(['Segment', 'Month', 'monthPeriod', 'Day', conf['product_id']])[
            conf['prod_qnty']].agg('sum').reset_index()
        # Get these data frames for all methods
        self.custDetails = custDetails
        self.conf = conf
        self.rewardFull = rewardFull
The above code is the constructor of the class (lines 97-112), which is similar to the constructor of the rlLearn class. Here we consolidate the custDetails data frame and get the count of each product for the respective state.
Let us now look at the next two methods. Add the following code to the class we created earlier.
    # Method to find unique values of each of the variables
    def uniqeVars(self):
        # Finding unique value for each of the variables
        segments = list(self.rewardFull.Segment.unique())
        months = list(self.rewardFull.Month.unique())
        monthPeriod = list(self.rewardFull.monthPeriod.unique())
        days = list(self.rewardFull.Day.unique())
        return segments, months, monthPeriod, days

    # Method to sample a state
    def stateSample(self):
        # Get the unique state elements
        segments, months, monthPeriod, days = self.uniqeVars()
        # Get the context of the customer. For the time being let us randomly select all the states
        seg = sample(segments, 1)[0]  # Sample the segment
        mon = sample(months, 1)[0]  # Sample the month
        monthPer = sample([0, 1], 1)[0]  # Sample the month period
        day = sample(days, 1)[0]  # Sample the day
        # Get the state id by combining all these samples
        stateId = str(seg) + '_' + mon + '_' + str(monthPer) + '_' + day
        self.seg = seg
        return stateId
The first method, lines 115-121, gets the unique values of segments, months, month periods and days. This information will be used in some of the methods we will see later on. The second method, detailed in lines 124-135, samples a state ID through random sampling of the components of a state.
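A quick, hedged usage sketch of these two methods is shown below. It assumes custDetails and conf have been prepared as in the earlier posts of this series; the printed state ID is only indicative.
# Instantiate the recommendation class and sample a state
recoObj = rlRecomend(custDetails, conf)
stateId = recoObj.stateSample()
print(stateId)  # e.g. 'Q2_November_0_Tuesday' (illustrative)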
The next methods we will explore initialise dictionaries if a state ID has not been seen earlier. The first method initialises the dictionaries and the second inserts a record in the recommendation collection in MongoDB if the state doesn't exist. Let us see the code for these methods.
    # Method to initialize a dictionary in case a state Id is not available
    def collfinder(self, stateId, countDic, polDic, rewDic, recoCountdic):
        # Defining some dictionaries for storing the values
        self.countDic = countDic  # Dictionary to store the count of products
        self.polDic = polDic  # Dictionary to store the value distribution
        self.rewDic = rewDic  # Dictionary to store the reward distribution
        self.recoCountdic = recoCountdic  # Dictionary to store the recommendation counts
        self.stateId = stateId
        print("[INFO] The current state is :", stateId)
        if self.countDic is None:
            print("[INFO] State ID does not exist")
            self.countDic = {}
            self.countDic[stateId] = {}
            self.polDic = {}
            self.polDic[stateId] = {}
            self.rewDic = {}
            self.rewDic[stateId] = {}
            if self.recoCountdic is None:
                self.recoCountdic = {}
                self.recoCountdic[stateId] = {}
            else:
                self.recoCountdic[stateId] = {}

    # Method to update the recommendation dictionary
    def recoCollChecker(self):
        print("[INFO] Inside the recommendation collection")
        recoCol = db.rlRecotrack.find_one({self.stateId: {'$exists': True}})
        if recoCol is None:
            print("[INFO] Inserting the record in the recommendation collection")
            db.rlRecotrack.insert_one({self.stateId: {}})
        return recoCol
The inputs to the first method, as in line 138, are the state Id and the four dictionaries we extract from MongoDB, which we will see later on in the main script rlRecoMain.py. If no record exists for a specific state Id, the dictionaries we extract from MongoDB would be null, and therefore we need to initialise them for storing the products, their values, rewards and the count of recommendations. The initialisation of these dictionaries is implemented in this method in lines 146-158.
The second initialisation method checks the recommendation count collection for a specific state Id. We first look up the state Id in the collection in line 163. If the record doesn't exist, we insert a blank dictionary for that state in line 166.
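To see how these methods would be used, the sketch below shows one way the four dictionaries could be pulled from MongoDB for a state and handed to collfinder(). This is only an indicative sketch; the actual extraction happens in the driver file rlRecoMain.py, which we cover in the next post.
# Fetch the stored dictionaries for the sampled state; find_one() returns None for unseen states
countDic = db.rlQuantdic.find_one({stateId: {'$exists': True}})
polDic = db.rlValuedic.find_one({stateId: {'$exists': True}})
rewDic = db.rlRewarddic.find_one({stateId: {'$exists': True}})
recoCountdic = db.rlRecotrack.find_one({stateId: {'$exists': True}})
# Initialise the dictionaries inside the recommendation object
recoObj.collfinder(stateId, countDic, polDic, rewDic, recoCountdic)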
Let us now look at the next two methods in the class.
    # Create a function to get a list of products for a certain segment
    def segProduct(self, seg, nproducts):
        # Get the list of unique products for each segment
        seg_products = list(self.rewardFull[self.rewardFull['Segment'] == seg]['StockCode'].unique())
        seg_products = sample(seg_products, nproducts)
        return seg_products

    # This is the function to get the top n products based on value
    def sortlist(self, nproducts, seg):
        # Get the top products based on the values and sort them from product with largest value to least
        topProducts = sorted(self.polDic[self.stateId].keys(), key=lambda kv: self.polDic[self.stateId][kv])[-nproducts:][::-1]
        # If the topProducts is less than the required number of products nproducts, sample the delta
        while len(topProducts) < nproducts:
            print("[INFO] top products less than required number of products")
            segProducts = self.segProduct(seg, (nproducts - len(topProducts)))
            newList = topProducts + segProducts
            # Finding unique products
            topProducts = list(OrderedDict.fromkeys(newList))
        return topProducts
The method in lines 171-175 samples a list of products for a segment. This method is used in case the number of products in a particular state is less than the total number of products we want to recommend. In such cases, we randomly sample some products from the list of all products bought by customers in that segment and add them to the list of products we want to recommend. We will see this in action in the sortlist method (lines 178-188).
The sortlist method sorts the products based on their value and returns the list of top products. The inputs to this method are the number of products we want recommended and the segment (line 178). We then get the top ‘n’ products by sorting the value dictionary, as in line 180. If the number of products is less than the required number, additional products are sampled using the segProduct method we saw earlier. The final list of top products is then returned by this method.
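The sorting expression in line 180 is worth a closer look. The small worked example below uses an illustrative value dictionary to show how the top ‘n’ products are picked.
# Illustrative value dictionary for one state
polDicState = {'prodA': 4.2, 'prodB': 9.7, 'prodC': 1.3, 'prodD': 6.5}
nproducts = 3
# Sort ascending by value, keep the last n products, then reverse to get highest value first
topProducts = sorted(polDicState.keys(), key=lambda kv: polDicState[kv])[-nproducts:][::-1]
print(topProducts)  # ['prodB', 'prodD', 'prodA']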
The next method we are going to explore is the one which controls the exploration and exploitation process, thereby generating the list of products to be recommended. Let us add the following code to the class.
    # This is the function to create the number of products based on exploration and exploitation
    def sampProduct(self, seg, nproducts, epsilon):
        # Initialise an empty list for storing the recommended products
        seg_products = []
        # Get the list of unique products for each segment
        Segment_products = list(self.rewardFull[self.rewardFull['Segment'] == seg]['StockCode'].unique())
        # Get the list of top n products based on value
        topProducts = self.sortlist(nproducts, seg)
        # Start a loop to get the required number of products
        while len(seg_products) < nproducts:
            # First find a probability
            probability = np.random.rand()
            if probability >= epsilon:
                # print(topProducts)
                # The top product would be the first product in the list
                prod = topProducts[0]
                # Append the selected product to the list
                seg_products.append(prod)
                # Remove the top product once appended
                topProducts.pop(0)
                # Ensure that seg_products is unique
                seg_products = list(OrderedDict.fromkeys(seg_products))
            else:
                # If the probability is less than the epsilon value, randomly sample one product
                prod = sample(Segment_products, 1)[0]
                seg_products.append(prod)
                # Ensure that seg_products is unique
                seg_products = list(OrderedDict.fromkeys(seg_products))
        return seg_products
The inputs to the method are the segment, the number of products to be recommended and the epsilon value which determines the balance between exploration and exploitation, as shown in line 191. In line 195, we get the list of products for the segment; this is the list from which products are sampled during the exploration phase. We also get the list of top products to be recommended in line 197, using the sortlist method we defined earlier. In lines 199-218 we implement the exploitation and exploration processes we discussed during the prototyping phase, and finally we return the list of top products for recommendation.
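The epsilon-greedy choice made on each pass of the loop can be boiled down to the few lines below. With epsilon = 0.1 (an assumed value), roughly 90% of the picks exploit the current top product and about 10% explore a random product of the segment.
import numpy as np
from random import sample

epsilon = 0.1                                      # assumed configuration value
topProducts = ['prodB', 'prodD', 'prodA']          # illustrative top products
Segment_products = ['prodA', 'prodB', 'prodC', 'prodD', 'prodE']  # illustrative segment products
if np.random.rand() >= epsilon:
    prod = topProducts[0]                  # exploitation : pick the highest-value product
else:
    prod = sample(Segment_products, 1)[0]  # exploration : pick a random product from the segment
print(prod)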
The next method which we will explore is the one to update dictionaries after the recommendation process.
    # This is the method for updating the dictionaries after recommendation
    def dicUpdater(self, prodList, stateId):
        for prod in prodList:
            # Check if the product is in the dictionary
            if prod in list(self.countDic[stateId].keys()):
                # Update the count by 1
                self.countDic[stateId][prod] += 1
            else:
                self.countDic[stateId][prod] = 1
            if prod in list(self.recoCountdic[stateId].keys()):
                # Update the recommended products with 1
                self.recoCountdic[stateId][prod] += 1
            else:
                # Initialise the recommended products as 1
                self.recoCountdic[stateId][prod] = 1
            if prod not in list(self.polDic[stateId].keys()):
                # Initialise the value as 0
                self.polDic[stateId][prod] = 0
            if prod not in list(self.rewDic[stateId].keys()):
                # Initialise the reward dictionary as 0
                self.rewDic[stateId][prod] = GaussianDistribution(loc=0, scale=1, size=1)[0].round(2)
        print("[INFO] Completed the initial dictionary updates")
The inputs to this method, as in line 221, are the list of products to be recommended and the state Id. In lines 222-234, we iterate through each of the recommended products and increment its count in the dictionary if the product already exists there, or initialise the count to 1 if it doesn't. Later on, in lines 235-240, we initialise the value dictionary and the reward dictionary if the products are not available in them.
The next method we will see is the one for initialising the dictionaries in case the context doesn't exist.
    def dicAdder(self, prodList, stateId):
        # Loop through the product list
        for prod in prodList:
            # Initialise the count as 1
            self.countDic[stateId][prod] = 1
            # Initialise the value as 0
            self.polDic[stateId][prod] = 0
            # Initialise the recommended products as 1
            self.recoCountdic[stateId][prod] = 1
            # Initialise the reward dictionary as 0
            self.rewDic[stateId][prod] = GaussianDistribution(loc=0, scale=1, size=1)[0].round(2)
        print("[INFO] Completed the dictionary initialization")
        # Next update the collections with the respective updates
        # Updating the quantity collection
        db.rlQuantdic.insert_one({stateId: self.countDic[stateId]})
        # Updating the recommendation tracking collection
        db.rlRecotrack.insert_one({stateId: self.recoCountdic[stateId]})
        # Updating the value function collection for the products
        db.rlValuedic.insert_one({stateId: self.polDic[stateId]})
        # Updating the rewards collection
        db.rlRewarddic.insert_one({stateId: self.rewDic[stateId]})
        print('[INFO] Completed updating all the collections')
If the state Id doesn't exist, the dictionaries are initialised in the first part of this method. Once the dictionaries are initialised, the MongoDB collections are updated in lines 259-265.
The next method we are going to explore is one of the main methods: it integrates all the methods we have seen so far and implements the recommendation process. Let us explore it.
    # Method to take the sampled stateID and then run the recommendation process
    def rlRecommender(self):
        # First get the stateID
        stateId = self.stateId
        # Start the recommendation process
        if len(self.polDic[stateId]) > 0:
            print("The context exists")
            # Implement the sampling of products based on exploration and exploitation
            seg_products = self.sampProduct(self.seg, self.conf["nProducts"], self.conf["epsilon"])
            # Check if the recommendation count collection exists
            recoCol = self.recoCollChecker()
            print('Recommendation collection existing :', recoCol)
            # Update the dictionaries of values and rewards
            self.dicUpdater(seg_products, stateId)
        else:
            print("The context doesn't exist")
            # Get the list of relevant products
            seg_products = self.segProduct(self.seg, self.conf["nProducts"])
            # Add products to the value dictionary and rewards dictionary
            self.dicAdder(seg_products, stateId)
        print("[INFO] Completed the recommendation process")
        return seg_products
The first step in the process is to get the state Id (line 271), based on which we make all the recommendations. Once we have the state Id, we check whether it is an existing state Id in line 273. If it is, we get the list of ‘n’ products for recommendation using the sampProduct method we saw earlier, where we implement exploration and exploitation. Once we get the products, we initialise the recommendation collection in line 278. Finally, we update all the dictionaries using the dicUpdater method in line 281.
In lines 282-287, we implement a similar process for when the state Id doesn't exist. The only difference in this case is the initialisation of the dictionaries in line 287, where we use the dicAdder method.
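Putting it together, a hedged usage sketch of the recommendation step would look like the lines below, assuming the object has been initialised with collfinder() as shown earlier and that conf contains the nProducts and epsilon keys used in this series.
# Generate the list of recommended products for the current state
seg_products = recoObj.rlRecommender()
print(seg_products)  # e.g. ['85123A', '47566', '84879', ...] (illustrative)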
Once we complete the recommendation process, we get into simulating the customer action.
    # Function to initiate customer action
    def custAction(self, segproducts):
        print('[INFO] getting the customer action')
        # Sample a value to get how many products will be clicked
        click_number = np.random.choice(np.arange(0, 10),
                                        p=[0.50, 0.35, 0.10, 0.025, 0.015, 0.0055, 0.002, 0.00125, 0.00124, 0.00001])
        # Sample products which will be clicked based on click number
        click_list = sample(segproducts, click_number)
        # Sample for buy values
        buy_number = np.random.choice(np.arange(0, 10),
                                      p=[0.70, 0.15, 0.10, 0.025, 0.015, 0.0055, 0.002, 0.00125, 0.00124, 0.00001])
        # Sample products which will be bought based on buy number
        buy_list = sample(segproducts, buy_number)
        return click_list, buy_list
Lines 296-305 implement the process of simulating which of the recommended products are clicked and which are bought by the customer. The method returns the list of products which were clicked and the list which were bought. For detailed explanations of this method, please refer to the previous post.
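The probability vectors used above heavily favour small numbers of clicks and purchases. The quick check below, which is only a diagnostic and not part of the application, samples the click-number distribution many times to show that most simulated customers click on zero or one product.
import numpy as np
from collections import Counter

clicks = np.random.choice(np.arange(0, 10), size=1000,
                          p=[0.50, 0.35, 0.10, 0.025, 0.015, 0.0055, 0.002, 0.00125, 0.00124, 0.00001])
print(Counter(clicks))  # roughly half the samples are 0, about a third are 1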
The next methods we will explore are the ones related to updating the values of the recommendation system.
    def getReward(self, loc):
        rew = GaussianDistribution(loc=loc, scale=1, size=1)[0].round(2)
        return rew

    def saPolicy(self, rew, prod):
        # This function gets the relevant algorithm for the policy update
        # Get the current value of the state
        vcur = self.polDic[self.stateId][prod]
        # Get the counts of the current product
        n = self.recoCountdic[self.stateId][prod]
        # Calculate the new value
        Incvcur = (1 / n) * (rew - vcur)
        return Incvcur
The getReward method on line 309 generates a reward from a Gaussian distribution centred around the given reward value. We will see the use of this method in the subsequent methods.
The saPolicy method in lines 313-321 computes the incremental update to the value of a product in the state, based on the simple averaging method in line 320. We have already seen these methods in our prototyping phase in the previous post.
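A small worked example of the simple averaging update helps to see what saPolicy() returns. The numbers below are illustrative.
vcur = 5.0   # current value estimate of the product in this state
n = 4        # number of times the product has been recommended so far
rew = 8.0    # reward just observed
Incvcur = (1 / n) * (rew - vcur)
print(vcur + Incvcur)  # 5.75 : the estimate moves a fraction 1/n towards the new reward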
Next we will see the method which uses both the above methods.
    def valueUpdater(self, seg_products, loc, custList, remove=True):
        for prod in custList:
            # Get the reward for the bought product. The reward will be centered around the defined reward for each action
            rew = self.getReward(loc)
            # Update the reward in the reward dictionary
            self.rewDic[self.stateId][prod] += rew
            # Update the policy based on the reward
            Incvcur = self.saPolicy(rew, prod)
            self.polDic[self.stateId][prod] += Incvcur
            # Remove the bought product from the product list
            if remove:
                seg_products.remove(prod)
        return seg_products
The inputs to this method are the recommended list of products, the mean reward (for a click, buy or ignore), the corresponding customer action list (click list or buy list) and a flag to indicate whether the product has to be removed from the recommendation list or not.
We iterate through all the products in the customer action list in line 324 and get the reward in line 326. Once the reward is added to the reward dictionary in line 328, we get the incremental value in line 330 and update the value dictionary with it in line 331. If the flag is True, we remove the product from the recommended list, and finally we return the remaining recommendation list.
The next method is the last of the methods and ties the above three methods to the customer action.
    # Function to update the reward dictionary and the value dictionary based on customer action
    def rewardUpdater(self, seg_products, custBuy=[], custClick=[]):
        # Check if there are any customer purchases
        if len(custBuy) > 0:
            seg_products = self.valueUpdater(seg_products, self.conf['buyReward'], custBuy)
        # Repeat the same process for customer clicks
        if len(custClick) > 0:
            seg_products = self.valueUpdater(seg_products, self.conf['clickReward'], custClick)
        # For those products not clicked or bought, give a penalty
        if len(seg_products) > 0:
            custList = seg_products.copy()
            seg_products = self.valueUpdater(seg_products, -2, custList, False)
        # Next update the collections with the respective updates
        print('[INFO] Updating all the collections')
        # Updating the quantity collection
        db.rlQuantdic.replace_one({self.stateId: {'$exists': True}}, {self.stateId: self.countDic[self.stateId]})
        # Updating the recommendation tracking collection
        db.rlRecotrack.replace_one({self.stateId: {'$exists': True}}, {self.stateId: self.recoCountdic[self.stateId]})
        # Updating the value function collection for the products
        db.rlValuedic.replace_one({self.stateId: {'$exists': True}}, {self.stateId: self.polDic[self.stateId]})
        # Updating the rewards collection
        db.rlRewarddic.replace_one({self.stateId: {'$exists': True}}, {self.stateId: self.rewDic[self.stateId]})
        print('[INFO] Completed updating all the collections')
In lines 340-348, we update the values based on the products bought, clicked and ignored. Once the dictionaries are updated, the respective MongoDB collections are updated in lines 352-358.
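As a hedged end-to-end sketch, one simulated interaction would chain the last few methods as below, assuming recoObj has been initialised as in the earlier snippets. The driver file in the next post implements this loop in full.
# One simulated recommendation cycle
seg_products = recoObj.rlRecommender()                     # recommend products for the state
click_list, buy_list = recoObj.custAction(seg_products)    # simulate the customer response
recoObj.rewardUpdater(seg_products, custBuy=buy_list, custClick=click_list)  # update values, rewards and MongoDB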
With this we have covered all the methods which are required for implementing the self learning recommendation system. Let us summarise our learning so far in this post.
- Created the states and updated MongoDB with the states data. We used the historic data for initialisation of values.
- Implemented the recommendation process by getting a list of products to be recommended to the customer
- Explored customer response simulation, wherein the customer response to the recommended products was simulated.
- Updated the value functions and reward functions after customer response
- Updated Mongo DB collections after the completion of the process for a customer.
What next ?
We are coming to the tail end of our series. In the next post we tie all these methods together in the main driver file and see how these processes are implemented. We will also run the script on the terminal and observe the results. Once the application implementation is done, we will explore avenues to deploy the application. Watch this space for the last post of the series.
Please subscribe to this blog post to get notifications when the next post is published.
You can also subscribe to our YouTube channel for all the videos related to this series.
The complete code base for the series is in the Bayesian Quest GitHub repository.
Do you want to Climb the Machine Learning Knowledge Pyramid ?
Knowledge acquisition is such a liberating experience. The more you invest in your knowledge enhancement, the more empowered you become. The best way to acquire knowledge is through practical application, or learning by doing. If you are inspired by the prospect of being empowered by practical knowledge in machine learning, subscribe to our YouTube channel.
I would also recommend two books I have co-authored. The first one specialises in deep learning, with practical hands-on exercises and interactive video and audio aids for learning.
This book is accessible using the following links
The Deep Learning Workshop on Amazon
The Deep Learning Workshop on Packt
The second book equips you with practical machine learning skill sets. The pedagogy is through practical interactive exercises and activities.

This book can be accessed using the following links
The Data Science Workshop on Amazon
The Data Science Workshop on Packt
Enjoy your learning experience and be empowered !!!!