当前位置:编程学习 > JAVA >>

基于servlet3.0异步的消息推送DEMO

1.利用servlet3.0实现服务器推送技术
2.基于http协议
3.tomcat7.*,weblogic12c
请多指教小弟
1.监听器
package com.chinaMoney.mspush.listener;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;

import com.chinaMoney.util.Constants;
import com.chinaMoney.util.JsonUtil;


public class PushMesBaseListener implements AsyncListener {

//异步上下文
private AsyncContext acontext;
private ConcurrentLinkedQueue<AsyncContext> contextQueue;

public PushMesBaseListener(AsyncContext acontext,
ConcurrentLinkedQueue<AsyncContext> contextQueue) {
this.acontext = acontext;
this.contextQueue = contextQueue;
}

public AsyncContext getAcontext() {
return acontext;
}

public void setAcontext(AsyncContext acontext) {
this.acontext = acontext;
}

public ConcurrentLinkedQueue<AsyncContext> getContextQueue() {
return contextQueue;
}

public void setContextQueue(ConcurrentLinkedQueue<AsyncContext> contextQueue) {
this.contextQueue = contextQueue;
}


@Override
public void onComplete(AsyncEvent event) throws IOException {
//清空缓冲区
AsyncContext acontext = event.getAsyncContext(); 
if(acontext != null) {
ServletResponse response =  acontext.getResponse();
if(response != null) {
PrintWriter out = response.getWriter();
if(out != null) {
out.flush();

}
}
}

@Override
public void onError(AsyncEvent event) throws IOException {
if(this.contextQueue != null && this.contextQueue.size() > 0) {
this.contextQueue.remove(event.getAsyncContext());
}

}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
}

@Override
public void onTimeout(AsyncEvent event) throws IOException {
AsyncContext acontext = event.getAsyncContext(); 
if(acontext != null) {
ServletResponse response =  acontext.getResponse();
if(response != null) {
PrintWriter out = response.getWriter();
if(out != null) {
out.println(JsonUtil.getJson4SrvState(Constants.PUSH_MES_TIMEOUT,""));
out.close();

}
}
//移除异步上下文
if(this.contextQueue != null && this.contextQueue.size() > 0) {
this.contextQueue.remove(event.getAsyncContext());
}
}
}

2.异步servlet,子类实现注入对应的context队列
package com.chinaMoney.mspush.servlet;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.chinaMoney.mspush.listener.PushMesBaseListener;
import com.chinaMoney.util.Constants;

/**
 * 
 * @author zwm
 *
 */
public abstract class PushMesBaseServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

/**
 * 设置要监听的异步上下文队列
 * @return 
 */
protected abstract ConcurrentLinkedQueue<AsyncContext> setContextQueue();

private ConcurrentLinkedQueue<AsyncContext> getContextQueue() {
return setContextQueue();
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
resp.setContentType("text/html;charset=UTF-8");
resp.setHeader("Cache-Control", "private");
resp.setHeader("Pragma", "no-cache");
req.setCharacterEncoding("UTF-8");

AsyncContext asyncContext =  req.startAsync();
//异步连接超时时间
asyncContext.setTimeout(Constants.ASYNCTIMEOUT);
//注册监听器
if(getContextQueue() != null) {
asyncContext.addListener(new PushMesBaseListener(asyncContext,getContextQueue()));
getContextQueue().add(asyncContext);
}
}

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
doGet(req,resp);
}
}
3.推送父类
package com.chinaMoney.mspush.thread;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;

import com.chinaMoney.util.Constants;
import com.chinaMoney.util.JsonUtil;


/**
 * 基于发布与订阅的消息线程推送父类
 * 消息发布者(Publisher)
 * @author zwm
 *
 */
public abstract class PushMesBaseThread extends Thread {

/**
 * 获取要发布的消息
 * @param acontext
 * @param message
 * @return
 * @throws IOException
 * @throws Exception
 */
protected abstract String getPushMes(AsyncContext acontext,Object message) throws IOException,Exception;

/**
 * 监听的消息队列列表
 */
protected abstract LinkedBlockingQueue<Object> getMessageQueue();

/**
 * 消息访问者
 * @return
 */
protected abstract ConcurrentLinkedQueue<AsyncContext> getContextQueue();

/**
 * 确定消息访问者是否为消息订阅者
 * @return
 */
protected abstract boolean isSubscriber(AsyncContext context);

@Override
public void run() {
boolean flag = true;
while(flag) {
//获取到监听的消息队列列表
LinkedBlockingQueue<Object> messageQueue = getMessageQueue();
if(messageQueue != null && messageQueue.size() > 0) {
try {
//获取异步context
Queue<AsyncContext> asyncContext = getContextQueue();
if(asyncContext != null && asyncContext.size() > 0) {
//消息对象
Object object = null;
//消息取出标志位
boolean hasTaked = false;
for(AsyncContext ac : asyncContext) {
//当前用户为该消息队列的订阅者
if(ac != null && ac.getResponse().getWriter() != null && isSubscriber(ac)) {
if(!hasTaked) {
//取出消息
object = messageQueue.take();
//标识取出
hasTaked = true;
}
//调用子类推送业务方法,获取到推送内容
String content = getPushMes(ac,object);
//获取输出流
PrintWriter out = ac.getResponse().getWriter();
//将推送内容输出至客户端
out.print(JsonUtil.getJson4SrvState(Constants.PUSH_MES_SUCCESS,content));
//移除context
asyncContext.remove(ac);
//通知监听器servlet完成
ac.complete();
}
}
}
} catch (InterruptedException e) {
flag = false;
e.printStackTrace();
} catch (IOException e) {
flag = false;
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
4常量类
package com.chinaMoney.util;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;

public class Constants {
//异步servlet3.0超时时间
public static final long ASYNCTIMEOUT = 60*1000;
//消息推送状态成功
public static final String PUSH_MES_SUCCESS = "0";
//消息推送状态超时
public static final String PUSH_MES_TIMEOUT = "1";
//消息队列first
public static LinkedBlockingQueue<Object> MESSAGE_QUEUE = new LinkedBlockingQueue<Object>();
//异步上下文(消息访问者,包含订阅者和非订阅者)队列
public static ConcurrentLinkedQueue<AsyncContext> CONTEXT_QUEUE = new ConcurrentLinkedQueue<AsyncContext>();

}
5.
package com.chinaMoney.util;

import java.util.HashMap;
import java.util.Map;

import net.sf.json.JSONObject;

public class JsonUtil {
/**
 * JSON串
 * @param state
 * @return
 */
public static String getJson4SrvState(String state,Object content) {
Map<String,Object> map = new HashMap<String,Object>();
map.put("state", state);
map.put("content", content.toString());
JSONObject json = JSONObject.fromObject(map);
return json.toString();
}
}

编写PushMesBaseListener,PushMesBaseServlet,PushMesBaseThread对应的子类实现对应的消息队列和消息的订阅者
小弟刚学习的 请多指教
Servlet asynchronous --------------------编程问答-------------------- 除
补充:Java ,  Web 开发
CopyRight © 2012 站长网 编程知识问答 www.zzzyk.com All Rights Reserved
部份技术文章来自网络,