Author: nbelaevski
Date: 2011-04-28 12:58:13 -0400 (Thu, 28 Apr 2011)
New Revision: 22451
Added:
trunk/core/api/src/main/java/org/richfaces/application/push/MessageData.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener2.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/MessageDataScriptString.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextFactoryImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionFactoryImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionTopicListenerWrapper.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/JMSTopicsContextImpl.java
trunk/core/impl/src/main/java/org/richfaces/webapp/PushHandlerFilter.java
trunk/core/impl/src/main/java/org/richfaces/webapp/PushServlet.java
trunk/core/impl/src/main/java/org/richfaces/webapp/PushServletContainerInitializer.java
trunk/core/impl/src/main/resources/META-INF/services/
trunk/core/impl/src/main/resources/META-INF/services/javax.servlet.ServletContainerInitializer
trunk/examples/push-demo/src/main/java/demo/JMSBean.java
Removed:
trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmosphereHandlerProvider.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/ConsumingCollection.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextFactoryImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicsContextImpl.java
Modified:
trunk/bom/pom.xml
trunk/core/api/pom.xml
trunk/core/api/src/main/java/org/richfaces/application/push/EventAbortedException.java
trunk/core/api/src/main/java/org/richfaces/application/push/PushContext.java
trunk/core/api/src/main/java/org/richfaces/application/push/Request.java
trunk/core/api/src/main/java/org/richfaces/application/push/Session.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener.java
trunk/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java
trunk/core/api/src/main/java/org/richfaces/application/push/Topic.java
trunk/core/api/src/main/java/org/richfaces/application/push/TopicEvent.java
trunk/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java
trunk/core/impl/pom.xml
trunk/core/impl/src/main/java/org/richfaces/application/CoreConfiguration.java
trunk/core/impl/src/main/java/org/richfaces/application/DefaultModule.java
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java
trunk/core/impl/src/main/java/org/richfaces/webapp/PushFilter.java
trunk/examples/irc-client/src/main/java/org/ircclient/listeners/TopicsInitializer.java
trunk/examples/push-demo/pom.xml
trunk/examples/push-demo/src/main/java/demo/ChatBean.java
trunk/examples/push-demo/src/main/java/demo/TopicsInitializer.java
trunk/ui/core/ui/src/main/java/org/richfaces/renderkit/PushRendererBase.java
trunk/ui/core/ui/src/main/resources/META-INF/resources/org.richfaces/push.js
trunk/ui/core/ui/src/main/templates/org/ajax4jsf/renderkit/html/push.template.xml
Log:
https://issues.jboss.org/browse/RFPL-1431
https://issues.jboss.org/browse/RF-10937
Big push rework
Modified: trunk/bom/pom.xml
===================================================================
--- trunk/bom/pom.xml 2011-04-28 14:16:16 UTC (rev 22450)
+++ trunk/bom/pom.xml 2011-04-28 16:58:13 UTC (rev 22451)
@@ -169,6 +169,11 @@
<version>2.5</version>
</dependency>
<dependency>
+ <groupId>org.jboss.spec.javax.servlet</groupId>
+ <artifactId>jboss-servlet-api_3.0_spec</artifactId>
+ <version>1.0.0.Final</version>
+ </dependency>
+ <dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<version>2.1</version>
@@ -216,7 +221,7 @@
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
- <version>0.6.5</version>
+ <version>0.7.1</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
Modified: trunk/core/api/pom.xml
===================================================================
--- trunk/core/api/pom.xml 2011-04-28 14:16:16 UTC (rev 22450)
+++ trunk/core/api/pom.xml 2011-04-28 16:58:13 UTC (rev 22451)
@@ -48,8 +48,8 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
+ <groupId>org.jboss.spec.javax.servlet</groupId>
+ <artifactId>jboss-servlet-api_3.0_spec</artifactId>
<scope>provided</scope>
</dependency>
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/EventAbortedException.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/EventAbortedException.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/EventAbortedException.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -25,6 +25,7 @@
* @author Nick Belaevski
*
*/
+@Deprecated
public class EventAbortedException extends Exception {
private static final long serialVersionUID = -1546282468438542993L;
Added: trunk/core/api/src/main/java/org/richfaces/application/push/MessageData.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/MessageData.java
(rev 0)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/MessageData.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push;
+
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public final class MessageData {
+
+ private final TopicKey topicKey;
+
+ private final String serializedMessage;
+
+ private final long sequenceNumber;
+
+ public MessageData(TopicKey topicKey, String serializedMessageData, long
sequenceNumber) {
+ super();
+ this.topicKey = topicKey;
+ this.serializedMessage = serializedMessageData;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public TopicKey getTopicKey() {
+ return topicKey;
+ }
+
+ public String getSerializedMessage() {
+ return serializedMessage;
+ }
+
+ public long getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+}
Deleted: trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/MessageListener.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2011, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push;
-
-import java.util.EventListener;
-
-/**
- * @author Nick Belaevski
- *
- */
-public interface MessageListener extends EventListener {
-
- public void onMessage(Object message) throws MessageException;
-
-}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/PushContext.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/PushContext.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/PushContext.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -37,6 +37,8 @@
public SessionManager getSessionManager();
+ public String getPushHandlerUrl();
+
public void init(FacesContext facesContext);
public void destroy();
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/Request.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/Request.java 2011-04-28
14:16:16 UTC (rev 22450)
+++ trunk/core/api/src/main/java/org/richfaces/application/push/Request.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,9 +21,9 @@
*/
package org.richfaces.application.push;
-import java.io.IOException;
+
/**
* @author Nick Belaevski
*
@@ -32,19 +32,15 @@
//TODO expose request/session/application maps
- public void flushMessages() throws IOException;
-
//TODO suspend with timeout?
- public void suspend() throws IOException;
+ public void suspend();
- public void resume() throws IOException;
+ public void resume();
public Session getSession();
- public boolean isSuspended();
-
public boolean isPolling();
+
+ public void postMessages();
- public MessageListener getMessageListener();
-
}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/Session.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/Session.java 2011-04-28
14:16:16 UTC (rev 22450)
+++ trunk/core/api/src/main/java/org/richfaces/application/push/Session.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,12 +21,11 @@
*/
package org.richfaces.application.push;
+import java.util.Collection;
import java.util.Map;
-import com.google.common.collect.Multimap;
-
/**
* @author Nick Belaevski
*
@@ -39,7 +38,7 @@
public String getId();
- public Multimap<TopicKey, TopicKey> getSuccessfulSubscriptions();
+ public Collection<TopicKey> getSuccessfulSubscriptions();
public Map<TopicKey, String> getFailedSubscriptions();
@@ -51,4 +50,10 @@
public void invalidate();
+ public void push(TopicKey topicKey, String serializedData);
+
+ public Collection<MessageData> getMessages();
+
+ public void clearBroadcastedMessages(long sequenceNumber);
+
}
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionPreSubscriptionEvent.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,7 +21,9 @@
*/
package org.richfaces.application.push;
+import java.util.EventListener;
+
/**
* @author Nick Belaevski
*
@@ -35,7 +37,7 @@
}
@Override
- public void invokeListener(TopicListener listener) throws EventAbortedException {
- ((SessionTopicListener) listener).processPreSubscriptionEvent(this);
+ public void invokeListener(EventListener listener) throws
SubscriptionFailureException {
+ ((SessionTopicListener2) listener).processPreSubscriptionEvent(this);
}
}
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionSubscriptionEvent.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,7 +21,9 @@
*/
package org.richfaces.application.push;
+import java.util.EventListener;
+
/**
* @author Nick Belaevski
*
@@ -35,7 +37,7 @@
}
@Override
- public void invokeListener(TopicListener listener) throws EventAbortedException {
- ((SessionTopicListener) listener).processSubscriptionEvent(this);
+ public void invokeListener(EventListener listener) {
+ ((SessionTopicListener2) listener).processSubscriptionEvent(this);
}
}
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicEvent.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,6 +21,8 @@
*/
package org.richfaces.application.push;
+import java.util.EventListener;
+
/**
* @author Nick Belaevski
*
@@ -48,7 +50,7 @@
}
@Override
- public boolean isAppropriateListener(TopicListener listener) {
+ public boolean isAppropriateListener(EventListener listener) {
return (listener instanceof SessionTopicListener);
}
}
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -23,9 +23,11 @@
/**
+ * Deprecated by {@link SessionTopicListener2}
+ *
* @author Nick Belaevski
- *
*/
+@Deprecated
public interface SessionTopicListener extends TopicListener {
public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event) throws
EventAbortedException;
Added:
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener2.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener2.java
(rev 0)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionTopicListener2.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public interface SessionTopicListener2 extends TopicListener {
+
+ public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event) throws
SubscriptionFailureException;
+
+ public void processSubscriptionEvent(SessionSubscriptionEvent event);
+
+ public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event);
+
+}
Modified:
trunk/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/SessionUnsubscriptionEvent.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,7 +21,9 @@
*/
package org.richfaces.application.push;
+import java.util.EventListener;
+
/**
* @author Nick Belaevski
*
@@ -35,7 +37,7 @@
}
@Override
- public void invokeListener(TopicListener listener) throws EventAbortedException {
- ((SessionTopicListener) listener).processUnsubscriptionEvent(this);
+ public void invokeListener(EventListener listener) {
+ ((SessionTopicListener2) listener).processUnsubscriptionEvent(this);
}
}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/Topic.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/Topic.java 2011-04-28
14:16:16 UTC (rev 22450)
+++ trunk/core/api/src/main/java/org/richfaces/application/push/Topic.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -39,8 +39,10 @@
public void removeTopicListener(TopicListener topicListener);
- public void publishEvent(TopicEvent event) throws EventAbortedException;
+ public void checkSubscription(TopicKey key, Session session) throws
SubscriptionFailureException;
+
+ public void publishEvent(TopicEvent event);
- public void publish(String subtopic, Object messageData) throws MessageException;
+ public void publish(TopicKey key, Object messageData) throws MessageException;
}
\ No newline at end of file
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/TopicEvent.java
===================================================================
--- trunk/core/api/src/main/java/org/richfaces/application/push/TopicEvent.java 2011-04-28
14:16:16 UTC (rev 22450)
+++ trunk/core/api/src/main/java/org/richfaces/application/push/TopicEvent.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -21,6 +21,7 @@
*/
package org.richfaces.application.push;
+import java.util.EventListener;
import java.util.EventObject;
/**
@@ -39,11 +40,11 @@
return (Topic) source;
}
- public boolean isAppropriateListener(TopicListener listener) {
+ public boolean isAppropriateListener(EventListener listener) {
return false;
}
- public void invokeListener(TopicListener listener) throws EventAbortedException {
+ public void invokeListener(EventListener listener) throws
SubscriptionFailureException {
throw new IllegalArgumentException(listener.getClass().getName());
}
}
Modified: trunk/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java
===================================================================
---
trunk/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/api/src/main/java/org/richfaces/application/push/TopicsContext.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -64,10 +64,10 @@
Topic topic = getTopic(key);
if (topic == null) {
- throw new MessageException(MessageFormat.format("Topic {0} not
found", key.getTopicAddress()));
+ throw new MessageException(MessageFormat.format("Topic {0} not
found", key.getTopicName()));
}
- topic.publish(key.getSubtopicName(), data);
+ topic.publish(key, data);
}
public static TopicsContext lookup() {
Modified: trunk/core/impl/pom.xml
===================================================================
--- trunk/core/impl/pom.xml 2011-04-28 14:16:16 UTC (rev 22450)
+++ trunk/core/impl/pom.xml 2011-04-28 16:58:13 UTC (rev 22451)
@@ -55,8 +55,8 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
+ <groupId>org.jboss.spec.javax.servlet</groupId>
+ <artifactId>jboss-servlet-api_3.0_spec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@@ -104,6 +104,13 @@
<groupId>org.jboss.test-jsf</groupId>
<artifactId>htmlunit-client</artifactId>
<scope>test</scope>
+
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.jboss.test-jsf</groupId>
Modified: trunk/core/impl/src/main/java/org/richfaces/application/CoreConfiguration.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/CoreConfiguration.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/CoreConfiguration.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -76,7 +76,10 @@
@ConfigurationItem(defaultValue = "true", names =
"org.richfaces.executeAWTInitializer", literal = true)
executeAWTInitializer,
-
+
+ @ConfigurationItem(names = "org.richfaces.push.handlerMapping", literal
= true)
+ pushHandlerMapping,
+
@ConfigurationItem(defaultValue = "/ConnectionFactory", names =
"org.richfaces.push.jms.connectionFactory")
pushJMSConnectionFactory,
Modified: trunk/core/impl/src/main/java/org/richfaces/application/DefaultModule.java
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/application/DefaultModule.java 2011-04-28
14:16:16 UTC (rev 22450)
+++ trunk/core/impl/src/main/java/org/richfaces/application/DefaultModule.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -3,7 +3,7 @@
import org.richfaces.application.configuration.ConfigurationService;
import org.richfaces.application.configuration.ConfigurationServiceImpl;
import org.richfaces.application.push.PushContextFactory;
-import org.richfaces.application.push.impl.jms.PushContextFactoryImpl;
+import org.richfaces.application.push.impl.PushContextFactoryImpl;
import org.richfaces.cache.Cache;
import org.richfaces.el.GenericsIntrospectionService;
import org.richfaces.el.GenericsIntrospectionServiceImpl;
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,257 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.ajax4jsf.javascript.JSLiteral;
-import org.ajax4jsf.javascript.ScriptString;
-import org.ajax4jsf.javascript.ScriptUtils;
-import org.atmosphere.cpr.AtmosphereEventLifecycle;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.atmosphere.cpr.AtmosphereResourceEventListener;
-import org.atmosphere.websocket.WebSocketSupport;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.TopicKey;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-/**
- * @author Nick Belaevski
- *
- */
-public abstract class AbstractRequest implements Request {
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private static final String TOPIC_KEY = "topic";
-
- private static final String DATA_KEY = "data";
-
- private static final int SUSPEND_TIMEOUT = 30 * 1000;
-
- private static final class Message implements ScriptString {
-
- private TopicKey topicKey;
-
- private String serializedData;
-
- public Message(TopicKey topicKey, String serializedData) {
- super();
- this.topicKey = topicKey;
- this.serializedData = serializedData;
- }
-
- public String toScript() {
- Map<String, Object> map = createScriptMap();
-
- return ScriptUtils.toScript(map);
- }
-
- private Map<String, Object> createScriptMap() {
- Map<String,Object> map = new HashMap<String, Object>(2);
-
- map.put(TOPIC_KEY, topicKey.getTopicAddress());
- map.put(DATA_KEY, new JSLiteral(serializedData));
- return map;
- }
-
- public void appendScript(Appendable target) throws IOException {
- target.append(toScript());
- }
-
- public void appendScriptToStringBuilder(StringBuilder stringBuilder) {
- try {
- appendScript(stringBuilder);
- } catch (IOException e) {
- //ignore
- }
- }
-
- }
-
- private static final class FlushMessagesTask implements Runnable {
-
- private Request request;
-
- public FlushMessagesTask(Request request) {
- super();
- this.request = request;
- }
-
- public void run() {
- try {
- request.flushMessages();
- } catch (Throwable e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- }
-
- private final AtmosphereResourceEventListener resourceEventListener = new
AtmosphereResourceEventListener() {
-
- public void onSuspend(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- AbstractRequest.this.onSuspend();
- }
-
- public void onResume(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- AbstractRequest.this.onResume();
- }
-
- public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- AbstractRequest.this.onDisconnect();
- }
-
- public void onBroadcast(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
- AbstractRequest.this.onBroadcast();
- }
- };
-
- private final AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource;
-
- private final Session session;
-
- private final ExecutorService executorService;
-
- private final Queue<Message> messagesQueue = new
ConcurrentLinkedQueue<Message>();
-
- private AtomicBoolean submitted = new AtomicBoolean(false);
-
- public AbstractRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> atmosphereResource, Session session,
- ExecutorService executorService) {
-
- super();
-
- this.atmosphereResource = atmosphereResource;
-
- ((AtmosphereEventLifecycle)
atmosphereResource).addEventListener(resourceEventListener);
-
- this.session = session;
- this.executorService = executorService;
- }
-
- private void submitToWorker() {
- if (submitted.compareAndSet(false, true)) {
- executorService.submit(new FlushMessagesTask(this));
- }
- }
-
- private String serializeMessages() {
- return ScriptUtils.toScript(new
ConsumingCollection<Message>(messagesQueue));
- }
-
- public void flushMessages() throws IOException {
- String serializedMessages = serializeMessages();
- PrintWriter writer = atmosphereResource.getResponse().getWriter();
- writer.write(serializedMessages);
- writer.flush();
-
- submitted.compareAndSet(true, false);
-
- if (isPolling()) {
- resume();
- } else if (!messagesQueue.isEmpty()) {
- submitToWorker();
- }
- }
-
- public void postMessage(TopicKey topicKey, String serializedMessage) {
- messagesQueue.add(new Message(topicKey, serializedMessage));
- submitToWorker();
- }
-
- public void suspend() throws IOException {
- atmosphereResource.suspend(SUSPEND_TIMEOUT, isPolling());
- }
-
- public void resume() throws IOException {
- //TODO - review
- try {
- getSession().disconnect();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- atmosphereResource.resume();
- }
-
- public boolean isSuspended() {
- return atmosphereResource.getAtmosphereResourceEvent().isSuspended();
- }
-
- public boolean isPolling() {
- HttpServletRequest req = atmosphereResource.getRequest();
- boolean isWebsocket = req.getAttribute(WebSocketSupport.WEBSOCKET_SUSPEND) !=
null ||
- req.getAttribute(WebSocketSupport.WEBSOCKET_RESUME) != null;
-
- //TODO how to detect non-polling transports?
- return !isWebsocket;
- }
-
- public Session getSession() {
- return session;
- }
-
- protected AtmosphereResource<HttpServletRequest, HttpServletResponse>
getResource() {
- return atmosphereResource;
- }
-
- protected void onSuspend() {
- try {
- getSession().connect(this);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- protected void onResume() {
- try {
- session.disconnect();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- protected void onDisconnect() {
- try {
- session.disconnect();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- protected void onBroadcast() {
- }
-
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,146 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.SessionManager;
-import org.richfaces.application.push.impl.SessionManagerImpl.DestroyableSession;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-/**
- * @author Nick Belaevski
- *
- */
-public abstract class AbstractSession implements Session, DestroyableSession {
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private static final int MAX_INACTIVE_INTERVAL = 5 * 60 * 1000;
-
- private final String id;
-
- private final SessionManager sessionManager;
-
- private volatile long lastAccessedTime;
-
- private volatile Request request;
-
- private volatile boolean active = true;
-
- public AbstractSession(String id, SessionManager sessionManager) {
- super();
-
- this.id = id;
- this.sessionManager = sessionManager;
-
- resetLastAccessedTimeToCurrent();
- }
-
- private void resetLastAccessedTimeToCurrent() {
- lastAccessedTime = System.currentTimeMillis();
- }
-
- public synchronized void connect(Request request) throws Exception {
- releaseRequest();
-
- if (active) {
- processConnect(request);
- } else {
- request.resume();
- }
- }
-
- protected Request getRequest() {
- return request;
- }
-
- protected void processConnect(Request request) throws Exception {
- this.request = request;
-
- sessionManager.requeue(this);
- }
-
- private void releaseRequest() {
- Request localRequestCopy = this.request;
-
- if (localRequestCopy != null) {
- resetLastAccessedTimeToCurrent();
- this.request = null;
-
- try {
- localRequestCopy.resume();
- } catch (IOException e) {
- // TODO: handle exception
- e.printStackTrace();
- }
- }
- }
-
- public synchronized void disconnect() throws Exception {
- processDisconnect();
- }
-
- protected void processDisconnect() throws Exception {
- releaseRequest();
- }
-
- public long getLastAccessedTime() {
- if (!active) {
- return -1;
- }
-
- if (this.request != null) {
- //being accessed right now
- return System.currentTimeMillis();
- } else {
- return lastAccessedTime;
- }
- }
-
- public int getMaxInactiveInterval() {
- return MAX_INACTIVE_INTERVAL;
- }
-
- public String getId() {
- return id;
- }
-
- public void invalidate() {
- active = false;
-
- sessionManager.requeue(this);
- }
-
- public synchronized void destroy() {
- active = false;
-
- try {
- disconnect();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-}
Modified:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractTopic.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -22,12 +22,16 @@
package org.richfaces.application.push.impl;
import java.text.MessageFormat;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-import org.richfaces.application.push.EventAbortedException;
import org.richfaces.application.push.MessageDataSerializer;
import org.richfaces.application.push.MessageException;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionPreSubscriptionEvent;
+import org.richfaces.application.push.SessionTopicListener;
+import org.richfaces.application.push.SubscriptionFailureException;
import org.richfaces.application.push.Topic;
import org.richfaces.application.push.TopicEvent;
import org.richfaces.application.push.TopicKey;
@@ -81,27 +85,65 @@
}
public void addTopicListener(TopicListener topicListener) {
- listeners.add(topicListener);
+ TopicListener listener = topicListener;
+
+ if (listener instanceof SessionTopicListener) {
+ listener = new SessionTopicListenerWrapper((SessionTopicListener) listener);
+ }
+
+ listeners.add(listener);
}
public void removeTopicListener(TopicListener topicListener) {
- listeners.remove(topicListener);
+ if (topicListener instanceof SessionTopicListener) {
+ Iterator<TopicListener> iterator = listeners.iterator();
+ while (iterator.hasNext()) {
+ TopicListener next = iterator.next();
+
+ if (next instanceof SessionTopicListenerWrapper) {
+ SessionTopicListenerWrapper listenerWrapper =
(SessionTopicListenerWrapper) next;
+ if (topicListener.equals(listenerWrapper.getWrappedListener())) {
+ iterator.remove();
+ break;
+ }
+ }
+ }
+ } else {
+ listeners.remove(topicListener);
+ }
}
- public void publishEvent(TopicEvent event) throws EventAbortedException {
+ public void checkSubscription(TopicKey key, Session session) throws
SubscriptionFailureException {
+ SessionPreSubscriptionEvent event = new SessionPreSubscriptionEvent(this, key,
session);
for (TopicListener listener: listeners) {
if (event.isAppropriateListener(listener)) {
try {
event.invokeListener(listener);
- } catch (EventAbortedException e) {
+ } catch (SubscriptionFailureException e) {
throw e;
} catch (Exception e) {
- LOGGER.error(MessageFormat.format("Exception invoking listener:
{0}", e.getMessage()), e);
+ logError(e);
}
}
}
}
+
+ private void logError(Exception e) {
+ LOGGER.error(MessageFormat.format("Exception invoking listener: {0}",
e.getMessage()), e);
+ }
- public abstract void publish(String subtopic, Object messageData) throws
MessageException;
+ public void publishEvent(TopicEvent event) {
+ for (TopicListener listener: listeners) {
+ if (event.isAppropriateListener(listener)) {
+ try {
+ event.invokeListener(listener);
+ } catch (Exception e) {
+ logError(e);
+ }
+ }
+ }
+ }
+ public abstract void publish(TopicKey key, Object messageData) throws
MessageException;
+
}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmosphereHandlerProvider.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmosphereHandlerProvider.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmosphereHandlerProvider.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,37 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.atmosphere.cpr.AtmosphereHandler;
-
-/**
- * @author Nick Belaevski
- *
- */
-public interface AtmosphereHandlerProvider {
-
- public AtmosphereHandler<HttpServletRequest, HttpServletResponse>
getHandler();
-
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,134 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-import java.text.MessageFormat;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.atmosphere.cpr.AtmosphereHandler;
-import org.atmosphere.cpr.AtmosphereResource;
-import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.SessionManager;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * @author Nick Belaevski
- *
- */
-public abstract class AtmospherePushHandler implements
AtmosphereHandler<HttpServletRequest, HttpServletResponse> {
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private static final ThreadFactory BROADCASTER_THREADS_FACTORY = new
ThreadFactoryBuilder().
- setDaemon(true).setNameFormat("rf-push-worker-thread-%1$s").build();
-
- private static final ThreadFactory SESSION_MANAGER_THREADS_FACTORY = new
ThreadFactoryBuilder().
-
setDaemon(true).setNameFormat("rf-push-session-manager-thread-%1$s").build();
-
- private static final String PUSH_SESSION_ID_PARAM = "pushSessionId";
-
- private SessionManager sessionManager;
-
- private ExecutorService worker;
-
- public AtmospherePushHandler() {
- super();
-
- sessionManager = new SessionManagerImpl(SESSION_MANAGER_THREADS_FACTORY);
- worker = Executors.newCachedThreadPool(BROADCASTER_THREADS_FACTORY);
- }
-
- public SessionManager getSessionManager() {
- return sessionManager;
- }
-
- public void onRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource) throws IOException {
- HttpServletRequest req = resource.getRequest();
- HttpServletResponse resp = resource.getResponse();
-
- String pushSessionId = req.getParameter(PUSH_SESSION_ID_PARAM);
-
- Session session = null;
-
- if (pushSessionId != null) {
- session = getSessionManager().getPushSession(pushSessionId);
- }
-
- if (session == null) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(MessageFormat.format("Session {0} was not found",
pushSessionId));
- }
- resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
- return;
- }
-
- resp.setContentType("text/plain");
-
- try {
- Request request = createRequest(resource, session);
- request.suspend();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- public void onStateChange(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event)
- throws IOException {
- //do nothing
- }
-
- protected abstract Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource, Session session);
-
- protected ExecutorService getWorker() {
- return worker;
- }
-
- public void init(ServletConfig servletConfig) throws Exception {
- }
-
- public void destroy() throws Exception {
- try {
- worker.shutdown();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- try {
- sessionManager.destroy();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/ConsumingCollection.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/ConsumingCollection.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/ConsumingCollection.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,64 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.util.AbstractCollection;
-import java.util.Iterator;
-import java.util.Queue;
-
-import com.google.common.collect.AbstractIterator;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class ConsumingCollection<T> extends AbstractCollection<T> {
-
- private final class ConsumingIterator extends AbstractIterator<T> {
- @Override
- protected T computeNext() {
- T next = queue.poll();
- if (next == null) {
- endOfData();
- }
- return next;
- }
- }
-
- private Queue<T> queue;
-
- public ConsumingCollection(Queue<T> queue) {
- super();
- this.queue = queue;
- }
-
- @Override
- public Iterator<T> iterator() {
- return new ConsumingIterator();
- }
-
- @Override
- public int size() {
- return queue.size();
- }
-
-}
Added:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/MessageDataScriptString.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/MessageDataScriptString.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/MessageDataScriptString.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.ajax4jsf.javascript.ScriptStringBase;
+import org.ajax4jsf.javascript.ScriptUtils;
+import org.richfaces.application.push.MessageData;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class MessageDataScriptString extends ScriptStringBase {
+
+ private static final String TOPIC_ATTRIBUTE =
ScriptUtils.toScript("topic");
+
+ private static final String DATA_ATTRIBUTE = ScriptUtils.toScript("data");
+
+ private static final String NUMBER_ATTRIBUTE =
ScriptUtils.toScript("number");
+
+ private final Iterable<MessageData> messages;
+
+ private long lastMessageNumber;
+
+ public MessageDataScriptString(Iterable<MessageData> messages) {
+ super();
+
+ this.messages = messages;
+ }
+
+ private void appendMessageToScript(MessageData message, Appendable target) throws
IOException {
+ target.append('<');
+
+ target.append(TOPIC_ATTRIBUTE);
+ target.append(':');
+ ScriptUtils.appendScript(target, message.getTopicKey().getTopicAddress());
+
+ target.append(',');
+
+ target.append(DATA_ATTRIBUTE);
+ target.append(':');
+ //append as is - no escaping
+ target.append(message.getSerializedMessage());
+
+ target.append(',');
+
+ target.append(NUMBER_ATTRIBUTE);
+ target.append(':');
+ ScriptUtils.appendScript(target, message.getSequenceNumber());
+
+ target.append('>');
+ }
+
+ public void appendScript(Appendable target) throws IOException {
+ Iterator<MessageData> iterator = messages.iterator();
+
+ while (iterator.hasNext()) {
+ MessageData message = iterator.next();
+
+ appendMessageToScript(message, target);
+
+ //TODO - synchronization aids?
+ lastMessageNumber = message.getSequenceNumber();
+ }
+ }
+
+ public long getLastSequenceNumber() {
+ return lastMessageNumber;
+ }
+}
Copied:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextFactoryImpl.java
(from rev 22441,
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextFactoryImpl.java)
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextFactoryImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextFactoryImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.util.regex.Pattern;
+
+import javax.faces.context.FacesContext;
+import javax.servlet.ServletContext;
+
+import org.richfaces.application.CoreConfiguration;
+import org.richfaces.application.configuration.ConfigurationServiceHelper;
+import org.richfaces.application.push.PushContext;
+import org.richfaces.application.push.PushContextFactory;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class PushContextFactoryImpl implements PushContextFactory {
+
+ public static final String PUSH_HANDLER_MAPPING_ATTRIBUTE =
PushContextFactoryImpl.class.getName();
+
+ public static final String PUSH_CONTEXT_RESOURCE_NAME =
"__richfaces_push";
+
+ private static final class PushContextHolder {
+
+ static final PushContext INSTANCE = createInstance();
+
+ private PushContextHolder() {
+ }
+
+ }
+
+ private static String getApplicationContextName(FacesContext facesContext) {
+ Object contextObject = facesContext.getExternalContext().getContext();
+ if (contextObject instanceof ServletContext) {
+ return ((ServletContext) contextObject).getContextPath();
+ }
+
+ return "/";
+ }
+
+ private static String convertToUrl(FacesContext facesContext, String mapping) {
+ if (mapping == null) {
+ return mapping;
+ }
+
+ String url = mapping.replaceAll(Pattern.quote("*"),
PUSH_CONTEXT_RESOURCE_NAME);
+ if (!url.startsWith("/")) {
+ url = '/' + url;
+ }
+
+ return getApplicationContextName(facesContext) + url;
+ }
+
+ private static PushContext createInstance() {
+ FacesContext facesContext = FacesContext.getCurrentInstance();
+
+ String pushHandlerMapping = (String)
facesContext.getExternalContext().getApplicationMap().get(PUSH_HANDLER_MAPPING_ATTRIBUTE);
+
+ if (pushHandlerMapping == null) {
+ pushHandlerMapping =
ConfigurationServiceHelper.getStringConfigurationValue(facesContext,
CoreConfiguration.Items.pushHandlerMapping);
+ }
+
+ PushContextImpl pushContext = new PushContextImpl(convertToUrl(facesContext,
pushHandlerMapping));
+ pushContext.init(facesContext);
+
+ return pushContext;
+ }
+
+ public PushContext getPushContext() {
+ return PushContextHolder.INSTANCE;
+ }
+
+}
Added:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/PushContextImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,186 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionFactory;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionPassword;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionPasswordEnvRef;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionUsername;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionUsernameEnvRef;
+import static org.richfaces.application.CoreConfiguration.Items.pushJMSTopicsNamespace;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionFactory;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionPassword;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionUsername;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSTopicsNamespace;
+
+import java.util.concurrent.ThreadFactory;
+
+import javax.faces.FacesException;
+import javax.faces.context.FacesContext;
+import javax.faces.event.AbortProcessingException;
+import javax.faces.event.PreDestroyApplicationEvent;
+import javax.faces.event.SystemEvent;
+import javax.faces.event.SystemEventListener;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NameParser;
+
+import org.richfaces.application.ServiceTracker;
+import org.richfaces.application.configuration.ConfigurationService;
+import org.richfaces.application.push.PushContext;
+import org.richfaces.application.push.SessionFactory;
+import org.richfaces.application.push.SessionManager;
+import org.richfaces.application.push.TopicsContext;
+import org.richfaces.application.push.impl.jms.JMSTopicsContextImpl;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class PushContextImpl implements PushContext, SystemEventListener {
+
+ private static final ThreadFactory PUBLISH_THREAD_FACTORY = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("push-publish-thread-%1$s").build();
+
+ private static final ThreadFactory SESSION_MANAGER_THREAD_FACTORY = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("push-session-manager-thread-%1$s").build();
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private String pushHandlerUrl;
+
+ private TopicsContextImpl topicsContext;
+
+ private SessionManager sessionManager;
+
+ private SessionFactory sessionFactory;
+
+ public PushContextImpl(String pushHandlerUrl) {
+ super();
+ this.pushHandlerUrl = pushHandlerUrl;
+ }
+
+ private String getFirstNonEmptyConfgirutationValue(FacesContext facesContext,
ConfigurationService service, Enum<?>... keys) {
+ for (Enum<?> key : keys) {
+ String value = service.getStringValue(facesContext, key);
+ if (!Strings.isNullOrEmpty(value)) {
+ return value;
+ }
+ }
+
+ return "";
+ }
+
+ private String getConnectionFactory(FacesContext facesContext, ConfigurationService
configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSConnectionFactory, pushJMSConnectionFactory);
+ }
+
+ private String getTopicsNamespace(FacesContext facesContext, ConfigurationService
configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSTopicsNamespace, pushJMSTopicsNamespace);
+ }
+
+ private String getPassword(FacesContext facesContext, ConfigurationService
configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSConnectionPassword, pushJMSConnectionPasswordEnvRef,
pushJMSConnectionPassword);
+ }
+
+ private String getUserName(FacesContext facesContext, ConfigurationService
configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSConnectionUsername, pushJMSConnectionUsernameEnvRef,
pushJMSConnectionUsername);
+ }
+
+ public void init(FacesContext facesContext) {
+ try {
+
facesContext.getApplication().subscribeToEvent(PreDestroyApplicationEvent.class, this);
+
+ ConfigurationService configurationService =
ServiceTracker.getService(ConfigurationService.class);
+
+ InitialContext initialContext = new InitialContext();
+
+ NameParser nameParser = initialContext.getNameParser("");
+
+ Name cnfName = nameParser.parse(getConnectionFactory(facesContext,
configurationService));
+ Name topicsNamespace = nameParser.parse(getTopicsNamespace(facesContext,
configurationService));
+
+ sessionManager = new SessionManagerImpl(SESSION_MANAGER_THREAD_FACTORY);
+ topicsContext = new JMSTopicsContextImpl(PUBLISH_THREAD_FACTORY,
initialContext, cnfName, topicsNamespace,
+ getUserName(facesContext, configurationService),
+ getPassword(facesContext, configurationService)
+ );
+ sessionFactory = new SessionFactoryImpl(sessionManager, topicsContext);
+
+ facesContext.getExternalContext().getApplicationMap().put(INSTANCE_KEY_NAME,
this);
+ } catch (Exception e) {
+ throw new FacesException(e.getMessage(), e);
+ }
+ }
+
+ public void destroy() {
+ try {
+ sessionManager.destroy();
+ sessionManager = null;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ try {
+ topicsContext.destroy();
+ topicsContext = null;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ public void processEvent(SystemEvent event) throws AbortProcessingException {
+ if (event instanceof PreDestroyApplicationEvent) {
+ destroy();
+ } else {
+ throw new IllegalArgumentException(event.getClass().getName());
+ }
+ }
+
+ public boolean isListenerForSource(Object source) {
+ return true;
+ }
+
+ public TopicsContext getTopicsContext() {
+ return topicsContext;
+ }
+
+ public SessionFactory getSessionFactory() {
+ return sessionFactory;
+ }
+
+ public SessionManager getSessionManager() {
+ return sessionManager;
+ }
+
+ public String getPushHandlerUrl() {
+ return pushHandlerUrl;
+ }
+}
Copied: trunk/core/impl/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
(from rev 22439,
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractRequest.java)
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.atmosphere.cpr.AtmosphereResourceEvent;
+import org.atmosphere.cpr.AtmosphereResourceEventListener;
+import org.atmosphere.cpr.Meteor;
+import org.atmosphere.websocket.WebSocketSupport;
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.Session;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class RequestImpl implements Request, AtmosphereResourceEventListener {
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private static final int SUSPEND_TIMEOUT = 30 * 1000;
+
+ private Session session;
+
+ private final Meteor meteor;
+
+ private boolean hasActiveBroadcaster = false;
+
+ public RequestImpl(Meteor meteor, Session session) {
+ super();
+
+ this.meteor = meteor;
+ meteor.addListener(this);
+
+ this.session = session;
+ }
+
+ public void suspend() {
+ meteor.suspend(SUSPEND_TIMEOUT, isPolling());
+ }
+
+ public void resume() {
+ meteor.resume();
+ }
+
+ public boolean isPolling() {
+ HttpServletRequest req = meteor.getAtmosphereResource().getRequest();
+ boolean isWebsocket = req.getAttribute(WebSocketSupport.WEBSOCKET_SUSPEND) !=
null ||
+ req.getAttribute(WebSocketSupport.WEBSOCKET_RESUME) != null;
+
+ //TODO how to detect non-polling transports?
+ return !isWebsocket;
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ public synchronized void postMessages() {
+ if (!hasActiveBroadcaster && !session.getMessages().isEmpty()) {
+ hasActiveBroadcaster = true;
+ meteor.getBroadcaster().broadcast(new
MessageDataScriptString(getSession().getMessages()));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.atmosphere.cpr.AtmosphereResourceEventListener#onSuspend(org.atmosphere.cpr.AtmosphereResourceEvent)
+ */
+ public void onSuspend(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ try {
+ getSession().connect(this);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ private void disconnect() {
+ try {
+ getSession().disconnect();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void onResume(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ disconnect();
+ }
+
+ public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ disconnect();
+ }
+
+ public synchronized void onBroadcast(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ MessageDataScriptString serializedMessages = (MessageDataScriptString)
event.getMessage();
+
getSession().clearBroadcastedMessages(serializedMessages.getLastSequenceNumber());
+
+ hasActiveBroadcaster = false;
+
+ if (isPolling()) {
+ event.getResource().resume();
+ } else {
+ postMessages();
+ }
+ }
+
+ public void onThrowable(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
+ // TODO Auto-generated method stub
+ Throwable throwable = event.throwable();
+ LOGGER.error(throwable.getMessage(), throwable);
+ }
+
+}
Added:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionFactoryImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionFactoryImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionFactoryImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionFactory;
+import org.richfaces.application.push.SessionManager;
+import org.richfaces.application.push.TopicsContext;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class SessionFactoryImpl implements SessionFactory {
+
+ private final SessionManager sessionManager;
+
+ private final TopicsContext topicsContext;
+
+ public SessionFactoryImpl(SessionManager sessionManager, TopicsContext topicsContext)
{
+ super();
+ this.sessionManager = sessionManager;
+ this.topicsContext = topicsContext;
+ }
+
+ public Session createSession(String pushSessionId) {
+ Session session = new SessionImpl(pushSessionId, sessionManager, topicsContext);
+ sessionManager.putPushSession(session);
+
+ return session;
+ }
+
+}
Copied: trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionImpl.java
(from rev 22439,
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/AbstractSession.java)
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,251 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.richfaces.application.push.MessageData;
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionManager;
+import org.richfaces.application.push.SessionSubscriptionEvent;
+import org.richfaces.application.push.SessionUnsubscriptionEvent;
+import org.richfaces.application.push.SubscriptionFailureException;
+import org.richfaces.application.push.Topic;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.TopicsContext;
+import org.richfaces.application.push.impl.SessionManagerImpl.DestroyableSession;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class SessionImpl implements Session, DestroyableSession {
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private static final int MAX_INACTIVE_INTERVAL = 5 * 60 * 1000;
+
+ private final String id;
+
+ private final SessionManager sessionManager;
+
+ private volatile long lastAccessedTime;
+
+ private volatile Request request;
+
+ private volatile boolean active = true;
+
+ private final Queue<MessageData> messagesQueue = new
ConcurrentLinkedQueue<MessageData>();
+
+ private final Set<TopicKey> successfulSubscriptions = Sets.newHashSet();
+
+ private final Map<TopicKey, String> failedSubscriptions = Maps.newHashMap();
+
+ private TopicsContext topicsContext;
+
+ private AtomicLong sequenceCounter = new AtomicLong();
+
+ public SessionImpl(String id, SessionManager sessionManager, TopicsContext
topicsContext) {
+ super();
+
+ this.id = id;
+ this.sessionManager = sessionManager;
+ this.topicsContext = topicsContext;
+
+ resetLastAccessedTimeToCurrent();
+ }
+
+ private void resetLastAccessedTimeToCurrent() {
+ lastAccessedTime = System.currentTimeMillis();
+ }
+
+ public synchronized void connect(Request request) throws Exception {
+ releaseRequest();
+
+ if (active) {
+ processConnect(request);
+ } else {
+ request.resume();
+ }
+ }
+
+ protected Request getRequest() {
+ return request;
+ }
+
+ protected void processConnect(Request request) throws Exception {
+ this.request = request;
+ sessionManager.requeue(this);
+
+ request.postMessages();
+ }
+
+ private void releaseRequest() {
+ Request localRequestCopy = this.request;
+
+ if (localRequestCopy != null) {
+ resetLastAccessedTimeToCurrent();
+ this.request = null;
+
+ localRequestCopy.resume();
+ }
+ }
+
+ public synchronized void disconnect() throws Exception {
+ releaseRequest();
+ }
+
+ public long getLastAccessedTime() {
+ if (!active) {
+ return -1;
+ }
+
+ if (this.request != null) {
+ //being accessed right now
+ return System.currentTimeMillis();
+ } else {
+ return lastAccessedTime;
+ }
+ }
+
+ public int getMaxInactiveInterval() {
+ return MAX_INACTIVE_INTERVAL;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void invalidate() {
+ active = false;
+
+ sessionManager.requeue(this);
+ }
+
+ public synchronized void destroy() {
+ active = false;
+
+ for (TopicKey key: successfulSubscriptions) {
+ Topic topic = topicsContext.getTopic(key);
+ topic.publishEvent(new SessionUnsubscriptionEvent(topic, key, this));
+ }
+
+ try {
+ disconnect();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ public Collection<MessageData> poll() {
+ return messagesQueue;
+ }
+
+ public Map<TopicKey, String> getFailedSubscriptions() {
+ return failedSubscriptions;
+ }
+
+ public Collection<TopicKey> getSuccessfulSubscriptions() {
+ return successfulSubscriptions;
+ }
+
+ public void subscribe(String[] topics) {
+ Iterable<TopicKey> topicKeys =
Iterables.transform(Lists.newLinkedList(Arrays.asList(topics)), TopicKey.factory());
+
+ createSubscriptions(topicKeys);
+ }
+
+ private void createSubscriptions(Iterable<TopicKey> topicKeys) {
+ for (TopicKey topicKey : topicKeys) {
+ Topic pushTopic = topicsContext.getTopic(topicKey);
+
+ String errorMessage = null;
+
+ if (pushTopic == null) {
+ errorMessage = MessageFormat.format("Topic ''{0}''
is not configured", topicKey.getTopicAddress());
+ } else {
+ try {
+ //TODO - publish another events
+ pushTopic.checkSubscription(topicKey, this);
+ } catch (SubscriptionFailureException e) {
+ if (e.getMessage() != null) {
+ errorMessage = e.getMessage();
+ } else {
+ errorMessage = MessageFormat.format("Unknown error
connecting to ''{0}'' topic", topicKey.getTopicAddress());
+ }
+ }
+ }
+
+ if (errorMessage != null) {
+ failedSubscriptions.put(topicKey, errorMessage);
+ } else {
+ pushTopic.publishEvent(new SessionSubscriptionEvent(pushTopic, topicKey,
this));
+ successfulSubscriptions.add(topicKey);
+ }
+ }
+ }
+
+ public Collection<MessageData> getMessages() {
+ return messagesQueue;
+ }
+
+ public void clearBroadcastedMessages(long sequenceNumber) {
+ Queue<MessageData> queue = messagesQueue;
+ while (true) {
+ MessageData message = queue.peek();
+ if (message == null || sequenceNumber < message.getSequenceNumber()) {
+ break;
+ }
+
+ queue.remove();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.richfaces.application.push.Session#push(org.richfaces.application.push.TopicKey,
java.lang.String)
+ */
+ public void push(TopicKey topicKey, String serializedData) {
+ MessageData serializedMessage = new MessageData(topicKey, serializedData,
sequenceCounter.getAndIncrement());
+ messagesQueue.add(serializedMessage);
+ synchronized (this) {
+ if (request != null) {
+ request.postMessages();
+ }
+ }
+ }
+
+}
Added:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionTopicListenerWrapper.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionTopicListenerWrapper.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/SessionTopicListenerWrapper.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import org.richfaces.application.push.EventAbortedException;
+import org.richfaces.application.push.SessionPreSubscriptionEvent;
+import org.richfaces.application.push.SessionSubscriptionEvent;
+import org.richfaces.application.push.SessionTopicListener;
+import org.richfaces.application.push.SessionTopicListener2;
+import org.richfaces.application.push.SessionUnsubscriptionEvent;
+import org.richfaces.application.push.SubscriptionFailureException;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+@SuppressWarnings("deprecation")
+final class SessionTopicListenerWrapper implements SessionTopicListener2 {
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private SessionTopicListener listener;
+
+ public SessionTopicListenerWrapper(SessionTopicListener listener) {
+ super();
+ this.listener = listener;
+ }
+
+ public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event) throws
SubscriptionFailureException {
+ try {
+ listener.processPreSubscriptionEvent(event);
+ } catch (EventAbortedException e) {
+ throw new SubscriptionFailureException(e.getMessage());
+ }
+ }
+
+ public void processSubscriptionEvent(SessionSubscriptionEvent event) {
+ try {
+ listener.processSubscriptionEvent(event);
+ } catch (EventAbortedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event) {
+ try {
+ listener.processUnsubscriptionEvent(event);
+ } catch (EventAbortedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ SessionTopicListener getWrappedListener() {
+ return listener;
+ }
+}
Copied: trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java
(from rev 22441,
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java)
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.richfaces.application.push.MessageException;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionSubscriptionEvent;
+import org.richfaces.application.push.SessionUnsubscriptionEvent;
+import org.richfaces.application.push.TopicEvent;
+import org.richfaces.application.push.TopicKey;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class TopicImpl extends AbstractTopic {
+
+ private static final class PublishTask implements Runnable {
+
+ private final TopicContext topicContext;
+
+ public PublishTask(TopicContext topicContext) {
+ super();
+ this.topicContext = topicContext;
+ }
+
+ public void run() {
+ topicContext.publishMessages();
+ }
+ }
+
+ private final class TopicContext {
+
+ private final List<Session> sessions = new
CopyOnWriteArrayList<Session>();
+
+ private final Queue<String> serializedMessages = new
ConcurrentLinkedQueue<String>();
+
+ private final TopicKey key;
+
+ private boolean submittedForPublishing;
+
+ public TopicContext(TopicKey key) {
+ super();
+ this.key = key;
+ }
+
+ public void addSession(Session session) {
+ sessions.add(session);
+ }
+
+ public void removeSession(Session session) {
+ sessions.remove(session);
+ }
+
+ public void addMessage(String serializedMessageData) {
+ serializedMessages.add(serializedMessageData);
+
+ submitForPublishing();
+ }
+
+ public void publishMessages() {
+ Iterator<String> itr = serializedMessages.iterator();
+ while (itr.hasNext()) {
+ String message = itr.next();
+
+ for (Session session : sessions) {
+ session.push(key, message);
+ }
+
+ itr.remove();
+ }
+
+ synchronized (this) {
+ submittedForPublishing = false;
+
+ if (!serializedMessages.isEmpty()) {
+ submitForPublishing();
+ }
+ }
+ }
+
+ private synchronized void submitForPublishing() {
+ if (!submittedForPublishing) {
+ submittedForPublishing = true;
+
+ topicsContext.getPublisherService().submit(new PublishTask(this));
+ }
+ }
+ }
+
+ private ConcurrentMap<TopicKey, TopicContext> sessions = new
ConcurrentHashMap<TopicKey, TopicContext>();
+
+ private TopicsContextImpl topicsContext;
+
+ public TopicImpl(TopicKey key, TopicsContextImpl topicsContext) {
+ super(key);
+
+ this.topicsContext = topicsContext;
+ }
+
+ private TopicContext getTopicContext(TopicKey key) {
+ return sessions.get(key);
+ }
+
+ private TopicContext getOrCreateTopicContext(TopicKey key) {
+ TopicContext result = sessions.get(key);
+ if (result == null) {
+ TopicContext freshContext = new TopicContext(key);
+ result = sessions.putIfAbsent(key, freshContext);
+ if (result == null) {
+ result = freshContext;
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void publish(TopicKey key, Object messageData) throws MessageException {
+ String serializedData = getMessageDataSerializer().serialize(messageData);
+
+ if (serializedData != null) {
+ TopicContext topicContext = getTopicContext(key);
+ if (topicContext != null) {
+ topicContext.addMessage(serializedData);
+ }
+ }
+ }
+
+ @Override
+ public void publishEvent(TopicEvent event) {
+ super.publishEvent(event);
+
+ if (event instanceof SessionSubscriptionEvent) {
+ SessionSubscriptionEvent subscriptionEvent = (SessionSubscriptionEvent)
event;
+
+
getOrCreateTopicContext(subscriptionEvent.getTopicKey()).addSession(subscriptionEvent.getSession());
+ } else if (event instanceof SessionUnsubscriptionEvent) {
+ SessionUnsubscriptionEvent unsubscriptionEvent = (SessionUnsubscriptionEvent)
event;
+
+
getTopicContext(unsubscriptionEvent.getTopicKey()).removeSession(unsubscriptionEvent.getSession());
+ }
+ }
+}
Copied:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java
(from rev 22441,
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicsContextImpl.java)
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.richfaces.application.push.Topic;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.TopicsContext;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class TopicsContextImpl extends TopicsContext {
+
+ private final ExecutorService publishService;
+
+ private final ThreadFactory threadFactory;
+
+ public TopicsContextImpl(ThreadFactory threadFactory) {
+ super();
+
+ this.threadFactory = threadFactory;
+ this.publishService = Executors.newCachedThreadPool(threadFactory);
+ }
+
+ protected Topic createTopic(TopicKey key) {
+ return new TopicImpl(key, this);
+ }
+
+ protected ExecutorService getPublisherService() {
+ return publishService;
+ }
+
+ protected ThreadFactory getThreadFactory() {
+ return threadFactory;
+ }
+
+ public void destroy() {
+ publishService.shutdown();
+ }
+
+}
Added:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/JMSTopicsContextImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/JMSTopicsContextImpl.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/JMSTopicsContextImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,249 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl.jms;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
+
+import javax.faces.FacesException;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NamingException;
+
+import org.ajax4jsf.javascript.JSLiteral;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.impl.TopicsContextImpl;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.MapMaker;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class JMSTopicsContextImpl extends TopicsContextImpl {
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private class JMSTopicContext {
+
+ /**
+ *
+ */
+ private static final String SUBTOPIC_PROPERTY = "rf_push_subtopic";
+
+ private static final String SERIALIZED_DATA_INDICATOR =
"org_richfaces_push_SerializedData";
+
+ private final String name;
+
+ private Connection connection;
+
+ private Session session;
+
+ private Thread pollingThread;
+
+ private MessageConsumer consumer;
+
+ public JMSTopicContext(String name) {
+ super();
+
+ this.name = name;
+ }
+
+ private Topic lookupTopic() throws NamingException {
+ Name topicName = appendToName(topicsNamespace, name);
+
+ return (Topic) initialContext.lookup(topicName);
+ }
+
+ private Connection createConnection() throws JMSException, NamingException {
+ ConnectionFactory connectionFactory = (ConnectionFactory)
initialContext.lookup(connectionFactoryName);
+ Connection connection = connectionFactory.createConnection(username,
password);
+ connection.start();
+ return connection;
+ }
+
+ private Object getMessageData(Message message) throws JMSException {
+ Object messageData = null;
+
+ if (message instanceof ObjectMessage) {
+ messageData = ((ObjectMessage) message).getObject();
+ } else if (message instanceof TextMessage) {
+ TextMessage textMessage = (TextMessage) message;
+
+ if (message.getBooleanProperty(SERIALIZED_DATA_INDICATOR)) {
+ messageData = new JSLiteral(textMessage.getText());
+ } else {
+ messageData = textMessage.getText();
+ }
+ }
+
+ return messageData;
+ }
+
+ public synchronized void start() throws NamingException, JMSException {
+ connection = createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createConsumer(lookupTopic(), null, false);
+
+ pollingThread = getThreadFactory().newThread(new Runnable() {
+
+ public void run() {
+ try {
+ while (true) {
+ Message message = consumer.receive();
+
+ if (message != null) {
+ String subtopicName =
message.getStringProperty(SUBTOPIC_PROPERTY);
+ TopicKey topicKey = new TopicKey(name, subtopicName);
+
+ org.richfaces.application.push.Topic pushTopic =
getTopic(topicKey);
+ if (pushTopic != null) {
+ try {
+ Object messageData = getMessageData(message);
+ pushTopic.publish(topicKey, messageData);
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ } catch (JMSException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ });
+
+ pollingThread.start();
+ }
+
+ public synchronized void stop() {
+ if (consumer != null) {
+ try {
+ consumer.close();
+ consumer = null;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ if (session != null) {
+ try {
+ session.close();
+ session = null;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ if (connection != null) {
+ try {
+ connection.close();
+ connection = null;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ }
+
+ private final InitialContext initialContext;
+
+ private final Name connectionFactoryName;
+
+ private final Name topicsNamespace;
+
+ private final String username;
+
+ private final String password;
+
+ private final ConcurrentMap<String, JMSTopicContext> contextsMap = new
MapMaker().makeComputingMap(new Function<String, JMSTopicContext> () {
+
+ public JMSTopicContext apply(String name) {
+ JMSTopicContext topicContext = new JMSTopicContext(name);
+ try {
+ topicContext.start();
+ } catch (Exception e) {
+ try {
+ topicContext.stop();
+ } catch (Exception e1) {
+ LOGGER.error(e1.getMessage(), e1);
+ }
+
+ throw new FacesException(e.getMessage(), e);
+ }
+ return topicContext;
+ }
+
+ });
+
+ public JMSTopicsContextImpl(ThreadFactory threadFactory, InitialContext
initialContext, Name connectionFactoryName,
+ Name topicsNamespace, String username, String password) {
+ super(threadFactory);
+ this.initialContext = initialContext;
+ this.connectionFactoryName = connectionFactoryName;
+ this.topicsNamespace = topicsNamespace;
+ this.username = username;
+ this.password = password;
+ }
+
+ private Name appendToName(Name name, String comp) throws NamingException {
+ Name clonedName = (Name) name.clone();
+ return clonedName.add(comp);
+ }
+
+ @Override
+ protected org.richfaces.application.push.Topic createTopic(TopicKey key) {
+ org.richfaces.application.push.Topic topic = super.createTopic(key);
+ contextsMap.get(key.getTopicName());
+ return topic;
+ }
+
+ @Override
+ public void destroy() {
+ for (JMSTopicContext topicContext: contextsMap.values()) {
+ try {
+ topicContext.stop();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ super.destroy();
+ }
+
+
+}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,197 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import javax.faces.context.FacesContext;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import javax.naming.InitialContext;
-import javax.naming.Name;
-import javax.naming.NamingException;
-import javax.servlet.ServletContext;
-
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.TopicKey;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.collect.Iterables;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class MessagingContext {
-
- static final String SUBTOPIC_ATTRIBUTE_NAME = "rf_push_subtopic";
-
- private static final Joiner OR_JOINER = Joiner.on(" OR ").skipNulls();
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private static final Function<TopicKey, String> TOPIC_KEY_TO_MESSAGE_SELECTOR =
new Function<TopicKey, String>() {
- public String apply(TopicKey from) {
- if (Strings.isNullOrEmpty(from.getSubtopicName())) {
- return null;
- }
-
- return SUBTOPIC_ATTRIBUTE_NAME + " = '" +
from.getSubtopicName() + "'";
- }
- };
-
- private static final String SHARED_INSTANCE_KEY = MessagingContext.class.getName();
-
- private final InitialContext initialContext;
-
- private final Name connectionFactoryName;
-
- private final Name topicsNamespace;
-
- private final String applicationName;
-
- private final String username;
-
- private final String password;
-
- private Connection connection;
-
- public MessagingContext(InitialContext initialContext, Name connectionFactoryName,
Name topicsNamespace,
- String applicationName, String username, String password) {
-
- super();
- this.initialContext = initialContext;
- this.connectionFactoryName = connectionFactoryName;
- this.topicsNamespace = topicsNamespace;
- this.applicationName = applicationName;
- this.username = username;
- this.password = password;
- }
-
- private Name appendToName(Name name, String comp) throws NamingException {
- Name clonedName = (Name) name.clone();
- return clonedName.add(comp);
- }
-
- public void start() throws Exception {
- ConnectionFactory connectionFactory = (ConnectionFactory)
initialContext.lookup(connectionFactoryName);
-
- connection = connectionFactory.createConnection(username, password);
-
- //TODO - review
- try {
- //durable subscription requires ClientID to be set
- connection.setClientID(UUID.randomUUID().toString());
- } catch (IllegalStateException e) {
- //ignore - clientId has already been set
- LOGGER.debug(e.getMessage(), e);
- } catch (JMSException e) {
- //ignore - clientId has already been set
- LOGGER.debug(e.getMessage(), e);
- }
-
- connection.start();
- }
-
- protected Connection getConnection() {
- if (connection == null) {
- throw new IllegalStateException("connection is absent");
- }
-
- return connection;
- }
-
- public void stop() throws Exception {
- if (connection != null) {
- connection.close();
- connection = null;
- }
- }
-
- public Topic lookup(TopicKey topicKey) throws NamingException {
- Name topicName = appendToName(topicsNamespace, topicKey.getTopicName());
-
- return (Topic) initialContext.lookup(topicName);
- }
-
- public javax.jms.Session createSession() throws JMSException {
- return getConnection().createSession(false,
javax.jms.Session.CLIENT_ACKNOWLEDGE);
- }
-
- public String getSubscriptionClientId(Session session, TopicKey topicKey) {
- //here TopicKey#topicName should be used, not TopicKey#topicAddress
- return "rf-push:" + applicationName + ":" +
topicKey.getTopicName() + ":" + session.getId();
- }
-
- public void shareInstance(FacesContext facesContext) {
- Map<String, Object> applicationMap =
facesContext.getExternalContext().getApplicationMap();
- applicationMap.put(SHARED_INSTANCE_KEY, this);
- }
-
- public static MessagingContext getSharedInstance(ServletContext servletContext) {
- return (MessagingContext) servletContext.getAttribute(SHARED_INSTANCE_KEY);
- }
-
- private String createMessageSelector(Iterable<TopicKey> topicKeys) {
- Iterable<String> sqlStrings = Iterables.transform(topicKeys,
TOPIC_KEY_TO_MESSAGE_SELECTOR);
- return OR_JOINER.join(sqlStrings) + " OR false" /* workaround for
HornetQ */;
- }
-
- public TopicSubscriber createTopicSubscriber(Session pushSession, javax.jms.Session
jmsSession,
- Entry<TopicKey, Collection<TopicKey>> entry)
- throws JMSException, NamingException {
-
- TopicKey rootTopicKey = entry.getKey();
-
- String subscriptionClientId = getSubscriptionClientId(pushSession,
rootTopicKey);
-
- javax.jms.Topic jmsTopic = lookup(rootTopicKey);
-
- return jmsSession.createDurableSubscriber(jmsTopic, subscriptionClientId,
createMessageSelector(entry.getValue()), false);
- }
-
- /**
- * @param session
- * @param jmsSession
- * @param rootTopicKeys
- */
- public void removeTopicSubscriber(Session session, javax.jms.Session jmsSession,
Collection<TopicKey> rootTopicKeys) {
- for (TopicKey rootTopicKey : rootTopicKeys) {
- try {
- jmsSession.unsubscribe(getSubscriptionClientId(session, rootTopicKey));
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
-
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextFactoryImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextFactoryImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextFactoryImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import javax.faces.context.FacesContext;
-
-import org.richfaces.application.push.PushContext;
-import org.richfaces.application.push.PushContextFactory;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class PushContextFactoryImpl implements PushContextFactory {
-
- private static final class PushContextHolder {
-
- static final PushContext INSTANCE = createInstance();
-
- private PushContextHolder() {
- }
-
- }
-
- private static PushContext createInstance() {
- FacesContext facesContext = FacesContext.getCurrentInstance();
-
- PushContextImpl pushContext = new PushContextImpl();
- pushContext.init(facesContext);
-
- return pushContext;
- }
-
- public PushContext getPushContext() {
- return PushContextHolder.INSTANCE;
- }
-
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushContextImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,188 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionFactory;
-import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionPassword;
-import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionPasswordEnvRef;
-import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionUsername;
-import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionUsernameEnvRef;
-import static org.richfaces.application.CoreConfiguration.Items.pushJMSTopicsNamespace;
-import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionFactory;
-import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionPassword;
-import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionUsername;
-import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSTopicsNamespace;
-
-import javax.faces.FacesException;
-import javax.faces.context.FacesContext;
-import javax.faces.event.AbortProcessingException;
-import javax.faces.event.PreDestroyApplicationEvent;
-import javax.faces.event.SystemEvent;
-import javax.faces.event.SystemEventListener;
-import javax.naming.InitialContext;
-import javax.naming.Name;
-import javax.naming.NameParser;
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.atmosphere.cpr.AtmosphereHandler;
-import org.richfaces.application.ServiceTracker;
-import org.richfaces.application.configuration.ConfigurationService;
-import org.richfaces.application.push.PushContext;
-import org.richfaces.application.push.SessionFactory;
-import org.richfaces.application.push.SessionManager;
-import org.richfaces.application.push.TopicsContext;
-import org.richfaces.application.push.impl.AtmosphereHandlerProvider;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-import com.google.common.base.Strings;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class PushContextImpl implements PushContext, SystemEventListener,
AtmosphereHandlerProvider {
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private MessagingContext messagingContext;
-
- private TopicsContext topicsContext;
-
- private PushHandlerImpl pushHandlerImpl;
-
- public TopicsContext getTopicsContext() {
- return topicsContext;
- }
-
- private String getApplicationName(FacesContext facesContext) {
- ServletContext servletContext = (ServletContext)
facesContext.getExternalContext().getContext();
- return servletContext.getContextPath();
- }
-
- private String getFirstNonEmptyConfgirutationValue(FacesContext facesContext,
ConfigurationService service, Enum<?>... keys) {
- for (Enum<?> key : keys) {
- String value = service.getStringValue(facesContext, key);
- if (!Strings.isNullOrEmpty(value)) {
- return value;
- }
- }
-
- return "";
- }
-
- public void init(FacesContext facesContext) {
- try {
-
facesContext.getApplication().subscribeToEvent(PreDestroyApplicationEvent.class, this);
-
facesContext.getExternalContext().getApplicationMap().put(PushContext.INSTANCE_KEY_NAME,
this);
-
- ConfigurationService configurationService =
ServiceTracker.getService(ConfigurationService.class);
-
- InitialContext initialContext = new InitialContext();
-
- NameParser nameParser = initialContext.getNameParser("");
-
- Name cnfName = nameParser.parse(getConnectionFactory(facesContext,
configurationService));
- Name topicsNamespace = nameParser.parse(getTopicsNamespace(facesContext,
configurationService));
-
- messagingContext = new MessagingContext(initialContext, cnfName,
topicsNamespace,
- getApplicationName(facesContext),
- getUserName(facesContext, configurationService),
- getPassword(facesContext, configurationService)
- );
-
- messagingContext.shareInstance(facesContext);
-
- messagingContext.start();
-
- topicsContext = new TopicsContextImpl(messagingContext);
-
- pushHandlerImpl = new PushHandlerImpl(messagingContext, topicsContext);
- } catch (Exception e) {
- throw new FacesException(e.getMessage(), e);
- }
- }
-
- private String getPassword(FacesContext facesContext, ConfigurationService
configurationService) {
- return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
- pushPropertiesJMSConnectionPassword, pushJMSConnectionPasswordEnvRef,
pushJMSConnectionPassword);
- }
-
- private String getUserName(FacesContext facesContext, ConfigurationService
configurationService) {
- return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
- pushPropertiesJMSConnectionUsername, pushJMSConnectionUsernameEnvRef,
pushJMSConnectionUsername);
- }
-
- private String getConnectionFactory(FacesContext facesContext, ConfigurationService
configurationService) {
- return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
- pushPropertiesJMSConnectionFactory, pushJMSConnectionFactory);
- }
-
- private String getTopicsNamespace(FacesContext facesContext, ConfigurationService
configurationService) {
- return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
- pushPropertiesJMSTopicsNamespace, pushJMSTopicsNamespace);
- }
-
- public void destroy() {
- if (pushHandlerImpl != null) {
- try {
- pushHandlerImpl.destroy();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- if (messagingContext != null) {
- try {
- messagingContext.stop();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
-
- public void processEvent(SystemEvent event) throws AbortProcessingException {
- if (event instanceof PreDestroyApplicationEvent) {
- destroy();
- } else {
- throw new IllegalArgumentException(event.getClass().getName());
- }
- }
-
- public boolean isListenerForSource(Object source) {
- return true;
- }
-
- public SessionFactory getSessionFactory() {
- return pushHandlerImpl;
- }
-
- public AtmosphereHandler<HttpServletRequest, HttpServletResponse> getHandler()
{
- return pushHandlerImpl;
- }
-
- public SessionManager getSessionManager() {
- return pushHandlerImpl.getSessionManager();
- }
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,64 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.atmosphere.cpr.AtmosphereResource;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.SessionFactory;
-import org.richfaces.application.push.SessionManager;
-import org.richfaces.application.push.TopicsContext;
-import org.richfaces.application.push.impl.AtmospherePushHandler;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class PushHandlerImpl extends AtmospherePushHandler implements SessionFactory {
-
- private MessagingContext messagingContext;
-
- private TopicsContext topicsContext;
-
- public PushHandlerImpl(MessagingContext messagingContext, TopicsContext
topicsContext) {
- super();
- this.messagingContext = messagingContext;
- this.topicsContext = topicsContext;
- }
-
- public Session createSession(String key) {
- SessionManager sessionManager = getSessionManager();
- Session session = new SessionImpl(key, sessionManager, messagingContext,
topicsContext);
- sessionManager.putPushSession(session);
-
- return session;
- }
-
- @Override
- protected Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource,
- Session session) {
- return new RequestImpl(resource, session, getWorker(), topicsContext);
- }
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/RequestImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,125 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import java.util.concurrent.ExecutorService;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.atmosphere.cpr.AtmosphereResource;
-import org.richfaces.application.push.MessageDataSerializer;
-import org.richfaces.application.push.MessageException;
-import org.richfaces.application.push.Session;
-import org.richfaces.application.push.TopicKey;
-import org.richfaces.application.push.TopicsContext;
-import org.richfaces.application.push.impl.AbstractRequest;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class RequestImpl extends AbstractRequest implements
org.richfaces.application.push.MessageListener {
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private TopicsContext topicsContext;
-
- public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource, Session session,
- ExecutorService executorService, TopicsContext topicsContext) {
-
- super(atmosphereResource, session, executorService);
-
- this.topicsContext = topicsContext;
- }
-
- private String serializeMessage(org.richfaces.application.push.Topic topic, Message
message) {
- String serializedMessageData = null;
- Object messageData = null;
-
- try {
- if (message instanceof ObjectMessage) {
- messageData = ((ObjectMessage) message).getObject();
- } else if (message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
-
- if (message.getBooleanProperty(TopicImpl.SERIALIZED_DATA_INDICATOR)) {
- serializedMessageData = textMessage.getText();
- } else {
- messageData = textMessage.getText();
- }
- }
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- if (serializedMessageData != null) {
- return serializedMessageData;
- }
-
- if (messageData != null) {
- MessageDataSerializer messageDataSerializer =
topic.getMessageDataSerializer();
- try {
- return messageDataSerializer.serialize(messageData);
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
-
- return null;
- }
-
- public void onMessage(Object message) throws MessageException {
- Message jmsMessage = (Message) message;
- try {
- String topicName = ((Topic) jmsMessage.getJMSDestination()).getTopicName();
-
- org.richfaces.application.push.Topic topic = topicsContext.getTopic(new
TopicKey(topicName));
- if (topic == null) {
- //TODO log
- return;
- }
-
- String serializedMessageData = serializeMessage(topic, jmsMessage);
- if (serializedMessageData == null) {
- //TODO log
- return;
- }
-
- postMessage(new TopicKey(topicName,
jmsMessage.getStringProperty(MessagingContext.SUBTOPIC_ATTRIBUTE_NAME)),
serializedMessageData);
- } catch (JMSException e) {
- throw new MessageException(e.getMessage(), e);
- }
- }
-
-
- public org.richfaces.application.push.MessageListener getMessageListener() {
- return this;
- }
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,261 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import java.text.MessageFormat;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-import javax.naming.NamingException;
-
-import org.richfaces.application.push.EventAbortedException;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.SessionManager;
-import org.richfaces.application.push.SessionPreSubscriptionEvent;
-import org.richfaces.application.push.Topic;
-import org.richfaces.application.push.TopicKey;
-import org.richfaces.application.push.TopicsContext;
-import org.richfaces.application.push.impl.AbstractSession;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class SessionImpl extends AbstractSession {
-
- private static final class JMSToPushListenerAdaptor implements MessageListener {
-
- private final org.richfaces.application.push.MessageListener messageListener;
-
- private JMSToPushListenerAdaptor(org.richfaces.application.push.MessageListener
messageListener) {
- this.messageListener = messageListener;
- }
-
- public void onMessage(Message message) {
- try {
- messageListener.onMessage(message);
- message.acknowledge();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private final MessagingContext messagingContext;
-
- private final TopicsContext topicsContext;
-
- private final Multimap<TopicKey, TopicKey> successfulSubscriptions =
ArrayListMultimap.<TopicKey, TopicKey>create();
-
- private final Map<TopicKey, String> failedSubscriptions = Maps.newHashMap();
-
- private Session jmsSession;
-
- private Collection<TopicSubscriber> subscribers =
Lists.newArrayListWithCapacity(1);
-
- public SessionImpl(String id, SessionManager sessionManager, MessagingContext
messagingContext, TopicsContext topicsContext) {
- super(id, sessionManager);
-
- this.messagingContext = messagingContext;
- this.topicsContext = topicsContext;
- }
-
- public Map<TopicKey, String> getFailedSubscriptions() {
- return failedSubscriptions;
- }
-
- public Multimap<TopicKey, TopicKey> getSuccessfulSubscriptions() {
- return successfulSubscriptions;
- }
-
- private void createSubscriptions(Iterable<TopicKey> topicKeys) {
- javax.jms.Session jmsSession = null;
- try {
- Multimap<TopicKey, TopicKey> rootTopicsMap =
createRootTopicsKeysMap(topicKeys);
-
- jmsSession = messagingContext.createSession();
-
- for (Entry<TopicKey, Collection<TopicKey>> entry:
rootTopicsMap.asMap().entrySet()) {
- TopicSubscriber subscriber = null;
-
- try {
- subscriber = messagingContext.createTopicSubscriber(this, jmsSession,
entry);
- successfulSubscriptions.putAll(entry.getKey(), entry.getValue());
- } finally {
- if (subscriber != null) {
- subscriber.close();
- }
- }
-
- }
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- } catch (NamingException e) {
- LOGGER.error(e.getMessage(), e);
- } finally {
- if (jmsSession != null) {
- try {
- jmsSession.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
- }
-
- private Multimap<TopicKey, TopicKey>
createRootTopicsKeysMap(Iterable<TopicKey> topicKeys) {
- Multimap<TopicKey, TopicKey> rootTopicKeys =
ArrayListMultimap.<TopicKey, TopicKey>create();
-
- for (TopicKey topicKey : topicKeys) {
- rootTopicKeys.put(topicKey.getRootTopicKey(), topicKey);
- }
-
- return rootTopicKeys;
- }
-
- private void processFailedSubscriptions(Iterable<TopicKey> topicKeys) {
- for (Iterator<TopicKey> itr = topicKeys.iterator(); itr.hasNext(); ) {
- TopicKey topicKey = itr.next();
-
- TopicKey rootTopicKey = topicKey.getRootTopicKey();
- Topic pushTopic = topicsContext.getTopic(rootTopicKey);
-
- String errorMessage = null;
-
- if (pushTopic == null) {
- errorMessage = MessageFormat.format("Topic ''{0}''
is not configured", topicKey.getTopicAddress());
- } else {
- try {
- //TODO - publish another events
- pushTopic.publishEvent(new SessionPreSubscriptionEvent(pushTopic,
topicKey, this));
- } catch (EventAbortedException e) {
- if (e.getMessage() != null) {
- errorMessage = e.getMessage();
- } else {
- errorMessage = MessageFormat.format("Unknown error
connecting to ''{0}'' topic", topicKey.getTopicAddress());
- }
- }
- }
-
- if (errorMessage != null) {
- itr.remove();
- failedSubscriptions.put(topicKey, errorMessage);
- }
- }
- }
-
- @Override
- protected void processConnect(Request request) throws Exception {
- super.processConnect(request);
-
- jmsSession = messagingContext.createSession();
-
- MessageListener jmsListener = new
JMSToPushListenerAdaptor(request.getMessageListener());
-
- for (Entry<TopicKey, Collection<TopicKey>> entry:
getSuccessfulSubscriptions().asMap().entrySet()) {
- TopicSubscriber subscriber = messagingContext.createTopicSubscriber(this,
jmsSession, entry);
- subscribers.add(subscriber);
- subscriber.setMessageListener(jmsListener);
- }
- }
-
- private void clearSubscribers() {
- if (jmsSession != null) {
- for (TopicSubscriber subscriber : subscribers) {
- try {
- subscriber.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- subscribers.clear();
-
- try {
- jmsSession.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- jmsSession = null;
- }
- }
-
- @Override
- protected void processDisconnect() throws Exception {
- try {
- clearSubscribers();
- } catch (Exception e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- super.processDisconnect();
- }
-
- public void subscribe(String[] topics) {
- Iterable<TopicKey> topicKeys =
Iterables.transform(Lists.newLinkedList(Arrays.asList(topics)), TopicKey.factory());
-
- processFailedSubscriptions(topicKeys);
- createSubscriptions(topicKeys);
- }
-
- @Override
- public synchronized void destroy() {
- super.destroy();
-
- //we need to create new JMS session, as this method can be called from another
thread - see javax.jms.Session JavaDoc
- //for multi-threading limitations
- Session localJMSSession = null;
- try {
- localJMSSession = messagingContext.createSession();
- messagingContext.removeTopicSubscriber(this, localJMSSession,
successfulSubscriptions.keySet());
-
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- } finally {
- if (localJMSSession != null) {
- try {
- localJMSSession.close();
- } catch (JMSException e) {
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
- }
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,89 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.NamingException;
-
-import org.richfaces.application.push.MessageException;
-import org.richfaces.application.push.TopicKey;
-import org.richfaces.application.push.impl.AbstractTopic;
-import org.richfaces.log.Logger;
-import org.richfaces.log.RichfacesLogger;
-
-import com.google.common.base.Strings;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class TopicImpl extends AbstractTopic {
-
- static final String SERIALIZED_DATA_INDICATOR =
"org_richfaces_push_SerializedData";
-
- private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
-
- private MessagingContext messagingContext;
-
- public TopicImpl(TopicKey key, MessagingContext messagingContext) {
- super(key);
-
- this.messagingContext = messagingContext;
- }
-
- @Override
- public void publish(String subtopic, Object messageData) throws MessageException {
- String serializedData = getMessageDataSerializer().serialize(messageData);
-
- Session session = null;
- try {
- session = messagingContext.createSession();
- MessageProducer producer =
session.createProducer(messagingContext.lookup(getKey()));
-
- TextMessage textMessage = session.createTextMessage();
- textMessage.setText(serializedData);
- textMessage.setBooleanProperty(SERIALIZED_DATA_INDICATOR, true);
-
- if (!Strings.isNullOrEmpty(subtopic)) {
- textMessage.setStringProperty(MessagingContext.SUBTOPIC_ATTRIBUTE_NAME,
subtopic);
- }
-
- producer.send(textMessage);
- } catch (JMSException e) {
- throw new MessageException(e.getMessage(), e);
- } catch (NamingException e) {
- throw new MessageException(e.getMessage(), e);
- } finally {
- if (session != null) {
- try {
- session.close();
- } catch (JMSException e) {
- LOGGER.debug(e.getMessage(), e);
- }
- }
- }
- }
-
-}
Deleted:
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicsContextImpl.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicsContextImpl.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/core/impl/src/main/java/org/richfaces/application/push/impl/jms/TopicsContextImpl.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -1,45 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl.jms;
-
-import org.richfaces.application.push.Topic;
-import org.richfaces.application.push.TopicKey;
-import org.richfaces.application.push.TopicsContext;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class TopicsContextImpl extends TopicsContext {
-
- private MessagingContext messagingContext;
-
- public TopicsContextImpl(MessagingContext messagingContext) {
- super();
- this.messagingContext = messagingContext;
- }
-
- protected Topic createTopic(TopicKey key) {
- return new TopicImpl(key, messagingContext);
- }
-
-}
Modified: trunk/core/impl/src/main/java/org/richfaces/webapp/PushFilter.java
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/webapp/PushFilter.java 2011-04-28 14:16:16
UTC (rev 22450)
+++ trunk/core/impl/src/main/java/org/richfaces/webapp/PushFilter.java 2011-04-28 16:58:13
UTC (rev 22451)
@@ -37,9 +37,9 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.atmosphere.cpr.AtmosphereServlet;
import org.richfaces.application.push.PushContext;
-import org.richfaces.application.push.impl.AtmosphereHandlerProvider;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
@@ -48,26 +48,16 @@
* @author Nick Belaevski
*
*/
-//TODO override broadcaster
public class PushFilter implements Filter {
- private static final String PUSH_HUB_MAPPING = "/*";
-
private static final long serialVersionUID = 7616370505508715222L;
- /**
- * @author Nick Belaevski
- *
- */
+ private static final Logger LOGGER = RichfacesLogger.WEBAPP.getLogger();
+
private final class ServletConfigFacade implements ServletConfig {
- /**
- *
- */
+
private final FilterConfig filterConfig;
- /**
- * @param filterConfig
- */
private ServletConfigFacade(FilterConfig filterConfig) {
this.filterConfig = filterConfig;
}
@@ -90,7 +80,6 @@
return result;
}
- @SuppressWarnings("unchecked")
public Enumeration<String> getInitParameterNames() {
Set<String> result = Sets.newLinkedHashSet();
@@ -101,59 +90,51 @@
}
}
- private AtmosphereServlet atmosphereServlet;
+ private PushServlet pushServlet;
public void init(FilterConfig filterConfig) throws ServletException {
- AtmosphereHandlerProvider handlerProvider = (AtmosphereHandlerProvider)
filterConfig.getServletContext().getAttribute(PushContext.INSTANCE_KEY_NAME);
+ PushContext handlerProvider = (PushContext)
filterConfig.getServletContext().getAttribute(PushContext.INSTANCE_KEY_NAME);
+
+ if (handlerProvider != null) {
+ logPushFilterWarning(filterConfig.getServletContext());
+
+ pushServlet = new PushServlet();
+ ServletConfigFacade servletConfig = new ServletConfigFacade(filterConfig);
+ pushServlet.init(servletConfig);
+ }
+ }
- if (handlerProvider == null) {
- return;
+ private void logPushFilterWarning(ServletContext servletContext) {
+ String message;
+
+ if (servletContext.getMajorVersion() >= 3) {
+ message = "PushFilter has been deprecated, you can remove its
declaration in Servlets 3 environment";
+ } else {
+ message = "PushFilter has been deprecated, you should use PushServlet
instead";
}
- atmosphereServlet = new AtmosphereServlet() {
-
- private static final long serialVersionUID = -8719394110408476331L;
-
- protected boolean detectSupportedFramework(ServletConfig sc) throws
ClassNotFoundException, IllegalAccessException, InstantiationException
,NoSuchMethodException ,java.lang.reflect.InvocationTargetException {
- return false;
- };
- };
- ServletConfigFacade servletConfig = new ServletConfigFacade(filterConfig);
- atmosphereServlet.init(servletConfig);
-
- atmosphereServlet.addAtmosphereHandler(PUSH_HUB_MAPPING,
handlerProvider.getHandler());
+ LOGGER.warn(message);
}
- /* (non-Javadoc)
- * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest,
javax.servlet.ServletResponse, javax.servlet.FilterChain)
- */
- public void doFilter(ServletRequest request, ServletResponse response, FilterChain
chain) throws IOException,
- ServletException {
-
- if (atmosphereServlet != null && request instanceof HttpServletRequest
&& response instanceof HttpServletResponse) {
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain
chain) throws IOException, ServletException {
+ if (pushServlet != null && request instanceof HttpServletRequest
&& response instanceof HttpServletResponse) {
HttpServletRequest httpReq = (HttpServletRequest) request;
HttpServletResponse httpResp = (HttpServletResponse) response;
if ("GET".equals(httpReq.getMethod()) &&
httpReq.getQueryString() != null &&
httpReq.getQueryString().contains("__richfacesPushAsync")) {
- atmosphereServlet.doGet(httpReq, httpResp);
+ pushServlet.doGet(httpReq, httpResp);
return;
}
}
- // TODO Auto-generated method stub
chain.doFilter(request, response);
}
- /* (non-Javadoc)
- * @see javax.servlet.Filter#destroy()
- */
public void destroy() {
- if (atmosphereServlet != null) {
- atmosphereServlet.removeAtmosphereHandler(PUSH_HUB_MAPPING);
- atmosphereServlet.destroy();
- atmosphereServlet = null;
+ if (pushServlet != null) {
+ pushServlet.destroy();
+ pushServlet = null;
}
- // TODO Auto-generated method stub
}
}
Added: trunk/core/impl/src/main/java/org/richfaces/webapp/PushHandlerFilter.java
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/webapp/PushHandlerFilter.java
(rev 0)
+++ trunk/core/impl/src/main/java/org/richfaces/webapp/PushHandlerFilter.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.webapp;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.Collections;
+
+import javax.servlet.Filter;
+import javax.servlet.FilterChain;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.atmosphere.cpr.BroadcastFilter;
+import org.atmosphere.cpr.Broadcaster.SCOPE;
+import org.atmosphere.cpr.Meteor;
+import org.richfaces.application.push.PushContext;
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionManager;
+import org.richfaces.application.push.impl.RequestImpl;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+/**
+ * Serves as delegate for Atmposphere servlets - should not be used directly
+ * @author Nick Belaevski
+ *
+ */
+public class PushHandlerFilter implements Filter {
+
+ public static final String SESSION_ATTRIBUTE_NAME = Session.class.getName();
+
+ public static final String REQUEST_ATTRIBUTE_NAME = Request.class.getName();
+
+ private static final String PUSH_SESSION_ID_PARAM = "pushSessionId";
+
+ private static final long serialVersionUID = 7616370505508715222L;
+
+ private static final Logger LOGGER = RichfacesLogger.WEBAPP.getLogger();
+
+ private SessionManager sessionManager;
+
+ public void init(FilterConfig filterConfig) throws ServletException {
+ PushContext pushContext = (PushContext)
filterConfig.getServletContext().getAttribute(PushContext.INSTANCE_KEY_NAME);
+ sessionManager = pushContext.getSessionManager();
+ }
+
+ public void doFilter(ServletRequest request, ServletResponse response, FilterChain
chain) throws IOException, ServletException {
+ if (request instanceof HttpServletRequest && response instanceof
HttpServletResponse) {
+ HttpServletRequest httpReq = (HttpServletRequest) request;
+ HttpServletResponse httpResp = (HttpServletResponse) response;
+
+ chain.doFilter(request, response);
+
+ if ("GET".equals(httpReq.getMethod())) {
+ Meteor meteor = Meteor.build(httpReq, SCOPE.REQUEST,
Collections.<BroadcastFilter>emptyList(), null);
+
+ String pushSessionId = httpReq.getParameter(PUSH_SESSION_ID_PARAM);
+
+ Session session = null;
+
+ if (pushSessionId != null) {
+ session = sessionManager.getPushSession(pushSessionId);
+ }
+
+ if (session == null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(MessageFormat.format("Session {0} was not
found", pushSessionId));
+ }
+ httpResp.sendError(HttpServletResponse.SC_BAD_REQUEST);
+ return;
+ }
+
+ httpResp.setContentType("text/plain");
+
+ try {
+ Request pushRequest = new RequestImpl(meteor, session);
+
+ httpReq.setAttribute(SESSION_ATTRIBUTE_NAME, session);
+ httpReq.setAttribute(REQUEST_ATTRIBUTE_NAME, pushRequest);
+
+ pushRequest.suspend();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ return;
+ }
+ }
+ }
+
+ public void destroy() {
+ sessionManager = null;
+ }
+
+}
Added: trunk/core/impl/src/main/java/org/richfaces/webapp/PushServlet.java
===================================================================
--- trunk/core/impl/src/main/java/org/richfaces/webapp/PushServlet.java
(rev 0)
+++ trunk/core/impl/src/main/java/org/richfaces/webapp/PushServlet.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.webapp;
+
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+
+import org.atmosphere.cpr.AtmosphereServlet;
+import org.atmosphere.cpr.MeteorServlet;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public final class PushServlet extends MeteorServlet {
+
+ private static final long serialVersionUID = 2483746935231439236L;
+
+ private static final class ServletConfigDefaultsWrapper implements ServletConfig {
+
+ private static final Map<String, String> DEFAULT_INIT_PARAMETERS =
Maps.newHashMap();
+
+ static {
+ DEFAULT_INIT_PARAMETERS.put("org.atmosphere.filter",
PushHandlerFilter.class.getName());
+ DEFAULT_INIT_PARAMETERS.put(AtmosphereServlet.DISABLE_ONSTATE_EVENT,
"true");
+ }
+
+ private final ServletConfig config;
+
+ public ServletConfigDefaultsWrapper(ServletConfig config) {
+ super();
+ this.config = config;
+ }
+
+ public String getServletName() {
+ return config.getServletName();
+ }
+
+ public ServletContext getServletContext() {
+ return config.getServletContext();
+ }
+
+ public String getInitParameter(String name) {
+ String parameter = config.getInitParameter(name);
+
+ if (parameter == null) {
+ parameter = DEFAULT_INIT_PARAMETERS.get(name);
+ }
+
+ return parameter;
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public Enumeration getInitParameterNames() {
+ Set<String> result = Sets.newHashSet();
+
+ Iterators.addAll(result, (Iterator<? extends String>)
DEFAULT_INIT_PARAMETERS.keySet());
+ Iterators.addAll(result,
Iterators.forEnumeration(config.getInitParameterNames()));
+
+ return Iterators.asEnumeration(result.iterator());
+ }
+ }
+
+ @Override
+ public void init(ServletConfig sc) throws ServletException {
+ super.init(new ServletConfigDefaultsWrapper(sc));
+ }
+
+}
Added:
trunk/core/impl/src/main/java/org/richfaces/webapp/PushServletContainerInitializer.java
===================================================================
---
trunk/core/impl/src/main/java/org/richfaces/webapp/PushServletContainerInitializer.java
(rev 0)
+++
trunk/core/impl/src/main/java/org/richfaces/webapp/PushServletContainerInitializer.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.webapp;
+
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Set;
+
+import javax.servlet.FilterRegistration;
+import javax.servlet.ServletContainerInitializer;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRegistration;
+import javax.servlet.ServletRegistration.Dynamic;
+
+import org.richfaces.application.push.impl.PushContextFactoryImpl;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class PushServletContainerInitializer implements ServletContainerInitializer {
+
+ private static final String PUSH_CONTEXT_DEFAULT_MAPPING = '/' +
PushContextFactoryImpl.PUSH_CONTEXT_RESOURCE_NAME;
+
+ private boolean hasPushFilterMapping(ServletContext context) {
+ Collection<? extends FilterRegistration> filterRegistrations =
context.getFilterRegistrations().values();
+ for (FilterRegistration filterRegistration : filterRegistrations) {
+ if (PushFilter.class.getName().equals(filterRegistration.getClassName())) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private ServletRegistration getPushServletRegistration(ServletContext context) {
+ Collection<? extends ServletRegistration> servletRegistrations =
context.getServletRegistrations().values();
+ for (ServletRegistration servletRegistration : servletRegistrations) {
+ if (PushServlet.class.getName().equals(servletRegistration.getClassName()))
{
+ if (servletRegistration.getMappings() != null &&
!servletRegistration.getMappings().isEmpty()) {
+ return servletRegistration;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private static void registerPushServlet(ServletContext context) {
+ Dynamic dynamicRegistration =
context.addServlet("AutoRegisteredPushServlet", PushServlet.class);
+ dynamicRegistration.addMapping(PUSH_CONTEXT_DEFAULT_MAPPING);
+ dynamicRegistration.setAsyncSupported(true);
+ }
+
+ public void onStartup(Set<Class<?>> clasess, ServletContext
servletContext) throws ServletException {
+ if (hasPushFilterMapping(servletContext)) {
+ return;
+ }
+
+ try {
+ String pushHandlerMapping;
+
+ ServletRegistration servletRegistration =
getPushServletRegistration(servletContext);
+ if (servletRegistration == null) {
+ registerPushServlet(servletContext);
+ pushHandlerMapping = PUSH_CONTEXT_DEFAULT_MAPPING;
+ } else {
+ pushHandlerMapping = Iterables.get(servletRegistration.getMappings(),
0);
+ }
+
+
servletContext.setAttribute(PushContextFactoryImpl.PUSH_HANDLER_MAPPING_ATTRIBUTE,
pushHandlerMapping);
+ } catch (Exception e) {
+ servletContext.log(MessageFormat.format("Exception registering RichFaces
Push Servlet: {0]", e.getMessage()), e);
+ }
+ }
+
+}
Added:
trunk/core/impl/src/main/resources/META-INF/services/javax.servlet.ServletContainerInitializer
===================================================================
---
trunk/core/impl/src/main/resources/META-INF/services/javax.servlet.ServletContainerInitializer
(rev 0)
+++
trunk/core/impl/src/main/resources/META-INF/services/javax.servlet.ServletContainerInitializer 2011-04-28
16:58:13 UTC (rev 22451)
@@ -0,0 +1 @@
+org.richfaces.webapp.PushServletContainerInitializer
\ No newline at end of file
Modified:
trunk/examples/irc-client/src/main/java/org/ircclient/listeners/TopicsInitializer.java
===================================================================
---
trunk/examples/irc-client/src/main/java/org/ircclient/listeners/TopicsInitializer.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/examples/irc-client/src/main/java/org/ircclient/listeners/TopicsInitializer.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -31,7 +31,7 @@
import javax.servlet.http.HttpServletRequest;
import org.ircclient.controller.ChatBean;
-import org.richfaces.application.push.EventAbortedException;
+import org.richfaces.application.push.SubscriptionAbortedException;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.SessionPreSubscriptionEvent;
import org.richfaces.application.push.SessionSubscriptionEvent;
@@ -57,14 +57,14 @@
topic.addTopicListener(new SessionTopicListener() {
- public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event)
throws EventAbortedException {
+ public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event)
throws SubscriptionAbortedException {
TopicKey topicKey = event.getTopicKey();
Session session = event.getSession();
System.out.println(MessageFormat.format("Session {0} disconnected
from {1}", session.getId(),
topicKey.getTopicAddress()));
}
- public void processSubscriptionEvent(SessionSubscriptionEvent event) throws
EventAbortedException {
+ public void processSubscriptionEvent(SessionSubscriptionEvent event) throws
SubscriptionAbortedException {
TopicKey topicKey = event.getTopicKey();
Session session = event.getSession();
@@ -75,11 +75,11 @@
topicKey.getTopicAddress(), hsr.getRemoteAddr()));
}
- public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event)
throws EventAbortedException {
+ public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event)
throws SubscriptionAbortedException {
ExternalContext externalContext =
FacesContext.getCurrentInstance().getExternalContext();
ChatBean chatBean = (ChatBean)
externalContext.getSessionMap().get("chatBean");
if (chatBean == null || !(chatBean.isConnected())) {
- throw new EventAbortedException("We are not connected to
IRC");
+ throw new SubscriptionAbortedException("We are not connected to
IRC");
}
}
});
Modified: trunk/examples/push-demo/pom.xml
===================================================================
--- trunk/examples/push-demo/pom.xml 2011-04-28 14:16:16 UTC (rev 22450)
+++ trunk/examples/push-demo/pom.xml 2011-04-28 16:58:13 UTC (rev 22451)
@@ -127,6 +127,12 @@
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.6.1</version>
+ </dependency>
</dependencies>
<build>
Modified: trunk/examples/push-demo/src/main/java/demo/ChatBean.java
===================================================================
--- trunk/examples/push-demo/src/main/java/demo/ChatBean.java 2011-04-28 14:16:16 UTC (rev
22450)
+++ trunk/examples/push-demo/src/main/java/demo/ChatBean.java 2011-04-28 16:58:13 UTC (rev
22451)
@@ -21,11 +21,11 @@
*/
package demo;
+
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.Date;
-import javax.annotation.PostConstruct;
import javax.faces.bean.ManagedBean;
import javax.faces.bean.ManagedProperty;
import javax.faces.bean.SessionScoped;
@@ -54,16 +54,16 @@
private boolean chatJoined;
- private transient TopicsContext topicsContext;
-
private String subchannel;
-
+
@ManagedProperty("#{channelsBean}")
private ChannelsBean channelsBean;
-
- @PostConstruct
- public void init() {
- topicsContext = TopicsContext.lookup();
+
+ @ManagedProperty("#{jmsBean}")
+ private JMSBean jmsBean;
+
+ private TopicsContext lookupTopicsContext() {
+ return TopicsContext.lookup();
}
public String getMessage() {
@@ -82,15 +82,28 @@
this.userName = userName;
}
- private void publishStateChangeMessage(String name, String action) {
+ private void sendJMSMessage(TopicKey key, String text) {
+ jmsBean.publish(key, text);
+ }
+
+ private void sendMessage(TopicKey key, String text) {
+ sendJMSMessage(key, text);
+ //sendSimpleMessage(key, text);
+ }
+
+ private void sendSimpleMessage(TopicKey key, String text) {
try {
- topicsContext.publish(new TopicKey("chat", name),
MessageFormat.format("*** {0} {1} chat in {2,time,medium}",
- userName, action, new Date()));
+ lookupTopicsContext().publish(key, text);
} catch (MessageException e) {
LOGGER.error(e.getMessage(), e);
}
}
+ private void publishStateChangeMessage(String name, String action) {
+ sendMessage(new TopicKey("chat", name), MessageFormat.format("***
{0} {1} chat in {2,time,medium}",
+ userName, action, new Date()));
+ }
+
public void joinChat() {
if (!chatJoined) {
if (userName == null) {
@@ -117,18 +130,10 @@
}
public void say() {
- try {
- topicsContext.publish(new TopicKey("chat", subchannel),
MessageFormat.format("{0,time,medium} {1}: {2}", new Date(),
- userName, message));
- } catch (MessageException e) {
- LOGGER.error(e.getMessage(), e);
- }
+ sendMessage(new TopicKey("chat", subchannel),
MessageFormat.format("{0,time,medium} {1}: {2}", new Date(),
+ userName, message));
}
- public void setTopicsContext(TopicsContext topicsContext) {
- this.topicsContext = topicsContext;
- }
-
/**
* @return the subchannel
*/
@@ -150,4 +155,10 @@
this.channelsBean = channelsBean;
}
+ /**
+ * @param jmsBean the jmsBean to set
+ */
+ public void setJmsBean(JMSBean jmsBean) {
+ this.jmsBean = jmsBean;
+ }
}
Added: trunk/examples/push-demo/src/main/java/demo/JMSBean.java
===================================================================
--- trunk/examples/push-demo/src/main/java/demo/JMSBean.java (rev
0)
+++ trunk/examples/push-demo/src/main/java/demo/JMSBean.java 2011-04-28 16:58:13 UTC (rev
22451)
@@ -0,0 +1,242 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2011, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package demo;
+
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionFactory;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionPassword;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionPasswordEnvRef;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionUsername;
+import static
org.richfaces.application.CoreConfiguration.Items.pushJMSConnectionUsernameEnvRef;
+import static org.richfaces.application.CoreConfiguration.Items.pushJMSTopicsNamespace;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionFactory;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionPassword;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSConnectionUsername;
+import static
org.richfaces.application.CoreConfiguration.PushPropertiesItems.pushPropertiesJMSTopicsNamespace;
+
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.faces.bean.ApplicationScoped;
+import javax.faces.bean.ManagedBean;
+import javax.faces.context.FacesContext;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NameParser;
+import javax.naming.NamingException;
+
+import org.richfaces.application.ServiceTracker;
+import org.richfaces.application.configuration.ConfigurationService;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.log.Logger;
+import org.richfaces.log.RichfacesLogger;
+
+import com.google.common.base.Strings;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+@ManagedBean(name = "jmsBean")
+@ApplicationScoped
+public class JMSBean {
+
+ private static final Logger LOGGER = RichfacesLogger.APPLICATION.getLogger();
+
+ private Connection connection;
+
+ private final class PublishRunnable implements Runnable {
+
+ private final String topicsNamespaceString;
+
+ private PublishRunnable(String topicsNamespaceString) {
+ this.topicsNamespaceString = topicsNamespaceString;
+ }
+
+ public void run() {
+ Session session = null;
+ try {
+ InitialContext initialContext = new InitialContext();
+ NameParser nameParser = initialContext.getNameParser("");
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ while (true) {
+ PublishTask task = tasks.take();
+
+ if (task == null) {
+ break;
+ }
+
+ Name topicsNamespace = nameParser.parse(topicsNamespaceString);
+ Name topicName = appendToName(topicsNamespace,
task.getTopicKey().getTopicName());
+ Topic topic = (Topic) initialContext.lookup(topicName);
+
+ MessageProducer producer = null;
+ try {
+ producer = session.createProducer(topic);
+ ObjectMessage objectMessage =
session.createObjectMessage(task.getMessage());
+ objectMessage.setStringProperty("rf_push_subtopic",
task.getTopicKey().getSubtopicName());
+
+ producer.send(objectMessage);
+ } finally {
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ } finally {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ }
+ }
+
+ private static final class PublishTask {
+
+ private TopicKey topicKey;
+
+ private Serializable message;
+
+ public PublishTask(TopicKey topicKey, Serializable message) {
+ super();
+ this.topicKey = topicKey;
+ this.message = message;
+ }
+
+ public TopicKey getTopicKey() {
+ return topicKey;
+ }
+
+ public Serializable getMessage() {
+ return message;
+ }
+ }
+
+ private Thread workerThread;
+
+ private BlockingQueue<PublishTask> tasks = new
LinkedBlockingQueue<PublishTask>();
+
+ private static String getConnectionFactory(FacesContext facesContext,
ConfigurationService configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSConnectionFactory, pushJMSConnectionFactory);
+ }
+
+ private static String getTopicsNamespace(FacesContext facesContext,
ConfigurationService configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSTopicsNamespace, pushJMSTopicsNamespace);
+ }
+
+ private static String getJMSPassword(FacesContext facesContext, ConfigurationService
configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSConnectionPassword, pushJMSConnectionPasswordEnvRef,
pushJMSConnectionPassword);
+ }
+
+ private static String getJMSUserName(FacesContext facesContext, ConfigurationService
configurationService) {
+ return getFirstNonEmptyConfgirutationValue(facesContext, configurationService,
+ pushPropertiesJMSConnectionUsername, pushJMSConnectionUsernameEnvRef,
pushJMSConnectionUsername);
+ }
+
+ private static Name appendToName(Name name, String comp) throws NamingException {
+ Name clonedName = (Name) name.clone();
+ return clonedName.add(comp);
+ }
+
+ private static String getFirstNonEmptyConfgirutationValue(FacesContext facesContext,
ConfigurationService service, Enum<?>... keys) {
+ for (Enum<?> key : keys) {
+ String value = service.getStringValue(facesContext, key);
+ if (!Strings.isNullOrEmpty(value)) {
+ return value;
+ }
+ }
+
+ return "";
+ }
+
+ private Connection createConnection() throws Exception {
+ FacesContext facesContext = FacesContext.getCurrentInstance();
+ ConfigurationService configurationService =
ServiceTracker.getService(ConfigurationService.class);
+
+ InitialContext initialContext = new InitialContext();
+ NameParser nameParser = initialContext.getNameParser("");
+
+ Name cnfName = nameParser.parse(getConnectionFactory(facesContext,
configurationService));
+
+ ConnectionFactory connectionFactory = (ConnectionFactory)
initialContext.lookup(cnfName);
+ return connectionFactory.createConnection(getJMSUserName(facesContext,
configurationService), getJMSPassword(facesContext, configurationService));
+ }
+
+ public void publish(TopicKey topicKey, Serializable message) {
+ tasks.add(new PublishTask(topicKey, message));
+ }
+
+ @PostConstruct
+ public void initialize() {
+ try {
+ FacesContext facesContext = FacesContext.getCurrentInstance();
+ ConfigurationService configurationService =
ServiceTracker.getService(ConfigurationService.class);
+ final String topicsNamespaceString = getTopicsNamespace(facesContext,
configurationService);
+ connection = createConnection();
+ connection.start();
+
+ this.workerThread = new Thread(new PublishRunnable(topicsNamespaceString));
+
+ this.workerThread.setDaemon(true);
+ this.workerThread.start();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+
+ @PreDestroy
+ public void destroy() {
+ tasks.add(null);
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+}
Modified: trunk/examples/push-demo/src/main/java/demo/TopicsInitializer.java
===================================================================
--- trunk/examples/push-demo/src/main/java/demo/TopicsInitializer.java 2011-04-28 14:16:16
UTC (rev 22450)
+++ trunk/examples/push-demo/src/main/java/demo/TopicsInitializer.java 2011-04-28 16:58:13
UTC (rev 22451)
@@ -30,12 +30,12 @@
import javax.faces.event.SystemEventListener;
import javax.servlet.http.HttpServletRequest;
-import org.richfaces.application.push.EventAbortedException;
import org.richfaces.application.push.Session;
import org.richfaces.application.push.SessionPreSubscriptionEvent;
import org.richfaces.application.push.SessionSubscriptionEvent;
-import org.richfaces.application.push.SessionTopicListener;
+import org.richfaces.application.push.SessionTopicListener2;
import org.richfaces.application.push.SessionUnsubscriptionEvent;
+import org.richfaces.application.push.SubscriptionFailureException;
import org.richfaces.application.push.Topic;
import org.richfaces.application.push.TopicKey;
import org.richfaces.application.push.TopicsContext;
@@ -51,18 +51,17 @@
TopicsContext topicsContext = TopicsContext.lookup();
Topic topic = topicsContext.getOrCreateTopic(new TopicKey("chat"));
-
topic.setMessageDataSerializer(DefaultMessageDataSerializer.instance());
- topic.addTopicListener(new SessionTopicListener() {
+ topic.addTopicListener(new SessionTopicListener2() {
- public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event)
throws EventAbortedException {
+ public void processUnsubscriptionEvent(SessionUnsubscriptionEvent event) {
TopicKey topicKey = event.getTopicKey();
Session session = event.getSession();
System.out.println(MessageFormat.format("Session {0} disconnected
from {1}", session.getId(), topicKey.getTopicAddress()));
}
- public void processSubscriptionEvent(SessionSubscriptionEvent event) throws
EventAbortedException {
+ public void processSubscriptionEvent(SessionSubscriptionEvent event) {
TopicKey topicKey = event.getTopicKey();
Session session = event.getSession();
@@ -73,11 +72,11 @@
topicKey.getTopicAddress(), hsr.getRemoteAddr()));
}
- public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event)
throws EventAbortedException {
+ public void processPreSubscriptionEvent(SessionPreSubscriptionEvent event)
throws SubscriptionFailureException {
ExternalContext externalContext =
FacesContext.getCurrentInstance().getExternalContext();
ChatBean chatBean = (ChatBean)
externalContext.getSessionMap().get("chatBean");
if (chatBean == null ||
"badname".equals(chatBean.getUserName())) {
- throw new EventAbortedException("User name has not passed
validation");
+ throw new SubscriptionFailureException("User name has not passed
validation");
}
}
});
Modified: trunk/ui/core/ui/src/main/java/org/richfaces/renderkit/PushRendererBase.java
===================================================================
---
trunk/ui/core/ui/src/main/java/org/richfaces/renderkit/PushRendererBase.java 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/ui/core/ui/src/main/java/org/richfaces/renderkit/PushRendererBase.java 2011-04-28
16:58:13 UTC (rev 22451)
@@ -31,6 +31,9 @@
import javax.faces.render.Renderer;
import org.ajax4jsf.javascript.ScriptUtils;
+import org.richfaces.application.ServiceTracker;
+import org.richfaces.application.push.PushContext;
+import org.richfaces.application.push.PushContextFactory;
import org.richfaces.component.AbstractPush;
import org.richfaces.resource.PushResource;
@@ -42,13 +45,19 @@
private static final String PUSH_URL_ENCODED_ATTRIBUTE =
PushRendererBase.class.getName();
- protected String getPushUrl(FacesContext context) {
+ protected String getPushResourceUrl(FacesContext context) {
ResourceHandler resourceHandler = context.getApplication().getResourceHandler();
Resource pushResource =
resourceHandler.createResource(PushResource.class.getName());
return pushResource.getRequestPath();
}
+ protected String getPushHandlerUrl(FacesContext context) {
+ PushContext pushContext =
ServiceTracker.getService(PushContextFactory.class).getPushContext();
+
+ return pushContext.getPushHandlerUrl();
+ }
+
protected boolean shouldEncodePushUrl(FacesContext context) {
Map<Object, Object> attributes = context.getAttributes();
Modified: trunk/ui/core/ui/src/main/resources/META-INF/resources/org.richfaces/push.js
===================================================================
---
trunk/ui/core/ui/src/main/resources/META-INF/resources/org.richfaces/push.js 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/ui/core/ui/src/main/resources/META-INF/resources/org.richfaces/push.js 2011-04-28
16:58:13 UTC (rev 22451)
@@ -47,22 +47,45 @@
var handlersCounter = {};
- var pushUrl = null;
+ var pushResourceUrl = null;
+ var pushHandlerUrl = null;
+
var pushSessionId = null;
var suspendMessageEndMarker = /^(<!--[^>]+-->\s*)+/;
+ var messageTokenExpr = /<([^>]*)>/g;
+
+ var lastMessageNumber = -1;
+
+ var qualifyUrl = function(url) {
+ var result = url;
+
+ if (url.charAt(0) == '/') {
+ result = location.protocol + '//' + location.host + url;
+ }
+
+ return result;
+ };
+
var messageCallback = function(response) {
var dataString = response.responseBody.replace(suspendMessageEndMarker,
"");
if (dataString) {
- var messages = _$.parseJSON(dataString);
- if (messages) {
- for (var i = 0; i < messages.length; i++) {
- var message = messages[i];
-
- richfaces.Event.fire(document, getDataEventNamespace(message.topic),
message.data);
+ var messageToken;
+ while (messageToken = messageTokenExpr.exec(dataString)) {
+ if (!messageToken[1]) {
+ continue;
}
+
+ var message = _$.parseJSON('{' + messageToken[1] + '}');
+
+ if (message.number <= lastMessageNumber) {
+ continue;
+ }
+
+ richfaces.Event.fire(document, getDataEventNamespace(message.topic), message.data);
+ lastMessageNumber = message.number;
}
}
@@ -86,8 +109,9 @@
if (subscriptionData.sessionId) {
pushSessionId = subscriptionData.sessionId;
- _$.atmosphere.subscribe(pushUrl +
"?__richfacesPushAsync=1&pushSessionId=" + pushSessionId, messageCallback,
{
- /*transport: 'websocket'*/
+ _$.atmosphere.subscribe((pushHandlerUrl || pushResourceUrl) +
"?__richfacesPushAsync=1&pushSessionId=" + pushSessionId, messageCallback,
{
+ transport: richfaces.Push.transport,
+ fallbackTransport: richfaces.Push.fallbackTransport
});
}
};
@@ -98,7 +122,7 @@
}
var data = {
- "pushTopic": topics
+ "pushTopic": topics
};
if (pushSessionId) {
@@ -112,7 +136,7 @@
success: pushSessionIdRequestHandler,
traditional: true,
type: 'POST',
- url: pushUrl
+ url: pushResourceUrl
});
};
@@ -135,14 +159,14 @@
}
},
- setPushUrl: function(argPushUrl) {
- if (argPushUrl.charAt(0) == '/') {
- pushUrl = location.protocol + '//' + location.host + argPushUrl;
- } else {
- pushUrl = argPushUrl;
- }
+ setPushResourceUrl: function(url) {
+ pushResourceUrl = qualifyUrl(url);
},
+ setPushHandlerUrl: function(url) {
+ pushHandlerUrl = qualifyUrl(url);
+ },
+
updateConnection: function() {
if (_$.isEmptyObject(handlersCounter)) {
disconnect();
@@ -160,6 +184,9 @@
_$(document).ready(richfaces.Push.updateConnection);
+ richfaces.Push.transport = "websocket";
+ richfaces.Push.fallbackTransport = "long-polling";
+
var ajaxEventHandler = function(event) {
if (event.type == 'event') {
if (event.status != 'success') {
Modified:
trunk/ui/core/ui/src/main/templates/org/ajax4jsf/renderkit/html/push.template.xml
===================================================================
---
trunk/ui/core/ui/src/main/templates/org/ajax4jsf/renderkit/html/push.template.xml 2011-04-28
14:16:16 UTC (rev 22450)
+++
trunk/ui/core/ui/src/main/templates/org/ajax4jsf/renderkit/html/push.template.xml 2011-04-28
16:58:13 UTC (rev 22451)
@@ -24,7 +24,13 @@
<span id="#{clientId}">
<script type="text/javascript">
<c:if test="#{shouldEncodePushUrl(facesContext)}">
- RichFaces.Push.setPushUrl("#{getPushUrl(facesContext)}");
+
RichFaces.Push.setPushResourceUrl("#{getPushResourceUrl(facesContext)}");
+
+ <cdk:object name="pushHandlerUrl"
value="#{getPushHandlerUrl(facesContext)}" />
+ <c:if test="#{not empty pushHandlerUrl}">
+ RichFaces.Push.setPushHandlerUrl("#{pushHandlerUrl}");
+ </c:if>
+
</c:if>
<cdk:scriptObject name="options">