Author: richard.opalka(a)jboss.com
Date: 2007-08-29 09:15:16 -0400 (Wed, 29 Aug 2007)
New Revision: 4503
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMHelper.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessage.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageFactory.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageImpl.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMetadata.java
Log:
adding RM stuff
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,297 @@
+package org.jboss.ws.extensions.wsrm;
+
+import static org.jboss.ws.extensions.wsrm.RMConstant.*;
+
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.invocation.OnewayInvocation;
+import org.jboss.remoting.marshal.Marshaller;
+import org.jboss.remoting.marshal.UnMarshaller;
+import org.jboss.ws.core.HTTPMessageImpl;
+import org.jboss.ws.core.MessageAbstraction;
+import org.jboss.ws.core.MessageTrace;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * RM Channel Singleton
+ * @author richard.opalka(a)jboss.com
+ */
+public class RMChannel
+{
+ private static final RMChannel INSTANCE = new RMChannel();
+
+ private RMChannel()
+ {
+ super();
+ // forbidden inheritance
+ }
+
+ public static RMChannel getInstance()
+ {
+ return INSTANCE;
+ }
+
+ private static final ExecutorService rmChannelPool = Executors.newFixedThreadPool(5,
new RMThreadFactory());
+
+ private static final class RMThreadFactory implements ThreadFactory
+ {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix = "rm-channel-pool-thread-";
+
+ private RMThreadFactory()
+ {
+ SecurityManager sm = System.getSecurityManager();
+ group = (sm != null) ? sm.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),
0);
+ if (t.isDaemon())
+ t.setDaemon(false);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
+ private static final class RMMarshaller implements Marshaller
+ {
+ private static final Marshaller INSTANCE = new RMMarshaller();
+
+ public Marshaller cloneMarshaller() throws CloneNotSupportedException
+ {
+ return getInstance();
+ }
+
+ public static Marshaller getInstance()
+ {
+ return INSTANCE;
+ }
+
+ public void write(Object dataObject, OutputStream output) throws IOException
+ {
+ if (dataObject instanceof InvocationRequest)
+ dataObject = ((InvocationRequest)dataObject).getParameter();
+
+ if (dataObject instanceof OnewayInvocation)
+ dataObject = ((OnewayInvocation)dataObject).getParameters()[0];
+
+ if ((dataObject instanceof byte[]) == false)
+ throw new IllegalArgumentException("Not a byte array: " +
dataObject);
+
+ output.write((byte[])dataObject);
+ output.flush();
+ }
+ }
+
+ private static final class RMUnMarshaller implements UnMarshaller
+ {
+ private static final UnMarshaller INSTANCE = new RMUnMarshaller();
+
+ public UnMarshaller cloneUnMarshaller() throws CloneNotSupportedException
+ {
+ return getInstance();
+ }
+
+ public static UnMarshaller getInstance()
+ {
+ return INSTANCE;
+ }
+
+ public Object read(InputStream inputStream, Map metadata) throws IOException,
ClassNotFoundException
+ {
+ if (inputStream == null)
+ return RMMessageFactory.newMessage(null, new RMMetadata(metadata)); //
WSAddressing reply-to
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int count = -1;
+ count = inputStream.read(buffer);
+ while (count != -1)
+ {
+ baos.write(buffer, 0, count);
+ count = inputStream.read(buffer);
+ }
+ return RMMessageFactory.newMessage(baos.toByteArray(), new
RMMetadata(metadata));
+ }
+
+ public void setClassLoader(ClassLoader classloader)
+ {
+ // do nothing
+ }
+ }
+
+ private static final class RMChannelResponse
+ {
+ private final Throwable fault;
+ private final RMMessage result;
+
+ public RMChannelResponse(Throwable fault)
+ {
+ this(null, fault);
+ }
+
+ public RMChannelResponse(RMMessage result)
+ {
+ this(result, null);
+ }
+
+ private RMChannelResponse(RMMessage result, Throwable fault)
+ {
+ super();
+ this.result = result;
+ this.fault = fault;
+ }
+
+ public Throwable getFault()
+ {
+ return this.fault;
+ }
+
+ public RMMessage getResponse()
+ {
+ return this.result;
+ }
+ }
+
+ private static final class RMChannelRequest implements
Callable<RMChannelResponse>
+ {
+ private final RMMessage rmRequest;
+ private static final String JBOSSWS_SUBSYSTEM = "jbossws";
+
+ private RMChannelRequest(RMMessage rmRequest)
+ {
+ super();
+ this.rmRequest = rmRequest;
+ }
+
+ public RMChannelResponse call()
+ {
+ InvokerLocator locator = null;
+ try
+ {
+ locator = new
InvokerLocator((String)rmRequest.getMetadata().getContext(INVOCATION_CONTEXT).get(TARGET_ADDRESS));
+ }
+ catch (MalformedURLException e)
+ {
+ return new RMChannelResponse(new IllegalArgumentException("Malformed
endpoint address", e));
+ }
+
+ try
+ {
+ Client client = new Client(locator, JBOSSWS_SUBSYSTEM,
rmRequest.getMetadata().getContext(REMOTING_CONFIGURATION_CONTEXT));
+ client.connect();
+
+ client.setMarshaller(RMMarshaller.getInstance());
+
+ boolean oneWay =
(Boolean)rmRequest.getMetadata().getContext(RMConstant.INVOCATION_CONTEXT).get(ONE_WAY_OPERATION);
+ if (!oneWay)
+ client.setUnMarshaller(RMUnMarshaller.getInstance());
+
+ //if (log.isDebugEnabled())
+ // log.debug("Remoting metadata: " + rmRequest.getMetadata());
+ //System.out.println("Remoting metadata: " +
rmRequest.getMetadata());
+
+ // debug the outgoing message
+ //MessageTrace.traceMessage("Outgoing Request Message",
reqMessage);
+ //System.out.println("Outgoing Request Message" + new
String(rmRequest.getPayload()));
+
+ RMMessage resMessage = null;
+ Map<String, Object> remotingInvocationContext =
rmRequest.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
+ if (oneWay)
+ {
+ client.invokeOneway(rmRequest.getPayload(), remotingInvocationContext,
false);
+ }
+ else
+ {
+ resMessage = (RMMessage)client.invoke(rmRequest.getPayload(),
remotingInvocationContext);
+ }
+
+ // Disconnect the remoting client
+ client.disconnect();
+
+ // trace the incomming response message
+ //MessageTrace.traceMessage("Incoming Response Message",
resMessage);
+ //System.out.println("Incoming Response Message" + new
String(resMessage.getPayload()));
+
+ return new RMChannelResponse(resMessage);
+ }
+ catch (Throwable t)
+ {
+ return new RMChannelResponse(t);
+ }
+ }
+ }
+
+ private RMMessage createRMMessage(MessageAbstraction request, RMMetadata rmMetadata)
throws Throwable
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ Marshaller marshaller =
(Marshaller)rmMetadata.getContext(SERIALIZATION_CONTEXT).get(MARSHALLER);
+ // we have to serialize message before putting it to the rm pool
+ // * contextClassloader not serializable issue
+ // * DOMUtil threadlocal issue (if message is de/serialized in separate thread)
+ marshaller.write(request, baos);
+ RMMessage rmMessage = RMMessageFactory.newMessage(baos.toByteArray(), rmMetadata);
+ return rmMessage;
+ }
+
+ private MessageAbstraction createResponse(RMMessage rmResponse, RMMetadata rmMetadata)
throws Throwable
+ {
+ Map<String, Object> invocationContext =
rmMetadata.getContext(INVOCATION_CONTEXT);
+ boolean oneWay =
(Boolean)rmMetadata.getContext(INVOCATION_CONTEXT).get(ONE_WAY_OPERATION);
+ MessageAbstraction response = null;
+ if (!oneWay)
+ {
+ byte[] payload = rmResponse.getPayload();
+ InputStream is = payload == null ? null : new
ByteArrayInputStream(rmResponse.getPayload());
+ // we have to deserialize message after pick up from the rm pool
+ // * contextClassloader not serializable issue
+ // * DOMUtil threadlocal issue (if message is de/serialized in separate
thread)
+ UnMarshaller unmarshaller =
(UnMarshaller)rmMetadata.getContext(SERIALIZATION_CONTEXT).get(UNMARSHALLER);
+ response = (MessageAbstraction)unmarshaller.read(is,
rmResponse.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT));
+ }
+ invocationContext.clear();
+ invocationContext.putAll(rmMetadata.getContext(REMOTING_INVOCATION_CONTEXT));
+ return response;
+ }
+
+ public MessageAbstraction send(MessageAbstraction request, RMMetadata rmMetadata)
throws Throwable
+ {
+ RMMessage rmRequest = createRMMessage(request, rmMetadata);
+ RMMessage rmResponse = sendToChannel(rmRequest);
+ return createResponse(rmResponse, rmMetadata);
+ }
+
+ private RMMessage sendToChannel(RMMessage request) throws Throwable
+ {
+ RMChannelResponse result = rmChannelPool.submit(new
RMChannelRequest(request)).get();
+
+ Throwable fault = result.getFault();
+ if (fault != null)
+ {
+ throw fault;
+ }
+ else
+ {
+ return result.getResponse();
+ }
+ }
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,19 @@
+package org.jboss.ws.extensions.wsrm;
+
+final class RMConstant
+{
+ private static final String PREFIX = RMConstant.class.getName();
+ static final String TARGET_ADDRESS = PREFIX + ".targetAddress";
+ static final String ONE_WAY_OPERATION = PREFIX + ".oneWayOperation";
+ static final String INVOCATION_CONTEXT = PREFIX + ".invocationContext";
+ static final String MARSHALLER = PREFIX + ".marshaller";
+ static final String UNMARSHALLER = PREFIX + ".unmarshaller";
+ static final String SERIALIZATION_CONTEXT = PREFIX +
".serializationContext";
+ static final String REMOTING_INVOCATION_CONTEXT = PREFIX +
".remotingInvocationContext";
+ static final String REMOTING_CONFIGURATION_CONTEXT = PREFIX +
".remotingConfigurationContext";
+
+ private RMConstant()
+ {
+ // no instances
+ }
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMHelper.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMHelper.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMHelper.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,17 @@
+package org.jboss.ws.extensions.wsrm;
+
+import org.jboss.ws.core.MessageAbstraction;
+
+public final class RMHelper
+{
+ private RMHelper()
+ {
+ // no instances
+ }
+
+ public static boolean isRMMessage(MessageAbstraction requestMessage)
+ {
+ // TODO: here is the most suitable place to start RM resender
+ return (new java.io.File("/home/ropalka/rm.enabled").exists()) &&
(requestMessage != null);
+ }
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMHelper.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessage.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessage.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessage.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,7 @@
+package org.jboss.ws.extensions.wsrm;
+
+public interface RMMessage
+{
+ byte[] getPayload();
+ RMMetadata getMetadata();
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessage.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageFactory.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageFactory.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageFactory.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,14 @@
+package org.jboss.ws.extensions.wsrm;
+
+public class RMMessageFactory
+{
+ private RMMessageFactory()
+ {
+ // forbidden inheritance
+ }
+
+ public static RMMessage newMessage(byte[] payload, RMMetadata rmMetadata)
+ {
+ return new RMMessageImpl(payload, rmMetadata);
+ }
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageImpl.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageImpl.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageImpl.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,28 @@
+package org.jboss.ws.extensions.wsrm;
+
+/**
+ * RM message object
+ * @author richard.opalka(a)jboss.com
+ */
+public class RMMessageImpl implements RMMessage
+{
+ private final byte[] payload;
+ private final RMMetadata rmMetadata;
+
+ public RMMessageImpl(byte[] payload, RMMetadata rmMetadata)
+ {
+ super();
+ this.payload = payload;
+ this.rmMetadata = rmMetadata;
+ }
+
+ public byte[] getPayload()
+ {
+ return this.payload;
+ }
+
+ public RMMetadata getMetadata()
+ {
+ return this.rmMetadata;
+ }
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMessageImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMetadata.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMetadata.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMetadata.java 2007-08-29
13:15:16 UTC (rev 4503)
@@ -0,0 +1,64 @@
+package org.jboss.ws.extensions.wsrm;
+
+import java.util.Map;
+import java.util.HashMap;
+import org.jboss.remoting.marshal.Marshaller;
+import org.jboss.remoting.marshal.UnMarshaller;
+
+public final class RMMetadata
+{
+ private Map<String, Map<String, Object>> contexts = new HashMap<String,
Map<String, Object>>();
+
+ public RMMetadata(
+ String targetAddress,
+ boolean oneWay,
+ Marshaller marshaller,
+ UnMarshaller unmarshaller,
+ Map<String, Object> invocationContext,
+ Map<String, Object> remotingInvocationContext,
+ Map<String, Object> remotingConfigurationContext)
+ {
+ if (targetAddress == null)
+ throw new IllegalArgumentException("Target address cannot be null");
+
+ invocationContext.put(RMConstant.TARGET_ADDRESS, targetAddress);
+ invocationContext.put(RMConstant.ONE_WAY_OPERATION, oneWay);
+ setContext(RMConstant.INVOCATION_CONTEXT, invocationContext);
+
+ if (marshaller == null || unmarshaller == null)
+ throw new IllegalArgumentException("Unable to create de/serialization
context");
+
+ Map<String, Object> serializationContext = new HashMap<String,
Object>();
+ serializationContext.put(RMConstant.MARSHALLER, marshaller);
+ serializationContext.put(RMConstant.UNMARSHALLER, unmarshaller);
+ setContext(RMConstant.SERIALIZATION_CONTEXT, serializationContext);
+
+ if (remotingInvocationContext == null)
+ throw new IllegalArgumentException("Remoting invocation context cannot be
null");
+
+ setContext(RMConstant.REMOTING_INVOCATION_CONTEXT, remotingInvocationContext);
+
+ if (remotingConfigurationContext == null)
+ throw new IllegalArgumentException("Remoting configuraton context cannot be
null");
+
+ setContext(RMConstant.REMOTING_CONFIGURATION_CONTEXT,
remotingConfigurationContext);
+ }
+
+ public RMMetadata(Map<String, Object> remotingInvocationContext)
+ {
+ if (remotingInvocationContext == null)
+ throw new IllegalArgumentException("Remoting invocation context cannot be
null");
+
+ setContext(RMConstant.REMOTING_INVOCATION_CONTEXT, remotingInvocationContext);
+ }
+
+ void setContext(String key, Map<String, Object> ctx)
+ {
+ this.contexts.put(key, ctx);
+ }
+
+ Map<String, Object> getContext(String key)
+ {
+ return this.contexts.get(key);
+ }
+}
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMetadata.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF