完整地实现了推荐系统的构建、实验和评估过程,为不同推荐算法在同一数据集
{"cells": [
{
"cell_type": "markdown",
"metadata": {
},
"source": [
"# 基于用户的协同过滤算法"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
},
"outputs": [],
"source": [
"# 导入包\n",
"import random\n",
"import math\n",
"import time\n",
"from tqdm import tqdm"
]
},
{
"cell_type": "markdown",
"metadata": {
},
"source": [
"## 一. 通用函数定义"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
},
"outputs": [],
"source": [
"# 定义装饰器,监控运行时间\n",
"def timmer(func):\n",
" def wrapper(*args, **kwargs):\n",
" start_time = time.time()\n",
" res = func(*args, **kwargs)\n",
" stop_time = time.time()\n",
" print('Func %s, run time: %s' % (func.__name__, stop_time - start_time))\n",
" return res\n",
" return wrapper"
]
},
{
"cell_type": "markdown",
"metadata": {
},
"source": [
"### 1. 数据处理相关\n",
"1. load data\n",
"2. split data"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
},
"outputs": [],
"source": [
"class Dataset():\n",
" \n",
" def __init__(self, fp):\n",
" # fp: data file path\n",
" self.data = self.loadData(fp)\n",
" \n",
" @timmer\n",
" def loadData(self, fp):\n",
" data = []\n",
" for l in open(fp):\n",
" data.append(tuple(map(int, l.strip().split('::')[:2])))\n",
" return data\n",
" \n",
" @timmer\n",
" def splitData(self, M, k, seed=1):\n",
" '''\n",
" :params: data, 加载的所有(user, item)数据条目\n",
" :params: M, 划分的数目,最后需要取M折的平均\n",
" :params: k, 本次是第几次划分,k~[0, M)\n",
" :params: seed, random的种子数,对于不同的k应设置成一样的\n",
" :return: train, test\n",
" '''\n",
" train, test = [], []\n",
" random.seed(seed)\n",
" for user, item in self.data:\n",
" # 这里与书中的不一致,本人认为取M-1较为合理,因randint是左右都覆盖的\n",
" if random.randint(0, M-1) == k:\n",
" test.append((user, item))\n",
" else:\n",
" train.append((user, item))\n",
"\n",
" # 处理成字典的形式,user->set(items)\n",
" def convert_dict(data):\n",
" data_dict = {}\n",
" for user, item in data:\n",
" if user not in data_dict:\n",
" data_dict = set()\n",
" data_dict.add(item)\n",
" data_dict = {k: list(data_dict) for k in data_dict}\n",
" return data_dict\n",
"\n",
" return convert_dict(train), convert_dict(test)"
]
},
{
"cell_type": "markdown",
"metadata": {
},
"source": [
"### 2. 评价指标\n",
"1. Precision\n",
"2. Recall\n",
"3. Coverage\n",
"4. Popularity(Novelty)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
},
"outputs": [],
"source": [
"class Metric():\n",
" \n",
" def __init__(self, train, test, GetRecommendation):\n",
" '''\n",
" :params: train, 训练数据\n",
" :params: test, 测试数据\n",
" :params: GetRecommendation, 为某个用户获取推荐物品的接口函数\n",
" '''\n",
" self.train = train\n",
" self.test = test\n",
" self.GetRecommendation = GetRecommendation\n",
" self.recs = self.getRec()\n",
" \n",
" # 为test中的每个用户进行推荐\n",
" def getRec(self):\n",
" recs = {}\n",
" for user in self.test:\n",
" rank = self.GetRecommendation(user)\n",
" recs = rank\n",
" return recs\n",
" \n",
" # 定义精确率指标计算方式\n",
" def precision(self):\n",
" all, hit = 0, 0\n",
" for user in self.test:\n",
" test_items = set(self.test)\n",
" rank = self.recs\n",
" for item, score in rank:\n",
" if item in test_items:\n",
" hit += 1\n",
" all += len(rank)\n",
" return round(hit / all * 100, 2)\n",
" \n",
" # 定义召回率指标计算方式\n",
" def recall(self):\n",
" all, hit = 0, 0\n",
" for user in self.test:\n",
" test_items = set(self.test)\n",
" rank = self.recs\n",
" for item, score in rank:\n",
" if item in test_items:\n",
" hit += 1\n",
" all += len(test_items)\n",
" return round(hit / all * 100, 2)\n",
" \n",
" # 定义覆盖率指标计算方式\n",
" def coverage(self):\n",
" all_item, recom_item = set(), set()\n",
" for user in self.test:\n",
" for item in self.train:\n",
" all_item.add(item)\n",
" rank = self.recs\n",
" for item, score in rank:\n",
" recom_item.add(item)\n",
" return round(len(recom_item) / len(all_item) * 100, 2)\n",
" \n",
" # 定义新颖度指标计算方式\n",
" def popularity(self):\n",
" # 计算物品的流行度\n",
" item_pop = {}\n",
" for user in self.train:\n",
" for item in self.train:\n",
" if item not in item_pop:\n",
" item_pop = 0\n",
" item_pop += 1\n",
"\n",
" num, pop = 0, 0\n",
" for user in self.test:\n",
" rank = self.recs\n",
" for item, score in rank:\n",
" # 取对数,防止因长尾问题带来的被流行物品所主导\n",
" pop += math.log(1 + item_pop)\n",
" num += 1\n",
" return round(pop / num, 6)\n",
" \n",
" def eval(self):\n",
" metric = {'Precision': self.precision(),\n",
" 'Recall': self.recall(),\n",
" 'Coverage': self.coverage(),\n",
" 'Popularity': self.popularity()}\n",
" print('Metric:', metric)\n",
" return metric"
]
},
{
"cell_type": "markdown",
"metadata": {
},
"source": [
"## 二. 算法实现\n",
"1. Random\n",
"2. MostPopular\n",
"3. UserCF\n",
"4. UserIIF"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
},
"outputs": [],
"source": [
"# 1. 随机推荐\n",
"def Random(train, K, N):\n",
" '''\n",
" :params: train, 训练数据集\n",
" :params: K, 可忽略\n",
" :params: N, 超参数,设置取TopN推荐物品数目\n",
" :return: GetRecommendation,推荐接口函数\n",
" '''\n",
" items = {}\n",
" for user in train:\n",
" for item in train:\n",
" items = 1\n",
" \n",
" def GetRecommendation(user):\n",
" # 随机推荐N个未见过的\n",
" user_items = set(train)\n",
" rec_items = {k: items for k in items if k not in user_items}\n",
" rec_items = list(rec_items.items())\n",
" random.shuffle(rec_items)\n",
" return rec_items[:N]\n",
" \n",
" return GetRecommendation"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
},
"outputs": [],
"source": [
"# 2. 热门推荐\n",
"def MostPopular(train, K, N):\n",
" '''\n",
" :params: train, 训练数据集\n",
" :params: K, 可忽略\n",
" :params: N, 超参数,设置取TopN推荐物品数目\n",
" :return: GetRecommendation, 推荐接口函数\n",
" '''\n",
" items = {}\n",
" for user in train:\n",
" for item in train:\n",
" if item not in items:\n",
" items = 0\n",
" items += 1\n",
" \n",
" def GetRecommendation(user):\n",
" # 随机推荐N个没见过的最热门的\n",
" user_items = set(train)\n",
" rec_items = {k: items for k in items if k not in user_items}\n",
" rec_items = list(sorted(rec_items.items(), key=lambda x: x, reverse=True))\n",
" return rec_items[:N]\n",
" \n",
" return GetRecommendation"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
},
"outputs": [],
"source": [
"# 3. 基于用户余弦相似度的推荐\n",
"def UserCF(train, K, N):\n",
" '''\n",
" :params: train, 训练数据集\n",
" :params: K, 超参数,设置取TopK相似用户数目\n",
" :params: N, 超参数,设置取TopN推荐物品数目\n",
" :return: GetRecommendation, 推荐接口函数\n",
" '''\n",
" # 计算item->user的倒排索引\n",
" item_users = {}\n",
" for user in train:\n",
" for item in train:\n",
" if item not in item_users:\n",
" item_users = []\n",
" item_users.append(user)\n",
" \n",
" # 计算用户相似度矩阵\n",
" sim = {}\n",
" num = {}\n",
" for item in item_users:\n",
" users = item_users\n",
" for i in range(len(users)):\n",
" u = users\n",
" if u not in num:\n",
" num = 0\n",
" num += 1\n",
" if u not in sim:\n",
" sim = {}\n",
" for j in range(len(users)):\n",
" if j == i: continue\n",
" v = users\n",
" if v not in sim:\n",
" sim = 0\n",
" sim += 1\n",
" for u in sim:\n",
" for v in sim:\n",
" sim /= math.sqrt(num * num)\n",
" \n",
" # 按照相似度排序\n",
" sorted_user_sim = {k: list(sorted(v.items(), \\\n",
" key=lambda x: x, reverse=True)) \\\n",
" for k, v in sim.items()}\n",
" \n",
" # 获取接口函数\n",
" def GetRecommendation(user):\n",
" items = {}\n",
" seen_items = set(train)\n",
" for u, _ in sorted_user_sim[:K]:\n",
" for item in train:\n",
" # 要去掉用户见过的\n",
" if item not in seen_items:\n",
" if item not in items:\n",
" items = 0\n",
" items += sim\n",
" recs = list(sorted(items.items(), key=lambda x: x, reverse=True))[:N]\n",
" return recs\n",
" \n",
" return GetRecommendation"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
},
"outputs": [],
"source": [
"# 4. 基于改进的用户余弦相似度的推荐\n",
"def UserIIF(train, K, N):\n",
" '''\n",
" :params: train, 训练数据集\n",
" :params: K, 超参数,设置取TopK相似用户数目\n",
" :params: N, 超参数,设置取TopN推荐物品数目\n",
" :return: GetRecommendation, 推荐接口函数\n",
" '''\n",
" # 计算item->user的倒排索引\n",
" item_users = {}\n",
" for user in train:\n",
" for item in train:\n",
" if item not in item_users:\n",
" item_users = []\n",
" item_users.append(user)\n",
" \n",
" # 计算用户相似度矩阵\n",
" sim = {}\n",
" num = {}\n",
" for item in item_users:\n",
" users = item_users\n",
" for i in range(len(users)):\n",
" u = users\n",
" if u not in num:\n",
" num = 0\n",
" num += 1\n",
" if u not in sim:\n",
" sim = {}\n",
" for j in range(len(users)):\n",
" if j == i: continue\n",
" v = users\n",
" if v not in sim:\n",
" sim = 0\n",
" # 相比UserCF,主要是改进了这里\n",
" sim += 1 / math.log(1 + len(users))\n",
" for u in sim:\n",
" for v in sim:\n",
" sim /&#
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]