Fading Coder

One Final Commit for the Last Sprint

Home > Notes > Content

Implementing Recommendation Functionality

Notes May 8 3

Course Overview

  • Understanding Recommendation Systems
  • Implementing Friend Recommendations
  • Circle Recommendation Feature Overview
  • Circle Recommendation Workflow
  • Implementing Circle Recommendation
  • Implementing Short Video Recommendation

1. Understanding Recommendation Systems

1.1 What is a Recommendation System?

To solve the problems of information overload and users without explicit needs, personalized recommendation systems were created to find items that users might be interested in.

In fact, representative solutions to information overload include classification directories and search engines, such as Hao123, e-commerce homepage categories, and search engines like Baidu and 360.

However, classification directories and search engines can only address the need for users to actively search for information, meaning users know what they want. They cannot solve the problem of users who have no clear needs and are indecisive.

A classic quote is: "What do you want to eat? Anything!" Facing this indecisive yet hard-to-please user (like a girlfriend or a customer), the only way is to model the user's interests by analyzing their historical behavior, thereby actively recommending information that meets their interests and needs. For example, asking a girlfriend's best friend about her usual preferences at certain times.

1.2 E-commerce is a Pioneer of Recommendation Systems

  • E-commerce websites are one of the most important application areas for personalized recommendation systems. Amazon is an active user and promoter of personalized recommendation systems, integrating them deeply into various product categories, which has brought at least 30% of its sales.
  • Recommendation systems are ubiquitous, not just in e-commerce. For example, QQ and WeChat's friend recommendations; Sina Weibo's "People you may be interested in"; Youku and Tudou's movie recommendations; Douban's book recommendations; Dianping's restaurant recommendations; and Maimai's colleague recommendations.
  • The original inspiration for recommendation engines: ACM Portal
  • Amazon first proposed the item-based collaborative filtering recommendation algorithm: ACM Portal

JD.com's recommendation system:

1.3 Recommendation System Business Process

A recommendation system is widely found on various websites, acting as an application to provide personalized recommendations to users. It requires some user historical data and generally consists of three parts: base data, recommendation algorithm system, and front-end display.

  • Base data includes many dimensions, such as user visits, browsing, orders, collections, historical order information, and review information.
  • The recommendation algorithm system is mainly a recommendation model composed of multiple algorithms based on different recommendation requirements.
  • The front-end display responds to the client system and returns relevant recommendation information for display.

1.4 Collaborative Filtering Recommendation Algorithm

So far, collaborative filtering technology is the most successfully applied technology in personalized recommendation systems. Many large domestic and international websites use this technology to provide users with more intelligent (personalized, tailored to each user) content recommendations.

Core Idea:

Collaborative filtering generally involves discovering a small group of users from a massive pool whose tastes are similar to yours. In collaborative filtering, these users become neighbors. Then, based on other items they like, an ordered list is compiled and recommended to you.

1.4.1 User-based Collaborative Filtering (UserCF)

For User A, based on user historical preferences, only one neighbor (User C) is calculated. Then, item D, which User C likes, is recommended to User A.

The user-based collaborative filtering algorithm first calculates the similarity between users (similar interests, birds of a feather flock together). Then, items purchased by user A, who is highly similar, are recommended to user B. In professional terms, the algorithm uses the nearest-neighbor algorithm to find a set of neighbors for a user. These neighbors have similar preferences to the user, and the algorithm makes predictions for the user based on the neighbors' preferences.

1.4.2 Item-based Collaborative Filtering (ItemCF)

  • The principle of ItemCF is similar to UserCF, but the neighbor calculation uses the items themselves instead of the user perspective. That is, based on user preferences for items, similar items are found. Then, based on the user's historical preferences, similar items are recommended to them.
  • From a computational perspective, the preferences of all users for a specific item are treated as a vector to calculate the similarity between items. After obtaining similar items for an item, the user's historical preferences are used to predict the user's preference for items they have not yet rated, resulting in a sorted list of items as a recommendation.
  • Explanation: For item A, based on the historical preferences of all users, users who like item A also like item C. This indicates that item A and item C are similar. Since user C likes item A, it can be inferred that user C might also like item C.

1.5 Alternating Least Squares (ALS) Algorithm

ALS stands for Alternating Least Squares. In the context of machine learning, ALS specifically refers to a collaborative recommendation algorithm solved using alternating least squares. It infers each user's preferences by observing all users' ratings for products and recommends suitable products to users. In terms of collaborative filtering classification, the ALS algorithm belongs to User-Item CF, also known as hybrid CF. It considers both the User and Item aspects.

The relationship between users and items can be abstracted into the following triple: <User, Item, Rating>. Here, Rating is the user's rating for the item, indicating the user's preference for that item. Example:

196 242 3 881250949
186 302 3 891717742
22 377 1 878887116
244 51 2 880606923
166 346 1 886397596
298 474 4 884182806
115 265 2 881171488
253 465 5 891628467
305 451 3 886324817
6 86 3 883603013
62 257 2 879372434
286 1014 5 879781125
200 222 5 876042340
210 40 3 891035994
...

2. Friend Recommendation

For friend recommendations, we need to find the similarity between each user. The specific rule are as follows:

Field Weight Score
Age Difference 0-2 years: 30 points 3-5 years: 20 points 5-10 years: 10 points Over 10 years: 0 points
Gender Opposite gender: 30 points Same gender: 0 points
Location Same city: 20 points Different city: 0 points
Education Same level: 20 points Different level: 0 points

2.1 Flow

2.2 Deploying the Friend Recommendation Service

# Pull the image
docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-recommend-user:1.0.1

# Create the container
docker create --name tanhua-spark-recommend-user \
  --env MONGODB_HOST=192.168.31.81 \
  --env MONGODB_PORT=27017 \
  --env MONGODB_USERNAME=tanhua \
  --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
  --env MONGODB_DATABASE=tanhua \
  --env MONGODB_COLLECTION=recommend_user \
  --env JDBC_URL="jdbc:mysql://192.168.31.81:3306/mytanhua?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true&useSSL=false" \
  --env JDBC_DRIVER=com.mysql.jdbc.Driver \
  --env JDBC_USER=root \
  --env JDBC_PASSWORD=root \
  --env JDBC_TABLE=tb_user_info \
  --env SCHEDULE_PERIOD=30 \
  registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-recommend-user:1.0.1

# Parameter explanation
# MONGODB_HOST: Address of the MongoDB service
# MONGODB_PORT: Port of the MongoDB service
# MONGODB_USERNAME: Authentication username for MongoDB
# MONGODB_PASSWORD: Authentication password for MongoDB
# MONGODB_DATABASE: Database to connect to in MongoDB
# MONGODB_COLLECTION: Operation table
# JDBC_URL: MySQL database connection URL
# JDBC_DRIVER: JDBC driver
# JDBC_USER: Database connection username
# JDBC_PASSWORD: Database connection password
# JDBC_TABLE: Database table name
# SCHEDULE_PERIOD: Interval for next execution in minutes, default is 10 minutes

# Start the service
docker start tanhua-spark-recommend-user
# View logs
docker logs -f tanhua-spark-recommend-user

After execution, you can see that the data in the recommend_user table in MongoDB has been regenerated.

3. Circle Recommendation

3.1 Feature Description

In the Circle feature, regarding user-published posts, the system can calculate based on user actions (publishing, browsing, liking) and then provide different recommendations for each user.

3.2 Process Description

Process explanation:

  • When users interact with Circle posts (e.g., publish, browse, like, love), a message is sent to RocketMQ.
  • The recommendation system receives the message, processes the data, and writes the results to MongoDB.
  • The Spark system pulls the data and performs the recommendation calculation.
  • The calculated results are written to Redis, providing personalized recommendations for each user.

3.3 Scoring Rules for Posts

  • Browse: +1
  • Like: +5
  • Love: +8
  • Comment: +10
  • Publishing a post:
    • Text length: 1 point for less than 50 characters, 2 points for 50 to 100 characters, 3 points for more than 100 characters.
    • Number of images: 1 point per image.

Core recommendation logic:

  • Recommendation model: User | Post | Score
  • Where the score is the total score of the user's operations on the post.
  • Why score your own published post? Because publishing a post indicates interest in it yourself, allowing recommendations among similar users.

3.4 Sending Messages

3.4.1 QuanziMQService

Add the following dependencies to my-tanhua-server:

<!-- RocketMQ related -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
</dependency>

Configuration file (application.properties):

# RocketMQ related configuration
rocketmq.name-server=192.168.31.81:9876
rocketmq.producer.group=tanhua
package com.tanhua.server.service;

import com.alibaba.dubbo.config.annotation.Reference;
import com.tanhua.common.pojo.User;
import com.tanhua.common.utils.UserThreadLocal;
import com.tanhua.dubbo.server.api.QuanZiApi;
import com.tanhua.dubbo.server.pojo.Publish;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
public class QuanziMQService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Reference(version = "1.0.0")
    private QuanZiApi quanZiApi;

    public Boolean publishMsg(String publishId) {
        return this.sendMsg(publishId, 1);
    }

    public Boolean queryPublishMsg(String publishId) {
        return this.sendMsg(publishId, 2);
    }

    public Boolean likePublishMsg(String publishId) {
        return this.sendMsg(publishId, 3);
    }

    public Boolean disLikePublishMsg(String publishId) {
        return this.sendMsg(publishId, 6);
    }

    public Boolean lovePublishMsg(String publishId) {
        return this.sendMsg(publishId, 4);
    }

    public Boolean disLovePublishMsg(String publishId) {
        return this.sendMsg(publishId, 7);
    }

    public Boolean commentPublishMsg(String publishId) {
        return this.sendMsg(publishId, 5);
    }

    private Boolean sendMsg(String publishId, Integer type) {
        try {
            User user = UserThreadLocal.get();
            Publish publish = this.quanZiApi.queryPublishById(publishId);

            Map<String, Object> msg = new HashMap<>();
            msg.put("userId", user.getId());
            msg.put("date", System.currentTimeMillis());
            msg.put("publishId", publishId);
            msg.put("pid", publish.getPid());
            msg.put("type", type);

            this.rocketMQTemplate.convertAndSend("tanhua-quanzi", msg);
        } catch (Exception e) {
            log.error("Failed to send message! publishId = " + publishId + ", type = " + type, e);
            return false;
        }

        return true;
    }
}

3.4.2 Modifying QuanZiService

In QuanZiService, call the message sending method.

package com.tanhua.server.service;

// ... imports ...

@Service
public class QuanZiService {

    @Reference(version = "1.0.0")
    private QuanZiApi quanZiApi;

    @Reference(version = "1.0.0")
    private VisitorsApi visitorsApi;

    @Autowired
    private UserService userService;

    @Autowired
    private UserInfoService userInfoService;

    @Autowired
    private PicUploadService picUploadService;

    @Autowired
    private QuanziMQService quanziMQService;

    public PageResult queryPublishList(Integer page, Integer pageSize) {
        // Truncated for brevity, but includes calls to quanziMQService for like/dislike/love/comment/view operations
    }

    public String savePublish(String textContent, String location, String latitude, String longitude, MultipartFile[] multipartFile) {
        // ... publishing logic ...
        if (StrUtil.isNotEmpty(publishId)) {
            this.quanziMQService.publishMsg(publishId);
        }
        return publishId;
    }

    public Long likeComment(String publishId) {
        // ... like logic ...
        if (result) {
            this.quanziMQService.likePublishMsg(publishId);
            // ...
        }
        return null;
    }

    public Long disLikeComment(String publishId) {
        // ... dislike logic ...
        if (result) {
            this.quanziMQService.disLikePublishMsg(publishId);
            // ...
        }
        return null;
    }

    public Long loveComment(String publishId) {
        // ... love logic ...
        if (result) {
            this.quanziMQService.lovePublishMsg(publishId);
            // ...
        }
        return null;
    }

    public Long disLoveComment(String publishId) {
        // ... dislove logic ...
        if (result) {
            this.quanziMQService.disLovePublishMsg(publishId);
            // ...
        }
        return null;
    }

    public QuanZiVo queryById(String publishId) {
        Publish publish = this.quanZiApi.queryPublishById(publishId);
        if (publish == null) {
            return null;
        }
        this.quanziMQService.queryPublishMsg(publishId);
        return this.fillQuanZiVo(Arrays.asList(publish)).get(0);
    }

    public Boolean saveComments(String publishId, String content) {
        // ... comment logic ...
        if (result) {
            this.quanziMQService.commentPublishMsg(publishId);
        }
        return result;
    }

    // ... other methods ...
}

3.5 Receiving Messages

The message receiving task needs to be implemented in a new project, my-tanhua-recommend.

3.5.1 Creating the my-tanhua-recommend Project

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>my-tanhua</artifactId>
        <groupId>cn.itcast.tanhua</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>my-tanhua-recommend</artifactId>
    <dependencies>
        <dependency>
            <groupId>cn.itcast.tanhua</groupId>
            <artifactId>my-tanhua-dubbo-interface</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>
    </dependencies>

</project>

3.5.2 Configurasion File

application.properties

spring.application.name = itcast-rocketmq
server.port = 18082

rocketmq.name-server=192.168.31.81:9876
rocketmq.producer.group=tanhua

spring.data.mongodb.username=tanhua
spring.data.mongodb.password=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV
spring.data.mongodb.authentication-database=admin
spring.data.mongodb.database=tanhua
spring.data.mongodb.port=27017
spring.data.mongodb.host=192.168.31.81

3.5.3 Startup Class

package com.tanhua.recommend;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RecommendApplication {

    public static void main(String[] args) {
        SpringApplication.run(RecommendApplication.class, args);
    }
}

3.5.4 RecommendQuanZi Entity

The entity structure to be stored in MongoDB.

package com.tanhua.recommend.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "recommend_quanzi")
public class RecommendQuanZi {

    private ObjectId id;
    private Long userId;
    private Long publishId;
    private Double score;
    private Long date;
}

3.5.5 QuanZiMsgConsumer

package com.tanhua.recommend.msg;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.tanhua.dubbo.server.pojo.Publish;
import com.tanhua.recommend.pojo.RecommendQuanZi;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "tanhua-quanzi", consumerGroup = "tanhua-quanzi-consumer")
@Slf4j
public class QuanZiMsgConsumer implements RocketMQListener<String> {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public void onMessage(String msg) {
        try {
            JSONObject jsonObject = JSONUtil.parseObj(msg);

            Long userId = jsonObject.getLong("userId");
            Long date = jsonObject.getLong("date");
            String publishId = jsonObject.getStr("publishId");
            Long pid = jsonObject.getLong("pid");
            Integer type = jsonObject.getInt("type");

            RecommendQuanZi recommendQuanZi = new RecommendQuanZi();
            recommendQuanZi.setUserId(userId);
            recommendQuanZi.setId(ObjectId.get());
            recommendQuanZi.setDate(date);
            recommendQuanZi.setPublishId(pid);

            // 1-publish, 2-view, 3-like, 4-love, 5-comment, 6-unlike, 7-unlove
            switch (type) {
                case 1: {
                    Publish publish = this.mongoTemplate.findById(new ObjectId(publishId), Publish.class);
                    if (ObjectUtil.isNotEmpty(publish)) {
                        double score = 0d;
                        score += CollUtil.size(publish.getMedias());
                        int length = StrUtil.length(publish.getText());
                        if (length >= 0 && length < 50) {
                            score += 1;
                        } else if (length < 100) {
                            score += 2;
                        } else {
                            score += 3;
                        }
                        recommendQuanZi.setScore(score);
                    }
                    break;
                }
                case 2: {
                    recommendQuanZi.setScore(1d);
                    break;
                }
                case 3: {
                    recommendQuanZi.setScore(5d);
                    break;
                }
                case 4: {
                    recommendQuanZi.setScore(8d);
                    break;
                }
                case 5: {
                    recommendQuanZi.setScore(10d);
                    break;
                }
                case 6: {
                    recommendQuanZi.setScore(-5d);
                    break;
                }
                case 7: {
                    recommendQuanZi.setScore(-8d);
                    break;
                }
                default: {
                    recommendQuanZi.setScore(0d);
                    break;
                }
            }

            this.mongoTemplate.save(recommendQuanZi);
        } catch (Exception e) {
            log.error("Error processing message! msg = " + msg, e);
        }
    }
}

3.6 Testing

Test method: Use the APP to perform operations, and you can see data being written to MongoDB.

4. Deploying the Recommendation System

In the recommendation system, we will calculate the data previously written to the recommendation table using Spark. After the Spark calculation is complete, the results will be written to Redis for querying in the business system.

The recommendation service will be deployed using Docker:

# Pull the image
docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0

# Create the container
docker create --name tanhua-spark-quanzi \
  --env MONGODB_HOST=192.168.31.81 \
  --env MONGODB_PORT=27017 \
  --env MONGODB_USERNAME=tanhua \
  --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
  --env MONGODB_DATABASE=tanhua \
  --env MONGODB_COLLECTION=recommend_quanzi \
  --env SCHEDULE_PERIOD=10 \
  --env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
  registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-quanzi:1.0

# Parameter explanation
# MONGODB_HOST: Address of the MongoDB service
# MONGODB_PORT: Port of the MongoDB service
# MONGODB_USERNAME: Authentication username for MongoDB
# MONGODB_PASSWORD: Authentication password for MongoDB
# MONGODB_DATABASE: Database to connect to in MongoDB
# MONGODB_COLLECTION: Operation table
# SCHEDULE_PERIOD: Interval for next execution in minutes, default is 10 minutes
# REDIS_NODES: Redis cluster address, can also use a single node

# Start the service, which will execute immediately and again after SCHEDULE_PERIOD
# docker start tanhua-spark-quanzi

# View logs
docker logs -f tanhua-spark-quanzi

# After execution, data will be written to Redis

Check if data already exists in Redis:

5. Short Video Recommendation

The implementation logic for short video recommendations is very similar to that of post recommendations.

5.1 Scoring Rules for Videos

  • Publish: +2
  • Like: +5
  • Comment: +10

5.2 Sending Messages

5.2.1 VideoMQService

package com.tanhua.server.service;

import com.alibaba.dubbo.config.annotation.Reference;
import com.tanhua.common.pojo.User;
import com.tanhua.common.utils.UserThreadLocal;
import com.tanhua.dubbo.server.api.VideoApi;
import com.tanhua.dubbo.server.pojo.Video;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
@Slf4j
public class VideoMQService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Reference(version = "1.0.0")
    private VideoApi videoApi;

    public Boolean videoMsg(String videoId) {
        return this.sendMsg(videoId, 1);
    }

    public Boolean likeVideoMsg(String videoId) {
        return this.sendMsg(videoId, 2);
    }

    public Boolean disLikeVideoMsg(String videoId) {
        return this.sendMsg(videoId, 3);
    }

    public Boolean commentVideoMsg(String videoId) {
        return this.sendMsg(videoId, 4);
    }

    private Boolean sendMsg(String videoId, Integer type) {
        try {
            User user = UserThreadLocal.get();
            Video video = this.videoApi.queryVideoById(videoId);

            Map<String, Object> msg = new HashMap<>();
            msg.put("userId", user.getId());
            msg.put("date", System.currentTimeMillis());
            msg.put("videoId", videoId);
            msg.put("vid", video.getVid());
            msg.put("type", type);

            this.rocketMQTemplate.convertAndSend("tanhua-video", msg);
        } catch (Exception e) {
            log.error("Failed to send message! videoId = " + videoId + ", type = " + type, e);
            return false;
        }

        return true;
    }
}

5.2.2 VideoService (Modified)

package com.tanhua.server.service;

// ... imports ...

@Service
@Slf4j
public class VideoService {

    // ... other dependencies ...

    @Autowired
    private VideoMQService videoMQService;

    public Boolean saveVideo(MultipartFile picFile, MultipartFile videoFile) {
        // ... upload logic ...
        if (StrUtil.isNotEmpty(videoId)) {
            this.videoMQService.videoMsg(videoId);
        }
        return StrUtil.isNotEmpty(videoId);
    }

    public Long likeComment(String videoId) {
        // ... like logic ...
        if (result) {
            this.videoMQService.likeVideoMsg(videoId);
            // ...
        }
        return null;
    }

    public Long disLikeComment(String videoId) {
        // ... dislike logic ...
        if (result) {
            this.videoMQService.disLikeVideoMsg(videoId);
            // ...
        }
        return null;
    }

    public Boolean saveComment(String videoId, String content) {
        Boolean result = this.quanZiService.saveComments(videoId, content);
        if (result) {
            this.videoMQService.commentVideoMsg(videoId);
        }
        return result;
    }

    // ... other methods ...
}

5.3 Receiving Messages

5.3.1 RecommendVideo Entity

package com.tanhua.recommend.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "recommend_video")
public class RecommendVideo {

    private ObjectId id;
    private Long userId;
    private Long videoId;
    private Double score;
    private Long date;
}

5.3.2 VideoMsgConsumer

package com.tanhua.recommend.msg;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.tanhua.recommend.pojo.RecommendVideo;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.bson.types.ObjectId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "tanhua-video", consumerGroup = "tanhua-video-consumer")
@Slf4j
public class VideoMsgConsumer implements RocketMQListener<String> {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Override
    public void onMessage(String msg) {
        try {
            JSONObject jsonObject = JSONUtil.parseObj(msg);
            Long userId = jsonObject.getLong("userId");
            Long vid = jsonObject.getLong("vid");
            Integer type = jsonObject.getInt("type");

            // 1-publish, 2-like, 3-unlike, 4-comment
            RecommendVideo recommendVideo = new RecommendVideo();
            recommendVideo.setUserId(userId);
            recommendVideo.setId(ObjectId.get());
            recommendVideo.setDate(System.currentTimeMillis());
            recommendVideo.setVideoId(vid);

            switch (type) {
                case 1: {
                    recommendVideo.setScore(2d);
                    break;
                }
                case 2: {
                    recommendVideo.setScore(5d);
                    break;
                }
                case 3: {
                    recommendVideo.setScore(-5d);
                    break;
                }
                case 4: {
                    recommendVideo.setScore(10d);
                    break;
                }
                default: {
                    recommendVideo.setScore(0d);
                    break;
                }
            }

            this.mongoTemplate.save(recommendVideo);

        } catch (Exception e) {
            log.error("Failed to process short video message~" + msg, e);
        }
    }
}

5.3.3 Testing

As shown, User 1 has operations like like, unlike, and comment on the video.

5.4 Deploying the Recommendation Service

# Pull the image
docker pull registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0

# Create the container
docker create --name tanhua-spark-video \
  --env MONGODB_HOST=192.168.31.81 \
  --env MONGODB_PORT=27017 \
  --env MONGODB_USERNAME=tanhua \
  --env MONGODB_PASSWORD=l3SCjl0HvmSkTtiSbN0Swv40spYnHhDV \
  --env MONGODB_DATABASE=tanhua \
  --env MONGODB_COLLECTION=recommend_video \
  --env SCHEDULE_PERIOD=10 \
  --env REDIS_NODES="192.168.31.81:6379,192.168.31.81:6380,192.168.31.81:6381" \
  registry.cn-hangzhou.aliyuncs.com/itcast/tanhua-spark-video:1.0

# Start the service
docker start tanhua-spark-video

# View logs
docker logs -f tanhua-spark-video

Testing:

Related Articles

Designing Alertmanager Templates for Prometheus Notifications

How to craft Alertmanager templates to format alert messages, improving clarity and presentation. Alertmanager uses Go’s text/template engine with additional helper functions. Alerting rules referenc...

Deploying a Maven Web Application to Tomcat 9 Using the Tomcat Manager

Tomcat 9 does not provide a dedicated Maven plugin. The Tomcat Manager interface, however, is backward-compatible, so the Tomcat 7 Maven Plugin can be used to deploy to Tomcat 9. This guide shows two...

Skipping Errors in MySQL Asynchronous Replication

When a replica halts because the SQL thread encounters an error, you can resume replication by skipping the problematic event(s). Two common approaches are available. Methods to Skip Errors 1) Skip a...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.