[jboss-cvs] JBoss Messaging SVN: r7555 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/client/impl and 13 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 10 06:59:05 EDT 2009
Author: timfox
Date: 2009-07-10 06:59:04 -0400 (Fri, 10 Jul 2009)
New Revision: 7555
Added:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java
Modified:
branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/config/stand-alone/non-clustered/logging.properties 2009-07-10 10:59:04 UTC (rev 7555)
@@ -20,7 +20,6 @@
java.util.logging.FileHandler.level=INFO
java.util.logging.FileHandler.formatter=org.jboss.messaging.integration.logging.JBMLoggerFormatter
java.util.logging.FileHandler.pattern=../logs/messaging.log
-java.util.logging.FileHandler.limit=10000
# Default global logging level.
# This specifies which kinds of events are logged across
# all loggers. For any given facility this global level
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -676,7 +676,11 @@
closedSent = true;
+ // log.info(System.identityHashCode(this) + " session sending close message");
+
channel.sendBlocking(new SessionCloseMessage());
+
+ //log.info(System.identityHashCode(this) + " session sent close message");
}
catch (Throwable ignore)
{
@@ -721,26 +725,41 @@
try
{
+ log.info(System.identityHashCode(this) + " session handling failover");
+
+ //Prevent any more packets being handled on the old connection
+ channel.getConnection().freeze();
+
channel.transferConnection(backupConnection);
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = backupConnection;
- Packet request = new ReattachSessionMessage(name, channel.getLastConfirmedCommandID());
+ int lid = channel.getLastConfirmedCommandID();
+
+ log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+
+ Packet request = new ReattachSessionMessage(name, lid);
Channel channel1 = backupConnection.getChannel(1, -1, false);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+ // log.info(System.identityHashCode(this) + " got response from reattach session");
+
if (!response.isRemoved())
{
+ // log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
+
channel.replayCommands(response.getLastConfirmedCommandID());
ok = true;
}
else
{
+ // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
+
if (closedSent)
{
// a session re-attach may fail, if the session close was sent before failover started, hit the server,
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.jboss.messaging.utils.SimpleString;
/**
*
@@ -109,5 +110,14 @@
}
channel.confirm(packet);
+
+ if (packet.getType() == SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage message = (SessionReceiveMessage) packet;
+
+ int cnt = (Integer)message.getClientMessage().getProperty(new SimpleString("count"));
+
+ // log.info("confirmed on client " + cnt);
+ }
}
}
\ No newline at end of file
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.client.impl;
+import java.util.Set;
+
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.RemotingConnection;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -472,7 +472,7 @@
closed = true;
}
-
+
public void returnConnection(final RemotingConnection conn)
{
synchronized (createSessionLock)
@@ -485,7 +485,7 @@
{
refCount--;
}
-
+
pingers.remove(conn.getID());
try
@@ -494,10 +494,9 @@
}
catch (Throwable ignore)
{
- }
+ }
}
}
-
}
// Public
@@ -893,7 +892,7 @@
public RemotingConnection getConnection(final int initialRefCount)
{
RemotingConnection conn;
-
+
if (connections.size() < maxConnections)
{
// Create a new one
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -41,7 +41,6 @@
*
* Valid identifiers that can be used are:
*
-* JBMessageID - the message id of the message
* JBMPriority - the priority of the message
* JBMTimestamp - the timestamp of the message
* JBMDurable - "DURABLE" or "NON_DURABLE"
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -1122,7 +1122,7 @@
public BatchingIDGenerator(final long start, final long checkpointSize)
{
- this.counter = new ReplicationAwareSharedCounter(backup, start);
+ this.counter = new ReplicationAwareSharedCounter(start);
this.checkpointSize = checkpointSize;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -67,7 +67,7 @@
{
this.backup = backup;
- idSequence = new ReplicationAwareSharedCounter(backup, 0);
+ idSequence = new ReplicationAwareSharedCounter(0);
}
public UUID getPersistentID()
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -83,8 +83,6 @@
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
- // public static final SimpleString HDR_RESET_QUEUE_DATA = new SimpleString("_JBM_RESET_QUEUE_DATA");
-
private final AddressManager addressManager;
private final QueueFactory queueFactory;
@@ -113,24 +111,6 @@
private final boolean persistIDCache;
- // Each queue has a transient ID which lasts the lifetime of its binding. This is used in clustering when routing
- // messages to particular queues on nodes. We could
- // use the queue name on the node to identify it. But sometimes we need to route to maybe 10s of thousands of queues
- // on a particular node, and all would
- // have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
- // The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
- // private int transientIDSequence;
-
- // private Set<Integer> transientIDs = new HashSet<Integer>();
-
- // private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
-
- // private final Object notificationLock = new Object();
-
- // private final org.jboss.messaging.utils.ExecutorFactory redistributorExecutorFactory;
- //
- // private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
private final ReadWriteLock lock;
public PostOfficeImpl(final StorageManager storageManager,
@@ -143,9 +123,6 @@
final boolean backup,
final int idCacheSize,
final boolean persistIDCache)
- // final ExecutorFactory orderedExecutorFactory,
- // HierarchicalRepository<AddressSettings> addressSettingsRepository)
-
{
this.storageManager = storageManager;
@@ -174,10 +151,6 @@
this.persistIDCache = persistIDCache;
- // this.redistributorExecutorFactory = orderedExecutorFactory;
-
- // this.addressSettingsRepository = addressSettingsRepository;
-
lock = new ReplicationAwareReadWriteLock("postoffice", 0, true);
}
@@ -185,8 +158,6 @@
public synchronized void start() throws Exception
{
- // managementService.addNotificationListener(this);
-
if (pagingManager != null)
{
pagingManager.setPostOffice(this);
@@ -205,7 +176,6 @@
public synchronized void stop() throws Exception
{
- // managementService.removeNotificationListener(this);
if (reaper != null)
{
@@ -216,10 +186,6 @@
addressManager.clear();
- // queueInfos.clear();
-
- // transientIDs.clear();
-
started = false;
}
@@ -228,226 +194,7 @@
return started;
}
- // NotificationListener implementation -------------------------------------
-
- // public void onNotification(final Notification notification)
- // {
- // synchronized (notificationLock)
- // {
- // NotificationType type = notification.getType();
- //
- // switch (type)
- // {
- // case BINDING_ADDED:
- // {
- // TypedProperties props = notification.getProperties();
- //
- // Integer bindingType = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_TYPE);
- //
- // if (bindingType == null)
- // {
- // throw new IllegalArgumentException("Binding type not specified");
- // }
- //
- // if (bindingType == BindingType.DIVERT_INDEX)
- // {
- // // We don't propagate diverts
- // return;
- // }
- //
- // SimpleString routingName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
- //
- // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- //
- // SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
- //
- // Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
- //
- // SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
- //
- // Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
- //
- // QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
- //
- // queueInfos.put(clusterName, info);
- //
- // break;
- // }
- // case BINDING_REMOVED:
- // {
- // TypedProperties props = notification.getProperties();
- //
- // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- //
- // if (clusterName == null)
- // {
- // throw new IllegalStateException("No cluster name");
- // }
- //
- // QueueInfo info = queueInfos.remove(clusterName);
- //
- // if (info == null)
- // {
- // throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
- // }
- //
- // break;
- // }
- // case CONSUMER_CREATED:
- // {
- // TypedProperties props = notification.getProperties();
- //
- // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- //
- // if (clusterName == null)
- // {
- // throw new IllegalStateException("No cluster name");
- // }
- //
- // SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
- //
- // QueueInfo info = queueInfos.get(clusterName);
- //
- // if (info == null)
- // {
- // throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
- // }
- //
- // info.incrementConsumers();
- //
- // if (filterString != null)
- // {
- // List<SimpleString> filterStrings = info.getFilterStrings();
- //
- // if (filterStrings == null)
- // {
- // filterStrings = new ArrayList<SimpleString>();
- //
- // info.setFilterStrings(filterStrings);
- // }
- //
- // filterStrings.add(filterString);
- // }
- //
- // Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
- //
- // if (distance == null)
- // {
- // throw new IllegalStateException("No distance");
- // }
- //
- // if (distance > 0)
- // {
- // SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
- //
- // if (queueName == null)
- // {
- // throw new IllegalStateException("No queue name");
- // }
- //
- // Binding binding = getBinding(queueName);
- //
- // if (binding != null)
- // {
- // // We have a local queue
- // Queue queue = (Queue)binding.getBindable();
- //
- // AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress()
- // .toString());
- //
- // long redistributionDelay = addressSettings.getRedistributionDelay();
- //
- // if (redistributionDelay != -1)
- // {
- // queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
- // }
- // }
- // }
- //
- // break;
- // }
- // case CONSUMER_CLOSED:
- // {
- // TypedProperties props = notification.getProperties();
- //
- // SimpleString clusterName = (SimpleString)props.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- //
- // if (clusterName == null)
- // {
- // throw new IllegalStateException("No distance");
- // }
- //
- // SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
- //
- // QueueInfo info = queueInfos.get(clusterName);
- //
- // if (info == null)
- // {
- // throw new IllegalStateException("Cannot find queue info for queue " + clusterName);
- // }
- //
- // info.decrementConsumers();
- //
- // if (filterString != null)
- // {
- // List<SimpleString> filterStrings = info.getFilterStrings();
- //
- // filterStrings.remove(filterString);
- // }
- //
- // if (info.getNumberOfConsumers() == 0)
- // {
- // Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
- //
- // if (distance == null)
- // {
- // throw new IllegalStateException("No cluster name");
- // }
- //
- // if (distance == 0)
- // {
- // SimpleString queueName = (SimpleString)props.getProperty(ManagementHelper.HDR_ROUTING_NAME);
- //
- // if (queueName == null)
- // {
- // throw new IllegalStateException("No queue name");
- // }
- //
- // Binding binding = getBinding(queueName);
- //
- // if (binding == null)
- // {
- // throw new IllegalStateException("No queue " + queueName);
- // }
- //
- // Queue queue = (Queue)binding.getBindable();
- //
- // AddressSettings addressSettings = addressSettingsRepository.getMatch(binding.getAddress()
- // .toString());
- //
- // long redistributionDelay = addressSettings.getRedistributionDelay();
- //
- // if (redistributionDelay != -1)
- // {
- // queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
- // }
- // }
- // }
- //
- // break;
- // }
- // case SECURITY_AUTHENTICATION_VIOLATION:
- // case SECURITY_PERMISSION_VIOLATION:
- // break;
- // default:
- // {
- // throw new IllegalArgumentException("Invalid type " + type);
- // }
- //
- // }
- // }
- // }
-
+
// PostOffice implementation -----------------------------------------------
// TODO - needs to be locked to prevent happening concurrently with activate().
@@ -458,7 +205,9 @@
public void addBinding(final Binding binding) throws Exception
{
boolean addressExists;
+
lock.writeLock().lock();
+
try
{
addressExists = addressManager.getBindingsForRoutingAddress(binding.getAddress()) != null;
@@ -804,87 +553,7 @@
return cache;
}
- // public Object getNotificationLock()
- // {
- // return notificationLock;
- // }
-
- // public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
- // {
- // // We send direct to the queue so we can send it to the same queue that is bound to the notifications adress -
- // // this is crucial for ensuring
- // // that queue infos and notifications are received in a contiguous consistent stream
- // Binding binding = addressManager.getBinding(queueName);
- //
- // if (binding == null)
- // {
- // throw new IllegalStateException("Cannot find queue " + queueName);
- // }
- //
- // Queue queue = (Queue)binding.getBindable();
- //
- // // Need to lock to make sure all queue info and notifications are in the correct order with no gaps
- // synchronized (notificationLock)
- // {
- // // First send a reset message
- //
- // ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
- // message.setBody(ChannelBuffers.EMPTY_BUFFER);
- // message.setDestination(queueName);
- // message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
- // queue.preroute(message, null);
- // queue.route(message, null);
- //
- // for (QueueInfo info : queueInfos.values())
- // {
- // if (info.getAddress().startsWith(address))
- // {
- // message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
- //
- // message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
- // message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
- // message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
- // message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
- // message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
- // message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- //
- // routeDirect(queue, message);
- //
- // int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
- //
- // for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
- // {
- // message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
- //
- // message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
- // message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
- // message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
- // message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- //
- // routeDirect(queue, message);
- // }
- //
- // if (info.getFilterStrings() != null)
- // {
- // for (SimpleString filterString : info.getFilterStrings())
- // {
- // message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
- //
- // message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
- // message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
- // message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
- // message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
- // message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- //
- // routeDirect(queue, message);
- // }
- // }
- // }
- // }
- // }
- //
- // }
-
+
// Private -----------------------------------------------------------------
private synchronized void startExpiryScanner()
@@ -902,56 +571,7 @@
// }
}
- // private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
- // {
- // if (queue.getFilter() == null || queue.getFilter().match(message))
- // {
- // queue.preroute(message, null);
- // queue.route(message, null);
- // }
- // }
- //
- // private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
- // {
- // ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
- // message.setBody(ChannelBuffers.EMPTY_BUFFER);
- //
- // message.setDestination(queueName);
- //
- // String uid = UUIDGenerator.getInstance().generateStringUUID();
- //
- // message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));
- // message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
- //
- // message.putStringProperty(new SimpleString("foobar"), new SimpleString(uid));
- //
- // return message;
- // }
-
-// private int generateTransientID()
-// {
-// int start = transientIDSequence;
-// do
-// {
-// int id = transientIDSequence++;
-//
-// if (!transientIDs.contains(id))
-// {
-// transientIDs.add(id);
-//
-// return id;
-// }
-// }
-// while (transientIDSequence != start);
-//
-// throw new IllegalStateException("Run out of queue ids!");
-// }
-//
-// private void releaseTransientID(final int id)
-// {
-// transientIDs.remove(id);
-// }
-
+
private final PageMessageOperation getPageOperation(final Transaction tx)
{
// you could have races on the case two sessions using the same XID
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -75,7 +75,5 @@
long getBlockingCallTimeout();
- Object getTransferLock();
-
RemotingConnection getReplicatingConnection();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -42,7 +42,10 @@
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.utils.SimpleString;
/**
* A ChannelImpl
@@ -285,6 +288,10 @@
if (resendCache != null && packet.isRequiresConfirmations())
{
+// if (packet.getType() == PacketImpl.SESS_CLOSE)
+// {
+// log.info(System.identityHashCode(this) + " added session close to resend cache");
+// }
resendCache.add(packet);
}
@@ -372,38 +379,53 @@
public void transferConnection(final RemotingConnection newConnection)
{
- // Needs to synchronize on the connection to make sure no packets from
- // the old connection get processed after transfer has occurred
- synchronized (connection.getTransferLock())
- {
- connection.removeChannel(id);
+ connection.removeChannel(id);
- // if (replicatingChannel != null)
- // {
- // // If we're reconnecting to a live node which is replicated then there will be a replicating channel
- // // too. We need to then make sure that all replication responses come back since packets aren't
- // // considered confirmed until response comes back and is processed. Otherwise responses to previous
- // // message sends could come back after reconnection resulting in clients resending same message
- // // since it wasn't confirmed yet.
- // replicatingChannel.waitForAllReplicationResponse();
- // }
+ // if (replicatingChannel != null)
+ // {
+ // // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+ // // too. We need to then make sure that all replication responses come back since packets aren't
+ // // considered confirmed until response comes back and is processed. Otherwise responses to previous
+ // // message sends could come back after reconnection resulting in clients resending same message
+ // // since it wasn't confirmed yet.
+ // replicatingChannel.waitForAllReplicationResponse();
+ // }
- // And switch it
+ // And switch it
- final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+ final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
- rnewConnection.putChannel(id, this);
+ rnewConnection.putChannel(id, this);
- connection = rnewConnection;
- }
+ connection = rnewConnection;
}
public void replayCommands(final int otherLastConfirmedCommandID)
{
- clearUpTo(otherLastConfirmedCommandID);
+ // log.info("replaying, other last command id " + otherLastConfirmedCommandID);
+
+ if (otherLastConfirmedCommandID != -1)
+ {
+ clearUpTo(otherLastConfirmedCommandID);
+ }
+ //log.info("Resend cache size is " + resendCache.size());
+
for (final Packet packet : resendCache)
{
+ //log.info("Replaying command " + packet);
+
+// if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+// {
+// SessionReceiveMessage sm = (SessionReceiveMessage)packet;
+//
+// ServerMessage msg = sm.getServerMessage();
+//
+// int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+//
+// log.info("resending message " + cnt);
+// }
+
doWrite(packet);
}
}
@@ -435,16 +457,29 @@
public void flushConfirmations()
{
- if (receivedBytes != 0 && connection.isActive())
+ int lcid = this.lastConfirmedCommandID;
+
+ if (receivedBytes != 0 && connection.isActive() && lcid != -1)
{
receivedBytes = 0;
- final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
+ //log.info("Sending packets confirmed from flush");
+
+ sendConfirmation(lcid);
+ }
+ }
+
+ private void sendConfirmation(final int lastConfirmedID)
+ {
+ final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedID);
- confirmed.setChannelID(id);
+ confirmed.setChannelID(id);
- doWrite(confirmed);
- }
+ //We need to queue packet confirmations too
+ if (!queuedWriteManager.tryQueue(confirmed))
+ {
+ doWrite(confirmed);
+ }
}
public void confirm(final Packet packet)
@@ -452,6 +487,8 @@
if (resendCache != null && packet.isRequiresConfirmations())
{
lastConfirmedCommandID++;
+
+ //log.info("last confirmed id is now " + lastConfirmedCommandID);
receivedBytes += packet.getPacketSize();
@@ -460,12 +497,8 @@
receivedBytes = 0;
if (connection.isActive())
- {
- final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedCommandID);
-
- confirmed.setChannelID(id);
-
- doWrite(confirmed);
+ {
+ sendConfirmation(lastConfirmedCommandID);
}
}
}
@@ -526,16 +559,17 @@
private void clearUpTo(final int lastConfirmedCommandID)
{
- final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
-
- if (numberToClear < 0)
+ //log.info(System.identityHashCode(this) + " clear up to " + lastConfirmedCommandID + " on client " + connection.isClient() + " fscid " + this.firstStoredCommandID);
+
+ if (lastConfirmedCommandID < firstStoredCommandID)
{
- throw new IllegalArgumentException("Invalid lastConfirmedCommandID: " + lastConfirmedCommandID +
- " firstStoredCommandID " +
- firstStoredCommandID +
- " client " +
- connection.isClient());
+ //This can legitimately happen: if flushConfirmations() is called on the other side, it causes a packet confirmation to be sent, and after that
+ //another packet confirmation can arrive; or, on failover, when the lastConfirmedCommandID is retrieved from the other side, there may be overlap
+ //because of the previous flush. In this case we can safely ignore it.
+ return;
}
+
+ final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
int sizeToFree = 0;
@@ -551,6 +585,8 @@
" first stored command id " +
firstStoredCommandID);
}
+
+ // log.info("cleared packet " + packet);
if (packet.getType() != PACKETS_CONFIRMED)
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.remoting.spi.Connector;
import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.replication.impl.JBMThread;
import org.jboss.messaging.utils.SimpleIDGenerator;
/**
@@ -111,9 +112,9 @@
private boolean idGeneratorSynced = false;
- private final Object transferLock = new Object();
+ // private final Object freezeLock = new Object();
- private boolean frozen;
+ private volatile boolean frozen;
private final Object failLock = new Object();
@@ -331,11 +332,6 @@
return idGenerator.getCurrentID();
}
- public Object getTransferLock()
- {
- return transferLock;
- }
-
public boolean isActive()
{
return active;
@@ -364,11 +360,15 @@
// Buffer Handler implementation
// ----------------------------------------------------
+ private volatile Thread currentThread;
+
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
- synchronized (transferLock)
+ currentThread = Thread.currentThread();
+
+ try
{
if (!frozen)
{
@@ -402,10 +402,18 @@
}
else
{
- log.info("cannot find handler for packet " + packet + " client " + this.isClient() + " active " + this.isActive());
+ log.info("cannot find handler for packet " + packet +
+ " client " +
+ this.isClient() +
+ " active " +
+ this.isActive());
}
}
}
+ finally
+ {
+ currentThread = null;
+ }
}
public void activate()
@@ -413,14 +421,86 @@
active = true;
}
+ private static final long FREEZE_TIMEOUT = 5000;
+
public void freeze()
{
- // Prevent any more packets being handled on this connection
+ if (frozen)
+ {
+ return;
+ }
+
+ frozen = true;
- synchronized (transferLock)
+ if (currentThread != null)
{
- frozen = true;
+ // long count = 0;
+ long start = System.currentTimeMillis();
+ while (true)
+ {
+ Thread thread = currentThread;
+
+ if (thread != null)
+ {
+ if (thread instanceof JBMThread)
+ {
+ JBMThread jthread = (JBMThread)thread;
+
+ jthread.setLastLockNonStrict();
+ }
+
+ log.info("waiting for thread to complete");
+
+ Thread.yield();
+
+ if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
+ {
+ throw new IllegalStateException("Timed out waiting for thread to complete");
+ }
+
+ // count++;
+ //
+ // if (count > 1)
+ // {
+ // Exception e = new Exception();
+ // e.setStackTrace(thread.getStackTrace());
+ // log.info("Waiting for this thread", e);
+ // }
+ }
+ else
+ {
+ break;
+ }
+ }
+
}
+ //
+ //
+ //
+ // //TODO check this logic
+ //
+ // log.info("freezing connection");
+ //
+ // JBMThread thread = null;
+ //
+ // if (currentThread != null)
+ // {
+ // thread = (JBMThread)currentThread;
+ //
+ // thread.jbmInterrupt();
+ // }
+ //
+ // // Prevent any more packets being handled on this connection
+ //
+ // synchronized (freezeLock)
+ // {
+ // frozen = true;
+ // }
+ //
+ // if (thread != null)
+ // {
+ // thread.jbmResetInterrupt();
+ // }
}
// Package protected
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -283,25 +283,28 @@
}
RemotingConnection replicatingConnection = server.getNonPooledReplicatingConnection();
-
- RemotingConnection rc = new RemotingConnectionImpl(connection, replicatingConnection, interceptors, !config.isBackup());
-
+
+ RemotingConnection rc = new RemotingConnectionImpl(connection,
+ replicatingConnection,
+ interceptors,
+ !config.isBackup());
+
Channel channel1 = rc.getChannel(1, -1, false);
-
+
final Replicator replicator;
-
+
if (replicatingConnection != null)
- {
+ {
Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false);
-
+
replicator = new ReplicatorImpl(replicatingChannel);
-
+
replicatingChannel.setHandler(new ChannelHandler()
{
public void handlePacket(final Packet packet)
{
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
- {
+ {
replicator.replicationResponseReceived();
}
else
@@ -315,7 +318,7 @@
{
replicator = null;
}
-
+
ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc, replicator);
channel1.setHandler(handler);
@@ -339,7 +342,7 @@
public void connectionDestroyed(final Object connectionID)
{
RemotingConnection conn = connections.get(connectionID);
-
+
if (conn != null)
{
// if the connection has no failure listeners it means the sessions etc were already closed so this is a clean
@@ -392,9 +395,7 @@
// Private -------------------------------------------------------
- private void setupPinger(final RemotingConnection conn,
- final long clientFailureCheckPeriod,
- final long connectionTTL)
+ private void setupPinger(final RemotingConnection conn, final long clientFailureCheckPeriod, final long connectionTTL)
{
if ((connectionTTL <= 0 || clientFailureCheckPeriod <= 0) && connectionTTL != -1 &&
clientFailureCheckPeriod != -1)
@@ -411,7 +412,11 @@
long pingPeriod = clientFailureCheckPeriod == -1 ? -1 : clientFailureCheckPeriod / 2;
- Pinger pingRunnable = new Pinger(conn, connectionTTLToUse, null, new FailedConnectionAction(conn), System.currentTimeMillis());
+ Pinger pingRunnable = new Pinger(conn,
+ connectionTTLToUse,
+ null,
+ new FailedConnectionAction(conn),
+ System.currentTimeMillis());
Future<?> pingFuture = scheduledThreadPool.scheduleAtFixedRate(pingRunnable, 0, pingPeriod, TimeUnit.MILLISECONDS);
@@ -423,19 +428,22 @@
private RemotingConnection closeConnection(final Object connectionID)
{
RemotingConnection connection = connections.remove(connectionID);
-
+
Pinger pinger = pingers.remove(connectionID);
if (pinger != null)
{
pinger.close();
}
-
- RemotingConnection replConnection = connection.getReplicatingConnection();
-
- if (replConnection != null)
+
+ if (connection != null)
{
- server.returnNonPooledReplicatingConnection(replConnection);
+ RemotingConnection replConnection = connection.getReplicatingConnection();
+
+ if (replConnection != null)
+ {
+ server.returnNonPooledReplicatingConnection(replConnection);
+ }
}
return connection;
@@ -452,10 +460,10 @@
private InitialPingTimeout(final RemotingConnection conn)
{
this.conn = conn;
-
+
conn.getChannel(0, -1, false).setHandler(this);
}
-
+
public synchronized void handlePacket(final Packet packet)
{
final byte type = packet.getType();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -111,8 +111,6 @@
SimpleString getNodeID();
- // Channel getReplicatingChannel();
-
RemotingConnection getNonPooledReplicatingConnection();
void returnNonPooledReplicatingConnection(RemotingConnection connection);
@@ -136,4 +134,6 @@
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
+
+ boolean registerBackupConnection(RemotingConnection connection);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -82,7 +82,6 @@
import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.ConnectionLifeCycleListener;
-import org.jboss.messaging.core.remoting.spi.ConnectorFactory;
import org.jboss.messaging.core.security.CheckType;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
@@ -118,6 +117,7 @@
import org.jboss.messaging.utils.UUID;
import org.jboss.messaging.utils.UUIDGenerator;
import org.jboss.messaging.utils.VersionLoader;
+import org.jboss.messaging.utils.WeakHashSet;
/**
* The messaging server implementation
@@ -486,7 +486,7 @@
{
return clusterManager;
}
-
+
public ExecutorService getThreadPool()
{
return this.threadPool;
@@ -530,24 +530,120 @@
final int sendWindowSize,
final boolean direct) throws Exception
{
+ if (version.getIncrementingVersion() != incrementingVersion)
+ {
+ // For now we need exact compatibility
+
+ throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
+ "client version " + incrementingVersion +
+ " not compatible with version: " +
+ version.getFullVersion());
+ }
+
+ if (!direct)
+ {
+ if (!registerBackupConnection(connection))
+ {
+ return null;
+ }
+ }
+
+ // Is this comment relevant any more ?
+
+ // Authenticate. Successful authentication will place a new SubjectContext
+ // on thread local,
+ // which will be used in the authorization process. However, we need to
+ // make sure we clean
+ // up thread local immediately after we used the information, otherwise
+ // some other people's
+ // security may be screwed up, on account of thread local security stack
+ // being corrupted.
+
+ securityStore.authenticate(username, password);
+
if (direct)
{
checkActivate(connection);
}
- return doCreateSession(name,
- channelID,
- username,
- password,
- minLargeMessageSize,
- incrementingVersion,
- connection,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- xa,
- sendWindowSize,
- false);
+ ServerSession currentSession = sessions.remove(name);
+
+ if (currentSession != null)
+ {
+ // This session may well be on a different connection and different channel id, so we must get rid
+ // of it and create another
+
+ // TODO - is this true any more??
+ currentSession.getChannel().close();
+ }
+
+ Channel channel = connection.getChannel(channelID, sendWindowSize, false);
+
+ RemotingConnection replicatingConnection = connection.getReplicatingConnection();
+
+ final Replicator replicator;
+
+ Channel replicatingChannel;
+
+ if (replicatingConnection != null)
+ {
+ replicatingChannel = replicatingConnection.getChannel(channelID, -1, false);
+
+ replicator = new ReplicatorImpl(replicatingChannel);
+
+ replicatingChannel.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
+ {
+ replicator.replicationResponseReceived();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Invalid packet " + packet.getType());
+ }
+ }
+ });
+ }
+ else
+ {
+ replicator = null;
+
+ replicatingChannel = null;
+ }
+
+ ServerSessionImpl session = new ServerSessionImpl(name,
+ username,
+ password,
+ minLargeMessageSize,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ configuration.isPersistDeliveryCountBeforeDelivery(),
+ xa,
+ connection,
+ storageManager,
+ postOffice,
+ resourceManager,
+ securityStore,
+ executorFactory,
+ channel,
+ replicatingChannel,
+ managementService,
+ queueFactory,
+ this,
+ configuration.getManagementAddress());
+
+ sessions.put(name, session);
+
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, replicator, configuration);
+
+ session.setHandler(handler);
+
+ channel.setHandler(handler);
+
+ return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
public void removeSession(final String name) throws Exception
@@ -756,15 +852,15 @@
activateCallbacks.remove(callback);
}
- private ConnectorFactory backupConnectorFactory;
+ // private ConnectorFactory backupConnectorFactory;
+ //
+ // private Map<String, Object> backupConnectorParams;
- private Map<String, Object> backupConnectorParams;
+ private volatile ConnectionManager pooledReplicatingConnectionManager;
- private ConnectionManager pooledReplicatingConnectionManager;
+ private volatile ConnectionManager nonPooledReplicatingConnectionManager;
- private ConnectionManager nonPooledReplicatingConnectionManager;
-
- private void setupBackupConnectorFactory()
+ private void setupConnectionManagers()
{
String backupConnectorName = configuration.getBackupConnectorName();
@@ -778,22 +874,6 @@
}
else
{
-
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clz = loader.loadClass(backupConnector.getFactoryClassName());
- backupConnectorFactory = (ConnectorFactory)clz.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating interceptor \"" + backupConnector.getFactoryClassName() +
- "\"",
- e);
- }
-
- backupConnectorParams = backupConnector.getParams();
-
pooledReplicatingConnectionManager = new ConnectionManagerImpl(null, backupConnector, null, false, 10, // TODO
// don't
// hardcode
@@ -826,7 +906,7 @@
private boolean activatedBackup;
- public synchronized RemotingConnection getPooledReplicatingConnection()
+ public RemotingConnection getPooledReplicatingConnection()
{
RemotingConnection conn = null;
@@ -838,30 +918,38 @@
return conn;
}
- public synchronized RemotingConnection getNonPooledReplicatingConnection()
+ public void returnPooledReplicatingConnection(final RemotingConnection conn)
{
+ pooledReplicatingConnectionManager.returnConnection(conn);
+ }
+
+ public RemotingConnection getNonPooledReplicatingConnection()
+ {
RemotingConnection conn = null;
if (nonPooledReplicatingConnectionManager != null)
{
conn = nonPooledReplicatingConnectionManager.getConnection(1);
- if (!activatedBackup)
+ synchronized (this)
{
- // First time we get channel we send a message down it informing the backup of our node id -
- // backup and live must have the same node id
+ if (!activatedBackup)
+ {
+ // First time we get channel we send a message down it informing the backup of our node id -
+ // backup and live must have the same node id
- Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+ Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
- Channel channel1 = conn.getChannel(1, -1, false);
+ Channel channel1 = conn.getChannel(1, -1, false);
- ChannelHandler prevHandler = channel1.getHandler();
+ ChannelHandler prevHandler = channel1.getHandler();
- sendOnReplicatingAndWaitForResponse(packet, channel1);
+ sendOnReplicatingAndWaitForResponse(packet, channel1);
- channel1.setHandler(prevHandler);
+ channel1.setHandler(prevHandler);
- activatedBackup = true;
+ activatedBackup = true;
+ }
}
// TODO execute outstanding results when failure occurs
@@ -870,11 +958,15 @@
return conn;
}
- public synchronized void returnNonPooledReplicatingConnection(final RemotingConnection conn)
+ public void returnNonPooledReplicatingConnection(final RemotingConnection conn)
{
nonPooledReplicatingConnectionManager.returnConnection(conn);
}
+ private Set<RemotingConnection> backupConnections = new WeakHashSet<RemotingConnection>();
+
+
+
private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
{
private RemotingConnection conn;
@@ -939,7 +1031,7 @@
synchronized (this)
{
- freezeBackupConnection();
+ freezeBackupConnections();
List<Queue> toActivate = postOffice.activate();
@@ -964,6 +1056,7 @@
queueDeployer.start();
}
}
+ log.info("Backup server is now ACTIVATED");
}
connection.activate();
@@ -978,19 +1071,38 @@
// connection 1 delivery
// connection 1 delivery gets replicated
// can't find message in queue since active was delivered immediately
- private void freezeBackupConnection()
+
+ private boolean frozen;
+
+ private void freezeBackupConnections()
{
- // // Sanity check
- // // All replicated sessions should be on the same connection
- // RemotingConnection replConnection = null;
-
- for (ServerSession session : sessions.values())
+ synchronized (backupConnections)
{
- RemotingConnection rc = session.getChannel().getConnection();
-
- rc.freeze();
+ frozen = true;
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ rc.freeze();
+ }
}
}
+
+ public boolean registerBackupConnection(final RemotingConnection connection)
+ {
+ synchronized (backupConnections)
+ {
+ if (!frozen)
+ {
+ backupConnections.add(connection);
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
private void initialisePart1() throws Exception
{
@@ -1022,7 +1134,7 @@
scheduledPool,
managementConnectorID);
- setupBackupConnectorFactory();
+ setupConnectionManagers();
}
private ClusterQueueStateManager clusterQueueStateManager;
@@ -1465,119 +1577,6 @@
}
}
- private CreateSessionResponseMessage doCreateSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int minLargeMessageSize,
- final int incrementingVersion,
- final RemotingConnection connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge,
- final boolean xa,
- final int sendWindowSize,
- final boolean backup) throws Exception
- {
- if (version.getIncrementingVersion() < incrementingVersion)
- {
- throw new MessagingException(MessagingException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
- "client not compatible with version: " + version.getFullVersion());
- }
-
- // Is this comment relevant any more ?
-
- // Authenticate. Successful autentication will place a new SubjectContext
- // on thread local,
- // which will be used in the authorization process. However, we need to
- // make sure we clean
- // up thread local immediately after we used the information, otherwise
- // some other people
- // security my be screwed up, on account of thread local security stack
- // being corrupted.
-
- securityStore.authenticate(username, password);
-
- ServerSession currentSession = sessions.remove(name);
-
- if (currentSession != null)
- {
- // This session may well be on a different connection and different channel id, so we must get rid
- // of it and create another
-
- // TODO - is this true any more??
- currentSession.getChannel().close();
- }
-
- Channel channel = connection.getChannel(channelID, sendWindowSize, false);
-
- RemotingConnection replicatingConnection = connection.getReplicatingConnection();
-
- final Replicator replicator;
-
- Channel replicatingChannel;
-
- if (replicatingConnection != null)
- {
- replicatingChannel = replicatingConnection.getChannel(channelID, -1, false);
-
- replicator = new ReplicatorImpl(replicatingChannel);
-
- replicatingChannel.setHandler(new ChannelHandler()
- {
- public void handlePacket(final Packet packet)
- {
- if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
- {
- replicator.replicationResponseReceived();
- }
- else
- {
- throw new IllegalArgumentException("Invalid packet " + packet.getType());
- }
- }
- });
- }
- else
- {
- replicator = null;
-
- replicatingChannel = null;
- }
-
- final ServerSessionImpl session = new ServerSessionImpl(name,
- username,
- password,
- minLargeMessageSize,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- configuration.isPersistDeliveryCountBeforeDelivery(),
- xa,
- connection,
- storageManager,
- postOffice,
- resourceManager,
- securityStore,
- executorFactory,
- channel,
- replicatingChannel,
- managementService,
- queueFactory,
- this,
- configuration.getManagementAddress());
-
- sessions.put(name, session);
-
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, replicator, configuration);
-
- session.setHandler(handler);
-
- channel.setHandler(handler);
-
- return new CreateSessionResponseMessage(version.getIncrementingVersion());
- }
-
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -116,6 +116,8 @@
RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
+
+ server.registerBackupConnection(channel.getConnection());
channel.setHandler(new QueueReplicationPacketHandler(msg.getBindingID(), server.getPostOffice(), channel));
@@ -234,7 +236,10 @@
}
}
- channel1.send(response);
+ if (response != null)
+ {
+ channel1.send(response);
+ }
}
private void handleReattachSession(final ReattachSessionMessage request)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -1523,6 +1523,12 @@
}
}
+ //Confirm needs to be done here before the response is sent, so that the subsequent flush flushes that confirm too, and the flush needs
+ //to be sent before the response so that the send acknowledgements are received on the client before the client session closes.
+ //However, the actual packets-confirmed packet *cannot* be sent before the response is back from the backup; otherwise
+ //a session close confirmation could be sent before the backup has knowledge of the close, and on failover the client would
+ //hang forever waiting for the response to session close. So these get queued too.
+
channel.confirm(packet);
channel.flushConfirmations();
@@ -1570,6 +1576,8 @@
int serverLastConfirmedCommandID = channel.getLastConfirmedCommandID();
+ //log.info("telling channel to replay commands up to " + lastConfirmedCommandID);
+
channel.replayCommands(lastConfirmedCommandID);
if (wasStarted)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -76,10 +76,12 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicationResponseMessage;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
+import org.jboss.messaging.utils.SimpleString;
/**
* A ServerSessionPacketHandler
@@ -140,6 +142,8 @@
if (config.isBackup())
{
+ //log.info(System.identityHashCode(this) + " inv on backup");
+
JBMThread thread = JBMThread.currentThread();
thread.setReplay(sequences);
@@ -150,7 +154,21 @@
if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
{
- checkConfirm(packet);
+ //checkConfirm(packet);
+
+ channel.confirm(packet);
+
+// if (packet.getType() == PacketImpl.SESS_SEND)
+// {
+// SessionSendMessage sm = (SessionSendMessage)packet;
+//
+// ServerMessage msg = sm.getServerMessage();
+//
+// int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+//
+// log.info("confirmed send " + cnt);
+// }
+
channel.send(new ReplicationResponseMessage());
}
@@ -161,6 +179,8 @@
}
else
{
+ // log.info(System.identityHashCode(this) + " inv on live, repl is " + replicator);
+
if (replicator != null)
{
replicator.execute(this,
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -24,8 +24,8 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.replication.Replicator;
/**
@@ -37,24 +37,23 @@
*/
public class JBMThread extends Thread
{
+ private static final Logger log = Logger.getLogger(JBMThread.class);
+
private static enum ThreadState
{
RECORD, REPLAY, NONE;
}
-// public long getSequence()
-// {
-// return sequence;
-// }
-//
private ThreadState state;
private List<Long> objectSequences;
private int pos;
- private Replicator replicator;
+ private Replicator replicator;
+ // private volatile boolean jbmInterrupted;
+
public static JBMThread currentThread()
{
return (JBMThread)Thread.currentThread();
@@ -151,4 +150,38 @@
{
this.replicator = replicator;
}
+
+// public boolean isJBMInterrupted()
+// {
+// return jbmInterrupted;
+// }
+
+ public void setLastLock(final SequencedLock lock)
+ {
+ this.lastLock = lock;
+ }
+
+ private volatile SequencedLock lastLock;
+
+ public void setLastLockNonStrict()
+ {
+ if (lastLock != null)
+ {
+ lastLock.setStrict(false);
+ }
+ }
+
+// public void jbmInterrupt()
+// {
+// log.info("Interrupting jbm thread");
+//
+// jbmInterrupted = true;
+//
+// interrupt();
+// }
+//
+// public void jbmResetInterrupt()
+// {
+// jbmInterrupted = false;
+// }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareReadWriteLock.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -173,12 +173,12 @@
// {
// this.acquirers.put(sequence, thread.getStackTrace());
// }
-
- if (!sequencedLock.lock(sequence))
- {
- // dumpLocksWithName(name);
+
+ if (!sequencedLock.lock(sequence, unit.toNanos(time)))
+ {
+ // dumpLocksWithName(name);
}
-
+
addOwner(thread);
return true;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -40,13 +40,9 @@
private final AtomicLong al;
- private final boolean backup;
-
- public ReplicationAwareSharedCounter(final boolean backup, final long initialValue)
+ public ReplicationAwareSharedCounter(final long initialValue)
{
this.al = new AtomicLong(initialValue);
-
- this.backup = backup;
}
public void set(final long l)
@@ -58,15 +54,8 @@
{
JBMThread thread = JBMThread.currentThread();
-// if (thread.isReplay())
-// {
- if (backup)
+ if (thread.isReplay())
{
- if (!thread.isReplay())
- {
- throw new IllegalStateException("Thread should be in replay mode");
- }
-
if (thread.isRecording())
{
throw new IllegalStateException("Thread should not be recording");
@@ -74,6 +63,7 @@
long sequence = thread.getNextSequence();
+ //FIXME - this will be slow - too much contention with many threads
synchronized (this)
{
if (sequence >= al.get())
@@ -82,16 +72,6 @@
}
}
-
-// log.info(System.identityHashCode(this) + " attempting to get sequence " + sequence);
-//
-// while (!al.compareAndSet(sequence, sequence + 1))
-// {
-// Thread.yield();
-// }
-//
-// log.info(System.identityHashCode(this) + " got sequence " + sequence);
-
return sequence;
}
else
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -43,113 +43,78 @@
{
private static final Logger log = Logger.getLogger(SequencedLock.class);
- private final Queue<QueueEntry> waiting;
+ private final Queue<QueueEntry> queue;
private final AtomicLong currentSequence;
private Thread owner;
+ private final AtomicBoolean locked = new AtomicBoolean(false);
+
+ private volatile boolean strictOrder = true;
+
public SequencedLock(final long sequence)
{
- waiting = new PriorityBlockingQueue<QueueEntry>();
+ queue = new PriorityBlockingQueue<QueueEntry>();
- // this.currentSequence = sequence;
this.currentSequence = new AtomicLong(sequence);
}
-
- private synchronized void dump(final long sequence)
+
+ public void setStrict(boolean strict)
{
- log.error(System.identityHashCode(this) + " Timed out trying to get lock, desired " +
- sequence +
- " current " +
- currentSequence);
+ this.strictOrder = strict;
+
+ QueueEntry entry = peekEntry();
- log.info("Current owner is " + owner);
-
- log.info("Dumping entries");
-
- Object[] sorted = waiting.toArray();
-
- Arrays.sort(sorted);
-
- for (Object obj : sorted)
+ if (entry != null)
{
- QueueEntry en = (QueueEntry)obj;
-
- log.info("seq:" + en.sequence + " thread:" + System.identityHashCode(en.thread));
+ LockSupport.unpark(entry.thread);
}
-
- try
- {
- Thread.sleep(1000000);
- }
- catch (Exception e)
- {
- }
}
-
- private void addEntry(final QueueEntry entry)
+
+ //TODO parking with a timeout seems to be a lot slower than parking without timeout
+ public boolean lock(final long sequence, final long timeout)
{
-// synchronized (waiting)
-// {
+ JBMThread currentThread = JBMThread.currentThread();
- waiting.add(entry);
- // }
- }
+ QueueEntry entry = new QueueEntry(sequence, currentThread);
- private QueueEntry peekEntry()
- {
-// synchronized (waiting)
-// {
- QueueEntry entry = waiting.peek();
+ queue.add(entry);
- if (entry != null && entry.sequence == currentSequence.get())
- {
- return entry;
- }
- else
- {
- return null;
- }
- // }
+ long start = System.nanoTime();
- }
+ long toWait = timeout;
- private void removeEntry()
- {
- // synchronized (waiting)
- // {
- waiting.remove();
- // }
- }
-
- private final AtomicBoolean locked = new AtomicBoolean(false);
-
- public boolean lock(final long sequence)
- {
- Thread currentThread = Thread.currentThread();
-
- // log.info("Thread " + System.identityHashCode(currentThread) + " trying to lock " +
- // System.identityHashCode(this) + " with sequence " + sequence);
-
- QueueEntry entry = new QueueEntry(sequence, currentThread);
-
- addEntry(entry);
-
- boolean wasInterrupted = false;
-
while (true)
{
QueueEntry peeked = peekEntry();
if (peeked == null || peeked.thread != currentThread || !locked.compareAndSet(false, true))
- {
- LockSupport.park();
+ {
+ currentThread.setLastLock(this);
+
+ LockSupport.parkNanos(toWait);
+
+// if (currentThread.isJBMInterrupted())
+// {
+// log.info("**** setting strict order to false");
+// strictOrder = false;
+//
+// //Thread.interrupted();
+// }
- if (Thread.interrupted())
+ long now = System.nanoTime();
+
+ toWait -= now - start;
+
+ if (toWait <= 0)
{
- wasInterrupted = true;
+ log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() + " expected " + sequence);
+
+ return false;
}
+
+ start = now;
}
else
{
@@ -157,26 +122,15 @@
}
}
- if (wasInterrupted)
- {
- currentThread.interrupt();
- }
+ queue.remove();
- removeEntry();
-
owner = currentThread;
- // log.info("Thread " + System.identityHashCode(currentThread) + " locked " + System.identityHashCode(this) + "
- // with sequence " + sequence);
-
return true;
}
public void unlock()
{
- // log.info("Thread " + System.identityHashCode(Thread.currentThread()) + " unlocking " +
- // System.identityHashCode(this));
-
if (owner != Thread.currentThread())
{
throw new IllegalMonitorStateException();
@@ -190,10 +144,25 @@
if (entry != null)
{
- LockSupport.unpark(peekEntry().thread);
+ LockSupport.unpark(entry.thread);
}
}
+ private QueueEntry peekEntry()
+ {
+ QueueEntry entry = queue.peek();
+
+ if (entry != null)
+ {
+ if (!strictOrder ||entry.sequence == currentSequence.get())
+ {
+ return entry;
+ }
+ }
+
+ return null;
+ }
+
private static final class QueueEntry implements Comparable<QueueEntry>
{
private final long sequence;
@@ -219,4 +188,36 @@
return sequence < l ? -1 : (sequence == l ? 0 : 1);
}
}
+
+ private synchronized void dump(final long sequence)
+ {
+ log.error(System.identityHashCode(this) + " Timed out trying to get lock, desired " +
+ sequence +
+ " current " +
+ currentSequence);
+
+ log.info("Current owner is " + owner);
+
+ log.info("Dumping entries");
+
+ Object[] sorted = queue.toArray();
+
+ Arrays.sort(sorted);
+
+ for (Object obj : sorted)
+ {
+ QueueEntry en = (QueueEntry)obj;
+
+ log.info("seq:" + en.sequence + " thread:" + System.identityHashCode(en.thread));
+ }
+
+ try
+ {
+ Thread.sleep(1000000);
+ }
+ catch (Exception e)
+ {
+ }
+ }
+
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/ConcurrentHashSet.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -40,7 +40,7 @@
*/
public class ConcurrentHashSet<E> extends AbstractSet<E> implements ConcurrentSet<E>
{
- private ConcurrentMap<E, Object> theMap;
+ private final ConcurrentMap<E, Object> theMap;
private static final Object dummy = new Object();
Added: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java (rev 0)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/utils/WeakHashSet.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.utils;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+/**
+ *
+ * A WeakHashSet
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * @param <E>
+ *
+ */
+public class WeakHashSet<E> extends AbstractSet<E>
+{
+ private final Map<E, Object> theMap;
+
+ private static final Object dummy = new Object();
+
+ public WeakHashSet()
+ {
+ theMap = new WeakHashMap<E, Object>();
+ }
+
+ public int size()
+ {
+ return theMap.size();
+ }
+
+ public Iterator<E> iterator()
+ {
+ return theMap.keySet().iterator();
+ }
+
+ public boolean isEmpty()
+ {
+ return theMap.isEmpty();
+ }
+
+ public boolean add(E o)
+ {
+ return theMap.put(o, dummy) == null;
+ }
+
+ public boolean contains(Object o)
+ {
+ return theMap.containsKey(o);
+ }
+
+ public void clear()
+ {
+ theMap.clear();
+ }
+
+ public boolean remove(Object o)
+ {
+ return theMap.remove(o) == dummy;
+ }
+}
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -120,8 +120,10 @@
private final int threadNum;
- Runner(final RunnableT test, final int threadNum)
+ Runner(final String name, final RunnableT test, final int threadNum)
{
+ super(name);
+
this.test = test;
this.threadNum = threadNum;
@@ -142,7 +144,7 @@
// Case a failure happened here, it should print the Thread dump
// Sending it to System.out, as it would show on the Tests report
- System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
+ //System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads (" + t.getLocalizedMessage() + ")"));
}
}
}
@@ -153,7 +155,7 @@
for (int i = 0; i < numThreads; i++)
{
- Runner runner = new Runner(runnable, i);
+ Runner runner = new Runner("MultiThreadFailoverTest-thread-" + i, runnable, i);
threads.add(runner);
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -40,7 +40,6 @@
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.jms.client.JBossBytesMessage;
@@ -91,7 +90,7 @@
{
doTestA(sf, threadNum);
}
- }, NUM_THREADS, false);
+ }, 1, false);
}
public void testB() throws Exception
@@ -327,7 +326,7 @@
final int numMessages = 100;
- final int numSessions = 10;
+ final int numSessions = 1;
Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
Set<ClientSession> sessions = new HashSet<ClientSession>();
@@ -1287,7 +1286,7 @@
protected int getNumIterations()
{
- return 2;
+ return 1000;
}
@Override
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java 2009-07-10 08:00:59 UTC (rev 7554)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/unit/core/server/replication/impl/SequencedLockTest.java 2009-07-10 10:59:04 UTC (rev 7555)
@@ -39,6 +39,8 @@
public class SequencedLockTest extends TestCase
{
private static final Logger log = Logger.getLogger(SequencedLockTest.class);
+
+ private static final long TIMEOUT = 5000000000L;
public void testAscending() throws Exception
{
@@ -144,7 +146,7 @@
// log.info("Trying lock " + cnt);
- lock.lock(cnt);
+ lock.lock(cnt, TIMEOUT);
int ob = observedCounter.getAndIncrement();
@@ -212,7 +214,7 @@
{
// log.info(System.identityHashCode(this) + " Will attempt to obtain lock with sequence " + seq);
- lock.lock(seq);
+ lock.lock(seq, TIMEOUT);
// log.info(System.identityHashCode(this) + " Obtained lock with sequence " + seq);
More information about the jboss-cvs-commits
mailing list