[jboss-remoting-commits] JBoss Remoting SVN: r4021 - in remoting3/trunk: http/src/main/java/org/jboss/cx/remoting/http/spi and 1 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Fri Apr 18 20:51:04 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-04-18 20:51:04 -0400 (Fri, 18 Apr 2008)
New Revision: 4021

Added:
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java
Removed:
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java
Modified:
   remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java
Log:
Progress towards http urlconnection client

Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java	                        (rev 0)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/AbstractHttpChannel.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -0,0 +1,94 @@
+package org.jboss.cx.remoting.http;
+
+import org.jboss.cx.remoting.http.spi.HttpChannel;
+import org.jboss.cx.remoting.http.spi.RemotingHttpChannelContext;
+
+/**
+ *
+ */
+public abstract class AbstractHttpChannel implements HttpChannel {
+
+    protected AbstractHttpChannel() {
+    }
+
+    // Configuration
+
+    private int localParkTime = -1;
+    private int remoteParkTime = -1;
+
+    /**
+     * Get the amount of time that a given channel member may be locally parked.  A value of {@code -1} means "park
+     * indefinitely".  A value of {@code 0} means "do not park".  Otherwise the value is interpreted as milliseconds.
+     *
+     * In the case of an HTTP server, the minimum of this time and the client-requested timeout should be used.
+     *
+     * @return the local park time
+     */
+    public int getLocalParkTime() {
+        return localParkTime;
+    }
+
+    /**
+     * Set the amount of time that a given channel member may be locally parked.  A value of {@code -1} means "park
+     * indefinitely".  A value of {@code 0} means "do not park".  Otherwise the value is interpreted as milliseconds.
+     *
+     * In the case of an HTTP server, the minimum of this time and the client-requested timeout should be used.
+     *
+     * @param localParkTime the local park time
+     */
+    public void setLocalParkTime(final int localParkTime) {
+        this.localParkTime = localParkTime;
+    }
+
+    /**
+     * Get the amount of time that a given channel member may be remotely parked.  A value of {@code -1} means "park
+     * indefinitely".  A value of {@code 0} means "do not park".  Otherwise the value is interpreted as milliseconds.
+     *
+     * @return the remote park time
+     */
+    public int getRemoteParkTime() {
+        return remoteParkTime;
+    }
+
+    /**
+     * Set the amount of time that a given channel member may be remotely parked.  A value of {@code -1} means "park
+     * indefinitely".  A value of {@code 0} means "do not park".  Otherwise the value is interpreted as milliseconds.
+     *
+     * @param remoteParkTime the remote park time
+     */
+    public void setRemoteParkTime(final int remoteParkTime) {
+        this.remoteParkTime = remoteParkTime;
+    }
+
+    // Dependencies
+
+    private RemotingHttpChannelContext channelContext;
+
+    public RemotingHttpChannelContext getChannelContext() {
+        return channelContext;
+    }
+
+    public void setChannelContext(final RemotingHttpChannelContext channelContext) {
+        this.channelContext = channelContext;
+    }
+
+    // Lifecycle
+
+    public void create() {
+        if (channelContext == null) {
+            throw new NullPointerException("channelContext is null");
+        }
+    }
+
+    public void start() {
+
+    }
+
+    public void stop() {
+
+    }
+
+    public void destroy() {
+
+    }
+}

Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/HttpProtocolSupport.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -99,7 +99,7 @@
 
     private final ConcurrentMap<String, RemotingHttpSession> sessionMap = CollectionUtil.concurrentWeakValueMap();
 
-    private String generateSessionId() {
+    public String generateSessionId() {
         final byte[] bytes = new byte[32];
         StringBuilder builder = new StringBuilder(bytes.length * 2);
         random.nextBytes(bytes);

Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/HttpChannel.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,9 +4,4 @@
  *
  */
 public interface HttpChannel {
-    void setChannelContext(RemotingHttpChannelContext channelContext);
-
-    void sendMessage(OutgoingHttpMessage message);
-
-    void close();
 }

Deleted: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/OutboundHttpChannelProvider.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -1,8 +0,0 @@
-package org.jboss.cx.remoting.http.spi;
-
-/**
- *
- */
-public interface OutboundHttpChannelProvider {
-
-}

Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpChannelContext.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,8 +4,20 @@
  *
  */
 public interface RemotingHttpChannelContext {
+    /**
+     * Process an HTTP message that has arrived.
+     *
+     * @param incomingHttpMessage the HTTP message
+     */
+    void processInboundMessage(IncomingHttpMessage incomingHttpMessage);
 
-    void receiveMessage(IncomingHttpMessage incomingHttpMessage);
-
-    void sendComplete(OutgoingHttpMessage outgoingHttpMessage);
+    /**
+     * Wait for an outgoing HTTP message to become available, up to a certain time limit.  If no message is available
+     * within the specified time limit, or if the thread is interrupted before a message could become available, return
+     * an empty message.
+     *
+     * @param millis the amount of time to wait in millseconds, {@code 0} to not wait, or {@code -1} to wait indefinitely.
+     * @return an outgoing HTTP message
+     */
+    OutgoingHttpMessage waitForOutgoingHttpMessage(int millis);
 }

Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpServerContext.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,5 +4,5 @@
  *
  */
 public interface RemotingHttpServerContext {
-    RemotingHttpSessionContext locateSession(IncomingHttpMessage message);
+    RemotingHttpSessionContext locateSession(String remotingSessionId);
 }

Modified: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/spi/RemotingHttpSessionContext.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -4,9 +4,11 @@
  *
  */
 public interface RemotingHttpSessionContext {
-    void processInboundMessage(IncomingHttpMessage incomingHttpMessage);
 
-    OutgoingHttpMessage getOutgoingHttpMessage();
-
-    OutgoingHttpMessage waitForOutgoingHttpMessage(long millis);
+    /**
+     * Get a channel context that can be used to transport HTTP messages for this session.
+     *
+     * @return the channel context
+     */
+    RemotingHttpChannelContext getChannelContext();
 }

Modified: remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java
===================================================================
--- remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java	2008-04-18 23:29:39 UTC (rev 4020)
+++ remoting3/trunk/http-urlconnection/src/main/java/org/jboss/cx/remoting/http/urlconnection/AbstractHttpUrlChannel.java	2008-04-19 00:51:04 UTC (rev 4021)
@@ -9,20 +9,24 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Future;
+import org.jboss.cx.remoting.http.AbstractHttpChannel;
 import org.jboss.cx.remoting.http.cookie.CookieClientSession;
 import org.jboss.cx.remoting.http.spi.AbstractIncomingHttpMessage;
 import org.jboss.cx.remoting.http.spi.OutgoingHttpMessage;
-import org.jboss.cx.remoting.http.spi.RemotingHttpSessionContext;
+import org.jboss.cx.remoting.http.spi.RemotingHttpChannelContext;
 import org.jboss.cx.remoting.log.Logger;
 import org.jboss.cx.remoting.util.AbstractOutputStreamByteMessageOutput;
 import org.jboss.cx.remoting.util.ByteMessageInput;
 import org.jboss.cx.remoting.util.InputStreamByteMessageInput;
 import org.jboss.cx.remoting.util.IoUtil;
+import org.jboss.cx.remoting.util.NamingThreadFactory;
 
 /**
  *
  */
-public abstract class AbstractHttpUrlChannel {
+public abstract class AbstractHttpUrlChannel extends AbstractHttpChannel {
 
     private static final Logger log = Logger.getLogger(AbstractHttpUrlChannel.class);
 
@@ -35,7 +39,8 @@
 
     private int concurrentRequests = 2;
     private int connectTimeout = 5000;
-    private int readTimeout = 5000;
+    private int readTimeout = 0;  // Default to unlimited to support "parking" the connection at the other end
+    private int errorBackoffTime = 5000;
     private URL connectUrl;
 
     public int getConcurrentRequests() {
@@ -62,6 +67,14 @@
         this.readTimeout = readTimeout;
     }
 
+    public int getErrorBackoffTime() {
+        return errorBackoffTime;
+    }
+
+    public void setErrorBackoffTime(final int errorBackoffTime) {
+        this.errorBackoffTime = errorBackoffTime;
+    }
+
     public URL getConnectUrl() {
         return connectUrl;
     }
@@ -72,17 +85,8 @@
 
     // Dependencies
 
-    private RemotingHttpSessionContext sessionContext;
     private Executor executor;
 
-    public RemotingHttpSessionContext getSessionContext() {
-        return sessionContext;
-    }
-
-    public void setSessionContext(final RemotingHttpSessionContext sessionContext) {
-        this.sessionContext = sessionContext;
-    }
-
     public Executor getExecutor() {
         return executor;
     }
@@ -94,47 +98,73 @@
     // Lifecycle
 
     private ExecutorService executorService;
+    private Future[] futures;
 
     public void create() {
+        super.create();
         if (executor == null) {
-            executor = executorService = Executors.newFixedThreadPool(concurrentRequests);
+            executor = executorService = Executors.newFixedThreadPool(concurrentRequests, new NamingThreadFactory(Executors.defaultThreadFactory(), "Remoting HTTP client %s"));
         }
         if (connectUrl == null) {
             throw new NullPointerException("connectUrl is null");
         }
-        if (sessionContext == null) {
-            throw new NullPointerException("sessionContext is null");
-        }
     }
 
     public void start() {
-
+        final Future[] futures = new Future[concurrentRequests];
+        for (int i = 0; i < futures.length; i++) {
+            final FutureTask task = new FutureTask<Void>(null) {
+                public void run() {
+                    while (! isCancelled()) try {
+                        handleRequest();
+                    } catch (Throwable t) {
+                        log.trace(t, "Request hander failed");
+                    }
+                }
+            };
+            executor.execute(task);
+            futures[i] = task;
+        }
+        this.futures = futures;
     }
 
     public void stop() {
-
+        if (futures != null) {
+            final Future[] futures = this.futures;
+            this.futures = null;
+            for (Future future : futures) try {
+                future.cancel(true);
+            } catch (Throwable t) {
+                log.trace(t, "Error cancelling task");
+            }
+        }
     }
 
     public void destroy() {
         try {
-
+            super.destroy();
         } finally {
             if (executorService != null) {
                 executorService.shutdown();
             }
         }
         executor = executorService = null;
-        sessionContext = null;
     }
 
     // Interface
 
-    protected void handleRequest(final URL connectUrl) {
-        final RemotingHttpSessionContext sessionContext = getSessionContext();
-        final OutgoingHttpMessage message = sessionContext.getOutgoingHttpMessage();
+    protected void handleRequest() {
+        final URL connectUrl = getConnectUrl();
+        final RemotingHttpChannelContext channelContext = getChannelContext();
+        final int localParkTime = getLocalParkTime();
+        final int remoteParkTime = getRemoteParkTime();
+        final OutgoingHttpMessage message = channelContext.waitForOutgoingHttpMessage(localParkTime);
         try {
             final HttpURLConnection httpConnection = intializeConnection(connectUrl);
             try {
+                if (remoteParkTime >= 0) {
+                    httpConnection.addRequestProperty("Park-Timeout", Integer.toString(remoteParkTime));
+                }
                 httpConnection.connect();
                 final OutputStream outputStream = httpConnection.getOutputStream();
                 try {
@@ -151,7 +181,7 @@
                     }
                     final InputStream inputStream = httpConnection.getInputStream();
                     try {
-                        sessionContext.processInboundMessage(new AbstractIncomingHttpMessage() {
+                        channelContext.processInboundMessage(new AbstractIncomingHttpMessage() {
                             public ByteMessageInput getMessageData() throws IOException {
                                 return new InputStreamByteMessageInput(inputStream, -1);
                             }
@@ -178,7 +208,16 @@
                 } catch (IOException e2) {
                     log.trace(e2, "Error consuming the error stream from remote URL '%s'", connectUrl);
                 }
-                // todo - need a backoff timer to prevent a storm of HTTP errors.  Or perhaps the session should be torn down.
+                final int time = errorBackoffTime;
+                if (time > 0) {
+                    try {
+                        log.debug("HTTP error occurred; backing off for %d milliseconds", Integer.valueOf(time));
+                        Thread.sleep(time);
+                    } catch (InterruptedException e1) {
+                        log.trace("Thread interrupted while waiting for error backoff time to expire");
+                        Thread.currentThread().interrupt();
+                    }
+                }
             }
         } catch (IOException e) {
             log.trace(e, "Error establishing connection");




More information about the jboss-remoting-commits mailing list