Author: ataylor
Date: 2009-10-06 05:53:21 -0400 (Tue, 06 Oct 2009)
New Revision: 8053
Modified:
trunk/src/main/org/hornetq/core/management/QueueControl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/Binding.java
trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-170 - Stopped using transient IDs as
binding IDs. The persistent queue ID is now used as the binding ID instead.
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-10-06 09:35:13 UTC
(rev 8052)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-10-06 09:53:21 UTC
(rev 8053)
@@ -32,7 +32,7 @@
String getAddress();
- long getPersistenceID();
+ long getID();
boolean isTemporary();
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-10-06
09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -159,9 +159,9 @@
return queue.getMessagesAdded();
}
- public long getPersistenceID()
+ public long getID()
{
- return queue.getPersistenceID();
+ return queue.getID();
}
public long getScheduledCount()
Modified: trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java 2009-10-06 09:35:13
UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java 2009-10-06 09:53:21
UTC (rev 8053)
@@ -27,7 +27,7 @@
*/
public interface QueueBindingInfo
{
- long getPersistenceID();
+ long getId();
SimpleString getAddress();
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-06
09:35:13 UTC (rev 8052)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -310,7 +310,7 @@
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-
ref.getQueue().getPersistenceID());
+
ref.getQueue().getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
@@ -400,7 +400,7 @@
public void updateScheduledDeliveryTimeTransactional(final long txID, final
MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-
ref.getQueue().getPersistenceID());
+
ref.getQueue().getID());
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
@@ -457,7 +457,7 @@
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
- DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(),
+ DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
@@ -1003,11 +1003,7 @@
binding.getAddress(),
filterString);
- long id = this.generateUniqueID();
-
- queue.setPersistenceID(id);
-
- bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding, true);
+ bindingsJournal.appendAddRecord(binding.getID(), QUEUE_BINDING_RECORD,
bindingEncoding, true);
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
@@ -1037,7 +1033,7 @@
bindingEncoding.decode(buffer);
- bindingEncoding.setPersistenceID(id);
+ bindingEncoding.setId(id);
queueBindingInfos.add(bindingEncoding);
}
@@ -1265,7 +1261,7 @@
private static class PersistentQueueBindingEncoding implements EncodingSupport,
QueueBindingInfo
{
- long persistenceID;
+ long id;
SimpleString name;
@@ -1286,14 +1282,14 @@
this.filterString = filterString;
}
- public long getPersistenceID()
+ public long getId()
{
- return persistenceID;
+ return id;
}
- public void setPersistenceID(final long id)
+ public void setId(final long id)
{
- this.persistenceID = id;
+ this.id = id;
}
public SimpleString getAddress()
Modified: trunk/src/main/org/hornetq/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-06 09:35:13 UTC (rev
8052)
+++ trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-06 09:53:21 UTC (rev
8053)
@@ -48,9 +48,7 @@
boolean isExclusive();
- int getID();
-
- void setID(int id);
+ long getID();
int getDistance();
}
Modified: trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java 2009-10-06 09:35:13 UTC (rev
8052)
+++ trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java 2009-10-06 09:53:21 UTC (rev
8053)
@@ -40,7 +40,7 @@
private final SimpleString filterString;
- private final int id;
+ private final long id;
private List<SimpleString> filterStrings;
@@ -48,7 +48,7 @@
private final int distance;
- public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final
SimpleString address, final SimpleString filterString, final int id,
+ public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final
SimpleString address, final SimpleString filterString, final Long id,
final Integer distance)
{
if (routingName == null)
@@ -100,7 +100,7 @@
return distance;
}
- public int getID()
+ public long getID()
{
return id;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-06 09:35:13
UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-06 09:53:21
UTC (rev 8053)
@@ -51,7 +51,7 @@
private final Map<SimpleString, Integer> routingNamePositions = new
ConcurrentHashMap<SimpleString, Integer>();
- private final Map<Integer, Binding> bindingsMap = new
ConcurrentHashMap<Integer, Binding>();
+ private final Map<Long, Binding> bindingsMap = new ConcurrentHashMap<Long,
Binding>();
private final List<Binding> exclusiveBindings = new
CopyOnWriteArrayList<Binding>();
@@ -133,7 +133,7 @@
while (buff.hasRemaining())
{
- int bindingID = buff.getInt();
+ long bindingID = buff.getLong();
Binding binding = bindingsMap.get(bindingID);
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-06 09:35:13
UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-06 09:53:21
UTC (rev 8053)
@@ -45,10 +45,12 @@
private final boolean exclusive;
- private int id;
+ private final long id;
- public DivertBinding(final SimpleString address, final Divert divert)
+ public DivertBinding(long id, final SimpleString address, final Divert divert)
{
+ this.id = id;
+
this.address = address;
this.divert = divert;
@@ -62,16 +64,11 @@
this.exclusive = divert.isExclusive();
}
- public int getID()
+ public long getID()
{
return id;
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public Filter getFilter()
{
return filter;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-06
09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -48,8 +48,6 @@
private final SimpleString name;
- private int id;
-
private SimpleString clusterName;
public LocalQueueBinding(final SimpleString address, final Queue queue, final
SimpleString nodeID)
@@ -65,16 +63,11 @@
this.clusterName = name.concat(nodeID);
}
- public int getID()
+ public long getID()
{
- return id;
+ return queue.getID();
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public Filter getFilter()
{
return filter;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-06
09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -100,16 +100,6 @@
private final boolean persistIDCache;
- // Each queue has a transient ID which lasts the lifetime of its binding. This is used
in clustering when routing
- // messages to particular queues on nodes. We could
- // use the queue name on the node to identify it. But sometimes we need to route to
maybe 10s of thousands of queues
- // on a particular node, and all would
- // have to be specified in the message. Specify 10000 ints takes up a lot less space
than 10000 arbitrary queue names
- // The drawback of this approach is we only allow up to 2^32 queues in memory at any
one time
- private int transientIDSequence;
-
- private Set<Integer> transientIDs = new HashSet<Integer>();
-
private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString,
QueueInfo>();
private final Object notificationLock = new Object();
@@ -200,9 +190,6 @@
addressManager.clear();
queueInfos.clear();
-
- transientIDs.clear();
-
}
public boolean isStarted()
@@ -243,13 +230,13 @@
SimpleString address =
(SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
- Integer transientID =
(Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+ Long id = (Long)props.getProperty(ManagementHelper.HDR_BINDING_ID);
SimpleString filterString =
(SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
Integer distance =
(Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
- QueueInfo info = new QueueInfo(routingName, clusterName, address,
filterString, transientID, distance);
+ QueueInfo info = new QueueInfo(routingName, clusterName, address,
filterString, id, distance);
queueInfos.put(clusterName, info);
@@ -435,8 +422,6 @@
// even though failover is complete
public synchronized void addBinding(final Binding binding) throws Exception
{
- binding.setID(generateTransientID());
-
addressManager.addBinding(binding);
TypedProperties props = new TypedProperties();
@@ -449,7 +434,7 @@
props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME,
binding.getRoutingName());
- props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+ props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -505,8 +490,6 @@
managementService.sendNotification(new Notification(null,
NotificationType.BINDING_REMOVED, props));
- releaseTransientID(binding.getID());
-
return binding;
}
@@ -740,7 +723,7 @@
message.putStringProperty(ManagementHelper.HDR_ADDRESS,
info.getAddress());
message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME,
info.getClusterName());
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME,
info.getRoutingName());
- message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+ message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING,
info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE,
info.getDistance());
@@ -822,30 +805,6 @@
return message;
}
- private int generateTransientID()
- {
- int start = transientIDSequence;
- do
- {
- int id = transientIDSequence++;
-
- if (!transientIDs.contains(id))
- {
- transientIDs.add(id);
-
- return id;
- }
- }
- while (transientIDSequence != start);
-
- throw new IllegalStateException("Run out of queue ids!");
- }
-
- private void releaseTransientID(final int id)
- {
- transientIDs.remove(id);
- }
-
private final PageMessageOperation getPageOperation(final Transaction tx)
{
// you could have races on the case two sessions using the same XID
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-06 09:53:21 UTC (rev 8053)
@@ -38,10 +38,8 @@
SimpleString getName();
- long getPersistenceID();
+ long getID();
- void setPersistenceID(long id);
-
Filter getFilter();
boolean isDurable();
Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2009-10-06
09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -41,7 +41,7 @@
void handleReplicatedAddBinding(SimpleString address,
SimpleString uniqueName,
SimpleString routingName,
- int queueID,
+ long queueID,
SimpleString filterString,
SimpleString queueName,
int distance) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-06
09:35:13 UTC (rev 8052)
+++
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -577,14 +577,15 @@
SimpleString filterString =
(SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
- Integer queueID =
(Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
+ Long queueID = (Long)message.getProperty(ManagementHelper.HDR_BINDING_ID);
if (queueID == null)
{
throw new IllegalStateException("queueID is null");
}
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+ RemoteQueueBinding binding = new
RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+ queueAddress,
clusterName,
routingName,
queueID,
@@ -721,7 +722,7 @@
public void handleReplicatedAddBinding(final SimpleString address,
final SimpleString uniqueName,
final SimpleString routingName,
- final int queueID,
+ final long queueID,
final SimpleString filterString,
final SimpleString queueName,
final int distance) throws Exception
@@ -735,7 +736,8 @@
Queue queue = (Queue)queueBinding.getBindable();
- RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+ RemoteQueueBinding binding = new
RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+ address,
uniqueName,
routingName,
queueID,
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-06
09:35:13 UTC (rev 8052)
+++
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -51,7 +51,7 @@
private final SimpleString routingName;
- private final int remoteQueueID;
+ private final long remoteQueueID;
private final Filter queueFilter;
@@ -63,19 +63,22 @@
private final SimpleString idsHeaderName;
- private int id;
+ private final long id;
private final int distance;
- public RemoteQueueBindingImpl(final SimpleString address,
+ public RemoteQueueBindingImpl(final long id,
+ final SimpleString address,
final SimpleString uniqueName,
final SimpleString routingName,
- final int remoteQueueID,
+ final Long remoteQueueID,
final SimpleString filterString,
- final Queue storeAndForwardQueue,
+ final Queue storeAndForwardQueue,
final SimpleString bridgeName,
final int distance) throws Exception
{
+ this.id = id;
+
this.address = address;
this.storeAndForwardQueue = storeAndForwardQueue;
@@ -100,16 +103,11 @@
this.distance = distance;
}
- public int getID()
+ public long getID()
{
return id;
}
- public void setID(final int id)
- {
- this.id = id;
- }
-
public SimpleString getAddress()
{
return address;
@@ -195,20 +193,20 @@
if (ids == null)
{
- ids = new byte[4];
+ ids = new byte[8];
}
else
{
- byte[] newIds = new byte[ids.length + 4];
+ byte[] newIds = new byte[ids.length + 8];
- System.arraycopy(ids, 0, newIds, 4, ids.length);
+ System.arraycopy(ids, 0, newIds, 8, ids.length);
ids = newIds;
}
ByteBuffer buff = ByteBuffer.wrap(ids);
- buff.putInt(remoteQueueID);
+ buff.putLong(remoteQueueID);
message.putBytesProperty(idsHeaderName, ids);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-06 09:35:13
UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-06 09:53:21
UTC (rev 8053)
@@ -843,7 +843,7 @@
if (queue.isDurable())
{
- storageManager.deleteQueueBinding(queue.getPersistenceID());
+ storageManager.deleteQueueBinding(queue.getID());
}
postOffice.removeBinding(queueName);
@@ -1169,7 +1169,7 @@
filter = new FilterImpl(queueBindingInfo.getFilterString());
}
- Queue queue = queueFactory.createQueue(queueBindingInfo.getPersistenceID(),
+ Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
filter,
@@ -1178,7 +1178,7 @@
Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue,
nodeID);
- queues.put(queueBindingInfo.getPersistenceID(), queue);
+ queues.put(queueBindingInfo.getId(), queue);
postOffice.addBinding(binding);
@@ -1267,8 +1267,8 @@
filter = new FilterImpl(filterString);
}
- final Queue queue = queueFactory.createQueue(-1, address, queueName, filter,
durable, temporary);
-
+ final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
address, queueName, filter, durable, temporary);
+
binding = new LocalQueueBinding(address, queue, nodeID);
if (durable)
@@ -1339,7 +1339,7 @@
pagingManager,
storageManager);
- Binding binding = new DivertBinding(sAddress, divert);
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
postOffice.addBinding(binding);
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-06 09:35:13
UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-06 09:53:21
UTC (rev 8053)
@@ -232,7 +232,7 @@
}
else
{
- storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(),
msg.getMessageID());
+ storageManager.deleteMessageTransactional(tx.getID(), getID(),
msg.getMessageID());
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-06 09:35:13 UTC
(rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-06 09:53:21 UTC
(rev 8053)
@@ -76,7 +76,7 @@
public static final int NUM_PRIORITIES = 10;
- private volatile long persistenceID = -1;
+ private final long id;
private final SimpleString name;
@@ -140,7 +140,7 @@
private volatile SimpleString expiryAddress;
- public QueueImpl(final long persistenceID,
+ public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
final Filter filter,
@@ -151,7 +151,7 @@
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository)
{
- this.persistenceID = persistenceID;
+ this.id = id;
this.address = address;
@@ -259,7 +259,7 @@
message.setStored();
}
- storageManager.storeReference(ref.getQueue().getPersistenceID(),
message.getMessageID());
+ storageManager.storeReference(ref.getQueue().getID(),
message.getMessageID());
}
if (scheduledDeliveryTime != null && durableRef)
@@ -283,7 +283,7 @@
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
storageManager.storeReferenceTransactional(tx.getID(),
-
ref.getQueue().getPersistenceID(),
+ ref.getQueue().getID(),
message.getMessageID());
}
@@ -373,16 +373,11 @@
return name;
}
- public long getPersistenceID()
+ public long getID()
{
- return persistenceID;
+ return id;
}
- public void setPersistenceID(final long id)
- {
- persistenceID = id;
- }
-
public Filter getFilter()
{
return filter;
@@ -662,7 +657,7 @@
if (durableRef)
{
- storageManager.storeAcknowledge(persistenceID, message.getMessageID());
+ storageManager.storeAcknowledge(id, message.getMessageID());
}
postAcknowledge(ref);
@@ -676,7 +671,7 @@
if (durableRef)
{
- storageManager.storeAcknowledgeTransactional(tx.getID(), persistenceID,
message.getMessageID());
+ storageManager.storeAcknowledgeTransactional(tx.getID(), id,
message.getMessageID());
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-10-06
09:35:13 UTC (rev 8052)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -25,72 +25,149 @@
*/
public class ClusterRestartTest extends ClusterTestBase
{
- public void testRestartWithDurableQueues() throws Exception
+ public void testRestartWithQueuesCreateInDiffOrder() throws Exception
{
- /*setupServer(0, isFileStorage(), isNetty());
+ setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
- setupServer(2, isFileStorage(), isNetty());
- setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
- setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0);
- setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
- startServers(0, 1, 2);
+ startServers(0, 1);
+
+ System.out.println("server 0 = " + getServer(0).getNodeID());
+ System.out.println("server 1 = " + getServer(1).getNodeID());
+
try
{
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
+ //create some dummy queues to ensure that the test queue has a high numbered
binding
+ createQueue(0, "queues.testaddress2", "queue0", null,
false);
+ createQueue(0, "queues.testaddress2", "queue1", null,
false);
+ createQueue(0, "queues.testaddress2", "queue2", null,
false);
+ createQueue(0, "queues.testaddress2", "queue3", null,
false);
+ createQueue(0, "queues.testaddress2", "queue4", null,
false);
+ createQueue(0, "queues.testaddress2", "queue5", null,
false);
+ createQueue(0, "queues.testaddress2", "queue6", null,
false);
+ createQueue(0, "queues.testaddress2", "queue7", null,
false);
+ createQueue(0, "queues.testaddress2", "queue8", null,
false);
+ createQueue(0, "queues.testaddress2", "queue9", null,
false);
+ //now create the 2 queues and make sure they are durable
+ createQueue(0, "queues.testaddress", "queue10", null,
true);
+ createQueue(1, "queues.testaddress", "queue10", null,
true);
- createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
- createQueue(2, "queues.testaddress", "queue0", null, true);
+ addConsumer(0, 0, "queue10", null);
- addConsumer(1, 1, "queue0", null);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+ printBindings(2);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false, null);
+
+
System.out.println("stopping******************************************************");
+ stopServers(0);
+
System.out.println("stopped******************************************************");
+ startServers(0);
+
waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
+ addConsumer(1, 0, "queue10", null);
- printBindings();
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 1, false);
+ printBindings(2);
+
+ sendInRange(1, "queues.testaddress", 10, 20, false, null);
- sendInRange(1, "queues.testaddress", 0, 10, false, null);
+ verifyReceiveAllInRange(10, 20, 1);
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
- sendInRange(2, "queues.testaddress", 10, 20, false, null);
+ closeAllSessionFactories();
+ stopServers(0, 1);
+ }
+ }
- sendInRange(0, "queues.testaddress", 20, 30, false, null);
+ public void testRestartWithQueuesCreateInDiffOrder2() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0);
+
+
+ startServers(0, 1);
+
+
+ System.out.println("server 0 = " + getServer(0).getNodeID());
+ System.out.println("server 1 = " + getServer(1).getNodeID());
+
+ try
+ {
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+
+ //create some dummy queues to ensure that the test queue has a high numbered
binding
+ createQueue(0, "queues.testaddress2", "queue0", null,
false);
+ createQueue(0, "queues.testaddress2", "queue1", null,
false);
+ createQueue(0, "queues.testaddress2", "queue2", null,
false);
+ createQueue(0, "queues.testaddress2", "queue3", null,
false);
+ createQueue(0, "queues.testaddress2", "queue4", null,
false);
+ createQueue(0, "queues.testaddress2", "queue5", null,
false);
+ createQueue(0, "queues.testaddress2", "queue6", null,
false);
+ createQueue(0, "queues.testaddress2", "queue7", null,
false);
+ createQueue(0, "queues.testaddress2", "queue8", null,
false);
+ createQueue(0, "queues.testaddress2", "queue9", null,
false);
+ //now create the 2 queues and make sure they are durable
+ createQueue(0, "queues.testaddress", "queue10", null,
true);
+ createQueue(1, "queues.testaddress", "queue10", null,
true);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+ printBindings(2);
+
+ sendInRange(1, "queues.testaddress", 0, 10, true, null);
+
System.out.println("stopping******************************************************");
- stopServers(1);
+ stopServers(0);
+
+ sendInRange(1, "queues.testaddress", 10, 20, true, null);
System.out.println("stopped******************************************************");
- startServers(1);
+ startServers(0);
waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
- addConsumer(4, 1, "queue0", null);
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
- printBindings();
- sendInRange(2, "queues.testaddress", 30, 40, false, null);
- sendInRange(0, "queues.testaddress", 40, 50, false, null);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+ printBindings(2);
+ addConsumer(0, 1, "queue10", null);
+ addConsumer(1, 0, "queue10", null);
- verifyReceiveAllInRange(0, 50, 1);
+ verifyReceiveRoundRobin(0, 20, 0, 1);
System.out.println("*****************************************************************************");
}
finally
@@ -99,30 +176,22 @@
closeAllSessionFactories();
- stopServers(0, 1, 2);
- }*/
+ stopServers(0, 1);
+ }
}
- private void printBindings()
+
+ private void printBindings(int num)
throws Exception
{
- Collection<Binding> bindings0 =
getServer(0).getPostOffice().getBindingsForAddress(new
SimpleString("queues.testaddress")).getBindings();
- Collection<Binding> bindings1 =
getServer(1).getPostOffice().getBindingsForAddress(new
SimpleString("queues.testaddress")).getBindings();
- Collection<Binding> bindings2 =
getServer(2).getPostOffice().getBindingsForAddress(new
SimpleString("queues.testaddress")).getBindings();
- for (Binding binding : bindings0)
+ for(int i = 0; i < num; i++)
{
- System.out.println(binding + " on node 0 at " + binding.getID());
+ Collection<Binding> bindings0 =
getServer(i).getPostOffice().getBindingsForAddress(new
SimpleString("queues.testaddress")).getBindings();
+ for (Binding binding : bindings0)
+ {
+ System.out.println(binding + " on node " + i + " at " +
binding.getID());
+ }
}
-
- for (Binding binding : bindings1)
- {
- System.out.println(binding + " on node 1 at " + binding.getID());
- }
-
- for (Binding binding : bindings2)
- {
- System.out.println(binding + " on node 2 at " + binding.getID());
- }
}
public boolean isNetty()
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-10-06
09:35:13 UTC (rev 8052)
+++
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -130,9 +130,9 @@
return (String)proxy.retrieveAttributeValue("name");
}
- public long getPersistenceID()
+ public long getID()
{
- return (Long)proxy.retrieveAttributeValue("persistenceID");
+ return (Long)proxy.retrieveAttributeValue("ID");
}
public long getScheduledCount()
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-06
09:35:13 UTC (rev 8052)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -957,7 +957,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.Binding#getID()
*/
- public int getID()
+ public long getID()
{
return 0;
@@ -1008,14 +1008,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#setID(int)
- */
- public void setID(final int id)
- {
-
- }
-
- /* (non-Javadoc)
* @see
org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
*/
public void willRoute(final ServerMessage message)
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-06
09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -19,7 +19,6 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.MessageReference;
@@ -311,9 +310,9 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#getPersistenceID()
+ * @see org.hornetq.core.server.Queue#getID()
*/
- public long getPersistenceID()
+ public long getID()
{
return 0;
@@ -482,14 +481,7 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#setPersistenceID(long)
- */
- public void setPersistenceID(long id)
- {
- }
-
/* (non-Javadoc)
* @see
org.hornetq.core.server.Bindable#preroute(org.hornetq.core.server.ServerMessage,
org.hornetq.core.transaction.Transaction)
*/
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-10-06
09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-10-06
09:53:21 UTC (rev 8053)
@@ -51,21 +51,6 @@
private static final SimpleString address1 = new SimpleString("address1");
- public void testID()
- {
- final long id = 123;
-
- Queue queue = new QueueImpl(id, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
-
- assertEquals(id, queue.getPersistenceID());
-
- final long id2 = 456;
-
- queue.setPersistenceID(id2);
-
- assertEquals(id2, queue.getPersistenceID());
- }
-
public void testName()
{
final SimpleString name = new SimpleString("oobblle");