[jboss-remoting-commits] JBoss Remoting SVN: r4021 - in remoting3/trunk: http/src/main/java/org/jboss/cx/remoting/http/spi and 1 other directories.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Fri Apr 18 20:51:04 EDT 2008
Author: david.lloyd at jboss.com
Date: 2008-04-18 20:51:04 -0400 (Fri, 18 Apr 2008)
New Revision: 4021
Added:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java
Removed:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java
Modified:
remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java
Log:
Progress towards http urlconnection client
Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java (rev 0)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -0,0 +1,94 @@
+package org.jboss.cx.remoting.http;
+
+import org.jboss.cx.remoting.http.spi.HttpChannel;
+import org.jboss.cx.remoting.http.spi.RemotingHttpChannelContext;
+
+/**
+ *
+ */
+public abstract class AbstractHttpChannel implements HttpChannel {
+
+ protected AbstractHttpChannel() {
+ }
+
+ // Configuration
+
+ private int localParkTime = -1;
+ private int remoteParkTime = -1;
+
+ /**
+ * Get the amount of time that a given channel member may be locally parked. A value of {@code -1} means "park
+ * indefinitely". A value of {@code 0} means "do not park". Otherwise the value is interpreted as milliseconds.
+ *
+ * In the case of an HTTP server, the minimum of this time and the client-requested timeout should be used.
+ *
+ * @return the local park time
+ */
+ public int getLocalParkTime() {
+ return localParkTime;
+ }
+
+ /**
+ * Set the amount of time that a given channel member may be locally parked. A value of {@code -1} means "park
+ * indefinitely". A value of {@code 0} means "do not park". Otherwise the value is interpreted as milliseconds.
+ *
+ * In the case of an HTTP server, the minimum of this time and the client-requested timeout should be used.
+ *
+ * @param localParkTime the local park time
+ */
+ public void setLocalParkTime(final int localParkTime) {
+ this.localParkTime = localParkTime;
+ }
+
+ /**
+ * Get the amount of time that a given channel member may be remotely parked. A value of {@code -1} means "park
+ * indefinitely". A value of {@code 0} means "do not park". Otherwise the value is interpreted as milliseconds.
+ *
+ * @return the remote park time
+ */
+ public int getRemoteParkTime() {
+ return remoteParkTime;
+ }
+
+ /**
+ * Set the amount of time that a given channel member may be remotely parked. A value of {@code -1} means "park
+ * indefinitely". A value of {@code 0} means "do not park". Otherwise the value is interpreted as milliseconds.
+ *
+ * @param remoteParkTime the remote park time
+ */
+ public void setRemoteParkTime(final int remoteParkTime) {
+ this.remoteParkTime = remoteParkTime;
+ }
+
+ // Dependencies
+
+ private RemotingHttpChannelContext channelContext;
+
+ public RemotingHttpChannelContext getChannelContext() {
+ return channelContext;
+ }
+
+ public void setChannelContext(final RemotingHttpChannelContext channelContext) {
+ this.channelContext = channelContext;
+ }
+
+ // Lifecycle
+
+ public void create() {
+ if (channelContext == null) {
+ throw new NullPointerException("channelContext is null");
+ }
+ }
+
+ public void start() {
+
+ }
+
+ public void stop() {
+
+ }
+
+ public void destroy() {
+
+ }
+}
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -99,7 +99,7 @@
private final ConcurrentMap<String, RemotingHttpSession> sessionMap = CollectionUtil.concurrentWeakValueMap();
- private String generateSessionId() {
+ public String generateSessionId() {
final byte[] bytes = new byte[32];
StringBuilder builder = new StringBuilder(bytes.length * 2);
random.nextBytes(bytes);
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,9 +4,4 @@
*
*/
public interface HttpChannel {
- void setChannelContext(RemotingHttpChannelContext channelContext);
-
- void sendMessage(OutgoingHttpMessage message);
-
- void close();
}
Deleted: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -1,8 +0,0 @@
-package org.jboss.cx.remoting.http.spi;
-
-/**
- *
- */
-public interface OutboundHttpChannelProvider {
-
-}
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,8 +4,20 @@
*
*/
public interface RemotingHttpChannelContext {
+ /**
+ * Process an HTTP message that has arrived.
+ *
+ * @param incomingHttpMessage the HTTP message
+ */
+ void processInboundMessage(IncomingHttpMessage incomingHttpMessage);
- void receiveMessage(IncomingHttpMessage incomingHttpMessage);
-
- void sendComplete(OutgoingHttpMessage outgoingHttpMessage);
+ /**
+ * Wait for an outgoing HTTP message to become available, up to a certain time limit. If no message is available
+ * within the specified time limit, or if the thread is interrupted before a message could become available, return
+ * an empty message.
+ *
+ * @param millis the amount of time to wait in milliseconds, {@code 0} to not wait, or {@code -1} to wait indefinitely.
+ * @return an outgoing HTTP message
+ */
+ OutgoingHttpMessage waitForOutgoingHttpMessage(int millis);
}
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,5 +4,5 @@
*
*/
public interface RemotingHttpServerContext {
- RemotingHttpSessionContext locateSession(IncomingHttpMessage message);
+ RemotingHttpSessionContext locateSession(String remotingSessionId);
}
Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,9 +4,11 @@
*
*/
public interface RemotingHttpSessionContext {
- void processInboundMessage(IncomingHttpMessage incomingHttpMessage);
- OutgoingHttpMessage getOutgoingHttpMessage();
-
- OutgoingHttpMessage waitForOutgoingHttpMessage(long millis);
+ /**
+ * Get a channel context that can be used to transport HTTP messages for this session.
+ *
+ * @return the channel context
+ */
+ RemotingHttpChannelContext getChannelContext();
}
Modified: remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java
===================================================================
--- remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java 2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java 2008-04-19 00:51:04 UTC (rev 4021)
@@ -9,20 +9,24 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Future;
+import org.jboss.cx.remoting.http.AbstractHttpChannel;
import org.jboss.cx.remoting.http.cookie.CookieClientSession;
import org.jboss.cx.remoting.http.spi.AbstractIncomingHttpMessage;
import org.jboss.cx.remoting.http.spi.OutgoingHttpMessage;
-import org.jboss.cx.remoting.http.spi.RemotingHttpSessionContext;
+import org.jboss.cx.remoting.http.spi.RemotingHttpChannelContext;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.util.AbstractOutputStreamByteMessageOutput;
import org.jboss.cx.remoting.util.ByteMessageInput;
import org.jboss.cx.remoting.util.InputStreamByteMessageInput;
import org.jboss.cx.remoting.util.IoUtil;
+import org.jboss.cx.remoting.util.NamingThreadFactory;
/**
*
*/
-public abstract class AbstractHttpUrlChannel {
+public abstract class AbstractHttpUrlChannel extends AbstractHttpChannel {
private static final Logger log = Logger.getLogger(AbstractHttpUrlChannel.class);
@@ -35,7 +39,8 @@
private int concurrentRequests = 2;
private int connectTimeout = 5000;
- private int readTimeout = 5000;
+ private int readTimeout = 0; // Default to unlimited to support "parking" the connection at the other end
+ private int errorBackoffTime = 5000;
private URL connectUrl;
public int getConcurrentRequests() {
@@ -62,6 +67,14 @@
this.readTimeout = readTimeout;
}
+ public int getErrorBackoffTime() {
+ return errorBackoffTime;
+ }
+
+ public void setErrorBackoffTime(final int errorBackoffTime) {
+ this.errorBackoffTime = errorBackoffTime;
+ }
+
public URL getConnectUrl() {
return connectUrl;
}
@@ -72,17 +85,8 @@
// Dependencies
- private RemotingHttpSessionContext sessionContext;
private Executor executor;
- public RemotingHttpSessionContext getSessionContext() {
- return sessionContext;
- }
-
- public void setSessionContext(final RemotingHttpSessionContext sessionContext) {
- this.sessionContext = sessionContext;
- }
-
public Executor getExecutor() {
return executor;
}
@@ -94,47 +98,73 @@
// Lifecycle
private ExecutorService executorService;
+ private Future[] futures;
public void create() {
+ super.create();
if (executor == null) {
- executor = executorService = Executors.newFixedThreadPool(concurrentRequests);
+ executor = executorService = Executors.newFixedThreadPool(concurrentRequests, new NamingThreadFactory(Executors.defaultThreadFactory(), "Remoting HTTP client %s"));
}
if (connectUrl == null) {
throw new NullPointerException("connectUrl is null");
}
- if (sessionContext == null) {
- throw new NullPointerException("sessionContext is null");
- }
}
public void start() {
-
+ final Future[] futures = new Future[concurrentRequests];
+ for (int i = 0; i < futures.length; i++) {
+ final FutureTask task = new FutureTask<Void>(null) {
+ public void run() {
+ while (! isCancelled()) try {
+ handleRequest();
+ } catch (Throwable t) {
+ log.trace(t, "Request hander failed");
+ }
+ }
+ };
+ executor.execute(task);
+ futures[i] = task;
+ }
+ this.futures = futures;
}
public void stop() {
-
+ if (futures != null) {
+ final Future[] futures = this.futures;
+ this.futures = null;
+ for (Future future : futures) try {
+ future.cancel(true);
+ } catch (Throwable t) {
+ log.trace(t, "Error cancelling task");
+ }
+ }
}
public void destroy() {
try {
-
+ super.destroy();
} finally {
if (executorService != null) {
executorService.shutdown();
}
}
executor = executorService = null;
- sessionContext = null;
}
// Interface
- protected void handleRequest(final URL connectUrl) {
- final RemotingHttpSessionContext sessionContext = getSessionContext();
- final OutgoingHttpMessage message = sessionContext.getOutgoingHttpMessage();
+ protected void handleRequest() {
+ final URL connectUrl = getConnectUrl();
+ final RemotingHttpChannelContext channelContext = getChannelContext();
+ final int localParkTime = getLocalParkTime();
+ final int remoteParkTime = getRemoteParkTime();
+ final OutgoingHttpMessage message = channelContext.waitForOutgoingHttpMessage(localParkTime);
try {
final HttpURLConnection httpConnection = intializeConnection(connectUrl);
try {
+ if (remoteParkTime >= 0) {
+ httpConnection.addRequestProperty("Park-Timeout", Integer.toString(remoteParkTime));
+ }
httpConnection.connect();
final OutputStream outputStream = httpConnection.getOutputStream();
try {
@@ -151,7 +181,7 @@
}
final InputStream inputStream = httpConnection.getInputStream();
try {
- sessionContext.processInboundMessage(new AbstractIncomingHttpMessage() {
+ channelContext.processInboundMessage(new AbstractIncomingHttpMessage() {
public ByteMessageInput getMessageData() throws IOException {
return new InputStreamByteMessageInput(inputStream, -1);
}
@@ -178,7 +208,16 @@
} catch (IOException e2) {
log.trace(e2, "Error consuming the error stream from remote URL '%s'", connectUrl);
}
- // todo - need a backoff timer to prevent a storm of HTTP errors. Or perhaps the session should be torn down.
+ final int time = errorBackoffTime;
+ if (time > 0) {
+ try {
+ log.debug("HTTP error occurred; backing off for %d milliseconds", Integer.valueOf(time));
+ Thread.sleep(time);
+ } catch (InterruptedException e1) {
+ log.trace("Thread interrupted while waiting for error backoff time to expire");
+ Thread.currentThread().interrupt();
+ }
+ }
}
} catch (IOException e) {
log.trace(e, "Error establishing connection");
More information about the jboss-remoting-commits
mailing list