[hornetq-commits] JBoss hornetq SVN: r8053 - in trunk: src/main/org/hornetq/core/management/impl and 12 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 6 05:53:21 EDT 2009
Author: ataylor
Date: 2009-10-06 05:53:21 -0400 (Tue, 06 Oct 2009)
New Revision: 8053
Modified:
trunk/src/main/org/hornetq/core/management/QueueControl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/Binding.java
trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-170 - Stopped using transient IDs as binding IDs; the persistent queue ID is now used instead.
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -32,7 +32,7 @@
String getAddress();
- long getPersistenceID();
+ long getID();
boolean isTemporary();
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -159,9 +159,9 @@
return queue.getMessagesAdded();
}
- public long getPersistenceID()
+ public long getID()
{
- return queue.getPersistenceID();
+ return queue.getID();
}
public long getScheduledCount()
Modified: trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -27,7 +27,7 @@
*/
public interface QueueBindingInfo
{
- long getPersistenceID();
+ long getId();
SimpleString getAddress();
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -310,7 +310,7 @@
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
- ref.getQueue().getPersistenceID());
+ ref.getQueue().getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
@@ -400,7 +400,7 @@
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
- ref.getQueue().getPersistenceID());
+ ref.getQueue().getID());
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
@@ -457,7 +457,7 @@
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
- DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(),
+ DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
@@ -1003,11 +1003,7 @@
binding.getAddress(),
filterString);
- long id = this.generateUniqueID();
-
- queue.setPersistenceID(id);
-
- bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding, true);
+ bindingsJournal.appendAddRecord(binding.getID(), QUEUE_BINDING_RECORD, bindingEncoding, true);
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
@@ -1037,7 +1033,7 @@
bindingEncoding.decode(buffer);
- bindingEncoding.setPersistenceID(id);
+ bindingEncoding.setId(id);
queueBindingInfos.add(bindingEncoding);
}
@@ -1265,7 +1261,7 @@
private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
- long persistenceID;
+ long id;
SimpleString name;
@@ -1286,14 +1282,14 @@
this.filterString = filterString;
}
- public long getPersistenceID()
+ public long getId()
{
- return persistenceID;
+ return id;
}
- public void setPersistenceID(final long id)
+ public void setId(final long id)
{
- this.persistenceID = id;
+ this.id = id;
}
public SimpleString getAddress()
Modified: trunk/src/main/org/hornetq/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -48,9 +48,7 @@
boolean isExclusive();
- int getID();
-
- void setID(int id);
+ long getID();
int getDistance();
}
Modified: trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -40,7 +40,7 @@
private final SimpleString filterString;
- private final int id;
+ private final long id;
private List<SimpleString> filterStrings;
@@ -48,7 +48,7 @@
private final int distance;
- public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final int id,
+ public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final Long id,
final Integer distance)
{
if (routingName == null)
@@ -100,7 +100,7 @@
return distance;
}
- public int getID()
+ public long getID()
{
return id;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -51,7 +51,7 @@
private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
- private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
+ private final Map<Long, Binding> bindingsMap = new ConcurrentHashMap<Long, Binding>();
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
@@ -133,7 +133,7 @@
while (buff.hasRemaining())
{
- int bindingID = buff.getInt();
+ long bindingID = buff.getLong();
Binding binding = bindingsMap.get(bindingID);
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -45,10 +45,12 @@
private final boolean exclusive;
- private int id;
+ private final long id;
- public DivertBinding(final SimpleString address, final Divert divert)
+ public DivertBinding(long id, final SimpleString address, final Divert divert)
{
+ this.id = id;
+
this.address = address;
this.divert = divert;
@@ -62,16 +64,11 @@
this.exclusive = divert.isExclusive();
}
- public int getID()
+ public long getID()
{
return id;
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public Filter getFilter()
{
return filter;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -48,8 +48,6 @@
private final SimpleString name;
- private int id;
-
private SimpleString clusterName;
public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID)
@@ -65,16 +63,11 @@
this.clusterName = name.concat(nodeID);
}
- public int getID()
+ public long getID()
{
- return id;
+ return queue.getID();
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public Filter getFilter()
{
return filter;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -100,16 +100,6 @@
private final boolean persistIDCache;
- // Each queue has a transient ID which lasts the lifetime of its binding. This is used in clustering when routing
- // messages to particular queues on nodes. We could
- // use the queue name on the node to identify it. But sometimes we need to route to maybe 10s of thousands of queues
- // on a particular node, and all would
- // have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
- // The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
- private int transientIDSequence;
-
- private Set<Integer> transientIDs = new HashSet<Integer>();
-
private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
private final Object notificationLock = new Object();
@@ -200,9 +190,6 @@
addressManager.clear();
queueInfos.clear();
-
- transientIDs.clear();
-
}
public boolean isStarted()
@@ -243,13 +230,13 @@
SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
- Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+ Long id = (Long)props.getProperty(ManagementHelper.HDR_BINDING_ID);
SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
- QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
+ QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
queueInfos.put(clusterName, info);
@@ -435,8 +422,6 @@
// even though failover is complete
public synchronized void addBinding(final Binding binding) throws Exception
{
- binding.setID(generateTransientID());
-
addressManager.addBinding(binding);
TypedProperties props = new TypedProperties();
@@ -449,7 +434,7 @@
props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+ props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -505,8 +490,6 @@
managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
- releaseTransientID(binding.getID());
-
return binding;
}
@@ -740,7 +723,7 @@
message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
- message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+ message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
@@ -822,30 +805,6 @@
return message;
}
- private int generateTransientID()
- {
- int start = transientIDSequence;
- do
- {
- int id = transientIDSequence++;
-
- if (!transientIDs.contains(id))
- {
- transientIDs.add(id);
-
- return id;
- }
- }
- while (transientIDSequence != start);
-
- throw new IllegalStateException("Run out of queue ids!");
- }
-
- private void releaseTransientID(final int id)
- {
- transientIDs.remove(id);
- }
-
private final PageMessageOperation getPageOperation(final Transaction tx)
{
// you could have races on the case two sessions using the same XID
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -38,10 +38,8 @@
SimpleString getName();
- long getPersistenceID();
+ long getID();
- void setPersistenceID(long id);
-
Filter getFilter();
boolean isDurable();
Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -41,7 +41,7 @@
void handleReplicatedAddBinding(SimpleString address,
SimpleString uniqueName,
SimpleString routingName,
- int queueID,
+ long queueID,
SimpleString filterString,
SimpleString queueName,
int distance) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -577,14 +577,15 @@
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
- Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
+ Long queueID = (Long)message.getProperty(ManagementHelper.HDR_BINDING_ID);
if (queueID == null)
{
throw new IllegalStateException("queueID is null");
}
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+ RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+ queueAddress,
clusterName,
routingName,
queueID,
@@ -721,7 +722,7 @@
public void handleReplicatedAddBinding(final SimpleString address,
final SimpleString uniqueName,
final SimpleString routingName,
- final int queueID,
+ final long queueID,
final SimpleString filterString,
final SimpleString queueName,
final int distance) throws Exception
@@ -735,7 +736,8 @@
Queue queue = (Queue)queueBinding.getBindable();
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+ RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+ address,
uniqueName,
routingName,
queueID,
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -51,7 +51,7 @@
private final SimpleString routingName;
- private final int remoteQueueID;
+ private final long remoteQueueID;
private final Filter queueFilter;
@@ -63,19 +63,22 @@
private final SimpleString idsHeaderName;
- private int id;
+ private final long id;
private final int distance;
- public RemoteQueueBindingImpl(final SimpleString address,
+ public RemoteQueueBindingImpl(final long id,
+ final SimpleString address,
final SimpleString uniqueName,
final SimpleString routingName,
- final int remoteQueueID,
+ final Long remoteQueueID,
final SimpleString filterString,
- final Queue storeAndForwardQueue,
+ final Queue storeAndForwardQueue,
final SimpleString bridgeName,
final int distance) throws Exception
{
+ this.id = id;
+
this.address = address;
this.storeAndForwardQueue = storeAndForwardQueue;
@@ -100,16 +103,11 @@
this.distance = distance;
}
- public int getID()
+ public long getID()
{
return id;
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public SimpleString getAddress()
{
return address;
@@ -195,20 +193,20 @@
if (ids == null)
{
- ids = new byte[4];
+ ids = new byte[8];
}
else
{
- byte[] newIds = new byte[ids.length + 4];
+ byte[] newIds = new byte[ids.length + 8];
- System.arraycopy(ids, 0, newIds, 4, ids.length);
+ System.arraycopy(ids, 0, newIds, 8, ids.length);
ids = newIds;
}
ByteBuffer buff = ByteBuffer.wrap(ids);
- buff.putInt(remoteQueueID);
+ buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -843,7 +843,7 @@
if (queue.isDurable())
{
- storageManager.deleteQueueBinding(queue.getPersistenceID());
+ storageManager.deleteQueueBinding(queue.getID());
}
postOffice.removeBinding(queueName);
@@ -1169,7 +1169,7 @@
filter = new FilterImpl(queueBindingInfo.getFilterString());
}
- Queue queue = queueFactory.createQueue(queueBindingInfo.getPersistenceID(),
+ Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
filter,
@@ -1178,7 +1178,7 @@
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeID);
- queues.put(queueBindingInfo.getPersistenceID(), queue);
+ queues.put(queueBindingInfo.getId(), queue);
postOffice.addBinding(binding);
@@ -1267,8 +1267,8 @@
filter = new FilterImpl(filterString);
}
- final Queue queue = queueFactory.createQueue(-1, address, queueName, filter, durable, temporary);
-
+ final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
+
binding = new LocalQueueBinding(address, queue, nodeID);
if (durable)
@@ -1339,7 +1339,7 @@
pagingManager,
storageManager);
- Binding binding = new DivertBinding(sAddress, divert);
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
postOffice.addBinding(binding);
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -232,7 +232,7 @@
}
else
{
- storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
+ storageManager.deleteMessageTransactional(tx.getID(), getID(), msg.getMessageID());
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -76,7 +76,7 @@
public static final int NUM_PRIORITIES = 10;
- private volatile long persistenceID = -1;
+ private final long id;
private final SimpleString name;
@@ -140,7 +140,7 @@
private volatile SimpleString expiryAddress;
- public QueueImpl(final long persistenceID,
+ public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
final Filter filter,
@@ -151,7 +151,7 @@
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
- this.persistenceID = persistenceID;
+ this.id = id;
this.address = address;
@@ -259,7 +259,7 @@
message.setStored();
}
- storageManager.storeReference(ref.getQueue().getPersistenceID(), message.getMessageID());
+ storageManager.storeReference(ref.getQueue().getID(), message.getMessageID());
}
if (scheduledDeliveryTime != null && durableRef)
@@ -283,7 +283,7 @@
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
storageManager.storeReferenceTransactional(tx.getID(),
- ref.getQueue().getPersistenceID(),
+ ref.getQueue().getID(),
message.getMessageID());
}
@@ -373,16 +373,11 @@
return name;
}
- public long getPersistenceID()
+ public long getID()
{
- return persistenceID;
+ return id;
}
- public void setPersistenceID(final long id)
- {
- persistenceID = id;
- }
-
public Filter getFilter()
{
return filter;
@@ -662,7 +657,7 @@
if (durableRef)
{
- storageManager.storeAcknowledge(persistenceID, message.getMessageID());
+ storageManager.storeAcknowledge(id, message.getMessageID());
}
postAcknowledge(ref);
@@ -676,7 +671,7 @@
if (durableRef)
{
- storageManager.storeAcknowledgeTransactional(tx.getID(), persistenceID, message.getMessageID());
+ storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -25,72 +25,149 @@
*/
public class ClusterRestartTest extends ClusterTestBase
{
- public void testRestartWithDurableQueues() throws Exception
+ public void testRestartWithQueuesCreateInDiffOrder() throws Exception
{
- /*setupServer(0, isFileStorage(), isNetty());
+ setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
- setupServer(2, isFileStorage(), isNetty());
- setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
- setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
- setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
- startServers(0, 1, 2);
+ startServers(0, 1);
+
+ System.out.println("server 0 = " + getServer(0).getNodeID());
+ System.out.println("server 1 = " + getServer(1).getNodeID());
+
try
{
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
+ //create some dummy queues to ensure that the test queue has a high numbered binding
+ createQueue(0, "queues.testaddress2", "queue0", null, false);
+ createQueue(0, "queues.testaddress2", "queue1", null, false);
+ createQueue(0, "queues.testaddress2", "queue2", null, false);
+ createQueue(0, "queues.testaddress2", "queue3", null, false);
+ createQueue(0, "queues.testaddress2", "queue4", null, false);
+ createQueue(0, "queues.testaddress2", "queue5", null, false);
+ createQueue(0, "queues.testaddress2", "queue6", null, false);
+ createQueue(0, "queues.testaddress2", "queue7", null, false);
+ createQueue(0, "queues.testaddress2", "queue8", null, false);
+ createQueue(0, "queues.testaddress2", "queue9", null, false);
+ //now create the 2 queues and make sure they are durable
+ createQueue(0, "queues.testaddress", "queue10", null, true);
+ createQueue(1, "queues.testaddress", "queue10", null, true);
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
+ addConsumer(0, 0, "queue10", null);
- addConsumer(1, 1, "queue0", null);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ printBindings(2);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false, null);
+
+ System.out.println("stopping******************************************************");
+ stopServers(0);
+ System.out.println("stopped******************************************************");
+ startServers(0);
+
waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
+ addConsumer(1, 0, "queue10", null);
- printBindings();
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+ printBindings(2);
+
+ sendInRange(1, "queues.testaddress", 10, 20, false, null);
- sendInRange(1, "queues.testaddress", 0, 10, false, null);
+ verifyReceiveAllInRange(10, 20, 1);
+ System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
- sendInRange(2, "queues.testaddress", 10, 20, false, null);
+ closeAllSessionFactories();
+ stopServers(0, 1);
+ }
+ }
- sendInRange(0, "queues.testaddress", 20, 30, false, null);
+ public void testRestartWithQueuesCreateInDiffOrder2() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+
+
+ startServers(0, 1);
+
+
+ System.out.println("server 0 = " + getServer(0).getNodeID());
+ System.out.println("server 1 = " + getServer(1).getNodeID());
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+
+ //create some dummy queues to ensure that the test queue has a high numbered binding
+ createQueue(0, "queues.testaddress2", "queue0", null, false);
+ createQueue(0, "queues.testaddress2", "queue1", null, false);
+ createQueue(0, "queues.testaddress2", "queue2", null, false);
+ createQueue(0, "queues.testaddress2", "queue3", null, false);
+ createQueue(0, "queues.testaddress2", "queue4", null, false);
+ createQueue(0, "queues.testaddress2", "queue5", null, false);
+ createQueue(0, "queues.testaddress2", "queue6", null, false);
+ createQueue(0, "queues.testaddress2", "queue7", null, false);
+ createQueue(0, "queues.testaddress2", "queue8", null, false);
+ createQueue(0, "queues.testaddress2", "queue9", null, false);
+ //now create the 2 queues and make sure they are durable
+ createQueue(0, "queues.testaddress", "queue10", null, true);
+ createQueue(1, "queues.testaddress", "queue10", null, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+ printBindings(2);
+
+ sendInRange(1, "queues.testaddress", 0, 10, true, null);
+
System.out.println("stopping******************************************************");
- stopServers(1);
+ stopServers(0);
+
+ sendInRange(1, "queues.testaddress", 10, 20, true, null);
System.out.println("stopped******************************************************");
- startServers(1);
+ startServers(0);
waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
- addConsumer(4, 1, "queue0", null);
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
- printBindings();
- sendInRange(2, "queues.testaddress", 30, 40, false, null);
- sendInRange(0, "queues.testaddress", 40, 50, false, null);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+ printBindings(2);
+ addConsumer(0, 1, "queue10", null);
+ addConsumer(1, 0, "queue10", null);
- verifyReceiveAllInRange(0, 50, 1);
+ verifyReceiveRoundRobin(0, 20, 0, 1);
System.out.println("*****************************************************************************");
}
finally
@@ -99,30 +176,22 @@
closeAllSessionFactories();
- stopServers(0, 1, 2);
- }*/
+ stopServers(0, 1);
+ }
}
- private void printBindings()
+
+ private void printBindings(int num)
throws Exception
{
- Collection<Binding> bindings0 = getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
- Collection<Binding> bindings1 = getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
- Collection<Binding> bindings2 = getServer(2).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
- for (Binding binding : bindings0)
+ for(int i = 0; i < num; i++)
{
- System.out.println(binding + " on node 0 at " + binding.getID());
+ Collection<Binding> bindings0 = getServer(i).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
+ for (Binding binding : bindings0)
+ {
+ System.out.println(binding + " on node " + i + " at " + binding.getID());
+ }
}
-
- for (Binding binding : bindings1)
- {
- System.out.println(binding + " on node 1 at " + binding.getID());
- }
-
- for (Binding binding : bindings2)
- {
- System.out.println(binding + " on node 2 at " + binding.getID());
- }
}
public boolean isNetty()
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -130,9 +130,9 @@
return (String)proxy.retrieveAttributeValue("name");
}
- public long getPersistenceID()
+ public long getID()
{
- return (Long)proxy.retrieveAttributeValue("persistenceID");
+ return (Long)proxy.retrieveAttributeValue("ID");
}
public long getScheduledCount()
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -957,7 +957,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.Binding#getID()
*/
- public int getID()
+ public long getID()
{
return 0;
@@ -1008,14 +1008,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#setID(int)
- */
- public void setID(final int id)
- {
-
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
*/
public void willRoute(final ServerMessage message)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -19,7 +19,6 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.MessageReference;
@@ -311,9 +310,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#getPersistenceID()
+ * @see org.hornetq.core.server.Queue#getID()
*/
- public long getPersistenceID()
+ public long getID()
{
return 0;
@@ -482,14 +481,7 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#setPersistenceID(long)
- */
- public void setPersistenceID(long id)
- {
- }
-
/* (non-Javadoc)
* @see org.hornetq.core.server.Bindable#preroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
*/
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -51,21 +51,6 @@
private static final SimpleString address1 = new SimpleString("address1");
- public void testID()
- {
- final long id = 123;
-
- Queue queue = new QueueImpl(id, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
-
- assertEquals(id, queue.getPersistenceID());
-
- final long id2 = 456;
-
- queue.setPersistenceID(id2);
-
- assertEquals(id2, queue.getPersistenceID());
- }
-
public void testName()
{
final SimpleString name = new SimpleString("oobblle");
More information about the hornetq-commits
mailing list