Author: richard.opalka(a)jboss.com
Date: 2007-12-07 05:55:36 -0500 (Fri, 07 Dec 2007)
New Revision: 5227
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageAssembler.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMSender.java
Removed:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelResponse.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMarshaller.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessage.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageFactory.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnMarshaller.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnassignedMessageListener.java
Log:
beautification refactoring - so architecture documentation will be consistent with class
names
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -21,26 +21,18 @@
*/
package org.jboss.ws.extensions.wsrm.transport;
-import static org.jboss.ws.extensions.wsrm.RMConstant.*;
-
-import org.jboss.remoting.marshal.Marshaller;
-import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.ws.core.MessageAbstraction;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.ByteArrayOutputStream;
-
-import java.util.Map;
-
/**
- * RM Channel
+ * RM Channel implements reliable transport
+ *
* @author richard.opalka(a)jboss.com
*/
public class RMChannel
{
- private static final RMChannel INSTANCE = new RMChannel();
+ private static final RMChannel instance = new RMChannel();
+
private RMChannel()
{
// forbidden inheritance
@@ -48,55 +40,14 @@
public static RMChannel getInstance()
{
- return INSTANCE;
+ return instance;
}
- // Holds the list of tasks that will be send to the remoting transport channel
- private static final RMChannelManager rmChannelManager =
RMChannelManagerImpl.getInstance();
-
-
- private RMMessage createRMMessage(MessageAbstraction request, RMMetadata rmMetadata)
throws Throwable
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Marshaller marshaller =
(Marshaller)rmMetadata.getContext(SERIALIZATION_CONTEXT).get(MARSHALLER);
- // we have to serialize message before putting it to the rm pool
- // * contextClassloader not serializable issue
- // * DOMUtil threadlocal issue (if message is de/serialized in separate thread)
- marshaller.write(request, baos);
- RMMessage rmMessage = RMMessageFactory.newMessage(baos.toByteArray(), rmMetadata);
- return rmMessage;
- }
-
- private MessageAbstraction createResponse(RMMessage rmRequest, RMMessage rmResponse,
RMMetadata rmMetadata) throws Throwable
- {
- Map<String, Object> invocationContext =
rmMetadata.getContext(INVOCATION_CONTEXT);
- boolean oneWay = RMTransportHelper.isOneWayOperation(rmRequest);
- MessageAbstraction response = null;
- if (!oneWay)
- {
- byte[] payload = rmResponse.getPayload();
- InputStream is = payload == null ? null : new
ByteArrayInputStream(rmResponse.getPayload());
- // we have to deserialize message after pick up from the rm pool
- // * contextClassloader not serializable issue
- // * DOMUtil threadlocal issue (if message is de/serialized in separate
thread)
- UnMarshaller unmarshaller =
(UnMarshaller)rmMetadata.getContext(SERIALIZATION_CONTEXT).get(UNMARSHALLER);
- response = (MessageAbstraction)unmarshaller.read(is,
rmResponse.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT));
- }
- invocationContext.clear();
- invocationContext.putAll(rmMetadata.getContext(REMOTING_INVOCATION_CONTEXT));
- return response;
- }
-
public MessageAbstraction send(MessageAbstraction request, RMMetadata rmMetadata)
throws Throwable
{
- RMMessage rmRequest = createRMMessage(request, rmMetadata);
- RMMessage rmResponse = sendToChannel(rmRequest);
- return createResponse(rmRequest, rmResponse, rmMetadata);
+ RMMessage rmRequest = RMMessageAssembler.convertMessageToRMSource(request,
rmMetadata);
+ RMMessage rmResponse = RMSender.getInstance().send(rmRequest);
+ return RMMessageAssembler.convertRMSourceToMessage(rmRequest, rmResponse,
rmMetadata);
}
- private RMMessage sendToChannel(RMMessage request) throws Throwable
- {
- return rmChannelManager.send(request);
- }
-
}
Deleted:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -1,41 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.ws.extensions.wsrm.transport;
-
-/**
- * WS-RM channel manager interface
- *
- * @author richard.opalka(a)jboss.com
- *
- * @since Dec 5, 2007
- */
-public interface RMChannelManager
-{
-
- /**
- * Send request to server reliably
- * @param request RM request wrapper
- * @return RM response wrapper
- */
- RMMessage send(RMMessage request) throws Throwable;
-
-}
Deleted:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -1,175 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.ws.extensions.wsrm.transport;
-
-import static org.jboss.ws.extensions.wsrm.RMConstant.REMOTING_INVOCATION_CONTEXT;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.servlet.http.HttpServletResponse;
-
-import org.jboss.logging.Logger;
-import org.jboss.remoting.transport.http.HTTPMetadataConstants;
-import org.jboss.ws.extensions.wsrm.api.RMException;
-import org.jboss.ws.extensions.wsrm.config.RMMessageRetransmissionConfig;
-
-/**
- * WS-RM channel manager ensures message reliable delivery according to sequence
retransmission configuration
- *
- * @author richard.opalka(a)jboss.com
- *
- * @since Dec 5, 2007
- */
-public final class RMChannelManagerImpl implements RMChannelManager
-{
-
- private static final Logger logger = Logger.getLogger(RMChannelManagerImpl.class);
- private static RMChannelManager instance = new RMChannelManagerImpl();
- private static final ExecutorService rmChannelPool = Executors.newFixedThreadPool(5,
new RMThreadFactory());
-
- private static final class RMThreadFactory implements ThreadFactory
- {
- final ThreadGroup group;
- final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix = "rm-pool-thread-";
-
- private RMThreadFactory()
- {
- SecurityManager sm = System.getSecurityManager();
- group = (sm != null) ? sm.getThreadGroup() :
Thread.currentThread().getThreadGroup();
- }
-
- public Thread newThread(Runnable r)
- {
- Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),
0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- }
-
- private RMChannelManagerImpl()
- {
- // forbidden inheritance
- }
-
- public static final RMChannelManager getInstance()
- {
- return instance;
- }
-
- public final RMMessage send(RMMessage request) throws Throwable
- {
- RMMessageRetransmissionConfig qos =
RMTransportHelper.getSequence(request).getRMConfig().getMessageRetransmission();
- if (qos == null)
- throw new RMException("User must specify message retransmission
configuration in JAX-WS WS-RM config");
-
- int countOfAttempts = qos.getCountOfAttempts();
- int inactivityTimeout = qos.getMessageTimeout();
- int retransmissionInterval = qos.getRetransmissionInterval();
- RMChannelResponse result = null;
- long startTime = 0L;
- long endTime = 0L;
- int attemptNumber = 1;
-
- for (int i = 0; i < countOfAttempts; i++)
- {
- logger.debug("Sending RM request - attempt no. " + attemptNumber++);
- Future<RMChannelResponse> futureResult = rmChannelPool.submit(new
RMChannelTask(request));
- try
- {
- startTime = System.currentTimeMillis();
- result = futureResult.get(inactivityTimeout, TimeUnit.SECONDS);
- if (result != null)
- {
- Throwable t = result.getFault();
- if (t != null)
- {
- logger.warn(result.getFault().getClass().getName(),
result.getFault());
- }
- else
- {
- endTime = System.currentTimeMillis();
- if (result.getResponse() != null)
- {
- Map<String, Object> remotingCtx =
result.getResponse().getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
- if (remotingCtx != null)
- {
- if
(Integer.valueOf(HttpServletResponse.SC_INTERNAL_SERVER_ERROR).equals(remotingCtx.get(HTTPMetadataConstants.RESPONSE_CODE)))
- {
- logger.debug("Response message received in " +
(endTime - startTime) + " miliseconds, but contains internal server code, going to
resend the request message");
- continue;
- }
- }
- }
- logger.debug("Response message received in " + (endTime -
startTime) + " miliseconds");
- break;
- }
- try
- {
- Thread.sleep(retransmissionInterval * 1000);
- }
- catch (InterruptedException ie)
- {
- logger.warn(ie.getMessage(), ie);
- }
- }
- }
- catch (TimeoutException te)
- {
- endTime = System.currentTimeMillis();
- logger.warn("Timeout - response message not received in " +
(endTime - startTime) + " miliseconds");
- try
- {
- Thread.sleep(retransmissionInterval * 1000);
- }
- catch (InterruptedException ie)
- {
- logger.warn(ie.getMessage(), ie);
- }
- }
- }
-
- if (result == null)
- throw new RMException("Unable to deliver message with addressing id: "
+ RMTransportHelper.getMessageId(request) + ". Count of attempts to deliver the
message was: " + countOfAttempts);
-
- Throwable fault = result.getFault();
- if (fault != null)
- {
- throw new RMException("Unable to deliver message with addressing id: "
+ RMTransportHelper.getMessageId(request) + ". Count of attempts to deliver the
message was: " + countOfAttempts, fault);
- }
- else
- {
- return result.getResponse();
- }
- }
-
-}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelResponse.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelResponse.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelResponse.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -24,8 +24,8 @@
import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandler;
/**
- * Represents response that goes from the RM channel
- * @see org.jboss.ws.extensions.wsrm.transport.RMChannel
+ * RM channel response represents response that goes from the RM channel
+ *
* @author richard.opalka(a)jboss.com
*/
final class RMChannelResponse
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -38,11 +38,11 @@
import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandlerFactory;
/**
- * Represents request that goes to the RM channel
- * @see org.jboss.ws.extensions.wsrm.transport.RMChannel
+ * RM channel task to be executed
+ *
* @author richard.opalka(a)jboss.com
*/
-public final class RMChannelTask implements Callable<RMChannelResponse>
+final class RMChannelTask implements Callable<RMChannelResponse>
{
private static final Logger logger = Logger.getLogger(RMChannelTask.class);
private static final String JBOSSWS_SUBSYSTEM = "jbossws";
@@ -69,7 +69,7 @@
try
{
URI backPort = RMTransportHelper.getBackPortURI(rmRequest);
- String messageId = RMTransportHelper.getMessageId(rmRequest);
+ String messageId = RMTransportHelper.getAddressingMessageId(rmRequest);
logger.debug("[WS-RM] backport URI is: " + backPort);
RMCallbackHandler callbackHandler = null;
@@ -95,8 +95,6 @@
client.setUnMarshaller(RMUnMarshaller.getInstance());
Map<String, Object> remotingInvocationContext =
rmRequest.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
- if (logger.isDebugEnabled())
- logger.debug("Remoting metadata: " + remotingInvocationContext);
// debug the outgoing request message
MessageTrace.traceMessage("Outgoing RM Request Message",
rmRequest.getPayload());
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMarshaller.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMarshaller.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMarshaller.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -30,11 +30,12 @@
/**
* Marshalls byte array to the output stream
+ *
* @author richard.opalka(a)jboss.com
*/
final class RMMarshaller implements Marshaller
{
- private static final Marshaller INSTANCE = new RMMarshaller();
+ private static final Marshaller instance = new RMMarshaller();
public Marshaller cloneMarshaller() throws CloneNotSupportedException
{
@@ -43,7 +44,7 @@
public static Marshaller getInstance()
{
- return INSTANCE;
+ return instance;
}
public void write(Object dataObject, OutputStream output) throws IOException
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessage.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessage.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessage.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -1,5 +1,10 @@
package org.jboss.ws.extensions.wsrm.transport;
+/**
+ * Represents RM source
+ *
+ * @author richard.opalka(a)jboss.com
+ */
public interface RMMessage
{
byte[] getPayload();
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageAssembler.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageAssembler.java
(rev 0)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageAssembler.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.transport;
+
+import static org.jboss.ws.extensions.wsrm.RMConstant.INVOCATION_CONTEXT;
+import static org.jboss.ws.extensions.wsrm.RMConstant.MARSHALLER;
+import static org.jboss.ws.extensions.wsrm.RMConstant.REMOTING_INVOCATION_CONTEXT;
+import static org.jboss.ws.extensions.wsrm.RMConstant.SERIALIZATION_CONTEXT;
+import static org.jboss.ws.extensions.wsrm.RMConstant.UNMARSHALLER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.jboss.remoting.marshal.Marshaller;
+import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.ws.core.MessageAbstraction;
+
+/**
+ * Translates JBoss messages to RM sources and vice-versa.
+ * It's necessary to translate JBoss messages to/from RM
+ * sources before submiting them to the RMSender.
+ *
+ * <p>
+ * This is because of the following reasons:
+ * <ul>
+ * <li>contextClassloader not serializable issue</li>
+ * <li>DOMUtil threadlocal issue (if message is de/serialized in separate
thread)</li>
+ * </ul>
+ * </p>
+ *
+ * @author richard.opalka(a)jboss.com
+ */
+public final class RMMessageAssembler
+{
+
+ /**
+ * Transforms JBoss request message to the RM request message
+ * This transformation is done before submiting request to the RMSender.
+ */
+ public static RMMessage convertMessageToRMSource(MessageAbstraction request,
RMMetadata rmMetadata) throws Throwable
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Marshaller marshaller =
(Marshaller)rmMetadata.getContext(SERIALIZATION_CONTEXT).get(MARSHALLER);
+ marshaller.write(request, baos);
+ RMMessage rmMessage = RMMessageFactory.newMessage(baos.toByteArray(), rmMetadata);
+ return rmMessage;
+ }
+
+ /**
+ * Transforms RM response message to the JBoss response message.
+ * This transformation is done after RMSender have finished his job.
+ */
+ public static MessageAbstraction convertRMSourceToMessage(RMMessage rmRequest,
RMMessage rmResponse, RMMetadata rmMetadata) throws Throwable
+ {
+ boolean oneWay = RMTransportHelper.isOneWayOperation(rmRequest);
+ MessageAbstraction response = null;
+ if (false == oneWay)
+ {
+ byte[] payload = rmResponse.getPayload();
+ InputStream in = (payload == null) ? null : new
ByteArrayInputStream(rmResponse.getPayload());
+ UnMarshaller unmarshaller =
(UnMarshaller)rmMetadata.getContext(SERIALIZATION_CONTEXT).get(UNMARSHALLER);
+ response = (MessageAbstraction)unmarshaller.read(in,
rmResponse.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT));
+ }
+ Map<String, Object> invocationContext =
rmMetadata.getContext(INVOCATION_CONTEXT);
+ invocationContext.clear();
+ invocationContext.putAll(rmMetadata.getContext(REMOTING_INVOCATION_CONTEXT));
+ return response;
+ }
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageAssembler.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageFactory.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageFactory.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageFactory.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -1,7 +1,13 @@
package org.jboss.ws.extensions.wsrm.transport;
+/**
+ * Constructs RM message instances
+ *
+ * @author richard.opalka(a)jboss.com
+ */
public class RMMessageFactory
{
+
private RMMessageFactory()
{
// forbidden inheritance
@@ -11,4 +17,5 @@
{
return new RMMessageImpl(payload, rmMetadata);
}
+
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageImpl.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageImpl.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMessageImpl.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -1,11 +1,13 @@
package org.jboss.ws.extensions.wsrm.transport;
/**
- * RM message object
+ * RM source instance
+ *
* @author richard.opalka(a)jboss.com
*/
public class RMMessageImpl implements RMMessage
{
+
private final byte[] payload;
private final RMMetadata rmMetadata;
@@ -25,4 +27,5 @@
{
return this.rmMetadata;
}
+
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -6,6 +6,11 @@
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.ws.extensions.wsrm.RMConstant;
+/**
+ * RM metadata heavily used by this RM transport
+ *
+ * @author richard.opalka(a)jboss.com
+ */
public final class RMMetadata
{
private Map<String, Map<String, Object>> contexts = new HashMap<String,
Map<String, Object>>();
@@ -60,4 +65,5 @@
{
return this.contexts.get(key);
}
+
}
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMSender.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMSender.java
(rev 0)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMSender.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -0,0 +1,178 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.transport;
+
+import static org.jboss.ws.extensions.wsrm.RMConstant.REMOTING_INVOCATION_CONTEXT;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.jboss.logging.Logger;
+import org.jboss.remoting.transport.http.HTTPMetadataConstants;
+import org.jboss.ws.extensions.wsrm.api.RMException;
+import org.jboss.ws.extensions.wsrm.config.RMMessageRetransmissionConfig;
+
+/**
+ * RM sender ensures reliable message delivery. The QoS is specified in the JAX-WS client
Sconfiguration file.
+ *
+ * @author richard.opalka(a)jboss.com
+ */
+public final class RMSender
+{
+
+ private static final Logger logger = Logger.getLogger(RMSender.class);
+ private static RMSender instance = new RMSender();
+ private static final ThreadFactory rmThreadPool = new RMThreadPoolFactory();
+ private static final int maxCountOfThreads = 5;
+ private static final ExecutorService rmTasksQueue =
Executors.newFixedThreadPool(maxCountOfThreads, rmThreadPool);
+
+ /**
+ * Generates worker threads (daemons)
+ */
+ private static final class RMThreadPoolFactory implements ThreadFactory
+ {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix = "rm-pool-worker-thread-";
+
+ private RMThreadPoolFactory()
+ {
+ SecurityManager sm = System.getSecurityManager();
+ group = (sm != null) ? sm.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),
0);
+ if (false == t.isDaemon())
+ t.setDaemon(true);
+ if (Thread.NORM_PRIORITY != t.getPriority())
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
+ private RMSender()
+ {
+ // forbidden inheritance
+ }
+
+ public static final RMSender getInstance()
+ {
+ return instance;
+ }
+
+ public final RMMessage send(RMMessage request) throws Throwable
+ {
+ RMMessageRetransmissionConfig qos =
RMTransportHelper.getSequence(request).getRMConfig().getMessageRetransmission();
+ if (qos == null)
+ throw new RMException("User must specify message retransmission
configuration in JAX-WS WS-RM config");
+
+ int countOfAttempts = qos.getCountOfAttempts();
+ int inactivityTimeout = qos.getMessageTimeout();
+ int retransmissionInterval = qos.getRetransmissionInterval();
+ RMChannelResponse result = null;
+ long startTime = 0L;
+ long endTime = 0L;
+ int attemptNumber = 1;
+
+ for (int i = 0; i < countOfAttempts; i++)
+ {
+ logger.debug("Sending RM request - attempt no. " + attemptNumber++);
+ Future<RMChannelResponse> futureResult = rmTasksQueue.submit(new
RMChannelTask(request));
+ try
+ {
+ startTime = System.currentTimeMillis();
+ result = futureResult.get(inactivityTimeout, TimeUnit.SECONDS);
+ if (result != null)
+ {
+ Throwable t = result.getFault();
+ if (t != null)
+ {
+ logger.warn(result.getFault().getClass().getName(),
result.getFault());
+ }
+ else
+ {
+ endTime = System.currentTimeMillis();
+ if (result.getResponse() != null)
+ {
+ Map<String, Object> remotingCtx =
result.getResponse().getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
+ if (remotingCtx != null)
+ {
+ if
(Integer.valueOf(HttpServletResponse.SC_INTERNAL_SERVER_ERROR).equals(remotingCtx.get(HTTPMetadataConstants.RESPONSE_CODE)))
+ {
+ logger.debug("Response message received in " +
(endTime - startTime) + " miliseconds, but contains internal server code, going to
resend the request message");
+ continue;
+ }
+ }
+ }
+ logger.debug("Response message received in " + (endTime -
startTime) + " miliseconds");
+ break;
+ }
+ try
+ {
+ Thread.sleep(retransmissionInterval * 1000);
+ }
+ catch (InterruptedException ie)
+ {
+ logger.warn(ie.getMessage(), ie);
+ }
+ }
+ }
+ catch (TimeoutException te)
+ {
+ endTime = System.currentTimeMillis();
+ logger.warn("Timeout - response message not received in " +
(endTime - startTime) + " miliseconds");
+ try
+ {
+ Thread.sleep(retransmissionInterval * 1000);
+ }
+ catch (InterruptedException ie)
+ {
+ logger.warn(ie.getMessage(), ie);
+ }
+ }
+ }
+
+ if (result == null)
+ throw new RMException("Unable to deliver message with addressing id: "
+ RMTransportHelper.getAddressingMessageId(request) + ". Count of attempts to deliver
the message was: " + countOfAttempts);
+
+ Throwable fault = result.getFault();
+ if (fault != null)
+ {
+ throw new RMException("Unable to deliver message with addressing id: "
+ RMTransportHelper.getAddressingMessageId(request) + ". Count of attempts to deliver
the message was: " + countOfAttempts, fault);
+ }
+ else
+ {
+ return result.getResponse();
+ }
+ }
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMSender.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -9,6 +9,11 @@
import org.jboss.ws.extensions.wsrm.RMConstant;
import org.jboss.ws.extensions.wsrm.RMSequenceImpl;
+/**
+ * Utility class heavily used in this transport implementation
+ *
+ * @author richard.opalka(a)jboss.com
+ */
public final class RMTransportHelper
{
@@ -24,7 +29,7 @@
return (ctx != null) && (ctx.containsKey(RMConstant.REQUEST_CONTEXT));
}
- public static String getMessageId(RMMessage rmRequest)
+ public static String getAddressingMessageId(RMMessage rmRequest)
{
return (String)getWsrmRequestContext(rmRequest).get(WSA_MESSAGE_ID);
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnMarshaller.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnMarshaller.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnMarshaller.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -30,11 +30,12 @@
/**
* Unmarshalls byte array from the input stream
+ *
* @author richard.opalka(a)jboss.com
*/
final class RMUnMarshaller implements UnMarshaller
{
- private static final UnMarshaller INSTANCE = new RMUnMarshaller();
+ private static final UnMarshaller instance = new RMUnMarshaller();
public UnMarshaller cloneUnMarshaller() throws CloneNotSupportedException
{
@@ -43,7 +44,7 @@
public static UnMarshaller getInstance()
{
- return INSTANCE;
+ return instance;
}
public Object read(InputStream is, Map metadata) throws IOException,
ClassNotFoundException
@@ -67,5 +68,6 @@
{
// do nothing
}
+
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnassignedMessageListener.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnassignedMessageListener.java 2007-12-07
10:54:55 UTC (rev 5226)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMUnassignedMessageListener.java 2007-12-07
10:55:36 UTC (rev 5227)
@@ -25,17 +25,13 @@
* Implementations of this interface must be registered in callback handler
*
* @author richard.opalka(a)jboss.com
- *
- * @since Nov 27, 2007
*/
public interface RMUnassignedMessageListener
{
+
/**
- * This event is fired when there's new unassigned message available in
- * callback handler this sequence is associated with. Each implementation
- * of this class should remember the count of this notification method calls
- * and must initiate new dummy message request to pick up all unassigned
- * messages from the callback handler
+ * This event is fired when there's new unassigned message available in callback
handler.
*/
void unassignedMessageReceived();
+
}