Author: richard.opalka(a)jboss.com
Date: 2007-12-04 08:30:00 -0500 (Tue, 04 Dec 2007)
New Revision: 5168
Removed:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java
Log:
proxy instance is not thread safe - thus removing sequence manager thread
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java 2007-12-04
13:21:58 UTC (rev 5167)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java 2007-12-04
13:30:00 UTC (rev 5168)
@@ -188,7 +188,6 @@
synchronized (lock)
{
this.client = client;
- RMSequenceManager.getInstance().register(this);
}
}
@@ -239,7 +238,6 @@
{
this.client.setWSRMSequence(null);
this.discarded = true;
- RMSequenceManager.getInstance().unregister(this);
}
finally
{
@@ -306,7 +304,6 @@
requestContext.put(RMConstant.REQUEST_CONTEXT, rmRequestContext);
// call stub method
this.client.invoke(operationQName, new Object[] {},
client.getBindingProvider().getResponseContext());
- //RMSequenceManager.getInstance().unregister(this); // TODO: each sequence will
be unregistered by sequence manager
}
catch (Exception e)
{
@@ -316,23 +313,14 @@
public final void sendCloseMessage()
{
- /*
synchronized (lock)
{
while (this.isAckRequested())
{
- try
- {
- logger.debug("Waiting till all inbound sequence acknowledgements will
be sent");
- lock.wait(100);
- }
- catch (InterruptedException ie)
- {
- logger.warn(ie.getMessage(), ie);
- }
+ logger.debug("Waiting till all inbound sequence acknowledgements will be
sent");
+ sendSequenceAcknowledgementMessage();
}
}
- */
Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, false);
this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
@@ -342,11 +330,14 @@
public final void sendTerminateMessage()
{
sendMessage(RMConstant.TERMINATE_SEQUENCE_WSA_ACTION,
wsrmConstants.getTerminateSequenceQName());
- RMSequenceManager.getInstance().unregister(this);
}
public final void sendSequenceAcknowledgementMessage()
{
+ Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
+ wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, true);
+ this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
+ ackRequested(false);
sendMessage(RMConstant.SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION,
wsrmConstants.getSequenceAcknowledgementQName());
}
Deleted:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java 2007-12-04
13:21:58 UTC (rev 5167)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java 2007-12-04
13:30:00 UTC (rev 5168)
@@ -1,142 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.ws.extensions.wsrm;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.jboss.logging.Logger;
-
-/**
- * TODO: Add comment
- *
- * @author richard.opalka(a)jboss.com
- *
- * @since Nov 26, 2007
- */
-public final class RMSequenceManager implements Runnable
-{
- private static final Logger logger = Logger.getLogger(RMSequenceManager.class);
-
- private static RMSequenceManager instance;
- private static final Lock classLock = new ReentrantLock();
- private final Object instanceLock = new Object();
- private final List<RMSequenceImpl> sequences = new
LinkedList<RMSequenceImpl>();
- private AtomicBoolean destroyed = new AtomicBoolean();
-
- private RMSequenceManager()
- {
- // hidden constructor
- }
-
- public static RMSequenceManager getInstance()
- {
- classLock.lock();
- try
- {
- if (instance == null)
- {
- instance = new RMSequenceManager();
- new Thread(instance, "RMSequenceManager").start();
- logger.debug("started");
- }
- return instance;
- }
- finally
- {
- classLock.unlock();
- }
- }
-
- public void register(RMSequenceImpl sequence)
- {
- synchronized (instanceLock)
- {
- this.sequences.add(sequence);
- }
- }
-
- public void unregister(RMSequenceImpl sequence)
- {
- synchronized (instanceLock)
- {
- this.sequences.remove(sequence);
- if (this.sequences.size() == 0)
- this.shutdown();
- }
- }
-
- public void run()
- {
- while (destroyed.get() == false)
- {
- synchronized (instanceLock)
- {
- for (RMSequenceImpl sequence : sequences)
- {
- logger.debug("Processing outbound sequence " +
sequence.getOutboundId());
- /*
- if (sequence.isAckRequested())
- {
- logger.debug("Sending ack for inbound sequence " +
sequence.getInboundId());
- Map<String, Object> wsrmReqCtx = new HashMap<String,
Object>();
- wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, true);
-
sequence.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
- sequence.sendSequenceAcknowledgementMessage();
- sequence.ackRequested(false);
- }
- */
- }
- }
-
- try
- {
- logger.debug("sleeping for 10 miliseconds");
- Thread.sleep(10);
- }
- catch (InterruptedException ie)
- {
- logger.warn(ie);
- }
- }
- }
-
- public void shutdown()
- {
- classLock.lock();
- try
- {
- instance = null;
- this.destroyed.set(true);
- logger.debug("destroyed");
- }
- finally
- {
- classLock.unlock();
- }
- }
-}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java 2007-12-04
13:21:58 UTC (rev 5167)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java 2007-12-04
13:30:00 UTC (rev 5168)
@@ -107,7 +107,14 @@
}
else
{
- rmResponse = (RMMessage)client.invoke(rmRequest.getPayload(),
remotingInvocationContext);
+ Object retVal = client.invoke(rmRequest.getPayload(),
remotingInvocationContext);
+ if ((null != retVal) && (false == (retVal instanceof RMMessage)))
+ {
+ String msg = retVal.getClass().getName() + ": '" + retVal +
"'";
+ logger.warn(msg);
+ throw new RuntimeException(msg);
+ }
+ rmResponse = (RMMessage)retVal;
}
// Disconnect the remoting client