Author: nbelaevski
Date: 2011-01-31 08:36:28 -0500 (Mon, 31 Jan 2011)
New Revision: 21320
Added:
trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/Request.java
trunk/core/api/src/main/java/org/richfaces/application/push/Session.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionManager.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionQueue.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
trunk/examples/push-demo/pom.xml
trunk/ui/core/ui/src/main/java/org/richfaces/resource/PushResource.java
Log:
https://issues.jboss.org/browse/RF-10330
Added: trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java
(rev 0)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push;
+
+import java.util.EventListener;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public interface MessageListener extends EventListener {
+
+ public void onMessage(Object message) throws MessageException;
+
+}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/Request.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/Request.java 2011-01-31
12:57:38 UTC (rev 21319)
+++ trunk/core/api/src/main/java/org/richfaces/application/push/Request.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -45,4 +45,6 @@
public boolean isPolling();
+ public MessageListener getMessageListener();
+
}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/Session.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/Session.java 2011-01-31
12:57:38 UTC (rev 21319)
+++ trunk/core/api/src/main/java/org/richfaces/application/push/Session.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -49,14 +49,8 @@
public void disconnect() throws Exception;
- public void onRequestSuspended();
-
- public void onRequestDisconnected();
+ public void invalidate();
- public void onRequestResumed();
-
public void destroy();
- public Request getRequest();
-
}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/SessionManager.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionManager.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionManager.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -32,8 +32,6 @@
public Session getPushSession(String id);
- public void removePushSession(Session session);
-
public void requeue(Session session);
public void destroy();
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -44,6 +44,8 @@
import org.richfaces.application.push.Request;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.TopicKey;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
/**
* @author Nick Belaevski
@@ -51,6 +53,8 @@
*/
public abstract class AbstractRequest implements Request {
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
private static final String TOPIC_KEY = "topic";
private static final String DATA_KEY = "data";
@@ -110,8 +114,7 @@
try {
request.flushMessages();
} catch (Throwable e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOGGER.error(e.getMessage(), e);
}
}
@@ -178,7 +181,7 @@
submitted.compareAndSet(true, false);
if (isPolling()) {
- atmosphereResource.resume();
+ resume();
} else if (!messagesQueue.isEmpty()) {
submitToWorker();
}
@@ -194,6 +197,12 @@
}
public void resume() throws IOException {
+ //TODO - review
+ try {
+ getSession().disconnect();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
atmosphereResource.resume();
}
@@ -219,14 +228,18 @@
}
protected void onSuspend() {
+ try {
+ getSession().connect(this);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
}
protected void onResume() {
try {
session.disconnect();
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOGGER.error(e.getMessage(), e);
}
}
@@ -234,8 +247,7 @@
try {
session.disconnect();
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOGGER.error(e.getMessage(), e);
}
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -26,6 +26,8 @@
import org.richfaces.application.push.Request;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.SessionManager;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
/**
* @author Nick Belaevski
@@ -33,6 +35,8 @@
*/
public abstract class AbstractSession implements Session {
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
private static final int MAX_INACTIVE_INTERVAL = 5 * 60 * 1000;
private final String id;
@@ -43,8 +47,11 @@
private volatile Request request;
+ private volatile boolean active = true;
+
public AbstractSession(String id, SessionManager sessionManager) {
super();
+
this.id = id;
this.sessionManager = sessionManager;
@@ -55,33 +62,61 @@
lastAccessedTime = System.currentTimeMillis();
}
- private void requeue() {
- resetLastAccessedTimeToCurrent();
+ public synchronized void connect(Request request) throws Exception {
+ releaseRequest();
+
+ if (active) {
+ processConnect(request);
+ } else {
+ request.resume();
+ }
+ }
+
+ protected Request getRequest() {
+ return request;
+ }
+
+ protected void processConnect(Request request) throws Exception {
+ this.request = request;
+
sessionManager.requeue(this);
}
- public void connect(Request request) throws Exception {
- if (this.request != null) {
- this.request.resume();
- }
+ private void releaseRequest() {
+ Request localRequestCopy = this.request;
- this.request = request;
+ if (localRequestCopy != null) {
+ resetLastAccessedTimeToCurrent();
+ this.request = null;
- requeue();
-
- request.suspend();
+ try {
+ localRequestCopy.resume();
+ } catch (IOException e) {
+ // TODO: handle exception
+ e.printStackTrace();
+ }
+ }
}
- public void disconnect() throws Exception {
- this.request = null;
+ public synchronized void disconnect() throws Exception {
+ processDisconnect();
}
+ protected void processDisconnect() throws Exception {
+ releaseRequest();
+ }
+
public long getLastAccessedTime() {
- if (request != null) {
+ if (!active) {
+ return -1;
+ }
+
+ if (this.request != null) {
+ //being accessed right now
return System.currentTimeMillis();
+ } else {
+ return lastAccessedTime;
}
-
- return lastAccessedTime;
}
public int getMaxInactiveInterval() {
@@ -92,24 +127,19 @@
return id;
}
- public Request getRequest() {
- return request;
+ public void invalidate() {
+ active = false;
+
+ sessionManager.requeue(this);
}
+
+ public synchronized void destroy() {
+ active = false;
- public void destroy() {
- if (request != null) {
- try {
- request.resume();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- request = null;
- //TODO - clean up request
+ try {
+ disconnect();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
}
-
- sessionManager.removePushSession(this);
- // TODO Auto-generated method stub
-
}
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -67,14 +67,6 @@
return sessionMap.get(id);
}
- public void removePushSession(Session session) {
- // XXX - possible null pointer exception
- if (session != null) {
- sessionMap.remove(session.getId());
- sessionQueue.remove(session);
- }
- }
-
public void destroy() {
//TODO notify all session
sessionQueue.clear();
@@ -82,7 +74,6 @@
while (!sessionMap.isEmpty()) {
for (Iterator<Session> sessionsItr = sessionMap.values().iterator();
sessionsItr.hasNext(); ) {
Session session = sessionsItr.next();
- sessionsItr.remove();
session.destroy();
}
@@ -97,10 +88,10 @@
throw new IllegalStateException();
}
- requeue(session);
+ sessionQueue.requeue(session, true);
}
public void requeue(Session session) {
- sessionQueue.requeue(session);
+ sessionQueue.requeue(session, false);
}
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionQueue.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionQueue.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionQueue.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -42,10 +42,10 @@
private static final Comparator<? super Session> SESSIONS_COMPARATOR = new
Comparator<Session>() {
public int compare(Session o1, Session o2) {
- Long delay1 = Long.valueOf(o1.getLastAccessedTime() +
o1.getMaxInactiveInterval());
- Long delay2 = Long.valueOf(o2.getLastAccessedTime() +
o2.getMaxInactiveInterval());
+ Long expTime1 = getExpirationTime(o1);
+ Long expTime2 = getExpirationTime(o2);
- return delay1.compareTo(delay2);
+ return expTime1.compareTo(expTime2);
}
};
@@ -56,9 +56,22 @@
private final Condition available = lock.newCondition();
+ private static long getExpirationTime(Session session) {
+ long lastAccessedTime = session.getLastAccessedTime();
+ if (lastAccessedTime < 0) {
+ return Long.MIN_VALUE;
+ }
+
+ return lastAccessedTime + session.getMaxInactiveInterval();
+ }
+
private long getDelay(Session session, TimeUnit unit) {
- return unit.convert(session.getLastAccessedTime() +
session.getMaxInactiveInterval() - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
+ long expirationTime = getExpirationTime(session);
+ if (expirationTime < 0) {
+ return Long.MIN_VALUE;
+ }
+
+ return unit.convert(expirationTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
public Session take() throws InterruptedException {
@@ -99,17 +112,20 @@
}
}
- public void requeue(Session session) {
+ public void requeue(Session session, boolean addIfNotExists) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
- queue.remove(session);
+ boolean exists = queue.remove(session);
- Session first = queue.peek();
- queue.offer(session);
- if (first == null || SESSIONS_COMPARATOR.compare(session, first) < 0) {
- available.signalAll();
+ if (exists || addIfNotExists) {
+ Session first = queue.peek();
+ queue.offer(session);
+ if (first == null || SESSIONS_COMPARATOR.compare(session, first) < 0)
{
+ available.signalAll();
+ }
}
+
} finally {
lock.unlock();
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -39,6 +39,8 @@
import org.richfaces.application.push.Session;
import org.richfaces.application.push.TopicKey;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -55,6 +57,8 @@
private static final Joiner OR_JOINER = Joiner.on(" OR ").skipNulls();
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
private static final Function<TopicKey, String> TOPIC_KEY_TO_MESSAGE_SELECTOR =
new Function<TopicKey, String>() {
public String apply(TopicKey from) {
if (Strings.isNullOrEmpty(from.getSubtopicName())) {
@@ -159,7 +163,7 @@
public TopicSubscriber createTopicSubscriber(Session pushSession, javax.jms.Session
jmsSession,
Entry<TopicKey, Collection<TopicKey>> entry)
throws JMSException, NamingException {
-
+
TopicKey rootTopicKey = entry.getKey();
String subscriptionClientId = getSubscriptionClientId(pushSession,
rootTopicKey);
@@ -169,4 +173,19 @@
return jmsSession.createDurableSubscriber(jmsTopic, subscriptionClientId,
createMessageSelector(entry.getValue()), true);
}
+ /**
+ * @param session
+ * @param jmsSession
+ * @param rootTopicKeys
+ */
+ public void removeTopicSubscriber(Session session, javax.jms.Session jmsSession,
Collection<TopicKey> rootTopicKeys) {
+ for (TopicKey rootTopicKey : rootTopicKeys) {
+ try {
+ jmsSession.unsubscribe(getSubscriptionClientId(session, rootTopicKey));
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -59,6 +59,6 @@
@Override
protected Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource,
Session session) {
- return new RequestImpl(resource, session, getWorker(), messagingContext,
topicsContext);
+ return new RequestImpl(resource, session, getWorker(), topicsContext);
}
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -21,14 +21,10 @@
*/
package org.richfaces.application.push.impl.jms;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
@@ -37,6 +33,7 @@
import org.atmosphere.cpr.AtmosphereResource;
import org.richfaces.application.push.MessageDataSerializer;
+import org.richfaces.application.push.MessageException;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.TopicKey;
import org.richfaces.application.push.TopicsContext;
@@ -48,79 +45,20 @@
* @author Nick Belaevski
*
*/
-public class RequestImpl extends AbstractRequest implements MessageListener {
+public class RequestImpl extends AbstractRequest implements
org.richfaces.application.push.MessageListener {
private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
- private javax.jms.Session jmsSession;
-
- private MessagingContext messagingContext;
-
private TopicsContext topicsContext;
public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource, Session session,
- ExecutorService executorService, MessagingContext messagingContext, TopicsContext
topicsContext) {
+ ExecutorService executorService, TopicsContext topicsContext) {
super(atmosphereResource, session, executorService);
- this.messagingContext = messagingContext;
this.topicsContext = topicsContext;
}
- private void closeSession() {
- if (jmsSession != null) {
- try {
- jmsSession.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- jmsSession = null;
- }
- }
-
- @Override
- public void onSuspend() {
- super.onSuspend();
-
- try {
- jmsSession = messagingContext.createSession();
-
- //TODO - remove this case
- SessionImpl sessionImpl = (SessionImpl) getSession();
-
- for (Entry<TopicKey, Collection<TopicKey>> entry:
sessionImpl.getSuccessfulSubscriptions().asMap().entrySet()) {
- messagingContext.createTopicSubscriber(sessionImpl, jmsSession,
entry).setMessageListener(this);
- }
-
- } catch (Exception e) {
- // TODO: handle exception
- e.printStackTrace();
- }
-
- }
-
- @Override
- public void flushMessages() throws IOException {
- if (isPolling()) {
- closeSession();
- }
-
- super.flushMessages();
- }
-
- @Override
- public void onDisconnect() {
- closeSession();
- super.onDisconnect();
- }
-
- @Override
- public void onResume() {
- closeSession();
- super.onResume();
- }
-
private String serializeMessage(org.richfaces.application.push.Topic topic, Message
message) {
String serializedMessageData = null;
Object messageData = null;
@@ -157,12 +95,10 @@
return null;
}
- /* (non-Javadoc)
- * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
- */
- public void onMessage(Message message) {
+ public void onMessage(Object message) throws MessageException {
+ Message jmsMessage = (Message) message;
try {
- String topicName = ((Topic) message.getJMSDestination()).getTopicName();
+ String topicName = ((Topic) jmsMessage.getJMSDestination()).getTopicName();
org.richfaces.application.push.Topic topic = topicsContext.getTopic(new
TopicKey(topicName));
if (topic == null) {
@@ -170,17 +106,20 @@
return;
}
- String serializedMessageData = serializeMessage(topic, message);
+ String serializedMessageData = serializeMessage(topic, jmsMessage);
if (serializedMessageData == null) {
//TODO log
return;
}
- postMessage(new TopicKey(topicName,
message.getStringProperty(MessagingContext.SUBTOPIC_ATTRIBUTE_NAME)),
serializedMessageData);
-
- message.acknowledge();
+ postMessage(new TopicKey(topicName,
jmsMessage.getStringProperty(MessagingContext.SUBTOPIC_ATTRIBUTE_NAME)),
serializedMessageData);
} catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
+ throw new MessageException(e.getMessage(), e);
}
- }
+ }
+
+
+ public org.richfaces.application.push.MessageListener getMessageListener() {
+ return this;
+ }
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2011-01-31
12:57:38 UTC (rev 21319)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -28,9 +28,14 @@
import java.util.Map.Entry;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
import javax.naming.NamingException;
import org.richfaces.application.push.EventAbortedException;
+import org.richfaces.application.push.Request;
import org.richfaces.application.push.SessionManager;
import org.richfaces.application.push.SessionPreSubscriptionEvent;
import org.richfaces.application.push.Topic;
@@ -52,6 +57,24 @@
*/
public class SessionImpl extends AbstractSession {
+ private static final class JMSToPushListenerAdaptor implements MessageListener {
+
+ private final org.richfaces.application.push.MessageListener messageListener;
+
+ private JMSToPushListenerAdaptor(org.richfaces.application.push.MessageListener
messageListener) {
+ this.messageListener = messageListener;
+ }
+
+ public void onMessage(Message message) {
+ try {
+ messageListener.onMessage(message);
+ message.acknowledge();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
private final MessagingContext messagingContext;
@@ -62,6 +85,10 @@
private final Map<TopicKey, String> failedSubscriptions = Maps.newHashMap();
+ private Session jmsSession;
+
+ private Collection<TopicSubscriber> subscribers =
Lists.newArrayListWithCapacity(1);
+
public SessionImpl(String id, SessionManager sessionManager, MessagingContext
messagingContext, TopicsContext topicsContext) {
super(id, sessionManager);
@@ -69,31 +96,6 @@
this.topicsContext = topicsContext;
}
- public void onRequestSuspended() {
- // TODO Auto-generated method stub
- }
-
- public void onRequestDisconnected() {
- // TODO Auto-generated method stub
- // TODO Auto-generated method stub
- try {
- disconnect();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void onRequestResumed() {
- // TODO Auto-generated method stub
- try {
- disconnect();
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
public Map<TopicKey, String> getFailedSubscriptions() {
return failedSubscriptions;
}
@@ -110,9 +112,17 @@
jmsSession = messagingContext.createSession();
for (Entry<TopicKey, Collection<TopicKey>> entry:
rootTopicsMap.asMap().entrySet()) {
- messagingContext.createTopicSubscriber(this, jmsSession, entry);
+ TopicSubscriber subscriber = null;
- successfulSubscriptions.putAll(entry.getKey(), entry.getValue());
+ try {
+ subscriber = messagingContext.createTopicSubscriber(this, jmsSession,
entry);
+ successfulSubscriptions.putAll(entry.getKey(), entry.getValue());
+ } finally {
+ if (subscriber != null) {
+ subscriber.close();
+ }
+ }
+
}
} catch (JMSException e) {
LOGGER.error(e.getMessage(), e);
@@ -156,6 +166,53 @@
}
}
+ @Override
+ protected void processConnect(Request request) throws Exception {
+ super.processConnect(request);
+
+ jmsSession = messagingContext.createSession();
+
+ MessageListener jmsListener = new
JMSToPushListenerAdaptor(request.getMessageListener());
+
+ for (Entry<TopicKey, Collection<TopicKey>> entry:
getSuccessfulSubscriptions().asMap().entrySet()) {
+ TopicSubscriber subscriber = messagingContext.createTopicSubscriber(this,
jmsSession, entry);
+ subscribers.add(subscriber);
+ subscriber.setMessageListener(jmsListener);
+ }
+ }
+
+ private void clearSubscribers() {
+ if (jmsSession != null) {
+ for (TopicSubscriber subscriber : subscribers) {
+ try {
+ subscriber.close();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ subscribers.clear();
+
+ try {
+ jmsSession.close();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ jmsSession = null;
+ }
+ }
+
+ @Override
+ protected void processDisconnect() throws Exception {
+ try {
+ clearSubscribers();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ super.processDisconnect();
+ }
+
public void subscribe(String[] topics) {
Iterable<TopicKey> topicKeys =
Iterables.transform(Lists.newLinkedList(Arrays.asList(topics)), TopicKey.factory());
@@ -163,4 +220,27 @@
createSubscriptions(topicKeys);
}
+ @Override
+ public synchronized void destroy() {
+ super.destroy();
+
+ //we need to create new JMS session, as this method can be called from another
thread - see javax.jms.Session JavaDoc
+ //for multi-threading limitations
+ Session localJMSSession = null;
+ try {
+ localJMSSession = messagingContext.createSession();
+ messagingContext.removeTopicSubscriber(this, localJMSSession,
successfulSubscriptions.keySet());
+
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ } finally {
+ if (localJMSSession != null) {
+ try {
+ localJMSSession.close();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
}
Modified: trunk/examples/push-demo/pom.xml
===================================================================
--- trunk/examples/push-demo/pom.xml 2011-01-31 12:57:38 UTC (rev 21319)
+++ trunk/examples/push-demo/pom.xml 2011-01-31 13:36:28 UTC (rev 21320)
@@ -180,7 +180,7 @@
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-websocket</artifactId>
- <version>8.0.0.M1</version>
+ <version>8.0.0.M2</version>
</dependency>
</dependencies>
<build>
@@ -188,7 +188,7 @@
<plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-maven-plugin</artifactId>
- <version>8.0.0.M1</version>
+ <version>8.0.0.M2</version>
</plugin>
</plugins>
</build>
Modified: trunk/ui/core/ui/src/main/java/org/richfaces/resource/PushResource.java
===================================================================
--- trunk/ui/core/ui/src/main/java/org/richfaces/resource/PushResource.java 2011-01-31
12:57:38 UTC (rev 21319)
+++ trunk/ui/core/ui/src/main/java/org/richfaces/resource/PushResource.java 2011-01-31
13:36:28 UTC (rev 21320)
@@ -75,7 +75,7 @@
if (forgetPushSessionId != null) {
Session oldSession =
pushContext.getSessionManager().getPushSession(forgetPushSessionId);
if (oldSession != null) {
- oldSession.destroy();
+ oldSession.invalidate();
}
}