Author: richard.opalka(a)jboss.com
Date: 2007-12-05 10:43:57 -0500 (Wed, 05 Dec 2007)
New Revision: 5193
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
Removed:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java
Log:
refactoring - all WS-RM messages will go through RMChannelManager
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java 2007-12-05 15:25:26 UTC (rev 5192)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java 2007-12-05 15:43:57 UTC (rev 5193)
@@ -25,7 +25,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -66,6 +65,7 @@
public final class RMSequenceImpl implements RMSequence, RMUnassignedMessageListener
{
private static final Logger logger = Logger.getLogger(RMSequenceImpl.class);
+ private static final String PATH_PREFIX = "/temporary_listen_address/";
private static final RMConstants wsrmConstants = RMProvider.get().getConstants();
private final RMConfig wsrmConfig;
@@ -111,7 +111,7 @@
logger.debug("Backports server configuration omits host configuration -
using autodetected " + host);
}
String port = wsrmConfig.getBackPortsServer().getPort();
- String path = "/temporary_listen_address/" +
UUIDGenerator.generateRandomUUIDString();
+ String path = PATH_PREFIX + UUIDGenerator.generateRandomUUIDString();
this.backPort = new URI("http://" + host + ":" + port +
path);
}
catch (URISyntaxException use)
@@ -128,7 +128,7 @@
public final Set<Long> getReceivedInboundMessages()
{
- return Collections.unmodifiableSet(this.receivedInboundMessages);
+ return this.receivedInboundMessages;
}
public final BindingProvider getBindingProvider()
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java 2007-12-05 15:25:26 UTC (rev 5192)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java 2007-12-05 15:43:57 UTC (rev 5193)
@@ -32,10 +32,6 @@
import java.io.ByteArrayOutputStream;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* RM Channel
@@ -56,30 +52,8 @@
}
// Holds the list of tasks that will be send to the remoting transport channel
- private static final ExecutorService rmChannelPool = Executors.newFixedThreadPool(5,
new RMThreadFactory());
+ private static final RMChannelManager rmChannelManager =
RMChannelManagerImpl.getInstance();
- private static final class RMThreadFactory implements ThreadFactory
- {
- final ThreadGroup group;
- final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix = "rm-pool-thread-";
-
- private RMThreadFactory()
- {
- SecurityManager sm = System.getSecurityManager();
- group = (sm != null) ? sm.getThreadGroup() :
Thread.currentThread().getThreadGroup();
- }
-
- public Thread newThread(Runnable r)
- {
- Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),
0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- }
private RMMessage createRMMessage(MessageAbstraction request, RMMetadata rmMetadata)
throws Throwable
{
@@ -122,16 +96,7 @@
private RMMessage sendToChannel(RMMessage request) throws Throwable
{
- RMChannelResponse result = rmChannelPool.submit(new
RMChannelRequest(request)).get();
-
- Throwable fault = result.getFault();
- if (fault != null)
- {
- throw fault;
- }
- else
- {
- return result.getResponse();
- }
+ return rmChannelManager.send(request);
}
+
}
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java (rev 0)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java 2007-12-05 15:43:57 UTC (rev 5193)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.transport;
+
+/**
+ * WS-RM channel manager interface
+ *
+ * @author richard.opalka(a)jboss.com
+ *
+ * @since Dec 5, 2007
+ */
+public interface RMChannelManager
+{
+
+ /**
+ * Send request to server reliably
+ * @param request RM request wrapper
+ * @return RM response wrapper
+ */
+ RMMessage send(RMMessage request) throws Throwable;
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManager.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java (rev 0)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java 2007-12-05 15:43:57 UTC (rev 5193)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.transport;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+
+/**
+ * WS-RM channel manager ensures message reliable delivery according to sequence
configuration
+ *
+ * @author richard.opalka(a)jboss.com
+ *
+ * @since Dec 5, 2007
+ */
+public final class RMChannelManagerImpl implements Runnable, RMChannelManager
+{
+
+ private static final Logger logger = Logger.getLogger(RMChannelManagerImpl.class);
+ private static RMChannelManager instance = new RMChannelManagerImpl();
+ private static final ExecutorService rmChannelPool = Executors.newFixedThreadPool(5,
new RMThreadFactory());
+
+ private static final class RMThreadFactory implements ThreadFactory
+ {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix = "rm-pool-thread-";
+
+ private RMThreadFactory()
+ {
+ SecurityManager sm = System.getSecurityManager();
+ group = (sm != null) ? sm.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+ }
+
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(),
0);
+ if (t.isDaemon())
+ t.setDaemon(false);
+ if (t.getPriority() != Thread.NORM_PRIORITY)
+ t.setPriority(Thread.NORM_PRIORITY);
+ return t;
+ }
+ }
+
+ private RMChannelManagerImpl()
+ {
+ Thread thread = new Thread(this, "RMChannelManager");
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public static final RMChannelManager getInstance()
+ {
+ return instance;
+ }
+
+ public final void run()
+ {
+ while (true)
+ {
+ logger.debug("checking persistent store for undelivered messages");
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ie)
+ {
+ logger.warn(ie.getMessage(), ie);
+ }
+ }
+
+ }
+
+ public final RMMessage send(RMMessage request) throws Throwable
+ {
+ RMChannelResponse result = rmChannelPool.submit(new RMChannelTask(request)).get();
+
+ Throwable fault = result.getFault();
+ if (fault != null)
+ {
+ throw fault;
+ }
+ else
+ {
+ return result.getResponse();
+ }
+ }
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelManagerImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java 2007-12-05 15:25:26 UTC (rev 5192)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java 2007-12-05 15:43:57 UTC (rev 5193)
@@ -1,143 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.ws.extensions.wsrm.transport;
-
-import static org.jboss.ws.extensions.wsrm.RMConstant.*;
-
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.jboss.logging.Logger;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.ws.core.MessageTrace;
-import org.jboss.ws.extensions.wsrm.RMSequenceImpl;
-import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandler;
-import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandlerFactory;
-
-/**
- * Represents request that goes to the RM channel
- * @see org.jboss.ws.extensions.wsrm.transport.RMChannel
- * @author richard.opalka(a)jboss.com
- */
-public final class RMChannelRequest implements Callable<RMChannelResponse>
-{
- private static final Logger logger = Logger.getLogger(RMChannelRequest.class);
- private static final String JBOSSWS_SUBSYSTEM = "jbossws";
- private final RMMessage rmRequest;
-
- RMChannelRequest(RMMessage rmRequest)
- {
- super();
- this.rmRequest = rmRequest;
- }
-
- public RMChannelResponse call()
- {
- InvokerLocator locator = null;
- try
- {
- locator = new
InvokerLocator((String)rmRequest.getMetadata().getContext(INVOCATION_CONTEXT).get(TARGET_ADDRESS));
- }
- catch (MalformedURLException e)
- {
- return new RMChannelResponse(new IllegalArgumentException("Malformed
endpoint address", e));
- }
-
- try
- {
- URI backPort = RMTransportHelper.getBackPortURI(rmRequest);
- String messageId = RMTransportHelper.getMessageId(rmRequest);
-
- logger.debug("[WS-RM] backport URI is: " + backPort);
- RMCallbackHandler callbackHandler = null;
- // TODO: we should remember WSA:MessageId here too
-
- if (backPort != null)
- {
- callbackHandler = RMCallbackHandlerFactory.getCallbackHandler(backPort);
- RMSequenceImpl sequence = RMTransportHelper.getSequence(rmRequest);
- if (sequence != null)
- {
- callbackHandler.addUnassignedMessageListener(sequence);
- }
- }
- boolean oneWay = RMTransportHelper.isOneWayOperation(rmRequest);
-
- Client client = new Client(locator, JBOSSWS_SUBSYSTEM,
rmRequest.getMetadata().getContext(REMOTING_CONFIGURATION_CONTEXT));
- client.connect();
-
- client.setMarshaller(RMMarshaller.getInstance());
-
- if ((false == oneWay) && (null == backPort))
- client.setUnMarshaller(RMUnMarshaller.getInstance());
-
- Map<String, Object> remotingInvocationContext =
rmRequest.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
- if (logger.isDebugEnabled())
- logger.debug("Remoting metadata: " + remotingInvocationContext);
-
- // debug the outgoing request message
- MessageTrace.traceMessage("Outgoing RM Request Message",
rmRequest.getPayload());
-
- RMMessage rmResponse = null;
- if (oneWay && (null == backPort))
- {
- client.invokeOneway(rmRequest.getPayload(), remotingInvocationContext,
false);
- }
- else
- {
- Object retVal = client.invoke(rmRequest.getPayload(),
remotingInvocationContext);
- if ((null != retVal) && (false == (retVal instanceof RMMessage)))
- {
- String msg = retVal.getClass().getName() + ": '" + retVal +
"'";
- logger.warn(msg);
- throw new RuntimeException(msg);
- }
- rmResponse = (RMMessage)retVal;
- }
-
- // Disconnect the remoting client
- client.disconnect();
-
- // trace the incomming response message
- if (rmResponse != null)
- MessageTrace.traceMessage("Incoming RM Response Message",
rmResponse.getPayload());
-
- if (backPort != null) // TODO: backport support
- {
- if ((null != messageId) && (false ==
RMTransportHelper.isOneWayOperation(rmRequest)))
- {
- // register callbacks only for outbound messages with messageId
- return new RMChannelResponse(callbackHandler, messageId);
- }
- }
-
- return new RMChannelResponse(rmResponse);
- }
- catch (Throwable t)
- {
- return new RMChannelResponse(t);
- }
- }
-}
\ No newline at end of file
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java (rev 0)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java 2007-12-05 15:43:57 UTC (rev 5193)
@@ -0,0 +1,143 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.transport;
+
+import static org.jboss.ws.extensions.wsrm.RMConstant.*;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.jboss.logging.Logger;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.ws.core.MessageTrace;
+import org.jboss.ws.extensions.wsrm.RMSequenceImpl;
+import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandler;
+import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandlerFactory;
+
+/**
+ * Represents request that goes to the RM channel
+ * @see org.jboss.ws.extensions.wsrm.transport.RMChannel
+ * @author richard.opalka(a)jboss.com
+ */
+public final class RMChannelTask implements Callable<RMChannelResponse>
+{
+ private static final Logger logger = Logger.getLogger(RMChannelTask.class);
+ private static final String JBOSSWS_SUBSYSTEM = "jbossws";
+ private final RMMessage rmRequest;
+
+ RMChannelTask(RMMessage rmRequest)
+ {
+ super();
+ this.rmRequest = rmRequest;
+ }
+
+ public RMChannelResponse call()
+ {
+ InvokerLocator locator = null;
+ try
+ {
+ locator = new
InvokerLocator((String)rmRequest.getMetadata().getContext(INVOCATION_CONTEXT).get(TARGET_ADDRESS));
+ }
+ catch (MalformedURLException e)
+ {
+ return new RMChannelResponse(new IllegalArgumentException("Malformed
endpoint address", e));
+ }
+
+ try
+ {
+ URI backPort = RMTransportHelper.getBackPortURI(rmRequest);
+ String messageId = RMTransportHelper.getMessageId(rmRequest);
+
+ logger.debug("[WS-RM] backport URI is: " + backPort);
+ RMCallbackHandler callbackHandler = null;
+ // TODO: we should remember WSA:MessageId here too
+
+ if (backPort != null)
+ {
+ callbackHandler = RMCallbackHandlerFactory.getCallbackHandler(backPort);
+ RMSequenceImpl sequence = RMTransportHelper.getSequence(rmRequest);
+ if (sequence != null)
+ {
+ callbackHandler.addUnassignedMessageListener(sequence);
+ }
+ }
+ boolean oneWay = RMTransportHelper.isOneWayOperation(rmRequest);
+
+ Client client = new Client(locator, JBOSSWS_SUBSYSTEM,
rmRequest.getMetadata().getContext(REMOTING_CONFIGURATION_CONTEXT));
+ client.connect();
+
+ client.setMarshaller(RMMarshaller.getInstance());
+
+ if ((false == oneWay) && (null == backPort))
+ client.setUnMarshaller(RMUnMarshaller.getInstance());
+
+ Map<String, Object> remotingInvocationContext =
rmRequest.getMetadata().getContext(REMOTING_INVOCATION_CONTEXT);
+ if (logger.isDebugEnabled())
+ logger.debug("Remoting metadata: " + remotingInvocationContext);
+
+ // debug the outgoing request message
+ MessageTrace.traceMessage("Outgoing RM Request Message",
rmRequest.getPayload());
+
+ RMMessage rmResponse = null;
+ if (oneWay && (null == backPort))
+ {
+ client.invokeOneway(rmRequest.getPayload(), remotingInvocationContext,
false);
+ }
+ else
+ {
+ Object retVal = client.invoke(rmRequest.getPayload(),
remotingInvocationContext);
+ if ((null != retVal) && (false == (retVal instanceof RMMessage)))
+ {
+ String msg = retVal.getClass().getName() + ": '" + retVal +
"'";
+ logger.warn(msg);
+ throw new RuntimeException(msg);
+ }
+ rmResponse = (RMMessage)retVal;
+ }
+
+ // Disconnect the remoting client
+ client.disconnect();
+
+ // trace the incomming response message
+ if (rmResponse != null)
+ MessageTrace.traceMessage("Incoming RM Response Message",
rmResponse.getPayload());
+
+ if (backPort != null) // TODO: backport support
+ {
+ if ((null != messageId) && (false ==
RMTransportHelper.isOneWayOperation(rmRequest)))
+ {
+ // register callbacks only for outbound messages with messageId
+ return new RMChannelResponse(callbackHandler, messageId);
+ }
+ }
+
+ return new RMChannelResponse(rmResponse);
+ }
+ catch (Throwable t)
+ {
+ return new RMChannelResponse(t);
+ }
+ }
+}
\ No newline at end of file
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF