Author: richard.opalka(a)jboss.com
Date: 2007-08-29 12:29:55 -0400 (Wed, 29 Aug 2007)
New Revision: 4505
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelRequest.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelResponse.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMarshaller.java
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMUnMarshaller.java
Modified:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java
Log:
little refactoring
Modified:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java 2007-08-29
14:15:04 UTC (rev 4504)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannel.java 2007-08-29
16:29:55 UTC (rev 4505)
@@ -1,3 +1,24 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.ws.extensions.wsrm;
import static org.jboss.ws.extensions.wsrm.RMConstant.*;
@@ -2,30 +23,18 @@
-import org.jboss.remoting.Client;
-import org.jboss.remoting.InvocationRequest;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.invocation.OnewayInvocation;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
-import org.jboss.ws.core.HTTPMessageImpl;
import org.jboss.ws.core.MessageAbstraction;
-import org.jboss.ws.core.MessageTrace;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.ByteArrayOutputStream;
-import java.net.MalformedURLException;
+
import java.util.Map;
-import java.util.HashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * RM Channel Singleton
+ * RM Channel
* @author richard.opalka(a)jboss.com
@@ -38,7 +47,6 @@
private RMChannel()
{
- super();
// forbidden inheritance
}
@@ -47,13 +55,14 @@
return INSTANCE;
}
+ // Holds the list of tasks that will be sent to the remoting transport channel
private static final ExecutorService rmChannelPool = Executors.newFixedThreadPool(5,
new RMThreadFactory());
-
+
private static final class RMThreadFactory implements ThreadFactory
{
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix = "rm-channel-pool-thread-";
+ final String namePrefix = "rm-pool-thread-";
private RMThreadFactory()
{
@@ -71,176 +80,7 @@
return t;
}
}
-
- private static final class RMMarshaller implements Marshaller
- {
- private static final Marshaller INSTANCE = new RMMarshaller();
-
- public Marshaller cloneMarshaller() throws CloneNotSupportedException
- {
- return getInstance();
- }
-
- public static Marshaller getInstance()
- {
- return INSTANCE;
- }
-
- public void write(Object dataObject, OutputStream output) throws IOException
- {
- if (dataObject instanceof InvocationRequest)
- dataObject = ((InvocationRequest)dataObject).getParameter();
- if (dataObject instanceof OnewayInvocation)
- dataObject = ((OnewayInvocation)dataObject).getParameters()[0];
-
- if ((dataObject instanceof byte[]) == false)
- throw new IllegalArgumentException("Not a byte array: " +
dataObject);
-
- output.write((byte[])dataObject);
- output.flush();
- }
- }
-
- private static final class RMUnMarshaller implements UnMarshaller
- {
- private static final UnMarshaller INSTANCE = new RMUnMarshaller();
-
- public UnMarshaller cloneUnMarshaller() throws CloneNotSupportedException
- {
- return getInstance();
- }
-
- public static UnMarshaller getInstance()
- {
- return INSTANCE;
- }
-
- public Object read(InputStream inputStream, Map metadata) throws IOException,
ClassNotFoundException
- {
- if (inputStream == null)
- return RMMessageFactory.newMessage(null, new RMMetadata(metadata)); //
WSAddressing reply-to
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int count = -1;
- count = inputStream.read(buffer);
- while (count != -1)
- {
- baos.write(buffer, 0, count);
- count = inputStream.read(buffer);
- }
- return RMMessageFactory.newMessage(baos.toByteArray(), new
RMMetadata(metadata));
- }
-
- public void setClassLoader(ClassLoader classloader)
- {
- // do nothing
- }
- }
-
- private static final class RMChannelResponse
- {
- private final Throwable fault;
- private final RMMessage result;
-
- public RMChannelResponse(Throwable fault)
- {
- this(null, fault);
- }
-
- public RMChannelResponse(RMMessage result)
- {
- this(result, null);
- }
-
- private RMChannelResponse(RMMessage result, Throwable fault)
- {
- super();
- this.result = result;
- this.fault = fault;
- }
-
- public Throwable getFault()
- {
- return this.fault;
- }
-
- public RMMessage getResponse()
- {
- return this.result;
- }
- }
-
- private static final class RMChannelRequest implements
Callable<RMChannelResponse>
- {
- private final RMMessage rmRequest;
- private static final String JBOSSWS_SUBSYSTEM = "jbossws";
-
- private RMChannelRequest(RMMessage rmRequest)
- {
- super();
- this.rmRequest = rmRequest;
- }
-
- public RMChannelResponse call()
- {
- InvokerLocator locator = null;
- try
- {
- locator = new
InvokerLocator((String)rmRequest.getMetadata().getContext(INVOCATION_CONTEXT).get(TARGET_ADDRESS));
- }
- catch (MalformedURLException e)
- {
- return new RMChannelResponse(new IllegalArgumentException("Malformed
endpoint address", e));
- }
-
- try
- {
- Client client = new Client(locator, JBOSSWS_SUBSYSTEM,
rmRequest.getMetadata().getContext(REMOTING_CONFIGURATION_CONTEXT));
- client.connect();
-
- client.setMarshaller(RMMarshaller.getInstance());
-
- boolean oneWay =
(Boolean)rmRequest.getMetadata().getContext(RMConstant.INVOCATION_CONTEXT).get(ONE_WAY_OPERATION);
- if (!oneWay)
- client.setUnMarshaller(RMUnMarshaller.getInstance());
-
- //if (log.isDebugEnabled())
- // log.debug("Remoting metadata: " + rmRequest.getMetadata());
- //System.out.println("Remoting metadata: " +
rmRequest.getMetadata());
-
- // debug the outgoing message
- //MessageTrace.traceMessage("Outgoing Request Message",
reqMessage);
- //System.out.println("Outgoing Request Message" + new
String(rmRequest.getPayload()));
-
- RMMessage resMessage = null;
- Map<String, Object> remotingInvocationContext =
rmRequest.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
- if (oneWay)
- {
- client.invokeOneway(rmRequest.getPayload(), remotingInvocationContext,
false);
- }
- else
- {
- resMessage = (RMMessage)client.invoke(rmRequest.getPayload(),
remotingInvocationContext);
- }
-
- // Disconnect the remoting client
- client.disconnect();
-
- // trace the incomming response message
- //MessageTrace.traceMessage("Incoming Response Message",
resMessage);
- //System.out.println("Incoming Response Message" + new
String(resMessage.getPayload()));
-
- return new RMChannelResponse(resMessage);
- }
- catch (Throwable t)
- {
- return new RMChannelResponse(t);
- }
- }
- }
-
private RMMessage createRMMessage(MessageAbstraction request, RMMetadata rmMetadata)
throws Throwable
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelRequest.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelRequest.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelRequest.java 2007-08-29
16:29:55 UTC (rev 4505)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm;
+
+import static org.jboss.ws.extensions.wsrm.RMConstant.INVOCATION_CONTEXT;
+import static org.jboss.ws.extensions.wsrm.RMConstant.ONE_WAY_OPERATION;
+import static org.jboss.ws.extensions.wsrm.RMConstant.REMOTING_CONFIGURATION_CONTEXT;
+import static org.jboss.ws.extensions.wsrm.RMConstant.REMOTING_INVOCATION_CONTEXT;
+import static org.jboss.ws.extensions.wsrm.RMConstant.TARGET_ADDRESS;
+
+import java.net.MalformedURLException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.ws.core.MessageTrace;
+
+/**
+ * Represents request that goes to the RM channel
+ * @see org.jboss.ws.extensions.wsrm.RMChannel
+ * @author richard.opalka(a)jboss.com
+ */
+final class RMChannelRequest implements Callable<RMChannelResponse>
+{
+   private static final Logger log = Logger.getLogger(RMChannelRequest.class);
+   private static final String JBOSSWS_SUBSYSTEM = "jbossws";
+   private final RMMessage rmRequest;
+
+   RMChannelRequest(RMMessage rmRequest)
+   {
+      this.rmRequest = rmRequest;
+   }
+
+   /** Sends the request over JBoss Remoting; failures are returned as the response fault instead of being thrown. */
+   public RMChannelResponse call()
+   {
+      InvokerLocator locator = null;
+      try
+      {
+         locator = new InvokerLocator((String)rmRequest.getMetadata().getContext(INVOCATION_CONTEXT).get(TARGET_ADDRESS));
+      }
+      catch (MalformedURLException e)
+      {
+         return new RMChannelResponse(new IllegalArgumentException("Malformed endpoint address", e));
+      }
+
+      Client client = null;
+      try
+      {
+         client = new Client(locator, JBOSSWS_SUBSYSTEM, rmRequest.getMetadata().getContext(REMOTING_CONFIGURATION_CONTEXT));
+         client.connect();
+         client.setMarshaller(RMMarshaller.getInstance());
+
+         boolean oneWay = (Boolean)rmRequest.getMetadata().getContext(RMConstant.INVOCATION_CONTEXT).get(ONE_WAY_OPERATION);
+         if (!oneWay)
+            client.setUnMarshaller(RMUnMarshaller.getInstance());
+
+         Map<String, Object> remotingInvocationContext = rmRequest.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
+         if (log.isDebugEnabled())
+            log.debug("Remoting metadata: " + remotingInvocationContext);
+
+         // trace the outgoing request message
+         MessageTrace.traceMessage("Outgoing RM Request Message", rmRequest.getPayload());
+
+         RMMessage rmResponse = null;
+         if (oneWay)
+            client.invokeOneway(rmRequest.getPayload(), remotingInvocationContext, false);
+         else
+            rmResponse = (RMMessage)client.invoke(rmRequest.getPayload(), remotingInvocationContext);
+
+         // one-way invocations produce no response, so there is nothing to trace
+         if (rmResponse != null)
+            MessageTrace.traceMessage("Incoming RM Response Message", rmResponse.getPayload());
+
+         return new RMChannelResponse(rmResponse);
+      }
+      catch (Throwable t)
+      {
+         return new RMChannelResponse(t);
+      }
+      finally
+      {
+         // always release the connection, even when the invocation failed
+         if (client != null)
+            client.disconnect();
+      }
+   }
+}
\ No newline at end of file
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelRequest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelResponse.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelResponse.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelResponse.java 2007-08-29
16:29:55 UTC (rev 4505)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm;
+
+/**
+ * Represents response that goes from the RM channel
+ * @see org.jboss.ws.extensions.wsrm.RMChannel
+ * @author richard.opalka(a)jboss.com
+ */
+final class RMChannelResponse
+{
+   // each constructor sets one field and leaves the other null
+   private final RMMessage result;
+   private final Throwable fault;
+
+   /** Creates a response that carries a fault and no result message. */
+   public RMChannelResponse(Throwable fault)
+   {
+      this.result = null;
+      this.fault = fault;
+   }
+
+   /** Creates a response that carries a result message and no fault. */
+   public RMChannelResponse(RMMessage result)
+   {
+      this.result = result;
+      this.fault = null;
+   }
+
+   /** Returns the fault given at construction, or null when built from a result message. */
+   public Throwable getFault()
+   {
+      return fault;
+   }
+
+   /** Returns the result message given at construction, or null when built from a fault. */
+   public RMMessage getResponse()
+   {
+      return result;
+   }
+}
\ No newline at end of file
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMChannelResponse.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMarshaller.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMarshaller.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMarshaller.java 2007-08-29
16:29:55 UTC (rev 4505)
@@ -0,0 +1,64 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.invocation.OnewayInvocation;
+import org.jboss.remoting.marshal.Marshaller;
+
+/**
+ * Marshalls byte array to the output stream
+ * @author richard.opalka(a)jboss.com
+ */
+final class RMMarshaller implements Marshaller
+{
+   private static final Marshaller INSTANCE = new RMMarshaller();
+   /** Returns the shared marshaller instance. */
+   public static Marshaller getInstance()
+   {
+      return INSTANCE;
+   }
+
+   /** Returns the shared instance rather than a copy. */
+   public Marshaller cloneMarshaller() throws CloneNotSupportedException
+   {
+      return INSTANCE;
+   }
+
+   /** Unwraps remoting invocation wrappers, then writes the byte array payload and flushes the stream. */
+   public void write(Object dataObject, OutputStream output) throws IOException
+   {
+      Object payload = dataObject;
+      if (payload instanceof InvocationRequest)
+         payload = ((InvocationRequest)payload).getParameter();
+      if (payload instanceof OnewayInvocation)
+         payload = ((OnewayInvocation)payload).getParameters()[0];
+      if (!(payload instanceof byte[]))
+         throw new IllegalArgumentException("Not a byte array: " + payload);
+      output.write((byte[])payload);
+      output.flush();
+   }
+}
+
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMMarshaller.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMUnMarshaller.java
===================================================================
---
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMUnMarshaller.java
(rev 0)
+++
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMUnMarshaller.java 2007-08-29
16:29:55 UTC (rev 4505)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.jboss.remoting.marshal.UnMarshaller;
+
+/**
+ * Unmarshalls byte array from the input stream
+ * @author richard.opalka(a)jboss.com
+ */
+final class RMUnMarshaller implements UnMarshaller
+{
+   private static final UnMarshaller INSTANCE = new RMUnMarshaller();
+
+   /** Returns the shared unmarshaller instance. */
+   public static UnMarshaller getInstance()
+   {
+      return INSTANCE;
+   }
+
+   /** Returns the shared instance rather than a copy. */
+   public UnMarshaller cloneUnMarshaller() throws CloneNotSupportedException
+   {
+      return INSTANCE;
+   }
+
+   /** Reads the stream to its end and passes the bytes plus the metadata to RMMessageFactory.newMessage. */
+   public Object read(InputStream is, Map metadata) throws IOException, ClassNotFoundException
+   {
+      // TODO: investigate why is == null (WSAddressing reply-to test)
+      if (is == null)
+         return RMMessageFactory.newMessage(null, new RMMetadata(metadata));
+
+      ByteArrayOutputStream payload = new ByteArrayOutputStream();
+      byte[] chunk = new byte[1024];
+      for (int read = is.read(chunk); read != -1; read = is.read(chunk))
+         payload.write(chunk, 0, read);
+      return RMMessageFactory.newMessage(payload.toByteArray(), new RMMetadata(metadata));
+   }
+
+   /** Ignores the supplied class loader. */
+   public void setClassLoader(ClassLoader classloader)
+   {
+      // do nothing
+   }
+}
+
Property changes on:
stack/native/branches/ropalka/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMUnMarshaller.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF