Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
R
rocketmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
xrp
rocketmq
Commits
5ed30181
Commit
5ed30181
authored
Apr 29, 2021
by
mm
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
demo
parent
3288397b
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
42 additions
and
14 deletions
+42
-14
TestController.java
...main/java/com/example/demo/controller/TestController.java
+9
-7
ConsumerConfiguration.java
src/main/java/com/example/demo/mq/ConsumerConfiguration.java
+3
-3
LedgerConsumerListener.java
.../com/example/demo/mq/listener/LedgerConsumerListener.java
+2
-2
LedgerMQProducer.java
...n/java/com/example/demo/mq/producer/LedgerMQProducer.java
+28
-2
No files found.
src/main/java/com/example/demo/controller/TestController.java
View file @
5ed30181
package
com
.
example
.
demo
.
controller
;
package
com
.
example
.
demo
.
controller
;
import
com.example.demo.mq.ConsumerConfiguration
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.example.demo.mq.producer.LedgerMQProducer
;
import
com.example.demo.mq.producer.LedgerMQProducer
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
@@ -20,12 +21,13 @@ public class TestController {
...
@@ -20,12 +21,13 @@ public class TestController {
@PostMapping
(
"/mmsend"
)
@PostMapping
(
"/mmsend"
)
public
String
test
(
@RequestBody
String
payload
)
throws
MQClientException
{
public
String
test
(
@RequestBody
String
payload
)
throws
MQClientException
{
return
ledgerMQProducer
.
sendMessage
(
payload
);
JSONObject
jsonObject
=
JSON
.
parseObject
(
payload
);
int
level
=
jsonObject
.
getIntValue
(
"level"
);
return
ledgerMQProducer
.
sendMessage
(
payload
,
level
);
}
}
@PostMapping
(
"/mmsub"
)
// @PostMapping("/mmsend1")
public
String
test1
(
@RequestBody
String
payload
)
throws
MQClientException
{
// public String test1(@RequestBody String payload) throws MQClientException {
new
ConsumerConfiguration
();
// return ledgerMQProducer.sendMessage1(payload);
return
"success"
;
// }
}
}
}
src/main/java/com/example/demo/mq/ConsumerConfiguration.java
View file @
5ed30181
...
@@ -9,7 +9,6 @@ import lombok.extern.slf4j.Slf4j;
...
@@ -9,7 +9,6 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
...
@@ -21,6 +20,7 @@ public class ConsumerConfiguration {
...
@@ -21,6 +20,7 @@ public class ConsumerConfiguration {
@Autowired
@Autowired
private
LedgerConsumerListener
ledgerConsumerListener
;
private
LedgerConsumerListener
ledgerConsumerListener
;
/**
/**
* 每配置一个代表一个consumer,可以配置多个
* 每配置一个代表一个consumer,可以配置多个
*
*
...
@@ -33,7 +33,7 @@ public class ConsumerConfiguration {
...
@@ -33,7 +33,7 @@ public class ConsumerConfiguration {
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_LAST_OFFSET
);
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_LAST_OFFSET
);
//订阅主题和 标签( * 代表所有标签)下信息
//订阅主题和 标签( * 代表所有标签)下信息
consumer
.
subscribe
(
"miyapaytest"
,
"
*
"
);
consumer
.
subscribe
(
"miyapaytest"
,
"
tag
"
);
//设置多线程消费
//设置多线程消费
/**
/**
...
@@ -59,7 +59,7 @@ public class ConsumerConfiguration {
...
@@ -59,7 +59,7 @@ public class ConsumerConfiguration {
consumer
.
registerMessageListener
(
ledgerConsumerListener
);
consumer
.
registerMessageListener
(
ledgerConsumerListener
);
log
.
info
(
"消费者启动成功"
);
log
.
info
(
"
ledger
消费者启动成功"
);
return
consumer
;
return
consumer
;
}
}
...
...
src/main/java/com/example/demo/mq/listener/LedgerConsumerListener.java
View file @
5ed30181
...
@@ -23,12 +23,12 @@ public class LedgerConsumerListener implements MessageListenerConcurrently {
...
@@ -23,12 +23,12 @@ public class LedgerConsumerListener implements MessageListenerConcurrently {
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
list
,
ConsumeConcurrentlyContext
consumeConcurrentlyContext
)
{
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
list
,
ConsumeConcurrentlyContext
consumeConcurrentlyContext
)
{
for
(
MessageExt
msg
:
list
)
{
for
(
MessageExt
msg
:
list
)
{
try
{
try
{
log
.
info
(
"
消费次数: {}, 内容: {}"
,
msg
.
getReconsumeTimes
(),
new
String
(
msg
.
getBody
(),
"UTF-8"
));
log
.
info
(
"
ledger 消费次数: {}, 内容: {}, tag:{}"
,
msg
.
getReconsumeTimes
(),
new
String
(
msg
.
getBody
(),
"UTF-8"
),
msg
.
getTags
(
));
}
catch
(
UnsupportedEncodingException
e
)
{
}
catch
(
UnsupportedEncodingException
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
return
ConsumeConcurrentlyStatus
.
RECONSUME_LATER
;
return
ConsumeConcurrentlyStatus
.
RECONSUME_LATER
;
}
}
}
}
return
ConsumeConcurrentlyStatus
.
RECONSUME_LATER
;
return
ConsumeConcurrentlyStatus
.
CONSUME_SUCCESS
;
}
}
}
}
src/main/java/com/example/demo/mq/producer/LedgerMQProducer.java
View file @
5ed30181
...
@@ -19,13 +19,39 @@ public class LedgerMQProducer {
...
@@ -19,13 +19,39 @@ public class LedgerMQProducer {
@Resource
@Resource
private
DefaultMQProducer
ledgerProducer
;
private
DefaultMQProducer
ledgerProducer
;
@Async
@Async
public
String
sendMessage
(
String
message
)
throws
MQClientException
{
public
String
sendMessage
(
String
message
,
int
level
)
throws
MQClientException
{
// DefaultMQProducer ledgerProducer = new DefaultMQProducer("Producer_mm");
// DefaultMQProducer ledgerProducer = new DefaultMQProducer("Producer_mm");
// ledgerProducer.setNamesrvAddr("172.16.10.101:8100");
// ledgerProducer.setNamesrvAddr("172.16.10.101:8100");
// ledgerProducer.start();
// ledgerProducer.start();
Message
msg
=
new
Message
(
Message
msg
=
new
Message
(
"miyapaytest"
,
"miyapaytest"
,
"push"
,
"tag"
,
UUID
.
randomUUID
().
toString
(),
message
.
getBytes
());
msg
.
setDelayTimeLevel
(
level
);
try
{
SendResult
sendResult
=
ledgerProducer
.
send
(
msg
);
assert
sendResult
!=
null
;
log
.
info
(
"发送分账消息成功:"
+
sendResult
.
getMsgId
());
return
sendResult
.
getMsgId
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
// finally {
// ledgerProducer.shutdown();
// }
return
""
;
}
@Async
public
String
sendMessage1
(
String
message
)
throws
MQClientException
{
// DefaultMQProducer ledgerProducer = new DefaultMQProducer("Producer_mm");
// ledgerProducer.setNamesrvAddr("172.16.10.101:8100");
// ledgerProducer.start();
Message
msg
=
new
Message
(
"miyapaytest"
,
"tag1"
,
UUID
.
randomUUID
().
toString
(),
UUID
.
randomUUID
().
toString
(),
message
.
getBytes
());
message
.
getBytes
());
try
{
try
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment