Author: nbelaevski
Date: 2011-01-26 19:01:47 -0500 (Wed, 26 Jan 2011)
New Revision: 21252
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
Log:
https://issues.jboss.org/browse/RF-10330
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2011-01-26
23:27:29 UTC (rev 21251)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2011-01-27
00:01:47 UTC (rev 21252)
@@ -39,6 +39,8 @@
import org.richfaces.application.push.Session;
import org.richfaces.application.push.TopicKey;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -55,6 +57,8 @@
private static final Joiner OR_JOINER = Joiner.on(" OR ").skipNulls();
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
private static final Function<TopicKey, String> TOPIC_KEY_TO_MESSAGE_SELECTOR =
new Function<TopicKey, String>() {
public String apply(TopicKey from) {
if (Strings.isNullOrEmpty(from.getSubtopicName())) {
@@ -169,4 +173,19 @@
return jmsSession.createDurableSubscriber(jmsTopic, subscriptionClientId,
createMessageSelector(entry.getValue()), true);
}
+ /**
+ * Calls {@code unsubscribe} on the given JMS session once per root topic key, using the
+ * name returned by {@code getSubscriptionClientId(session, rootTopicKey)}.
+ * A {@link JMSException} raised for one key is logged and does not stop the loop,
+ * so the remaining keys are still processed.
+ *
+ * @param session push session passed to {@code getSubscriptionClientId}
+ * @param jmsSession JMS session on which {@code unsubscribe} is invoked
+ * @param rootTopicKeys keys to process; one {@code unsubscribe} call is made per key
+ */
+ public void removeTopicSubscriber(Session session, javax.jms.Session jmsSession,
Collection<TopicKey> rootTopicKeys) {
+ for (TopicKey rootTopicKey : rootTopicKeys) {
+ try {
+ jmsSession.unsubscribe(getSubscriptionClientId(session, rootTopicKey));
+ } catch (JMSException e) { // per-key failure: log and continue with the next key
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2011-01-26
23:27:29 UTC (rev 21251)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2011-01-27
00:01:47 UTC (rev 21252)
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
@@ -32,6 +33,7 @@
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -44,6 +46,8 @@
import org.richfaces.log.Logger;
import org.richfaces.log.RichfacesLogger;
+import com.google.common.collect.Lists;
+
/**
* @author Nick Belaevski
*
@@ -58,6 +62,8 @@
private TopicsContext topicsContext;
+ private List<TopicSubscriber> subscribers = Lists.newArrayListWithCapacity(1);
+
public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource, Session session,
ExecutorService executorService, MessagingContext messagingContext, TopicsContext
topicsContext) {
@@ -69,6 +75,14 @@
private void closeSession() {
if (jmsSession != null) {
+ for (TopicSubscriber subscriber : subscribers) {
+ try {
+ subscriber.close();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
try {
jmsSession.close();
} catch (JMSException e) {
@@ -90,7 +104,9 @@
SessionImpl sessionImpl = (SessionImpl) getSession();
for (Entry<TopicKey, Collection<TopicKey>> entry:
sessionImpl.getSuccessfulSubscriptions().asMap().entrySet()) {
- messagingContext.createTopicSubscriber(sessionImpl, jmsSession,
entry).setMessageListener(this);
+ TopicSubscriber subscriber =
messagingContext.createTopicSubscriber(sessionImpl, jmsSession, entry);
+ subscribers.add(subscriber);
+ subscriber.setMessageListener(this);
}
} catch (Exception e) {
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2011-01-26
23:27:29 UTC (rev 21251)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2011-01-27
00:01:47 UTC (rev 21252)
@@ -28,6 +28,8 @@
import java.util.Map.Entry;
import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
import javax.naming.NamingException;
import org.richfaces.application.push.EventAbortedException;
@@ -110,9 +112,17 @@
jmsSession = messagingContext.createSession();
for (Entry<TopicKey, Collection<TopicKey>> entry:
rootTopicsMap.asMap().entrySet()) {
- messagingContext.createTopicSubscriber(this, jmsSession, entry);
+ TopicSubscriber subscriber = null;
- successfulSubscriptions.putAll(entry.getKey(), entry.getValue());
+ try {
+ subscriber = messagingContext.createTopicSubscriber(this, jmsSession,
entry);
+ successfulSubscriptions.putAll(entry.getKey(), entry.getValue());
+ } finally {
+ if (subscriber != null) {
+ subscriber.close();
+ }
+ }
+
}
} catch (JMSException e) {
LOGGER.error(e.getMessage(), e);
@@ -163,4 +173,24 @@
createSubscriptions(topicKeys);
}
+ @Override
+ public void destroy() { // after the superclass cleanup, unsubscribes the keys held in successfulSubscriptions
+ super.destroy();
+ Session jmsSession = null; // remains null if createSession() throws, so the finally block skips close()
+ try {
+ jmsSession = messagingContext.createSession(); // session created only for this unsubscribe pass
+ messagingContext.removeTopicSubscriber(this, jmsSession,
successfulSubscriptions.keySet()); // per-key JMSExceptions are logged inside removeTopicSubscriber
+
+ } catch (JMSException e) { // logged, not rethrown
+ LOGGER.error(e.getMessage(), e);
+ } finally {
+ if (jmsSession != null) {
+ try {
+ jmsSession.close(); // always close the session opened above
+ } catch (JMSException e) { // a close failure is logged only
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
}