Author: remy.maucherat(a)jboss.com
Date: 2008-11-20 13:37:18 -0500 (Thu, 20 Nov 2008)
New Revision: 860
Modified:
trunk/java/org/apache/tomcat/bayeux/ClientImpl.java
trunk/java/org/apache/tomcat/bayeux/RequestBase.java
Log:
- Simplify the client code.
- I don't see how multiple events can be associated with a single client. Maybe I
missed something; if so, I will revert this change.
- Publishing messages to a client should be synchronized (multiple channels could publish
to the same client), but I don't see any other synchronization being needed (given the
thread model used by events after all the fixes).
Modified: trunk/java/org/apache/tomcat/bayeux/ClientImpl.java
===================================================================
--- trunk/java/org/apache/tomcat/bayeux/ClientImpl.java 2008-11-18 16:30:54 UTC (rev 859)
+++ trunk/java/org/apache/tomcat/bayeux/ClientImpl.java 2008-11-20 18:37:18 UTC (rev 860)
@@ -16,6 +16,7 @@
*/
package org.apache.tomcat.bayeux;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -49,9 +50,9 @@
protected LinkedList<Message> messages = null;
/**
- *
+ * Currently associated event.
*/
- protected Queue<HttpEvent> events = new LinkedList<HttpEvent>();
+ protected HttpEvent event;
/**
* Unique id representing this client
@@ -91,21 +92,20 @@
if (!local) messages = new LinkedList<Message>();
}
- protected ClientImpl(String id, HttpEvent event) {
+/* protected ClientImpl(String id, HttpEvent event) {
this(id,false);
- events = new ConcurrentLinkedQueue<HttpEvent>();
addCometEvent(event);
- }
+ }*/
- public synchronized void deliver(Message message) {
+ public void deliver(Message message) {
deliverInternal(null,new MessageImpl[] {(MessageImpl)message});
}
- public synchronized void deliver(Message[] message) {
+ public void deliver(Message[] message) {
deliverInternal(null,message);
}
- protected synchronized void deliverInternal(ChannelImpl channel, MessageImpl message)
{
+ protected void deliverInternal(ChannelImpl channel, MessageImpl message) {
deliverInternal(channel,new MessageImpl[] {message});
}
@@ -130,36 +130,32 @@
//we are not implementing forever responses, if the client is connected
//then we will fire off the message
//first we check to see if we have any existing connections we can piggy
back on
- HttpEvent event = events.poll();
boolean delivered = false;
- //TODO TODO - check on thread safety, for writing and for getting last
request.
if (event!=null) {
- synchronized (event) {
- RequestBase rq =
(RequestBase)event.getHttpServletRequest().getAttribute(RequestBase.LAST_REQ_ATTR);
- if (rq!=null) {
- Map map = new HashMap();
- try {
-
map.put(Bayeux.CHANNEL_FIELD,message.getChannel().getId());
- map.put(Bayeux.DATA_FIELD,message);
- JSONObject json = new JSONObject(map);
- if (log.isTraceEnabled()) {
- log.trace("Message instantly delivered to remote
client["+this+"] message:"+json);
- }
- rq.addToDeliveryQueue(this, json);
- //deliver the batch
- if (i==(msgs.length-1)) {
- rq.deliver(event, this);
- event.close(); //todo, figure out a better way, this
means only one message gets delivered
- removeCometEvent(event); //and delivered instantly
- }
- delivered = true;
- } catch (Exception e) {
- // TODO: fix
- log.warn("Exception", e);
+ RequestBase rq =
(RequestBase)event.getHttpServletRequest().getAttribute(RequestBase.LAST_REQ_ATTR);
+ if (rq!=null) {
+ Map map = new HashMap();
+ try {
+ map.put(Bayeux.CHANNEL_FIELD,message.getChannel().getId());
+ map.put(Bayeux.DATA_FIELD,message);
+ JSONObject json = new JSONObject(map);
+ if (log.isTraceEnabled()) {
+ log.trace("Message instantly delivered to remote
client["+this+"] message:"+json);
}
+ rq.addToDeliveryQueue(this, json);
+ //deliver the batch
+ if (i==(msgs.length-1)) {
+ rq.deliver(event, this);
+ event.close(); //todo, figure out a better way, this
means only one message gets delivered
+ removeCometEvent(event); //and delivered instantly
+ }
+ delivered = true;
+ } catch (Exception e) {
+ // TODO: fix
+ log.warn("Exception", e);
}
}
- }
+ }
if (!delivered) {
if (log.isTraceEnabled()) {
log.trace("Message added to queue for remote
client["+this+"] message:"+message);
@@ -244,23 +240,25 @@
return nrofsubscriptions.get()>0;
}
- protected synchronized boolean addCometEvent(HttpEvent event) {
- boolean result = false;
- if (!events.contains(event)) {
- events.add(event);
- result = true;
+ protected void addCometEvent(HttpEvent event) {
+ if (this.event != null) {
+ try {
+ this.event.close();
+ } catch (IOException e) {
+ // Nothing
+ }
}
+ this.event = event;
event.getHttpServletRequest().setAttribute(COMET_EVENT_ATTR,this);
- return result;
}
- protected synchronized boolean removeCometEvent(HttpEvent event) {
- boolean result = events.remove(event);
+ protected void removeCometEvent(HttpEvent event) {
+ if (this.event != null && this.event == event) {
+ this.event = null;
+ }
event.getHttpServletRequest().removeAttribute(COMET_EVENT_ATTR);
- return result;
}
-
protected void subscribed(ChannelImpl ch) {
nrofsubscriptions.addAndGet(1);
}
Modified: trunk/java/org/apache/tomcat/bayeux/RequestBase.java
===================================================================
--- trunk/java/org/apache/tomcat/bayeux/RequestBase.java 2008-11-18 16:30:54 UTC (rev
859)
+++ trunk/java/org/apache/tomcat/bayeux/RequestBase.java 2008-11-20 18:37:18 UTC (rev
860)
@@ -194,7 +194,6 @@
out.print(");");
}
out.flush();
- event.getHttpServletResponse().flushBuffer();
}