Commit 9cf52d9f authored by zhouxu's avatar zhouxu

Initial commit

parents
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**
!**/src/test/**
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
### VS Code ###
.vscode/
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
</parent>
<groupId>com.miya.huihua</groupId>
<artifactId>microservice-base</artifactId>
<version>0.0.2-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties>
<repositories>
<repository>
<id>releases</id>
<url>https://nexus.infra.miyatech.com/repository/yingxiaoplat/</url>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>snapshots</id>
<url>https://nexus.infra.miyatech.com/repository/yingxiaoplat/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>com.miya.huihua</groupId>
<artifactId>huihua-commons-base</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<phase>none</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
package com.miya.huihua.microservicebase.common;
/**
* @author zhouxu
* @createTime 2020/01/19
*/
public interface Constants {
public static final String TAG_ALL = "*";
public static final Integer ONS_MAX_RETRY_TIMES = 10;
}
package com.miya.huihua.microservicebase.common;
import com.aliyun.openservices.shade.com.google.common.base.Strings;
/**
* 返回码
*
* @author liwei
* @date 18/3/6 下午4:25
*/
public enum ResultCode {
SUCCESS("200", "SUCCESS"),
SIGN_ERROR("C102", "签名错误"),
INVALID_PARAMS("101", "参数不正确"),
UNAUTHORIZED("401", "用户未登录,请登录后再操作"),
FORBIDDEN("403", "抱歉,您没有对应的操作权限"),
ACCESS_OVER_LIMIT("4009", "访问超限,请稍后重试");
private String code;
private String msg;
ResultCode(String code, String msg) {
this.code = code;
this.msg = msg;
}
public String getCode() {
return code;
}
public String getMsg() {
return msg;
}
public static final ResultCode find(String code) {
if (Strings.isNullOrEmpty(code)) {
return null;
}
for (ResultCode resultCode : ResultCode.values()) {
if (resultCode.code.equals(code)) {
return resultCode;
}
}
return null;
}
}
package com.miya.huihua.microservicebase.data.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* @author zhouxu
* @createTime 2020/01/17
* 订阅消息配置
*/
@Configuration
@ConfigurationProperties("ali-ons")
@Data
public class MultiOnsProperties {
/**
* ONSAddr
*/
private String onsAddr;
/**
* AccessKey
*/
private String accessKey;
/**
* SecretKey
*/
private String secretKey;
/**
* 消息默认超时时间
*/
private long TimeoutMillis = 3000L;
/**
* topic 与 groupid 配置
*/
private List<OnsProperties> onsProperties;
}
package com.miya.huihua.microservicebase.data.config;
import lombok.Data;
/**
* @author zhouxu
* @createTime 2020/01/17
*/
@Data
public class OnsProperties {
/**
* topic
*/
private String topic;
/**
* GROUP_ID
*/
private String groupId;
/**
* 启动消费者
*/
private Boolean consumerEnable = true;
/**
* 启动生产者
*/
private Boolean producerEnable = true;
}
package com.miya.huihua.microservicebase.mq;
import com.aliyun.openservices.ons.api.*;
import com.miya.huihua.commons.exceptions.HuihuaException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.miya.huihua.microservicebase.common.Constants.ONS_MAX_RETRY_TIMES;
import static com.miya.huihua.microservicebase.common.Constants.TAG_ALL;
import static com.miya.huihua.microservicebase.common.ResultCode.INVALID_PARAMS;
/**
* @author zhouxu
* @createTime 2020/01/19
*/
@Slf4j
public abstract class AliConsumer implements SmartLifecycle {
@Autowired(required = false)
private OnsManager onsManager;
/**
* 订阅消息的topic
*
* @return
*/
public abstract String getTopic();
/**
* 订阅消息的tag
*
* @return
*/
public abstract List<String> getTags();
public volatile boolean isRunning = false;
/**
* 消息处理
*
* @param tag
* @param message
*/
public abstract void consumerMessage(String tag, String message);
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable runnable) {
runnable.run();
isRunning = false;
}
@Override
public void start() {
Consumer consumer = getConsumer(getTopic());
if (!consumer.isStarted()) {
//开始订阅消息
consumer.subscribe(getTopic(), getTags().isEmpty() ? TAG_ALL : getTags().stream().collect(Collectors.joining("||")),
new OnsMessageListener((tag, messageBody) -> {
consumerMessage(tag, messageBody);
}));
consumer.start();
log.info("订阅topic:{},tag:{} 的消息队列已启动",getTopic(),getTags().isEmpty() ? TAG_ALL : getTags().stream().collect(Collectors.joining("||")));
}
}
@Override
public void stop() {
ConcurrentHashMap<String, Consumer> concurrentHashMap = onsManager.getConsumerConcurrentHashMap();
concurrentHashMap.forEach((k, v) -> v.shutdown());
}
@Override
public boolean isRunning() {
return false;
}
@Override
public int getPhase() {
return 1;
}
/**
* 消息通用处理
*/
private static class OnsMessageListener implements MessageListener {
ConsumerAction consumerAction;
public OnsMessageListener(ConsumerAction consumerAction) {
this.consumerAction = consumerAction;
}
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
int reConsumeTimes = message.getReconsumeTimes();
if (reConsumeTimes >= ONS_MAX_RETRY_TIMES) {
log.warn("消息重复消费次数达到上限,将被丢弃,消息:{}", message);
return Action.CommitMessage;
}
String messageBody = null;
try {
messageBody = new String(message.getBody());
consumerAction.consume(message.getTag(), messageBody);
} catch (HuihuaException e) {
log.error("mq消息消费异常,消息:{},HuihuaException:{}", messageBody, e);
if (INVALID_PARAMS.getCode().equals(e.getCode())) {
//如果是参数错误,重发消息是没用的
return Action.CommitMessage;
} else {
return Action.ReconsumeLater;
}
} catch (Exception e) {
log.error("mq消息消费异常,消息:{},Exception:{}", messageBody, e);
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
}
protected Consumer getConsumer(String topic) {
ConcurrentHashMap<String, Consumer> concurrentHashMap = onsManager.getConsumerConcurrentHashMap();
return concurrentHashMap.get(topic);
}
}
package com.miya.huihua.microservicebase.mq;
import com.aliyun.openservices.ons.api.SendCallback;
/**
* @author zhouxu
* @createTime 2020/01/17
*/
public interface AliProducer {
/**
* 发送普通消息
*
* @param topic
* @param o
* @param tags 为空发送*消息
*/
void putEvent(String topic, Object o, String... tags);
/**
* 发送异步消息
*
* @param topic
* @param o
* @param sendCallback
* @param tags 为空发送*消息
*/
void putAsyncEvent(String topic, Object o, SendCallback sendCallback, String... tags);
}
package com.miya.huihua.microservicebase.mq;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.miya.huihua.microservicebase.common.Constants.TAG_ALL;
/**
* @author zhouxu
* @createTime 2020/01/17
*/
@Service
@Slf4j
public class AliProducerImpl implements AliProducer {
@Autowired(required = false)
private OnsManager onsManager;
@Override
public void putEvent(String topic, Object o, String... tags) {
Producer producer = getProducer(topic);
Message message = new Message(topic, tags.length < 1 ? TAG_ALL : Arrays.asList(tags).stream().collect(Collectors.joining("||")), JSON.toJSONString(o).getBytes());
try {
SendResult sendResult = producer.send(message);
if (sendResult != null) {
log.info("send msg success,Topic is :{},getMessageId is :{}", sendResult.getTopic(), sendResult.getMessageId());
}
} catch (Exception e) {
log.error("Send mq message failed. Topic is:{}, Reson is : {}", topic, e.getMessage());
e.printStackTrace();
}
}
@Override
public void putAsyncEvent(String topic, Object o, SendCallback sendCallback, String... tags) {
Producer producer = getProducer(topic);
Message message = new Message(topic, tags.length < 1 ? TAG_ALL : Arrays.asList(tags).stream().collect(Collectors.joining("||")), JSON.toJSONString(o).getBytes());
producer.sendAsync(message, sendCallback);
}
private Producer getProducer(String topic) {
ConcurrentHashMap<String, Producer> concurrentHashMap = onsManager.getProducerConcurrentHashMap();
return concurrentHashMap.get(topic);
}
}
package com.miya.huihua.microservicebase.mq;
/**
* @author zhouxu
* @createTime 2020/01/19
*/
public interface ConsumerAction {
/**
* 处理消息
*
* @param tag
* @param messageBody
* @throws Exception
*/
void consume(String tag, String messageBody) throws Exception;
}
package com.miya.huihua.microservicebase.mq;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.miya.huihua.microservicebase.data.config.MultiOnsProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author zhouxu
* @createTime 2020/01/17
*/
@Component
@Slf4j
public class OnsManager implements SmartLifecycle {
@Autowired(required = false)
private MultiOnsProperties multiOnsProperties;
public volatile boolean isRunning = false;
/**
* 生产者
*/
private ConcurrentHashMap<String, Producer> producerConcurrentHashMap = new ConcurrentHashMap<>();
/**
* 消费者
*/
private ConcurrentHashMap<String, Consumer> consumerConcurrentHashMap = new ConcurrentHashMap<>();
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable runnable) {
runnable.run();
isRunning = false;
}
@Override
public void start() {
if (multiOnsProperties == null || multiOnsProperties.getOnsProperties() == null || multiOnsProperties.getOnsProperties().isEmpty()) {
log.debug("未获取到正确的ONS配置...");
isRunning = false;
return;
}
multiOnsProperties.getOnsProperties().stream().forEach(e -> {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ONSAddr, multiOnsProperties.getOnsAddr());
properties.setProperty(PropertyKeyConst.AccessKey, multiOnsProperties.getAccessKey());
properties.setProperty(PropertyKeyConst.SecretKey, multiOnsProperties.getSecretKey());
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, String.valueOf(multiOnsProperties.getTimeoutMillis()));
// 您在控制台创建的 Group ID
if (e.getProducerEnable()) {
properties.setProperty(PropertyKeyConst.GROUP_ID, e.getGroupId());
Producer producer = ONSFactory.createProducer(properties);
producer.start();
producerConcurrentHashMap.putIfAbsent(e.getTopic(), producer);
}
if (e.getConsumerEnable()) {
Consumer consumer = ONSFactory.createConsumer(properties);
consumerConcurrentHashMap.putIfAbsent(e.getTopic(), consumer);
}
});
isRunning = true;
}
@Override
public void stop() {
producerConcurrentHashMap.forEach((k, v) -> v.shutdown());
}
@Override
public boolean isRunning() {
return isRunning;
}
@Override
public int getPhase() {
return 0;
}
public ConcurrentHashMap<String, Producer> getProducerConcurrentHashMap() {
return producerConcurrentHashMap;
}
public ConcurrentHashMap<String, Consumer> getConsumerConcurrentHashMap() {
return consumerConcurrentHashMap;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment