How to build a collaborative filtering model for personalized recommendations
Recommendation model using TensorFlow and TensorFlow Transform
April 2020 update: Note that a much simpler way to do this now exists. Read this article on building a recommendations model using BigQuery ML.
In this article, I will step you through how to use TensorFlow’s Estimator API to build a WALS collaborative filtering model for product recommendations. Recently, my colleague Lukman Ramsey published a series of solutions detailing how to build a recommendation model — read those solutions for context on what recommendations are and how to set up an end-to-end system.
In this article, I’m replacing the use of Pandas in the original solution with Apache Beam — this will allow the solution to scale more easily to larger datasets. Because the context is covered in that solution, I’ll dive straight into the technical details here. The complete source code is on GitHub.
Step 1: Extract raw data
For collaborative filtering, we don’t need to know any attributes about either the users or the content. Essentially, all we need to know is userId, itemId, and the rating that the particular user gave the particular item. In this case, we can use the time spent on the page as a proxy for rating. Google Analytics 360 exports web traffic information to BigQuery, and it is from BigQuery that I extract the data:
#standardSQL
WITH visitor_page_content AS (
  SELECT
    fullVisitorID,
    (SELECT MAX(IF(index = 10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,
    (LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration
  FROM `cloud-training-demos.GA360_test.ga_sessions_sample`,
    UNNEST(hits) AS hits
  WHERE
    # only include hits on pages
    hits.type = "PAGE"
  GROUP BY
    fullVisitorId, latestContentId, hits.time
)

# aggregate web stats
SELECT
  fullVisitorID AS visitorId,
  latestContentId AS contentId,
  SUM(session_duration) AS session_duration
FROM visitor_page_content
WHERE latestContentId IS NOT NULL
GROUP BY fullVisitorID, latestContentId
HAVING session_duration > 0
ORDER BY latestContentId
The query itself is specific to the way this newspaper had set up Google Analytics — specifically, the way they had set up custom dimensions — so you might have to use a different query to extract your data into a table of three columns: visitorId, contentId, and session_duration.

This is the raw dataset needed for collaborative filtering. Obviously, what you use for visitorId, contentId, and rating will depend on your problem. Beyond this, everything else is quite standard, and you should be able to use it as-is.
Step 2: Create enumerated user and item ids
The WALS algorithm requires that user ids and item ids be enumerated, i.e., they should simply be the row number and column number in the interaction matrix. So, we need to take the visitorId above, which is a string, and map it to 0, 1, 2, …. We need to do the same thing for item ids. Additionally, the ratings have to be small numbers, typically in the range 0–1, so we’ll have to scale the session_duration.
To do this mapping, we will use TensorFlow Transform (TFT) — a library that allows you to create preprocessed datasets using Apache Beam for training, and then turn around and apply that preprocessing as part of your TensorFlow graph during inference!
Here’s the crux of my preprocessing function using TFT:
def preprocess_tft(rowdict):
  median = 57937  # median session duration, precomputed from the training data
  result = {
      # enumerate the string ids; the mappings are saved as "vocabulary" files
      'userId': tft.string_to_int(rowdict['visitorId'], vocab_filename='vocab_users'),
      'itemId': tft.string_to_int(rowdict['contentId'], vocab_filename='vocab_items'),
      # scale the session duration so that the median maps to 0.3
      'rating': 0.3 * (1 + (rowdict['session_duration'] - median) / median)
  }
  # cap the rating at 1.0
  result['rating'] = tf.where(tf.less(result['rating'], tf.ones(tf.shape(result['rating']))),
                              result['rating'],
                              tf.ones(tf.shape(result['rating'])))
  return result
The result of preprocessing a row from BigQuery consisting of visitorId, contentId and session_duration is a Python dictionary called result that contains three columns: userId, itemId and rating.
tft.string_to_int looks through the entire training dataset and creates a mapping to enumerate the visitors and writes the mapping (“the vocabulary”) to the file vocab_users. I do the same thing for the contentId, creating the itemId. The rating is obtained by scaling the session_duration to lie within 0–1. My scaling essentially clips off the long-tail of extremely long session durations which probably represent people who close their laptops while on a newspaper article. The key thing to note is that I do such clipping using pure TensorFlow functions like tf.less and tf.ones. This is important because this preprocessing function has to be applied during inference (prediction) as part of the TensorFlow serving graph.
The preprocess function is applied to the training dataset using Apache Beam:
transformed_dataset, transform_fn = (
    raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
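For context, raw_dataset here is a (PCollection, metadata) pair. Here is a minimal sketch of how it might be assembled — the query variable, feature spec, and temp directory are assumptions for illustration, not the exact GitHub code:

import apache_beam as beam
import tensorflow as tf
from tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema

# schema of the raw BigQuery rows (assumed feature spec)
raw_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec({
    'visitorId': tf.FixedLenFeature([], tf.string),
    'contentId': tf.FixedLenFeature([], tf.string),
    'session_duration': tf.FixedLenFeature([], tf.int64),
}))

with beam.Pipeline() as p:
  with beam_impl.Context(temp_dir='gs://BUCKET/wals/tmp'):  # hypothetical temp location
    raw_data = p | 'read' >> beam.io.Read(
        beam.io.BigQuerySource(query=query, use_standard_sql=True))  # query = the SQL above
    raw_dataset = (raw_data, raw_metadata)
    transformed_dataset, transform_fn = (
        raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))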
Step 3: Write out WALS training dataset
The training set for WALS consists of two files — one provides all the items rated by a specific user (the interaction matrix, row-wise), and the other provides all the users who have rated a specific item (the interaction matrix, column-wise). Obviously, the two files contain the same data, but splitting the dataset this way lets the two passes be processed in parallel. We can write these out in the same Apache Beam pipeline where we did the enumeration:
users_for_item = (transformed_data
    | 'map_items'   >> beam.Map(lambda x: (x['itemId'], x))
    | 'group_items' >> beam.GroupByKey()
    | 'totfr_items' >> beam.Map(lambda item_userlist: to_tfrecord(item_userlist, 'userId')))
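The to_tfrecord helper isn’t shown above; here is a minimal sketch of what such a helper could look like — the exact feature names are my assumptions (see the GitHub repo for the real implementation):

def to_tfrecord(item_userlist, gby_key):
  # one grouped record: (itemId, iterable of {'userId', 'itemId', 'rating'} dicts)
  key, grouped = item_userlist
  return tf.train.Example(features=tf.train.Features(feature={
      'key': tf.train.Feature(int64_list=tf.train.Int64List(value=[key])),
      'indices': tf.train.Feature(int64_list=tf.train.Int64List(
          value=[x[gby_key] for x in grouped])),
      'values': tf.train.Feature(float_list=tf.train.FloatList(
          value=[x['rating'] for x in grouped])),
  })).SerializeToString()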
We can then execute the Apache Beam pipeline on Cloud Dataflow. This is a fully managed service, so we don’t have to muck around with setting up infrastructure and installing software (see the notebook in GitHub for full code).
At this point, we’ll have the following files:
items_for_user-00000-of-00003
...
users_for_item-00000-of-00004
...
transform_fn/transform_fn/saved_model.pb
transform_fn/transform_fn/assets/
transform_fn/transform_fn/assets/vocab_items
transform_fn/transform_fn/assets/vocab_users
- `users_for_item` contains all the users/ratings for each item in tf.Example format. The items and users here are integers (not strings), i.e., itemId not contentId and userId not visitorId, and the rating is scaled.
- `items_for_user` contains all the items/ratings for each user in the same format.
- `vocab_items` contains the mapping from the contentId to the enumerated itemId.
- `vocab_users` contains the mapping from the visitorId to the enumerated userId.
- `saved_model.pb` contains all the TensorFlow transformations we did during preprocessing, so that they can also be applied during prediction.
Step 4: Write TensorFlow code
There is an Estimator API-based WALS implementation in TensorFlow. We use it the way we use any other Estimator — see the functions read_dataset() and train_and_evaluate() in the GitHub repo.
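For reference, training with that Estimator looks roughly like this — a minimal sketch, assuming args carries the dimensions and paths used elsewhere in this article, and assuming a read_dataset(args) signature (the actual read_dataset() and train_and_evaluate() are in the GitHub repo):

def train_and_evaluate(args):
  # a rough conversion from epochs to steps: one sweep over all the users
  train_steps = int(0.5 + (1.0 * args['num_epochs'] * args['nusers']) / args['batch_size'])
  estimator = tf.contrib.factorization.WALSMatrixFactorization(
      num_rows=args['nusers'], num_cols=args['nitems'],
      embedding_dimension=args['n_embeds'],
      model_dir=args['output_dir'])
  # read_dataset() (see GitHub) parses the TFRecord files written out in Step 3
  estimator.fit(input_fn=read_dataset(args), steps=train_steps)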
The more interesting thing is how we use the trained estimator for batch prediction. For a particular user, we want to find the top K items. That can be done in TensorFlow using:
def find_top_k(user, item_factors, k):
  # score all items for this user: (1, nembeds) x (nembeds, nitems) -> (1, nitems)
  all_items = tf.matmul(tf.expand_dims(user, 0), tf.transpose(item_factors))
  topk = tf.nn.top_k(all_items, k=k)
  return tf.cast(topk.indices, dtype=tf.int64)
Batch prediction involves calling the above function for every user, but making sure that when we write out the output, we write out the string visitorId and not the number userId (and, similarly, the string contentId and not the number itemId):
def batch_predict(args):
  import numpy as np
  from tensorflow.python.lib.io import file_io

  # read a vocabulary file into a Python list for quick indexed lookup
  def create_lookup(filename):
    dirname = os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/')
    with file_io.FileIO(os.path.join(dirname, filename), mode='r') as ifp:
      return [x.rstrip() for x in ifp]

  originalItemIds = create_lookup('vocab_items')
  originalUserIds = create_lookup('vocab_users')

  with tf.Session() as sess:
    estimator = tf.contrib.factorization.WALSMatrixFactorization(
        num_rows=args['nusers'], num_cols=args['nitems'],
        embedding_dimension=args['n_embeds'],
        model_dir=args['output_dir'])
    # the row factors for in-vocab users are already in the checkpoint
    user_factors = tf.convert_to_tensor(estimator.get_row_factors()[0])  # (nusers, nembeds)
    # we assume the catalog doesn't change, so the col factors are read in as well
    item_factors = tf.convert_to_tensor(estimator.get_col_factors()[0])  # (nitems, nembeds)

    # for each user, find the top K items
    topk = tf.squeeze(tf.map_fn(lambda user: find_top_k(user, item_factors, args['topk']),
                                user_factors, dtype=tf.int64))
    with file_io.FileIO(os.path.join(args['output_dir'], 'batch_pred.txt'), mode='w') as f:
      for userId, best_items_for_user in enumerate(topk.eval()):
        f.write(originalUserIds[userId] + '\t')  # write userId \t item1,item2,item3...
        f.write(','.join(originalItemIds[itemId] for itemId in best_items_for_user) + '\n')
To do the training and batch prediction, we can run the TensorFlow model on Cloud ML Engine, again without mucking around with any infrastructure:
gcloud ml-engine jobs submit training $JOBNAME \
  --region=$REGION \
  --module-name=trainer.task \
  --package-path=${PWD}/wals_tft/trainer \
  --job-dir=$OUTDIR \
  --staging-bucket=gs://$BUCKET \
  --scale-tier=BASIC_GPU \
  --runtime-version=1.5 \
  -- \
  --output_dir=$OUTDIR \
  --input_path=gs://${BUCKET}/wals/preproc_tft \
  --num_epochs=10 --nitems=5668 --nusers=82802
It’s kind of ugly to have to hardcode nitems and nusers like this. So, we can go back to our Beam pipeline and have it write out nitems and nusers to files as well, and then simply do a `gsutil cat` to get the appropriate values — the full code on GitHub does this.
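As a rough sketch of what that Beam step could look like (step names and the output path here are assumptions; see GitHub for the real code):

# hypothetical sketch: count distinct users and write the count to a single text file
nusers = (transformed_data
    | 'user_ids'     >> beam.Map(lambda x: x['userId'])
    | 'dedup_users'  >> beam.RemoveDuplicates()
    | 'count_users'  >> beam.combiners.Count.Globally()
    | 'write_nusers' >> beam.io.WriteToText(
          os.path.join(OUTPUT_DIR, 'nusers'), num_shards=1))

At submission time, the driver script can then do something like `NUSERS=$(gsutil cat gs://${BUCKET}/wals/preproc_tft/nusers*)` and pass that value as `--nusers`.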
Here is an example of what the output looks like:
6167894456739729438 298997422,262707977,263058146
3795498541234027150 296993188,97034003,298989783
Essentially, you get 3 items for each visitorId.
Step 5: Row and column factors
While product recommendation is the key use case for WALS, another use case is to find low-dimensional ways of representing products and users — for example, to do product or customer segmentation by clustering the item factors and user factors. So, we implement a serving function to provide these back to callers (again, see GitHub for the full code):
def for_user_embeddings(originalUserId):
  # convert the userId that the end-user provided to an integer
  originalUserIds = tf.contrib.lookup.index_table_from_file(
      os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/vocab_users'))
  userId = originalUserIds.lookup(originalUserId)

  # all items for this user (for user_embeddings)
  items = tf.range(args['nitems'], dtype=tf.int64)
  users = userId * tf.ones([args['nitems']], dtype=tf.int64)
  ratings = 0.1 * tf.ones_like(users, dtype=tf.float32)
  return items, users, ratings, tf.constant(True)
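To illustrate the segmentation use case: once the factors are exported, you could cluster them with scikit-learn. This is a hedged sketch, not part of the original solution — the file name and cluster count are placeholders:

import numpy as np
from sklearn.cluster import KMeans

# item_factors: (nitems, nembeds) array, e.g. from estimator.get_col_factors()[0]
item_factors = np.load('item_factors.npy')  # hypothetical export file
segments = KMeans(n_clusters=10, random_state=42).fit_predict(item_factors)
print(segments[:20])  # cluster assignment per item, usable for product segmentation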
Orchestration
Note that this article is only about replacing the machine learning training and batch prediction parts of the original solution. The original solution also explains how to do orchestration and filtering. Where do they fit in?
At this point, we have a BigQuery query, a Beam/Dataflow pipeline, and potentially an App Engine application (see below). How do you run them periodically, one after the other? Use Apache Airflow, as suggested in the original solution, for this orchestration.
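A minimal sketch of what such an Airflow DAG could look like — the DAG id, task names, and commands are placeholders, not the solution’s actual DAG:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# hypothetical daily pipeline: preprocess with Dataflow, then train on ML Engine
dag = DAG('wals_recommendations', schedule_interval='@daily',
          start_date=datetime(2018, 6, 1), catchup=False)

preprocess = BashOperator(task_id='preprocess_tft',
                          bash_command='python -m trainer.preproc',  # placeholder command
                          dag=dag)
train = BashOperator(task_id='train_wals',
                     bash_command='gcloud ml-engine jobs submit training ...',  # placeholder
                     dag=dag)
preprocess >> train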
Filtering
If you are recommending chocolates to customers, then it is okay to recommend a chocolate they have already tried, but if you are recommending newspaper articles to users, then it is important that you refrain from recommending articles they have already read.
My batch prediction code, unlike the original solution, does not filter out articles that the user has already read. If it is important that the recommendations not include already read/purchased items, then there are two ways you can do it.
The simpler method is to zero out the entries corresponding to items the user has already read before finding the top_k — here, an item counts as read if its scaled rating is 0.01 or more:
def find_top_k(user, item_factors, read_items, k):
  all_items = tf.matmul(tf.expand_dims(user, 0),
                        tf.transpose(item_factors))
  # zero out the score of anything the user has already read
  all_items = tf.where(tf.less(read_items,
                               0.01 * tf.ones(tf.shape(read_items))),
                       all_items,
                       tf.zeros(tf.shape(all_items)))
  topk = tf.nn.top_k(all_items, k=k)
  return tf.cast(topk.indices, dtype=tf.int64)
The problem with this is lag — you may not recommend items that the user read yesterday (because they are in your training dataset), but the batch prediction code does not have access to the stream of articles being read in real time, so you may end up recommending articles they read just a few minutes ago.
If this lag is an issue that you want to avoid, then you should make the k in the batch prediction much higher (so that, for example, you are getting 20 articles from the recommender even if you are going to recommend only 5 of them) and then do a second level of filtering in App Engine, as suggested in the original solution.
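For instance, the second-level filter could be as simple as this — a hypothetical sketch of the App Engine-side logic, not code from the original solution:

def filter_recommendations(candidates, already_read, k=5):
  """Drop items the user has recently read, keeping the first k survivors.

  candidates: ranked list of contentIds from batch prediction (e.g., top 20)
  already_read: set of contentIds from the real-time stream of read events
  """
  return [c for c in candidates if c not in already_read][:k]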
Summary
You now have batch prediction, online prediction and training, all without setting up any cluster! Plus, TensorFlow Transform has allowed us to simplify computation of metadata and mapping of items/users to fit the WALS paradigm.
Thanks to my colleagues Lukman Ramsey and Yiliang Zhao for helpful comments and suggestions on this article.