Hadoop——基于物品的协同过滤算法实现商品推荐

2021/7/31 11:07:04

本文主要是介绍Hadoop——基于物品的协同过滤算法实现商品推荐,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

协同过滤算法:

基于物品的协同过滤算法主要有两步:

1、计算物品之间的相似度:可依据物品共现次数、余弦夹角、欧氏距离这三种方法计算得到物品之间的相似度。

2、根据物品的相似度和用户的历史购买记录给用户生成推荐列表

最终推荐的是什么物品,是由推荐度决定的。


核心:统计所有商品两两被同时购买的次数(即共现次数);
当用户购买了其中一件商品时,就向该用户推荐与这件商品共现的另一件商品。


项目包结构:

(此处原为项目包结构的截图,图片未能显示。)

项目第一步:模拟数据并建立对应的数据库表

-- Create the user table.
-- ("--" is the SQL line-comment marker; "//" is rejected by MySQL.)
create table s_user (
    id int primary key auto_increment,
    name varchar(20),
    age int,
    phone varchar(20)
);

-- Seed users. Ids are supplied explicitly instead of relying on auto_increment,
-- and the column list is spelled out so the statement survives schema changes.
insert into s_user (id, name, age, phone)
values
    (10001, 'jake', 20, '15023453003'),
    (10002, 'rose', 22, '18923452343'),
    (10003, 'tom', 21, '15113453001'),
    (10004, 'briup', 22, '18823452456'),
    (10005, 'kevin', 24, '15925671003'),
    (10006, 'patel', 28, '15983432459');


-- Create the product table.
-- ("--" is the SQL line-comment marker; "//" is rejected by MySQL.)
create table s_product (
    id int primary key auto_increment,
    name varchar(20),
    -- Exact decimal type: binary floating point is unsuitable for prices.
    price decimal(10, 2),
    -- NOTE(review): column name is a misspelling of "description"; kept unchanged
    -- so that anything already referencing it keeps working.
    descrition varchar(100),
    -- NOTE(review): presumably the stock quantity ("kucun") - confirm.
    kc double
);

-- Seed products with an explicit column list.
insert into s_product (id, name, price, descrition, kc)
values
    (20001, 'hadoop', 89, 'bigdata', 1000),
    (20002, 'hbase', 36, 'bigdata', 110),
    (20003, 'mysql', 58, 'bigdata', 190),
    (20004, 'sqoop', 28, 'bigdata', 70),
    (20005, 'flume', 34, 'bigdata', 109),
    (20006, 'kafka', 29, 'bigdata', 78),
    (20007, 'hive', 31, 'bigdata', 83);


-- Create the order table.
-- ("--" is the SQL line-comment marker; "//" is rejected by MySQL.)
create table s_order (
    id int primary key auto_increment,
    name varchar(20),
    -- Set once when the row is created. The former "on update current_timestamp"
    -- would silently overwrite the order date whenever the row was modified.
    order_date timestamp default current_timestamp,
    user_id int,
    -- Table-level constraint: MySQL parses but ignores an inline
    -- "references" clause on a column definition.
    constraint s_order_user_id_fk foreign key (user_id) references s_user (id)
);

-- Seed orders. The former row (7, 'briup_store', 10007) is omitted: user 10007
-- does not exist in s_user and the order has no rows in order_line.
insert into s_order (id, name, user_id)
values
    (1, 'briup_store', 10001),
    (2, 'briup_store', 10002),
    (3, 'briup_store', 10003),
    (4, 'briup_store', 10004),
    (5, 'briup_store', 10005),
    (6, 'briup_store', 10006);

-- Create the bridge table between orders and products (one row per product in an order).
-- ("--" is the SQL line-comment marker; "//" is rejected by MySQL.)
create table order_line (
    order_id int,
    product_id int,
    -- Quantity of the product purchased in this order.
    num double,
    primary key (order_id, product_id),
    -- Table-level constraints: MySQL parses but ignores an inline
    -- "references" clause on a column definition.
    constraint order_line_order_id_fk foreign key (order_id) references s_order (id),
    constraint order_line_product_id_fk foreign key (product_id) references s_product (id)
);

-- Seed order lines with an explicit column list.
insert into order_line (order_id, product_id, num)
values
    (1, 20001, 1),
    (1, 20002, 1),
    (1, 20005, 1),
    (1, 20006, 1),
    (1, 20007, 1),
    (2, 20003, 1),
    (2, 20004, 1),
    (2, 20006, 1),
    (3, 20002, 1),
    (3, 20007, 1),
    (4, 20001, 1),
    (4, 20002, 1),
    (4, 20005, 1),
    (4, 20006, 1),
    (5, 20001, 1),
    (6, 20004, 1),
    (6, 20007, 1);

-- Create the table that stores the final product recommendations.
-- ("--" is the SQL line-comment marker; "//" is rejected by MySQL.)
create table recommend (
    -- Recommended-to user.
    uid int,
    -- Recommended product.
    gid int,
    -- Recommendation score for the (uid, gid) pair.
    nums double,
    primary key (uid, gid),
    -- Table-level constraints: MySQL parses but ignores an inline
    -- "references" clause on a column definition.
    constraint recommend_uid_fk foreign key (uid) references s_user (id),
    constraint recommend_gid_fk foreign key (gid) references s_product (id)
);

第二步:将mysql中的数据迁移到hdfs分布式文件系统中

# Export (user_id, product_id, num) triples from MySQL to HDFS with a single mapper.
# The query is single-quoted so the shell leaves $CONDITIONS for sqoop to substitute.
# NOTE(review): the password is passed in plain text on the command line;
# consider -P (interactive prompt) or --password-file outside of a demo.
sqoop import \
  --connect jdbc:mysql://192.168.43.158:3306/briup \
  --username root \
  --password root \
  --delete-target-dir \
  --target-dir /user/hdfs/recommend \
  --query 'select s.user_id, d.product_id, d.num from s_order s inner join order_line d on d.order_id = s.id where $CONDITIONS' \
  -m 1

第三步:开始编程

@原始数据:用户	商品id  购买次数	
		10001	20001	1
        10001	20002	1
        10001	20005	1
        10001	20006	1
        10001	20007	1
        10002	20003	1
        10002	20004	1
        10002	20006	1
        10003	20002	1
        10003	20007	1
        10004	20001	1
        10004	20002	1
        10004	20005	1
        10004	20006	1
        10005	20001	1
        10006	20004	1
        10006	20007	1

step1:计算用户购买商品的列表

​ 数据来源于原始数据。

package com.briup.mr.one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//step1:计算用户购买商品的列表
    //结果数据: 10001	20001,20005,20006,20007,20002
public class UserBuyGoodsList{


    //输入:10001	20001	1
    public static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            //设置输出的键为用户id
            outK.set(line[0]);
            outV.set(line[1]);
            context.write(outK, outV);
        }
    }

    public static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
        private Text outV = new Text();

        结果数据: 10001	20001,20005,20006,20007,20002
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();

            for (Text value : values) {
                //拼接字符串
                sb.append(value.toString() + ",");
            }

            //将字符串最后的‘,’去掉
            sb.setLength(sb.length() - 1);

            outV.set(sb.toString());
            context.write(key, new Text(sb.toString()));
            outV.clear();
        }
    }

}

结果数据:

        10001	20001,20005,20006,20007,20002
        10002	20006,20003,20004
        10003	20002,20007
        10004	20001,20002,20005,20006
        10005	20001
        10006	20004,20007

step2:计算商品的共现关系

数据来源:第1步的计算结果

package com.briup.mr.two;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//计算商品的共现关系   即两两商品出现的组合有哪些     期望结果:20001	20001
//不需要reduce程序
public class GoodsCooccurrenceList {
    /**
     * Emits every ordered pair of product ids found in one user's purchase list,
     * including each product paired with itself.
     *
     * The incoming value is a comma-separated product id list; the incoming key is not read.
     * Per the original author's note, the input is read through SequenceFileInputFormat,
     * which hands over key and value already separated.
     * Each output key has the form "idA<TAB>idB"; the output value is always NullWritable.
     */
    public static class GoodsCooccurrenceListMapper extends Mapper<Text, Text, Text, NullWritable> {
        private final Text pair = new Text();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] goodsIds = value.toString().split(",");

            // Full cross product of the list with itself.
            for (int i = 0; i < goodsIds.length; i++) {
                for (int j = 0; j < goodsIds.length; j++) {
                    pair.set(goodsIds[i] + "\t" + goodsIds[j]);
                    context.write(pair, NullWritable.get());
                }
            }
        }
    }
}

计算结果:

			20001	20001   
            20001	20002  
            20001	20005   
            20001	20006
            20001	20007
            20001	20001
            20001	20006
            20001	20005
            20001	20002
            20002	20007
            20002	20001
            20002	20005
            20002	20006
            20002	20007
            20002	20002
            20002	20006
            20002	20005
            ...		...

step3: 计算商品的共现次数(共现矩阵)

数据来源:第2步的结果

package com.briup.mr.three;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
public class GoodsCooccurrenceMatrix {
    /**
     * Splits a pair record into (first product id, second product id).
     *
     * Example input keys:
     *   20001	20001
     *   20001	20002
     *   20001	20005
     */
    public static class GoodsCooccurrenceMatrixMapper extends Mapper<Text, NullWritable, Text, Text> {
        private Text left = new Text();
        private Text right = new Text();

        @Override
        protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
            String[] pair = key.toString().split("\t");
            left.set(pair[0]);
            right.set(pair[1]);
            context.write(left, right);
        }
    }

    /**
     * Counts how often each product id appears among the values of one key and
     * writes the counts as "id:count,id:count,...".
     *
     * Example input: 20001   [20001,20002,20005...]
     */
    public static class GoodsCooccurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
        // Per-key tally; kept as a field and emptied for every key.
        private Map<String, Integer> map = new HashMap<String, Integer>();
        private Text outV = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Start from a clean tally for this key.
            map.clear();

            for (Text val : values) {
                String goodsId = val.toString();
                Integer seen = map.get(goodsId);
                map.put(goodsId, seen == null ? 1 : seen + 1);
            }

            // Serialize every entry followed by a comma, then drop the last comma.
            StringBuilder row = new StringBuilder();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                row.append(entry.getKey());
                row.append(":");
                row.append(entry.getValue());
                row.append(",");
            }
            row.deleteCharAt(row.length() - 1);

            outV.set(row.toString());
            context.write(key, outV);
        }
    }
}

计算结果:

          	20001	20001:3,20002:2,20005:2,20006:2,20007:1
            20002	20001:2,20002:3,20005:2,20006:2,20007:2
            20003	20003:1,20004:1,20006:1
            20004	20003:1,20004:2,20006:1,20007:1
            20005	20001:2,20002:2,20005:2,20006:2,20007:1
            20006	20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
            20007	20001:1,20002:2,20004:1,20005:1,20006:1,20007:3

step4:计算用户的购买向量

数据来源:第1步的结果或者最原始数据。

package com.briup.mr.four;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */

//计算用户的购买向量
public class UserBuyGoodsVector {
    /**
     * Inverts a raw purchase line into (product id, user id).
     *
     * Example input line (read from the raw source file): 10001	20001	1
     * NOTE(review): the third field (purchase quantity) is not read, so every
     * input line counts as one purchase downstream - confirm this is intended.
     */
    public static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text goodsId = new Text();
        private Text userId = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");

            // Product id becomes the key, user id the value.
            goodsId.set(fields[1]);
            userId.set(fields[0]);

            context.write(goodsId, userId);
        }
    }

    /**
     * Counts, for one product, how many input records each user contributed and
     * writes the counts as "user:count,user:count,...".
     *
     * Example input:  20001   [10001,10002...]
     * Example output: 20001	10001:1,10004:1,10005:1
     */
    public static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
        private Text outV = new Text();
        // Per-key tally; kept as a field and emptied for every key.
        private Map<String, Integer> map = new HashMap<>();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Start from a clean tally for this product.
            map.clear();

            for (Text value : values) {
                String user = value.toString();
                Integer seen = map.get(user);
                map.put(user, seen == null ? 1 : seen + 1);
            }

            // Serialize every entry followed by a comma, then drop the last comma.
            StringBuilder vector = new StringBuilder();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                vector.append(entry.getKey());
                vector.append(":");
                vector.append(entry.getValue());
                vector.append(",");
            }
            vector.deleteCharAt(vector.length() - 1);

            outV.set(vector.toString());
            context.write(key, outV);
        }
    }
}

结果数据:

			20001	10001:1,10004:1,10005:1
            20002	10001:1,10003:1,10004:1
            20003	10002:1
            20004	10002:1,10006:1
            20005	10001:1,10004:1
            20006	10001:1,10002:1,10004:1
            20007	10001:1,10003:1,10006:1

step5:商品共现矩阵乘以用户购买向量,形成临时的推荐结果。

原始数据:第3步和第4步的结果数据

思考:文件的来源,来自于两个文件,第一个是第3步的结果(物品的共现矩阵),第二个文件是第4步的结果(用户的购买向量)。所以在一个MR程序中,需要使用两个自定义Mapper分别处理,然后定义一个自定义Reducer来处理这两个Mapper的中间结果。

GoodsBean类:

package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
/**
 * Composite map-output key for step 5: a product id plus a flag naming the
 * data set the record came from.
 *
 * Ordering is by product id first; for equal ids the record with the larger
 * flag sorts first, which places the co-occurrence row (flag 1) ahead of the
 * user purchase vector (flag 0).
 */
public class GoodsBean implements WritableComparable<GoodsBean> {
    private String g_id;    // product id
    // flag 1: record comes from the product co-occurrence counts (step 3 result)
    // flag 0: record comes from the user purchase vector (step 4 result)
    private int flag;

    /** No-argument constructor; fields are filled later by readFields or the setters. */
    public GoodsBean() {
    }

    /**
     * @param g_id product id
     * @param flag 1 for co-occurrence data, 0 for user purchase vector data
     */
    public GoodsBean(String g_id, int flag) {
        this.g_id = g_id;
        this.flag = flag;
    }

    public String getG_id() {
        return g_id;
    }

    public void setG_id(String g_id) {
        this.g_id = g_id;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    /**
     * Orders by product id ascending, then by flag descending.
     *
     * @param o the bean to compare with
     * @return negative, zero or positive as this bean sorts before, with or after {@code o}
     */
    @Override
    public int compareTo(GoodsBean o) {
        int n = this.g_id.compareTo(o.g_id);
        if (n != 0) {
            return n;
        }
        // Descending flag order puts the co-occurrence row first. Integer.compare
        // replaces the former subtraction, which could overflow for extreme values.
        return Integer.compare(o.flag, this.flag);
    }

    /** Serializes the product id (modified UTF-8) followed by the flag. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(g_id);
        dataOutput.writeInt(flag);
    }

    /** Reads the fields in the same order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.g_id = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}

mapreduce分区类GoodsPartitioner:

package com.briup.mr.five;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:07
 */
/**
 * Chooses the partition from the product id alone, so records with the same
 * product id land in the same partition whatever their flag is.
 */
public class GoodsPartitioner extends Partitioner<GoodsBean, Text> {

    /**
     * @param goodsBean     map-output key; its product id must be a decimal integer string
     * @param text          map-output value (not used)
     * @param numPartitions number of partitions
     * @return a partition index in the range [0, numPartitions)
     * @throws NumberFormatException if the product id is not a parsable integer
     */
    @Override
    public int getPartition(GoodsBean goodsBean, Text text, int numPartitions) {
        int scaled = Integer.parseInt(goodsBean.getG_id()) * 127;
        // Take the remainder first, then the absolute value. The remainder is always
        // strictly between -numPartitions and numPartitions, so Math.abs cannot hit
        // Integer.MIN_VALUE. The previous Math.abs(scaled) % numPartitions returned a
        // negative index when the multiplication overflowed to Integer.MIN_VALUE.
        // For every other value both forms give the same result.
        return Math.abs(scaled % numPartitions);
    }
}

mapreduce分组类GoodsGroup:

package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:14
 */
/**
 * Comparator that looks only at the product id of a GoodsBean and ignores the
 * flag, so beans with the same product id compare as equal.
 */
public class GoodsGroup extends WritableComparator {
    public GoodsGroup() {
        // true: let the parent create key instances for comparison.
        super(GoodsBean.class, true);
    }

    /**
     * @return the result of comparing the two beans' product ids as strings
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftId = ((GoodsBean) a).getG_id();
        String rightId = ((GoodsBean) b).getG_id();
        return leftId.compareTo(rightId);
    }
}

mapreduce类:

package com.briup.mr.five;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */

/*
    step5:  商品共现矩阵乘以用户购买向量,形成临时的推荐结果。
 */
public class MultiplyGoodsMatrixAndUserVector {

    /**
     * Tags a co-occurrence row (step 3 result) with flag 1.
     *
     * Example input:  20001	20005:2,20002:2,20001:3,20007:1,20006:2
     * Example output: GoodsBean(20001,1)  20005:2,20002:2,20001:3,20007:1,20006:2
     */
    public static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            GoodsBean tagged = new GoodsBean(key.toString(), 1);
            context.write(tagged, value);
        }
    }

    /**
     * Tags a user purchase vector row with flag 0.
     *
     * Example input:  20001	10001:1,10004:1,10005:1
     * Example output: GoodsBean(20001,0)    10001:1,10004:1,10005:1
     */
    public static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            GoodsBean tagged = new GoodsBean(key.toString(), 0);
            context.write(tagged, value);
        }
    }

    /**
     * Multiplies every user count by every co-occurrence count of one product.
     *
     * Example output: 10001,20001	2
     *
     * The first value of the group is treated as the co-occurrence row and every
     * later value as a user purchase vector, e.g.
     *   20005:2,20002:2,20001:3,20007:1,20006:2 || 10001:1,10004:1,10005:1
     * NOTE(review): this depends on the job's sort/grouping setup delivering the
     * flag-1 record first; that configuration is not visible in this class.
     */
    public static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<GoodsBean, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(GoodsBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> records = values.iterator();

            // Co-occurrence entries, e.g. 20005:2,20002:2,20001:3,20007:1,20006:2
            String[] coOccurrences = records.next().toString().split(",");

            while (records.hasNext()) {
                // User purchase entries, e.g. 10001:1,10004:1,10005:1
                String[] purchases = records.next().toString().split(",");

                for (int u = 0; u < purchases.length; u++) {
                    String[] userAndCount = purchases[u].split(":");

                    for (int g = 0; g < coOccurrences.length; g++) {
                        String[] goodsAndCount = coOccurrences[g].split(":");

                        // Output key has the form "userId,goodsId".
                        String outKey = userAndCount[0] + "," + goodsAndCount[0];
                        double partial = Double.parseDouble(userAndCount[1]) * Double.parseDouble(goodsAndCount[1]);

                        context.write(new Text(outKey), new DoubleWritable(partial));
                    }
                }
            }
        }
    }

}

结果数据:

			10001,20001	2
			10001,20001	2
			10001,20001	3
			10001,20001	1
			10001,20001	2
			10001,20002	3
			10001,20002	2
			10001,20002	2
			10001,20002	2
			10001,20002	2
			...		...

step6:对第5步计算的推荐的零散结果进行求和。

原始数据:第5步的计算结果

package com.briup.mr.six;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:04
 */
//第六步:对第5步计算的推荐的零散结果进行求和。
public class MakeSumForMultiplication {

    /**
     * Identity mapper: forwards each (key, partial score) record unchanged.
     *
     * Example input: 10006,20007	3
     */
    public static class MakeSumForMultiplicationMapper extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /**
     * Adds up all partial scores that share one key and writes a single total.
     */
    public static class MakeSumForMultiplicationReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double total = 0;
            Iterator<DoubleWritable> partials = values.iterator();

            while (partials.hasNext()) {
                total = total + partials.next().get();
            }

            DoubleWritable result = new DoubleWritable(total);
            context.write(key, result);
        }
    }
}

结果数据:

			10001,20001	10
			10001,20002	11
			10001,20003	1
			10001,20004	2
			10001,20005	9
			10001,20006	10
			...		...

step7:数据去重,在推荐结果中去掉用户已购买的商品信息。

数据来源:
1.FirstMapper处理用户的购买列表数据。
2.SecondMapper处理第6步的推荐结果数据。

javaBean类UserAndGoods:

package com.briup.mr.seven;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:24
 */
/**
 * Composite map-output key for step 7: a user id, a product id and a flag
 * naming the data set the record came from.
 *
 * Ordering and equality under {@link #compareTo(UserAndGoods)} use only the
 * user id and the product id; the flag does not take part.
 */
public class UserAndGoods implements WritableComparable<UserAndGoods> {
    private String userId;
    private String goodsId;
    // flag 1: record comes from the raw source data
    // flag 0: record comes from the step 6 result
    private int flag;

    /** No-argument constructor; fields are filled later by readFields or the setters. */
    public UserAndGoods() {
    }

    /**
     * @param userId  user id
     * @param goodsId product id
     * @param flag    1 for raw source data, 0 for step 6 result data
     */
    public UserAndGoods(String userId, String goodsId, int flag) {
        this.userId = userId;
        this.goodsId = goodsId;
        this.flag = flag;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    // Accessors for flag. The field was already set by the constructor, serialized
    // and printed by toString, but previously could not be read or changed from
    // outside the class.
    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    /**
     * Orders by user id, then by product id; the flag is deliberately left out.
     *
     * @param o the key to compare with
     * @return negative, zero or positive as this key sorts before, with or after {@code o}
     */
    @Override
    public int compareTo(UserAndGoods o) {
        int byUser = this.getUserId().compareTo(o.getUserId());
        if (byUser != 0) {
            // Different users: the user id decides.
            return byUser;
        }
        return this.getGoodsId().compareTo(o.getGoodsId());
    }

    @Override
    public String toString() {
        return "UserAndGoods{" +
                "userId='" + userId + '\'' +
                ", goodsId='" + goodsId + '\'' +
                ", flag=" + flag +
                '}';
    }

    /** Serializes user id, product id (both modified UTF-8) and then the flag. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeInt(flag);
    }

    /** Reads the fields in the same order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}

mapreduce类:

package com.briup.mr.seven;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:19
 */
//数据去重,在推荐结果中去掉用户已购买的商品信息。
/**
 * Step 7: removes from the recommendation result every (user, product) pair
 * that also occurs in the raw purchase data.
 */
public class DuplicateDataForResult {

    /**
     * Handles the raw purchase data.
     *
     * Splits the tab-separated line, keys it by (field 0, field 1) with flag 1
     * (flag 1 = raw source data, flag 0 = step 6 result) and forwards the
     * original line as the value.
     */
    public static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, UserAndGoods, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");

            context.write(new UserAndGoods(line[0], line[1], 1), value);
        }
    }

    /**
     * Handles the step 6 recommendation result.
     *
     * The incoming key has the form "userId,goodsId". The record is keyed by that
     * pair with flag 0 and the value becomes "userId,goodsId<TAB>score".
     */
    public static class DuplicateDataForResultSecondMapper extends Mapper<Text, DoubleWritable, UserAndGoods, Text> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split(",");

            context.write(new UserAndGoods(line[0], line[1], 0), new Text(key.toString() + "\t" + value.get()));
        }
    }


    /**
     * Writes a group's record only when the group holds exactly one record.
     *
     * A group with two or more records is dropped. Leftover debug printing was
     * removed from this method; one of its messages claimed "has a next element"
     * in the branch taken when there is none.
     * NOTE(review): a single-record group is presumably a recommendation for a
     * product the user has not bought; this relies on every purchased pair also
     * appearing in the step 6 result - confirm.
     */
    public static class DuplicateDataForResultReducer extends Reducer<UserAndGoods, Text, Text, NullWritable> {
        @Override
        protected void reduce(UserAndGoods key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();

            // First (and possibly only) record of the group.
            Text res = iter.next();

            // No further record: the pair occurs in one data set only, so emit it.
            if (!iter.hasNext()) {
                context.write(res, NullWritable.get());
            }
        }
    }
}

计算结果:

			10001	20004	2
			10001	20003	1
			10002	20002	2
			10002	20007	2
			10002	20001	2
			10002	20005	2
			10003	20006	3
			10003	20005	3
			10003	20001	3
			10003	20004	1
			10004	20007	5
			10004	20004	1
			10004	20003	1
			10005	20006	2
			10005	20002	2
			...		...

step8:将推荐结果保存到MySQL数据库中

数据来源:第七步计算结果

package com.briup.mr.eight;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 16:51
 */
/**
 * Step 8: converts the step 7 text records into RecommendResultBean objects.
 */
public class DataInDB {

    /**
     * Parses one record whose key has the form "uid,gid<TAB>score" into a bean.
     *
     * The part before the tab is split on "," into user id and product id (both
     * parsed as int); the part after the tab is parsed as a double score.
     */
    public static class DataInDBMapper extends Mapper<Text, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split("\t");
            RecommendResultBean outK = new RecommendResultBean();
            outK.setNums(Double.parseDouble(line[1]));

            String[] split = line[0].split(",");
            outK.setUid(Integer.parseInt(split[0]));
            outK.setGid(Integer.parseInt(split[1]));

            context.write(outK, NullWritable.get());
        }
    }

    /**
     * Writes each distinct key once; the values are never read.
     *
     * The input value type is NullWritable so that it matches what DataInDBMapper
     * emits (it was previously declared as DoubleWritable, which the mapper never writes).
     */
    public static class DataInDBReducer extends Reducer<RecommendResultBean, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void reduce(RecommendResultBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

结果:

(此处原为推荐结果写入MySQL的recommend表后的截图,图片转存失败,未能显示。)

Step9:构建job作业流,提交作业到集群运行

package com.briup.mr;

import com.briup.mr.eight.DataInDB;
import com.briup.mr.eight.RecommendResultBean;
import com.briup.mr.five.GoodsBean;
import com.briup.mr.five.GoodsGroup;
import com.briup.mr.five.GoodsPartitioner;
import com.briup.mr.five.MultiplyGoodsMatrixAndUserVector;
import com.briup.mr.four.UserBuyGoodsVector;
import com.briup.mr.one.UserBuyGoodsList;
import com.briup.mr.seven.DuplicateDataForResult;
import com.briup.mr.seven.UserAndGoods;
import com.briup.mr.six.MakeSumForMultiplication;
import com.briup.mr.three.GoodsCooccurrenceMatrix;
import com.briup.mr.two.GoodsCooccurrenceList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Driver for the item-based collaborative-filtering recommendation workflow.
 *
 * <p>Configures eight MapReduce jobs (steps 1-8), wires their dependencies with
 * {@link JobControl}, runs the workflow and reports whether every job succeeded.
 * Steps 1-7 exchange data through sequence files under {@code /user/zhudz/goods};
 * step 8 writes the final result to the {@code recommend} table through
 * {@code DBOutputFormat}.
 *
 * <p>Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/27 16:36
 */
public class FinalJob extends Configured implements Tool {

    /** Pause between two polls of the workflow state, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 500L;

    /**
     * Builds all jobs, runs them as one dependency-ordered workflow and waits for completion.
     *
     * @param strings command-line arguments left over after generic option parsing (unused)
     * @return 0 when every job finished successfully, 1 when at least one job failed
     * @throws Exception if the file system, job setup or job monitoring fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();

        // Hard-coded HDFS locations: the raw purchase records and one output directory per step.
        Path input = new Path("/user/zhudz/goods/input.txt");
        Path step1Output = new Path("/user/zhudz/goods/out11");
        Path step2Output = new Path("/user/zhudz/goods/out22");
        Path step3Output = new Path("/user/zhudz/goods/out33");
        Path step4Output = new Path("/user/zhudz/goods/out44");
        Path step5Output = new Path("/user/zhudz/goods/out55");
        Path step6Output = new Path("/user/zhudz/goods/out66");
        Path step7Output = new Path("/user/zhudz/goods/out77");

        // Remove output directories left over from a previous run so the jobs can recreate them.
        FileSystem fs = FileSystem.get(conf);
        deleteIfExists(fs, step1Output, step2Output, step3Output, step4Output,
                step5Output, step6Output, step7Output);

        // Step 1: list of goods bought by each user.
        Job step1Job = Job.getInstance(conf);
        step1Job.setJarByClass(this.getClass());
        step1Job.setJobName("Step1:计算用户购买商品的列表");

        step1Job.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        step1Job.setMapOutputKeyClass(Text.class);
        step1Job.setMapOutputValueClass(Text.class);

        step1Job.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        step1Job.setOutputKeyClass(Text.class);
        step1Job.setOutputValueClass(Text.class);

        step1Job.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(step1Job, input);
        SequenceFileOutputFormat.setOutputPath(step1Job, step1Output);

        // Step 2: co-occurrence relations between goods (no reducer class is configured).
        Job step2Job = Job.getInstance(conf);
        step2Job.setJarByClass(this.getClass());
        step2Job.setJobName("Step2:计算商品的共现关系");

        step2Job.setMapperClass(GoodsCooccurrenceList.GoodsCooccurrenceListMapper.class);
        step2Job.setMapOutputKeyClass(Text.class);
        step2Job.setMapOutputValueClass(NullWritable.class);

        step2Job.setOutputKeyClass(Text.class);
        step2Job.setOutputValueClass(NullWritable.class);

        step2Job.setInputFormatClass(SequenceFileInputFormat.class);
        step2Job.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(step2Job, step1Output);
        SequenceFileOutputFormat.setOutputPath(step2Job, step2Output);

        // Step 3: co-occurrence counts of goods (the co-occurrence matrix).
        Job step3Job = Job.getInstance(conf);
        step3Job.setJarByClass(this.getClass());
        step3Job.setJobName("Step3:计算商品的共现次数(共现矩阵)");

        step3Job.setMapperClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixMapper.class);
        step3Job.setMapOutputKeyClass(Text.class);
        step3Job.setMapOutputValueClass(Text.class);

        step3Job.setReducerClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixReducer.class);
        step3Job.setOutputKeyClass(Text.class);
        step3Job.setOutputValueClass(Text.class);

        step3Job.setInputFormatClass(SequenceFileInputFormat.class);
        step3Job.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(step3Job, step2Output);
        SequenceFileOutputFormat.setOutputPath(step3Job, step3Output);

        // Step 4: user purchase vectors, computed directly from the raw input file.
        Job step4Job = Job.getInstance(conf);
        step4Job.setJarByClass(this.getClass());
        step4Job.setJobName("Step4:计算用户的购买向量");

        step4Job.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
        step4Job.setMapOutputKeyClass(Text.class);
        step4Job.setMapOutputValueClass(Text.class);

        step4Job.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
        step4Job.setOutputKeyClass(Text.class);
        step4Job.setOutputValueClass(Text.class);

        step4Job.setInputFormatClass(TextInputFormat.class);
        step4Job.setOutputFormatClass(SequenceFileOutputFormat.class);

        TextInputFormat.addInputPath(step4Job, input);
        SequenceFileOutputFormat.setOutputPath(step4Job, step4Output);

        // Step 5: multiply the co-occurrence matrix by the purchase vectors (partial results).
        Job step5Job = Job.getInstance(conf);
        step5Job.setJarByClass(this.getClass());
        step5Job.setJobName("Step5:商品共现矩阵乘以用户购买向量,形成临时的推荐结果");

        // Two inputs, each with its own mapper: the co-occurrence counts from step 3 ...
        MultipleInputs.addInputPath(step5Job,
                step3Output,
                SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);

        // ... and the user purchase vectors from step 4.
        MultipleInputs.addInputPath(step5Job,
                step4Output,
                SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);

        step5Job.setMapOutputKeyClass(GoodsBean.class);
        step5Job.setMapOutputValueClass(Text.class);

        step5Job.setPartitionerClass(GoodsPartitioner.class);
        step5Job.setGroupingComparatorClass(GoodsGroup.class);

        step5Job.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        step5Job.setOutputKeyClass(Text.class);
        step5Job.setOutputValueClass(DoubleWritable.class);

        step5Job.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileOutputFormat.setOutputPath(step5Job, step5Output);

        // Step 6: sum the partial recommendation values produced by step 5.
        Job step6Job = Job.getInstance(conf);
        step6Job.setJarByClass(this.getClass());
        step6Job.setJobName("Step6:对第5步计算的推荐的零散结果进行求和");

        step6Job.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
        step6Job.setMapOutputKeyClass(Text.class);
        step6Job.setMapOutputValueClass(DoubleWritable.class);

        step6Job.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
        step6Job.setOutputKeyClass(Text.class);
        step6Job.setOutputValueClass(DoubleWritable.class);

        step6Job.setInputFormatClass(SequenceFileInputFormat.class);
        step6Job.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(step6Job, step5Output);
        SequenceFileOutputFormat.setOutputPath(step6Job, step6Output);

        // Step 7: drop goods the user has already bought from the recommendation result.
        Job step7Job = Job.getInstance(conf);
        step7Job.setJarByClass(this.getClass());
        step7Job.setJobName("Step7:数据去重,在推荐结果中去掉用户已购买的商品信息");

        // First mapper reads the users' purchase records from the raw input file.
        MultipleInputs.addInputPath(step7Job,
                input,
                TextInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);

        // Second mapper reads the summed recommendation values from step 6.
        MultipleInputs.addInputPath(step7Job,
                step6Output,
                SequenceFileInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);

        step7Job.setMapOutputKeyClass(UserAndGoods.class);
        step7Job.setMapOutputValueClass(Text.class);

        // No grouping comparator is configured for this step.

        step7Job.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
        step7Job.setOutputKeyClass(Text.class);
        step7Job.setOutputValueClass(NullWritable.class);

        step7Job.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileOutputFormat.setOutputPath(step7Job, step7Output);

        // Step 8: store the step 7 result in the MySQL table "recommend" (uid, gid, nums).
        Job step8Job = Job.getInstance(conf);
        step8Job.setJarByClass(this.getClass());
        step8Job.setJobName("Step8:将推荐结果保存到MySQL数据库中");

        // NOTE(review): the JDBC URL and credentials are hard-coded; consider reading them
        // from the job configuration instead.
        // NOTE(review): this file imports DBConfiguration/DBOutputFormat from the legacy
        // org.apache.hadoop.mapred.lib.db package while using the new-API Job; consider the
        // org.apache.hadoop.mapreduce.lib.db equivalents.
        DBConfiguration.configureDB(step8Job.getConfiguration(), "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.10.131/briup", "root", "root");
        DBOutputFormat.setOutput(step8Job, "recommend", "uid", "gid", "nums");

        step8Job.setMapperClass(DataInDB.DataInDBMapper.class);
        step8Job.setMapOutputKeyClass(RecommendResultBean.class);
        step8Job.setMapOutputValueClass(NullWritable.class);

        step8Job.setReducerClass(DataInDB.DataInDBReducer.class);
        step8Job.setOutputKeyClass(RecommendResultBean.class);
        step8Job.setOutputValueClass(NullWritable.class);

        step8Job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(step8Job, step7Output);
        step8Job.setOutputFormatClass(DBOutputFormat.class);

        // Workflow graph: 1 -> 2 -> 3 and 4 (independent) both feed 5 -> 6 -> 7 -> 8.
        ControlledJob step1 = controlled(conf, step1Job);
        ControlledJob step2 = controlled(conf, step2Job, step1);
        ControlledJob step3 = controlled(conf, step3Job, step2);
        ControlledJob step4 = controlled(conf, step4Job);
        ControlledJob step5 = controlled(conf, step5Job, step3, step4);
        ControlledJob step6 = controlled(conf, step6Job, step5);
        ControlledJob step7 = controlled(conf, step7Job, step6);
        ControlledJob step8 = controlled(conf, step8Job, step7);

        // Each step is registered exactly once.
        JobControl jobs = new JobControl("goods_recommends");
        jobs.addJob(step1);
        jobs.addJob(step2);
        jobs.addJob(step3);
        jobs.addJob(step4);
        jobs.addJob(step5);
        jobs.addJob(step6);
        jobs.addJob(step7);
        jobs.addJob(step8);

        Thread controller = new Thread(jobs);
        controller.start();

        try {
            // Print progress of whatever is running, then sleep briefly instead of spinning.
            while (!jobs.allFinished()) {
                for (ControlledJob running : jobs.getRunningJobList()) {
                    running.getJob().monitorAndPrintJob();
                }
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } finally {
            // Let the JobControl thread terminate even when monitoring throws.
            jobs.stop();
        }

        // Report failure to the caller rather than always claiming success.
        return jobs.getFailedJobList().isEmpty() ? 0 : 1;
    }

    /**
     * Recursively deletes every given path that currently exists.
     *
     * @param fs    file system the paths live on
     * @param paths directories or files to remove
     * @throws java.io.IOException if an existence check or a delete fails
     */
    private static void deleteIfExists(FileSystem fs, Path... paths) throws java.io.IOException {
        for (Path path : paths) {
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
        }
    }

    /**
     * Wraps a job in a {@link ControlledJob} and registers the jobs it must wait for.
     *
     * @param conf         configuration used to create the wrapper
     * @param job          the configured job to run
     * @param dependencies wrappers that must finish before {@code job} is submitted
     * @return the wrapper, ready to be added to a {@link JobControl}
     * @throws java.io.IOException if the wrapper cannot be created
     */
    private static ControlledJob controlled(Configuration conf, Job job, ControlledJob... dependencies)
            throws java.io.IOException {
        ControlledJob node = new ControlledJob(conf);
        node.setJob(job);
        for (ControlledJob dependency : dependencies) {
            node.addDependingJob(dependency);
        }
        return node;
    }

    /**
     * Command-line entry point; exits the JVM with the status returned by {@link #run(String[])}.
     *
     * @param args command-line arguments, passed through Hadoop's generic option parsing
     * @throws Exception if the workflow fails with an exception
     */
    public static void main(String[] args) throws Exception {
        // ToolRunner.run is static, so no ToolRunner instance is needed.
        System.exit(ToolRunner.run(new FinalJob(), args));
    }

}

step9:在集群上编写shell任务脚本,并设置定时执行任务

1、编写将mysql中的数据迁移到hdfs文件系统中的脚本

sudo vi mysqlToHDFS.sh
#完成后添加执行权限
sudo chmod +x mysqlToHDFS.sh
#!/bin/bash
sqoop import --connect jdbc:mysql://localhost:3306/briup --username root --password root --delete-target-dir --target-dir /user/hdfs/recommend --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' --m 1

2、编写提交该项目的任务到集群的脚本

sudo vi recomendMR.sh
#完成后添加执行权限
sudo chmod +x recomendMR.sh
#!/bin/bash
yarn jar /home/hdfs/GoodsRecommend-1.0-SNAPSHOT.jar com.briup.mr.FinalJob

3、设置定时任务

crontab -e

#每周二上午7点执行将mysql中的数据迁移到hdfs文件系统中的任务(cron星期字段:1=周一,2=周二;若需周一执行,请将下一行的 2 改为 1)
0 7 * * 2 sh ~/bin/mysqlToHDFS.sh

#每周二上午7:30执行提交该项目的任务到集群的任务(cron星期字段:1=周一,2=周二;若需周一执行,请将下一行的 2 改为 1)
30 7 * * 2 sh ~/bin/recomendMR.sh

项目源码:https://gitee.com/zhu-dezhong/recomendGoods



这篇关于Hadoop——基于物品的协同过滤算法实现商品推荐的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程