基于servlet3.0异步的消息推送DEMO
1.利用Servlet 3.0的异步特性实现服务器推送技术
2.基于HTTP协议
3.支持的容器:Tomcat 7.*、WebLogic 12c
请多指教小弟
1.监听器
package com.chinaMoney.mspush.listener;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import com.chinaMoney.util.Constants;
import com.chinaMoney.util.JsonUtil;
/**
 * Async listener attached to each long-polling request.
 *
 * <p>It flushes the response when the async cycle completes, and it removes
 * the request's {@link AsyncContext} from the shared context queue when the
 * request times out or fails, so that the queue does not keep references to
 * contexts that can no longer be written to.
 */
public class PushMesBaseListener implements AsyncListener {

    /** Async context this listener was created for. */
    private AsyncContext acontext;

    /** Queue of waiting contexts that this listener removes dead entries from; may be null. */
    private ConcurrentLinkedQueue<AsyncContext> contextQueue;

    /**
     * @param acontext     the async context this listener is registered on
     * @param contextQueue the queue holding waiting contexts; may be null
     */
    public PushMesBaseListener(AsyncContext acontext,
            ConcurrentLinkedQueue<AsyncContext> contextQueue) {
        this.acontext = acontext;
        this.contextQueue = contextQueue;
    }

    public AsyncContext getAcontext() {
        return acontext;
    }

    public void setAcontext(AsyncContext acontext) {
        this.acontext = acontext;
    }

    public ConcurrentLinkedQueue<AsyncContext> getContextQueue() {
        return contextQueue;
    }

    public void setContextQueue(ConcurrentLinkedQueue<AsyncContext> contextQueue) {
        this.contextQueue = contextQueue;
    }

    /**
     * Flushes whatever is still buffered in the response writer.
     */
    @Override
    public void onComplete(AsyncEvent event) throws IOException {
        AsyncContext completed = event.getAsyncContext();
        if (completed == null) {
            return;
        }
        ServletResponse response = completed.getResponse();
        if (response == null) {
            return;
        }
        PrintWriter out = response.getWriter();
        if (out != null) {
            out.flush();
        }
    }

    /**
     * Drops the failed context from the queue so nothing tries to push to it.
     */
    @Override
    public void onError(AsyncEvent event) throws IOException {
        removeFromQueue(event.getAsyncContext());
    }

    /** Nothing to do when a new async cycle starts. */
    @Override
    public void onStartAsync(AsyncEvent event) throws IOException {
    }

    /**
     * Answers a timed-out request with the timeout status payload and ends
     * its async cycle.
     */
    @Override
    public void onTimeout(AsyncEvent event) throws IOException {
        AsyncContext timedOut = event.getAsyncContext();
        // Remove first: the context must leave the queue even if writing the
        // timeout payload below fails.
        removeFromQueue(timedOut);
        if (timedOut == null) {
            return;
        }
        try {
            ServletResponse response = timedOut.getResponse();
            if (response != null) {
                PrintWriter out = response.getWriter();
                if (out != null) {
                    out.println(JsonUtil.getJson4SrvState(Constants.PUSH_MES_TIMEOUT,""));
                    out.close();
                }
            }
        } finally {
            // The Servlet spec makes the container perform an error dispatch
            // when no listener completes or dispatches a timed-out request,
            // so finish the cycle explicitly.
            timedOut.complete();
        }
    }

    /**
     * Removes the given context from the queue, tolerating a missing queue or context.
     */
    private void removeFromQueue(AsyncContext context) {
        if (this.contextQueue != null && context != null) {
            this.contextQueue.remove(context);
        }
    }
}
2.异步servlet,子类实现注入对应的context队列
package com.chinaMoney.mspush.servlet;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.chinaMoney.mspush.listener.PushMesBaseListener;
import com.chinaMoney.util.Constants;
/**
 * Base servlet for long-polling push requests.
 *
 * <p>Each GET or POST is switched to asynchronous mode, given the timeout
 * {@link Constants#ASYNCTIMEOUT}, and parked in the context queue supplied by
 * the subclass.
 *
 * @author zwm
 */
public abstract class PushMesBaseServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    /**
     * Supplies the queue that waiting async contexts are added to. Despite
     * the "set" prefix this method takes no argument and returns the queue.
     *
     * @return the context queue, or null to skip listener registration and queuing
     */
    protected abstract ConcurrentLinkedQueue<AsyncContext> setContextQueue();

    private ConcurrentLinkedQueue<AsyncContext> getContextQueue() {
        return setContextQueue();
    }

    /**
     * Puts the request into async mode and registers it in the context queue.
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType("text/html;charset=UTF-8");
        resp.setHeader("Cache-Control", "private");
        resp.setHeader("Pragma", "no-cache");
        req.setCharacterEncoding("UTF-8");

        AsyncContext asyncContext = req.startAsync();
        // Timeout of the async connection.
        asyncContext.setTimeout(Constants.ASYNCTIMEOUT);

        // Resolve the queue once, so the listener and the add below are
        // guaranteed to work on the same instance even if the subclass hook
        // returns a different object on each call.
        ConcurrentLinkedQueue<AsyncContext> queue = getContextQueue();
        // NOTE(review): when the queue is null the request stays in async mode
        // with no listener attached; nothing in this class completes it.
        if (queue != null) {
            asyncContext.addListener(new PushMesBaseListener(asyncContext, queue));
            queue.add(asyncContext);
        }
    }

    /**
     * Handles POST exactly like GET.
     */
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        doGet(req, resp);
    }
}
3.推送父类
package com.chinaMoney.mspush.thread;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import com.chinaMoney.util.Constants;
import com.chinaMoney.util.JsonUtil;
/**
 * Base class of the publisher thread in the publish/subscribe push demo.
 *
 * <p>The thread repeatedly checks the message queue; when a message and at
 * least one subscriber are available, it takes one message and writes it to
 * every subscribed async context, completing each of them.
 *
 * @author zwm
 */
public abstract class PushMesBaseThread extends Thread {

    /** Pause between polls when there is nothing to deliver, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 100L;

    /**
     * Builds the content to push to one subscriber.
     *
     * @param acontext the subscriber's async context
     * @param message  the message taken from the message queue
     * @return the content to send to this subscriber
     * @throws IOException if producing the content fails on I/O
     * @throws Exception   if producing the content fails for another reason
     */
    protected abstract String getPushMes(AsyncContext acontext,Object message) throws IOException,Exception;

    /**
     * @return the message queue this thread publishes from
     */
    protected abstract LinkedBlockingQueue<Object> getMessageQueue();

    /**
     * @return the queue of waiting async contexts (all visitors, subscribed or not)
     */
    protected abstract ConcurrentLinkedQueue<AsyncContext> getContextQueue();

    /**
     * Decides whether the given visitor is a subscriber of the message queue.
     *
     * @param context the visitor's async context
     * @return true if the visitor should receive messages
     */
    protected abstract boolean isSubscriber(AsyncContext context);

    /**
     * Publishes messages until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (!deliverNextMessage()) {
                    // Nothing was delivered: back off instead of spinning at full CPU.
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (RuntimeException e) {
                // A failing subclass hook must not kill the publisher.
                e.printStackTrace();
            }
        }
    }

    /**
     * Takes at most one message and pushes it to every current subscriber.
     * The message is only taken once the first subscriber has been found, so
     * it stays queued while nobody is subscribed.
     *
     * @return true if a message was taken from the queue
     * @throws InterruptedException if interrupted while taking the message
     */
    private boolean deliverNextMessage() throws InterruptedException {
        LinkedBlockingQueue<Object> messageQueue = getMessageQueue();
        if (messageQueue == null || messageQueue.isEmpty()) {
            return false;
        }
        Queue<AsyncContext> contexts = getContextQueue();
        if (contexts == null || contexts.isEmpty()) {
            return false;
        }

        Object message = null;
        boolean taken = false;
        for (AsyncContext ac : contexts) {
            if (ac == null) {
                continue;
            }
            try {
                PrintWriter out = ac.getResponse().getWriter();
                if (out == null || !isSubscriber(ac)) {
                    continue;
                }
                if (!taken) {
                    message = messageQueue.take();
                    taken = true;
                }
                // Let the subclass build the content for this subscriber.
                String content = getPushMes(ac, message);
                out.print(JsonUtil.getJson4SrvState(Constants.PUSH_MES_SUCCESS,content));
                // Removing while iterating relies on the concurrent queue
                // returned by getContextQueue().
                contexts.remove(ac);
                // Ends the async cycle; registered listeners get onComplete.
                ac.complete();
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e) {
                // One broken client must neither stop the publisher nor keep
                // the remaining subscribers from receiving the message; drop
                // the failing context and carry on.
                contexts.remove(ac);
                e.printStackTrace();
            }
        }
        return taken;
    }
}
4.常量类
package com.chinaMoney.util;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
/**
 * Constants and process-wide queues shared by the push demo classes.
 */
public class Constants {

    /** Timeout applied to Servlet 3.0 async requests: 60 seconds, in milliseconds. */
    public static final long ASYNCTIMEOUT = 60000L;

    /** Push status reported to the client: message delivered. */
    public static final String PUSH_MES_SUCCESS = "0";

    /** Push status reported to the client: request timed out. */
    public static final String PUSH_MES_TIMEOUT = "1";

    /**
     * Message queue ("first").
     * NOTE(review): this field is not final and can be reassigned.
     */
    public static LinkedBlockingQueue<Object> MESSAGE_QUEUE =
            new LinkedBlockingQueue<Object>();

    /**
     * Queue of async contexts of all message visitors, subscribers and
     * non-subscribers alike.
     * NOTE(review): this field is not final and can be reassigned.
     */
    public static ConcurrentLinkedQueue<AsyncContext> CONTEXT_QUEUE =
            new ConcurrentLinkedQueue<AsyncContext>();
}
5.JSON工具类
package com.chinaMoney.util;
import java.util.HashMap;
import java.util.Map;
import net.sf.json.JSONObject;
/**
 * Helper that builds the JSON payload sent to push clients.
 */
public class JsonUtil {

    /**
     * Builds a JSON object string with the two keys "state" and "content".
     *
     * @param state   the status code to report
     * @param content the payload; its {@code toString()} value is emitted,
     *                and null is emitted as an empty string
     * @return the JSON object serialized as a string
     */
    public static String getJson4SrvState(String state,Object content) {
        Map<String,Object> map = new HashMap<String,Object>();
        map.put("state", state);
        // A null payload used to throw NullPointerException here.
        map.put("content", content == null ? "" : content.toString());
        JSONObject json = JSONObject.fromObject(map);
        return json.toString();
    }
}
使用时,编写 PushMesBaseListener、PushMesBaseServlet、PushMesBaseThread 对应的子类,实现对应的消息队列和消息的订阅者。
小弟刚学习的 请多指教
Servlet asynchronous --------------------编程问答-------------------- 除
补充:Java , Web 开发