Implementing Recommendation Functionality
Course Overview
- Understanding Recommendation Systems
- Implementing Friend Recommendations
- Circle Recommendation Feature Overview
- Circle Recommendation Workflow
- Implementing Circle Recommendation
- Implementing Short Video Recommendation
1. Understanding Recommendation Systems
1.1 What is a Recommendation System?
To solve the problems of information overload and users without explicit needs, personalized recommendation systems were created to find items that users might be interested in.
In fact, representative solutions to information overload include classification directories and search engines, such as Hao123, e-commerce homepage categories, and search engines like Baidu and 360.
However, classification directories and search engines can only address the need for users to actively search for information, meaning users know what they want. They cannot solve the problem of users who have no clear needs and are indecisive.
A classic exchange is: "What do you want to eat?" "Anything!" Faced with an indecisive yet hard-to-please user (a girlfriend or a customer, say), the only way forward is to model the user's interests by analyzing their historical behavior, and then proactively recommend information that matches those interests and needs. For example, you might ask your girlfriend's best friend what she usually likes.
1.2 E-commerce is a Pioneer of Recommendation Systems
- E-commerce websites are one of the most important application areas for personalized recommendation systems. Amazon is an active user and promoter of personalized recommendation systems and has integrated them deeply into its product categories; recommendations reportedly account for at least 30% of its sales.
- Recommendation systems are ubiquitous, not just in e-commerce. For example, QQ and WeChat's friend recommendations; Sina Weibo's "People you may be interested in"; Youku and Tudou's movie recommendations; Douban's book recommendations; Dianping's restaurant recommendations; and Maimai's colleague recommendations.
- The original inspiration for recommendation engines: ACM Portal
- Amazon first proposed the item-based collaborative filtering recommendation algorithm: ACM Portal
JD.com's recommendation system:
1.3 Recommendation System Business Process
Recommendation systems are found on many websites, where they provide personalized recommendations to users. A recommendation system requires historical user data and generally consists of three parts: base data, a recommendation algorithm system, and a front-end display.
- Base data includes many dimensions, such as user visits, browsing, orders, collections, historical order information, and review information.
- The recommendation algorithm system is mainly a recommendation model composed of multiple algorithms based on different recommendation requirements.
- The front-end display responds to the client system and returns relevant recommendation information for display.
1.4 Collaborative Filtering Recommendation Algorithm
So far, collaborative filtering technology is the most successfully applied technology in personalized recommendation systems. Many large domestic and international websites use this technology to provide users with more intelligent (personalized, tailored to each user) content recommendations.
Core Idea:
Collaborative filtering generally works by finding, within a massive pool of users, a small group whose tastes are similar to yours. In collaborative filtering, these users are called neighbors. The other items they like are then compiled into an ordered list and recommended to you.
1.4.1 User-based Collaborative Filtering (UserCF)
For User A, based on user historical preferences, only one neighbor (User C) is calculated. Then, item D, which User C likes, is recommended to User A.
The user-based collaborative filtering algorithm first calculates the similarity between users (similar interests: birds of a feather flock together). It then recommends to a user the items purchased by highly similar users. For example, if user A is highly similar to user B, items that A purchased are recommended to B. More formally, the algorithm uses nearest-neighbor search to find a set of neighbors for a user. These neighbors have preferences similar to the user's, and the algorithm predicts the user's preferences from those of the neighbors.
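The neighbor search described above can be sketched in a few lines of Java. This is a minimal illustration, not the course's implementation: it assumes cosine similarity as the similarity measure, a single nearest neighbor, and hypothetical names (`UserCfSketch`, `likes`, `recommend`). The sample data mirrors the User A / User C / item D example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class UserCfSketch {
    // Helper (hypothetical): builds a preference vector where every liked item has weight 1.0.
    static Map<String, Double> likes(String... items) {
        Map<String, Double> prefs = new HashMap<>();
        for (String item : items) {
            prefs.put(item, 1.0);
        }
        return prefs;
    }

    // Cosine similarity between two sparse preference vectors (item -> preference).
    static double cosine(Map<String, Double> a, Map<String, Double> b) {
        double dot = 0, normA = 0, normB = 0;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            normA += e.getValue() * e.getValue();
            Double other = b.get(e.getKey());
            if (other != null) {
                dot += e.getValue() * other;
            }
        }
        for (double v : b.values()) {
            normB += v * v;
        }
        return (normA == 0 || normB == 0) ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    // Finds the most similar other user and returns that user's items the target has not seen yet.
    static List<String> recommend(String target, Map<String, Map<String, Double>> prefs) {
        Map<String, Double> mine = prefs.get(target);
        String neighbor = null;
        double bestSim = 0;
        for (Map.Entry<String, Map<String, Double>> e : prefs.entrySet()) {
            if (e.getKey().equals(target)) {
                continue;
            }
            double sim = cosine(mine, e.getValue());
            if (sim > bestSim) {
                bestSim = sim;
                neighbor = e.getKey();
            }
        }
        List<String> result = new ArrayList<>();
        if (neighbor == null) {
            return result; // no user shares any preference with the target
        }
        for (String item : new TreeSet<>(prefs.get(neighbor).keySet())) {
            if (!mine.containsKey(item)) {
                result.add(item);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> prefs = new HashMap<>();
        prefs.put("A", likes("itemA", "itemC"));
        prefs.put("B", likes("itemB"));
        prefs.put("C", likes("itemA", "itemC", "itemD"));
        System.out.println(recommend("A", prefs)); // prints [itemD]
    }
}
```

A production system would normally keep the top K neighbors and weight their preferences by similarity rather than relying on a single neighbor.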
1.4.2 Item-based Collaborative Filtering (ItemCF)
- The principle of ItemCF is similar to UserCF, but the neighbor calculation uses the items themselves instead of the user perspective. That is, based on user preferences for items, similar items are found. Then, based on the user's historical preferences, similar items are recommended to them.
- From a computational perspective, the preferences of all users for a specific item are treated as a vector to calculate the similarity between items. After obtaining similar items for an item, the user's historical preferences are used to predict the user's preference for items they have not yet rated, resulting in a sorted list of items as a recommendation.
- Explanation: For item A, based on the historical preferences of all users, users who like item A also like item C. This indicates that item A and item C are similar. Since user C likes item A, it can be inferred that user C might also like item C.
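The item-based variant can be sketched the same way. The example below is an illustration under assumptions: cosine similarity over each item's user vector, a similarity-weighted sum as the prediction, and hypothetical names (`ItemCfSketch`, `likedBy`, `predict`). The sample data is made up so that users who like item A also tend to like item C.

```java
import java.util.HashMap;
import java.util.Map;

final class ItemCfSketch {
    // Helper (hypothetical): builds an item vector where every listed user has preference 1.0.
    static Map<String, Double> likedBy(String... users) {
        Map<String, Double> vector = new HashMap<>();
        for (String user : users) {
            vector.put(user, 1.0);
        }
        return vector;
    }

    // Cosine similarity between two item vectors (user -> preference).
    static double cosine(Map<String, Double> a, Map<String, Double> b) {
        double dot = 0, normA = 0, normB = 0;
        for (Map.Entry<String, Double> e : a.entrySet()) {
            normA += e.getValue() * e.getValue();
            Double other = b.get(e.getKey());
            if (other != null) {
                dot += e.getValue() * other;
            }
        }
        for (double v : b.values()) {
            normB += v * v;
        }
        return (normA == 0 || normB == 0) ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    // Predicted preference of `user` for `candidate`: for every item the user already
    // rated, add (similarity to the candidate) * (the user's preference for that item).
    static double predict(String user, String candidate, Map<String, Map<String, Double>> itemVectors) {
        Map<String, Double> candidateVector = itemVectors.get(candidate);
        double score = 0;
        for (Map.Entry<String, Map<String, Double>> e : itemVectors.entrySet()) {
            Double pref = e.getValue().get(user);
            if (pref == null || e.getKey().equals(candidate)) {
                continue;
            }
            score += cosine(e.getValue(), candidateVector) * pref;
        }
        return score;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> itemVectors = new HashMap<>();
        itemVectors.put("itemA", likedBy("userA", "userB", "userC"));
        itemVectors.put("itemB", likedBy("userB"));
        itemVectors.put("itemC", likedBy("userA", "userB"));
        // userC has only rated itemA; itemC should score higher than itemB.
        System.out.println("itemC: " + predict("userC", "itemC", itemVectors));
        System.out.println("itemB: " + predict("userC", "itemB", itemVectors));
    }
}
```

Sorting the unrated items by this predicted value gives the ordered recommendation list mentioned above.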
1.5 Alternating Least Squares (ALS) Algorithm
ALS stands for Alternating Least Squares. In the context of machine learning, ALS specifically refers to a collaborative recommendation algorithm solved using alternating least squares. It infers each user's preferences by observing all users' ratings for products and recommends suitable products to users. In terms of collaborative filtering classification, the ALS algorithm belongs to User-Item CF, also known as hybrid CF. It considers both the User and Item aspects.
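The relationship between users and items can be abstracted into the triple <User, Item, Rating>, where Rating is the user's rating for the item and indicates the user's preference for it. In the sample below, each line holds a user ID, an item ID, and a rating, followed by a fourth column that appears to be a Unix timestamp for the rating: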
196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
298 474 4 884182806
115 265 2 881171488
253 465 5 891628467
305 451 3 886324817
6 86 3 883603013
62 257 2 879372434
286 1014 5 879781125
200 222 5 876042340
210 40 3 891035994
...
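As a rough illustration, lines in this format can be parsed into <User, Item, Rating> triples as follows. The class and field names (`RatingParser`, `Rating`) are hypothetical, and the sketch assumes the fourth column is a timestamp that can be ignored for modeling.

```java
final class RatingParser {
    // One <User, Item, Rating> triple.
    static final class Rating {
        final int user;
        final int item;
        final double rating;

        Rating(int user, int item, double rating) {
            this.user = user;
            this.item = item;
            this.rating = rating;
        }
    }

    // Parses a whitespace-separated line; columns after the third are ignored.
    static Rating parse(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected user, item and rating: " + line);
        }
        return new Rating(
                Integer.parseInt(parts[0]),
                Integer.parseInt(parts[1]),
                Double.parseDouble(parts[2]));
    }

    public static void main(String[] args) {
        Rating r = parse("196 242 3 881250949");
        System.out.println(r.user + " " + r.item + " " + r.rating);
    }
}
```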
2. Friend Recommendation
For friend recommendations, we need to calculate the similarity between each pair of users. The specific rules are as follows:
| Field | Weight Score | | | |
|---|---|---|---|---|
| Age Difference | 0-2 years: 30 points | 3-5 years: 20 points | 5-10 years: 10 points | Over 10 years: 0 points |
| Gender | Opposite gender: 30 points | Same gender: 0 points | | |
| Location | Same city: 20 points | Different city: 0 points | | |
| Education | Same level: 20 points | Different level: 0 points | | |
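The scoring table can be expressed as a small function. The following is a sketch only: the `Profile` class and its fields are hypothetical, and because the table's age ranges overlap at 5 years, the code assumes a difference of exactly 5 years falls into the 20-point tier.

```java
import java.util.Objects;

final class FriendScoreSketch {
    // Hypothetical user profile holding only the fields used by the scoring table.
    static final class Profile {
        final int age;
        final String gender;
        final String city;
        final String education;

        Profile(int age, String gender, String city, String education) {
            this.age = age;
            this.gender = gender;
            this.city = city;
            this.education = education;
        }
    }

    // Returns the similarity score (0 to 100) between two users.
    static int score(Profile a, Profile b) {
        int score = 0;
        int ageDiff = Math.abs(a.age - b.age);
        if (ageDiff <= 2) {
            score += 30;
        } else if (ageDiff <= 5) { // assumption: a 5-year difference counts as 20 points
            score += 20;
        } else if (ageDiff <= 10) {
            score += 10;
        }
        if (!Objects.equals(a.gender, b.gender)) {
            score += 30; // opposite gender
        }
        if (Objects.equals(a.city, b.city)) {
            score += 20; // same city
        }
        if (Objects.equals(a.education, b.education)) {
            score += 20; // same education level
        }
        return score;
    }

    public static void main(String[] args) {
        Profile x = new Profile(25, "man", "Beijing", "bachelor");
        Profile y = new Profile(27, "woman", "Beijing", "bachelor");
        System.out.println(score(x, y)); // prints 100
    }
}
```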
2.1 Flow
2.2 Deploying the Friend Recommendation Service
# Pull the image
docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-recommend-user:1.0.1
# Create the container
docker create --name tanhua-spark-recommend-user \
--env MONGODB_HOST=192.168.31.81 \
--env MONGODB_PORT=27017 \
--env MONGODB_USERNAME=tanhua \
--env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
--env MONGODB_DATABASE=tanhua \
--env MONGODB_COLLECTION=recommend_user \
--env JDBC_URL="jdbc:mysql://192.168.31.81:3306/mytanhua?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true&useSSL=false" \
--env JDBC_DRIVER=com.mysql.jdbc.Driver \
--env JDBC_USER=root \
--env JDBC_PASSWORD=root \
--env JDBC_TABLE=tb_user_info \
--env SCHEDULE_PERIOD=30 \
registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-recommend-user:1.0.1
# Parameter explanation
# MONGODB_HOST: Address of the MongoDB service
# MONGODB_PORT: Port of the MongoDB service
# MONGODB_USERNAME: Authentication username for MongoDB
# MONGODB_PASSWORD: Authentication password for MongoDB
# MONGODB_DATABASE: Database to connect to in MongoDB
# MONGODB_COLLECTION: MongoDB collection to operate on
# JDBC_URL: MySQL database connection URL
# JDBC_DRIVER: JDBC driver
# JDBC_USER: Database connection username
# JDBC_PASSWORD: Database connection password
# JDBC_TABLE: Database table name
# SCHEDULE_PERIOD: Interval for next execution in minutes, default is 10 minutes
# Start the service
docker start tanhua-spark-recommend-user
# View logs
docker logs -f tanhua-spark-recommend-user
After execution, you can see that the data in the recommend_user table in MongoDB has been regenerated.
3. Circle Recommendation
3.1 Feature Description
In the Circle feature, the system scores the posts that users publish based on user actions (publishing, browsing, liking) and then provides different recommendations for each user.
3.2 Process Description
Process explanation:
- When users interact with Circle posts (e.g., publish, browse, like, love), a message is sent to RocketMQ.
- The recommendation system receives the message, processes the data, and writes the results to MongoDB.
- The Spark system pulls the data and performs the recommendation calculation.
- The calculated results are written to Redis, providing personalized recommendations for each user.
3.3 Scoring Rules for Posts
- Browse: +1
- Like: +5
- Love: +8
- Comment: +10
- Publishing a post:
  - Text length: 1 point for less than 50 characters, 2 points for 50 to 100 characters, 3 points for more than 100 characters.
  - Number of images: 1 point per image.
Core recommendation logic:
- Recommendation model: User | Post | Score
- Where the score is the total score of the user's operations on the post.
- Why score a post you published yourself? Because publishing a post indicates that you are interested in it, which allows it to be recommended among similar users.
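The User | Post | Score model can be illustrated with a small aggregation sketch. The names (`PostScoreSketch`, `Event`, `totals`) are hypothetical, and treating a text of exactly 100 characters as the 3-point tier is an assumption that follows the consumer code later in this section rather than the wording of the rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PostScoreSketch {
    // One scored user action on a post.
    static final class Event {
        final long userId;
        final long postId;
        final double score;

        Event(long userId, long postId, double score) {
            this.userId = userId;
            this.postId = postId;
            this.score = score;
        }
    }

    // Score for publishing a post: 1 to 3 points for text length plus 1 point per image.
    static double publishScore(int textLength, int imageCount) {
        int lengthScore = textLength < 50 ? 1 : (textLength < 100 ? 2 : 3);
        return lengthScore + imageCount;
    }

    // Sums the scores of all events per "userId|postId" pair.
    static Map<String, Double> totals(List<Event> events) {
        Map<String, Double> sums = new TreeMap<>();
        for (Event e : events) {
            sums.merge(e.userId + "|" + e.postId, e.score, Double::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1L, 100L, 1));  // browse
        events.add(new Event(1L, 100L, 5));  // like
        events.add(new Event(1L, 100L, 10)); // comment
        events.add(new Event(2L, 100L, 8));  // love
        System.out.println(totals(events)); // prints {1|100=16.0, 2|100=8.0}
    }
}
```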
3.4 Sending Messages
3.4.1 QuanziMQService
Add the following dependencies to my-tanhua-server:
<!-- RocketMQ related -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
Configuration file (application.properties):
# RocketMQ related configuration
rocketmq.name-server=192.168.31.81:9876
rocketmq.producer.group=tanhua
package com.tanhua.server.service;
import com.alibaba.dubbo.config.annotation.Reference;
import com.tanhua.common.pojo.User;
import com.tanhua.common.utils.UserThreadLocal;
import com.tanhua.dubbo.server.api.QuanZiApi;
import com.tanhua.dubbo.server.pojo.Publish;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@Slf4j
public class QuanziMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Reference(version = "1.0.0")
private QuanZiApi quanZiApi;
public Boolean publishMsg(String publishId) {
return this.sendMsg(publishId, 1);
}
public Boolean queryPublishMsg(String publishId) {
return this.sendMsg(publishId, 2);
}
public Boolean likePublishMsg(String publishId) {
return this.sendMsg(publishId, 3);
}
public Boolean disLikePublishMsg(String publishId) {
return this.sendMsg(publishId, 6);
}
public Boolean lovePublishMsg(String publishId) {
return this.sendMsg(publishId, 4);
}
public Boolean disLovePublishMsg(String publishId) {
return this.sendMsg(publishId, 7);
}
public Boolean commentPublishMsg(String publishId) {
return this.sendMsg(publishId, 5);
}
private Boolean sendMsg(String publishId, Integer type) {
try {
User user = UserThreadLocal.get();
Publish publish = this.quanZiApi.queryPublishById(publishId);
Map<String, Object> msg = new HashMap<>();
msg.put("userId", user.getId());
msg.put("date", System.currentTimeMillis());
msg.put("publishId", publishId);
msg.put("pid", publish.getPid());
msg.put("type", type);
this.rocketMQTemplate.convertAndSend("tanhua-quanzi", msg);
} catch (Exception e) {
log.error("Failed to send message! publishId = " + publishId + ", type = " + type, e);
return false;
}
return true;
}
}
3.4.2 Modifying QuanZiService
In QuanZiService, call the message sending method.
package com.tanhua.server.service;
// ... imports ...
@Service
public class QuanZiService {
@Reference(version = "1.0.0")
private QuanZiApi quanZiApi;
@Reference(version = "1.0.0")
private VisitorsApi visitorsApi;
@Autowired
private UserService userService;
@Autowired
private UserInfoService userInfoService;
@Autowired
private PicUploadService picUploadService;
@Autowired
private QuanziMQService quanziMQService;
public PageResult queryPublishList(Integer page, Integer pageSize) {
// Truncated for brevity, but includes calls to quanziMQService for like/dislike/love/comment/view operations
}
public String savePublish(String textContent, String location, String latitude, String longitude, MultipartFile[] multipartFile) {
// ... publishing logic ...
if (StrUtil.isNotEmpty(publishId)) {
this.quanziMQService.publishMsg(publishId);
}
return publishId;
}
public Long likeComment(String publishId) {
// ... like logic ...
if (result) {
this.quanziMQService.likePublishMsg(publishId);
// ...
}
return null;
}
public Long disLikeComment(String publishId) {
// ... dislike logic ...
if (result) {
this.quanziMQService.disLikePublishMsg(publishId);
// ...
}
return null;
}
public Long loveComment(String publishId) {
// ... love logic ...
if (result) {
this.quanziMQService.lovePublishMsg(publishId);
// ...
}
return null;
}
public Long disLoveComment(String publishId) {
// ... dislove logic ...
if (result) {
this.quanziMQService.disLovePublishMsg(publishId);
// ...
}
return null;
}
public QuanZiVo queryById(String publishId) {
Publish publish = this.quanZiApi.queryPublishById(publishId);
if (publish == null) {
return null;
}
this.quanziMQService.queryPublishMsg(publishId);
return this.fillQuanZiVo(Arrays.asList(publish)).get(0);
}
public Boolean saveComments(String publishId, String content) {
// ... comment logic ...
if (result) {
this.quanziMQService.commentPublishMsg(publishId);
}
return result;
}
// ... other methods ...
}
3.5 Receiving Messages
The message receiving task needs to be implemented in a new project, my-tanhua-recommend.
3.5.1 Creating the my-tanhua-recommend Project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>my-tanhua</artifactId>
<groupId>cn.itcast.tanhua</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>my-tanhua-recommend</artifactId>
<dependencies>
<dependency>
<groupId>cn.itcast.tanhua</groupId>
<artifactId>my-tanhua-dubbo-interface</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
</dependencies>
</project>
3.5.2 Configuration File
application.properties
spring.application.name = itcast-rocketmq
server.port = 18082
rocketmq.name-server=192.168.31.81:9876
rocketmq.producer.group=tanhua
spring.data.mongodb.username=tanhua
spring.data.mongodb.password=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV
spring.data.mongodb.authentication-database=admin
spring.data.mongodb.database=tanhua
spring.data.mongodb.port=27017
spring.data.mongodb.host=192.168.31.81
3.5.3 Startup Class
package com.tanhua.recommend;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RecommendApplication {
public static void main(String[] args) {
SpringApplication.run(RecommendApplication.class, args);
}
}
3.5.4 RecommendQuanZi Entity
The entity structure to be stored in MongoDB.
package com.tanhua.recommend.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "recommend_quanzi")
public class RecommendQuanZi {
private ObjectId id;
private Long userId;
private Long publishId;
private Double score;
private Long date;
}
3.5.5 QuanZiMsgConsumer
package com.tanhua.recommend.msg;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.tanhua.dubbo.server.pojo.Publish;
import com.tanhua.recommend.pojo.RecommendQuanZi;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "tanhua-quanzi", consumerGroup = "tanhua-quanzi-consumer")
@Slf4j
public class QuanZiMsgConsumer implements RocketMQListener<String> {
@Autowired
private MongoTemplate mongoTemplate;
@Override
public void onMessage(String msg) {
try {
JSONObject jsonObject = JSONUtil.parseObj(msg);
Long userId = jsonObject.getLong("userId");
Long date = jsonObject.getLong("date");
String publishId = jsonObject.getStr("publishId");
Long pid = jsonObject.getLong("pid");
Integer type = jsonObject.getInt("type");
RecommendQuanZi recommendQuanZi = new RecommendQuanZi();
recommendQuanZi.setUserId(userId);
recommendQuanZi.setId(ObjectId.get());
recommendQuanZi.setDate(date);
recommendQuanZi.setPublishId(pid);
// 1-publish, 2-view, 3-like, 4-love, 5-comment, 6-unlike, 7-unlove
switch (type) {
case 1: {
Publish publish = this.mongoTemplate.findById(new ObjectId(publishId), Publish.class);
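if (ObjectUtil.isEmpty(publish)) {
// The post cannot be found, so skip the record instead of saving it with a null score.
log.warn("Publish not found, skipping message! publishId = " + publishId);
return;
}
// One point per image, plus 1 to 3 points depending on the text length.
double score = 0d;
score += CollUtil.size(publish.getMedias());
int length = StrUtil.length(publish.getText());
if (length < 50) {
score += 1;
} else if (length < 100) {
score += 2;
} else {
score += 3;
}
recommendQuanZi.setScore(score);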
break;
}
case 2: {
recommendQuanZi.setScore(1d);
break;
}
case 3: {
recommendQuanZi.setScore(5d);
break;
}
case 4: {
recommendQuanZi.setScore(8d);
break;
}
case 5: {
recommendQuanZi.setScore(10d);
break;
}
case 6: {
recommendQuanZi.setScore(-5d);
break;
}
case 7: {
recommendQuanZi.setScore(-8d);
break;
}
default: {
recommendQuanZi.setScore(0d);
break;
}
}
this.mongoTemplate.save(recommendQuanZi);
} catch (Exception e) {
log.error("Error processing message! msg = " + msg, e);
}
}
}
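The type codes and fixed scores handled by the switch above could also be kept in one place with an enum. This is an alternative sketch, not part of the course code: `QuanZiActionSketch` and `Action` are hypothetical names, and type 1 (publish) is left out because its score is computed from the post content.

```java
final class QuanZiActionSketch {
    // Message type codes with fixed scores, taken from the consumer's switch statement.
    enum Action {
        VIEW(2, 1d),
        LIKE(3, 5d),
        LOVE(4, 8d),
        COMMENT(5, 10d),
        UNLIKE(6, -5d),
        UNLOVE(7, -8d);

        final int code;
        final double score;

        Action(int code, double score) {
            this.code = code;
            this.score = score;
        }

        // Returns the fixed score for a type code, or 0 for unknown codes
        // (the same fallback as the default branch of the switch).
        static double scoreOf(int code) {
            for (Action action : values()) {
                if (action.code == code) {
                    return action.score;
                }
            }
            return 0d;
        }
    }

    public static void main(String[] args) {
        System.out.println(Action.scoreOf(3));  // prints 5.0
        System.out.println(Action.scoreOf(7));  // prints -8.0
        System.out.println(Action.scoreOf(99)); // prints 0.0
    }
}
```

Keeping codes and scores together means the producer and the consumer can share one definition instead of repeating magic numbers.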
3.6 Testing
Test method: Use the APP to perform operations, and you can see data being written to MongoDB.
4. Deploying the Recommendation System
In the recommendation system, Spark performs the calculation on the data previously written to the recommendation table. Once the Spark calculation is complete, the results are written to Redis so that the business system can query them.
The recommendation service will be deployed using Docker:
# Pull the image
docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0
# Create the container
docker create --name tanhua-spark-quanzi \
--env MONGODB_HOST=192.168.31.81 \
--env MONGODB_PORT=27017 \
--env MONGODB_USERNAME=tanhua \
--env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
--env MONGODB_DATABASE=tanhua \
--env MONGODB_COLLECTION=recommend_quanzi \
--env SCHEDULE_PERIOD=10 \
--env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0
# Parameter explanation
# MONGODB_HOST: Address of the MongoDB service
# MONGODB_PORT: Port of the MongoDB service
# MONGODB_USERNAME: Authentication username for MongoDB
# MONGODB_PASSWORD: Authentication password for MongoDB
# MONGODB_DATABASE: Database to connect to in MongoDB
# MONGODB_COLLECTION: MongoDB collection to operate on
# SCHEDULE_PERIOD: Interval for next execution in minutes, default is 10 minutes
# REDIS_NODES: Redis cluster address, can also use a single node
# Start the service; it runs immediately and then again after every SCHEDULE_PERIOD
docker start tanhua-spark-quanzi
# View logs
docker logs -f tanhua-spark-quanzi
# After execution, data will be written to Redis
Check if data already exists in Redis:
5. Short Video Recommendation
The implementation logic for short video recommendations is very similar to that of post recommendations.
5.1 Scoring Rules for Videos
- Publish: +2
- Like: +5
- Comment: +10
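As a quick illustration of how these rules combine, the sketch below totals a user's actions on a single video. The names (`VideoScoreSketch`, `total`) and the string action labels are hypothetical; the -5 for an unlike is not listed above and is taken from the VideoMsgConsumer shown later in this section.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VideoScoreSketch {
    private static final Map<String, Double> SCORES = new HashMap<>();

    static {
        SCORES.put("publish", 2d);
        SCORES.put("like", 5d);
        SCORES.put("unlike", -5d); // cancels an earlier like
        SCORES.put("comment", 10d);
    }

    // Sums the scores of a user's actions on one video; unknown actions count as 0.
    static double total(List<String> actions) {
        double sum = 0;
        for (String action : actions) {
            sum += SCORES.getOrDefault(action, 0d);
        }
        return sum;
    }

    public static void main(String[] args) {
        // A like followed by an unlike nets to zero, so only the comment remains.
        System.out.println(total(Arrays.asList("like", "unlike", "comment"))); // prints 10.0
    }
}
```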
5.2 Sending Messages
5.2.1 VideoMQService
package com.tanhua.server.service;
import com.alibaba.dubbo.config.annotation.Reference;
import com.tanhua.common.pojo.User;
import com.tanhua.common.utils.UserThreadLocal;
import com.tanhua.dubbo.server.api.VideoApi;
import com.tanhua.dubbo.server.pojo.Video;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
@Slf4j
public class VideoMQService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Reference(version = "1.0.0")
private VideoApi videoApi;
public Boolean videoMsg(String videoId) {
return this.sendMsg(videoId, 1);
}
public Boolean likeVideoMsg(String videoId) {
return this.sendMsg(videoId, 2);
}
public Boolean disLikeVideoMsg(String videoId) {
return this.sendMsg(videoId, 3);
}
public Boolean commentVideoMsg(String videoId) {
return this.sendMsg(videoId, 4);
}
private Boolean sendMsg(String videoId, Integer type) {
try {
User user = UserThreadLocal.get();
Video video = this.videoApi.queryVideoById(videoId);
Map<String, Object> msg = new HashMap<>();
msg.put("userId", user.getId());
msg.put("date", System.currentTimeMillis());
msg.put("videoId", videoId);
msg.put("vid", video.getVid());
msg.put("type", type);
this.rocketMQTemplate.convertAndSend("tanhua-video", msg);
} catch (Exception e) {
log.error("Failed to send message! videoId = " + videoId + ", type = " + type, e);
return false;
}
return true;
}
}
5.2.2 VideoService (Modified)
package com.tanhua.server.service;
// ... imports ...
@Service
@Slf4j
public class VideoService {
// ... other dependencies ...
@Autowired
private VideoMQService videoMQService;
public Boolean saveVideo(MultipartFile picFile, MultipartFile videoFile) {
// ... upload logic ...
if (StrUtil.isNotEmpty(videoId)) {
this.videoMQService.videoMsg(videoId);
}
return StrUtil.isNotEmpty(videoId);
}
public Long likeComment(String videoId) {
// ... like logic ...
if (result) {
this.videoMQService.likeVideoMsg(videoId);
// ...
}
return null;
}
public Long disLikeComment(String videoId) {
// ... dislike logic ...
if (result) {
this.videoMQService.disLikeVideoMsg(videoId);
// ...
}
return null;
}
public Boolean saveComment(String videoId, String content) {
Boolean result = this.quanZiService.saveComments(videoId, content);
if (result) {
this.videoMQService.commentVideoMsg(videoId);
}
return result;
}
// ... other methods ...
}
5.3 Receiving Messages
5.3.1 RecommendVideo Entity
package com.tanhua.recommend.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.core.mapping.Document;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "recommend_video")
public class RecommendVideo {
private ObjectId id;
private Long userId;
private Long videoId;
private Double score;
private Long date;
}
5.3.2 VideoMsgConsumer
package com.tanhua.recommend.msg;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.tanhua.recommend.pojo.RecommendVideo;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(topic = "tanhua-video", consumerGroup = "tanhua-video-consumer")
@Slf4j
public class VideoMsgConsumer implements RocketMQListener<String> {
@Autowired
private MongoTemplate mongoTemplate;
@Override
public void onMessage(String msg) {
try {
JSONObject jsonObject = JSONUtil.parseObj(msg);
Long userId = jsonObject.getLong("userId");
Long vid = jsonObject.getLong("vid");
Integer type = jsonObject.getInt("type");
// 1-publish, 2-like, 3-unlike, 4-comment
RecommendVideo recommendVideo = new RecommendVideo();
recommendVideo.setUserId(userId);
recommendVideo.setId(ObjectId.get());
recommendVideo.setDate(System.currentTimeMillis());
recommendVideo.setVideoId(vid);
switch (type) {
case 1: {
recommendVideo.setScore(2d);
break;
}
case 2: {
recommendVideo.setScore(5d);
break;
}
case 3: {
recommendVideo.setScore(-5d);
break;
}
case 4: {
recommendVideo.setScore(10d);
break;
}
default: {
recommendVideo.setScore(0d);
break;
}
}
this.mongoTemplate.save(recommendVideo);
} catch (Exception e) {
log.error("Failed to process short video message! msg = " + msg, e);
}
}
}
5.3.3 Testing
As shown, User 1 has performed like, unlike, and comment operations on the video.
5.4 Deploying the Recommendation Service
# Pull the image
docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0
# Create the container
docker create --name tanhua-spark-video \
--env MONGODB_HOST=192.168.31.81 \
--env MONGODB_PORT=27017 \
--env MONGODB_USERNAME=tanhua \
--env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
--env MONGODB_DATABASE=tanhua \
--env MONGODB_COLLECTION=recommend_video \
--env SCHEDULE_PERIOD=10 \
--env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0
# Start the service
docker start tanhua-spark-video
# View logs
docker logs -f tanhua-spark-video
Testing: