Author: richard.opalka(a)jboss.com
Date: 2007-12-05 13:16:22 -0500 (Wed, 05 Dec 2007)
New Revision: 5196
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java
stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/reqres/RMAbstractReqResTest.java
Log:
implementing resending functionality - first prototype
Modified: stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java 2007-12-05 16:34:09 UTC (rev 5195)
+++ stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java 2007-12-05 18:16:22 UTC (rev 5196)
@@ -31,8 +31,6 @@
import java.util.Map;
import java.util.Observable;
import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.activation.DataHandler;
import javax.xml.namespace.QName;
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java 2007-12-05 16:34:09 UTC (rev 5195)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java 2007-12-05 18:16:22 UTC (rev 5196)
@@ -23,10 +23,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.logging.Logger;
+import org.jboss.ws.extensions.wsrm.api.RMException;
/**
* WS-RM channel manager ensures message reliable delivery according to sequence configuration
@@ -41,6 +45,8 @@
private static final Logger logger = Logger.getLogger(RMChannelManagerImpl.class);
private static RMChannelManager instance = new RMChannelManagerImpl();
private static final ExecutorService rmChannelPool = Executors.newFixedThreadPool(5, new RMThreadFactory());
+ private static final int countOfAttempts = 10;
+ private static final int timeToWait = 2;
private static final class RMThreadFactory implements ThreadFactory
{
@@ -96,8 +102,35 @@
public final RMMessage send(RMMessage request) throws Throwable
{
- RMChannelResponse result = rmChannelPool.submit(new RMChannelTask(request)).get();
+ // submit new task to channel
+ // if response has no fault - delete saved request and return response to the client
+ // if response has fault - try to detect it's type
+
+ RMChannelResponse result = null;
+ long startTime = 0L;
+ long endTime = 0L;
+
+ for (int i = 0; i < countOfAttempts; i++)
+ {
+ Future<RMChannelResponse> futureResult = rmChannelPool.submit(new RMChannelTask(request));
+ try
+ {
+ startTime = System.currentTimeMillis();
+ result = futureResult.get(timeToWait, TimeUnit.SECONDS);
+ endTime = System.currentTimeMillis();
+ logger.debug("Response message received in " + (endTime - startTime) + " miliseconds");
+ break;
+ }
+ catch (TimeoutException te)
+ {
+ endTime = System.currentTimeMillis();
+ logger.warn("Response message not received in " + (endTime - startTime) + " miliseconds");
+ }
+ }
+ if (result == null)
+ throw new RMException("Unable to deliver message with addressing id: " + RMTransportHelper.getMessageId(request));
+
Throwable fault = result.getFault();
if (fault != null)
{
Modified: stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java
===================================================================
--- stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java 2007-12-05 16:34:09 UTC (rev 5195)
+++ stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java 2007-12-05 18:16:22 UTC (rev 5196)
@@ -29,8 +29,10 @@
import java.io.PrintWriter;
import java.net.Socket;
import java.net.URL;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
@@ -52,9 +54,11 @@
public class EndpointEmulator extends HttpServlet
{
+ private static final Random generator = new Random();
private String configFile;
private ServletContext ctx;
private List<View> views;
+ private Map<String, Integer> delayedMessages;
@Override
public void init(ServletConfig config) throws ServletException
@@ -89,6 +93,7 @@
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException
{
+ delayedMessages = new HashMap<String, Integer>();
handleRequest(HTTP_GET, req, res);
}
@@ -127,6 +132,37 @@
}
}
+ try
+ {
+ boolean sleep = generator.nextBoolean();
+ if (sleep)
+ {
+ Integer countOfDelayedMessages = this.delayedMessages.get(view.getId());
+ if (countOfDelayedMessages == null)
+ {
+ countOfDelayedMessages = 0;
+ }
+ if (countOfDelayedMessages.intValue() < 5)
+ {
+ Thread.sleep(5 * 1000);
+ ctx.log("serving request after " + 5 + " seconds");
+ this.delayedMessages.put(view.getId(), countOfDelayedMessages.intValue() + 1);
+ }
+ else
+ {
+ ctx.log("serving request immediately");
+ }
+ }
+ else
+ {
+ ctx.log("serving request immediately");
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ ctx.log(ie.getMessage(), ie);
+ }
+
if (responseTo == null)
{
ctx.log("Sending response through ServletResponse");
Modified: stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/reqres/RMAbstractReqResTest.java
===================================================================
--- stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/reqres/RMAbstractReqResTest.java 2007-12-05 16:34:09 UTC (rev 5195)
+++ stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/reqres/RMAbstractReqResTest.java 2007-12-05 18:16:22 UTC (rev 5196)
@@ -28,7 +28,10 @@
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
@@ -58,6 +61,11 @@
private Exception handlerException;
private boolean asyncHandlerCalled;
private ReqResServiceIface proxy;
+ private static final TimeUnit testTimeUnit = TimeUnit.SECONDS;
+ private static final long testWaitPeriod = 180L;
+ private static final Executor testExecutor = new ThreadPoolExecutor(
+ 0, 5, testWaitPeriod, testTimeUnit, new SynchronousQueue<Runnable>()
+ );
static
{
@@ -91,6 +99,7 @@
QName serviceName = new QName(TARGET_NS, "ReqResService");
URL wsdlURL = new URL(serviceURL + "?wsdl");
Service service = Service.create(wsdlURL, serviceName);
+ service.setExecutor(testExecutor);
proxy = (ReqResServiceIface)service.getPort(ReqResServiceIface.class);
}
}
@@ -129,7 +138,7 @@
{
try
{
- String retStr = (String) response.get(1000, TimeUnit.MILLISECONDS);
+ String retStr = (String) response.get(testWaitPeriod, testTimeUnit);
assertEquals(HELLO_WORLD_MSG, retStr);
asyncHandlerCalled = true;
}
@@ -140,7 +149,7 @@
}
};
Future<?> future = proxy.echoAsync(HELLO_WORLD_MSG, handler);
- future.get(1000, TimeUnit.MILLISECONDS);
+ future.get(testWaitPeriod, testTimeUnit);
ensureAsyncStatus();
}