[jboss-cvs] JBoss Messaging SVN: r5905 - in trunk: src/main/org/jboss/messaging/core/client and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Feb 19 14:09:41 EST 2009
Author: timfox
Date: 2009-02-19 14:09:40 -0500 (Thu, 19 Feb 2009)
New Revision: 5905
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
Modified:
trunk/.classpath
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
trunk/src/main/org/jboss/messaging/util/SimpleString.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
Log:
various fixes and tweaks, more tests
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/.classpath 2009-02-19 19:09:40 UTC (rev 5905)
@@ -65,6 +65,5 @@
<classpathentry kind="lib" path="thirdparty/netty/lib/netty-3.1.0.ALPHA2.jar" sourcepath="thirdparty/netty/lib/netty-3.1.0.ALPHA2-sources.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M4.jar"/>
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12-1.5.2.jar"/>
- <classpathentry kind="lib" path="thirdparty/slf4j/api/lib/slf4j-api-1.5.2.jar"/>
<classpathentry kind="output" path="eclipse-output"/>
</classpath>
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -162,10 +162,10 @@
void rollback() throws MessagingException;
/**
- * @param isLastMessageAsDelived the first message on deliveringMessage Buffer is considered as delivered
+ * @param considerLastMessageAsDelivered the first message on deliveringMessage Buffer is considered as delivered
* @throws MessagingException
*/
- void rollback(boolean isLastMessageAsDelived) throws MessagingException;
+ void rollback(boolean considerLastMessageAsDelivered) throws MessagingException;
void close() throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -809,7 +809,7 @@
{
return true;
}
-
+
boolean ok = false;
// We lock the channel to prevent any packets to be added to the resend
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -608,7 +608,6 @@
Map<RemotingConnection, List<ClientSessionInternal>> sessionsPerConnection = new HashMap<RemotingConnection, List<ClientSessionInternal>>();
-
for (Map.Entry<ClientSessionInternal, RemotingConnection> entry : sessions.entrySet())
{
ClientSessionInternal session = entry.getKey();
@@ -680,7 +679,7 @@
if (!b)
{
//If a session fails to re-attach we doom the lot, but we make sure we try all sessions and don't exit early
- //or connections might be left lying around
+ //or connections might be left lying around
ok = false;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -34,12 +34,11 @@
import java.util.List;
import java.util.Map;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.cluster.DiscoveryGroup;
import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
/**
* A DiscoveryGroupImpl
@@ -61,7 +60,7 @@
private final String name;
- private final Thread thread;
+ private Thread thread;
private boolean received;
@@ -74,6 +73,10 @@
private volatile boolean started;
private final String nodeID;
+
+ private final InetAddress groupAddress;
+
+ private final int groupPort;
public DiscoveryGroupImpl(final String nodeID,
final String name,
@@ -83,19 +86,13 @@
{
this.nodeID = nodeID;
- this.name = name;
+ this.name = name;
- socket = new MulticastSocket(groupPort);
-
- socket.joinGroup(groupAddress);
-
- socket.setSoTimeout(SOCKET_TIMEOUT);
-
- this.timeout = timeout;
-
- thread = new Thread(this);
-
- thread.setDaemon(true);
+ this.timeout = timeout;
+
+ this.groupAddress = groupAddress;
+
+ this.groupPort = groupPort;
}
public synchronized void start() throws Exception
@@ -105,8 +102,18 @@
return;
}
+ socket = new MulticastSocket(groupPort);
+
+ socket.joinGroup(groupAddress);
+
+ socket.setSoTimeout(SOCKET_TIMEOUT);
+
started = true;
+ thread = new Thread(this);
+
+ thread.setDaemon(true);
+
thread.start();
}
@@ -131,6 +138,10 @@
}
socket.close();
+
+ socket = null;
+
+ thread = null;
}
public boolean isStarted()
@@ -194,7 +205,7 @@
{
return;
}
-
+
final DatagramPacket packet = new DatagramPacket(data, data.length);
try
@@ -221,7 +232,7 @@
if (nodeID.equals(originatingNodeID))
{
- //Ignore traffic from own node
+ // Ignore traffic from own node
continue;
}
@@ -324,40 +335,4 @@
}
}
- private String replaceWildcardChars(final String str)
- {
- return str.replace('.', '-');
- }
-
- private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
- {
- StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
- if (config.getParams() != null)
- {
- if (!config.getParams().isEmpty())
- {
- str.append("?");
- }
-
- boolean first = true;
- for (Map.Entry<String, Object> entry : config.getParams().entrySet())
- {
- if (!first)
- {
- str.append("&");
- }
- String encodedKey = replaceWildcardChars(entry.getKey());
-
- String val = entry.getValue().toString();
- String encodedVal = replaceWildcardChars(val);
-
- str.append(encodedKey).append('=').append(encodedVal);
-
- first = false;
- }
- }
-
- return new SimpleString(str.toString());
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -24,7 +24,6 @@
package org.jboss.messaging.core.management.impl;
-
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
@@ -182,7 +181,7 @@
broadcaster,
queueFactory);
ObjectName objectName = ObjectNames.getMessagingServerObjectName();
- registerInJMX(objectName, new ReplicationAwareMessagingServerControlWrapper(objectName,
+ registerInJMX(objectName, new ReplicationAwareMessagingServerControlWrapper(objectName,
managedServer,
replicationInvoker));
registerInRegistry(objectName, managedServer);
@@ -242,9 +241,7 @@
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
ObjectName objectName = ObjectNames.getQueueObjectName(address, queue.getName());
QueueControl queueControl = new QueueControl(queue, postOffice, addressSettingsRepository, counter);
- registerInJMX(objectName, new ReplicationAwareQueueControlWrapper(objectName,
- queueControl,
- replicationInvoker));
+ registerInJMX(objectName, new ReplicationAwareQueueControlWrapper(objectName, queueControl, replicationInvoker));
registerInRegistry(objectName, queueControl);
if (log.isDebugEnabled())
@@ -352,8 +349,11 @@
{
exceptionMessage = ((InvocationTargetException)e).getTargetException().getMessage();
}
- message.putStringProperty(ManagementHelper.HDR_JMX_OPERATION_EXCEPTION,
- new SimpleString(exceptionMessage));
+ if (e != null)
+ {
+ message.putStringProperty(ManagementHelper.HDR_JMX_OPERATION_EXCEPTION,
+ new SimpleString(exceptionMessage));
+ }
}
}
}
@@ -426,47 +426,47 @@
{
listeners.remove(listener);
}
-
+
public SimpleString getManagementAddress()
{
return managementAddress;
}
-
+
public void setManagementAddress(SimpleString managementAddress)
{
this.managementAddress = managementAddress;
}
-
+
public SimpleString getManagementNotificationAddress()
{
return managementNotificationAddress;
}
-
+
public void setManagementNotificationAddress(SimpleString managementNotificationAddress)
{
- this.managementNotificationAddress = managementNotificationAddress;
+ this.managementNotificationAddress = managementNotificationAddress;
}
public String getClusterPassword()
{
return managementClusterPassword;
}
-
+
public void setClusterPassword(String clusterPassword)
{
this.managementClusterPassword = clusterPassword;
}
-
+
public long getManagementRequestTimeout()
{
return managementRequestTimeout;
}
-
+
public void setManagementRequestTimeout(long timeout)
{
this.managementRequestTimeout = timeout;
}
-
+
public ReplicationOperationInvoker getReplicationOperationInvoker()
{
return replicationInvoker;
@@ -476,7 +476,9 @@
public void start() throws Exception
{
- replicationInvoker = new ReplicationOperationInvokerImpl(managementClusterPassword, managementAddress, managementRequestTimeout);
+ replicationInvoker = new ReplicationOperationInvokerImpl(managementClusterPassword,
+ managementAddress,
+ managementRequestTimeout);
started = true;
}
@@ -489,11 +491,11 @@
unregisterResource(objectName);
}
- //FIXME the replicationInvoker should be properly stopped.
+ // FIXME the replicationInvoker should be properly stopped.
// the code is commented since stopping the invoker will interact
// with the remoting service which is stopped first when stopping the server
// replicationInvoker.stop();
-
+
started = false;
}
@@ -570,9 +572,10 @@
{
notifProps = new TypedProperties();
}
-
- notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));
-
+
+ notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+ new SimpleString(notification.getType().toString()));
+
notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
notificationMessage.putTypedProperties(notifProps);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -62,16 +62,16 @@
private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
-
+
private volatile boolean routeWhenNoConsumers;
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
- {
+ {
this.routeWhenNoConsumers = routeWhenNoConsumers;
}
-
+
public Collection<Binding> getBindings()
- {
+ {
return bindingsMap.values();
}
@@ -101,8 +101,8 @@
bindings.add(binding);
}
-
- bindingsMap.put(binding.getID(), binding);
+
+ bindingsMap.put(binding.getID(), binding);
}
public void removeBinding(final Binding binding)
@@ -128,54 +128,54 @@
}
}
- bindingsMap.remove(binding.getID());
+ bindingsMap.remove(binding.getID());
}
private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
{
byte[] ids = (byte[])message.getProperty(MessageImpl.HDR_ROUTE_TO_IDS);
-
+
ByteBuffer buff = ByteBuffer.wrap(ids);
-
+
Set<Bindable> chosen = new HashSet<Bindable>();
-
+
while (buff.hasRemaining())
{
int bindingID = buff.getInt();
-
+
Binding binding = bindingsMap.get(bindingID);
-
+
if (binding == null)
{
- //The binding has been closed - we need to route the message somewhere else...............
+ // The binding has been closed - we need to route the message somewhere else...............
throw new IllegalStateException("Binding not found when routing from cluster - it must have closed");
-
- //FIXME need to deal with this better
+
+ // FIXME need to deal with this better
}
-
+
binding.willRoute(message);
-
+
chosen.add(binding.getBindable());
}
-
+
for (Bindable bindable : chosen)
{
bindable.preroute(message, tx);
}
-
+
for (Bindable bindable : chosen)
{
bindable.route(message, tx);
}
}
-
+
public boolean redistribute(final ServerMessage message, final SimpleString routingName, final Transaction tx) throws Exception
{
if (routeWhenNoConsumers)
{
return false;
}
-
+
List<Binding> bindings = routingNameBindingMap.get(routingName);
if (bindings == null)
@@ -195,7 +195,7 @@
Binding theBinding = null;
- //TODO - combine this with similar logic in route()
+ // TODO - combine this with similar logic in route()
while (true)
{
Binding binding;
@@ -217,40 +217,40 @@
break;
}
}
-
+
pos = incrementPos(pos, length);
Filter filter = binding.getFilter();
-
+
boolean highPrior = binding.isHighAcceptPriority(message);
-
+
if (highPrior && (filter == null || filter.match(message)))
- {
+ {
theBinding = binding;
- break;
+ break;
}
-
+
if (pos == startPos)
- {
+ {
break;
}
}
-
+
routingNamePositions.put(routingName, pos);
if (theBinding != null)
- {
+ {
theBinding.willRoute(message);
-
+
theBinding.getBindable().preroute(message, tx);
-
- theBinding.getBindable().route(message, tx);
-
+
+ theBinding.getBindable().route(message, tx);
+
return true;
}
else
- {
+ {
return false;
}
}
@@ -267,38 +267,38 @@
else
{
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
- {
+ {
routeFromCluster(message, tx);
}
else
{
Set<Bindable> chosen = new HashSet<Bindable>();
-
+
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
-
+
List<Binding> bindings = entry.getValue();
-
+
if (bindings == null)
{
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
continue;
}
-
+
Integer ipos = routingNamePositions.get(routingName);
-
+
int pos = ipos != null ? ipos.intValue() : 0;
-
+
int length = bindings.size();
-
+
int startPos = pos;
-
+
Binding theBinding = null;
-
+
int lastLowPriorityBinding = -1;
-
+
while (true)
{
Binding binding;
@@ -312,7 +312,7 @@
if (!bindings.isEmpty())
{
pos = 0;
-
+
continue;
}
else
@@ -320,19 +320,19 @@
break;
}
}
-
+
Filter filter = binding.getFilter();
-
+
if (filter == null || filter.match(message))
- {
+ {
// bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
// unnecessary overhead)
if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
{
theBinding = binding;
-
+
pos = incrementPos(pos, length);
-
+
break;
}
else
@@ -343,13 +343,13 @@
}
}
}
-
+
pos = incrementPos(pos, length);
-
+
if (pos == startPos)
{
if (lastLowPriorityBinding != -1)
- {
+ {
try
{
theBinding = bindings.get(pos);
@@ -360,9 +360,9 @@
if (!bindings.isEmpty())
{
pos = 0;
-
+
lastLowPriorityBinding = -1;
-
+
continue;
}
else
@@ -370,32 +370,32 @@
break;
}
}
-
+
pos = lastLowPriorityBinding;
-
+
pos = incrementPos(pos, length);
}
break;
}
}
-
+
if (theBinding != null)
{
theBinding.willRoute(message);
-
+
chosen.add(theBinding.getBindable());
}
routingNamePositions.put(routingName, pos);
}
-
- //TODO refactor to do this is one iteration
-
+
+ // TODO refactor to do this in one iteration
+
for (Bindable bindable : chosen)
{
bindable.preroute(message, tx);
}
-
+
for (Bindable bindable : chosen)
{
bindable.route(message, tx);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -260,7 +260,8 @@
// debug only stuff
private boolean createdActive;
-
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -295,7 +296,7 @@
final long connectionTTL)
{
- this(transportConnection, -1, -1, connectionTTL, null, interceptors, replicatingConnection, active, false);
+ this(transportConnection, -1, -1, connectionTTL, null, interceptors, replicatingConnection, active, false);
}
private RemotingConnectionImpl(final Connection transportConnection,
@@ -448,7 +449,7 @@
destroyed = true;
}
- log.warn("Connection failed " + System.identityHashCode(this) + " " + me.getMessage());
+ log.warn("Connection failed, client " + client + " " + System.identityHashCode(this) + " " + me.getMessage(), me);
// Then call the listeners
callListeners(me);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/RollbackMessage.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -45,32 +45,33 @@
super(SESS_ROLLBACK);
}
- public RollbackMessage(final boolean isLastMessageAsDelived)
+ public RollbackMessage(final boolean considerLastMessageAsDelivered)
{
super(SESS_ROLLBACK);
- this.isLastMessageAsDelived = isLastMessageAsDelived;
+
+ this.considerLastMessageAsDelivered = considerLastMessageAsDelivered;
}
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
- private boolean isLastMessageAsDelived;
+ private boolean considerLastMessageAsDelivered;
/**
- * @return the isLastMessageAsDelived
+ * @return the considerLastMessageAsDelivered
*/
- public boolean isLastMessageAsDelived()
+ public boolean isConsiderLastMessageAsDelivered()
{
- return isLastMessageAsDelived;
+ return considerLastMessageAsDelivered;
}
/**
- * @param isLastMessageAsDelived the isLastMessageAsDelived to set
+ * @param considerLastMessageAsDelivered the considerLastMessageAsDelivered to set
*/
- public void setLastMessageAsDelived(final boolean isLastMessageAsDelived)
+ public void setConsiderLastMessageAsDelivered(final boolean isLastMessageAsDelived)
{
- this.isLastMessageAsDelived = isLastMessageAsDelived;
+ this.considerLastMessageAsDelivered = isLastMessageAsDelived;
}
/* (non-Javadoc)
@@ -85,13 +86,13 @@
@Override
public void encodeBody(final MessagingBuffer buffer)
{
- buffer.putBoolean(isLastMessageAsDelived);
+ buffer.putBoolean(considerLastMessageAsDelivered);
}
@Override
public void decodeBody(final MessagingBuffer buffer)
{
- this.isLastMessageAsDelived = buffer.getBoolean();
+ this.considerLastMessageAsDelivered = buffer.getBoolean();
}
// Static --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -205,43 +205,6 @@
public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
{
this.future = future;
- }
-
- private String replaceWildcardChars(final String str)
- {
- return str.replace('.', '-');
- }
+ }
- private SimpleString generateConnectorString(final TransportConfiguration config) throws Exception
- {
- StringBuilder str = new StringBuilder(replaceWildcardChars(config.getFactoryClassName()));
-
- if (config.getParams() != null)
- {
- if (!config.getParams().isEmpty())
- {
- str.append("?");
- }
-
- boolean first = true;
- for (Map.Entry<String, Object> entry : config.getParams().entrySet())
- {
- if (!first)
- {
- str.append("&");
- }
- String encodedKey = replaceWildcardChars(entry.getKey());
-
- String val = entry.getValue().toString();
- String encodedVal = replaceWildcardChars(val);
-
- str.append(encodedKey).append('=').append(encodedVal);
-
- first = false;
- }
- }
-
- return new SimpleString(str.toString());
- }
-
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -397,8 +397,6 @@
// This is the last thing done at the start, after everything else is up and running
pagingManager.startGlobalDepage();
- log.info("Started messaging server");
-
started = true;
}
@@ -918,7 +916,7 @@
throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
"client not compatible with version: " + version.getFullVersion());
}
-
+
// Is this comment relevant any more ?
// Authenticate. Successful autentication will place a new SubjectContext
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -104,6 +104,7 @@
request.isPreAcknowledge(),
request.isXA(),
request.getWindowSize());
+
break;
}
case REPLICATE_CREATESESSION:
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -315,7 +315,6 @@
public LinkedList<MessageReference> cancelRefs(final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
{
-
boolean performACK = lastConsumedAsDelivered;
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
@@ -327,11 +326,13 @@
if (performACK)
{
acknowledge(false, tx, ref.getMessage().getMessageID());
+
performACK = false;
}
else
{
ref.decrementDeliveryCount();
+
refs.add(ref);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -139,7 +139,7 @@
private final boolean autoCommitAcks;
private final boolean preAcknowledge;
-
+
private final boolean updateDeliveries;
private volatile RemotingConnection remotingConnection;
@@ -175,11 +175,11 @@
private final SimpleString managementAddress;
private final QueueFactory queueFactory;
-
+
private final SimpleString nodeID;
// The current currentLargeMessage being processed
- // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
+ // In case of replication, currentLargeMessage should only be accessed within the replication callbacks
private volatile LargeServerMessage currentLargeMessage;
// The current destination used for sending LargeMessages
@@ -242,7 +242,7 @@
{
tx = new TransactionImpl(storageManager);
}
-
+
this.updateDeliveries = updateDeliveries;
this.channel = channel;
@@ -256,7 +256,7 @@
this.managementAddress = managementAddress;
this.queueFactory = queueFactory;
-
+
this.nodeID = server.getNodeID();
}
@@ -399,9 +399,14 @@
" queueName = " +
packet.getQueueName());
}
- doHandleCreateQueue(packet);
-
- lock.unlock();
+ try
+ {
+ doHandleCreateQueue(packet);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
});
}
@@ -444,9 +449,14 @@
{
trace("(Replication) DeleteQueue queueName = " + packet.getQueueName());
}
- doHandleDeleteQueue(packet);
-
- lock.unlock();
+ try
+ {
+ doHandleDeleteQueue(packet);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
});
}
@@ -1185,9 +1195,14 @@
{
public void run()
{
- doSend(packet);
-
- lock.afterSend();
+ try
+ {
+ doSend(packet);
+ }
+ finally
+ {
+ lock.afterSend();
+ }
}
});
}
@@ -1232,11 +1247,17 @@
{
trace("(Replication) Sending LasChunk MessageID = " + currentLargeMessage.getMessageID());
}
- doSendContinuations(packet);
- if (lock != null)
+ try
{
- lock.afterSend();
+ doSendContinuations(packet);
}
+ finally
+ {
+ if (lock != null)
+ {
+ lock.afterSend();
+ }
+ }
}
});
}
@@ -1269,7 +1290,7 @@
{
this.setStarted(false);
}
-
+
remotingConnection.removeFailureListener(this);
channel.transferConnection(newConnection);
@@ -1287,7 +1308,7 @@
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
- // Destroy the old connection
+ // Destroy the old connection
remotingConnection.destroy();
remotingConnection = newConnection;
@@ -1302,7 +1323,7 @@
{
this.setStarted(true);
}
-
+
return serverLastReceivedCommandID;
}
@@ -1332,9 +1353,8 @@
}
}
- // We call handleClose() since we need to replicate the close too, if there is a backup
handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
-
+
log.info("Cleared up resources for session " + name);
}
catch (Throwable t)
@@ -1385,14 +1405,14 @@
}
Queue theQueue;
-
+
if (browseOnly)
{
// We consume a copy of the queue - TODO - this is a temporary measure
// and will disappear once we can provide a proper iterator on the queue
theQueue = queueFactory.createQueue(-1, binding.getAddress(), name, filter, false, true);
-
+
// There's no need for any special locking since the list method is synchronized
List<MessageReference> refs = ((Queue)binding.getBindable()).list(filter);
@@ -1400,16 +1420,16 @@
{
theQueue.addLast(ref);
}
-
+
binding = new LocalQueueBinding(binding.getAddress(), theQueue, nodeID);
}
else
- {
+ {
theQueue = (Queue)binding.getBindable();
}
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
- this,
+ this,
(QueueBinding)binding,
filter,
started,
@@ -1423,26 +1443,26 @@
managementService);
consumers.put(consumer.getID(), consumer);
-
+
if (!browseOnly)
{
TypedProperties props = new TypedProperties();
-
+
props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
-
+
props.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-
+
props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
-
+
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
-
+
if (filterString != null)
{
props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
}
-
+
Notification notification = new Notification(CONSUMER_CREATED, props);
-
+
managementService.sendNotification(notification);
}
@@ -1505,16 +1525,17 @@
final Queue queue = queueFactory.createQueue(-1, address, name, filter, durable, temporary);
- //The unique name is given by the concatenation of the node id and the queue name - this is because it must be unique *across the entire cluster*
+ // The unique name is given by the concatenation of the node id and the queue name - this is because it must be
+ // unique *across the entire cluster*
binding = new LocalQueueBinding(address, queue, nodeID);
if (durable)
{
- storageManager.addQueueBinding(binding);
+ storageManager.addQueueBinding(binding);
}
-
+
postOffice.addBinding(binding);
-
+
if (temporary)
{
// Temporary queue in core simply means the queue will be deleted if
@@ -1813,7 +1834,7 @@
try
{
- rollback(packet.isLastMessageAsDelived());
+ rollback(packet.isConsiderLastMessageAsDelivered());
response = new NullResponseMessage();
}
@@ -2476,7 +2497,7 @@
}
channel.confirm(packet);
-
+
channel.flushConfirmations();
channel.send(response);
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -76,7 +76,7 @@
{
return;
}
-
+
SslHandler sslHandler = (SslHandler)channel.getPipeline().get("ssl");
if (sslHandler != null)
{
Modified: trunk/src/main/org/jboss/messaging/util/SimpleString.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleString.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/src/main/org/jboss/messaging/util/SimpleString.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -239,13 +239,13 @@
{
for (int i = 0; i < data.length; i++)
{
- hash = 31 * hash + data[i];
+ hash = (hash << 5) - hash + data[i]; // (hash << 5) - hash is same as hash * 31
}
}
return hash;
}
-
+
public SimpleString[] split(char delim)
{
if (!contains(delim))
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -123,15 +123,15 @@
final int consumerCount,
final boolean local) throws Exception
{
-// log.info("waiting for bindings on node " + node +
-// " address " +
-// address +
-// " count " +
-// count +
-// " consumerCount " +
-// consumerCount +
-// " local " +
-// local);
+ log.info("waiting for bindings on node " + node +
+ " address " +
+ address +
+ " count " +
+ count +
+ " consumerCount " +
+ consumerCount +
+ " local " +
+ local);
MessagingService service = this.services[node];
if (service == null)
@@ -163,7 +163,7 @@
}
}
- //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+ log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -0,0 +1,184 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.distribution;
+
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * Integration test that exchanges messages in both directions between nodes 0 and 4 of a five-node
+ * cluster, with and without first waiting for the response queue's bindings to propagate.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * Created 3 Feb 2009 09:10:43
+ */
+public class ClusteredRequestResponseTest extends ClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(ClusteredRequestResponseTest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServers();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ stopServers();
+ }
+ finally
+ {
+ super.tearDown(); // must run even when stopping the servers throws
+ }
+ }
+
+ protected boolean isNetty() // transport flag handed to every setup helper; false in this class
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage() // storage flag handed to setupServer(); false in this class
+ {
+ return false;
+ }
+
+ public void testRequestResponse() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress1", "queueA", null, false);
+ createQueue(4, "queues.testaddress2", "queueB", null, false);
+
+ addConsumer(0, 0, "queueA", null);
+ addConsumer(4, 4, "queueB", null);
+ // Arguments are (node, address, count, consumerCount, local): first the two local bindings...
+ waitForBindings(0, "queues.testaddress1", 1, 1, true);
+ waitForBindings(4, "queues.testaddress2", 1, 1, true);
+ // ...then testaddress1 as a non-local binding on nodes 1-4...
+ waitForBindings(1, "queues.testaddress1", 1, 1, false);
+ waitForBindings(2, "queues.testaddress1", 1, 1, false);
+ waitForBindings(3, "queues.testaddress1", 1, 1, false);
+ waitForBindings(4, "queues.testaddress1", 1, 1, false);
+ // ...and testaddress2 as a non-local binding on nodes 0-3
+ waitForBindings(0, "queues.testaddress2", 1, 1, false);
+ waitForBindings(1, "queues.testaddress2", 1, 1, false);
+ waitForBindings(2, "queues.testaddress2", 1, 1, false);
+ waitForBindings(3, "queues.testaddress2", 1, 1, false);
+
+ send(0, "queues.testaddress2", 10, false, null); // request leg; presumably 10 messages sent via node 0
+
+ verifyReceiveAll(10, 4); // presumably: consumer 4 must receive all 10 - confirm in ClusterTestBase
+
+ send(4, "queues.testaddress1", 10, false, null); // response leg, in the opposite direction
+
+ verifyReceiveAll(10, 0);
+ }
+
+ /*
+ * Only queueB's address is awaited cluster-wide; the response queue's bindings are not waited for
+ */
+ public void testRequestResponseNoWaitForBindings() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress1", "queueA", null, false);
+ createQueue(4, "queues.testaddress2", "queueB", null, false);
+
+ addConsumer(0, 0, "queueA", null);
+ addConsumer(4, 4, "queueB", null);
+
+ waitForBindings(4, "queues.testaddress2", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress2", 1, 1, false);
+ waitForBindings(1, "queues.testaddress2", 1, 1, false);
+ waitForBindings(2, "queues.testaddress2", 1, 1, false);
+ waitForBindings(3, "queues.testaddress2", 1, 1, false);
+
+ send(0, "queues.testaddress2", 10, false, null);
+
+ verifyReceiveAll(10, 4);
+ // testaddress1's bindings were never awaited above, so this send may race their propagation
+ send(4, "queues.testaddress1", 10, false, null);
+
+ verifyReceiveAll(10, 0);
+ }
+
+ protected void setupCluster() throws Exception // defaults forwardWhenNoConsumers to false
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception // one connection per node
+ {
+ setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4);
+
+ setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4);
+
+ setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4);
+
+ setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4);
+
+ setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3);
+ }
+
+ protected void setupServers() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+ setupServer(3, isFileStorage(), isNetty());
+ setupServer(4, isFileStorage(), isNetty());
+ }
+
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ // We stop the cluster connections first since this makes server shutdown quicker
+ stopClusterConnections(0, 1, 2, 3, 4);
+
+ stopServers(0, 1, 2, 3, 4);
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -83,7 +83,7 @@
protected final Map<String, Object> backupParams = new HashMap<String, Object>();
protected Timer timer;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -973,6 +973,7 @@
protected void doTestI(final ClientSessionFactory sf, final int threadNum) throws Exception
{
+ log.info("in testi");
ClientSession sessCreate = sf.createSession(false, true, true);
sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
@@ -1005,6 +1006,8 @@
sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
sessCreate.close();
+
+ log.info("completed testi");
}
protected void doTestJ(final ClientSessionFactory sf, final int threadNum) throws Exception
@@ -1359,11 +1362,10 @@
}
}
- log.info("completed*******");
+ log.info("completed loop");
runnable.checkFail();
- log.info("super completed");
}
while (!failer.isExecuted());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-02-19 17:36:47 UTC (rev 5904)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java 2009-02-19 19:09:40 UTC (rev 5905)
@@ -108,6 +108,71 @@
}
+ /** Broadcasts one connector pair, stops and restarts both groups, then checks the pair is received again. */
+ public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+ {
+ final InetAddress groupAddress = InetAddress.getByName(address1);
+ final int groupPort = 6745;
+ final int timeout = 500;
+
+ BroadcastGroup bg = new BroadcastGroupImpl(randomString(), randomString(), null, -1, groupAddress, groupPort);
+
+ bg.start();
+
+ TransportConfiguration live1 = generateTC();
+
+ TransportConfiguration backup1 = generateTC();
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(live1,
+ backup1);
+
+ bg.addConnectorPair(connectorPair);
+
+ DiscoveryGroup dg = new DiscoveryGroupImpl(randomString(), randomString(), groupAddress, groupPort, timeout);
+
+ dg.start();
+
+ // First round: a manual broadcast must be seen by the discovery group
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ assertTrue(ok);
+
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectors = dg.getConnectors();
+
+ assertNotNull(connectors);
+
+ assertEquals(1, connectors.size());
+
+ Pair<TransportConfiguration, TransportConfiguration> receivedPair = connectors.get(0);
+
+ assertEquals(connectorPair, receivedPair);
+
+ // Stop both sides, then restart them and repeat the same checks
+ bg.stop();
+
+ dg.stop();
+
+ dg.start();
+
+ bg.start();
+
+ bg.broadcastConnectors();
+
+ ok = dg.waitForBroadcast(1000);
+
+ assertTrue(ok);
+
+ connectors = dg.getConnectors();
+
+ assertNotNull(connectors);
+
+ assertEquals(1, connectors.size());
+
+ receivedPair = connectors.get(0);
+
+ assertEquals(connectorPair, receivedPair);
+
+ // Release the restarted groups, mirroring the stop() calls that end the first round
+ bg.stop();
+
+ dg.stop();
+ }
+
public void testIgnoreTrafficFromOwnNode() throws Exception
{
final InetAddress groupAddress = InetAddress.getByName(address1);
More information about the jboss-cvs-commits
mailing list