Commit a5d0a2f0 authored by mm's avatar mm

demo

parent 6fcfa643
......@@ -31,6 +31,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
......
package com.example.demo.controller;
import com.example.demo.mq.ConsumerConfiguration;
import com.example.demo.mq.producer.LedgerMQProducer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @author mm
* @Date 2021-04-19 16:19
*/
@RestController
public class TestController {
@Autowired
private LedgerMQProducer ledgerMQProducer;
@PostMapping("/mmsend")
public String test(@RequestBody String payload) throws MQClientException {
return ledgerMQProducer.sendMessage(payload);
}
@PostMapping("/mmsub")
public String test1(@RequestBody String payload) throws MQClientException {
new ConsumerConfiguration();
return "success";
}
}
/*
* Miya.com Inc.
* Copyright (c) 2004-2018 All Rights Reserved.
*/
package com.example.demo.mq;
import com.example.demo.mq.listener.LedgerConsumerListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConsumerConfiguration {
@Autowired
private LedgerConsumerListener ledgerConsumerListener;
/**
* 每配置一个代表一个consumer,可以配置多个
*
* @return
*/
@Bean(name = "consumerPayNotice", initMethod = "start", destroyMethod = "shutdown")
DefaultMQPushConsumer consumerPayNotice() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("miyapaytest_consumer");
consumer.setNamesrvAddr("172.16.10.101:8100");
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅主题和 标签( * 代表所有标签)下信息
consumer.subscribe("miyapaytest", "*");
consumer.registerMessageListener(ledgerConsumerListener);
log.info("消费者启动成功");
return consumer;
}
}
/*
* Miya.com Inc.
* Copyright (c) 2004-2018 All Rights Reserved.
*/
package com.example.demo.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class ProducerConfiguration {
/**
* 一个应用只配置一个
* @return
*/
@Qualifier(value = "ledgerProducer")
@Bean(initMethod = "start", destroyMethod = "shutdown")
DefaultMQProducer ledgerProducer() {
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.setNamesrvAddr("172.16.10.101:8100");
log.info("生产这启动成功");
return producer;
}
}
\ No newline at end of file
/*
* Miya.com Inc. Copyright (c) 2004-2018 All Rights Reserved.
*/
package com.example.demo.mq.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Slf4j
@Component
public class LedgerConsumerListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
try {
log.info("消费次数: {}, 内容: {}", msg.getReconsumeTimes(), new String(msg.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
package com.example.demo.mq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;
@Slf4j
@Component
public class LedgerMQProducer {
@Resource
private DefaultMQProducer ledgerProducer;
@Async
public String sendMessage(String message) throws MQClientException {
// DefaultMQProducer ledgerProducer = new DefaultMQProducer("Producer_mm");
// ledgerProducer.setNamesrvAddr("172.16.10.101:8100");
// ledgerProducer.start();
Message msg = new Message(
"miyapaytest",
"push",
UUID.randomUUID().toString(),
message.getBytes());
try {
SendResult sendResult = ledgerProducer.send(msg);
assert sendResult != null;
log.info("发送分账消息成功:" + sendResult.getMsgId());
return sendResult.getMsgId();
} catch (Exception e) {
e.printStackTrace();
}
// finally {
// ledgerProducer.shutdown();
// }
return "";
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment