Author: nbelaevski
Date: 2010-10-26 20:57:52 -0400 (Tue, 26 Oct 2010)
New Revision: 19676
Added:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionFailureException.java
branches/RF-7817/push-redesign-app/src/main/java/demo/Channel.java
branches/RF-7817/push-redesign-app/src/main/java/demo/ChannelsBean.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
Removed:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/RequestLifecycleListener.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionListener.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionAbortedException.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/PushContext.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Request.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Session.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionFactory.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionManager.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Topic.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicKey.java
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java
branches/RF-7817/core/impl/checkstyle-suppressions.xml
branches/RF-7817/core/impl/src/main/resources/META-INF/resources/richfaces-event.js
branches/RF-7817/push-redesign-app/pom.xml
branches/RF-7817/push-redesign-app/src/main/java/demo/ChatBean.java
branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java
branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml
branches/RF-7817/push-redesign-app/src/main/webapp/chat.xhtml
branches/RF-7817/push-redesign/checkstyle-suppressions.xml
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushResource.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/component/AbstractPush.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/renderkit/PushRendererBase.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushFilter.java
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js
branches/RF-7817/push-redesign/src/main/templates/push.template.xml
branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml
branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml
Log:
https://jira.jboss.org/browse/RF-7817
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/PushContext.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/PushContext.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/PushContext.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -35,6 +35,8 @@
public SessionFactory getSessionFactory();
+ public SessionManager getSessionManager();
+
public void init(FacesContext facesContext);
public void destroy();
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Request.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Request.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Request.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -34,21 +34,15 @@
public void flushMessages() throws IOException;
- public void postMessage(TopicKey topicKey, String serializedMessage);
-
//TODO suspend with timeout?
public void suspend() throws IOException;
public void resume() throws IOException;
- public void addListener(RequestLifecycleListener listener);
-
- public void removeListener(RequestLifecycleListener listener);
-
+ public Session getSession();
+
public boolean isSuspended();
public boolean isPolling();
- public Session getSession();
-
}
Deleted:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/RequestLifecycleListener.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/RequestLifecycleListener.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/RequestLifecycleListener.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push;
-
-
-/**
- * @author Nick Belaevski
- *
- */
-public interface RequestLifecycleListener {
-
- public void onFlush(Request request);
-
- public void onSuspend(Request request);
-
- public void onDisconnect(Request request);
-
- public void onResume(Request request);
-
-}
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Session.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Session.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Session.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -21,8 +21,12 @@
*/
package org.richfaces.application.push;
+import java.util.Map;
+import com.google.common.collect.Multimap;
+
+
/**
* @author Nick Belaevski
*
@@ -35,20 +39,24 @@
public String getId();
- public void subscribe(TopicKey topic) throws Exception,
SubscriptionAbortedException;
+ public Multimap<TopicKey, TopicKey> getSuccessfulSubscriptions();
+ public Map<TopicKey, String> getFailedSubscriptions();
+
+ public void subscribe(String[] topics);
+
public void connect(Request request) throws Exception;
- public void disconnect(Request request) throws Exception;
+ public void disconnect() throws Exception;
+ public void onRequestSuspended();
+
+ public void onRequestDisconnected();
+
+ public void onRequestResumed();
+
public void destroy();
public Request getRequest();
- public void addSessionListener(SessionListener sessionListener);
-
- public SessionListener[] getSessionListeners();
-
- public void removeSessionListener(SessionListener sessionListener);
-
}
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionFactory.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionFactory.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionFactory.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -27,6 +27,6 @@
*/
public interface SessionFactory {
- public Session createSession(String key);
+ public Session createSession(String pushSessionId);
}
Deleted:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionListener.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionListener.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionListener.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -1,36 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push;
-
-/**
- * @author Nick Belaevski
- *
- */
-public interface SessionListener {
-
- public void onRequestConnected(Session session);
-
- public void onRequestDisconnected(Session session);
-
- public void onSessionDestroyed(Session session);
-
-}
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionManager.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionManager.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionManager.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -32,7 +32,9 @@
public Session getPushSession(String id);
- public void removePushSession(String id);
+ public void removePushSession(Session session);
+
+ public void requeue(Session session);
public void destroy();
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -30,8 +30,8 @@
private static final long serialVersionUID = 2741390800212036457L;
- public SessionPreSubscriptionEvent(Topic topic, Session session) {
- super(topic, session);
+ public SessionPreSubscriptionEvent(Topic topic, TopicKey topicKey, Session session)
{
+ super(topic, topicKey, session);
}
@Override
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -30,8 +30,8 @@
private static final long serialVersionUID = -463481692840464586L;
- public SessionSubscriptionEvent(Topic topic, Session session) {
- super(topic, session);
+ public SessionSubscriptionEvent(Topic topic, TopicKey topicKey, Session session) {
+ super(topic, topicKey, session);
}
@Override
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -31,14 +31,21 @@
private Session session;
- public SessionTopicEvent(Topic topic, Session session) {
+ private TopicKey topicKey;
+
+ public SessionTopicEvent(Topic topic, TopicKey topicKey, Session session) {
super(topic);
+ this.topicKey = topicKey;
this.session = session;
}
public Session getSession() {
return session;
}
+
+ public TopicKey getTopicKey() {
+ return topicKey;
+ }
@Override
public boolean isAppropriateListener(TopicListener listener) {
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -30,8 +30,8 @@
private static final long serialVersionUID = -2286664647234464678L;
- public SessionUnsubscriptionEvent(Topic topic, Session session) {
- super(topic, session);
+ public SessionUnsubscriptionEvent(Topic topic, TopicKey topicKey, Session session) {
+ super(topic, topicKey, session);
}
@Override
Deleted:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionAbortedException.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionAbortedException.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionAbortedException.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class SubscriptionAbortedException extends Exception {
-
- private static final long serialVersionUID = -2070837902276133333L;
-
- public SubscriptionAbortedException() {
- super();
- }
-
- public SubscriptionAbortedException(String message) {
- super(message);
- }
-
-}
Copied:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionFailureException.java
(from rev 19663,
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionAbortedException.java)
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionFailureException.java
(rev 0)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/SubscriptionFailureException.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class SubscriptionFailureException extends Exception {
+
+ private static final long serialVersionUID = -2070837902276133333L;
+
+ public SubscriptionFailureException() {
+ super();
+ }
+
+ public SubscriptionFailureException(String message) {
+ super(message);
+ }
+
+}
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Topic.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Topic.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/Topic.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -41,6 +41,6 @@
public void publishEvent(TopicEvent event) throws EventAbortedException;
- public void publish(Object messageData) throws MessageException;
-
+ public void publish(String subtopic, Object messageData) throws MessageException;
+
}
\ No newline at end of file
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicKey.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicKey.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicKey.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -22,40 +22,117 @@
package org.richfaces.application.push;
import java.io.Serializable;
+import java.util.regex.Pattern;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+
/**
* @author Nick Belaevski
*
*/
public class TopicKey implements Serializable {
- /**
- *
- */
+ public static final char SUBCHANNEL_SEPARATOR = '@';
+
+ private static final Function<String, TopicKey> FACTORY = new
Function<String, TopicKey>() {
+ public TopicKey apply(String from) {
+ return new TopicKey(from);
+ };
+ };
+
+ private static final Function<TopicKey, String> TO_ADDRESS = new
Function<TopicKey, String>() {
+ public String apply(TopicKey from) {
+ return from.getTopicAddress();
+ };
+ };
+
private static final long serialVersionUID = -6967010810728932698L;
+
+ private static final Pattern NAME_PATTERN =
Pattern.compile("[a-zA-Z0-9_]+");
+
+ private static final Joiner AT_JOINER = Joiner.on(SUBCHANNEL_SEPARATOR).skipNulls();
+
private final String topicName;
- public TopicKey(String topicName) {
+ private final String subtopicName;
+
+ public TopicKey(String topicAddress) {
+ this(getTopicName(topicAddress), getSubtopicName(topicAddress));
+ }
+
+ public TopicKey(String topicName, String subtopicName) {
super();
if (topicName == null) {
throw new NullPointerException();
}
+
+ this.topicName = topicName;
+ this.subtopicName = subtopicName;
- this.topicName = topicName;
+ if (!NAME_PATTERN.matcher(topicName).matches()) {
+ throw new IllegalArgumentException(topicName);
+ }
+
+ if (subtopicName != null &&
!NAME_PATTERN.matcher(subtopicName).matches()) {
+ throw new IllegalArgumentException(subtopicName);
+ }
}
+
+ public static Function<String, TopicKey> factory() {
+ return FACTORY;
+ }
- /**
- * @return the topicName
- */
+ public static Function<TopicKey, String> toAddress() {
+ return TO_ADDRESS;
+ }
+
+ private static String getTopicName(String topicAddress) {
+ int idx = topicAddress.indexOf(SUBCHANNEL_SEPARATOR);
+
+ if (idx < 0) {
+ return topicAddress;
+ }
+
+ return topicAddress.substring(idx + 1);
+ }
+
+ private static String getSubtopicName(String topicAddress) {
+ int idx = topicAddress.indexOf(SUBCHANNEL_SEPARATOR);
+
+ if (idx < 0) {
+ return null;
+ }
+
+ return topicAddress.substring(0, idx);
+ }
+
public String getTopicName() {
return topicName;
}
+
+ public String getSubtopicName() {
+ return subtopicName;
+ }
+
+ public String getTopicAddress() {
+ return AT_JOINER.join(subtopicName, topicName);
+ }
+ public TopicKey getRootTopicKey() {
+ if (getSubtopicName() == null) {
+ return this;
+ } else {
+ return new TopicKey(getTopicName(), null);
+ }
+ }
+
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
+ result = prime * result + ((subtopicName == null) ? 0 :
subtopicName.hashCode());
result = prime * result + ((topicName == null) ? 0 : topicName.hashCode());
return result;
}
@@ -72,6 +149,13 @@
return false;
}
TopicKey other = (TopicKey) obj;
+ if (subtopicName == null) {
+ if (other.subtopicName != null) {
+ return false;
+ }
+ } else if (!subtopicName.equals(other.subtopicName)) {
+ return false;
+ }
if (topicName == null) {
if (other.topicName != null) {
return false;
Modified:
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java
===================================================================
---
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -34,16 +34,16 @@
//TODO annotations for declarative topics registration
public abstract class TopicsContext {
- private ConcurrentMap<TopicKey, Topic> topics = new
ConcurrentHashMap<TopicKey, Topic>();
+ private ConcurrentMap<String, Topic> topics = new ConcurrentHashMap<String,
Topic>();
protected abstract Topic createTopic(TopicKey key);
public Topic getOrCreateTopic(TopicKey key) {
- Topic result = topics.get(key);
+ Topic result = topics.get(key.getTopicName());
if (result == null) {
Topic newTopic = createTopic(key);
- result = topics.putIfAbsent(key, newTopic);
+ result = topics.putIfAbsent(key.getTopicName(), newTopic);
if (result == null) {
result = newTopic;
}
@@ -53,21 +53,27 @@
}
public Topic getTopic(TopicKey key) {
- return topics.get(key);
+ Topic topic = topics.get(key.getTopicName());
+
+ if (topic == null) {
+ //TODO
+ }
+
+ return topic;
}
public void removeTopic(TopicKey key) {
- topics.remove(key);
+ topics.remove(key.getTopicName());
}
public void publish(TopicKey key, Object data) throws MessageException {
Topic topic = getTopic(key);
if (topic == null) {
- throw new MessageException(MessageFormat.format("Topic {0} not
found", key.getTopicName()));
+ throw new MessageException(MessageFormat.format("Topic {0} not
found", key.getTopicAddress()));
}
- topic.publish(data);
+ topic.publish(key.getSubtopicName(), data);
}
public static TopicsContext lookup() {
Modified: branches/RF-7817/core/impl/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/core/impl/checkstyle-suppressions.xml 2010-10-26 22:19:31 UTC (rev
19675)
+++ branches/RF-7817/core/impl/checkstyle-suppressions.xml 2010-10-27 00:57:52 UTC (rev
19676)
@@ -36,5 +36,5 @@
<!-- TODO it is hot fix for building process, this files belong to the
push-redesign module,
and must be removed from here -->
- <suppress checks="IllegalCatch" files="RequestImpl.java" />
+ <suppress checks="IllegalCatch" files="AbstractRequest.java"
/>
</suppressions>
Modified:
branches/RF-7817/core/impl/src/main/resources/META-INF/resources/richfaces-event.js
===================================================================
---
branches/RF-7817/core/impl/src/main/resources/META-INF/resources/richfaces-event.js 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/core/impl/src/main/resources/META-INF/resources/richfaces-event.js 2010-10-27
00:57:52 UTC (rev 19676)
@@ -52,6 +52,13 @@
* */
RICH_NAMESPACE : "RICH",
+ /**
+ * @constant
+ * @name RichFaces.Event.EVENT_NAMESPACE_SEPARATOR
+ * @type string
+ * */
+ EVENT_NAMESPACE_SEPARATOR : ".",
+
/**
* Attach an event handler to execute when the DOM is fully loaded.
*
@@ -268,7 +275,7 @@
if (id) {
a.push(id);
}
- return a.join('.');
+ return a.join(richfaces.Event.EVENT_NAMESPACE_SEPARATOR);
}
});
Modified: branches/RF-7817/push-redesign/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/push-redesign/checkstyle-suppressions.xml 2010-10-26 22:19:31 UTC
(rev 19675)
+++ branches/RF-7817/push-redesign/checkstyle-suppressions.xml 2010-10-27 00:57:52 UTC
(rev 19676)
@@ -36,5 +36,5 @@
<!-- TODO it is hot fix for building process, this files belong to the
push-redesign module,
and must be removed from here -->
- <suppress checks="IllegalCatch" files="RequestImpl.java" />
+ <suppress checks="IllegalCatch" files="AbstractRequest.java"
/>
</suppressions>
Copied:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
(from rev 19663,
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java)
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -0,0 +1,231 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.ajax4jsf.javascript.JSLiteral;
+import org.ajax4jsf.javascript.ScriptString;
+import org.ajax4jsf.javascript.ScriptUtils;
+import org.atmosphere.cpr.AtmosphereEventLifecycle;
+import org.atmosphere.cpr.AtmosphereResource;
+import org.atmosphere.cpr.AtmosphereResourceEvent;
+import org.atmosphere.cpr.AtmosphereResourceEventListener;
+import org.atmosphere.websocket.WebSocketSupport;
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.TopicKey;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public abstract class AbstractRequest implements Request {
+
+ private static final String TOPIC_KEY = "topic";
+
+ private static final String DATA_KEY = "data";
+
+ private static final int SUSPEND_TIMEOUT = 30 * 1000;
+
+ private static final class Message implements ScriptString {
+
+ private TopicKey topicKey;
+
+ private String serializedData;
+
+ public Message(TopicKey topicKey, String serializedData) {
+ super();
+ this.topicKey = topicKey;
+ this.serializedData = serializedData;
+ }
+
+ public String toScript() {
+ Map<String,Object> map = new HashMap<String, Object>(2);
+
+ map.put(TOPIC_KEY, topicKey.getTopicAddress());
+ map.put(DATA_KEY, new JSLiteral(serializedData));
+
+ return ScriptUtils.toScript(map);
+ }
+
+ public void appendScript(StringBuffer functionString) {
+ functionString.append(toScript());
+ }
+ }
+
+ private static final class FlushMessagesTask implements Runnable {
+
+ private Request request;
+
+ public FlushMessagesTask(Request request) {
+ super();
+ this.request = request;
+ }
+
+ public void run() {
+ try {
+ request.flushMessages();
+ } catch (Throwable e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private final AtmosphereResourceEventListener resourceEventListener = new
AtmosphereResourceEventListener() {
+
+ public void onSuspend(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ AbstractRequest.this.onSuspend();
+ }
+
+ public void onResume(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ AbstractRequest.this.onResume();
+ }
+
+ public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ AbstractRequest.this.onDisconnect();
+ }
+
+ public void onBroadcast(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ AbstractRequest.this.onBroadcast();
+ }
+ };
+
+ private final AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource;
+
+ private final Session session;
+
+ private final ExecutorService executorService;
+
+ private final Queue<Message> messagesQueue = new
ConcurrentLinkedQueue<Message>();
+
+ private AtomicBoolean submitted = new AtomicBoolean(false);
+
+ public AbstractRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> atmosphereResource, Session session,
+ ExecutorService executorService) {
+
+ super();
+
+ this.atmosphereResource = atmosphereResource;
+
+ ((AtmosphereEventLifecycle)
atmosphereResource).addEventListener(resourceEventListener);
+
+ this.session = session;
+ this.executorService = executorService;
+ }
+
+ private void submitToWorker() {
+ if (submitted.compareAndSet(false, true)) {
+ executorService.submit(new FlushMessagesTask(this));
+ }
+ }
+
+ private String serializeMessages() {
+ return ScriptUtils.toScript(new
ConsumingCollection<Message>(messagesQueue));
+ }
+
+ public void flushMessages() throws IOException {
+ String serializedMessages = serializeMessages();
+ PrintWriter writer = atmosphereResource.getResponse().getWriter();
+ writer.write(serializedMessages);
+ writer.flush();
+
+ submitted.compareAndSet(true, false);
+
+ if (isPolling()) {
+ atmosphereResource.resume();
+ } else if (!messagesQueue.isEmpty()) {
+ submitToWorker();
+ }
+ }
+
+ public void postMessage(TopicKey topicKey, String serializedMessage) {
+ messagesQueue.add(new Message(topicKey, serializedMessage));
+ submitToWorker();
+ }
+
+ public void suspend() throws IOException {
+ atmosphereResource.suspend(SUSPEND_TIMEOUT, isPolling());
+ }
+
+ public void resume() throws IOException {
+ atmosphereResource.resume();
+ }
+
+ public boolean isSuspended() {
+ return atmosphereResource.getAtmosphereResourceEvent().isSuspended();
+ }
+
+ public boolean isPolling() {
+ HttpServletRequest req = atmosphereResource.getRequest();
+ boolean isWebsocket = req.getAttribute(WebSocketSupport.WEBSOCKET_SUSPEND) !=
null ||
+ req.getAttribute(WebSocketSupport.WEBSOCKET_RESUME) != null;
+
+ //TODO how to detect non-polling transports?
+ return !isWebsocket;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ protected AtmosphereResource<HttpServletRequest, HttpServletResponse>
getResource() {
+ return atmosphereResource;
+ }
+
+ protected void onSuspend() {
+ }
+
+ protected void onResume() {
+ try {
+ session.disconnect();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ protected void onDisconnect() {
+ try {
+ session.disconnect();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ protected void onBroadcast() {
+ }
+
+}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -21,12 +21,11 @@
*/
package org.richfaces.application.push.impl;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.io.IOException;
import org.richfaces.application.push.Request;
import org.richfaces.application.push.Session;
-import org.richfaces.application.push.SessionListener;
+import org.richfaces.application.push.SessionManager;
/**
* @author Nick Belaevski
@@ -34,53 +33,20 @@
*/
public abstract class AbstractSession implements Session {
- private static final int MAX_INACTIVE_INTERVAL = 60 * 1000;
+ private static final int MAX_INACTIVE_INTERVAL = 5 * 60 * 1000;
- private static final SessionListener[] EMPTY_SESSION_LISTENERS_ARRAY = new
SessionListener[0];
-
- private static enum Events {
- connected {
- @Override
- public void notifyListener(AbstractSession session, SessionListener listener)
{
- listener.onRequestConnected(session);
- }
- },
-
- disconnected {
- @Override
- public void notifyListener(AbstractSession session, SessionListener listener)
{
- listener.onRequestDisconnected(session);
- }
- },
-
- destroyed {
- @Override
- public void notifyListener(AbstractSession session, SessionListener listener)
{
- listener.onSessionDestroyed(session);
- }
- };
-
- abstract void notifyListener(AbstractSession session, SessionListener listener);
+ private final String id;
- public void notifyListeners(AbstractSession session) {
- for (SessionListener listener : session.listeners) {
- notifyListener(session, listener);
- }
- }
- }
+ private final SessionManager sessionManager;
- private final String id;
-
private volatile long lastAccessedTime;
private volatile Request request;
-
- //TODO performance?
- private List<SessionListener> listeners = new
CopyOnWriteArrayList<SessionListener>();
-
- public AbstractSession(String id) {
+
+ public AbstractSession(String id, SessionManager sessionManager) {
super();
this.id = id;
+ this.sessionManager = sessionManager;
resetLastAccessedTimeToCurrent();
}
@@ -89,23 +55,25 @@
lastAccessedTime = System.currentTimeMillis();
}
+ private void requeue() {
+ resetLastAccessedTimeToCurrent();
+ sessionManager.requeue(this);
+ }
+
public void connect(Request request) throws Exception {
if (this.request != null) {
- throw new IllegalStateException("already connected");
+ this.request.resume();
}
this.request = request;
-
- Events.connected.notifyListeners(this);
+ requeue();
+
request.suspend();
}
- public void disconnect(Request request) throws Exception {
+ public void disconnect() throws Exception {
this.request = null;
- resetLastAccessedTimeToCurrent();
-
- Events.disconnected.notifyListeners(this);
}
public long getLastAccessedTime() {
@@ -124,28 +92,23 @@
return id;
}
- public void addSessionListener(SessionListener sessionListener) {
- listeners.add(sessionListener);
- }
-
- public SessionListener[] getSessionListeners() {
- return listeners.toArray(EMPTY_SESSION_LISTENERS_ARRAY);
- }
-
- public void removeSessionListener(SessionListener sessionListener) {
- listeners.remove(sessionListener);
- }
-
public Request getRequest() {
return request;
}
public void destroy() {
if (request != null) {
+ try {
+ request.resume();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ request = null;
//TODO - clean up request
}
- Events.destroyed.notifyListeners(this);
+ sessionManager.removePushSession(this);
// TODO Auto-generated method stub
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -47,6 +47,8 @@
private volatile MessageDataSerializer serializer;
+ private volatile boolean allowSubtopics;
+
private List<TopicListener> listeners = new
CopyOnWriteArrayList<TopicListener>();
public AbstractTopic(TopicKey key) {
@@ -66,6 +68,14 @@
this.serializer = serializer;
}
+ public boolean isAllowSubtopics() {
+ return allowSubtopics;
+ }
+
+ public void setAllowSubtopics(boolean allowSubtopics) {
+ this.allowSubtopics = allowSubtopics;
+ }
+
public TopicKey getKey() {
return key;
}
@@ -92,6 +102,6 @@
}
}
- public abstract void publish(Object messageData) throws MessageException;
+ public abstract void publish(String subtopic, Object messageData) throws
MessageException;
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -69,7 +69,7 @@
worker = Executors.newCachedThreadPool(DAEMON_THREADS_FACTORY);
}
- protected SessionManager getSessionManager() {
+ public SessionManager getSessionManager() {
return sessionManager;
}
@@ -108,10 +108,12 @@
//do nothing
}
- protected Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource, Session session) {
- return new RequestImpl(resource, session, worker);
+ protected abstract Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource, Session session);
+
+ protected ExecutorService getWorker() {
+ return worker;
}
-
+
public void init(ServletConfig servletConfig) throws Exception {
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushResource.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushResource.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushResource.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -24,17 +24,22 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.util.Date;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
+import javax.faces.FacesException;
+import javax.faces.context.ExternalContext;
import javax.faces.context.FacesContext;
+import org.ajax4jsf.javascript.ScriptUtils;
import org.richfaces.application.ServiceTracker;
import org.richfaces.application.push.PushContext;
import org.richfaces.application.push.PushContextFactory;
import org.richfaces.application.push.Session;
-import org.richfaces.application.push.SubscriptionAbortedException;
import org.richfaces.application.push.TopicKey;
import org.richfaces.resource.DynamicResource;
import org.richfaces.resource.UserResource;
@@ -48,6 +53,8 @@
private static final String PUSH_TOPIC_PARAM = "pushTopic";
+ private static final String FORGET_PUSH_SESSION_ID_PARAM =
"forgetPushSessionId";
+
public Map<String, String> getResponseHeaders() {
return null;
}
@@ -56,39 +63,61 @@
return null;
}
- //TODO implement "session forget" param
+ private InputStream mapToScript(Map<String, Object> map) {
+ try {
+ byte[] bs = ScriptUtils.toScript(map).getBytes("UTF-8");
+ return new ByteArrayInputStream(bs);
+ } catch (UnsupportedEncodingException e) {
+ throw new FacesException(e.getMessage(), e);
+ }
+ }
+
+ private Map<String, String> getFailuresMap(Map<TopicKey, String>
failedSubscriptions) {
+ Map<String,String> result = new HashMap<String, String>();
+
+ for (Entry<TopicKey, String> entry: failedSubscriptions.entrySet()) {
+ result.put(entry.getKey().getTopicAddress(), entry.getValue());
+ }
+
+ return result;
+ }
+
public InputStream getInputStream() throws IOException {
FacesContext facesContext = FacesContext.getCurrentInstance();
+ ExternalContext externalContext = facesContext.getExternalContext();
+
PushContextFactory pushContextFactory =
ServiceTracker.getService(PushContextFactory.class);
PushContext pushContext = pushContextFactory.getPushContext();
+ String forgetPushSessionId =
externalContext.getRequestParameterMap().get(FORGET_PUSH_SESSION_ID_PARAM);
+ if (forgetPushSessionId != null) {
+ Session oldSession =
pushContext.getSessionManager().getPushSession(forgetPushSessionId);
+ if (oldSession != null) {
+ oldSession.destroy();
+ }
+ }
+
Session session =
pushContext.getSessionFactory().createSession(UUID.randomUUID().toString());
- String[] topicNames =
facesContext.getExternalContext().getRequestParameterValuesMap().get(PUSH_TOPIC_PARAM);
+ String[] topicNames =
externalContext.getRequestParameterValuesMap().get(PUSH_TOPIC_PARAM);
if (topicNames == null) {
throw new IllegalArgumentException();
}
- for (int i = 0; i < topicNames.length; i++) {
- TopicKey topicKey = new TopicKey(topicNames[i]);
-
- try {
- session.subscribe(topicKey);
- } catch (SubscriptionAbortedException e) {
- System.out.println("Aborted: '" + e.getMessage() +
"'");
- // TODO: handle exception
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
+ session.subscribe(topicNames);
- return new ByteArrayInputStream(session.getId().getBytes("UTF-8"));
+ Map<String, Object> subscriptionData = new HashMap<String,
Object>(4);
+ subscriptionData.put("sessionId", session.getId());
+
+ Map<TopicKey, String> failedSubscriptions =
session.getFailedSubscriptions();
+ subscriptionData.put("failures", getFailuresMap(failedSubscriptions));
+
+ return mapToScript(subscriptionData);
}
public String getContentType() {
- return "text/plain";
+ return "application/javascript; charset=utf-8";
}
public int getContentLength() {
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -1,225 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.ajax4jsf.javascript.JSLiteral;
-import org.ajax4jsf.javascript.ScriptString;
-import org.ajax4jsf.javascript.ScriptUtils;
-import org.atmosphere.cpr.AtmosphereEventLifecycle;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.atmosphere.cpr.AtmosphereResourceEventListener;
-import org.atmosphere.websocket.WebSocketSupport;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.RequestLifecycleListener;
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.TopicKey;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class RequestImpl implements Request {
-
- private static final String TOPIC_KEY = "topic";
-
- private static final String DATA_KEY = "data";
-
- private static final class Message implements ScriptString {
-
- private TopicKey topicKey;
-
- private String serializedData;
-
- public Message(TopicKey topicKey, String serializedData) {
- super();
- this.topicKey = topicKey;
- this.serializedData = serializedData;
- }
-
- public String toScript() {
- Map<String,Object> map = new HashMap<String, Object>(2);
-
- map.put(TOPIC_KEY, topicKey.getTopicName());
- map.put(DATA_KEY, new JSLiteral(serializedData));
-
- return ScriptUtils.toScript(map);
- }
-
- public void appendScript(StringBuffer functionString) {
- functionString.append(toScript());
- }
- }
-
- private static final class FlushMessagesTask implements Runnable {
-
- private Request request;
-
- public FlushMessagesTask(Request request) {
- super();
- this.request = request;
- }
-
- public void run() {
- try {
- request.flushMessages();
- } catch (Throwable e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
-
- private AtmosphereResourceEventListener atmosphereResourceListener = new
AtmosphereResourceEventListener() {
-
- public void onSuspend(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- for (RequestLifecycleListener listener : listeners) {
- listener.onSuspend(RequestImpl.this);
- }
- }
-
- public void onResume(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- for (RequestLifecycleListener listener : listeners) {
- listener.onResume(RequestImpl.this);
- }
- }
-
- public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- for (RequestLifecycleListener listener : listeners) {
- listener.onDisconnect(RequestImpl.this);
- }
- }
-
- public void onBroadcast(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- // TODO Auto-generated method stub
-
- }
- };
-
- private final AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource;
-
- private final Session session;
-
- private final ExecutorService executorService;
-
- private final Queue<Message> messagesQueue = new
ConcurrentLinkedQueue<Message>();
-
- private final List<RequestLifecycleListener> listeners = new
CopyOnWriteArrayList<RequestLifecycleListener>();
-
- private AtomicBoolean submitted = new AtomicBoolean(false);
-
- public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource, Session session,
- ExecutorService executorService) {
-
- super();
-
- this.atmosphereResource = atmosphereResource;
-
- ((AtmosphereEventLifecycle)
atmosphereResource).addEventListener(atmosphereResourceListener);
-
- this.session = session;
- this.executorService = executorService;
- }
-
- private void submitToWorker() {
- if (submitted.compareAndSet(false, true)) {
- executorService.submit(new FlushMessagesTask(this));
- }
- }
-
- private String serializeMessages() {
- return ScriptUtils.toScript(new
ConsumingCollection<Message>(messagesQueue));
- }
-
- public void flushMessages() throws IOException {
- for (RequestLifecycleListener listener : listeners) {
- listener.onFlush(this);
- }
-
- String serializedMessages = serializeMessages();
- PrintWriter writer = atmosphereResource.getResponse().getWriter();
- writer.write(serializedMessages);
- writer.flush();
-
- submitted.compareAndSet(true, false);
-
- if (isPolling()) {
- atmosphereResource.resume();
- } else if (!messagesQueue.isEmpty()) {
- submitToWorker();
- }
- }
-
- public void postMessage(TopicKey topicKey, String serializedMessage) {
- messagesQueue.add(new Message(topicKey, serializedMessage));
- submitToWorker();
- }
-
- public void addListener(RequestLifecycleListener listener) {
- listeners.add(listener);
- }
-
- public void removeListener(RequestLifecycleListener listener) {
- listeners.remove(listener);
- }
-
- public void suspend() throws IOException {
- atmosphereResource.suspend(-1, isPolling());
- }
-
- public void resume() throws IOException {
- atmosphereResource.resume();
- }
-
- public boolean isSuspended() {
- return atmosphereResource.getAtmosphereResourceEvent().isSuspended();
- }
-
- public boolean isPolling() {
- HttpServletRequest req = atmosphereResource.getRequest();
- boolean isWebsocket = req.getAttribute(WebSocketSupport.WEBSOCKET_SUSPEND) !=
null ||
- req.getAttribute(WebSocketSupport.WEBSOCKET_RESUME) != null;
-
- //TODO how to detect non-polling transports?
- return !isWebsocket;
- }
-
- public Session getSession() {
- return session;
- }
-
-}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -23,13 +23,9 @@
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import org.richfaces.application.push.Session;
-import org.richfaces.application.push.SessionListener;
import org.richfaces.application.push.SessionManager;
import org.richfaces.log.Logger;
import org.richfaces.log.RichfacesLogger;
@@ -44,75 +40,13 @@
private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
- //TODO - implement queue around sessions properly
- private final class DelayedSessionHolder implements Delayed {
-
- private final Session session;
-
- public DelayedSessionHolder(Session session) {
- super();
- this.session = session;
- }
-
- public int compareTo(Delayed o) {
- return
Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
- }
-
- public long getDelay(TimeUnit unit) {
- return unit.convert(session.getLastAccessedTime() +
session.getMaxInactiveInterval() - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- }
-
- public Session getSession() {
- return session;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + getOuterType().hashCode();
- result = prime * result + ((session == null) ? 0 : session.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- DelayedSessionHolder other = (DelayedSessionHolder) obj;
- if (!getOuterType().equals(other.getOuterType())) {
- return false;
- }
- if (session == null) {
- if (other.session != null) {
- return false;
- }
- } else if (!session.equals(other.session)) {
- return false;
- }
- return true;
- }
-
- private SessionManagerImpl getOuterType() {
- return SessionManagerImpl.this;
- }
- }
-
private final class SessionsExpirationRunnable implements Runnable {
public void run() {
while (true) {
try {
- DelayedSessionHolder sessionHolder = expirationQueue.take();
- sessionMap.remove(sessionHolder.getSession().getId());
- sessionHolder.getSession().destroy();
+ Session session = sessionQueue.take();
+ sessionMap.remove(session.getId());
+ session.destroy();
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
@@ -121,25 +55,9 @@
}
}
- private final SessionListener sessionListener = new SessionListener() {
-
- public void onSessionDestroyed(Session session) {
- // TODO Auto-generated method stub
- removePushSession(session.getId());
- }
-
- public void onRequestDisconnected(Session session) {
- expirationQueue.add(new DelayedSessionHolder(session));
- }
-
- public void onRequestConnected(Session session) {
- expirationQueue.remove(new DelayedSessionHolder(session));
- }
- };
-
private ConcurrentMap<String, Session> sessionMap = new MapMaker().makeMap();
- private DelayQueue<DelayedSessionHolder> expirationQueue = new
DelayQueue<DelayedSessionHolder>();
+ private SessionQueue sessionQueue = new SessionQueue();
public SessionManagerImpl(ThreadFactory threadFactory) {
threadFactory.newThread(new SessionsExpirationRunnable()).start();
@@ -149,17 +67,16 @@
return sessionMap.get(id);
}
- public void removePushSession(String id) {
- Session session = sessionMap.remove(id);
+ public void removePushSession(Session session) {
+ sessionMap.remove(session.getId());
if (session != null) {
- session.removeSessionListener(sessionListener);
- expirationQueue.remove(new DelayedSessionHolder(session));
+ sessionQueue.remove(session);
}
}
public void destroy() {
//TODO notify all session
- expirationQueue.clear();
+ sessionQueue.clear();
while (!sessionMap.isEmpty()) {
for (Iterator<Session> sessionsItr = sessionMap.values().iterator();
sessionsItr.hasNext(); ) {
@@ -179,7 +96,10 @@
throw new IllegalStateException();
}
- expirationQueue.add(new DelayedSessionHolder(session));
- session.addSessionListener(sessionListener);
+ requeue(session);
}
+
+ public void requeue(Session session) {
+ sessionQueue.requeue(session);
+ }
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -21,50 +21,72 @@
*/
package org.richfaces.application.push.impl.jms;
+import java.util.Collection;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.UUID;
import javax.faces.context.FacesContext;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NamingException;
import javax.servlet.ServletContext;
+import org.richfaces.application.push.Session;
import org.richfaces.application.push.TopicKey;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+
/**
* @author Nick Belaevski
*
*/
public class MessagingContext {
+ static final String SUBTOPIC_ATTRIBUTE_NAME = "rf_push_subtopic";
+
+ private static final Joiner OR_JOINER = Joiner.on(" OR ").skipNulls();
+
+ private static final Function<TopicKey, String> TOPIC_KEY_TO_MESSAGE_SELECTOR =
new Function<TopicKey, String>() {
+ public String apply(TopicKey from) {
+ if (Strings.isNullOrEmpty(from.getSubtopicName())) {
+ return null;
+ }
+
+ return SUBTOPIC_ATTRIBUTE_NAME + " = '" +
from.getSubtopicName() + "'";
+ }
+ };
+
private static final String SHARED_INSTANCE_KEY = MessagingContext.class.getName();
-
+
private final InitialContext initialContext;
-
+
private final Name connectionFactoryName;
-
+
private final Name topicsRootName;
private final String applicationName;
-
+
private final String username;
-
+
private final String password;
-
+
private Connection connection;
-
+
public MessagingContext(InitialContext initialContext, Name connectionFactoryName,
Name topicsRootName,
String applicationName) {
-
+
this(initialContext, connectionFactoryName, topicsRootName, applicationName,
null, null);
}
-
+
public MessagingContext(InitialContext initialContext, Name connectionFactoryName,
Name topicsRootName,
String applicationName, String username, String password) {
@@ -81,12 +103,12 @@
Name clonedName = (Name) name.clone();
return clonedName.add(comp);
}
-
+
public void start() throws Exception {
ConnectionFactory connectionFactory = (ConnectionFactory)
initialContext.lookup(connectionFactoryName);
-
+
connection = connectionFactory.createConnection(username, password);
-
+
//TODO - review
try {
//durable subscription requires ClientID to be set
@@ -94,43 +116,63 @@
} catch (IllegalStateException e) {
//ignore - clientId has already been set
}
-
+
connection.start();
}
-
+
public void stop() throws Exception {
connection.stop();
connection = null;
}
-
+
public Connection getConnection() {
if (connection == null) {
throw new IllegalStateException("connection is absent");
}
-
+
return connection;
}
public Topic lookup(TopicKey topicKey) throws NamingException {
Name topicName = appendToName(topicsRootName, topicKey.getTopicName());
-
+
return (Topic) initialContext.lookup(topicName);
}
-
- public Session createSession() throws JMSException {
- return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ public javax.jms.Session createSession() throws JMSException {
+ return connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
}
-
- public String getApplicationName() {
- return applicationName;
+
+ public String getSubscriptionClientId(Session session, TopicKey topicKey) {
+ //here TopicKey#topicName should be used, not TopicKey#topicAddress
+ return "rf-push:" + applicationName + ":" +
topicKey.getTopicName() + ":" + session.getId();
}
public void shareInstance(FacesContext facesContext) {
Map<String, Object> applicationMap =
facesContext.getExternalContext().getApplicationMap();
applicationMap.put(SHARED_INSTANCE_KEY, this);
}
-
+
public static MessagingContext getSharedInstance(ServletContext servletContext) {
return (MessagingContext) servletContext.getAttribute(SHARED_INSTANCE_KEY);
}
+
+ private String createMessageSelector(Iterable<TopicKey> topicKeys) {
+ Iterable<String> sqlStrings = Iterables.transform(topicKeys,
TOPIC_KEY_TO_MESSAGE_SELECTOR);
+ return OR_JOINER.join(sqlStrings) + " OR false" /* workaround for
HornetQ */;
+ }
+
+ public TopicSubscriber createTopicSubscriber(Session pushSession, javax.jms.Session
jmsSession,
+ Entry<TopicKey, Collection<TopicKey>> entry)
+ throws JMSException, NamingException {
+
+ TopicKey rootTopicKey = entry.getKey();
+
+ String subscriptionClientId = getSubscriptionClientId(pushSession,
rootTopicKey);
+
+ javax.jms.Topic jmsTopic = lookup(rootTopicKey);
+
+ return jmsSession.createDurableSubscriber(jmsTopic, subscriptionClientId,
createMessageSelector(entry.getValue()), true);
+ }
+
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -30,12 +30,14 @@
import javax.naming.CompositeName;
import javax.naming.InitialContext;
import javax.naming.Name;
+import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.atmosphere.cpr.AtmosphereHandler;
import org.richfaces.application.push.PushContext;
import org.richfaces.application.push.SessionFactory;
+import org.richfaces.application.push.SessionManager;
import org.richfaces.application.push.TopicsContext;
import org.richfaces.application.push.impl.AtmosphereHandlerProvider;
import org.richfaces.log.Logger;
@@ -59,6 +61,11 @@
return topicsContext;
}
+ private String getApplicationName(FacesContext facesContext) {
+ ServletContext servletContext = (ServletContext)
facesContext.getExternalContext().getContext();
+ return servletContext.getContextPath();
+ }
+
public void init(FacesContext facesContext) {
try {
facesContext.getApplication().subscribeToEvent(PreDestroyApplicationEvent.class, this);
@@ -69,7 +76,7 @@
Name topicsRootName = new CompositeName("/topic");
messagingContext = new MessagingContext(initialContext, cnfName,
topicsRootName,
- facesContext.getExternalContext().getContextName());
+ getApplicationName(facesContext));
messagingContext.shareInstance(facesContext);
@@ -84,16 +91,20 @@
}
public void destroy() {
- try {
- messagingContext.stop();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ if (messagingContext != null) {
+ try {
+ messagingContext.stop();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
}
- try {
- pushHandlerImpl.destroy();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
+ if (pushHandlerImpl != null) {
+ try {
+ pushHandlerImpl.destroy();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
}
}
@@ -116,4 +127,8 @@
public AtmosphereHandler<HttpServletRequest, HttpServletResponse> getHandler()
{
return pushHandlerImpl;
}
+
+ public SessionManager getSessionManager() {
+ return pushHandlerImpl.getSessionManager();
+ }
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -21,8 +21,14 @@
*/
package org.richfaces.application.push.impl.jms;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.atmosphere.cpr.AtmosphereResource;
+import org.richfaces.application.push.Request;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.SessionFactory;
+import org.richfaces.application.push.SessionManager;
import org.richfaces.application.push.TopicsContext;
import org.richfaces.application.push.impl.AtmospherePushHandler;
@@ -33,9 +39,9 @@
public class PushHandlerImpl extends AtmospherePushHandler implements SessionFactory {
private MessagingContext messagingContext;
-
+
private TopicsContext topicsContext;
-
+
public PushHandlerImpl(MessagingContext messagingContext, TopicsContext
topicsContext) {
super();
this.messagingContext = messagingContext;
@@ -43,10 +49,16 @@
}
public Session createSession(String key) {
- Session session = new SessionImpl(key, messagingContext, topicsContext);
- getSessionManager().putPushSession(session);
-
+ SessionManager sessionManager = getSessionManager();
+ Session session = new SessionImpl(key, sessionManager, messagingContext,
topicsContext);
+ sessionManager.putPushSession(session);
+
return session;
}
+ @Override
+ protected Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource,
+ Session session) {
+ return new RequestImpl(resource, session, getWorker(), messagingContext,
topicsContext);
+ }
}
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -0,0 +1,186 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl.jms;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.atmosphere.cpr.AtmosphereResource;
+import org.richfaces.application.push.MessageDataSerializer;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.TopicsContext;
+import org.richfaces.application.push.impl.AbstractRequest;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class RequestImpl extends AbstractRequest implements MessageListener {
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private javax.jms.Session jmsSession;
+
+ private MessagingContext messagingContext;
+
+ private TopicsContext topicsContext;
+
+ public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource, Session session,
+ ExecutorService executorService, MessagingContext messagingContext, TopicsContext
topicsContext) {
+
+ super(atmosphereResource, session, executorService);
+
+ this.messagingContext = messagingContext;
+ this.topicsContext = topicsContext;
+ }
+
+ private void closeSession() {
+ if (jmsSession != null) {
+ try {
+ jmsSession.close();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ jmsSession = null;
+ }
+ }
+
+ @Override
+ public void onSuspend() {
+ super.onSuspend();
+
+ try {
+ jmsSession = messagingContext.createSession();
+
+ //TODO - remove this case
+ SessionImpl sessionImpl = (SessionImpl) getSession();
+
+ for (Entry<TopicKey, Collection<TopicKey>> entry:
sessionImpl.getSuccessfulSubscriptions().asMap().entrySet()) {
+ messagingContext.createTopicSubscriber(sessionImpl, jmsSession,
entry).setMessageListener(this);
+ }
+
+ } catch (Exception e) {
+ // TODO: handle exception
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void flushMessages() throws IOException {
+ if (isPolling()) {
+ closeSession();
+ }
+
+ super.flushMessages();
+ }
+
+ @Override
+ public void onDisconnect() {
+ closeSession();
+ super.onDisconnect();
+ }
+
+ @Override
+ public void onResume() {
+ closeSession();
+ super.onResume();
+ }
+
+ private String serializeMessage(org.richfaces.application.push.Topic topic, Message
message) {
+ String serializedMessageData = null;
+ Object messageData = null;
+
+ try {
+ if (message instanceof ObjectMessage) {
+ messageData = ((ObjectMessage) message).getObject();
+ } else if (message instanceof TextMessage) {
+ TextMessage textMessage = (TextMessage) message;
+
+ if (message.getBooleanProperty(TopicImpl.SERIALIZED_DATA_INDICATOR)) {
+ serializedMessageData = textMessage.getText();
+ } else {
+ messageData = textMessage.getText();
+ }
+ }
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ if (serializedMessageData != null) {
+ return serializedMessageData;
+ }
+
+ if (messageData != null) {
+ MessageDataSerializer messageDataSerializer =
topic.getMessageDataSerializer();
+ try {
+ return messageDataSerializer.serialize(messageData);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+ */
+ public void onMessage(Message message) {
+ try {
+ String topicName = ((Topic) message.getJMSDestination()).getTopicName();
+
+ org.richfaces.application.push.Topic topic = topicsContext.getTopic(new
TopicKey(topicName));
+ if (topic == null) {
+ //TODO log
+ return;
+ }
+
+ String serializedMessageData = serializeMessage(topic, message);
+ if (serializedMessageData == null) {
+ //TODO log
+ return;
+ }
+
+ postMessage(new TopicKey(topicName,
message.getStringProperty(MessagingContext.SUBTOPIC_ATTRIBUTE_NAME)),
serializedMessageData);
+
+ message.acknowledge();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -21,244 +21,146 @@
*/
package org.richfaces.application.push.impl.jms;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
+import javax.naming.NamingException;
import org.richfaces.application.push.EventAbortedException;
-import org.richfaces.application.push.MessageDataSerializer;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.RequestLifecycleListener;
+import org.richfaces.application.push.SessionManager;
import org.richfaces.application.push.SessionPreSubscriptionEvent;
-import org.richfaces.application.push.SessionSubscriptionEvent;
-import org.richfaces.application.push.SessionUnsubscriptionEvent;
-import org.richfaces.application.push.SubscriptionAbortedException;
+import org.richfaces.application.push.Topic;
import org.richfaces.application.push.TopicKey;
import org.richfaces.application.push.TopicsContext;
import org.richfaces.application.push.impl.AbstractSession;
import org.richfaces.log.Logger;
import org.richfaces.log.RichfacesLogger;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
/**
* @author Nick Belaevski
*
*/
-public class SessionImpl extends AbstractSession implements RequestLifecycleListener,
MessageListener {
+public class SessionImpl extends AbstractSession {
private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
+
private final MessagingContext messagingContext;
-
+
private final TopicsContext topicsContext;
+
+ private final Multimap<TopicKey, TopicKey> successfulSubscriptions =
ArrayListMultimap.<TopicKey, TopicKey>create();
- private Collection<TopicKey> subscribedTopics = new HashSet<TopicKey>();
-
- private volatile Session jmsSession;
-
- public SessionImpl(String id, MessagingContext messagingContext, TopicsContext
topicsContext) {
- super(id);
-
+ private final Map<TopicKey, String> failedSubscriptions = Maps.newHashMap();
+
+ public SessionImpl(String id, SessionManager sessionManager, MessagingContext
messagingContext, TopicsContext topicsContext) {
+ super(id, sessionManager);
+
this.messagingContext = messagingContext;
this.topicsContext = topicsContext;
}
- private String getSubscriptionClientId(TopicKey topicKey) {
- return "rf-push:" + messagingContext.getApplicationName() +
":" + topicKey.getTopicName() + ":" + getId();
- }
-
- public void subscribe(TopicKey topicKey) throws Exception {
- Session session = null;
- try {
- session = messagingContext.createSession();
-
- org.richfaces.application.push.Topic topic =
topicsContext.getTopic(topicKey);
-
- try {
- SessionPreSubscriptionEvent preSubscriptionEvent = new
SessionPreSubscriptionEvent(topic, this);
- topic.publishEvent(preSubscriptionEvent);
- } catch (EventAbortedException e) {
- throw new SubscriptionAbortedException(e.getMessage());
- }
-
- session.createDurableSubscriber(messagingContext.lookup(topicKey),
getSubscriptionClientId(topicKey));
- subscribedTopics.add(topicKey);
-
- try {
- SessionSubscriptionEvent subscriptionEvent = new
SessionSubscriptionEvent(topic, this);
- topic.publishEvent(subscriptionEvent);
-
- } catch (EventAbortedException e) {
- // TODO: handle exception
- }
- } finally {
- if (session != null) {
- try {
- session.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
- }
-
- @Override
- public void destroy() {
- for (TopicKey topicKey : subscribedTopics) {
- org.richfaces.application.push.Topic topic =
topicsContext.getTopic(topicKey);
-
- try {
- topic.publishEvent(new SessionUnsubscriptionEvent(topic, this));
- } catch (EventAbortedException e) {
- // TODO: handle exception
- }
- }
-
+ public void onRequestSuspended() {
// TODO Auto-generated method stub
- super.destroy();
-
- //TODO remove subscriptions
}
-
- @Override
- public void connect(Request request) throws Exception {
- super.connect(request);
-
- request.addListener(this);
-
- jmsSession = messagingContext.createSession();
-
- for (TopicKey topicKey : subscribedTopics) {
- Topic topic = messagingContext.lookup(topicKey);
- String subscriptionId = getSubscriptionClientId(topicKey);
-
- TopicSubscriber subscriber = jmsSession.createDurableSubscriber(topic,
subscriptionId);
- subscriber.setMessageListener(this);
- }
- }
-
- @Override
- public void disconnect(Request request) throws Exception {
- jmsSession.close();
- jmsSession = null;
-
- request.removeListener(this);
-
- super.disconnect(request);
- }
- public void onFlush(Request request) {
- if (request.isPolling()) {
- try {
- jmsSession.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
-
- /* (non-Javadoc)
- * @see
org.richfaces.application.push.RequestLifecycleListener#onSuspend(org.richfaces.application.push.Request)
- */
- public void onSuspend(Request request) {
+ public void onRequestDisconnected() {
// TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see
org.richfaces.application.push.RequestLifecycleListener#onDisconnect(org.richfaces.application.push.Request)
- */
- public void onDisconnect(Request request) {
// TODO Auto-generated method stub
-
try {
- disconnect(request);
+ disconnect();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
- /* (non-Javadoc)
- * @see
org.richfaces.application.push.RequestLifecycleListener#onResume(org.richfaces.application.push.Request)
- */
- public void onResume(Request request) {
+ public void onRequestResumed() {
// TODO Auto-generated method stub
try {
- disconnect(request);
+ disconnect();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
+
+ public Map<TopicKey, String> getFailedSubscriptions() {
+ return failedSubscriptions;
+ }
- private String serializeMessage(org.richfaces.application.push.Topic topic, Message
message) {
- String serializedMessageData = null;
- Object messageData = null;
-
+ public Multimap<TopicKey, TopicKey> getSuccessfulSubscriptions() {
+ return successfulSubscriptions;
+ }
+
+ private void createSubscriptions(Iterable<TopicKey> topicKeys) {
+ javax.jms.Session jmsSession = null;
try {
- if (message instanceof ObjectMessage) {
- messageData = ((ObjectMessage) message).getObject();
- } else if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
+ Multimap<TopicKey, TopicKey> rootTopicsMap =
createRootTopicsKeysMap(topicKeys);
+
+ jmsSession = messagingContext.createSession();
+
+ for (Entry<TopicKey, Collection<TopicKey>> entry:
rootTopicsMap.asMap().entrySet()) {
+ messagingContext.createTopicSubscriber(this, jmsSession, entry);
- if (message.getBooleanProperty(TopicImpl.SERIALIZED_DATA_INDICATOR)) {
- serializedMessageData = textMessage.getText();
- } else {
- messageData = textMessage.getText();
- }
+ successfulSubscriptions.putAll(entry.getKey(), entry.getValue());
}
} catch (JMSException e) {
LOGGER.error(e.getMessage(), e);
+ } catch (NamingException e) {
+ LOGGER.error(e.getMessage(), e);
+ } finally {
+ if (jmsSession != null) {
+ try {
+ jmsSession.close();
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
}
-
- if (serializedMessageData != null) {
- return serializedMessageData;
+ }
+
+ private Multimap<TopicKey, TopicKey>
createRootTopicsKeysMap(Iterable<TopicKey> topicKeys) {
+ Multimap<TopicKey, TopicKey> rootTopicKeys =
ArrayListMultimap.<TopicKey, TopicKey>create();
+
+ for (TopicKey topicKey : topicKeys) {
+ rootTopicKeys.put(topicKey.getRootTopicKey(), topicKey);
}
- if (messageData != null) {
- MessageDataSerializer messageDataSerializer =
topic.getMessageDataSerializer();
- try {
- return messageDataSerializer.serialize(messageData);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- return null;
+ return rootTopicKeys;
}
+
+ private void processFailedSubscriptions(Iterable<TopicKey> topicKeys) {
+ for (Iterator<TopicKey> itr = topicKeys.iterator(); itr.hasNext(); ) {
+ TopicKey topicKey = itr.next();
- /* (non-Javadoc)
- * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
- */
- public void onMessage(Message message) {
- try {
- String topicName = ((Topic) message.getJMSDestination()).getTopicName();
+ TopicKey rootTopicKey = topicKey.getRootTopicKey();
+ Topic pushTopic = topicsContext.getTopic(rootTopicKey);
- org.richfaces.application.push.Topic topic = topicsContext.getTopic(new
TopicKey(topicName));
- if (topic == null) {
- //TODO log
- return;
+ try {
+ //TODO - publish another events
+ pushTopic.publishEvent(new SessionPreSubscriptionEvent(pushTopic,
topicKey, this));
+ } catch (EventAbortedException e) {
+ itr.remove();
+ failedSubscriptions.put(topicKey, e.getMessage());
}
-
- String serializedMessageData = serializeMessage(topic, message);
- if (serializedMessageData == null) {
- //TODO log
- return;
- }
-
- getRequest().postMessage(new TopicKey(topicName), serializedMessageData);
-
- message.acknowledge();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
}
}
+
+ public void subscribe(String[] topics) {
+ Iterable<TopicKey> topicKeys =
Iterables.transform(Lists.newLinkedList(Arrays.asList(topics)), TopicKey.factory());
+
+ processFailedSubscriptions(topicKeys);
+ createSubscriptions(topicKeys);
+ }
+
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -33,6 +33,8 @@
import org.richfaces.log.Logger;
import org.richfaces.log.RichfacesLogger;
+import com.google.common.base.Strings;
+
/**
* @author Nick Belaevski
*
@@ -52,7 +54,7 @@
}
@Override
- public void publish(Object messageData) throws MessageException {
+ public void publish(String subtopic, Object messageData) throws MessageException {
String serializedData = getMessageDataSerializer().serialize(messageData);
Session session = null;
@@ -64,6 +66,10 @@
textMessage.setText(serializedData);
textMessage.setBooleanProperty(SERIALIZED_DATA_INDICATOR, true);
+ if (!Strings.isNullOrEmpty(subtopic)) {
+ textMessage.setStringProperty(MessagingContext.SUBTOPIC_ATTRIBUTE_NAME,
subtopic);
+ }
+
producer.send(textMessage);
} catch (JMSException e) {
throw new MessageException(e.getMessage(), e);
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/component/AbstractPush.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/component/AbstractPush.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/component/AbstractPush.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -48,8 +48,11 @@
}
@Attribute(required = true)
- public abstract String getTopic();
+ public abstract String getAddress();
@Attribute(events = {@EventName("dataavailable")})
public abstract String getOndataavailable();
+
+ @Attribute(events = {@EventName("error")})
+ public abstract String getOnerror();
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/renderkit/PushRendererBase.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/renderkit/PushRendererBase.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/renderkit/PushRendererBase.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -65,8 +65,9 @@
Map<String, Object> options = new HashMap<String, Object>(2);
- options.put("t", push.getTopic());
- options.put("h", push.getOndataavailable());
+ options.put("address", push.getAddress());
+ options.put("dataHandler", push.getOndataavailable());
+ options.put("errorHandler", push.getOndataavailable());
return ScriptUtils.toScript(options);
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushFilter.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushFilter.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushFilter.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -22,7 +22,9 @@
package org.richfaces.webapp;
import java.io.IOException;
+import java.util.Collections;
import java.util.Enumeration;
+import java.util.Set;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
@@ -39,6 +41,9 @@
import org.richfaces.application.push.PushContext;
import org.richfaces.application.push.impl.AtmosphereHandlerProvider;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+
/**
* @author Nick Belaevski
*
@@ -76,12 +81,23 @@
}
public String getInitParameter(String name) {
- return filterConfig.getInitParameter(name);
+ String result = filterConfig.getInitParameter(name);
+
+ if (result == null) {
+ result = filterConfig.getServletContext().getInitParameter(name);
+ }
+
+ return result;
}
@SuppressWarnings("unchecked")
public Enumeration<String> getInitParameterNames() {
- return filterConfig.getInitParameterNames();
+ Set<String> result = Sets.newLinkedHashSet();
+
+ result.addAll(Collections.list(filterConfig.getInitParameterNames()));
+
result.addAll(Collections.list(filterConfig.getServletContext().getInitParameterNames()));
+
+ return Iterators.asEnumeration(result.iterator());
}
}
Modified:
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js
===================================================================
---
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js 2010-10-27
00:57:52 UTC (rev 19676)
@@ -19,20 +19,34 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
-(function(richfaces, _$) {
+(function(jsf, richfaces, _$) {
+ var COMPONENT_NAME = "Push";
+
+ var DATA_EVENT_NAME = COMPONENT_NAME + richfaces.Event.EVENT_NAMESPACE_SEPARATOR +
'dataAvailable';
+
+ var ERROR_EVENT_NAME = COMPONENT_NAME + richfaces.Event.EVENT_NAMESPACE_SEPARATOR +
'error';
+
+ var getDataEventNamespace = function(address) {
+ return richfaces.Event.createNamespace(DATA_EVENT_NAME, address);
+ };
+
+ var getErrorEventNamespace = function(address) {
+ return richfaces.Event.createNamespace(ERROR_EVENT_NAME, address);
+ };
+
richfaces.Push = (function() {
var addedTopics = {};
-
+
var removedTopics = {};
-
- var topicHandlers = {};
-
+
+ var handlersCounter = {};
+
var pushUrl = null;
-
+
var pushSessionId = null;
-
+
var suspendMessageEndMarker = /(<!--[^>]+-->\s*)+/;
var messageCallback = function(response) {
@@ -42,34 +56,51 @@
if (messages) {
for (var i = 0; i < messages.length; i++) {
var message = messages[i];
-
- var handlers = topicHandlers[message.topic] || [];
- for (var i = 0; i < handlers.length; i++) {
- var handler = handlers[i];
- handler(message.data);
- }
+
+ richfaces.Event.fire(document, getDataEventNamespace(message.topic),
message.data);
}
}
}
};
-
+
var connect = function() {
var pushSessionIdRequestHandler = function(data) {
- pushSessionId = data;
- _$.atmosphere.subscribe(pushUrl +
"?__richfacesPushAsync=1&pushSessionId=" + pushSessionId, messageCallback,
{
- transport: 'websocket'
- });
+ var subscriptionData = _$.parseJSON(data);
+
+
+ for (var failedTopicKey in subscriptionData.failures) {
+ richfaces.Event.fire(
+ document,
+ getErrorEventNamespace(message.topic),
+ subscriptionData.failures[failedTopicKey]
+ );
+ }
+
+ if (subscriptionData.sessionId) {
+ pushSessionId = subscriptionData.sessionId;
+
+ _$.atmosphere.subscribe(pushUrl +
"?__richfacesPushAsync=1&pushSessionId=" + pushSessionId, messageCallback,
{
+ /*transport: 'websocket'*/
+ });
+ }
};
-
+
var topics = new Array();
- for (var topicName in topicHandlers) {
+ for (var topicName in handlersCounter) {
topics.push(topicName);
}
-
+
+ var data = {
+ "pushTopic": topics
+ };
+
+ if (pushSessionId) {
+ data['forgetPushSessionId'] = pushSessionId;
+ }
+
//TODO handle request errors
- //TODO separate URLs
_$.ajax({
- data: {"pushTopic": topics},
+ data: data,
dataType: 'text',
success: pushSessionIdRequestHandler,
traditional: true,
@@ -79,87 +110,124 @@
};
var disconnect = function() {
-
+ _$.atmosphere.closeSuspendedConnection();
};
-
+
return {
- subscribe: function(topic, handler) {
- var handlersArray = topicHandlers[topic];
- if (!handlersArray) {
- handlersArray = new Array();
- topicHandlers[topic] = handlersArray;
- addedTopics[topic] = true;
-
- handlersArray.push(handler);
+ increaseSubscriptionCounters: function(address) {
+ if (isNaN(handlersCounter[address]++)) {
+ handlersCounter[address] = 1;
+ addedTopics[address] = true;
}
},
- unsubscribe: function(topic, handler) {
- var handlersArray = topicHandlers[topic];
- if (!handlersArray) {
- return;
+ decreaseSubscriptionCounters: function(address) {
+ if (--handlersCounter[address] == 0) {
+ delete handlersCounter[address];
+ removedTopics[address] = true;
}
-
- var idx = _$.inArray(handler, handlersArray);
- if (idx < 0) {
- return;
- }
-
- handlersArray.splice(idx, 1);
+ },
- if (handlersArray.length == 0) {
- delete topicHandlers[topic];
- removedTopics[topic] = true;
+ setPushUrl: function(argPushUrl) {
+ if (argPushUrl.charAt(0) == '/') {
+ pushUrl = location.protocol + '//' + location.host + argPushUrl;
+ } else {
+ pushUrl = argPushUrl;
}
},
-
- setPushUrl: function(argPushUrl) {
- pushUrl = argPushUrl;
- },
-
+
updateConnection: function() {
- if (_$.isEmptyObject(topicHandlers)) {
+ if (_$.isEmptyObject(handlersCounter)) {
disconnect();
} else if (!_$.isEmptyObject(addedTopics) || !_$.isEmptyObject(removedTopics)) {
+ disconnect();
connect();
}
-
+
addedTopics = {};
removedTopics = {};
}
};
-
+
}());
_$(document).ready(richfaces.Push.updateConnection);
-
- richfaces.ui = richfaces.ui || {};
+ var ajaxEventHandler = function(event) {
+ if (event.type == 'event') {
+ if (event.status != 'success') {
+ return;
+ }
+ } else if (event.type != 'error') {
+ return;
+ }
+
+ richfaces.Push.updateConnection();
+ };
+
+ jsf.ajax.addOnEvent(ajaxEventHandler);
+ jsf.ajax.addOnError(ajaxEventHandler);
+
+ richfaces.ui = richfaces.ui || {};
+
richfaces.ui.Push = richfaces.BaseComponent.extendClass({
-
- name: "Push",
- init: function (id, options) {
- this.id = id;
- this.__options = options;
- this.attachToDom();
+ name: COMPONENT_NAME,
- if (this.__options.h) {
- //TODO check compatibility with f:ajax
- this.__handlerFunction = new Function("data", this.__options.h);
- richfaces.Push.subscribe(this.__options.t, this.__handlerFunction);
- }
- },
-
- destroy: function() {
- if (this.__handlerFunction) {
- richfaces.Push.unsubscribe(this.__options.t, this.__handlerFunction);
- this.__handlerFunction = null;
- }
-
- this.__options = null;
- }
-
+ init: function (id, options) {
+ this.id = id;
+ this.attachToDom();
+
+ this.__address = options.address;
+ this.__handlers = {};
+
+ if (options.dataHandler) {
+ //TODO check compatibility with f:ajax
+ this.__bindDataHandler(options.errorHandler);
+ }
+
+ if (options.errorHandler) {
+ //TODO check compatibility with f:ajax
+ this.__bindErrorHandler(options.errorHandler);
+ }
+
+ richfaces.Push.increaseSubscriptionCounters(this.__address);
+ },
+
+ __bindDataHandler: function(handlerCode) {
+ var ns = getDataEventNamespace(this.__address)
+ this.__handlers.data = richfaces.Event.bind(document, ns, new
Function("event", "object", "data", handlerCode));
+ },
+
+ __unbindDataHandler: function() {
+ if (this.__handlers.data) {
+ var ns = getDataEventNamespace(this.__address);
+ richfaces.Event.unbind(document, ns, this.__handlers.data);
+
+ this.__handlers.data = null;
+ }
+ },
+
+ __bindErrorHandler: function(handlerCode) {
+ var ns = getErrorEventNamespace(this.__address);
+ this.__handlers.error = richfaces.Event.bind(document, ns, new
Function("event", "object", "data", handlerCode));
+ },
+
+ __unbindErrorHandler: function() {
+ if (this.__handlers.error) {
+ var ns = getErrorEventNamespace(this.__address);
+ richfaces.Event.unbind(document, ns, this.__handlers.error);
+
+ this.__handlers.error = null;
+ }
+ },
+
+ destroy: function() {
+ this.__unbindDataHandler();
+ this.__unbindErrorHandler();
+
+ richfaces.Push.decreaseSubscriptionCounters(this.__address);
+ }
});
-
-}(window.RichFaces, jQuery));
\ No newline at end of file
+
+}(jsf, window.RichFaces, jQuery));
\ No newline at end of file
Modified: branches/RF-7817/push-redesign/src/main/templates/push.template.xml
===================================================================
--- branches/RF-7817/push-redesign/src/main/templates/push.template.xml 2010-10-26
22:19:31 UTC (rev 19675)
+++ branches/RF-7817/push-redesign/src/main/templates/push.template.xml 2010-10-27
00:57:52 UTC (rev 19676)
@@ -11,8 +11,10 @@
<cdk:renderer-type>org.richfaces.PushRenderer</cdk:renderer-type>
<cdk:renders-children>true</cdk:renders-children>
+ <cdk:resource-dependency library="javax.faces"
name="jsf.js" />
<cdk:resource-dependency library="org.jquery"
name="jquery.js" />
<cdk:resource-dependency name="richfaces.js" />
+ <cdk:resource-dependency name="richfaces-event.js" />
<cdk:resource-dependency name="richfaces-base-component.js" />
<cdk:resource-dependency library="net.java.dev.atmosphere"
name="jquery-atmosphere.js" />
<cdk:resource-dependency library="org.richfaces"
name="push.js" />
Modified: branches/RF-7817/push-redesign-app/pom.xml
===================================================================
--- branches/RF-7817/push-redesign-app/pom.xml 2010-10-26 22:19:31 UTC (rev 19675)
+++ branches/RF-7817/push-redesign-app/pom.xml 2010-10-27 00:57:52 UTC (rev 19676)
@@ -54,6 +54,11 @@
<version>1.1</version>
</dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>jstl</artifactId>
+ <version>1.2</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
Added: branches/RF-7817/push-redesign-app/src/main/java/demo/Channel.java
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/java/demo/Channel.java
(rev 0)
+++ branches/RF-7817/push-redesign-app/src/main/java/demo/Channel.java 2010-10-27 00:57:52
UTC (rev 19676)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package demo;
+
+import java.io.Serializable;
+
+import javax.faces.context.ExternalContext;
+import javax.faces.context.FacesContext;
+import javax.faces.event.ValueChangeEvent;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class Channel implements Serializable {
+
+ private static final long serialVersionUID = 6798558262812940593L;
+
+ private String name;
+
+ private boolean rendered = true;
+
+ public Channel(String name) {
+ super();
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isRendered() {
+ return rendered;
+ }
+
+ public void setRendered(boolean rendered) {
+ this.rendered = rendered;
+ }
+
+ public void processChannelStateChange(ValueChangeEvent event) {
+ setRendered(Boolean.TRUE.equals(event.getNewValue()));
+
+ ExternalContext externalContext =
FacesContext.getCurrentInstance().getExternalContext();
+ ChatBean chatBean = (ChatBean)
externalContext.getSessionMap().get("chatBean");
+
+ chatBean.handleStateChange(this);
+ }
+}
Added: branches/RF-7817/push-redesign-app/src/main/java/demo/ChannelsBean.java
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/java/demo/ChannelsBean.java
(rev 0)
+++ branches/RF-7817/push-redesign-app/src/main/java/demo/ChannelsBean.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package demo;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.faces.bean.ManagedBean;
+import javax.faces.bean.SessionScoped;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+@SessionScoped
+@ManagedBean
+public class ChannelsBean {
+
+ private List<Channel> channels = new ArrayList<Channel>();
+
+ public ChannelsBean() {
+ channels.add(new Channel("upgrade"));
+ channels.add(new Channel("programming"));
+ channels.add(new Channel("hardware"));
+ channels.add(new Channel("os"));
+ }
+
+ public List<Channel> getChannels() {
+ return channels;
+ }
+}
Modified: branches/RF-7817/push-redesign-app/src/main/java/demo/ChatBean.java
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/java/demo/ChatBean.java 2010-10-26
22:19:31 UTC (rev 19675)
+++ branches/RF-7817/push-redesign-app/src/main/java/demo/ChatBean.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -27,6 +27,7 @@
import javax.annotation.PostConstruct;
import javax.faces.bean.ManagedBean;
+import javax.faces.bean.ManagedProperty;
import javax.faces.bean.SessionScoped;
import org.richfaces.application.push.MessageException;
@@ -55,6 +56,11 @@
private transient TopicsContext topicsContext;
+ private String subchannel;
+
+ @ManagedProperty("#{channelsBean}")
+ private ChannelsBean channelsBean;
+
@PostConstruct
public void init() {
topicsContext = TopicsContext.lookup();
@@ -76,6 +82,15 @@
this.userName = userName;
}
+ private void publishStateChangeMessage(String name, String action) {
+ try {
+ topicsContext.publish(new TopicKey("chat", name),
MessageFormat.format("*** {0} {1} chat in {2,time,medium}",
+ userName, action, new Date()));
+ } catch (MessageException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
public void joinChat() {
if (!chatJoined) {
if (userName == null) {
@@ -83,18 +98,27 @@
}
chatJoined = true;
- try {
- topicsContext.publish(new TopicKey("chat"),
MessageFormat.format("*** {0} joined chat in {1,time,medium}",
- userName, new Date()));
- } catch (MessageException e) {
- LOGGER.error(e.getMessage(), e);
+
+ for (Channel subchannel : channelsBean.getChannels()) {
+ publishStateChangeMessage(subchannel.getName(), "joined");
}
}
}
+ public void handleStateChange(Channel channel) {
+ String action;
+ if (channel.isRendered()) {
+ action = "joined";
+ } else {
+ action = "left";
+ }
+
+ publishStateChangeMessage(channel.getName(), action);
+ }
+
public void say() {
try {
- topicsContext.publish(new TopicKey("chat"),
MessageFormat.format("{0,time,medium} {1}: {2}", new Date(),
+ topicsContext.publish(new TopicKey("chat", subchannel),
MessageFormat.format("{0,time,medium} {1}: {2}", new Date(),
userName, message));
} catch (MessageException e) {
LOGGER.error(e.getMessage(), e);
@@ -104,5 +128,26 @@
public void setTopicsContext(TopicsContext topicsContext) {
this.topicsContext = topicsContext;
}
+
+ /**
+ * @return the subchannel
+ */
+ public String getSubchannel() {
+ return subchannel;
+ }
+
+ /**
+ * @param subchannel the subchannel to set
+ */
+ public void setSubchannel(String subchannel) {
+ this.subchannel = subchannel;
+ }
+ /**
+ * @param channelsBean the channelsBean to set
+ */
+ public void setChannelsBean(ChannelsBean channelsBean) {
+ this.channelsBean = channelsBean;
+ }
+
}
Modified: branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java
===================================================================
---
branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java 2010-10-26
22:19:31 UTC (rev 19675)
+++
branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java 2010-10-27
00:57:52 UTC (rev 19676)
@@ -59,21 +59,21 @@
@Override
public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event)
throws EventAbortedException {
- TopicKey topicKey = event.getTopic().getKey();
+ TopicKey topicKey = event.getTopicKey();
Session session = event.getSession();
- System.out.println(MessageFormat.format("Session {0} disconnected
from {1}", session.getId(), topicKey.getTopicName()));
+ System.out.println(MessageFormat.format("Session {0} disconnected
from {1}", session.getId(), topicKey.getTopicAddress()));
}
@Override
public void processSubscriptionEvent(SessionSubscriptionEvent event) throws
EventAbortedException {
- TopicKey topicKey = event.getTopic().getKey();
+ TopicKey topicKey = event.getTopicKey();
Session session = event.getSession();
FacesContext facesContext = FacesContext.getCurrentInstance();
HttpServletRequest hsr = (HttpServletRequest)
facesContext.getExternalContext().getRequest();
System.out.println(MessageFormat.format("Session {0} connected to
{1} from {2}", session.getId(),
- topicKey.getTopicName(), hsr.getRemoteAddr()));
+ topicKey.getTopicAddress(), hsr.getRemoteAddr()));
}
@Override
Modified: branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml 2010-10-26 22:19:31
UTC (rev 19675)
+++ branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml 2010-10-27 00:57:52
UTC (rev 19676)
@@ -24,23 +24,12 @@
<async-supported>true</async-supported>
</filter>
- <!--
+ <!-- context-param>
+ <param-name>org.atmosphere.useWebSocket</param-name>
+ <param-value>true</param-value>
+ </context-param-->
+
<servlet>
- <servlet-name>push</servlet-name>
- <servlet-class>org.richfaces.webapp.PushServlet</servlet-class>
- <init-param>
- <param-name>org.atmosphere.useWebSocket</param-name>
- <param-value>true</param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- <async-supported>true</async-supported>
- </servlet>
- <servlet-mapping>
- <servlet-name>push</servlet-name>
- <url-pattern>/push/*</url-pattern>
- </servlet-mapping>
- -->
- <servlet>
<servlet-name>Faces Servlet</servlet-name>
<servlet-class>javax.faces.webapp.FacesServlet</servlet-class>
<load-on-startup>1</load-on-startup>
Modified: branches/RF-7817/push-redesign-app/src/main/webapp/chat.xhtml
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/webapp/chat.xhtml 2010-10-26 22:19:31 UTC
(rev 19675)
+++ branches/RF-7817/push-redesign-app/src/main/webapp/chat.xhtml 2010-10-27 00:57:52 UTC
(rev 19676)
@@ -5,6 +5,7 @@
xmlns:ui="http://java.sun.com/jsf/facelets"
xmlns:h="http://java.sun.com/jsf/html"
xmlns:f="http://java.sun.com/jsf/core"
+
xmlns:c="http://java.sun.com/jsp/jstl/core"
xmlns:p="http://richfaces.org/push-redesign">
<f:view>
@@ -13,15 +14,25 @@
<f:event type="preRenderView"
listener="#{chatBean.joinChat}"/>
</f:metadata>
<h:head>
+ <h:outputStylesheet>
+ .pushBlock {
+ width: 300px;
+ float: left;
+ border: 2px dotted navy;
+ margin: 10px;
+ }
+
+ .chatOutput {
+ width: 280px;
+ height: 200px;
+ overflow: auto;
+ }
+ </h:outputStylesheet>
</h:head>
<h:body>
<h:form id="form">
- <p:push topic="chat" ondataavailable="jQuery('<div
/>').prependTo('.chatOutput').text(data)" />
-
You are: #{chatBean.userName} <br />
- <h:inputText styleClass="messageInput"
value="#{chatBean.message}" size="40" />
-
<script type="text/javascript">
function clearInput(event) {
if (event.status == 'success') {
@@ -30,11 +41,31 @@
}
</script>
- <h:commandLink value="ajax" action="#{chatBean.say}">
- <f:ajax execute="@form" onevent="clearInput" />
- </h:commandLink>
-
- <h:panelGroup id="group" styleClass="chatOutput"
layout="block" />
+ <c:forEach var="channel" items="#{channelsBean.channels}">
+ <c:set var="channelName" value="#{channel.name}" />
+
+ <h:panelGroup styleClass="pushBlock" layout="block"
id="pushBlock_#{channelName}">
+
+ Subchannel: #{channelName}
+
+ <h:selectBooleanCheckbox value="true"
valueChangeListener="#{channel.processChannelStateChange}">
+ <f:ajax event="click" render="pushBlock_#{channelName}"
/>
+ </h:selectBooleanCheckbox>
+
+ <h:panelGroup layout="block"
rendered="#{channel.rendered}">
+ <p:push address="#{channelName}@chat"
onerror="alert(event.data)" ondataavailable="jQuery('<div
/>').prependTo('.#{channelName}Output').text(data)" />
+
+ <h:inputText styleClass="messageInput"
value="#{chatBean.message}" size="40" />
+
+ <h:commandLink value="ajax" action="#{chatBean.say}">
+ <f:setPropertyActionListener value="#{channelName}"
target="#{chatBean.subchannel}" />
+ <f:ajax execute="pushBlock_#{channelName}"
onevent="clearInput" />
+ </h:commandLink>
+
+ <h:panelGroup styleClass="#{channelName}Output chatOutput"
layout="block" />
+ </h:panelGroup>
+ </h:panelGroup>
+ </c:forEach>
</h:form>
</h:body>
</f:view>
Modified: branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml 2010-10-26 22:19:31 UTC (rev
19675)
+++ branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml 2010-10-27 00:57:52 UTC (rev
19676)
@@ -36,5 +36,5 @@
<!-- TODO it is hot fix for building process, this files belong to the
push-redesign module,
and must be removed from here -->
- <suppress checks="IllegalCatch" files="RequestImpl.java" />
+ <suppress checks="IllegalCatch" files="AbstractRequest.java"
/>
</suppressions>
\ No newline at end of file
Modified: branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml 2010-10-26 22:19:31 UTC (rev
19675)
+++ branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml 2010-10-27 00:57:52 UTC (rev
19676)
@@ -36,5 +36,5 @@
<!-- TODO it is hot fix for building process, this files belong to the
push-redesign module,
and must be removed from here -->
- <suppress checks="IllegalCatch" files="RequestImpl.java" />
+ <suppress checks="IllegalCatch" files="AbstractRequest.java"
/>
</suppressions>