Author: nbelaevski
Date: 2010-10-13 18:25:05 -0400 (Wed, 13 Oct 2010)
New Revision: 19562
Added:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java
Removed:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionsWorker.java
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js
Log:
https://jira.jboss.org/browse/RF-7817
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2010-10-13
19:19:38 UTC (rev 19561)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/AtmospherePushHandler.java 2010-10-13
22:25:05 UTC (rev 19562)
@@ -44,10 +44,13 @@
private PushSessionTracker pushTracker;
+ private PushRequestWorker worker;
+
public AtmospherePushHandler(SubscriptionContext subscriptionContext) {
super();
pushTracker = new PushSessionTrackerImpl(subscriptionContext);
+ worker = new PushRequestWorker(8);
}
protected PushSessionTracker getPushTracker() {
@@ -79,7 +82,7 @@
resp.setContentType("text/plain");
- pushSession.connect(new RequestImpl(resource));
+ pushSession.connect(new RequestImpl(resource, pushSession, worker));
}
public void onStateChange(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event)
Copied:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java
(from rev 19546,
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionsWorker.java)
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java
(rev 0)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushRequestWorker.java 2010-10-13
22:25:05 UTC (rev 19562)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.richfaces.application.push.impl;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs write passes for push requests on a fixed-size pool of daemon threads.
+ * Each submitted {@link RequestImpl} is wrapped in a task that calls
+ * {@code writeMessages()} and then {@code resubmitToWorker()} on it.
+ *
+ * @author Nick Belaevski
+ *
+ */
+final class PushRequestWorker {
+
+ private static final ThreadFactory DAEMON_THREADS_FACTORY = new ThreadFactory() { // creates the named daemon threads used by the pool
+
+ private final AtomicInteger threadsCounter = new AtomicInteger(); // numeric suffix for thread names, first thread gets 0
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "rf-push-worker-thread-" +
threadsCounter.getAndIncrement());
+ t.setDaemon(true); // daemon threads do not keep the JVM alive on shutdown
+
+ return t;
+ }
+ };
+
+ private final class Task implements Runnable { // one write pass for a single request
+
+ private RequestImpl request; // the request this task writes to
+
+ public Task(RequestImpl request) {
+ super();
+ this.request = request;
+ }
+
+ public void run() {
+ try {
+ // write first, then let the request decide whether it needs another pass
+ try {
+ request.writeMessages();
+ request.resubmitToWorker(); // NOTE(review): not reached when writeMessages() throws
+ } catch (IOException e) {
+ // TODO replace printStackTrace with proper logging / error handling
+ e.printStackTrace();
+ }
+ } catch (Throwable e) {
+ // catch-all so that nothing propagates out of run(); TODO: handle exception
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private ExecutorService executorService; // fixed-size pool created in the constructor
+
+ public PushRequestWorker(int numThreads) { // numThreads: size of the fixed thread pool
+ super();
+
+ this.executorService = Executors.newFixedThreadPool(numThreads,
DAEMON_THREADS_FACTORY);
+ }
+
+ public void submit(RequestImpl request) { // queues a new Task for the request; the returned Future is ignored
+ executorService.submit(new Task(request));
+ }
+}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java 2010-10-13
19:19:38 UTC (rev 19561)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionImpl.java 2010-10-13
22:25:05 UTC (rev 19562)
@@ -23,8 +23,8 @@
import java.io.IOException;
import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Delayed;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.richfaces.application.push.Message;
@@ -42,9 +42,9 @@
public class PushSessionImpl implements Delayed, PushSession, RequestLifecycleListener,
MessageListener {
private static final long EXPIRATION_DELAY = 45 * 1000;
+
+ private final Queue<Message> messagesQueue = new
ConcurrentLinkedQueue<Message>();
- private final Queue<Message> messagesQueue = new
LinkedBlockingQueue<Message>();
-
private final PushSessionTrackerImpl pushTracker;
private final String id;
@@ -81,7 +81,9 @@
// ***** RequestLifecycleListener methods
public void onSuspend(Request request) { // lifecycle callback: request has been suspended
- pushTracker.getPushSessionsWorker().submit(PushSessionImpl.this);
+ if (!messagesQueue.isEmpty()) { // only schedule a write when messages are already queued
+ request.submitMessages();
+ }
}
public void onResume(Request request) {
@@ -112,7 +114,7 @@
messagesQueue.add(message);
if (request != null) {
- pushTracker.getPushSessionsWorker().submit(PushSessionImpl.this);
+ request.submitMessages();
}
}
@@ -135,20 +137,10 @@
}
- private void disconnect(Request request) throws IOException {
+ public synchronized void connect(Request argRequest) throws IOException {
if (request != null) {
- if (request.isSuspended()) {
- request.resume();
- }
-
- //TODO - request.removeListener(this) ?
- resetExpirationTime();
- pushTracker.onRequestDisconnected(this);
+ throw new IllegalStateException("Already connected!");
}
- }
-
- public synchronized void connect(Request argRequest) throws IOException {
- disconnect(request);
request = argRequest;
@@ -158,8 +150,17 @@
}
public synchronized void disconnect() throws IOException { // detaches the current request; does nothing when no request is attached
- disconnect(request);
- request = null;
+ if (request != null) {
+ if (request.isSuspended()) {
+ request.resume(); // a still-suspended request is resumed before it is dropped
+ }
+
+ //TODO - request.removeListener(this) ?
+ resetExpirationTime();
+ pushTracker.onRequestDisconnected(this); // tell the tracker this session no longer has a request
+
+ request = null; // connect() rejects a non-null request, so clear it here
+ }
}
public synchronized void destroy() {
@@ -167,32 +168,9 @@
for (TopicKey topicKey : topics) {
subscriptionContext.removeMessageListener(topicKey, this);
}
- pushTracker.getPushSessionsWorker().removeAll(this);
}
- public void writeMessages() throws IOException {
- Request requestVar = request;
-
- if (requestVar == null) {
- return;
- }
-
- boolean shouldResume = false;
- Message message = null;
-
- while (true) {
- message = messagesQueue.poll();
-
- if (message == null) {
- break;
- }
-
- requestVar.write(message);
- shouldResume = true;
- }
-
- if (shouldResume) {
- requestVar.resume();
- }
+ public Iterable<Message> getMessages() {
+ return messagesQueue;
}
}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java 2010-10-13
19:19:38 UTC (rev 19561)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionTrackerImpl.java 2010-10-13
22:25:05 UTC (rev 19562)
@@ -58,8 +58,6 @@
private SubscriptionContext subscriptionContext;
- private PushSessionsWorker pushSessionsWorker;
-
public PushSessionTrackerImpl(SubscriptionContext subscriptionContext) {
//TODO use configurable executor service
Thread t = new Thread(new SessionsExpirationRunnable(),
"rf-push-session-tracker");
@@ -67,7 +65,6 @@
t.start();
this.subscriptionContext = subscriptionContext;
- this.pushSessionsWorker = new PushSessionsWorker(8);
}
public PushSessionImpl createPushSession() {
@@ -105,7 +102,4 @@
return subscriptionContext;
}
- protected PushSessionsWorker getPushSessionsWorker() {
- return pushSessionsWorker;
- }
}
Deleted:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionsWorker.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionsWorker.java 2010-10-13
19:19:38 UTC (rev 19561)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/PushSessionsWorker.java 2010-10-13
22:25:05 UTC (rev 19562)
@@ -1,116 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2010, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.richfaces.application.push.impl;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.richfaces.application.push.PushSession;
-
-/**
- * @author Nick Belaevski
- *
- */
-final class PushSessionsWorker {
-
- private static final class Pipeline implements Runnable {
-
- private BlockingQueue<PushSession> sessionsQueue = new
LinkedBlockingQueue<PushSession>();
-
- //TODO - number of sessions posted to the queue should not exceed two items
- public void submit(PushSession session) {
- sessionsQueue.add(session);
- }
-
- public void removeAll(PushSession session) {
- boolean elementRemoved;
- do {
- elementRemoved = sessionsQueue.remove(session);
- } while (elementRemoved);
- }
-
- public void run() {
- while (true) {
- try {
- PushSession session = sessionsQueue.take();
- session.writeMessages();
- //do something
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- }
-
- private final int numThreads;
-
- private Pipeline[] pipelines;
-
- public PushSessionsWorker(int numThreads) {
- super();
-
- //it's important for hash code algorithm that number of workers is a power of
two
- this.numThreads = computeFloorPowerOfTwo(numThreads);
-
- createPipelines();
- }
-
- private int computeFloorPowerOfTwo(int num) {
- int result = 1;
- while (result < num) {
- result <<= 1;
- }
-
- return result;
- }
-
- private int hashIdx(Object o) {
- int hash = o.hashCode();
-
- return hash & (pipelines.length - 1);
- }
-
- private void createPipelines() {
- pipelines = new Pipeline[numThreads];
- for (int i = 0; i < pipelines.length; i++) {
- pipelines[i] = new Pipeline();
- Thread t = new Thread(pipelines[i], "rf-push-broadcater-" + i);
- t.setDaemon(true);
- t.start();
- }
- }
-
- public void submit(PushSession session) {
- int pipelineIdx = hashIdx(session.getId());
- pipelines[pipelineIdx].submit(session);
- }
-
- public void removeAll(PushSession session) {
- int pipelineIdx = hashIdx(session.getId());
- pipelines[pipelineIdx].removeAll(session);
- }
-}
Modified:
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java
===================================================================
---
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2010-10-13
19:19:38 UTC (rev 19561)
+++
branches/RF-7817/push-redesign/src/main/java/org/richfaces/application/push/impl/RequestImpl.java 2010-10-13
22:25:05 UTC (rev 19562)
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -33,11 +36,15 @@
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventListener;
+import org.atmosphere.websocket.WebSocketSupport;
import org.richfaces.application.push.Message;
import org.richfaces.application.push.MessageSerializer;
+import org.richfaces.application.push.PushSession;
import org.richfaces.application.push.Request;
import org.richfaces.application.push.RequestLifecycleListener;
+import com.google.common.collect.Iterables;
+
/**
* @author Nick Belaevski
*
@@ -48,12 +55,18 @@
private static final String DATA_WRAPPER_END = "]";
- private static final String DATA_BLANK = "";
-
private AtmosphereResource<HttpServletRequest, HttpServletResponse>
atmosphereResource;
- private List<RequestLifecycleListener> listeners = new
ArrayList<RequestLifecycleListener>();
+ private List<RequestLifecycleListener> listeners = new
ArrayList<RequestLifecycleListener>(1);
+ private PushSession pushSession;
+
+ private PushRequestWorker worker;
+
+ private boolean submitted = false;
+
+ private Lock submittedLock = new ReentrantLock();
+
private AtmosphereResourceEventListener atmosphereListener = new
AtmosphereResourceEventListener() {
public void onSuspend(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
@@ -66,13 +79,6 @@
for (RequestLifecycleListener listener : listeners) {
listener.onResume(RequestImpl.this);
}
-
- try {
- encodeRequestEndElement();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
}
public void onDisconnect(AtmosphereResourceEvent<HttpServletRequest,
HttpServletResponse> event) {
@@ -87,14 +93,15 @@
}
};
- private boolean hasWrittenMessages = false;
+ public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
resource, PushSession pushSession,
+ PushRequestWorker worker) {
-
- public RequestImpl(AtmosphereResource<HttpServletRequest, HttpServletResponse>
resource) {
super();
this.atmosphereResource = resource;
((AtmosphereEventLifecycle)
atmosphereResource).addEventListener(atmosphereListener);
+ this.pushSession = pushSession;
+ this.worker = worker;
}
public void addListener(RequestLifecycleListener listener) {
@@ -105,22 +112,71 @@
listeners.remove(listener);
}
- public void write(Message message) throws IOException {
+ public void submitMessages() { // hands this request to the worker unless it is already marked as submitted
+ try {
+ submittedLock.lock(); // NOTE(review): lock() is inside the try, so a failing lock() would still reach unlock() in finally; the usual idiom acquires before the try
+
+ if (!submitted) {
+ submitted = true; // reset to false in resubmitToWorker() once no messages remain
+ worker.submit(this);
+ }
+ } finally {
+ submittedLock.unlock();
+ }
+ }
+
+ void resubmitToWorker() { // called by the worker task after writeMessages(): re-queues this request or clears the submitted flag
+ if (isPolling()) {
+ return; // polling requests are never re-queued; NOTE(review): submitted is left unchanged on this path
+ }
+
+ try {
+ submittedLock.lock(); // NOTE(review): same lock-inside-try pattern as submitMessages()
+ if (!Iterables.isEmpty(pushSession.getMessages())) {
+ worker.submit(this); // messages are still pending: schedule another write pass
+ } else {
+ submitted = false; // nothing pending: the next submitMessages() may schedule again
+ }
+ } finally {
+ submittedLock.unlock();
+ }
+ }
+
+ void writeMessages() throws IOException { // collects the session's pending messages into one buffer and writes it to the response
HttpServletResponse response = atmosphereResource.getResponse();
PrintWriter writer = response.getWriter();
- if (!hasWrittenMessages) {
- encodeRequestStartElement();
- }
+ StringBuilder sb = new StringBuilder(); // the whole batch is built here and written with a single write() call
- writer.write(getMessageSerializer(message).serialize(message));
+ boolean isFirstMessage = true;
- if (hasWrittenMessages) {
- writer.write(", ");
- } else {
- hasWrittenMessages = true;
+ Iterable<Message> messages = pushSession.getMessages();
+ for (Iterator<Message> itr = messages.iterator(); itr.hasNext(); ) {
+ if (isFirstMessage) {
+ sb.append(DATA_WRAPPER_START); // opening wrapper is emitted once, before the first message
+ }
+
+ Message message = itr.next();
+ itr.remove(); // the message is taken out of the session's collection as soon as it is read
+ sb.append(getMessageSerializer(message).serialize(message));
+
+ if (!isFirstMessage) {
+ sb.append(", "); // NOTE(review): separator is appended AFTER each message except the first, so three messages render as m1m2, m3, - it probably belongs before the message
+ } else {
+ isFirstMessage = false;
+ }
}
+
+ if (sb.length() != 0) { // an empty buffer means no message was pending; nothing is written then
+ sb.append(DATA_WRAPPER_END);
+
+ writer.write(sb.toString());
+
+ if (isPolling()) {
+ resume(); // polling transport: resume after the batch (presumably to complete the response - confirm)
+ }
+ }
}
public void suspend() throws IOException {
@@ -140,25 +196,6 @@
return atmosphereResource.getAtmosphereResourceEvent().isSuspended();
}
- public void encodeRequestStartElement() throws IOException {
- write(DATA_WRAPPER_START);
- }
-
- public void encodeRequestEndElement() throws IOException {
- if (hasWrittenMessages) {
- write(DATA_WRAPPER_END);
- } else {
- write(DATA_BLANK);
- }
- }
-
- private void write(String s) throws IOException {
- HttpServletResponse response = atmosphereResource.getResponse();
-
- PrintWriter writer = response.getWriter();
- writer.write(s);
- }
-
private MessageSerializer getMessageSerializer(Message message) {
MessageSerializer serializer = (MessageSerializer)
message.getAttribute(MessageSerializer.MESSAGE_ATTRIBUTE_NAME);
@@ -168,4 +205,17 @@
return serializer;
}
+
+ public boolean isPolling() { // true unless the servlet request carries one of the WebSocket marker attributes
+ HttpServletRequest req = atmosphereResource.getRequest();
+ boolean isWebsocket = req.getAttribute(WebSocketSupport.WEBSOCKET_SUSPEND) !=
null ||
+ req.getAttribute(WebSocketSupport.WEBSOCKET_RESUME) != null;
+
+ //TODO how to detect non-polling transports? For now every non-WebSocket request is reported as polling
+ return !isWebsocket;
+ }
+
+ public PushSession getPushSession() { // returns the push session that was passed to the constructor
+ return pushSession;
+ }
}
Modified:
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js
===================================================================
---
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js 2010-10-13
19:19:38 UTC (rev 19561)
+++
branches/RF-7817/push-redesign/src/main/resources/META-INF/resources/org.richfaces/push.js 2010-10-13
22:25:05 UTC (rev 19562)
@@ -56,7 +56,9 @@
var connect = function() {
var pushSessionIdRequestHandler = function(data) {
pushSessionId = data;
- _$.atmosphere.subscribe(pushUrl + "/hub?pushSessionId=" + pushSessionId,
messageCallback);
+ _$.atmosphere.subscribe(pushUrl + "/hub?pushSessionId=" + pushSessionId,
messageCallback, {
+ transport: 'websocket'
+ });
};
var topics = new Array();