Author: nbelaevski
Date: 2010-10-14 20:28:55 -0400 (Thu, 14 Oct 2010)
New Revision: 19577
Added:
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java
branches/RF-7817/push-redesign-app/src/main/java/demo/HornetQInitializer.java
branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java
branches/RF-7817/push-redesign/checkstyle-suppressions.xml
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageDataSerializer.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PublisherContextImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
Removed:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageSerializer.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/MessagesContextImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java
Modified:
branches/RF-7817/core/impl/checkstyle-suppressions.xml
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/DefaultModule.java
branches/RF-7817/pom.xml
branches/RF-7817/push-redesign-app/pom.xml
branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/faces-config.xml
branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml
branches/RF-7817/push-redesign/pom.xml
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushServlet.java
branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml
branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml
Log:
https://jira.jboss.org/browse/RF-7817
- JMS migration
Modified: branches/RF-7817/core/impl/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/core/impl/checkstyle-suppressions.xml 2010-10-15 00:26:15 UTC (rev
19576)
+++ branches/RF-7817/core/impl/checkstyle-suppressions.xml 2010-10-15 00:28:55 UTC (rev
19577)
@@ -33,4 +33,8 @@
<!-- TODO it is hot fix for building process, this files belong to the
ui/output/trunk/panela/ui module,
and must be removed from here -->
<suppress checks="IllegalCatch"
files="AbstractTogglePanel.java" />
+
+ <!-- TODO: this is a hot fix for the build process; these files belong to the
+ push-redesign module and must be removed from here. -->
+ <suppress checks="IllegalCatch" files="RequestImpl.java" />
</suppressions>
Modified:
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/DefaultModule.java
===================================================================
---
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/DefaultModule.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/DefaultModule.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -2,6 +2,8 @@
import org.richfaces.application.configuration.ConfigurationService;
import org.richfaces.application.configuration.ConfigurationServiceImpl;
+import org.richfaces.application.push.TopicsContext;
+import org.richfaces.application.push.impl.TopicsContextImpl;
import org.richfaces.cache.Cache;
import org.richfaces.l10n.BundleLoader;
import org.richfaces.renderkit.AjaxDataSerializer;
@@ -25,6 +27,7 @@
factory.setInstance(DependencyInjector.class, new
DependencyInjectionServiceImpl());
factory.setInstance(MessageFactory.class, new MessageFactoryImpl(new
BundleLoader()));
factory.setInstance(ResourceLibraryFactory.class, new
ResourceLibraryFactoryImpl());
+ factory.setInstance(TopicsContext.class, new TopicsContextImpl());
}
}
Added:
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java
===================================================================
---
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java
(rev 0)
+++
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import org.richfaces.application.push.MessageDataSerializer;
+import org.richfaces.application.push.Topic;
+import org.richfaces.application.push.TopicKey;
+
+/**
+ * {@link Topic} implementation that holds a {@link TopicKey} together with a
+ * replaceable {@link MessageDataSerializer}.
+ *
+ * The key is assigned in the constructor and is not reassigned anywhere in this
+ * class. The serializer is kept in a volatile field and may be swapped at any time
+ * through {@link #setMessageSerializer(MessageDataSerializer)}.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public class TopicImpl implements Topic {
+ /** Key of this topic; written only by the constructor. */
+ private TopicKey key;
+ /** Current serializer; stays {@code null} until a serializer is set. */
+ private volatile MessageDataSerializer serializer;
+ /**
+ * Creates a topic for the given key.
+ *
+ * @param key key of the topic; stored as passed, no null check is performed
+ */
+ public TopicImpl(TopicKey key) {
+ super();
+ this.key = key;
+ }
+ /**
+ * @return the serializer last passed to the setter, or {@code null} if none was set
+ */
+ public MessageDataSerializer getMessageSerializer() {
+ return serializer;
+ }
+ /**
+ * Replaces the serializer of this topic.
+ *
+ * @param serializer new serializer; stored as passed ({@code null} is not rejected)
+ */
+ public void setMessageSerializer(MessageDataSerializer serializer) {
+ this.serializer = serializer;
+ }
+ /**
+ * @return the key this topic was created with
+ */
+ public TopicKey getKey() {
+ return key;
+ }
+
+}
Added:
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java
===================================================================
---
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java
(rev 0)
+++
branches/RF-7817/core/impl/src/main/java/org/richfaces/application/push/impl/TopicsContextImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.richfaces.application.push.Topic;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.TopicsContext;
+
+/**
+ * {@link TopicsContext} implementation that keeps topics in a concurrent map
+ * keyed by {@link TopicKey}.
+ *
+ * Topics are created on demand as {@link TopicImpl} instances by
+ * {@link #getOrCreateTopic(TopicKey)}.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public class TopicsContextImpl implements TopicsContext {
+ /** Registry of known topics, one entry per key. */
+ private ConcurrentMap<TopicKey, Topic> topics = new
ConcurrentHashMap<TopicKey, Topic>();
+ /**
+ * Returns the topic registered for the key, creating and registering a new
+ * {@link TopicImpl} when there is none. If another caller registers a topic for
+ * the same key between the lookup and the insertion, that already registered
+ * instance is returned and the newly created one is discarded.
+ *
+ * @param key key of the requested topic
+ * @return the registered topic for the key, never {@code null}
+ */
+ public Topic getOrCreateTopic(TopicKey key) {
+ Topic result = topics.get(key);
+
+ if (result == null) {
+ Topic newTopic = new TopicImpl(key);
+ result = topics.putIfAbsent(key, newTopic);
+ if (result == null) { // putIfAbsent returned no previous mapping, so newTopic is now registered
+ result = newTopic;
+ }
+ }
+
+ return result;
+ }
+ /**
+ * @param key key of the requested topic
+ * @return the topic registered for the key, or {@code null} if there is none
+ */
+ public Topic getTopic(TopicKey key) {
+ return topics.get(key);
+ }
+ /**
+ * Removes the topic registered for the key, if any.
+ *
+ * @param key key of the topic to remove
+ */
+ public void removeTopic(TopicKey key) {
+ topics.remove(key);
+ }
+
+}
Modified: branches/RF-7817/pom.xml
===================================================================
--- branches/RF-7817/pom.xml 2010-10-15 00:26:15 UTC (rev 19576)
+++ branches/RF-7817/pom.xml 2010-10-15 00:28:55 UTC (rev 19577)
@@ -29,6 +29,8 @@
<module>archetypes</module>
<module>examples</module>
<!--<module>docs</module>-->
+ <module>push-redesign</module>
+ <module>push-redesign-app</module>
</modules>
<build>
Added: branches/RF-7817/push-redesign/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/push-redesign/checkstyle-suppressions.xml
(rev 0)
+++ branches/RF-7817/push-redesign/checkstyle-suppressions.xml 2010-10-15 00:28:55 UTC
(rev 19577)
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+
+<!DOCTYPE suppressions PUBLIC
+ "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+ "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<!-- NOTE: Because of issues with maven checkstyle plugin each suppression
+ file must have all suppressions defined.
+
+ See RF-9077 for details. -->
+<suppressions>
+ <suppress checks="DoubleCheckedLocking"
files="MultipartRequestRegistry.java" />
+ <suppress checks="FallThrough" files="JSMin.java" />
+ <suppress checks="IllegalCatch" files="UISwitchablePanel.java"
/>
+ <suppress checks="ModifiedControlVariable"
files="MultipartRequest.java" />
+ <suppress checks="IllegalCatch" files="StagingServer.java"
/>
+ <suppress checks="IllegalCatch"
files="PartialViewContextImpl.java" />
+ <suppress checks="FallThrough" files="StagingServer.java"
/>
+ <suppress checks="ModifiedControlVariable" files="Cookie.java"
/>
+ <suppress checks="IllegalCatch" files="CacheManager.java"
/>
+ <suppress checks="MissingSwitchDefault"
files="JSONTokener.java" />
+ <suppress checks="IllegalCatch" files="AjaxViewRoot.java"
/>
+ <suppress checks="FallThrough" files="XMLTokener.java" />
+ <suppress checks="IllegalCatch"
files="ResourceHandlerImpl.java" />
+ <suppress checks="IllegalThrows"
files="AbstractThreadedTest.java" />
+ <suppress checks="ModifiedControlVariable"
files="URLCodec.java" />
+
+ <!-- TODO: this is a hot fix for the build process; these files belong to the
+ test-base module and must be removed from here. -->
+ <suppress checks="IllegalCatch"
files="AbstractThreadedTest.java" />
+ <suppress checks="IllegalCatch" files="StagingServer.java"
/>
+
+ <!-- TODO: this is a hot fix for the build process; these files belong to the
+ ui/output/trunk/panela/ui module and must be removed from here. -->
+ <suppress checks="IllegalCatch"
files="AbstractTogglePanel.java" />
+
+ <!-- TODO: this is a hot fix for the build process; these files belong to the
+ push-redesign module and must be removed from here. -->
+ <suppress checks="IllegalCatch" files="RequestImpl.java" />
+</suppressions>
Modified: branches/RF-7817/push-redesign/pom.xml
===================================================================
--- branches/RF-7817/push-redesign/pom.xml 2010-10-15 00:26:15 UTC (rev 19576)
+++ branches/RF-7817/push-redesign/pom.xml 2010-10-15 00:28:55 UTC (rev 19577)
@@ -21,6 +21,13 @@
<build>
<plugins>
<plugin>
+ <!-- Checkstyle only required here because suppressions needed -->
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
+ </configuration>
+ </plugin>
+ <plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.5</source>
@@ -68,5 +75,11 @@
<artifactId>atmosphere-runtime</artifactId>
<version>0.6.2</version>
</dependency>
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ <version>1.1</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</project>
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AbstractSession.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,152 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionListener;
+
+/**
+ * Base {@link Session} implementation that tracks the currently attached
+ * {@link Request}, the time of last access and the registered
+ * {@link SessionListener}s.
+ *
+ * Listeners are notified when a request connects, when it disconnects and when
+ * the session is destroyed.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public abstract class AbstractSession implements Session {
+ /** Value reported by {@link #getMaxInactiveInterval()}; presumably milliseconds (60 s) - TODO confirm unit. */
+ private static final int MAX_INACTIVE_INTERVAL = 60 * 1000;
+ /** Zero-length array handed to {@code toArray} so the result is typed as {@code SessionListener[]}. */
+ private static final SessionListener[] EMPTY_SESSION_LISTENERS_ARRAY = new
SessionListener[0];
+ /** Session events; each constant maps to one {@link SessionListener} callback. */
+ private static enum Events {
+ connected {
+ @Override
+ public void notifyListener(AbstractSession session, SessionListener listener)
{
+ listener.onRequestConnected(session);
+ }
+ },
+
+ disconnected {
+ @Override
+ public void notifyListener(AbstractSession session, SessionListener listener)
{
+ listener.onRequestDisconnected(session);
+ }
+ },
+
+ destroyed {
+ @Override
+ public void notifyListener(AbstractSession session, SessionListener listener)
{
+ listener.onSessionDestroyed(session);
+ }
+ };
+ /** Invokes the callback of this event on a single listener. */
+ abstract void notifyListener(AbstractSession session, SessionListener listener);
+ /** Invokes the callback of this event on every listener of the session, in list order. */
+ public void notifyListeners(AbstractSession session) {
+ for (SessionListener listener : session.listeners) {
+ notifyListener(session, listener);
+ }
+ }
+ }
+ /** Identifier passed to the constructor. */
+ private final String id;
+ /** Timestamp ({@code System.currentTimeMillis()}) set on construction and on each disconnect. */
+ private volatile long lastAccessedTime;
+ /** Currently attached request, or {@code null} when no request is connected. */
+ private volatile Request request;
+
+ //TODO performance?
+ private List<SessionListener> listeners = new
CopyOnWriteArrayList<SessionListener>();
+ /**
+ * Creates a session with the given id and records the current time as the
+ * last access time.
+ *
+ * @param id identifier of the session
+ */
+ public AbstractSession(String id) {
+ super();
+ this.id = id;
+
+ resetLastAccessedTimeToCurrent();
+ }
+ /** Stores the current system time as the last access time. */
+ private void resetLastAccessedTimeToCurrent() {
+ lastAccessedTime = System.currentTimeMillis();
+ }
+ /**
+ * Attaches a request to this session, notifies listeners of the connection and
+ * then suspends the request.
+ *
+ * NOTE(review): the null check and the assignment are separate steps on a
+ * volatile field, so two concurrent calls could both pass the check - confirm
+ * that callers serialize access.
+ *
+ * @param request request to attach
+ * @throws IllegalStateException if a request is already attached
+ * @throws Exception declared by the signature; presumably raised by
+ * {@code request.suspend()} - TODO confirm
+ */
+ public void connect(Request request) throws Exception {
+ if (this.request != null) {
+ throw new IllegalStateException("already connected");
+ }
+
+ this.request = request;
+
+ Events.connected.notifyListeners(this);
+
+ request.suspend();
+ }
+ /**
+ * Detaches the current request, refreshes the last access time and notifies
+ * listeners of the disconnection. The argument is not inspected: whatever
+ * request is attached gets cleared.
+ *
+ * @param request not used by this implementation
+ * @throws Exception declared by the signature; nothing in this body throws a checked exception
+ */
+ public void disconnect(Request request) throws Exception {
+ this.request = null;
+ resetLastAccessedTimeToCurrent();
+
+ Events.disconnected.notifyListeners(this);
+ }
+ /**
+ * @return the current system time while a request is attached, otherwise the
+ * timestamp recorded on construction or on the last disconnect
+ */
+ public long getLastAccessedTime() {
+ if (request != null) {
+ return System.currentTimeMillis();
+ }
+
+ return lastAccessedTime;
+ }
+ /**
+ * @return the fixed {@code MAX_INACTIVE_INTERVAL} value
+ */
+ public int getMaxInactiveInterval() {
+ return MAX_INACTIVE_INTERVAL;
+ }
+ /**
+ * @return the id this session was created with
+ */
+ public String getId() {
+ return id;
+ }
+ /**
+ * Registers a listener; no duplicate check is performed.
+ *
+ * @param sessionListener listener to add
+ */
+ public void addSessionListener(SessionListener sessionListener) {
+ listeners.add(sessionListener);
+ }
+ /**
+ * @return an array holding the currently registered listeners
+ */
+ public SessionListener[] getSessionListeners() {
+ return listeners.toArray(EMPTY_SESSION_LISTENERS_ARRAY);
+ }
+ /**
+ * Unregisters a listener.
+ *
+ * @param sessionListener listener to remove
+ */
+ public void removeSessionListener(SessionListener sessionListener) {
+ listeners.remove(sessionListener);
+ }
+ /**
+ * @return the attached request, or {@code null} when none is connected
+ */
+ public Request getRequest() {
+ return request;
+ }
+ /**
+ * Notifies listeners that the session is destroyed. Cleanup of a still
+ * attached request is not implemented yet (see the TODO in the body).
+ */
+ public void destroy() {
+ if (request != null) {
+ //TODO - clean up request
+ }
+
+ Events.destroyed.notifyListeners(this);
+ // TODO Auto-generated method stub
+
+ }
+}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -22,44 +22,59 @@
package org.richfaces.application.push.impl;
import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.servlet.ServletConfig;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.atmosphere.cpr.AtmosphereHandler;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
-import org.richfaces.application.push.PushSession;
-import org.richfaces.application.push.PushSessionTracker;
-import org.richfaces.application.push.SubscriptionContext;
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionManager;
import org.richfaces.application.push.TopicKey;
/**
* @author Nick Belaevski
*
*/
-public class AtmospherePushHandler implements AtmosphereHandler<HttpServletRequest,
HttpServletResponse> {
+public abstract class AtmospherePushHandler implements
AtmosphereHandler<HttpServletRequest, HttpServletResponse> {
+ private static final ThreadFactory DAEMON_THREADS_FACTORY = new ThreadFactory() {
+
+ private final AtomicInteger threadsCounter = new AtomicInteger();
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "rf-push-worker-thread-" +
threadsCounter.getAndIncrement());
+ t.setDaemon(true);
+
+ return t;
+ }
+ };
+
private static final String PUSH_SESSION_ID_PARAM = "pushSessionId";
- private PushSessionTracker pushTracker;
+ private SessionManager sessionManager;
- private PushRequestWorker worker;
+ private ExecutorService worker;
- public AtmospherePushHandler(SubscriptionContext subscriptionContext) {
+ public AtmospherePushHandler() {
super();
- pushTracker = new PushSessionTrackerImpl(subscriptionContext);
- worker = new PushRequestWorker(8);
+ sessionManager = new SessionManagerImpl(DAEMON_THREADS_FACTORY);
+ worker = Executors.newCachedThreadPool(DAEMON_THREADS_FACTORY);
}
- protected PushSessionTracker getPushTracker() {
- return pushTracker;
+ protected SessionManager getSessionManager() {
+ return sessionManager;
}
- /* (non-Javadoc)
- * @see
org.atmosphere.cpr.AtmosphereHandler#onRequest(org.atmosphere.cpr.AtmosphereResource)
- */
public void onRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource) throws IOException {
// TODO Auto-generated method stub
@@ -68,13 +83,13 @@
String pushSessionId = req.getParameter(PUSH_SESSION_ID_PARAM);
- PushSession pushSession = null;
+ Session session = null;
if (pushSessionId != null) {
- pushSession = getPushTracker().getPushSession(pushSessionId);
+ session = getSessionManager().getPushSession(pushSessionId);
}
- if (pushSession == null) {
+ if (session == null) {
//TODO - debug log
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
@@ -82,7 +97,12 @@
resp.setContentType("text/plain");
- pushSession.connect(new RequestImpl(resource, pushSession, worker));
+ try {
+ session.connect(createRequest(resource, session));
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
public void onStateChange(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event)
@@ -90,18 +110,35 @@
//do nothing
}
- public PushSession doConnect(String[] topicNames) {
- PushSession pushSession = getPushTracker().createPushSession();
+ public Session doConnect(String[] topicNames) {
+ Session pushSession = createSession(UUID.randomUUID().toString());
+ sessionManager.putPushSession(pushSession);
- TopicKey[] topicKeys = new TopicKey[topicNames.length];
for (int i = 0; i < topicNames.length; i++) {
- String topicName = topicNames[i];
+ TopicKey topicKey = new TopicKey(topicNames[i]);
- topicKeys[i] = new TopicKey(topicName);
+ try {
+ pushSession.subscribe(topicKey);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
-
- //TODO - check permissions for channels
- pushSession.subscribe(topicKeys);
+
return pushSession;
}
+
+ protected abstract Session createSession(String key);
+
+ protected Request createRequest(AtmosphereResource<HttpServletRequest,
HttpServletResponse> resource, Session session) {
+ return new RequestImpl(resource, session, worker);
+ }
+
+ public void init(ServletConfig servletConfig) throws Exception {
+ }
+
+ public void destroy() throws Exception {
+ sessionManager.destroy();
+ }
+
}
Copied:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageDataSerializer.java
(from rev 19530,
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageSerializer.java)
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageDataSerializer.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageDataSerializer.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,44 @@
+package org.richfaces.application.push.impl;
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+import org.ajax4jsf.javascript.ScriptUtils;
+import org.richfaces.application.push.MessageDataSerializer;
+
+/**
+ * {@link MessageDataSerializer} that delegates serialization to
+ * {@code ScriptUtils.toScript(Object)}.
+ *
+ * The class has a private constructor and exposes a single shared instance
+ * through {@link #instance()}.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public final class DefaultMessageDataSerializer implements MessageDataSerializer {
+ /** The only instance of this class. */
+ private static final MessageDataSerializer INSTANCE = new
DefaultMessageDataSerializer();
+ /** Private so that instances are obtained through {@link #instance()} only. */
+ private DefaultMessageDataSerializer() {}
+ /**
+ * @return the shared serializer instance
+ */
+ public static MessageDataSerializer instance() {
+ return INSTANCE;
+ }
+ /**
+ * Serializes the data by passing it to {@code ScriptUtils.toScript}.
+ *
+ * @param data object to serialize; handed to {@code ScriptUtils.toScript} unchanged
+ * @return the string produced by {@code ScriptUtils.toScript}
+ */
+ public String serialize(Object data) {
+ return ScriptUtils.toScript(data);
+ }
+}
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageSerializer.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageSerializer.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/DefaultMessageSerializer.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -1,59 +0,0 @@
-package org.richfaces.application.push.impl;
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.ajax4jsf.javascript.ScriptUtils;
-import org.richfaces.application.push.Message;
-import org.richfaces.application.push.MessageSerializer;
-
-/**
- * @author Nick Belaevski
- *
- */
-public final class DefaultMessageSerializer implements MessageSerializer {
-
- public static final String TOPIC_ATTRIBUTE = "topic";
-
- public static final String ATTRIBUTES_ATTRIBUTE = "attributes";
-
- public static final String DATA_ATTRIBUTE = "data";
-
- private static final MessageSerializer INSTANCE = new DefaultMessageSerializer();
-
- private DefaultMessageSerializer() {}
-
- public String serialize(Message message) {
- Map<String,Object> dataMap = new HashMap<String, Object>();
-
- dataMap.put(TOPIC_ATTRIBUTE, message.getTopicKey().getTopicName());
- dataMap.put(DATA_ATTRIBUTE, message.getData());
-
- return ScriptUtils.toScript(dataMap);
- }
-
- public static MessageSerializer instance() {
- return INSTANCE;
- }
-}
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/MessagesContextImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/MessagesContextImpl.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/MessagesContextImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -1,96 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import javax.servlet.ServletContext;
-
-import org.richfaces.application.push.Message;
-import org.richfaces.application.push.MessageListener;
-import org.richfaces.application.push.PublisherContext;
-import org.richfaces.application.push.SubscriptionContext;
-import org.richfaces.application.push.TopicKey;
-
-/**
- * @author Nick Belaevski
- *
- */
-public final class MessagesContextImpl implements SubscriptionContext, PublisherContext
{
-
- private ConcurrentMap<TopicKey, List<MessageListener>> listenersMap = new
ConcurrentHashMap<TopicKey, List<MessageListener>>();
-
- private MessagesContextImpl() {}
-
- public void addMessageListener(TopicKey topicKey, MessageListener listener) {
- List<MessageListener> listeners = listenersMap.get(topicKey);
- if (listeners == null) {
- List<MessageListener> newListenersList = new
CopyOnWriteArrayList<MessageListener>();
-
- listeners = listenersMap.putIfAbsent(topicKey, newListenersList);
- if (listeners == null) {
- listeners = newListenersList;
- }
- }
-
- listeners.add(listener);
- }
-
- public void removeMessageListener(TopicKey topicKey, MessageListener listener) {
- List<MessageListener> listeners = listenersMap.get(topicKey);
- if (listeners != null) {
- listeners.remove(listener);
- }
- }
-
- public void publish(TopicKey topicKey, Object data) {
- Message message = new Message(topicKey);
- message.setData(data);
-
- publish(message);
- }
-
- public void publish(Message message) {
- List<MessageListener> listeners = listenersMap.get(message.getTopicKey());
- if (listeners != null) {
- for (MessageListener listener : listeners) {
- listener.onMessage(message);
- }
- }
- }
-
- public static MessagesContextImpl create(ServletContext servletContext) {
- MessagesContextImpl result = new MessagesContextImpl();
-
- servletContext.setAttribute(PublisherContext.ATTRIBUTE_NAME, result);
-
- return result;
- }
-
- public static void destroy(ServletContext servletContext) {
- servletContext.removeAttribute(PublisherContext.ATTRIBUTE_NAME);
- }
-
-}
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -1,85 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Nick Belaevski
- *
- */
-final class PushRequestWorker {
-
- private static final ThreadFactory DAEMON_THREADS_FACTORY = new ThreadFactory() {
-
- private final AtomicInteger threadsCounter = new AtomicInteger();
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "rf-push-worker-thread-" +
threadsCounter.getAndIncrement());
- t.setDaemon(true);
-
- return t;
- }
- };
-
- private final class Task implements Runnable {
-
- private RequestImpl request;
-
- public Task(RequestImpl request) {
- super();
- this.request = request;
- }
-
- public void run() {
- try {
- // TODO Auto-generated method stub
- try {
- request.writeMessages();
- request.resubmitToWorker();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- } catch (Throwable e) {
- // TODO: handle exception
- e.printStackTrace();
- }
- }
- }
-
- private ExecutorService executorService;
-
- public PushRequestWorker(int numThreads) {
- super();
-
- this.executorService = Executors.newFixedThreadPool(numThreads,
DAEMON_THREADS_FACTORY);
- }
-
- public void submit(RequestImpl request) {
- executorService.submit(new Task(request));
- }
-}
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -1,176 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import org.richfaces.application.push.Message;
-import org.richfaces.application.push.MessageListener;
-import org.richfaces.application.push.PushSession;
-import org.richfaces.application.push.Request;
-import org.richfaces.application.push.RequestLifecycleListener;
-import org.richfaces.application.push.SubscriptionContext;
-import org.richfaces.application.push.TopicKey;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class PushSessionImpl implements Delayed, PushSession, RequestLifecycleListener,
MessageListener {
-
- private static final long EXPIRATION_DELAY = 45 * 1000;
-
- private final Queue<Message> messagesQueue = new
ConcurrentLinkedQueue<Message>();
-
- private final PushSessionTrackerImpl pushTracker;
-
- private final String id;
-
- private volatile Request request = null;
-
- private volatile long expirationTime;
-
- private volatile TopicKey[] topics;
-
- public PushSessionImpl(PushSessionTrackerImpl pushTracker, String id) {
- super();
- this.pushTracker = pushTracker;
- this.id = id;
- resetExpirationTime();
- }
-
- private void resetExpirationTime() {
- expirationTime = System.currentTimeMillis() + EXPIRATION_DELAY;
- }
-
- public String getId() {
- return id;
- }
-
- public int compareTo(Delayed o) {
- return
Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
- }
-
- public long getDelay(TimeUnit unit) {
- return unit.convert(expirationTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
- }
-
- // ***** RequestLifecycleListener methods
-
- public void onSuspend(Request request) {
- if (!messagesQueue.isEmpty()) {
- request.submitMessages();
- }
- }
-
- public void onResume(Request request) {
- // TODO Auto-generated method stub
- try {
- disconnect();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public void onDisconnect(Request request) {
- // TODO Auto-generated method stub
- try {
- disconnect();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- // ***** RequestLifecycleListener methods end
-
- // ***** MessageListener methods
-
- public void onMessage(Message message) {
- messagesQueue.add(message);
-
- if (request != null) {
- request.submitMessages();
- }
- }
-
- // ***** MessageListener methods end
-
- public synchronized void subscribe(TopicKey[] topics) {
- SubscriptionContext subscriptionContext = pushTracker.getSubscriptionContext();
-
- if (this.topics != null) {
- for (TopicKey topicKey : topics) {
- subscriptionContext.removeMessageListener(topicKey, this);
- }
- }
-
- this.topics = topics;
-
- for (TopicKey topicKey : topics) {
- subscriptionContext.addMessageListener(topicKey, this);
- }
-
- }
-
- public synchronized void connect(Request argRequest) throws IOException {
- if (request != null) {
- throw new IllegalStateException("Already connected!");
- }
-
- request = argRequest;
-
- pushTracker.onRequestConnected(this);
- request.addListener(this);
- request.suspend();
- }
-
- public synchronized void disconnect() throws IOException {
- if (request != null) {
- if (request.isSuspended()) {
- request.resume();
- }
-
- //TODO - request.removeListener(this) ?
- resetExpirationTime();
- pushTracker.onRequestDisconnected(this);
-
- request = null;
- }
- }
-
- public synchronized void destroy() {
- SubscriptionContext subscriptionContext = pushTracker.getSubscriptionContext();
- for (TopicKey topicKey : topics) {
- subscriptionContext.removeMessageListener(topicKey, this);
- }
- }
-
- public Iterable<Message> getMessages() {
- return messagesQueue;
- }
-}
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.DelayQueue;
-
-import org.richfaces.application.push.PushSessionTracker;
-import org.richfaces.application.push.SubscriptionContext;
-
-import com.google.common.collect.MapMaker;
-
-/**
- * @author Nick Belaevski
- *
- */
-public class PushSessionTrackerImpl implements PushSessionTracker {
-
- private final class SessionsExpirationRunnable implements Runnable {
- public void run() {
- while (true) {
- try {
- PushSessionImpl pushSession = expirationQueue.take();
- pushSessionMap.remove(pushSession.getId());
- pushSession.destroy();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
- }
-
- private ConcurrentMap<String, PushSessionImpl> pushSessionMap = new
MapMaker().makeMap();
-
- private DelayQueue<PushSessionImpl> expirationQueue = new
DelayQueue<PushSessionImpl>();
-
- private SubscriptionContext subscriptionContext;
-
- public PushSessionTrackerImpl(SubscriptionContext subscriptionContext) {
- //TODO use configurable executor service
- Thread t = new Thread(new SessionsExpirationRunnable(),
"rf-push-session-tracker");
- t.setDaemon(true);
- t.start();
-
- this.subscriptionContext = subscriptionContext;
- }
-
- public PushSessionImpl createPushSession() {
- while (true) {
- String uuid = UUID.randomUUID().toString();
- PushSessionImpl pushSession = new PushSessionImpl(this, uuid);
- if (pushSessionMap.putIfAbsent(uuid, pushSession) == null) {
- expirationQueue.put(pushSession);
-
- return pushSession;
- }
- }
- }
-
- public PushSessionImpl getPushSession(String id) {
- return pushSessionMap.get(id);
- }
-
- public void removePushSession(String id) {
- PushSessionImpl session = pushSessionMap.remove(id);
- if (session != null) {
- expirationQueue.remove(session);
- }
- }
-
- void onRequestConnected(PushSessionImpl pushSession) {
- expirationQueue.remove(pushSession);
- }
-
- void onRequestDisconnected(PushSessionImpl pushSession) {
- expirationQueue.add(pushSession);
- }
-
- protected SubscriptionContext getSubscriptionContext() {
- return subscriptionContext;
- }
-
-}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -23,52 +23,58 @@
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.ajax4jsf.javascript.JSLiteral;
+import org.ajax4jsf.javascript.ScriptUtils;
import org.atmosphere.cpr.AtmosphereEventLifecycle;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventListener;
import org.atmosphere.websocket.WebSocketSupport;
import org.richfaces.application.push.Message;
-import org.richfaces.application.push.MessageSerializer;
-import org.richfaces.application.push.PushSession;
import org.richfaces.application.push.Request;
import org.richfaces.application.push.RequestLifecycleListener;
+import org.richfaces.application.push.Session;
-import com.google.common.collect.Iterables;
-
/**
* @author Nick Belaevski
*
*/
public class RequestImpl implements Request {
- private static final String DATA_WRAPPER_START = "[";
-
- private static final String DATA_WRAPPER_END = "]";
-
- private AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource;
+ private static final class FlushMessagesTask implements Runnable { // Runnable handed to the executor; flushes the queued messages of one request.
+
+ private Request request; // the request whose flushMessages() is invoked by run()
- private List<RequestLifecycleListener> listeners = new
ArrayList<RequestLifecycleListener>(1);
-
- private PushSession pushSession;
-
- private PushRequestWorker worker;
-
- private boolean submitted = false;
-
- private Lock submittedLock = new ReentrantLock();
-
- private AtmosphereResourceEventListener atmosphereListener = new
AtmosphereResourceEventListener() {
+ public FlushMessagesTask(Request request) {
+ super();
+ this.request = request;
+ }
public void run() {
+ try {
+ request.flushMessages();
+ } catch (Throwable e) { // every failure is only printed here, so nothing propagates to the executor
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ private AtmosphereResourceEventListener atmosphereResourceListener = new
AtmosphereResourceEventListener() {
+
public void onSuspend(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
for (RequestLifecycleListener listener : listeners) {
listener.onSuspend(RequestImpl.this);
@@ -78,7 +84,7 @@
public void onResume(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
for (RequestLifecycleListener listener : listeners) {
listener.onResume(RequestImpl.this);
- }
+ }
}
public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
@@ -93,119 +99,116 @@
}
};
- public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
resource, PushSession pushSession,
- PushRequestWorker worker) {
+ private final AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource;
+ private final Session session;
+
+ private final ExecutorService executorService;
+
+ private final Queue<Message> messagesQueue = new
ConcurrentLinkedQueue<Message>();
+
+ private final List<RequestLifecycleListener> listeners = new
CopyOnWriteArrayList<RequestLifecycleListener>();
+
+ private AtomicBoolean submitted = new AtomicBoolean(false);
+
+ public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource, Session session,
+ ExecutorService executorService) { // executorService is the one submitToWorker() hands FlushMessagesTask instances to
+
super();
- this.atmosphereResource = resource;
- ((AtmosphereEventLifecycle)
atmosphereResource).addEventListener(atmosphereListener);
- this.pushSession = pushSession;
- this.worker = worker;
+ this.atmosphereResource = atmosphereResource;
+
+ ((AtmosphereEventLifecycle)
atmosphereResource).addEventListener(atmosphereResourceListener); // relays Atmosphere suspend/resume events to the registered RequestLifecycleListeners
+
+ this.session = session;
+ this.executorService = executorService;
}
-
- public void addListener(RequestLifecycleListener listener) {
- listeners.add(listener);
- }
-
- public void removeListener(RequestLifecycleListener listener) {
- listeners.remove(listener);
- }
- public void submitMessages() {
- try {
- submittedLock.lock();
-
- if (!submitted) {
- submitted = true;
- worker.submit(this);
- }
- } finally {
- submittedLock.unlock();
+ private void submitToWorker() { // Schedules a FlushMessagesTask unless the submitted flag says one is already pending.
+ if (submitted.compareAndSet(false, true)) { // only the caller that flips the flag from false to true submits
+ executorService.submit(new FlushMessagesTask(this));
}
}
-
- void resubmitToWorker() {
- if (isPolling()) {
- return;
- }
- try {
- submittedLock.lock();
- if (!Iterables.isEmpty(pushSession.getMessages())) {
- worker.submit(this);
- } else {
- submitted = false;
- }
- } finally {
- submittedLock.unlock();
- }
- }
-
- void writeMessages() throws IOException {
- HttpServletResponse response = atmosphereResource.getResponse();
-
- PrintWriter writer = response.getWriter();
-
+ private String serializeMessages() { // Drains messagesQueue into one "[...]" string with ", " between the serialized messages.
StringBuilder sb = new StringBuilder();
- boolean isFirstMessage = true;
+ sb.append("[");
+
+ boolean isFirstElement = true;
- Iterable<Message> messages = pushSession.getMessages();
- for (Iterator<Message> itr = messages.iterator(); itr.hasNext(); ) {
- if (isFirstMessage) {
- sb.append(DATA_WRAPPER_START);
+ while (true) {
+ Message message = messagesQueue.poll(); // poll() removes the message, so each one is serialized at most once
+ if (message == null) {
+ break;
}
-
- Message message = itr.next();
- itr.remove();
- sb.append(getMessageSerializer(message).serialize(message));
-
- if (!isFirstMessage) {
+
+ if (isFirstElement) {
+ isFirstElement = false;
+ } else {
sb.append(", ");
- } else {
- isFirstMessage = false;
}
- }
-
- if (sb.length() != 0) {
- sb.append(DATA_WRAPPER_END);
- writer.write(sb.toString());
+ //TODO - use MessageSerializer
+ String messageDataString =
DefaultMessageDataSerializer.instance().serialize(message.getData());
- if (isPolling()) {
- resume();
- }
+ Map<String, Object> map = new HashMap<String, Object>(2);
+
+ map.put("data", new JSLiteral(messageDataString)); // NOTE(review): presumably JSLiteral makes toScript() emit the pre-serialized data verbatim - confirm
+ map.put("topic", message.getTopicKey().getTopicName());
+
+ sb.append(ScriptUtils.toScript(map));
}
+
+ sb.append("]");
+
+ return sb.toString();
}
+
+ /**
+  * Notifies the lifecycle listeners, writes all queued messages to the response and
+  * flushes the writer.
+  * <p>
+  * The <code>submitted</code> flag is cleared in a <code>finally</code> block: if a listener or
+  * the response writer throws, the flag must still be released, otherwise
+  * {@link #submitToWorker()} would refuse every later submission and messages posted
+  * afterwards would never be flushed.
+  * <p>
+  * After a successful write a polling request is resumed; a non-polling request is
+  * re-submitted when messages arrived while this flush was running.
+  *
+  * @throws IOException if obtaining the response writer fails
+  */
+ public void flushMessages() throws IOException {
+     try {
+         for (RequestLifecycleListener listener : listeners) {
+             listener.onFlush(this);
+         }
+
+         String serializedMessages = serializeMessages();
+         PrintWriter writer = atmosphereResource.getResponse().getWriter();
+         writer.write(serializedMessages);
+         writer.flush();
+     } finally {
+         // release the flag on failure as well, so that the next postMessage() can schedule a new flush
+         submitted.set(false);
+     }
+
- public void suspend() throws IOException {
- if (!isSuspended()) {
- //TODO - customize interval
- atmosphereResource.suspend();
+     if (isPolling()) {
+         atmosphereResource.resume();
+     } else if (!messagesQueue.isEmpty()) {
+         // messages posted while the flag was still set were not scheduled - do it now
+         submitToWorker();
}
}
+ public void postMessage(Message message) { // Queues the message for this request and schedules a flush.
+ messagesQueue.add(message);
+ submitToWorker(); // does nothing when a flush task is already pending
+ }
+
+ public void addListener(RequestLifecycleListener listener) { // Registers a listener for this request's lifecycle callbacks.
+ listeners.add(listener);
+ }
+
+ public void removeListener(RequestLifecycleListener listener) { // Unregisters a previously added lifecycle listener.
+ listeners.remove(listener);
+ }
+
+ public void suspend() throws IOException { // Suspends the underlying Atmosphere resource.
+ atmosphereResource.suspend(-1, isPolling()); // NOTE(review): -1 presumably means "no timeout" - confirm against the Atmosphere API
+ }
+
public void resume() throws IOException {
- if (isSuspended()) {
- atmosphereResource.resume();
- }
+ atmosphereResource.resume(); // now resumes unconditionally; the isSuspended() guard was dropped by this change
}
public boolean isSuspended() {
return atmosphereResource.getAtmosphereResourceEvent().isSuspended();
}
- private MessageSerializer getMessageSerializer(Message message) {
- MessageSerializer serializer = (MessageSerializer)
message.getAttribute(MessageSerializer.MESSAGE_ATTRIBUTE_NAME);
-
- if (serializer == null) {
- serializer = DefaultMessageSerializer.instance();
- }
-
- return serializer;
- }
-
public boolean isPolling() {
HttpServletRequest req = atmosphereResource.getRequest();
boolean isWebsocket = req.getAttribute(WebSocketSupport.WEBSOCKET_SUSPEND) !=
null ||
@@ -215,7 +218,8 @@
return !isWebsocket;
}
- public PushSession getPushSession() {
- return pushSession;
+ public Session getSession() { // Returns the push session that was passed to the constructor.
+ return session;
}
+
}
Copied:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
(from rev 19562,
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java)
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/SessionManagerImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.SessionListener;
+import org.richfaces.application.push.SessionManager;
+
+import com.google.common.collect.MapMaker;
+
+/**
+ * Registry of push {@link Session}s that also destroys sessions whose inactivity
+ * interval has elapsed.
+ * <p>
+ * Sessions are kept by id in a concurrent map. Each registered session is additionally
+ * wrapped into a {@link DelayedSessionHolder} and put into a {@link DelayQueue}; a
+ * background thread created through the supplied {@link ThreadFactory} takes expired
+ * holders from the queue, removes the session from the map and calls
+ * {@link Session#destroy()}.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public class SessionManagerImpl implements SessionManager {
+
+    //TODO - implement queue around sessions properly
+    /**
+     * {@link Delayed} view of a session. Equality is defined by the wrapped session (and
+     * the owning manager), which lets a freshly created holder be used to remove an
+     * already queued one.
+     */
+    private final class DelayedSessionHolder implements Delayed {
+
+        private final Session session;
+
+        public DelayedSessionHolder(Session session) {
+            super();
+            this.session = session;
+        }
+
+        public int compareTo(Delayed o) {
+            return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
+        }
+
+        /**
+         * Remaining time until expiration, i.e. last access time plus maximum inactive
+         * interval minus the current time.
+         * NOTE(review): assumes getLastAccessedTime() and getMaxInactiveInterval() are both
+         * expressed in milliseconds - confirm against the Session contract.
+         */
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(session.getLastAccessedTime() + session.getMaxInactiveInterval()
+                - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        public Session getSession() {
+            return session;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + getOuterType().hashCode();
+            result = prime * result + ((session == null) ? 0 : session.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            DelayedSessionHolder other = (DelayedSessionHolder) obj;
+            if (!getOuterType().equals(other.getOuterType())) {
+                return false;
+            }
+            if (session == null) {
+                if (other.session != null) {
+                    return false;
+                }
+            } else if (!session.equals(other.session)) {
+                return false;
+            }
+            return true;
+        }
+
+        private SessionManagerImpl getOuterType() {
+            return SessionManagerImpl.this;
+        }
+    }
+
+    /**
+     * Body of the expiration thread: blocks on the delay queue and destroys every session
+     * whose holder becomes available. The loop ends when the thread is interrupted.
+     */
+    private final class SessionsExpirationRunnable implements Runnable {
+        public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    DelayedSessionHolder sessionHolder = expirationQueue.take();
+                    sessionMap.remove(sessionHolder.getSession().getId());
+                    sessionHolder.getSession().destroy();
+                } catch (InterruptedException e) {
+                    // Interruption is the shutdown signal (see destroy()). Restore the flag that
+                    // take() cleared so that the loop condition terminates the thread instead of
+                    // spinning forever as the previous print-and-continue handling did.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /** Keeps the expiration queue in sync with the connection state of each session. */
+    private final SessionListener sessionListener = new SessionListener() {
+
+        public void onSessionDestroyed(Session session) {
+            // TODO Auto-generated method stub
+            removePushSession(session.getId());
+        }
+
+        public void onRequestDisconnected(Session session) {
+            // no request attached any more - the session may expire from now on
+            expirationQueue.add(new DelayedSessionHolder(session));
+        }
+
+        public void onRequestConnected(Session session) {
+            // a connected session must not expire
+            expirationQueue.remove(new DelayedSessionHolder(session));
+        }
+    };
+
+    private final ConcurrentMap<String, Session> sessionMap = new MapMaker().makeMap();
+
+    private final DelayQueue<DelayedSessionHolder> expirationQueue = new DelayQueue<DelayedSessionHolder>();
+
+    /** Thread running {@link SessionsExpirationRunnable}; kept so that destroy() can stop it. */
+    private final Thread expirationThread;
+
+    /**
+     * Creates the manager and starts the session expiration thread.
+     *
+     * @param threadFactory factory used to create the expiration thread
+     */
+    public SessionManagerImpl(ThreadFactory threadFactory) {
+        expirationThread = threadFactory.newThread(new SessionsExpirationRunnable());
+        expirationThread.start();
+    }
+
+    /**
+     * @param id session id
+     * @return the registered session with the given id or <code>null</code>
+     */
+    public Session getPushSession(String id) {
+        return sessionMap.get(id);
+    }
+
+    /**
+     * Unregisters the session with the given id, detaches this manager's listener from it
+     * and removes it from the expiration queue. Does nothing for an unknown id.
+     */
+    public void removePushSession(String id) {
+        Session session = sessionMap.remove(id);
+        if (session != null) {
+            session.removeSessionListener(sessionListener);
+            expirationQueue.remove(new DelayedSessionHolder(session));
+        }
+    }
+
+    /**
+     * Stops the expiration thread and forgets all sessions. The sessions themselves are
+     * not destroyed here.
+     */
+    public void destroy() {
+        //TODO notify all session
+        // without this the expiration thread would stay blocked on the queue forever
+        expirationThread.interrupt();
+        expirationQueue.clear();
+        sessionMap.clear();
+    }
+
+    /**
+     * Registers a session and schedules it for expiration.
+     *
+     * @throws IllegalStateException if a session with the same id is already registered
+     */
+    public void putPushSession(Session session) throws IllegalStateException {
+        Session existingSession = sessionMap.putIfAbsent(session.getId(), session);
+        if (existingSession != null) {
+            throw new IllegalStateException();
+        }
+
+        expirationQueue.add(new DelayedSessionHolder(session));
+        session.addSessionListener(sessionListener);
+    }
+}
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/MessagingContext.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,122 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl.jms;
+
+import java.util.UUID;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.naming.NamingException;
+
+import org.richfaces.application.push.TopicKey;
+
+/**
+ * Holds the JNDI names and credentials needed to talk to the JMS provider and owns the
+ * single JMS {@link Connection} opened by {@link #start()} and released by {@link #stop()}.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public class MessagingContext {
+
+    private final InitialContext initialContext;
+
+    private final Name connectionFactoryName;
+
+    private final Name topicsRootName;
+
+    private final String applicationName;
+
+    private final String username;
+
+    private final String password;
+
+    private Connection connection;
+
+    /**
+     * Creates a context that connects without explicit credentials (user name and password
+     * are passed on as <code>null</code>).
+     */
+    public MessagingContext(InitialContext initialContext, Name connectionFactoryName, Name topicsRootName,
+        String applicationName) {
+
+        this(initialContext, connectionFactoryName, topicsRootName, applicationName, null, null);
+    }
+
+    /**
+     * @param initialContext JNDI context used for all lookups
+     * @param connectionFactoryName JNDI name of the JMS connection factory
+     * @param topicsRootName JNDI name under which the topics are bound
+     * @param applicationName value returned by {@link #getApplicationName()}
+     * @param username user name passed to the connection factory
+     * @param password password passed to the connection factory
+     */
+    public MessagingContext(InitialContext initialContext, Name connectionFactoryName, Name topicsRootName,
+        String applicationName, String username, String password) {
+
+        super();
+        this.initialContext = initialContext;
+        this.connectionFactoryName = connectionFactoryName;
+        this.topicsRootName = topicsRootName;
+        this.applicationName = applicationName;
+        this.username = username;
+        this.password = password;
+    }
+
+    /** Returns a copy of <code>name</code> with <code>comp</code> appended; the argument is not modified. */
+    private Name appendToName(Name name, String comp) throws NamingException {
+        Name clonedName = (Name) name.clone();
+        return clonedName.add(comp);
+    }
+
+    /**
+     * Looks up the connection factory, creates the connection, assigns a random client id
+     * and starts the connection.
+     */
+    public void start() throws Exception {
+        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup(connectionFactoryName);
+
+        connection = connectionFactory.createConnection(username, password);
+
+        //TODO - review
+        try {
+            //durable subscription requires ClientID to be set
+            connection.setClientID(UUID.randomUUID().toString());
+        } catch (javax.jms.IllegalStateException ignored) {
+            //ignore - clientId has already been set
+            //JMS reports this with javax.jms.IllegalStateException; the unqualified name used before
+            //resolved to java.lang.IllegalStateException, so this case was never actually caught
+        } catch (IllegalStateException ignored) {
+            //ignore - kept from the previous code in case a provider throws the java.lang variant
+        }
+
+        connection.start();
+    }
+
+    /**
+     * Closes the connection and forgets it. Calling this method when no connection is open
+     * does nothing.
+     */
+    public void stop() throws Exception {
+        Connection closingConnection = connection;
+        connection = null;
+
+        if (closingConnection != null) {
+            //close() rather than stop(): stop() only pauses message delivery and would leave
+            //the provider resources allocated
+            closingConnection.close();
+        }
+    }
+
+    /**
+     * @return the open connection
+     * @throws IllegalStateException if the context has not been started or was stopped
+     */
+    public Connection getConnection() {
+        if (connection == null) {
+            throw new IllegalStateException("connection is absent");
+        }
+
+        return connection;
+    }
+
+    /** Looks up the JMS topic bound under the topics root name plus the topic name of the key. */
+    public Topic lookup(TopicKey topicKey) throws NamingException {
+        Name topicName = appendToName(topicsRootName, topicKey.getTopicName());
+
+        return (Topic) initialContext.lookup(topicName);
+    }
+
+    /**
+     * Creates a non-transacted, auto-acknowledging session on the open connection.
+     *
+     * @throws IllegalStateException if the context has not been started or was stopped
+     */
+    public Session createSession() throws JMSException {
+        //go through getConnection() to get a descriptive error instead of a NullPointerException
+        return getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public String getApplicationName() {
+        return applicationName;
+    }
+}
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PublisherContextImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PublisherContextImpl.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PublisherContextImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl.jms;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.richfaces.application.push.PublisherContext;
+import org.richfaces.application.push.TopicKey;
+
+/**
+ * {@link PublisherContext} that publishes data as JMS object messages using the sessions
+ * and topic lookups of a {@link MessagingContext}.
+ *
+ * @author Nick Belaevski
+ *
+ */
+public class PublisherContextImpl implements PublisherContext {
+
+    private MessagingContext messagingContext;
+
+    public PublisherContextImpl(MessagingContext messagingContext) {
+        super();
+        this.messagingContext = messagingContext;
+    }
+
+    /**
+     * Sends <code>data</code> to the topic identified by <code>topicKey</code>. A new JMS
+     * session is created for the call and closed afterwards. JMS and naming failures are
+     * printed to the standard error stream and not reported to the caller.
+     *
+     * @param topicKey key of the target topic
+     * @param data payload; it is cast to {@link Serializable}
+     */
+    public void publish(TopicKey topicKey, Object data) {
+        Session jmsSession = null;
+        try {
+            jmsSession = messagingContext.createSession();
+            send(jmsSession, topicKey, data);
+        } catch (JMSException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (NamingException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } finally {
+            closeSession(jmsSession);
+        }
+    }
+
+    /** Creates a producer for the topic and sends the data wrapped into an object message. */
+    private void send(Session jmsSession, TopicKey topicKey, Object data) throws JMSException, NamingException {
+        MessageProducer topicProducer = jmsSession.createProducer(messagingContext.lookup(topicKey));
+
+        ObjectMessage payload = jmsSession.createObjectMessage();
+        payload.setObject((Serializable) data);
+
+        topicProducer.send(payload);
+    }
+
+    /** Closes the session if there is one; a failure to close is only printed. */
+    private void closeSession(Session jmsSession) {
+        if (jmsSession == null) {
+            return;
+        }
+
+        try {
+            jmsSession.close();
+        } catch (JMSException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+}
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/PushHandlerImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl.jms;
+
+import javax.naming.CompositeName;
+import javax.naming.InitialContext;
+import javax.naming.Name;
+import javax.servlet.ServletConfig;
+
+import org.richfaces.application.push.PublisherContext;
+import org.richfaces.application.push.Session;
+import org.richfaces.application.push.impl.AtmospherePushHandler;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+/**
+ * Push handler that backs push sessions with JMS: it owns the {@link MessagingContext}
+ * and exposes a {@link PublisherContext} as a servlet context attribute.
+ */
+public class PushHandlerImpl extends AtmospherePushHandler {
+
+    /** Messaging context created in {@link #init(ServletConfig)}; {@code null} before init and after destroy. */
+    private MessagingContext messagingContext;
+
+    /** Servlet configuration received in {@link #init(ServletConfig)}; {@code null} before init and after destroy. */
+    private ServletConfig servletConfig;
+
+    /**
+     * Creates a JMS-backed push session.
+     *
+     * @param key identifier of the new session
+     * @return a new {@link SessionImpl} sharing this handler's messaging context
+     */
+    @Override
+    protected Session createSession(String key) {
+        return new SessionImpl(key, messagingContext);
+    }
+
+    /**
+     * Creates and starts the messaging context using the JNDI names {@code /ConnectionFactory}
+     * and {@code /topic}, then publishes a {@link PublisherContext} under
+     * {@link PublisherContext#ATTRIBUTE_NAME} in the servlet context.
+     *
+     * @param servletConfig configuration of the hosting servlet
+     * @throws Exception if the parent initialization, the JNDI setup or the messaging start fails
+     */
+    @Override
+    public void init(ServletConfig servletConfig) throws Exception {
+        super.init(servletConfig);
+
+        this.servletConfig = servletConfig;
+
+        InitialContext initialContext = new InitialContext();
+        Name cnfName = new CompositeName("/ConnectionFactory");
+        Name topicsRootName = new CompositeName("/topic");
+
+        messagingContext = new MessagingContext(initialContext, cnfName, topicsRootName,
+            servletConfig.getServletContext().getContextPath());
+
+        messagingContext.start();
+
+        PublisherContext publisherContext = new PublisherContextImpl(messagingContext);
+        servletConfig.getServletContext().setAttribute(PublisherContext.ATTRIBUTE_NAME, publisherContext);
+    }
+
+    /**
+     * Removes the published {@link PublisherContext} attribute and stops the messaging context.
+     * Safe to call after a failed or missing {@link #init(ServletConfig)}: resources that were
+     * never created are skipped, and cleanup still runs when the parent destroy fails.
+     *
+     * @throws Exception if the parent destroy or the messaging shutdown fails
+     */
+    @Override
+    public void destroy() throws Exception {
+        try {
+            super.destroy();
+        } finally {
+            releaseResources();
+        }
+    }
+
+    /** Clears both fields first, then releases whatever had actually been initialized. */
+    private void releaseResources() throws Exception {
+        ServletConfig config = this.servletConfig;
+        MessagingContext context = this.messagingContext;
+
+        this.servletConfig = null;
+        this.messagingContext = null;
+
+        try {
+            if (config != null) {
+                config.getServletContext().removeAttribute(PublisherContext.ATTRIBUTE_NAME);
+            }
+        } finally {
+            if (context != null) {
+                context.stop();
+            }
+        }
+    }
+}
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/jms/SessionImpl.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,192 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl.jms;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.richfaces.application.push.Request;
+import org.richfaces.application.push.RequestLifecycleListener;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.impl.AbstractSession;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class SessionImpl extends AbstractSession implements RequestLifecycleListener,
MessageListener {
+
+ private MessagingContext messagingContext;
+
+ private Collection<TopicKey> subscribedTopics = new HashSet<TopicKey>();
+
+ private volatile Session jmsSession;
+
+ public SessionImpl(String id, MessagingContext messagingContext) {
+ super(id);
+
+ this.messagingContext = messagingContext;
+ }
+
+ private String getSubscriptionClientId(TopicKey topic) {
+ //TODO add application name
+ return "rf-push:" + messagingContext.getApplicationName() +
":" + topic.getTopicName() + ":" + getId();
+ }
+
+ public void subscribe(TopicKey topic) throws Exception {
+ Session session = null;
+ try {
+ session = messagingContext.createSession();
+
+ //TODO send event to check subscription permissions
+ session.createDurableSubscriber(messagingContext.lookup(topic),
getSubscriptionClientId(topic));
+ subscribedTopics.add(topic);
+
+ } finally {
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ // TODO: handle exception
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ // TODO Auto-generated method stub
+ super.destroy();
+
+ //TODO remove subscriptions
+ }
+
+ @Override
+ public void connect(Request request) throws Exception {
+ super.connect(request);
+
+ request.addListener(this);
+
+ jmsSession = messagingContext.createSession();
+
+ for (TopicKey topicKey : subscribedTopics) {
+ Topic topic = messagingContext.lookup(topicKey);
+ String subscriptionId = getSubscriptionClientId(topicKey);
+
+ TopicSubscriber subscriber = jmsSession.createDurableSubscriber(topic,
subscriptionId);
+ subscriber.setMessageListener(this);
+ }
+ }
+
+ @Override
+ public void disconnect(Request request) throws Exception {
+ jmsSession.close();
+ jmsSession = null;
+
+ request.removeListener(this);
+
+ super.disconnect(request);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.richfaces.application.push.RequestLifecycleListener#onFlush(org.richfaces.application.push.Request)
+ */
+ public void onFlush(Request request) {
+ if (request.isPolling()) {
+ try {
+ jmsSession.close();
+ } catch (JMSException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.richfaces.application.push.RequestLifecycleListener#onSuspend(org.richfaces.application.push.Request)
+ */
+ public void onSuspend(Request request) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see
org.richfaces.application.push.RequestLifecycleListener#onDisconnect(org.richfaces.application.push.Request)
+ */
+ public void onDisconnect(Request request) {
+ // TODO Auto-generated method stub
+
+ try {
+ disconnect(request);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.richfaces.application.push.RequestLifecycleListener#onResume(org.richfaces.application.push.Request)
+ */
+ public void onResume(Request request) {
+ // TODO Auto-generated method stub
+ try {
+ disconnect(request);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+ */
+ public void onMessage(Message message) {
+ if (!(message instanceof ObjectMessage)) {
+ //TODO log
+ return;
+ }
+
+ try {
+ ObjectMessage objectMessage = (ObjectMessage) message;
+ String topicName = ((Topic)
objectMessage.getJMSDestination()).getTopicName();
+
+ org.richfaces.application.push.Message pushMessage = new
org.richfaces.application.push.Message(new TopicKey(topicName));
+ pushMessage.setData(objectMessage.getObject());
+
+ getRequest().postMessage(pushMessage);
+
+ objectMessage.acknowledge();
+ } catch (JMSException e) {
+ // TODO: handle exception
+ e.printStackTrace();
+ }
+ }
+}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushServlet.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushServlet.java 2010-10-15
00:26:15 UTC (rev 19576)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/webapp/PushServlet.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -25,15 +25,14 @@
import java.lang.reflect.InvocationTargetException;
import javax.servlet.ServletConfig;
-import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.atmosphere.cpr.AtmosphereServlet;
-import org.richfaces.application.push.PushSession;
+import org.richfaces.application.push.Session;
import org.richfaces.application.push.impl.AtmospherePushHandler;
-import org.richfaces.application.push.impl.MessagesContextImpl;
+import org.richfaces.application.push.impl.jms.PushHandlerImpl;
/**
* @author Nick Belaevski
@@ -42,9 +41,6 @@
//TODO override broadcaster
public class PushServlet extends AtmosphereServlet {
- /**
- *
- */
private static final String PUSH_HUB_MAPPING = "/push/hub/*";
private static final long serialVersionUID = 7616370505508715222L;
@@ -53,34 +49,39 @@
private AtmospherePushHandler pushHandler;
- private ServletContext servletContext;
-
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
-
- servletContext = config.getServletContext();
- MessagesContextImpl messagesContext =
MessagesContextImpl.create(servletContext);
-
- pushHandler = new AtmospherePushHandler(messagesContext);
+
+ pushHandler = new PushHandlerImpl();
+ try {
+ pushHandler.init(config);
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
addAtmosphereHandler(PUSH_HUB_MAPPING, pushHandler);
-
}
@Override
public void destroy() {
super.destroy();
- MessagesContextImpl.destroy(servletContext);
-
+ try {
+ pushHandler.destroy();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
pushHandler = null;
+
removeAtmosphereHandler(PUSH_HUB_MAPPING);
}
@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp) throws
ServletException, IOException {
String[] topicNames = req.getParameterValues(PUSH_TOPIC_PARAM);
- PushSession pushSession = pushHandler.doConnect(topicNames);
+ Session pushSession = pushHandler.doConnect(topicNames);
resp.setStatus(HttpServletResponse.SC_OK);
resp.getWriter().write(pushSession.getId());
Modified: branches/RF-7817/push-redesign-app/pom.xml
===================================================================
--- branches/RF-7817/push-redesign-app/pom.xml 2010-10-15 00:26:15 UTC (rev 19576)
+++ branches/RF-7817/push-redesign-app/pom.xml 2010-10-15 00:28:55 UTC (rev 19577)
@@ -7,7 +7,7 @@
<version>0.0.1-SNAPSHOT</version>
<name>push-redesign-app Maven Webapp</name>
<url>http://maven.apache.org</url>
-
+
<repositories>
<repository>
<id>sonatype.snapshots</id>
@@ -17,9 +17,45 @@
</snapshots>
</repository>
</repositories>
-
+
+ <properties>
+ <hornetq.version>2.1.2.Final</hornetq.version>
+ </properties>
+
<dependencies>
<dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-core</artifactId>
+ <version>${hornetq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jms</artifactId>
+ <version>${hornetq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-logging</artifactId>
+ <version>${hornetq.version}</version>
+ </dependency>
+ <!-- dependency>
+ <groupId>org.hornetq</groupId>
+ <artifactId>hornetq-transports</artifactId>
+ <version>2.1.0.r9031</version>
+ </dependency -->
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.2.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ <version>1.1</version>
+ </dependency>
+
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
@@ -31,11 +67,6 @@
<version>3.0-alpha-1</version>
<scope>provided</scope>
</dependency>
- <!-- dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-websocket</artifactId>
- <version>8.0.0.M1</version>
- </dependency -->
<dependency>
<groupId>org.atmosphere</groupId>
<artifactId>atmosphere-runtime</artifactId>
@@ -77,11 +108,28 @@
<target>1.6</target>
</configuration>
</plugin>
- <plugin>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-maven-plugin</artifactId>
- <version>8.0.0.M1</version>
- </plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>jetty</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-websocket</artifactId>
+ <version>8.0.0.M1</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-maven-plugin</artifactId>
+ <version>8.0.0.M1</version>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
Added: branches/RF-7817/push-redesign-app/src/main/java/demo/HornetQInitializer.java
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/java/demo/HornetQInitializer.java
(rev 0)
+++
branches/RF-7817/push-redesign-app/src/main/java/demo/HornetQInitializer.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package demo;
+
+import java.util.HashSet;
+
+import javax.faces.application.Application;
+import javax.faces.context.FacesContext;
+import javax.faces.event.AbortProcessingException;
+import javax.faces.event.PostConstructApplicationEvent;
+import javax.faces.event.PreDestroyApplicationEvent;
+import javax.faces.event.SystemEvent;
+import javax.faces.event.SystemEventListener;
+import javax.naming.InitialContext;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+public class HornetQInitializer implements SystemEventListener {
+
+ private JMSServerManager serverManager;
+
+ @Override
+ public void processEvent(SystemEvent event) throws AbortProcessingException {
+ if (event instanceof PostConstructApplicationEvent) {
+ try {
+ startHornetQ();
+ } catch (Exception e) {
+ throw new AbortProcessingException(e);
+ }
+
+ Application application =
FacesContext.getCurrentInstance().getApplication();
+ application.subscribeToEvent(PreDestroyApplicationEvent.class, this);
+ } else {
+ try {
+ stopHornetQ();
+ } catch (Exception e) {
+ throw new AbortProcessingException(e);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ *
+ */
+ private void stopHornetQ() throws Exception {
+ serverManager.stop();
+ serverManager = null;
+ }
+
+ /**
+ * @throws Exception
+ *
+ */
+ private void startHornetQ() throws Exception {
+ // Step 2. Create the Configuration, and set the properties accordingly
+ Configuration configuration = new ConfigurationImpl();
+ configuration.setPersistenceEnabled(false);
+ configuration.setSecurityEnabled(false);
+
+ TransportConfiguration transpConf = new
TransportConfiguration(NettyAcceptorFactory.class.getName());
+
+ HashSet<TransportConfiguration> setTransp = new
HashSet<TransportConfiguration>();
+ setTransp.add(transpConf);
+
+ configuration.setAcceptorConfigurations(setTransp);
+
+ // Step 3. Create and start the server
+ HornetQServer server = HornetQServers.newHornetQServer(configuration);
+
+ serverManager = new JMSServerManagerImpl(server);
+
+ //if you want to use JNDI, simple inject a context here or don't call this
method and make sure the JNDI parameters are set.
+ serverManager.setContext(new InitialContext());
+ serverManager.start();
+
+ serverManager.createConnectionFactory("ConnectionFactory", new
TransportConfiguration(NettyConnectorFactory.class.getName()),
+ "ConnectionFactory");
+
+ serverManager.createTopic(false, "chat", "/topic/chat");
+ serverManager.createTopic(false, "info", "/topic/info");
+ }
+
+ @Override
+ public boolean isListenerForSource(Object source) {
+ return true;
+ }
+
+}
Added: branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java
(rev 0)
+++
branches/RF-7817/push-redesign-app/src/main/java/demo/TopicsInitializer.java 2010-10-15
00:28:55 UTC (rev 19577)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package demo;
+
+import javax.faces.event.AbortProcessingException;
+import javax.faces.event.SystemEvent;
+import javax.faces.event.SystemEventListener;
+
+import org.richfaces.application.ServiceTracker;
+import org.richfaces.application.push.Topic;
+import org.richfaces.application.push.TopicKey;
+import org.richfaces.application.push.TopicsContext;
+import org.richfaces.application.push.impl.DefaultMessageDataSerializer;
+
+/**
+ * @author Nick Belaevski
+ *
+ */
+/**
+ * JSF system event listener that makes sure the {@code chatTopic} push topic exists and
+ * uses the default message data serializer.
+ */
+public class TopicsInitializer implements SystemEventListener {
+
+    /**
+     * @param source event source (ignored)
+     * @return always {@code true}: events from every source are accepted
+     */
+    @Override
+    public boolean isListenerForSource(Object source) {
+        return true;
+    }
+
+    /**
+     * Obtains (creating it if needed) the {@code chatTopic} topic from the
+     * {@link TopicsContext} service and installs the default message data serializer on it.
+     *
+     * @param event system event being delivered (not inspected)
+     */
+    @Override
+    public void processEvent(SystemEvent event) throws AbortProcessingException {
+        Topic chatTopic = ServiceTracker.getService(TopicsContext.class)
+            .getOrCreateTopic(new TopicKey("chatTopic"));
+
+        chatTopic.setMessageSerializer(DefaultMessageDataSerializer.instance());
+    }
+
+}
Modified: branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/faces-config.xml
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/faces-config.xml 2010-10-15
00:26:15 UTC (rev 19576)
+++ branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/faces-config.xml 2010-10-15
00:28:55 UTC (rev 19577)
@@ -1,6 +1,18 @@
<?xml version='1.0' encoding='UTF-8'?>
-<faces-config
xmlns="http://java.sun.com/xml/ns/javaee"
- version="2.0">
+<faces-config version="2.0"
xmlns="http://java.sun.com/xml/ns/javaee"
+
xmlns:xi="http://www.w3.org/2001/XInclude"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-facesconfig_2_0.xsd">
+ <application>
+ <system-event-listener>
+
<system-event-listener-class>demo.HornetQInitializer</system-event-listener-class>
+
<system-event-class>javax.faces.event.PostConstructApplicationEvent</system-event-class>
+ </system-event-listener>
+ <system-event-listener>
+
<system-event-listener-class>demo.TopicsInitializer</system-event-listener-class>
+
<system-event-class>javax.faces.event.PostConstructApplicationEvent</system-event-class>
+ </system-event-listener>
+ </application>
+
</faces-config>
\ No newline at end of file
Modified: branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml
===================================================================
--- branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml 2010-10-15 00:26:15
UTC (rev 19576)
+++ branches/RF-7817/push-redesign-app/src/main/webapp/WEB-INF/web.xml 2010-10-15 00:28:55
UTC (rev 19577)
@@ -9,9 +9,16 @@
<param-value>Development</param-value>
</context-param>
+ <listener>
+
<listener-class>com.sun.faces.config.ConfigureListener</listener-class>
+ </listener>
<servlet>
<servlet-name>push</servlet-name>
<servlet-class>org.richfaces.webapp.PushServlet</servlet-class>
+ <init-param>
+ <param-name>org.atmosphere.useWebSocket</param-name>
+ <param-value>true</param-value>
+ </init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
Modified: branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml 2010-10-15 00:26:15 UTC (rev
19576)
+++ branches/RF-7817/ui/common/ui/checkstyle-suppressions.xml 2010-10-15 00:28:55 UTC (rev
19577)
@@ -33,4 +33,8 @@
<!-- TODO it is hot fix for building process, this files belong to the
ui/output/trunk/panela/ui module,
and must be removed from here -->
<suppress checks="IllegalCatch"
files="AbstractTogglePanel.java" />
-</suppressions>
+
+ <!-- TODO it is hot fix for building process, this files belong to the
push-redesign module,
+ and must be removed from here -->
+ <suppress checks="IllegalCatch" files="RequestImpl.java" />
+</suppressions>
\ No newline at end of file
Modified: branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml
===================================================================
--- branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml 2010-10-15 00:26:15 UTC (rev
19576)
+++ branches/RF-7817/ui/output/ui/checkstyle-suppressions.xml 2010-10-15 00:28:55 UTC (rev
19577)
@@ -33,4 +33,8 @@
<!-- TODO it is hot fix for building process, this files belong to the
ui/output/trunk/panela/ui module,
and must be removed from here -->
<suppress checks="IllegalCatch"
files="AbstractTogglePanel.java" />
+
+ <!-- TODO it is hot fix for building process, this files belong to the
push-redesign module,
+ and must be removed from here -->
+ <suppress checks="IllegalCatch" files="RequestImpl.java" />
</suppressions>