Environment: Spring Boot (org.springframework.boot) 3.0.9, Spring Cloud 2022.0.4, Spring Cloud Alibaba 2022.0.0.0, RocketMQ 4.9.2, using spring-cloud-starter-stream-rocketmq.
server上topic可以看见消息,消费者列表也有数据。但是一直无法消费,broker和server也没看见报错,java代码也没报错。 尝试和spring-cloud-stream4.x各种消费方法都无法消费(实在难以相信我写错了消费方法)。 最后我使用了其他的代码,降低了spring-cloud-stream 版本 到 3.x,使用@StreamListener注解消费成功。 请问 我要怎么样才可以在4.x上消费成功?
(The topic on the server shows the messages, and the consumer list also has data. However, the messages are never consumed: neither the broker nor the server shows any error, and the Java code reports no error either. I tried the various consumption approaches of Spring Cloud Stream 4.x and none of them could consume the messages (I find it hard to believe that I wrote the consumer method incorrectly). In the end I used other code, downgraded Spring Cloud Stream to 3.x, and consumed successfully with the @StreamListener annotation. How can I consume successfully on 4.x?)
Please help.
config:
spring:
  cloud:
    function:
      # Function definition: declares the function names (consumers only). Each
      # name corresponds to a @Bean method name; all function names are declared
      # here.
      # Do not comment this out: if it is commented out, the consumer groups no
      # longer show up. sendSmsToAdmin is defined in the service.
      definition: sendSmsToAdmin;sendSmsToUser  # ;sendSmsToAdminFunction;sendSmsToUserFunction
    stream:
      # Default binder
      default-binder: rocketmq
      # What is declared here is either a consumer class or several methods of a
      # consumer class.
      # NOTE(review): this repeats spring.cloud.function.definition above; the
      # stream-level copy looks redundant on Spring Cloud Stream 4.x - confirm
      # whether it can be dropped.
      function:
        # Function definition for stream bindings: the function names in use must
        # also be declared here; make sure the consumer names match. Separate
        # multiple names with ';'.
        definition: sendSmsToAdmin;sendSmsToUser  # ;sendSmsToAdminFunction;sendSmsToUserFunction
      rocketmq:
        # Under stream.rocketmq.bindings only the RocketMQ-specific binding
        # parameters need to be configured, e.g. detailed producer/consumer
        # settings. destination is not configured under stream.rocketmq.bindings;
        # it is placed under stream.bindings instead, which keeps it generic and
        # easier to read.
        binder:
          name-server: centos8-2:9876
          group: producerGroup
        # Binding configuration that binds RocketMQ as the message middleware
        # bindings:
        #   # A producer must have a group
        #   # Sent to the administrator
        #   sendSmsToAdmin-out-0:
        #     # Bindings configured under rocketmq must not be configured again in the bindings below
        #     # destination is not configured here; it is configured on the matching
        #     # sendSmsToAdmin-out-0 entry under stream.bindings
        #     # destination: admin_sms_send
        #     producer:
        #       # group must not contain a hyphen (-)
        #       group: producer_admin_sms_send_group
        #       sync: true
        #       sendMessageTimeout: 3000
        #   # Sent to the user
        #   sendSmsToUser-out-0:
        #     # destination: user_sms_send
        #     producer:
        #       group: producer_user_sms_send_group
        #       sync: true
        #       sendMessageTimeout: 3000
        #
        #   # The two above are sent with the utility class; the ones below are sent via a Bean
        #   sendSmsToAdminFunction-out-0:
        #     producer:
        #       # group must not contain a hyphen (-)
        #       group: producer_function_admin_sms_send_group
        #       sync: true
        #       sendMessageTimeout: 3000
        #   sendSmsToUserFunction-out-0:
        #     producer:
        #       # group must not contain a hyphen (-)
        #       group: producer_function_user_sms_send_group
        #       sync: true
        #       sendMessageTimeout: 3000
        #     consumer:
        #       messageModel: BROADCASTING  # set to broadcasting consumption
      # Binding configuration for generic message middleware, not specific to any
      # middleware implementation. Bindings configured under rocketmq must not be
      # configured again in the bindings below.
      bindings:
        # Producer topic configuration; group must not contain a hyphen (-).
        # NOTE(review): `group` is documented as a consumer binding property and
        # is presumably ignored on -out- bindings - confirm.
        sendSmsToAdmin-out-0:
          destination: admin_sms_send
          group: producer_admin_sms_send_group
          binder: rocketmq
          content-type: application/json
        sendSmsToAdmin-in-0:
          destination: admin_sms_send
          # Consumer group. Previously this reused the producer-named group
          # (producer_admin_sms_send_group); it now follows the same consumer_*
          # naming as sendSmsToUser-in-0 so producer and consumer groups stay
          # distinct.
          group: consumer_admin_sms_send_group
          binder: rocketmq
          content-type: application/json
        # Producer topic configuration
        sendSmsToUser-out-0:
          destination: user_sms_send
          group: producer_user_sms_send_group
          binder: rocketmq
          content-type: application/json
        sendSmsToUser-in-0:
          destination: user_sms_send
          group: consumer_user_sms_send_group
          binder: rocketmq
          content-type: application/json
producer:
// StreamBridge injected by Spring; the send methods below use it to publish
// messages to an output binding selected by name.
@Autowired
private StreamBridge streamBridge;
/**
 * Publishes an SMS message to the {@code sendSmsToAdmin-out-0} output binding.
 * Like sendSmsToAdminFunction, this method only sends a message.
 *
 * @param message the SMS payload to publish
 */
public void sendSmsToAdmin(SmsSendMessage message) {
    log.info("要发送的短信内容为: {}", message);
    // StreamBridge.send reports the outcome through its boolean result; surface
    // a failed send instead of silently dropping it.
    boolean sent = streamBridge.send("sendSmsToAdmin-out-0", message);
    if (!sent) {
        log.warn("Message was not sent to binding {}", "sendSmsToAdmin-out-0");
    }
}
/**
 * Publishes a text payload of the form {@code userId:<id> accountId:<id>} to the
 * {@code sendSmsToUser-out-0} output binding.
 *
 * @param userId    id of the user, embedded in the payload
 * @param accountId id of the account, embedded in the payload
 */
public void sendSmsToUser(Long userId, Long accountId) {
    // Build the payload once so the logged text is exactly what is sent
    // (the log line previously omitted the space before "accountId:").
    String payload = "userId:" + userId + " accountId:" + accountId;
    log.info("要发送的短信内容为: {}", payload);
    // StreamBridge.send reports the outcome through its boolean result; surface
    // a failed send instead of silently dropping it.
    boolean sent = streamBridge.send("sendSmsToUser-out-0", payload);
    if (!sent) {
        log.warn("Message was not sent to binding {}", "sendSmsToUser-out-0");
    }
}
consumer:
/**
 * Consumer bean that logs the content of each received {@link SmsSendMessage}.
 *
 * @return a consumer that writes {@code request.getContent()} to the log
 */
@Bean
public Consumer<SmsSendMessage> sendSmsToAdmin() {
    return request -> log.info("------received-------: {} ", request.getContent());
}
//
//
// @Bean
// public Consumer<Object> sendSmsToAdmin() {
// return request -> {
// log.info("------received-------: {} ", request);
// };
// }
//
/**
 * Consumer bean that logs each received text payload.
 *
 * @return a consumer that writes the received string to the log
 */
@Bean
public Consumer<String> sendSmsToUser() {
    return request -> log.info("-----received-------: {}", request);
}
pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Coordinates of this module -->
<groupId>org.example</groupId>
<artifactId>mqtest</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Inherits from spring-boot-starter-parent 3.0.9 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.9</version>
</parent>
<properties>
    <jeecgboot.version>3.5.3</jeecgboot.version>
    <java.version>17</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <!-- Microservices -->
    <spring-cloud.version>2022.0.4</spring-cloud.version>
    <spring-cloud-alibaba.version>2022.0.0.0</spring-cloud-alibaba.version>
    <alibaba.nacos.version>2.0.4</alibaba.nacos.version>

    <xxl-job-core.version>2.4.0</xxl-job-core.version>
    <fastjson.version>1.2.83</fastjson.version>
    <guava.version>32.1.1-jre</guava.version>
    <lombok.version>1.18.28</lombok.version>
    <!-- A Java 1.6+ library providing a clean and lightweight markdown processor -->
    <pegdown.version>1.6.0</pegdown.version>
    <knife4j-spring-boot-starter.version>3.0.3</knife4j-spring-boot-starter.version>
    <knife4j-spring-ui.version>2.0.9</knife4j-spring-ui.version>

    <!-- Database drivers -->
    <postgresql.version>42.2.25</postgresql.version>
    <ojdbc6.version>11.2.0.3</ojdbc6.version>
    <sqljdbc4.version>4.0</sqljdbc4.version>
    <mysql-connector-java.version>8.0.27</mysql-connector-java.version>

    <hutool.version>5.8.20</hutool.version>
    <commons-collection.version>4.4</commons-collection.version>
    <snakeyaml.version>2.0</snakeyaml.version>

    <!-- Persistence layer -->
    <mybatis-plus.version>3.5.3.1</mybatis-plus.version>
    <dynamic-datasource-spring-boot-starter.version>4.1.2</dynamic-datasource-spring-boot-starter.version>
    <druid.version>1.2.18</druid.version>
    <minidao.version>1.9.1</minidao.version>

    <!-- JimuReport -->
    <jimureport-spring-boot-starter.version>1.5.9</jimureport-spring-boot-starter.version>
    <commons.version>2.6</commons.version>
    <aliyun-java-sdk-dysmsapi.version>2.1.0</aliyun-java-sdk-dysmsapi.version>
    <aliyun.oss.version>3.11.2</aliyun.oss.version>

    <!--##### Cache related #####-->
    <j2cache.version>2.8.0-release</j2cache.version>
    <lettuce.version>5.3.7.RELEASE</lettuce.version>
    <!-- Distributed lock, matching the Spring Boot 2.0 version -->
    <redisson.version>3.12.5</redisson.version>
    <!-- Must match the Netty version used by Lettuce in Spring Redis, otherwise they conflict -->
    <netty.version>4.1.96.Final</netty.version>

    <commons-lang3.version>3.12.0</commons-lang3.version>
    <commons-codec.version>1.15</commons-codec.version>
    <commons-net.version>3.8.0</commons-net.version>
    <dom4j.version>1.6.1</dom4j.version>
    <!-- Reflection -->
    <reflection.version>0.9.11</reflection.version>
    <jsoup.version>1.13.1</jsoup.version>
    <useragentutils.version>1.21</useragentutils.version>

    <!-- shiro -->
    <shiro.version>1.10.1</shiro.version>
    <java-jwt.version>3.11.0</java-jwt.version>
    <shiro-redis.version>3.1.0</shiro-redis.version>

    <codegenerate.version>1.4.3</codegenerate.version>
    <autopoi-web.version>1.4.5</autopoi-web.version>
    <minio.version>8.0.3</minio.version>
    <justauth-spring-boot-starter.version>1.4.0</justauth-spring-boot-starter.version>
    <qiniu-java-sdk.version>7.11.0</qiniu-java-sdk.version>

    <!-- Log4j2 critical vulnerability -->
    <log4j2.version>2.17.0</log4j2.version>
    <!--
      With spring-boot-starter-parent this property overrides the Logback version
      managed by Spring Boot. It was 1.2.9, a 1.2.x line, while this POM pins
      logback-classic/logback-core to 1.4.7; keep the property on the same version.
    -->
    <logback.version>1.4.7</logback.version>

    <maven.test.skip>true</maven.test.skip>

    <docker-maven-plugin.version>1.2.0</docker-maven-plugin.version>
    <!-- The build targets Java 17, so the runtime image must be a Java 17 JRE (was openjdk:8-jre-alpine) -->
    <docker.baseImage>eclipse-temurin:17-jre-alpine</docker.baseImage>
    <docker.volumes>/tmp</docker.volumes>
    <docker.image.prefix>hub.mall.com:8080/mallcloud</docker.image.prefix>
    <docker.java.security.egd>-Djava.security.egd=file:/dev/./urandom</docker.java.security.egd>
    <docker.java.opts>-Xms128m -Xmx128m</docker.java.opts>
</properties>
<!-- Aliyun public repository; snapshot resolution is disabled -->
<repositories>
    <repository>
        <id>aliyun</id>
        <name>aliyun Repository</name>
        <url>https://maven.aliyun.com/repository/public</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
<dependencies>
    <!--
      Test support, test scope only. A second, unscoped declaration of this same
      artifact further down has been removed: duplicate declarations trigger a
      Maven model warning and the unscoped one put the test libraries on the
      compile classpath.
    -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- json -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Logback pinned explicitly to 1.4.7 -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.7</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.4.7</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- RocketMQ binder for Spring Cloud Stream; commons-collections is excluded -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <exclusions>
            <exclusion>
                <groupId>commons-collections</groupId>
                <artifactId>commons-collections</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
    </dependency>
    <!-- <dependency>-->
    <!--     <groupId>org.springframework.cloud</groupId>-->
    <!--     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>-->
    <!-- </dependency>-->
    <!-- <dependency>-->
    <!--     <groupId>org.springframework.cloud</groupId>-->
    <!--     <artifactId>spring-cloud-starter-stream-kafka</artifactId>-->
    <!-- </dependency>-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
    </dependency>
</dependencies>
<!-- Imported BOMs and centrally managed dependency versions -->
<dependencyManagement>
    <dependencies>
        <!-- spring-cloud -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <!-- spring-cloud-alibaba -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Apache utility libraries -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>${commons-collection.version}</version>
        </dependency>
        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>${snakeyaml.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
<build>
    <plugins>
        <!-- JDK version used for compilation -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>17</source>
                <target>17</target>
                <compilerVersion>17</compilerVersion>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Skip tests when packaging -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <configuration>
                <skipTests>true</skipTests>
            </configuration>
        </plugin>
        <!-- Exclude font files from resource filtering so their binary content is not corrupted -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-resources-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <nonFilteredFileExtensions>
                    <nonFilteredFileExtension>woff</nonFilteredFileExtension>
                    <nonFilteredFileExtension>woff2</nonFilteredFileExtension>
                    <nonFilteredFileExtension>eot</nonFilteredFileExtension>
                    <nonFilteredFileExtension>ttf</nonFilteredFileExtension>
                    <nonFilteredFileExtension>svg</nonFilteredFileExtension>
                </nonFilteredFileExtensions>
            </configuration>
        </plugin>
    </plugins>
    <resources>
        <!-- Main resources, with property filtering enabled -->
        <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
        </resource>
        <!-- Also package XML, JSON and FTL files that live next to the Java sources -->
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.xml</include>
                <include>**/*.json</include>
                <include>**/*.ftl</include>
            </includes>
        </resource>
    </resources>
</build>
</project>