Commit ef22b5d5 authored by zhouxu's avatar zhouxu

增加延时消息

parent 847c67fb
...@@ -27,4 +27,15 @@ public interface AliProducer { ...@@ -27,4 +27,15 @@ public interface AliProducer {
*/ */
void putAsyncEvent(String topic, Object o, SendCallback sendCallback, String... tags); void putAsyncEvent(String topic, Object o, SendCallback sendCallback, String... tags);
/**
* 发送延时消息
*
* @param topic
* @param o
* @param sendCallback
* @param delayTime 指定执行的时间
* @param tags
*/
void putDelayEvent(String topic, Object o, SendCallback sendCallback, Long delayTime, String... tags);
} }
...@@ -30,10 +30,9 @@ public class AliProducerImpl implements AliProducer { ...@@ -30,10 +30,9 @@ public class AliProducerImpl implements AliProducer {
@Override @Override
public void putEvent(String topic, Object o, String... tags) { public void putEvent(String topic, Object o, String... tags) {
Producer producer = getProducer(topic);
Message message = new Message(topic, tags.length < 1 ? TAG_ALL : Arrays.asList(tags).stream().collect(Collectors.joining("||")), JSON.toJSONString(o).getBytes());
try { try {
SendResult sendResult = producer.send(message); SendResult sendResult = sendMessage(topic, getMessage(topic, o, tags));
if (sendResult != null) { if (sendResult != null) {
log.info("send msg success,Topic is :{},getMessageId is :{}", sendResult.getTopic(), sendResult.getMessageId()); log.info("send msg success,Topic is :{},getMessageId is :{}", sendResult.getTopic(), sendResult.getMessageId());
} }
...@@ -45,11 +44,54 @@ public class AliProducerImpl implements AliProducer { ...@@ -45,11 +44,54 @@ public class AliProducerImpl implements AliProducer {
@Override @Override
public void putAsyncEvent(String topic, Object o, SendCallback sendCallback, String... tags) { public void putAsyncEvent(String topic, Object o, SendCallback sendCallback, String... tags) {
sendAsyncMessage(topic, getMessage(topic, o, tags), sendCallback);
}
@Override
public void putDelayEvent(String topic, Object o, SendCallback sendCallback, Long delayTime, String... tags) {
Message message = getMessage(topic, o, tags);
message.setStartDeliverTime(delayTime);
if (sendCallback != null) {
sendAsyncMessage(topic, getMessage(topic, o, tags), sendCallback);
} else {
SendResult sendResult = sendMessage(topic, getMessage(topic, o, tags));
if (sendResult != null) {
log.info("send msg success,Topic is :{},getMessageId is :{}", sendResult.getTopic(), sendResult.getMessageId());
}
}
}
/**
* 发送异步消息
*
* @param topic
* @param message
* @param sendCallback
*/
public void sendAsyncMessage(String topic, Message message, SendCallback sendCallback) {
Producer producer = getProducer(topic); Producer producer = getProducer(topic);
Message message = new Message(topic, tags.length < 1 ? TAG_ALL : Arrays.asList(tags).stream().collect(Collectors.joining("||")), JSON.toJSONString(o).getBytes());
producer.sendAsync(message, sendCallback); producer.sendAsync(message, sendCallback);
} }
/**
* 发送同步消息
*
* @param topic
* @param message
* @return
*/
public SendResult sendMessage(String topic, Message message) {
Producer producer = getProducer(topic);
SendResult sendResult = producer.send(message);
return sendResult;
}
private Message getMessage(String topic, Object o, String... tags) {
Message message = new Message(topic, tags.length < 1 ? TAG_ALL : Arrays.asList(tags).stream().collect(Collectors.joining("||")), JSON.toJSONString(o).getBytes());
return message;
}
private Producer getProducer(String topic) { private Producer getProducer(String topic) {
ConcurrentHashMap<String, Producer> concurrentHashMap = onsManager.getProducerConcurrentHashMap(); ConcurrentHashMap<String, Producer> concurrentHashMap = onsManager.getProducerConcurrentHashMap();
return concurrentHashMap.get(topic); return concurrentHashMap.get(topic);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment