Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
R
rocketmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
xrp
rocketmq
Commits
6fcfa643
Commit
6fcfa643
authored
Apr 19, 2021
by
xrp
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1
parents
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
493 additions
and
0 deletions
+493
-0
.gitignore
.gitignore
+33
-0
pom.xml
pom.xml
+71
-0
RocketmqApplication.java
src/main/java/com/example/demo/RocketmqApplication.java
+13
-0
HelloController.java
...ain/java/com/example/demo/controller/HelloController.java
+240
-0
Reliability.java
src/main/java/com/example/demo/controller/Reliability.java
+122
-0
application.properties
src/main/resources/application.properties
+1
-0
RocketmqApplicationTests.java
src/test/java/com/example/demo/RocketmqApplicationTests.java
+13
-0
No files found.
.gitignore
0 → 100644
View file @
6fcfa643
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
pom.xml
0 → 100644
View file @
6fcfa643
<?xml version="1.0" encoding="UTF-8"?>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0.0
</modelVersion>
<parent>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-parent
</artifactId>
<version>
2.4.4
</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
<groupId>
com.example
</groupId>
<artifactId>
rocketmq
</artifactId>
<version>
0.0.1-SNAPSHOT
</version>
<packaging>
jar
</packaging>
<name>
rocketmq
</name>
<description>
Demo project for Spring Boot
</description>
<properties>
<java.version>
1.8
</java.version>
</properties>
<dependencies>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-web
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-test
</artifactId>
<scope>
test
</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-client
</artifactId>
<version>
4.6.1
</version>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
<groupId>
joda-time
</groupId>
<artifactId>
joda-time
</artifactId>
<version>
2.10
</version>
</dependency>
<!--一个好用的工具包,可以不引入-->
<!-- <dependency>-->
<!-- <groupId>cn.hutool</groupId>-->
<!-- <artifactId>hutool-all</artifactId>-->
<!-- <version>5.3.0</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
<dependency>
<groupId>
cn.hutool
</groupId>
<artifactId>
hutool-all
</artifactId>
<version>
5.5.4
</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-maven-plugin
</artifactId>
</plugin>
</plugins>
</build>
</project>
src/main/java/com/example/demo/RocketmqApplication.java
0 → 100644
View file @
6fcfa643
package
com
.
example
.
demo
;
import
org.springframework.boot.SpringApplication
;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
@SpringBootApplication
public
class
RocketmqApplication
{
public
static
void
main
(
String
[]
args
)
{
SpringApplication
.
run
(
RocketmqApplication
.
class
,
args
);
}
}
src/main/java/com/example/demo/controller/HelloController.java
0 → 100644
View file @
6fcfa643
package
com
.
example
.
demo
.
controller
;
import
org.apache.rocketmq.client.producer.SendCallback
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.joda.time.DateTime
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RestController
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
java.util.List
;
import
java.util.concurrent.Semaphore
;
import
static
com
.
example
.
demo
.
controller
.
Reliability
.*;
@RestController
public
class
HelloController
{
/**
* http://localhost:8080/hello
* @return
*/
@RequestMapping
(
"/hello"
)
public
String
index
()
{
return
"Hello World"
;
}
private
static
int
count
=
100
;
/**
* http://localhost:8080/send
* http://122.112.203.214:8888/send
* 发送MQ 消息
* @return
*/
@RequestMapping
(
"/send"
)
public
void
send
(){
// 设置生产者组名
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"Producer"
);
// 指定nameServer的地址
producer
.
setNamesrvAddr
(
"172.16.11.248:8100"
);
// producer.setNamesrvAddr("172.16.11.248:8100;172.16.10.101:8100");
try
{
// 启动实例
producer
.
start
();
String
str
=
"1 Just for test."
;
Message
msg
=
new
Message
(
"miyapaytest"
,
"push"
,
"1"
,
str
.
getBytes
());
SendResult
result
=
producer
.
send
(
msg
);
System
.
out
.
println
(
"id:"
+
result
.
getMsgId
()
+
"msg:"
+
str
+
" result:"
+
result
.
getSendStatus
());
// msg = new Message("miyapaytest",
// "push",
// "2",
// "2 Just for test.".getBytes());
//
// result = producer.send(msg);
// System.out.println("id:" + result.getMsgId() +
// " result:" + result.getSendStatus());
//
// msg = new Message("miyapaytest",
// "push",
// "1",
// "3 Just for test.".getBytes());
result
=
producer
.
send
(
msg
);
System
.
out
.
println
(
"id:"
+
result
.
getMsgId
()
+
" "
+
result
.
getMessageQueue
().
toString
()
+
" result:"
+
result
.
getSendStatus
());
// final Semaphore semaphore = new Semaphore(0);
// for (int i = 0; i < count; i++) {
// Thread.sleep(3000);
// Message message = new Message("miyapaytest",
// "test_tag",
// ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//
// producer.send(message, new SendCallback() {
// public void onSuccess(SendResult sendResult) {
// System.out.println(String.format("message [%s] send success!", new String(message.getBody())));
// semaphore.release();
// }
//
// public void onException(Throwable throwable) {
// throwable.printStackTrace();
// }
// });
//
// }
// semaphore.acquire(count);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
//关闭生产者,释放资源
producer
.
shutdown
();
}
}
/**
* http://localhost:8080/consumer3
* http://122.112.203.214:8888/consumer3
* 消费消息
* @return
* Consumer类,消费者,接受处理消息
*/
@RequestMapping
(
"/consumer3"
)
public
String
consumer3
(){
//设置消费者组名
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"miyapaytest_consumer"
);
//指定nameServer的地址
consumer
.
setNamesrvAddr
(
"172.16.11.248:8100"
);
try
{
//订阅PushTopic下Tag为push的消息 //指定订阅的topic及tag表达式
// consumer.subscribe("miyapaytest", "push");
// consumer.subscribe("miyapaytest", "*");
consumer
.
subscribe
(
"miyapaytest"
,
"push"
);
//程序第一次启动从消息队列头取数据
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
list
,
ConsumeConcurrentlyContext
Context
)
{
Message
msg
=
list
.
get
(
0
);
// System.out.println(msg.toString());
String
topic
=
msg
.
getTopic
();
// System.out.println("topic = " + topic);
byte
[]
body
=
msg
.
getBody
();
// System.out.println("body: " + new String(body));
String
keys
=
msg
.
getKeys
();
// System.out.println("keys = " + keys);
String
tags
=
msg
.
getTags
();
// System.out.println("tags = " + tags);
System
.
out
.
println
(
"topic = "
+
topic
+
"body: "
+
new
String
(
body
)
+
"keys = "
+
keys
+
"tags = "
+
tags
+
new
DateTime
().
toString
(
"yyyy-MM-dd HH:mm:ss"
));
System
.
out
.
println
(
"-----------------------------------------------"
+
new
DateTime
().
toString
(
"yyyy-MM-dd HH:mm:ss"
));
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
return
ConsumeConcurrentlyStatus
.
RECONSUME_LATER
;
}
}
);
//启动消费者实例
consumer
.
start
();
System
.
out
.
println
(
"Consumer Started."
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
"ok"
;
}
/**
* https://my.oschina.net/buru1kan/blog/1806173
*
*/
@RequestMapping
(
"/consumer2"
)
public
void
consumer2
(){
try
{
//设置消费者组名
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"ConsumerGroupName"
);
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
//指定nameServer的地址
consumer
.
setNamesrvAddr
(
"172.16.11.248:8100"
);
//指定订阅的topic及tag表达式
// consumer.subscribe("miyapaytest", "*");
consumer
.
subscribe
(
"miyapaytest_consumer"
,
"*"
);
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
msgs
,
ConsumeConcurrentlyContext
context
)
{
MessageExt
messageExt
=
msgs
.
get
(
0
);
System
.
out
.
println
(
String
.
format
(
"Custome message [%s],tagName[%s]"
,
new
String
(
messageExt
.
getBody
()),
messageExt
.
getTags
()));
return
ConsumeConcurrentlyStatus
.
CONSUME_SUCCESS
;
}
});
//启动消费者实例
consumer
.
start
();
System
.
out
.
println
(
"Consumer Started."
);
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
}
/**
* http://localhost:8080/produce
* http://122.112.203.214:8888/produce
*/
@RequestMapping
(
"/produce"
)
public
String
test1
(){
try
{
produce
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
"ok"
;
}
/**
* http://localhost:8080/consume
* http://122.112.203.214:8888/consume
*/
@RequestMapping
(
"/consume"
)
public
String
test2
(){
try
{
consume
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
"ok"
;
}
/**
* http://122.112.203.214:8888/PullConsumer
*/
@RequestMapping
(
"/PullConsumer"
)
public
String
PullConsumer1
(){
try
{
PullConsumer
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
return
"ok"
;
}
}
\ No newline at end of file
src/main/java/com/example/demo/controller/Reliability.java
0 → 100644
View file @
6fcfa643
package
com
.
example
.
demo
.
controller
;
import
cn.hutool.core.collection.CollUtil
;
import
cn.hutool.core.lang.Console
;
import
org.apache.rocketmq.client.consumer.DefaultLitePullConsumer
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
java.nio.charset.StandardCharsets
;
import
java.util.List
;
/**
* @author xrp
* @date 2021/4/14 9:39
*/
public
class
Reliability
{
// private static String topicName = "test"; // topic名
// private static String group = "test-group"; // 消费组名
// private static String namesrvAddress = "100.85.124.123:8600;100.85.126.201:8600"; // 元数据链接地址
private
static
String
topicName
=
"miyapaytest"
;
// topic名
private
static
String
group
=
"miyapaytest_consumer"
;
// 消费组名
private
static
String
namesrvAddress
=
"172.16.11.248:8100;172.16.10.101:8100"
;
// 元数据链接地址
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// produce();
// consume();
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
System
.
out
.
println
(
String
.
format
(
"message-%s-%d"
,
System
.
currentTimeMillis
(),
i
));
}
}
public
static
void
consume
()
throws
MQClientException
{
DefaultLitePullConsumer
consumer
=
new
DefaultLitePullConsumer
(
group
);
consumer
.
setAutoCommit
(
true
);
consumer
.
setNamesrvAddr
(
namesrvAddress
);
consumer
.
setInstanceName
(
"Test"
+
System
.
currentTimeMillis
());
consumer
.
subscribe
(
topicName
,
"*"
);
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
consumer
.
setAutoCommit
(
true
);
consumer
.
start
();
try
{
for
(
int
i
=
0
;
i
<
20
;
i
++)
{
List
<
MessageExt
>
messages
=
consumer
.
poll
();
messages
.
get
(
i
).
getBody
().
toString
();
System
.
out
.
printf
(
"%s%n"
,
messages
);
}
}
finally
{
consumer
.
shutdown
();
}
}
private
static
boolean
runFlag
=
true
;
public
static
void
PullConsumer
()
throws
MQClientException
{
DefaultLitePullConsumer
consumer
=
new
DefaultLitePullConsumer
(
group
);
consumer
.
setAutoCommit
(
true
);
consumer
.
setNamesrvAddr
(
namesrvAddress
);
consumer
.
setInstanceName
(
"Test"
+
System
.
currentTimeMillis
());
//要消费的topic,可使用tag进行简单过滤
consumer
.
subscribe
(
topicName
,
"*"
);
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
//设置是否自动提交offset,默认值是true
consumer
.
setAutoCommit
(
false
);
// //一次最大消费的条数
// consumer.setPullBatchSize(100);
// //无消息时,最大阻塞时间。默认5000 单位ms
// consumer.setPollTimeoutMillis(5000);
consumer
.
start
();
try
{
while
(
runFlag
){
try
{
//拉取消息,无消息时会阻塞
List
<
MessageExt
>
msgs
=
consumer
.
poll
();
if
(
CollUtil
.
isEmpty
(
msgs
)){
continue
;
}
//业务处理
msgs
.
forEach
(
msg
->
Console
.
log
(
new
String
(
msg
.
getBody
())));
//同步消费位置。不执行该方法,应用重启会存在重复消费。
//由于前面调用setAutoCommit方法将自动提交位点属性设置为false,
// 所以这里调用commitSync将消费位点提交到内存中的offsetstore,最终会通过定时任务将消费位点提交给broker
consumer
.
commitSync
();
/**
DefaultLitePullConsumer的实现提供了以下特性:
订阅方式消费消息支持消息队列负载均衡
分配方式消费消息支持收到分配消息队列,此时不支持负载均衡
提供了seek方法方便用户重置消费位点
提供commitSync方法方便用户手动提交消费位点
*/
}
catch
(
Exception
e
){
e
.
printStackTrace
();
}
}
}
finally
{
consumer
.
shutdown
();
}
}
public
static
void
produce
()
throws
Exception
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"Test"
+
System
.
currentTimeMillis
());
producer
.
setNamesrvAddr
(
namesrvAddress
);
producer
.
start
();
Message
message
;
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
message
=
new
Message
(
topicName
,
String
.
format
(
"message-%s-%d"
,
System
.
currentTimeMillis
(),
i
).
getBytes
(
StandardCharsets
.
UTF_8
));
System
.
out
.
println
(
producer
.
send
(
message
));
}
producer
.
shutdown
();
}
}
src/main/resources/application.properties
0 → 100644
View file @
6fcfa643
src/test/java/com/example/demo/RocketmqApplicationTests.java
0 → 100644
View file @
6fcfa643
package
com
.
example
.
demo
;
import
org.junit.jupiter.api.Test
;
import
org.springframework.boot.test.context.SpringBootTest
;
@SpringBootTest
class
RocketmqApplicationTests
{
@Test
void
contextLoads
()
{
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment